Thanks all,

I added a dedicated channel each for the HDFS and HBase sinks, and all events are making it into both sinks now.
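
A minimal sketch of what one channel per sink can look like on the collector, assuming Flume's default replicating channel selector. The channel names (hbaseChannel, hdfsChannel) and the directory paths are illustrative placeholders, not copied from the actual config:

```properties
# Sketch only: channel names and directories below are hypothetical.
collector.sources.AvroIn.channels = hbaseChannel hdfsChannel
# "replicating" is the default selector: each event is copied to every listed channel.
collector.sources.AvroIn.selector.type = replicating

collector.channels = hbaseChannel hdfsChannel

# Each file channel needs its own checkpoint and data directories.
collector.channels.hbaseChannel.type = file
collector.channels.hbaseChannel.checkpointDir = /opt/flume/file-channel/hbase/checkpoint
collector.channels.hbaseChannel.dataDirs = /opt/flume/file-channel/hbase/data

collector.channels.hdfsChannel.type = file
collector.channels.hdfsChannel.checkpointDir = /opt/flume/file-channel/hdfs/checkpoint
collector.channels.hdfsChannel.dataDirs = /opt/flume/file-channel/hdfs/data

# Each sink drains only its own channel, so neither sink takes events from the other.
collector.sinks.hbaseSink.channel = hbaseChannel
collector.sinks.hdfsSink.channel = hdfsChannel
```

With a single shared channel, each event is taken by whichever sink reads it first, so the two sinks split the stream between them rather than each receiving a full copy.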

What is the best tuning strategy for getting events through exec source -> avro sink -> avro source -> hbase sink with the least latency? Do the batch size and the channel's transaction capacity have any effect on this latency?

Thanks again


On Mon, Apr 22, 2013 at 10:58 AM, Israel Ekpo <israel@aicer.org> wrote:
David,

In addition to what has already been said, if you take a look at your Flume log files, you should see exception messages that explain why this is happening.




On 22 April 2013 11:11, David Quigley <dquigley89@gmail.com> wrote:
Hi,

I am using Flume to write events from a web server to both HDFS and HBase. All events are being written to HDFS, but only about half are making it into HBase. Is there anything in my configuration that could be causing this? I have both the HDFS and HBase sinks reading from the same file channel. Is it better to have one channel per sink?

Thanks,
Dave


# flume config on web server
agent.sources = sourceLog
agent.sources.sourceLog.type = exec
agent.sources.sourceLog.command = tail -F /var/log/clickServer/clicks_out
agent.sources.sourceLog.batchSize = 100
agent.sources.sourceLog.channels = fileChannel

agent.sources.sourceLog.interceptors = itime ihost idatatype idataparent
agent.sources.sourceLog.interceptors.itime.type = timestamp
agent.sources.sourceLog.interceptors.ihost.type = host
agent.sources.sourceLog.interceptors.ihost.useIP = false
agent.sources.sourceLog.interceptors.ihost.hostHeader = host
agent.sources.sourceLog.interceptors.idatatype.type = static
agent.sources.sourceLog.interceptors.idatatype.key = data_type
agent.sources.sourceLog.interceptors.idatatype.value = clicks
agent.sources.sourceLog.interceptors.idataparent.type = static
agent.sources.sourceLog.interceptors.idataparent.key = data_parent
agent.sources.sourceLog.interceptors.idataparent.value = *

agent.channels = fileChannel
agent.channels.fileChannel.type = file
agent.channels.fileChannel.transactionCapacity = 100
agent.channels.fileChannel.checkpointDir = /opt/flume/file-channel/checkpoint
agent.channels.fileChannel.dataDirs = /opt/flume/file-channel/data

agent.sinks = AvroSink_main AvroSink_backup_1 AvroSink_backup_2 AvroSink_backup_3
agent.sinks.AvroSink_main.type = avro
agent.sinks.AvroSink_main.channel = fileChannel
agent.sinks.AvroSink_main.hostname = *
agent.sinks.AvroSink_main.port = 35873
agent.sinks.AvroSink_main.batchSize = 100
agent.sinks.AvroSink_backup_1.type = avro
agent.sinks.AvroSink_backup_1.channel = fileChannel
agent.sinks.AvroSink_backup_1.hostname = *
agent.sinks.AvroSink_backup_1.port = 35873
agent.sinks.AvroSink_backup_1.batchSize = 100
agent.sinks.AvroSink_backup_2.type = avro
agent.sinks.AvroSink_backup_2.channel = fileChannel
agent.sinks.AvroSink_backup_2.hostname = *
agent.sinks.AvroSink_backup_2.port = 35873
agent.sinks.AvroSink_backup_2.batchSize = 100
agent.sinks.AvroSink_backup_3.type = avro
agent.sinks.AvroSink_backup_3.channel = fileChannel
agent.sinks.AvroSink_backup_3.hostname = *
agent.sinks.AvroSink_backup_3.port = 35873
agent.sinks.AvroSink_backup_3.batchSize = 100
agent.sinkgroups = failover
agent.sinkgroups.failover.sinks = AvroSink_main AvroSink_backup_1 AvroSink_backup_2 AvroSink_backup_3
agent.sinkgroups.failover.processor.type = failover
agent.sinkgroups.failover.processor.priority.AvroSink_main = 10
agent.sinkgroups.failover.processor.priority.AvroSink_backup_1 = 5
agent.sinkgroups.failover.processor.priority.AvroSink_backup_2 = 3
agent.sinkgroups.failover.processor.priority.AvroSink_backup_3 = 1
agent.sinkgroups.failover.processor.maxpenalty = 10000



# flume config on hadoop cluster
collector.sources=AvroIn
collector.sources.AvroIn.type=avro
collector.sources.AvroIn.bind=0.0.0.0
collector.sources.AvroIn.port=35873
collector.sources.AvroIn.channels=fileChannel

collector.channels=fileChannel
collector.channels.fileChannel.type=FILE
collector.channels.fileChannel.capacity=1000
collector.channels.fileChannel.checkpointDir=~/.flume/file-channel/checkpoint_%{data_type}
collector.channels.fileChannel.dataDirs=~/.flume/file-channel/data_%{data_type}

collector.sinks=hbaseSink hdfsSink
collector.sinks.hbaseSink.type=org.apache.flume.sink.hbase.AsyncHBaseSink
collector.sinks.hbaseSink.channel=fileChannel
collector.sinks.hbaseSink.table=clicks
collector.sinks.hbaseSink.columnFamily=data
collector.sinks.hbaseSink.batchSize=100
collector.sinks.hbaseSink.serializer=com.*.serializer.HBaseClickSerializer
collector.sinks.hbaseSink.serializer.incrementColumn=icol

collector.sinks.hdfsSink.type=hdfs
collector.sinks.hdfsSink.channel=fileChannel
collector.sinks.hdfsSink.hdfs.path=/data/%{data_parent}/%{data_type}/month=%Y-%m/day=%d
collector.sinks.hdfsSink.hdfs.filePrefix=%{data_parent}_%{data_type}_%Y-%m-%d_%{host}
collector.sinks.hdfsSink.hdfs.timeZone=America/Los_Angeles
collector.sinks.hdfsSink.hdfs.fileType=DataStream
collector.sinks.hdfsSink.hdfs.writeFormat=Text
collector.sinks.hdfsSink.hdfs.rollSize=67100000
collector.sinks.hdfsSink.hdfs.rollCount=0
collector.sinks.hdfsSink.hdfs.rollInterval=3600