flume-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Brock Noland <br...@cloudera.com>
Subject Re: Custom sink - "close() called when transaction is OPEN" error
Date Fri, 16 Nov 2012 16:02:09 GMT
If channel.take() returns null, no commit or rollback is called....

Checkout how logger sink handles this:

https://git-wip-us.apache.org/repos/asf?p=flume.git;a=blob;f=flume-ng-core/src/main/java/org/apache/flume/sink/LoggerSink.java;h=128fa8427af633c0c7c50093f8f6c4ef9bb8ae76;hb=HEAD

brock

On Fri, Nov 16, 2012 at 9:45 AM, Andrew Jones <andrewjones86@gmail.com> wrote:
> Sure.
>
> Sink: http://pastebin.com/N6zh73hU
> Config: http://pastebin.com/Tc2MH9iV
>
> Thanks.
>
>
> On 16 November 2012 15:32, Brock Noland <brock@cloudera.com> wrote:
>>
>> Would you be able to send the source of your sink via pastbin in
>> addition to your config?
>>
>> On Fri, Nov 16, 2012 at 9:21 AM, Andrew Jones <andrewjones86@gmail.com>
>> wrote:
>> > I tried logging the first throwable, but now that is just the
>> > IllegalStateException.
>> >
>> > Today I have been looking at Flume-1.3.0rc3 and I have noticed the same
>> > problem. This is using the Avro source, File channel and our custom
>> > sink.
>> > After Flume reloads its config, the first error message comes when the
>> > Avro
>> > source starts up:
>> >
>> > 16 Nov 2012 16:04:25,237 INFO  [lifecycleSupervisor-1-4]
>> > (org.apache.flume.source.AvroSource.start:142)  - Starting Avro source
>> > source: { bindAddress: 0.0.0.0, port: 36060 }...
>> > 16 Nov 2012 16:04:25,484 ERROR
>> > [SinkRunner-PollingRunner-DefaultSinkProcessor]
>> > (org.apache.flume.SinkRunner$PollingRunner.run:160)  - Unable to deliver
>> > event. Exception follows.
>> > java.lang.IllegalStateException: close() called when transaction is OPEN
>> > -
>> > you must either commit or rollback first
>> >         at
>> > com.google.common.base.Preconditions.checkState(Preconditions.java:176)
>> >         at
>> >
>> > org.apache.flume.channel.BasicTransactionSemantics.close(BasicTransactionSemantics.java:179)
>> >         at
>> >
>> > com.arm.pd.brodie.flume.sink.ResultSink.processSingle(ResultSink.java:440)
>> >         at
>> > com.arm.pd.brodie.flume.sink.ResultSink.process(ResultSink.java:172)
>> >         at
>> >
>> > org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
>> >         at
>> > org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
>> >         at java.lang.Thread.run(Thread.java:636)
>> > 16 Nov 2012 16:04:25,594 INFO  [lifecycleSupervisor-1-4]
>> > (org.apache.flume.instrumentation.MonitoredCounterGroup.register:89)  -
>> > Monitoried counter group for type: SOURCE, name: source, registered
>> > successfully.
>> > 16 Nov 2012 16:04:25,595 INFO  [lifecycleSupervisor-1-4]
>> > (org.apache.flume.instrumentation.MonitoredCounterGroup.start:73)  -
>> > Component type: SOURCE, name: source started
>> > 16 Nov 2012 16:04:25,595 INFO  [lifecycleSupervisor-1-4]
>> > (org.apache.flume.source.AvroSource.start:168)  - Avro source source
>> > started.
>> >
>> > I then continually get errors from the Sink, presumably as its been
>> > called
>> > periodically to check for events in the channel. So is it possible its
>> > the
>> > Avro source causing the issue?
>> >
>> > There should have been nothing persisted in the file channel when
>> > restarting.
>> >
>> > When the transaction gets messed up like this, is there a way to refresh
>> > it,
>> > preferably without losing any data?
>> >
>> > I am still able to send things to flume and they get processed and
>> > inserted
>> > by my sink, so it still seems to work OK.
>> >
>> > Thanks,
>> > Andrew
>> >
>> >
>> >
>> > On 15 November 2012 12:50, Brock Noland <brock@cloudera.com> wrote:
>> >>
>> >> Can you log the Throwable as the first thing in the catch block to see
>> >> if something and what it is, is being thrown?
>> >>
>> >> Transactions are thread local so if for some reason the the sequencing
>> >> gets messed up on an earlier call the process, every call to
>> >> transaction will thrown an exception including begin.
>> >>
>> >>
>> >>
>> >> https://github.com/apache/flume/blob/trunk/flume-ng-core/src/main/java/org/apache/flume/channel/BasicTransactionSemantics.java
>> >>
>> >> As I stated in FLUME-1089 I think that when close is called it should
>> >> forcefully destroy the transaction like JDBC close() but I have not
>> >> got much agreement.
>> >>
>> >>
>> >> On Thu, Nov 15, 2012 at 5:24 AM, Andrew Jones <andrewjones86@gmail.com>
>> >> wrote:
>> >> > We are using Flume 1.2.0. We have a custom source, although it passes
>> >> > through an Avro Sink and Source before getting to the sink. We are
>> >> > now
>> >> > using
>> >> > the memory channel, although had just switched from the JDBC channel
>> >> > when we
>> >> > started seeing these errors, so maybe that's something to do with it?
>> >> >
>> >> > I tried wrapping transaction.rollback(); in a try catch and logging
>> >> > in
>> >> > the
>> >> > catch, but it wasn't called, so I don't think the rollback is
>> >> > throwing
>> >> > an
>> >> > error.
>> >> >
>> >> > I think it may have something to do with switching channels, as right
>> >> > after
>> >> > Flume reloaded the config we started getting errors. I have restarted
>> >> > the
>> >> > flume node manually and we are still getting the error.
>> >> >
>> >> > Thanks,
>> >> > Andrew
>> >> >
>> >> >
>> >> > On 14 November 2012 20:02, Hari Shreedharan
>> >> > <hshreedharan@cloudera.com>
>> >> > wrote:
>> >> >>
>> >> >> Which version of Flume are you using? It looks like the transaction
>> >> >> was
>> >> >> never rolled back or committed. It is likely that the rollback
>> >> >> method
>> >> >> too
>> >> >> threw some exception, and the rollback was not successful. Also,
>> >> >> what
>> >> >> channel are you using?
>> >> >>
>> >> >>
>> >> >> Thanks,
>> >> >> Hari
>> >> >>
>> >> >> --
>> >> >> Hari Shreedharan
>> >> >>
>> >> >> On Wednesday, November 14, 2012 at 8:55 AM, Andrew Jones wrote:
>> >> >>
>> >> >> Hi,
>> >> >>
>> >> >> I have a custom sink which has been working fine, but recently
I
>> >> >> have
>> >> >> started seeing this error in the logs:
>> >> >>
>> >> >> Unable to deliver event. Exception follows.
>> >> >> java.lang.IllegalStateException: close() called when transaction
is
>> >> >> OPEN -
>> >> >> you must either commit or rollback first
>> >> >>         at
>> >> >>
>> >> >> com.google.common.base.Preconditions.checkState(Preconditions.java:176)
>> >> >> ...
>> >> >>
>> >> >>
>> >> >> After having a google and finding
>> >> >> https://issues.apache.org/jira/browse/FLUME-1089, I have double
>> >> >> checked
>> >> >> I am
>> >> >> using the correct try, catch, finally idiom that other sinks use,
>> >> >> and I
>> >> >> seem
>> >> >> to be doing the same. I do the following:
>> >> >>
>> >> >> public Status process() throws EventDeliveryException {
>> >> >> Status status = Status.READY;
>> >> >>
>> >> >> Channel channel = getChannel();
>> >> >> Transaction transaction = channel.getTransaction();
>> >> >>
>> >> >> try {
>> >> >> transaction.begin();
>> >> >>
>> >> >>                         // does a bit of processing and
>> >> >>                         // writes out the event to MongoDB
>> >> >>
>> >> >>                         transaction.commit();
>> >> >>
>> >> >> } catch (Throwable t) {
>> >> >> transaction.rollback();
>> >> >>
>> >> >> if (t instanceof Error) {
>> >> >> throw (Error) t;
>> >> >> } else if  (t instanceof EventDeliveryException) {
>> >> >> throw (EventDeliveryException) t;
>> >> >> } else if (t instanceof ChannelException) {
>> >> >> logger.error("Brodie Log Sink " + getName() + ": Unable to get
event
>> >> >> from"
>> >> >> +
>> >> >> " channel " + channel.getName() + ". Exception follows.", t);
>> >> >> status = Status.BACKOFF;
>> >> >> } else {
>> >> >> throw new EventDeliveryException("Failed to send events", t);
>> >> >> }
>> >> >> } finally {
>> >> >> transaction.close();
>> >> >> }
>> >> >>
>> >> >> return status;
>> >> >> }
>> >> >>
>> >> >> }
>> >> >>
>> >> >> All of this code came from looking at other sinks (Avro and HDFS),
>> >> >> so I
>> >> >> am
>> >> >> pretty sure its correct.
>> >> >>
>> >> >> Can anyone see anything that might be a problem, or is there
>> >> >> anything
>> >> >> else
>> >> >> I can do to avoid this error?
>> >> >>
>> >> >> Thanks,
>> >> >> Andrew
>> >> >>
>> >> >>
>> >> >
>> >>
>> >>
>> >>
>> >> --
>> >> Apache MRUnit - Unit testing MapReduce -
>> >> http://incubator.apache.org/mrunit/
>> >
>> >
>>
>>
>>
>> --
>> Apache MRUnit - Unit testing MapReduce -
>> http://incubator.apache.org/mrunit/
>
>



-- 
Apache MRUnit - Unit testing MapReduce - http://incubator.apache.org/mrunit/

Mime
View raw message