flume-user mailing list archives

From Jun Ma <mj.saber1...@gmail.com>
Subject Flafka: Kafka channel unable to deliver event
Date Fri, 03 Jul 2015 20:07:59 GMT
Hi guys,

I'm interested in Flafka and am trying to build a Kafka-to-HDFS data flow on my own machine. I have
a couple of questions and hope someone can help me. Thanks in advance.

1. I tried a Kafka channel with a logger sink (no source; the logger sink is only for testing,
and eventually it will be an HDFS sink). The config file is:

a1.sinks = sink1
a1.channels = channel1

a1.channels.channel1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.channel1.brokerList = localhost:9093,localhost:9094
a1.channels.channel1.topic = test
a1.channels.channel1.zookeeperConnect = localhost:2181
a1.channels.channel1.parseAsFlumeEvent = false
a1.channels.cnannel1.kafka.consumer.timeout.ms = 1000000

a1.sinks.sink1.channel = channel1
a1.sinks.sink1.type = logger

The error message I get is the following, which says it is unable to deliver the event:

MACC02PHH5LG3QC:apache-flume-1.6.0-bin jun.ma$ bin/flume-ng agent --conf conf --conf-file conf/example.conf --name a1 -Dflume.root.logger=INFO,console
+ exec /Library/Java/JavaVirtualMachines/jdk1.8.0_45.jdk/Contents/Home/bin/java -Xmx20m -Dflume.root.logger=INFO,console -cp '/Users/jun.ma/apache-flume-1.6.0-bin/conf:/Users/jun.ma/apache-flume-1.6.0-bin/lib/*' -Djava.library.path= org.apache.flume.node.Application --conf-file conf/example.conf --name a1
2015-07-03 12:50:21,549 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.node.PollingPropertiesFileConfigurationProvider.start(PollingPropertiesFileConfigurationProvider.java:61)]
Configuration provider starting
2015-07-03 12:50:21,554 (conf-file-poller-0) [INFO - org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:133)]
Reloading configuration file:conf/example.conf
2015-07-03 12:50:21,562 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1017)]
Processing:sink1
2015-07-03 12:50:21,562 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1017)]
Processing:sink1
2015-07-03 12:50:21,562 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:931)]
Added sinks: sink1 Agent: a1
2015-07-03 12:50:21,567 (conf-file-poller-0) [WARN - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.validateSources(FlumeConfiguration.java:508)]
Agent configuration for 'a1' has no sources.
2015-07-03 12:50:21,569 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration.validateConfiguration(FlumeConfiguration.java:141)]
Post-validation flume configuration contains configuration for agents: [a1]
2015-07-03 12:50:21,570 (conf-file-poller-0) [INFO - org.apache.flume.node.AbstractConfigurationProvider.loadChannels(AbstractConfigurationProvider.java:145)]
Creating channels
2015-07-03 12:50:21,576 (conf-file-poller-0) [INFO - org.apache.flume.channel.DefaultChannelFactory.create(DefaultChannelFactory.java:42)]
Creating instance of channel channel1 type org.apache.flume.channel.kafka.KafkaChannel
2015-07-03 12:50:21,588 (conf-file-poller-0) [INFO - org.apache.flume.channel.kafka.KafkaChannel.configure(KafkaChannel.java:168)]
Group ID was not specified. Using flume as the group id.
2015-07-03 12:50:21,602 (conf-file-poller-0) [INFO - org.apache.flume.channel.kafka.KafkaChannel.configure(KafkaChannel.java:188)]
{metadata.broker.list=localhost:9093,localhost:9094, request.required.acks=-1, group.id=flume, zookeeper.connect=localhost:2181, consumer.timeout.ms=100, auto.commit.enable=false}
2015-07-03 12:50:21,610 (conf-file-poller-0) [INFO - org.apache.flume.node.AbstractConfigurationProvider.loadChannels(AbstractConfigurationProvider.java:200)]
Created channel channel1
2015-07-03 12:50:21,611 (conf-file-poller-0) [INFO - org.apache.flume.sink.DefaultSinkFactory.create(DefaultSinkFactory.java:42)]
Creating instance of sink: sink1, type: logger
2015-07-03 12:50:21,614 (conf-file-poller-0) [INFO - org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:114)]
Channel channel1 connected to [sink1]
2015-07-03 12:50:21,620 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:138)]
Starting new configuration:{ sourceRunners:{} sinkRunners:{sink1=SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@4ae671f6 counterGroup:{ name:null counters:{} } }} channels:{channel1=org.apache.flume.channel.kafka.KafkaChannel{name: channel1}} }
2015-07-03 12:50:21,621 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:145)]
Starting Channel channel1
2015-07-03 12:50:21,623 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.channel.kafka.KafkaChannel.start(KafkaChannel.java:96)]
Starting Kafka Channel: channel1
2015-07-03 12:50:21,938 (lifecycleSupervisor-1-0) [INFO - kafka.utils.Logging$class.info(Logging.scala:68)]
Verifying properties
2015-07-03 12:50:21,971 (lifecycleSupervisor-1-0) [WARN - kafka.utils.Logging$class.warn(Logging.scala:83)]
Property auto.commit.enable is not valid
2015-07-03 12:50:21,972 (lifecycleSupervisor-1-0) [WARN - kafka.utils.Logging$class.warn(Logging.scala:83)]
Property consumer.timeout.ms is not valid
2015-07-03 12:50:21,972 (lifecycleSupervisor-1-0) [WARN - kafka.utils.Logging$class.warn(Logging.scala:83)]
Property group.id is not valid
2015-07-03 12:50:21,973 (lifecycleSupervisor-1-0) [INFO - kafka.utils.Logging$class.info(Logging.scala:68)]
Property metadata.broker.list is overridden to localhost:9093,localhost:9094
2015-07-03 12:50:21,973 (lifecycleSupervisor-1-0) [INFO - kafka.utils.Logging$class.info(Logging.scala:68)]
Property request.required.acks is overridden to -1
2015-07-03 12:50:21,973 (lifecycleSupervisor-1-0) [WARN - kafka.utils.Logging$class.warn(Logging.scala:83)]
Property zookeeper.connect is not valid
2015-07-03 12:50:22,017 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.channel.kafka.KafkaChannel.start(KafkaChannel.java:99)]
Topic = test
2015-07-03 12:50:22,018 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:120)]
Monitored counter group for type: CHANNEL, name: channel1: Successfully registered new MBean.
2015-07-03 12:50:22,018 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:96)]
Component type: CHANNEL, name: channel1 started
2015-07-03 12:50:22,018 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:173)]
Starting Sink sink1
2015-07-03 12:50:22,029 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - kafka.utils.Logging$class.info(Logging.scala:68)]
Verifying properties
2015-07-03 12:50:22,029 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - kafka.utils.Logging$class.info(Logging.scala:68)]
Property auto.commit.enable is overridden to false
2015-07-03 12:50:22,029 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - kafka.utils.Logging$class.info(Logging.scala:68)]
Property consumer.timeout.ms is overridden to 100
2015-07-03 12:50:22,030 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - kafka.utils.Logging$class.info(Logging.scala:68)]
Property group.id is overridden to flume
2015-07-03 12:50:22,030 (SinkRunner-PollingRunner-DefaultSinkProcessor) [WARN - kafka.utils.Logging$class.warn(Logging.scala:83)]
Property metadata.broker.list is not valid
2015-07-03 12:50:22,030 (SinkRunner-PollingRunner-DefaultSinkProcessor) [WARN - kafka.utils.Logging$class.warn(Logging.scala:83)]
Property request.required.acks is not valid
2015-07-03 12:50:22,030 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - kafka.utils.Logging$class.info(Logging.scala:68)]
Property zookeeper.connect is overridden to localhost:2181
2015-07-03 12:50:22,063 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - kafka.utils.Logging$class.info(Logging.scala:68)]
[flume_MACC02PHH5LG3QC-1435953022061-eaf69e13], Connecting to zookeeper instance at localhost:2181
2015-07-03 12:50:22,065 (SinkRunner-PollingRunner-DefaultSinkProcessor) [ERROR - org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:160)]
Unable to deliver event. Exception follows.
java.lang.IllegalStateException: close() called when transaction is OPEN - you must either commit or rollback first
	at com.google.common.base.Preconditions.checkState(Preconditions.java:172)
	at org.apache.flume.channel.BasicTransactionSemantics.close(BasicTransactionSemantics.java:179)
	at org.apache.flume.sink.LoggerSink.process(LoggerSink.java:105)
	at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
	at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
	at java.lang.Thread.run(Thread.java:745)

I set up ZooKeeper and two brokers locally and have a custom producer writing data to
the "test" topic. If I use my own consumer, the data is delivered. Another thing: the setting
a1.channels.cnannel1.kafka.consumer.timeout.ms = 1000000 is not being applied (the log above
shows consumer.timeout.ms=100), and I don't know why.
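
One thing that may be worth checking for the timeout setting: the key in the config above is spelled "cnannel1", while the declared channel is "channel1". Flume may simply ignore keys that belong to a component it does not know about, which would be consistent with the log still showing consumer.timeout.ms=100. The following is a minimal sketch of such a check in Python, not part of Flume itself; the function name undeclared_components and the simplified parsing rules are assumptions made for illustration.

```python
def undeclared_components(properties_text, agent="a1"):
    """Return (kind, name, key) for keys whose component name was never declared.

    Hypothetical helper: it only understands the simple "key = value" lines
    used in the config above, not the full Java properties syntax.
    """
    declared = {}  # e.g. {"sinks": {"sink1"}, "channels": {"channel1"}}
    keys = []
    for raw in properties_text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, value = (part.strip() for part in line.split("=", 1))
        parts = key.split(".")
        if len(parts) == 2 and parts[0] == agent:
            # Declaration line such as "a1.channels = channel1"
            declared[parts[1]] = set(value.split())
        else:
            keys.append(key)

    problems = []
    for key in keys:
        parts = key.split(".")
        if len(parts) >= 3 and parts[0] == agent:
            kind, name = parts[1], parts[2]
            if name not in declared.get(kind, set()):
                problems.append((kind, name, key))
    return problems


# The config from this message, with the timeout key on a single line.
CONFIG = """
a1.sinks = sink1
a1.channels = channel1

a1.channels.channel1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.channel1.brokerList = localhost:9093,localhost:9094
a1.channels.channel1.topic = test
a1.channels.channel1.zookeeperConnect = localhost:2181
a1.channels.channel1.parseAsFlumeEvent = false
a1.channels.cnannel1.kafka.consumer.timeout.ms = 1000000

a1.sinks.sink1.channel = channel1
a1.sinks.sink1.type = logger
"""

for kind, name, key in undeclared_components(CONFIG):
    print(f"{key}: '{name}' is not listed in {kind}")
```

Run against the config above, the check flags only the a1.channels.cnannel1.kafka.consumer.timeout.ms key. Whether correcting the spelling also resolves the "Unable to deliver event" error is a separate question that this sketch does not address.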

2. How do I set up the HDFS path if I run the agent on my local machine? Currently I have put
hdfs-site.xml into the flume/conf directory, but I'm not sure that is correct. Any suggestions
are welcome, thanks!

Bests,
Jun