flume-user mailing list archives

From shushuai zhu <ss...@yahoo.com>
Subject Re: How to use Flume Client perl API to send data to ElasticSearch?
Date Fri, 14 Jun 2013 18:52:05 GMT
Edward,
 
Thanks for the reply. I tried both epoch time and a date string for the timestamp, but neither
makes the ElasticSearch data viewable in Kibana. The Perl call looks like:
 
my ($result, $response) = $ng_requestor->request('appendBatch', [{
    headers => { "\@source" => "default", "\@timestamp" => 1369769253000,
                 "\@source_host" => "null", "\@source_path" => "default",
                 "\@message" => "abc", "\@type" => "flume-input" },
    body    => "hello, this is sent from perl (using FlumeNG)",
}]);
print "$response\n";    # response will be 'OK' on success

I can always query ElasticSearch through its REST API. An example result in JSON looks
like:
 
{
_index: "flume-2013-06-14"
_type: "logs"
_id: "iug8xyVWRK6Qm7Z22ohCXw"
_score: 1
_source: {
body: "hello, this is sent from perl (using FlumeNG)"
@timestamp: "2013-05-28T19:27:49.947Z"
@message: "abc"
@source: "default"
@type: "flume-input"
@source_host: "null"
@source_path: "default"
}
}
 
An example result from the ElasticSearch index built with Redis + LogStash looks like:
 
{
_index: "logstash-2013.05.28"
_type: "redis-input"
_id: "I3AJHKzhQ1GXWc1Lb8WlFQ"
_score: 1
_source: {
@source: "default"
@tags: [0]
@fields: {
OBS_FIELD_PRIORITY: "DEBUG"
OBS_LOG_ENTRY_CONTENT: "2013-05-28 12:34:21,224 [#SYSTEM_POOL#:WorkManagerSvc] DEBUG workmanager.WorkManagerStats logp.251 - WeightedAvg.summarize[RunningThreads] : Avg : 1.0, min : 1, max : 1, lastVal : 1, dur : 60014812861, totdur : 60014812861 "
OBS_FIELD_CLASS: "workmanager.WorkManagerStats"
OBS_LOG_FILE: "/scratch/work/system/log/abc.trc"
OBS_FIELD_MESSAGE: "WeightedAvg.summarize[RunningThreads] : Avg : 1.0, min : 1, max : 1, lastVal : 1, dur : 60014812861, totdur : 60014812861 "
OBS_FIELD_LOGGER: "#SYSTEM_POOL#:WorkManagerSvc"
OBS_LOG_ENTRY_LENGTH: "226"
OBS_FIELD_METHOD: "logp.251"
OBS_FIELD_TIME: "1369769661224"
}
@timestamp: "2013-05-28T19:34:50.672Z"
@source_host: null
@source_path: "default"
@type: "redis-input"
}
}
 
I am not sure what exactly in the ElasticSearch documents causes the problem for Kibana. The
"body" field seems inconsistent with the others, but I cannot control it: it still appears,
with an empty string, even if I remove it from the API call.
 
Another major issue I have with the Perl API is how to add the @fields data. When I put
another level of JSON inside "headers", it comes back as an object string when I query through REST.
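Flume's Avro event headers are a flat map from string keys to string values, so a nested hashref such as `@fields` cannot be carried in `headers` as-is. A minimal sketch of one workaround, assuming dotted key names are acceptable on the ElasticSearch side: flatten the nested data into single-level string headers before the `appendBatch` call. `flatten_headers` is a hypothetical helper, not part of Flume::Client.

```perl
use strict;
use warnings;

# Hypothetical helper (not part of Flume::Client): flatten a nested hashref
# into one level of string values, joining nested keys with ".".
# Assumption: dotted key names are acceptable to the ElasticSearch side.
sub flatten_headers {
    my ($data, $prefix) = @_;
    my %flat;
    for my $key (sort keys %$data) {
        my $name  = defined $prefix ? "$prefix.$key" : $key;
        my $value = $data->{$key};
        if (ref $value eq 'HASH') {
            # Recurse into nested objects such as @fields.
            %flat = (%flat, flatten_headers($value, $name));
        }
        elsif (ref $value) {
            # Arrays and other references have no obvious flat form here.
            die "cannot flatten '$name': unsupported ", ref($value), " value\n";
        }
        else {
            # Header values are strings, so stringify scalars (undef -> '').
            $flat{$name} = defined $value ? "$value" : '';
        }
    }
    return %flat;
}

my %headers = flatten_headers({
    '@type'   => 'flume-input',
    '@fields' => { FIELD_PRIORITY => 'DEBUG', FIELD_CLASS => 'procedure.engine' },
});
print "$_ => $headers{$_}\n" for sort keys %headers;
```

For example, `flatten_headers({ '@fields' => { FIELD_PRIORITY => 'DEBUG' } })` yields the single header `'@fields.FIELD_PRIORITY' => 'DEBUG'`; calling it on the inner `@fields` hash alone yields unprefixed keys instead. How those keys end up laid out in the index depends on the sink's serializer, which this sketch does not control.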

 
Shushuai
 

________________________________
 From: Edward Sargisson <esarge@pobox.com>
To: user <user@flume.apache.org> 
Sent: Friday, June 14, 2013 11:53 AM
Subject: How to use Flume Client perl API to send data to ElasticSearch?
  


Hi Shushuai,
The Flume event must have a header named timestamp (all lower case) whose value is Unix time
in milliseconds since 1970.

Without this, the event gets sent to elasticsearch but Kibana can never read it.

The ElasticSearchSink converts the timestamp (millis) header into the @timestamp for Kibana.
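A minimal sketch of following this advice from Perl: build one event in the `{ headers => ..., body => ... }` shape used by the `appendBatch` calls in this thread, adding a lower-case `timestamp` header in epoch milliseconds. `make_event` and its `timestamp_ms` argument are hypothetical names, and converting the value to a string assumes Flume's Avro headers are a string-to-string map.

```perl
use strict;
use warnings;
use Time::HiRes ();

# Hypothetical helper: build one event hashref in the { headers, body } shape
# used with appendBatch in this thread. The lower-case "timestamp" header
# carries Unix time in milliseconds, as described in the reply above.
sub make_event {
    my (%args) = @_;
    my %headers = %{ $args{headers} || {} };
    my $ms = defined $args{timestamp_ms}
        ? $args{timestamp_ms}
        : int(Time::HiRes::time() * 1000);    # default: current time in ms
    # Assumption: header values should be strings (Avro map of strings).
    $headers{timestamp} = "$ms";
    return { headers => \%headers, body => $args{body} };
}

my $event = make_event(
    headers      => { '@type' => 'flume-input', '@source' => 'default' },
    body         => 'hello, this is sent from perl (using FlumeNG)',
    timestamp_ms => 1369769253000,
);
print "$event->{headers}{timestamp}\n";
# With a connected $ng_requestor, as in the thread's examples:
#   my ($result, $response) = $ng_requestor->request('appendBatch', [$event]);
```

Leaving out `timestamp_ms` stamps the event with the current time, which is usually what a live log shipper wants.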


Cheers,
Edward


"

Hi,

I am sending JSON data with the Flume Client Perl API to a Flume Avro source, which passes it
through an ElasticSearchSink to an ElasticSearch engine. The data does reach ElasticSearch,
since I see new indexes created in the engine, but it seems to be formatted incorrectly or
partially lost, because it cannot be viewed in Kibana.

An example of the data in JSON format is:

{"@timestamp":"2013-05-28T23:59:43.300Z","@source_host":"Source host name","@type":"flume-input","@fields":{"FIELD_PRIORITY":"DEBUG","FIELD_CLASS":"procedure.engine","FIELD_FILE":"/scratch/abc/abc.log","FIELD_MESSAGE":"The debug message","FIELD_TIME":"1369785558004"}}
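The `@timestamp` in this record is an ISO-8601 string, while Edward's reply says the sink needs Unix milliseconds in a `timestamp` header. A minimal sketch of that conversion using only the core Time::Local module; `iso8601_to_epoch_ms` is a hypothetical name, and it assumes UTC timestamps of exactly the `YYYY-MM-DDThh:mm:ss[.fff]Z` shape shown here.

```perl
use strict;
use warnings;
use Time::Local qw(timegm);

# Hypothetical helper: convert an ISO-8601 UTC timestamp such as
# "2013-05-28T23:59:43.300Z" into Unix milliseconds since 1970.
# Assumption: input is always UTC ("Z") with optional fractional seconds.
sub iso8601_to_epoch_ms {
    my ($iso) = @_;
    my ($y, $mo, $d, $h, $mi, $s, $frac) =
        $iso =~ /^(\d{4})-(\d\d)-(\d\d)T(\d\d):(\d\d):(\d\d)(?:\.(\d+))?Z$/
        or die "unsupported timestamp format: $iso\n";
    # timegm takes a 0-based month; a 4-digit year is used as-is.
    my $epoch = timegm($s, $mi, $h, $d, $mo - 1, $y);
    # Keep milliseconds only: pad or truncate the fraction to 3 digits.
    my $ms = defined $frac ? substr($frac . '000', 0, 3) : 0;
    return $epoch * 1000 + $ms;
}

print iso8601_to_epoch_ms('2013-05-28T23:59:43.300Z'), "\n";
```

For the record above it returns 1369785583300, which is the kind of value the `timestamp` header expects.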

Earlier, I used LogStash to pull JSON data from Redis into ElasticSearch; the LogStash conf file looks like:

input {
    redis {
        host => "abc"
        type => "redis-input"
        data_type => "list"
        key => "logstash"
        format => "json"
    }
}
output {
    elasticsearch {
        embedded => false
        bind_host => "abc"
        host => "abc"
        port => 9300
    }
}

The data were loaded into ElasticSearch correctly and could be viewed in Kibana.

The sample Perl API call for FlumeNG shown in the Flume Client documentation is:

use Flume::Client;
my $ng_client = Flume::Client::Transceiver::Socket->new(host => 'host name', port => 41414);
my $ng_requestor = Flume::Client::Requestor::FlumeNG->new(client => $ng_client);
my ($result, $response) = $ng_requestor->request('appendBatch', [{ headers => {}, body => "hello, this is sent from perl (using FlumeNG)"}]);
print "$response\n";    # response will be 'OK' on success

I tried putting the JSON data in "body", in "headers", and in a few other places in the
example above, but could not get the right results in ElasticSearch. Could someone tell me
how to use the API to send the example data above to ElasticSearch?
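One way to approach the question, sketched under assumptions: decode each JSON record with the core JSON::PP module, turn its ISO-8601 `@timestamp` into the epoch-millisecond `timestamp` header described in Edward's reply, copy the plain top-level values and the entries of `@fields` into flat string headers, and keep the original JSON line as the body. `json_line_to_event` and `iso_to_ms` are hypothetical helpers, and how the sink maps these headers into index fields depends on the ElasticSearchSink serializer in use, which this sketch does not control.

```perl
use strict;
use warnings;
use JSON::PP ();
use Time::Local qw(timegm);

# Hypothetical helper: ISO-8601 UTC string -> Unix milliseconds.
sub iso_to_ms {
    my ($iso) = @_;
    my ($y, $mo, $d, $h, $mi, $s, $frac) =
        $iso =~ /^(\d{4})-(\d\d)-(\d\d)T(\d\d):(\d\d):(\d\d)(?:\.(\d+))?Z$/
        or die "unsupported timestamp format: $iso\n";
    my $ms = defined $frac ? substr($frac . '000', 0, 3) : 0;
    return timegm($s, $mi, $h, $d, $mo - 1, $y) * 1000 + $ms;
}

# Hypothetical converter: one JSON log record -> one Flume event hashref
# in the { headers => ..., body => ... } shape that appendBatch takes.
sub json_line_to_event {
    my ($line) = @_;
    my $rec = JSON::PP::decode_json($line);
    my %headers;
    for my $key (keys %$rec) {
        my $val = $rec->{$key};
        if ($key eq '@fields' && ref $val eq 'HASH') {
            # Headers are a flat string map, so lift each @fields entry up.
            for my $field (keys %$val) {
                $headers{$field} = defined $val->{$field} ? "$val->{$field}" : '';
            }
        }
        elsif (!ref $val) {
            # Copy plain top-level values such as @type and @source_host.
            $headers{$key} = defined $val ? "$val" : '';
        }
    }
    # Lower-case "timestamp" in epoch ms, per the reply in this thread.
    $headers{timestamp} = iso_to_ms($rec->{'@timestamp'}) . ''
        if defined $rec->{'@timestamp'};
    # Assumption: keep the original JSON line as the body so nothing is lost.
    return { headers => \%headers, body => $line };
}

my $line = '{"@timestamp":"2013-05-28T23:59:43.300Z","@type":"flume-input",'
         . '"@fields":{"FIELD_PRIORITY":"DEBUG","FIELD_MESSAGE":"The debug message"}}';
my $event = json_line_to_event($line);
print "$_ => $event->{headers}{$_}\n" for sort keys %{ $event->{headers} };
# e.g. my ($result, $response) = $ng_requestor->request('appendBatch', [$event]);
```

Lifting `@fields` entries to top-level headers is a design choice for this sketch; prefixing them instead (for example `@fields.FIELD_PRIORITY`) would keep their origin visible at the cost of dotted key names.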

Shushuai"