flume-user mailing list archives

From Frank Yao <baniu....@gmail.com>
Subject Re: Flume loss data when collect online data to hdfs
Date Sun, 25 Jan 2015 14:52:24 GMT
I have not read this whole thread, so I can only speak for my own setup. I'm
one of the contributors of the flume-kafka source and sink. In my company we
use Flume to move logs from log files into Kafka, and another Flume to read
from Kafka and sink the data to HDFS, at about 600k nginx log lines per
second at peak time. Simplified, it works like
log_file->flume->kafka->flume->hdfs. When I checked for data loss between
log_file and hdfs, I found NO loss of any line of log_file.



---------------
Baniu Yao
Shanghai



On Sun, Jan 25, 2015 at 12:28 AM, Jay Alexander <zyacer@gmail.com> wrote:

> I have found an issue for this already in jira:
> https://issues.apache.org/jira/browse/FLUME-2215
>
> 2015-01-24 10:04 GMT+08:00 Hari Shreedharan <hshreedharan@cloudera.com>:
>
>> Can you file a jira with this info? We can probably make the change.
>>
>> Thanks,
>> Hari
>>
>>
>> On Fri, Jan 23, 2015 at 2:07 AM, Alex <zyacer@gmail.com> wrote:
>>
>>>  Today, I re-transferred all the online data and found the same data loss
>>> as last time, so I looked into the suspicious file. I found a weird
>>> character; I used a Java program to parse it, and it is a Unicode
>>> character encoded as a two-char surrogate pair, with code point 0x1F4AB.
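>>>
>>> As a quick check (plain Java, outside Flume) that 0x1F4AB really needs a
>>> two-char surrogate pair in UTF-16:
>>>
>>> public class SurrogateCheck {
>>>     public static void main(String[] args) {
>>>         char[] utf16 = Character.toChars(0x1F4AB);  // high + low surrogate
>>>         System.out.println(utf16.length);                         // 2
>>>         System.out.println(Character.isHighSurrogate(utf16[0]));  // true
>>>         System.out.println(Character.isLowSurrogate(utf16[1]));   // true
>>>     }
>>> }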
>>>
>>> Then, I looked into the source code:
>>> 1. Class: org.apache.flume.serialization.LineDeserializer
>>> LineDeserializer uses
>>> "org.apache.flume.serialization.ResettableFileInputStream#readChar" to read
>>> one char at a time; when it encounters the character 0x1F4AB, readChar
>>> returns -1, and the rest of the file after that character is skipped.
>>> 2. Class: org.apache.flume.serialization.ResettableFileInputStream
>>> The relevant snippet of
>>> org.apache.flume.serialization.ResettableFileInputStream#readChar is:
>>>     CoderResult res = decoder.decode(buf, charBuf, isEndOfInput);
>>> When the decoder decodes the character 0x1F4AB, the CoderResult is
>>> OVERFLOW, which is correct, because 0x1F4AB has to be represented as two
>>> chars and charBuf only has room for one.
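>>>
>>> A minimal standalone sketch of that behaviour (plain Java, assuming the
>>> input file is UTF-8; not Flume code): decoding the 4-byte sequence for
>>> U+1F4AB into a one-char buffer, as readChar effectively does, yields
>>> OVERFLOW.
>>>
>>> import java.nio.ByteBuffer;
>>> import java.nio.CharBuffer;
>>> import java.nio.charset.CharsetDecoder;
>>> import java.nio.charset.CoderResult;
>>> import java.nio.charset.StandardCharsets;
>>>
>>> public class OverflowDemo {
>>>     public static void main(String[] args) {
>>>         // UTF-8 bytes of U+1F4AB (assumption: the log files are UTF-8)
>>>         ByteBuffer in = ByteBuffer.wrap(new byte[] {
>>>                 (byte) 0xF0, (byte) 0x9F, (byte) 0x92, (byte) 0xAB });
>>>         CharBuffer out = CharBuffer.allocate(1); // one char, like readChar()
>>>         CharsetDecoder decoder = StandardCharsets.UTF_8.newDecoder();
>>>         CoderResult res = decoder.decode(in, out, true);
>>>         System.out.println(res.isOverflow()); // true: the pair does not fit in one char
>>>     }
>>> }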
>>>
>>> One possible solution is to implement a line deserializer that uses
>>> "org.apache.flume.serialization.ResettableFileInputStream#read()" (byte
>>> oriented) instead of
>>> "org.apache.flume.serialization.ResettableFileInputStream#readChar", but I
>>> am not sure it is a good solution.
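>>>
>>> A rough sketch of that idea (hypothetical, not the real Flume code; it
>>> only assumes that read() follows the usual InputStream contract of
>>> returning one byte or -1, and that the input is UTF-8): collect raw bytes
>>> up to '\n' and decode the whole line at once, so a multi-byte character
>>> can never be split in the middle.
>>>
>>> // Sketch of a byte-oriented readLine; imports assumed: java.io.*,
>>> // java.nio.charset.StandardCharsets, org.apache.flume.serialization.ResettableInputStream
>>> private String readLine(ResettableInputStream in) throws IOException {
>>>     ByteArrayOutputStream buf = new ByteArrayOutputStream();
>>>     int b;
>>>     while ((b = in.read()) != -1) {
>>>         if (b == '\n') {
>>>             break;                 // end of line
>>>         }
>>>         buf.write(b);
>>>     }
>>>     if (b == -1 && buf.size() == 0) {
>>>         return null;               // end of stream, nothing pending
>>>     }
>>>     return new String(buf.toByteArray(), StandardCharsets.UTF_8);
>>> }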
>>>
>>>  The attachment is a snippet of the data, with the weird character on the 2nd line.
>>>
>>> Any suggestions?
>>>
>>> Thanks,
>>> Alex
>>>
>>> On 1/22/2015 2:18 PM, Alex wrote:
>>>
>>> 1: In agent1, there is a "regex_extractor" interceptor for extracting the
>>> "dt" header:
>>>
>>>  #interceptors
>>>  agent1.sources.src_spooldir.interceptors.i1.type=regex_extractor
>>>
>>> agent1.sources.src_spooldir.interceptors.i1.regex=(\\d{4}-\\d{2}-\\d{2}).*
>>>  agent1.sources.src_spooldir.interceptors.i1.serializers=s1
>>>  agent1.sources.src_spooldir.interceptors.i1.serializers.s1.name=dt
>>>
>>>   In agent2, the hdfs sink uses the header in the path; this is the
>>> configuration:
>>>
>>> agent2.sinks.sink1.hdfs.path = hdfs://hnd.hadoop.jsh:8020/data/%{dt}
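>>>
>>> For example (hypothetical log line), a line starting with "2015-01-21 ..."
>>> matches the interceptor's first capture group, the event gets the header
>>> dt=2015-01-21, and the sink writes it under
>>> hdfs://hnd.hadoop.jsh:8020/data/2015-01-21. A minimal sketch of the same
>>> extraction in plain Java:
>>>
>>> import java.util.regex.Matcher;
>>> import java.util.regex.Pattern;
>>>
>>> public class DtHeaderDemo {
>>>     public static void main(String[] args) {
>>>         // Same pattern the regex_extractor interceptor is configured with
>>>         Pattern p = Pattern.compile("(\\d{4}-\\d{2}-\\d{2}).*");
>>>         String line = "2015-01-21 12:34:56 GET /index 200"; // hypothetical line
>>>         Matcher m = p.matcher(line);
>>>         if (m.matches()) {
>>>             String dt = m.group(1); // becomes the "dt" event header
>>>             System.out.println("dt=" + dt + " -> /data/" + dt);
>>>         }
>>>     }
>>> }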
>>>
>>> 2: I misunderstood this property, thank you for the clarification.
>>>
>>> Thanks,
>>> Alex
>>>
>>>
>>> On 1/22/2015 12:51 PM, Hari Shreedharan wrote:
>>>
>>>  1: How do you guarantee that the data from the previous day has not
>>> spilled over to the next day? Where are you inserting the timestamp (if you
>>> are doing bucketing)?
>>> 2: Flume creates transactions for writes. Each batch defaults to 1000
>>> events, which are written and flushed. There is still only one
>>> transaction per sink; the pool size is for IO ops.
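>>>
>>> To illustrate the "one transaction per batch, per sink" pattern, here is a
>>> simplified sketch of a generic Flume sink's process() method (not the
>>> actual HDFS sink code; write() and flush() are hypothetical helpers
>>> standing in for the sink's HDFS writer, and BATCH_SIZE stands in for the
>>> configured batch size):
>>>
>>> import org.apache.flume.Channel;
>>> import org.apache.flume.Event;
>>> import org.apache.flume.EventDeliveryException;
>>> import org.apache.flume.Transaction;
>>> import org.apache.flume.sink.AbstractSink;
>>>
>>> public class BatchSketchSink extends AbstractSink {
>>>     private static final int BATCH_SIZE = 1000; // default batch size mentioned above
>>>
>>>     @Override
>>>     public Status process() throws EventDeliveryException {
>>>         Channel channel = getChannel();
>>>         Transaction txn = channel.getTransaction();
>>>         txn.begin();
>>>         try {
>>>             int taken = 0;
>>>             for (int i = 0; i < BATCH_SIZE; i++) {
>>>                 Event event = channel.take();
>>>                 if (event == null) {
>>>                     break;        // channel drained for now
>>>                 }
>>>                 write(event);     // hypothetical: buffered append to the open HDFS file
>>>                 taken++;
>>>             }
>>>             flush();              // hypothetical: hflush the open file
>>>             txn.commit();         // one commit per batch, per sink
>>>             return taken == 0 ? Status.BACKOFF : Status.READY;
>>>         } catch (Throwable t) {
>>>             txn.rollback();
>>>             throw new EventDeliveryException("failed to deliver batch", t);
>>>         } finally {
>>>             txn.close();
>>>         }
>>>     }
>>>
>>>     private void write(Event e) { /* HDFS append would go here */ }
>>>     private void flush()        { /* hflush would go here */ }
>>> }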
>>>
>>> Thanks,
>>> Hari
>>>
>>>
>>> On Wed, Jan 21, 2015 at 7:32 PM, Jay Alexander <zyacer@gmail.com> wrote:
>>>
>>>>  First Question: No, I verified that all the files in hdfs had been
>>>> closed; in fact, I count the data one day later.
>>>>
>>>> Second Question: I haven't configured anything about transactions. I saw
>>>> this item in the hdfs sink documentation: "hdfs.threadsPoolSize 10 -
>>>> Number of threads per HDFS sink for HDFS IO ops (open, write, etc.)",
>>>> so I assumed there are 10 transactions per sink from the file channel.
>>>>
>>>> Thanks.
>>>>
>>>>
>>>> 2015-01-22 11:04 GMT+08:00 Hari Shreedharan <hshreedharan@cloudera.com>
>>>> :
>>>>
>>>>> Are you accounting for the data still being written but not yet
>>>>> hflushed at the time of the query? Basically one transaction per sink?
>>>>>
>>>>> Thanks,
>>>>> Hari
>>>>>
>>>>>
>>>>> On Wed, Jan 21, 2015 at 6:42 PM, Jay Alexander <zyacer@gmail.com>
>>>>> wrote:
>>>>>
>>>>>>  I use *flume-ng 1.5* to collect logs.
>>>>>>
>>>>>> There are two agents in the data flow and they are on two hosts,
>>>>>> respectively.
>>>>>>
>>>>>> The data is sent *from agent1 to agent2*.
>>>>>>
>>>>>> The agents' components are as follows:
>>>>>>
>>>>>> agent1: spooling dir source --> file channel --> avro sink
>>>>>> agent2: avro source --> file channel --> hdfs sink
>>>>>>
>>>>>> But it seems to lose data at a rate of about 1/1000 over millions of
>>>>>> records. To troubleshoot the problem I tried these steps:
>>>>>>
>>>>>>    1. looked through the agents' logs: could not find any error or
>>>>>>    exception.
>>>>>>    2. looked at the agents' monitoring metrics: the numbers of events
>>>>>>    put into and taken from the channel are always equal.
>>>>>>    3. counted the records with a Hive query and with a shell count over
>>>>>>    the HDFS files, respectively: the two counts are equal, but both are
>>>>>>    less than the number of records online.
>>>>>>
>>>>>>
>>>>>> These are the two agents configuration:
>>>>>>
>>>>>>  #agent1
>>>>>>  agent1.sources = src_spooldir
>>>>>>  agent1.channels = chan_file
>>>>>>  agent1.sinks = sink_avro
>>>>>>
>>>>>>  #source
>>>>>>  agent1.sources.src_spooldir.type = spooldir
>>>>>>  agent1.sources.src_spooldir.spoolDir = /data/logs/flume-spooldir
>>>>>>  agent1.sources.src_spooldir.interceptors=i1
>>>>>>
>>>>>>  #interceptors
>>>>>>  agent1.sources.src_spooldir.interceptors.i1.type=regex_extractor
>>>>>>
>>>>>> agent1.sources.src_spooldir.interceptors.i1.regex=(\\d{4}-\\d{2}-\\d{2}).*
>>>>>>  agent1.sources.src_spooldir.interceptors.i1.serializers=s1
>>>>>>   agent1.sources.src_spooldir.interceptors.i1.serializers.s1.name=dt
>>>>>>
>>>>>>  #sink
>>>>>>  agent1.sinks.sink_avro.type = avro
>>>>>>  agent1.sinks.sink_avro.hostname = 10.235.2.212
>>>>>>  agent1.sinks.sink_avro.port = 9910
>>>>>>
>>>>>>  #channel
>>>>>>  agent1.channels.chan_file.type = file
>>>>>>  agent1.channels.chan_file.checkpointDir =
>>>>>> /data/flume/agent1/checkpoint
>>>>>>  agent1.channels.chan_file.dataDirs = /data/flume/agent1/data
>>>>>>
>>>>>>  agent1.sources.src_spooldir.channels = chan_file
>>>>>>  agent1.sinks.sink_avro.channel = chan_file
>>>>>>
>>>>>>
>>>>>>
>>>>>>   # agent2
>>>>>>  agent2.sources  = source1
>>>>>>  agent2.channels = channel1
>>>>>>  agent2.sinks    = sink1
>>>>>>
>>>>>>  # source
>>>>>>  agent2.sources.source1.type     = avro
>>>>>>  agent2.sources.source1.bind     = 10.235.2.212
>>>>>>  agent2.sources.source1.port     = 9910
>>>>>>
>>>>>>  # sink
>>>>>>  agent2.sinks.sink1.type= hdfs
>>>>>>  agent2.sinks.sink1.hdfs.fileType = DataStream
>>>>>>  agent2.sinks.sink1.hdfs.filePrefix = log
>>>>>>  agent2.sinks.sink1.hdfs.path = hdfs://hnd.hadoop.jsh:8020/data/%{dt}
>>>>>>  agent2.sinks.sink1.hdfs.rollInterval = 600
>>>>>>  agent2.sinks.sink1.hdfs.rollSize = 0
>>>>>>  agent2.sinks.sink1.hdfs.rollCount = 0
>>>>>>  agent2.sinks.sink1.hdfs.idleTimeout = 300
>>>>>>  agent2.sinks.sink1.hdfs.round = true
>>>>>>  agent2.sinks.sink1.hdfs.roundValue = 10
>>>>>>  agent2.sinks.sink1.hdfs.roundUnit = minute
>>>>>>
>>>>>>  # channel
>>>>>>  agent2.channels.channel1.type   = file
>>>>>>  agent2.channels.channel1.checkpointDir =
>>>>>> /data/flume/agent2/checkpoint
>>>>>>  agent2.channels.channel1.dataDirs = /data/flume/agent2/data
>>>>>>  agent2.sinks.sink1.channel      = channel1
>>>>>>  agent2.sources.source1.channels = channel1
>>>>>>
>>>>>>
>>>>>>  Any suggestions are welcome!
>>>>>>
>>>>>
>>>>>
>>>>
>>>
>>>
>>> <t4.log>
>>
>>
>>
>
