flume-user mailing list archives

From Allan Feid <allanf...@gmail.com>
Subject Re: YAMLException in the elasticsearch sink
Date Thu, 13 Jun 2013 19:33:36 GMT
After even further investigation, it seems the ContentBuilderUtil
calls org.elasticsearch.common.xcontent.XContentFactory, specifically the
xContentType method seen here:

https://github.com/elasticsearch/elasticsearch/blob/master/src/main/java/org/elasticsearch/common/xcontent/XContentFactory.java#L116

If that method returns null, it just coerces the data to a string; otherwise
it does some magic parsing based on the detected content type. I believe this
is where the problem happens: xContentType thinks my string is YAML, and the
resulting YAML parser failure isn't caught. Does it make sense to have the
try/catch in addComplexField also catch the YAML exception and fall back to
addSimpleField? The code already does this for JSON-related exceptions.
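To sketch the shape of the fallback I mean (illustrative only: a strict UTF-8 decode stands in for the YAML/JSON parse, and none of this is the actual ContentBuilderUtil code):

```java
import java.nio.ByteBuffer;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.CodingErrorAction;
import java.nio.charset.StandardCharsets;

public class FallbackSketch {
    // Try the "complex" parse first; if it throws, fall back to storing the
    // raw data as a simple string instead of letting the exception escape
    // the sink. A strict UTF-8 decode stands in for the YAML/JSON parse here.
    static String appendField(byte[] data) {
        try {
            return StandardCharsets.UTF_8.newDecoder()
                    .onMalformedInput(CodingErrorAction.REPORT)
                    .onUnmappableCharacter(CodingErrorAction.REPORT)
                    .decode(ByteBuffer.wrap(data))
                    .toString();
        } catch (CharacterCodingException e) {
            // Fallback, in the spirit of addSimpleField: a lossy decode so the
            // event still gets indexed instead of clogging the channel.
            return new String(data, StandardCharsets.UTF_8);
        }
    }

    public static void main(String[] args) {
        byte[] bad = {'q', '=', (byte) 0xFC};
        System.out.println(appendField(bad)); // 0xFC is invalid, becomes U+FFFD
    }
}
```

The point is just that the catch swallows the parse failure and degrades gracefully rather than failing the whole batch.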

Thanks,
Allan
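
P.S. On the "leave it as a literal string" idea in my message quoted below, a rough interceptor-style sketch (the escaping scheme and all names here are mine, not a Flume API):

```java
public class LiteralEscapeSketch {
    // Rewrite any byte outside printable ASCII as a literal \xNN escape so the
    // body is guaranteed-valid UTF-8. Crude: this also escapes legitimate
    // multi-byte characters like the UTF-8 encoding of ¤, so a real version
    // would first attempt a strict decode and only escape on failure.
    static String escapeBytes(byte[] body) {
        StringBuilder out = new StringBuilder(body.length);
        for (byte b : body) {
            int v = b & 0xFF;
            if (v >= 0x20 && v < 0x7F) {
                out.append((char) v);                     // printable ASCII
            } else {
                out.append(String.format("\\x%02X", v));  // everything else
            }
        }
        return out.toString();
    }

    public static void main(String[] args) {
        byte[] body = {'q', '=', (byte) 0x8D, (byte) 0x91};
        System.out.println(escapeBytes(body)); // q=\x8D\x91
    }
}
```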


On Thu, Jun 13, 2013 at 1:11 PM, Allan Feid <allanfeid@gmail.com> wrote:

> Edward,
>
> I think I've run into a very similar issue while writing a custom
> interceptor. I simply catch the exception and log it when it happens, and
> this is what the log body looks like:
>
>
> foo¤data¤1371126476.436¤0.005¤555¤10.1.1.1¤HTTP/1.1¤GET¤http¤vhost¤/path/url¤¤-¤200¤
> referrer.com/search/?query=\x8D\x91\x89\xEF\x8Bc\x8E\x96\x93\xB0
> ¤-¤-¤-
>
> I believe the query string here is the culprit (I know the ¤ character
> works fine in UTF-8). Ideally there would be a way to take the
> \x8D\x91.. data and leave it as a literal string, but I currently don't
> know how to do that.
>
> Thanks,
> Allan
>
>
> On Thu, Jun 13, 2013 at 12:13 PM, Edward Sargisson <esarge@pobox.com> wrote:
>
>> Hi Allan,
>> So it appears that flogger is simply grabbing standard input and putting it
>> into the body - which is fine.
>> Can you track the error down to a specific line in your input file? I
>> would be interested to know how it is encoded.
>>
>> Cheers,
>> Edward
>>
>>
>> "
>> Edward,
>>
>> Flogger is available here:
>> https://github.com/cloudera/flume/tree/master/contrib/flogger
>>
>> I've forked it to accept multiple -t args, but it basically uses the
>> legacy thrift/rpc protocol to add events from STDIN. Neither the file_roll
>> nor the HDFS sink runs into UTF-8 errors. The architecture is basically tail
>> | flogger -> local flume instance -> log processing flume instance -> {
>> hdfs, file_roll, elasticsearch }. I can send specific configs if necessary,
>> but it's all pretty standard as per the User Guide.
>>
>> Thanks,
>> Allan
>>
>>
>> On Wed, Jun 12, 2013 at 12:14 PM, Edward Sargisson <esarge@pobox.com> wrote:
>> Hi Allan,
>> I think I would run it in a debugger and look at the buffer that way. You
>> should be able to put
>>
>>
>> JAVA_OPTS="-agentlib:jdwp=transport=dt_socket,address=localhost:9009,server=y,suspend=n"
>>
>> into your /etc/flume-ng/conf/flume-env.conf and then attach a debugger
>> with a breakpoint on
>> org.apache.flume.sink.elasticsearch.ElasticSearchSink.process(ElasticSearchSink.java:178)
>>
>> You could try the file_roll sink, but I'm not sure whether it munges the
>> character sets itself.
>>
>> Can you send me a link to flogger and your configuration for it? I'm not
>> familiar with it.
>>
>> Cheers,
>> Edward
>>
>> "
>> Edward,
>>
>> Thanks for the reply. I'm not encoding my events in any specific
>> character set. I'm using flogger to send application logs (nodejs, ruby,
>> perl etc) into my flume infrastructure. It seems that only the
>> ElasticSearchSink encounters this issue. I'm not sure if the HDFS or file
>> roll sinks are forcing an encoding before trying to process (haven't
>> checked the code yet). Is there an easy way to have flume output the hex
>> data of an event? I'd love to provide the hex alongside the exception.
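
One rough way to get that hex (a sketch; only the idea of dumping the event body byte[] comes from Flume, the helper and its names are mine):

```java
public class HexDumpSketch {
    // Dump an event body (a byte[]) as space-separated hex, suitable for
    // pasting next to the exception. In a real setup this could run in a
    // custom interceptor against event.getBody().
    static String toHex(byte[] body) {
        StringBuilder sb = new StringBuilder(body.length * 3);
        for (byte b : body) {
            sb.append(String.format("%02x ", b));
        }
        return sb.toString().trim();
    }

    public static void main(String[] args) {
        System.out.println(toHex(new byte[]{(byte) 0xFC, 'a'})); // fc 61
    }
}
```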
>>
>> Thanks,
>> Allan
>>
>>
>> On Tue, Jun 11, 2013 at 12:33 PM, Edward Sargisson <esarge@pobox.com> wrote:
>>
>>> Hi Allan,
>>> I would like to see the contents of the event you are trying to store -
>>> in hex - paired with the exception that relates to that message.
>>> This, "Invalid UTF-8 start byte 0xfc (at char #81, byte #-1)", indicates
>>> that there is a problem with the data and the character sets. In other
>>> words, are you encoding your data to be sent to Flume in UTF-8 or something
>>> else?
>>>
>>> Cheers,
>>> Edward
>>>
>>>
>>> "
>>> I think this might be specific to the LogStash serializer, but I am
>>> unsure. After a period of time, some of my events cause an exception and
>>> eventually fill up my memory channel. Below is the stack trace; any help
>>> would be greatly appreciated. I can file a bug report but would like to
>>> know what kind of information to provide.
>>>
>>> 10 Jun 2013 09:52:34,360 ERROR
>>> [SinkRunner-PollingRunner-DefaultSinkProcessor]
>>> (org.apache.flume.SinkRunner$PollingRunner.run:160)  - Unable to deliver
>>> event. Exception follows.
>>> org.elasticsearch.common.jackson.dataformat.yaml.snakeyaml.error.YAMLException:
>>> java.io.CharConversionException: Invalid UTF-8 start byte 0xfc (at char
>>> #81, byte #-1)
>>>  at
>>> org.elasticsearch.common.jackson.dataformat.yaml.snakeyaml.reader.StreamReader.update(StreamReader.java:198)
>>> at
>>> org.elasticsearch.common.jackson.dataformat.yaml.snakeyaml.reader.StreamReader.<init>(StreamReader.java:62)
>>>  at
>>> org.elasticsearch.common.jackson.dataformat.yaml.YAMLParser.<init>(YAMLParser.java:147)
>>> at
>>> org.elasticsearch.common.jackson.dataformat.yaml.YAMLFactory._createParser(YAMLFactory.java:530)
>>>  at
>>> org.elasticsearch.common.jackson.dataformat.yaml.YAMLFactory.createJsonParser(YAMLFactory.java:420)
>>> at
>>> org.elasticsearch.common.xcontent.yaml.YamlXContent.createParser(YamlXContent.java:83)
>>>  at
>>> org.apache.flume.sink.elasticsearch.ContentBuilderUtil.addComplexField(ContentBuilderUtil.java:61)
>>> at
>>> org.apache.flume.sink.elasticsearch.ContentBuilderUtil.appendField(ContentBuilderUtil.java:47)
>>>  at
>>> org.apache.flume.sink.elasticsearch.ElasticSearchLogStashEventSerializer.appendBody(ElasticSearchLogStashEventSerializer.java:87)
>>> at
>>> org.apache.flume.sink.elasticsearch.ElasticSearchLogStashEventSerializer.getContentBuilder(ElasticSearchLogStashEventSerializer.java:79)
>>>  at
>>> org.apache.flume.sink.elasticsearch.ElasticSearchSink.process(ElasticSearchSink.java:178)
>>> at
>>> org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
>>>  at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
>>> at java.lang.Thread.run(Thread.java:662)
>>> Caused by: java.io.CharConversionException: Invalid UTF-8 start byte
>>> 0xfc (at char #81, byte #-1)
>>> at
>>> org.elasticsearch.common.jackson.dataformat.yaml.UTF8Reader.reportInvalidInitial(UTF8Reader.java:395)
>>>  at
>>> org.elasticsearch.common.jackson.dataformat.yaml.UTF8Reader.read(UTF8Reader.java:247)
>>> at
>>> org.elasticsearch.common.jackson.dataformat.yaml.UTF8Reader.read(UTF8Reader.java:157)
>>>  at
>>> org.elasticsearch.common.jackson.dataformat.yaml.snakeyaml.reader.StreamReader.update(StreamReader.java:182)
>>> ... 13 more
>>> "
