flume-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Ashish <paliwalash...@gmail.com>
Subject Re: Problems loading custom Serializer
Date Wed, 18 Feb 2015 11:29:04 GMT
Need to implement Builder like this inside your class (refer
org.apache.flume.serialization.LineDeserializer)

public static class Builder implements EventDeserializer.Builder {

  @Override
  public EventDeserializer build(Context context, ResettableInputStream in) {
    return new LineDeserializer(context, in);
  }

}

On Wed, Feb 18, 2015 at 4:44 PM, Tavira Manjon Juan Francisco
<juftavira@produban.com> wrote:
> Hi,
>
> I’ve developed a test custom serializer following examples and the way other
> decoders I’ve seen. The code itself does little, the real problem is an
> exception while trying to load:
>
>
>
> 2015-02-18 12:04:39,452 (lifecycleSupervisor-1-1) [INFO -
> org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:95)]
> Component type: SINK, name: test07_sink started
>
> 2015-02-18 12:04:39,464 (lifecycleSupervisor-1-1) [INFO -
> org.apache.flume.sink.RollingFileSink.start(RollingFileSink.java:136)]
> RollingFileSink test07_sink started.
>
> 2015-02-18 12:04:39,481 (lifecycleSupervisor-1-0) [DEBUG -
> org.apache.flume.client.avro.ReliableSpoolingFileEventReader.<init>(ReliableSpoolingFileEventReader.java:132)]
> Initializing ReliableSpoolingFileEventReader with
> directory=/opt/bigdata/flume/files-in, metaDir=.flumespool,
> deserializer=LINE
>
> 2015-02-18 12:04:39,482 (SinkRunner-PollingRunner-DefaultSinkProcessor)
> [DEBUG - org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:143)]
> Polling sink runner starting
>
> 2015-02-18 12:04:39,483 (SinkRunner-PollingRunner-DefaultSinkProcessor)
> [DEBUG -
> org.apache.flume.sink.RollingFileSink.process(RollingFileSink.java:167)]
> Opening output stream for file /opt/bigdata/flume/file-sink/1424257479324-1
>
> 2015-02-18 12:04:39,486 (SinkRunner-PollingRunner-DefaultSinkProcessor)
> [DEBUG -
> org.apache.flume.serialization.EventSerializerFactory.getInstance(EventSerializerFactory.java:48)]
> Not in enum, loading builder class: com.produban.flume.TestSerializer
>
> 2015-02-18 12:04:39,488 (SinkRunner-PollingRunner-DefaultSinkProcessor)
> [ERROR - org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:160)]
> Unable to deliver event. Exception follows.
>
> org.apache.flume.FlumeException: Unable to instantiate Builder from
> com.produban.flume.TestSerializer: does not appear to implement
> org.apache.flume.serialization.EventSerializer$Builder
>
>         at
> org.apache.flume.serialization.EventSerializerFactory.getInstance(EventSerializerFactory.java:64)
>
>         at
> org.apache.flume.sink.RollingFileSink.process(RollingFileSink.java:171)
>
>         at
> org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
>
>         at
> org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
>
>         at java.lang.Thread.run(Thread.java:745)
>
> 2015-02-18 12:04:39,513 (lifecycleSupervisor-1-0) [DEBUG -
> org.apache.flume.client.avro.ReliableSpoolingFileEventReader.<init>(ReliableSpoolingFileEventReader.java:154)]
> Successfully created and deleted canary file:
> /opt/bigdata/flume/files-in/flume-spooldir-perm-check-4651476069921147664.canary
>
>
>
> Configuration for the agent points to:
>
>
>
> # OpenBank | BigData | test07: file as sink
>
> # Primer destino en disco
>
> test07.sinks.test07_sink.type = file_roll
>
> test07.sinks.test07_sink.channel = test07_channel
>
> test07.sinks.test07_sink.sink.directory = /opt/bigdata/flume/file-sink
>
> test07.sinks.test07_sink.sink.serializer = com.produban.flume.TestSerializer
>
>
>
>
>
> And class code is simple, it compiles properly:
>
>
>
> package com.produban.flume;
>
>
>
> import java.io.IOException;
>
> import java.io.OutputStream;
>
>
>
> import org.apache.flume.Context;
>
> import org.apache.flume.Event;
>
> import org.apache.flume.conf.Configurable;
>
> import org.apache.flume.serialization.*;
>
>
>
> public class TestSerializer  implements EventSerializer, Configurable {
>
>
>
>                 private boolean appendNewline = false;
>
>                 private OutputStream out;
>
>
>
>                 public TestSerializer(OutputStream out, Context context,
> Object object) {
>
>                                 // TODO Auto-generated constructor stub
>
>                                 this.appendNewline =
> context.getBoolean("appendNewline", Boolean.valueOf(true)).booleanValue();
>
>                                 this.out = out;
>
>
>
>                 }
>
>
>
>                 public EventSerializer build(Context arg0, OutputStream
> arg1) {
>
>                                 // TODO Auto-generated method stub
>
>                                 return null;
>
>                 }
>
>
>
>                 @Override
>
>                 public void afterCreate() throws IOException {
>
>                                 // TODO Auto-generated method stub
>
>
>
>                 }
>
>
>
>                 @Override
>
>                 public void afterReopen() throws IOException {
>
>                                 // TODO Auto-generated method stub
>
>
>
>                 }
>
>
>
>                 @Override
>
>                 public void beforeClose() throws IOException {
>
>                                 // TODO Auto-generated method stub
>
>
>
>                 }
>
>
>
>                 @Override
>
>                 public void flush() throws IOException {
>
>                                 // TODO Auto-generated method stub
>
>
>
>                 }
>
>
>
>                 @Override
>
>                 public boolean supportsReopen() {
>
>                                 // TODO Auto-generated method stub
>
>                                 return false;
>
>                 }
>
>
>
>                 @Override
>
>                 public void write(Event arg0) throws IOException {
>
>                                 // TODO Auto-generated method stub
>
>                                 String newBody;
>
>
>
>
> System.console().writer().println("TestSerializer content:");
>
>
> System.console().writer().println(arg0.getBody().toString());
>
>                     System.console().writer().println("TestSerializer to
> write:");
>
>
> newBody=arg0.getBody().toString()+","+arg0.getBody().toString().length();
>
>                                 arg0.setBody(newBody.getBytes());
>
>
> System.console().writer().println(arg0.getBody().toString());
>
>                                 this.out.write((arg0.getHeaders() + "
> ").getBytes());
>
>                     this.out.write(arg0.getBody());
>
>                     if (this.appendNewline) {
>
>                       this.out.write(10);
>
>                     }
>
>
>
>                 }
>
>
>
>                 @Override
>
>                 public void configure(Context arg0) {
>
>                                 // TODO Auto-generated method stub
>
>
>
>                 }
>
>
>
>                   public static class Builder
>
>                     implements EventSerializer.Builder
>
>                   {
>
>                     public EventSerializer build(Context context,
> OutputStream out)
>
>                     {
>
>                                 TestSerializer s = new TestSerializer(out,
> context, null);
>
>                       return s;
>
>                     }
>
>                   }
>
> }
>
>
>
>
>
> FlumeTestSink.jar is located under:
> /opt/bigdata/flume/apache-flume-1.5.2-bin/lib and loads.
>
>
>
> Any guess on the problem?
>
>
>
> Thanks in advance,
>
> Juan
>
>
>
>
>
> _____________________________________________________________________
>
> Juan Francisco Tavira Manjón
>
> Grupo Santander - Produban
>
> Dirección Global de Técnica de Sistemas
>
> Sistemas Distribuidos: BPM / Tibco
>
> Parque Empresarial La Finca - Edificio 16 planta 1
>
> Paseo del Club Deportivo s/n - 28223 Pozuelo de Alarcón (Madrid)
>
> Teléfono: +34 91 289 88 43 – Móvil: +34 615 90 92 01
>
> Email: juftavira@produban.com
>
>  “It's the ship that made the Kessel Run in less than twelve parsecs”
>
>
>
>
> ________________________________
>
> Antes de imprimir este mensaje o sus documentos anexos, asegúrese de que es
> necesario.
> Proteger el medio ambiente está en nuestras manos.
>
> Before printing this e-mail or attachments, be sure it is necessary.
> It is in our hands to protect the environment.
>
> ******************AVISO LEGAL**********************
>
> Este mensaje es privado y confidencial y solamente para la persona a la que
> va dirigido. Si usted ha recibido este mensaje por error, no debe revelar,
> copiar, distribuir o usarlo en ningún sentido. Le rogamos lo comunique al
> remitente y borre dicho mensaje y cualquier documento adjunto que pudiera
> contener. No hay renuncia a la confidencialidad ni a ningún privilegio por
> causa de transmisión errónea o mal funcionamiento.
>
> Cualquier opinión expresada en este mensaje pertenece únicamente al autor
> remitente, y no representa necesariamente la opinión de Grupo Santander, a
> no ser que expresamente se diga y el remitente esté autorizado para hacerlo.
> Los correos electrónicos no son seguros, no garantizan la confidencialidad
> ni la correcta recepción de los mismos, dado que pueden ser interceptados,
> manipulados, destruidos, llegar con demora, incompletos, o con virus. Grupo
> Santander no se hace responsable de las alteraciones que pudieran hacerse al
> mensaje una vez enviado.
>
> Este mensaje sólo tiene una finalidad de información, y no debe
> interpretarse como una oferta de venta o de compra de valores ni de
> instrumentos financieros relacionados. En el caso de que el destinatario de
> este mensaje no consintiera la utilización del correo electrónico vía
> Internet, rogamos lo ponga en nuestro conocimiento.
>
>
>
> **********************DISCLAIMER*****************
>
> This message is private and confidential and it is intended exclusively for
> the addressee. If you receive this message by mistake, you should not
> disseminate, distribute or copy this e-mail. Please inform the sender and
> delete the message and attachments from your system. No confidentiality nor
> any privilege regarding the information is waived or lost by any
> mistransmission or malfunction.
>
> Any views or opinions contained in this message are solely those of the
> author, and do not necessarily represent those of Grupo Santander, unless
> otherwise specifically stated and the sender is authorized to do so. E-mail
> transmission cannot be guaranteed to be secure, confidential, or error-free,
> as information could be intercepted, corrupted, lost, destroyed, arrive
> late, incomplete, or contain viruses. Grupo Santander does not accept
> responsibility for any changes in the contents of this message after it has
> been sent.
>
> This message is provided for informational purposes and should not be
> construed as a solicitation or offer to buy or sell any securities or
> related financial instruments. If the addressee of this message does not
> consent to the use of internet e-mail, please communicate it to us.
>
>
>
>



-- 
thanks
ashish

Blog: http://www.ashishpaliwal.com/blog
My Photo Galleries: http://www.pbase.com/ashishpaliwal

Mime
View raw message