flume-user mailing list archives

From Sunita Arvind <sunitarv...@gmail.com>
Subject Re: CustomSerializer throws org.apache.avro.AvroRuntimeException: not open
Date Wed, 03 Aug 2016 22:43:56 GMT
Thanks Balazs,

I do see what you mentioned in AbstractAvroEventSerializer, but when I
run with the VisualVM debugger, I do not see control reaching the
afterCreate() method of AbstractAvroEventSerializer at all. From what it
looked like to me, there is an attempt to configure an output writer based
on what is passed as the argument to configure(), convert the event to
whatever is required, and append (before creating). This was when I was not
overriding the configure(), write() and afterCreate() methods, and was only
overriding convert() and providing the builder class.
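
To illustrate the ordering problem I mean (append happening before create),
here is a minimal sketch using only the JDK. SketchWriter and its methods
are hypothetical stand-ins that I made up for illustration; this is not
Avro's DataFileWriter or Flume's serializer code, it only mirrors the
"assertOpen" check that shows up in the stack trace.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;

/** Hypothetical model of a writer that must be created before appending. */
public class WriterLifecycleSketch {

    static final class SketchWriter {
        private OutputStream out;   // stays null until create() is called
        private boolean open;       // flipped to true by create()

        /** Binds the output stream and writes a header, like opening a container file. */
        void create(OutputStream target) throws IOException {
            this.out = target;
            out.write("HDR\n".getBytes(StandardCharsets.UTF_8));
            this.open = true;
        }

        /** Appends one record; fails fast if create() has not run yet. */
        void append(String record) throws IOException {
            if (!open) {
                throw new IllegalStateException("not open");
            }
            out.write((record + "\n").getBytes(StandardCharsets.UTF_8));
        }
    }

    /** Returns the exception message raised when append() precedes create(), or null. */
    public static String appendWithoutCreate() throws IOException {
        SketchWriter w = new SketchWriter();
        try {
            w.append("event-1");
            return null;
        } catch (IllegalStateException e) {
            return e.getMessage();
        }
    }

    /** Returns everything written when create() runs before append(). */
    public static String createThenAppend() throws IOException {
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        SketchWriter w = new SketchWriter();
        w.create(sink);
        w.append("event-1");
        return sink.toString("UTF-8");
    }

    public static void main(String[] args) throws IOException {
        System.out.println(appendWithoutCreate());
        System.out.print(createThenAppend());
    }
}
```

In this model the first call fails because nothing ever opened the writer,
which is the same shape as what I was seeing before I added the create call.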

Attached is the Java serializer class, which works fine now. To give you
some context on what I am deserializing: I have a Flume interceptor which
converts protobuf to an Avro object. In the HDFS serializer, I decode the
byte arrays emitted by the Flume interceptor into Avro objects.

Please note, the below method in RawAvroHiveSerializer has no effect. I
added it while troubleshooting and did not originally override this method
when I was encountering the problem.

@Override
public void afterCreate() throws IOException {
    //noop
}

Probably I must've missed something else as no one else seems to have
encountered this problem.

Thanks for your feedback. Appreciate it. Do let me know if there is a
better way to handle the issue.

regards
Sunita


On Wed, Aug 3, 2016 at 1:46 PM, Balazs Donat Bessenyei <bessbd@cloudera.com>
wrote:

> Dear Sunita,
>
> Thank you for the useful advice!
>
> I've taken a look at the related source code.
> All usages of abstract class AbstractAvroEventSerializer seem to be
> calling public void afterCreate() which has your addition (
> *dataFileWriter.create*).
>
> Can you please provide a little more info about your custom serializer?
> (I'm trying to find out if somehow we could improve the code or maybe if
> this should go to the documentation.)
>
>
> Thank you
>
> Donat
>
> On Wed, Jul 27, 2016 at 10:10 AM, Sunita Arvind <sunitarvind@gmail.com>
> wrote:
>
>> For benefit of anyone else hitting the same issue, here is what I found:
>>
>> The serializer I was using was extending AbstractAvroEventSerializer.
>> This class has a lot of adoption, so it's not likely to be an issue in the
>> abstract class. However, I got rid of this issue by overriding the
>> configure method in AbstractAvroEventSerializer in my custom serializer, as
>> below:
>>
>>
>> public void configure(Context context) {
>>     int syncIntervalBytes = context.getInteger("syncIntervalBytes", Integer.valueOf(2048000)).intValue();
>>     String compressionCodec = context.getString("compressionCodec", "null");
>>     this.writer = new ReflectDatumWriter(this.getSchema());
>>     this.dataFileWriter = new DataFileWriter(this.writer);
>>     this.dataFileWriter.setSyncInterval(syncIntervalBytes);
>>     try {
>>         CodecFactory e = CodecFactory.fromString(compressionCodec);
>>         this.dataFileWriter.setCodec(e);
>>         this.dataFileWriter.create(schema, out); // added the creation
>>     } catch (AvroRuntimeException var5) {
>>         logger.warn("Unable to instantiate avro codec with name (" + compressionCodec
>>             + "). Compression disabled. Exception follows.", var5);
>>     } catch (IOException io) {
>>         // pass the exception itself so the logger prints the stack trace
>>         logger.warn("Could not open dataFileWriter. Exception follows.", io);
>>     }
>>
>> }
>>
>> After this, the files are getting created in hdfs just right.
>> I was also able to view the files in spark using spark-avro package.
>> Hope this is the right way to do it and the solution helps someone.
>> Would love to hear if anyone in the Avro or Flume community knows of a
>> better way to do it.
>>
>> regards
>> Sunita
>>
>>
>> On Tue, Jul 26, 2016 at 12:45 PM, Sunita Arvind <sunitarvind@gmail.com>
>> wrote:
>>
>>> Hello Experts,
>>>
>>> I am trying to convert a custom data source received in flume into avro
>>> and push to hdfs. What I am attempting to do is
>>> syslog -> flume -> flume interceptor to convert into
>>> avroObject.toByteArray -> hdfs serializer which decodes the byteArray back
>>> to Avro
>>>
>>> The flume configuration looks like:
>>>
>>> tier1.sources.syslogsource.interceptors.i2.type=timestamp
>>> tier1.sources.syslogsource.interceptors.i2.preserveExisting=true
>>> tier1.sources.syslogsource.interceptors.i1.dataSourceType=DataSource1
>>> tier1.sources.syslogsource.interceptors.i1.type = com.flume.CustomToAvroConvertInterceptor$Builder
>>>
>>> #hdfs sink for archival and batch analysis
>>> tier1.sinks.hdfssink.type = hdfs
>>> tier1.sinks.hdfssink.hdfs.writeFormat = Text
>>> tier1.sinks.hdfssink.hdfs.fileType = DataStream
>>>
>>> tier1.sinks.hdfssink.hdfs.filePrefix=%{flumeHost}-%{host}%{customerId}-%Y%m%d-%H
>>> tier1.sinks.hdfssink.hdfs.inUsePrefix=_
>>>
>>> tier1.sinks.hdfssink.hdfs.path=/hive/rawavro/customer_id=%{customerId}/date=%Y%m%d/hr=%H
>>> tier1.sinks.hdfssink.hdfs.fileSuffix=.avro
>>> # roll file if it's been 10 * 60 seconds = 600
>>> tier1.sinks.hdfssink.hdfs.rollInterval=600
>>> # roll file if we get 50,000 log lines (~25MB); note that 0 disables count-based rolling
>>> tier1.sinks.hdfssink.hdfs.rollCount=0
>>> tier1.sinks.hdfssink.hdfs.batchSize = 100
>>> tier1.sinks.hdfssink.hdfs.rollSize=0
>>> tier1.sinks.hdfssink.serializer=com.flume.RawAvroHiveSerializer$Builder
>>> tier1.sinks.hdfssink.serializer.compressionCodec=snappy
>>> tier1.sinks.hdfssink.channel = hdfsmem
>>>
>>> When I use tier1.sinks.hdfssink.serializer=avro_event
>>> I get binary data stored into hdfs which is the
>>> CustomToAvroConvertInterceptor.intercept(event.getbody).toByteArray ,
>>> however this data cannot be parsed in hive. As a result, I see all nulls in
>>> the column values.
>>> Based on
>>> https://cwiki.apache.org/confluence/display/AVRO/FAQ#FAQ-HowcanIserializedirectlyto/fromabytearray?
>>> all I am doing in RawAvroHiveSerializer.convert is decoding with a
>>> BinaryDecoder.
>>> The exception I get seems to be unrelated to the code itself, hence I am
>>> pasting the stack trace. I will share the code if it is required to
>>> identify the root cause:
>>>
>>> 2016-07-26 19:15:27,187 ERROR org.apache.flume.SinkRunner: Unable to deliver event. Exception follows.
>>> org.apache.flume.EventDeliveryException: org.apache.avro.AvroRuntimeException: not open
>>>         at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:463)
>>>         at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
>>>         at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
>>>         at java.lang.Thread.run(Thread.java:745)
>>> Caused by: org.apache.avro.AvroRuntimeException: not open
>>>         at org.apache.avro.file.DataFileWriter.assertOpen(DataFileWriter.java:82)
>>>         at org.apache.avro.file.DataFileWriter.append(DataFileWriter.java:299)
>>>         at org.apache.flume.serialization.AbstractAvroEventSerializer.write(AbstractAvroEventSerializer.java:108)
>>>         at org.apache.flume.sink.hdfs.HDFSDataStream.append(HDFSDataStream.java:124)
>>>         at org.apache.flume.sink.hdfs.BucketWriter$7.call(BucketWriter.java:550)
>>>         at org.apache.flume.sink.hdfs.BucketWriter$7.call(BucketWriter.java:547)
>>>         at org.apache.flume.sink.hdfs.BucketWriter$9$1.run(BucketWriter.java:679)
>>>         at org.apache.flume.auth.SimpleAuthenticator.execute(SimpleAuthenticator.java:50)
>>>
>>> I can reproduce this on the local file system as well. In the test case, I
>>> tried setting the file open to append=true and still encountered the same
>>> exception.
>>>
>>> Appreciate any guidance in this regard.
>>>
>>> regards
>>> Sunita
>>>
>>
>>
>
