Looking further at the jstack output, it looks like in the default configuration (without sink groups) there is one thread per sink, whereas with sink groups there is only one thread per sink group. That would explain the drop in throughput when using sink groups. Am I missing something?
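
If that reading is right (one SinkRunner thread per sink group, which matches the two sinkRunners entries in the startup log quoted below), one possible workaround is to split the main sinks across more than one group so that more runner threads drain mainChannel. This is only a minimal, untested sketch that assumes the rest of the config stays as posted; the group names mainSinksA and mainSinksB are made up for illustration:

```properties
# Hypothetical split of the single mainSinks group into two groups.
# Assumption: each sink group is driven by its own SinkRunner thread,
# so two groups would give mainChannel two draining threads instead of one.
agent1.sinkgroups = mainSinksA mainSinksB replaySinks

agent1.sinkgroups.mainSinksA.sinks = avroMainSink1 avroMainSink2
agent1.sinkgroups.mainSinksA.processor.type = load_balance
agent1.sinkgroups.mainSinksA.processor.selector = random
agent1.sinkgroups.mainSinksA.processor.backoff = true

agent1.sinkgroups.mainSinksB.sinks = avroMainSink3 avroMainSink4
agent1.sinkgroups.mainSinksB.processor.type = load_balance
agent1.sinkgroups.mainSinksB.processor.selector = random
agent1.sinkgroups.mainSinksB.processor.backoff = true
```

The trade-off is that load balancing then happens only within each group, so a slow or failed sink is covered only by the other sink in the same group.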

~Rahul.



On Thursday, April 16, 2015 12:47 PM, Rahul Ravindran <rahulrv@yahoo.com> wrote:


Hi,
Below is my Flume config. I am trying to get a load-balancing sink group to balance across multiple machines. With the load-balancing sink groups enabled, I see only 2 threads created for the sink groups, I see the message below in the logs, and I see no throughput when draining events from the channel. On the other hand, if I comment out the sink group definitions in the Flume config and thus use the DefaultSinkProcessor, I see many more threads and events drain much faster. I suspect this is a problem with my config, but I could not find anything obvious. Could anyone here help?

Flume log output:
flume.log:16 Apr 2015 17:18:07,549 INFO [main] (org.apache.flume.node.Application.startAllComponents:138) - Starting new configuration:{ sourceRunners:{netcat=EventDrivenSourceRunner: { source:org.apache.flume.source.NetcatSource{name:netcat,state:IDLE} }, spool=EventDrivenSourceRunner: { source:org.apache.flume.source.SpoolDirectorySource{name:spool,state:IDLE} }} sinkRunners:{mainSinks=SinkRunner: { policy:org.apache.flume.sink.LoadBalancingSinkProcessor@15f66cff counterGroup:{ name:null counters:{} } }, replaySinks=SinkRunner: { policy:org.apache.flume.sink.LoadBalancingSinkProcessor@656de49c counterGroup:{ name:null counters:{} } }} channels:{mainChannel=org.apache.flume.channel.MemoryChannel{name: mainChannel}, replayChannel=org.apache.flume.channel.MemoryChannel{name: replayChannel}} }

Flume config:
agent1.channels.mainChannel.type = MEMORY
agent1.channels.mainChannel.capacity = 150000
agent1.channels.mainChannel.transactionCapacity = 10000

agent1.channels.replayChannel.type = MEMORY
agent1.channels.replayChannel.capacity = 50000
agent1.channels.replayChannel.transactionCapacity = 5000

# netcat source
agent1.sources.netcat.channels = mainChannel
agent1.sources.netcat.type = netcat
agent1.sources.netcat.bind = 127.0.0.1
agent1.sources.netcat.port = 44444
agent1.sources.netcat.ack-every-event = false
agent1.sources.netcat.max-line-length = 8192

# spool directory source
agent1.sources.spool.channels = replayChannel
agent1.sources.spool.type = spooldir
agent1.sources.spool.bufferMaxLineLength = 8192
agent1.sources.spool.bufferMaxLines = 1000
agent1.sources.spool.batchSize = 1000
agent1.sources.spool.spoolDir = /br/agent_aud/replay
agent1.sources.spool.inputCharset = ISO-8859-1
# Label each event as a replayed event
agent1.sources.spool.interceptors = staticInterceptor
agent1.sources.spool.interceptors.staticInterceptor.type = static
agent1.sources.spool.interceptors.staticInterceptor.key = t
agent1.sources.spool.interceptors.staticInterceptor.value = r


agent1.sinks.avroMainSink1.type = avro
agent1.sinks.avroMainSink1.channel = mainChannel
agent1.sinks.avroMainSink1.hostname = flumefs-v01-00a.bento.btrll.com
agent1.sinks.avroMainSink1.port = 4545
agent1.sinks.avroMainSink1.connect-timeout = 30000
agent1.sinks.avroMainSink1.request-timeout = 20000
agent1.sinks.avroMainSink1.batch-size = 200

agent1.sinks.avroReplaySink1.type = avro
agent1.sinks.avroReplaySink1.channel = replayChannel
agent1.sinks.avroReplaySink1.hostname = flumefs-v01-00a.bento.btrll.com
agent1.sinks.avroReplaySink1.port = 4545
agent1.sinks.avroReplaySink1.connect-timeout = 300000
agent1.sinks.avroReplaySink1.batch-size = 2000

agent1.sinks.avroMainSink2.type = avro
agent1.sinks.avroMainSink2.channel = mainChannel
agent1.sinks.avroMainSink2.hostname = flumefs-v01-00a.bento.btrll.com
agent1.sinks.avroMainSink2.port = 4546
agent1.sinks.avroMainSink2.connect-timeout = 30000
agent1.sinks.avroMainSink2.request-timeout = 20000
agent1.sinks.avroMainSink2.batch-size = 200

agent1.sinks.avroReplaySink2.type = avro
agent1.sinks.avroReplaySink2.channel = replayChannel
agent1.sinks.avroReplaySink2.hostname = flumefs-v01-00a.bento.btrll.com
agent1.sinks.avroReplaySink2.port = 4546
agent1.sinks.avroReplaySink2.connect-timeout = 300000
agent1.sinks.avroReplaySink2.batch-size = 2000

agent1.sinks.avroMainSink3.type = avro
agent1.sinks.avroMainSink3.channel = mainChannel
agent1.sinks.avroMainSink3.hostname = flumefs-v01-00a.bento.btrll.com
agent1.sinks.avroMainSink3.port = 4547
agent1.sinks.avroMainSink3.connect-timeout = 30000
agent1.sinks.avroMainSink3.request-timeout = 20000
agent1.sinks.avroMainSink3.batch-size = 200

agent1.sinks.avroReplaySink3.type = avro
agent1.sinks.avroReplaySink3.channel = replayChannel
agent1.sinks.avroReplaySink3.hostname = flumefs-v01-00a.bento.btrll.com
agent1.sinks.avroReplaySink3.port = 4547
agent1.sinks.avroReplaySink3.connect-timeout = 300000
agent1.sinks.avroReplaySink3.batch-size = 2000

agent1.sinks.avroMainSink4.type = avro
agent1.sinks.avroMainSink4.channel = mainChannel
agent1.sinks.avroMainSink4.hostname = flumefs-v01-00a.bento.btrll.com
agent1.sinks.avroMainSink4.port = 4548
agent1.sinks.avroMainSink4.connect-timeout = 30000
agent1.sinks.avroMainSink4.request-timeout = 20000
agent1.sinks.avroMainSink4.batch-size = 200

agent1.sinks.avroReplaySink4.type = avro
agent1.sinks.avroReplaySink4.channel = replayChannel
agent1.sinks.avroReplaySink4.hostname = flumefs-v01-00a.bento.btrll.com
agent1.sinks.avroReplaySink4.port = 4548
agent1.sinks.avroReplaySink4.connect-timeout = 300000
agent1.sinks.avroReplaySink4.batch-size = 2000


agent1.sinkgroups.mainSinks.sinks = avroMainSink1 avroMainSink2 avroMainSink3 avroMainSink4
agent1.sinkgroups.mainSinks.processor.type = load_balance
agent1.sinkgroups.mainSinks.processor.selector = random
agent1.sinkgroups.mainSinks.processor.selector.maxTimeOut = 1000
agent1.sinkgroups.mainSinks.processor.backoff = true

agent1.sinkgroups.replaySinks.sinks = avroReplaySink1 avroReplaySink2 avroReplaySink3 avroReplaySink4
agent1.sinkgroups.replaySinks.processor.type = load_balance
agent1.sinkgroups.replaySinks.processor.selector = random
agent1.sinkgroups.replaySinks.processor.selector.maxTimeOut = 1000
agent1.sinkgroups.replaySinks.processor.backoff = true

agent1.channels = mainChannel replayChannel
agent1.sinkgroups = mainSinks replaySinks
agent1.sources = netcat spool
agent1.sinks = avroMainSink1 avroReplaySink1 avroMainSink2 avroReplaySink2 avroMainSink3 avroReplaySink3 avroMainSink4 avroReplaySink4