flume-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Balthasar Schopman <b.schop...@tech.leaseweb.com>
Subject recovery after memory transaction capacity is exceeded
Date Tue, 13 Oct 2015 07:12:30 GMT

I'm creating a proof-of-concept of a Flume agent that'll buffer events and stops consuming
events from the source when the sink is unavailable. Only when the sink is available again,
the buffered events should be processed and then the source restarts consumption.

For this I've created a simple agent, which reads from a SpoolDir and writes to a file. To
simulate that the sink service is down, I change file permissions so Flume can't write to
it. Then I start Flume some events are buffered in the memory channel and it stops consuming
events when the channel capacity is full, as expected. As soon as the file becomes writeable,
the sink is able to process the events and Flume recovers. However, that only works when the
transaction capacity is not exceeded. As soon as the transaction capacity is exceeded, Flume
never recovers and keeps writing the following error:

    2015-10-02 14:52:51,940 (SinkRunner-PollingRunner-DefaultSinkProcessor) [ERROR -
    org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:160)] Unable to
    deliver event. Exception follows.
    org.apache.flume.EventDeliveryException: Failed to process transaction
        at org.apache.flume.sink.RollingFileSink.process(RollingFileSink.java:218)
        at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
        at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
        at java.lang.Thread.run(Thread.java:745)
    Caused by: org.apache.flume.ChannelException: Take list for MemoryTransaction,
    capacity 4 full, consider committing more frequently, increasing capacity, or
    increasing thread count
        at org.apache.flume.channel.MemoryChannel$MemoryTransaction.doTake(MemoryChannel.java:96)
        at org.apache.flume.channel.BasicTransactionSemantics.take(BasicTransactionSemantics.java:113)
        at org.apache.flume.channel.BasicChannelSemantics.take(BasicChannelSemantics.java:95)
        at org.apache.flume.sink.RollingFileSink.process(RollingFileSink.java:191)
        ... 3 more

As soon as the number of events buffered in memory exceed the transaction capacity (4) this
error occurs. I don't understand why, because the batchSize of the fileout is 1, so it should
take out the events one by one.

This is the config I'm using:

    agent.sources = spool-src
    agent.channels = mem-channel
    agent.sinks = fileout

    agent.sources.spool-src.channels = mem-channel
    agent.sources.spool-src.type = spooldir
    agent.sources.spool-src.spoolDir = /tmp/flume-spool
    agent.sources.spool-src.batchSize = 1

    agent.channels.mem-channel.type = memory
    agent.channels.mem-channel.capacity = 10
    agent.channels.mem-channel.transactionCapacity = 4

    agent.sinks.fileout.channel = mem-channel
    agent.sinks.fileout.type = file_roll
    agent.sinks.fileout.sink.directory = /tmp/flume-output
    agent.sinks.fileout.sink.rollInterval = 0
    agent.sinks.fileout.batchSize = 1

I've tested this config with different values for the channel capacity & transaction capacity
(e.g., 3 and 3), but haven't found a config where Flume is able to recover after the channel
capacity is full. Any ideas on how to achieve this?

Kind regards,
Balthasar Schopman
LeaseWeb CDN Innovation Engineer

Kind regards,

Balthasar Schopman
Software Developer
LeaseWeb Technologies B.V.

T: +31 20 316 0232
E: b.schopman@tech.leaseweb.com
W: http://www.leaseweb.com

Luttenbergweg 8, 1101 EC Amsterdam, Netherlands

View raw message