flume-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Hari Shreedharan <hshreedha...@cloudera.com>
Subject Re: Reading with Spark from KafkaChannel
Date Fri, 06 Nov 2015 08:57:11 GMT
Cornell's solution works. Just to avoid confusion, the body ehich is a bye
array is what your read from  kafka not the flume event body. Name or
something like message to avoid confusion

On Friday, November 6, 2015, Gonzalo Herreros <gherreros@gmail.com> wrote:

> Hola Guillermo,
>
> If I understand correctly you want Flume to write to kafka as a channel
> and then Spark to read from kafka.
> To do that you have two options:
> - Make Spark deserialize the FlumeEvent read from kafka. for instance in
> scala:
>
>       val parseFlumeEvent = { body: Array[Byte] =>
>         val reader = new
> SpecificDatumReader[AvroFlumeEvent](AvroFlumeEvent.getClassSchema)
>         val in = new ByteArrayInputStream(body);
>         val decoder =
> org.apache.avro.io.DecoderFactory.get().directBinaryDecoder(in, null);
>         val event = reader.read(null, decoder);
>         new String(event.getBody().array())
>       }
>
> -Or set parseAsFlumeEvent=false and merge fix FLUME-2781 (or wait for
> Flume 1.7.0 to be released).
> Without the fix, flume will still serialize avro kafka regardless of the
> property.
>
> Alternatively, you can write to kafka as a sink instead of a channel, so
> it doesn't wrap the message. But that adds some overhead because now you
> need a channel.
>
> Saludos,
> Gonzalo
>
>
> On 6 November 2015 at 08:38, Guillermo Ortiz <konstt2000@gmail.com
> <javascript:_e(%7B%7D,'cvml','konstt2000@gmail.com');>> wrote:
>
>> Hello,
>>
>> I'm using the KafkaChannel, I have a doubt about the documentation.
>>
>> Expecting Avro datums with FlumeEvent schema in the channel. This
>> should be true if Flume source is writing to the channel And false if
>> other producers are writing into the topic that the channel is using
>> Flume source messages to Kafka can be parsed outside of Flume by using
>> org.apache.flume.source.avro.AvroFlumeEvent provided by the
>> flume-ng-sdk artifact
>>
>> In my PoC, I'm writing just with Kafka to my topic but I want to read
>> from the KafkaChannel with Spark. I have configured parseAsFlumeEvent
>> as true because just Flume writes to this topic.
>> What should I do to read the "events" from Kafka? DO I have to use
>> org.apache.flume.source.avro.AvroFlumeEvent??. If I set the parameter
>> as false, how are the events inserted?
>>
>
>

-- 

Thanks,
Hari

Mime
View raw message