My guess is that you don't have a steady flow of messages, so the sink
reaches the roll interval threshold (hdfs.rollInterval, 30 seconds by
default) before rollCount is reached.
Set the idleTimeout to something bigger. Just note that the last file will
not be closed until this timeout has elapsed.
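
A minimal sketch of the settings involved, assuming the time-based roll
(hdfs.rollInterval) is what closes the files early. The sink name is taken
from your config; the values are illustrative and untested against your setup:

```properties
# Assumption: disable time-based rolling so that only rollCount triggers a roll
a_cobepa_probe.sinks.hdfs_cobepa_probe.hdfs.rollInterval = 0
a_cobepa_probe.sinks.hdfs_cobepa_probe.hdfs.rollCount = 50
a_cobepa_probe.sinks.hdfs_cobepa_probe.hdfs.rollSize = 0
# Assumption: close a file after 600 seconds without writes (0 disables this)
a_cobepa_probe.sinks.hdfs_cobepa_probe.hdfs.idleTimeout = 600
```

With rollInterval set to 0, a file that never reaches 50 events stays open
until idleTimeout seconds pass without a write. It is also likely that
filePrefix = %k%M plays a part: the prefix contains the minute, so events
arriving in different minutes end up in different files regardless of
rollCount.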
Eran
On Mon, Sep 21, 2015 at 1:32 PM <Thomas.Beer@continental-corporation.com>
wrote:
> Addendum: The rollover count varies (the file is not always rolled after
> 4 events, as I wrote before).
>
> Best,
> Thomas
>
> From: Thomas.Beer@continental-corporation.com
> To: user@flume.apache.org,
> Date: 21.09.2015 12:05
> Subject: HDFS sink - rollover
> ------------------------------
>
> Hi,
>
> I'm using the Flume Kafka source and the Flume HDFS sink for writing
> SequenceFiles. I would like to roll over a SequenceFile after a specific
> number of events/messages has been written, e.g. after 50 messages (see
> the rollCount parameter below) a new file should be started.
> My configuration seems to be incorrect, as a rollover happens after 4
> messages (instead of 50).
> I'm using the following "rollover" configuration:
>
> "a_cobepa_probe.sources = kafka-cobepa_probe
> a_cobepa_probe.channels = hdfs-channel-notused
> a_cobepa_probe.sinks = hdfs_cobepa_probe
>
> a_cobepa_probe.sources.kafka-cobepa_probe.type = org.apache.flume.source.kafka.KafkaSource
> a_cobepa_probe.sources.kafka-cobepa_probe.zookeeperConnect = <secret>
> a_cobepa_probe.sources.kafka-cobepa_probe.topic = cobepa_probe
> a_cobepa_probe.sources.kafka-cobepa_probe.batchSize = 1
> a_cobepa_probe.sources.kafka-cobepa_probe.channels = hdfs-channel-notused
>
> a_cobepa_probe.channels.hdfs-channel-notused.type = memory
> a_cobepa_probe.sinks.hdfs_cobepa_probe.channel = hdfs-channel-notused
> a_cobepa_probe.sinks.hdfs_cobepa_probe.type = hdfs
> a_cobepa_probe.sinks.hdfs_cobepa_probe.hdfs.writeFormat = de.conti.backend.asw.flume.serializer.MyBuilder
> a_cobepa_probe.sinks.hdfs_cobepa_probe.hdfs.fileType = SequenceFile
> # a_cobepa_probe.sinks.hdfs_cobepa_probe.hdfs.fileType = DataStream
> a_cobepa_probe.sinks.hdfs_cobepa_probe.hdfs.filePrefix = %k%M
> a_cobepa_probe.sinks.hdfs_cobepa_probe.hdfs.fileSuffix = .cobr
> a_cobepa_probe.sinks.hdfs_cobepa_probe.hdfs.useLocalTimeStamp = true
> a_cobepa_probe.sinks.hdfs_cobepa_probe.hdfs.path = /etl/%{topic}/%y%m%d
> a_cobepa_probe.sinks.hdfs_cobepa_probe.hdfs.rollCount = 50
> a_cobepa_probe.sinks.hdfs_cobepa_probe.hdfs.rollSize = 0
> a_cobepa_probe.sinks.hdfs_cobepa_probe.hdfs.batchSize = 50
>
> a_cobepa_probe.channels.hdfs-channel-notused.capacity = 100
> a_cobepa_probe.channels.hdfs-channel-notused.transactionCapacity = 100"
>
>
> Are there any dependencies on other configuration parameters (in addition
> to the rollCount and rollSize parameters)?
>
> Thank you very much and kind regards,
> Thomas
>
--
Eran | "You don't need eyes to see, you need vision" (Faithless)