flume-user mailing list archives

From Matt Wise <m...@nextdoor.com>
Subject Re: ElasticSearchSink does not work
Date Tue, 18 Jun 2013 18:36:51 GMT
To use the ElasticSearch sink, I had to install the ElasticSearch JAR packages on my Flume
nodes and add them to Flume's classpath through the flume-env.sh script. Here's our Puppet
flume-env.sh template:

> # Give Flume more memory and pre-allocate, enable remote monitoring via JMX
> JAVA_OPTS="-Xms100m -Xmx<%= flume_max_mem.to_i %>m -Dcom.sun.management.jmxremote -Dflume.monitoring.type=http -Dflume.monitoring.port=<%= flume_monitoring_port %>"
> 
> # Note that the Flume conf directory is always included in the classpath.
> FLUME_CLASSPATH="<%= elasticsearch_dest %>/lib/*"
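
For anyone adapting this outside Puppet, the following is a minimal shell sketch of how the rendered classpath line could be guarded so that FLUME_CLASSPATH is only set when the directory actually contains JARs. The /opt/elasticsearch default and the es_classpath helper are hypothetical names chosen for illustration; they are not part of Flume or of the template above. It assumes the ElasticSearch JARs sit under a lib/ directory, as in the template.

```shell
# Hypothetical install location; substitute wherever ElasticSearch is unpacked.
ES_HOME="${ES_HOME:-/opt/elasticsearch}"

# Print a wildcard classpath entry for <dir>/lib if it holds at least one JAR.
# Returns non-zero (and prints a warning to stderr) when no JAR is found.
es_classpath() {
    dir="$1/lib"
    for jar in "$dir"/*.jar; do
        if [ -f "$jar" ]; then
            printf '%s/*\n' "$dir"
            return 0
        fi
    done
    echo "no JARs found in $dir" >&2
    return 1
}

# Only set FLUME_CLASSPATH when the JARs are really there.
if cp=$(es_classpath "$ES_HOME"); then
    FLUME_CLASSPATH="$cp"
fi
```

The wildcard form (lib/*) relies on the JVM expanding it to every JAR in that directory, so the ElasticSearch JAR and the libraries shipped alongside it are picked up together.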

--Matt

On Jun 12, 2013, at 8:04 AM, shushuai zhu <sszhu@yahoo.com> wrote:

> Hi, just a quick update. I found another mirror from which to download apache-flume-1.3.1-bin.tar.gz:
>  
> http://apache.mesi.com.ar/flume/1.3.1/apache-flume-1.3.1-bin.tar.gz
>  
> After installing and running it, I still got the same class-not-found exception (see the
> log messages below). I suspect ElasticSearchSink needs some additional JAR file. Does anyone
> have any idea?
>  
> Thanks.
>  
> Shushuai
>  
>  
> From: shushuai zhu <sszhu@yahoo.com>
> To: "user@flume.apache.org" <user@flume.apache.org> 
> Sent: Wednesday, June 12, 2013 10:35 AM
> Subject: Re: ElasticSearchSink does not work
> 
> Allan, thanks for the reply. In my case, I only used one channel and one sink at the
same time.
>  
> About 10 minutes after the data were sent to the Flume agent, some messages were logged
> in flume.log (see below). They say that the class org/elasticsearch/common/transport/TransportAddress
> was not found. This seems to indicate that the Cloudera version of Flume does not support ElasticSearchSink.
> Is there any way to add the missing class or some JAR file?
>  
> I also tried to download Flume from the Flume site:
>  
> http://flume.apache.org/download.html
> http://www.apache.org/dyn/closer.cgi/flume/1.3.1/apache-flume-1.3.1-bin.tar.gz
>  
> But on my Linux box (Red Hat 5), the downloaded apache-flume-1.3.1-bin.tar.gz is rejected as
> neither a gzip file nor a tar file. Can anyone tell me the exact download process?
> If possible, please provide step-by-step instructions for downloading and installing.
>  
> Thanks.
>  
> Shushuai
>  
>  
> -------------------------------------------------------------------------------------------------------------
> 11 Jun 2013 19:40:37,082 INFO  [lifecycleSupervisor-1-0] (org.apache.flume.node.PollingPropertiesFileConfigurationProvider.start:61) - Configuration provider starting
> 11 Jun 2013 19:40:37,114 INFO  [conf-file-poller-0] (org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run:133) - Reloading configuration file:conf/flume.conf
> 11 Jun 2013 19:40:37,121 INFO  [conf-file-poller-0] (org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty:1016) - Processing:k1
> 11 Jun 2013 19:40:37,122 INFO  [conf-file-poller-0] (org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty:1016) - Processing:k1
> 11 Jun 2013 19:40:37,122 INFO  [conf-file-poller-0] (org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty:1016) - Processing:k1
> 11 Jun 2013 19:40:37,122 INFO  [conf-file-poller-0] (org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty:1016) - Processing:k1
> 11 Jun 2013 19:40:37,122 INFO  [conf-file-poller-0] (org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty:1016) - Processing:k1
> 11 Jun 2013 19:40:37,122 INFO  [conf-file-poller-0] (org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty:930) - Added sinks: k1 Agent: agent1
> 11 Jun 2013 19:40:37,122 INFO  [conf-file-poller-0] (org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty:1016) - Processing:k1
> 11 Jun 2013 19:40:37,123 INFO  [conf-file-poller-0] (org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty:1016) - Processing:k1
> 11 Jun 2013 19:40:37,123 INFO  [conf-file-poller-0] (org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty:1016) - Processing:k1
> 11 Jun 2013 19:40:37,457 INFO  [conf-file-poller-0] (org.apache.flume.conf.FlumeConfiguration.validateConfiguration:140) - Post-validation flume configuration contains configuration for agents: [agent1]
> 11 Jun 2013 19:40:37,457 INFO  [conf-file-poller-0] (org.apache.flume.node.AbstractConfigurationProvider.loadChannels:150) - Creating channels
> 11 Jun 2013 19:40:37,464 INFO  [conf-file-poller-0] (org.apache.flume.channel.DefaultChannelFactory.create:40) - Creating instance of channel ch1 type memory
> 11 Jun 2013 19:40:37,468 INFO  [conf-file-poller-0] (org.apache.flume.node.AbstractConfigurationProvider.loadChannels:205) - Created channel ch1
> 11 Jun 2013 19:40:37,469 INFO  [conf-file-poller-0] (org.apache.flume.source.DefaultSourceFactory.create:39) - Creating instance of source avro-source1, type avro
> 11 Jun 2013 19:40:37,484 INFO  [conf-file-poller-0] (org.apache.flume.sink.DefaultSinkFactory.create:40) - Creating instance of sink: k1, type: org.apache.flume.sink.elasticsearch.ElasticSearchSink
> 11 Jun 2013 19:40:37,489 ERROR [conf-file-poller-0] (org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run:145) - Failed to start agent because dependencies were not found in classpath. Error follows.
> java.lang.NoClassDefFoundError: org/elasticsearch/common/transport/TransportAddress
>         at java.lang.Class.forName0(Native Method)
>         at java.lang.Class.forName(Class.java:186)
>         at org.apache.flume.sink.DefaultSinkFactory.getClass(DefaultSinkFactory.java:67)
>         at org.apache.flume.sink.DefaultSinkFactory.create(DefaultSinkFactory.java:41)
>         at org.apache.flume.node.AbstractConfigurationProvider.loadSinks(AbstractConfigurationProvider.java:415)
>         at org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:103)
>         at org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:140)
>         at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
>         at java.util.concurrent.FutureTask$Sync.innerRunAndReset(FutureTask.java:351)
>         at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:178)
>         at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:165)
>         at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:267)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
>         at java.lang.Thread.run(Thread.java:679)
> Caused by: java.lang.ClassNotFoundException: org.elasticsearch.common.transport.TransportAddress
>         at java.net.URLClassLoader$1.run(URLClassLoader.java:217)
>         at java.security.AccessController.doPrivileged(Native Method)
>         at java.net.URLClassLoader.findClass(URLClassLoader.java:205)
>         at java.lang.ClassLoader.loadClass(ClassLoader.java:321)
>         at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:294)
>         at java.lang.ClassLoader.loadClass(ClassLoader.java:266)
>         ... 15 more
> ----------------------------------------------------------------------------------------------------------
> 
> From: Allan Feid <allanfeid@gmail.com>
> To: user@flume.apache.org; shushuai zhu <sszhu@yahoo.com> 
> Sent: Wednesday, June 12, 2013 10:09 AM
> Subject: Re: ElasticSearchSink does not work
> Hi Shushuai,
> 
> I've had a similar issue, and in my case it was because I was using the same channel
for multiple sinks. I believe what happens is whatever sink can remove the event from the
queue first will have it written out, but I don't know the specifics since I haven't had a
chance to read through the codebase. If you add a second channel for your elasticsearch sink
and make sure your avro-source writes to both channels, you should see data going to both
locations.
>  
> Thanks,
> Allan
> 
> On Tue, Jun 11, 2013 at 10:37 PM, shushuai zhu <sszhu@yahoo.com> wrote:
> Hi,
>  
> I am new to Flume. I am trying to send data using the Flume Client Perl API to a Flume Avro
> source, then through ElasticSearchSink to an ElasticSearch engine. I got the file_roll sink
> to work by sending the data to a file. However, I am encountering an issue with ElasticSearchSink.
> The data did not go through to the ElasticSearch engine:
>  
> use Flume::Client;
> my $ng_client = Flume::Client::Transceiver::Socket->new(host => 'host name', port => 41414);
> my $ng_requestor = Flume::Client::Requestor::FlumeNG->new(client => $ng_client);
> my ($result, $response) = $ng_requestor->request('appendBatch', [{ headers => {}, body => "hello, this is sent from perl (using FlumeNG)"}]);
> print "$response\n";    # response will be 'OK' on success
>  
> I can tell because the returned $response is not defined (again, this worked when the
> file_roll sink was used).
>  
> The ElasticSearch engine is working, since I could load data into it via LogStash's ElasticSearch
> output type.
>  
> The Flume agent was installed from a tarball downloaded from Cloudera:
>  
> http://archive.cloudera.com/cdh4/cdh/4/flume-ng-1.3.0-cdh4.3.0.tar.gz
>  
> The flume.conf is as follows. I experimented with "hostNames", trying the full name, IP address,
> etc. I do not see any output messages in flume.log. Could someone let me know what could
> be causing the issue?
>  
> Thanks.
>  
> Shushuai
>  
>  
>  
> # Define a memory channel called ch1 on agent1
> agent1.channels = ch1
> agent1.channels.ch1.type = memory
>  
> # Define an Avro source called avro-source1 on agent1 and tell it to bind to 0.0.0.0:41414.
> # Connect it to channel ch1.
> agent1.sources = avro-source1
> agent1.sources.avro-source1.channels = ch1
> agent1.sources.avro-source1.type = avro
> agent1.sources.avro-source1.bind = 0.0.0.0
> agent1.sources.avro-source1.port = 41414
>  
> # Define a local file sink that simply logs all events it receives (this works well)
> #agent1.sinks = localout
> #agent1.sinks.localout.type = file_roll
> #agent1.sinks.localout.sink.directory = /scratch/flume-ng/log
> #agent1.sinks.localout.sink.rollInterval = 0
> #agent1.sinks.localout.channel = ch1
>  
> # Define ElasticSearchSink sink (this does not work)
> agent1.sinks = k1
> agent1.sinks.k1.type = org.apache.flume.sink.elasticsearch.ElasticSearchSink
> agent1.sinks.k1.hostNames = localhost:9300
> agent1.sinks.k1.indexName = flume
> agent1.sinks.k1.indexType = logs
> agent1.sinks.k1.clusterName = elasticsearch
> agent1.sinks.k1.batchSize = 2
> agent1.sinks.k1.serializer = org.apache.flume.sink.elasticsearch.ElasticSearchDynamicSerializer
> agent1.sinks.k1.channel = ch1
> 
> 
> 
> 
> 

