Hello Guillermo,

If I understand correctly, you want Flume to write to Kafka through the Kafka channel and then have Spark read from Kafka.
To do that you have two options:
- Make Spark deserialize the AvroFlumeEvent read from Kafka. For instance, in Scala:

      import java.io.ByteArrayInputStream
      import org.apache.avro.io.DecoderFactory
      import org.apache.avro.specific.SpecificDatumReader
      import org.apache.flume.source.avro.AvroFlumeEvent

      // Deserialize an Avro-encoded FlumeEvent and return its body as a string
      val parseFlumeEvent = { body: Array[Byte] =>
        val reader = new SpecificDatumReader[AvroFlumeEvent](AvroFlumeEvent.getClassSchema)
        val in = new ByteArrayInputStream(body)
        val decoder = DecoderFactory.get().directBinaryDecoder(in, null)
        val event = reader.read(null, decoder)
        new String(event.getBody.array())
      }

- Or set parseAsFlumeEvent=false and apply the fix for FLUME-2781 (or wait for Flume 1.7.0 to be released).
Without the fix, Flume will still serialize events as Avro into Kafka regardless of the property.
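For reference, a minimal Kafka channel configuration with that flag set might look like the sketch below (property names as in the Flume 1.6 user guide; the agent/channel names a1/c1, broker, ZooKeeper address, and topic are placeholders for your own setup):

```
a1.channels = c1
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.brokerList = localhost:9092
a1.channels.c1.zookeeperConnect = localhost:2181
a1.channels.c1.topic = my-topic
a1.channels.c1.parseAsFlumeEvent = false
```

With parseAsFlumeEvent=false (and the FLUME-2781 fix), the messages in the topic are the raw event bodies, so consumers don't need the Flume SDK at all.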

Alternatively, you can write to Kafka with a Kafka sink instead of a channel, so Flume doesn't wrap the message in a FlumeEvent. But that adds some overhead, because you then also need a channel to feed the sink.
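In any of these setups, Spark would consume the topic with something like the sketch below (assuming the Spark Streaming Kafka 0.8 direct API from spark-streaming-kafka; the broker address and topic name are placeholders, and `sc` is an existing SparkContext):

```scala
import kafka.serializer.DefaultDecoder
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Sketch only: broker list and topic name are placeholders
val ssc = new StreamingContext(sc, Seconds(5))
val kafkaParams = Map("metadata.broker.list" -> "localhost:9092")
val stream = KafkaUtils.createDirectStream[Array[Byte], Array[Byte], DefaultDecoder, DefaultDecoder](
  ssc, kafkaParams, Set("my-topic"))

// With parseAsFlumeEvent=true, unwrap each value with the parseFlumeEvent
// function above; with parseAsFlumeEvent=false the value is already the raw payload.
val payloads = stream.map { case (_, value) => parseFlumeEvent(value) }
```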


On 6 November 2015 at 08:38, Guillermo Ortiz <konstt2000@gmail.com> wrote:

I'm using the KafkaChannel, and I have a question about the documentation:

Expecting Avro datums with FlumeEvent schema in the channel. This
should be true if a Flume source is writing to the channel, and false if
other producers are writing into the topic that the channel is using.
Flume source messages to Kafka can be parsed outside of Flume by using
org.apache.flume.source.avro.AvroFlumeEvent provided by the
flume-ng-sdk artifact.

In my PoC, I'm writing just with Kafka to my topic but I want to read
from the KafkaChannel with Spark. I have configured parseAsFlumeEvent
as true because just Flume writes to this topic.
What should I do to read the "events" from Kafka? Do I have to use
org.apache.flume.source.avro.AvroFlumeEvent? If I set the parameter
to false, how are the events inserted?