Ozgur Orhan's blog: Small hacks and hints

1 Aug 2011

Flume rpcSource example

I searched the web for a decent example of how to use rpcSource in Flume, but failed to find one; I only found an avroSource example here. Unfortunately, avroSource is too slow for my taste, so here is an example client for ThriftFlumeEventServer.

package com.ozbuyucusu.flume;

import com.cloudera.flume.handlers.thrift.Priority;
import com.cloudera.flume.handlers.thrift.ThriftFlumeEvent;
import com.cloudera.flume.handlers.thrift.ThriftFlumeEventServer.Client;
import java.nio.ByteBuffer;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;

/*
 * The Flume source in my example is: rpcSource(10400).
 * Change the host and port accordingly.
 */
public class FlumeThriftTest {

    public static void main(String[] args) {
        // Build the event to send; fields is an arbitrary key/value metadata map.
        ThriftFlumeEvent tfe = new ThriftFlumeEvent();
        Map<String, ByteBuffer> fields = new HashMap<String, ByteBuffer>();
        fields.put("topic", ByteBuffer.wrap("test".getBytes()));

        tfe.fields = fields;
        tfe.priority = Priority.INFO;
        tfe.timestamp = new Date().getTime();
        tfe.host = "localhost";

        tfe.body = ByteBuffer.wrap("test body".getBytes());
        Client client = getClient();
        try {
            client.append(tfe);
        } catch (TException ex) {
            Logger.getLogger(FlumeThriftTest.class.getName()).log(Level.SEVERE, null, ex);
        } finally {
            try {
                client.close();
            } catch (TException ex) {
                Logger.getLogger(FlumeThriftTest.class.getName()).log(Level.SEVERE, null, ex);
            }
        }

    }

    public static Client getClient() {
        TTransport transport = new TSocket("localhost", 10400);
        // Fail fast instead of returning a client over an unopened transport.
        if (!transport.isOpen()) {
            try {
                transport.open();
            } catch (Exception e) {
                throw new RuntimeException("Could not open transport", e);
            }
        }

        TProtocol protocol = new TBinaryProtocol(transport);
        return new Client(protocol);
    }
}

 

As far as I can see, the Thrift client implementation is about 10 times faster than the Avro client implementation. I hope the Avro implementation gets fixed soon.

P.S.: Client is not thread-safe, so for production use you had better pool clients, e.g. with a GenericObjectPool.
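A minimal sketch of that pooling idea, assuming Commons Pool 1.6 (the generic factory API) is on the classpath; the factory below and its host/port are illustrative, not part of the original post:

```java
import org.apache.commons.pool.BasePoolableObjectFactory;
import org.apache.commons.pool.impl.GenericObjectPool;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;
import com.cloudera.flume.handlers.thrift.ThriftFlumeEventServer.Client;

// Sketch: pool Thrift clients because Client is not thread-safe.
public class ClientFactory extends BasePoolableObjectFactory<Client> {

    private final String host;
    private final int port;

    public ClientFactory(String host, int port) {
        this.host = host;
        this.port = port;
    }

    @Override
    public Client makeObject() throws Exception {
        // Each pooled object gets its own transport and protocol.
        TTransport transport = new TSocket(host, port);
        transport.open();
        return new Client(new TBinaryProtocol(transport));
    }

    public static void main(String[] args) throws Exception {
        GenericObjectPool<Client> pool =
                new GenericObjectPool<Client>(new ClientFactory("localhost", 10400));
        Client client = pool.borrowObject(); // one client per thread at a time
        try {
            // client.append(event); ...
        } finally {
            pool.returnObject(client);
        }
    }
}
```

Each thread borrows a client, uses it, and returns it, so no two threads ever share one Client instance.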

Filed under: Coding
21 Jul 2011

Using Hbase TableMapper inside Oozie Workflow

I want to share a little hack I used to run a TableMapper, with or without filters, inside an Oozie workflow. The first thing to understand is how TableMapReduceUtil.initTableMapperJob works.

TableMapReduceUtil.initTableMapperJob(tableName, scan, MyTableMapper.class, Writable.class, Writable.class, job);

tableName goes to the hbase.mapreduce.inputtable property
scan goes to the hbase.mapreduce.scan property (it is converted to a Base64-encoded string)
the TableMapper class goes to the mapreduce.map.class property (if you are not using the new API, use mapred.mapper.class)
the mapper output key class goes to mapred.mapoutput.key.class (this works for both the new and the old API)
the mapper output value class goes to mapred.mapoutput.value.class
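The mapping above can be sketched by setting the same keys on a Hadoop Configuration by hand; the table name and class names below are placeholders, not the post's actual values:

```java
import org.apache.hadoop.conf.Configuration;

// Sketch: the Configuration keys that initTableMapperJob fills in for you.
public class ManualTableMapperConf {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.setBoolean("mapred.mapper.new-api", true);              // needed for the new API
        conf.set("hbase.mapreduce.inputtable", "my_table");          // tableName argument
        conf.set("hbase.mapreduce.scan", "base64-scan-string");      // serialized Scan
        conf.set("mapreduce.map.class", "com.example.MyTableMapper");
        conf.set("mapred.mapoutput.key.class", "org.apache.hadoop.io.Text");
        conf.set("mapred.mapoutput.value.class", "org.apache.hadoop.io.Text");
        System.out.println(conf.get("hbase.mapreduce.inputtable"));
    }
}
```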

So basically we are going to set these values in the Oozie workflow.xml, and then we can run our TableMapper via Oozie. The only tricky part is passing the serialized Scan to Oozie, for which I used a java action to produce the string value.

package com.ozbuyucusu.hbase.helper;

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.Properties;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Base64;

public class ScanStringGenerator {

    public static void main(String[] args) throws FileNotFoundException, IOException {
        Scan scan = new Scan();
        scan.setCacheBlocks(false);
        scan.setCaching(500);

        File file = new File(System.getProperty("oozie.action.output.properties"));
        Properties props = new Properties();
        String scanString = convertScanToString(scan);
        props.setProperty("scan", scanString);
        OutputStream os = new FileOutputStream(file);

        props.store(os, "");
        os.close();
        System.out.println("Empty Scanner Generated : " + scanString);
    }

    private static String convertScanToString(Scan scan) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        DataOutputStream dos = new DataOutputStream(out);
        scan.write(dos);
        return Base64.encodeBytes(out.toByteArray());
    }
}

You can modify the Scan to add filters and to set a start and a stop row (which I did in most cases); the scan parameters can easily be passed to this helper class via command-line arguments. Finally, we wire the helper class into the workflow.
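As a sketch of such a modified Scan (the column family, qualifier, value, and row keys below are placeholders, not from the original post), a bounded, filtered variant might look like this:

```java
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.util.Bytes;

public class FilteredScanExample {

    // Build a Scan bounded by [startRow, stopRow) with a column-value filter.
    public static Scan buildScan(String startRow, String stopRow) {
        Scan scan = new Scan(Bytes.toBytes(startRow), Bytes.toBytes(stopRow));
        scan.setCacheBlocks(false);
        scan.setCaching(500);
        // Keep only rows whose f:status column equals "active" (placeholder values).
        scan.setFilter(new SingleColumnValueFilter(
                Bytes.toBytes("f"), Bytes.toBytes("status"),
                CompareOp.EQUAL, Bytes.toBytes("active")));
        return scan;
    }
}
```

The resulting Scan would then be serialized by convertScanToString exactly as the empty one is in ScanStringGenerator.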

<workflow-app xmlns='uri:oozie:workflow:0.2' name='sample-job-wf'>
	<start to='get-scanner' />
	<action name='get-scanner'>
		<java>
			<job-tracker>${jobTracker}</job-tracker>
			<name-node>${nameNode}</name-node>
			<configuration>
				<property>
					<name>mapred.job.queue.name</name>
					<value>${queueName}</value>
				</property>
			</configuration>
			<main-class>com.ozbuyucusu.hbase.helper.ScanStringGenerator</main-class>
			<capture-output />
		</java>
		<ok to="sample-job" />
		<error to="fail" />
	</action>
	<action name='sample-job'>
		<map-reduce>
			<job-tracker>${jobTracker}</job-tracker>
			<name-node>${nameNode}</name-node>
			<configuration>
				<!-- This is required for new api usage -->
				<property>
					<name>mapred.mapper.new-api</name>
					<value>true</value>
				</property>
				<property>
					<name>mapred.reducer.new-api</name>
					<value>true</value>
				</property>
				<!-- HBASE CONFIGURATIONS -->
				<property>
					<name>hbase.mapreduce.inputtable</name>
					<value>${MAPPER_INPUT_TABLE}</value>
				</property>
				<property>
					<name>hbase.mapreduce.scan</name>
					<value>${wf:actionData('get-scanner')['scan']}</value>
				</property>
				<property>
					<name>hbase.zookeeper.property.clientPort</name>
					<value>${hbaseZookeeperClientPort}</value>
				</property>
				<property>
					<name>hbase.zookeeper.quorum</name>
					<value>${hbaseZookeeperQuorum}</value>
				</property>
				<!-- MAPPER CONFIGURATIONS -->
				<property>
					<name>mapreduce.inputformat.class</name>
					<value>org.apache.hadoop.hbase.mapreduce.TableInputFormat</value>
				</property>
				<property>
					<name>mapred.mapoutput.key.class</name>
					<value>org.apache.hadoop.io.Text</value>
				</property>
				<property>
					<name>mapred.mapoutput.value.class</name>
					<value>org.apache.hadoop.io.Text</value>
				</property>
				<property>
					<name>mapreduce.map.class</name>
					<value>com.ozbuyucusu.hbase.mapper.MyTableMapper</value>
				</property>
				<!-- REDUCER CONFIGURATIONS -->
				<property>
					<name>mapreduce.reduce.class</name>
					<value>com.ozbuyucusu.hbase.reducer.MyReducer</value>
				</property>
				<property>
					<name>mapreduce.outputformat.class</name>
					<value>org.apache.hadoop.mapreduce.lib.output.NullOutputFormat</value>
				</property>
				<property>
					<name>mapred.map.tasks</name>
					<value>${mapperCount}</value>
				</property>
				<property>
					<name>mapred.reduce.tasks</name>
					<value>${reducerCount}</value>
				</property>
				<property>
					<name>mapred.job.queue.name</name>
					<value>${queueName}</value>
				</property>
			</configuration>
		</map-reduce>
		<ok to="end" />
		<error to="fail" />
	</action>
	<kill name="fail">
		<message>Map/Reduce failed, error
			message[${wf:errorMessage(wf:lastErrorNode())}]</message>
	</kill>
	<end name='end' />
</workflow-app>

There are probably better ways to use an HBase TableMapper inside an Oozie workflow (I could not find one), but this small hack, ugly as it looks, works like a charm for me.

Filed under: Coding