Hello Team and Hari,

Thanks for your reply Hari Shreedharan. If you observe the conf file,

pnmtest2.sources.SPOOL.spoolDir      = /home/a_nikhil.gopishetti/pnm
#pnmtest2.sources.SPOOL.spoolDir      = /home/s_sdldalplhdxxxedh/pnmtest2-poll-results
#pnmtest2.sources.SPOOL.spoolDir  = /home/s_sdldalplhdxxxedh/pnm-poll-results-uat

Out of the three spool dir, I am using only one spool directory i.e. pnmtest2.sources.SPOOL.spoolDir      = /home/a_nikhil.gopishetti/pnm. The other two are commented with the # symbol.

When coming to the deserializer file, below is the code for it;

public class WholeFileDeserializer implements EventDeserializer {

    private Context context = null;
    private ResettableInputStream stream = null;
    private volatile boolean isOpen;


    public WholeFileDeserializer(Context context, ResettableInputStream in) {
        this.context = context;
        this.stream = in;
        this.isOpen = true;

    }

    @Override
    public Event readEvent() throws IOException {
        ensureOpen();
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] buffer = new byte[4096];

        int rc = 0;
        while (rc != -1) {
            rc = stream.read(buffer, 0, 4096);
            if (rc != -1) {
                out.write(buffer, 0, rc);
            }
        }


        if (out.size() > 0) {
            System.out.println("Read event " + out);
            return EventBuilder.withBody(out.toByteArray());
        } else {
            return null;
        }
    }

    @Override
    public List<Event> readEvents(int i) throws IOException {
        ensureOpen();

        if (i > 1) throw new RuntimeException("WholeFileDeserializer creates one event for the whole file");

        List<Event> events = new ArrayList<Event>(1);

        Event event = readEvent();
        if (event != null) {
            events.add(event);
        } else {
            return events;

        }
        return events;
    }

    @Override
    public void mark() throws IOException {
        ensureOpen();
        stream.mark();
    }

    @Override
    public void reset() throws IOException {
        ensureOpen();
        stream.reset();
    }

    @Override
    public void close() throws IOException {
        if (isOpen) {
            reset();
            stream.close();
            isOpen = false;
        }
    }

    private void ensureOpen() {
        if (!isOpen) {
            throw new IllegalStateException("Serializer has been closed");
        }
    }

    public static class Builder implements EventDeserializer.Builder {

        @Override
        public EventDeserializer build(Context context, ResettableInputStream in) {
            return new WholeFileDeserializer(context, in);
        }
    }

}

Your valuable time and suggestions are highly required and appreciated.

Thanks!




Regards,
Nik.

On Sun, Aug 30, 2015 at 2:17 PM, Hari Shreedharan <hshreedharan@cloudera.com> wrote:
Boxbe This message is eligible for Automatic Cleanup! (hshreedharan@cloudera.com) Add cleanup rule | More info

It looks like the deserializer you are using is throwing the exception causing the source to not work. Also spool dir  source can work with only one directory, so it is inlet reading files from one of the specified directories, not all three of them.

On Sunday, August 30, 2015, Nikhil Gs <gsnikhil1432010@gmail.com> wrote:
Hello Team,

Thank you in advance for your cooperation and time. 

Facing this issue several times. Tried to restart flume and build the jar. But files are not deleting from the flume spool directory and below is my error and flume config file.

Any suggestions would be great helpful. It is urgent because of production.


# Please paste flume.conf here. Example:

# Sources, channels, and sinks are defined per
# agent name, in this case 'pnmtest2'.
pnmtest2.sources  = SPOOL
pnmtest2.channels = MemChanneltest2
pnmtest2.sinks    = AVRO

# For each source, channel, and sink, set
# standard properties.
pnmtest2.sources.SPOOL.type          = spooldir
pnmtest2.sources.SPOOL.spoolDir      = /home/a_nikhil.gopishetti/pnm
#pnmtest2.sources.SPOOL.spoolDir      = /home/s_sdldalplhdxxxedh/pnmtest2-poll-results
#pnmtest2.sources.SPOOL.spoolDir  = /home/s_sdldalplhdxxxedh/pnm-poll-results-uat
pnmtest2.sources.SPOOL.ignorePattern = \.*tmp$
pnmtest2.sources.SPOOL.channels      = MemChanneltest2
pnmtest2.sources.SPOOL.fileHeader    = true
pnmtest2.sources.SPOOL.deletePolicy  = immediate
pnmtest2.sources.SPOOL.consumeOrder  = oldest
pnmtest2.sources.SPOOL.batchSize     = 100

pnmtest2.sources.SPOOL.interceptors = time
pnmtest2.sources.SPOOL.interceptors.time.type = org.apache.flume.interceptor.TimestampInterceptor$Builder
pnmtest2.sources.SPOOL.deserializer  = com.suddenlink.flume.WholeFileDeserializer$Builder

pnmtest2.sinks.AVRO.type         = avro
pnmtest2.sinks.AVRO.channel      = MemChanneltest2
pnmtest2.sinks.AVRO.hostname = sdldalplhdw02.suddenlink.cequel3.com
pnmtest2.sinks.AVRO.port     = 40002
pnmtest2.sinks.AVRO.batch-size = 100
pnmtest2.sinks.AVRO.connect-timeout = 40000

#pnmtest2.channels.MemChanneltest1.capacity = 10000
#pnmtest2.channels.MemChanneltest1.type   = memory

pnmtest2.channels.MemChanneltest2.capacity = 1000000
pnmtest2.channels.MemChanneltest2.type   = memory


Flume Log Error


12:02:57.363 PMINFOorg.mortbay.log
Started SelectChannelConnector@0.0.0.0:41414
12:02:57.621 PMINFOorg.apache.flume.sink.AbstractRpcSink
Rpc sink AVRO started.
12:07:30.095 PMERRORorg.apache.flume.source.SpoolDirectorySource
FATAL: Spool Directory source SPOOL: { spoolDir: /home/a_nikhil.gopishetti/pnm/ }: Uncaught exception in SpoolDirectorySource thread. Restart or reconfigure Flume to continue processing.
java.lang.RuntimeException: WholeFileDeserializer creates one event for the whole file
	at com.suddenlink.flume.WholeFileDeserializer.readEvents(WholeFileDeserializer.java:59)
	at org.apache.flume.client.avro.ReliableSpoolingFileEventReader.readEvents(ReliableSpoolingFileEventReader.java:252)
	at org.apache.flume.source.SpoolDirectorySource$SpoolDirectoryRunnable.run(SpoolDirectorySource.java:228)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
	at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
	at java.lang.Thread.run(Thread.java:745)
1:12:31.161 PMINFOorg.apache.flume.lifecycle.LifecycleSupervisor
Stopping lifecycle supervisor 11
1:12:31.163 PMINFOorg.apache.flume.instrumentation.MonitoredCounterGroup
Component type: CHANNEL, name: MemChanneltest2 stopped
1:12:31.163 PMINFOorg.apache.flume.instrumentation.MonitoredCounterGroup
Shutdown Metric for type: CHANNEL, name: MemChanneltest2. channel.start.time == 1440954177242
1:12:31.163 PMINFOorg.apache.flume.instrumentation.MonitoredCounterGroup
Shutdown Metric for type: CHANNEL, name: MemChanneltest2. channel.stop.time == 1440958351163
1:12:31.163 PMINFOorg.apache.flume.instrumentation.MonitoredCounterGroup
Shutdown Metric for type: CHANNEL, name: MemChanneltest2. channel.capacity == 1000000
1:12:31.163 PMINFOorg.apache.flume.instrumentation.MonitoredCounterGroup
Shutdown Metric for type: CHANNEL, name: MemChanneltest2. channel.current.size == 0
1:12:31.164 PMINFOorg.apache.flume.instrumentation.MonitoredCounterGroup
Shutdown Metric for type: CHANNEL, name: MemChanneltest2. channel.event.put.attempt == 0
1:12:31.164 PMINFOorg.apache.flume.instrumentation.MonitoredCounterGroup
Shutdown Metric for type: CHANNEL, name: MemChanneltest2. channel.event.put.success == 0
1:12:31.164 PMINFOorg.apache.flume.instrumentation.MonitoredCounterGroup
Shutdown Metric for type: CHANNEL, name: MemChanneltest2. channel.event.take.attempt == 523
1:12:31.164 PMINFOorg.apache.flume.instrumentation.MonitoredCounterGroup
Shutdown Metric for type: CHANNEL, name: MemChanneltest2. channel.event.take.success == 0
1:12:31.164 PMINFOorg.apache.flume.node.PollingPropertiesFileConfigurationProvider
Configuration provider stopping
1:12:31.164 PMINFOorg.apache.flume.instrumentation.MonitoredCounterGroup
Component type: SOURCE, name: SPOOL stopped
1:12:31.164 PMINFOorg.apache.flume.instrumentation.MonitoredCounterGroup
Shutdown Metric for type: SOURCE, name: SPOOL. source.start.time == 1440954177279
1:12:31.164 PMINFOorg.apache.flume.instrumentation.MonitoredCounterGroup
Shutdown Metric for type: SOURCE, name: SPOOL. source.stop.time == 1440958351164
1:12:31.164 PMINFOorg.apache.flume.instrumentation.MonitoredCounterGroup
Shutdown Metric for type: SOURCE, name: SPOOL. src.append-batch.accepted == 0
1:12:31.165 PMINFOorg.apache.flume.instrumentation.MonitoredCounterGroup
Shutdown Metric for type: SOURCE, name: SPOOL. src.append-batch.received == 0
1:12:31.165 PMINFOorg.apache.flume.instrumentation.MonitoredCounterGroup
Shutdown Metric for type: SOURCE, name: SPOOL. src.append.accepted == 0
1:12:31.165 PMINFOorg.apache.flume.instrumentation.MonitoredCounterGroup
Shutdown Metric for type: SOURCE, name: SPOOL. src.append.received == 0
1:12:31.165 PMINFOorg.apache.flume.instrumentation.MonitoredCounterGroup
Shutdown Metric for type: SOURCE, name: SPOOL. src.events.accepted == 0
1:12:31.165 PMINFOorg.apache.flume.instrumentation.MonitoredCounterGroup
Shutdown Metric for type: SOURCE, name: SPOOL. src.events.received == 0
1:12:31.165 PMINFOorg.apache.flume.instrumentation.MonitoredCounterGroup
Shutdown Metric for type: SOURCE, name: SPOOL. src.open-connection.count == 0
1:12:31.165 PMINFOorg.apache.flume.source.SpoolDirectorySource
SpoolDir source SPOOL stopped. Metrics: SOURCE:SPOOL{src.events.accepted=0, src.events.received=0, src.append.accepted=0, src.append-batch.accepted=0, src.open-connection.count=0, src.append-batch.received=0, src.append.received=0}
1:12:31.165 PMINFOorg.apache.flume.sink.AbstractRpcSink
Rpc sink AVRO stopping...
1:12:31.170 PMINFOorg.apache.flume.instrumentation.MonitoredCounterGroup
Component type: SINK, name: AVRO stopped
1:12:31.170 PMINFOorg.apache.flume.instrumentation.MonitoredCounterGroup
Shutdown Metric for type: SINK, name: AVRO. sink.start.time == 1440954177243
1:12:31.170 PMINFOorg.apache.flume.instrumentation.MonitoredCounterGroup
Shutdown Metric for type: SINK, name: AVRO. sink.stop.time == 1440958351170
1:12:31.170 PMINFOorg.apache.flume.instrumentation.MonitoredCounterGroup
Shutdown Metric for type: SINK, name: AVRO. sink.batch.complete == 0
1:12:31.170 PMINFOorg.apache.flume.instrumentation.MonitoredCounterGroup
Shutdown Metric for type: SINK, name: AVRO. sink.batch.empty == 523
1:12:31.171 PMINFOorg.apache.flume.instrumentation.MonitoredCounterGroup
Shutdown Metric for type: SINK, name: AVRO. sink.batch.underflow == 0
1:12:31.171 PMINFOorg.apache.flume.instrumentation.MonitoredCounterGroup
Shutdown Metric for type: SINK, name: AVRO. sink.connection.closed.count == 1
1:12:31.171 PMINFOorg.apache.flume.instrumentation.MonitoredCounterGroup
Shutdown Metric for type: SINK, name: AVRO. sink.connection.creation.count == 1
1:12:31.171 PMINFOorg.apache.flume.instrumentation.MonitoredCounterGroup
Shutdown Metric for type: SINK, name: AVRO. sink.connection.failed.count == 0
1:12:31.171 PMINFOorg.apache.flume.instrumentation.MonitoredCounterGroup
Shutdown Metric for type: SINK, name: AVRO. sink.event.drain.attempt == 0
1:12:31.171 PMINFOorg.apache.flume.instrumentation.MonitoredCounterGroup
Shutdown Metric for type: SINK, name: AVRO. sink.event.drain.sucess == 0
1:12:31.171 PMINFOorg.apache.flume.sink.AbstractRpcSink
Rpc sink AVRO stopped. Metrics: SINK:AVRO{sink.connection.closed.count=1, sink.event.drain.attempt=0, sink.batch.underflow=0, sink.connection.failed.count=0, sink.connection.creation.count=1, sink.event.drain.sucess=0, sink.batch.empty=523, sink.batch.complete=0}
1:12:31.171 PMINFOorg.mortbay.log
Stopped SelectChannelConnector@0.0.0.0:41414
1:12:46.490 PMINFOorg.apache.flume.node.PollingPropertiesFileConfigurationProvider
Configuration provider starting
1:12:46.513 PMINFOorg.apache.flume.node.PollingPropertiesFileConfigurationProvider
Reloading configuration file:/var/run/cloudera-scm-agent/process/5683-flume-AGENT/flume.conf
1:12:46.520 PMINFOorg.apache.flume.conf.FlumeConfiguration
Processing:AVRO
1:12:46.521 PMINFOorg.apache.flume.conf.FlumeConfiguration
Processing:AVRO
1:12:46.521 PMINFOorg.apache.flume.conf.FlumeConfiguration
Processing:AVRO
1:12:46.521 PMINFOorg.apache.flume.conf.FlumeConfiguration
Processing:AVRO
1:12:46.522 PMINFOorg.apache.flume.conf.FlumeConfiguration
Added sinks: AVRO Agent: pnmtest2
1:12:46.522 PMINFOorg.apache.flume.conf.FlumeConfiguration
Processing:AVRO
1:12:46.522 PMINFOorg.apache.flume.conf.FlumeConfiguration
Processing:AVRO
1:12:46.554 PMINFOorg.apache.flume.conf.FlumeConfiguration
Post-validation flume configuration contains configuration for agents: [pnmtest2]
1:12:46.554 PMINFOorg.apache.flume.node.AbstractConfigurationProvider
Creating channels
1:12:46.564 PMINFOorg.apache.flume.channel.DefaultChannelFactory
Creating instance of channel MemChanneltest2 type memory
1:12:46.569 PMINFOorg.apache.flume.node.AbstractConfigurationProvider
Created channel MemChanneltest2
1:12:46.570 PMINFOorg.apache.flume.source.DefaultSourceFactory
Creating instance of source SPOOL, type spooldir
1:12:46.597 PMINFOorg.apache.flume.sink.DefaultSinkFactory
Creating instance of sink: AVRO, type: avro
1:12:46.604 PMINFOorg.apache.flume.sink.AbstractRpcSink
Connection reset is set to 0. Will not reset connection to next hop
1:12:46.606 PMINFOorg.apache.flume.node.AbstractConfigurationProvider
Channel MemChanneltest2 connected to [SPOOL, AVRO]
1:12:46.615 PMINFOorg.apache.flume.node.Application
Starting new configuration:{ sourceRunners:{SPOOL=EventDrivenSourceRunner: { source:Spool Directory source SPOOL: { spoolDir: /home/a_nikhil.gopishetti/pnm } }} sinkRunners:{AVRO=SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@452d9a56 counterGroup:{ name:null counters:{} } }} channels:{MemChanneltest2=org.apache.flume.channel.MemoryChannel{name: MemChanneltest2}} }
1:12:46.615 PMINFOorg.apache.flume.node.Application
Starting Channel MemChanneltest2
1:12:46.674 PMINFOorg.apache.flume.instrumentation.MonitoredCounterGroup
Monitored counter group for type: CHANNEL, name: MemChanneltest2: Successfully registered new MBean.
1:12:46.675 PMINFOorg.apache.flume.instrumentation.MonitoredCounterGroup
Component type: CHANNEL, name: MemChanneltest2 started
1:12:46.675 PMINFOorg.apache.flume.node.Application
Starting Sink AVRO
1:12:46.675 PMINFOorg.apache.flume.sink.AbstractRpcSink
Starting RpcSink AVRO { host: sdldalplhdw02.suddenlink.cequel3.com, port: 40002 }...
1:12:46.675 PMINFOorg.apache.flume.node.Application
Starting Source SPOOL
1:12:46.676 PMINFOorg.apache.flume.source.SpoolDirectorySource
SpoolDirectorySource source starting with directory: /home/a_nikhil.gopishetti/pnm
1:12:46.676 PMINFOorg.apache.flume.instrumentation.MonitoredCounterGroup
Monitored counter group for type: SINK, name: AVRO: Successfully registered new MBean.
1:12:46.676 PMINFOorg.apache.flume.instrumentation.MonitoredCounterGroup
Component type: SINK, name: AVRO started
1:12:46.676 PMINFOorg.apache.flume.sink.AbstractRpcSink
Rpc sink AVRO: Building RpcClient with hostname: sdldalplhdw02.suddenlink.cequel3.com, port: 40002
1:12:46.676 PMINFOorg.apache.flume.sink.AvroSink
Attempting to create Avro Rpc client.
1:12:46.699 PMWARNorg.apache.flume.api.NettyAvroRpcClient
Using default maxIOWorkers
1:12:46.706 PMINFOorg.apache.flume.instrumentation.MonitoredCounterGroup
Monitored counter group for type: SOURCE, name: SPOOL: Successfully registered new MBean.
1:12:46.706 PMINFOorg.apache.flume.instrumentation.MonitoredCounterGroup
Component type: SOURCE, name: SPOOL started
1:12:46.718 PMINFOorg.mortbay.log
Logging to org.slf4j.impl.Log4jLoggerAdapter(org.mortbay.log) via org.mortbay.log.Slf4jLog
1:12:46.783 PMINFOorg.mortbay.log
jetty-6.1.26.cloudera.4
1:12:46.829 PMINFOorg.mortbay.log
Started SelectChannelConnector@0.0.0.0:41414
1:12:47.068 PMINFOorg.apache.flume.sink.AbstractRpcSink
Rpc sink AVRO started.


Regards,
Nik.


--

Thanks,
Hari