flume-user mailing list archives

From Justin Ryan <jur...@ziprealty.com>
Subject KafkaSource not picking up any messages
Date Thu, 04 Feb 2016 21:16:53 GMT
Hiya folks,

I'm setting up a new environment with Kafka, Flume, and HDFS, and have
implemented the simplest possible testing configuration I can come up with.
Flume logs that it configured and started the KafkaSource successfully, and
with the Kafka tools I can confirm that messages have been sent, but the JSON
metrics from Flume show 0 messages processed.
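
For context, the JSON metrics mentioned above come from Flume's HTTP monitoring endpoint, which serves one JSON object per component at /metrics, with counter values encoded as strings. A minimal Python sketch for pulling out just the event counters might look like the following. The helper names fetch_metrics and event_counts are made up for illustration, and the host and port in the usage note are assumptions (the port is taken from the Jetty line in the startup log, which may or may not be the monitoring listener).

```python
import json
from urllib.request import urlopen


def fetch_metrics(url, timeout=5.0):
    """Fetch and decode the JSON document served by Flume's HTTP monitoring endpoint."""
    with urlopen(url, timeout=timeout) as resp:
        return json.loads(resp.read().decode("utf-8"))


def event_counts(metrics):
    """Return {component: {counter: int}} for counters whose name starts with 'Event'.

    Flume reports counter values as strings, so they are converted to int here.
    Components without any 'Event*' counter are left out of the result.
    """
    result = {}
    for component, counters in metrics.items():
        picked = {}
        for name, value in counters.items():
            if name.startswith("Event"):
                picked[name] = int(value)
        if picked:
            result[component] = picked
    return result
```

Assuming the agent is reachable as mesos10 and the monitoring port is 34545, a call such as event_counts(fetch_metrics("http://mesos10:34545/metrics")) would list the event counters for the source, channel, and sink side by side, which makes it easier to see whether events stop at the source or further downstream.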

Are there any more tools at my disposal to investigate? Any assistance would
be greatly appreciated!

My config and log:

----
# generated by Chef for mesos10, changes will be overwritten

flume1.sources=kafka-source-test
flume1.channels=hdfs-channel-kafka
flume1.sinks=hdfs-sink-kafka
flume1.sources.kafka-source-test.type=org.apache.flume.source.kafka.KafkaSource
flume1.sources.kafka-source-test.zookeeperConnect=zk01:2181/mesos-kafka
flume1.sources.kafka-source-test.topic=test
flume1.sources.kafka-source-test.groupId=flume
flume1.sources.kafka-source-test.interceptors=i1
flume1.sources.kafka-source-test.interceptors.i1.type=timestamp
flume1.sources.kafka-source-test.consumer.timeout.ms=100
flume1.sources.kafka-source-test.channels=hdfs-channel-kafka
flume1.channels.hdfs-channel-kafka.type=memory
flume1.sinks.hdfs-sink-kafka.channel=hdfs-channel-kafka
flume1.sinks.hdfs-sink-kafka.type=hdfs
flume1.sinks.hdfs-sink-kafka.hdfs.path=/tmp/kafka/%{topic}/%y-%m-%d
flume1.sinks.hdfs-sink-kafka.hdfs.rollInterval=5
flume1.sinks.hdfs-sink-kafka.hdfs.rollCount=0
flume1.sinks.hdfs-sink-kafka.hdfs.rollSize=0
flume1.sinks.hdfs-sink-kafka.hdfs.fileType=DataStream
flume1.channels.hdfs-channel-kafka.capacity=10
flume1.channels.hdfs-channel-kafka.transactionCapacity=10
----

Startup log (less incredibly long path lines):
----
16/02/04 11:32:07 INFO node.PollingPropertiesFileConfigurationProvider:
Configuration provider starting
16/02/04 11:32:07 INFO node.PollingPropertiesFileConfigurationProvider:
Reloading configuration file:/etc/flume/conf.chef/flume.conf
16/02/04 11:32:07 INFO conf.FlumeConfiguration: Processing:hdfs-sink-kafka
16/02/04 11:32:07 INFO conf.FlumeConfiguration: Processing:hdfs-sink-kafka
16/02/04 11:32:07 INFO conf.FlumeConfiguration: Processing:hdfs-sink-kafka
16/02/04 11:32:07 INFO conf.FlumeConfiguration: Processing:hdfs-sink-kafka
16/02/04 11:32:07 INFO conf.FlumeConfiguration: Added sinks: hdfs-sink-kafka
Agent: flume1
16/02/04 11:32:07 INFO conf.FlumeConfiguration: Processing:hdfs-sink-kafka
16/02/04 11:32:07 INFO conf.FlumeConfiguration: Processing:hdfs-sink-kafka
16/02/04 11:32:07 INFO conf.FlumeConfiguration: Processing:hdfs-sink-kafka
16/02/04 11:32:07 INFO conf.FlumeConfiguration: Post-validation flume
configuration contains configuration for agents: [flume1]
16/02/04 11:32:07 INFO node.AbstractConfigurationProvider: Creating channels
16/02/04 11:32:07 INFO channel.DefaultChannelFactory: Creating instance of
channel hdfs-channel-kafka type memory
16/02/04 11:32:07 INFO node.AbstractConfigurationProvider: Created channel
hdfs-channel-kafka
16/02/04 11:32:07 INFO source.DefaultSourceFactory: Creating instance of
source kafka-source-test, type org.apache.flume.source.kafka.KafkaSource
16/02/04 11:32:07 INFO kafka.KafkaSourceUtil: context={
parameters:{interceptors.i1.type=timestamp,
zookeeperConnect=zk01:2181/mesos-kafka, channels=hdfs-channel-kafka,
groupId=flume, consumer.timeout.ms=100, topic=test,
type=org.apache.flume.source.kafka.KafkaSource, interceptors=i1} }
16/02/04 11:32:07 INFO sink.DefaultSinkFactory: Creating instance of sink:
hdfs-sink-kafka, type: hdfs
16/02/04 11:32:07 INFO node.AbstractConfigurationProvider: Channel
hdfs-channel-kafka connected to [kafka-source-test, hdfs-sink-kafka]
16/02/04 11:32:07 INFO node.Application: Starting new configuration:{
sourceRunners:{kafka-source-test=PollableSourceRunner: {
source:org.apache.flume.source.kafka.KafkaSource{name:kafka-source-test,state:IDLE}
counterGroup:{ name:null counters:{} } }}
sinkRunners:{hdfs-sink-kafka=SinkRunner: {
policy:org.apache.flume.sink.DefaultSinkProcessor@2f33f35e counterGroup:{
name:null counters:{} } }}
channels:{hdfs-channel-kafka=org.apache.flume.channel.MemoryChannel{name:
hdfs-channel-kafka}} }
16/02/04 11:32:07 INFO node.Application: Starting Channel hdfs-channel-kafka
16/02/04 11:32:07 INFO instrumentation.MonitoredCounterGroup: Monitored
counter group for type: CHANNEL, name: hdfs-channel-kafka: Successfully
registered new MBean.
16/02/04 11:32:07 INFO instrumentation.MonitoredCounterGroup: Component
type: CHANNEL, name: hdfs-channel-kafka started
16/02/04 11:32:07 INFO node.Application: Starting Sink hdfs-sink-kafka
16/02/04 11:32:07 INFO node.Application: Starting Source kafka-source-test
16/02/04 11:32:07 INFO kafka.KafkaSource: Starting
org.apache.flume.source.kafka.KafkaSource{name:kafka-source-test,state:IDLE}
...
16/02/04 11:32:07 INFO instrumentation.MonitoredCounterGroup: Monitored
counter group for type: SINK, name: hdfs-sink-kafka: Successfully registered
new MBean.
16/02/04 11:32:07 INFO instrumentation.MonitoredCounterGroup: Component
type: SINK, name: hdfs-sink-kafka started
16/02/04 11:32:07 INFO mortbay.log: Logging to
org.slf4j.impl.Log4jLoggerAdapter(org.mortbay.log) via
org.mortbay.log.Slf4jLog
16/02/04 11:32:07 INFO mortbay.log: jetty-6.1.26.cloudera.4
16/02/04 11:32:07 INFO mortbay.log: Started
SelectChannelConnector@0.0.0.0:34545
16/02/04 11:32:08 INFO utils.VerifiableProperties: Verifying properties
16/02/04 11:32:08 INFO utils.VerifiableProperties: Property
auto.commit.enable is overridden to false
16/02/04 11:32:08 INFO utils.VerifiableProperties: Property
consumer.timeout.ms is overridden to 10
16/02/04 11:32:08 INFO utils.VerifiableProperties: Property group.id is
overridden to flume
16/02/04 11:32:08 INFO utils.VerifiableProperties: Property
zookeeper.connect is overridden to zk01:2181/mesos-kafka
16/02/04 11:32:08 INFO consumer.ZookeeperConsumerConnector:
[flume_mesos10-1454614328204-ca8a74df], Connecting to zookeeper instance at
zk01:2181/mesos-kafka
16/02/04 11:32:08 INFO zkclient.ZkEventThread: Starting ZkClient event
thread.
16/02/04 11:32:08 INFO zookeeper.ZooKeeper: Client
environment:zookeeper.version=3.4.5-946--1, built on 05/18/2015 19:03 GMT
16/02/04 11:32:08 INFO zookeeper.ZooKeeper: Client
environment:host.name=mesos10
16/02/04 11:32:08 INFO zookeeper.ZooKeeper: Client
environment:java.version=1.8.0_72-internal
16/02/04 11:32:08 INFO zookeeper.ZooKeeper: Client
environment:java.vendor=Oracle Corporation
16/02/04 11:32:08 INFO zookeeper.ZooKeeper: Client
environment:java.home=/usr/lib/jvm/java-8-openjdk-amd64/jre
16/02/04 11:32:08 INFO zookeeper.ZooKeeper: Client
environment:java.io.tmpdir=/tmp
16/02/04 11:32:08 INFO zookeeper.ZooKeeper: Client
environment:java.compiler=<NA>
16/02/04 11:32:08 INFO zookeeper.ZooKeeper: Client environment:os.name=Linux
16/02/04 11:32:08 INFO zookeeper.ZooKeeper: Client environment:os.arch=amd64
16/02/04 11:32:08 INFO zookeeper.ZooKeeper: Client
environment:os.version=3.13.0-63-generic
16/02/04 11:32:08 INFO zookeeper.ZooKeeper: Client
environment:user.name=marathon
16/02/04 11:32:08 INFO zookeeper.ZooKeeper: Client
environment:user.home=/opt/marathon
16/02/04 11:32:08 INFO zookeeper.ZooKeeper: Initiating client connection,
connectString=zk01:2181/mesos-kafka sessionTimeout=6000
watcher=org.I0Itec.zkclient.ZkClient@2e1b7b98
16/02/04 11:32:08 INFO zookeeper.ClientCnxn: Opening socket connection to
server 10.100.6.251/10.100.6.251:2181. Will not attempt to authenticate
using SASL (unknown error)
16/02/04 11:32:08 INFO zookeeper.ClientCnxn: Socket connection established
to 10.100.6.251/10.100.6.251:2181, initiating session
16/02/04 11:32:08 INFO zookeeper.ClientCnxn: Session establishment complete
on server 10.100.6.251/10.100.6.251:2181, sessionid = 0x152858b1cc07491,
negotiated timeout = 6000
16/02/04 11:32:08 INFO zkclient.ZkClient: zookeeper state changed
(SyncConnected)
16/02/04 11:32:09 INFO consumer.ZookeeperConsumerConnector:
[flume_mesos10-1454614328204-ca8a74df], begin registering consumer
flume_mesos10-1454614328204-ca8a74df in ZK
16/02/04 11:32:09 INFO consumer.ZookeeperConsumerConnector:
[flume_mesos10-1454614328204-ca8a74df], end registering consumer
flume_mesos10-1454614328204-ca8a74df in ZK
16/02/04 11:32:09 INFO consumer.ZookeeperConsumerConnector:
[flume_mesos10-1454614328204-ca8a74df], starting watcher executor thread for
consumer flume_mesos10-1454614328204-ca8a74df
16/02/04 11:32:09 INFO consumer.ZookeeperConsumerConnector:
[flume_mesos10-1454614328204-ca8a74df], begin rebalancing consumer
flume_mesos10-1454614328204-ca8a74df try #0
16/02/04 11:32:09 WARN consumer.ZookeeperConsumerConnector:
[flume_mesos10-1454614328204-ca8a74df], no brokers found when trying to
rebalance.
16/02/04 11:32:09 INFO consumer.ZookeeperConsumerConnector:
[flume_mesos10-1454614328204-ca8a74df], end rebalancing consumer
flume_mesos10-1454614328204-ca8a74df try #0
16/02/04 11:32:09 INFO kafka.KafkaSource: Kafka source kafka-source-test do
started.
16/02/04 11:32:09 INFO instrumentation.MonitoredCounterGroup: Monitored
counter group for type: SOURCE, name: kafka-source-test: Successfully
registered new MBean.
16/02/04 11:32:09 INFO instrumentation.MonitoredCounterGroup: Component
type: SOURCE, name: kafka-source-test started
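
Since the startup log is long and almost entirely INFO, a small filter can help surface the few entries at a higher level, such as the WARN from ZookeeperConsumerConnector about no brokers being found during rebalance. The following is a minimal Python sketch, assuming the log4j layout seen above (a "yy/MM/dd HH:mm:ss LEVEL" prefix, with wrapped continuation lines that lack the prefix). The function name problem_entries is hypothetical.

```python
import re

# An entry starts with a timestamp and a level, e.g. "16/02/04 11:32:09 WARN ".
ENTRY_START = re.compile(r"^\d{2}/\d{2}/\d{2} \d{2}:\d{2}:\d{2} (?P<level>[A-Z]+) ")


def problem_entries(lines, levels=("WARN", "ERROR", "FATAL")):
    """Return log entries at the given levels, with wrapped lines joined by spaces."""
    entries = []  # each item is [level, text]
    for raw in lines:
        line = raw.rstrip("\n")
        match = ENTRY_START.match(line)
        if match:
            # A new entry begins on this line.
            entries.append([match.group("level"), line])
        elif entries and line.strip():
            # No timestamp prefix: treat as a continuation of the previous entry.
            entries[-1][1] += " " + line.strip()
    return [text for level, text in entries if level in levels]
```

It could be run over a saved copy of the log with something like problem_entries(open("flume.log")), where flume.log is a placeholder file name.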
--

--
Justin Alan Ryan
Sr. Systems / Release Engineer
ZipRealty


