flume-user mailing list archives

From 周梦想 <abloz...@gmail.com>
Subject Re: Take list for MemoryTransaction, capacity 100 full?
Date Wed, 27 Feb 2013 02:29:28 GMT
Hi Hari,
Here is my main configuration:
agent46.sources = userlogsrc gamelogsrc
agent46.channels = memch1
agent46.sinks = myhdfssink

agent46.sinks.myhdfssink.hdfs.batchSize = 50
agent46.channels.memch1.type = memory
agent46.channels.memch1.capacity = 100000
agent46.channels.memch1.transactionCapactiy = 1000
agent46.sources.userlogsrc.type = avro
agent46.sources.gamelogsrc.type = avro

I have 8 avro sinks sending data to the agent46 sources, and every avro
sink's batch size is set to 40.
But the agent reports an HDFS IO error and a MemoryChannel exception.
The HDFS IO error appears only once. After that, it takes about 2 minutes
before it reports a ChannelException, and all agents disconnect and
reconnect again.

Is this caused by HDFS being too slow to flush, or by some other problem?
How should the source, channel, and sink configurations relate to each
other?
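
For reference, here is a minimal sketch of how these settings are usually expected to line up, assuming the memory channel reads its per-transaction limit from the `transactionCapacity` key and falls back to the default of 100 mentioned in the quoted reply below when that key is absent. The configuration above spells the key `transactionCapactiy`, which would likely be ignored as an unknown property, so the value 1000 may never take effect. The numbers are the ones from this message; the ordering rule in the comments is an assumption drawn from the error messages, not a statement of Flume's internals.

```properties
# capacity: total number of events the channel can buffer.
agent46.channels.memch1.type = memory
agent46.channels.memch1.capacity = 100000

# transactionCapacity: maximum events in a single put or take transaction.
# Note the spelling; "transactionCapactiy" is probably not recognized.
agent46.channels.memch1.transactionCapacity = 1000

# The sink takes up to batchSize events in one transaction, so it should
# stay at or below transactionCapacity, which in turn should stay at or
# below capacity.
agent46.sinks.myhdfssink.hdfs.batchSize = 50
```

The same limit would apply on the put side: each upstream avro sink batch (40 events here) arrives as one transaction, so it also needs to fit within `transactionCapacity`.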

Thank you!
Andy

Below are some logs:
2013-02-26 19:00:39,757 (SinkRunner-PollingRunner-DefaultSinkProcessor)
[WARN -
org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:456)]
HDFS IO error
java.io.IOException: Callable timed out after 10000 ms
        at
org.apache.flume.sink.hdfs.HDFSEventSink.callWithTimeout(HDFSEventSink.java:352)
        at
org.apache.flume.sink.hdfs.HDFSEventSink.append(HDFSEventSink.java:727)
        at
org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:430)
        at
org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
        at
org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
        at java.lang.Thread.run(Thread.java:722)
Caused by: java.util.concurrent.TimeoutException
        at
java.util.concurrent.FutureTask$Sync.innerGet(FutureTask.java:258)
        at java.util.concurrent.FutureTask.get(FutureTask.java:119)
        at
org.apache.flume.sink.hdfs.HDFSEventSink.callWithTimeout(HDFSEventSink.java:345)
        ... 5 more

2013-02-26 19:00:39,758 (pool-8-thread-4) [ERROR -
org.apache.flume.source.AvroSource.appendBatch(AvroSource.java:261)] Avro
source userlogsrc: Unable to process event batch. Exception follows.
org.apache.flume.ChannelException: Unable to put batch on required channel:
org.apache.flume.channel.MemoryChannel{name: memch1}
        at
org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:200)
        at
org.apache.flume.source.AvroSource.appendBatch(AvroSource.java:259)
        at sun.reflect.GeneratedMethodAccessor6.invoke(Unknown Source)

2013-02-26 19:02:16,023 (pool-8-thread-4) [DEBUG -
org.apache.flume.source.AvroSource.appendBatch(AvroSource.java:244)] Avro
source userlogsrc: Received avro event batch of 40 events.
2013-02-26 19:02:23,602 (pool-8-thread-7) [ERROR -
org.apache.flume.source.AvroSource.appendBatch(AvroSource.java:261)] Avro
source userlogsrc: Unable to process event batch. Exception follows.
org.apache.flume.ChannelException: Unable to put batch on required channel:
org.apache.flume.channel.MemoryChannel{name: memch1}
        at
org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:200)
        at
org.apache.flume.source.AvroSource.appendBatch(AvroSource.java:259)
        at sun.reflect.GeneratedMethodAccessor6.invoke(Unknown Source)
        at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:601)
        at
org.apache.avro.ipc.specific.SpecificResponder.respond(SpecificResponder.java:88)
        at org.apache.avro.ipc.Responder.respond(Responder.java:149)
        at
org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.messageReceived(NettyServer.java:188)
        at
org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:75)
        at
org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:173)
        at
org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
        at
org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:792)
        at
org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:296)
        at
org.jboss.netty.handler.codec.frame.FrameDecoder.unfoldAndFireMessageReceived(FrameDecoder.java:321)
        at
org.jboss.netty.handler.codec.frame.FrameDecoder.callDecode(FrameDecoder.java:303)
        at
org.jboss.netty.handler.codec.frame.FrameDecoder.messageReceived(FrameDecoder.java:208)
        at
org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:75)
        at
org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
        at
org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:559)
        at
org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:268)
        at
org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:255)
        at
org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:94)
        at
org.jboss.netty.channel.socket.nio.AbstractNioWorker.processSelectedKeys(AbstractNioWorker.java:364)
        at
org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:238)
        at
org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:38)
        at
org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42)
        at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
        at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
        at java.lang.Thread.run(Thread.java:722)
Caused by: org.apache.flume.ChannelException: Space for commit to queue
couldn't be acquired Sinks are likely not keeping up with sources, or the
buffer size is too tight
        at
org.apache.flume.channel.MemoryChannel$MemoryTransaction.doCommit(MemoryChannel.java:126)
        at
org.apache.flume.channel.BasicTransactionSemantics.commit(BasicTransactionSemantics.java:151)
        at
org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:192)
        ... 28 more
2013-02-26 19:02:23,603 (pool-8-thread-9) [ERROR -
org.apache.flume.source.AvroSource.appendBatch(AvroSource.java:261)] Avro
source userlogsrc: Unable to process event batch. Exception follows.
        at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:601)
        at
org.apache.avro.ipc.specific.SpecificResponder.respond(SpecificResponder.java:88)
        at org.apache.avro.ipc.Responder.respond(Responder.java:149)
        at
org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.messageReceived(NettyServer.java:188)
        at
org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:75)
        at
org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:173)
        at
org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
        at
org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:792)
        at
org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:296)
        at
org.jboss.netty.handler.codec.frame.FrameDecoder.unfoldAndFireMessageReceived(FrameDecoder.java:321)
        at
org.jboss.netty.handler.codec.frame.FrameDecoder.callDecode(FrameDecoder.java:303)
        at
org.jboss.netty.handler.codec.frame.FrameDecoder.messageReceived(FrameDecoder.java:208)
        at
org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:75)
        at
org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
        at
org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:559)
        at
org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:268)
        at
org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:255)
        at
org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:94)
        at
org.jboss.netty.channel.socket.nio.AbstractNioWorker.processSelectedKeys(AbstractNioWorker.java:364)
        at
org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:238)
        at
org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:38)
        at
org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42)
        at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
        at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
        at java.lang.Thread.run(Thread.java:722)
Caused by: org.apache.flume.ChannelException: Space for commit to queue
couldn't be acquired Sinks are likely not keeping up with sources, or the
buffer size is too tight
        at
org.apache.flume.channel.MemoryChannel$MemoryTransaction.doCommit(MemoryChannel.java:126)
        at
org.apache.flume.channel.BasicTransactionSemantics.commit(BasicTransactionSemantics.java:151)
        at
org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:192)
        ... 28 more
2013-02-26 19:02:28,132 (pool-8-thread-7) [INFO -
org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)]
[id: 0x47e9e334, /192.168.133.44:17031 :> /192.168.133.46:5140] DISCONNECTED
2013-02-26 19:02:28,132 (pool-8-thread-7) [INFO -
org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)]
[id: 0x47e9e334, /192.168.133.44:17031 :> /192.168.133.46:5140] UNBOUND
2013-02-26 19:02:28,132 (pool-8-thread-7) [INFO -
org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)]
[id: 0x47e9e334, /192.168.133.44:17031 :> /192.168.133.46:5140] CLOSED
2013-02-26 19:02:28,133 (pool-8-thread-7) [INFO -
org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.channelClosed(NettyServer.java:209)]
Connection to /192.168.133.44:17031 disconnected.


2013/2/26 Hari Shreedharan <hshreedharan@cloudera.com>

> This is because the memory channel has a default transaction capacity of
> 100. Increasing it (or keeping the sink's batchSize < the transaction
> capacity of the channel) will fix the issue. See
> http://flume.apache.org/FlumeUserGuide.html#memory-channel for more
> details.
>
> Hari
>
> --
> Hari Shreedharan
>
> On Monday, February 25, 2013 at 11:41 PM, 周梦想 wrote:
>
> I found that if agent46.sinks.myhdfssink.hdfs.batchSize >= 100, it reports
> this error.
> If I set it to 10, it works, but it is a bit slower.
>
> Best Regards,
> Andy
>
> 2013/2/26 周梦想 <ablozhou@gmail.com>
>
> more logs:
>
> 2013-02-26 14:37:00,380 (SinkRunner-PollingRunner-DefaultSinkProcessor)
> [ERROR -
> org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:160)] Unable
> to deliver event. Exception follows.
> org.apache.flume.EventDeliveryException:
> org.apache.flume.ChannelException: Take list for MemoryTransaction,
> capacity 100 full, consider committing more frequently, increasing
> capacity, or increasing thread count
>         at
> org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:464)
>         at
> org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
>         at
> org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
>         at java.lang.Thread.run(Thread.java:722)
> Caused by: org.apache.flume.ChannelException: Take list for
> MemoryTransaction, capacity 100 full, consider committing more frequently,
> increasing capacity, or increasing thread count
>          at
> org.apache.flume.channel.MemoryChannel$MemoryTransaction.doTake(MemoryChannel.java:100)
>         at
> org.apache.flume.channel.BasicTransactionSemantics.take(BasicTransactionSemantics.java:113)
>         at
> org.apache.flume.channel.BasicChannelSemantics.take(BasicChannelSemantics.java:95)
>         at
> org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:391)
>         ... 3 more
> 2013-02-26 14:37:02,854 (pool-7-thread-1) [ERROR -
> org.apache.flume.source.AvroSource.appendBatch(AvroSource.java:261)] Avro
> source userlogsrc: Unable to process event batch. Exception follows.
> org.apache.flume.ChannelException: Unable to put batch on required
> channel: org.apache.flume.channel.MemoryChannel{name: memch1}
>         at
> org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:200)
>         at
> org.apache.flume.source.AvroSource.appendBatch(AvroSource.java:259)
>         at sun.reflect.GeneratedMethodAccessor6.invoke(Unknown Source)
>         at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:601)
>         at
> org.apache.avro.ipc.specific.SpecificResponder.respond(SpecificResponder.java:88)
>         at org.apache.avro.ipc.Responder.respond(Responder.java:149)
>         at
> org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.messageReceived(NettyServer.java:188)
>         at
> org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:75)
>         at
> org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:173)
>         at
> org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
>         at
> org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:792)
>         at
> org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:296)
>         at
> org.jboss.netty.handler.codec.frame.FrameDecoder.unfoldAndFireMessageReceived(FrameDecoder.java:321)
>         at
> org.jboss.netty.handler.codec.frame.FrameDecoder.callDecode(FrameDecoder.java:303)
>         at
> org.jboss.netty.handler.codec.frame.FrameDecoder.messageReceived(FrameDecoder.java:208)
>         at
> org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:75)
>         at
> org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
>         at
> org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:559)
>         at
> org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:268)
>         at
> org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:255)
>         at
> org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:94)
>         at
> org.jboss.netty.channel.socket.nio.AbstractNioWorker.processSelectedKeys(AbstractNioWorker.java:364)
>         at
> org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:238)
>         at
> org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:38)
>         at
> org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42)
>         at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
>         at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
>         at java.lang.Thread.run(Thread.java:722)
> Caused by: org.apache.flume.ChannelException: Space for commit to queue
> couldn't be acquired Sinks are likely not keeping up with sources, or the
> buffer size is too tight
>         at
> org.apache.flume.channel.MemoryChannel$MemoryTransaction.doCommit(MemoryChannel.java:126)
>         at
> org.apache.flume.channel.BasicTransactionSemantics.commit(BasicTransactionSemantics.java:151)
>         at
> org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:192)
>         ... 28 more
>
>
> 2013/2/26 周梦想 <ablozhou@gmail.com>
>
> Hello,
> I am using flume-ng to send data from Windows to HDFS on Linux through the
> avro protocol, and I encountered this error:
>
> 2013-02-26 12:21:02,908 (pool-8-thread-1) [DEBUG -
> org.apache.flume.source.AvroSource.appendBatch(AvroSource.java:244)] Avro
> source userlogsrc: Received avro event batch of 100 events.
>
> 2013-02-26 12:21:03,107 (SinkRunner-PollingRunner-DefaultSinkProcessor)
> [ERROR -
> org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:460)]
> process failed
> org.apache.flume.ChannelException: Take list for MemoryTransaction,
> capacity 100 full, consider committing more frequently, increasing
> capacity, or increasing thread count
>         at
> org.apache.flume.channel.MemoryChannel$MemoryTransaction.doTake(MemoryChannel.java:100)
>         at
> org.apache.flume.channel.BasicTransactionSemantics.take(BasicTransactionSemantics.java:113)
>         at
> org.apache.flume.channel.BasicChannelSemantics.take(BasicChannelSemantics.java:95)
>         at
> org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:391)
>         at
> org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
>         at
> org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
>         at java.lang.Thread.run(Thread.java:722)
> Caused by: org.apache.flume.ChannelException: Space for commit to queue
> couldn't be acquired Sinks are likely not keeping up with sources, or the
> buffer size is too tight
>         at
> org.apache.flume.channel.MemoryChannel$MemoryTransaction.doCommit(MemoryChannel.java:126)
>         at
> org.apache.flume.channel.BasicTransactionSemantics.commit(BasicTransactionSemantics.java:151)
>         at
> org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:192)
>         ... 28 more
>
> I have set the memory channel capacity to 1000, but it still reports this
> error. Can someone give me any advice?
>
> Thanks,
> Andy
>
> hdfs.conf:
>
> agent46.sources = userlogsrc gamelogsrc
> agent46.channels = memch1
> agent46.sinks = myhdfssink
>
> #channels:
> agent46.channels.memch1.type = memory
> agent46.channels.memch1.capacity = 10000
> agent46.channels.memch1.transactionCapactiy = 100
> #sources:
> #userlogsrc:
> #agent46.sources.userlogsrc.type = syslogTcp
> agent46.sources.userlogsrc.type = avro
> agent46.sources.userlogsrc.port = 5140
> agent46.sources.userlogsrc.bind= 0.0.0.0
> #agent46.sources.userlogsrc.host= hadoop48
> agent46.sources.userlogsrc.interceptors = i1 i2 i3
> agent46.sources.userlogsrc.interceptors.i1.type = org.apache.flume.interceptor.HostInterceptor$Builder
> agent46.sources.userlogsrc.interceptors.i1.preserveExisting = true
> #agent46.sources.userlogsrc.interceptors.i1.hostHeader = hostname
> agent46.sources.userlogsrc.interceptors.i1.useIP = false
> agent46.sources.userlogsrc.interceptors.i2.type = org.apache.flume.interceptor.TimestampInterceptor$Builder
> agent46.sources.userlogsrc.interceptors.i3.type = static
> agent46.sources.userlogsrc.interceptors.i3.key = datacenter
> agent46.sources.userlogsrc.interceptors.i3.value = userdata
> agent46.sources.userlogsrc.channels = memch1
> #gamelogsrc:
> #agent46.sources.gamelogsrc.type = syslogTcp
> agent46.sources.gamelogsrc.type = avro
> agent46.sources.gamelogsrc.port = 5150
> agent46.sources.gamelogsrc.bind= 0.0.0.0
> agent46.sources.gamelogsrc.channels = memch1
> #sinks:
> agent46.sinks.myhdfssink.channel = memch1
>
> agent46.sinks.myhdfssink.type = hdfs
> agent46.sinks.myhdfssink.hdfs.rollInterval = 120
> agent46.sinks.myhdfssink.hdfs.appendTimeout = 1000
> agent46.sinks.myhdfssink.hdfs.rollSize = 209715200
> agent46.sinks.myhdfssink.hdfs.rollCount = 600000
> agent46.sinks.myhdfssink.hdfs.batchSize = 1000
> agent46.sinks.myhdfssink.hdfs.txnEventMax = 100000
> agent46.sinks.myhdfssink.hdfs.threadsPoolSize= 100
> agent46.sinks.myhdfssink.hdfs.path = hdfs://h46:9000/flume/%{filename}/%m%d
> #agent46.sinks.myhdfssink.hdfs.filePrefix = userlogsrc.%{host}
> #agent46.sinks.myhdfssink.hdfs.filePrefix = %{filename}.%{hostname}.%{datacenter}.%Y%m%d
> agent46.sinks.myhdfssink.hdfs.filePrefix = %{filename}.%{host}.%Y%m%d
> #agent46.sinks.myhdfssink.hdfs.rollInterval = 60
> #agent46.sinks.myhdfssink.hdfs.fileType = SequenceFile
> agent46.sinks.myhdfssink.hdfs.fileType = DataStream
> #agent46.sinks.myhdfssink.hdfs.file.writeFormat= Text
>
