flume-user mailing list archives

From "Thanh Hong Dai" <hdth...@tma.com.vn>
Subject RE: Unable to deliver event. org.apache.flume.EventDeliveryException: java.lang.IllegalStateException: TransactionBatch has been closed()
Date Wed, 06 Jul 2016 04:00:43 GMT
Thanks for your reply. I guess I will have to wait for HDP to include the new version of Hive.

 

Best regards,

Thanh Hong.

 

From: Joe Lawson [mailto:jlawson@opensourceconnections.com] 
Sent: Wednesday, 6 July, 2016 2:41 AM
To: user@flume.apache.org
Subject: Re: Unable to deliver event. org.apache.flume.EventDeliveryException: java.lang.IllegalStateException:
TransactionBatch has been closed()

 

You may want to look here: https://community.hortonworks.com/content/kbentry/4321/hive-acid-current-state.html

 

Flume 1.5.2 doesn't include Hive support AFAIK, so whatever they built for Hortonworks is their
own build. Note that the sink docs say, "This sink is provided as a preview feature and
not recommended for use in production." (https://docs.hortonworks.com/HDPDocuments/HDP2/HDP-2.3.0/ds_flume/FlumeUserGuide.html)
 It appears they are using Hive version 1.2.1; not sure which version the sink lines up with.


 

Looking here: http://hortonworks.com/blog/adding-acid-to-apache-hive/

 

It appears that Hive gained support for ACID inserts in 0.14.0 (https://issues.apache.org/jira/browse/HIVE-5317),
but has a bug about transaction batches closing (https://issues.apache.org/jira/browse/HIVE-12307?jql=project%20%3D%20HIVE%20AND%20text%20~%20%22TransactionBatch%20closed%22)
that is fixed in Hive 1.3.0.
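As a stopgap until a Hive release with the HIVE-12307 fix is available, one thing worth experimenting with is making each transaction batch shorter-lived so the sink cycles batches more often. A hedged sketch using the sink names from your config (the property names come from the Flume Hive Sink docs; the values here are illustrative only, and whether this actually avoids the closed-batch race depends on what triggers the bug in your setup):

```
# Illustrative values only -- tune for your load; not a confirmed fix for HIVE-12307
acidstream.sinks.hive-sink.hive.txnsPerBatchAsk = 100   # smaller batches get replaced sooner
acidstream.sinks.hive-sink.heartBeatInterval = 10       # seconds between keep-alive heartbeats on open txns
acidstream.sinks.hive-sink.batchSize = 1000
```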

 

On Tue, Jul 5, 2016 at 1:01 AM, Thanh Hong Dai <hdthanh@tma.com.vn> wrote:

I forgot to include the version information. I’m currently using Flume 1.5.2 from HDP 2.4.2.

 

Looking at the changelog of Flume 1.6.0, the latest version, there seem to be some improvements
for Hive support.

This makes me wonder: does Flume 1.5.2 support Hive streaming to ACID tables?

 

Best regards,

Thanh Hong.

 

From: Thanh Hong Dai [mailto:hdthanh@tma.com.vn]
Sent: Tuesday, 5 July, 2016 11:47 AM
To: user@flume.apache.org
Subject: Unable to deliver event. org.apache.flume.EventDeliveryException: java.lang.IllegalStateException:
TransactionBatch has been closed()

 

Does anyone know the cause of this exception when using the Hive Sink, and how to fix it?

 

The Hive Sink managed to write data into the Hive table for a few minutes (which I can confirm
by querying the table), but then it shows the exception below in the log file (/var/log/flume/flume-<streamname>.log)
on all the nodes.

 

05 Jul 2016 04:24:22,737 ERROR [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.flume.SinkRunner$PollingRunner.run:160)
 - Unable to deliver event. Exception follows.

org.apache.flume.EventDeliveryException: java.lang.IllegalStateException: TransactionBatch
TxnIds=[29489...30488] on endPoint = {metaStoreUri='thrift://hive.metastore:9083', database='default',
table='acid', partitionVals=[0804] } has been closed()

        at org.apache.flume.sink.hive.HiveSink.process(HiveSink.java:375)

        at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)

        at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)

        at java.lang.Thread.run(Thread.java:745)

Caused by: java.lang.IllegalStateException: TransactionBatch TxnIds=[29489...30488] on endPoint
= {metaStoreUri='thrift://hive.metastore:9083', database='default', table='acid', partitionVals=[0804]
} has been closed()

        at org.apache.hive.hcatalog.streaming.HiveEndPoint$TransactionBatchImpl.checkIsClosed(HiveEndPoint.java:690)

        at org.apache.hive.hcatalog.streaming.HiveEndPoint$TransactionBatchImpl.write(HiveEndPoint.java:729)

        at org.apache.hive.hcatalog.streaming.HiveEndPoint$TransactionBatchImpl.write(HiveEndPoint.java:686)

        at org.apache.flume.sink.hive.HiveDelimitedTextSerializer.write(HiveDelimitedTextSerializer.java:48)

        at org.apache.flume.sink.hive.HiveWriter$1.call(HiveWriter.java:161)

        at org.apache.flume.sink.hive.HiveWriter$1.call(HiveWriter.java:155)

        at org.apache.flume.sink.hive.HiveWriter$11.call(HiveWriter.java:425)

        at java.util.concurrent.FutureTask.run(FutureTask.java:266)

        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)

        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)

        ... 1 more

05 Jul 2016 04:24:27,891 ERROR [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.flume.SinkRunner$PollingRunner.run:160)
 - Unable to deliver event. Exception follows.

org.apache.flume.EventDeliveryException: java.lang.IllegalStateException: TransactionBatch
TxnIds=[29489...30488] on endPoint = {metaStoreUri='thrift://hive.metastore:9083', database='default',
table='acid', partitionVals=[0804] } has been closed()

        (identical stack trace as above)

 

My flume.conf file:

 

# acidstream - streaming data from Kafka into Hive transactional table

acidstream.sources = kafka-source

acidstream.sinks = hive-sink

acidstream.channels = gutter

 

acidstream.sources.kafka-source.channels = gutter

acidstream.sources.kafka-source.type = org.apache.flume.source.kafka.KafkaSource

acidstream.sources.kafka-source.zookeeperConnect = chdhost125.vitaldev.tma.com.vn:2181,chdhost27.vitaldev.tma.com.vn:2181,chdhost185.vitaldev.tma.com.vn:2181

acidstream.sources.kafka-source.topic = lan

acidstream.sources.kafka-source.groupId = acid

acidstream.sources.kafka-source.batchSize = 10000

acidstream.sources.kafka-source.batchDurationMillis = 60000

acidstream.sources.kafka-source.kafka.consumer.timeout.ms = 200

 

acidstream.sources.kafka-source.interceptors = i1

acidstream.sources.kafka-source.interceptors.i1.type = regex_extractor

acidstream.sources.kafka-source.interceptors.i1.regex = ^(\\d{4}-\\d{2}-\\d{2}\\s\\d{2}:\\d{2}:\\d{2})

acidstream.sources.kafka-source.interceptors.i1.serializers = s1

acidstream.sources.kafka-source.interceptors.i1.serializers.s1.type = org.apache.flume.interceptor.RegexExtractorInterceptorMillisSerializer

acidstream.sources.kafka-source.interceptors.i1.serializers.s1.name = timestamp

acidstream.sources.kafka-source.interceptors.i1.serializers.s1.pattern = yyyy-MM-dd HH:mm:ss
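For readers unfamiliar with the interceptor chain above: `regex_extractor` captures the leading timestamp from each event body (the double backslashes in the properties file reduce to single ones when the config is parsed), and `RegexExtractorInterceptorMillisSerializer` parses the capture with the given pattern into epoch milliseconds, stored in the `timestamp` header. A standalone sketch of the same transformation (function name is mine; the UTC assumption is also mine, since the real serializer uses the JVM's default time zone):

```python
import re
from datetime import datetime, timezone

# The regex as the interceptor sees it after properties-file unescaping
TIMESTAMP_RE = re.compile(r"^(\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2})")

def extract_timestamp_millis(event_body: str) -> int:
    """Capture the leading timestamp and convert it to epoch milliseconds,
    mirroring regex_extractor + RegexExtractorInterceptorMillisSerializer."""
    m = TIMESTAMP_RE.match(event_body)
    if m is None:
        raise ValueError("event body does not start with a timestamp")
    dt = datetime.strptime(m.group(1), "%Y-%m-%d %H:%M:%S")
    # Assuming the log timestamps are UTC (the serializer uses the JVM zone)
    return int(dt.replace(tzinfo=timezone.utc).timestamp() * 1000)
```

The resulting `timestamp` header is what the sink's `%m%d` partition escape is later formatted from, which is why the interceptor must run on every event when `useLocalTimeStamp` is false.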

 

acidstream.sinks.hive-sink.channel = gutter

acidstream.sinks.hive-sink.type = hive

acidstream.sinks.hive-sink.hive.metastore = thrift://hive.metastore:9083

acidstream.sinks.hive-sink.hive.database = default

acidstream.sinks.hive-sink.hive.table = acid

acidstream.sinks.hive-sink.hive.partition = %m%d

acidstream.sinks.hive-sink.heartBeatInterval = 10

acidstream.sinks.hive-sink.useLocalTimeStamp = false

acidstream.sinks.hive-sink.round = false

acidstream.sinks.hive-sink.hive.txnsPerBatchAsk = 1000

acidstream.sinks.hive-sink.batchSize = 10000

acidstream.sinks.hive-sink.callTimeout = 30000

acidstream.sinks.hive-sink.serializer = DELIMITED

acidstream.sinks.hive-sink.serializer.delimiter = "\t"

acidstream.sinks.hive-sink.serializer.serdeSeparator = '\t'

acidstream.sinks.hive-sink.serializer.fieldnames = timestamp,id,data

 

acidstream.channels.gutter.type = memory

acidstream.channels.gutter.capacity = 100000

acidstream.channels.gutter.transactionCapacity = 50000

 

My flume-env file has this line added:

 

export JAVA_OPTS="-Xms100m -Xmx3g"

 

My table on Hive has the following properties:

 

PARTITIONED BY (md string)

CLUSTERED BY (id) INTO 10 BUCKETS

STORED AS ORC

TBLPROPERTIES ('transactional' = 'true');
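For completeness, here is a full DDL consistent with the fragments above and with the sink's `fieldnames = timestamp,id,data`. Only the PARTITIONED BY / CLUSTERED BY / STORED AS / TBLPROPERTIES clauses come from this thread; the column types are assumptions, and `timestamp` is backquoted because it collides with the Hive type name:

```sql
-- Hypothetical reconstruction; column types are assumed
CREATE TABLE acid (
  `timestamp` string,
  id          bigint,
  data        string
)
PARTITIONED BY (md string)
CLUSTERED BY (id) INTO 10 BUCKETS
STORED AS ORC
TBLPROPERTIES ('transactional' = 'true');
```

Note that transactional tables also require the metastore/server side to be configured for ACID (hive.support.concurrency=true and hive.txn.manager set to DbTxnManager), so it is worth confirming those are set in your HDP cluster as well.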

 

Hive has Tez engine set as the default execution engine.

 

Could this error be caused by a low number of threads? (The NameNode has 100 server threads available.)

 

Best regards,

Thanh Hong.

 

 

