flume-user mailing list archives

From Gwen Shapira <gshap...@cloudera.com>
Subject Re: [HDFSEventSink] Endless loop when HDFSEventSink.process() throws exception
Date Fri, 17 Apr 2015 19:26:17 GMT
This looks like the right design to me.

On Fri, Apr 17, 2015 at 12:22 PM, Tao Li <litao.buptsse@gmail.com> wrote:
> Why did I design it like this? It's based on the following thoughts:
> I want to treat "KafkaChannel1 => Kafka Cluster => KafkaChannel2" as one
> channel. Then ScribeSource simply puts events into it, and HDFSEventSink
> takes events from it. The Kafka cluster provides stable storage and is
> transparent to event delivery between source and sink. (If I used a
> "KafkaSource => MemoryChannel => HDFSEventSink" pipeline to export data from
> Kafka to HDFS, the memory channel would be unstable and not transparent.)
> So the workflow is simply like this:
> ScribeClient => ScribeSource => KafkaChannel(distributed) => HDFSEventSink =>
> HDFS
>
> Since an interceptor is attached to the source, maybe I should add the
> filter interceptor after ScribeSource, like this:
> ScribeClient => ScribeSource => FilterInterceptor =>
> KafkaChannel(distributed) => HDFSEventSink => HDFS
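>
> A minimal sketch of such a filter interceptor (the class name below is made
> up for illustration; it is not an existing Flume class) could look like
> this:
>
> import java.util.ArrayList;
> import java.util.List;
> import java.util.Map;
> import org.apache.flume.Context;
> import org.apache.flume.Event;
> import org.apache.flume.interceptor.Interceptor;
>
> public class RequireTimestampInterceptor implements Interceptor {
>
>   @Override
>   public void initialize() { }
>
>   // Drop any event without a usable "timestamp" header so it never
>   // reaches the Kafka channel and can't break HDFSEventSink later.
>   @Override
>   public Event intercept(Event event) {
>     Map<String, String> headers = event.getHeaders();
>     String ts = headers.get("timestamp");
>     return (ts == null || ts.isEmpty()) ? null : event;
>   }
>
>   @Override
>   public List<Event> intercept(List<Event> events) {
>     List<Event> kept = new ArrayList<Event>(events.size());
>     for (Event e : events) {
>       if (intercept(e) != null) {
>         kept.add(e);
>       }
>     }
>     return kept;
>   }
>
>   @Override
>   public void close() { }
>
>   public static class Builder implements Interceptor.Builder {
>     @Override
>     public Interceptor build() {
>       return new RequireTimestampInterceptor();
>     }
>     @Override
>     public void configure(Context context) { }
>   }
> }
>
> Returning null from intercept() is the standard way to drop an event; the
> interceptor would then be attached to ScribeSource in the agent
> configuration.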
>
> 2015-04-18 2:51 GMT+08:00 Tao Li <litao.buptsse@gmail.com>:
>>
>> @Gwen @Hari
>>
>> My use case is as follows:
>> ScribeClient => [Agent1: ScribeSource => KafkaChannel1] => Kafka Cluster
>> => [Agent2: KafkaChannel2 => HDFSEventSink] => HDFS
>>
>> The bad case is as follows:
>> My HDFSEventSink needs a "timestamp" header, but some dirty data (written
>> by mistake) in Kafka doesn't have the "timestamp" header, which causes the
>> following BucketPath.escapeString call to throw a NullPointerException:
>> String realPath = BucketPath.escapeString(filePath, event.getHeaders(),
>> timeZone, needRounding, roundUnit, roundValue, useLocalTime);
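>>
>> A minimal repro sketch of that failure (the path and rounding values below
>> are made-up examples; per the report above, this call ends in a
>> NullPointerException when the header is missing and useLocalTime is false):
>>
>> import java.util.Calendar;
>> import java.util.HashMap;
>> import java.util.Map;
>> import java.util.TimeZone;
>> import org.apache.flume.formatter.output.BucketPath;
>>
>> public class EscapeStringRepro {
>>   public static void main(String[] args) {
>>     // Dirty event: headers carry no "timestamp" key.
>>     Map<String, String> headers = new HashMap<String, String>();
>>     String filePath = "/flume/events/%Y-%m-%d";
>>     // Expanding %Y-%m-%d needs the "timestamp" header when local time
>>     // is not used, so this call fails on the dirty event.
>>     String realPath = BucketPath.escapeString(filePath, headers,
>>         TimeZone.getDefault(), true, Calendar.SECOND, 10, false);
>>     System.out.println(realPath);
>>   }
>> }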
>>
>> I think Gwen's second point is OK; we can add an interceptor to do the
>> filtering job.
>>
>> But my Flume agents are kind of special:
>> Agent1 doesn't have a sink; it sends messages directly to the Kafka cluster
>> via KafkaChannel1.
>> Agent2 doesn't have a source; it polls events directly from the Kafka
>> cluster via KafkaChannel2.
>> Agent1 and Agent2 are different JVMs deployed on different nodes.
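>>
>> A rough sketch of the Agent2 side as a Flume properties file (component
>> names, broker and topic values are placeholders, and the channel property
>> names may differ between Flume versions):
>>
>> # Agent2: no source -- the Kafka channel pulls from the Kafka cluster
>> # and HDFSEventSink drains it directly.
>> agent2.channels = kc2
>> agent2.sinks = hdfsSink
>>
>> agent2.channels.kc2.type = org.apache.flume.channel.kafka.KafkaChannel
>> agent2.channels.kc2.brokerList = kafka-broker:9092
>> agent2.channels.kc2.zookeeperConnect = zk-host:2181
>> agent2.channels.kc2.topic = scribe-events
>> agent2.channels.kc2.parseAsFlumeEvent = true
>>
>> agent2.sinks.hdfsSink.type = hdfs
>> agent2.sinks.hdfsSink.channel = kc2
>> agent2.sinks.hdfsSink.hdfs.path = /flume/events/%Y-%m-%d
>>
>> The sinkless Agent1 side is the mirror image: a source plus the Kafka
>> channel, and no sinks.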
>>
>> I don't know if it's reasonable for an agent to have no sink or no source.
>> But I have already built the whole workflow, and it works well for me in
>> regular cases.
>>
>> For Agent2, because it has no source, I can't use Gwen's interceptor
>> suggestion.
>>
>> 2015-04-18 2:30 GMT+08:00 Hari Shreedharan <hshreedharan@cloudera.com>:
>>>
>>> What I think he means is that a malformed message in the channel cannot be
>>> serialized by the serializer, causing the serializer to fail and perhaps
>>> throw (think malformed Avro). Such a message would basically be stuck in an
>>> infinite loop. So the workaround in (2) would work if using a Kafka Source.
>>>
>>> Thanks,
>>> Hari
>>>
>>>
>>> On Fri, Apr 17, 2015 at 10:08 AM, Tao Li <litao.buptsse@gmail.com> wrote:
>>>>
>>>> OK, I got it, Thanks.
>>>>
>>>> 2015-04-18 0:59 GMT+08:00 Hari Shreedharan <hshreedharan@cloudera.com>:
>>>>>
>>>>> Are you using Kafka channel? The fix I mentioned was for file channel.
>>>>> Unfortunately, we don't plan to introduce something that drops data in
>>>>> real time. This makes it too easy for a misconfig to cause data loss.
>>>>> You'd have to ensure the data in the Kafka channel is valid.
>>>>>
>>>>> Thanks,
>>>>> Hari
>>>>>
>>>>>
>>>>> On Fri, Apr 17, 2015 at 9:41 AM, Tao Li <litao.buptsse@gmail.com>
>>>>> wrote:
>>>>>>
>>>>>> @Hari, you mean I need to ensure the data in Kafka is OK by myself,
>>>>>> right?
>>>>>>
>>>>>> How about we have a config to let the user decide how to handle BACKOFF?
>>>>>> For example, we could configure the max retry number in process(), and
>>>>>> also configure whether to commit or not when the max retry number is
>>>>>> exceeded. (In my Kafka case, when meeting dirty data, committing the
>>>>>> consume offset would be nicer for me than an endless loop.)
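>>>>>>
>>>>>> A rough sketch of the bounded-retry behaviour being proposed (this is
>>>>>> not existing Flume code; maxRetries and skipOnGiveUp are made-up knobs):
>>>>>>
>>>>>> import org.apache.flume.EventDeliveryException;
>>>>>> import org.apache.flume.Sink;
>>>>>>
>>>>>> public class BoundedRetryRunner {
>>>>>>   private final int maxRetries = 3;          // proposed config knob
>>>>>>   private final boolean skipOnGiveUp = true; // "commit the offset and move on"
>>>>>>
>>>>>>   public void drive(Sink sink) throws InterruptedException {
>>>>>>     int failures = 0;
>>>>>>     while (!Thread.currentThread().isInterrupted()) {
>>>>>>       try {
>>>>>>         Sink.Status status = sink.process();
>>>>>>         failures = 0;                        // a good batch resets the counter
>>>>>>         if (status == Sink.Status.BACKOFF) {
>>>>>>           Thread.sleep(1000);                // nothing to read, back off
>>>>>>         }
>>>>>>       } catch (EventDeliveryException e) {
>>>>>>         failures++;
>>>>>>         if (failures >= maxRetries) {
>>>>>>           if (skipOnGiveUp) {
>>>>>>             // This is where the proposal would commit the Kafka
>>>>>>             // consumer offset so the dirty batch is not replayed.
>>>>>>             failures = 0;
>>>>>>           } else {
>>>>>>             throw new RuntimeException("Giving up after " + maxRetries + " retries", e);
>>>>>>           }
>>>>>>         }
>>>>>>         Thread.sleep(1000);                  // back off before retrying
>>>>>>       }
>>>>>>     }
>>>>>>   }
>>>>>> }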
>>>>>>
>>>>>> 2015-04-18 0:23 GMT+08:00 Hari Shreedharan
>>>>>> <hshreedharan@cloudera.com>:
>>>>>>>
>>>>>>> We recently added functionality to the file channel integrity tool
>>>>>>> that can be used to remove bad events from the channel - though you
>>>>>>> would need to write some code to validate events. It will be in the
>>>>>>> soon-to-be-released 1.6.0.
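>>>>>>>
>>>>>>> If the hook takes one event at a time (the class and method names below
>>>>>>> are assumptions -- check the 1.6.0 source for the exact interface), the
>>>>>>> user-supplied part could be as small as:
>>>>>>>
>>>>>>> import org.apache.flume.Event;
>>>>>>>
>>>>>>> // Hypothetical validator: keep only events carrying a numeric
>>>>>>> // "timestamp" header, so the tool can strip out the dirty ones.
>>>>>>> public class TimestampEventValidator {
>>>>>>>   public boolean validateEvent(Event event) {
>>>>>>>     String ts = event.getHeaders().get("timestamp");
>>>>>>>     if (ts == null || ts.isEmpty()) {
>>>>>>>       return false;
>>>>>>>     }
>>>>>>>     try {
>>>>>>>       Long.parseLong(ts);
>>>>>>>       return true;
>>>>>>>     } catch (NumberFormatException e) {
>>>>>>>       return false;
>>>>>>>     }
>>>>>>>   }
>>>>>>> }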
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Hari
>>>>>>>
>>>>>>>
>>>>>>> On Fri, Apr 17, 2015 at 9:05 AM, Tao Li <litao.buptsse@gmail.com>
>>>>>>> wrote:
>>>>>>>>
>>>>>>>> Hi all:
>>>>>>>>
>>>>>>>> My use case is KafkaChannel + HDFSEventSink.
>>>>>>>>
>>>>>>>> I found that SinkRunner.PollingRunner will call
>>>>>>>> HDFSEventSink.process() in a while loop. For example, a message in
>>>>>>>> Kafka contains dirty data, so HDFSEventSink.process() consumes the
>>>>>>>> message from Kafka, throws an exception because of the dirty data,
>>>>>>>> and the Kafka offset isn't committed. Then the outer loop calls
>>>>>>>> HDFSEventSink.process() again. Because the Kafka offset hasn't
>>>>>>>> changed, HDFSEventSink consumes the dirty data again. The bad loop
>>>>>>>> never stops.
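>>>>>>>>
>>>>>>>> The driving loop is roughly equivalent to this (a paraphrased sketch,
>>>>>>>> not the exact SinkRunner source):
>>>>>>>>
>>>>>>>> import org.apache.flume.EventDeliveryException;
>>>>>>>> import org.apache.flume.Sink;
>>>>>>>>
>>>>>>>> public class PollingLoopSketch {
>>>>>>>>   public void run(Sink sink) throws InterruptedException {
>>>>>>>>     while (true) {
>>>>>>>>       try {
>>>>>>>>         if (sink.process() == Sink.Status.BACKOFF) {
>>>>>>>>           Thread.sleep(1000);   // channel empty: back off, poll again
>>>>>>>>         }
>>>>>>>>       } catch (EventDeliveryException e) {
>>>>>>>>         // The failed transaction rolls back, the Kafka offset is not
>>>>>>>>         // committed, and the next iteration re-reads the same dirty
>>>>>>>>         // batch -- hence the endless loop.
>>>>>>>>         Thread.sleep(1000);
>>>>>>>>       }
>>>>>>>>     }
>>>>>>>>   }
>>>>>>>> }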
>>>>>>>>
>>>>>>>> I want to know whether we have a mechanism to cover this case. For
>>>>>>>> example, a max retry number for a single HDFSEventSink.process() call,
>>>>>>>> giving up when the max limit is exceeded.
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
