flume-user mailing list archives

From Michael Morello <michael.more...@gmail.com>
Subject syslog + kafka channel, possibility of data loss ?
Date Thu, 02 Jul 2015 08:31:21 GMT
Hi,

After a small change to a syslog source configuration and a hot
redeployment, we saw the following in the log:

2015-07-02 08:33:27,258 INFO  [conf-file-poller-0] (org.apache.flume.instrumentation.MonitoredCounterGroup.stop:150)  - Component type: CHANNEL, name: a-kafka-channel stopped
2015-07-02 08:33:27,258 WARN  [New I/O  worker #34] (org.apache.flume.channel.kafka.KafkaChannel$KafkaTransaction.doCommit:363)  - Sending events to Kafka failed
kafka.producer.ProducerClosedException: producer already closed
at kafka.producer.Producer.send(Producer.scala:73)
at kafka.javaapi.producer.Producer.send(Producer.scala:42)
at org.apache.flume.channel.kafka.KafkaChannel$KafkaTransaction.doCommit(KafkaChannel.java:357)
at org.apache.flume.channel.BasicTransactionSemantics.commit(BasicTransactionSemantics.java:151)
at org.apache.flume.channel.ChannelProcessor.processEvent(ChannelProcessor.java:267)
at org.apache.flume.source.SyslogTcpSource$syslogTcpHandler.messageReceived(SyslogTcpSource.java:91)
[...]
2015-07-02 08:33:27,258 INFO  [conf-file-poller-0] (org.apache.flume.instrumentation.MonitoredCounterGroup.stop:156)  - Shutdown Metric for type: CHANNEL, name: a-kafka-channel. channel.start.time == 1435735128166
2015-07-02 08:33:27,260 ERROR [New I/O  worker #34] (org.apache.flume.source.SyslogTcpSource$syslogTcpHandler.messageReceived:95)  - Error writting to channel, event dropped
org.apache.flume.ChannelException: Unable to put event on required channel: org.apache.flume.channel.kafka.KafkaChannel{name: a-kafka-channel}
at org.apache.flume.channel.ChannelProcessor.processEvent(ChannelProcessor.java:275)
at org.apache.flume.source.SyslogTcpSource$syslogTcpHandler.messageReceived(SyslogTcpSource.java:91)
[....]
Caused by: org.apache.flume.ChannelException: Commit failed as send to Kafka failed
at org.apache.flume.channel.kafka.KafkaChannel$KafkaTransaction.doCommit(KafkaChannel.java:364)
2015-07-02 08:33:27,264 INFO  [conf-file-poller-0] (org.apache.flume.instrumentation.MonitoredCounterGroup.stop:178)  - Shutdown Metric for type: CHANNEL, name: a-kafka-channel. channel.capacity == 0
2015-07-02 08:33:27,265 ERROR [New I/O  worker #34] (org.apache.flume.source.SyslogTcpSource$syslogTcpHandler.messageReceived:95)  - Error writting to channel, event dropped

Flume recovered quickly, but are the few "dropped" messages definitively
lost?
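
To make the question concrete, here is how I read the stack trace, as a
simplified sketch rather than Flume's actual code. It assumes the syslog
handler catches the ChannelException, logs it, and moves on to the next
message without buffering or retrying. Every class and method name below is
a hypothetical stand-in.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of the behaviour suggested by the log above.
// NOT Flume's code: all names here are hypothetical stand-ins.
class DropOnCommitFailureSketch {

    /** Stand-in for org.apache.flume.ChannelException. */
    static class ChannelException extends RuntimeException {
        ChannelException(String message) {
            super(message);
        }
    }

    /** Stand-in for a channel that rejects puts once it has been stopped. */
    static class Channel {
        private final List<String> stored = new ArrayList<>();
        private boolean closed = false;

        void close() {
            closed = true;
        }

        void put(String event) {
            if (closed) {
                throw new ChannelException("Commit failed as send to Kafka failed");
            }
            stored.add(event);
        }

        int size() {
            return stored.size();
        }
    }

    /**
     * Feeds totalEvents events to a channel that is closed just before the
     * event with index closeAfter. Returns {delivered, dropped}.
     */
    static int[] simulate(int totalEvents, int closeAfter) {
        Channel channel = new Channel();
        int dropped = 0;
        for (int i = 0; i < totalEvents; i++) {
            if (i == closeAfter) {
                channel.close(); // models the hot reconfiguration
            }
            try {
                channel.put("event-" + i);
            } catch (ChannelException e) {
                // Assumption: the handler only counts and logs the failure;
                // the event is neither buffered nor retried.
                dropped++;
            }
        }
        return new int[] { channel.size(), dropped };
    }

    public static void main(String[] args) {
        int[] result = simulate(5, 2);
        System.out.println("delivered=" + result[0] + " dropped=" + result[1]);
    }
}
```

With five events and the channel closed after the first two, this sketch
stores two events and drops three. If the real handler behaves the same way,
I would expect the dropped events to be gone unless the sender transmits
them again.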

We are running Flume 1.6. Here is our configuration:

my-agent.sources = a-syslog-source
my-agent.channels = a-kafka-channel
my-agent.sinks = a-hdfs-sink

# Syslog TCP source
my-agent.sources.a-syslog-source.type = syslogtcp
my-agent.sources.a-syslog-source.port = xxxxx
my-agent.sources.a-syslog-source.eventSize = 65550
my-agent.sources.a-syslog-source.host = xxx.xxx.xxx.xxx

my-agent.sources.a-syslog-source.interceptors = i1
my-agent.sources.a-syslog-source.interceptors.i1.type = timestamp
my-agent.sources.a-syslog-source.interceptors.i1.preserveExisting = true

# Syslog messages go to the Kafka Channel
my-agent.sources.a-syslog-source.channels = a-kafka-channel

# Kafka channel
my-agent.channels.a-kafka-channel.type = org.apache.flume.channel.kafka.KafkaChannel
my-agent.channels.a-kafka-channel.brokerList = kafka3:9100,kafka4:9100,kafka5:9100
my-agent.channels.a-kafka-channel.topic = a-kafka-topic
my-agent.channels.a-kafka-channel.groupId = flume-a-kafka-group
my-agent.channels.a-kafka-channel.zookeeperConnect = kafka1:2181,kafka2:2181,kafka3:2181

my-agent.sinks.a-hdfs-sink.type=hdfs
my-agent.sinks.a-hdfs-sink.hdfs.path=/data/%y-%m-%d
my-agent.sinks.a-hdfs-sink.hdfs.filePrefix=syslog
my-agent.sinks.a-hdfs-sink.hdfs.rollInterval=3600
my-agent.sinks.a-hdfs-sink.hdfs.rollSize=512000000
my-agent.sinks.a-hdfs-sink.hdfs.rollCount=0
my-agent.sinks.a-hdfs-sink.hdfs.fileType=DataStream
my-agent.sinks.a-hdfs-sink.hdfs.fileSuffix=.avro
my-agent.sinks.a-hdfs-sink.serializer=avro_event
my-agent.sinks.a-hdfs-sink.serializer.compressionCodec=snappy

my-agent.sinks.a-hdfs-sink.channel = a-kafka-channel


Thanks

-- 
Michael
