flume-user mailing list archives

From Cameron Gandevia <cgande...@gmail.com>
Subject Re: Flume configuration fail-over problems
Date Tue, 16 Oct 2012 19:25:31 GMT
I changed my configuration to write to two HDFS instances instead, to rule out the
ElasticSearch sink.

I noticed the following exception on the source agent:

2012-10-16 12:03:49,989 (SinkRunner-PollingRunner-FailoverSinkProcessor) [DEBUG - org.apache.flume.sink.AvroSink.destroyConnection(AvroSink.java:199)] Avro sink avroSink-1 closing avro client: NettyAvroRpcClient { host: localhost, port: 4545 }
2012-10-16 12:03:49,993 (SinkRunner-PollingRunner-FailoverSinkProcessor) [WARN - org.apache.flume.sink.FailoverSinkProcessor.process(FailoverSinkProcessor.java:185)] Sink avroSink-1 failed and has been sent to failover list
org.apache.flume.EventDeliveryException: Failed to send events
    at org.apache.flume.sink.AvroSink.process(AvroSink.java:325)
    at org.apache.flume.sink.FailoverSinkProcessor.process(FailoverSinkProcessor.java:182)
    at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
    at java.lang.Thread.run(Thread.java:662)
Caused by: org.apache.flume.EventDeliveryException: NettyAvroRpcClient { host: localhost, port: 4545 }: Failed to send batch
    at org.apache.flume.api.NettyAvroRpcClient.appendBatch(NettyAvroRpcClient.java:236)
    at org.apache.flume.sink.AvroSink.process(AvroSink.java:309)
    ... 3 more
Caused by: org.apache.flume.EventDeliveryException: NettyAvroRpcClient { host: localhost, port: 4545 }: Avro RPC call returned Status: FAILED
    at org.apache.flume.api.NettyAvroRpcClient.waitForStatusOK(NettyAvroRpcClient.java:312)
    at org.apache.flume.api.NettyAvroRpcClient.appendBatch(NettyAvroRpcClient.java:295)
    at org.apache.flume.api.NettyAvroRpcClient.appendBatch(NettyAvroRpcClient.java:224)
    ... 4 more
2012-10-16 12:03:51,998 (SinkRunner-PollingRunner-FailoverSinkProcessor) [INFO - org.apache.flume.sink.AvroSink.createConnection(AvroSink.java:178)] Avro sink avroSink-1: Building RpcClient with hostname: 127.0.0.1, port: 4545
2012-10-16 12:03:51,999 (SinkRunner-PollingRunner-FailoverSinkProcessor) [DEBUG - org.apache.flume.api.NettyAvroRpcClient.configure(NettyAvroRpcClient.java:420)] Batch size string = null
2012-10-16 12:03:52,001 (SinkRunner-PollingRunner-FailoverSinkProcessor) [DEBUG - org.apache.flume.sink.AvroSink.createConnection(AvroSink.java:192)] Avro sink avroSink-1: Created RpcClient: NettyAvroRpcClient { host: localhost, port: 4545 }
2012-10-16 12:03:55,008 (SinkRunner-PollingRunner-FailoverSinkProcessor) [DEBUG - org.apache.flume.sink.AvroSink.destroyConnection(AvroSink.java:199)] Avro sink avroSink-1 closing avro client: NettyAvroRpcClient { host: localhost, port: 4545 }
2012-10-16 12:03:55,009 (SinkRunner-PollingRunner-FailoverSinkProcessor) [DEBUG - org.apache.flume.sink.FailoverSinkProcessor$FailedSink.incFails(FailoverSinkProcessor.java:97)] Sink avroSink-1 failed again, new refresh is at 1350414239009, current time 1350414235009
2012-10-16 12:03:59,341 (SinkRunner-PollingRunner-FailoverSinkProcessor) [INFO - org.apache.flume.sink.AvroSink.createConnection(AvroSink.java:178)] Avro sink avroSink-1: Building RpcClient with hostname: 127.0.0.1, port: 4545
2012-10-16 12:03:59,342 (SinkRunner-PollingRunner-FailoverSinkProcessor) [DEBUG - org.apache.flume.api.NettyAvroRpcClient.configure(NettyAvroRpcClient.java:420)] Batch size string = null
2012-10-16 12:03:59,347 (SinkRunner-PollingRunner-FailoverSinkProcessor) [DEBUG - org.apache.flume.sink.AvroSink.createConnection(AvroSink.java:192)] Avro sink avroSink-1: Created RpcClient: NettyAvroRpcClient { host: localhost, port: 4545 }
2012-10-16 12:04:13,849 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.file.AbstractFileConfigurationProvider$FileWatcherRunnable.run(AbstractFileConfigurationProvider.java:188)] Checking file:../conf/flume.conf for changes
2012-10-16 12:04:43,850 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.file.AbstractFileConfigurationProvider$FileWatcherRunnable.run(AbstractFileConfigurationProvider.java:188)] Checking file:../conf/flume.conf for changes
2012-10-16 12:05:13,850 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.file.AbstractFileConfigurationProvider$FileWatcherRunnable.run(AbstractFileConfigurationProvider.java:188)] Checking file:../conf/flume.conf for changes

Then, on the first collector:

2012-10-16 12:03:52,003 (New I/O server worker #1-2) [DEBUG - org.apache.flume.source.AvroSource.appendBatch(AvroSource.java:244)] Avro source avroSource-1: Received avro event batch of 100 events.
2012-10-16 12:03:55,004 (New I/O server worker #1-2) [ERROR - org.apache.flume.source.AvroSource.appendBatch(AvroSource.java:261)] Avro source avroSource-1: Unable to process event batch. Exception follows.
org.apache.flume.ChannelException: Unable to put batch on required channel: org.apache.flume.channel.MemoryChannel{name: memoryChannel-1}
    at org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:200)
    at org.apache.flume.source.AvroSource.appendBatch(AvroSource.java:259)
    at sun.reflect.GeneratedMethodAccessor6.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
    at java.lang.reflect.Method.invoke(Method.java:597)
    at org.apache.avro.ipc.specific.SpecificResponder.respond(SpecificResponder.java:88)
    at org.apache.avro.ipc.Responder.respond(Responder.java:149)
    at org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.messageReceived(NettyServer.java:188)
    at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:80)
    at org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:173)
    at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
    at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:783)
    at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:302)
    at org.jboss.netty.handler.codec.frame.FrameDecoder.unfoldAndFireMessageReceived(FrameDecoder.java:321)
    at org.jboss.netty.handler.codec.frame.FrameDecoder.callDecode(FrameDecoder.java:299)
    at org.jboss.netty.handler.codec.frame.FrameDecoder.messageReceived(FrameDecoder.java:214)
    at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:80)
    at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
    at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:559)
    at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:274)
    at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:261)
    at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:351)
    at org.jboss.netty.channel.socket.nio.NioWorker.processSelectedKeys(NioWorker.java:282)
    at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:202)
    at org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108)
    at org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:44)
    at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
    at java.lang.Thread.run(Thread.java:662)
Caused by: org.apache.flume.ChannelException: Space for commit to queue couldn't be acquired Sinks are likely not keeping up with sources, or the buffer size is too tight
    at org.apache.flume.channel.MemoryChannel$MemoryTransaction.doCommit(MemoryChannel.java:91)
    at org.apache.flume.channel.BasicTransactionSemantics.commit(BasicTransactionSemantics.java:151)
    at org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:192)
    ... 28 more
2012-10-16 12:03:55,009 (New I/O server worker #1-2) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0x023461d1, /127.0.0.1:44380 :> /127.0.0.1:4545] DISCONNECTED
2012-10-16 12:03:55,009 (New I/O server worker #1-2) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0x023461d1, /127.0.0.1:44380 :> /127.0.0.1:4545] UNBOUND
2012-10-16 12:03:55,009 (New I/O server worker #1-2) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0x023461d1, /127.0.0.1:44380 :> /127.0.0.1:4545] CLOSED
2012-10-16 12:03:55,009 (New I/O server worker #1-2) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.channelClosed(NettyServer.java:209)] Connection to /127.0.0.1:44380 disconnected.
2012-10-16 12:03:59,344 (New I/O server boss #1 ([id: 0x42aa0877, /127.0.0.1:4545])) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0x428d5aad, /127.0.0.1:44381 => /127.0.0.1:4545] OPEN
2012-10-16 12:03:59,345 (New I/O server worker #1-3) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0x428d5aad, /127.0.0.1:44381 => /127.0.0.1:4545] BOUND: /127.0.0.1:4545
2012-10-16 12:03:59,346 (New I/O server worker #1-3) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0x428d5aad, /127.0.0.1:44381 => /127.0.0.1:4545] CONNECTED: /127.0.0.1:44381
I didn't see an exception on the second collector, but everything seems to
stop. All I see on the source and the two collectors is:

2012-10-16 12:18:04,142 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.file.AbstractFileConfigurationProvider$FileWatcherRunnable.run(AbstractFileConfigurationProvider.java:188)] Checking file:../conf/flume.conf for changes
2012-10-16 12:18:34,143 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.file.AbstractFileConfigurationProvider$FileWatcherRunnable.run(AbstractFileConfigurationProvider.java:188)] Checking file:../conf/flume.conf for changes
2012-10-16 12:19:04,143 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.file.AbstractFileConfigurationProvider$FileWatcherRunnable.run(AbstractFileConfigurationProvider.java:188)] Checking file:../conf/flume.conf for changes
2012-10-16 12:19:34,144 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.file.AbstractFileConfigurationProvider$FileWatcherRunnable.run(AbstractFileConfigurationProvider.java:188)] Checking file:../conf/flume.conf for changes
2012-10-16 12:20:04,144 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.file.AbstractFileConfigurationProvider$FileWatcherRunnable.run(AbstractFileConfigurationProvider.java:188)] Checking file:../conf/flume.conf for changes
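
In case it helps anyone reading later: the "Space for commit to queue couldn't
be acquired" ChannelException above means the memory channel filled up before
its sinks could drain it. The knobs involved look roughly like this
(illustrative values only, not a tested recommendation; keep-alive is the
number of seconds a put waits for free space before failing):

```
# Sketch of memory-channel headroom tuning (values are illustrative)
collector_agent_1.channels.memoryChannel-1.type = memory
collector_agent_1.channels.memoryChannel-1.capacity = 1000000
collector_agent_1.channels.memoryChannel-1.transactionCapacity = 1000
collector_agent_1.channels.memoryChannel-1.keep-alive = 30
```

Of course none of that helps if no sink is taking from the channel at all; a
channel nothing drains will hit capacity no matter how large it is.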

I also noticed that the source agent's log never mentions the avroSink-2 sink,
only avroSink-1.

I looked at the exposed JSON stats, and after 15 minutes nothing had changed.
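
(For context, the JSON stats come from Flume's HTTP monitoring service; if
memory serves, the agent is started with flags along these lines, with an
arbitrary port choice:)

```
# Expose counters as JSON over HTTP (see the Flume user guide's
# monitoring section; 34545 is just an example port)
bin/flume-ng agent -n collector_agent_1 -c conf -f ../conf/flume.conf \
    -Dflume.monitoring.type=http \
    -Dflume.monitoring.port=34545

# Then poll them with e.g.:
curl http://localhost:34545/metrics
```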

This is from the collector:


{
   SOURCE.avroSource-1:
   {
      OpenConnectionCount: "1",
      AppendBatchAcceptedCount: "1000",
      AppendBatchReceivedCount: "1002",
      Type: "SOURCE",
      EventAcceptedCount: "100000",
      AppendReceivedCount: "0",
      StopTime: "0",
      EventReceivedCount: "100200",
      StartTime: "1350414218690",
      AppendAcceptedCount: "0"
   },
   CHANNEL.memoryChannel-1:
   {
      EventPutSuccessCount: "100000",
      ChannelFillPercentage: "100.0",
      Type: "CHANNEL",
      StopTime: "0",
      EventPutAttemptCount: "100200",
      ChannelSize: "100000",
      StartTime: "1350414218432",
      EventTakeSuccessCount: "0",
      ChannelCapacity: "100000",
      EventTakeAttemptCount: "0"
   },
   CHANNEL.memoryChannel-2:
   {
      EventPutSuccessCount: "100200",
      ChannelFillPercentage: "0.0",
      Type: "CHANNEL",
      StopTime: "0",
      EventPutAttemptCount: "100200",
      ChannelSize: "0",
      StartTime: "1350414218432",
      EventTakeSuccessCount: "100200",
      ChannelCapacity: "100000",
      EventTakeAttemptCount: "100470"
   },
   SINK.hdfs-sink-1:
   {
      BatchCompleteCount: "501",
      ConnectionFailedCount: "0",
      EventDrainAttemptCount: "50203",
      ConnectionCreatedCount: "1",
      BatchEmptyCount: "133",
      Type: "SINK",
      ConnectionClosedCount: "1",
      EventDrainSuccessCount: "50203",
      StopTime: "0",
      StartTime: "1350414218434",
      BatchUnderflowCount: "2"
   },
   SINK.hdfs-sink-2:
   {
      BatchCompleteCount: "499",
      ConnectionFailedCount: "0",
      EventDrainAttemptCount: "49997",
      ConnectionCreatedCount: "1",
      BatchEmptyCount: "133",
      Type: "SINK",
      ConnectionClosedCount: "1",
      EventDrainSuccessCount: "49997",
      StopTime: "0",
      StartTime: "1350414218434",
      BatchUnderflowCount: "2"
   }
}

and this is from the source:

{
   SINK.avroSink-2:
   {
      BatchCompleteCount: "1000",
      ConnectionFailedCount: "0",
      EventDrainAttemptCount: "100000",
      ConnectionCreatedCount: "1",
      BatchEmptyCount: "1",
      Type: "SINK",
      ConnectionClosedCount: "0",
      EventDrainSuccessCount: "100000",
      StopTime: "0",
      StartTime: "1350414224029",
      BatchUnderflowCount: "0"
   },
   SINK.avroSink-1:
   {
      BatchCompleteCount: "1002",
      ConnectionFailedCount: "0",
      EventDrainAttemptCount: "100200",
      ConnectionCreatedCount: "3",
      BatchEmptyCount: "137",
      Type: "SINK",
      ConnectionClosedCount: "2",
      EventDrainSuccessCount: "100000",
      StopTime: "0",
      StartTime: "1350414223694",
      BatchUnderflowCount: "0"
   },
   CHANNEL.memoryChannel-1:
   {
      EventPutSuccessCount: "200000",
      ChannelFillPercentage: "0.0",
      Type: "CHANNEL",
      StopTime: "0",
      EventPutAttemptCount: "200010",
      ChannelSize: "0",
      StartTime: "1350414223192",
      EventTakeSuccessCount: "200000",
      ChannelCapacity: "100000",
      EventTakeAttemptCount: "200338"
   }
}
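
One more data point from staring at these numbers: on the collector,
memoryChannel-1 sits at 100% full with EventTakeSuccessCount = 0, i.e. nothing
ever takes from it, while memoryChannel-2 drains fine. A throwaway Python
check (channel values copied from the dumps above) that flags that condition:

```python
# Sanity check over the collector's channel counters pasted above.
# A channel that is completely full while EventTakeSuccessCount stays
# at 0 has no sink draining it.

collector_channels = {
    "CHANNEL.memoryChannel-1": {"ChannelFillPercentage": 100.0,
                                "EventTakeSuccessCount": 0},
    "CHANNEL.memoryChannel-2": {"ChannelFillPercentage": 0.0,
                                "EventTakeSuccessCount": 100200},
}

def stuck_channels(metrics):
    """Return names of channels that are full but never drained."""
    return [name for name, m in metrics.items()
            if m["ChannelFillPercentage"] >= 100.0
            and m["EventTakeSuccessCount"] == 0]

print(stuck_channels(collector_channels))  # -> ['CHANNEL.memoryChannel-1']
```

which matches the hang: once that channel is at capacity, every batch put
fails and the source agent's failover loop spins forever.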

On Tue, Oct 16, 2012 at 11:37 AM, Brock Noland <brock@cloudera.com> wrote:

> Hi,
>
> Can you confirm that ElasticSearchSink is consuming events?
>
> > Once this happens flume never resumes.
>
> It doesn't bring anything after that? Even minutes later it's still
> doing nothing?
>
> brock
>
> On Tue, Oct 16, 2012 at 1:23 PM, Cameron Gandevia <cgandevia@gmail.com>
> wrote:
> > Hi
> >
> > I am trying to setup a configuration where a single source agent load
> > balances between two aggregate agents which multiplex the flow to two end
> > points. I don't think I have the channel capacity properly configured but
> > flume always seems to end up hanging for me. If all channels are at
> > capacity, should the source try to send again once they have emptied?
> >
> > Here is my configuration.
> >
> > #
> > # Properties of memoryChannel
> > #
> > local_agent.channels.memoryChannel-1.type = memory
> > local_agent.channels.memoryChannel-1.capacity = 100000
> > local_agent.channels.memoryChannel-1.transactionCapacity = 1000
> >
> > collector_agent_1.channels.memoryChannel-1.type = memory
> > collector_agent_1.channels.memoryChannel-1.capacity = 100000
> > collector_agent_1.channels.memoryChannel-1.transactionCapacity = 1000
> >
> > collector_agent_1.channels.memoryChannel-2.type = memory
> > collector_agent_1.channels.memoryChannel-2.capacity = 100000
> > collector_agent_1.channels.memoryChannel-2.transactionCapacity = 1000
> >
> > collector_agent_2.channels.memoryChannel-1.type = memory
> > collector_agent_2.channels.memoryChannel-1.capacity = 100000
> > collector_agent_2.channels.memoryChannel-1.transactionCapacity = 1000
> >
> > collector_agent_2.channels.memoryChannel-2.type = memory
> > collector_agent_2.channels.memoryChannel-2.capacity = 100000
> > collector_agent_2.channels.memoryChannel-2.transactionCapacity = 1000
> >
> > #
> > # Properties for spooling directory source
> > #
> > local_agent.sources.spooldir-1.type = spooldir
> > local_agent.sources.spooldir-1.spoolDir = ~/flume_test/ready
> > local_agent.sources.spooldir-1.fileHeader = true
> > local_agent.sources.spooldir-1.channels = memoryChannel-1
> >
> > #
> > # Properties for the avro sink 1 agent to collector 1
> > #
> > local_agent.sinks.avroSink-1.type = avro
> > local_agent.sinks.avroSink-1.hostname = 127.0.0.1
> > local_agent.sinks.avroSink-1.port = 4545
> > local_agent.sinks.avroSink-1.channel = memoryChannel-1
> >
> > #
> > # Properties for the avro sink agent to collector 2
> > #
> > local_agent.sinks.avroSink-2.type = avro
> > local_agent.sinks.avroSink-2.hostname = 127.0.0.1
> > local_agent.sinks.avroSink-2.port = 4546
> > local_agent.sinks.avroSink-2.channel = memoryChannel-1
> >
> > #
> > # Properties for the avro source collector 1
> > #
> > collector_agent_1.sources.avroSource-1.type = avro
> > collector_agent_1.sources.avroSource-1.bind = 127.0.0.1
> > collector_agent_1.sources.avroSource-1.port = 4545
> > collector_agent_1.sources.avroSource-1.channels = memoryChannel-1
> > memoryChannel-2
> >
> > #
> > # Properties for the avro source collector 2
> > #
> > collector_agent_2.sources.avroSource-2.type = avro
> > collector_agent_2.sources.avroSource-2.bind = 127.0.0.1
> > collector_agent_2.sources.avroSource-2.port = 4546
> > collector_agent_2.sources.avroSource-2.channels = memoryChannel-1
> > memoryChannel-2
> >
> > # End points for collector 1
> >
> > # ElasticSearch endpoint collector 1
> >
> > collector_agent_1.sinks.elastic-search-sink-1.type =
> > org.apache.flume.sink.elasticsearch.ElasticSearchSink
> > collector_agent_1.sinks.elastic-search-sink-1.hostNames = 127.0.0.1:9300
> > collector_agent_1.sinks.elastic-search-sink-1.clusterName = elasticsearch
> > collector_agent_1.sinks.elastic-search-sink-1.batchSize = 10
> > collector_agent_1.sinks.elastic-search-sink-1.channel = memoryChannel-1
> >
> > # HDFS endpoint collector 1
> >
> > collector_agent_1.sinks.sink1.type = hdfs
> > collector_agent_1.sinks.sink1.hdfs.path =
> > hdfs://hadoop-name-node1.dc1.ci-mang.van.dev.net:8020/flumeng_test
> > collector_agent_1.sinks.sink1.hdfs.fileType = DataStream
> > collector_agent_1.sinks.sink1.hdfs.rollInterval = 300
> > collector_agent_1.sinks.sink1.hdfs.rollSize = 0
> > collector_agent_1.sinks.sink1.hdfs.rollCount = 0
> > collector_agent_1.sinks.sink1.hdfs.batchSize = 1000
> > collector_agent_1.sinks.sink1.txnEventMax = 1000
> > collector_agent_1.sinks.sink1.serializer = avro_event
> > collector_agent_1.sinks.sink1.channel = memoryChannel-2
> >
> > # ElasticSearch endpoint collector 2
> >
> > collector_agent_2.sinks.elastic-search-sink-1.type =
> > org.apache.flume.sink.elasticsearch.ElasticSearchSink
> > collector_agent_2.sinks.elastic-search-sink-1.hostNames = 127.0.0.1:9300
> > collector_agent_2.sinks.elastic-search-sink-1.clusterName = elasticsearch
> > collector_agent_2.sinks.elastic-search-sink-1.batchSize = 10
> > collector_agent_2.sinks.elastic-search-sink-1.channel = memoryChannel-1
> >
> > # HDFS endpoint collector 2
> >
> > collector_agent_2.sinks.sink1.type = hdfs
> > collector_agent_2.sinks.sink1.hdfs.path =
> > hdfs://hadoop-name-node1.dc1.ci-mang.van.dev.net:8020/flumeng_test_3
> > collector_agent_2.sinks.sink1.hdfs.fileType = DataStream
> > collector_agent_2.sinks.sink1.hdfs.rollInterval = 300
> > collector_agent_2.sinks.sink1.hdfs.rollSize = 0
> > collector_agent_2.sinks.sink1.hdfs.rollCount = 0
> > collector_agent_2.sinks.sink1.hdfs.batchSize = 1000
> > collector_agent_2.sinks.sink1.txnEventMax = 1000
> > collector_agent_2.sinks.sink1.serializer = avro_event
> > collector_agent_2.sinks.sink1.channel = memoryChannel-2
> >
> > # Specify priorities for the sinks on the agent
> >
> > local_agent.sinkgroups.ha.sinks = avroSink-1 avroSink-2
> > local_agent.sinkgroups.ha.processor.type = failover
> > local_agent.sinkgroups.ha.priority.avroSink-1 = 2
> > local_agent.sinkgroups.ha.priority.avroSink-2 = 1
> >
> > # Wire the source agents up
> >
> > local_agent.sources = spooldir-1
> > local_agent.sinks = avroSink-1 avroSink-2
> > local_agent.sinkgroups = ha
> > local_agent.channels = memoryChannel-1
> >
> > # Wire the collector agents up
> >
> > collector_agent_1.sources = avroSource-1
> > collector_agent_1.sinks = elastic-search-sink-1 sink1
> > collector_agent_1.channels = memoryChannel-1 memoryChannel-2
> >
> > collector_agent_2.sources = avroSource-2
> > collector_agent_2.sinks = elastic-search-sink-1 sink1
> > collector_agent_2.channels = memoryChannel-1 memoryChannel-2
> >
> > I will get the following exceptions on the collector nodes
> >
> > org.apache.flume.ChannelException: Unable to put batch on required
> channel:
> > org.apache.flume.channel.MemoryChannel{name: memoryChannel-1}
> > at
> >
> org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:200)
> > at org.apache.flume.source.AvroSource.appendBatch(AvroSource.java:259)
> > at sun.reflect.GeneratedMethodAccessor6.invoke(Unknown Source)
> > at
> >
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
> > at java.lang.reflect.Method.invoke(Method.java:597)
> > at
> >
> org.apache.avro.ipc.specific.SpecificResponder.respond(SpecificResponder.java:88)
> > at org.apache.avro.ipc.Responder.respond(Responder.java:149)
> > at
> >
> org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.messageReceived(NettyServer.java:188)
> > at
> >
> org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:80)
> > at
> >
> org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:173)
> > at
> >
> org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
> > at
> >
> org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:783)
> > at
> org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:302)
> > at
> >
> org.jboss.netty.handler.codec.frame.FrameDecoder.unfoldAndFireMessageReceived(FrameDecoder.java:321)
> > at
> >
> org.jboss.netty.handler.codec.frame.FrameDecoder.callDecode(FrameDecoder.java:299)
> > at
> >
> org.jboss.netty.handler.codec.frame.FrameDecoder.messageReceived(FrameDecoder.java:216)
> > at
> >
> org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:80)
> > at
> >
> org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
> > at
> >
> org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:559)
> > at
> org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:274)
> > at
> org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:261)
> > at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:351)
> > at
> >
> org.jboss.netty.channel.socket.nio.NioWorker.processSelectedKeys(NioWorker.java:282)
> > at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:202)
> > at
> >
> org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108)
> > at
> >
> org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:44)
> > at
> >
> java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
> > at
> >
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
> > at java.lang.Thread.run(Thread.java:662)
> > Caused by: org.apache.flume.ChannelException: Space for commit to queue
> > couldn't be acquired Sinks are likely not keeping up with sources, or the
> > buffer size is too tight
> > at
> >
> org.apache.flume.channel.MemoryChannel$MemoryTransaction.doCommit(MemoryChannel.java:91)
> > at
> >
> org.apache.flume.channel.BasicTransactionSemantics.commit(BasicTransactionSemantics.java:151)
> > at
> >
> org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:192)
> > ... 28 more
> >
> > then the following exceptions on the agent
> >
> > 2012-10-16 10:51:02,942 (SinkRunner-PollingRunner-FailoverSinkProcessor)
> > [WARN -
> >
> org.apache.flume.sink.FailoverSinkProcessor.process(FailoverSinkProcessor.java:185)]
> > Sink avroSink-1 failed and has been sent to failover list
> > org.apache.flume.EventDeliveryException: Failed to send events
> > at org.apache.flume.sink.AvroSink.process(AvroSink.java:325)
> > at
> >
> org.apache.flume.sink.FailoverSinkProcessor.process(FailoverSinkProcessor.java:182)
> > at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
> > at java.lang.Thread.run(Thread.java:662)
> > Caused by: org.apache.flume.EventDeliveryException: NettyAvroRpcClient {
> > host: localhost, port: 4545 }: Failed to send batch
> > at
> >
> org.apache.flume.api.NettyAvroRpcClient.appendBatch(NettyAvroRpcClient.java:236)
> > at org.apache.flume.sink.AvroSink.process(AvroSink.java:309)
> > ... 3 more
> > Caused by: org.apache.flume.EventDeliveryException: NettyAvroRpcClient {
> > host: localhost, port: 4545 }: Exception thrown from remote handler
> > at
> >
> org.apache.flume.api.NettyAvroRpcClient.waitForStatusOK(NettyAvroRpcClient.java:318)
> > at
> >
> org.apache.flume.api.NettyAvroRpcClient.appendBatch(NettyAvroRpcClient.java:295)
> > at
> >
> org.apache.flume.api.NettyAvroRpcClient.appendBatch(NettyAvroRpcClient.java:224)
> > ... 4 more
> > Caused by: java.util.concurrent.ExecutionException: java.io.IOException:
> > NettyTransceiver closed
> > at org.apache.avro.ipc.CallFuture.get(CallFuture.java:128)
> > at
> >
> org.apache.flume.api.NettyAvroRpcClient.waitForStatusOK(NettyAvroRpcClient.java:310)
> > ... 6 more
> > Caused by: java.io.IOException: NettyTransceiver closed
> > at
> >
> org.apache.avro.ipc.NettyTransceiver.disconnect(NettyTransceiver.java:338)
> > at
> org.apache.avro.ipc.NettyTransceiver.access$200(NettyTransceiver.java:59)
> > at
> >
> org.apache.avro.ipc.NettyTransceiver$NettyClientAvroHandler.handleUpstream(NettyTransceiver.java:496)
> > at
> >
> org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
> > at
> >
> org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:783)
> > at
> >
> org.jboss.netty.handler.codec.frame.FrameDecoder.cleanup(FrameDecoder.java:348)
> > at
> >
> org.jboss.netty.handler.codec.frame.FrameDecoder.channelClosed(FrameDecoder.java:232)
> > at
> >
> org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:98)
> > at
> >
> org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
> > at
> >
> org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:559)
> > at org.jboss.netty.channel.Channels.fireChannelClosed(Channels.java:404)
> > at org.jboss.netty.channel.socket.nio.NioWorker.close(NioWorker.java:602)
> > at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:358)
> > at
> >
> org.jboss.netty.channel.socket.nio.NioWorker.processSelectedKeys(NioWorker.java:282)
> > at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:202)
> > at
> >
> org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108)
> > at
> >
> org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:44)
> > at
> >
> java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
> > at
> >
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
> > ... 1 more
> > 2012-10-16 10:51:04,040 (pool-4-thread-1) [INFO -
> >
> org.apache.flume.client.avro.SpoolingFileLineReader.retireCurrentFile(SpoolingFileLineReader.java:201)]
> > Preparing to move file
> /home/cgandevia/Desktop/flume_test/ready/test.log.27
> > to /home/cgandevia/Desktop/flume_test/ready/test.log.27.COMPLETED
> > 2012-10-16 10:51:07,119 (SinkRunner-PollingRunner-FailoverSinkProcessor)
> > [WARN -
> >
> org.apache.flume.sink.FailoverSinkProcessor.process(FailoverSinkProcessor.java:185)]
> > Sink avroSink-2 failed and has been sent to failover list
> > org.apache.flume.EventDeliveryException: Failed to send events
> > at org.apache.flume.sink.AvroSink.process(AvroSink.java:325)
> > at
> >
> org.apache.flume.sink.FailoverSinkProcessor.process(FailoverSinkProcessor.java:182)
> > at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
> > at java.lang.Thread.run(Thread.java:662)
> > Caused by: org.apache.flume.EventDeliveryException: NettyAvroRpcClient {
> > host: localhost, port: 4546 }: Failed to send batch
> > at
> >
> org.apache.flume.api.NettyAvroRpcClient.appendBatch(NettyAvroRpcClient.java:236)
> > at org.apache.flume.sink.AvroSink.process(AvroSink.java:309)
> > ... 3 more
> > Caused by: org.apache.flume.EventDeliveryException: NettyAvroRpcClient {
> > host: localhost, port: 4546 }: Avro RPC call returned Status: FAILED
> > at
> >
> org.apache.flume.api.NettyAvroRpcClient.waitForStatusOK(NettyAvroRpcClient.java:312)
> > at
> >
> org.apache.flume.api.NettyAvroRpcClient.appendBatch(NettyAvroRpcClient.java:295)
> > at
> >
> org.apache.flume.api.NettyAvroRpcClient.appendBatch(NettyAvroRpcClient.java:224)
> > ... 4 more
> > 2012-10-16 10:51:07,120 (SinkRunner-PollingRunner-FailoverSinkProcessor)
> > [ERROR -
> org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:160)]
> > Unable to deliver event. Exception follows.
> > org.apache.flume.EventDeliveryException: All sinks failed to process,
> > nothing left to failover to
> > at
> >
> org.apache.flume.sink.FailoverSinkProcessor.process(FailoverSinkProcessor.java:191)
> > at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
> > at java.lang.Thread.run(Thread.java:662)
> > 2012-10-16 10:51:12,120 (SinkRunner-PollingRunner-FailoverSinkProcessor)
> > [INFO -
> org.apache.flume.sink.AvroSink.createConnection(AvroSink.java:178)]
> > Avro sink avroSink-1: Building RpcClient with hostname: 127.0.0.1, port:
> > 4545
> > 2012-10-16 10:51:12,188 (SinkRunner-PollingRunner-FailoverSinkProcessor)
> > [INFO -
> org.apache.flume.sink.AvroSink.createConnection(AvroSink.java:178)]
> > Avro sink avroSink-2: Building RpcClient with hostname: 127.0.0.1, port:
> > 4546
> >
> > Once this happens flume never resumes.
> >
>
>
>
> --
> Apache MRUnit - Unit testing MapReduce -
> http://incubator.apache.org/mrunit/
>



-- 
Thanks

Cameron Gandevia
