I'm seeing the same thing :)

Mine is all on a local LAN, though, so it is quite odd that an RPC call doesn't get a reply within 10000ms or 20000ms. My configuration is for the most part the same as Denis's: a two-tiered system, with ExecSources running tail -F on log files into an AvroSink, which forwards to an AvroSource that loads into HDFS on the back tier.

I, too, see this on the AvroSink.

Either (A):
[2013-04-15 23:57:14.827] [org.apache.flume.sink.LoadBalancingSinkProcessor] [ WARN] [SinkRunner-PollingRunner-LoadBalancingSinkProcessor] []  (LoadBalancingSinkProcessor.java:process:154) Sink failed to consume event. Attempting next sink if available.
org.apache.flume.EventDeliveryException: Failed to send events
        at org.apache.flume.sink.AvroSink.process(AvroSink.java:324)
        at org.apache.flume.sink.LoadBalancingSinkProcessor.process(LoadBalancingSinkProcessor.java:151)
        at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
        at java.lang.Thread.run(Thread.java:619)
Caused by: org.apache.flume.EventDeliveryException: NettyAvroRpcClient { host: hadoopjt01.pegs.com, port: 10000 }: Failed to send batch
        at org.apache.flume.api.NettyAvroRpcClient.appendBatch(NettyAvroRpcClient.java:236)
        at org.apache.flume.sink.AvroSink.process(AvroSink.java:308)
        ... 3 more
Caused by: org.apache.flume.EventDeliveryException: NettyAvroRpcClient { host: hadoopjt01.pegs.com, port: 10000 }: Handshake timed out after 20000ms
        at org.apache.flume.api.NettyAvroRpcClient.appendBatch(NettyAvroRpcClient.java:280)
        at org.apache.flume.api.NettyAvroRpcClient.appendBatch(NettyAvroRpcClient.java:224)
        ... 4 more
Caused by: java.util.concurrent.TimeoutException
        at java.util.concurrent.FutureTask$Sync.innerGet(FutureTask.java:228)
        at java.util.concurrent.FutureTask.get(FutureTask.java:91)
        at org.apache.flume.api.NettyAvroRpcClient.appendBatch(NettyAvroRpcClient.java:278)
        ... 5 more

or (B):
[2013-04-15 19:49:01.135] [org.apache.flume.sink.LoadBalancingSinkProcessor] [ WARN] [SinkRunner-PollingRunner-LoadBalancingSinkProcessor] []  (LoadBalancingSinkProcessor.java:process:154) Sink failed to consume event. Attempting next sink if available.
org.apache.flume.EventDeliveryException: Failed to send events
        at org.apache.flume.sink.AvroSink.process(AvroSink.java:324)
        at org.apache.flume.sink.LoadBalancingSinkProcessor.process(LoadBalancingSinkProcessor.java:151)
        at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
        at java.lang.Thread.run(Thread.java:619)
Caused by: org.apache.flume.EventDeliveryException: NettyAvroRpcClient { host: hadoopjt01.pegs.com, port: 10000 }: Failed to send batch
        at org.apache.flume.api.NettyAvroRpcClient.appendBatch(NettyAvroRpcClient.java:236)
        at org.apache.flume.sink.AvroSink.process(AvroSink.java:308)
        ... 3 more
Caused by: org.apache.flume.EventDeliveryException: NettyAvroRpcClient { host: hadoopjt01.pegs.com, port: 10000 }: RPC request timed out
        at org.apache.flume.api.NettyAvroRpcClient.waitForStatusOK(NettyAvroRpcClient.java:321)
        at org.apache.flume.api.NettyAvroRpcClient.appendBatch(NettyAvroRpcClient.java:295)
        at org.apache.flume.api.NettyAvroRpcClient.appendBatch(NettyAvroRpcClient.java:224)
        ... 4 more
Caused by: java.util.concurrent.TimeoutException
        at org.apache.avro.ipc.CallFuture.get(CallFuture.java:132)
        at org.apache.flume.api.NettyAvroRpcClient.waitForStatusOK(NettyAvroRpcClient.java:310)
        ... 6 more

The only thing I see on the AvroSource tier is the disconnect/reconnect happening:

[2013-04-15 19:49:00.992] [org.apache.avro.ipc.NettyServer] [ INFO] [pool-11-thread-10] []  (NettyServer.java:handleUpstream:171) [id: 0x2a24ed78, /138.113.127.4:34481 :> /138.113.127.72:10000] DISCONNECTED
[2013-04-15 19:49:00.992] [org.apache.avro.ipc.NettyServer] [ INFO] [pool-11-thread-10] []  (NettyServer.java:handleUpstream:171) [id: 0x2a24ed78, /138.113.127.4:34481 :> /138.113.127.72:10000] UNBOUND
[2013-04-15 19:49:00.992] [org.apache.avro.ipc.NettyServer] [ INFO] [pool-11-thread-10] []  (NettyServer.java:handleUpstream:171) [id: 0x2a24ed78, /138.113.127.4:34481 :> /138.113.127.72:10000] CLOSED
[2013-04-15 19:49:00.993] [org.apache.avro.ipc.NettyServer] [ INFO] [pool-11-thread-10] []  (NettyServer.java:channelClosed:209) Connection to /138.113.127.4:34481 disconnected.
[2013-04-15 19:49:03.331] [org.apache.avro.ipc.NettyServer] [ INFO] [pool-10-thread-1] []  (NettyServer.java:handleUpstream:171) [id: 0x3883b82e, /138.113.127.4:62442 => /138.113.127.72:10000] OPEN
[2013-04-15 19:49:03.332] [org.apache.avro.ipc.NettyServer] [ INFO] [pool-11-thread-13] []  (NettyServer.java:handleUpstream:171) [id: 0x3883b82e, /138.113.127.4:62442 => /138.113.127.72:10000] BOUND: /138.113.127.72:10000
[2013-04-15 19:49:03.333] [org.apache.avro.ipc.NettyServer] [ INFO] [pool-11-thread-13] []  (NettyServer.java:handleUpstream:171) [id: 0x3883b82e, /138.113.127.4:62442 => /138.113.127.72:10000] CONNECTED: /138.113.127.4:62442

Is there some way to determine exactly where this bottleneck is? The config options for the AvroSource/AvroSink are quite limited, so there is not much tuning to do. Here is what I have:

# avro-hadoopjt01-sink properties
udprodae01.sinks.avro-hadoopjt01-sink.type = avro
udprodae01.sinks.avro-hadoopjt01-sink.hostname = hadoopjt01.pegs.com
udprodae01.sinks.avro-hadoopjt01-sink.port = 10000
udprodae01.sinks.avro-hadoopjt01-sink.batch-size = 100

# avro-hadoopjt01-source properties
hadoopjt01-1.sources.avro-hadoopjt01-source.type = avro
hadoopjt01-1.sources.avro-hadoopjt01-source.bind = hadoopjt01.pegs.com
hadoopjt01-1.sources.avro-hadoopjt01-source.port = 10000
hadoopjt01-1.sources.avro-hadoopjt01-source.threads = 64

I can try increasing the AvroSink timeout values (a rough sketch is below), but the defaults already seem quite adequate. Maybe more threads on the AvroSource?
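If it comes to that, this is roughly what I have in mind, based on the documented Avro sink/source knobs (untested; connect-timeout and request-timeout both default to 20000 ms as far as I know, and the values here are just an experiment, not a recommendation):

# avro-hadoopjt01-sink: raise the handshake and per-batch RPC timeouts
udprodae01.sinks.avro-hadoopjt01-sink.connect-timeout = 60000
udprodae01.sinks.avro-hadoopjt01-sink.request-timeout = 60000

# avro-hadoopjt01-source: allow more Netty worker threads
hadoopjt01-1.sources.avro-hadoopjt01-source.threads = 128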

Any advice would be much appreciated!
Chris


On Wed, Feb 6, 2013 at 12:10 PM, Denis Lowe <denis.lowe@gmail.com> wrote:
I noticed that the logs in the destination server were reporting dropped connections from the upstream server:

NettyServer$NettyServerAvroHandler.handleUpstream:171)  - [id: 0x08e56d76, /SOURCE_HOST:43599 :> /LOCAL_HOST:4003] DISCONNECTED
NettyServer$NettyServerAvroHandler.handleUpstream:171)  - [id: 0x08e56d76, /SOURCE_HOST:43599 :> /LOCAL_HOST:4003] UNBOUND
NettyServer$NettyServerAvroHandler.handleUpstream:171)  - [id: 0x08e56d76, /SOURCE_HOST:43599 :> /LOCAL_HOST:4003] CLOSED
NettyServer$NettyServerAvroHandler.channelClosed:209)  - Connection to /SOURCE_HOST:43599 disconnected.

The other thing I observed is that these errors only ever occur on our EU servers (which connect to our centralized downstream collectors in US West). We are running Flume in Amazon EC2.

I can see in the log that the connection is restored quite quickly.

I gather that the network latency between Europe and US West is causing the connection between the two servers to 'appear' lost, resulting in the above errors?

Are there any recommended config settings to compensate for this?
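The one knob I haven't tried yet is request-timeout: if I'm reading the stack trace right, connect-timeout only covers the connection/handshake, while the 'RPC request timed out' error comes from waiting on the batch itself. A rough sketch of what I'm considering for the trans-Atlantic links, assuming request-timeout is supported in 1.3.1:

# give the RPC call more time than the 20000 ms default on the high-latency links
agent.sinks.eventdata-avro-sink-1.request-timeout = 60000
agent.sinks.eventdata-avro-sink-2.request-timeout = 60000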

On 6 February 2013 00:21, Juhani Connolly <juhani_connolly@cyberagent.co.jp> wrote:
Is there anything unusual in the logs for the destination (AvroSource) server?

Since the error is happening in the AvroSink, no data is getting lost: the failed batch gets rolled back, its removal from the local channel is cancelled, and the sink will attempt to resend it.


On 02/06/2013 03:23 PM, Denis Lowe wrote:
We are running Flume-NG 1.3.1 and have noticed the following ERROR occurring periodically (a few times daily):

We are using the File Channel, connecting to 2 downstream collector agents in 'round_robin' mode via Avro sources/sinks.

We are using the config described below to deliver 5 different log types (to 5 different ports downstream) and have observed the error below occurring randomly across all of the ports.

We tried doubling the connect-timeout to 40000 (from the default of 20000) with no success.
The agent appears to recover and keep on processing data.

My questions are:
Has this data been lost, or will Flume eventually retry until a successful delivery has been made?
Are there any other config changes I can make to prevent/reduce this from occurring in the future?

05 Feb 2013 23:12:21,650 ERROR [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.flume.SinkRunner$PollingRunner.run:160)  - Unable to deliver event. Exception follows.
org.apache.flume.EventDeliveryException: Failed to send events
        at org.apache.flume.sink.AvroSink.process(AvroSink.java:325)
        at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
        at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
        at java.lang.Thread.run(Thread.java:662)
Caused by: org.apache.flume.EventDeliveryException: NettyAvroRpcClient { host: collector1, port: 4003 }: Failed to send batch
        at org.apache.flume.api.NettyAvroRpcClient.appendBatch(NettyAvroRpcClient.java:236)
        at org.apache.flume.sink.AvroSink.process(AvroSink.java:309)
        ... 3 more
Caused by: org.apache.flume.EventDeliveryException: NettyAvroRpcClient { host: collector2, port: 4003 }: RPC request timed out
        at org.apache.flume.api.NettyAvroRpcClient.waitForStatusOK(NettyAvroRpcClient.java:321)
        at org.apache.flume.api.NettyAvroRpcClient.appendBatch(NettyAvroRpcClient.java:295)
        at org.apache.flume.api.NettyAvroRpcClient.appendBatch(NettyAvroRpcClient.java:224)
        ... 4 more
Caused by: java.util.concurrent.TimeoutException
        at org.apache.avro.ipc.CallFuture.get(CallFuture.java:132)
        at org.apache.flume.api.NettyAvroRpcClient.waitForStatusOK(NettyAvroRpcClient.java:310)
        ... 6 more

Below is a snapshot of the current config:

agent.sources.eventdata.command = tail -qn +0 -F /var/log/event-logs/live/eventdata.log
agent.sources.eventdata.channels = eventdata-avro-channel
agent.sources.eventdata.batchSize = 10
agent.sources.eventdata.restart = true

## event source interceptor
agent.sources.eventdata.interceptors.host-interceptor.type = org.apache.flume.interceptor.HostInterceptor$Builder
agent.sources.eventdata.interceptors.host-interceptor.hostHeader = source-host
agent.sources.eventdata.interceptors.host-interceptor.useIP = false

## eventdata channel
agent.channels.eventdata-avro-channel.type = file
agent.channels.eventdata-avro-channel.checkpointDir = /mnt/flume-ng/checkpoint/eventdata-avro-channel
agent.channels.eventdata-avro-channel.dataDirs = /mnt/flume-ng/data1/eventdata-avro-channel,/mnt/flume-ng/data2/eventdata-avro-channel
agent.channels.eventdata-avro-channel.maxFileSize = 210000000
agent.channels.eventdata-avro-channel.capacity = 2000000
agent.channels.eventdata-avro-channel.checkpointInterval = 300000
agent.channels.eventdata-avro-channel.transactionCapacity = 10000
agent.channels.eventdata-avro-channel.keep-alive = 20
agent.channels.eventdata-avro-channel.write-timeout = 20

## 2 x downstream click sinks for load balancing and failover
agent.sinks.eventdata-avro-sink-1.type = avro
agent.sinks.eventdata-avro-sink-1.channel = eventdata-avro-channel
agent.sinks.eventdata-avro-sink-1.hostname = collector1
agent.sinks.eventdata-avro-sink-1.port = 4003
agent.sinks.eventdata-avro-sink-1.batch-size = 100
agent.sinks.eventdata-avro-sink-1.connect-timeout = 40000

agent.sinks.eventdata-avro-sink-2.type = avro
agent.sinks.eventdata-avro-sink-2.channel = eventdata-avro-channel
agent.sinks.eventdata-avro-sink-2.hostname = collector2
agent.sinks.eventdata-avro-sink-2.port = 4003
agent.sinks.eventdata-avro-sink-2.batch-size = 100
agent.sinks.eventdata-avro-sink-2.connect-timeout = 40000

## load balance config
agent.sinkgroups = eventdata-avro-sink-group
agent.sinkgroups.eventdata-avro-sink-group.sinks = eventdata-avro-sink-1 eventdata-avro-sink-2
agent.sinkgroups.eventdata-avro-sink-group.processor.type = load_balance
agent.sinkgroups.eventdata-avro-sink-group.processor.selector = round_robin

Denis.