flume-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Alex Bohr <a...@gradientx.com>
Subject Re: Flume Consumer name
Date Tue, 10 Mar 2015 20:31:38 GMT
Cool, easy fix.

What about the initial offset - does the consumer start from the oldest or
the most recent offset?

Thanks!

On Tue, Mar 10, 2015 at 12:31 PM, Gwen Shapira <gshapira@cloudera.com>
wrote:

> Just a small typo, I think.
>
> should be groupId (capital I. Flume standardized on camel case)
>
>
>
> On Tue, Mar 10, 2015 at 12:09 PM, Alex Bohr <alex@gradientx.com> wrote:
> > After running some flafka tests I wanted to switch the name of the
> consumer
> > group to reset the offset to the most recent, instead of waiting for the
> for
> > the agents to plow through the backlog.
> > So I changed the value in the "groupId" property.
> >
> > But I don't see the new consumer group in Kafka.  It looks like the
> agents
> > are still consuming under the original consumer group name "flume."
> >
> > But I know the new config file is being used because I also changed the
> > directory in the sink and they are writing to the new HDFS path.
> >
> > Here's my new config file:
> >
> > flume1.sources  = kafka-source-1
> > flume1.channels = hdfs-channel-1
> > flume1.sinks    = hdfs-sink-1
> >
> > # For each source, channel, and sink, set
> > # standard properties.
> > flume1.sources.kafka-source-1.type =
> > org.apache.flume.source.kafka.KafkaSource
> > flume1.sources.kafka-source-1.zookeeperConnect =
> > xxx.xx.xx.107:2181/kafkaCluster
> > flume1.sources.kafka-source-1.topic = Events3
> > flume1.sources.kafka-source-1.groupid = newfloom
> > flume1.sources.kafka-source-1.batchSize = 10000
> > flume1.sources.kafka-source-1.channels = hdfs-channel-1
> >
> > flume1.channels.hdfs-channel-1.type   = memory
> > flume1.sinks.hdfs-sink-1.channel = hdfs-channel-1
> > flume1.sinks.hdfs-sink-1.type = hdfs
> > flume1.sinks.hdfs-sink-1.hdfs.writeFormat = Writable
> > flume1.sinks.hdfs-sink-1.hdfs.fileType = SequenceFile
> > flume1.sinks.hdfs-sink-1.hdfs.filePrefix =
> > %Y-%m-%d-%H-%M-%{host}-1-sequence-events
> > flume1.sinks.hdfs-sink-1.hdfs.useLocalTimeStamp = true
> > flume1.sinks.hdfs-sink-1.hdfs.path =
> > hdfs://xxx.xx.xxx.41:8020/user/gxetl/test_flume_2/%{topic}
> > flume1.sinks.hdfs-sink-1.hdfs.rollCount=0
> > flume1.sinks.hdfs-sink-1.hdfs.rollSize=0
> > flume1.sinks.hdfs-sink-1.hdfs.rollInterval=120
> >
> > # Other properties are specific to each type of
> > # source, channel, or sink. In this case, we
> > # specify the capacity of the memory channel.
> > flume1.channels.hdfs-channel-1.capacity = 500000
> > flume1.channels.hdfs-channel-1.transactionCapacity = 100000
> >
> >
> >
> > Any advice on how to change the consumer group name?
> >
> > Also, how does the kafka consumer set its initial offest when starting a
> new
> > source/consumer?
> > I'm just hoping it sets it at the most recent but it probably sets it at
> the
> > oldest offset.  And if so, how can I get it to start at the most recent?
> >
> > Thanks!
>

Mime
View raw message