flume-user mailing list archives

From Tao Li <litao.bupt...@gmail.com>
Subject Re: [HDFSEventSink] Endless loop when HDFSEventSink.process() throws exception
Date Fri, 17 Apr 2015 19:22:45 GMT
Why did I design it like this? It's based on the following thoughts:
I want to treat "KafkaChannel1 => Kafka Cluster => KafkaChannel2" as one
channel. Then it is simply ScribeSource putting events into it and
HDFSEventSink taking events from it. The Kafka cluster provides stable
storage and is transparent to event delivery between source and sink. (If
I used "KafkaSource => MemoryChannel => HDFSEventSink" to export data from
Kafka to HDFS, the memory channel would be unstable and not transparent.)
So the workflow is simply:
ScribeClient => ScribeSource => KafkaChannel(distributed) => HDFSEventSink

As an interceptor sits right after the source, *maybe I should add the filter
interceptor after ScribeSource*, like this (a rough sketch of such an
interceptor follows the diagram):
ScribeClient => ScribeSource => *FilterInterceptor* =>
KafkaChannel(distributed) => HDFSEventSink => HDFS
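
For reference, a minimal sketch of such a filter interceptor (the class and
package names are my own invention, not from Flume or this thread):

    package com.example.flume;

    import java.util.ArrayList;
    import java.util.List;

    import org.apache.flume.Context;
    import org.apache.flume.Event;
    import org.apache.flume.interceptor.Interceptor;

    // Drops events that lack a "timestamp" header so they never reach
    // the channel (and therefore never reach HDFSEventSink).
    public class TimestampFilterInterceptor implements Interceptor {

      @Override
      public void initialize() {
        // no setup needed
      }

      @Override
      public Event intercept(Event event) {
        // Returning null tells Flume to discard the event.
        return event.getHeaders().containsKey("timestamp") ? event : null;
      }

      @Override
      public List<Event> intercept(List<Event> events) {
        List<Event> out = new ArrayList<Event>(events.size());
        for (Event e : events) {
          if (intercept(e) != null) {
            out.add(e);
          }
        }
        return out;
      }

      @Override
      public void close() {
        // no resources to release
      }

      public static class Builder implements Interceptor.Builder {
        @Override
        public Interceptor build() {
          return new TimestampFilterInterceptor();
        }

        @Override
        public void configure(Context context) {
          // no configuration needed
        }
      }
    }

It would be wired onto the source in the agent's properties file (the source
name here is a placeholder):

    agent1.sources.scribe-source.interceptors = ts-filter
    agent1.sources.scribe-source.interceptors.ts-filter.type = com.example.flume.TimestampFilterInterceptor$Builder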

2015-04-18 2:51 GMT+08:00 Tao Li <litao.buptsse@gmail.com>:

> @Gwen @Hari
> My use case is as follows:
> ScribeClient => [Agent1: ScribeSource => KafkaChannel1] => Kafka Cluster
> => [Agent2: KafkaChannel2 => HDFSEventSink] => HDFS
> The bad case is as follows:
> My HDFSEventSink needs a header "*timestamp*", but some dirty data (written
> by mistake) in Kafka doesn't have the "timestamp" header, which causes the
> following BucketPath.escapeString call to throw a *NullPointerException*:
> String realPath = BucketPath.escapeString(filePath, event.getHeaders(),
> timeZone, needRounding, roundUnit, roundValue, useLocalTime);
> *I think Gwen's second point is OK; we can add an interceptor to do the
> filter job.*
> But my Flume agents are kind of special:
> Agent1 doesn't have a sink; it sends messages directly to the Kafka cluster
> via KafkaChannel1.
> Agent2 doesn't have a source; it polls events directly from the Kafka
> cluster via KafkaChannel2.
> Agent1 and Agent2 are different JVMs deployed on different nodes.
> *I don't know if it's reasonable for an agent to have no sink or no source?* But
> I have already built the whole workflow, and it works well for me in
> regular cases.
> *For Agent2, because it has no source, I can't use Gwen's interceptor
> suggestion.*
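
For context, a source-less Agent2 of this shape might look roughly like the
following in Flume 1.6-era properties (the broker, ZooKeeper, topic, and HDFS
path values are placeholders, not taken from this thread):

    # Agent2: no source; HDFSEventSink drains the KafkaChannel directly.
    agent2.channels = kc2
    agent2.sinks = hdfs-sink

    agent2.channels.kc2.type = org.apache.flume.channel.kafka.KafkaChannel
    agent2.channels.kc2.brokerList = kafka-broker:9092
    agent2.channels.kc2.zookeeperConnect = zk-host:2181
    agent2.channels.kc2.topic = flume-events

    agent2.sinks.hdfs-sink.type = hdfs
    agent2.sinks.hdfs-sink.channel = kc2
    # The %Y-%m-%d escapes are what require the "timestamp" header:
    agent2.sinks.hdfs-sink.hdfs.path = hdfs://namenode/flume/events/%Y-%m-%d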
> 2015-04-18 2:30 GMT+08:00 Hari Shreedharan <hshreedharan@cloudera.com>:
>> What I think he means is that a malformed message in the channel cannot be
>> serialized, causing the serializer to fail and perhaps throw (think
>> malformed Avro). Such a message would basically be stuck in an infinite
>> loop. So the workaround in (2) would work if using a Kafka Source.
>> Thanks,
>> Hari
>> On Fri, Apr 17, 2015 at 10:08 AM, Tao Li <litao.buptsse@gmail.com> wrote:
>>> OK, I got it. Thanks.
>>> 2015-04-18 0:59 GMT+08:00 Hari Shreedharan <hshreedharan@cloudera.com>:
>>>> Are you using Kafka channel? The fix I mentioned was for file channel.
>>>> Unfortunately, we don't plan to introduce something that drops data in real
>>>> time. This makes it too easy for a misconfig to cause data loss. You'd have
>>>> to ensure the data in the Kafka channel is valid.
>>>> Thanks,
>>>> Hari
>>>> On Fri, Apr 17, 2015 at 9:41 AM, Tao Li <litao.buptsse@gmail.com>
>>>> wrote:
>>>>> @Hari, you mean I need to ensure the data in Kafka is OK by myself,
>>>>> right?
>>>>> How about having a config to let the user decide how to handle BACKOFF?
>>>>> For example, we could configure the max retry count for process(), and
>>>>> also configure whether to commit or not when the max retry count is
>>>>> exceeded. (In my Kafka case, when meeting dirty data, committing the
>>>>> consumer offset would be nicer than an endless loop. A hypothetical
>>>>> sketch of this follows.)
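
For concreteness, the proposal might look something like this in an agent's
properties file (both option names are invented here to illustrate the idea;
they do not exist in Flume):

    # Hypothetical options sketching the proposal -- not real Flume settings:
    agent2.sinks.hdfs-sink.maxProcessRetries = 3
    # When retries are exhausted, commit the Kafka offset and skip the event
    # instead of looping on it forever:
    agent2.sinks.hdfs-sink.commitOnRetriesExhausted = true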
>>>>> 2015-04-18 0:23 GMT+08:00 Hari Shreedharan <hshreedharan@cloudera.com>:
>>>>>> We recently added functionality to the file channel integrity tool
>>>>>> that can be used to remove bad events from the channel - though you
>>>>>> need to write some code to validate events. It will be in the
>>>>>> soon-to-be-released 1.6.0.
>>>>>> Thanks,
>>>>>> Hari
>>>>>> On Fri, Apr 17, 2015 at 9:05 AM, Tao Li <litao.buptsse@gmail.com>
>>>>>> wrote:
>>>>>>> Hi all:
>>>>>>> My use case is KafkaChannel + HDFSEventSink.
>>>>>>> I found that SinkRunner.PollingRunner calls
>>>>>>> HDFSEventSink.process() in a while loop. For example, say a message in
>>>>>>> Kafka contains dirty data: HDFSEventSink.process() consumes the message
>>>>>>> from Kafka, throws an exception because of the *dirty data*, and the
>>>>>>> *kafka offset doesn't commit*. Then the outer loop continues to call
>>>>>>> HDFSEventSink.process(). Because the Kafka offset doesn't change,
>>>>>>> HDFSEventSink consumes the dirty data *again*. The bad loop *never
>>>>>>> stops*.
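
To make the loop concrete, here is a simplified sketch of what
SinkRunner.PollingRunner does (not the exact Flume source; the helper names
are illustrative):

    // Simplified sketch of SinkRunner.PollingRunner's run loop:
    while (!shouldStop) {
      try {
        if (sink.process() == Sink.Status.BACKOFF) {
          backoffSleep();              // brief pause, then try again
        }
      } catch (Exception e) {
        // process() rolled back its transaction, so the Kafka offset was
        // never committed; the next iteration re-reads the same dirty event.
        sleepBriefly();
      }
    }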
>>>>>>> *I want to know whether we have a mechanism to cover this.* For
>>>>>>> example, we could have a max retry count for a single
>>>>>>> HDFSEventSink.process() call and give up when the max limit is
>>>>>>> exceeded.
