flume-user mailing list archives

From Attila Simon <s...@cloudera.com>
Subject Re: how to make KafkaSource consume the existing messages
Date Thu, 13 Oct 2016 09:45:46 GMT
For the record, cc dev@

On Thu, Oct 13, 2016 at 11:43 AM, Attila Simon <sati@cloudera.com> wrote:
> Hi,
> auto.offset.reset aims to handle failure scenarios where Flume has lost
> track of its offsets. When Flume successfully consumes messages, it also
> commits the last processed offset. When a failure happens and <earliest>
> is set, resetting the offset would use the last committed value.
> I don't think that always starting from the "zero" offset would be
> valuable (it would result in a lot of duplicates), so I assume you want
> a recovery scenario. What you can do is set the consumer group.id to
> something new. Since offsets are stored per group.id, Flume would then
> reset the offset to the effective beginning of the topic, provided that
> Kafka still has the messages. By default Kafka purges them periodically;
> you can check what is still there with the command-line Kafka consumer
> using the --from-beginning argument.
> Quoted from Kafka docs
> (http://kafka.apache.org/documentation#newconsumerconfigs):
> auto.offset.reset - What to do when there is no initial offset in
> Kafka or if the current offset does not exist any more on the server
> (e.g. because that data has been deleted):
> earliest: automatically reset the offset to the earliest offset
> latest: automatically reset the offset to the latest offset
> none: throw exception to the consumer if no previous offset is found
> for the consumer's group
> anything else: throw exception to the consumer.
> Cheers,
> Attila
> On Thu, Oct 13, 2016 at 10:00 AM, Ping PW Wang <wpwang@cn.ibm.com> wrote:
>> Hi,
>> I used KafkaSource to consume messages from Kafka. I found that only new
>> messages were received, while the old existing messages were not. I tried
>> a new consumer group and changed the parameter "auto.offset.reset" from
>> "latest" to "earliest", but this did not work.
>> tier2.sources.source1.kafka.consumer.group.id = test-consumer-group-new
>> tier2.sources.source1.kafka.consumer.auto.offset.reset = earliest
>> Does anyone know how to make KafkaSource consume the existing messages?
>> Thanks a lot for any advice!
>> Best Regards,
>> Wang Ping (王苹)
>> InfoSphere BigInsights, CDL
>> Email: wpwang@cn.ibm.com Phone: (8610)82453448 Mobile: (86)17090815725
>> Address: Ring Bldg. No. 28 Building, ZhongGuanCun Software Park, No. 8 Dong Bei
>> Wang West Road, Haidian District, Beijing, P.R. China 100193
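The offset rules discussed in this thread can be modeled in a few lines. Committed offsets are tracked per group.id, and, per the quoted Kafka docs, auto.offset.reset only applies when a group has no committed offset or that offset no longer exists on the server. The sketch below is a hypothetical Python model, not the Kafka or Flume API: `resolve_start_offset` and `NoOffsetForPartition` are invented names, and a partition is reduced to its oldest retained offset and its end offset.

```python
from typing import Dict, Tuple


class NoOffsetForPartition(Exception):
    """Hypothetical stand-in for the exception Kafka raises under policy 'none'."""


def resolve_start_offset(
    committed: Dict[Tuple[str, int], int],
    group_id: str,
    partition: int,
    log_start: int,
    log_end: int,
    policy: str,
) -> int:
    """Return the offset a consumer in `group_id` would start reading from.

    Hypothetical model: `committed` maps (group.id, partition) to the last
    committed offset, `log_start` is the oldest offset Kafka still retains,
    and `log_end` is the offset the next produced message will get.
    """
    offset = committed.get((group_id, partition))
    # A committed offset that still exists on the server wins; the reset
    # policy is not consulted at all.
    if offset is not None and log_start <= offset <= log_end:
        return offset
    # No committed offset for this group (e.g. a brand-new group.id), or the
    # committed offset was already purged: apply auto.offset.reset.
    if policy == "earliest":
        return log_start  # oldest message Kafka still retains
    if policy == "latest":
        return log_end  # only messages produced from now on
    if policy == "none":
        raise NoOffsetForPartition(f"no valid offset for group {group_id!r}")
    raise ValueError(f"invalid auto.offset.reset value: {policy!r}")


# Example partition: offsets 0-99 were purged, 100-149 are still retained.
LOG_START, LOG_END = 100, 150
committed = {("test-consumer-group", 0): 150}  # old group is fully caught up

# Old group: resumes at its committed offset whatever the policy says.
print(resolve_start_offset(committed, "test-consumer-group", 0, LOG_START, LOG_END, "earliest"))  # 150
# New group.id + earliest: no committed offset, so start at the oldest retained message.
print(resolve_start_offset(committed, "test-consumer-group-new", 0, LOG_START, LOG_END, "earliest"))  # 100
# New group.id + latest: only new messages are read.
print(resolve_start_offset(committed, "test-consumer-group-new", 0, LOG_START, LOG_END, "latest"))  # 150
```

In this model, a fresh group.id combined with `earliest` starts at the oldest message Kafka still retains, which is the recovery path suggested above; messages already removed by retention cannot be recovered this way.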
