flume-user mailing list archives

From Justin Ryan <jur...@ziprealty.com>
Subject Re: KafkaSource not picking up any messages
Date Mon, 08 Feb 2016 21:48:51 GMT
This was actually generating errors: the memory channel had been configured
with a very low capacity, on the theory that that would force it to flush more
often, and raising the capacity fixed it.

Now I'm on to a challenge I think I understand just fine, or fine enough:
HDFS permissions.

Thanks, Gonzalo, for the input and for being a sounding board!

Justin

From:  Justin Ryan <juryan@ziprealty.com>
Date:  Monday, February 8, 2016 at 11:52 AM
To:  <user@flume.apache.org>
Subject:  Re: KafkaSource not picking up any messages

So it looks like Flume is building against Kafka 2.10-0.8.1.1, and I'm using
2.10-0.8.2.2, which kafka-mesos leans on.

Any likelihood of this being incompatible?

I did notice this morning that Flume has an "EventReceivedCount" that
increments 1-2k at a time and is always a round number, but the
EventAcceptedCount is 0.

When I pipe dmesg to the console producer, and then try to read it with the
console consumer, I have to tell it to start from the beginning, so it seems
like flume may be reading events and moving the marker, but not doing
anything with them.

--
{
   "CHANNEL.hdfs-channel-kafka" : {
      "EventTakeAttemptCount" : "117",
      "EventTakeSuccessCount" : "0",
      "StartTime" : "1454959470045",
      "ChannelFillPercentage" : "0.0",
      "ChannelSize" : "0",
      "EventPutSuccessCount" : "0",
      "Type" : "CHANNEL",
      "EventPutAttemptCount" : "1826",
      "ChannelCapacity" : "10",
      "StopTime" : "0"
   },
   "SOURCE.kafka-source-test" : {
      "StartTime" : "1454959471241",
      "OpenConnectionCount" : "0",
      "AppendBatchReceivedCount" : "0",
      "EventReceivedCount" : "166000",
      "KafkaCommitTimer" : "0",
      "AppendBatchAcceptedCount" : "0",
      "AppendReceivedCount" : "0",
      "StopTime" : "0",
      "KafkaEventGetTimer" : "17114",
      "KafkaEmptyCount" : "0",
      "EventAcceptedCount" : "0",
      "Type" : "SOURCE",
      "AppendAcceptedCount" : "0"
   },
   "SINK.hdfs-sink-kafka" : {
      "ConnectionFailedCount" : "0",
      "StartTime" : "1454959470054",
      "BatchCompleteCount" : "0",
      "ConnectionClosedCount" : "0",
      "EventDrainSuccessCount" : "0",
      "BatchEmptyCount" : "117",
      "BatchUnderflowCount" : "0",
      "Type" : "SINK",
      "ConnectionCreatedCount" : "0",
      "StopTime" : "0",
      "EventDrainAttemptCount" : "0"
   }
}
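As a sketch of how I'm reading those metrics (counter values pasted from the JSON above; the parsing is plain stdlib, nothing Flume-specific):

```python
# Interpreting Flume's /metrics JSON: a source that reads from Kafka but
# fails every channel put shows a growing EventReceivedCount while
# EventAcceptedCount stays pinned at 0.
import json

# Counter values pasted from the agent's metrics output above.
metrics_json = '''
{"SOURCE.kafka-source-test":
    {"EventReceivedCount": "166000", "EventAcceptedCount": "0"}}
'''

src = json.loads(metrics_json)["SOURCE.kafka-source-test"]
received = int(src["EventReceivedCount"])  # events pulled off Kafka
accepted = int(src["EventAcceptedCount"])  # events committed to the channel
if received > 0 and accepted == 0:
    print("reading from Kafka, but every put to the channel is failing")
```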
--

From:  Gonzalo Herreros <gherreros@gmail.com>
Reply-To:  <user@flume.apache.org>
Date:  Monday, February 8, 2016 at 12:29 AM
To:  user <user@flume.apache.org>
Subject:  Re: KafkaSource not picking up any messages

The only thing I can think of is that the Kafka client included in Flume is
not compatible with the Kafka version on the brokers (there have been a lot of
changes recently), but normally you get errors when that happens.

On 5 February 2016 at 20:02, Justin Ryan <juryan@ziprealty.com> wrote:
> Thanks, Gonzalo - that definitely helped!
> 
> This also ties into an issue I'd raised with mesos-kafka where the zk path
> seemed to be ignored. I now see that there is a node that stores the
> mesos-kafka scheduler config, and the kafka path must be specified separately,
> so it is currently '/'.
> 
> Still not reading events, but definitely looks better in startup log:
> 
> 16/02/05 11:55:38 INFO kafka.KafkaSource: Kafka source kafka-source-test do
> started.
> 16/02/05 11:55:38 INFO consumer.ConsumerFetcherManager$LeaderFinderThread:
> [flume_mesos04-1454702137146-6cd63609-leader-finder-thread], Starting
> 16/02/05 11:55:38 INFO instrumentation.MonitoredCounterGroup: Monitored
> counter group for type: SOURCE, name: kafka-source-test: Successfully
> registered new MBean.
> 16/02/05 11:55:38 INFO instrumentation.MonitoredCounterGroup: Component type:
> SOURCE, name: kafka-source-test started
> 16/02/05 11:55:38 INFO utils.VerifiableProperties: Verifying properties
> 16/02/05 11:55:38 INFO utils.VerifiableProperties: Property client.id is
> overridden to flume
> 16/02/05 11:55:38 INFO utils.VerifiableProperties: Property
> metadata.broker.list is overridden to
> mesos01:31000,mesos02:31000,mesos08:31000
> 16/02/05 11:55:38 INFO utils.VerifiableProperties: Property
> request.timeout.ms is overridden to 30000
> 16/02/05 11:55:38 INFO client.ClientUtils$: Fetching metadata from broker
> id:1,host:mesos02,port:31000 with correlation id 0 for 1 topic(s)
> Set(home_views)
> 16/02/05 11:55:38 INFO producer.SyncProducer: Connected to mesos02:31000 for
> producing
> 16/02/05 11:55:38 INFO producer.SyncProducer: Disconnecting from mesos02:31000
> 16/02/05 11:55:38 INFO consumer.ConsumerFetcherThread:
> [ConsumerFetcherThread-flume_mesos04-1454702137146-6cd63609-0-0], Starting
> 16/02/05 11:55:38 INFO consumer.ConsumerFetcherManager:
> [ConsumerFetcherManager-1454702137389] Added fetcher for partitions
> ArrayBuffer([[home_views,0], initOffset -1 to broker
> id:0,host:mesos01,port:31000] )
> 
> --
> 
> $ curl http://mesos04:34545/metrics | json_pp
>   % Total    % Received % Xferd  Average Speed   Time    Time     Time
> Current
>                                  Dload  Upload   Total   Spent    Left  Speed
> 100   925    0   925    0     0   7741      0 --:--:-- --:--:-- --:--:--  7773
> {
>    "CHANNEL.hdfs-channel-kafka" : {
>       "ChannelCapacity" : "10",
>       "StartTime" : "1454702136681",
>       "EventTakeSuccessCount" : "0",
>       "ChannelFillPercentage" : "0.0",
>       "EventPutAttemptCount" : "0",
>       "EventTakeAttemptCount" : "14",
>       "StopTime" : "0",
>       "ChannelSize" : "0",
>       "EventPutSuccessCount" : "0",
>       "Type" : "CHANNEL"
>    },
>    "SOURCE.kafka-source-test" : {
>       "AppendBatchReceivedCount" : "0",
>       "AppendAcceptedCount" : "0",
>       "KafkaEmptyCount" : "0",
>       "AppendReceivedCount" : "0",
>       "KafkaEventGetTimer" : "18046",
>       "EventAcceptedCount" : "0",
>       "StartTime" : "1454702138033",
>       "StopTime" : "0",
>       "KafkaCommitTimer" : "0",
>       "Type" : "SOURCE",
>       "AppendBatchAcceptedCount" : "0",
>       "EventReceivedCount" : "0",
>       "OpenConnectionCount" : "0"
>    },
>    "SINK.hdfs-sink-kafka" : {
>       "ConnectionCreatedCount" : "0",
>       "EventDrainAttemptCount" : "0",
>       "BatchCompleteCount" : "0",
>       "StartTime" : "1454702136714",
>       "Type" : "SINK",
>       "EventDrainSuccessCount" : "0",
>       "StopTime" : "0",
>       "BatchUnderflowCount" : "0",
>       "ConnectionFailedCount" : "0",
>       "BatchEmptyCount" : "13",
>       "ConnectionClosedCount" : "0"
>    }
> }
> 
> 
> From:  Gonzalo Herreros <gherreros@gmail.com>
> Reply-To:  <user@flume.apache.org>
> Date:  Thursday, February 4, 2016 at 11:15 PM
> To:  user <user@flume.apache.org>
> Subject:  Re: KafkaSource not picking up any messages
> 
> I'm concerned about the warning "no brokers found when trying to rebalance".
> Double-check that the ZooKeeper path is correct (zk01:2181/mesos-kafka) and
> not the standard /kafka.
> 
> When you connect with the kafka-console-consumer, do you specify /mesos-kafka
> or just zk01:2181?
> You can use the zkclient tool to check whether there are brokers currently
> registered under that path for the topic "test".
> 
> Regards,
> Gonzalo
> 
> 
> On 4 February 2016 at 21:16, Justin Ryan <juryan@ziprealty.com> wrote:
>> Hiya folks,
>> 
>> I'm setting up a new environment with Kafka, Flume, and HDFS, and have
>> implemented the simplest possible testing configuration I can come up with.
>> It logs successfully configuring and starting the KafkaSource, and with kafka
>> tools I can confirm that messages have been sent, but the JSON Metrics from
>> Flume show 0 messages processed.
>> 
>> Are there any more tools at my disposal to investigate? Any assistance would
>> be greatly appreciated!
>> 
>> My config and log:
>> 
>> --
>> # generated by Chef for mesos10, changes will be overwritten
>> 
>> flume1.sources=kafka-source-test
>> flume1.channels=hdfs-channel-kafka
>> flume1.sinks=hdfs-sink-kafka
>> flume1.sources.kafka-source-test.type=org.apache.flume.source.kafka.KafkaSource
>> flume1.sources.kafka-source-test.zookeeperConnect=zk01:2181/mesos-kafka
>> flume1.sources.kafka-source-test.topic=test
>> flume1.sources.kafka-source-test.groupId=flume
>> flume1.sources.kafka-source-test.interceptors=i1
>> flume1.sources.kafka-source-test.interceptors.i1.type=timestamp
>> flume1.sources.kafka-source-test.consumer.timeout.ms=100
>> flume1.sources.kafka-source-test.channels=hdfs-channel-kafka
>> flume1.channels.hdfs-channel-kafka.type=memory
>> flume1.sinks.hdfs-sink-kafka.channel=hdfs-channel-kafka
>> flume1.sinks.hdfs-sink-kafka.type=hdfs
>> flume1.sinks.hdfs-sink-kafka.hdfs.path=/tmp/kafka/%{topic}/%y-%m-%d
>> flume1.sinks.hdfs-sink-kafka.hdfs.rollInterval=5
>> flume1.sinks.hdfs-sink-kafka.hdfs.rollCount=0
>> flume1.sinks.hdfs-sink-kafka.hdfs.rollSize=0
>> flume1.sinks.hdfs-sink-kafka.hdfs.fileType=DataStream
>> flume1.channels.hdfs-channel-kafka.capacity=10
>> flume1.channels.hdfs-channel-kafka.transactionCapacity=10
>> --
>> 
>> Startup log (less incredibly long path lines):
>> --
>> 16/02/04 11:32:07 INFO node.PollingPropertiesFileConfigurationProvider:
>> Configuration provider starting
>> 16/02/04 11:32:07 INFO node.PollingPropertiesFileConfigurationProvider:
>> Reloading configuration file:/etc/flume/conf.chef/flume.conf
>> 16/02/04 11:32:07 INFO conf.FlumeConfiguration: Processing:hdfs-sink-kafka
>> 16/02/04 11:32:07 INFO conf.FlumeConfiguration: Processing:hdfs-sink-kafka
>> 16/02/04 11:32:07 INFO conf.FlumeConfiguration: Processing:hdfs-sink-kafka
>> 16/02/04 11:32:07 INFO conf.FlumeConfiguration: Processing:hdfs-sink-kafka
>> 16/02/04 11:32:07 INFO conf.FlumeConfiguration: Added sinks: hdfs-sink-kafka
>> Agent: flume1
>> 16/02/04 11:32:07 INFO conf.FlumeConfiguration: Processing:hdfs-sink-kafka
>> 16/02/04 11:32:07 INFO conf.FlumeConfiguration: Processing:hdfs-sink-kafka
>> 16/02/04 11:32:07 INFO conf.FlumeConfiguration: Processing:hdfs-sink-kafka
>> 16/02/04 11:32:07 INFO conf.FlumeConfiguration: Post-validation flume
>> configuration contains configuration for agents: [flume1]
>> 16/02/04 11:32:07 INFO node.AbstractConfigurationProvider: Creating channels
>> 16/02/04 11:32:07 INFO channel.DefaultChannelFactory: Creating instance of
>> channel hdfs-channel-kafka type memory
>> 16/02/04 11:32:07 INFO node.AbstractConfigurationProvider: Created channel
>> hdfs-channel-kafka
>> 16/02/04 11:32:07 INFO source.DefaultSourceFactory: Creating instance of
>> source kafka-source-test, type org.apache.flume.source.kafka.KafkaSource
>> 16/02/04 11:32:07 INFO kafka.KafkaSourceUtil: context={
>> parameters:{interceptors.i1.type=timestamp,
>> zookeeperConnect=zk01:2181/mesos-kafka, channels=hdfs-channel-kafka,
>> groupId=flume, consumer.timeout.ms=100,
>> topic=test, type=org.apache.flume.source.kafka.KafkaSource, interceptors=i1}
>> }
>> 16/02/04 11:32:07 INFO sink.DefaultSinkFactory: Creating instance of sink:
>> hdfs-sink-kafka, type: hdfs
>> 16/02/04 11:32:07 INFO node.AbstractConfigurationProvider: Channel
>> hdfs-channel-kafka connected to [kafka-source-test, hdfs-sink-kafka]
>> 16/02/04 11:32:07 INFO node.Application: Starting new configuration:{
>> sourceRunners:{kafka-source-test=PollableSourceRunner: {
>> source:org.apache.flume.source.kafka.KafkaSource{name:kafka-source-test,state
>> :IDLE} counterGroup:{ name:null counters:{} } }}
>> sinkRunners:{hdfs-sink-kafka=SinkRunner: {
>> policy:org.apache.flume.sink.DefaultSinkProcessor@2f33f35e counterGroup:{
>> name:null counters:{} } }}
>> channels:{hdfs-channel-kafka=org.apache.flume.channel.MemoryChannel{name:
>> hdfs-channel-kafka}} }
>> 16/02/04 11:32:07 INFO node.Application: Starting Channel hdfs-channel-kafka
>> 16/02/04 11:32:07 INFO instrumentation.MonitoredCounterGroup: Monitored
>> counter group for type: CHANNEL, name: hdfs-channel-kafka: Successfully
>> registered new MBean.
>> 16/02/04 11:32:07 INFO instrumentation.MonitoredCounterGroup: Component type:
>> CHANNEL, name: hdfs-channel-kafka started
>> 16/02/04 11:32:07 INFO node.Application: Starting Sink hdfs-sink-kafka
>> 16/02/04 11:32:07 INFO node.Application: Starting Source kafka-source-test
>> 16/02/04 11:32:07 INFO kafka.KafkaSource: Starting
>> org.apache.flume.source.kafka.KafkaSource{name:kafka-source-test,state:IDLE}.
>> ..
>> 16/02/04 11:32:07 INFO instrumentation.MonitoredCounterGroup: Monitored
>> counter group for type: SINK, name: hdfs-sink-kafka: Successfully registered
>> new MBean.
>> 16/02/04 11:32:07 INFO instrumentation.MonitoredCounterGroup: Component type:
>> SINK, name: hdfs-sink-kafka started
>> 16/02/04 11:32:07 INFO mortbay.log: Logging to
>> org.slf4j.impl.Log4jLoggerAdapter(org.mortbay.log) via
>> org.mortbay.log.Slf4jLog
>> 16/02/04 11:32:07 INFO mortbay.log: jetty-6.1.26.cloudera.4
>> 16/02/04 11:32:07 INFO mortbay.log: Started
>> SelectChannelConnector@0.0.0.0:34545
>> 16/02/04 11:32:08 INFO utils.VerifiableProperties: Verifying properties
>> 16/02/04 11:32:08 INFO utils.VerifiableProperties: Property
>> auto.commit.enable is overridden to false
>> 16/02/04 11:32:08 INFO utils.VerifiableProperties: Property
>> consumer.timeout.ms is overridden to 10
>> 16/02/04 11:32:08 INFO utils.VerifiableProperties: Property group.id is
>> overridden to flume
>> 16/02/04 11:32:08 INFO utils.VerifiableProperties: Property zookeeper.connect
>> is overridden to zk01:2181/mesos-kafka
>> 16/02/04 11:32:08 INFO consumer.ZookeeperConsumerConnector:
>> [flume_mesos10-1454614328204-ca8a74df], Connecting to zookeeper instance at
>> zk01:2181/mesos-kafka
>> 16/02/04 11:32:08 INFO zkclient.ZkEventThread: Starting ZkClient event
>> thread.
>> 16/02/04 11:32:08 INFO zookeeper.ZooKeeper: Client
>> environment:zookeeper.version=3.4.5-946--1, built on 05/18/2015 19:03 GMT
>> 16/02/04 11:32:08 INFO zookeeper.ZooKeeper: Client
>> environment:host.name=mesos10
>> 16/02/04 11:32:08 INFO zookeeper.ZooKeeper: Client
>> environment:java.version=1.8.0_72-internal
>> 16/02/04 11:32:08 INFO zookeeper.ZooKeeper: Client
>> environment:java.vendor=Oracle Corporation
>> 16/02/04 11:32:08 INFO zookeeper.ZooKeeper: Client
>> environment:java.home=/usr/lib/jvm/java-8-openjdk-amd64/jre
>> 16/02/04 11:32:08 INFO zookeeper.ZooKeeper: Client
>> environment:java.io.tmpdir=/tmp
>> 16/02/04 11:32:08 INFO zookeeper.ZooKeeper: Client
>> environment:java.compiler=<NA>
>> 16/02/04 11:32:08 INFO zookeeper.ZooKeeper: Client environment:os.name=Linux
>> 16/02/04 11:32:08 INFO zookeeper.ZooKeeper: Client environment:os.arch=amd64
>> 16/02/04 11:32:08 INFO zookeeper.ZooKeeper: Client
>> environment:os.version=3.13.0-63-generic
>> 16/02/04 11:32:08 INFO zookeeper.ZooKeeper: Client
>> environment:user.name=marathon
>> 16/02/04 11:32:08 INFO zookeeper.ZooKeeper: Client
>> environment:user.home=/opt/marathon
>> 16/02/04 11:32:08 INFO zookeeper.ZooKeeper: Initiating client connection,
>> connectString=zk01:2181/mesos-kafka sessionTimeout=6000
>> watcher=org.I0Itec.zkclient.ZkClient@2e1b7b98
>> 16/02/04 11:32:08 INFO zookeeper.ClientCnxn: Opening socket connection to
>> server 10.100.6.251/10.100.6.251:2181. Will not attempt to authenticate
>> using SASL (unknown error)
>> 16/02/04 11:32:08 INFO zookeeper.ClientCnxn: Socket connection established
>> to 10.100.6.251/10.100.6.251:2181, initiating session
>> 16/02/04 11:32:08 INFO zookeeper.ClientCnxn: Session establishment complete
>> on server 10.100.6.251/10.100.6.251:2181, sessionid = 0x152858b1cc07491,
>> negotiated timeout = 6000
>> 16/02/04 11:32:08 INFO zkclient.ZkClient: zookeeper state changed
>> (SyncConnected)
>> 16/02/04 11:32:09 INFO consumer.ZookeeperConsumerConnector:
>> [flume_mesos10-1454614328204-ca8a74df], begin registering consumer
>> flume_mesos10-1454614328204-ca8a74df in ZK
>> 16/02/04 11:32:09 INFO consumer.ZookeeperConsumerConnector:
>> [flume_mesos10-1454614328204-ca8a74df], end registering consumer
>> flume_mesos10-1454614328204-ca8a74df in ZK
>> 16/02/04 11:32:09 INFO consumer.ZookeeperConsumerConnector:
>> [flume_mesos10-1454614328204-ca8a74df], starting watcher executor thread for
>> consumer flume_mesos10-1454614328204-ca8a74df
>> 16/02/04 11:32:09 INFO consumer.ZookeeperConsumerConnector:
>> [flume_mesos10-1454614328204-ca8a74df], begin rebalancing consumer
>> flume_mesos10-1454614328204-ca8a74df try #0
>> 16/02/04 11:32:09 WARN consumer.ZookeeperConsumerConnector:
>> [flume_mesos10-1454614328204-ca8a74df], no brokers found when trying to
>> rebalance.
>> 16/02/04 11:32:09 INFO consumer.ZookeeperConsumerConnector:
>> [flume_mesos10-1454614328204-ca8a74df], end rebalancing consumer
>> flume_mesos10-1454614328204-ca8a74df try #0
>> 16/02/04 11:32:09 INFO kafka.KafkaSource: Kafka source kafka-source-test do
>> started.
>> 16/02/04 11:32:09 INFO instrumentation.MonitoredCounterGroup: Monitored
>> counter group for type: SOURCE, name: kafka-source-test: Successfully
>> registered new MBean.
>> 16/02/04 11:32:09 INFO instrumentation.MonitoredCounterGroup: Component type:
>> SOURCE, name: kafka-source-test started
>> --
>> 
>> --
>> Justin Alan Ryan
>> Sr. Systems / Release Engineer
>> ZipRealty
> 



