You are mostly correct. One thing that may help: the sink consumes the events from the channel, writes them to the next-hop source, and then commits the transaction. The channel does not push the events; it is a passive component. You should be able to leave out the this.agent.put(null); and just sleep to allow the sink enough time to write the events in the channel to the next hop and commit. As long as the agent is running, the sink will try to pull events off the channel. If there are no events left, it will simply back off and try again.

Another option would be to poll the ChannelSize metric to know when the channel is actually empty, and only then make the call to gracefully stop the agent. I know there is one open JIRA that relates to this here.
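
A minimal sketch of that polling idea, in case it is useful. It does not touch any Flume API: the current channel size is passed in as a LongSupplier, and how that value is actually read (for example from the ChannelSize metric) is left to you. The names DrainWaiter and awaitDrained, and the timeout and poll-interval parameters, are hypothetical and chosen only for illustration.

```java
import java.util.function.LongSupplier;

// Hypothetical helper, not part of Flume: waits for a reported size to reach zero.
final class DrainWaiter {
    private DrainWaiter() {}

    /**
     * Polls channelSize until it reports 0 or until timeoutMillis has elapsed.
     * Returns true if the size reached 0, false on timeout or interruption.
     */
    static boolean awaitDrained(LongSupplier channelSize, long timeoutMillis, long pollMillis) {
        long deadline = System.nanoTime() + timeoutMillis * 1_000_000L;
        while (true) {
            if (channelSize.getAsLong() <= 0) {
                return true;                       // nothing left for the sink to take
            }
            if (System.nanoTime() - deadline >= 0) {
                return false;                      // gave up waiting
            }
            try {
                Thread.sleep(pollMillis);          // give the sink time to pull and commit
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt for the caller
                return false;
            }
        }
    }
}
```

The intent would be to call awaitDrained(...) with a supplier that reads the channel size, and only call agent.stop() once it returns true (or once you decide the timeout is acceptable). A reported size of zero does not by itself prove that the last batch has been committed at the next hop, so a short extra pause before stopping may still be prudent.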

It might be worth opening another to improve the graceful shutdown of the embedded agent.



On Fri, Jul 4, 2014 at 9:08 AM, Adam Higginson wrote:



I’m currently attempting to make use of an Embedded Agent with a file-based channel, which will be used to write to another agent and then ultimately into an HDFS sequence file. The connection is successfully made, and data is sent across to the other agent (and then to a sequence file). However, I’m trying to understand how to close the agent and flush out any ‘local’ data that may still need to be transferred when the program closes.


As I am using an AvroSink, I read that if a channel sends a null event the batch is immediately sent. My idea, then, was to pass null to the agent just as the program is requested to stop, like so:





This seemed to attempt to send the batch, however when the agent.stop() method is called, an error is printed in the logs:


org.apache.flume.EventDeliveryException: Failed to send events

       at org.apache.flume.sink.AbstractRpcSink.process( ~[flume-ng-core-1.4.0-cdh4.4.0.jar:na]

       at org.apache.flume.sink.FailoverSinkProcessor.process( ~[flume-ng-core-1.4.0-cdh4.4.0.jar:na]

       at org.apache.flume.SinkRunner$ [flume-ng-core-1.4.0-cdh4.4.0.jar:na]

       at [na:1.7.0_45]

Caused by: org.apache.flume.EventDeliveryException: NettyAvroRpcClient { host: oraclelinux6.localdomain, port: 44444 }: Failed to send batch

       at org.apache.flume.api.NettyAvroRpcClient.appendBatch( ~[flume-ng-sdk-1.4.0-cdh4.4.0.jar:na]

       at org.apache.flume.sink.AbstractRpcSink.process( ~[flume-ng-core-1.4.0-cdh4.4.0.jar:na]

       ... 3 common frames omitted

Caused by: org.apache.flume.EventDeliveryException: NettyAvroRpcClient { host: oraclelinux6.localdomain, port: 44444 }: Interrupted in handshake

       at org.apache.flume.api.NettyAvroRpcClient.appendBatch( ~[flume-ng-sdk-1.4.0-cdh4.4.0.jar:na]

       at org.apache.flume.api.NettyAvroRpcClient.appendBatch( ~[flume-ng-sdk-1.4.0-cdh4.4.0.jar:na]

       ... 4 common frames omitted

Caused by: java.lang.InterruptedException: null

       at java.util.concurrent.FutureTask.awaitDone( ~[na:1.7.0_45]

       at java.util.concurrent.FutureTask.get( ~[na:1.7.0_45]

       at org.apache.flume.api.NettyAvroRpcClient.appendBatch( ~[flume-ng-sdk-1.4.0-cdh4.4.0.jar:na]

       ... 5 common frames omitted


From the error message, “Interrupted in handshake”, I would suggest that whilst attempting to send the batch, the agent.stop() method interrupted it and caused the exception. When I put a Thread.sleep(10000) between putting a null event and the stop method, the problem seems to disappear and the batch is successfully flushed (from what I can see). I was wondering whether I have understood fully what is going on, and if so, whether there is a more elegant way of shutting down the embedded agent and flushing any events that are still stored locally?


Any help would be greatly appreciated.


Thanks and kind regards,


