flume-user mailing list archives

From: Nikhil Gs <gsnikhil1432...@gmail.com>
Subject: Re: Constant Flume Error
Date: Sun, 30 Aug 2015 19:33:52 GMT
Hello Team and Hari,

Thanks for your reply, Hari Shreedharan. If you look at the conf file:

pnmtest2.sources.SPOOL.spoolDir      = /home/a_nikhil.gopishetti/pnm
#pnmtest2.sources.SPOOL.spoolDir      = /home/s_sdldalplhdxxxedh/pnmtest2-poll-results
#pnmtest2.sources.SPOOL.spoolDir  = /home/s_sdldalplhdxxxedh/pnm-poll-results-uat

Of the three spoolDir entries, I am using only one, i.e.
pnmtest2.sources.SPOOL.spoolDir = /home/a_nikhil.gopishetti/pnm. The
other two are commented out with the # symbol.
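
For reference, the spooling directory source only honors a single spoolDir,
so if all three directories ever needed to be polled at once, each would
need its own source. A rough sketch under that assumption (the SPOOL1 and
SPOOL2 names are hypothetical):

# Sketch only: one spooldir source per directory, sharing the same channel.
pnmtest2.sources = SPOOL1 SPOOL2
pnmtest2.sources.SPOOL1.type     = spooldir
pnmtest2.sources.SPOOL1.spoolDir = /home/a_nikhil.gopishetti/pnm
pnmtest2.sources.SPOOL1.channels = MemChanneltest2
pnmtest2.sources.SPOOL2.type     = spooldir
pnmtest2.sources.SPOOL2.spoolDir = /home/s_sdldalplhdxxxedh/pnmtest2-poll-results
pnmtest2.sources.SPOOL2.channels = MemChanneltest2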

As for the deserializer, here is its code:

package com.suddenlink.flume;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.serialization.EventDeserializer;
import org.apache.flume.serialization.ResettableInputStream;

public class WholeFileDeserializer implements EventDeserializer {

    private Context context = null;
    private ResettableInputStream stream = null;
    private volatile boolean isOpen;

    public WholeFileDeserializer(Context context, ResettableInputStream in) {
        this.context = context;
        this.stream = in;
        this.isOpen = true;
    }

    @Override
    public Event readEvent() throws IOException {
        ensureOpen();
        // Drain the entire file into a single in-memory buffer.
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] buffer = new byte[4096];

        int rc = 0;
        while (rc != -1) {
            rc = stream.read(buffer, 0, 4096);
            if (rc != -1) {
                out.write(buffer, 0, rc);
            }
        }

        if (out.size() > 0) {
            System.out.println("Read event " + out);
            return EventBuilder.withBody(out.toByteArray());
        } else {
            // End of file: no more events.
            return null;
        }
    }

    @Override
    public List<Event> readEvents(int i) throws IOException {
        ensureOpen();

        // Rejects any request for more than one event per batch.
        if (i > 1) throw new RuntimeException("WholeFileDeserializer creates one event for the whole file");

        List<Event> events = new ArrayList<Event>(1);

        Event event = readEvent();
        if (event != null) {
            events.add(event);
        }
        return events;
    }

    @Override
    public void mark() throws IOException {
        ensureOpen();
        stream.mark();
    }

    @Override
    public void reset() throws IOException {
        ensureOpen();
        stream.reset();
    }

    @Override
    public void close() throws IOException {
        if (isOpen) {
            reset();
            stream.close();
            isOpen = false;
        }
    }

    private void ensureOpen() {
        if (!isOpen) {
            throw new IllegalStateException("Serializer has been closed");
        }
    }

    public static class Builder implements EventDeserializer.Builder {

        @Override
        public EventDeserializer build(Context context,
ResettableInputStream in) {
            return new WholeFileDeserializer(context, in);
        }
    }

}
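
One note on the code above: readEvents(int) throws whenever more than one
event is requested, and the config below sets
pnmtest2.sources.SPOOL.batchSize = 100, which matches the RuntimeException
in the log. A minimal sketch of a batch-tolerant variant, assuming the
one-event-per-file intent should be kept (an assumption, not a confirmed
fix):

    // Sketch only: return at most one event instead of throwing when the
    // source asks for a batch. readEvent() returns null once the file is
    // fully consumed, so an empty list signals end-of-file to the reader.
    @Override
    public List<Event> readEvents(int numEvents) throws IOException {
        ensureOpen();
        List<Event> events = new ArrayList<Event>(1);
        Event event = readEvent();
        if (event != null) {
            events.add(event);
        }
        return events;
    }

Alternatively, leaving the deserializer as-is and setting batchSize = 1 on
the source should also avoid the throw.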

Your time and suggestions are much appreciated.

Thanks!

Regards,
Nik.

On Sun, Aug 30, 2015 at 2:17 PM, Hari Shreedharan <hshreedharan@cloudera.com> wrote:

>
> It looks like the deserializer you are using is throwing the exception,
> causing the source to not work. Also, the spool dir source can work with
> only one directory, so it is only reading files from one of the specified
> directories, not all three of them.
>
> On Sunday, August 30, 2015, Nikhil Gs <gsnikhil1432010@gmail.com> wrote:
>
>> Hello Team,
>>
>> Thank you in advance for your cooperation and time.
>>
>> I have been facing this issue repeatedly. I tried restarting Flume and
>> rebuilding the jar, but files are not being deleted from the Flume spool
>> directory. My error and Flume config file are below.
>>
>> Any suggestions would be greatly appreciated. This is urgent because it
>> affects production.
>>
>>
>> # Please paste flume.conf here. Example:
>>
>> # Sources, channels, and sinks are defined per
>> # agent name, in this case 'pnmtest2'.
>> pnmtest2.sources  = SPOOL
>> pnmtest2.channels = MemChanneltest2
>> pnmtest2.sinks    = AVRO
>>
>> # For each source, channel, and sink, set
>> # standard properties.
>> pnmtest2.sources.SPOOL.type          = spooldir
>> pnmtest2.sources.SPOOL.spoolDir      = /home/a_nikhil.gopishetti/pnm
>> #pnmtest2.sources.SPOOL.spoolDir      = /home/s_sdldalplhdxxxedh/pnmtest2-poll-results
>> #pnmtest2.sources.SPOOL.spoolDir  = /home/s_sdldalplhdxxxedh/pnm-poll-results-uat
>> pnmtest2.sources.SPOOL.ignorePattern = \.*tmp$
>> pnmtest2.sources.SPOOL.channels      = MemChanneltest2
>> pnmtest2.sources.SPOOL.fileHeader    = true
>> pnmtest2.sources.SPOOL.deletePolicy  = immediate
>> pnmtest2.sources.SPOOL.consumeOrder  = oldest
>> pnmtest2.sources.SPOOL.batchSize     = 100
>>
>> pnmtest2.sources.SPOOL.interceptors = time
>> pnmtest2.sources.SPOOL.interceptors.time.type = org.apache.flume.interceptor.TimestampInterceptor$Builder
>> pnmtest2.sources.SPOOL.deserializer  = com.suddenlink.flume.WholeFileDeserializer$Builder
>>
>> pnmtest2.sinks.AVRO.type         = avro
>> pnmtest2.sinks.AVRO.channel      = MemChanneltest2
>> pnmtest2.sinks.AVRO.hostname = sdldalplhdw02.suddenlink.cequel3.com
>> pnmtest2.sinks.AVRO.port     = 40002
>> pnmtest2.sinks.AVRO.batch-size = 100
>> pnmtest2.sinks.AVRO.connect-timeout = 40000
>>
>> #pnmtest2.channels.MemChanneltest1.capacity = 10000
>> #pnmtest2.channels.MemChanneltest1.type   = memory
>>
>> pnmtest2.channels.MemChanneltest2.capacity = 1000000
>> pnmtest2.channels.MemChanneltest2.type   = memory
>>
>>
>> *Flume Log Error*
>>
>>
>> 12:02:57.363 PM INFO org.mortbay.log
>>
>> Started SelectChannelConnector@0.0.0.0:41414
>>
>> 12:02:57.621 PM INFO org.apache.flume.sink.AbstractRpcSink
>>
>> Rpc sink AVRO started.
>>
>> 12:07:30.095 PM ERROR org.apache.flume.source.SpoolDirectorySource
>>
>> *FATAL: Spool Directory source SPOOL: { spoolDir: /home/a_nikhil.gopishetti/pnm/ }: Uncaught exception in SpoolDirectorySource thread. Restart or reconfigure Flume to continue processing.
>> java.lang.RuntimeException: WholeFileDeserializer creates one event for the whole file
>> 	at com.suddenlink.flume.WholeFileDeserializer.readEvents(WholeFileDeserializer.java:59)
>> 	at org.apache.flume.client.avro.ReliableSpoolingFileEventReader.readEvents(ReliableSpoolingFileEventReader.java:252)
>> 	at org.apache.flume.source.SpoolDirectorySource$SpoolDirectoryRunnable.run(SpoolDirectorySource.java:228)
>> 	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
>> 	at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304)
>> 	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
>> 	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>> 	at java.lang.Thread.run(Thread.java:745)*
>>
>> 1:12:31.161 PM INFO org.apache.flume.lifecycle.LifecycleSupervisor
>>
>> Stopping lifecycle supervisor 11
>>
>> 1:12:31.163 PM INFO org.apache.flume.instrumentation.MonitoredCounterGroup
>>
>> Component type: CHANNEL, name: MemChanneltest2 stopped
>>
>> 1:12:31.163 PM INFO org.apache.flume.instrumentation.MonitoredCounterGroup
>>
>> Shutdown Metric for type: CHANNEL, name: MemChanneltest2. channel.start.time == 1440954177242
>>
>> 1:12:31.163 PM INFO org.apache.flume.instrumentation.MonitoredCounterGroup
>>
>> Shutdown Metric for type: CHANNEL, name: MemChanneltest2. channel.stop.time == 1440958351163
>>
>> 1:12:31.163 PM INFO org.apache.flume.instrumentation.MonitoredCounterGroup
>>
>> Shutdown Metric for type: CHANNEL, name: MemChanneltest2. channel.capacity == 1000000
>>
>> 1:12:31.163 PM INFO org.apache.flume.instrumentation.MonitoredCounterGroup
>>
>> Shutdown Metric for type: CHANNEL, name: MemChanneltest2. channel.current.size == 0
>>
>> 1:12:31.164 PM INFO org.apache.flume.instrumentation.MonitoredCounterGroup
>>
>> Shutdown Metric for type: CHANNEL, name: MemChanneltest2. channel.event.put.attempt == 0
>>
>> 1:12:31.164 PM INFO org.apache.flume.instrumentation.MonitoredCounterGroup
>>
>> Shutdown Metric for type: CHANNEL, name: MemChanneltest2. channel.event.put.success == 0
>>
>> 1:12:31.164 PM INFO org.apache.flume.instrumentation.MonitoredCounterGroup
>>
>> Shutdown Metric for type: CHANNEL, name: MemChanneltest2. channel.event.take.attempt == 523
>>
>> 1:12:31.164 PM INFO org.apache.flume.instrumentation.MonitoredCounterGroup
>>
>> Shutdown Metric for type: CHANNEL, name: MemChanneltest2. channel.event.take.success == 0
>>
>> 1:12:31.164 PM INFO org.apache.flume.node.PollingPropertiesFileConfigurationProvider
>>
>> Configuration provider stopping
>>
>> 1:12:31.164 PM INFO org.apache.flume.instrumentation.MonitoredCounterGroup
>>
>> Component type: SOURCE, name: SPOOL stopped
>>
>> 1:12:31.164 PM INFO org.apache.flume.instrumentation.MonitoredCounterGroup
>>
>> Shutdown Metric for type: SOURCE, name: SPOOL. source.start.time == 1440954177279
>>
>> 1:12:31.164 PM INFO org.apache.flume.instrumentation.MonitoredCounterGroup
>>
>> Shutdown Metric for type: SOURCE, name: SPOOL. source.stop.time == 1440958351164
>>
>> 1:12:31.164 PM INFO org.apache.flume.instrumentation.MonitoredCounterGroup
>>
>> Shutdown Metric for type: SOURCE, name: SPOOL. src.append-batch.accepted == 0
>>
>> 1:12:31.165 PM INFO org.apache.flume.instrumentation.MonitoredCounterGroup
>>
>> Shutdown Metric for type: SOURCE, name: SPOOL. src.append-batch.received == 0
>>
>> 1:12:31.165 PM INFO org.apache.flume.instrumentation.MonitoredCounterGroup
>>
>> Shutdown Metric for type: SOURCE, name: SPOOL. src.append.accepted == 0
>>
>> 1:12:31.165 PM INFO org.apache.flume.instrumentation.MonitoredCounterGroup
>>
>> Shutdown Metric for type: SOURCE, name: SPOOL. src.append.received == 0
>>
>> 1:12:31.165 PM INFO org.apache.flume.instrumentation.MonitoredCounterGroup
>>
>> Shutdown Metric for type: SOURCE, name: SPOOL. src.events.accepted == 0
>>
>> 1:12:31.165 PM INFO org.apache.flume.instrumentation.MonitoredCounterGroup
>>
>> Shutdown Metric for type: SOURCE, name: SPOOL. src.events.received == 0
>>
>> 1:12:31.165 PM INFO org.apache.flume.instrumentation.MonitoredCounterGroup
>>
>> Shutdown Metric for type: SOURCE, name: SPOOL. src.open-connection.count == 0
>>
>> 1:12:31.165 PM INFO org.apache.flume.source.SpoolDirectorySource
>>
>> SpoolDir source SPOOL stopped. Metrics: SOURCE:SPOOL{src.events.accepted=0, src.events.received=0, src.append.accepted=0, src.append-batch.accepted=0, src.open-connection.count=0, src.append-batch.received=0, src.append.received=0}
>>
>> 1:12:31.165 PM INFO org.apache.flume.sink.AbstractRpcSink
>>
>> Rpc sink AVRO stopping...
>>
>> 1:12:31.170 PM INFO org.apache.flume.instrumentation.MonitoredCounterGroup
>>
>> Component type: SINK, name: AVRO stopped
>>
>> 1:12:31.170 PM INFO org.apache.flume.instrumentation.MonitoredCounterGroup
>>
>> Shutdown Metric for type: SINK, name: AVRO. sink.start.time == 1440954177243
>>
>> 1:12:31.170 PM INFO org.apache.flume.instrumentation.MonitoredCounterGroup
>>
>> Shutdown Metric for type: SINK, name: AVRO. sink.stop.time == 1440958351170
>>
>> 1:12:31.170 PM INFO org.apache.flume.instrumentation.MonitoredCounterGroup
>>
>> Shutdown Metric for type: SINK, name: AVRO. sink.batch.complete == 0
>>
>> 1:12:31.170 PM INFO org.apache.flume.instrumentation.MonitoredCounterGroup
>>
>> Shutdown Metric for type: SINK, name: AVRO. sink.batch.empty == 523
>>
>> 1:12:31.171 PM INFO org.apache.flume.instrumentation.MonitoredCounterGroup
>>
>> Shutdown Metric for type: SINK, name: AVRO. sink.batch.underflow == 0
>>
>> 1:12:31.171 PM INFO org.apache.flume.instrumentation.MonitoredCounterGroup
>>
>> Shutdown Metric for type: SINK, name: AVRO. sink.connection.closed.count == 1
>>
>> 1:12:31.171 PM INFO org.apache.flume.instrumentation.MonitoredCounterGroup
>>
>> Shutdown Metric for type: SINK, name: AVRO. sink.connection.creation.count == 1
>>
>> 1:12:31.171 PM INFO org.apache.flume.instrumentation.MonitoredCounterGroup
>>
>> Shutdown Metric for type: SINK, name: AVRO. sink.connection.failed.count == 0
>>
>> 1:12:31.171 PM INFO org.apache.flume.instrumentation.MonitoredCounterGroup
>>
>> Shutdown Metric for type: SINK, name: AVRO. sink.event.drain.attempt == 0
>>
>> 1:12:31.171 PM INFO org.apache.flume.instrumentation.MonitoredCounterGroup
>>
>> Shutdown Metric for type: SINK, name: AVRO. sink.event.drain.sucess == 0
>>
>> 1:12:31.171 PM INFO org.apache.flume.sink.AbstractRpcSink
>>
>> Rpc sink AVRO stopped. Metrics: SINK:AVRO{sink.connection.closed.count=1, sink.event.drain.attempt=0, sink.batch.underflow=0, sink.connection.failed.count=0, sink.connection.creation.count=1, sink.event.drain.sucess=0, sink.batch.empty=523, sink.batch.complete=0}
>>
>> 1:12:31.171 PM INFO org.mortbay.log
>>
>> Stopped SelectChannelConnector@0.0.0.0:41414
>>
>> 1:12:46.490 PM INFO org.apache.flume.node.PollingPropertiesFileConfigurationProvider
>>
>> Configuration provider starting
>>
>> 1:12:46.513 PM INFO org.apache.flume.node.PollingPropertiesFileConfigurationProvider
>>
>> Reloading configuration file:/var/run/cloudera-scm-agent/process/5683-flume-AGENT/flume.conf
>>
>> 1:12:46.520 PM INFO org.apache.flume.conf.FlumeConfiguration
>>
>> Processing:AVRO
>>
>> 1:12:46.521 PM INFO org.apache.flume.conf.FlumeConfiguration
>>
>> Processing:AVRO
>>
>> 1:12:46.521 PM INFO org.apache.flume.conf.FlumeConfiguration
>>
>> Processing:AVRO
>>
>> 1:12:46.521 PM INFO org.apache.flume.conf.FlumeConfiguration
>>
>> Processing:AVRO
>>
>> 1:12:46.522 PM INFO org.apache.flume.conf.FlumeConfiguration
>>
>> Added sinks: AVRO Agent: pnmtest2
>>
>> 1:12:46.522 PM INFO org.apache.flume.conf.FlumeConfiguration
>>
>> Processing:AVRO
>>
>> 1:12:46.522 PM INFO org.apache.flume.conf.FlumeConfiguration
>>
>> Processing:AVRO
>>
>> 1:12:46.554 PM INFO org.apache.flume.conf.FlumeConfiguration
>>
>> Post-validation flume configuration contains configuration for agents: [pnmtest2]
>>
>> 1:12:46.554 PM INFO org.apache.flume.node.AbstractConfigurationProvider
>>
>> Creating channels
>>
>> 1:12:46.564 PM INFO org.apache.flume.channel.DefaultChannelFactory
>>
>> Creating instance of channel MemChanneltest2 type memory
>>
>> 1:12:46.569 PM INFO org.apache.flume.node.AbstractConfigurationProvider
>>
>> Created channel MemChanneltest2
>>
>> 1:12:46.570 PM INFO org.apache.flume.source.DefaultSourceFactory
>>
>> Creating instance of source SPOOL, type spooldir
>>
>> 1:12:46.597 PM INFO org.apache.flume.sink.DefaultSinkFactory
>>
>> Creating instance of sink: AVRO, type: avro
>>
>> 1:12:46.604 PM INFO org.apache.flume.sink.AbstractRpcSink
>>
>> Connection reset is set to 0. Will not reset connection to next hop
>>
>> 1:12:46.606 PM INFO org.apache.flume.node.AbstractConfigurationProvider
>>
>> Channel MemChanneltest2 connected to [SPOOL, AVRO]
>>
>> 1:12:46.615 PM INFO org.apache.flume.node.Application
>>
>> Starting new configuration:{ sourceRunners:{SPOOL=EventDrivenSourceRunner: { source:Spool Directory source SPOOL: { spoolDir: /home/a_nikhil.gopishetti/pnm } }} sinkRunners:{AVRO=SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@452d9a56 counterGroup:{ name:null counters:{} } }} channels:{MemChanneltest2=org.apache.flume.channel.MemoryChannel{name: MemChanneltest2}} }
>>
>> 1:12:46.615 PM INFO org.apache.flume.node.Application
>>
>> Starting Channel MemChanneltest2
>>
>> 1:12:46.674 PM INFO org.apache.flume.instrumentation.MonitoredCounterGroup
>>
>> Monitored counter group for type: CHANNEL, name: MemChanneltest2: Successfully registered new MBean.
>>
>> 1:12:46.675 PM INFO org.apache.flume.instrumentation.MonitoredCounterGroup
>>
>> Component type: CHANNEL, name: MemChanneltest2 started
>>
>> 1:12:46.675 PM INFO org.apache.flume.node.Application
>>
>> Starting Sink AVRO
>>
>> 1:12:46.675 PM INFO org.apache.flume.sink.AbstractRpcSink
>>
>> Starting RpcSink AVRO { host: sdldalplhdw02.suddenlink.cequel3.com, port: 40002 }...
>>
>> 1:12:46.675 PM INFO org.apache.flume.node.Application
>>
>> Starting Source SPOOL
>>
>> 1:12:46.676 PM INFO org.apache.flume.source.SpoolDirectorySource
>>
>> SpoolDirectorySource source starting with directory: /home/a_nikhil.gopishetti/pnm
>>
>> 1:12:46.676 PM INFO org.apache.flume.instrumentation.MonitoredCounterGroup
>>
>> Monitored counter group for type: SINK, name: AVRO: Successfully registered new MBean.
>>
>> 1:12:46.676 PM INFO org.apache.flume.instrumentation.MonitoredCounterGroup
>>
>> Component type: SINK, name: AVRO started
>>
>> 1:12:46.676 PM INFO org.apache.flume.sink.AbstractRpcSink
>>
>> Rpc sink AVRO: Building RpcClient with hostname: sdldalplhdw02.suddenlink.cequel3.com, port: 40002
>>
>> 1:12:46.676 PM INFO org.apache.flume.sink.AvroSink
>>
>> Attempting to create Avro Rpc client.
>>
>> 1:12:46.699 PM WARN org.apache.flume.api.NettyAvroRpcClient
>>
>> Using default maxIOWorkers
>>
>> 1:12:46.706 PM INFO org.apache.flume.instrumentation.MonitoredCounterGroup
>>
>> Monitored counter group for type: SOURCE, name: SPOOL: Successfully registered new MBean.
>>
>> 1:12:46.706 PM INFO org.apache.flume.instrumentation.MonitoredCounterGroup
>>
>> Component type: SOURCE, name: SPOOL started
>>
>> 1:12:46.718 PM INFO org.mortbay.log
>>
>> Logging to org.slf4j.impl.Log4jLoggerAdapter(org.mortbay.log) via org.mortbay.log.Slf4jLog
>>
>> 1:12:46.783 PM INFO org.mortbay.log
>>
>> jetty-6.1.26.cloudera.4
>>
>> 1:12:46.829 PM INFO org.mortbay.log
>>
>> Started SelectChannelConnector@0.0.0.0:41414
>>
>> 1:12:47.068 PM INFO org.apache.flume.sink.AbstractRpcSink
>>
>> Rpc sink AVRO started.
>>
>>
>>
>> Regards,
>> Nik.
>>
>
>
> --
>
> Thanks,
> Hari
>
>
>
