flume-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Hari Shreedharan <hshreedha...@cloudera.com>
Subject Re:
Date Sun, 22 Nov 2015 17:45:38 GMT
I think there is a bug in the documentation that does not close the
transaction in the sink. You should look at any one of the sinks in the
code base and follow that in your own sink

On Sunday, November 22, 2015, beargan <beargan@hotmail.com> wrote:

> Sink
>
>
>
> *public* *class* *MySink* *extends* AbstractSink *implements* Configurable *{*
>
>   *private* String myProp*;*
>
>
>
>   @Override
>
>   *public* *void* *configure**(*Context context*)* *{*
>
>     String myProp *=* context*.*getString*(*"myProp"*,* "defaultValue"*);*
>
>
>
>     *// Process the myProp value (e.g. validation)*
>
>
>
>     *// Store myProp for later retrieval by process() method*
>
>     *this**.*myProp *=* myProp*;*
>
>   *}*
>
>
>
>   @Override
>
>   *public* *void* *start**()* *{*
>
>     *// Initialize the connection to the external repository (e.g. HDFS) that*
>
>     *// this Sink will forward Events to ..*
>
>   *}*
>
>
>
>   @Override
>
>   *public* *void* *stop* *()* *{*
>
>     *// Disconnect from the external respository and do any*
>
>     *// additional cleanup (e.g. releasing resources or nulling-out*
>
>     *// field values) ..*
>
>   *}*
>
>
>
>   @Override
>
>   *public* Status *process**()* *throws* EventDeliveryException *{*
>
>     Status status *=* *null**;*
>
>
>
>     *// Start transaction*
>
>     Channel ch *=* getChannel*();*
>
>     Transaction txn *=* ch*.*getTransaction*();*
>
>     txn*.*begin*();*
>
>     *try* *{*
>
>       *// This try clause includes whatever Channel operations you want to do*
>
>
>
>       Event event *=* ch*.*take*();*
>
>
>
>       *// Send the Event to the external repository.*
>
>       *// storeSomeData(e);*
>
>
>
>       txn*.*commit*();*
>
>       status *=* Status*.*READY*;*
>
>     *}* *catch* *(*Throwable t*)* *{*
>
>       txn*.*rollback*();*
>
>
>
>       *// Log exception, handle individual exceptions as needed*
>
>
>
>       status *=* Status*.*BACKOFF*;*
>
>
>
>       *// re-throw all Errors*
>
>       *if* *(*t *instanceof* Error*)* *{*
>
>         *throw* *(*Error*)*t*;*
>
>       *}*
>
>     *}*
>
>     *return* status*;*
>
>   *}*
>
> *}*
>
>
>
> *Compile and execute the following error:*
>
>
>
> 2015-11-22 15:30:58,773 (SinkRunner-PollingRunner-DefaultSinkProcessor)
> [ERROR -
> org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:160)] Unable
> to deliver event. Exception follows.
>
> java.lang.IllegalStateException: begin() called when transaction is
> COMPLETED!
>
>         at
> com.google.common.base.Preconditions.checkState(Preconditions.java:145)
>
>         at
> org.apache.flume.channel.BasicTransactionSemantics.begin(BasicTransactionSemantics.java:131)
>
>         at Toturial.Flume.MySink.process(MySink.java:54)
>
>         at
> org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
>
>         at
> org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
>
>         at java.lang.Thread.run(Unknown Source)
>


-- 

Thanks,
Hari

Mime
View raw message