flume-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From shushuai zhu <ss...@yahoo.com>
Subject ElasticSearchSink does not work
Date Wed, 12 Jun 2013 02:37:03 GMT
I am new to Flume. I am trying to send data using Flume Client perl API to Flume Avro source
then ElasticSearchSink to an ElasticSearch engine. I could make the file_roll sink to work
by sending the data to some file. However, I am encountering issue with ElasticSearchSink.
The data did not go through to ElasticSearch engine:
use Flume::Client;
my $ng_client = Flume::Client::Transceiver::Socket->new(host => 'host name', port =>
my $ng_requestor = Flume::Client::Requestor::FlumeNG->new(client => $ng_client);
my ($result, $response) = $ng_requestor->request('appendBatch', [{ headers => {}, body
=> "hello, this is sent from perl (using FlumeNG)"}]);
print "$response\n";    # response will be 'OK' on success
since the returned $response is not defined (again this worked when file_roll sink was used).
The ElasticSearch engine is working since I could load data to it via LogStash's EalsticSearch
output type.
The Flume agent was installed by downloading tarball from Cloudera:
The flume.conf is as followings. I played around the "hostNames" with full name, IP address,
etc. Do not see output message in flume.log. Could someone let me know what could potentially
cause the issue?
# Define a memory channel called ch1 on agent1
agent1.channels = ch1
agent1.channels.ch1.type = memory
# Define an Avro source called avro-source1 on agent1 and tell it to bind to
Connect it to channel ch1.
agent1.sources = avro-source1
agent1.sources.avro-source1.channels = ch1
agent1.sources.avro-source1.type = avro
agent1.sources.avro-source1.bind =
agent1.sources.avro-source1.port = 41414
# Define a local file sink that simply logs all events it receives (this works well)
#agent1.sinks = localout
#agent1.sinks.localout.type = file_roll
#agent1.sinks.localout.sink.directory = /scratch/flume-ng/log
#agent1.sinks.localout.sink.rollInterval = 0
#agent1.sinks.localout.channel = ch1
# Define ElasticSearchSink sink (this does not work)
agent1.sinks = k1
agent1.sinks.k1.type = org.apache.flume.sink.elasticsearch.ElasticSearchSink
agent1.sinks.k1.hostNames = localhost:9300
agent1.sinks.k1.indexName = flume
agent1.sinks.k1.indexType = logs
agent1.sinks.k1.clusterName = elasticsearch
agent1.sinks.k1.batchSize = 2
agent1.sinks.k1.serializer = org.apache.flume.sink.elasticsearch.ElasticSearchDynamicSerializer
agent1.sinks.k1.channel = ch1

View raw message