flume-user mailing list archives

From Erik Bertrand <E...@bertpc.com>
Subject Re: Custom Sink
Date Mon, 11 Mar 2013 14:24:23 GMT
I wrote a custom TCP sink recently and would be happy to share the code.
It's based on the Logger sink (Brock Noland gave me a helpful link to the
…). My code is quite simple and pretty bare-bones at this point. I'm planning
to make it more full-featured and robust, and will eventually put it out on
GitHub. Msg me in the interim.
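Erik's sink code is not included in this thread. Purely as a hypothetical sketch of the delivery step a TCP sink like this might contain, the class below writes each event body to a socket as a newline-terminated record. The class name `TcpEventWriter`, the newline framing, and the use of a plain `java.net.Socket` are all assumptions, and raw `byte[]` bodies stand in for Flume's `Event.getBody()`.

```java
import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.net.Socket;
import java.util.List;

// Hypothetical delivery step for a TCP sink (not Erik's code): writes each
// event body to a socket, one newline-terminated record per event.
// The newline framing is an assumption made for illustration.
class TcpEventWriter implements AutoCloseable {
  private final Socket socket;
  private final OutputStream out;

  TcpEventWriter(String host, int port) throws IOException {
    this.socket = new Socket(host, port);
    this.out = new BufferedOutputStream(socket.getOutputStream());
  }

  // Writes a whole batch, then flushes once. An IOException thrown here is
  // the point at which a sink would roll back its channel transaction.
  void writeBatch(List<byte[]> bodies) throws IOException {
    for (byte[] body : bodies) {
      out.write(body);
      out.write('\n');
    }
    out.flush();
  }

  @Override
  public void close() throws IOException {
    socket.close();
  }
}
```

Inside a sink, `writeBatch` would sit where the AvroSink snippet later in this thread calls `client.appendBatch(batch)`, so that a failed write leads to a rollback rather than lost events.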


On Mon, Mar 11, 2013 at 2:06 AM, Hari Shreedharan <hshreedharan@cloudera.com> wrote:

> Hi Vikram,
> I can't be sure why that is happening. The sink's process() method, and
> therefore your Channel.take() call, gets invoked even if there are no
> events in the channel; if take() returns null, the channel is currently
> empty. You can return Status.BACKOFF to tell the sink runner not to retry
> immediately, but the SinkRunner will eventually poll the sink again. This
> is because the SinkRunner does not know the state of the channel, so it
> keeps calling process() to give the sink a chance to take events as they
> arrive. Generally, a sink calls Channel.take() in a loop to build a batch:
> if the batch is non-empty, process() returns Status.READY; otherwise (the
> batch is empty), it returns Status.BACKOFF. See the code from AvroSink as
> an example (I have taken out some error-handling and counter-handling code
> and added some comments):
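>   // Excerpt: assumes a sink extending AbstractSink with an RpcClient field "client"
>   public Status process() throws EventDeliveryException {
>     Status status = Status.READY;
>     Channel channel = getChannel();
>     Transaction transaction = channel.getTransaction();
>     List<Event> batch = new ArrayList<Event>();
>     try {
>       transaction.begin();
>       for (int i = 0; i < client.getBatchSize(); i++) {
>         Event event = channel.take(); // Take an event from the channel
>         if (event == null) {          // Channel returned null: no more events
>           break;
>         }
>         batch.add(event);
>       }
>       if (batch.isEmpty()) {          // Batch was empty, so back off and try again later
>         status = Status.BACKOFF;
>       } else {                        // Batch was not empty; don't back off, retry immediately
>         client.appendBatch(batch);
>       }
>       transaction.commit();
>     } catch (Exception e) {
>       transaction.rollback();         // Return the taken events to the channel
>       throw new EventDeliveryException("Failed to send events", e);
>     } finally {
>       transaction.close();
>     }
>     return status;
>   }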
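The snippet returns Status.BACKOFF but does not show what the runner does with it. The stdlib-only sketch below models a polling loop that keeps calling `process()` regardless of channel state, resets after a READY, and sleeps progressively longer after consecutive BACKOFF results. `SinkStatus` and `PollingLoop` are hypothetical names, and the linear-increase-with-cap delay is an assumed policy for illustration; check Flume's `SinkRunner` for its actual behavior.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical stand-in for org.apache.flume.Sink.Status.
enum SinkStatus { READY, BACKOFF }

// Hypothetical polling loop: calls process() whether or not the channel has
// data, and sleeps longer after each consecutive BACKOFF, up to a cap.
// A real runner loops until stopped; this one runs a fixed number of times.
class PollingLoop {
  private final Supplier<SinkStatus> process;
  private final long incrementMs;
  private final long maxSleepMs;
  final List<Long> sleeps = new ArrayList<>();  // recorded so callers can inspect them

  PollingLoop(Supplier<SinkStatus> process, long incrementMs, long maxSleepMs) {
    this.process = process;
    this.incrementMs = incrementMs;
    this.maxSleepMs = maxSleepMs;
  }

  void run(int iterations) throws InterruptedException {
    int consecutiveBackoffs = 0;
    for (int i = 0; i < iterations; i++) {
      if (process.get() == SinkStatus.BACKOFF) {
        consecutiveBackoffs++;
        long sleepMs = Math.min(consecutiveBackoffs * incrementMs, maxSleepMs);
        sleeps.add(sleepMs);
        Thread.sleep(sleepMs);
      } else {
        consecutiveBackoffs = 0;  // events flowed; poll again right away
      }
    }
  }
}
```

With an increment of 1 ms and a cap of 2 ms, the status sequence READY, BACKOFF, BACKOFF, BACKOFF, READY, BACKOFF produces sleeps of 1, 2, 2 and 1 ms: the sink is never abandoned, only polled less eagerly while the channel stays empty.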
> I hope this helps. Another option is to take something like
> AvroSink/AbstractRpcSink, rip out all of the Avro/RPC code, and insert
> your own logic without changing much of the channel/transaction handling.
> Hari
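Hari's second suggestion, keeping the channel/transaction handling and swapping in your own delivery logic, amounts to a template method. The stdlib-only sketch below models it. `QueueChannel` is a hypothetical in-memory stand-in whose `take()` returns null when empty and whose `rollback()` puts taken events back; `BatchingSink` owns the batching and commit/rollback flow while subclasses supply `deliver()`. None of these classes are Flume's API; in a real sink the base class would be something like AbstractRpcSink with the RPC code removed.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical stand-in for a sink status; not org.apache.flume.Sink.Status.
enum Result { READY, BACKOFF }

// Hypothetical in-memory channel: take() returns null when empty, and
// rollback() puts events taken since the last commit back at the head.
class QueueChannel {
  private final Deque<String> queue = new ArrayDeque<>();
  private final List<String> inFlight = new ArrayList<>();

  void put(String event) { queue.addLast(event); }

  int size() { return queue.size(); }

  String take() {
    String event = queue.pollFirst();  // null when the channel is empty
    if (event != null) inFlight.add(event);
    return event;
  }

  void commit() { inFlight.clear(); }

  void rollback() {
    // Re-insert in reverse so the original order is preserved.
    for (int i = inFlight.size() - 1; i >= 0; i--) queue.addFirst(inFlight.get(i));
    inFlight.clear();
  }
}

// Owns the batching and commit/rollback flow; subclasses supply deliver().
abstract class BatchingSink {
  private final QueueChannel channel;
  private final int batchSize;

  BatchingSink(QueueChannel channel, int batchSize) {
    this.channel = channel;
    this.batchSize = batchSize;
  }

  // Custom delivery logic goes here; throwing causes the batch to roll back.
  protected abstract void deliver(List<String> batch) throws Exception;

  Result process() {
    List<String> batch = new ArrayList<>();
    try {
      for (int i = 0; i < batchSize; i++) {
        String event = channel.take();
        if (event == null) break;        // channel is empty for now
        batch.add(event);
      }
      if (batch.isEmpty()) {
        channel.commit();
        return Result.BACKOFF;           // nothing taken; let the runner wait
      }
      deliver(batch);
      channel.commit();
      return Result.READY;
    } catch (Exception e) {
      channel.rollback();                // events stay available for a retry
      return Result.BACKOFF;             // sketch choice; a real sink might rethrow
    }
  }
}
```

A custom sink then only overrides `deliver()`, e.g. `new BatchingSink(channel, 100) { protected void deliver(List<String> batch) { /* write the batch somewhere */ } }`, and inherits the take/commit/rollback handling unchanged.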
> On Sun, Mar 10, 2013 at 8:52 PM, Vikram Kulkarni <vikulkarni@expedia.com> wrote:
>> I am trying to write a custom sink for flume-ng. I looked at the existing
>> sinks and documentation and coded it up. However, the process() method,
>> which is supposed to receive the events, always ends up with a null event:
>> I am doing Event event = channel.take(); but the event is null. I see in
>> the logs that this method is called repeatedly while the event is still in
>> the channel, so I think the event is reaching the sink but the sink is
>> unable to take it out of the channel.
>> Can someone point me in the right direction?
>> Thanks,
>> Vikram
