flume-user mailing list archives

From Johny Rufus <jru...@cloudera.com>
Subject Re: Load Balancing sink group not working
Date Thu, 16 Apr 2015 23:02:16 GMT
Sink groups are designed so that only one of the sinks configured in the
group is chosen at any given time to process data from the channel, so
this is the expected behavior. As you noted, removing the sink group
results in each sink processing data from the channel simultaneously.
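
To make the two options concrete, here is a rough, untested sketch based on the config quoted below. The startup log in the original message lists one SinkRunner per sink group (mainSinks and replaySinks), which suggests that a group is drained by a single thread however many sinks it contains. Assuming that holds, splitting the sinks across several groups should give one draining thread per group while keeping load balancing inside each group. The group names mainSinksA and mainSinksB are hypothetical; the sink definitions themselves are unchanged from the original config.

```properties
# Option 1: no sink groups at all.
# Remove agent1.sinkgroups and every agent1.sinkgroups.* property.
# Each sink listed in agent1.sinks then drains the channel on its own thread.

# Option 2 (assumption: one runner thread per sink group, as the startup log suggests).
# Two load-balancing groups on mainChannel instead of one.
# mainSinksA and mainSinksB are hypothetical names.
agent1.sinks = avroMainSink1 avroMainSink2 avroMainSink3 avroMainSink4
agent1.sinkgroups = mainSinksA mainSinksB

agent1.sinkgroups.mainSinksA.sinks = avroMainSink1 avroMainSink2
agent1.sinkgroups.mainSinksA.processor.type = load_balance
agent1.sinkgroups.mainSinksA.processor.selector = random
agent1.sinkgroups.mainSinksA.processor.backoff = true

agent1.sinkgroups.mainSinksB.sinks = avroMainSink3 avroMainSink4
agent1.sinkgroups.mainSinksB.processor.type = load_balance
agent1.sinkgroups.mainSinksB.processor.selector = random
agent1.sinkgroups.mainSinksB.processor.backoff = true
```

Option 1 gives the most parallelism but no failover between sinks. Option 2 trades some parallelism for load balancing and backoff within each pair of sinks.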

Thanks,
Rufus

On Thu, Apr 16, 2015 at 3:30 PM, Rahul Ravindran <rahulrv@yahoo.com> wrote:

> Looking further into jstack, it looks like in the default configuration
> (without sink groups), there is a thread per sink, while there is only one
> thread when using the sink group which would explain the drop in throughput
> when using sink groups. Am I missing something?
>
> ~Rahul.
>
>
>
>   On Thursday, April 16, 2015 12:47 PM, Rahul Ravindran <rahulrv@yahoo.com>
> wrote:
>
>
> Hi,
> Below is my flume config and I am attempting to get Load Balancing sink
> group to LB across multiple machines. I see only 2 threads created for the
> entire sink group when using load balancing sink group and see the below
> message in the logs (and I see no throughput on draining events from the
> channel). On the other hand, if I comment out the sink group definition
> from the flume config and thus use the DefaultSinkProcessor, I see a lot
> more threads and events draining a lot faster. I suspect this is a problem
> with my config, but I could not find anything obvious. Could anyone here
> help?
>
> *Flume log output:*
> flume.log:16 Apr 2015 17:18:07,549 INFO [main]
> (org.apache.flume.node.Application.startAllComponents:138) - Starting new
> configuration:{ sourceRunners:{netcat=EventDrivenSourceRunner: {
> source:org.apache.flume.source.NetcatSource{name:netcat,state:IDLE} },
> spool=EventDrivenSourceRunner: {
> source:org.apache.flume.source.SpoolDirectorySource{name:spool,state:IDLE}
> }} sinkRunners:{mainSinks=SinkRunner: {
> policy:org.apache.flume.sink.LoadBalancingSinkProcessor@15f66cff
> counterGroup:{ name:null counters:{} } }, replaySinks=SinkRunner: {
> policy:org.apache.flume.sink.LoadBalancingSinkProcessor@656de49c
> counterGroup:{ name:null counters:{} } }}
> channels:{mainChannel=org.apache.flume.channel.MemoryChannel{name:
> mainChannel}, replayChannel=org.apache.flume.channel.MemoryChannel{name:
> replayChannel}} }
>
> *Flume config:*
> agent1.channels.mainChannel.type = MEMORY
> agent1.channels.mainChannel.capacity = 150000
> agent1.channels.mainChannel.transactionCapacity = 10000
>
> agent1.channels.replayChannel.type = MEMORY
> agent1.channels.replayChannel.capacity = 50000
> agent1.channels.replayChannel.transactionCapacity = 5000
>
> # netcat source
> agent1.sources.netcat.channels = mainChannel
> agent1.sources.netcat.type= netcat
> agent1.sources.netcat.bind = 127.0.0.1
> agent1.sources.netcat.port = 44444
> agent1.sources.netcat.ack-every-event = false
> agent1.sources.netcat.max-line-length = 8192
>
> # spool directory source
> agent1.sources.spool.channels = replayChannel
> agent1.sources.spool.type = spooldir
> agent1.sources.spool.bufferMaxLineLength = 8192
> agent1.sources.spool.bufferMaxLines = 1000
> agent1.sources.spool.batchSize = 1000
> agent1.sources.spool.spoolDir = /br/agent_aud/replay
> agent1.sources.spool.inputCharset = ISO-8859-1
> #Label the event as a replayed event
> agent1.sources.spool.interceptors = staticInterceptor
> agent1.sources.spool.interceptors.staticInterceptor.type = static
> agent1.sources.spool.interceptors.staticInterceptor.key = t
> agent1.sources.spool.interceptors.staticInterceptor.value = r
>
>
> agent1.sinks.avroMainSink1.type = avro
> agent1.sinks.avroMainSink1.channel = mainChannel
> agent1.sinks.avroMainSink1.hostname = flumefs-v01-00a.bento.btrll.com
> agent1.sinks.avroMainSink1.port = 4545
> agent1.sinks.avroMainSink1.connect-timeout = 30000
> agent1.sinks.avroMainSink1.request-timeout = 20000
> agent1.sinks.avroMainSink1.batch-size = 200
>
> agent1.sinks.avroReplaySink1.type = avro
> agent1.sinks.avroReplaySink1.channel = replayChannel
> agent1.sinks.avroReplaySink1.hostname = flumefs-v01-00a.bento.btrll.com
> agent1.sinks.avroReplaySink1.port = 4545
> agent1.sinks.avroReplaySink1.connect-timeout = 300000
> agent1.sinks.avroReplaySink1.batch-size = 2000
>
> agent1.sinks.avroMainSink2.type = avro
> agent1.sinks.avroMainSink2.channel = mainChannel
> agent1.sinks.avroMainSink2.hostname = flumefs-v01-00a.bento.btrll.com
> agent1.sinks.avroMainSink2.port = 4546
> agent1.sinks.avroMainSink2.connect-timeout = 30000
> agent1.sinks.avroMainSink2.request-timeout = 20000
> agent1.sinks.avroMainSink2.batch-size = 200
>
> agent1.sinks.avroReplaySink2.type = avro
> agent1.sinks.avroReplaySink2.channel = replayChannel
> agent1.sinks.avroReplaySink2.hostname = flumefs-v01-00a.bento.btrll.com
> agent1.sinks.avroReplaySink2.port = 4546
> agent1.sinks.avroReplaySink2.connect-timeout = 300000
> agent1.sinks.avroReplaySink2.batch-size = 2000
>
> agent1.sinks.avroMainSink3.type = avro
> agent1.sinks.avroMainSink3.channel = mainChannel
> agent1.sinks.avroMainSink3.hostname = flumefs-v01-00a.bento.btrll.com
> agent1.sinks.avroMainSink3.port = 4547
> agent1.sinks.avroMainSink3.connect-timeout = 30000
> agent1.sinks.avroMainSink3.request-timeout = 20000
> agent1.sinks.avroMainSink3.batch-size = 200
>
> agent1.sinks.avroReplaySink3.type = avro
> agent1.sinks.avroReplaySink3.channel = replayChannel
> agent1.sinks.avroReplaySink3.hostname = flumefs-v01-00a.bento.btrll.com
> agent1.sinks.avroReplaySink3.port = 4547
> agent1.sinks.avroReplaySink3.connect-timeout = 300000
> agent1.sinks.avroReplaySink3.batch-size = 2000
>
> agent1.sinks.avroMainSink4.type = avro
> agent1.sinks.avroMainSink4.channel = mainChannel
> agent1.sinks.avroMainSink4.hostname = flumefs-v01-00a.bento.btrll.com
> agent1.sinks.avroMainSink4.port = 4548
> agent1.sinks.avroMainSink4.connect-timeout = 30000
> agent1.sinks.avroMainSink4.request-timeout = 20000
> agent1.sinks.avroMainSink4.batch-size = 200
>
> agent1.sinks.avroReplaySink4.type = avro
> agent1.sinks.avroReplaySink4.channel = replayChannel
> agent1.sinks.avroReplaySink4.hostname = flumefs-v01-00a.bento.btrll.com
> agent1.sinks.avroReplaySink4.port = 4548
> agent1.sinks.avroReplaySink4.connect-timeout = 300000
> agent1.sinks.avroReplaySink4.batch-size = 2000
>
>
> agent1.sinkgroups.mainSinks.sinks = avroMainSink1 avroMainSink2 avroMainSink3 avroMainSink4
> agent1.sinkgroups.mainSinks.processor.type = load_balance
> agent1.sinkgroups.mainSinks.processor.selector = random
> agent1.sinkgroups.mainSinks.processor.maxTimeOut = 1000
> agent1.sinkgroups.mainSinks.processor.backoff = true
>
> agent1.sinkgroups.replaySinks.sinks = avroReplaySink1 avroReplaySink2 avroReplaySink3 avroReplaySink4
> agent1.sinkgroups.replaySinks.processor.type = load_balance
> agent1.sinkgroups.replaySinks.processor.selector = random
> agent1.sinkgroups.replaySinks.processor.maxTimeOut = 1000
> agent1.sinkgroups.replaySinks.processor.backoff = true
>
> agent1.channels = mainChannel replayChannel
> agent1.sinkgroups = mainSinks replaySinks
> agent1.sources = netcat spool
> agent1.sinks = avroMainSink1 avroReplaySink1 avroMainSink2 avroReplaySink2 avroMainSink3 avroReplaySink3 avroMainSink4 avroReplaySink4
>
>
>
>
>
