Sink groups are designed so that, at any given time, only one of the sinks configured in the group is chosen to process data out of the channel, so this is the expected behavior. As you noted, removing the sink group results in each sink processing data out of the channel simultaneously.
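A minimal, hypothetical Java simulation of the difference follows. SimChannel, SimSink, runAsSinkGroup and runWithoutGroup are invented for illustration and are not Flume APIs. The sketch assumes what the jstack observation in this thread suggests: a sink group is driven by a single runner thread that hands each batch to one selected sink, while ungrouped sinks each get their own runner thread. That assumption is consistent with the two SinkRunner entries (mainSinks, replaySinks) in the log and the 2 threads observed. Each simulated process() call drains one batch and sleeps 2 ms to stand in for a blocking Avro RPC.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical simulation; none of these classes are Flume APIs.
public class SinkThreadingSketch {
    // Stand-in for a channel: only tracks how many events are pending.
    static final class SimChannel {
        private final AtomicInteger pending;
        SimChannel(int events) { pending = new AtomicInteger(events); }

        // Atomically removes up to max events; returns how many were taken.
        int take(int max) {
            while (true) {
                int cur = pending.get();
                if (cur == 0) return 0;
                int n = Math.min(cur, max);
                if (pending.compareAndSet(cur, cur - n)) return n;
            }
        }
    }

    // Stand-in for an Avro sink: each process() call drains one batch.
    static final class SimSink {
        private final SimChannel channel;
        private final int batchSize;
        private final Set<String> callerThreads; // names of threads that called process()
        private final AtomicInteger delivered;   // events "sent" downstream

        SimSink(SimChannel ch, int batchSize, Set<String> callerThreads, AtomicInteger delivered) {
            this.channel = ch;
            this.batchSize = batchSize;
            this.callerThreads = callerThreads;
            this.delivered = delivered;
        }

        int process() throws InterruptedException {
            callerThreads.add(Thread.currentThread().getName());
            int n = channel.take(batchSize);
            if (n > 0) Thread.sleep(2); // pretend this is a blocking RPC round trip
            delivered.addAndGet(n);
            return n;
        }
    }

    static final class Result {
        final int threads;
        final int delivered;
        Result(int threads, int delivered) { this.threads = threads; this.delivered = delivered; }
    }

    // Sink-group style: ONE runner thread picks a random sink for every batch.
    static Result runAsSinkGroup(int events, int sinks, int batch) throws InterruptedException {
        SimChannel ch = new SimChannel(events);
        Set<String> threads = ConcurrentHashMap.newKeySet();
        AtomicInteger delivered = new AtomicInteger();
        List<SimSink> group = new ArrayList<>();
        for (int i = 0; i < sinks; i++) group.add(new SimSink(ch, batch, threads, delivered));
        Random random = new Random(42); // seeded "random" selector
        Thread runner = new Thread(() -> {
            try {
                while (group.get(random.nextInt(group.size())).process() > 0) {
                    // keep polling until the channel is empty
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "group-runner");
        runner.start();
        runner.join();
        return new Result(threads.size(), delivered.get());
    }

    // No sink group: one runner thread PER sink, all draining the same channel.
    static Result runWithoutGroup(int events, int sinks, int batch) throws InterruptedException {
        SimChannel ch = new SimChannel(events);
        Set<String> threads = ConcurrentHashMap.newKeySet();
        AtomicInteger delivered = new AtomicInteger();
        List<Thread> runners = new ArrayList<>();
        for (int i = 0; i < sinks; i++) {
            SimSink sink = new SimSink(ch, batch, threads, delivered);
            runners.add(new Thread(() -> {
                try {
                    while (sink.process() > 0) {
                        // each runner polls its own sink independently
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }, "sink-runner-" + i));
        }
        for (Thread t : runners) t.start();
        for (Thread t : runners) t.join();
        return new Result(threads.size(), delivered.get());
    }

    public static void main(String[] args) throws InterruptedException {
        Result g = runAsSinkGroup(10_000, 4, 200);
        Result u = runWithoutGroup(10_000, 4, 200);
        System.out.println("sink group:    threads=" + g.threads + " delivered=" + g.delivered);
        System.out.println("no sink group: threads=" + u.threads + " delivered=" + u.delivered);
    }
}
```

Running main should report threads=1 for the grouped run and threads=4 for the ungrouped run, with all 10,000 events delivered in both. Because only one simulated RPC is in flight at a time in the grouped run, it should also take noticeably longer, which mirrors the throughput drop described in this thread.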

Thanks,
Rufus

On Thu, Apr 16, 2015 at 3:30 PM, Rahul Ravindran <rahulrv@yahoo.com> wrote:
Looking further into the jstack output, it looks like the default configuration (without sink groups) has one thread per sink, while there is only one thread when using a sink group, which would explain the drop in throughput with sink groups. Am I missing something?

~Rahul.



On Thursday, April 16, 2015 12:47 PM, Rahul Ravindran <rahulrv@yahoo.com> wrote:


Hi,
Below is my Flume config. I am attempting to use a load-balancing sink group to balance load across multiple machines. When using the load-balancing sink group, I see only 2 threads created for the entire sink group, along with the message below in the logs (and I see no throughput when draining events from the channel). On the other hand, if I comment out the sink group definitions in the Flume config, and thus use the DefaultSinkProcessor, I see many more threads and events drain much faster. I suspect this is a problem with my config, but I could not find anything obvious. Could anyone here help?

Flume log output:
flume.log:16 Apr 2015 17:18:07,549 INFO [main] (org.apache.flume.node.Application.startAllComponents:138) - Starting new configuration:{ sourceRunners:{netcat=EventDrivenSourceRunner: { source:org.apache.flume.source.NetcatSource{name:netcat,state:IDLE} }, spool=EventDrivenSourceRunner: { source:org.apache.flume.source.SpoolDirectorySource{name:spool,state:IDLE} }} sinkRunners:{mainSinks=SinkRunner: { policy:org.apache.flume.sink.LoadBalancingSinkProcessor@15f66cff counterGroup:{ name:null counters:{} } }, replaySinks=SinkRunner: { policy:org.apache.flume.sink.LoadBalancingSinkProcessor@656de49c counterGroup:{ name:null counters:{} } }} channels:{mainChannel=org.apache.flume.channel.MemoryChannel{name: mainChannel}, replayChannel=org.apache.flume.channel.MemoryChannel{name: replayChannel}} }

Flume config:
agent1.channels.mainChannel.type = MEMORY
agent1.channels.mainChannel.capacity = 150000
agent1.channels.mainChannel.transactionCapacity = 10000

agent1.channels.replayChannel.type = MEMORY
agent1.channels.replayChannel.capacity = 50000
agent1.channels.replayChannel.transactionCapacity = 5000

# netcat source
agent1.sources.netcat.channels = mainChannel
agent1.sources.netcat.type = netcat
agent1.sources.netcat.bind = 127.0.0.1
agent1.sources.netcat.port = 44444
agent1.sources.netcat.ack-every-event = false
agent1.sources.netcat.max-line-length = 8192

# spool directory source
agent1.sources.spool.channels = replayChannel
agent1.sources.spool.type = spooldir
agent1.sources.spool.bufferMaxLineLength = 8192
agent1.sources.spool.bufferMaxLines = 1000
agent1.sources.spool.batchSize = 1000
agent1.sources.spool.spoolDir = /br/agent_aud/replay
agent1.sources.spool.inputCharset = ISO-8859-1
#Label the event as a replayed event
agent1.sources.spool.interceptors = staticInterceptor
agent1.sources.spool.interceptors.staticInterceptor.type = static
agent1.sources.spool.interceptors.staticInterceptor.key = t
agent1.sources.spool.interceptors.staticInterceptor.value = r


agent1.sinks.avroMainSink1.type = avro
agent1.sinks.avroMainSink1.channel = mainChannel
agent1.sinks.avroMainSink1.hostname = flumefs-v01-00a.bento.btrll.com
agent1.sinks.avroMainSink1.port = 4545
agent1.sinks.avroMainSink1.connect-timeout = 30000
agent1.sinks.avroMainSink1.request-timeout = 20000
agent1.sinks.avroMainSink1.batch-size = 200

agent1.sinks.avroReplaySink1.type = avro
agent1.sinks.avroReplaySink1.channel = replayChannel
agent1.sinks.avroReplaySink1.hostname = flumefs-v01-00a.bento.btrll.com
agent1.sinks.avroReplaySink1.port = 4545
agent1.sinks.avroReplaySink1.connect-timeout = 300000
agent1.sinks.avroReplaySink1.batch-size = 2000

agent1.sinks.avroMainSink2.type = avro
agent1.sinks.avroMainSink2.channel = mainChannel
agent1.sinks.avroMainSink2.hostname = flumefs-v01-00a.bento.btrll.com
agent1.sinks.avroMainSink2.port = 4546
agent1.sinks.avroMainSink2.connect-timeout = 30000
agent1.sinks.avroMainSink2.request-timeout = 20000
agent1.sinks.avroMainSink2.batch-size = 200

agent1.sinks.avroReplaySink2.type = avro
agent1.sinks.avroReplaySink2.channel = replayChannel
agent1.sinks.avroReplaySink2.hostname = flumefs-v01-00a.bento.btrll.com
agent1.sinks.avroReplaySink2.port = 4546
agent1.sinks.avroReplaySink2.connect-timeout = 300000
agent1.sinks.avroReplaySink2.batch-size = 2000

agent1.sinks.avroMainSink3.type = avro
agent1.sinks.avroMainSink3.channel = mainChannel
agent1.sinks.avroMainSink3.hostname = flumefs-v01-00a.bento.btrll.com
agent1.sinks.avroMainSink3.port = 4547
agent1.sinks.avroMainSink3.connect-timeout = 30000
agent1.sinks.avroMainSink3.request-timeout = 20000
agent1.sinks.avroMainSink3.batch-size = 200

agent1.sinks.avroReplaySink3.type = avro
agent1.sinks.avroReplaySink3.channel = replayChannel
agent1.sinks.avroReplaySink3.hostname = flumefs-v01-00a.bento.btrll.com
agent1.sinks.avroReplaySink3.port = 4547
agent1.sinks.avroReplaySink3.connect-timeout = 300000
agent1.sinks.avroReplaySink3.batch-size = 2000

agent1.sinks.avroMainSink4.type = avro
agent1.sinks.avroMainSink4.channel = mainChannel
agent1.sinks.avroMainSink4.hostname = flumefs-v01-00a.bento.btrll.com
agent1.sinks.avroMainSink4.port = 4548
agent1.sinks.avroMainSink4.connect-timeout = 30000
agent1.sinks.avroMainSink4.request-timeout = 20000
agent1.sinks.avroMainSink4.batch-size = 200

agent1.sinks.avroReplaySink4.type = avro
agent1.sinks.avroReplaySink4.channel = replayChannel
agent1.sinks.avroReplaySink4.hostname = flumefs-v01-00a.bento.btrll.com
agent1.sinks.avroReplaySink4.port = 4548
agent1.sinks.avroReplaySink4.connect-timeout = 300000
agent1.sinks.avroReplaySink4.batch-size = 2000


agent1.sinkgroups.mainSinks.sinks = avroMainSink1 avroMainSink2 avroMainSink3 avroMainSink4
agent1.sinkgroups.mainSinks.processor.type = load_balance
agent1.sinkgroups.mainSinks.processor.selector = random
agent1.sinkgroups.mainSinks.processor.maxTimeOut = 1000
agent1.sinkgroups.mainSinks.processor.backoff = true

agent1.sinkgroups.replaySinks.sinks = avroReplaySink1 avroReplaySink2 avroReplaySink3 avroReplaySink4
agent1.sinkgroups.replaySinks.processor.type = load_balance
agent1.sinkgroups.replaySinks.processor.selector = random
agent1.sinkgroups.replaySinks.processor.maxTimeOut = 1000
agent1.sinkgroups.replaySinks.processor.backoff = true

agent1.channels = mainChannel replayChannel
agent1.sinkgroups = mainSinks replaySinks
agent1.sources = netcat spool
agent1.sinks = avroMainSink1 avroReplaySink1 avroMainSink2 avroReplaySink2 avroMainSink3 avroReplaySink3 avroMainSink4 avroReplaySink4