flume-user mailing list archives

From Silvio Di gregorio <silvio.digrego...@gmail.com>
Subject Re: duplicated events with kafka channel and hdfs sink
Date Fri, 02 Dec 2016 10:59:05 GMT
First of all, I am not an expert; I'm just sharing my experiences with you.
It should be like this: Flume's "delivery guarantee" is "at least once", so
when an error occurs on the sink, what was already written to HDFS can be
replicated again after a restart.
Quote from the "Using Flume ...." book:
"Errors like network timeouts or partial writes to storage systems could
cause events to get written more than once, though, since Flume will retry
writes until they are completely successful".

You could avoid the HDFS sink and read directly from the Kafka topic, i.e.
configure a Flume agent with no sink: only a source and a Kafka channel.
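
Just to give the idea, a minimal sketch of such a sink-less agent as a
properties file (the agent/component names, broker and topic below are
placeholders I made up; the property names are the ones from the Flume 1.7
Kafka channel documentation):

  agent1.sources = jms-src
  agent1.channels = kafka-ch
  # no agent1.sinks line: the Kafka topic itself is the hand-off point

  agent1.sources.jms-src.type = jms
  agent1.sources.jms-src.channels = kafka-ch
  # ... same JMS connection settings you already use ...

  agent1.channels.kafka-ch.type = org.apache.flume.channel.kafka.KafkaChannel
  agent1.channels.kafka-ch.kafka.bootstrap.servers = broker1:9092
  agent1.channels.kafka-ch.kafka.topic = events-topic
  # write plain payloads so any Kafka consumer can read them directly
  agent1.channels.kafka-ch.parseAsFlumeEvent = false

Your downstream jobs would then consume the topic with a normal Kafka
consumer instead of reading files from HDFS.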
Have a nice day


2016-12-02 11:12 GMT+01:00 Silvio Di gregorio <silvio.digregorio@gmail.com>:

> hi,
> are you ingesting streaming data?
>
> 2016-12-02 10:56 GMT+01:00 Roberto Coluccio <roberto.coluccio@eng.it>:
>
>> Hello again guys,
>>
>> sorry to bother you, I'm kinda new to the Flume world and I'm running
>> experiments to evaluate the pros and cons of different topologies and settings.
>>
>> I'm currently experiencing the following issue:
>>
>> Agent topology made of: JMS source --> Kafka Channel --> HDFS sink
>> Source and Sink batch size, and Channel transactionCapacity = 1000
>> Channel capacity = 10000
>> Channel kafka.consumer.auto.offset.reset = latest
>> Channel migrateZookeeperOffsets = false
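>>
>> For reference, a minimal sketch of that topology as a flume-ng properties
>> file (agent/component names, broker, topic and HDFS path below are
>> placeholders, not the real values):
>>
>>   a1.sources = jms-src
>>   a1.channels = kafka-ch
>>   a1.sinks = hdfs-snk
>>
>>   a1.sources.jms-src.type = jms
>>   a1.sources.jms-src.channels = kafka-ch
>>   a1.sources.jms-src.batchSize = 1000
>>   # ... plus the JMS provider URL, destination name/type, connection factory ...
>>
>>   a1.channels.kafka-ch.type = org.apache.flume.channel.kafka.KafkaChannel
>>   a1.channels.kafka-ch.kafka.bootstrap.servers = broker1:9092
>>   a1.channels.kafka-ch.kafka.topic = jms-events
>>   a1.channels.kafka-ch.transactionCapacity = 1000
>>   a1.channels.kafka-ch.capacity = 10000
>>   a1.channels.kafka-ch.kafka.consumer.auto.offset.reset = latest
>>   a1.channels.kafka-ch.migrateZookeeperOffsets = false
>>
>>   a1.sinks.hdfs-snk.type = hdfs
>>   a1.sinks.hdfs-snk.channel = kafka-ch
>>   a1.sinks.hdfs-snk.hdfs.batchSize = 1000
>>   a1.sinks.hdfs-snk.hdfs.path = hdfs:///flume/events/%Y/%m/%d
>>   a1.sinks.hdfs-snk.hdfs.fileType = DataStream
>>   a1.sinks.hdfs-snk.hdfs.useLocalTimeStamp = true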
>>
>> I'm trying to confirm that the (channel, sink) pair is fault tolerant, and
>> that such an agent can be re-started after being terminated, resuming its
>> draining of the channel from the point (offset) where it left off.
>>
>> I have a CDH 5.8 cluster and launch my agent with a script that calls
>> flume-ng, passing it a custom configuration that I provide in a separate
>> file.
>>
>> Given x messages previously pushed on the input JMS queue, I:
>>
>>
>>    1. start the agent through my script
>>    2. verify it opens up a new file on hdfs and starts consuming events
>>    (it writes 1 event per line)
>>    3. stop the agent (CTRL+C on the open shell session)
>>    4. re-start the agent through my script
>>    5. wait until it completes its draining
>>    6. count the lines written across all the generated files
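>>
>> (For step 6, the check is roughly this, assuming a placeholder HDFS path:
>>   hdfs dfs -cat /flume/events/2016/12/02/* | wc -l
>> and the total should come out equal to x.)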
>>
>> What I experienced is that:
>>
>>    1. when the script/process termination at step 3 is graceful (i.e. no
>>    exceptions are raised), I verify that exactly x messages were written
>>    to HDFS;
>>    2. when the script/process termination at step 3 is followed by the
>>    exception:
>>
>> 16/12/02 09:19:46 WARN jms.JMSSource: JMSException consuming events
>> javax.jms.JMSException: InterruptedException has occurred while waiting for server response
>>     at com.tibco.tibjms.Tibjmsx.buildException(Tibjmsx.java:502)
>>     at com.tibco.tibjms.TibjmsxLink.sendRequest(TibjmsxLink.java:364)
>>     at com.tibco.tibjms.TibjmsxLink.sendRequestMsg(TibjmsxLink.java:293)
>>     at com.tibco.tibjms.TibjmsxSessionImp._processNoWait(TibjmsxSessionImp.java:3548)
>>     at com.tibco.tibjms.TibjmsxSessionImp._receive(TibjmsxSessionImp.java:1947)
>>     at com.tibco.tibjms.TibjmsMessageConsumer._receive(TibjmsMessageConsumer.java:240)
>>     at com.tibco.tibjms.TibjmsMessageConsumer.receiveNoWait(TibjmsMessageConsumer.java:492)
>>     at org.apache.flume.source.jms.JMSMessageConsumer.take(JMSMessageConsumer.java:127)
>>     at org.apache.flume.source.jms.JMSSource.doProcess(JMSSource.java:261)
>>     at org.apache.flume.source.AbstractPollableSource.process(AbstractPollableSource.java:58)
>>     at org.apache.flume.source.PollableSourceRunner$PollingRunner.run(PollableSourceRunner.java:137)
>>     at java.lang.Thread.run(Thread.java:745)
>>
>> then I verify that exactly x messages were written to HDFS;
>>
>>
>>    3. when the script/process termination at step 3 is followed by the
>>    exception:
>>
>> ^C16/12/02 09:10:28 INFO lifecycle.LifecycleSupervisor: Stopping lifecycle supervisor 11
>> 16/12/02 09:10:28 ERROR hdfs.HDFSEventSink: process failed
>> java.lang.InterruptedException: Timed out before HDFS call was made. Your hdfs.callTimeout might be set too low or HDFS calls are taking too long.
>>     at org.apache.flume.sink.hdfs.BucketWriter.checkAndThrowInterruptedException(BucketWriter.java:660)
>>     at org.apache.flume.sink.hdfs.BucketWriter.append(BucketWriter.java:483)
>>     at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:418)
>>     at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
>>     at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
>>     at java.lang.Thread.run(Thread.java:745)
>> 16/12/02 09:10:28 ERROR flume.SinkRunner: Unable to deliver event. Exception follows.
>> org.apache.flume.EventDeliveryException: java.lang.InterruptedException: Timed out before HDFS call was made. Your hdfs.callTimeout might be set too low or HDFS calls are taking too long.
>>     at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:463)
>>     at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
>>     at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
>>     at java.lang.Thread.run(Thread.java:745)
>> Caused by: java.lang.InterruptedException: Timed out before HDFS call was made. Your hdfs.callTimeout might be set too low or HDFS calls are taking too long.
>>     at org.apache.flume.sink.hdfs.BucketWriter.checkAndThrowInterruptedException(BucketWriter.java:660)
>>     at org.apache.flume.sink.hdfs.BucketWriter.append(BucketWriter.java:483)
>>     at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:418)
>>     ... 3 more
>> 16/12/02 09:10:33 INFO hdfs.HDFSEventSink: Closing ...
>>
>> .... logs continue ... then:
>>
>> 16/12/02 09:10:34 INFO instrumentation.MonitoredCounterGroup: Shutdown Metric for type: CHANNEL, name: chl_jms_kafka. channel.rollback.count == <aNumberGreaterThanZero>
>>
>> then I find that my initial x messages PLUS <aNumberGreaterThanZero> more were written to HDFS !!!
>>
>> This behaviour is really frustrating and I don't understand how to avoid
>> those duplicates. As a side note, the same experiment with a brutal agent
>> termination at step 3 (kill -9 on the related process's pid) does not
>> produce duplicates!
>>
>> I would appreciate any help on this (for me crucial) topic.
>>
>> Thank you,
>>
>> Roberto
>>
>>
>
