flume-user mailing list archives

From "Mangtani, Kushal" <Kushal.Mangt...@viasat.com>
Subject Re: Flume && Kafka Integration
Date Tue, 17 Mar 2015 21:51:26 GMT
flume.conf (cleaned up for security and brevity by removing additional sources, channels, and sinks):

# Name the components on this agent
collector.sources = CustomSource
collector.channels = HdfsChannel KafkaChannel

collector.sinks = HdfsSink KafkaSink

# Describe/configure the source CustomSource
collector.sources.CustomSource.type = com.flume.CustomSource
collector.sources.CustomSource.channels = HdfsChannel KafkaChannel
collector.sources.CustomSource.bind = 0.0.0.0
collector.sources.CustomSource.port = 9898
collector.sources.CustomSource.schemaFolder = /usr/lib/flume-ng/schemas
collector.sources.CustomSource.selector.type = multiplexing
collector.sources.CustomSource.selector.header = recordType
# required channel mappings
collector.sources.CustomSource.selector.mapping.MyRecord = HdfsChannel
# optional channel mappings
collector.sources.CustomSource.selector.optional.MyRecord = KafkaChannel
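# Note: with a multiplexing selector, a failed put to a required channel
# (selector.mapping.*) fails the source transaction, while a failed put to an
# optional channel (selector.optional.*) is supposed to be logged and ignored.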

# HdfsChannel channel config
collector.channels.HdfsChannel.type = file
collector.channels.HdfsChannel.useDualCheckpoints = true
collector.channels.HdfsChannel.checkpointDir = /mnt/persistent/0/flume-ng-data/Hdfsdata/checkpoint
collector.channels.HdfsChannel.backupCheckpointDir = /mnt/persistent/0/flume-ng-data/Hdfsdata/backup-checkpoint
collector.channels.HdfsChannel.dataDirs = /mnt/persistent/0/flume-ng-data/Hdfsdata/logs
collector.channels.HdfsChannel.capacity = 1000000
collector.channels.HdfsChannel.transactionCapacity = 50000
collector.channels.HdfsChannel.write-timeout = 60
collector.channels.HdfsChannel.keep-alive = 30

# HdfsSink sink config
collector.sinks.HdfsSink.type = hdfs
collector.sinks.HdfsSink.channel = HdfsChannel
collector.sinks.HdfsSink.hdfs.fileType = DataStream
collector.sinks.HdfsSink.serializer = CustomSerializer
collector.sinks.HdfsSink.serializer.schemaFolder = /usr/lib/flume-ng/schemas
collector.sinks.HdfsSink.serializer.syncIntervalBytes = 4096000
collector.sinks.HdfsSink.serializer.compressionCodec = snappy
collector.sinks.HdfsSink.hdfs.path = hdfs://namenode/data/%Y/%m/%d/%H00/myrecord
collector.sinks.HdfsSink.hdfs.rollSize = 0
collector.sinks.HdfsSink.hdfs.rollInterval = 1200
collector.sinks.HdfsSink.hdfs.rollCount = 0
collector.sinks.HdfsSink.hdfs.callTimeout = 60000
collector.sinks.HdfsSink.hdfs.batchSize = 10000
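# With rollSize = 0 and rollCount = 0, size- and event-count-based rolling are
# disabled, so files roll purely on the 1200-second rollInterval.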

# KafkaChannel channel config
collector.channels.KafkaChannel.type = memory
collector.channels.KafkaChannel.capacity = 1500000
collector.channels.KafkaChannel.transactionCapacity = 50000
collector.channels.KafkaChannel.write-timeout = 60
collector.channels.KafkaChannel.keep-alive = 30
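# Note: keep-alive is how long (in seconds) a put blocks when this channel is
# full before failing; write-timeout is a file-channel setting and should be
# ignored by a memory channel.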

# KafkaSink sink config
collector.sinks.KafkaSink.type = org.apache.flume.sink.kafka.KafkaSink
collector.sinks.KafkaSink.channel = KafkaChannel
collector.sinks.KafkaSink.zk.connect = zk-1.com,zk-2.com,zk-3.com
collector.sinks.KafkaSink.metadata.broker.list = kafka-1.com:9092,kafka-2.com:9092,kafka-3.com:9092
collector.sinks.KafkaSink.topic = MyRecord
collector.sinks.KafkaSink.batch.num.messages = 1000
collector.sinks.KafkaSink.producer.type = async
collector.sinks.KafkaSink.request.required.acks = 0
collector.sinks.KafkaSink.serializer.class = kafka.serializer.DefaultEncoder
collector.sinks.KafkaSink.key.serializer.class = kafka.serializer.StringEncoder
collector.sinks.KafkaSink.partition.key = keyName
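# With the Kafka 0.8 producer settings above, producer.type = async batches
# sends (batch.num.messages = 1000) on a background thread, and
# request.required.acks = 0 means fire-and-forget: no broker acknowledgement
# is awaited, so failed sends can be silently lost.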


-Kushal

On Mar 17, 2015, at 2:31 PM, Hari Shreedharan <hshreedharan@cloudera.com> wrote:

I have seen one other recent report of optional mapping issues. Can you also send
your configuration? I’d like to investigate this and figure out what the issue is.

Thanks,
Hari Shreedharan




On Mar 17, 2015, at 2:24 PM, Mangtani, Kushal <Kushal.Mangtani@viasat.com> wrote:

Hello,

We are using Flume in our prod env to ingest data. A while back, we decided to extend the
functionality and added Kafka for real-time monitoring.
So, the Flume source fans out and deposits the data into two separate channels: one is HDFS
(required mapping) and the other is Kafka (optional mapping). We made the KafkaChannel an
optional selector mapping (http://flume.apache.org/releases/content/1.4.0/FlumeUserGuide.html#fan-out-flow)
so that any issue with Kafka should not block the HDFS pipeline.
However, I have noticed this isolation never happens: any issue with the Kafka cluster
eventually brings down HDFS ingestion as well. So my question is: does the optional channel
mapping in the Flume source code not work correctly, or is the Kafka sink / Kafka cluster
I am using outdated? Any input will be appreciated.
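For reference, the fan-out pattern from the user guide looks like this (agent/source/channel
names a1/r1/c1/c2 are illustrative, not our production names):

a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = recordType
# required: a failed put to c1 fails the source transaction
a1.sources.r1.selector.mapping.MyRecord = c1
# optional: a failed put to c2 should be ignored
a1.sources.r1.selector.optional.MyRecord = c2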

Env:

  *   Ubuntu 12.04
  *   CDH 5, Flume 1.4
  *   Kafka Src Download - 2.9.1-0.8.1.1
  *   Using Custom Flume-Kafka Sink https://github.com/baniuyao/flume-ng-kafka-sink

- Kushal


