flume-user mailing list archives

From Connor Woodson <cwoodson....@gmail.com>
Subject Re: Question about gzip compression when using Flume Ng
Date Mon, 14 Jan 2013 22:52:50 GMT
Try adding:

collector102.sinks.sink1.hdfs.writeFormat = TEXT
collector102.sinks.sink2.hdfs.writeFormat = TEXT
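
With the config you posted below, the compression-related block for sink1 would then look roughly like this (just a sketch of your existing settings with the one new line added; same idea for sink2 if you compress that one too):

# gzip compression related settings
collector102.sinks.sink1.hdfs.codeC = gzip
collector102.sinks.sink1.hdfs.fileType = CompressedStream
collector102.sinks.sink1.hdfs.fileSuffix = .gz
collector102.sinks.sink1.hdfs.writeFormat = TEXT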

- Connor


On Mon, Jan 14, 2013 at 2:34 PM, Sagar Mehta <sagarmehta@gmail.com> wrote:

> Yeah sure!!
>
> smehta@collector102:/opt/flume/conf$ cat hdfs.conf
> # hdfs.conf: This configuration file configures Flume NG to use
> # An exec source to get a live tail of the jetty logFile
> # An hdfs sink to write events to the hdfs on the test cluster
> # A file based channel to connect the above source and sink
>
> # Name the components on this agent
> collector102.sources = source1
> collector102.sinks = sink1 sink2
> collector102.channels = channel1 channel2
>
> # Configure the source
> collector102.sources.source1.type = exec
> collector102.sources.source1.command = tail -F /opt/jetty/logFile.log
>
> # Configure the interceptors
> collector102.sources.source1.interceptors = TimestampInterceptor HostInterceptor
>
> # We use the Timestamp interceptor to get timestamps of when flume receives events
> # This is used for figuring out the bucket to which an event goes
> collector102.sources.source1.interceptors.TimestampInterceptor.type = timestamp
>
> # We use the Host interceptor to populate the host header with the fully qualified domain name of the collector.
> # That way we know which file in the sink represents which collector.
> collector102.sources.source1.interceptors.HostInterceptor.type = org.apache.flume.interceptor.HostInterceptor$Builder
> collector102.sources.source1.interceptors.HostInterceptor.preserveExisting = false
> collector102.sources.source1.interceptors.HostInterceptor.useIP = false
> collector102.sources.source1.interceptors.HostInterceptor.hostHeader = host
>
> # Configure the sink
>
> collector102.sinks.sink1.type = hdfs
>
> # Configure the bucketing
> collector102.sinks.sink1.hdfs.path = hdfs://namenode301.ngpipes.milp.ngmoco.com:9000/ngpipes-raw-logs/%Y-%m-%d/%H00
>
> # Prefix the file with the source so that we know where the events in the file came from
> collector102.sinks.sink1.hdfs.filePrefix = %{host}
>
> # We roll the flume output file based on time interval - currently every 5 minutes
> collector102.sinks.sink1.hdfs.rollSize = 0
> collector102.sinks.sink1.hdfs.rollCount = 0
> collector102.sinks.sink1.hdfs.rollInterval = 300
>
> #gzip compression related settings
> collector102.sinks.sink1.hdfs.codeC = gzip
> collector102.sinks.sink1.hdfs.fileType = CompressedStream
> collector102.sinks.sink1.hdfs.fileSuffix = .gz
>
> # Configure the sink
>
> collector102.sinks.sink2.type = hdfs
>
> # Configure the bucketing
> collector102.sinks.sink2.hdfs.path = hdfs://namenode5001.ngpipes.sac.ngmoco.com:9000/ngpipes-raw-logs/%Y-%m-%d/%H00
>
> # Prefix the file with the source so that we know where the events in the file came from
> collector102.sinks.sink2.hdfs.filePrefix = %{host}
>
> # We roll the flume output file based on time interval - currently every 5 minutes
> collector102.sinks.sink2.hdfs.rollSize = 0
> collector102.sinks.sink2.hdfs.rollCount = 0
> collector102.sinks.sink2.hdfs.rollInterval = 300
> collector102.sinks.sink2.hdfs.fileType = DataStream
>
> # Configure the channel that connects the source to the sink
>
> # Use a channel which buffers events in filesystem
> collector102.channels.channel1.type = file
> collector102.channels.channel1.checkpointDir = /data/flume_data/channel1/checkpoint
> collector102.channels.channel1.dataDirs = /data/flume_data/channel1/data
>
> # Use a channel which buffers events in filesystem
> collector102.channels.channel2.type = file
> collector102.channels.channel2.checkpointDir = /data/flume_data/channel2/checkpoint
> collector102.channels.channel2.dataDirs = /data/flume_data/channel2/data
>
> # Bind the source and sink to the channel configured above
> collector102.sources.source1.channels = channel1 channel2
> collector102.sinks.sink1.channel = channel1
> collector102.sinks.sink2.channel = channel2
>
> On Mon, Jan 14, 2013 at 2:25 PM, Connor Woodson <cwoodson.dev@gmail.com> wrote:
>
>> Can you post your full config?
>>
>> - Connor
>>
>>
>> On Mon, Jan 14, 2013 at 11:18 AM, Sagar Mehta <sagarmehta@gmail.com> wrote:
>>
>>> Hi Guys,
>>>
>>> I'm using Flume NG and it works great for me. In essence, I'm using an
>>> exec source to do a tail -F on a logfile, with two HDFS sinks each fed by
>>> a file channel. So far so good. Now I'm trying to use gzip compression
>>> with the following config, as per the Flume NG User Guide at
>>> http://flume.apache.org/FlumeUserGuide.html.
>>>
>>> #gzip compression related settings
>>> collector102.sinks.sink1.hdfs.codeC = gzip
>>> collector102.sinks.sink1.hdfs.fileType = CompressedStream
>>> collector102.sinks.sink1.hdfs.fileSuffix = .gz
>>>
>>> However, this is what appears to be happening:
>>>
>>> Flume seems to write gzip-compressed output [I see the .gz files in the
>>> output buckets]; however, when I try to decompress it, I get an error
>>> about 'trailing garbage ignored' and the decompressed output is in fact
>>> smaller in size.
>>>
>>> hadoop@jobtracker301:/home/hadoop/sagar/temp$ ls -ltr collector102.ngpipes.sac.ngmoco.com.1357936638713.gz
>>> -rw-r--r-- 1 hadoop hadoop 5381235 2013-01-11 20:44 collector102.ngpipes.sac.ngmoco.com.1357936638713.gz
>>>
>>> hadoop@jobtracker301:/home/hadoop/sagar/temp$ gunzip collector102.ngpipes.sac.ngmoco.com.1357936638713.gz
>>>
>>> gzip: collector102.ngpipes.sac.ngmoco.com.1357936638713.gz: decompression OK, trailing garbage ignored
>>>
>>> hadoop@jobtracker301:/home/hadoop/sagar/temp$ ls -l
>>>
>>> -rw-r--r-- 1 hadoop hadoop 58898 2013-01-11 20:44 collector102.ngpipes.sac.ngmoco.com.1357936638713
>>>
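>>> (A couple of quick checks, sketched here against the same .gz before gunzipping it, can show how much of the file the gzip tools actually understand:)
>>>
>>> # test the gzip stream without writing anything out
>>> gzip -t collector102.ngpipes.sac.ngmoco.com.1357936638713.gz
>>> # count how many bytes actually decompress, to compare against ls -l
>>> zcat collector102.ngpipes.sac.ngmoco.com.1357936638713.gz | wc -c
>>>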
>>> Below are some helpful details.
>>>
>>> I'm using apache-flume-1.4.0-SNAPSHOT-bin:
>>>
>>> smehta@collector102:/opt$ ls -l flume
>>> lrwxrwxrwx 1 root root 31 2012-12-14 00:44 flume -> apache-flume-1.4.0-SNAPSHOT-bin
>>>
>>> I also have the hadoop-core jar in my path:
>>>
>>> smehta@collector102:/opt/flume/lib$ ls -l hadoop-core-0.20.2-cdh3u2.jar
>>> -rw-r--r-- 1 hadoop hadoop 3534499 2012-12-01 01:53 hadoop-core-0.20.2-cdh3u2.jar
>>>
>>> Everything is working well for me except the compression part. I'm not
>>> quite sure what I'm missing here, so while I debug this, any ideas or
>>> help would be much appreciated.
>>>
>>> Thanks in advance,
>>>  Sagar
>>>
>>
>>
>
