flume-user mailing list archives

From "Ping PW Wang" <wpw...@cn.ibm.com>
Subject Re: how to make KafkaSource consume the existing messages
Date Mon, 17 Oct 2016 07:33:46 GMT
<div class="socmaildefaultfont" dir="ltr" style="font-family:&quot;Helvetica Neue&quot;,
Helvetica, Arial, sans-serif;font-size:10.5pt" ><div dir="ltr" ><font face="Arial"
size="2" >Hi Attila, Hi </font><font size="2" >Chris, </font><br><font
face="Arial" size="2" >Many thanks for your reply and good comments. </font><br><br><font
face="Arial" size="2" >Yes, besides a new group.id and "auto.offset.reset=earliest", "enable.auto.commit=false"
is also needed to make KafkaSource read messages from the beginning. &nbsp; After I added
the following lines to the Flume agent configuration and restarted Flume, it worked as I expected.
</font><br><font face="Arial" size="2" >&nbsp;tier1.sources.source1.kafka.consumer.group.id
= group-new2</font><br><font face="Arial" size="2" >&nbsp;tier1.sources.source1.kafka.consumer.auto.offset.reset
= earliest</font><br><font face="Arial" size="2" >&nbsp;tier1.sources.source1.kafka.consumer.enable.auto.commit
= false</font><br><br><font face="Arial" size="2" >Thanks a lot for
help!</font><br><br><font color="#5F5F5F" size="2" >From:
</font><font size="2" >Attila Simon &lt;sati@cloudera.com&gt;</font><br><font
color="#5F5F5F" size="2" >To: </font><font size="2" >user@flume.apache.org,
dev@flume.apache.org</font><br><font color="#5F5F5F" size="2" >Date: </font><font
size="2" >2016-10-13 06:32 PM</font><br><font color="#5F5F5F" size="2"
>Subject: </font><font size="2" >Re: how to make KafkaSource consume the existing messages</font><br>
<hr align="left" size="2" style="color:#8091A5; " width="100%" ><br><br><br><tt><font
face="" size="3" >Hi,<br><br>One more thing. If you switch to the new group.id
and would like to<br>keep the read-from-beginning behaviour every time Flume restarts,<br>then
you might try setting enable.auto.commit to false.<br>Again, Kafka normally won't store
the events indefinitely.<br><br>Cheers,<br>Attila<br><br><br>On
Thu, Oct 13, 2016 at 11:45 AM, Attila Simon &lt;sati@cloudera.com&gt; wrote:<br>&gt;
for the record, cc dev@<br>&gt;<br>&gt; On Thu, Oct 13, 2016 at 11:43
AM, Attila Simon &lt;sati@cloudera.com&gt; wrote:<br>&gt;&gt; Hi,<br>&gt;&gt;<br>&gt;&gt;
auto.offset.reset aims to handle failure scenarios where Flume has lost<br>&gt;&gt;
track of its offsets. When Flume successfully consumes<br>&gt;&gt;
messages, it also commits the last processed offset. When a failure<br>&gt;&gt;
happens, Flume resumes from the last committed offset; &lt;earliest&gt; only<br>&gt;&gt;
applies when no valid committed offset exists.<br>&gt;&gt; I don't think that always starting from offset "zero"
would be<br>&gt;&gt; valuable (it would result in a lot of duplicates). So
I assume you would<br>&gt;&gt; like to have a recovery scenario. What you can
do is set the<br>&gt;&gt; consumer group.id to something new. If Kafka
still has the messages<br>&gt;&gt; (you can check that with the command-line Kafka
consumer and its<br>&gt;&gt; --from-beginning argument, since Kafka by default
purges them periodically),<br>&gt;&gt; then Flume will reset the offset to
the effective beginning, since<br>&gt;&gt; offsets are stored per group.id.<br>&gt;&gt;<br>&gt;&gt;
Quoted from Kafka docs<br>&gt;&gt; (</font></tt><tt><font
face="" size="3" ><a href="http://kafka.apache.org/documentation#newconsumerconfigs"
target="_blank" >http://kafka.apache.org/documentation#newconsumerconfigs</a></font></tt><tt><font
face="" size="3" >):<br>&gt;&gt; auto.offset.reset - What to do when there
is no initial offset in<br>&gt;&gt; Kafka or if the current offset does not
exist any more on the server<br>&gt;&gt; (e.g. because that data has been deleted):<br>&gt;&gt;<br>&gt;&gt;
earliest: automatically reset the offset to the earliest offset<br>&gt;&gt;
latest: automatically reset the offset to the latest offset<br>&gt;&gt; none:
throw exception to the consumer if no previous offset is found<br>&gt;&gt; for
the consumer's group<br>&gt;&gt; anything else: throw exception to the consumer.<br>&gt;&gt;<br>&gt;&gt;
Cheers,<br>&gt;&gt; Attila<br>&gt;&gt;<br>&gt;&gt;<br>&gt;&gt;
On Thu, Oct 13, 2016 at 10:00 AM, Ping PW Wang &lt;wpwang@cn.ibm.com&gt; wrote:<br>&gt;&gt;&gt;
Hi,<br>&gt;&gt;&gt; I used KafkaSource to consume messages from Kafka.
I found that only new<br>&gt;&gt;&gt; messages were received, while the old existing
messages were not. I tried to use a<br>&gt;&gt;&gt; new consumer group and changed
the parameter "auto.offset.reset" from "latest" to<br>&gt;&gt;&gt; "earliest",
but this did not work.<br>&gt;&gt;&gt;<br>&gt;&gt;&gt;
tier2.sources.source1.kafka.consumer.group.id = test-consumer-group-new<br>&gt;&gt;&gt;
tier2.sources.source1.kafka.consumer.auto.offset.reset = earliest<br>&gt;&gt;&gt;<br>&gt;&gt;&gt;
Does anyone know how to make KafkaSource consume the existing messages?<br>&gt;&gt;&gt;
Thanks a lot for any advice!<br>&gt;&gt;&gt;<br>&gt;&gt;&gt;
Best Regards,<br>&gt;&gt;&gt;<br>&gt;&gt;&gt; Wang Ping
(王苹)<br>&gt;&gt;&gt; InfoSphere BigInsights, CDL<br>&gt;&gt;&gt;
Email: wpwang@cn.ibm.com Phone: (8610)82453448 Mobile: (86)17090815725<br>&gt;&gt;&gt;
Address: Ring Bldg.No.28 Building,ZhongGuanCun Software Park,No.8 Dong Bei<br>&gt;&gt;&gt;
Wang West Road, Haidian District Beijing P.R.China 100193<br>&gt;&gt;&gt;
</font></tt><br><br>&nbsp;</div></div><BR>
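
The auto.offset.reset rules quoted from the Kafka docs in this thread can be sketched as a small Python model. This is only an illustration under stated assumptions, not real Kafka or Flume code: the function name `starting_offset` and the numeric offsets are hypothetical, and `latest` here stands for the log end offset (the position of the next message to be written).

```python
from typing import Optional


def starting_offset(committed: Optional[int], earliest: int, latest: int, reset: str) -> int:
    """Return the offset a consumer group would start from (hypothetical model).

    committed: last committed offset for the group, or None for a new group.id
    earliest:  oldest offset still retained by the broker
    latest:    log end offset (next offset to be written)
    reset:     value of auto.offset.reset
    """
    # A committed offset that still exists on the server always wins;
    # auto.offset.reset is not consulted in that case.
    if committed is not None and earliest <= committed <= latest:
        return committed
    # No initial offset, or the committed offset no longer exists
    # (for example because the data has been deleted).
    if reset == "earliest":
        return earliest
    if reset == "latest":
        return latest
    # "none" and anything else: the consumer gets an exception.
    raise LookupError("no valid offset for this group (auto.offset.reset=%s)" % reset)


if __name__ == "__main__":
    print(starting_offset(None, 0, 100, "earliest"))  # new group.id: 0
    print(starting_offset(None, 0, 100, "latest"))    # new group.id: 100
    print(starting_offset(42, 0, 100, "earliest"))    # existing group: 42
    print(starting_offset(5, 10, 100, "earliest"))    # offset 5 purged: 10
```

In this model a group that already has a committed offset resumes from it whatever auto.offset.reset says, which is why the thread suggests switching to a new group.id in order to read the existing messages.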
