flume-user mailing list archives

From Denes Arvay <de...@cloudera.com>
Subject Re: Write transformed data to file_roller sink with Morphline
Date Wed, 08 Mar 2017 13:42:22 GMT
Hi Scott,

I don't think this is caused by the morphline. If the data shows up properly
on the logger sink, then it should be the same with any other sink.
I did some debugging on the file roll sink, and apparently it rotates empty
files as well: after the configured rollInterval it marks the current file
for rotation, and the next time it tries to process an event the file is
rotated even if there is nothing to write into it.
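To make the rotation behaviour above concrete, here is a simplified Java model of it. It is a sketch under stated assumptions, not the actual RollingFileSink source: the class RollOnEmptyModel and its methods are hypothetical, time is simulated instead of being driven by a timer, and a rotation is treated as due once rollInterval has passed since the previous one.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical, simplified model of the rotation behaviour described above;
 * it is NOT the real RollingFileSink code. Time is simulated in milliseconds
 * so the result is deterministic. In the real sink a timer sets a rotate flag;
 * here the caller passes the current time to process().
 */
public class RollOnEmptyModel {

    private final long rollIntervalMs;
    private long lastRollMs = 0;
    private List<String> current = null;                 // currently open file, if any
    final List<List<String>> files = new ArrayList<>();  // one list of lines per file

    RollOnEmptyModel(long rollIntervalMs) {
        this.rollIntervalMs = rollIntervalMs;
    }

    /** One process() call at time nowMs; event is null when the channel is empty. */
    boolean process(long nowMs, String event) {
        // rollInterval has elapsed, so the current file is closed...
        if (nowMs - lastRollMs >= rollIntervalMs) {
            current = null;
            lastRollMs = nowMs;
        }
        // ...and the next file is opened before anyone checks for an event.
        if (current == null) {
            current = new ArrayList<>();
            files.add(current);
        }
        if (event == null) {
            return false;  // BACKOFF: nothing was written
        }
        current.add(event);
        return true;       // READY
    }

    long emptyFiles() {
        return files.stream().filter(List::isEmpty).count();
    }

    public static void main(String[] args) {
        RollOnEmptyModel sink = new RollOnEmptyModel(3_000);  // rollInterval = 3 s
        sink.process(0, "an event");   // file 1 receives one event
        sink.process(5_000, null);     // channel empty, file 2 is still created
        sink.process(10_000, null);    // and so is file 3
        System.out.println(sink.files.size() + " files, " + sink.emptyFiles() + " empty");
    }
}
```

Running main prints `3 files, 2 empty`: only the first file receives data, and each of the two later process() calls leaves an empty file behind.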

If you are interested in the details at the code level:
- here is the file rotation logic:
https://github.com/apache/flume/blob/trunk/flume-ng-core/src/main/java/org/apache/flume/sink/RollingFileSink.java#L147
- this kicks in even if there is no new event in the channel
- this method (process()) is called (through multiple levels) by the
SinkRunner, which sleeps for at most 5 seconds if the sink returned BACKOFF
(i.e. there was no event to process):
https://github.com/apache/flume/blob/trunk/flume-ng-core/src/main/java/org/apache/flume/SinkRunner.java#L148-L148
- the result is that you will get a new (and apparently empty) file
roughly every 5 seconds
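The interaction between the back-off sleep and rollInterval can be sketched the same way. This is a hypothetical, simulated loop (BackoffTimeline is not a Flume class). It assumes the sleep grows by 1 second per consecutive BACKOFF up to the 5-second maximum mentioned above, that the roll timer fires at fixed multiples of rollInterval, and that the channel stays empty throughout.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical simulation of a sink runner backing off on an empty channel.
 * Nothing actually sleeps; time is advanced in milliseconds.
 */
public class BackoffTimeline {

    // Assumption: the sleep grows by this much per consecutive BACKOFF.
    static final long BACKOFF_INCREMENT_MS = 1_000;
    // The "maximum 5 seconds" sleep mentioned in the message.
    static final long MAX_BACKOFF_MS = 5_000;

    /**
     * Simulated times (ms) at which the sink opens a new file while the
     * channel stays empty, so every one of these files ends up empty.
     */
    static List<Long> emptyFileTimes(long rollIntervalMs, long durationMs) {
        List<Long> opened = new ArrayList<>();
        opened.add(0L);  // the first process() call opens a file
        long now = 0;
        long consecutiveBackoffs = 0;
        while (true) {
            // process() returned BACKOFF, so the runner sleeps before calling it again
            consecutiveBackoffs++;
            long previous = now;
            now += Math.min(consecutiveBackoffs * BACKOFF_INCREMENT_MS, MAX_BACKOFF_MS);
            if (now > durationMs) {
                break;
            }
            // The roll timer fired at least once during the sleep (it fires at
            // multiples of rollInterval), so this call rotates and opens a new file.
            if (now / rollIntervalMs > previous / rollIntervalMs) {
                opened.add(now);
            }
        }
        return opened;
    }

    public static void main(String[] args) {
        // rollInterval = 3 s, observed for 30 s
        System.out.println(emptyFileTimes(3_000, 30_000));
    }
}
```

With rollInterval = 3 s it prints `[0, 3000, 6000, 10000, 15000, 20000, 25000, 30000]`: once the back-off reaches its cap, every wake-up comes more than 3 s after the previous one, so each process() call finds a rotation pending and opens another empty file about every 5 seconds. Under the same assumptions, the 30-second default would produce a new file only every 30 seconds.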

IMHO this is a bug in the file roll sink; I'm going to file a JIRA ticket
to get it fixed.

Btw, the 3-second rollInterval seems a bit low to me (the default is 30
seconds). Is it intentional? If so, what are you trying to achieve with it?

Kind regards,
Denes

On Tue, Mar 7, 2017 at 9:59 AM Scott Handerson <sjhanderson@mailbox.org> wrote:

> I am attempting to integrate morphline with Flume. I am able to use the
> logger sink to print data to the console. However, when I switch to the
> file_roll sink, the output folder only contains newlines (empty lines).
> How can I get the transformed data written by the file_roll sink?
>
>
> Thanks.
>
>
> Flume agent conf file:
>
> a1.sources = r1
> a1.sinks = k1
> a1.channels = c1
>
> a1.sources.r1.type = spooldir
> a1.sources.r1.spoolDir = /path/to/source
> a1.sources.r1.interceptors = m
> a1.sources.r1.interceptors.m.type = org.apache.flume.sink.solr.morphline.MorphlineInterceptor$Builder
> a1.sources.r1.interceptors.m.morphlineFile = /path/to/conf/morphline.conf
> a1.sources.r1.interceptors.m.morphlineId = morphline1
>
> a1.sinks.k1.type = file_roll
> a1.sinks.k1.sink.directory = /path/to/output
> a1.sinks.k1.sink.rollInterval = 3
>
>
> a1.channels.c1.type = memory
> a1.channels.c1.capacity = 100000
>
>
> a1.sources.r1.channels = c1
> a1.sinks.k1.channel = c1
>
>
> Morphline conf file:
>
> morphlines : [{
>   id : morphline1
>   importCommands : ["org.kitesdk.**"]
>   commands : [
>     {
>       readCSV {
>         separator : "\t"
>         columns : [userID,movieID,rating,date_day,date_month,date_year,date_hour,date_minute,date_second]
>         ignoreFirstLine : false
>         trim : true
>         charset : UTF-8
>       }
>     }
>     {
>       java {
>         imports : "import java.util.*;"
>         code: """
>           record.removeAll("date_hour");
>           record.removeAll("date_minute");
>           record.removeAll("date_second");
>           return child.process(record);
>         """
>       }
>     }
>     { logInfo { format : "(after columns are removed) record: {}", args : ["@{}"] } }
>     {
>       setValues {
>         _attachment_body : "@{bodybak}"
>         bodybak : []
>         _attachment_mimetype : []
>       }
>     }
>   ]
> }]
>
