You need an interceptor to update/remove the topic header
Gonzalo
On Jun 12, 2016 4:57 AM, "lxw" <lxw1234@qq.com> wrote:
> Hi,All:
>
> I use Kafka Source to read events from one Kafka topic and write events
> to another Topic with Kafka Sink,
> the Kafka Sink topic configuration is not work, flume still write events
> to Kafka source Topic (sourceTopic).
>
> agent_myAgent.sources.kafkaSource.topic = sourceTopic
> agent_myAgent.sinks.kafkaSink.topic = sinkTopic
>
> SourceCode in "org.apache.flume.source.kafka.KafkaSource.process()" :
> // Add headers to event (topic, timestamp, and key)
> headers = new HashMap<String, String>();
> headers.put(KafkaSourceConstants.TIMESTAMP,
> String.valueOf(System.currentTimeMillis()));
> headers.put(KafkaSourceConstants.TOPIC, topic);
>
> and in Kafka Sink properties:
>
> The topic in Kafka to which the messages will be published. If this
> parameter is configured, messages will be published to this topic. If the
> event header contains a “topic” field, the event will be published to that
> topic overriding the topic configured here.
>
> SourceCode in "org.apache.flume.sink.kafka.KafkaSink.process()":
> if ((eventTopic = headers.get(TOPIC_HDR)) == null) {
> eventTopic = topic;
> }
>
> In my case, how I can fix this problem?
> Flume version is 1.6.0.
> Thanks!
>
>
>
|