flume-user mailing list archives

From Silvio Di gregorio <silvio.digrego...@gmail.com>
Subject Re: duplicated events with kafka channel and hdfs sink
Date Fri, 02 Dec 2016 10:12:45 GMT
Hi,
Are you ingesting streaming data?

2016-12-02 10:56 GMT+01:00 Roberto Coluccio <roberto.coluccio@eng.it>:

> Hello again guys,
>
> sorry to bother you; I'm fairly new to the Flume world and I'm running
> experiments to evaluate the pros and cons of different topologies and settings.
>
> I'm currently experiencing the following issue:
>
> Agent topology made of: JMS source --> Kafka Channel --> HDFS sink
> Source and Sink batch size, and Channel transactionCapacity = 1000
> Channel capacity = 10000
> Channel kafka.consumer.auto.offset.reset = latest
> Channel migrateZookeeperOffsets = false
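>
> For concreteness, a minimal sketch of what such a configuration might look
> like follows. The agent name (a1), the source/sink names, and all hosts,
> topics and paths are hypothetical placeholders; only the channel name and
> the numeric/offset settings listed above are my real values:
>
> a1.sources  = src_jms
> a1.channels = chl_jms_kafka
> a1.sinks    = snk_hdfs
>
> # JMS source (TIBCO EMS; provider details are placeholders)
> a1.sources.src_jms.type = jms
> a1.sources.src_jms.initialContextFactory = com.tibco.tibjms.naming.TibjmsInitialContextFactory
> a1.sources.src_jms.connectionFactory = QueueConnectionFactory
> a1.sources.src_jms.providerURL = tcp://ems-host:7222
> a1.sources.src_jms.destinationName = my.input.queue
> a1.sources.src_jms.destinationType = QUEUE
> a1.sources.src_jms.batchSize = 1000
> a1.sources.src_jms.channels = chl_jms_kafka
>
> # Kafka channel
> a1.channels.chl_jms_kafka.type = org.apache.flume.channel.kafka.KafkaChannel
> a1.channels.chl_jms_kafka.kafka.bootstrap.servers = broker1:9092
> a1.channels.chl_jms_kafka.kafka.topic = flume-channel-topic
> a1.channels.chl_jms_kafka.kafka.consumer.auto.offset.reset = latest
> a1.channels.chl_jms_kafka.migrateZookeeperOffsets = false
> a1.channels.chl_jms_kafka.transactionCapacity = 1000
> a1.channels.chl_jms_kafka.capacity = 10000
>
> # HDFS sink (writes events as plain text, one per line)
> a1.sinks.snk_hdfs.type = hdfs
> a1.sinks.snk_hdfs.hdfs.path = /flume/events
> a1.sinks.snk_hdfs.hdfs.fileType = DataStream
> a1.sinks.snk_hdfs.hdfs.batchSize = 1000
> a1.sinks.snk_hdfs.channel = chl_jms_kafka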
>
> I'm trying to confirm that the (channel, sink) pair is fault tolerant, i.e.
> that such an agent can be restarted after being terminated and resume
> draining the channel from the point (offset) where it left off.
>
> I have a CDH 5.8 cluster and launch my agent with a script that calls
> flume-ng, passing it a custom configuration file.
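>
> (The launch script essentially boils down to something like this; the agent
> name and paths are placeholders, matching the sketch above:)
>
> #!/bin/bash
> # Start the Flume agent named "a1" from a custom properties file.
> flume-ng agent \
>   --name a1 \
>   --conf /etc/flume-ng/conf \
>   --conf-file /path/to/my-agent.conf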
>
> Given x messages previously pushed to the input JMS queue, I:
>
>
>    1. start the agent through my script
>    2. verify it opens up a new file on hdfs and starts consuming events
>    (it writes 1 event per line)
>    3. stop the agent (CTRL+C on the open shell session)
>    4. re-start the agent through my script
>    5. wait until it completes its draining
>    6. count the lines written across all the generated files (see the
>    sketch right after this list)
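>
> For step 6, a quick count could look like this (the HDFS path is the
> placeholder from the sketch above; one event per line, so lines == events):
>
> hdfs dfs -cat /flume/events/* | wc -l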
>
> What I experienced is that:
>
>    1. when the script/process termination at step 3 is graceful (i.e. no
>    exceptions are raised), I verify that exactly x messages were written
>    to HDFS;
>    2. when the script/process termination at step 3 is followed by the
>    exception:
>
> 16/12/02 09:19:46 WARN jms.JMSSource: JMSException consuming events
> javax.jms.JMSException: InterruptedException has occurred while waiting for server response
>     at com.tibco.tibjms.Tibjmsx.buildException(Tibjmsx.java:502)
>     at com.tibco.tibjms.TibjmsxLink.sendRequest(TibjmsxLink.java:364)
>     at com.tibco.tibjms.TibjmsxLink.sendRequestMsg(TibjmsxLink.java:293)
>     at com.tibco.tibjms.TibjmsxSessionImp._processNoWait(TibjmsxSessionImp.java:3548)
>     at com.tibco.tibjms.TibjmsxSessionImp._receive(TibjmsxSessionImp.java:1947)
>     at com.tibco.tibjms.TibjmsMessageConsumer._receive(TibjmsMessageConsumer.java:240)
>     at com.tibco.tibjms.TibjmsMessageConsumer.receiveNoWait(TibjmsMessageConsumer.java:492)
>     at org.apache.flume.source.jms.JMSMessageConsumer.take(JMSMessageConsumer.java:127)
>     at org.apache.flume.source.jms.JMSSource.doProcess(JMSSource.java:261)
>     at org.apache.flume.source.AbstractPollableSource.process(AbstractPollableSource.java:58)
>     at org.apache.flume.source.PollableSourceRunner$PollingRunner.run(PollableSourceRunner.java:137)
>     at java.lang.Thread.run(Thread.java:745)
>
> then I again verify that exactly x messages were written to HDFS;
>
>
>    3. when the script/process termination at step 3 is followed by the
>    exception:
>
> ^C16/12/02 09:10:28 INFO lifecycle.LifecycleSupervisor: Stopping lifecycle supervisor 11
> 16/12/02 09:10:28 ERROR hdfs.HDFSEventSink: process failed
> java.lang.InterruptedException: Timed out before HDFS call was made. Your hdfs.callTimeout might be set too low or HDFS calls are taking too long.
>     at org.apache.flume.sink.hdfs.BucketWriter.checkAndThrowInterruptedException(BucketWriter.java:660)
>     at org.apache.flume.sink.hdfs.BucketWriter.append(BucketWriter.java:483)
>     at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:418)
>     at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
>     at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
>     at java.lang.Thread.run(Thread.java:745)
> 16/12/02 09:10:28 ERROR flume.SinkRunner: Unable to deliver event. Exception follows.
> org.apache.flume.EventDeliveryException: java.lang.InterruptedException: Timed out before HDFS call was made. Your hdfs.callTimeout might be set too low or HDFS calls are taking too long.
>     at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:463)
>     at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
>     at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
>     at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.InterruptedException: Timed out before HDFS call was made. Your hdfs.callTimeout might be set too low or HDFS calls are taking too long.
>     at org.apache.flume.sink.hdfs.BucketWriter.checkAndThrowInterruptedException(BucketWriter.java:660)
>     at org.apache.flume.sink.hdfs.BucketWriter.append(BucketWriter.java:483)
>     at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:418)
>     ... 3 more
> 16/12/02 09:10:33 INFO hdfs.HDFSEventSink: Closing ...
>
> .... logs continue ... then:
>
> 16/12/02 09:10:34 INFO instrumentation.MonitoredCounterGroup: Shutdown
> Metric for type: CHANNEL, name: chl_jms_kafka. channel.rollback.count ==
> <aNumberGreaterThanZero>
>
> then I find that my initial x messages PLUS <aNumberGreaterThanZero>
> duplicate events were written to HDFS!
>
> This behaviour is really frustrating and I don't understand how to avoid
> these duplicates. As a side note, the same experiment with a brutal agent
> termination at step 3 (kill -9 on the process PID) does not produce
> duplicates!
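>
> (Side note: the exception message itself suggests raising hdfs.callTimeout.
> With the placeholder names from the config sketch above, that would be the
> line below; whether it has any bearing on the duplicates is exactly what
> I'd like to understand.)
>
> # Flume's default hdfs.callTimeout is 10000 ms; the value here is illustrative.
> a1.sinks.snk_hdfs.hdfs.callTimeout = 60000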
>
> I would appreciate any help on this (for me crucial) topic.
>
> Thank you,
>
> Roberto
>
>
