1 Aug 2011
Flume rpcSource example
I searched the web for a decent example of how to use rpcSource in Flume, but I couldn't find one; I only found an avroSource usage example here. Unfortunately, avroSource is too slow for my taste, so here is an example of a ThriftFlumeEventServer client.
package com.ozbuyucusu.flume;
import com.cloudera.flume.handlers.thrift.Priority;
import com.cloudera.flume.handlers.thrift.ThriftFlumeEvent;
import com.cloudera.flume.handlers.thrift.ThriftFlumeEventServer.Client;
import java.nio.ByteBuffer;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;
/*
 * The matching Flume source in my example is rpcSource(10400);
 * change the host and port accordingly.
 */
public class FlumeThriftTest {
    private static final Logger LOG = Logger.getLogger(FlumeThriftTest.class.getName());

    public static void main(String[] args) {
        // Build the event: one "topic" attribute plus body, priority, timestamp and host
        ThriftFlumeEvent tfe = new ThriftFlumeEvent();
        Map<String, ByteBuffer> fields = new HashMap<String, ByteBuffer>();
        fields.put("topic", ByteBuffer.wrap("test".getBytes()));
        tfe.fields = fields;
        tfe.priority = Priority.INFO;
        tfe.timestamp = new Date().getTime();
        tfe.host = "localhost";
        tfe.body = ByteBuffer.wrap("test body".getBytes());

        TTransport transport = new TSocket("localhost", 10400);
        try {
            Client client = getClient(transport);
            client.append(tfe);
            // close() is a Thrift call defined by the service; it does not close the socket
            client.close();
        } catch (TException ex) {
            LOG.log(Level.SEVERE, "Could not send event to Flume", ex);
        } finally {
            // Always release the socket, even if open() or append() failed
            transport.close();
        }
    }

    // Opens the transport (propagating failures instead of swallowing them) and wraps it in a client
    public static Client getClient(TTransport transport) throws TException {
        if (!transport.isOpen()) {
            transport.open();
        }
        TProtocol protocol = new TBinaryProtocol(transport);
        return new Client(protocol);
    }
}
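ThriftFlumeEvent.fields holds raw bytes, which is why the example wraps the "topic" value in a ByteBuffer. If you want to attach several attributes, a small helper along these lines can build the map from plain Strings. `EventFields`, `fromStrings` and `asString` are hypothetical names of my own, not part of Flume; the helper also encodes values as UTF-8 explicitly, whereas `getBytes()` uses the platform default charset.

```java
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, not part of Flume: builds the Map<String, ByteBuffer>
// expected by ThriftFlumeEvent.fields from plain String attributes.
final class EventFields {
    private static final Charset UTF8 = Charset.forName("UTF-8");

    private EventFields() {
    }

    static Map<String, ByteBuffer> fromStrings(Map<String, String> attrs) {
        Map<String, ByteBuffer> fields = new HashMap<String, ByteBuffer>();
        for (Map.Entry<String, String> e : attrs.entrySet()) {
            // Encode explicitly as UTF-8 instead of the platform default charset
            fields.put(e.getKey(), ByteBuffer.wrap(e.getValue().getBytes(UTF8)));
        }
        return fields;
    }

    // Decodes a field value back to a String (e.g. for logging);
    // works on a duplicate so the original buffer's position is untouched
    static String asString(ByteBuffer buf) {
        ByteBuffer copy = buf.duplicate();
        byte[] bytes = new byte[copy.remaining()];
        copy.get(bytes);
        return new String(bytes, UTF8);
    }
}
```

With this in place, `tfe.fields = EventFields.fromStrings(attrs);` replaces the manual HashMap setup, where `attrs` is any `Map<String, String>` of attribute names to values.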
From what I've seen, the Thrift client is about 10 times faster than the Avro client. I hope the Avro implementation gets fixed soon.
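If you'd rather check the speed difference on your own setup than take the 10x figure on trust, a crude harness like this one times N sends through whatever you plug in. `Sender` and `Throughput.eventsPerSecond` are hypothetical names; for the Thrift side you would wrap `client.append(tfe)`, and the equivalent Avro call for the other. It measures wall-clock time on the client only (how fast events are pushed, not end-to-end delivery), so treat the numbers as rough.

```java
// Hypothetical abstraction: one call to send() = one event sent.
interface Sender {
    void send() throws Exception;
}

final class Throughput {
    private Throughput() {
    }

    // Runs `warmup` untimed sends (to let the JIT settle), then times `count`
    // sends and returns events per second based on wall-clock time.
    static double eventsPerSecond(Sender sender, int warmup, int count) throws Exception {
        if (count <= 0) {
            throw new IllegalArgumentException("count must be positive");
        }
        for (int i = 0; i < warmup; i++) {
            sender.send();
        }
        long start = System.nanoTime();
        for (int i = 0; i < count; i++) {
            sender.send();
        }
        long elapsed = Math.max(1L, System.nanoTime() - start); // avoid division by zero
        return count * 1e9 / elapsed;
    }
}
```

Run it once per client with the same event and count, and compare the two rates.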
P.S.: The generated Thrift Client is not thread-safe, so for production use you had better keep a pool of clients, for example with a GenericObjectPool (from Apache Commons Pool).
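GenericObjectPool is the ready-made option; to show the idea without the extra dependency, here is a minimal fixed-size pool built on `java.util.concurrent.ArrayBlockingQueue`. `SimplePool` and `ObjectFactory` are hypothetical names of my own. Each thread borrows an object, uses it exclusively, and gives it back, so no Client is ever shared between threads. For Flume, the factory would open a TSocket and return a new Client around it. Unlike Commons Pool, this sketch does no validation, eviction or reconnecting of broken clients.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical factory: creates one pooled object (e.g. an opened Thrift Client).
interface ObjectFactory<T> {
    T create() throws Exception;
}

// Hypothetical minimal pool: `size` objects created up front,
// each lent to at most one thread at a time.
final class SimplePool<T> {
    private final BlockingQueue<T> idle;

    SimplePool(ObjectFactory<T> factory, int size) throws Exception {
        if (size <= 0) {
            throw new IllegalArgumentException("size must be positive");
        }
        idle = new ArrayBlockingQueue<T>(size);
        for (int i = 0; i < size; i++) {
            idle.add(factory.create());
        }
    }

    // Blocks until an object is free; the caller has exclusive use until release()
    T borrow() throws InterruptedException {
        return idle.take();
    }

    // Returns an object obtained from borrow(); fails if the pool is already full
    void release(T obj) {
        if (!idle.offer(obj)) {
            throw new IllegalStateException("pool is already full");
        }
    }

    int available() {
        return idle.size();
    }
}
```

In use, a worker thread calls `borrow()`, sends with `append(tfe)`, and calls `release(client)` in a `finally` block so the client goes back to the pool even when the send fails.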