flume-user mailing list archives

From Gonzalo Herreros <gherre...@gmail.com>
Subject Re: Kafka Sink Topic was overwritten by Kafka Source Topic
Date Sun, 12 Jun 2016 07:53:55 GMT
You need an interceptor on the source to update or remove the topic header; otherwise the sink uses the header's value instead of its configured topic.
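
As a minimal sketch of one way to do this, the agent config below attaches Flume's built-in static interceptor to the source from the quoted message and has it overwrite the "topic" header with the sink's topic. The interceptor name topicOverride is hypothetical, and this assumes the static interceptor and its preserveExisting option behave in your Flume version as described in the user guide; a custom interceptor that removes the header would be an alternative.

```properties
# Hypothetical interceptor name: topicOverride
agent_myAgent.sources.kafkaSource.interceptors = topicOverride
agent_myAgent.sources.kafkaSource.interceptors.topicOverride.type = static
# Header key read by the Kafka Sink when choosing the destination topic
agent_myAgent.sources.kafkaSource.interceptors.topicOverride.key = topic
agent_myAgent.sources.kafkaSource.interceptors.topicOverride.value = sinkTopic
# Overwrite the header already set by the Kafka Source (the default keeps the existing value)
agent_myAgent.sources.kafkaSource.interceptors.topicOverride.preserveExisting = false
```

With preserveExisting left at its default, the header written by the Kafka Source would be kept and events would still go to sourceTopic.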

Gonzalo
On Jun 12, 2016 4:57 AM, "lxw" <lxw1234@qq.com> wrote:

> Hi all,
>
>   I use a Kafka Source to read events from one Kafka topic and a Kafka
> Sink to write them to another topic, but the Kafka Sink topic
> configuration does not take effect: Flume still writes events to the
> Kafka Source topic (sourceTopic).
>
>   agent_myAgent.sources.kafkaSource.topic = sourceTopic
>   agent_myAgent.sinks.kafkaSink.topic = sinkTopic
>
>   Source code in "org.apache.flume.source.kafka.KafkaSource.process()":
>     // Add headers to event (topic, timestamp, and key)
>     headers = new HashMap<String, String>();
>     headers.put(KafkaSourceConstants.TIMESTAMP,
>         String.valueOf(System.currentTimeMillis()));
>     headers.put(KafkaSourceConstants.TOPIC, topic);
>
>   And the Kafka Sink documentation describes the topic property as follows:
>
> The topic in Kafka to which the messages will be published. If this
> parameter is configured, messages will be published to this topic. If the
> event header contains a “topic” field, the event will be published to that
> topic overriding the topic configured here.
>
>   Source code in "org.apache.flume.sink.kafka.KafkaSink.process()":
>     // Fall back to the configured topic only when the event has no topic header
>     if ((eventTopic = headers.get(TOPIC_HDR)) == null) {
>       eventTopic = topic;
>     }
>
>   In my case, how can I fix this problem?
>   The Flume version is 1.6.0.
>   Thanks!
>
>
>
