flume-user mailing list archives

From "Hari Shreedharan" <hshreedha...@cloudera.com>
Subject Re: Flume loss data when collect online data to hdfs
Date Sat, 24 Jan 2015 02:04:05 GMT
Can you file a jira with this info? We can probably make the change.

Thanks, Hari

On Fri, Jan 23, 2015 at 2:07 AM, Alex <zyacer@gmail.com> wrote:

> Today I retransferred all the data online and found the same data loss
> as last time. So I looked into the suspicious file and found a weird
> character. I used a Java program to parse it: it is a Unicode character
> encoded as a two-char surrogate pair, and its code point is 0x1F4AB.
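>
> (For illustration, a minimal self-contained snippet, plain JDK, showing
> that this code point needs two Java chars:)
>
>     public class SurrogateDemo {
>         public static void main(String[] args) {
>             // U+1F4AB lies outside the Basic Multilingual Plane, so
>             // Java represents it as a high/low surrogate pair.
>             char[] chars = Character.toChars(0x1F4AB);
>             System.out.println(chars.length); // prints 2
>             System.out.printf("\\u%04X \\u%04X%n",
>                 (int) chars[0], (int) chars[1]); // \uD83D \uDCAB
>         }
>     }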
> Then I looked into the source code:
> 1. Class: org.apache.flume.serialization.LineDeserializer
> The LineDeserializer uses
> "org.apache.flume.serialization.ResettableFileInputStream#readChar" to
> read one char at a time; when it encounters the character 0x1F4AB,
> readChar returns -1, and the rest of the file after that character is
> skipped.
> 2. Class: org.apache.flume.serialization.ResettableFileInputStream
> A snippet from the method
> org.apache.flume.serialization.ResettableFileInputStream#readChar:
>      CoderResult res = decoder.decode(buf, charBuf, isEndOfInput);
> When the decoder decodes the char 0x1F4AB, the CoderResult is OVERFLOW.
> That is correct, because 0x1F4AB has to be represented as two chars and
> the one-char output buffer cannot hold both of them.
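>
> (To double-check that reasoning, here is a minimal sketch, plain JDK and
> outside Flume, assuming a one-char output buffer like the one readChar
> appears to decode into; it reproduces the OVERFLOW result:)
>
>     import java.nio.ByteBuffer;
>     import java.nio.CharBuffer;
>     import java.nio.charset.CharsetDecoder;
>     import java.nio.charset.CoderResult;
>     import java.nio.charset.StandardCharsets;
>
>     public class OverflowDemo {
>         public static void main(String[] args) {
>             // UTF-8 bytes of U+1F4AB (a 4-byte sequence)
>             ByteBuffer buf = ByteBuffer.wrap(new String(
>                 Character.toChars(0x1F4AB)).getBytes(StandardCharsets.UTF_8));
>             CharBuffer charBuf = CharBuffer.allocate(1); // room for one char only
>             CharsetDecoder decoder = StandardCharsets.UTF_8.newDecoder();
>             CoderResult res = decoder.decode(buf, charBuf, false);
>             // true: the surrogate pair does not fit into a single char
>             System.out.println(res.isOverflow());
>         }
>     }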
> To solve this problem, one solution is to implement a line deserializer
> that uses
> "org.apache.flume.serialization.ResettableFileInputStream#read()"
> instead of
> "org.apache.flume.serialization.ResettableFileInputStream#readChar";
> a rough sketch of the idea follows, but I am not sure it is a good
> solution.
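>
> (Hypothetical code, not written against the real
> ResettableFileInputStream API: it accumulates raw bytes from read() and
> decodes the whole line in one step, so surrogate pairs stay intact:)
>
>     import java.io.ByteArrayOutputStream;
>     import java.io.IOException;
>     import java.io.InputStream;
>     import java.nio.charset.StandardCharsets;
>
>     public class ByteLineReader {
>         // Read raw bytes until '\n' via read(), then decode the whole
>         // line as UTF-8 at once, so multi-byte characters survive.
>         // Returns null at end of input.
>         public static String readLine(InputStream in) throws IOException {
>             ByteArrayOutputStream line = new ByteArrayOutputStream();
>             int b;
>             while ((b = in.read()) != -1) {
>                 if (b == '\n') {
>                     return line.toString(StandardCharsets.UTF_8.name());
>                 }
>                 line.write(b);
>             }
>             return line.size() > 0
>                 ? line.toString(StandardCharsets.UTF_8.name()) : null;
>         }
>     }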
> The attachment is a snippet of the data, with the weird character on
> the 2nd line.
> Any suggestions?
> Thanks,
> Alex
> On 1/22/2015 2:18 PM, Alex wrote:
>> 1: In agent1, there is a "regex_extractor" interceptor for extracting
>> the header "dt":
>>
>>     #interceptors
>>     agent1.sources.src_spooldir.interceptors.i1.type=regex_extractor
>>     agent1.sources.src_spooldir.interceptors.i1.regex=(\\d{4}-\\d{2}-\\d{2}).*
>>     agent1.sources.src_spooldir.interceptors.i1.serializers=s1
>>     agent1.sources.src_spooldir.interceptors.i1.serializers.s1.name=dt
>>
>>  In agent2, the hdfs sink uses the header in the path; this is the
>> configuration:
>>
>>     agent2.sinks.sink1.hdfs.path = hdfs://hnd.hadoop.jsh:8020/data/%{dt}
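>>
>> (For illustration only, this is roughly what the configured regex
>> extracts, shown with plain java.util.regex rather than Flume's actual
>> interceptor code; the sample log line is made up:)
>>
>>     import java.util.regex.Matcher;
>>     import java.util.regex.Pattern;
>>
>>     public class DtExtractDemo {
>>         public static void main(String[] args) {
>>             // Same pattern as in the interceptor config; group 1
>>             // becomes the "dt" header that %{dt} expands in hdfs.path.
>>             Pattern p = Pattern.compile("(\\d{4}-\\d{2}-\\d{2}).*");
>>             Matcher m = p.matcher("2015-01-22 14:18:00 some log line");
>>             if (m.matches()) {
>>                 System.out.println(m.group(1)); // 2015-01-22
>>             }
>>         }
>>     }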
>>
>> 2: I misunderstood this property, thank you for the correction.
>>
>> Thanks,
>> Alex
>>
>>
>> On 1/22/2015 12:51 PM, Hari Shreedharan wrote:
>>> 1: How do you guarantee that the data from the previous day has not 
>>> spilled over to the next day? Where are you inserting the timestamp 
>>> (if you are doing bucketing)?
>>> 2: Flume creates transactions for writes. Each batch defaults to 1000
>>> events, which are written and flushed. There is still only one
>>> transaction per sink; the pool size is for IO ops.
>>>
>>> Thanks,
>>> Hari
>>>
>>>
>>> On Wed, Jan 21, 2015 at 7:32 PM, Jay Alexander <zyacer@gmail.com> wrote:
>>>
>>>     First question: No, I checked that all the files in HDFS had
>>>     been closed; in fact, I count the data one day later.
>>>
>>>     Second question: I haven't configured anything about transactions.
>>>     But I saw this row in the hdfs sink configuration docs:
>>>     "hdfs.threadsPoolSize | 10 | Number of threads per HDFS sink for
>>>     HDFS IO ops (open, write, etc.)".
>>>     So I assumed there are 10 transactions per sink reading from the
>>>     file channel.
>>>
>>>     Thanks.
>>>
>>>
>>>     2015-01-22 11:04 GMT+08:00 Hari Shreedharan <hshreedharan@cloudera.com>:
>>>
>>>         Are you accounting for the data still being written but not
>>>         yet hflushed at the time of the query? Basically one
>>>         transaction per sink?
>>>
>>>         Thanks,
>>>         Hari
>>>
>>>
>>>         On Wed, Jan 21, 2015 at 6:42 PM, Jay Alexander
>>>         <zyacer@gmail.com> wrote:
>>>
>>>             I use *flume-ng 1.5* to collect logs.
>>>
>>>             There are two agents in the data flow, each running on
>>>             its own host.
>>>
>>>             And the data is sent *from agent1 to agent2.*
>>>
>>>             The agents' components are as follows:
>>>
>>>             agent1: spooling dir source --> file channel --> avro sink
>>>             agent2: avro source --> file channel --> hdfs sink
>>>
>>>             But it seems to lose about 1/1000 of the data, out of
>>>             millions of events. To troubleshoot, I tried these steps:
>>>
>>>              1. Looked through the agents' logs: no errors or
>>>                 exceptions found.
>>>              2. Checked the agents' monitoring metrics: the numbers
>>>                 of events put into and taken from the channels are
>>>                 always equal.
>>>              3. Counted the data with a Hive query and over the HDFS
>>>                 files with a shell script, respectively: the two
>>>                 numbers are equal, and both are less than the online
>>>                 data count.
>>>
>>>
>>>             These are the two agents configuration:
>>>
>>>                 #agent1
>>>                 agent1.sources = src_spooldir
>>>                 agent1.channels = chan_file
>>>                 agent1.sinks = sink_avro
>>>
>>>                 #source
>>>                 agent1.sources.src_spooldir.type = spooldir
>>>                 agent1.sources.src_spooldir.spoolDir =
>>>                 /data/logs/flume-spooldir
>>>                 agent1.sources.src_spooldir.interceptors=i1
>>>
>>>                 #interceptors
>>>                 agent1.sources.src_spooldir.interceptors.i1.type=regex_extractor
>>>                 agent1.sources.src_spooldir.interceptors.i1.regex=(\\d{4}-\\d{2}-\\d{2}).*
>>>                 agent1.sources.src_spooldir.interceptors.i1.serializers=s1
>>>                 agent1.sources.src_spooldir.interceptors.i1.serializers.s1.name=dt
>>>
>>>                 #sink
>>>                 agent1.sinks.sink_avro.type = avro
>>>                 agent1.sinks.sink_avro.hostname = 10.235.2.212
>>>                 agent1.sinks.sink_avro.port = 9910
>>>
>>>                 #channel
>>>                 agent1.channels.chan_file.type = file
>>>                 agent1.channels.chan_file.checkpointDir =
>>>                 /data/flume/agent1/checkpoint
>>>                 agent1.channels.chan_file.dataDirs =
>>>                 /data/flume/agent1/data
>>>
>>>                 agent1.sources.src_spooldir.channels = chan_file
>>>                 agent1.sinks.sink_avro.channel = chan_file
>>>
>>>
>>>
>>>                 # agent2
>>>                 agent2.sources  = source1
>>>                 agent2.channels = channel1
>>>                 agent2.sinks    = sink1
>>>
>>>                 # source
>>>                 agent2.sources.source1.type     = avro
>>>                 agent2.sources.source1.bind     = 10.235.2.212
>>>                 agent2.sources.source1.port     = 9910
>>>
>>>                 # sink
>>>                 agent2.sinks.sink1.type= hdfs
>>>                 agent2.sinks.sink1.hdfs.fileType = DataStream
>>>                 agent2.sinks.sink1.hdfs.filePrefix = log
>>>                 agent2.sinks.sink1.hdfs.path =
>>>                 hdfs://hnd.hadoop.jsh:8020/data/%{dt}
>>>                 agent2.sinks.sink1.hdfs.rollInterval = 600
>>>                 agent2.sinks.sink1.hdfs.rollSize = 0
>>>                 agent2.sinks.sink1.hdfs.rollCount = 0
>>>                 agent2.sinks.sink1.hdfs.idleTimeout = 300
>>>                 agent2.sinks.sink1.hdfs.round = true
>>>                 agent2.sinks.sink1.hdfs.roundValue = 10
>>>                 agent2.sinks.sink1.hdfs.roundUnit = minute
>>>
>>>                 # channel
>>>                 agent2.channels.channel1.type   = file
>>>                 agent2.channels.channel1.checkpointDir =
>>>                 /data/flume/agent2/checkpoint
>>>                 agent2.channels.channel1.dataDirs =
>>>                 /data/flume/agent2/data
>>>                 agent2.sinks.sink1.channel      = channel1
>>>                 agent2.sources.source1.channels = channel1
>>>
>>>
>>>             Any suggestions are welcome!
>>>
>>>
>>>
>>>
>>