Ozgur Orhan's blog Small hacks and hints

1 Aug 2011

Flume rpcSource example

I searched the web for a decent example of how to use rpcSource in Flume and failed to find one. All I found was an avroSource usage example. Unfortunately avroSource is too slow for my taste, so here is an example of a ThriftFlumeEventServer Client that sends a single event to an rpcSource.

package com.ozbuyucusu.flume;

import com.cloudera.flume.handlers.thrift.Priority;
import com.cloudera.flume.handlers.thrift.ThriftFlumeEvent;
import com.cloudera.flume.handlers.thrift.ThriftFlumeEventServer.Client;
import java.nio.ByteBuffer;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;

/*
The Flume source for this example is rpcSource(10400);
change the host and port in getClient() to match your setup.
 */
public class FlumeThriftTest {

    public static void main(String args[]) {
        ThriftFlumeEvent tfe = new ThriftFlumeEvent();
        Map<String, java.nio.ByteBuffer> fields = new HashMap<String, ByteBuffer>();
        fields.put("topic", ByteBuffer.wrap("test".getBytes()));

        tfe.fields = fields;
        tfe.priority = Priority.INFO;
        tfe.timestamp = new Date().getTime();
        tfe.host = "localhost";

        tfe.body = ByteBuffer.wrap("test body".getBytes());
        Client client = getClient();
        try {
            client.append(tfe);
        } catch (TException ex) {
            Logger.getLogger(FlumeThriftTest.class.getName()).log(Level.SEVERE, null, ex);
        } finally {
            try {
                client.close();
            } catch (TException ex) {
                Logger.getLogger(FlumeThriftTest.class.getName()).log(Level.SEVERE, null, ex);
            }
        }

    }

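    public static Client getClient() {
        TTransport transport = new TSocket("localhost", 10400);
        try {
            // Open the socket to the Flume node running rpcSource(10400).
            transport.open();
        } catch (TException e) {
            // Fail fast: a Client on an unopened transport would only fail later in append().
            throw new IllegalStateException("Could not connect to localhost:10400", e);
        }

        // Binary protocol over the open socket.
        TProtocol protocol = new TBinaryProtocol(transport);
        return new Client(protocol);
    }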
}
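
One detail worth a note: the event's fields and body are ByteBuffers built with getBytes(), which uses the JVM's default charset when called without an argument, and on JVMs before Java 18 that default can differ between machines. Below is a minimal sketch of encoding and decoding field values with an explicit UTF-8 charset. The EventFields class and its method names are made up for illustration and are not part of Flume; it uses only the JDK (Java 7 or later for StandardCharsets).

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper (not part of Flume) for building event field values. */
final class EventFields {

    /** Encodes a string as UTF-8 bytes wrapped in a ByteBuffer. */
    static ByteBuffer encode(String value) {
        return ByteBuffer.wrap(value.getBytes(StandardCharsets.UTF_8));
    }

    /** Decodes a UTF-8 buffer; duplicate() leaves the caller's position untouched. */
    static String decode(ByteBuffer buf) {
        return StandardCharsets.UTF_8.decode(buf.duplicate()).toString();
    }

    /** Builds a one-entry field map like the "topic" map in the example above. */
    static Map<String, ByteBuffer> single(String key, String value) {
        Map<String, ByteBuffer> fields = new HashMap<String, ByteBuffer>();
        fields.put(key, encode(value));
        return fields;
    }
}

class EventFieldsDemo {
    public static void main(String[] args) {
        Map<String, ByteBuffer> fields = EventFields.single("topic", "test");
        // Read the value back to check the round trip.
        System.out.println(EventFields.decode(fields.get("topic")));
    }
}
```

In the example above, this would correspond to writing fields.put("topic", EventFields.encode("test")) and tfe.body = EventFields.encode("test body").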

 

As far as I can tell, the Thrift client implementation is about 10 times faster than the Avro client implementation. I hope they fix the Avro implementation soon.

P.S.: Client is not thread-safe, so for production use you should pool the clients, for example with a GenericObjectPool.
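
To show the idea behind pooling, here is a rough sketch of a fixed-size pool in which each client is borrowed by one thread at a time. It is not GenericObjectPool from Apache Commons Pool: it swaps in a plain java.util.concurrent.BlockingQueue so the example runs with only the JDK (Java 8 or later). The SimplePool class and its method names are invented for illustration, and a StringBuilder stands in for the Thrift Client.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.function.Supplier;

/** Hypothetical minimal pool; a stand-in for GenericObjectPool, not its API. */
final class SimplePool<T> {
    private final BlockingQueue<T> idle;

    /** Creates all pooled objects up front using the given factory. */
    SimplePool(int size, Supplier<T> factory) {
        idle = new ArrayBlockingQueue<T>(size);
        for (int i = 0; i < size; i++) {
            idle.add(factory.get());
        }
    }

    /** Blocks until an object is free, so no two threads share one object. */
    T borrow() {
        try {
            return idle.take();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for a pooled object", e);
        }
    }

    /** Returns a previously borrowed object to the pool. */
    void giveBack(T obj) {
        idle.offer(obj);
    }

    /** Number of objects currently available to borrow. */
    int idleCount() {
        return idle.size();
    }
}

class SimplePoolDemo {
    public static void main(String[] args) {
        // StringBuilder stands in for the non-thread-safe Thrift Client; in the
        // real program the factory would be something like FlumeThriftTest::getClient.
        SimplePool<StringBuilder> pool = new SimplePool<StringBuilder>(2, StringBuilder::new);
        StringBuilder client = pool.borrow();
        try {
            client.append("event"); // stands in for client.append(tfe)
        } finally {
            pool.giveBack(client); // always return the client, even on failure
        }
        System.out.println("idle clients: " + pool.idleCount());
    }
}
```

A real pool for Thrift clients would also need to discard and replace clients whose connection has broken, which is the kind of validation GenericObjectPool is designed to handle and this sketch leaves out.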
