flume-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Michael Morello <michael.more...@gmail.com>
Subject Re: syslog + kafka channel, possibility of data loss ?
Date Thu, 02 Jul 2015 08:51:53 GMT
Seems related to https://issues.apache.org/jira/browse/FLUME-1103
If I understand correctly data is lost and syslog source is not reliable ?

2015-07-02 10:31 GMT+02:00 Michael Morello <michael.morello@gmail.com>:

> Hi,
>
> After a small change in a syslog source configuration and a hot deployment
> here is what we have seen in the log :
>
> 2015-07-02 08:33:27,258 INFO  [conf-file-poller-0]
> (org.apache.flume.instrumentation.MonitoredCounterGroup.stop:150)  -
> Component type: CHANNEL, name: a-kafka-channel stopped
> 2015-07-02 08:33:27,258 WARN  [New I/O  worker #34]
> (org.apache.flume.channel.kafka.KafkaChannel$KafkaTransaction.doCommit:363)
>  - Sending events to Kafka failed
> kafka.producer.ProducerClosedException: producer already closed
> at kafka.producer.Producer.send(Producer.scala:73)
> at kafka.javaapi.producer.Producer.send(Producer.scala:42)
> at
> org.apache.flume.channel.kafka.KafkaChannel$KafkaTransaction.doCommit(KafkaChannel.java:357)
> at
> org.apache.flume.channel.BasicTransactionSemantics.commit(BasicTransactionSemantics.java:151)
> at
> org.apache.flume.channel.ChannelProcessor.processEvent(ChannelProcessor.java:267)
> at
> org.apache.flume.source.SyslogTcpSource$syslogTcpHandler.messageReceived(SyslogTcpSource.java:91)
> [...]
> 2015-07-02 08:33:27,258 INFO  [conf-file-poller-0]
> (org.apache.flume.instrumentation.MonitoredCounterGroup.stop:156)  -
> Shutdown Metric for type: CHANNEL, name: a-kafka-channel.
> channel.start.time == 1435735128166
> 2015-07-02 08:33:27,260 ERROR [New I/O  worker #34]
> (org.apache.flume.source.SyslogTcpSource$syslogTcpHandler.messageReceived:95)
>  - Error writting to channel, event dropped
> org.apache.flume.ChannelException: Unable to put event on required
> channel: org.apache.flume.channel.kafka.KafkaChannel{name: a-kafka-channel}
> at
> org.apache.flume.channel.ChannelProcessor.processEvent(ChannelProcessor.java:275)
> at
> org.apache.flume.source.SyslogTcpSource$syslogTcpHandler.messageReceived(SyslogTcpSource.java:91)
> [....]
> Caused by: org.apache.flume.ChannelException: Commit failed as send to
> Kafka failed
> at
> org.apache.flume.channel.kafka.KafkaChannel$KafkaTransaction.doCommit(KafkaChannel.java:364)
> 2015-07-02 08:33:27,264 INFO  [conf-file-poller-0]
> (org.apache.flume.instrumentation.MonitoredCounterGroup.stop:178)  -
> Shutdown Metric for type: CHANNEL, name: a-kafka-channel. channel.capacity
> == 0
> 2015-07-02 08:33:27,265 ERROR [New I/O  worker #34]
> (org.apache.flume.source.SyslogTcpSource$syslogTcpHandler.messageReceived:95)
>  - Error writting to channel, event dropped
>
> Flume recovered quickly but are the few messages "dropped" definitively
> lost ?
>
> Version is Flume 1.6, here is our configuration :
>
> my-agent.sources = a-syslog-source
> my-agent.channels = a-kafka-channel
> my-agent.sinks = a-hdfs-sink
>
> # Syslog TCP source
> my-agent.sources.a-syslog-source.type = syslogtcp
> my-agent.sources.a-syslog-source.port = xxxxx
> my-agent.sources.a-syslog-source.eventSize = 65550
> my-agent.sources.a-syslog-source.host = xxx.xxx.xxx.xxx
>
> my-agent.sources.a-syslog-source.interceptors = i1
> my-agent.sources.a-syslog-source.interceptors.i1.type = timestamp
> my-agent.sources.a-syslog-source.interceptors.i1.preserveExisting = true
>
> # Syslog messages go to the Kafka Channel
> my-agent.sources.a-syslog-source.channels = a-kafka-channel
>
> # Kafka channel
> my-agent.channels.a-kafka-channel.type =
> org.apache.flume.channel.kafka.KafkaChannel
> my-agent.channels.a-kafka-channel.brokerList =
> kafka3:9100,kafka4:9100,kafka5:9100
> my-agent.channels.a-kafka-channel.topic = a-kafka-topic
> my-agent.channels.a-kafka-channel.groupId = flume-a-kafka-group
> my-agent.channels.a-kafka-channel.zookeeperConnect =
> kafka1:2181,kafka2:2181,kafka3:2181
>
> my-agent.sinks.a-hdfs-sink.type=hdfs
> my-agent.sinks.a-hdfs-sink.hdfs.path=/data/%y-%m-%d
> my-agent.sinks.a-hdfs-sink.hdfs.filePrefix=syslog
> my-agent.sinks.a-hdfs-sink.hdfs.rollInterval=3600
> my-agent.sinks.a-hdfs-sink.hdfs.rollSize=512000000
> my-agent.sinks.a-hdfs-sink.hdfs.rollCount=0
> my-agent.sinks.a-hdfs-sink.hdfs.fileType=DataStream
> my-agent.sinks.a-hdfs-sink.hdfs.fileSuffix=.avro
> my-agent.sinks.a-hdfs-sink.serializer=avro_event
> my-agent.sinks.a-hdfs-sink.serializer.compressionCodec=snappy
>
> my-agent.sinks.a-hdfs-sink.channel = a-kafka-channel
>



-- 
Michael

Mime
View raw message