flume-user mailing list archives

From "Hari Shreedharan" <hshreedha...@cloudera.com>
Subject Re: Kafka sink deleted, brokerList null
Date Wed, 25 Mar 2015 19:44:58 GMT
Config params prefixed with kafka. are the ones passed through to Kafka, but params like
brokerList are required for Flume to connect to Kafka in the first place. The basic idea is
that some params are read by the Flume sink itself; the rest, which the sink does not need
in order to connect or write to Kafka, can be set with the kafka. prefix so that the Kafka
client can use them.
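
As a rough sketch of that split, assuming the agent and sink names from the original message
(a1, k1) and the same two local brokers, the sink section might look like the following.
kafka.producer.type is only an illustration of a pass-through Kafka producer property; it
does not appear elsewhere in this thread.

```properties
# Read by the Flume sink itself (no kafka. prefix)
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.brokerList = localhost:9091,localhost:9092
a1.sinks.k1.topic = test

# Passed through to the Kafka client (kafka. prefix); illustrative only
a1.sinks.k1.kafka.producer.type = sync
```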




Thanks, Hari

On Wed, Mar 25, 2015 at 12:41 PM, Adam Tannir <atannir@gmail.com> wrote:

> Thanks, Hari. Everything works as it should now.
> I had tried a few other configuration entries previously but they all had
> kafka after the sink name.
> On Wed, Mar 25, 2015 at 3:15 PM, Hari Shreedharan <hshreedharan@cloudera.com> wrote:
>> Set this param: a1.sinks.k1.brokerList = <list of brokers>
>> instead of a1.sinks.k1.kafka.metadata.broker.list = localhost:9091,localhost:9092
>>
>>
>> Thanks,
>> Hari
>>
>> On Wed, Mar 25, 2015 at 12:02 PM, Adam Tannir <atannir@gmail.com> wrote:
>>
>>> Hello,
>>>
>>> When running Flume with Kafka as a sink, an error is logged that
>>> "brokerList must contain at least one Kafka broker", but the line
>>> immediately before it shows the host:port entries as they were entered in
>>> the config file and stored in the context.
>>>
>>> Everything works when I hardcode the host:port into the brokerList string
>>> and skip the failing test but that is a suboptimal solution. The kafka
>>> instances are from their quickstart guide and have no issues.
>>>
>>> Why isn't the value being selected from the context?
>>>
>>>
>>> flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSinkUtil.java:
>>>
>>>   private static void addDocumentedKafkaProps(Context context,
>>>                                               Properties kafkaProps)
>>>           throws ConfigurationException {
>>>     String brokerList = context.getString(KafkaSinkConstants
>>>             .BROKER_LIST_FLUME_KEY);
>>>     if (brokerList == null) {
>>>       throw new ConfigurationException("brokerList must contain at least " +
>>>               "one Kafka broker");
>>>     }
>>>     kafkaProps.put(KafkaSinkConstants.BROKER_LIST_KEY, brokerList);
>>>
>>>     String requiredKey = context.getString(
>>>             KafkaSinkConstants.REQUIRED_ACKS_FLUME_KEY);
>>>
>>>     if (requiredKey != null ) {
>>>       kafkaProps.put(KafkaSinkConstants.REQUIRED_ACKS_KEY, requiredKey);
>>>     }
>>>   }
>>>
>>>
>>>
>>> Config:
>>>
>>> # Describe the sink
>>> a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
>>> a1.sinks.k1.kafka.metadata.broker.list = localhost:9091,localhost:9092
>>> a1.sinks.k1.kafka.zookeeper.connect = localhost:2181
>>> a1.sinks.k1.topic = test
>>>
>>> logs/flume.log
>>>
>>> 25 Mar 2015 14:28:52,598 INFO  [conf-file-poller-0] (org.apache.flume.sink.kafka.KafkaSinkUtil.getKafkaProperties:34)  - context={ parameters:{topic=test, kafka.metadata.broker.list=localhost:9091,localhost:9092, kafka.zookeeper.connect=localhost:2181, type=org.apache.flume.sink.kafka.KafkaSink, channel=c1} }
>>> 25 Mar 2015 14:28:52,611 ERROR [conf-file-poller-0] (org.apache.flume.node.AbstractConfigurationProvider.loadSinks:427)  - Sink k1 has been removed due to an error during configuration
>>> org.apache.flume.conf.ConfigurationException: brokerList must contain at least one Kafka broker
>>>         at org.apache.flume.sink.kafka.KafkaSinkUtil.addDocumentedKafkaProps(KafkaSinkUtil.java:55)
>>>         at org.apache.flume.sink.kafka.KafkaSinkUtil.getKafkaProperties(KafkaSinkUtil.java:37)
>>>         at org.apache.flume.sink.kafka.KafkaSink.configure(KafkaSink.java:211)
>>>         at org.apache.flume.conf.Configurables.configure(Configurables.java:41)
>>>         at org.apache.flume.node.AbstractConfigurationProvider.loadSinks(AbstractConfigurationProvider.java:413)
>>>         at org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:98)
>>>         at org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:140)
>>>         at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
>>>         at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304)
>>>         at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
>>>         at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>>>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>         at java.lang.Thread.run(Thread.java:745)
>>>
>>>
>>> git clone https://github.com/apache/flume.git
>>> mvn compile install -DskipTests
>>> Version 1.6.0-SNAPSHOT from today
>>>
>>> Thanks!
>>>
>>
>>