So it looks like Flume is building against Kafka 0.8.1.1 (Scala 2.10), and I’m using 0.8.2.2 (Scala 2.10), which kafka-mesos depends on.

Any likelihood of this being incompatible?

I did notice this morning that Flume’s EventReceivedCount increments by 1-2k at a time and is always a round number, but EventAcceptedCount stays at 0.

When I pipe dmesg into the console producer and then try to read the topic with the console consumer, I have to tell it to start from the beginning, so it seems like Flume may be reading events and advancing the offset, but not doing anything with them.

{
   "CHANNEL.hdfs-channel-kafka" : {
      "EventTakeAttemptCount" : "117",
      "EventTakeSuccessCount" : "0",
      "StartTime" : "1454959470045",
      "ChannelFillPercentage" : "0.0",
      "ChannelSize" : "0",
      "EventPutSuccessCount" : "0",
      "Type" : "CHANNEL",
      "EventPutAttemptCount" : "1826",
      "ChannelCapacity" : "10",
      "StopTime" : "0"
   },
   "SOURCE.kafka-source-test" : {
      "StartTime" : "1454959471241",
      "OpenConnectionCount" : "0",
      "AppendBatchReceivedCount" : "0",
      "EventReceivedCount" : "166000",
      "KafkaCommitTimer" : "0",
      "AppendBatchAcceptedCount" : "0",
      "AppendReceivedCount" : "0",
      "StopTime" : "0",
      "KafkaEventGetTimer" : "17114",
      "KafkaEmptyCount" : "0",
      "EventAcceptedCount" : "0",
      "Type" : "SOURCE",
      "AppendAcceptedCount" : "0"
   },
   "SINK.hdfs-sink-kafka" : {
      "ConnectionFailedCount" : "0",
      "StartTime" : "1454959470054",
      "BatchCompleteCount" : "0",
      "ConnectionClosedCount" : "0",
      "EventDrainSuccessCount" : "0",
      "BatchEmptyCount" : "117",
      "BatchUnderflowCount" : "0",
      "Type" : "SINK",
      "ConnectionCreatedCount" : "0",
      "StopTime" : "0",
      "EventDrainAttemptCount" : "0"
   }
}
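For anyone staring at the same counters, here’s a rough sketch of how I read them (field names are straight from the dump above; fetching it live is just a comment, since the monitoring port — 34545 in my startup log — will differ per agent):

```python
import json

# Sample trimmed from the metrics dump above; in practice you would
# fetch it from the agent's HTTP monitoring endpoint, e.g.
#   urllib.request.urlopen("http://mesos04:34545/metrics").read()
metrics = json.loads("""
{
   "SOURCE.kafka-source-test" : {
      "EventReceivedCount" : "166000",
      "EventAcceptedCount" : "0"
   },
   "CHANNEL.hdfs-channel-kafka" : {
      "EventPutAttemptCount" : "1826",
      "EventPutSuccessCount" : "0"
   }
}
""")

src = metrics["SOURCE.kafka-source-test"]

# ReceivedCount counts events pulled from Kafka; AcceptedCount counts
# events the channel actually took.  A large gap between the two means
# the source is reading fine but failing to commit batches into the
# channel -- exactly the symptom above.
received = int(src["EventReceivedCount"])
accepted = int(src["EventAcceptedCount"])
print("received=%d accepted=%d stuck=%s" % (received, accepted, received > accepted))
```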
--

From: Gonzalo Herreros <gherreros@gmail.com>
Reply-To: <user@flume.apache.org>
Date: Monday, February 8, 2016 at 12:29 AM
To: user <user@flume.apache.org>
Subject: Re: KafkaSource not picking up any messages

The only thing I can think of is that the kafka client included in Kafka is not compatible with the kafka version on the brokers (there's been a lot of changes recently), but normally you get errors when that happens.

On 5 February 2016 at 20:02, Justin Ryan <juryan@ziprealty.com> wrote:
Thanks, Gonzalo – that definitely helped!

This also ties into an issue I’d raised with mesos-kafka, where the zookeeper path seemed to be ignored. I now see that there is a node that stores the mesos-kafka scheduler config, and that the Kafka chroot path must be specified separately, so it is currently '/'.

Still not reading events, but the startup log definitely looks better:

16/02/05 11:55:38 INFO kafka.KafkaSource: Kafka source kafka-source-test do started.
16/02/05 11:55:38 INFO consumer.ConsumerFetcherManager$LeaderFinderThread: [flume_mesos04-1454702137146-6cd63609-leader-finder-thread], Starting 
16/02/05 11:55:38 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SOURCE, name: kafka-source-test: Successfully registered new MBean.
16/02/05 11:55:38 INFO instrumentation.MonitoredCounterGroup: Component type: SOURCE, name: kafka-source-test started
16/02/05 11:55:38 INFO utils.VerifiableProperties: Verifying properties
16/02/05 11:55:38 INFO utils.VerifiableProperties: Property client.id is overridden to flume
16/02/05 11:55:38 INFO utils.VerifiableProperties: Property metadata.broker.list is overridden to mesos01:31000,mesos02:31000,mesos08:31000
16/02/05 11:55:38 INFO utils.VerifiableProperties: Property request.timeout.ms is overridden to 30000
16/02/05 11:55:38 INFO client.ClientUtils$: Fetching metadata from broker id:1,host:mesos02,port:31000 with correlation id 0 for 1 topic(s) Set(home_views)
16/02/05 11:55:38 INFO producer.SyncProducer: Connected to mesos02:31000 for producing
16/02/05 11:55:38 INFO producer.SyncProducer: Disconnecting from mesos02:31000
16/02/05 11:55:38 INFO consumer.ConsumerFetcherThread: [ConsumerFetcherThread-flume_mesos04-1454702137146-6cd63609-0-0], Starting 
16/02/05 11:55:38 INFO consumer.ConsumerFetcherManager: [ConsumerFetcherManager-1454702137389] Added fetcher for partitions ArrayBuffer([[home_views,0], initOffset -1 to broker id:0,host:mesos01,port:31000] )


{
   "CHANNEL.hdfs-channel-kafka" : {
      "ChannelCapacity" : "10",
      "StartTime" : "1454702136681",
      "EventTakeSuccessCount" : "0",
      "ChannelFillPercentage" : "0.0",
      "EventPutAttemptCount" : "0",
      "EventTakeAttemptCount" : "14",
      "StopTime" : "0",
      "ChannelSize" : "0",
      "EventPutSuccessCount" : "0",
      "Type" : "CHANNEL"
   },
   "SOURCE.kafka-source-test" : {
      "AppendBatchReceivedCount" : "0",
      "AppendAcceptedCount" : "0",
      "KafkaEmptyCount" : "0",
      "AppendReceivedCount" : "0",
      "KafkaEventGetTimer" : "18046",
      "EventAcceptedCount" : "0",
      "StartTime" : "1454702138033",
      "StopTime" : "0",
      "KafkaCommitTimer" : "0",
      "Type" : "SOURCE",
      "AppendBatchAcceptedCount" : "0",
      "EventReceivedCount" : "0",
      "OpenConnectionCount" : "0"
   },
   "SINK.hdfs-sink-kafka" : {
      "ConnectionCreatedCount" : "0",
      "EventDrainAttemptCount" : "0",
      "BatchCompleteCount" : "0",
      "StartTime" : "1454702136714",
      "Type" : "SINK",
      "EventDrainSuccessCount" : "0",
      "StopTime" : "0",
      "BatchUnderflowCount" : "0",
      "ConnectionFailedCount" : "0",
      "BatchEmptyCount" : "13",
      "ConnectionClosedCount" : "0"
   }
}


From: Gonzalo Herreros <gherreros@gmail.com>
Reply-To: <user@flume.apache.org>
Date: Thursday, February 4, 2016 at 11:15 PM
To: user <user@flume.apache.org>
Subject: Re: KafkaSource not picking up any messages

I'm concerned about the warning "no brokers found when trying to rebalance".
Double-check that the path in zookeeper really is zk01:2181/mesos-kafka and not the standard /kafka.

When you connect with kafka-console-consumer, do you specify /mesos-kafka or just zk01:2181?
You can use the zkclient tool to check whether there are brokers currently registered under that path for the topic "test".
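Something like this, assuming the kazoo Python client (a third-party library, so treat the live part as a sketch — the chroot-to-path logic is what matters; the hostname is from your config):

```python
def broker_ids_path(chroot):
    # Kafka registers each live broker as an ephemeral znode under
    # <chroot>/brokers/ids, so with a chroot of /mesos-kafka the path
    # to inspect is /mesos-kafka/brokers/ids, not plain /brokers/ids.
    return chroot.rstrip("/") + "/brokers/ids"

def live_brokers(zk_hosts="zk01:2181", chroot="/mesos-kafka"):
    # kazoo is a third-party ZooKeeper client (pip install kazoo);
    # any zk shell would do the same job interactively.
    from kazoo.client import KazooClient
    zk = KazooClient(hosts=zk_hosts)
    zk.start()
    try:
        # An empty list here lines up with the "no brokers found when
        # trying to rebalance" warning in the Flume log.
        return zk.get_children(broker_ids_path(chroot))
    finally:
        zk.stop()

print(broker_ids_path("/mesos-kafka"))
```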

Regards,
Gonzalo


On 4 February 2016 at 21:16, Justin Ryan <juryan@ziprealty.com> wrote:
Hiya folks,

I’m setting up a new environment with Kafka, Flume, and HDFS, and have implemented the simplest test configuration I could come up with. The log shows the KafkaSource configured and started successfully, and with the Kafka tools I can confirm that messages have been sent, but the JSON metrics from Flume show 0 messages processed.

Are there any more tools at my disposal to investigate? Any assistance would be greatly appreciated!

My config and log:

# generated by Chef for mesos10, changes will be overwritten

flume1.sources=kafka-source-test
flume1.channels=hdfs-channel-kafka
flume1.sinks=hdfs-sink-kafka
flume1.sources.kafka-source-test.type=org.apache.flume.source.kafka.KafkaSource
flume1.sources.kafka-source-test.zookeeperConnect=zk01:2181/mesos-kafka
flume1.sources.kafka-source-test.topic=test
flume1.sources.kafka-source-test.groupId=flume
flume1.sources.kafka-source-test.interceptors=i1
flume1.sources.kafka-source-test.interceptors.i1.type=timestamp
flume1.sources.kafka-source-test.channels=hdfs-channel-kafka
flume1.channels.hdfs-channel-kafka.type=memory
flume1.sinks.hdfs-sink-kafka.channel=hdfs-channel-kafka
flume1.sinks.hdfs-sink-kafka.type=hdfs
flume1.sinks.hdfs-sink-kafka.hdfs.path=/tmp/kafka/%{topic}/%y-%m-%d
flume1.sinks.hdfs-sink-kafka.hdfs.rollInterval=5
flume1.sinks.hdfs-sink-kafka.hdfs.rollCount=0
flume1.sinks.hdfs-sink-kafka.hdfs.rollSize=0
flume1.sinks.hdfs-sink-kafka.hdfs.fileType=DataStream
flume1.channels.hdfs-channel-kafka.capacity=10
flume1.channels.hdfs-channel-kafka.transactionCapacity=10
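As a quick sanity check on the wiring, I sometimes run the config through a few lines of plain Python (just a sketch, not a Flume tool; the property keys mirror the config above, trimmed to the wiring-related ones):

```python
# Minimal wiring check for a Flume-style properties file: every channel
# referenced by a source or sink must be declared on <agent>.channels.
conf_text = """
flume1.sources=kafka-source-test
flume1.channels=hdfs-channel-kafka
flume1.sinks=hdfs-sink-kafka
flume1.sources.kafka-source-test.channels=hdfs-channel-kafka
flume1.sinks.hdfs-sink-kafka.channel=hdfs-channel-kafka
"""

props = {}
for line in conf_text.strip().splitlines():
    if line and not line.startswith("#"):
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()

declared = set(props["flume1.channels"].split())
referenced = set()
for key, value in props.items():
    if key.endswith(".channels") and key != "flume1.channels":
        referenced.update(value.split())   # sources list channels (plural key)
    elif key.endswith(".channel"):
        referenced.add(value)              # sinks name one channel (singular key)

print("declared:", declared)
print("undeclared references:", referenced - declared)
```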

Startup log (minus the incredibly long classpath lines):
16/02/04 11:32:07 INFO node.PollingPropertiesFileConfigurationProvider: Configuration provider starting
16/02/04 11:32:07 INFO node.PollingPropertiesFileConfigurationProvider: Reloading configuration file:/etc/flume/conf.chef/flume.conf
16/02/04 11:32:07 INFO conf.FlumeConfiguration: Processing:hdfs-sink-kafka
16/02/04 11:32:07 INFO conf.FlumeConfiguration: Processing:hdfs-sink-kafka
16/02/04 11:32:07 INFO conf.FlumeConfiguration: Processing:hdfs-sink-kafka
16/02/04 11:32:07 INFO conf.FlumeConfiguration: Processing:hdfs-sink-kafka
16/02/04 11:32:07 INFO conf.FlumeConfiguration: Added sinks: hdfs-sink-kafka Agent: flume1
16/02/04 11:32:07 INFO conf.FlumeConfiguration: Processing:hdfs-sink-kafka
16/02/04 11:32:07 INFO conf.FlumeConfiguration: Processing:hdfs-sink-kafka
16/02/04 11:32:07 INFO conf.FlumeConfiguration: Processing:hdfs-sink-kafka
16/02/04 11:32:07 INFO conf.FlumeConfiguration: Post-validation flume configuration contains configuration for agents: [flume1]
16/02/04 11:32:07 INFO node.AbstractConfigurationProvider: Creating channels
16/02/04 11:32:07 INFO channel.DefaultChannelFactory: Creating instance of channel hdfs-channel-kafka type memory
16/02/04 11:32:07 INFO node.AbstractConfigurationProvider: Created channel hdfs-channel-kafka
16/02/04 11:32:07 INFO source.DefaultSourceFactory: Creating instance of source kafka-source-test, type org.apache.flume.source.kafka.KafkaSource
16/02/04 11:32:07 INFO kafka.KafkaSourceUtil: context={ parameters:{interceptors.i1.type=timestamp, zookeeperConnect=zk01:2181/mesos-kafka, channels=hdfs-channel-kafka, groupId=flume, consumer.timeout.ms=100, topic=test, type=org.apache.flume.source.kafka.KafkaSource, interceptors=i1} }
16/02/04 11:32:07 INFO sink.DefaultSinkFactory: Creating instance of sink: hdfs-sink-kafka, type: hdfs
16/02/04 11:32:07 INFO node.AbstractConfigurationProvider: Channel hdfs-channel-kafka connected to [kafka-source-test, hdfs-sink-kafka]
16/02/04 11:32:07 INFO node.Application: Starting new configuration:{ sourceRunners:{kafka-source-test=PollableSourceRunner: { source:org.apache.flume.source.kafka.KafkaSource{name:kafka-source-test,state:IDLE} counterGroup:{ name:null counters:{} } }} sinkRunners:{hdfs-sink-kafka=SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@2f33f35e counterGroup:{ name:null counters:{} } }} channels:{hdfs-channel-kafka=org.apache.flume.channel.MemoryChannel{name: hdfs-channel-kafka}} }
16/02/04 11:32:07 INFO node.Application: Starting Channel hdfs-channel-kafka
16/02/04 11:32:07 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: CHANNEL, name: hdfs-channel-kafka: Successfully registered new MBean.
16/02/04 11:32:07 INFO instrumentation.MonitoredCounterGroup: Component type: CHANNEL, name: hdfs-channel-kafka started
16/02/04 11:32:07 INFO node.Application: Starting Sink hdfs-sink-kafka
16/02/04 11:32:07 INFO node.Application: Starting Source kafka-source-test
16/02/04 11:32:07 INFO kafka.KafkaSource: Starting org.apache.flume.source.kafka.KafkaSource{name:kafka-source-test,state:IDLE}...
16/02/04 11:32:07 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SINK, name: hdfs-sink-kafka: Successfully registered new MBean.
16/02/04 11:32:07 INFO instrumentation.MonitoredCounterGroup: Component type: SINK, name: hdfs-sink-kafka started
16/02/04 11:32:07 INFO mortbay.log: Logging to org.slf4j.impl.Log4jLoggerAdapter(org.mortbay.log) via org.mortbay.log.Slf4jLog
16/02/04 11:32:07 INFO mortbay.log: jetty-6.1.26.cloudera.4
16/02/04 11:32:07 INFO mortbay.log: Started SelectChannelConnector@0.0.0.0:34545
16/02/04 11:32:08 INFO utils.VerifiableProperties: Verifying properties
16/02/04 11:32:08 INFO utils.VerifiableProperties: Property auto.commit.enable is overridden to false
16/02/04 11:32:08 INFO utils.VerifiableProperties: Property consumer.timeout.ms is overridden to 10
16/02/04 11:32:08 INFO utils.VerifiableProperties: Property group.id is overridden to flume
16/02/04 11:32:08 INFO utils.VerifiableProperties: Property zookeeper.connect is overridden to zk01:2181/mesos-kafka
16/02/04 11:32:08 INFO consumer.ZookeeperConsumerConnector: [flume_mesos10-1454614328204-ca8a74df], Connecting to zookeeper instance at zk01:2181/mesos-kafka
16/02/04 11:32:08 INFO zkclient.ZkEventThread: Starting ZkClient event thread.
16/02/04 11:32:08 INFO zookeeper.ZooKeeper: Client environment:zookeeper.version=3.4.5-946--1, built on 05/18/2015 19:03 GMT
16/02/04 11:32:08 INFO zookeeper.ZooKeeper: Client environment:host.name=mesos10
16/02/04 11:32:08 INFO zookeeper.ZooKeeper: Client environment:java.version=1.8.0_72-internal
16/02/04 11:32:08 INFO zookeeper.ZooKeeper: Client environment:java.vendor=Oracle Corporation
16/02/04 11:32:08 INFO zookeeper.ZooKeeper: Client environment:java.home=/usr/lib/jvm/java-8-openjdk-amd64/jre
16/02/04 11:32:08 INFO zookeeper.ZooKeeper: Client environment:java.io.tmpdir=/tmp
16/02/04 11:32:08 INFO zookeeper.ZooKeeper: Client environment:java.compiler=<NA>
16/02/04 11:32:08 INFO zookeeper.ZooKeeper: Client environment:os.name=Linux
16/02/04 11:32:08 INFO zookeeper.ZooKeeper: Client environment:os.arch=amd64
16/02/04 11:32:08 INFO zookeeper.ZooKeeper: Client environment:os.version=3.13.0-63-generic
16/02/04 11:32:08 INFO zookeeper.ZooKeeper: Client environment:user.name=marathon
16/02/04 11:32:08 INFO zookeeper.ZooKeeper: Client environment:user.home=/opt/marathon
16/02/04 11:32:08 INFO zookeeper.ZooKeeper: Initiating client connection, connectString=zk01:2181/mesos-kafka sessionTimeout=6000 watcher=org.I0Itec.zkclient.ZkClient@2e1b7b98
16/02/04 11:32:08 INFO zookeeper.ClientCnxn: Opening socket connection to server 10.100.6.251/10.100.6.251:2181. Will not attempt to authenticate using SASL (unknown error)
16/02/04 11:32:08 INFO zookeeper.ClientCnxn: Socket connection established to 10.100.6.251/10.100.6.251:2181, initiating session
16/02/04 11:32:08 INFO zookeeper.ClientCnxn: Session establishment complete on server 10.100.6.251/10.100.6.251:2181, sessionid = 0x152858b1cc07491, negotiated timeout = 6000
16/02/04 11:32:08 INFO zkclient.ZkClient: zookeeper state changed (SyncConnected)
16/02/04 11:32:09 INFO consumer.ZookeeperConsumerConnector: [flume_mesos10-1454614328204-ca8a74df], begin registering consumer flume_mesos10-1454614328204-ca8a74df in ZK
16/02/04 11:32:09 INFO consumer.ZookeeperConsumerConnector: [flume_mesos10-1454614328204-ca8a74df], end registering consumer flume_mesos10-1454614328204-ca8a74df in ZK
16/02/04 11:32:09 INFO consumer.ZookeeperConsumerConnector: [flume_mesos10-1454614328204-ca8a74df], starting watcher executor thread for consumer flume_mesos10-1454614328204-ca8a74df
16/02/04 11:32:09 INFO consumer.ZookeeperConsumerConnector: [flume_mesos10-1454614328204-ca8a74df], begin rebalancing consumer flume_mesos10-1454614328204-ca8a74df try #0
16/02/04 11:32:09 WARN consumer.ZookeeperConsumerConnector: [flume_mesos10-1454614328204-ca8a74df], no brokers found when trying to rebalance.
16/02/04 11:32:09 INFO consumer.ZookeeperConsumerConnector: [flume_mesos10-1454614328204-ca8a74df], end rebalancing consumer flume_mesos10-1454614328204-ca8a74df try #0
16/02/04 11:32:09 INFO kafka.KafkaSource: Kafka source kafka-source-test do started.
16/02/04 11:32:09 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SOURCE, name: kafka-source-test: Successfully registered new MBean.
16/02/04 11:32:09 INFO instrumentation.MonitoredCounterGroup: Component type: SOURCE, name: kafka-source-test started
--

--
Justin Alan Ryan
Sr. Systems / Release Engineer
ZipRealty