flume-user mailing list archives

From Zhishan Li <zhishan...@gmail.com>
Subject Re: Kafka Source Error
Date Mon, 07 Dec 2015 10:00:48 GMT
Hi Gonzalo,


Thank you very much. I have another question:

Here are my metrics. It seems no data was lost, right?

{
  "SOURCE.KafkaSource": {
    "KafkaEventGetTimer": "11916336",
    "Type": "SOURCE",
    "EventAcceptedCount": "28938986",
    "AppendReceivedCount": "0",
    "EventReceivedCount": "28938986",
    "KafkaCommitTimer": "0",
    "KafkaEmptyCount": "0",
    "OpenConnectionCount": "0",
    "AppendBatchReceivedCount": "0",
    "AppendBatchAcceptedCount": "0",
    "StopTime": "0",
    "StartTime": "1449463617288",
    "AppendAcceptedCount": "0"
  },
  "SINK.k4": {
    "ConnectionFailedCount": "0",
    "BatchCompleteCount": "0",
    "EventDrainAttemptCount": "9",
    "ConnectionCreatedCount": "1",
    "Type": "SINK",
    "BatchEmptyCount": "823",
    "ConnectionClosedCount": "0",
    "EventDrainSuccessCount": "9",
    "StopTime": "0",
    "StartTime": "1449463610232",
    "BatchUnderflowCount": "9"
  },
  "CHANNEL.c1": {
    "EventPutSuccessCount": "28938977",
    "ChannelFillPercentage": "4.885",
    "Type": "CHANNEL",
    "EventPutAttemptCount": "28938977",
    "ChannelSize": "977",
    "StopTime": "0",
    "StartTime": "1449463610228",
    "EventTakeSuccessCount": "28938000",
    "ChannelCapacity": "20000",
    "EventTakeAttemptCount": "28938979"
  },
  "SINK.k1": {
    "ConnectionFailedCount": "0",
    "BatchCompleteCount": "28938",
    "EventDrainAttemptCount": "28938977",
    "ConnectionCreatedCount": "167",
    "Type": "SINK",
    "BatchEmptyCount": "1",
    "ConnectionClosedCount": "159",
    "EventDrainSuccessCount": "28938000",
    "StopTime": "0",
    "StartTime": "1449463610233",
    "BatchUnderflowCount": "0"
  },
  "CHANNEL.c2": {
    "EventPutSuccessCount": "9",
    "ChannelFillPercentage": "0.0",
    "Type": "CHANNEL",
    "EventPutAttemptCount": "9",
    "ChannelSize": "0",
    "StopTime": "0",
    "StartTime": "1449463610228",
    "EventTakeSuccessCount": "9",
    "ChannelCapacity": "3000",
    "EventTakeAttemptCount": "841"
  }
}


The pipeline: KafkaSource -> c1 -> k1
              KafkaSource -> c2 -> k4
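
If I add up the counters myself (my own arithmetic, please correct me if I misread them):

  28938977 + 9        = 28938986  (c1 puts + c2 puts == source EventAcceptedCount)
  28938977 - 28938000 = 977       (c1 puts - c1 takes == ChannelSize of c1)

So every event the source accepted was put into a channel, and the 977 events not yet
drained by k1 are still sitting in c1 rather than lost.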

But I don’t understand why the counter name doesn’t match the source code. For example:
"EventReceivedCount": "28938986",
And in KafkaSource.java
line 127: counter.addToEventReceivedCount(Long.valueOf(eventList.size()));


which calls into SourceCounter.java:
public long addToEventReceivedCount(long delta) {
  return addAndGet(COUNTER_EVENTS_RECEIVED, delta);
}
COUNTER_EVENTS_RECEIVED is defined in SourceCounter.java as below:
private static final String COUNTER_EVENTS_RECEIVED =
    "src.events.received";


So how is the counter name mapped from the source code to the monitoring metrics?
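
My current guess, and please correct me if I am wrong: "src.events.received" is only the
internal map key, and the external name "EventReceivedCount" comes from the getter
getEventReceivedCount() declared in SourceCounterMBean, because JMX strips the "get"
prefix when it turns a getter into an attribute name. Here is a minimal standalone sketch
of that JMX convention (my own demo code, not Flume's):

import java.lang.management.ManagementFactory;
import javax.management.MBeanInfo;
import javax.management.MBeanServer;
import javax.management.ObjectName;
import javax.management.StandardMBean;

// Hypothetical counter, not Flume code: shows how JMX derives
// the attribute name "EventReceivedCount" from a getter.
interface DemoCounterMBean {
    long getEventReceivedCount();
}

class DemoCounter implements DemoCounterMBean {
    public long getEventReceivedCount() { return 28938986L; }
}

public class JmxNameDemo {
    public static void main(String[] args) throws Exception {
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        ObjectName name = new ObjectName("demo:type=SOURCE,name=KafkaSource");
        // StandardMBean exposes every getter on the interface as an attribute.
        server.registerMBean(
            new StandardMBean(new DemoCounter(), DemoCounterMBean.class), name);
        MBeanInfo info = server.getMBeanInfo(name);
        // Prints "EventReceivedCount", the same name as in the JSON above.
        System.out.println(info.getAttributes()[0].getName());
    }
}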

Thanks.




> On 7 Dec, 2015, at 5:37 pm, Gonzalo Herreros <gherreros@gmail.com> wrote:
> 
> Kafka consumers keep track of the progress made (offset) in Zookeeper (there is an option
> in the newer versions to change that, but Flume still uses the old way).
> What happens there is that when the consumer tries to get messages from the offset it
> knows, Kafka comes back saying that offset is not present and requests the client to
> "reset" the offset (not sure if it resets to the oldest or newest).
> 
> That might indicate either a conflict because some other Kafka cluster is using the same
> Zookeeper and groupId, or that the Kafka retention is so low that messages in the queue
> get deleted before they can be processed. (A third option is that the Kafka cluster is
> completely messed up.)
> 
> In my view, this is a Kafka issue and not Flume's; try to troubleshoot Kafka first.
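> 
> For example (just a sketch; the broker address is a placeholder), you can compare the
> earliest offset Kafka still holds with the offset the consumer is asking for:
> 
>   # earliest offset still on the broker (--time -2) vs latest (--time -1)
>   bin/kafka-run-class.sh kafka.tools.GetOffsetShell \
>     --broker-list broker1:9092 --topic log --time -2
>   bin/kafka-run-class.sh kafka.tools.GetOffsetShell \
>     --broker-list broker1:9092 --topic log --time -1
> 
> If the earliest available offset is already past the offset your consumer committed,
> retention deleted messages before Flume could read them.
> 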
> When you say "messages lost in the flume pipeline", do you mean the error you are
> getting, or do you have some other issue?
> 
> 
> On 7 December 2015 at 09:02, Zhishan Li <zhishanlee@gmail.com> wrote:
> Hi Gonzalo,
> 
> Thanks for your reply. But I still cannot figure out why the offset is frequently
> reset. I am sure the groupId is unique in my Kafka cluster.
> 
> I find that some messages are lost in the Flume pipeline, but I don’t know the reason.
> Please do me a favour.
> 
> Thanks,
> 
> 
> 
>> On 7 Dec, 2015, at 4:06 pm, Gonzalo Herreros <gherreros@gmail.com> wrote:
>> 
>> What that means is that the KafkaSource is trying to read messages from the last
>> time it was running (or at least the last time some client used Kafka with the same
>> groupId), but they have already been deleted by Kafka, so it is warning you that there
>> are messages that have been missed.
>> Even if it is the first time you use the KafkaSource, maybe somebody used a Kafka
>> consumer with the same groupId long ago. It's better if you make up your own groupId
>> so you don't have strange conflicts.
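>> 
>> For example, something like this in the agent config (assuming the Flume 1.6 Kafka
>> source, where the consumer group property is "groupId"; the agent and source names
>> here are made up):
>> 
>>   a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
>>   a1.sources.r1.zookeeperConnect = zk1:2181
>>   a1.sources.r1.topic = log
>>   a1.sources.r1.groupId = my-unique-flume-group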
>> 
>> Regards,
>> Gonzalo
>> 
>> 
>> On 7 December 2015 at 04:37, Zhishan Li <zhishanlee@gmail.com> wrote:
>> When I use KafkaSource, the following error is raised:
>> 
>> 07 Dec 2015 04:13:11,571 ERROR [ConsumerFetcherThread-] (kafka.utils.Logging$class.error:97)
>>   - Current offset 482243452 for partition [5] out of range; reset offset to 483146676
>> Current offset 482243452 for partition [log,5] out of range; reset offset to 483146676
>> consumed offset: 482243452 doesn't match fetch offset: 483146676 for log:5: fetched
>>   offset = 483147611: consumed offset = 482243452; Consumer may lose data
>> 
>> But I am using the default KafkaSource configuration.
>> 
>> What is happening while the agent runs?
>> 
>> Thanks
