I'm using a memory channel together with the Kite dataset sink and keep running
into this error:
~~~~
ERROR kafka.KafkaSource: KafkaSource EXCEPTION, {}
org.apache.flume.ChannelException: Unable to put batch on required channel: org.apache.flume.channel.MemoryChannel{name: tracksChannel}
    at org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:200)
    at org.apache.flume.source.kafka.KafkaSource.process(KafkaSource.java:123)
    at org.apache.flume.source.PollableSourceRunner$PollingRunner.run(PollableSourceRunner.java:139)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.flume.ChannelFullException: Space for commit to queue couldn't be acquired. Sinks are likely not keeping up with sources, or the buffer size is too tight
    at org.apache.flume.channel.MemoryChannel$MemoryTransaction.doCommit(MemoryChannel.java:130)
    at org.apache.flume.channel.BasicTransactionSemantics.commit(BasicTransactionSemantics.java:151)
    at org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:192)
~~~~
I tried different combinations of capacity and transactionCapacity for the
channel, always keeping the sink's batchSize below the channel's
transactionCapacity.
Here is one of those combinations of configs:
~~~~~~
myAgent.sinks.mySink.kite.batchSize = 10000
myAgent.sinks.myHdfsSink.kite.rollInterval = 300
myAgent.channels.myChannel.type = memory
myAgent.channels.myChannel.capacity = 200000
myAgent.channels.myChannel.transactionCapacity = 20000
~~~~~~
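To spell out the ordering I have been trying to keep (sink batchSize <= channel transactionCapacity <= channel capacity), here is a minimal Python sketch that checks it against the config above. The helper names parse_properties and check_sizing are my own and not part of Flume; the sketch only checks that the sizes are consistent with each other and says nothing about throughput.

```python
def parse_properties(text):
    """Parse simple 'key = value' lines into a dict, skipping blanks and comments."""
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    return props


def check_sizing(props, agent, sink, channel):
    """Return a list of sizing problems; an empty list means the ordering holds."""
    batch = int(props[f"{agent}.sinks.{sink}.kite.batchSize"])
    tx_cap = int(props[f"{agent}.channels.{channel}.transactionCapacity"])
    cap = int(props[f"{agent}.channels.{channel}.capacity"])

    problems = []
    if batch > tx_cap:
        problems.append(f"batchSize {batch} exceeds transactionCapacity {tx_cap}")
    if tx_cap > cap:
        problems.append(f"transactionCapacity {tx_cap} exceeds capacity {cap}")
    return problems


# The combination quoted above, copied verbatim.
CONFIG = """
myAgent.sinks.mySink.kite.batchSize = 10000
myAgent.sinks.myHdfsSink.kite.rollInterval = 300
myAgent.channels.myChannel.type = memory
myAgent.channels.myChannel.capacity = 200000
myAgent.channels.myChannel.transactionCapacity = 20000
"""

if __name__ == "__main__":
    print(check_sizing(parse_properties(CONFIG), "myAgent", "mySink", "myChannel"))
```

This reports no problems for the combination above, so the sizes at least look consistent with each other. That seems to match the hint in the exception that the sink is not keeping up with the source.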
It's currently a single Flume agent, and I'm looking for recommendations on
tiered architectures involving Flume and Kafka, ultimately feeding the Kite
dataset sink, that would help overcome this issue.
Thanks!