This was actually generating errors: the memory channel had been configured with a very low capacity, on the theory that this would force it to flush more often or something. That has now been fixed.
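One plausible way a very small memory channel produces these symptoms (puts attempted, none committed) is a source batch that is larger than the channel's transactionCapacity. The sketch below is a hypothetical helper, not a Flume tool. It parses a Flume properties file and flags that mismatch. It assumes a KafkaSource default batchSize of 1000 and memory-channel defaults of 100 for both capacity and transactionCapacity; check those against the Flume version in use.

```python
"""Hypothetical helper: flag memory channels too small for a KafkaSource batch.

Assumed defaults (verify against your Flume version):
KafkaSource batchSize=1000, memory channel capacity=100, transactionCapacity=100.
"""

ASSUMED_KAFKA_BATCH_SIZE = 1000
ASSUMED_MEMORY_CAPACITY = 100
ASSUMED_MEMORY_TX_CAPACITY = 100


def parse_properties(text):
    """Parse simple key=value lines, skipping blank lines and # comments."""
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, value = line.split("=", 1)
        props[key.strip()] = value.strip()
    return props


def check_channel_sizes(props, agent):
    """Return a list of problems for KafkaSource -> memory channel pairs."""
    problems = []
    for src in props.get(f"{agent}.sources", "").split():
        sp = f"{agent}.sources.{src}"
        if not props.get(f"{sp}.type", "").endswith("KafkaSource"):
            continue
        batch = int(props.get(f"{sp}.batchSize", ASSUMED_KAFKA_BATCH_SIZE))
        for ch in props.get(f"{sp}.channels", "").split():
            cp = f"{agent}.channels.{ch}"
            if props.get(f"{cp}.type") != "memory":
                continue
            cap = int(props.get(f"{cp}.capacity", ASSUMED_MEMORY_CAPACITY))
            tx = int(props.get(f"{cp}.transactionCapacity", ASSUMED_MEMORY_TX_CAPACITY))
            # A batch larger than one transaction can hold cannot be committed.
            if batch > tx:
                problems.append(f"{src}: batchSize {batch} > {ch}.transactionCapacity {tx}")
            if tx > cap:
                problems.append(f"{ch}: transactionCapacity {tx} > capacity {cap}")
    return problems


if __name__ == "__main__":
    import sys

    if len(sys.argv) == 3:  # usage: check_sizes.py flume.conf flume1
        with open(sys.argv[1]) as f:
            for problem in check_channel_sizes(parse_properties(f.read()), sys.argv[2]):
                print(problem)
```

Run against the original config further down, this would report kafka-source-test's assumed batch of 1000 against a transactionCapacity of 10. The HDFS sink's hdfs.batchSize is worth checking against the same channel in the same way.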

Now I’m on to a challenge I think I understand just fine, or fine enough: HDFS permissions.

Thanks, Gonzalo, for the input and for being a sounding board!

Justin

From: Justin Ryan <juryan@ziprealty.com>
Date: Monday, February 8, 2016 at 11:52 AM
To: <user@flume.apache.org>
Subject: Re: KafkaSource not picking up any messages

So it looks like Flume is building against Kafka 2.10-0.8.1.1, and I’m using 2.10-0.8.2.2, which kafka-mesos leans on.

Any likelihood of this being incompatible?

I did notice this morning that Flume has an “EventReceivedCount” that increments 1-2k at a time and is always a round number, but the EventAcceptedCount is 0.

When I pipe dmesg to the console producer and then try to read it with the console consumer, I have to tell the consumer to start from the beginning, so it seems like Flume may be reading events and moving the marker without doing anything with them.

{
   "CHANNEL.hdfs-channel-kafka" : {
      "EventTakeAttemptCount" : "117",
      "EventTakeSuccessCount" : "0",
      "StartTime" : "1454959470045",
      "ChannelFillPercentage" : "0.0",
      "ChannelSize" : "0",
      "EventPutSuccessCount" : "0",
      "Type" : "CHANNEL",
      "EventPutAttemptCount" : "1826",
      "ChannelCapacity" : "10",
      "StopTime" : "0"
   },
   "SOURCE.kafka-source-test" : {
      "StartTime" : "1454959471241",
      "OpenConnectionCount" : "0",
      "AppendBatchReceivedCount" : "0",
      "EventReceivedCount" : "166000",
      "KafkaCommitTimer" : "0",
      "AppendBatchAcceptedCount" : "0",
      "AppendReceivedCount" : "0",
      "StopTime" : "0",
      "KafkaEventGetTimer" : "17114",
      "KafkaEmptyCount" : "0",
      "EventAcceptedCount" : "0",
      "Type" : "SOURCE",
      "AppendAcceptedCount" : "0"
   },
   "SINK.hdfs-sink-kafka" : {
      "ConnectionFailedCount" : "0",
      "StartTime" : "1454959470054",
      "BatchCompleteCount" : "0",
      "ConnectionClosedCount" : "0",
      "EventDrainSuccessCount" : "0",
      "BatchEmptyCount" : "117",
      "BatchUnderflowCount" : "0",
      "Type" : "SINK",
      "ConnectionCreatedCount" : "0",
      "StopTime" : "0",
      "EventDrainAttemptCount" : "0"
   }
}
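These numbers are consistent with the source failing to hand its batches to the channel: the source counts events as received but none as accepted, and the channel records put attempts with no successes. A small hypothetical helper can make that pattern easy to spot in any /metrics snapshot. It assumes the counter names shown above and that counter values arrive as strings, as they do in this output.

```python
"""Sketch: spot where events stall in a Flume /metrics JSON snapshot."""

import json
import sys


def find_stalls(metrics):
    """Return (component, message) pairs where attempted work never succeeded.

    Counters in the JSON are strings, so they are converted with int().
    """
    findings = []
    for name, c in sorted(metrics.items()):
        kind = c.get("Type")
        if kind == "SOURCE":
            received = int(c.get("EventReceivedCount", 0))
            accepted = int(c.get("EventAcceptedCount", 0))
            if received > accepted:
                findings.append((name, f"received {received} events, accepted {accepted}"))
        elif kind == "CHANNEL":
            attempts = int(c.get("EventPutAttemptCount", 0))
            successes = int(c.get("EventPutSuccessCount", 0))
            if attempts > successes:
                findings.append((name, f"{attempts} put attempts, {successes} successes "
                                       f"(capacity {c.get('ChannelCapacity', '?')})"))
        elif kind == "SINK":
            attempts = int(c.get("EventDrainAttemptCount", 0))
            successes = int(c.get("EventDrainSuccessCount", 0))
            if attempts > successes:
                findings.append((name, f"{attempts} drain attempts, {successes} successes"))
    return findings


if __name__ == "__main__" and len(sys.argv) == 2:
    with open(sys.argv[1]) as f:  # a JSON file saved from the /metrics endpoint
        for component, message in find_stalls(json.load(f)):
            print(f"{component}: {message}")
```

Fed the snapshot above, it flags the channel (1826 put attempts, 0 successes, capacity 10) and the source (166000 received, 0 accepted), and nothing for the sink, which never got anything to drain.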
--

From: Gonzalo Herreros <gherreros@gmail.com>
Reply-To: <user@flume.apache.org>
Date: Monday, February 8, 2016 at 12:29 AM
To: user <user@flume.apache.org>
Subject: Re: KafkaSource not picking up any messages

The only thing I can think of is that the Kafka client included in Flume is not compatible with the Kafka version on the brokers (there have been a lot of changes recently), but normally you get errors when that happens.

On 5 February 2016 at 20:02, Justin Ryan <juryan@ziprealty.com> wrote:
Thanks, Gonzalo – that def helped!

This also ties into an issue I’d raised with mesos-kafka, where the zk path seemed to be ignored. I now see that there is a node that stores the mesos-kafka scheduler config, and that the Kafka path must be specified separately, so it is currently ‘/’.

Still not reading events, but definitely looks better in startup log:

16/02/05 11:55:38 INFO kafka.KafkaSource: Kafka source kafka-source-test do started.
16/02/05 11:55:38 INFO consumer.ConsumerFetcherManager$LeaderFinderThread: [flume_mesos04-1454702137146-6cd63609-leader-finder-thread], Starting 
16/02/05 11:55:38 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SOURCE, name: kafka-source-test: Successfully registered new MBean.
16/02/05 11:55:38 INFO instrumentation.MonitoredCounterGroup: Component type: SOURCE, name: kafka-source-test started
16/02/05 11:55:38 INFO utils.VerifiableProperties: Verifying properties
16/02/05 11:55:38 INFO utils.VerifiableProperties: Property client.id is overridden to flume
16/02/05 11:55:38 INFO utils.VerifiableProperties: Property metadata.broker.list is overridden to mesos01:31000,mesos02:31000,mesos08:31000
16/02/05 11:55:38 INFO utils.VerifiableProperties: Property request.timeout.ms is overridden to 30000
16/02/05 11:55:38 INFO client.ClientUtils$: Fetching metadata from broker id:1,host:mesos02,port:31000 with correlation id 0 for 1 topic(s) Set(home_views)
16/02/05 11:55:38 INFO producer.SyncProducer: Connected to mesos02:31000 for producing
16/02/05 11:55:38 INFO producer.SyncProducer: Disconnecting from mesos02:31000
16/02/05 11:55:38 INFO consumer.ConsumerFetcherThread: [ConsumerFetcherThread-flume_mesos04-1454702137146-6cd63609-0-0], Starting 
16/02/05 11:55:38 INFO consumer.ConsumerFetcherManager: [ConsumerFetcherManager-1454702137389] Added fetcher for partitions ArrayBuffer([[home_views,0], initOffset -1 to broker id:0,host:mesos01,port:31000] )


{
   "CHANNEL.hdfs-channel-kafka" : {
      "ChannelCapacity" : "10",
      "StartTime" : "1454702136681",
      "EventTakeSuccessCount" : "0",
      "ChannelFillPercentage" : "0.0",
      "EventPutAttemptCount" : "0",
      "EventTakeAttemptCount" : "14",
      "StopTime" : "0",
      "ChannelSize" : "0",
      "EventPutSuccessCount" : "0",
      "Type" : "CHANNEL"
   },
   "SOURCE.kafka-source-test" : {
      "AppendBatchReceivedCount" : "0",
      "AppendAcceptedCount" : "0",
      "KafkaEmptyCount" : "0",
      "AppendReceivedCount" : "0",
      "KafkaEventGetTimer" : "18046",
      "EventAcceptedCount" : "0",
      "StartTime" : "1454702138033",
      "StopTime" : "0",
      "KafkaCommitTimer" : "0",
      "Type" : "SOURCE",
      "AppendBatchAcceptedCount" : "0",
      "EventReceivedCount" : "0",
      "OpenConnectionCount" : "0"
   },
   "SINK.hdfs-sink-kafka" : {
      "ConnectionCreatedCount" : "0",
      "EventDrainAttemptCount" : "0",
      "BatchCompleteCount" : "0",
      "StartTime" : "1454702136714",
      "Type" : "SINK",
      "EventDrainSuccessCount" : "0",
      "StopTime" : "0",
      "BatchUnderflowCount" : "0",
      "ConnectionFailedCount" : "0",
      "BatchEmptyCount" : "13",
      "ConnectionClosedCount" : "0"
   }
}
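The Feb 4 startup log shows Jetty listening on port 34545, which appears to be Flume's HTTP monitoring server (enabled with -Dflume.monitoring.type=http and -Dflume.monitoring.port); the JSON above is what its /metrics endpoint returns. A single snapshot of zeros says little, so a hypothetical sketch like the following fetches two snapshots a few seconds apart and reports only the counters that changed. The host name, the 34545 default port and the 10-second interval are assumptions drawn from these logs, not fixed Flume values.

```python
"""Sketch: poll Flume's HTTP metrics twice and show which counters moved."""

import json
import sys
import time
import urllib.request


def fetch_metrics(host, port=34545, timeout=5.0, opener=urllib.request.urlopen):
    """GET http://host:port/metrics and decode the JSON body."""
    url = f"http://{host}:{port}/metrics"
    with opener(url, timeout=timeout) as resp:
        return json.loads(resp.read().decode("utf-8"))


def counter_deltas(before, after):
    """Return {(component, counter): change} for numeric counters that changed."""
    deltas = {}
    for component, counters in after.items():
        for name, value in counters.items():
            if name in ("StartTime", "StopTime"):  # timestamps, not counters
                continue
            try:
                new = float(value)
                old = float(before.get(component, {}).get(name, "0"))
            except ValueError:  # non-numeric fields such as "Type": "SOURCE"
                continue
            if new != old:
                deltas[(component, name)] = new - old
    return deltas


if __name__ == "__main__" and len(sys.argv) >= 2:
    host = sys.argv[1]  # e.g. the agent host, such as mesos04
    first = fetch_metrics(host)
    time.sleep(10)
    second = fetch_metrics(host)
    for (component, name), change in sorted(counter_deltas(first, second).items()):
        print(f"{component} {name}: {change:+g}")
```

The opener parameter exists only so the fetch can be exercised without a live agent; in normal use the default urllib opener is used.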


From: Gonzalo Herreros <gherreros@gmail.com>
Reply-To: <user@flume.apache.org>
Date: Thursday, February 4, 2016 at 11:15 PM
To: user <user@flume.apache.org>
Subject: Re: KafkaSource not picking up any messages

I'm concerned about the warning "no brokers found when trying to rebalance".
Double-check that the ZooKeeper path zk01:2181/mesos-kafka is correct and that it's not the standard /kafka.

When you connect with the kafka-console-consumer, do you specify /mesos-kafka or just zk01:2181?
You can use the zkclient tool to check if there are brokers currently registered under that path for the topic "test"

Regards,
Gonzalo


On 4 February 2016 at 21:16, Justin Ryan <juryan@ziprealty.com> wrote:
Hiya folks,

I’m setting up a new environment with Kafka, Flume, and HDFS, and have implemented the simplest possible testing configuration I can come up with.  It logs successfully configuring and starting the KafkaSource, and with kafka tools I can confirm that messages have been sent, but the JSON Metrics from Flume show 0 messages processed.

Are there any more tools at my disposal to investigate? Any assistance would be greatly appreciated!

My config and log:

# generated by Chef for mesos10, changes will be overwritten

flume1.sources=kafka-source-test
flume1.channels=hdfs-channel-kafka
flume1.sinks=hdfs-sink-kafka
flume1.sources.kafka-source-test.type=org.apache.flume.source.kafka.KafkaSource
flume1.sources.kafka-source-test.zookeeperConnect=zk01:2181/mesos-kafka
flume1.sources.kafka-source-test.topic=test
flume1.sources.kafka-source-test.groupId=flume
flume1.sources.kafka-source-test.interceptors=i1
flume1.sources.kafka-source-test.interceptors.i1.type=timestamp
flume1.sources.kafka-source-test.channels=hdfs-channel-kafka
flume1.channels.hdfs-channel-kafka.type=memory
flume1.sinks.hdfs-sink-kafka.channel=hdfs-channel-kafka
flume1.sinks.hdfs-sink-kafka.type=hdfs
flume1.sinks.hdfs-sink-kafka.hdfs.path=/tmp/kafka/%{topic}/%y-%m-%d
flume1.sinks.hdfs-sink-kafka.hdfs.rollInterval=5
flume1.sinks.hdfs-sink-kafka.hdfs.rollCount=0
flume1.sinks.hdfs-sink-kafka.hdfs.rollSize=0
flume1.sinks.hdfs-sink-kafka.hdfs.fileType=DataStream
flume1.channels.hdfs-channel-kafka.capacity=10
flume1.channels.hdfs-channel-kafka.transactionCapacity=10
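The hdfs.path above relies on two kinds of escape: %{topic} is read from the event's topic header, and %y-%m-%d is formatted from the timestamp header that the timestamp interceptor (i1) adds. The sketch below imitates only those escapes to show which directory an event should land in. It is a simplification of Flume's own escaping, not its implementation: it uses UTC, whereas the sink resolves the path in the agent's local time zone unless hdfs.timeZone is set, and it raises KeyError when a header is missing.

```python
"""Sketch: expand the %{header} and %y/%m/%d escapes used in hdfs.path."""

import re
from datetime import datetime, timezone

# Only these time escapes are handled here; Flume supports many more.
_TIME_FORMATS = {"y": "%y", "m": "%m", "d": "%d"}


def expand_hdfs_path(template, headers, tz=timezone.utc):
    """Return the directory an event with `headers` would be written to."""
    # %{name} -> value of header `name` (KeyError if the header is missing).
    path = re.sub(r"%\{([^}]+)\}", lambda m: headers[m.group(1)], template)
    # The timestamp header holds epoch milliseconds.
    when = datetime.fromtimestamp(int(headers["timestamp"]) / 1000, tz)
    return re.sub(r"%([ymd])", lambda m: when.strftime(_TIME_FORMATS[m.group(1)]), path)


if __name__ == "__main__":
    print(expand_hdfs_path("/tmp/kafka/%{topic}/%y-%m-%d",
                           {"topic": "test", "timestamp": "1454614328000"}))
```

With a hypothetical timestamp matching the 11:32:08 PST startup below (19:32:08 UTC), this prints /tmp/kafka/test/16-02-04.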

Startup log (less incredibly long path lines):
16/02/04 11:32:07 INFO node.PollingPropertiesFileConfigurationProvider: Configuration provider starting
16/02/04 11:32:07 INFO node.PollingPropertiesFileConfigurationProvider: Reloading configuration file:/etc/flume/conf.chef/flume.conf
16/02/04 11:32:07 INFO conf.FlumeConfiguration: Processing:hdfs-sink-kafka
16/02/04 11:32:07 INFO conf.FlumeConfiguration: Processing:hdfs-sink-kafka
16/02/04 11:32:07 INFO conf.FlumeConfiguration: Processing:hdfs-sink-kafka
16/02/04 11:32:07 INFO conf.FlumeConfiguration: Processing:hdfs-sink-kafka
16/02/04 11:32:07 INFO conf.FlumeConfiguration: Added sinks: hdfs-sink-kafka Agent: flume1
16/02/04 11:32:07 INFO conf.FlumeConfiguration: Processing:hdfs-sink-kafka
16/02/04 11:32:07 INFO conf.FlumeConfiguration: Processing:hdfs-sink-kafka
16/02/04 11:32:07 INFO conf.FlumeConfiguration: Processing:hdfs-sink-kafka
16/02/04 11:32:07 INFO conf.FlumeConfiguration: Post-validation flume configuration contains configuration for agents: [flume1]
16/02/04 11:32:07 INFO node.AbstractConfigurationProvider: Creating channels
16/02/04 11:32:07 INFO channel.DefaultChannelFactory: Creating instance of channel hdfs-channel-kafka type memory
16/02/04 11:32:07 INFO node.AbstractConfigurationProvider: Created channel hdfs-channel-kafka
16/02/04 11:32:07 INFO source.DefaultSourceFactory: Creating instance of source kafka-source-test, type org.apache.flume.source.kafka.KafkaSource
16/02/04 11:32:07 INFO kafka.KafkaSourceUtil: context={ parameters:{interceptors.i1.type=timestamp, zookeeperConnect=zk01:2181/mesos-kafka, channels=hdfs-channel-kafka, groupId=flume, consumer.timeout.ms=100, topic=test, type=org.apache.flume.source.kafka.KafkaSource, interceptors=i1} }
16/02/04 11:32:07 INFO sink.DefaultSinkFactory: Creating instance of sink: hdfs-sink-kafka, type: hdfs
16/02/04 11:32:07 INFO node.AbstractConfigurationProvider: Channel hdfs-channel-kafka connected to [kafka-source-test, hdfs-sink-kafka]
16/02/04 11:32:07 INFO node.Application: Starting new configuration:{ sourceRunners:{kafka-source-test=PollableSourceRunner: { source:org.apache.flume.source.kafka.KafkaSource{name:kafka-source-test,state:IDLE} counterGroup:{ name:null counters:{} } }} sinkRunners:{hdfs-sink-kafka=SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@2f33f35e counterGroup:{ name:null counters:{} } }} channels:{hdfs-channel-kafka=org.apache.flume.channel.MemoryChannel{name: hdfs-channel-kafka}} }
16/02/04 11:32:07 INFO node.Application: Starting Channel hdfs-channel-kafka
16/02/04 11:32:07 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: CHANNEL, name: hdfs-channel-kafka: Successfully registered new MBean.
16/02/04 11:32:07 INFO instrumentation.MonitoredCounterGroup: Component type: CHANNEL, name: hdfs-channel-kafka started
16/02/04 11:32:07 INFO node.Application: Starting Sink hdfs-sink-kafka
16/02/04 11:32:07 INFO node.Application: Starting Source kafka-source-test
16/02/04 11:32:07 INFO kafka.KafkaSource: Starting org.apache.flume.source.kafka.KafkaSource{name:kafka-source-test,state:IDLE}...
16/02/04 11:32:07 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SINK, name: hdfs-sink-kafka: Successfully registered new MBean.
16/02/04 11:32:07 INFO instrumentation.MonitoredCounterGroup: Component type: SINK, name: hdfs-sink-kafka started
16/02/04 11:32:07 INFO mortbay.log: Logging to org.slf4j.impl.Log4jLoggerAdapter(org.mortbay.log) via org.mortbay.log.Slf4jLog
16/02/04 11:32:07 INFO mortbay.log: jetty-6.1.26.cloudera.4
16/02/04 11:32:07 INFO mortbay.log: Started SelectChannelConnector@0.0.0.0:34545
16/02/04 11:32:08 INFO utils.VerifiableProperties: Verifying properties
16/02/04 11:32:08 INFO utils.VerifiableProperties: Property auto.commit.enable is overridden to false
16/02/04 11:32:08 INFO utils.VerifiableProperties: Property consumer.timeout.ms is overridden to 10
16/02/04 11:32:08 INFO utils.VerifiableProperties: Property group.id is overridden to flume
16/02/04 11:32:08 INFO utils.VerifiableProperties: Property zookeeper.connect is overridden to zk01:2181/mesos-kafka
16/02/04 11:32:08 INFO consumer.ZookeeperConsumerConnector: [flume_mesos10-1454614328204-ca8a74df], Connecting to zookeeper instance at zk01:2181/mesos-kafka
16/02/04 11:32:08 INFO zkclient.ZkEventThread: Starting ZkClient event thread.
16/02/04 11:32:08 INFO zookeeper.ZooKeeper: Client environment:zookeeper.version=3.4.5-946--1, built on 05/18/2015 19:03 GMT
16/02/04 11:32:08 INFO zookeeper.ZooKeeper: Client environment:host.name=mesos10
16/02/04 11:32:08 INFO zookeeper.ZooKeeper: Client environment:java.version=1.8.0_72-internal
16/02/04 11:32:08 INFO zookeeper.ZooKeeper: Client environment:java.vendor=Oracle Corporation
16/02/04 11:32:08 INFO zookeeper.ZooKeeper: Client environment:java.home=/usr/lib/jvm/java-8-openjdk-amd64/jre
16/02/04 11:32:08 INFO zookeeper.ZooKeeper: Client environment:java.io.tmpdir=/tmp
16/02/04 11:32:08 INFO zookeeper.ZooKeeper: Client environment:java.compiler=<NA>
16/02/04 11:32:08 INFO zookeeper.ZooKeeper: Client environment:os.name=Linux
16/02/04 11:32:08 INFO zookeeper.ZooKeeper: Client environment:os.arch=amd64
16/02/04 11:32:08 INFO zookeeper.ZooKeeper: Client environment:os.version=3.13.0-63-generic
16/02/04 11:32:08 INFO zookeeper.ZooKeeper: Client environment:user.name=marathon
16/02/04 11:32:08 INFO zookeeper.ZooKeeper: Client environment:user.home=/opt/marathon
16/02/04 11:32:08 INFO zookeeper.ZooKeeper: Initiating client connection, connectString=zk01:2181/mesos-kafka sessionTimeout=6000 watcher=org.I0Itec.zkclient.ZkClient@2e1b7b98
16/02/04 11:32:08 INFO zookeeper.ClientCnxn: Opening socket connection to server 10.100.6.251/10.100.6.251:2181. Will not attempt to authenticate using SASL (unknown error)
16/02/04 11:32:08 INFO zookeeper.ClientCnxn: Socket connection established to 10.100.6.251/10.100.6.251:2181, initiating session
16/02/04 11:32:08 INFO zookeeper.ClientCnxn: Session establishment complete on server 10.100.6.251/10.100.6.251:2181, sessionid = 0x152858b1cc07491, negotiated timeout = 6000
16/02/04 11:32:08 INFO zkclient.ZkClient: zookeeper state changed (SyncConnected)
16/02/04 11:32:09 INFO consumer.ZookeeperConsumerConnector: [flume_mesos10-1454614328204-ca8a74df], begin registering consumer flume_mesos10-1454614328204-ca8a74df in ZK
16/02/04 11:32:09 INFO consumer.ZookeeperConsumerConnector: [flume_mesos10-1454614328204-ca8a74df], end registering consumer flume_mesos10-1454614328204-ca8a74df in ZK
16/02/04 11:32:09 INFO consumer.ZookeeperConsumerConnector: [flume_mesos10-1454614328204-ca8a74df], starting watcher executor thread for consumer flume_mesos10-1454614328204-ca8a74df
16/02/04 11:32:09 INFO consumer.ZookeeperConsumerConnector: [flume_mesos10-1454614328204-ca8a74df], begin rebalancing consumer flume_mesos10-1454614328204-ca8a74df try #0
16/02/04 11:32:09 WARN consumer.ZookeeperConsumerConnector: [flume_mesos10-1454614328204-ca8a74df], no brokers found when trying to rebalance.
16/02/04 11:32:09 INFO consumer.ZookeeperConsumerConnector: [flume_mesos10-1454614328204-ca8a74df], end rebalancing consumer flume_mesos10-1454614328204-ca8a74df try #0
16/02/04 11:32:09 INFO kafka.KafkaSource: Kafka source kafka-source-test do started.
16/02/04 11:32:09 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SOURCE, name: kafka-source-test: Successfully registered new MBean.
16/02/04 11:32:09 INFO instrumentation.MonitoredCounterGroup: Component type: SOURCE, name: kafka-source-test started
--

--
Justin Alan Ryan
Sr. Systems / Release Engineer
ZipRealty