flume-user mailing list archives

From "lxw" <lxw1...@qq.com>
Subject Re: Kafka Sink Topic was overwritten by Kafka Source Topic
Date Sun, 12 Jun 2016 09:08:19 GMT
Yes, I used a static interceptor on the Kafka Source to overwrite the topic header:

agent_myAgent.sources.kafkaSource.interceptors = i1
agent_myAgent.sources.kafkaSource.interceptors.i1.type = static
agent_myAgent.sources.kafkaSource.interceptors.i1.key = topic
agent_myAgent.sources.kafkaSource.interceptors.i1.preserveExisting = false
agent_myAgent.sources.kafkaSource.interceptors.i1.value = sinkTopic
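
For readers following along, here is a rough sketch of what this configuration does to the event headers. It is a simplified stand-in written for illustration, not Flume's actual interceptor class; the class name `StaticHeaderSketch` and the method `apply` are hypothetical. With `preserveExisting = false` the value written by the source is replaced, whereas with `true` it would be kept.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified model of a static interceptor's header handling (illustrative only,
// not the Flume implementation).
class StaticHeaderSketch {

    // Returns a copy of the headers with key=value set, unless preserveExisting
    // is true and the key is already present.
    static Map<String, String> apply(Map<String, String> headers, String key,
                                     String value, boolean preserveExisting) {
        Map<String, String> result = new HashMap<>(headers);
        if (preserveExisting && result.containsKey(key)) {
            return result; // keep the value the source already wrote
        }
        result.put(key, value);
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> fromSource = new HashMap<>();
        fromSource.put("topic", "sourceTopic"); // header added by the Kafka Source

        // preserveExisting = false: the header is overwritten with sinkTopic
        System.out.println(apply(fromSource, "topic", "sinkTopic", false).get("topic"));
        // preserveExisting = true: the source's header would survive
        System.out.println(apply(fromSource, "topic", "sinkTopic", true).get("topic"));
    }
}
```

This is why `preserveExisting` has to be `false` here: the header already exists by the time the interceptor runs.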

Thanks for your reply.




------------------ Original Message ------------------
From: "Gonzalo Herreros" <gherreros@gmail.com>
Sent: Sunday, June 12, 2016, 3:53 PM
To: "user" <user@flume.apache.org>

Subject: Re: Kafka Sink Topic was overwritten by Kafka Source Topic




You need an interceptor to update/remove the topic header
 
Gonzalo
 On Jun 12, 2016 4:57 AM, "lxw" <lxw1234@qq.com> wrote:
Hi all,

   I use a Kafka Source to read events from one Kafka topic and a Kafka Sink to write them
to another topic, but the Kafka Sink topic setting has no effect: Flume still writes the
events back to the Kafka Source topic (sourceTopic).

  agent_myAgent.sources.kafkaSource.topic = sourceTopic
  agent_myAgent.sinks.kafkaSink.topic = sinkTopic
 
  Source code in "org.apache.flume.source.kafka.KafkaSource.process()":
          // Add headers to event (topic, timestamp, and key)
          headers = new HashMap<String, String>();
          headers.put(KafkaSourceConstants.TIMESTAMP,
                  String.valueOf(System.currentTimeMillis()));
          headers.put(KafkaSourceConstants.TOPIC, topic);
 
  And the documentation for the Kafka Sink "topic" property says:
 
The topic in Kafka to which the messages will be published. If this parameter is configured,
messages will be published to this topic. If the event header contains a “topic” field,
the event will be published to that topic overriding the topic configured here.

  Source code in "org.apache.flume.sink.kafka.KafkaSink.process()":
          if ((eventTopic = headers.get(TOPIC_HDR)) == null) {
            eventTopic = topic;
          }
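
Putting the two excerpts together, the behavior can be reproduced with a small stand-alone sketch. It only models the lookup quoted above and does not use Flume classes; the class name `SinkTopicSketch` and the method `resolveTopic` are hypothetical, and the header name "topic" is assumed from the documentation text quoted earlier.

```java
import java.util.HashMap;
import java.util.Map;

// Stand-alone model of the sink's topic lookup (illustrative only, not Flume code).
class SinkTopicSketch {

    // Header name assumed from the quoted Kafka Sink documentation.
    static final String TOPIC_HDR = "topic";

    // The event header wins; the configured topic is only a fallback.
    static String resolveTopic(Map<String, String> headers, String configuredTopic) {
        String eventTopic = headers.get(TOPIC_HDR);
        return eventTopic != null ? eventTopic : configuredTopic;
    }

    public static void main(String[] args) {
        Map<String, String> headers = new HashMap<>();
        headers.put(TOPIC_HDR, "sourceTopic"); // what the Kafka Source adds

        // The header overrides the configured sink topic
        System.out.println(resolveTopic(headers, "sinkTopic"));

        headers.remove(TOPIC_HDR); // e.g. after an interceptor drops the header
        // Without the header, the configured sink topic is used
        System.out.println(resolveTopic(headers, "sinkTopic"));
    }
}
```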

  In my case, how can I fix this problem?
  Flume version is 1.6.0.
  Thanks!