flume-user mailing list archives

From Brock Noland <br...@cloudera.com>
Subject Re: Handling malformed data when using custom AvroEventSerializer and HDFS Sink
Date Fri, 03 Jan 2014 00:59:42 GMT
Thanks Wolfgang!  That is very nice.


On Thu, Jan 2, 2014 at 6:49 PM, Wolfgang Hoschek <whoschek@cloudera.com> wrote:

> FWIW, here is an example of how this could be handled in a
> MorphlineInterceptor:
>
> morphlines : [
>   {
>     id : morphline1
>     importCommands : ["org.kitesdk.**"]
>
>     commands : [
>       {
>         tryRules {
>           catchExceptions: true
>           rules : [
>             # first rule
>             {
>               commands : [
>                 # save initial state
>                 { setValues { _tmp : "@{_attachment_body}" } }
>
>                 # if JSON parsing succeeds, replace _attachment_body
>                 # with a Jackson JSON object
>                 { readJson {} }
>
>                 # if we reach here the JSON parsing has succeeded
>                 # restore state prior to readJson command
>                 { setValues { _attachment_body : "@{_tmp}" } }
>                 { setValues { _tmp : [] } }
>                 { setValues { _attachment_mimetype : [] } }
>               ]
>             }
>
>             # second rule is executed if the previous rule failed or
>             # threw an exception
>             {
>               commands : [
>                 { logDebug { format : "Marking event as malformed for downstream sink: {}", args: ["@{}"] } }
>                 { addValues { malformed : true } }
>               ]
>             }
>
>           ]
>         }
>       }
>     ]
>   }
> ]
>
> Also see
> http://kitesdk.org/docs/current/kite-morphlines/morphlinesReferenceGuide.html#tryRules
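>
> In a Flume agent, a morphline like this is typically referenced from a
> MorphlineInterceptor configured on the source (pointing at the morphline
> file and id), and the malformed header set by the second rule can then
> drive good/bad routing downstream, for example via a %{malformed}
> component in the HDFS sink path or a multiplexing channel selector.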
>
> Wolfgang.
>
> On Jan 3, 2014, at 2:03 AM, ed wrote:
>
> > Thank you Brock, Devin and Jimmy for the great information.  Dumping
> null values in the EventSerializer write method looks really easy to do,
> but I think using the custom interceptor to validate and then tag the event
> for proper good/bad routing sounds like a great idea and seems to fit into
> the Flume way of doing things better.
> >
> > Thank you again!
> >
> > ~Ed
> >
> >
> > On Fri, Jan 3, 2014 at 2:40 AM, Devin Suiter RDX <dsuiter@rdx.com>
> wrote:
> > Yes, the regex interceptors and selectors can be very powerful -
> experimenting with them was really exciting.
> >
> > Brock, thanks for validating the ML idea - as with most things, the
> simplest solution is probably the way to go, and in this use case, the
> morphlines might be overkill.
> >
> > Devin Suiter
> > Jr. Data Solutions Software Engineer
> >
> > 100 Sandusky Street | 2nd Floor | Pittsburgh, PA 15212
> > Google Voice: 412-256-8556 | www.rdx.com
> >
> >
> > On Thu, Jan 2, 2014 at 12:27 PM, Brock Noland <brock@cloudera.com>
> wrote:
> > Jimmy, great to hear that method is working for you!
> >
> > Devin, regarding the morphlines question. Since ML can have arbitrary
> java plugins it *can* do just about anything. I generally think of ML as
> the T in ETL. Doing the validation in ML might make sense. In general
> though I think adding the custom header field is probably the best option
> for dealing with bad data.
> >
> > Once again, thank you everyone for using our software!
> >
> >
> > On Thu, Jan 2, 2014 at 10:10 AM, Jimmy <jimmyjack@gmail.com> wrote:
> > We are doing a similar thing to what Brock mentioned - a simple
> interceptor for JSON validation that updates a custom field in the header,
> then the Flume HDFS sink pushes the data to a good/bad target directory
> based on this custom field... then we watch the bad directory in a separate
> process.
> >
> > You could add notification to the Flume flow; we wanted to keep it very
> simple.
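> >
> > For reference, a minimal sketch of such an interceptor (class name and
> > the "malformed" header key are illustrative, not taken from our actual
> > code) might look like:
> >
> > import java.io.IOException;
> > import java.util.List;
> > import java.util.Map;
> >
> > import org.apache.flume.Context;
> > import org.apache.flume.Event;
> > import org.apache.flume.interceptor.Interceptor;
> >
> > import com.fasterxml.jackson.databind.ObjectMapper;
> >
> > /** Validates the event body as JSON and tags the result in a header. */
> > public class JsonValidatingInterceptor implements Interceptor {
> >
> >   private final ObjectMapper mapper = new ObjectMapper();
> >
> >   @Override
> >   public void initialize() {
> >   }
> >
> >   @Override
> >   public Event intercept(Event event) {
> >     Map<String, String> headers = event.getHeaders();
> >     try {
> >       mapper.readTree(event.getBody());   // parse only, to validate
> >       headers.put("malformed", "false");
> >     } catch (IOException e) {
> >       headers.put("malformed", "true");   // route to the bad directory
> >     }
> >     return event;                         // never drop the event here
> >   }
> >
> >   @Override
> >   public List<Event> intercept(List<Event> events) {
> >     for (Event event : events) {
> >       intercept(event);
> >     }
> >     return events;
> >   }
> >
> >   @Override
> >   public void close() {
> >   }
> >
> >   public static class Builder implements Interceptor.Builder {
> >     @Override
> >     public Interceptor build() {
> >       return new JsonValidatingInterceptor();
> >     }
> >
> >     @Override
> >     public void configure(Context context) {
> >     }
> >   }
> > }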
> >
> >
> >
> >
> > ---------- Forwarded message ----------
> > From: Devin Suiter RDX <dsuiter@rdx.com>
> > Date: Thu, Jan 2, 2014 at 7:40 AM
> > Subject: Re: Handling malformed data when using custom
> AvroEventSerializer and HDFS Sink
> > To: user@flume.apache.org
> >
> >
> > Just throwing this out there, since I haven't had time to dig into the
> API with a big fork, but can morphlines offer any assistance here?
> >
> > Some kind of an interceptor that would parse for malformed data, package
> the offending data and send it somewhere (email it, log it), and then
> project a valid "there was something wrong here" piece of data into the
> field, then allow your channel to carry on? Or skip the projection piece and
> just move along? I was just thinking that the projection of known data into
> a field that previously had malformed data would allow you to easily locate
> those records later with the projected data, but keep your data shape
> consistent.
> >
> > Kind of looking to Brock as a sounding board as to the appropriateness
> of this as a potential solution since morphlines takes some time to really
> understand well...
> >
> > Devin Suiter
> > Jr. Data Solutions Software Engineer
> >
> > 100 Sandusky Street | 2nd Floor | Pittsburgh, PA 15212
> > Google Voice: 412-256-8556 | www.rdx.com
> >
> >
> > On Thu, Jan 2, 2014 at 10:25 AM, Brock Noland <brock@cloudera.com>
> wrote:
> >
> > On Tue, Dec 31, 2013 at 8:34 PM, ed <edorsey@gmail.com> wrote:
> > Hello,
> >
> > We are using Flume v1.4 to load JSON-formatted log data into HDFS as
> Avro.  Our Flume setup looks like this:
> >
> > NXLog ==> (FlumeHTTPSource -> HDFSSink w/ custom EventSerializer)
> >
> > Right now our custom EventSerializer (which extends
> AbstractAvroEventSerializer) takes the JSON input from the HTTPSource and
> converts it into an avro record of the appropriate type for the incoming
> log file.  This is working great and we use the serializer to add some
> additional "synthetic" fields to the avro record that don't exist in the
> original JSON log data.
> >
> > My question concerns how to handle malformed JSON data (or really any
> error inside of the custom EventSerializer).  It's very likely that as we
> parse the JSON there will be records where something is malformed (either
> the JSON itself, or a field is of the wrong type etc.).
> >
> > For example, a "port" field which should always be an Integer might for
> some reason have some ASCII text in it.  I'd like to catch these errors in
> the EventSerializer and then write out the bad JSON to a log file somewhere
> that we can monitor.
> >
> > Yeah it would be nice to have a better story about this in Flume.
> >
> >
> > What is the best way to do this?
> >
> > Typically people will either log it to a file or send it through another
> "flow" to a different HDFS sink.
> >
> >
> > Right now, all the logic for catching bad JSON would be inside of the
> "convert" function of the EventSerializer.  Should the convert function
> itself throw an exception that will be gracefully handled upstream
> >
> > The exception will be logged, but that is it.
> >
> > or do I just return a "null" value if there was an error?  Would it be
> appropriate to log errors directly to a database from inside the
> EventSerializer convert method or would this be too slow?
> >
> > That might be too slow to do directly. If I did that, I'd have a separate
> thread doing the logging, with an in-memory queue between the serializer and
> that thread.
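> >
> > A minimal sketch of that pattern (class name and queue size are
> > illustrative): the serializer hands the bad JSON to a bounded queue and
> > a daemon thread drains it to wherever the errors should go.
> >
> > import java.util.concurrent.ArrayBlockingQueue;
> > import java.util.concurrent.BlockingQueue;
> >
> > /** Drains bad records on a background thread so writes never block. */
> > public class BadRecordReporter {
> >
> >   private final BlockingQueue<String> queue =
> >       new ArrayBlockingQueue<String>(10000);
> >
> >   public BadRecordReporter() {
> >     Thread drainer = new Thread(new Runnable() {
> >       public void run() {
> >         try {
> >           while (true) {
> >             String badJson = queue.take(); // blocks until a record arrives
> >             // write to a log file, a database, another Flume flow, etc.
> >             System.err.println("Malformed record: " + badJson);
> >           }
> >         } catch (InterruptedException e) {
> >           Thread.currentThread().interrupt();
> >         }
> >       }
> >     }, "bad-record-reporter");
> >     drainer.setDaemon(true);
> >     drainer.start();
> >   }
> >
> >   /** Called from the serializer; drops the record if the queue is full. */
> >   public void report(String badJson) {
> >     queue.offer(badJson);
> >   }
> > }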
> >
> > What are the best practices for this type of error handling?
> >
> > It looks to me like we'd need to change AbstractAvroEventSerializer to
> filter out nulls:
> >
> >
> https://github.com/apache/flume/blob/trunk/flume-ng-core/src/main/java/org/apache/flume/serialization/AbstractAvroEventSerializer.java#L106
> >
> > which we could easily do.  Since you don't want to wait for that, you
> could override the write method to do this.
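> >
> > For what it's worth, an override along those lines might look roughly
> > like the sketch below, written from memory against the class linked
> > above. It assumes your convert() returns null for events it could not
> > parse; note it converts each event twice (once to check, once inside
> > super.write()), so caching the converted record would avoid that.
> >
> > import java.io.IOException;
> >
> > import org.apache.avro.generic.GenericRecord;
> > import org.apache.flume.Event;
> > import org.apache.flume.serialization.AbstractAvroEventSerializer;
> >
> > // Skeleton only; GenericRecord and the rest of the serializer stand in
> > // for your real implementation.
> > public abstract class TolerantJsonAvroSerializer
> >     extends AbstractAvroEventSerializer<GenericRecord> {
> >
> >   @Override
> >   public void write(Event event) throws IOException {
> >     if (convert(event) == null) {  // our convert() returns null on bad JSON
> >       // report the bad record here (log file, queue, separate flow, ...)
> >       return;                      // skip rather than append a null record
> >     }
> >     super.write(event);            // the parent converts again and appends
> >   }
> > }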
> >
> >
> > Thank you for any assistance!
> >
> > Best Regards,
> >
> > Ed
> >
> >
> >
> > --
> > Apache MRUnit - Unit testing MapReduce - http://mrunit.apache.org
> >
> >
> >
> >
> > --
> > Apache MRUnit - Unit testing MapReduce - http://mrunit.apache.org
> >
> >
>
>


-- 
Apache MRUnit - Unit testing MapReduce - http://mrunit.apache.org
