Hi Foo,

Your stack trace suggests the exception was raised by Kafka itself:
Failed to add leader for partitions

I have used the Kafka sink and source of Flume 1.6 for several weeks and they work well.

Could you please try the Kafka console producer first to test whether the partition is okay?
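
A minimal sketch of that check, assuming the stock Kafka 0.8.x scripts (kafka-topics.sh and kafka-console-producer.sh). The install path /opt/kafka, the ZooKeeper address localhost:2181 and the broker address localhost:9092 are placeholders, not values from this thread; only the topic name custom-topic comes from the log.

```python
import subprocess


def describe_topic_cmd(kafka_home, zookeeper, topic):
    """Command that lists leader, replicas and ISR for each partition."""
    return [
        f"{kafka_home}/bin/kafka-topics.sh",
        "--describe",
        "--zookeeper", zookeeper,
        "--topic", topic,
    ]


def console_producer_cmd(kafka_home, broker_list, topic):
    """Command that starts the console producer for one topic."""
    return [
        f"{kafka_home}/bin/kafka-console-producer.sh",
        "--broker-list", broker_list,
        "--topic", topic,
    ]


def send_test_message(kafka_home, broker_list, topic, message="test message\n"):
    """Pipe a single line into the console producer and return its exit code."""
    result = subprocess.run(
        console_producer_cmd(kafka_home, broker_list, topic),
        input=message.encode("utf-8"),
        check=False,
    )
    return result.returncode


if __name__ == "__main__":
    # Placeholder values (assumptions); replace with your own setup.
    kafka_home = "/opt/kafka"
    print(" ".join(describe_topic_cmd(kafka_home, "localhost:2181", "custom-topic")))
    print(" ".join(console_producer_cmd(kafka_home, "localhost:9092", "custom-topic")))
```

If the describe output shows no leader for a partition, or the producer itself fails, the problem is on the Kafka side rather than in the Flume sink.
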
Frank Yao 
@Vipshop, Shanghai
from iPhone

On Dec 30, 2014, at 04:21, Foo Lim <foo.lim@vungle.com> wrote:

BTW, I followed the directions & ran

~/flume-ng-kafka-sink$ mvn clean install

On Mon, Dec 29, 2014 at 12:10 PM, Foo Lim <foo.lim@vungle.com> wrote:
Hi Gwen,

Thanks for the reply.

I'll try the CDH jar file. Where do I put it in the flume directory structure?

I have kafka_2.9.2-0.8.1.1 running. Here's the test error (which keeps
repeating) in the project
git@github.com:thilinamb/flume-ng-kafka-sink.git

[2014-12-29 20:02:34,028] INFO Verifying properties
(kafka.utils.VerifiableProperties)
[2014-12-29 20:02:34,029] INFO Property client.id is overridden to
group_1 (kafka.utils.VerifiableProperties)
[2014-12-29 20:02:34,030] INFO Property metadata.broker.list is
overridden to vagrant-ubuntu-precise-64:50753
(kafka.utils.VerifiableProperties)
[2014-12-29 20:02:34,030] INFO Property request.timeout.ms is
overridden to 30000 (kafka.utils.VerifiableProperties)
[2014-12-29 20:02:34,031] INFO Fetching metadata from broker
id:0,host:vagrant-ubuntu-precise-64,port:50753 with correlation id 18
for 1 topic(s) Set(custom-topic) (kafka.client.ClientUtils$)
[2014-12-29 20:02:34,032] INFO Connected to
vagrant-ubuntu-precise-64:50753 for producing
(kafka.producer.SyncProducer)
[2014-12-29 20:02:34,035] INFO Disconnecting from
vagrant-ubuntu-precise-64:50753 (kafka.producer.SyncProducer)
[2014-12-29 20:02:34,036] INFO Closing socket connection to
/10.0.2.15. (kafka.network.Processor)
[2014-12-29 20:02:34,038] WARN [KafkaApi-0] Offset request with
correlation id 0 from client
group_1-ConsumerFetcherThread-group_1_vagrant-ubuntu-precise-64-1419883349017-325c06d5-0-0
on partition [custom-topic,1] failed due to Leader not local for
partition [custom-topic,1] on broker 0 (kafka.server.KafkaApis)
[2014-12-29 20:02:34,040] WARN
[group_1_vagrant-ubuntu-precise-64-1419883349017-325c06d5-leader-finder-thread],
Failed to add leader for partitions [custom-topic,1],[custom-topic,0];
will retry (kafka.consumer.ConsumerFetcherManager$LeaderFinderThread)
kafka.common.NotLeaderForPartitionException
at sun.reflect.GeneratedConstructorAccessor1.newInstance(Unknown Source)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
at java.lang.Class.newInstance(Class.java:379)
at kafka.common.ErrorMapping$.exceptionFor(ErrorMapping.scala:73)
at kafka.consumer.SimpleConsumer.earliestOrLatestOffset(SimpleConsumer.scala:160)
at kafka.consumer.ConsumerFetcherThread.handleOffsetOutOfRange(ConsumerFetcherThread.scala:60)
at kafka.server.AbstractFetcherThread$$anonfun$addPartitions$2.apply(AbstractFetcherThread.scala:179)
at kafka.server.AbstractFetcherThread$$anonfun$addPartitions$2.apply(AbstractFetcherThread.scala:174)
at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
at scala.collection.immutable.Map$Map2.foreach(Map.scala:130)
at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
at kafka.server.AbstractFetcherThread.addPartitions(AbstractFetcherThread.scala:174)
at kafka.server.AbstractFetcherManager$$anonfun$addFetcherForPartitions$2.apply(AbstractFetcherManager.scala:86)
at kafka.server.AbstractFetcherManager$$anonfun$addFetcherForPartitions$2.apply(AbstractFetcherManager.scala:76)
at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
at scala.collection.immutable.Map$Map1.foreach(Map.scala:109)
at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
at kafka.server.AbstractFetcherManager.addFetcherForPartitions(AbstractFetcherManager.scala:76)
at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:95)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
[2014-12-29 20:02:34,045] INFO
[ConsumerFetcherThread-group_1_vagrant-ubuntu-precise-64-1419883349017-325c06d5-0-0],
Shutting down (kafka.consumer.ConsumerFetcherThread)
[2014-12-29 20:02:34,039] INFO
[ConsumerFetcherThread-group_1_vagrant-ubuntu-precise-64-1419883349017-325c06d5-0-0],
Starting  (kafka.consumer.ConsumerFetcherThread)
[2014-12-29 20:02:34,046] INFO
[ConsumerFetcherThread-group_1_vagrant-ubuntu-precise-64-1419883349017-325c06d5-0-0],
Shutdown completed (kafka.consumer.ConsumerFetcherThread)
[2014-12-29 20:02:34,047] INFO Closing socket connection to
/10.0.2.15. (kafka.network.Processor)
[2014-12-29 20:02:34,048] INFO
[ConsumerFetcherThread-group_1_vagrant-ubuntu-precise-64-1419883349017-325c06d5-0-0],
Stopped  (kafka.consumer.ConsumerFetcherThread)
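
Since the warning keeps repeating, it can help to pull out exactly which partitions are reported without a leader. The following is a rough sketch, not part of Flume or Kafka: the function name partitions_without_leader is made up for illustration, and the regular expressions assume the message wording shown in the log above (they tolerate the line wrapping added by the mail client).

```python
import re

# Matches "Failed to add leader for partitions [topic,N],[topic,M]" even when
# the log line has been wrapped across several lines.
FAILED_RE = re.compile(
    r"Failed\s+to\s+add\s+leader\s+for\s+partitions\s+((?:\[[^\]]+\],?\s*)+)"
)
# Matches a single "[topic,partition]" entry.
PARTITION_RE = re.compile(r"\[([^,\]]+),(\d+)\]")


def partitions_without_leader(log_text):
    """Return sorted, de-duplicated (topic, partition) pairs from the warnings."""
    found = set()
    for match in FAILED_RE.finditer(log_text):
        for topic, partition in PARTITION_RE.findall(match.group(1)):
            found.add((topic, int(partition)))
    return sorted(found)


# Excerpt copied from the log above.
sample = """[2014-12-29 20:02:34,040] WARN
[group_1_vagrant-ubuntu-precise-64-1419883349017-325c06d5-leader-finder-thread],
Failed to add leader for partitions [custom-topic,1],[custom-topic,0];
will retry (kafka.consumer.ConsumerFetcherManager$LeaderFinderThread)"""

print(partitions_without_leader(sample))
# prints [('custom-topic', 0), ('custom-topic', 1)]
```
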

On Fri, Dec 26, 2014 at 4:06 PM, Gwen Shapira <gshapira@cloudera.com> wrote:
I can't say when the 1.6 release will be, but I have other solutions :)

1. The packages that are part of the CDH 5.3 release will contain that jar.
Perhaps use this distro? Or even just get the RPM, unpack it, and dig the jar
out?
2. Let us know what the compilation error is; perhaps we can help there?

On Fri, Dec 26, 2014 at 3:30 PM, Foo Lim <foo.lim@vungle.com> wrote:

Hi all,

Happy holidays! Just wondering if there's any ETA on a 1.6 release.
Looking forward to the kafka sink plugin that I can't get to compile
independently. :-/

Thanks!