flume-user mailing list archives

From Adam Tannir <atan...@gmail.com>
Subject Re: Kafka sink deleted, brokerList null
Date Wed, 25 Mar 2015 19:37:33 GMT
Thanks, Hari. Everything works as it should now.

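I had tried a few other configuration entries previously, but they all had
a kafka. prefix after the sink name (for example
a1.sinks.k1.kafka.metadata.broker.list), so the sink never saw a brokerList
key.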
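
For anyone who finds this thread later, here is a minimal sketch of why the
lookup returned null. It is not the Flume code: a plain Map stands in for the
sink's Context, IllegalStateException stands in for ConfigurationException,
and the key name "brokerList" is an assumption inferred from Hari's reply
(check KafkaSinkConstants in your own checkout). The map keys are the ones
shown in the log once the a1.sinks.k1. prefix has been stripped.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative stand-in only; none of these names are part of Flume.
class BrokerListLookupSketch {

  // Assumed value of KafkaSinkConstants.BROKER_LIST_FLUME_KEY,
  // inferred from the fix suggested in this thread.
  static final String BROKER_LIST_FLUME_KEY = "brokerList";

  // Mirrors the null check in addDocumentedKafkaProps.
  static String requireBrokerList(Map<String, String> context) {
    String brokerList = context.get(BROKER_LIST_FLUME_KEY);
    if (brokerList == null) {
      throw new IllegalStateException(
          "brokerList must contain at least one Kafka broker");
    }
    return brokerList;
  }

  // Keys as they appeared in the logged context for the failing config.
  static Map<String, String> failingContext() {
    Map<String, String> ctx = new HashMap<>();
    ctx.put("topic", "test");
    ctx.put("kafka.metadata.broker.list", "localhost:9091,localhost:9092");
    ctx.put("kafka.zookeeper.connect", "localhost:2181");
    return ctx;
  }

  // Same sink, but configured with a1.sinks.k1.brokerList = ...
  static Map<String, String> workingContext() {
    Map<String, String> ctx = new HashMap<>();
    ctx.put("topic", "test");
    ctx.put("brokerList", "localhost:9091,localhost:9092");
    return ctx;
  }

  public static void main(String[] args) {
    try {
      requireBrokerList(failingContext());
    } catch (IllegalStateException e) {
      // The broker list is present, but under a key the check never reads.
      System.out.println("failing config: " + e.getMessage());
    }
    System.out.println("working config: "
        + requireBrokerList(workingContext()));
  }
}
```

The point is only that the value was in the context under
kafka.metadata.broker.list, while the check reads a different key, so the
hosts show up in the log and the check still fails.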



On Wed, Mar 25, 2015 at 3:15 PM, Hari Shreedharan <hshreedharan@cloudera.com> wrote:

> Set this param: a1.sinks.k1.brokerList = <list of brokers>
> instead of a1.sinks.k1.kafka.metadata.broker.list = localhost:9091,localhost:9092
>
>
> Thanks,
> Hari
>
> On Wed, Mar 25, 2015 at 12:02 PM, Adam Tannir <atannir@gmail.com> wrote:
>
>> Hello,
>>
>> When running Flume with Kafka as a sink, the error "brokerList must
>> contain at least one Kafka broker" is logged, even though the line
>> immediately before it shows the host:port entries exactly as entered in
>> the config file and stored in the context.
>>
>> Everything works when I hardcode the host:port into the brokerList string
>> and skip the failing test but that is a suboptimal solution. The kafka
>> instances are from their quickstart guide and have no issues.
>>
>> Why isn't the value being selected from the context?
>>
>>
>> flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSinkUtil.java:
>>
>>   private static void addDocumentedKafkaProps(Context context,
>>                                               Properties kafkaProps)
>>           throws ConfigurationException {
>>     String brokerList = context.getString(KafkaSinkConstants
>>             .BROKER_LIST_FLUME_KEY);
>>     if (brokerList == null) {
>>       throw new ConfigurationException("brokerList must contain at least " +
>>               "one Kafka broker");
>>     }
>>     kafkaProps.put(KafkaSinkConstants.BROKER_LIST_KEY, brokerList);
>>
>>     String requiredKey = context.getString(
>>             KafkaSinkConstants.REQUIRED_ACKS_FLUME_KEY);
>>
>>     if (requiredKey != null ) {
>>       kafkaProps.put(KafkaSinkConstants.REQUIRED_ACKS_KEY, requiredKey);
>>     }
>>   }
>>
>>
>>
>> Config:
>>
>> # Describe the sink
>> a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
>> a1.sinks.k1.kafka.metadata.broker.list = localhost:9091,localhost:9092
>> a1.sinks.k1.kafka.zookeeper.connect = localhost:2181
>> a1.sinks.k1.topic = test
>>
>> logs/flume.log
>>
>> 25 Mar 2015 14:28:52,598 INFO  [conf-file-poller-0] (org.apache.flume.sink.kafka.KafkaSinkUtil.getKafkaProperties:34)  - context={ parameters:{topic=test, kafka.metadata.broker.list=localhost:9091,localhost:9092, kafka.zookeeper.connect=localhost:2181, type=org.apache.flume.sink.kafka.KafkaSink, channel=c1} }
>> 25 Mar 2015 14:28:52,611 ERROR [conf-file-poller-0] (org.apache.flume.node.AbstractConfigurationProvider.loadSinks:427)  - Sink k1 has been removed due to an error during configuration
>> org.apache.flume.conf.ConfigurationException: brokerList must contain at least one Kafka broker
>>         at org.apache.flume.sink.kafka.KafkaSinkUtil.addDocumentedKafkaProps(KafkaSinkUtil.java:55)
>>         at org.apache.flume.sink.kafka.KafkaSinkUtil.getKafkaProperties(KafkaSinkUtil.java:37)
>>         at org.apache.flume.sink.kafka.KafkaSink.configure(KafkaSink.java:211)
>>         at org.apache.flume.conf.Configurables.configure(Configurables.java:41)
>>         at org.apache.flume.node.AbstractConfigurationProvider.loadSinks(AbstractConfigurationProvider.java:413)
>>         at org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:98)
>>         at org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:140)
>>         at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
>>         at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304)
>>         at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
>>         at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>         at java.lang.Thread.run(Thread.java:745)
>>
>>
>> git clone https://github.com/apache/flume.git
>> mvn compile install -DskipTests
>> Version 1.6.0-SNAPSHOT from today
>>
>> Thanks!
>>
>
>
