flume-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Jeong-shik Jang <jsja...@gmail.com>
Subject Re: Spooldir -> Kafka sink
Date Fri, 20 May 2016 09:30:28 GMT
Just to make sure, you may need an interceptor implementing a logic setting
"topic" value in headers from event maybe, which should be sit between your
spooldir source and kafka sink; this is for the case you don't have one.

2016-05-20 18:16 GMT+09:00 Jeong-shik Jang <jsjangg@gmail.com>:

> Hi Simone,
>
> I got better understanding; thanks for your explanation.
> In that case, how about checking key name in headers; it is supposed to be
> "topic".
>
> User guide reads:
> If the event header contains a “topic” field, the event will be published
> to that topic overriding the topic configured here
>
> JS
>
>
> 2016-05-20 18:08 GMT+09:00 Simone Roselli <simone.roselli@plista.com>:
>
>> Hi Jeong,
>>
>> thanks for your answer.
>>
>> I already have my topics in Kafka, I don't need to create new topics.
>> Unfortunately, the problem is different here.
>>
>> Problem in one sentence:
>>
>> ** The Spooldir source is not able to successfully send events to my
>> Kafka topic, if the topic name is not set in the agent.conf **
>>
>>
>>
>> Simone Roselli
>> ITE Sysadmin
>> simone.roselli@plista.com
>> http://www.plista.com
>>
>> ----- Original Message -----
>> From: "Jeong-shik Jang" <jsjangg@gmail.com>
>> To: "user" <user@flume.apache.org>
>> Sent: Friday, May 20, 2016 3:00:02 AM
>> Subject: Re: Spooldir -> Kafka sink
>>
>> Hi Simone,
>>
>> How about starting from checking your Kafka configuration? The related
>> property name I think is "auto.create.topics.enable".
>>
>> auto.create.topics.enable true Enable auto creation of topic on the
>> server.
>> If this is set to true then attempts to produce, consume, or fetch
>> metadata
>> for a non-existent topic will automatically create it with the default
>> replication factor and number of partitions.
>>
>> Default value is true so likely it is enabled but just to make sure.
>>
>> JS
>>
>> 2016-05-19 22:37 GMT+09:00 Simone Roselli <simone.roselli@plista.com>:
>>
>> > Hallo,
>> >
>> > I'm using 2 sinks (Kafka, Fileroll) in failover.
>> >
>> > If the Kafka sink is temporary unreachable, the Fileroll takes over and
>> > writes events on a local dir.
>> >
>> > Then, I configure a spoolDir source, for a directory /dir, pointing to
>> the
>> > Kafka sink.
>> >
>> > When I try to move an event from the local dir to the spool dir, the
>> event
>> > doesn't reach Kafka and I get this:
>> >
>> > """
>> > 9 May 2016 15:12:39,007 WARN
>> > [SinkRunner-PollingRunner-FailoverSinkProcessor]
>> > (kafka.utils.Logging$class.warn:83)  - Error while fetching metadata
>> > [{TopicMetadata for topic default-flume-topic ->
>> > No partition metadata for topic default-flume-topic due to
>> > kafka.common.UnknownTopicOrPartitionException}] for topic
>> > [default-flume-topic]: class
>> kafka.common.UnknownTopicOrPartitionException
>> >
>> > 19 May 2016 15:12:39,007 ERROR
>> > [SinkRunner-PollingRunner-FailoverSinkProcessor]
>> > (kafka.utils.Logging$class.error:97)  - Failed to collate messages by
>> > topic, partition due to: Failed to fetch topic metadata for topic:
>> > default-flume-topic
>> >
>> > 19 May 2016 15:12:39,007 INFO
>> > [SinkRunner-PollingRunner-FailoverSinkProcessor] (kafka.utils.Logging$
>> > class.info:68)  - Back off for 100 ms before retrying send. Remaining
>> > retries = 3
>> >
>> > 19 May 2016 15:12:39,108 INFO
>> > [SinkRunner-PollingRunner-FailoverSinkProcessor] (kafka.utils.Logging$
>> > class.info:68)  - Fetching metadata from broker id:1,host:
>> > broker01.doamain.com,port:9092 with correlation id 45270 for 2 topic(s)
>> > Set(MyTopic, default-flume-topic)
>> >
>> > ...
>> >
>> > 19 May 2016 15:12:39,433 ERROR
>> > [SinkRunner-PollingRunner-FailoverSinkProcessor]
>> > (kafka.utils.Logging$class.error:97)  - Failed to send requests for
>> topics
>> > MyTopic,default-flume-topic with correlation ids in [xxx,xxx]
>> >
>> > """
>> >
>> > default-flume-topic = kafka topic used by flume-ng Kafka sink
>> > MyTopic = my actual target topic, present in the event headers
>> >
>> > In the agent.conf i didn't set any topic name as topic names are
>> > dynamically assigned. If I define a topic name in the agent.conf, then
>> it
>> > works.
>> >
>> >
>> > Any clues?
>> > Thanks
>> >
>> >
>> >
>> > Simone Roselli
>> > ITE Sysadmin
>> > simone.roselli@plista.com
>> > http://www.plista.com
>> >
>>
>
>

Mime
View raw message