Bumping up the thread.

Hari - Did you get a chance to look into this issue?

-Kushal 

On Mar 17, 2015, at 2:51 PM, Mangtani, Kushal <Kushal.Mangtani@viasat.com> wrote:

flume.conf (cleaned up for security and brevity by removing additional sources, channels, and sinks)

# Name the components on this agent
collector.sources = CustomSource
collector.channels = HdfsChannel KafkaChannel 

collector.sinks = HdfsSink KafkaSink 

# Describe/configure the source AvroSource
collector.sources.CustomSource.type = com.flume.CustomSource
collector.sources.CustomSource.channels = HdfsChannel KafkaChannel 
collector.sources.CustomSource.bind = 0.0.0.0
collector.sources.CustomSource.port = 9898
collector.sources.CustomSource.schemaFolder = /usr/lib/flume-ng/schemas
collector.sources.CustomSource.selector.type = multiplexing
collector.sources.CustomSource.selector.header = recordType
# required channel mappings
collector.sources.CustomSource.selector.mapping.MyRecord = HdfsChannel
# optional channel mappings
collector.sources.CustomSource.selector.optional.MyRecord = KafkaChannel
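# Intent: events with recordType=MyRecord must land in HdfsChannel (required),
# while delivery to KafkaChannel is best-effort; a Kafka failure should not
# block the HDFS pipeline.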

# HdfsChannel channel config
collector.channels.HdfsChannel.type = file
collector.channels.HdfsChannel.useDualCheckpoints = true
collector.channels.HdfsChannel.checkpointDir = /mnt/persistent/0/flume-ng-data/Hdfsdata/checkpoint
collector.channels.HdfsChannel.backupCheckpointDir = /mnt/persistent/0/flume-ng-data/Hdfsdata/backup-checkpoint
collector.channels.HdfsChannel.dataDirs = /mnt/persistent/0/flume-ng-data/Hdfsdata/logs
collector.channels.HdfsChannel.capacity = 1000000
collector.channels.HdfsChannel.transactionCapacity = 50000
collector.channels.HdfsChannel.write-timeout = 60
collector.channels.HdfsChannel.keep-alive = 30

# HdfsSink sink config
collector.sinks.HdfsSink.type = hdfs
collector.sinks.HdfsSink.channel = HdfsChannel
collector.sinks.HdfsSink.hdfs.fileType = DataStream
collector.sinks.HdfsSink.serializer = CustomSerializer
collector.sinks.HdfsSink.serializer.schemaFolder = /usr/lib/flume-ng/schemas
collector.sinks.HdfsSink.serializer.syncIntervalBytes = 4096000
collector.sinks.HdfsSink.serializer.compressionCodec = snappy
collector.sinks.HdfsSink.hdfs.path = hdfs://namenode/data/%Y/%m/%d/%H00/myrecord
collector.sinks.HdfsSink.hdfs.rollSize = 0
collector.sinks.HdfsSink.hdfs.rollInterval = 1200
collector.sinks.HdfsSink.hdfs.rollCount = 0
collector.sinks.HdfsSink.hdfs.callTimeout = 60000
collector.sinks.HdfsSink.hdfs.batchSize = 10000

# KafkaChannel channel config
collector.channels.KafkaChannel.type = memory
collector.channels.KafkaChannel.capacity = 1500000
collector.channels.KafkaChannel.transactionCapacity = 50000
collector.channels.KafkaChannel.write-timeout = 60
collector.channels.KafkaChannel.keep-alive = 30

# KafkaSink sink config
collector.sinks.KafkaSink.type = org.apache.flume.sink.kafka.KafkaSink
collector.sinks.KafkaSink.channel = KafkaChannel
collector.sinks.KafkaSink.zk.connect = zk-1.com,zk-2.com,zk-3.com
collector.sinks.KafkaSink.metadata.broker.list = kafka-1.com:9092,kafka-2.com:9092,kafka-3.com:9092
collector.sinks.KafkaSink.topic = MyRecord
collector.sinks.KafkaSink.batch.num.messages = 1000
collector.sinks.KafkaSink.producer.type = async
collector.sinks.KafkaSink.request.required.acks = 0
collector.sinks.KafkaSink.serializer.class = kafka.serializer.DefaultEncoder
collector.sinks.KafkaSink.key.serializer.class = kafka.serializer.StringEncoder
collector.sinks.KafkaSink.partition.key = keyName
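For reference, the agent is started roughly like this (the conf directory path below is simplified for this mail; the agent name "collector" matches the property prefix above):

flume-ng agent --name collector --conf /etc/flume-ng/conf --conf-file /etc/flume-ng/conf/flume.conf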


-Kushal 

On Mar 17, 2015, at 2:31 PM, Hari Shreedharan <hshreedharan@cloudera.com> wrote:

I have recently seen one other report of issues with optional mappings. Can you also send your configuration? I'd like to investigate this and figure out what the issue is.

Thanks,
Hari Shreedharan




On Mar 17, 2015, at 2:24 PM, Mangtani, Kushal <Kushal.Mangtani@viasat.com> wrote:

Hello,

We are using Flume in our production environment to ingest data. A while back, we decided to extend the functionality and added Kafka for real-time monitoring.
So the Flume source forks off and deposits the data into two separate channels: one is HDFS (required mapping) and the other is Kafka (optional mapping). We made the KafkaChannel an optional selector mapping so that any issue with Kafka would not block the HDFS pipeline.
However, I have noticed that this is not what happens. Any issue with the Kafka cluster eventually brings down the HDFS ingestion as well. So my question is: does the optional channel mapping in the Flume source code not work correctly, or is the kafka-sink/Kafka cluster I am using outdated? Any inputs will be appreciated.
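To be concrete, the selector pattern we are relying on looks roughly like this (agent and source names below are placeholders, not our actual names):

# required mapping: events must be delivered to HdfsChannel
agent.sources.src.selector.type = multiplexing
agent.sources.src.selector.header = recordType
agent.sources.src.selector.mapping.MyRecord = HdfsChannel
# optional mapping: delivery to KafkaChannel is best-effort
agent.sources.src.selector.optional.MyRecord = KafkaChannel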

Env:

- Kushal