flume-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Adam Tannir <atan...@gmail.com>
Subject Kafka sink deleted, brokerList null
Date Wed, 25 Mar 2015 19:02:47 GMT
Hello,

When running flume with kafka as a sink, an error is logged that
"brokerList must contain at least one Kafka broker" but the line
immediately previous shows the host:port entries as were entered in the
config file and stored in the context.

Everything works when I hardcode the host:port into the brokerList string
and skip the failing test but that is a suboptimal solution. The kafka
instances are from their quickstart guide and have no issues.

Why isn't the value being selected from the context?

flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSinkUtil.java:

  private static void addDocumentedKafkaProps(Context context,
                                              Properties kafkaProps)
          throws ConfigurationException {
    String brokerList = context.getString(KafkaSinkConstants
            .BROKER_LIST_FLUME_KEY);
    if (brokerList == null) {
      throw new ConfigurationException("brokerList must contain at least " +
              "one Kafka broker");
    }
    kafkaProps.put(KafkaSinkConstants.BROKER_LIST_KEY, brokerList);

    String requiredKey = context.getString(
            KafkaSinkConstants.REQUIRED_ACKS_FLUME_KEY);

    if (requiredKey != null ) {
      kafkaProps.put(KafkaSinkConstants.REQUIRED_ACKS_KEY, requiredKey);
    }
  }



Config:

# Describe the sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.metadata.broker.list = localhost:9091,localhost:9092
a1.sinks.k1.kafka.zookeeper.connect = localhost:2181
a1.sinks.k1.topic = test

logs/flume.log

25 Mar 2015 14:28:52,598 INFO  [conf-file-poller-0]
(org.apache.flume.sink.kafka.KafkaSinkUtil.getKafkaProperties:34)  -
context={ parameters:{topic=test,
kafka.metadata.broker.list=localhost:9091,localhost:9092,
kafka.zookeeper.connect=localhost:2181,
type=org.apache.flume.sink.kafka.KafkaSink, channel=c1} }
25 Mar 2015 14:28:52,611 ERROR [conf-file-poller-0]
(org.apache.flume.node.AbstractConfigurationProvider.loadSinks:427)  - Sink
k1 has been removed due to an error during configuration
org.apache.flume.conf.ConfigurationException: brokerList must contain at
least one Kafka broker
        at
org.apache.flume.sink.kafka.KafkaSinkUtil.addDocumentedKafkaProps(KafkaSinkUtil.java:55)
        at
org.apache.flume.sink.kafka.KafkaSinkUtil.getKafkaProperties(KafkaSinkUtil.java:37)
        at
org.apache.flume.sink.kafka.KafkaSink.configure(KafkaSink.java:211)
        at
org.apache.flume.conf.Configurables.configure(Configurables.java:41)
        at
org.apache.flume.node.AbstractConfigurationProvider.loadSinks(AbstractConfigurationProvider.java:413)
        at
org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:98)
        at
org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:140)
        at
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304)
        at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
        at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
        at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)


git clone https://github.com/apache/flume.git
mvn compile install -DskipTests
Version 1.6.0-SNAPSHOT from today

Thanks!

Mime
View raw message