flume-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Kumar, Suresh" <Suresh.Kum...@emc.com>
Subject RE: Flume Source and Sink in different hosts
Date Fri, 05 Oct 2012 21:55:02 GMT
 

Yes, that was it, I moved the sink to a different server where auth.log had more traffic,
I see the my data being pushed to HBase.

 

Thanks for your help.

Suresh

From: Hari Shreedharan [mailto:hshreedharan@cloudera.com] 
Sent: Friday, October 05, 2012 11:40 AM
To: user@flume.apache.org
Subject: Re: Flume Source and Sink in different hosts

 

Ah, it seems like this is because your file is growing not "too fast." The exec source does
do some "batching" by waiting for around 20 lines to come in before writing it out to the
channel. This is important to not hit performance of channels like File Channel. Can you add
this to your source config:  

batchSize = 1

If you set batch size to 1, I would not recommend using File Channel - because there will
be far too many IO ops to give good performance. You should use Memory Channel - of course,
the data will not survive a program or system crash. If you want to use File Channel, I'd
suggest with batchSize of 100 or so.

 

 

Thanks,

Hari

 

-- 

Hari Shreedharan

 

On Friday, October 5, 2012 at 11:27 AM, Kumar, Suresh wrote:

	Just a quick update, it is definitely a source issue and nothing to do with flume configuration
in the sink.

	 

	I restarted the sink, I do not see the data in HBase, however if I stop the agent in source,
I do not see

	any data, but as soon as I start the agent in source, I see the data in my HBase which is
in HostB.

	 

	Thanks for any help,

	Suresh

	 

	 

	From: Kumar, Suresh [mailto:Suresh.Kumar4@emc.com] 
	Sent: Friday, October 05, 2012 11:08 AM
	To: user@flume.apache.org
	Subject: RE: Flume Source and Sink in different hosts

	 

	I increased the heap size in source and sink to 1G, I now use the AsyncHBaseSink in my sink
agent configuration, it didn’t make

	that much of a difference.

	 

	I changed my source agent configuration from memory to file in HostA, I did not change my
sink agent configuration in HostB

	(it is still set to Memory Channel). I still see the latency issue (BTW, the auth.log grows
every second). However I noticed

	that if I kill the agent in HostA (source) and restart, I see entries in HBase. Am I missing
something? How often does the data

	get flushed from source to sink? Should sink also be the same channel type (file)?

	 

	Here is my conf and log for HostA (source)

	 

	flume.conf (source)

	 

	agent3.sources = tail

	agent3.channels = FileChannel-1

	agent3.sinks = avro-sink

	 

	# Define source flow

	agent3.sources.tail.type = exec

	agent3.sources.tail.command = tail -F /var/log/auth.log

	agent3.sources.tail.channels = FileChannel-1

	 

	# What kind of channel

	agent3.channels.FileChannel-1.type = file

	agent3.channels.FileChannel-1.checkpointDir = /tmp/checkpoint

	agent3.channels.FileChannel-1.dataDirs = /tmp/data

	 

	# avro sink properties

	agent3.sinks.avro-sink.type = avro

	agent3.sinks.avro-sink.channel = FileChannel-1

	agent3.sinks.avro-sink.hostname = sig-flume

	agent3.sinks.avro-sink.port = 41414

	 

	 

	Log (source)

	 

	 

	2012-10-05 10:49:03,736 (main) [INFO - org.apache.flume.node.FlumeNode.start(FlumeNode.java:54)]
Flume node starting - agent3

	2012-10-05 10:49:03,752 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.node.nodemanager.DefaultLogicalNodeManager.start(DefaultLogicalNodeManager.java:203)]
Node manager starting

	2012-10-05 10:49:03,752 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.conf.file.AbstractFileConfigurationProvider.start(AbstractFileConfigurationProvider.java:67)]
Configuration provider starting

	2012-10-05 10:49:03,760 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.lifecycle.LifecycleSupervisor.start(LifecycleSupervisor.java:67)]
Starting lifecycle supervisor 12

	2012-10-05 10:49:03,763 (lifecycleSupervisor-1-1) [DEBUG - org.apache.flume.node.nodemanager.DefaultLogicalNodeManager.start(DefaultLogicalNodeManager.java:207)]
Node manager started

	2012-10-05 10:49:03,767 (lifecycleSupervisor-1-0) [DEBUG - org.apache.flume.conf.file.AbstractFileConfigurationProvider.start(AbstractFileConfigurationProvider.java:86)]
Configuration provider started

	2012-10-05 10:49:03,769 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.file.AbstractFileConfigurationProvider$FileWatcherRunnable.run(AbstractFileConfigurationProvider.java:188)]
Checking file:conf/flume.conf <file:///\\conf\flume.conf>  for changes

	2012-10-05 10:49:03,772 (conf-file-poller-0) [INFO - org.apache.flume.conf.file.AbstractFileConfigurationProvider$FileWatcherRunnable.run(AbstractFileConfigurationProvider.java:195)]
Reloading configuration file:conf/flume.conf <file:///\\conf\flume.conf> 

	2012-10-05 10:49:03,801 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:912)]
Added sinks: avro-sink Agent: agent3

	2012-10-05 10:49:03,802 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:998)]
Processing:avro-sink

	2012-10-05 10:49:03,803 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1002)]
Created context for avro-sink: hostname

	2012-10-05 10:49:03,803 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:998)]
Processing:avro-sink

	2012-10-05 10:49:03,803 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:998)]
Processing:avro-sink

	2012-10-05 10:49:03,804 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:998)]
Processing:avro-sink

	2012-10-05 10:49:03,805 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.isValid(FlumeConfiguration.java:295)]
Starting validation of configuration for agent: agent3, initial-configuration: AgentConfiguration[agent3]

	SOURCES: {tail={ parameters:{command=tail -F /var/log/auth.log, channels=FileChannel-1, type=exec}
}}

	CHANNELS: {FileChannel-1={ parameters:{checkpointDir=/tmp/checkpoint, dataDirs=/tmp/data,
type=file} }}

	SINKS: {avro-sink={ parameters:{port=41414, hostname=sig-flume, type=avro, channel=FileChannel-1}
}}

	 

	2012-10-05 10:49:03,823 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.validateChannels(FlumeConfiguration.java:450)]
Created channel FileChannel-1

	2012-10-05 10:49:03,850 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.validateSinks(FlumeConfiguration.java:655)]
Creating sink: avro-sink using AVRO

	2012-10-05 10:49:03,860 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.isValid(FlumeConfiguration.java:353)]
Post validation configuration for agent3

	AgentConfiguration created without Configuration stubs for which only basic syntactical validation
was performed[agent3]

	SOURCES: {tail={ parameters:{command=tail -F /var/log/auth.log, channels=FileChannel-1, type=exec}
}}

	CHANNELS: {FileChannel-1={ parameters:{checkpointDir=/tmp/checkpoint, dataDirs=/tmp/data,
type=file} }}

	SINKS: {avro-sink={ parameters:{port=41414, hostname=sig-flume, type=avro, channel=FileChannel-1}
}}

	 

	2012-10-05 10:49:03,860 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.FlumeConfiguration.validateConfiguration(FlumeConfiguration.java:117)]
Channels:FileChannel-1

	 

	2012-10-05 10:49:03,861 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.FlumeConfiguration.validateConfiguration(FlumeConfiguration.java:118)]
Sinks avro-sink

	 

	2012-10-05 10:49:03,863 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.FlumeConfiguration.validateConfiguration(FlumeConfiguration.java:119)]
Sources tail

	 

	2012-10-05 10:49:03,864 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration.validateConfiguration(FlumeConfiguration.java:122)]
Post-validation flume configuration contains configuration  for agents: [agent3]

	2012-10-05 10:49:03,865 (conf-file-poller-0) [INFO - org.apache.flume.conf.properties.PropertiesFileConfigurationProvider.loadChannels(PropertiesFileConfigurationProvider.java:249)]
Creating channels

	2012-10-05 10:49:03,866 (conf-file-poller-0) [DEBUG - org.apache.flume.channel.DefaultChannelFactory.create(DefaultChannelFactory.java:68)]
Creating instance of channel FileChannel-1 type file

	2012-10-05 10:49:03,899 (conf-file-poller-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.<init>(MonitoredCounterGroup.java:68)]
Monitoried counter group for type: CHANNEL, name: FileChannel-1, registered successfully.

	2012-10-05 10:49:03,900 (conf-file-poller-0) [INFO - org.apache.flume.conf.properties.PropertiesFileConfigurationProvider.loadChannels(PropertiesFileConfigurationProvider.java:273)]
created channel FileChannel-1

	2012-10-05 10:49:03,900 (conf-file-poller-0) [DEBUG - org.apache.flume.source.DefaultSourceFactory.create(DefaultSourceFactory.java:74)]
Creating instance of source tail, type exec

	2012-10-05 10:49:03,934 (conf-file-poller-0) [INFO - org.apache.flume.sink.DefaultSinkFactory.create(DefaultSinkFactory.java:70)]
Creating instance of sink: avro-sink, type: avro

	2012-10-05 10:49:03,945 (conf-file-poller-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.<init>(MonitoredCounterGroup.java:68)]
Monitoried counter group for type: SINK, name: avro-sink, registered successfully.

	2012-10-05 10:49:03,947 (conf-file-poller-0) [INFO - org.apache.flume.node.nodemanager.DefaultLogicalNodeManager.startAllComponents(DefaultLogicalNodeManager.java:106)]
Starting new configuration:{ sourceRunners:{tail=EventDrivenSourceRunner: { source:org.apache.flume.source.ExecSource{name:tail,state:IDLE}
}} sinkRunners:{avro-sink=SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@68814013
counterGroup:{ name:null counters:{} } }} channels:{FileChannel-1=FileChannel FileChannel-1
{ dataDirs: [/tmp/data] }} }

	2012-10-05 10:49:03,947 (conf-file-poller-0) [INFO - org.apache.flume.node.nodemanager.DefaultLogicalNodeManager.startAllComponents(DefaultLogicalNodeManager.java:113)]
Starting Channel FileChannel-1

	2012-10-05 10:49:03,951 (conf-file-poller-0) [INFO - org.apache.flume.node.nodemanager.DefaultLogicalNodeManager.startAllComponents(DefaultLogicalNodeManager.java:128)]
Waiting for channel: FileChannel-1 to start. Sleeping for 500 ms

	2012-10-05 10:49:03,952 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.channel.file.FileChannel.start(FileChannel.java:241)]
Starting FileChannel FileChannel-1 { dataDirs: [/tmp/data] }...

	2012-10-05 10:49:03,993 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.channel.file.Log.<init>(Log.java:248)]
Encryption is not enabled

	2012-10-05 10:49:04,000 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.channel.file.Log.replay(Log.java:292)]
Replay started

	2012-10-05 10:49:04,023 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.channel.file.Log.replay(Log.java:304)]
Found NextFileID 1, from [/tmp/data/log-1]

	2012-10-05 10:49:04,039 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.channel.file.EventQueueBackingStoreFileV3.<init>(EventQueueBackingStoreFileV3.java:46)]
Starting up with /tmp/checkpoint/checkpoint and /tmp/checkpoint/checkpoint.meta

	2012-10-05 10:49:04,039 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.channel.file.EventQueueBackingStoreFileV3.<init>(EventQueueBackingStoreFileV3.java:50)]
Reading checkpoint metadata from /tmp/checkpoint/checkpoint.meta

	2012-10-05 10:49:04,100 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.channel.file.Log.replay(Log.java:336)]
Last Checkpoint Fri Oct 05 10:43:53 PDT 2012, queue depth = 0

	2012-10-05 10:49:04,107 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.channel.file.Log.replay(Log.java:355)]
Replaying logs with v2 replay logic

	2012-10-05 10:49:04,110 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.channel.file.ReplayHandler.replayLog(ReplayHandler.java:223)]
Starting replay of [/tmp/data/log-1]

	2012-10-05 10:49:04,113 (lifecycleSupervisor-1-0) [DEBUG - org.apache.flume.channel.file.FlumeEventQueue$InflightEventWrapper.deserialize(FlumeEventQueue.java:502)]
Reached end of inflights buffer. Long buffer position =2

	2012-10-05 10:49:04,117 (lifecycleSupervisor-1-0) [DEBUG - org.apache.flume.channel.file.FlumeEventQueue$InflightEventWrapper.deserialize(FlumeEventQueue.java:502)]
Reached end of inflights buffer. Long buffer position =2

	2012-10-05 10:49:04,118 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.channel.file.ReplayHandler.replayLog(ReplayHandler.java:236)]
Replaying /tmp/data/log-1

	2012-10-05 10:49:04,133 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.tools.DirectMemoryUtils.getDefaultDirectMemorySize(DirectMemoryUtils.java:113)]
Unable to get maxDirectMemory from VM: NoSuchMethodException: sun.misc.VM.maxDirectMemory(null)

	2012-10-05 10:49:04,139 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.tools.DirectMemoryUtils.allocate(DirectMemoryUtils.java:47)]
Direct Memory Allocation:  Allocation = 1048576, Allocated = 0, MaxDirectMemorySize = 954466304,
Remaining = 954466304

	2012-10-05 10:49:04,181 (lifecycleSupervisor-1-0) [WARN - org.apache.flume.channel.file.LogFile$SequentialReader.skipToLastCheckpointPosition(LogFile.java:431)]
Checkpoint for file(/tmp/data/log-1) is: 1349459033373, which is beyond the requested checkpoint
time: 1349459033373 and position 0

	2012-10-05 10:49:04,218 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.channel.file.LogFile$SequentialReader.next(LogFile.java:452)]
Encountered EOF at 2423 in /tmp/data/log-1

	2012-10-05 10:49:04,219 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.channel.file.ReplayHandler.replayLog(ReplayHandler.java:320)]
read: 17, put: 16, take: 0, rollback: 0, commit: 1, skip: 0, eventCount:16

	2012-10-05 10:49:04,222 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.channel.file.Log.replay(Log.java:362)]
Rolling /tmp/data

	2012-10-05 10:49:04,223 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.channel.file.Log.roll(Log.java:726)]
Roll start /tmp/data

	2012-10-05 10:49:04,229 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.channel.file.LogFile$Writer.<init>(LogFile.java:138)]
Opened /tmp/data/log-2

	2012-10-05 10:49:04,253 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.channel.file.Log.roll(Log.java:741)]
Roll end

	2012-10-05 10:49:04,269 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.channel.file.EventQueueBackingStoreFile.checkpoint(EventQueueBackingStoreFile.java:108)]
Start checkpoint for /tmp/checkpoint/checkpoint, elements to sync = 16

	2012-10-05 10:49:04,306 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.channel.file.EventQueueBackingStoreFile.checkpoint(EventQueueBackingStoreFile.java:116)]
Updating checkpoint metadata: logWriteOrderID: 1349459344074, queueSize: 16, queueHead: 999998

	2012-10-05 10:49:04,408 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.channel.file.LogFileV3$MetaDataWriter.markCheckpoint(LogFileV3.java:85)]
Updating log-2.meta currentPosition = 0, logWriteOrderID = 1349459344074

	2012-10-05 10:49:04,427 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.channel.file.Log.writeCheckpoint(Log.java:796)]
Updated checkpoint for file: /tmp/data/log-2 position: 0 logWriteOrderID: 1349459344074

	2012-10-05 10:49:04,428 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.channel.file.LogFile$RandomReader.close(LogFile.java:317)]
Closing RandomReader /tmp/data/log-1

	2012-10-05 10:49:04,434 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.channel.file.LogFileV3$MetaDataWriter.markCheckpoint(LogFileV3.java:85)]
Updating log-1.meta currentPosition = 0, logWriteOrderID = 1349459344074

	2012-10-05 10:49:04,451 (lifecycleSupervisor-1-0) [DEBUG - org.apache.flume.channel.file.Log.writeCheckpoint(Log.java:816)]
Updated checkpoint for file: /tmp/data/log-1logWriteOrderID 1349459344074

	2012-10-05 10:49:04,451 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.channel.file.FileChannel.start(FileChannel.java:264)]
Queue Size after replay: 16 [channel=FileChannel-1]

	2012-10-05 10:49:04,452 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:82)]
Component type: CHANNEL, name: FileChannel-1 started

	2012-10-05 10:49:04,453 (conf-file-poller-0) [INFO - org.apache.flume.node.nodemanager.DefaultLogicalNodeManager.startAllComponents(DefaultLogicalNodeManager.java:141)]
Starting Sink avro-sink

	2012-10-05 10:49:04,457 (conf-file-poller-0) [INFO - org.apache.flume.node.nodemanager.DefaultLogicalNodeManager.startAllComponents(DefaultLogicalNodeManager.java:152)]
Starting Source tail

	2012-10-05 10:49:04,459 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.sink.AvroSink.start(AvroSink.java:239)]
Starting AvroSink avro-sink { host: sig-flume, port: 41414 }...

	2012-10-05 10:49:04,460 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:82)]
Component type: SINK, name: avro-sink started

	2012-10-05 10:49:04,462 (lifecycleSupervisor-1-2) [INFO - org.apache.flume.source.ExecSource.start(ExecSource.java:155)]
Exec source starting with command:tail -F /var/log/auth.log

	2012-10-05 10:49:04,462 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.sink.AvroSink.createConnection(AvroSink.java:178)]
Avro sink avro-sink: Building RpcClient with hostname: sig-flume, port: 41414

	2012-10-05 10:49:04,477 (lifecycleSupervisor-1-2) [DEBUG - org.apache.flume.source.ExecSource.start(ExecSource.java:173)]
Exec source started

	2012-10-05 10:49:04,524 (lifecycleSupervisor-1-0) [DEBUG - org.apache.flume.api.NettyAvroRpcClient.configure(NettyAvroRpcClient.java:420)]
Batch size string = null

	2012-10-05 10:49:05,341 (lifecycleSupervisor-1-0) [DEBUG - org.apache.flume.sink.AvroSink.createConnection(AvroSink.java:192)]
Avro sink avro-sink: Created RpcClient: NettyAvroRpcClient { host: sig-flume, port: 41414
}

	2012-10-05 10:49:05,342 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.sink.AvroSink.start(AvroSink.java:253)]
Avro sink avro-sink started.

	2012-10-05 10:49:05,345 (SinkRunner-PollingRunner-DefaultSinkProcessor) [DEBUG - org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:143)]
Polling sink runner starting

	2012-10-05 10:49:05,363 (SinkRunner-PollingRunner-DefaultSinkProcessor) [DEBUG - org.apache.flume.channel.file.LogFile$Writer.preallocate(LogFile.java:253)]
Preallocating at position 0

	2012-10-05 10:49:07,007 (Log-BackgroundWorker-FileChannel-1) [INFO - org.apache.flume.channel.file.EventQueueBackingStoreFile.checkpoint(EventQueueBackingStoreFile.java:108)]
Start checkpoint for /tmp/checkpoint/checkpoint, elements to sync = 16

	2012-10-05 10:49:07,047 (Log-BackgroundWorker-FileChannel-1) [INFO - org.apache.flume.channel.file.EventQueueBackingStoreFile.checkpoint(EventQueueBackingStoreFile.java:116)]
Updating checkpoint metadata: logWriteOrderID: 1349459344092, queueSize: 0, queueHead: 14

	2012-10-05 10:49:07,101 (Log-BackgroundWorker-FileChannel-1) [INFO - org.apache.flume.channel.file.LogFileV3$MetaDataWriter.markCheckpoint(LogFileV3.java:85)]
Updating log-2.meta currentPosition = 692, logWriteOrderID = 1349459344092

	2012-10-05 10:49:07,122 (Log-BackgroundWorker-FileChannel-1) [INFO - org.apache.flume.channel.file.Log.writeCheckpoint(Log.java:796)]
Updated checkpoint for file: /tmp/data/log-2 position: 692 logWriteOrderID: 1349459344092

	2012-10-05 10:49:07,122 (Log-BackgroundWorker-FileChannel-1) [DEBUG - org.apache.flume.channel.file.Log.removeOldLogs(Log.java:846)]
Files currently in use: [2]

	2012-10-05 10:49:34,465 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.file.AbstractFileConfigurationProvider$FileWatcherRunnable.run(AbstractFileConfigurationProvider.java:188)]
Checking file:conf/flume.conf <file:///\\conf\flume.conf>  for changes

	2012-10-05 10:49:37,124 (Log-BackgroundWorker-FileChannel-1) [DEBUG - org.apache.flume.channel.file.FlumeEventQueue.checkpoint(FlumeEventQueue.java:99)]
Checkpoint not required

	 

	From: Hari Shreedharan [mailto:hshreedharan@cloudera.com] 
	Sent: Thursday, October 04, 2012 4:25 PM
	To: user@flume.apache.org
	Subject: Re: Flume Source and Sink in different hosts

	 

	It depends on what kind of guarantees you need. If you need to make sure your events are
persisted even during process/system failures, you should use the File Channel, else you can
use Memory Channel (performance of Memory Channel is obviously better).  

	 

	Thanks,

	Hari

	 

	-- 

	Hari Shreedharan

	 

	On Thursday, October 4, 2012 at 3:46 PM, Kumar, Suresh wrote:

		 

		Hari, I just noticed some entries in HBase, so this configuration does work.

		I will retry with the changes you recommended. Do you think I should be using

		some other channel type instead of memory?

		 

		Thanks,

		Suresh

		From: Hari Shreedharan [mailto:hshreedharan@cloudera.com] 
		Sent: Thursday, October 04, 2012 3:40 PM
		To: user@flume.apache.org
		Subject: Re: Flume Source and Sink in different hosts

		 

		Looks like your agent was set up properly. Can you increase the heap and try again? You
can do this by setting -Xmx in the flume-env.sh file. Try setting it to 1G or higher, since
you are using memory channel. Also I assume the file you are tailing is getting written to?
I strongly suggest using the AsyncHBaseSink.  

		 

		 

		Thanks,

		Hari

		 

		-- 

		Hari Shreedharan

		 

		On Thursday, October 4, 2012 at 3:19 PM, Kumar, Suresh wrote:

			Yes, my HBase has the table and column family, if I run the /etc/passwd test using flume-ng
client, the table

			gets populated.

			 

			Here is the log from the source agent, there is nothing much in the sink except for which
seem to benign.

			
			Thanks,

			Suresh

			 

			2012-10-04 14:59:05,622 (lifecycleSupervisor-1-0-SendThread(localhost:2181)) [DEBUG - org.apache.zookeeper.client.ZooKeeperSaslClient.clientTunneledAuthenticationInProgress(ZooKeeperSaslClient.java:515)]
Could not retrieve login configuration: java.lang.SecurityException: Unable to locate a login
configuration

			2012-10-04 14:59:08,414 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.file.AbstractFileConfigurationProvider$FileWatcherRunnable.run(AbstractFileConfigurationProvider.java:188)]
Checking file:conf/flume.conf <file:///\\conf\flume.conf>  for changes

			 

			source agent log:

			 

			 

			$ bin/flume-ng agent --conf ./conf/ -f conf/flume.conf -Dflume.root.logger=DEBUG,console
-n agent3

			 

			+ exec /usr/lib/jvm/java-6-sun/bin/java -Xmx20m -Dflume.root.logger=DEBUG,console -cp '/opt/flume/conf:/opt/flume/lib/*'
-Djava.library.path= org.apache.flume.node.Application -f conf/flume.conf -n agent3

			2012-10-04 15:09:30,778 (main) [INFO - org.apache.flume.lifecycle.LifecycleSupervisor.start(LifecycleSupervisor.java:67)]
Starting lifecycle supervisor 1

			2012-10-04 15:09:30,791 (main) [INFO - org.apache.flume.node.FlumeNode.start(FlumeNode.java:54)]
Flume node starting - agent3

			2012-10-04 15:09:30,799 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.node.nodemanager.DefaultLogicalNodeManager.start(DefaultLogicalNodeManager.java:203)]
Node manager starting

			2012-10-04 15:09:30,801 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.conf.file.AbstractFileConfigurationProvider.start(AbstractFileConfigurationProvider.java:67)]
Configuration provider starting

			2012-10-04 15:09:30,810 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.lifecycle.LifecycleSupervisor.start(LifecycleSupervisor.java:67)]
Starting lifecycle supervisor 10

			2012-10-04 15:09:30,813 (lifecycleSupervisor-1-1) [DEBUG - org.apache.flume.node.nodemanager.DefaultLogicalNodeManager.start(DefaultLogicalNodeManager.java:207)]
Node manager started

			2012-10-04 15:09:30,819 (lifecycleSupervisor-1-0) [DEBUG - org.apache.flume.conf.file.AbstractFileConfigurationProvider.start(AbstractFileConfigurationProvider.java:86)]
Configuration provider started

			2012-10-04 15:09:30,819 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.file.AbstractFileConfigurationProvider$FileWatcherRunnable.run(AbstractFileConfigurationProvider.java:188)]
Checking file:conf/flume.conf <file:///\\conf\flume.conf>  for changes

			2012-10-04 15:09:30,821 (conf-file-poller-0) [INFO - org.apache.flume.conf.file.AbstractFileConfigurationProvider$FileWatcherRunnable.run(AbstractFileConfigurationProvider.java:195)]
Reloading configuration file:conf/flume.conf <file:///\\conf\flume.conf> 

			2012-10-04 15:09:30,839 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:912)]
Added sinks: avro-sink Agent: agent3

			2012-10-04 15:09:30,840 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:998)]
Processing:avro-sink

			2012-10-04 15:09:30,840 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1002)]
Created context for avro-sink: hostname

			2012-10-04 15:09:30,841 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:998)]
Processing:avro-sink

			2012-10-04 15:09:30,841 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:998)]
Processing:avro-sink

			2012-10-04 15:09:30,841 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:998)]
Processing:avro-sink

			2012-10-04 15:09:30,841 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.isValid(FlumeConfiguration.java:295)]
Starting validation of configuration for agent: agent3, initial-configuration: AgentConfiguration[agent3]

			SOURCES: {tail={ parameters:{command=tail -F /var/log/auth.log, channels=MemoryChannel-1,
type=exec} }}

			CHANNELS: {MemoryChannel-1={ parameters:{type=memory} }}

			SINKS: {avro-sink={ parameters:{port=41414, hostname=sig-flume, type=avro, channel=MemoryChannel-1}
}}

			 

			2012-10-04 15:09:30,854 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.validateChannels(FlumeConfiguration.java:450)]
Created channel MemoryChannel-1

			2012-10-04 15:09:30,883 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.validateSinks(FlumeConfiguration.java:655)]
Creating sink: avro-sink using AVRO

			2012-10-04 15:09:30,885 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.isValid(FlumeConfiguration.java:353)]
Post validation configuration for agent3

			AgentConfiguration created without Configuration stubs for which only basic syntactical
validation was performed[agent3]

			SOURCES: {tail={ parameters:{command=tail -F /var/log/auth.log, channels=MemoryChannel-1,
type=exec} }}

			CHANNELS: {MemoryChannel-1={ parameters:{type=memory} }}

			SINKS: {avro-sink={ parameters:{port=41414, hostname=sig-flume, type=avro, channel=MemoryChannel-1}
}}

			 

			2012-10-04 15:09:30,885 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.FlumeConfiguration.validateConfiguration(FlumeConfiguration.java:117)]
Channels:MemoryChannel-1

			 

			2012-10-04 15:09:30,885 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.FlumeConfiguration.validateConfiguration(FlumeConfiguration.java:118)]
Sinks avro-sink

			 

			2012-10-04 15:09:30,885 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.FlumeConfiguration.validateConfiguration(FlumeConfiguration.java:119)]
Sources tail

			 

			2012-10-04 15:09:30,885 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration.validateConfiguration(FlumeConfiguration.java:122)]
Post-validation flume configuration contains configuration  for agents: [agent3]

			2012-10-04 15:09:30,886 (conf-file-poller-0) [INFO - org.apache.flume.conf.properties.PropertiesFileConfigurationProvider.loadChannels(PropertiesFileConfigurationProvider.java:249)]
Creating channels

			2012-10-04 15:09:30,886 (conf-file-poller-0) [DEBUG - org.apache.flume.channel.DefaultChannelFactory.create(DefaultChannelFactory.java:68)]
Creating instance of channel MemoryChannel-1 type memory

			2012-10-04 15:09:31,013 (conf-file-poller-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.<init>(MonitoredCounterGroup.java:68)]
Monitoried counter group for type: CHANNEL, name: MemoryChannel-1, registered successfully.

			2012-10-04 15:09:31,013 (conf-file-poller-0) [INFO - org.apache.flume.conf.properties.PropertiesFileConfigurationProvider.loadChannels(PropertiesFileConfigurationProvider.java:273)]
created channel MemoryChannel-1

			2012-10-04 15:09:31,014 (conf-file-poller-0) [DEBUG - org.apache.flume.source.DefaultSourceFactory.create(DefaultSourceFactory.java:74)]
Creating instance of source tail, type exec

			2012-10-04 15:09:31,037 (conf-file-poller-0) [INFO - org.apache.flume.sink.DefaultSinkFactory.create(DefaultSinkFactory.java:70)]
Creating instance of sink: avro-sink, type: avro

			2012-10-04 15:09:31,045 (conf-file-poller-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.<init>(MonitoredCounterGroup.java:68)]
Monitoried counter group for type: SINK, name: avro-sink, registered successfully.

			2012-10-04 15:09:31,046 (conf-file-poller-0) [INFO - org.apache.flume.node.nodemanager.DefaultLogicalNodeManager.startAllComponents(DefaultLogicalNodeManager.java:106)]
Starting new configuration:{ sourceRunners:{tail=EventDrivenSourceRunner: { source:org.apache.flume.source.ExecSource{name:tail,state:IDLE}
}} sinkRunners:{avro-sink=SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@e949f69
counterGroup:{ name:null counters:{} } }} channels:{MemoryChannel-1=org.apache.flume.channel.MemoryChannel{name:
MemoryChannel-1}} }

			2012-10-04 15:09:31,046 (conf-file-poller-0) [INFO - org.apache.flume.node.nodemanager.DefaultLogicalNodeManager.startAllComponents(DefaultLogicalNodeManager.java:113)]
Starting Channel MemoryChannel-1

			2012-10-04 15:09:31,049 (conf-file-poller-0) [INFO - org.apache.flume.node.nodemanager.DefaultLogicalNodeManager.startAllComponents(DefaultLogicalNodeManager.java:128)]
Waiting for channel: MemoryChannel-1 to start. Sleeping for 500 ms

			2012-10-04 15:09:31,052 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:82)]
Component type: CHANNEL, name: MemoryChannel-1 started

			2012-10-04 15:09:31,550 (conf-file-poller-0) [INFO - org.apache.flume.node.nodemanager.DefaultLogicalNodeManager.startAllComponents(DefaultLogicalNodeManager.java:141)]
Starting Sink avro-sink

			2012-10-04 15:09:31,552 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.sink.AvroSink.start(AvroSink.java:239)]
Starting AvroSink avro-sink { host: sig-flume, port: 41414 }...

			2012-10-04 15:09:31,552 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:82)]
Component type: SINK, name: avro-sink started

			2012-10-04 15:09:31,552 (conf-file-poller-0) [INFO - org.apache.flume.node.nodemanager.DefaultLogicalNodeManager.startAllComponents(DefaultLogicalNodeManager.java:152)]
Starting Source tail

			2012-10-04 15:09:31,554 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.sink.AvroSink.createConnection(AvroSink.java:178)]
Avro sink avro-sink: Building RpcClient with hostname: sig-flume, port: 41414

			2012-10-04 15:09:31,561 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.source.ExecSource.start(ExecSource.java:155)]
Exec source starting with command:tail -F /var/log/auth.log

			2012-10-04 15:09:31,586 (lifecycleSupervisor-1-1) [DEBUG - org.apache.flume.source.ExecSource.start(ExecSource.java:173)]
Exec source started

			2012-10-04 15:09:31,626 (lifecycleSupervisor-1-0) [DEBUG - org.apache.flume.api.NettyAvroRpcClient.configure(NettyAvroRpcClient.java:420)]
Batch size string = null

			2012-10-04 15:09:32,684 (lifecycleSupervisor-1-0) [DEBUG - org.apache.flume.sink.AvroSink.createConnection(AvroSink.java:192)]
Avro sink avro-sink: Created RpcClient: NettyAvroRpcClient { host: sig-flume, port: 41414
}

			2012-10-04 15:09:32,685 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.sink.AvroSink.start(AvroSink.java:253)]
Avro sink avro-sink started.

			2012-10-04 15:09:32,689 (SinkRunner-PollingRunner-DefaultSinkProcessor) [DEBUG - org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:143)]
Polling sink runner starting

			2012-10-04 15:10:01,565 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.file.AbstractFileConfigurationProvider$FileWatcherRunnable.run(AbstractFileConfigurationProvider.java:188)]
Checking file:conf/flume.conf <file:///\\conf\flume.conf>  for changes

			2012-10-04 15:10:31,567 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.file.AbstractFileConfigurationProvider$FileWatcherRunnable.run(Abstr

			 

			 

			From: Hari Shreedharan [mailto:hshreedharan@cloudera.com] 
			Sent: Thursday, October 04, 2012 3:02 PM
			To: user@flume.apache.org
			Subject: Re: Flume Source and Sink in different hosts

			 

			Can you send the logs also, of both agents? Does your Hbase cluster have the said column
family and table with that family? 

			 

			Also are you sure the files are not getting rotated out. You should use tail -F so that
your code works even with files getting rotated out.

			 

			 

			Hari

			-- 

			Hari Shreedharan

			 

			On Thursday, October 4, 2012 at 2:53 PM, Kumar, Suresh wrote:

				Hello:

				 

				I have just downloaded and build flume-ng (apache-flume-1.3.0-SNAPSHOT).

				 

				My goal is to collect log data from HostA (source) and send it to HostB(sink), my initial
test (sending /etc/passwd) 

				from HostA to HostB worked fine, I was also able to load the passwd file into my HBase
in HostB.

				 

				Now, I want to load a continuous stream of log data (using tail –f), but I was not able
to replicate the above process.

				Flume just started fine in HostA, but I do not see any data being received by HostB or
in my HBase.

				 

				What is wrong with my configuration?

				 

				Thanks,

				Suresh

				 

				Here is my flume.conf in HostA

				 

				agent3.sources = tail

				agent3.channels = MemoryChannel-1

				agent3.sinks = avro-sink

				 

				# Define source flow

				agent3.sources.tail.type = exec

				agent3.sources.tail.command = tail -f /var/log/auth.log

				agent3.sources.tail.channels = MemoryChannel-1

				 

				# What kind of channel

				agent3.channels.MemoryChannel-1.type = memory

				 

				# avro sink properties

				agent3.sinks.avro-sink.type = avro

				agent3.sinks.avro-sink.channel = MemoryChannel-1

				agent3.sinks.avro-sink.hostname = hostb

				agent3.sinks.avro-sink.port = 41414

				 

				Here is my flume.conf in HostB

				 

				# Define a memory channel called ch1 on agent1

				agent1.channels.ch1.type = memory

				 

				# Define an Avro source called avro-source1 on agent1 and tell it

				# to bind to 0.0.0.0:41414. Connect it to channel ch1.

				agent1.sources.avro-source1.channels = ch1

				agent1.sources.avro-source1.type = avro

				agent1.sources.avro-source1.bind = 0.0.0.0

				agent1.sources.avro-source1.port = 41414

				 

				# Define a logger sink that simply logs all events it receives

				# and connect it to the other end of the same channel.

				agent1.sinks.log-sink1.channel = ch1

				agent1.sinks.log-sink1.type = logger

				 

				# Finally, now that we've defined all of our components, tell

				# agent1 which ones we want to activate.

				agent1.channels = ch1

				agent1.sources = avro-source1

				#agent1.sources = avro-source1

				agent1.sinks = sink1

				 

				agent1.sinks.sink1.type = org.apache.flume.sink.hbase.HBaseSink

				agent1.sinks.sink1.channel = ch1

				agent1.sinks.sink1.table = flumedemo

				agent1.sinks.sink1.columnFamily = testing

				agent1.sinks.sink1.column = foo

				agent1.sinks.sink1.serializer = org.apache.flume.sink.hbase.SimpleHbaseEventSerializer

				agent1.sinks.sink1.serializer.payloadColumn = col1

				agent1.sinks.sink1.serializer.keyType = timestamp

				agent1.sinks.sink1.serializer.rowPrefix = 1

				agent1.sinks.sink1.serializer.suffix = timestamp

				agent1.sinks.sink1.serializer.payloadColumn = pcol

				agent1.sinks.sink1.serializer.incrementColumn = icol

				 

			 

		 

	 

 

Mime
View raw message