flume-user mailing list archives

From Jay Alexander <zya...@gmail.com>
Subject Re: Flume loss data when collect online data to hdfs
Date Sat, 24 Jan 2015 16:28:12 GMT
I have found an issue for this already in JIRA:
https://issues.apache.org/jira/browse/FLUME-2215

2015-01-24 10:04 GMT+08:00 Hari Shreedharan <hshreedharan@cloudera.com>:

> Can you file a jira with this info? We can probably make the change.
>
> Thanks,
> Hari
>
>
> On Fri, Jan 23, 2015 at 2:07 AM, Alex <zyacer@gmail.com> wrote:
>
>>  Today I re-transferred all of the online data and hit the same data loss
>> as last time, so I looked into the suspicious file. I found a weird
>> character; parsing it with a Java program shows it is a Unicode two-char
>> surrogate pair whose code point is 0x1F4AB.
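>>
>> For reference, a minimal check along the lines of the program I mention
>> above (just a sketch) confirms that this code point needs two Java chars:
>>
>>     // U+1F4AB is a supplementary character: it needs a UTF-16 surrogate pair.
>>     public class SurrogateCheck {
>>         public static void main(String[] args) {
>>             int codePoint = 0x1F4AB;
>>             char[] pair = Character.toChars(codePoint);
>>             System.out.println(Character.charCount(codePoint));   // 2
>>             System.out.printf("high=%04X low=%04X%n",
>>                     (int) pair[0], (int) pair[1]);                // high=D83D low=DCAB
>>         }
>>     }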
>>
>> Then I looked into the source code:
>> 1. Class: org.apache.flume.serialization.LineDeserializer
>> The LineDeserializer uses
>> "org.apache.flume.serialization.ResettableFileInputStream#readChar" to read
>> one char at a time; when it encounters the character 0x1F4AB, readChar
>> returns -1 and the rest of the file after that character is skipped.
>> 2. Class: org.apache.flume.serialization.ResettableFileInputStream
>> A snippet from
>> org.apache.flume.serialization.ResettableFileInputStream#readChar:
>>     CoderResult res = decoder.decode(buf, charBuf, isEndOfInput);
>> When the decoder decodes the character 0x1F4AB, the CoderResult is
>> OVERFLOW, which is expected, because 0x1F4AB has to be represented as two
>> chars.
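>>
>> The OVERFLOW result is easy to reproduce outside Flume; a minimal sketch
>> (assuming UTF-8 input and an output buffer with room for only one char):
>>
>>     import java.nio.ByteBuffer;
>>     import java.nio.CharBuffer;
>>     import java.nio.charset.CoderResult;
>>     import java.nio.charset.StandardCharsets;
>>
>>     public class OverflowDemo {
>>         public static void main(String[] args) {
>>             // UTF-8 encoding of U+1F4AB
>>             ByteBuffer in = ByteBuffer.wrap(new byte[] {
>>                     (byte) 0xF0, (byte) 0x9F, (byte) 0x92, (byte) 0xAB });
>>             CharBuffer out = CharBuffer.allocate(1);   // only one char fits
>>             CoderResult res = StandardCharsets.UTF_8.newDecoder().decode(in, out, false);
>>             System.out.println(res.isOverflow());      // true: the surrogate pair does not fit
>>         }
>>     }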
>>
>> To solve this problem, one option is to implement a line deserializer that
>> uses "org.apache.flume.serialization.ResettableFileInputStream#read()"
>> instead of
>> "org.apache.flume.serialization.ResettableFileInputStream#readChar", but I
>> am not sure it is a good solution.
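>>
>> Roughly, what I have in mind is a readLine() built on read(), collecting
>> raw bytes up to '\n' and decoding the whole line at once. A sketch only
>> (charset assumed to be UTF-8, no handling of overly long or partial lines;
>> uses java.io.ByteArrayOutputStream and java.nio.charset.StandardCharsets):
>>
>>     // Assumes in.read() returns a single byte, or -1 at end of input.
>>     private String readLine(ResettableFileInputStream in) throws IOException {
>>         ByteArrayOutputStream line = new ByteArrayOutputStream();
>>         int b;
>>         while ((b = in.read()) != -1) {
>>             if (b == '\n') {
>>                 break;                               // end of line
>>             }
>>             line.write(b);
>>         }
>>         if (b == -1 && line.size() == 0) {
>>             return null;                             // end of file, nothing read
>>         }
>>         return new String(line.toByteArray(), StandardCharsets.UTF_8);
>>     }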
>>
>>  The attached file is a snippet of the data, with the weird character on
>> the 2nd line.
>>
>> Any suggestions?
>>
>> Thanks,
>> Alex
>>
>> On 1/22/2015 2:18 PM, Alex wrote:
>>
>> 1: In agent1, there is a "regex_extractor" interceptor that extracts the
>> header "dt":
>>
>>  #interceptors
>>  agent1.sources.src_spooldir.interceptors.i1.type=regex_extractor
>>
>> agent1.sources.src_spooldir.interceptors.i1.regex=(\\d{4}-\\d{2}-\\d{2}).*
>>  agent1.sources.src_spooldir.interceptors.i1.serializers=s1
>>  agent1.sources.src_spooldir.interceptors.i1.serializers.s1.name=dt
>>
>>   In agent2, the hdfs sink uses that header in the path; this is the
>> configuration:
>>
>> agent2.sinks.sink1.hdfs.path = hdfs://hnd.hadoop.jsh:8020/data/%{dt}
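>>
>> (So an event whose body starts with, say, 2015-01-21 should end up under
>> hdfs://hnd.hadoop.jsh:8020/data/2015-01-21.)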
>>
>> 2: I misunderstood this property, thank you for the correction.
>>
>> Thanks,
>> Alex
>>
>>
>> On 1/22/2015 12:51 PM, Hari Shreedharan wrote:
>>
>>  1: How do you guarantee that the data from the previous day has not
>> spilled over to the next day? Where are you inserting the timestamp (if you
>> are doing bucketing)?
>> 2: Flume creates transactions for writes. Each batch defaults to 1000
>> events, which are written and flushed. There is still only one transaction
>> per sink; the pool size is for IO ops.
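>>
>> (If you want to control that explicitly, the batch size is configurable
>> per sink; for the HDFS sink the property is hdfs.batchSize, e.g.
>> agent2.sinks.sink1.hdfs.batchSize = 1000.)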
>>
>> Thanks,
>> Hari
>>
>>
>> On Wed, Jan 21, 2015 at 7:32 PM, Jay Alexander <zyacer@gmail.com> wrote:
>>
>>>  First question: No, I verified that all the files in HDFS had been
>>> closed; in fact, I count the data one day later.
>>>
>>> Second question: I haven't configured anything about transactions. I saw
>>> this item in the hdfs sink configuration: "hdfs.threadsPoolSize 10 Number
>>> of threads per HDFS sink for HDFS IO ops (open, write, etc.)", so I
>>> assumed there are 10 transactions per sink reading from the file channel.
>>>
>>> Thanks.
>>>
>>>
>>> 2015-01-22 11:04 GMT+08:00 Hari Shreedharan <hshreedharan@cloudera.com>:
>>>
>>>> Are you accounting for data that is still being written but not yet
>>>> hflushed at the time of the query? Basically, one transaction per sink?
>>>>
>>>> Thanks,
>>>> Hari
>>>>
>>>>
>>>> On Wed, Jan 21, 2015 at 6:42 PM, Jay Alexander <zyacer@gmail.com>
>>>> wrote:
>>>>
>>>>>  I use *flume-ng 1.5* to collect logs.
>>>>>
>>>>> There are two agents in the data flow, running on two separate hosts.
>>>>>
>>>>> The data is sent *from agent1 to agent2*.
>>>>>
>>>>> The agents' components are as follows:
>>>>>
>>>>> agent1: spooling dir source --> file channel --> avro sink
>>>>> agent2: avro source --> file channel --> hdfs sink
>>>>>
>>>>> But it seems to lose data, roughly 1 out of every 1000 events out of
>>>>> millions of records. To diagnose the problem I tried these steps:
>>>>>
>>>>>    1. Looked through the agents' logs: no errors or exceptions found.
>>>>>    2. Looked at the agents' monitoring metrics: the number of events put
>>>>>    into and taken from each channel is always equal.
>>>>>    3. Counted the records with a Hive query and by reading the HDFS files
>>>>>    from the shell, respectively: the two counts are equal to each other
>>>>>    but less than the number of records online.
>>>>>
>>>>>
>>>>> These are the two agents configuration:
>>>>>
>>>>>  #agent1
>>>>>  agent1.sources = src_spooldir
>>>>>  agent1.channels = chan_file
>>>>>  agent1.sinks = sink_avro
>>>>>
>>>>>  #source
>>>>>  agent1.sources.src_spooldir.type = spooldir
>>>>>  agent1.sources.src_spooldir.spoolDir = /data/logs/flume-spooldir
>>>>>  agent1.sources.src_spooldir.interceptors=i1
>>>>>
>>>>>  #interceptors
>>>>>  agent1.sources.src_spooldir.interceptors.i1.type=regex_extractor
>>>>>
>>>>> agent1.sources.src_spooldir.interceptors.i1.regex=(\\d{4}-\\d{2}-\\d{2}).*
>>>>>  agent1.sources.src_spooldir.interceptors.i1.serializers=s1
>>>>>   agent1.sources.src_spooldir.interceptors.i1.serializers.s1.name=dt
>>>>>
>>>>>  #sink
>>>>>  agent1.sinks.sink_avro.type = avro
>>>>>  agent1.sinks.sink_avro.hostname = 10.235.2.212
>>>>>  agent1.sinks.sink_avro.port = 9910
>>>>>
>>>>>  #channel
>>>>>  agent1.channels.chan_file.type = file
>>>>>  agent1.channels.chan_file.checkpointDir = /data/flume/agent1/checkpoint
>>>>>  agent1.channels.chan_file.dataDirs = /data/flume/agent1/data
>>>>>
>>>>>  agent1.sources.src_spooldir.channels = chan_file
>>>>>  agent1.sinks.sink_avro.channel = chan_file
>>>>>
>>>>>
>>>>>
>>>>>   # agent2
>>>>>  agent2.sources  = source1
>>>>>  agent2.channels = channel1
>>>>>  agent2.sinks    = sink1
>>>>>
>>>>>  # source
>>>>>  agent2.sources.source1.type     = avro
>>>>>  agent2.sources.source1.bind     = 10.235.2.212
>>>>>  agent2.sources.source1.port     = 9910
>>>>>
>>>>>  # sink
>>>>>  agent2.sinks.sink1.type= hdfs
>>>>>  agent2.sinks.sink1.hdfs.fileType = DataStream
>>>>>  agent2.sinks.sink1.hdfs.filePrefix = log
>>>>>  agent2.sinks.sink1.hdfs.path = hdfs://hnd.hadoop.jsh:8020/data/%{dt}
>>>>>  agent2.sinks.sink1.hdfs.rollInterval = 600
>>>>>  agent2.sinks.sink1.hdfs.rollSize = 0
>>>>>  agent2.sinks.sink1.hdfs.rollCount = 0
>>>>>  agent2.sinks.sink1.hdfs.idleTimeout = 300
>>>>>  agent2.sinks.sink1.hdfs.round = true
>>>>>  agent2.sinks.sink1.hdfs.roundValue = 10
>>>>>  agent2.sinks.sink1.hdfs.roundUnit = minute
>>>>>
>>>>>  # channel
>>>>>  agent2.channels.channel1.type   = file
>>>>>  agent2.channels.channel1.checkpointDir = /data/flume/agent2/checkpoint
>>>>>  agent2.channels.channel1.dataDirs = /data/flume/agent2/data
>>>>>  agent2.sinks.sink1.channel      = channel1
>>>>>  agent2.sources.source1.channels = channel1
>>>>>
>>>>>
>>>>>  Any suggestions are welcome!
>>>>>
>>>>
>>>>
>>>
>>
>>
>> <t4.log>
>
>
>
