flume-user mailing list archives

From Balazs Donat Bessenyei <bes...@cloudera.com>
Subject Re: CustomSerializer throws org.apache.avro.AvroRuntimeException: not open
Date Wed, 03 Aug 2016 20:46:16 GMT
Dear Sunita,

Thank you for the useful advice!

I've taken a look at the related source code.
I've taken a look at the related source code. All usages of the abstract
class AbstractAvroEventSerializer seem to go through public void
afterCreate(), which already contains your addition (dataFileWriter.create).
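For reference, the relevant part of the base class looks roughly like this
(a paraphrased sketch of the Flume 1.x source, not a verbatim copy):

@Override
public void afterCreate() throws IOException {
    // The base class opens the Avro container file here, once the sink
    // has created the underlying output stream.
    dataFileWriter.create(getSchema(), getOutputStream());
}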

Can you please provide a little more info about your custom serializer?
(I'm trying to find out whether we could somehow improve the code, or
whether this should go into the documentation.)


Thank you

Donat

On Wed, Jul 27, 2016 at 10:10 AM, Sunita Arvind <sunitarvind@gmail.com>
wrote:

> For the benefit of anyone else hitting the same issue, here is what I found:
>
> The serializer I was using extends AbstractAvroEventSerializer. This
> class is widely used, so the abstract class itself is not likely to be at
> fault. However, I got rid of the issue by overriding the configure method
> from AbstractAvroEventSerializer in my custom serializer, as below:
>
>
> public void configure(Context context) {
>     int syncIntervalBytes = context.getInteger("syncIntervalBytes", 2048000);
>     String compressionCodec = context.getString("compressionCodec", "null");
>     this.writer = new ReflectDatumWriter(this.getSchema());
>     this.dataFileWriter = new DataFileWriter(this.writer);
>     this.dataFileWriter.setSyncInterval(syncIntervalBytes);
>     try {
>         CodecFactory codecFactory = CodecFactory.fromString(compressionCodec);
>         this.dataFileWriter.setCodec(codecFactory);
>         this.dataFileWriter.create(schema, out); // added this line: open the underlying Avro container file
>     } catch (AvroRuntimeException e) {
>         logger.warn("Unable to instantiate avro codec with name (" + compressionCodec
>             + "). Compression disabled. Exception follows.", e);
>     } catch (IOException io) {
>         logger.warn("Could not open dataFileWriter. Exception follows.", io);
>     }
> }
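>
> (A possibly cleaner variant, which I have not tested: override
> afterCreate() and call dataFileWriter.create(schema, out) there instead,
> keeping configure() closer to the inherited version.)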
>
> After this, the files are created in HDFS just fine.
> I was also able to view the files in Spark using the spark-avro package.
> I hope this is the right way to do it and that the solution helps someone.
> I would love to hear if anyone in the Avro or Flume community knows of a better way.
>
> regards
> Sunita
>
>
> On Tue, Jul 26, 2016 at 12:45 PM, Sunita Arvind <sunitarvind@gmail.com>
> wrote:
>
>> Hello Experts,
>>
>> I am trying to convert events from a custom data source received in Flume
>> into Avro and push them to HDFS. The pipeline I am attempting is:
>> syslog -> Flume -> Flume interceptor that converts each event into
>> avroObject.toByteArray -> HDFS serializer that decodes the byte array back
>> to Avro
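>>
>> To make the conversion step concrete, the core of the interceptor is
>> essentially the following (a simplified sketch; parseSyslogLine and
>> MyRecord stand in for my actual parsing helper and record type):
>>
>> // imports: org.apache.avro.io.BinaryEncoder, org.apache.avro.io.EncoderFactory,
>> //          org.apache.avro.reflect.ReflectDatumWriter, java.io.*,
>> //          java.nio.charset.StandardCharsets
>> public Event intercept(Event event) {
>>     try {
>>         // Parse the raw syslog line into a record object.
>>         MyRecord record = parseSyslogLine(new String(event.getBody(), StandardCharsets.UTF_8));
>>         // Serialize the record to Avro binary, per the Avro FAQ recipe.
>>         ByteArrayOutputStream bos = new ByteArrayOutputStream();
>>         BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(bos, null);
>>         new ReflectDatumWriter<>(MyRecord.class).write(record, encoder);
>>         encoder.flush();
>>         event.setBody(bos.toByteArray());
>>     } catch (IOException e) {
>>         // The real code logs this; the event passes through unchanged.
>>     }
>>     return event;
>> }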
>>
>> The flume configuration looks like:
>>
>> tier1.sources.syslogsource.interceptors.i2.type=timestamp
>> tier1.sources.syslogsource.interceptors.i2.preserveExisting=true
>> tier1.sources.syslogsource.interceptors.i1.dataSourceType=DataSource1
>> tier1.sources.syslogsource.interceptors.i1.type = com.flume.CustomToAvroConvertInterceptor$Builder
>>
>> #hdfs sink for archival and batch analysis
>> tier1.sinks.hdfssink.type = hdfs
>> tier1.sinks.hdfssink.hdfs.writeFormat = Text
>> tier1.sinks.hdfssink.hdfs.fileType = DataStream
>>
>> tier1.sinks.hdfssink.hdfs.filePrefix=%{flumeHost}-%{host}%{customerId}-%Y%m%d-%H
>> tier1.sinks.hdfssink.hdfs.inUsePrefix=_
>>
>> tier1.sinks.hdfssink.hdfs.path=/hive/rawavro/customer_id=%{customerId}/date=%Y%m%d/hr=%H
>> tier1.sinks.hdfssink.hdfs.fileSuffix=.avro
>> # roll file if it's been 10 * 60 seconds = 600
>> tier1.sinks.hdfssink.hdfs.rollInterval=600
>> # rolling by event count and size is disabled below; roll on time only
>> tier1.sinks.hdfssink.hdfs.rollCount=0
>> tier1.sinks.hdfssink.hdfs.batchSize = 100
>> tier1.sinks.hdfssink.hdfs.rollSize=0
>> tier1.sinks.hdfssink.serializer=com.flume.RawAvroHiveSerializer$Builder
>> tier1.sinks.hdfssink.serializer.compressionCodec=snappy
>> tier1.sinks.hdfssink.channel = hdfsmem
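>>
>> (The $Builder reference on the serializer line works because Flume
>> instantiates custom serializers through a nested EventSerializer.Builder.
>> A minimal sketch of mine follows; the constructor shown is illustrative:)
>>
>> public static class Builder implements EventSerializer.Builder {
>>     @Override
>>     public EventSerializer build(Context context, OutputStream out) {
>>         // Hypothetical constructor; the real class wires the schema and
>>         // output stream in a similar way.
>>         RawAvroHiveSerializer serializer = new RawAvroHiveSerializer(out);
>>         serializer.configure(context);
>>         return serializer;
>>     }
>> }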
>>
>> When I use tier1.sinks.hdfssink.serializer=avro_event, the data stored in
>> HDFS is the binary output of
>> CustomToAvroConvertInterceptor.intercept(event.getBody()).toByteArray.
>> However, this data cannot be parsed in Hive; as a result, I see all nulls
>> in the column values.
>> Based on
>> https://cwiki.apache.org/confluence/display/AVRO/FAQ#FAQ-HowcanIserializedirectlyto/fromabytearray
>> all I am doing in RawAvroHiveSerializer.convert is decoding the byte array
>> with a binary Decoder.
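>>
>> For reference, the decode itself is essentially (a simplified sketch;
>> MyRecord stands in for my record type, and I am assuming the usual
>> DecoderFactory/ReflectDatumReader API):
>>
>> // imports: org.apache.avro.io.BinaryDecoder, org.apache.avro.io.DecoderFactory,
>> //          org.apache.avro.reflect.ReflectDatumReader, java.io.IOException,
>> //          org.apache.flume.FlumeException
>> protected MyRecord convert(Event event) {
>>     try {
>>         // Rehydrate the Avro record from the interceptor's binary payload.
>>         BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(event.getBody(), null);
>>         return new ReflectDatumReader<>(MyRecord.class).read(null, decoder);
>>     } catch (IOException e) {
>>         throw new FlumeException("Unable to decode Avro payload", e);
>>     }
>> }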
>> The exception I get seems to be unrelated to the code itself, hence I am
>> pasting the stack trace. I will share the code if it is required to
>> identify the root cause:
>>
>> 2016-07-26 19:15:27,187 ERROR org.apache.flume.SinkRunner: Unable to deliver event. Exception follows.
>> org.apache.flume.EventDeliveryException: org.apache.avro.AvroRuntimeException: not open
>>         at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:463)
>>         at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
>>         at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
>>         at java.lang.Thread.run(Thread.java:745)
>> Caused by: org.apache.avro.AvroRuntimeException: not open
>>         at org.apache.avro.file.DataFileWriter.assertOpen(DataFileWriter.java:82)
>>         at org.apache.avro.file.DataFileWriter.append(DataFileWriter.java:299)
>>         at org.apache.flume.serialization.AbstractAvroEventSerializer.write(AbstractAvroEventSerializer.java:108)
>>         at org.apache.flume.sink.hdfs.HDFSDataStream.append(HDFSDataStream.java:124)
>>         at org.apache.flume.sink.hdfs.BucketWriter$7.call(BucketWriter.java:550)
>>         at org.apache.flume.sink.hdfs.BucketWriter$7.call(BucketWriter.java:547)
>>         at org.apache.flume.sink.hdfs.BucketWriter$9$1.run(BucketWriter.java:679)
>>         at org.apache.flume.auth.SimpleAuthenticator.execute(SimpleAuthenticator.java:50)
>>
>> I can reproduce this on the local file system as well. In the test case, I
>> tried setting the file open mode to append=true and still encounter the same exception.
>>
>> Appreciate any guidance in this regard.
>>
>> regards
>> Sunita
>>
>
>
