From Roberto Coluccio <roberto.coluc...@eng.it>
Subject duplicated events with kafka channel and hdfs sink
Date Fri, 02 Dec 2016 09:56:25 GMT
Hello again guys,

sorry to bother you, I'm kind of new to the Flume world and I'm running 
experiments to evaluate the pros and cons of different topologies and settings.

I'm currently experiencing the following issue:

Agent topology made of: JMS source --> Kafka Channel --> HDFS sink
Source and Sink batch size, and Channel transactionCapacity = 1000
Channel capacity = 10000
Channel kafka.consumer.auto.offset.reset = latest
Channel migrateZookeeperOffsets = false
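
In case it helps to see it all in one place, the configuration file looks 
more or less like the sketch below (the agent name, broker addresses, EMS 
provider URL, Kafka topic and HDFS path are sanitized placeholders rather 
than the real values; the channel name is the one you will see in the logs 
further down):

    # Sketch of the agent configuration (hosts, topic and paths are placeholders)
    a1.sources  = src_jms
    a1.channels = chl_jms_kafka
    a1.sinks    = snk_hdfs

    # JMS source (TIBCO EMS, as per the stack traces below)
    a1.sources.src_jms.type = jms
    a1.sources.src_jms.initialContextFactory = com.tibco.tibjms.naming.TibjmsInitialContextFactory
    a1.sources.src_jms.providerURL = tcp://ems-host:7222
    a1.sources.src_jms.destinationType = QUEUE
    a1.sources.src_jms.destinationName = my.input.queue
    a1.sources.src_jms.batchSize = 1000
    a1.sources.src_jms.channels = chl_jms_kafka

    # Kafka channel
    a1.channels.chl_jms_kafka.type = org.apache.flume.channel.kafka.KafkaChannel
    a1.channels.chl_jms_kafka.kafka.bootstrap.servers = broker1:9092,broker2:9092
    a1.channels.chl_jms_kafka.kafka.topic = flume-channel-topic
    a1.channels.chl_jms_kafka.transactionCapacity = 1000
    a1.channels.chl_jms_kafka.capacity = 10000
    a1.channels.chl_jms_kafka.kafka.consumer.auto.offset.reset = latest
    a1.channels.chl_jms_kafka.migrateZookeeperOffsets = false

    # HDFS sink (plain text, 1 event per line)
    a1.sinks.snk_hdfs.type = hdfs
    a1.sinks.snk_hdfs.hdfs.path = /user/flume/jms-events
    a1.sinks.snk_hdfs.hdfs.fileType = DataStream
    a1.sinks.snk_hdfs.hdfs.writeFormat = Text
    a1.sinks.snk_hdfs.hdfs.batchSize = 1000
    a1.sinks.snk_hdfs.channel = chl_jms_kafka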

I'm trying to confirm that the (channel, sink) pair is fault tolerant, i.e. 
that such an agent can be restarted after being terminated and resume 
draining the channel from the point (offset) where it left off.

I have a CDH 5.8 cluster and I launch my agent with a script that calls 
flume-ng, passing it a custom configuration I keep in a file.
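
The script is basically a thin wrapper around flume-ng, roughly like this 
(the agent name, conf directory and file path are placeholders):

    flume-ng agent \
      --name a1 \
      --conf /etc/flume-ng/conf \
      --conf-file /path/to/jms-kafka-hdfs.properties \
      -Dflume.root.logger=INFO,console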

Given x messages previously pushed onto the input JMS queue, I:

 1. start the agent through my script
 2. verify it opens a new file on HDFS and starts consuming events
    (it writes 1 event per line)
 3. stop the agent (CTRL+C on the open shell session)
 4. re-start the agent through my script
 5. wait until it completes its draining
 6. count the lines written across all the generated files
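
For step 6 I simply count with something like the following (the HDFS path 
is the same placeholder used in the configuration sketch above):

    # run after the sink has closed its files (no .tmp files left open)
    hdfs dfs -cat /user/flume/jms-events/* | wc -l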

What I experienced is that:

 1. when the script/process termination at step 3 is graceful (i.e. no
    exceptions are raised), I verify that exactly x messages were written
    to HDFS;
 2. when the script/process termination at step 3 is followed by the
    exception:

    16/12/02 09:19:46 WARN jms.JMSSource: JMSException consuming events
    javax.jms.JMSException: InterruptedException has occurred while waiting for server response
         at com.tibco.tibjms.Tibjmsx.buildException(Tibjmsx.java:502)
         at com.tibco.tibjms.TibjmsxLink.sendRequest(TibjmsxLink.java:364)
         at com.tibco.tibjms.TibjmsxLink.sendRequestMsg(TibjmsxLink.java:293)
         at com.tibco.tibjms.TibjmsxSessionImp._processNoWait(TibjmsxSessionImp.java:3548)
         at com.tibco.tibjms.TibjmsxSessionImp._receive(TibjmsxSessionImp.java:1947)
         at com.tibco.tibjms.TibjmsMessageConsumer._receive(TibjmsMessageConsumer.java:240)
         at com.tibco.tibjms.TibjmsMessageConsumer.receiveNoWait(TibjmsMessageConsumer.java:492)
         at org.apache.flume.source.jms.JMSMessageConsumer.take(JMSMessageConsumer.java:127)
         at org.apache.flume.source.jms.JMSSource.doProcess(JMSSource.java:261)
         at org.apache.flume.source.AbstractPollableSource.process(AbstractPollableSource.java:58)
         at org.apache.flume.source.PollableSourceRunner$PollingRunner.run(PollableSourceRunner.java:137)
         at java.lang.Thread.run(Thread.java:745)

    then I verify that, again, exactly x messages were written to HDFS;

 3. when the script/process termination at step 3 is followed by the
    exception:

    ^C16/12/02 09:10:28 INFO lifecycle.LifecycleSupervisor: Stopping lifecycle supervisor 11
    16/12/02 09:10:28 ERROR hdfs.HDFSEventSink: process failed
    java.lang.InterruptedException: Timed out before HDFS call was made. Your hdfs.callTimeout might be set too low or HDFS calls are taking too long.
         at org.apache.flume.sink.hdfs.BucketWriter.checkAndThrowInterruptedException(BucketWriter.java:660)
         at org.apache.flume.sink.hdfs.BucketWriter.append(BucketWriter.java:483)
         at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:418)
         at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
         at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
         at java.lang.Thread.run(Thread.java:745)
    16/12/02 09:10:28 ERROR flume.SinkRunner: Unable to deliver event. Exception follows.
    org.apache.flume.EventDeliveryException: java.lang.InterruptedException: Timed out before HDFS call was made. Your hdfs.callTimeout might be set too low or HDFS calls are taking too long.
         at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:463)
         at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
         at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
         at java.lang.Thread.run(Thread.java:745)
    Caused by: java.lang.InterruptedException: Timed out before HDFS call was made. Your hdfs.callTimeout might be set too low or HDFS calls are taking too long.
         at org.apache.flume.sink.hdfs.BucketWriter.checkAndThrowInterruptedException(BucketWriter.java:660)
         at org.apache.flume.sink.hdfs.BucketWriter.append(BucketWriter.java:483)
         at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:418)
         ... 3 more
    16/12/02 09:10:33 INFO hdfs.HDFSEventSink: Closing ...

    .... logs continue ... then:

    16/12/02 09:10:34 INFO instrumentation.MonitoredCounterGroup: Shutdown Metric for type: CHANNEL, name: chl_jms_kafka. channel.rollback.count == <aNumberGreaterThanZero>

    then I find on HDFS my initial x messages PLUS <aNumberGreaterThanZero>
    duplicated events!
This behaviour is really frustrating and I don't understand how to avoid 
those duplicates. As a side note, the same experiment with a brutal agent 
termination at step 3 (kill -9 on the related process's pid) does not 
produce duplicates!
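
(For completeness, by "brutal termination" I mean something like the 
following, run from another shell; the pgrep pattern is just illustrative 
of how I find the agent's pid:)

    # hard-kill the Flume agent JVM (pattern is illustrative)
    kill -9 $(pgrep -f org.apache.flume.node.Application)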

I would appreciate any help on this (for me crucial) topic.

Thank you,

Roberto

