flume-user mailing list archives

From Hari Shreedharan <hshreedha...@cloudera.com>
Subject Re: Flume Source and Sink in different hosts
Date Fri, 05 Oct 2012 18:40:14 GMT
Ah, it seems like this is because your file is not growing "too fast." The exec source does
do some "batching": it waits for around 20 lines to come in before writing them out to the
channel. This matters so that the performance of channels like the File Channel does not suffer. Can you add
this to your source config:
batchSize = 1
If you set the batch size to 1, I would not recommend using the File Channel - there will
be far too many I/O ops to get good performance. You should use the Memory Channel - of course,
the data will not survive a program or system crash. If you want to use the File Channel, I'd
suggest a batchSize of 100 or so.
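
For reference, a minimal sketch of how that property fits into the source agent config quoted below (the agent, source, and channel names are taken from that config; the value of 100 follows the File Channel suggestion above):

# exec source: buffer this many lines before committing a batch to the channel
agent3.sources.tail.type = exec
agent3.sources.tail.command = tail -F /var/log/auth.log
agent3.sources.tail.channels = FileChannel-1
# with a Memory Channel, batchSize = 1 flushes every line; with the File Channel,
# something like 100 avoids excessive disk I/O
agent3.sources.tail.batchSize = 100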


Thanks,
Hari

--  
Hari Shreedharan


On Friday, October 5, 2012 at 11:27 AM, Kumar, Suresh wrote:

> Just a quick update: it is definitely a source issue and has nothing to do with the Flume configuration in the sink.
>   
> I restarted the sink and I still do not see the data in HBase. However, if I stop the agent on the source I do not see any data, but as soon as I start the agent on the source I see the data in my HBase, which is in HostB.
>   
> Thanks for any help,
> Suresh
>   
>   
> From: Kumar, Suresh [mailto:Suresh.Kumar4@emc.com]  
> Sent: Friday, October 05, 2012 11:08 AM
> To: user@flume.apache.org
> Subject: RE: Flume Source and Sink in different hosts
>   
> I increased the heap size on the source and the sink to 1G, and I now use the AsyncHBaseSink in my sink agent configuration; it didn’t make that much of a difference.
>   
> I changed my source agent configuration from memory to file in HostA; I did not change my sink agent configuration in HostB (it is still set to Memory Channel). I still see the latency issue (BTW, the auth.log grows every second). However, I noticed that if I kill the agent in HostA (source) and restart it, I see entries in HBase. Am I missing something? How often does the data get flushed from source to sink? Should the sink also use the same channel type (file)?
>   
> Here is my conf and log for HostA (source)
>   
> flume.conf (source)
>   
> agent3.sources = tail
> agent3.channels = FileChannel-1
> agent3.sinks = avro-sink
>   
> # Define source flow
> agent3.sources.tail.type = exec
> agent3.sources.tail.command = tail -F /var/log/auth.log
> agent3.sources.tail.channels = FileChannel-1
>   
> # What kind of channel
> agent3.channels.FileChannel-1.type = file
> agent3.channels.FileChannel-1.checkpointDir = /tmp/checkpoint
> agent3.channels.FileChannel-1.dataDirs = /tmp/data
>   
> # avro sink properties
> agent3.sinks.avro-sink.type = avro
> agent3.sinks.avro-sink.channel = FileChannel-1
> agent3.sinks.avro-sink.hostname = sig-flume
> agent3.sinks.avro-sink.port = 41414
>   
>   
> Log (source)
>   
>   
> 2012-10-05 10:49:03,736 (main) [INFO - org.apache.flume.node.FlumeNode.start(FlumeNode.java:54)]
Flume node starting - agent3
> 2012-10-05 10:49:03,752 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.node.nodemanager.DefaultLogicalNodeManager.start(DefaultLogicalNodeManager.java:203)]
Node manager starting
> 2012-10-05 10:49:03,752 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.conf.file.AbstractFileConfigurationProvider.start(AbstractFileConfigurationProvider.java:67)]
Configuration provider starting
> 2012-10-05 10:49:03,760 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.lifecycle.LifecycleSupervisor.start(LifecycleSupervisor.java:67)]
Starting lifecycle supervisor 12
> 2012-10-05 10:49:03,763 (lifecycleSupervisor-1-1) [DEBUG - org.apache.flume.node.nodemanager.DefaultLogicalNodeManager.start(DefaultLogicalNodeManager.java:207)]
Node manager started
> 2012-10-05 10:49:03,767 (lifecycleSupervisor-1-0) [DEBUG - org.apache.flume.conf.file.AbstractFileConfigurationProvider.start(AbstractFileConfigurationProvider.java:86)]
Configuration provider started
> 2012-10-05 10:49:03,769 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.file.AbstractFileConfigurationProvider$FileWatcherRunnable.run(AbstractFileConfigurationProvider.java:188)]
Checking file:conf/flume.conf for changes
> 2012-10-05 10:49:03,772 (conf-file-poller-0) [INFO - org.apache.flume.conf.file.AbstractFileConfigurationProvider$FileWatcherRunnable.run(AbstractFileConfigurationProvider.java:195)]
Reloading configuration file:conf/flume.conf
> 2012-10-05 10:49:03,801 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:912)]
Added sinks: avro-sink Agent: agent3
> 2012-10-05 10:49:03,802 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:998)]
Processing:avro-sink
> 2012-10-05 10:49:03,803 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1002)]
Created context for avro-sink: hostname
> 2012-10-05 10:49:03,803 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:998)]
Processing:avro-sink
> 2012-10-05 10:49:03,803 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:998)]
Processing:avro-sink
> 2012-10-05 10:49:03,804 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:998)]
Processing:avro-sink
> 2012-10-05 10:49:03,805 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.isValid(FlumeConfiguration.java:295)]
Starting validation of configuration for agent: agent3, initial-configuration: AgentConfiguration[agent3]
> SOURCES: {tail={ parameters:{command=tail -F /var/log/auth.log, channels=FileChannel-1,
type=exec} }}
> CHANNELS: {FileChannel-1={ parameters:{checkpointDir=/tmp/checkpoint, dataDirs=/tmp/data,
type=file} }}
> SINKS: {avro-sink={ parameters:{port=41414, hostname=sig-flume, type=avro, channel=FileChannel-1}
}}
>   
> 2012-10-05 10:49:03,823 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.validateChannels(FlumeConfiguration.java:450)]
Created channel FileChannel-1
> 2012-10-05 10:49:03,850 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.validateSinks(FlumeConfiguration.java:655)]
Creating sink: avro-sink using AVRO
> 2012-10-05 10:49:03,860 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.isValid(FlumeConfiguration.java:353)]
Post validation configuration for agent3
> AgentConfiguration created without Configuration stubs for which only basic syntactical
validation was performed[agent3]
> SOURCES: {tail={ parameters:{command=tail -F /var/log/auth.log, channels=FileChannel-1,
type=exec} }}
> CHANNELS: {FileChannel-1={ parameters:{checkpointDir=/tmp/checkpoint, dataDirs=/tmp/data,
type=file} }}
> SINKS: {avro-sink={ parameters:{port=41414, hostname=sig-flume, type=avro, channel=FileChannel-1}
}}
>   
> 2012-10-05 10:49:03,860 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.FlumeConfiguration.validateConfiguration(FlumeConfiguration.java:117)]
Channels:FileChannel-1
>   
> 2012-10-05 10:49:03,861 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.FlumeConfiguration.validateConfiguration(FlumeConfiguration.java:118)]
Sinks avro-sink
>   
> 2012-10-05 10:49:03,863 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.FlumeConfiguration.validateConfiguration(FlumeConfiguration.java:119)]
Sources tail
>   
> 2012-10-05 10:49:03,864 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration.validateConfiguration(FlumeConfiguration.java:122)]
Post-validation flume configuration contains configuration  for agents: [agent3]
> 2012-10-05 10:49:03,865 (conf-file-poller-0) [INFO - org.apache.flume.conf.properties.PropertiesFileConfigurationProvider.loadChannels(PropertiesFileConfigurationProvider.java:249)]
Creating channels
> 2012-10-05 10:49:03,866 (conf-file-poller-0) [DEBUG - org.apache.flume.channel.DefaultChannelFactory.create(DefaultChannelFactory.java:68)]
Creating instance of channel FileChannel-1 type file
> 2012-10-05 10:49:03,899 (conf-file-poller-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.<init>(MonitoredCounterGroup.java:68)]
Monitoried counter group for type: CHANNEL, name: FileChannel-1, registered successfully.
> 2012-10-05 10:49:03,900 (conf-file-poller-0) [INFO - org.apache.flume.conf.properties.PropertiesFileConfigurationProvider.loadChannels(PropertiesFileConfigurationProvider.java:273)]
created channel FileChannel-1
> 2012-10-05 10:49:03,900 (conf-file-poller-0) [DEBUG - org.apache.flume.source.DefaultSourceFactory.create(DefaultSourceFactory.java:74)]
Creating instance of source tail, type exec
> 2012-10-05 10:49:03,934 (conf-file-poller-0) [INFO - org.apache.flume.sink.DefaultSinkFactory.create(DefaultSinkFactory.java:70)]
Creating instance of sink: avro-sink, type: avro
> 2012-10-05 10:49:03,945 (conf-file-poller-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.<init>(MonitoredCounterGroup.java:68)]
Monitoried counter group for type: SINK, name: avro-sink, registered successfully.
> 2012-10-05 10:49:03,947 (conf-file-poller-0) [INFO - org.apache.flume.node.nodemanager.DefaultLogicalNodeManager.startAllComponents(DefaultLogicalNodeManager.java:106)]
Starting new configuration:{ sourceRunners:{tail=EventDrivenSourceRunner: { source:org.apache.flume.source.ExecSource{name:tail,state:IDLE}
}} sinkRunners:{avro-sink=SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@68814013
counterGroup:{ name:null counters:{} } }} channels:{FileChannel-1=FileChannel FileChannel-1
{ dataDirs: [/tmp/data] }} }
> 2012-10-05 10:49:03,947 (conf-file-poller-0) [INFO - org.apache.flume.node.nodemanager.DefaultLogicalNodeManager.startAllComponents(DefaultLogicalNodeManager.java:113)]
Starting Channel FileChannel-1
> 2012-10-05 10:49:03,951 (conf-file-poller-0) [INFO - org.apache.flume.node.nodemanager.DefaultLogicalNodeManager.startAllComponents(DefaultLogicalNodeManager.java:128)]
Waiting for channel: FileChannel-1 to start. Sleeping for 500 ms
> 2012-10-05 10:49:03,952 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.channel.file.FileChannel.start(FileChannel.java:241)]
Starting FileChannel FileChannel-1 { dataDirs: [/tmp/data] }...
> 2012-10-05 10:49:03,993 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.channel.file.Log.<init>(Log.java:248)]
Encryption is not enabled
> 2012-10-05 10:49:04,000 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.channel.file.Log.replay(Log.java:292)]
Replay started
> 2012-10-05 10:49:04,023 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.channel.file.Log.replay(Log.java:304)]
Found NextFileID 1, from [/tmp/data/log-1]
> 2012-10-05 10:49:04,039 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.channel.file.EventQueueBackingStoreFileV3.<init>(EventQueueBackingStoreFileV3.java:46)]
Starting up with /tmp/checkpoint/checkpoint and /tmp/checkpoint/checkpoint.meta
> 2012-10-05 10:49:04,039 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.channel.file.EventQueueBackingStoreFileV3.<init>(EventQueueBackingStoreFileV3.java:50)]
Reading checkpoint metadata from /tmp/checkpoint/checkpoint.meta
> 2012-10-05 10:49:04,100 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.channel.file.Log.replay(Log.java:336)]
Last Checkpoint Fri Oct 05 10:43:53 PDT 2012, queue depth = 0
> 2012-10-05 10:49:04,107 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.channel.file.Log.replay(Log.java:355)]
Replaying logs with v2 replay logic
> 2012-10-05 10:49:04,110 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.channel.file.ReplayHandler.replayLog(ReplayHandler.java:223)]
Starting replay of [/tmp/data/log-1]
> 2012-10-05 10:49:04,113 (lifecycleSupervisor-1-0) [DEBUG - org.apache.flume.channel.file.FlumeEventQueue$InflightEventWrapper.deserialize(FlumeEventQueue.java:502)]
Reached end of inflights buffer. Long buffer position =2
> 2012-10-05 10:49:04,117 (lifecycleSupervisor-1-0) [DEBUG - org.apache.flume.channel.file.FlumeEventQueue$InflightEventWrapper.deserialize(FlumeEventQueue.java:502)]
Reached end of inflights buffer. Long buffer position =2
> 2012-10-05 10:49:04,118 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.channel.file.ReplayHandler.replayLog(ReplayHandler.java:236)]
Replaying /tmp/data/log-1
> 2012-10-05 10:49:04,133 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.tools.DirectMemoryUtils.getDefaultDirectMemorySize(DirectMemoryUtils.java:113)]
Unable to get maxDirectMemory from VM: NoSuchMethodException: sun.misc.VM.maxDirectMemory(null)
> 2012-10-05 10:49:04,139 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.tools.DirectMemoryUtils.allocate(DirectMemoryUtils.java:47)]
Direct Memory Allocation:  Allocation = 1048576, Allocated = 0, MaxDirectMemorySize = 954466304,
Remaining = 954466304
> 2012-10-05 10:49:04,181 (lifecycleSupervisor-1-0) [WARN - org.apache.flume.channel.file.LogFile$SequentialReader.skipToLastCheckpointPosition(LogFile.java:431)]
Checkpoint for file(/tmp/data/log-1) is: 1349459033373, which is beyond the requested checkpoint
time: 1349459033373 and position 0
> 2012-10-05 10:49:04,218 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.channel.file.LogFile$SequentialReader.next(LogFile.java:452)]
Encountered EOF at 2423 in /tmp/data/log-1
> 2012-10-05 10:49:04,219 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.channel.file.ReplayHandler.replayLog(ReplayHandler.java:320)]
read: 17, put: 16, take: 0, rollback: 0, commit: 1, skip: 0, eventCount:16
> 2012-10-05 10:49:04,222 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.channel.file.Log.replay(Log.java:362)]
Rolling /tmp/data
> 2012-10-05 10:49:04,223 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.channel.file.Log.roll(Log.java:726)]
Roll start /tmp/data
> 2012-10-05 10:49:04,229 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.channel.file.LogFile$Writer.<init>(LogFile.java:138)]
Opened /tmp/data/log-2
> 2012-10-05 10:49:04,253 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.channel.file.Log.roll(Log.java:741)]
Roll end
> 2012-10-05 10:49:04,269 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.channel.file.EventQueueBackingStoreFile.checkpoint(EventQueueBackingStoreFile.java:108)]
Start checkpoint for /tmp/checkpoint/checkpoint, elements to sync = 16
> 2012-10-05 10:49:04,306 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.channel.file.EventQueueBackingStoreFile.checkpoint(EventQueueBackingStoreFile.java:116)]
Updating checkpoint metadata: logWriteOrderID: 1349459344074, queueSize: 16, queueHead: 999998
> 2012-10-05 10:49:04,408 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.channel.file.LogFileV3$MetaDataWriter.markCheckpoint(LogFileV3.java:85)]
Updating log-2.meta currentPosition = 0, logWriteOrderID = 1349459344074
> 2012-10-05 10:49:04,427 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.channel.file.Log.writeCheckpoint(Log.java:796)]
Updated checkpoint for file: /tmp/data/log-2 position: 0 logWriteOrderID: 1349459344074
> 2012-10-05 10:49:04,428 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.channel.file.LogFile$RandomReader.close(LogFile.java:317)]
Closing RandomReader /tmp/data/log-1
> 2012-10-05 10:49:04,434 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.channel.file.LogFileV3$MetaDataWriter.markCheckpoint(LogFileV3.java:85)]
Updating log-1.meta currentPosition = 0, logWriteOrderID = 1349459344074
> 2012-10-05 10:49:04,451 (lifecycleSupervisor-1-0) [DEBUG - org.apache.flume.channel.file.Log.writeCheckpoint(Log.java:816)]
Updated checkpoint for file: /tmp/data/log-1logWriteOrderID 1349459344074
> 2012-10-05 10:49:04,451 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.channel.file.FileChannel.start(FileChannel.java:264)]
Queue Size after replay: 16 [channel=FileChannel-1]
> 2012-10-05 10:49:04,452 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:82)]
Component type: CHANNEL, name: FileChannel-1 started
> 2012-10-05 10:49:04,453 (conf-file-poller-0) [INFO - org.apache.flume.node.nodemanager.DefaultLogicalNodeManager.startAllComponents(DefaultLogicalNodeManager.java:141)]
Starting Sink avro-sink
> 2012-10-05 10:49:04,457 (conf-file-poller-0) [INFO - org.apache.flume.node.nodemanager.DefaultLogicalNodeManager.startAllComponents(DefaultLogicalNodeManager.java:152)]
Starting Source tail
> 2012-10-05 10:49:04,459 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.sink.AvroSink.start(AvroSink.java:239)]
Starting AvroSink avro-sink { host: sig-flume, port: 41414 }...
> 2012-10-05 10:49:04,460 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:82)]
Component type: SINK, name: avro-sink started
> 2012-10-05 10:49:04,462 (lifecycleSupervisor-1-2) [INFO - org.apache.flume.source.ExecSource.start(ExecSource.java:155)]
Exec source starting with command:tail -F /var/log/auth.log
> 2012-10-05 10:49:04,462 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.sink.AvroSink.createConnection(AvroSink.java:178)]
Avro sink avro-sink: Building RpcClient with hostname: sig-flume, port: 41414
> 2012-10-05 10:49:04,477 (lifecycleSupervisor-1-2) [DEBUG - org.apache.flume.source.ExecSource.start(ExecSource.java:173)]
Exec source started
> 2012-10-05 10:49:04,524 (lifecycleSupervisor-1-0) [DEBUG - org.apache.flume.api.NettyAvroRpcClient.configure(NettyAvroRpcClient.java:420)]
Batch size string = null
> 2012-10-05 10:49:05,341 (lifecycleSupervisor-1-0) [DEBUG - org.apache.flume.sink.AvroSink.createConnection(AvroSink.java:192)]
Avro sink avro-sink: Created RpcClient: NettyAvroRpcClient { host: sig-flume, port: 41414
}
> 2012-10-05 10:49:05,342 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.sink.AvroSink.start(AvroSink.java:253)]
Avro sink avro-sink started.
> 2012-10-05 10:49:05,345 (SinkRunner-PollingRunner-DefaultSinkProcessor) [DEBUG - org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:143)]
Polling sink runner starting
> 2012-10-05 10:49:05,363 (SinkRunner-PollingRunner-DefaultSinkProcessor) [DEBUG - org.apache.flume.channel.file.LogFile$Writer.preallocate(LogFile.java:253)]
Preallocating at position 0
> 2012-10-05 10:49:07,007 (Log-BackgroundWorker-FileChannel-1) [INFO - org.apache.flume.channel.file.EventQueueBackingStoreFile.checkpoint(EventQueueBackingStoreFile.java:108)]
Start checkpoint for /tmp/checkpoint/checkpoint, elements to sync = 16
> 2012-10-05 10:49:07,047 (Log-BackgroundWorker-FileChannel-1) [INFO - org.apache.flume.channel.file.EventQueueBackingStoreFile.checkpoint(EventQueueBackingStoreFile.java:116)]
Updating checkpoint metadata: logWriteOrderID: 1349459344092, queueSize: 0, queueHead: 14
> 2012-10-05 10:49:07,101 (Log-BackgroundWorker-FileChannel-1) [INFO - org.apache.flume.channel.file.LogFileV3$MetaDataWriter.markCheckpoint(LogFileV3.java:85)]
Updating log-2.meta currentPosition = 692, logWriteOrderID = 1349459344092
> 2012-10-05 10:49:07,122 (Log-BackgroundWorker-FileChannel-1) [INFO - org.apache.flume.channel.file.Log.writeCheckpoint(Log.java:796)]
Updated checkpoint for file: /tmp/data/log-2 position: 692 logWriteOrderID: 1349459344092
> 2012-10-05 10:49:07,122 (Log-BackgroundWorker-FileChannel-1) [DEBUG - org.apache.flume.channel.file.Log.removeOldLogs(Log.java:846)]
Files currently in use: [2]
> 2012-10-05 10:49:34,465 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.file.AbstractFileConfigurationProvider$FileWatcherRunnable.run(AbstractFileConfigurationProvider.java:188)]
Checking file:conf/flume.conf for changes
> 2012-10-05 10:49:37,124 (Log-BackgroundWorker-FileChannel-1) [DEBUG - org.apache.flume.channel.file.FlumeEventQueue.checkpoint(FlumeEventQueue.java:99)]
Checkpoint not required
>   
> From: Hari Shreedharan [mailto:hshreedharan@cloudera.com]  
> Sent: Thursday, October 04, 2012 4:25 PM
> To: user@flume.apache.org
> Subject: Re: Flume Source and Sink in different hosts
>   
> It depends on what kind of guarantees you need. If you need to make sure your events
are persisted even across process/system failures, you should use the File Channel; otherwise you
can use the Memory Channel (the performance of the Memory Channel is obviously better).   
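
For illustration, a hedged sketch of the two channel choices side by side (the channel and agent names follow the configs elsewhere in this thread; the Memory Channel capacity figures are placeholders, not recommendations):

# durable: events survive an agent or host crash, at the cost of disk I/O
agent3.channels.FileChannel-1.type = file
agent3.channels.FileChannel-1.checkpointDir = /tmp/checkpoint
agent3.channels.FileChannel-1.dataDirs = /tmp/data

# faster, but any events still in the channel are lost on a crash
agent3.channels.MemoryChannel-1.type = memory
agent3.channels.MemoryChannel-1.capacity = 10000
agent3.channels.MemoryChannel-1.transactionCapacity = 100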
>  
>   
>  
> Thanks,
>  
> Hari
>  
>   
>  
> --  
>  
> Hari Shreedharan
>  
>   
>  
>  
> On Thursday, October 4, 2012 at 3:46 PM, Kumar, Suresh wrote:
> >  
> >   
> >  
> >  
> > Hari, I just noticed some entries in HBase, so this configuration does work.
> >  
> >  
> > I will retry with the changes you recommended. Do you think I should be using
> >  
> >  
> > some other channel type instead of memory?
> >  
> >  
> >   
> >  
> >  
> > Thanks,
> >  
> >  
> > Suresh
> >  
> >  
> > From: Hari Shreedharan [mailto:hshreedharan@cloudera.com]  
> > Sent: Thursday, October 04, 2012 3:40 PM
> > To: user@flume.apache.org
> > Subject: Re: Flume Source and Sink in different hosts
> >  
> >  
> >  
> >   
> >  
> >  
> > Looks like your agent was set up properly. Can you increase the heap and try again?
You can do this by setting -Xmx in the flume-env.sh file. Try setting
it to 1G or higher, since you are using the memory channel. Also, I assume the file you are tailing
is actually getting written to? I strongly suggest using the AsyncHBaseSink.   
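
A minimal sketch of that heap change, assuming the stock conf/flume-env.sh shipped with the binary distribution (JAVA_OPTS is the usual variable there; adjust to your setup):

# conf/flume-env.sh
# raise the agent heap to 1G as suggested above
JAVA_OPTS="-Xmx1024m"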
> >  
> >  
> >  
> >   
> >  
> >  
> >  
> >   
> >  
> >  
> >  
> > Thanks,
> >  
> >  
> >  
> > Hari
> >  
> >  
> >  
> >   
> >  
> >  
> >  
> > --  
> >  
> >  
> >  
> > Hari Shreedharan
> >  
> >  
> >  
> >   
> >  
> >  
> >  
> > On Thursday, October 4, 2012 at 3:19 PM, Kumar, Suresh wrote:
> > >  
> > > Yes, my HBase has the table and column family; if I run the /etc/passwd test
using the flume-ng client, the table
> > >  
> > >  
> > > gets populated.
> > >  
> > >  
> > >   
> > >  
> > >  
> > > Here is the log from the source agent; there is nothing much in the sink log except
for the entries below, which seem to be benign.
> > >  
> > >  
> > >  
> > > Thanks,
> > >  
> > >  
> > > Suresh
> > >  
> > >  
> > >   
> > >  
> > >  
> > > 2012-10-04 14:59:05,622 (lifecycleSupervisor-1-0-SendThread(localhost:2181))
[DEBUG - org.apache.zookeeper.client.ZooKeeperSaslClient.clientTunneledAuthenticationInProgress(ZooKeeperSaslClient.java:515)]
Could not retrieve login configuration: java.lang.SecurityException: Unable to locate a login
configuration
> > >  
> > >  
> > > 2012-10-04 14:59:08,414 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.file.AbstractFileConfigurationProvider$FileWatcherRunnable.run(AbstractFileConfigurationProvider.java:188)]
Checking file:conf/flume.conf for changes
> > >  
> > >  
> > >   
> > >  
> > >  
> > > source agent log:
> > >  
> > >  
> > >   
> > >  
> > >  
> > >   
> > >  
> > >  
> > > $ bin/flume-ng agent --conf ./conf/ -f conf/flume.conf -Dflume.root.logger=DEBUG,console
-n agent3
> > >  
> > >  
> > >   
> > >  
> > >  
> > > + exec /usr/lib/jvm/java-6-sun/bin/java -Xmx20m -Dflume.root.logger=DEBUG,console
-cp '/opt/flume/conf:/opt/flume/lib/*' -Djava.library.path= org.apache.flume.node.Application
-f conf/flume.conf -n agent3
> > >  
> > >  
> > > 2012-10-04 15:09:30,778 (main) [INFO - org.apache.flume.lifecycle.LifecycleSupervisor.start(LifecycleSupervisor.java:67)]
Starting lifecycle supervisor 1
> > >  
> > >  
> > > 2012-10-04 15:09:30,791 (main) [INFO - org.apache.flume.node.FlumeNode.start(FlumeNode.java:54)]
Flume node starting - agent3
> > >  
> > >  
> > > 2012-10-04 15:09:30,799 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.node.nodemanager.DefaultLogicalNodeManager.start(DefaultLogicalNodeManager.java:203)]
Node manager starting
> > >  
> > >  
> > > 2012-10-04 15:09:30,801 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.conf.file.AbstractFileConfigurationProvider.start(AbstractFileConfigurationProvider.java:67)]
Configuration provider starting
> > >  
> > >  
> > > 2012-10-04 15:09:30,810 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.lifecycle.LifecycleSupervisor.start(LifecycleSupervisor.java:67)]
Starting lifecycle supervisor 10
> > >  
> > >  
> > > 2012-10-04 15:09:30,813 (lifecycleSupervisor-1-1) [DEBUG - org.apache.flume.node.nodemanager.DefaultLogicalNodeManager.start(DefaultLogicalNodeManager.java:207)]
Node manager started
> > >  
> > >  
> > > 2012-10-04 15:09:30,819 (lifecycleSupervisor-1-0) [DEBUG - org.apache.flume.conf.file.AbstractFileConfigurationProvider.start(AbstractFileConfigurationProvider.java:86)]
Configuration provider started
> > >  
> > >  
> > > 2012-10-04 15:09:30,819 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.file.AbstractFileConfigurationProvider$FileWatcherRunnable.run(AbstractFileConfigurationProvider.java:188)]
Checking file:conf/flume.conf for changes
> > >  
> > >  
> > > 2012-10-04 15:09:30,821 (conf-file-poller-0) [INFO - org.apache.flume.conf.file.AbstractFileConfigurationProvider$FileWatcherRunnable.run(AbstractFileConfigurationProvider.java:195)]
Reloading configuration file:conf/flume.conf
> > >  
> > >  
> > > 2012-10-04 15:09:30,839 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:912)]
Added sinks: avro-sink Agent: agent3
> > >  
> > >  
> > > 2012-10-04 15:09:30,840 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:998)]
Processing:avro-sink
> > >  
> > >  
> > > 2012-10-04 15:09:30,840 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1002)]
Created context for avro-sink: hostname
> > >  
> > >  
> > > 2012-10-04 15:09:30,841 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:998)]
Processing:avro-sink
> > >  
> > >  
> > > 2012-10-04 15:09:30,841 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:998)]
Processing:avro-sink
> > >  
> > >  
> > > 2012-10-04 15:09:30,841 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:998)]
Processing:avro-sink
> > >  
> > >  
> > > 2012-10-04 15:09:30,841 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.isValid(FlumeConfiguration.java:295)]
Starting validation of configuration for agent: agent3, initial-configuration: AgentConfiguration[agent3]
> > >  
> > >  
> > > SOURCES: {tail={ parameters:{command=tail -F /var/log/auth.log, channels=MemoryChannel-1,
type=exec} }}
> > >  
> > >  
> > > CHANNELS: {MemoryChannel-1={ parameters:{type=memory} }}
> > >  
> > >  
> > > SINKS: {avro-sink={ parameters:{port=41414, hostname=sig-flume, type=avro,
channel=MemoryChannel-1} }}
> > >  
> > >  
> > >   
> > >  
> > >  
> > > 2012-10-04 15:09:30,854 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.validateChannels(FlumeConfiguration.java:450)]
Created channel MemoryChannel-1
> > >  
> > >  
> > > 2012-10-04 15:09:30,883 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.validateSinks(FlumeConfiguration.java:655)]
Creating sink: avro-sink using AVRO
> > >  
> > >  
> > > 2012-10-04 15:09:30,885 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.isValid(FlumeConfiguration.java:353)]
Post validation configuration for agent3
> > >  
> > >  
> > > AgentConfiguration created without Configuration stubs for which only basic
syntactical validation was performed[agent3]
> > >  
> > >  
> > > SOURCES: {tail={ parameters:{command=tail -F /var/log/auth.log, channels=MemoryChannel-1,
type=exec} }}
> > >  
> > >  
> > > CHANNELS: {MemoryChannel-1={ parameters:{type=memory} }}
> > >  
> > >  
> > > SINKS: {avro-sink={ parameters:{port=41414, hostname=sig-flume, type=avro,
channel=MemoryChannel-1} }}
> > >  
> > >  
> > >   
> > >  
> > >  
> > > 2012-10-04 15:09:30,885 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.FlumeConfiguration.validateConfiguration(FlumeConfiguration.java:117)]
Channels:MemoryChannel-1
> > >  
> > >  
> > >   
> > >  
> > >  
> > > 2012-10-04 15:09:30,885 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.FlumeConfiguration.validateConfiguration(FlumeConfiguration.java:118)]
Sinks avro-sink
> > >  
> > >  
> > >   
> > >  
> > >  
> > > 2012-10-04 15:09:30,885 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.FlumeConfiguration.validateConfiguration(FlumeConfiguration.java:119)]
Sources tail
> > >  
> > >  
> > >   
> > >  
> > >  
> > > 2012-10-04 15:09:30,885 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration.validateConfiguration(FlumeConfiguration.java:122)]
Post-validation flume configuration contains configuration  for agents: [agent3]
> > >  
> > >  
> > > 2012-10-04 15:09:30,886 (conf-file-poller-0) [INFO - org.apache.flume.conf.properties.PropertiesFileConfigurationProvider.loadChannels(PropertiesFileConfigurationProvider.java:249)]
Creating channels
> > >  
> > >  
> > > 2012-10-04 15:09:30,886 (conf-file-poller-0) [DEBUG - org.apache.flume.channel.DefaultChannelFactory.create(DefaultChannelFactory.java:68)]
Creating instance of channel MemoryChannel-1 type memory
> > >  
> > >  
> > > 2012-10-04 15:09:31,013 (conf-file-poller-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.<init>(MonitoredCounterGroup.java:68)]
Monitoried counter group for type: CHANNEL, name: MemoryChannel-1, registered successfully.
> > >  
> > >  
> > > 2012-10-04 15:09:31,013 (conf-file-poller-0) [INFO - org.apache.flume.conf.properties.PropertiesFileConfigurationProvider.loadChannels(PropertiesFileConfigurationProvider.java:273)]
created channel MemoryChannel-1
> > >  
> > >  
> > > 2012-10-04 15:09:31,014 (conf-file-poller-0) [DEBUG - org.apache.flume.source.DefaultSourceFactory.create(DefaultSourceFactory.java:74)]
Creating instance of source tail, type exec
> > >  
> > >  
> > > 2012-10-04 15:09:31,037 (conf-file-poller-0) [INFO - org.apache.flume.sink.DefaultSinkFactory.create(DefaultSinkFactory.java:70)]
Creating instance of sink: avro-sink, type: avro
> > >  
> > >  
> > > 2012-10-04 15:09:31,045 (conf-file-poller-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.<init>(MonitoredCounterGroup.java:68)]
Monitoried counter group for type: SINK, name: avro-sink, registered successfully.
> > >  
> > >  
> > > 2012-10-04 15:09:31,046 (conf-file-poller-0) [INFO - org.apache.flume.node.nodemanager.DefaultLogicalNodeManager.startAllComponents(DefaultLogicalNodeManager.java:106)]
Starting new configuration:{ sourceRunners:{tail=EventDrivenSourceRunner: { source:org.apache.flume.source.ExecSource{name:tail,state:IDLE}
}} sinkRunners:{avro-sink=SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@e949f69
counterGroup:{ name:null counters:{} } }} channels:{MemoryChannel-1=org.apache.flume.channel.MemoryChannel{name:
MemoryChannel-1}} }
> > >  
> > >  
> > > 2012-10-04 15:09:31,046 (conf-file-poller-0) [INFO - org.apache.flume.node.nodemanager.DefaultLogicalNodeManager.startAllComponents(DefaultLogicalNodeManager.java:113)]
Starting Channel MemoryChannel-1
> > >  
> > >  
> > > 2012-10-04 15:09:31,049 (conf-file-poller-0) [INFO - org.apache.flume.node.nodemanager.DefaultLogicalNodeManager.startAllComponents(DefaultLogicalNodeManager.java:128)]
Waiting for channel: MemoryChannel-1 to start. Sleeping for 500 ms
> > >  
> > >  
> > > 2012-10-04 15:09:31,052 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:82)]
Component type: CHANNEL, name: MemoryChannel-1 started
> > >  
> > >  
> > > 2012-10-04 15:09:31,550 (conf-file-poller-0) [INFO - org.apache.flume.node.nodemanager.DefaultLogicalNodeManager.startAllComponents(DefaultLogicalNodeManager.java:141)]
Starting Sink avro-sink
> > >  
> > >  
> > > 2012-10-04 15:09:31,552 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.sink.AvroSink.start(AvroSink.java:239)]
Starting AvroSink avro-sink { host: sig-flume, port: 41414 }...
> > >  
> > >  
> > > 2012-10-04 15:09:31,552 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:82)]
Component type: SINK, name: avro-sink started
> > >  
> > >  
> > > 2012-10-04 15:09:31,552 (conf-file-poller-0) [INFO - org.apache.flume.node.nodemanager.DefaultLogicalNodeManager.startAllComponents(DefaultLogicalNodeManager.java:152)]
Starting Source tail
> > >  
> > >  
> > > 2012-10-04 15:09:31,554 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.sink.AvroSink.createConnection(AvroSink.java:178)]
Avro sink avro-sink: Building RpcClient with hostname: sig-flume, port: 41414
> > >  
> > >  
> > > 2012-10-04 15:09:31,561 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.source.ExecSource.start(ExecSource.java:155)]
Exec source starting with command:tail -F /var/log/auth.log
> > >  
> > >  
> > > 2012-10-04 15:09:31,586 (lifecycleSupervisor-1-1) [DEBUG - org.apache.flume.source.ExecSource.start(ExecSource.java:173)]
Exec source started
> > >  
> > >  
> > > 2012-10-04 15:09:31,626 (lifecycleSupervisor-1-0) [DEBUG - org.apache.flume.api.NettyAvroRpcClient.configure(NettyAvroRpcClient.java:420)]
Batch size string = null
> > >  
> > >  
> > > 2012-10-04 15:09:32,684 (lifecycleSupervisor-1-0) [DEBUG - org.apache.flume.sink.AvroSink.createConnection(AvroSink.java:192)]
Avro sink avro-sink: Created RpcClient: NettyAvroRpcClient { host: sig-flume, port: 41414
}
> > >  
> > >  
> > > 2012-10-04 15:09:32,685 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.sink.AvroSink.start(AvroSink.java:253)]
Avro sink avro-sink started.
> > >  
> > >  
> > > 2012-10-04 15:09:32,689 (SinkRunner-PollingRunner-DefaultSinkProcessor) [DEBUG
- org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:143)] Polling sink runner
starting
> > >  
> > >  
> > > 2012-10-04 15:10:01,565 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.file.AbstractFileConfigurationProvider$FileWatcherRunnable.run(AbstractFileConfigurationProvider.java:188)]
Checking file:conf/flume.conf for changes
> > >  
> > >  
> > > 2012-10-04 15:10:31,567 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.file.AbstractFileConfigurationProvider$FileWatcherRunnable.run(Abstr
> > >  
> > >  
> > >   
> > >  
> > >  
> > >   
> > >  
> > >  
> > > From: Hari Shreedharan [mailto:hshreedharan@cloudera.com]  
> > > Sent: Thursday, October 04, 2012 3:02 PM
> > > To: user@flume.apache.org
> > > Subject: Re: Flume Source and Sink in different hosts
> > >  
> > >  
> > >  
> > >   
> > >  
> > >  
> > > Can you also send the logs of both agents? Does your HBase cluster have the
table, and does that table have the column family you configured?  
> > >  
> > >  
> > >  
> > >   
> > >  
> > >  
> > >  
> > > Also, are you sure the files are not getting rotated out? You should use tail
-F so that your command keeps working even when files get rotated out.
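
A one-line sketch of that change against the exec source config quoted earlier in this thread (agent and source names assumed from that config):

# tail -F re-opens /var/log/auth.log after rotation; tail -f keeps reading the rotated-out file
agent3.sources.tail.command = tail -F /var/log/auth.log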
> > >  
> > >  
> > >  
> > >   
> > >  
> > >  
> > >  
> > >   
> > >  
> > >  
> > >  
> > > Hari
> > >  
> > >  
> > >  
> > > --  
> > >  
> > >  
> > >  
> > > Hari Shreedharan
> > >  
> > >  
> > >  
> > >   
> > >  
> > >  
> > >  
> > > On Thursday, October 4, 2012 at 2:53 PM, Kumar, Suresh wrote:
> > > >  
> > > > Hello:
> > > >  
> > > >  
> > > >   
> > > >  
> > > >  
> > > > I have just downloaded and built flume-ng (apache-flume-1.3.0-SNAPSHOT).
> > > >  
> > > >  
> > > >   
> > > >  
> > > >  
> > > > My goal is to collect log data from HostA (source) and send it to HostB (sink). My initial test (sending /etc/passwd)  
> > > >  
> > > >  
> > > > from HostA to HostB worked fine, and I was also able to load the passwd file
into my HBase in HostB.
> > > >  
> > > >  
> > > >   
> > > >  
> > > >  
> > > > Now, I want to load a continuous stream of log data (using tail -f),
but I was not able to replicate the above process.
> > > >  
> > > >  
> > > > Flume just started fine in HostA, but I do not see any data being received
by HostB or in my HBase.
> > > >  
> > > >  
> > > >   
> > > >  
> > > >  
> > > > What is wrong with my configuration?
> > > >  
> > > >  
> > > >   
> > > >  
> > > >  
> > > > Thanks,
> > > >  
> > > >  
> > > > Suresh
> > > >  
> > > >  
> > > >   
> > > >  
> > > >  
> > > > Here is my flume.conf in HostA
> > > >  
> > > >  
> > > >   
> > > >  
> > > >  
> > > > agent3.sources = tail
> > > >  
> > > >  
> > > > agent3.channels = MemoryChannel-1
> > > >  
> > > >  
> > > > agent3.sinks = avro-sink
> > > >  
> > > >  
> > > >   
> > > >  
> > > >  
> > > > # Define source flow
> > > >  
> > > >  
> > > > agent3.sources.tail.type = exec
> > > >  
> > > >  
> > > > agent3.sources.tail.command = tail -f /var/log/auth.log
> > > >  
> > > >  
> > > > agent3.sources.tail.channels = MemoryChannel-1
> > > >  
> > > >  
> > > >   
> > > >  
> > > >  
> > > > # What kind of channel
> > > >  
> > > >  
> > > > agent3.channels.MemoryChannel-1.type = memory
> > > >  
> > > >  
> > > >   
> > > >  
> > > >  
> > > > # avro sink properties
> > > >  
> > > >  
> > > > agent3.sinks.avro-sink.type = avro
> > > >  
> > > >  
> > > > agent3.sinks.avro-sink.channel = MemoryChannel-1
> > > >  
> > > >  
> > > > agent3.sinks.avro-sink.hostname = hostb
> > > >  
> > > >  
> > > > agent3.sinks.avro-sink.port = 41414
> > > >  
> > > >  
> > > >   
> > > >  
> > > >  
> > > > Here is my flume.conf in HostB
> > > >  
> > > >  
> > > >   
> > > >  
> > > >  
> > > > # Define a memory channel called ch1 on agent1
> > > >  
> > > >  
> > > > agent1.channels.ch1.type = memory
> > > >  
> > > >  
> > > >   
> > > >  
> > > >  
> > > > # Define an Avro source called avro-source1 on agent1 and tell it
> > > >  
> > > >  
> > > > # to bind to 0.0.0.0:41414. Connect it to channel ch1.
> > > >  
> > > >  
> > > > agent1.sources.avro-source1.channels = ch1
> > > >  
> > > >  
> > > > agent1.sources.avro-source1.type = avro
> > > >  
> > > >  
> > > > agent1.sources.avro-source1.bind = 0.0.0.0
> > > >  
> > > >  
> > > > agent1.sources.avro-source1.port = 41414
> > > >  
> > > >  
> > > >   
> > > >  
> > > >  
> > > > # Define a logger sink that simply logs all events it receives
> > > >  
> > > >  
> > > > # and connect it to the other end of the same channel.
> > > >  
> > > >  
> > > > agent1.sinks.log-sink1.channel = ch1
> > > >  
> > > >  
> > > > agent1.sinks.log-sink1.type = logger
> > > >  
> > > >  
> > > >   
> > > >  
> > > >  
> > > > # Finally, now that we've defined all of our components, tell
> > > >  
> > > >  
> > > > # agent1 which ones we want to activate.
> > > >  
> > > >  
> > > > agent1.channels = ch1
> > > >  
> > > >  
> > > > agent1.sources = avro-source1
> > > >  
> > > >  
> > > > #agent1.sources = avro-source1
> > > >  
> > > >  
> > > > agent1.sinks = sink1
> > > >  
> > > >  
> > > >   
> > > >  
> > > >  
> > > > agent1.sinks.sink1.type = org.apache.flume.sink.hbase.HBaseSink
> > > >  
> > > >  
> > > > agent1.sinks.sink1.channel = ch1
> > > >  
> > > >  
> > > > agent1.sinks.sink1.table = flumedemo
> > > >  
> > > >  
> > > > agent1.sinks.sink1.columnFamily = testing
> > > >  
> > > >  
> > > > agent1.sinks.sink1.column = foo
> > > >  
> > > >  
> > > > agent1.sinks.sink1.serializer = org.apache.flume.sink.hbase.SimpleHbaseEventSerializer
> > > >  
> > > >  
> > > > agent1.sinks.sink1.serializer.payloadColumn = col1
> > > >  
> > > >  
> > > > agent1.sinks.sink1.serializer.keyType = timestamp
> > > >  
> > > >  
> > > > agent1.sinks.sink1.serializer.rowPrefix = 1
> > > >  
> > > >  
> > > > agent1.sinks.sink1.serializer.suffix = timestamp
> > > >  
> > > >  
> > > > agent1.sinks.sink1.serializer.payloadColumn = pcol
> > > >  
> > > >  
> > > > agent1.sinks.sink1.serializer.incrementColumn = icol
> > > >  
> > > >  
> > > >   
> > > >  
> > > >  
> > > >  
> > > >  
> > >  
> > >  
> > >   
> > >  
> > >  
> > >  
> > >  
> > >  
> >  
> >  
> >   
> >  
> >  
> >  
> >  
> >  
>  
>   
>  
>  
>  
>  


