flume-user mailing list archives

From Laxman Ch <laxman....@gmail.com>
Subject Re: Avro configuration
Date Sat, 31 Dec 2016 04:19:35 GMT
Apologies for any confusion, Jim.

You need to keep the serializer and deserializer in sync. That's the gist of
my first response.

hdfs.AvroEventSerializer expects the schema to arrive via event headers, so we
may need to set those headers on the event at the source.
Can you please post your sample code and config for the end-to-end flow in git?
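
For example, a static interceptor on the source can stamp the schema URL
header on every event. A rough sketch (untested; the source name "avro-source"
is a placeholder for your own, and the URL must point at your schema file):

agent.sources.avro-source.interceptors = schema
agent.sources.avro-source.interceptors.schema.type = static
agent.sources.avro-source.interceptors.schema.key = flume.avro.schema.url
agent.sources.avro-source.interceptors.schema.value = hdfs://namenode/persistence/persistence.avsc

hdfs.AvroEventSerializer reads the flume.avro.schema.url (or
flume.avro.schema.literal) header, as the initialize() snippet you quote
below shows.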

On 31 December 2016 at 04:51, Jim Langston <jlangston@resolutebi.com> wrote:

> Ok - after messing with this for quite a while, I am getting this error:
>
>
> FlumeException: Could not find schema for event
>
>
> I added a schema and placed it in HDFS:
>
>
> agent.sinks.persistence-sink.type = file_roll
> agent.sinks.persistence-sink.sink.directory = /home/flume/persistence
> agent.sinks.persistence-sink.sink.serializer = org.apache.flume.sink.hdfs.AvroEventSerializer$Builder
> agent.sinks.persistence-sink.sink.serializer.schemaURL = hdfs://namenode/persistence/persistence.avsc
> # Placeholders; can be used for tuning, defaults are the values
> agent.sinks.persistence-sink.batchSize = 1000
> agent.sinks.persistence-sink.sink.rollInterval = 300
>
>
> Looking at the configuration syntax for file_roll, can it pick up
> serializer.schemaURL? I see where HDFS has serializer and serializer.*, but
> file_roll does not.
>
> Also, looking at the source code, it looks like the only way this error
> message can be generated is if the schema URL header is null and no schema
> literal header is set either:
>
> private void initialize(Event event) throws IOException {
>   Schema schema = null;
>   String schemaUrl = event.getHeaders().get(AVRO_SCHEMA_URL_HEADER);
>   if (schemaUrl != null) {
>     schema = schemaCache.get(schemaUrl);
>     if (schema == null) {
>       schema = loadFromUrl(schemaUrl);
>       schemaCache.put(schemaUrl, schema);
>     }
>   }
>   if (schema == null) {
>     String schemaString = event.getHeaders().get(AVRO_SCHEMA_LITERAL_HEADER);
>     if (schemaString == null) {
>       throw new FlumeException("Could not find schema for event " + event);
>     }
>     schema = new Schema.Parser().parse(schemaString);
>   }
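>
> For reference, a minimal sketch (my own illustration, not Flume code) of
> building an event that would pass this check. Here avroEncodedBody is
> assumed to be a byte[] holding an already-serialized Avro datum, and the
> schema file is assumed to exist at the URL:
>
> import java.util.HashMap;
> import java.util.Map;
> import org.apache.flume.Event;
> import org.apache.flume.event.EventBuilder;
>
> Map<String, String> headers = new HashMap<String, String>();
> // the header key checked above as AVRO_SCHEMA_URL_HEADER
> headers.put("flume.avro.schema.url", "hdfs://namenode/persistence/persistence.avsc");
> Event event = EventBuilder.withBody(avroEncodedBody, headers);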
>
>
>
> ------------------------------
> *From:* Jim Langston <jlangston@resolutebi.com>
> *Sent:* Friday, December 30, 2016 8:34:42 AM
>
> *To:* user@flume.apache.org
> *Subject:* Re: Avro configuration
>
>
> Thanks - I was following your initial response:
>
>
> - Use a custom AvroEventSerializer to directly serialize the avro event
> and use it with the file_roll sink.
> A reference implementation is available in the hdfs sink
> (org.apache.flume.sink.hdfs.AvroEventSerializer).
> You may strip off the hdfs dependencies from it to achieve what you want.
>
> This seems to indicate that I need to write a custom serializer following
> the constructs of the hdfs sink. What I couldn't find were good examples of
> creating custom serializers and then deploying them. I was tempted to post
> to the developers list, but chose to keep the context within this thread.
>
> Your new response has me confused; it seems to indicate that all I need to
> do is use the hdfs serializer.
>
>
> Jim
>
> ------------------------------
> *From:* Laxman Ch <laxman.lux@gmail.com>
> *Sent:* Thursday, December 29, 2016 3:22:04 PM
> *To:* user@flume.apache.org
> *Subject:* Re: Avro configuration
>
> Directly configure this serializer on the file_roll sink in your
> configuration:
> >> agent.sinks.persistence-sink.sink.serializer = org.apache.flume.sink.hdfs.AvroEventSerializer$Builder
>
> Please go through the documentation before you proceed.
> https://flume.apache.org/FlumeUserGuide.html#avro-event-serializer
>
>
> On 29 December 2016 at 23:29, Jim Langston <jlangston@resolutebi.com>
> wrote:
>
>> Thanks - I have pulled the Flume source. Are there any good step-by-step
>> examples of creating a custom serializer and deploying it for Flume to use?
>>
>>
>> Jim
>> ------------------------------
>> *From:* Laxman Ch <laxman.lux@gmail.com>
>> *Sent:* Wednesday, December 28, 2016 12:05:08 AM
>> *To:* user@flume.apache.org
>> *Subject:* Re: Avro configuration
>>
>> Jim,
>>
>> In a one-liner: FlumeEventAvroEventSerializer and AvroEventDeserializer
>> are not in sync, so they can't be used as a serde pair.
>>
>> Flume's built-in avro serializer, FlumeEventAvroEventSerializer, serializes
>> Flume events with a wrapper ("shell") around them. It's important to note
>> that the actual raw event is wrapped inside the Flume shell object, and
>> this raw payload is treated as opaque binary (which can be thrift, avro, or
>> just a byte array, etc.).
>> Flume's built-in avro deserializer, AvroEventDeserializer, deserializes any
>> generic Avro record and wraps the deserialized record in another Flume
>> shell object.
>>
>> This means that, as per your configuration, the spool directory source
>> (persistence-dev-source) will get a double-wrapped Flume event
>> (FlumeEvent -> FlumeEvent -> raw event body).
>>
>> To solve this problem, we need the serializer and deserializer to be in
>> sync. We can achieve that with either of the following approaches.
>> - Use a custom FlumeEventAvroEventDeserializer to extract the FlumeEvent
>> directly, without the double wrapping, and use it with the spool directory
>> source.
>>
>> A similar attempt has already been made by Sebastian here:
>> https://issues.apache.org/jira/browse/FLUME-2942
>>
>> I personally recommend writing a FlumeEventAvroEventDeserializer rather
>> than modifying the existing one.
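>>
>> The essential difference from the built-in AvroEventDeserializer would be
>> in readEvent(): unwrap the decoded record instead of re-wrapping it. A
>> rough, untested sketch (variable names are mine; "record" is the
>> GenericRecord decoded with the FlumeEvent container schema shown at the
>> end of this thread):
>>
>> // (imports: java.util.HashMap, java.util.Map, java.nio.ByteBuffer,
>> //  org.apache.flume.event.EventBuilder)
>> @SuppressWarnings("unchecked")
>> Map<CharSequence, CharSequence> wrapped =
>>     (Map<CharSequence, CharSequence>) record.get("headers");
>> Map<String, String> headers = new HashMap<String, String>();
>> for (Map.Entry<CharSequence, CharSequence> e : wrapped.entrySet()) {
>>   headers.put(e.getKey().toString(), e.getValue().toString());
>> }
>> ByteBuffer body = (ByteBuffer) record.get("body");
>> byte[] bytes = new byte[body.remaining()];
>> body.get(bytes);
>> return EventBuilder.withBody(bytes, headers); // plain event, no double wrap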
>>
>> - Use a custom AvroEventSerializer to directly serialize the avro event
>> and use it with the file_roll sink (sketched below).
>> A reference implementation is available in the hdfs sink
>> (org.apache.flume.sink.hdfs.AvroEventSerializer).
>> You may strip off the hdfs dependencies from it to achieve what you want.
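>>
>> As a rough illustration (untested; the class and package names are mine,
>> and the schema is loaded from a plain URL instead of through the HDFS
>> FileSystem API), such a serializer might look like:
>>
>> package com.example.flume; // hypothetical package
>>
>> import java.io.IOException;
>> import java.io.OutputStream;
>> import java.net.URL;
>> import java.nio.ByteBuffer;
>> import org.apache.avro.Schema;
>> import org.apache.avro.file.DataFileWriter;
>> import org.apache.avro.generic.GenericDatumWriter;
>> import org.apache.flume.Context;
>> import org.apache.flume.Event;
>> import org.apache.flume.serialization.EventSerializer;
>>
>> public class RawAvroEventSerializer implements EventSerializer {
>>
>>   private final OutputStream out;
>>   private final String schemaURL;
>>   private DataFileWriter<Object> writer;
>>
>>   private RawAvroEventSerializer(OutputStream out, Context ctx) {
>>     this.out = out;
>>     this.schemaURL = ctx.getString("schemaURL"); // sink.serializer.schemaURL
>>   }
>>
>>   @Override public void afterCreate() { /* writer is created lazily in write() */ }
>>
>>   @Override public void afterReopen() { /* reopen not supported, see supportsReopen() */ }
>>
>>   @Override
>>   public void write(Event event) throws IOException {
>>     if (writer == null) {
>>       Schema schema = new Schema.Parser().parse(new URL(schemaURL).openStream());
>>       writer = new DataFileWriter<Object>(new GenericDatumWriter<Object>(schema));
>>       writer.create(schema, out);
>>     }
>>     // the event body is assumed to already be an Avro-encoded datum of this schema
>>     writer.appendEncoded(ByteBuffer.wrap(event.getBody()));
>>   }
>>
>>   @Override public void flush() throws IOException {
>>     if (writer != null) writer.flush();
>>   }
>>
>>   @Override public void beforeClose() throws IOException { /* nothing extra to do */ }
>>
>>   @Override public boolean supportsReopen() { return false; }
>>
>>   public static class Builder implements EventSerializer.Builder {
>>     @Override
>>     public EventSerializer build(Context context, OutputStream out) {
>>       return new RawAvroEventSerializer(out, context);
>>     }
>>   }
>> }
>>
>> It would then be wired in with sink.serializer = com.example.flume.RawAvroEventSerializer$Builder
>> and sink.serializer.schemaURL on the file_roll sink.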
>>
>>
>> On 28 December 2016 at 01:17, Jim Langston <jlangston@resolutebi.com>
>> wrote:
>>
>>> Hi all,
>>>
>>>
>>> I'm looking for some guidance. I have been trying to get a flow working
>>> that involves the following:
>>>
>>>
>>> Source Avro --> mem channel --> file_roll
>>>
>>>
>>> File Roll config
>>>
>>>
>>> agent.sinks.persistence-sink.type = file_roll
>>> agent.sinks.persistence-sink.sink.directory = /home/flume/persistence
>>> agent.sinks.persistence-sink.sink.serializer = avro_event
>>> agent.sinks.persistence-sink.batchSize = 1000
>>> agent.sinks.persistence-sink.sink.rollInterval = 300
>>>
>>>
>>> Once the data is on local disk, I want to flume the data to another
>>> flume server
>>>
>>> Source spooldir --> mem channel --> Avro Sink (to another flume server)
>>>
>>> agent.sources.persistence-dev-source.type = spooldir
>>> agent.sources.persistence-dev-source.spoolDir = /home/flume/ready
>>> agent.sources.persistence-dev-source.deserializer = avro
>>> agent.sources.persistence-dev-source.deserializer.schemaType = LITERAL
>>> agent.sources.persistence-dev-source.batchSize = 1000
>>>
>>>
>>>
>>> The problem is that file_roll will put the incoming Avro data into an
>>> Avro container before storing the data on the local file system. Then,
>>> when the data is picked up by the spooldir source and sent to the flume
>>> server, it will have the file_roll headers when being read by the
>>> interceptor.
>>>
>>> Is there a recommended way to save the incoming Avro data that will
>>> maintain its integrity when sending it on to another flume server, which
>>> is waiting on Avro data to multiplex and send to its channels?
>>>
>>> I have tried many different variations. The above configuration does get
>>> the received Avro data to the other server, but the problem is that the
>>> applications will see the container headers from file_roll, and not the
>>> headers from the records of the initial Avro data.
>>>
>>>
>>> Thanks,
>>>
>>> Jim
>>>
>>> Schema that gets set by file_roll on its writes to disk:
>>>
>>> {
>>>   "type" : "record",
>>>   "name" : "Event",
>>>   "fields" : [ {
>>>     "name" : "headers",
>>>     "type" : {
>>>       "type" : "map",
>>>       "values" : "string"
>>>     }
>>>   }, {
>>>     "name" : "body",
>>>     "type" : "bytes"
>>>   } ]
>>> }
>>>
>>>
>>>
>>>
>>>
>>>
>>
>>
>> --
>> Thanks,
>> Laxman
>>
>
>
>
> --
> Thanks,
> Laxman
>



-- 
Thanks,
Laxman
