flume-user mailing list archives

From "Wang, Yongkun | Yongkun | DU" <yongkun.w...@mail.rakuten.com>
Subject RE: Multi hop data flow with end-to-end(E2E) sink
Date Mon, 04 Jul 2011 08:41:40 GMT

Thank you for the analysis.

For the multi-hop flow, I have realized that only the originating node should be configured
with an E2E sink; the middle nodes should use BE sinks, and the last node a collectorSink.

Maybe we don't need any changes to the source code to prevent the crash when a user
configures an E2E sink on a middle node, but I think it would be worth documenting this
E2E multi-hop configuration how-to in the Flume user guide.
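
For the record, the layout that worked in my test (E2E sink only at the originating agent,
BE sink at the middle hop) looks like this; the node names and ports are the ones from my
original message below, and the HDFS path is left truncated as before:

```
agent1     : tail("file")           | agentE2ESink("collector1", 35854) ;
collector1 : collectorSource(35854) | agentBESink("collector2", 35855) ;
collector2 : collectorSource(35855) | collectorSink("hdfs:///...", "test-") ;
```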


-----Original Message-----
From: Jonathan Hsieh [mailto:jon@cloudera.com] 
Sent: Sunday, July 03, 2011 7:20 AM
To: flume-user@incubator.apache.org
Subject: Re: Multi hop data flow with end-to-end(E2E) sink

Hi Kun 

Welcome to the new list!

Answer is inline.


On Wed, Jun 29, 2011 at 1:25 AM, Wang, Yongkun | Yongkun | DU <yongkun.wang@mail.rakuten.com> wrote:

	I am testing multi-tier Flume nodes with multiple hops (more than 3).
	The simplest case is described as follows:
	agent1 -> collector1 -> collector2 -> hdfs
	agent1 : tail("file") | agentE2ESink("collector1", 35854) ;
	collector1 : collectorSource(35854) | agentE2ESink("collector2", 35855) ;
	collector2 : collectorSource(35855) | collectorSink("hdfs:///...", "test-") ;
	The sink configuration of the bridge node "collector1" seems tricky. Maybe using agentE2ESink
is not correct, because collector2 crashed with an exception in my test.

	If I configured "collector1" with agentDFOSink, agentBESink, or rpcSink, the whole system
worked well. And the log messages on "collector2" showed that it was working in end-to-end
mode. It seems that the whole flow follows E2E mode as long as the originating agent is
configured with an E2E sink; the sink configuration of the intermediate node (collector1)
doesn't have any influence.

Ah, I think I know what the problem is -- the middle node's (collector1) agentE2ESink will
try to add more metadata to the message, and the node will complain.  By pushing data through
another agentE2ESink, you are actually adding another write-ahead log into the stream, which
is unnecessary for end-to-end reliability.  You only need the agentE2E on the originating
node and on the final end point.  The other modes work because they don't try to add metadata.
My suggestion is to use agentBESink/Chain in the intermediate node.

	I would like to get your confirmation or explanation on this point. Thank you very much.
	I got the following exception when collector1 was configured with agentE2ESink:
	2011-06-28 21:12:54,004 [logicalNode collector1-47] ERROR connector.DirectDriver: Driving
src/sink failed! LazyOpenSource | LazyOpenDecorator because Event already had an event with
attribute AckType
	java.lang.IllegalArgumentException: Event already had an event with attribute AckType
	       at com.cloudera.flume.handlers.thrift.ThriftEventAdaptor.set(ThriftEventAdaptor.java:184)
	       at com.cloudera.flume.handlers.endtoend.AckChecksumInjector.append(AckChecksumInjector.java:139)
	       at com.cloudera.flume.agent.durability.NaiveFileWALManager$1.append(NaiveFileWALManager.java:457)
	       at com.cloudera.flume.handlers.rolling.RollSink.append(RollSink.java:174)
	       at com.cloudera.flume.agent.durability.NaiveFileWALDeco.append(NaiveFileWALDeco.java:138)
	       at com.cloudera.flume.agent.AgentSink.append(AgentSink.java:112)
	       at com.cloudera.flume.core.EventSinkDecorator.append(EventSinkDecorator.java:60)
	       at com.cloudera.flume.handlers.debug.LazyOpenDecorator.append(LazyOpenDecorator.java:75)
	       at com.cloudera.flume.core.connector.DirectDriver$PumperThread.run(DirectDriver.java:93)

Yup, this confirms that the second sink attempted to overwrite metadata (in this case, the AckType attribute).
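
To illustrate the mechanism: the stack trace points at a duplicate-attribute guard in the
event's set method. The sketch below is hypothetical, not Flume's actual ThriftEventAdaptor
code, but it reproduces the same check -- the first WAL injects AckType, and a second WAL
in the stream trips the guard when it tries to inject it again:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for an event whose attribute map rejects
// duplicate keys, mimicking the guard seen in the stack trace.
class SketchEvent {
    private final Map<String, byte[]> attrs = new HashMap<>();

    void set(String key, byte[] value) {
        if (attrs.containsKey(key)) {
            throw new IllegalArgumentException(
                "Event already had an event with attribute " + key);
        }
        attrs.put(key, value);
    }
}

public class AckCollision {
    public static void main(String[] args) {
        SketchEvent e = new SketchEvent();
        e.set("AckType", new byte[]{0});      // injected by the originating node's WAL
        try {
            e.set("AckType", new byte[]{0});  // a second agentE2ESink tries again
        } catch (IllegalArgumentException ex) {
            System.out.println(ex.getMessage());
        }
    }
}
```

This is why the non-E2E sinks pass the event through cleanly: they never call set on an
ack attribute that is already present.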


// Jonathan Hsieh (shay)
// Software Engineer, Cloudera

// jon@cloudera.com
