flume-user mailing list archives

From Laxman Ch <laxman....@gmail.com>
Subject Re: Avro configuration
Date Thu, 05 Jan 2017 21:49:31 GMT
> I noticed in the Jira that you pointed me at that there has been some
activity on the deserializer - will that do what I'm looking for now,
without writing one?

Yes, Jim. I think the patch in that JIRA will be helpful for you. Please give
it a try.
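
If it helps, once that deserializer is available the pairing might look
roughly like this (a sketch only - the deserializer class name is
hypothetical and depends on how the patch is packaged):

# file_roll side: keep the built-in FlumeEvent Avro container serializer
agent.sinks.persistence-sink.sink.serializer = avro_event

# spooldir side: a FLUME-2942-style deserializer that unwraps the FlumeEvent
# container instead of wrapping it a second time (hypothetical FQCN)
agent.sources.persistence-dev-source.spoolDir = /home/flume/ready
agent.sources.persistence-dev-source.deserializer = org.apache.flume.serialization.FlumeEventAvroEventDeserializer$Builder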

On 6 January 2017 at 00:45, Jim Langston <jlangston@resolutebi.com> wrote:

> Hi Laxman - I noticed in the Jira that you pointed me at that there has
> been some activity on the deserializer - will that do what I'm looking for
> now, without writing one? If so, can I get ahold of it to implement? I'm
> on apache-flume-1.6.0-bin
>
>
>
> I have tried to use this: agent.sinks.persistence-sink.sink.serializer =
> com.adaltas.flume.serialization.HeaderAndBodyTextEventSerializer$Builder
>
>
> and it does put the files out to disk with the header and the body. I tried
> it with spooldir, but nothing came out on the other end, so I take it that
> the serializer and deserializer are not paired correctly.
>
>
>
> Thanks,
>
>
> Jim
> ------------------------------
> *From:* Jim Langston <jlangston@resolutebi.com>
> *Sent:* Tuesday, January 3, 2017 1:59:15 PM
>
> *To:* user@flume.apache.org
> *Subject:* Re: Avro configuration
>
>
> Hi - yes - I got the gist from your initial response and went off to find
> examples of writing and deploying a serializer. Before starting to do so, I
> realized what file_roll was really doing, which is why I got confused by
> your next response. I still haven't found anything that gives guidance on
> writing and deploying one.
>
>
> What I want to do is relatively simple (or so I thought): a production
> stream provides the initial Avro data, and that data needs to be used in a
> staging and a development environment. The problem is that if staging or
> development is down, it affects the production flumes. I want to stop that,
> and I was going to do so by saving the data to disk (it does not need to be
> real time), then scooping it out and moving it on. file_roll loses the data
> integrity by wrapping it, and spooldir picks up the Avro headers of the
> wrapping, not of the original source.
>
>
> I will look into writing a serializer that file_roll can use to place the
> events on disk without wrapping them.
>
>
> Jim
> ------------------------------
> *From:* Laxman Ch <laxman.lux@gmail.com>
> *Sent:* Friday, December 30, 2016 11:19:35 PM
> *To:* user@flume.apache.org
> *Subject:* Re: Avro configuration
>
> Apologies for any confusion, Jim.
>
> You need to keep the serializer and deserializer in sync. That's the gist
> of my first response.
>
> hdfs.AvroEventSerializer expects certain headers, so we may need to set
> those headers on the event at the source.
> Can you please post your sample code and config for the end-to-end flow in
> git?
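>
> For example (just a sketch - the source name and schema path below are
> placeholders, not from your setup), a static interceptor on the originating
> Avro source could attach the schema header the serializer looks for
> (flume.avro.schema.url, or flume.avro.schema.literal for an inline schema):
>
> agent.sources.avro-source.interceptors = schema-header
> agent.sources.avro-source.interceptors.schema-header.type = static
> agent.sources.avro-source.interceptors.schema-header.key = flume.avro.schema.url
> agent.sources.avro-source.interceptors.schema-header.value = hdfs://namenode/path/to/schema.avsc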
>
> On 31 December 2016 at 04:51, Jim Langston <jlangston@resolutebi.com>
> wrote:
>
>> Ok - after messing with this for quite a while, I am getting this error:
>>
>>
>> FlumeException: Could not find schema for event
>>
>>
>> I added a schema and placed it in HDFS:
>>
>>
>> agent.sinks.persistence-sink.type = file_roll
>> agent.sinks.persistence-sink.sink.directory = /home/flume/persistence
>> agent.sinks.persistence-sink.sink.serializer = org.apache.flume.sink.hdfs.AvroEventSerializer$Builder
>> agent.sinks.persistence-sink.sink.serializer.schemaURL = hdfs://namenode/persistence/persistence.avsc
>> # Placeholders, can be used for tuning; defaults are the values
>> agent.sinks.persistence-sink.batchSize = 1000
>> agent.sinks.persistence-sink.sink.rollInterval = 300
>>
>>
>> Looking at the configuration syntax for file_roll, can it pass through
>> serializer.schemaURL? I see that the HDFS sink has serializer and
>> serializer.*, but file_roll does not.
>>
>> Also, looking at the source code, it looks like the only way this error
>> message can be generated is if the schemaURL is null:
>>
>> private void initialize(Event event) throws IOException {
>>   Schema schema = null;
>>   String schemaUrl = event.getHeaders().get(AVRO_SCHEMA_URL_HEADER);
>>   if (schemaUrl != null) {
>>     schema = schemaCache.get(schemaUrl);
>>     if (schema == null) {
>>       schema = loadFromUrl(schemaUrl);
>>       schemaCache.put(schemaUrl, schema);
>>     }
>>   }
>>   if (schema == null) {
>>     String schemaString = event.getHeaders().get(AVRO_SCHEMA_LITERAL_HEADER);
>>     if (schemaString == null) {
>>       throw new FlumeException("Could not find schema for event " + event);
>>     }
>>     schema = new Schema.Parser().parse(schemaString);
>>   }
>>
>>
>>
>> ------------------------------
>> *From:* Jim Langston <jlangston@resolutebi.com>
>> *Sent:* Friday, December 30, 2016 8:34:42 AM
>>
>> *To:* user@flume.apache.org
>> *Subject:* Re: Avro configuration
>>
>>
>> Thanks - I was following your initial response:
>>
>>
>> - Use a custom AvroEventSerializer to directly serialize the avro event
>> and use it with the file_roll sink.
>> A reference implementation is available in the hdfs sink
>> (org.apache.flume.sink.hdfs.AvroEventSerializer).
>> You may strip the hdfs dependencies from it to achieve what you want.
>>
>> This seems to indicate that I need to write a custom serializer following
>> the constructs of the hdfs sink. What I couldn't find were good examples of
>> creating custom serializers and then deploying them. I was tempted to post
>> to the developers group, but chose to just keep the context within this
>> thread.
>>
>> Your new response has me confused; it seems to indicate that all I need
>> to do is use the hdfs serializer.
>>
>>
>> Jim
>>
>> ------------------------------
>> *From:* Laxman Ch <laxman.lux@gmail.com>
>> *Sent:* Thursday, December 29, 2016 3:22:04 PM
>> *To:* user@flume.apache.org
>> *Subject:* Re: Avro configuration
>>
>> Configure this serializer directly on the file_roll sink in your
>> configuration:
>> >> agent.sinks.persistence-sink.sink.serializer = org.apache.flume.sink.hdfs.AvroEventSerializer
>>
>> Please go through the documentation before you proceed.
>> https://flume.apache.org/FlumeUserGuide.html#avro-event-serializer
>>
>>
>> On 29 December 2016 at 23:29, Jim Langston <jlangston@resolutebi.com>
>> wrote:
>>
>>> Thanks - I have pulled the Flume source. Are there any good step-by-step
>>> examples of creating a custom serializer and deploying it for Flume to
>>> use?
>>>
>>>
>>> Jim
>>> ------------------------------
>>> *From:* Laxman Ch <laxman.lux@gmail.com>
>>> *Sent:* Wednesday, December 28, 2016 12:05:08 AM
>>> *To:* user@flume.apache.org
>>> *Subject:* Re: Avro configuration
>>>
>>> Jim,
>>>
>>> In a one-liner: FlumeEventAvroEventSerializer and AvroEventDeserializer
>>> are not in sync, so they can't be used as a serde pair.
>>>
>>> Flume's built-in avro serializer, FlumeEventAvroEventSerializer,
>>> serializes Flume events with a wrapper (shell). It's important to note
>>> that the actual raw event is wrapped inside the Flume shell object, and
>>> this raw body is treated as opaque binary (which can be thrift, avro, or
>>> just a byte array, etc.).
>>> Flume's built-in avro deserializer, AvroEventDeserializer, deserializes
>>> any generic serialized event and wraps the deserialized event into another
>>> Flume shell object.
>>>
>>> This means that, as per your configuration, the spool directory source
>>> (persistence-dev-source) will get a double-wrapped Flume event
>>> (FlumeEvent -> FlumeEvent -> raw event body).
>>>
>>> To solve this problem, we need the serializer and deserializer to be in
>>> sync. We can achieve that with either of the following approaches.
>>> - Use a custom FlumeEventAvroEventDeserializer to directly extract the
>>> FlumeEvent without the double wrapper, and use it with the spool directory
>>> source.
>>>
>>> A similar attempt has already been made by Sebastian here:
>>> https://issues.apache.org/jira/browse/FLUME-2942
>>>
>>> I personally recommend writing a new FlumeEventAvroEventDeserializer
>>> rather than modifying the existing one.
>>>
>>> - Use a custom AvroEventSerializer to directly serialize the avro event,
>>> and use it with the file_roll sink.
>>> A reference implementation is available in the hdfs sink
>>> (org.apache.flume.sink.hdfs.AvroEventSerializer).
>>> You may strip the hdfs dependencies from it to achieve what you want.
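>>>
>>> As a rough illustration only (this is not the Flume class or the JIRA
>>> patch; the class name is made up, and it assumes the event body is already
>>> an Avro-binary datum whose schema arrives in the flume.avro.schema.literal
>>> header), such a serializer could look something like:
>>>
>>> import java.io.IOException;
>>> import java.io.OutputStream;
>>> import java.nio.ByteBuffer;
>>> import org.apache.avro.Schema;
>>> import org.apache.avro.file.DataFileWriter;
>>> import org.apache.avro.generic.GenericDatumWriter;
>>> import org.apache.flume.Context;
>>> import org.apache.flume.Event;
>>> import org.apache.flume.FlumeException;
>>> import org.apache.flume.serialization.EventSerializer;
>>>
>>> public class RawAvroEventSerializer implements EventSerializer {
>>>   private static final String SCHEMA_LITERAL = "flume.avro.schema.literal";
>>>   private final OutputStream out;
>>>   private DataFileWriter<Object> writer; // created lazily on first event
>>>
>>>   private RawAvroEventSerializer(OutputStream out) { this.out = out; }
>>>
>>>   @Override public void afterCreate() { }
>>>   @Override public void afterReopen() {
>>>     throw new UnsupportedOperationException("Avro container cannot be reopened");
>>>   }
>>>   @Override public boolean supportsReopen() { return false; }
>>>
>>>   @Override public void write(Event event) throws IOException {
>>>     if (writer == null) {
>>>       String literal = event.getHeaders().get(SCHEMA_LITERAL);
>>>       if (literal == null) {
>>>         throw new FlumeException("Missing " + SCHEMA_LITERAL + " header");
>>>       }
>>>       Schema schema = new Schema.Parser().parse(literal);
>>>       writer = new DataFileWriter<Object>(new GenericDatumWriter<Object>(schema));
>>>       writer.create(schema, out);
>>>     }
>>>     // Append the body as-is: no FlumeEvent wrapper record around it.
>>>     writer.appendEncoded(ByteBuffer.wrap(event.getBody()));
>>>   }
>>>
>>>   @Override public void flush() throws IOException {
>>>     if (writer != null) { writer.flush(); }
>>>   }
>>>   @Override public void beforeClose() throws IOException { flush(); }
>>>
>>>   public static class Builder implements EventSerializer.Builder {
>>>     @Override public EventSerializer build(Context context, OutputStream out) {
>>>       return new RawAvroEventSerializer(out);
>>>     }
>>>   }
>>> }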
>>>
>>>
>>> On 28 December 2016 at 01:17, Jim Langston <jlangston@resolutebi.com>
>>> wrote:
>>>
>>>> Hi all,
>>>>
>>>>
>>>> I'm looking for some guidance. I have been trying to get a flow working
>>>> that involves the following:
>>>>
>>>>
>>>> Source Avro --> mem channel --> file_roll
>>>>
>>>>
>>>> File Roll config
>>>>
>>>>
>>>> agent.sinks.persistence-sink.type = file_roll
>>>> agent.sinks.persistence-sink.sink.directory = /home/flume/persistence
>>>> agent.sinks.persistence-sink.sink.serializer = avro_event
>>>> agent.sinks.persistence-sink.batchSize = 1000
>>>> agent.sinks.persistence-sink.sink.rollInterval = 300
>>>>
>>>>
>>>> Once the data is on local disk, I want to flume the data to another
>>>> flume server
>>>>
>>>> Source spooldir --> mem channel -- Avro Sink (to another flume server)
>>>>
>>>> agent.sources.persistence-dev-source.type = spooldir
>>>> agent.sources.persistence-dev-source.spoolDir = /home/flume/ready
>>>> agent.sources.persistence-dev-source.deserializer = avro
>>>> agent.sources.persistence-dev-source.deserializer.schemaType = LITERAL
>>>> agent.sources.persistence-dev-source.batchSize = 1000
>>>>
>>>>
>>>>
>>>> The problem is that file_roll will put the incoming Avro data into an
>>>> Avro container before storing it on the local file system. Then, when the
>>>> data is picked up by the spooldir source and sent to the flume server, it
>>>> will carry the file_roll headers when being read by the interceptor.
>>>>
>>>> Is there a recommended way to save the incoming Avro data that will
>>>> maintain its integrity when it is sent on to another flume server, which
>>>> is waiting on Avro data to multiplex and send to its channels?
>>>>
>>>> I have tried many different variations. With the configurations above,
>>>> the Avro data that was received does reach the other server, but the
>>>> problem is that the applications see the container headers from the
>>>> file_roll, and not the headers of the records from the initial Avro data.
>>>>
>>>>
>>>> Thanks,
>>>>
>>>> Jim
>>>>
>>>> schema that gets set by file_roll on its writes to disk:
>>>>
>>>> {
>>>>   "type" : "record",
>>>>   "name" : "Event",
>>>>   "fields" : [ {
>>>>     "name" : "headers",
>>>>     "type" : {
>>>>       "type" : "map",
>>>>       "values" : "string"
>>>>     }
>>>>   }, {
>>>>     "name" : "body",
>>>>     "type" : "bytes"
>>>>   } ]
>>>> }
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>
>>>
>>> --
>>> Thanks,
>>> Laxman
>>>
>>
>>
>>
>> --
>> Thanks,
>> Laxman
>>
>
>
>
> --
> Thanks,
> Laxman
>



-- 
Thanks,
Laxman
