flume-user mailing list archives

From Nitin Pawar <nitinpawar...@gmail.com>
Subject Re: flume-ng cassandra sink problem..
Date Wed, 26 Dec 2012 18:20:34 GMT
Can you provide a sample line from your log?

The Cassandra sink you are using looks for particular headers on each
Flume event, such as key, src and host.
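
Your config uses an exec source (tail -f), so nothing in the pipeline sets
the key or src headers. With that source, headers can only be added by
interceptors. Note also that agent.sources.avrosource.interceptors is
assigned twice in your config. Flume reads the file as Java properties, where
a later assignment overwrites an earlier one, so only addTimestamp is active
and the host header is never set. Below is a minimal sketch of a single
combined interceptor list. It assumes a Flume release that ships the "static"
and "regex_extractor" interceptors (I am not sure 1.2.0 does, so check the
user guide for your version). It also assumes each log line begins with a
token unique enough to serve as the key. The src value "myapp" and the
^(\\S+) regex are placeholders, not values the sink requires.

```properties
# One interceptors line listing every interceptor; a second
# "interceptors =" line would replace this one instead of adding to it.
agent.sources.avrosource.interceptors = addHost addTimestamp addSrc addKey

# host header: name of the machine the event came from
agent.sources.avrosource.interceptors.addHost.type = org.apache.flume.interceptor.HostInterceptor$Builder
agent.sources.avrosource.interceptors.addHost.preserveExisting = false
agent.sources.avrosource.interceptors.addHost.useIP = false
agent.sources.avrosource.interceptors.addHost.hostHeader = host

# timestamp header: time the event passed through the interceptor
agent.sources.avrosource.interceptors.addTimestamp.type = org.apache.flume.interceptor.TimestampInterceptor$Builder

# src header: a constant logical source name
# (assumption: "static" interceptor available; "myapp" is a placeholder)
agent.sources.avrosource.interceptors.addSrc.type = static
agent.sources.avrosource.interceptors.addSrc.key = src
agent.sources.avrosource.interceptors.addSrc.value = myapp

# key header: first whitespace-delimited token of each line
# (assumption: "regex_extractor" interceptor available and that token is a usable key)
agent.sources.avrosource.interceptors.addKey.type = regex_extractor
agent.sources.avrosource.interceptors.addKey.regex = ^(\\S+)
agent.sources.avrosource.interceptors.addKey.serializers = s1
agent.sources.avrosource.interceptors.addKey.serializers.s1.name = key
```

Lines that do not match the regex would still reach the sink without a key.
If your lines carry no suitable token, the README's advice applies. In that
case, the key has to be generated by the application doing the logging, for
example by sending events to an avro source with the headers already set.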


On Wed, Dec 26, 2012 at 11:35 PM, Priyanka jain
<jainpriyanka1110@gmail.com> wrote:

> Hi Nitin,
> Thanks for your suggestion.
>
> I have done everything as described in the README, but I can't figure
> out where to set that key.
> Can you please give me an idea of where I can configure it, or where it
> gets generated?
>
>
> On Wed, Dec 26, 2012 at 11:22 PM, Nitin Pawar <nitinpawar432@gmail.com> wrote:
>
>> From the README, you need to have the following in place:
>>
>> The Sink expects several flume event headers to be present:
>>
>>    - key - used (combined with src) to create the Cassandra row key. It
>>    should be generated by the application doing the logging
>>    - timestamp - timestamp of when the log occurred, not necessarily
>>    when the flume event is created
>>    - src - A logical source of the flume event. Could be host, but
>>    probably you will have many hosts for a source. A more likely candidate for
>>    source is the name of the application
>>    - host - the name of the host where the message was generated
>>
>>
>>
>> On Wed, Dec 26, 2012 at 11:09 PM, Priyanka jain <
>> jainpriyanka1110@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> I am working on a POC of Cassandra/Flume integration. For that I am
>>> using the Cassandra sink plugin from GitHub (flume-ng Cassandra sink
>>> plugin), with:
>>>
>>> Flume-NG version: 1.2.0
>>> Apache Cassandra version: 1.1.5
>>>
>>> I have built the jar using Maven and am using the sink configuration
>>> below in flume.conf.cassandra in the conf directory:
>>>
>>> agent.sources = avrosource
>>> agent.channels = channel1
>>> agent.sinks = cassandraSink
>>>
>>> # source definition
>>> agent.sources.avrosource.channels = channel1
>>> agent.sources.avrosource.type = exec
>>> agent.sources.avrosource.command = tail -f /home/user/priyanka/flume-ng/flnginput.txt
>>>
>>> #agent.sources.avrosource.type = avro
>>> #agent.sources.avrosource.channels = channel1
>>> #agent.sources.avrosource.bind = 127.0.0.1
>>> #agent.sources.avrosource.port = 41414
>>>
>>> # Flume event headers
>>> agent.sources.avrosource.interceptors = addHost
>>> agent.sources.avrosource.interceptors.addHost.type = org.apache.flume.interceptor.HostInterceptor$Builder
>>> agent.sources.avrosource.interceptors.addHost.preserveExisting = false
>>> agent.sources.avrosource.interceptors.addHost.useIP = false
>>> agent.sources.avrosource.interceptors.addHost.hostHeader = host
>>> agent.sources.avrosource.interceptors = addTimestamp
>>> agent.sources.avrosource.interceptors.addTimestamp.type = org.apache.flume.interceptor.TimestampInterceptor$Builder
>>>
>>> # Cassandra flow
>>> agent.channels.channel1.type = FILE
>>> agent.channels.channel1.checkpointDir = file-channel1/check
>>> agent.channels.channel1.dataDirs = file-channel1/data
>>>
>>> agent.sinks.cassandraSink.channel = channel1
>>> agent.sinks.cassandraSink.type = com.btoddb.flume.sinks.cassandra.CassandraSink
>>> agent.sinks.cassandraSink.hosts = localhost
>>> agent.sinks.cassandraSink.port = 9160
>>> agent.sinks.cassandraSink.keyspace-name = logs
>>> agent.sinks.cassandraSink.records-colfam = records
>>>
>>> I am running this with the command:
>>>
>>> flume-ng agent -n agent -c /usr/lib/flume-ng-1.2/conf/ -f
>>> /usr/lib/flume-ng-1.2/conf/flume.conf.cassandra
>>> -Dflume.root.logger=DEBUG,console
>>>
>>>
>>> I got this error while running it:
>>>
>>> 2012-12-21 14:37:07,743 (SinkRunner-PollingRunner-DefaultSinkProcessor) [ERROR - com.btoddb.flume.sinks.cassandra.CassandraSink.process(CassandraSink.java:193)] exception while processing in Cassandra Sink
>>> java.lang.IllegalArgumentException: Missing flume header attribute, 'key' - cannot process this event
>>>     at com.btoddb.flume.sinks.cassandra.CassandraSinkRepository.saveToCassandra(CassandraSinkRepository.java:125)
>>>     at com.btoddb.flume.sinks.cassandra.CassandraSink.process(CassandraSink.java:166)
>>>     at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
>>>     at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
>>>     at java.lang.Thread.run(Thread.java:722)
>>>
>>> 2012-12-21 14:37:07,743 (SinkRunner-PollingRunner-DefaultSinkProcessor) [ERROR - org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:160)] Unable to deliver event. Exception follows.
>>> org.apache.flume.EventDeliveryException: Failed to persist message
>>>     at com.btoddb.flume.sinks.cassandra.CassandraSink.process(CassandraSink.java:194)
>>>     at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
>>>     at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
>>>     at java.lang.Thread.run(Thread.java:722)
>>> Caused by: java.lang.IllegalArgumentException: Missing flume header attribute, 'key' - cannot process this event
>>>     at com.btoddb.flume.sinks.cassandra.CassandraSinkRepository.saveToCassandra(CassandraSinkRepository.java:125)
>>>     at com.btoddb.flume.sinks.cassandra.CassandraSink.process(CassandraSink.java:166)
>>>     ... 3 more
>>>
>>> I found that the row key is built from src + key, but I don't
>>> understand how to configure it.
>>> Can anyone please help me solve this problem?
>>>
>>>
>>
>>
>> --
>> Nitin Pawar
>>
>
>


-- 
Nitin Pawar
