flume-user mailing list archives

From Sverre Bakke <sverre.ba...@gmail.com>
Subject Re: Flume error handling
Date Tue, 11 Nov 2014 08:42:23 GMT

Actually, I considered creating a deserializer instead of a source,
but that approach seems to have even less documentation and sample
code. Also, the files I am reading are quite large, so this might have
an impact on system performance. I will try to have another look at
this, as it would simplify my job significantly.
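For reference, the core of such a deserializer is just decompressing the
stream and emitting one event per line. Here is a hypothetical sketch using
only plain JDK classes (class and method names are my own, not Flume's); in a
real Flume EventDeserializer the reader would wrap the stream handed to the
deserializer's Builder, and each line would be wrapped in an Event:

```java
// Hypothetical sketch: read a gzip-compressed file line by line.
// In a Flume deserializer, each line returned by readLine() would
// become one Event instead of a String in a list.
import java.io.*;
import java.nio.charset.StandardCharsets;
import java.nio.file.*;
import java.util.*;
import java.util.zip.*;

public class GzipLineReaderSketch {

    // Decompress and collect every line of a gzip file.
    static List<String> readLines(Path gzFile) throws IOException {
        List<String> lines = new ArrayList<>();
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(
                    new GZIPInputStream(Files.newInputStream(gzFile)),
                    StandardCharsets.UTF_8))) {
            String line;
            while ((line = reader.readLine()) != null) {
                lines.add(line);  // in a deserializer: wrap in an Event here
            }
        }
        return lines;
    }

    public static void main(String[] args) throws IOException {
        // Build a small gzip file so the sketch is self-contained.
        Path tmp = Files.createTempFile("sketch", ".gz");
        try (Writer w = new OutputStreamWriter(
                new GZIPOutputStream(Files.newOutputStream(tmp)),
                StandardCharsets.UTF_8)) {
            w.write("first line\nsecond line\n");
        }
        System.out.println(readLines(tmp));  // prints [first line, second line]
        Files.delete(tmp);
    }
}
```

Since decompression is streaming, memory use stays bounded even for large
files; only one line is materialized at a time.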

Regarding the source shutdown issue, the current version is listed
here (loosely based on other Flume sources):

Except for the fact that the HDFS sink seems to have some issues from
time to time, this works great until I press Ctrl+C on Flume. The
problem is that Flume never exits when I press Ctrl+C, but instead
throws quite a lot of exceptions, including "channel full". It seems
as if the sink is shut down before the source.

My theory is as follows:
1) shutdown on a scheduled executor will only cancel the next
scheduled task, not the currently running one
2) since there are a few thousand files in the input directory, the
source will have a long queue of files to process, even after shutdown
3) also, as the source might be in the middle of processing a file
with a few thousand lines, it might continue doing so even after
shutdown
4) the sink is shut down independently while the source is still
feeding events, thus complaining about a full channel

The problem is that I just don't quite know how to work around this,
given that I am unable to signal the Runnable objects to shut down
gracefully in the middle of processing a list of files.
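For what it's worth, one pattern that might address points 1) and 3) above is
to make the worker poll its interrupt status between files (and, ideally,
between lines), and to call shutdownNow() followed by a bounded
awaitTermination() from the source's stop() method. This is a hypothetical
sketch with plain JDK classes, not the actual source in question:

```java
// Hypothetical sketch: a worker that checks its interrupt flag between
// units of work, so shutdownNow() can end it mid-queue instead of only
// cancelling the next scheduled run.
import java.util.List;
import java.util.concurrent.*;

public class GracefulShutdownSketch {
    public static void main(String[] args) throws InterruptedException {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        // Stand-in for the queue of input files.
        List<String> files = List.of("a.gz", "b.gz", "c.gz");

        executor.submit(() -> {
            for (String file : files) {
                // Check between files: shutdownNow() interrupts this thread.
                if (Thread.currentThread().isInterrupted()) {
                    System.out.println("interrupted, stopping before " + file);
                    return;
                }
                processFile(file);
            }
        });

        // What a source's stop() might do: interrupt the worker, then wait
        // a bounded time for it to finish before the sink is torn down.
        executor.shutdownNow();
        if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
            System.out.println("worker did not terminate in time");
        }
    }

    static void processFile(String file) {
        // Placeholder for per-line reading; a real loop would also check
        // Thread.currentThread().isInterrupted() between lines.
        System.out.println("processing " + file);
    }
}
```

One caveat: interruption only helps where the worker actually checks the
flag, so a thread blocked inside BufferedReader.readLine() on a local file
will typically finish that call first; the flag is then seen at the next
check.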

Any pointers would be appreciated.


On Mon, Nov 10, 2014 at 8:27 PM, Hari Shreedharan
<hshreedharan@cloudera.com> wrote:
> You could implement a deserializer that reads compressed files. You’d have
> to decompress the file and return the data event by event though. Take a
> look at something like the BlobSerializer as an example (it is a pretty
> simple one that demonstrates the behavior).
> If you shut down, in the stop() method, the executor which you started in
> the start() method, you will probably get an exception from the Executor
> framework complaining that the executor was shut down, not a channel
> exception. If you get a channel exception it means that the channel is
> full or can’t accept events right now for some other reason, not that
> the source can’t handle it. If that is the case, the bug is in the source
> implementation. Do you see this behavior in a source bundled with Flume?
> Thanks,
> Hari
> On Mon, Nov 10, 2014 at 4:01 AM, Sverre Bakke <sverre.bakke@gmail.com>
> wrote:
>> Hi,
>> When creating a new EventDrivenSource running as an executor, what is
>> the correct approach to handling shutdown gracefully?
>> I am writing a custom source that will poll a compressed file line by
>> line using BufferedReader and push these lines to a ChannelProcessor
>> using processEvent(). This is a result of the Spooling Directory
>> source not supporting compressed files. It also means that most of
>> the time, my Flume source will be blocking on
>> BufferedReader.readLine() or blocking on
>> ChannelProcessor.processEvent().
>> If I shut down the executor from the stop() method of my source, the
>> typical response from Flume is that the ChannelProcessor generates a
>> ChannelException. In what situations can I expect that the
>> ChannelException actually is the result of a shutdown (e.g. ctrl+c)
>> rather than some other issue that should be handled as a truly
>> exceptional situation/error? Or am I approaching graceful shutdown
>> completely wrong?
>> Is there any specific order in which the Flume sources, interceptors
>> and sinks are signaled to shut down?
>> I feel that when it comes to error handling (and shutdowns), the
>> developer guide and javadoc are unfortunately a bit lacking.
>> Regards,
>> Sverre
