flume-user mailing list archives

From Alex Bohr <a...@gradientx.com>
Subject Flume Consumer name
Date Tue, 10 Mar 2015 19:09:46 GMT
After running some flafka tests I wanted to change the name of the consumer
group so the offset resets to the most recent, instead of waiting for the
agents to plow through the backlog.
So I changed the value in the "groupId" property.

But I don't see the new consumer group in Kafka.  It looks like the agents
are still consuming under the original consumer group name, "flume".

But I know the new config file is being used because I also changed the
directory in the sink and they are writing to the new HDFS path.

Here's my new config file:

flume1.sources  = kafka-source-1
flume1.channels = hdfs-channel-1
flume1.sinks    = hdfs-sink-1

# For each source, channel, and sink, set
# standard properties.
flume1.sources.kafka-source-1.type = org.apache.flume.source.kafka.KafkaSource
flume1.sources.kafka-source-1.zookeeperConnect = xxx.xx.xx.107:2181/kafkaCluster
flume1.sources.kafka-source-1.topic = Events3
flume1.sources.kafka-source-1.groupid = newfloom
flume1.sources.kafka-source-1.batchSize = 10000
flume1.sources.kafka-source-1.channels = hdfs-channel-1

flume1.channels.hdfs-channel-1.type   = memory
flume1.sinks.hdfs-sink-1.channel = hdfs-channel-1
flume1.sinks.hdfs-sink-1.type = hdfs
flume1.sinks.hdfs-sink-1.hdfs.writeFormat = Writable
flume1.sinks.hdfs-sink-1.hdfs.fileType = SequenceFile
flume1.sinks.hdfs-sink-1.hdfs.filePrefix = %Y-%m-%d-%H-%M-%{host}-1-sequence-events
flume1.sinks.hdfs-sink-1.hdfs.useLocalTimeStamp = true
flume1.sinks.hdfs-sink-1.hdfs.path = hdfs://xxx.xx.xxx.41:8020/user/gxetl/test_flume_2/%{topic}
flume1.sinks.hdfs-sink-1.hdfs.rollCount=0
flume1.sinks.hdfs-sink-1.hdfs.rollSize=0
flume1.sinks.hdfs-sink-1.hdfs.rollInterval=120

# Other properties are specific to each type of
# source, channel, or sink. In this case, we
# specify the capacity of the memory channel.
flume1.channels.hdfs-channel-1.capacity = 500000
flume1.channels.hdfs-channel-1.transactionCapacity = 100000



Any advice on how to change the consumer group name?

Also, how does the Kafka consumer set its initial offset when starting a
new source/consumer?
I'm hoping it starts at the most recent offset, but it probably starts at
the oldest.  If so, how can I get it to start at the most recent?
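
One thing that may be worth checking: the text above refers to the property
as "groupId", while the config line spells it "groupid". The fragment below
is only a sketch of what the source section might look like, assuming that
Flume property keys are case-sensitive and that the Kafka source passes keys
prefixed with "kafka." through to the underlying consumer. Both points are
assumptions to verify against the user guide for the Flume version in use,
as is the "largest" value for auto.offset.reset.

```properties
# Sketch only: key spelling and the "kafka." pass-through prefix are
# assumptions, not confirmed behavior for this Flume version.
flume1.sources.kafka-source-1.type = org.apache.flume.source.kafka.KafkaSource
flume1.sources.kafka-source-1.zookeeperConnect = xxx.xx.xx.107:2181/kafkaCluster
flume1.sources.kafka-source-1.topic = Events3
# Camel-case key, matching the "groupId" spelling used in the text above.
flume1.sources.kafka-source-1.groupId = newfloom
# Assumed pass-through consumer setting: for a group with no committed
# offset, start from the newest messages rather than the oldest.
flume1.sources.kafka-source-1.kafka.auto.offset.reset = largest
flume1.sources.kafka-source-1.batchSize = 10000
flume1.sources.kafka-source-1.channels = hdfs-channel-1
```

If the new group name is picked up, it should show up in Kafka/ZooKeeper
alongside the original "flume" group once the agents restart.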

Thanks!
