kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jun...@apache.org
Subject svn commit: r1154804 - /incubator/kafka/trunk/core/src/main/scala/kafka/producer/ProducerPool.scala
Date Sun, 07 Aug 2011 23:56:05 GMT
Author: junrao
Date: Sun Aug  7 23:56:05 2011
New Revision: 1154804

URL: http://svn.apache.org/viewvc?rev=1154804&view=rev
Log:
Options in SyncProducerConfig and AsyncProducerConfig can leak, KAFKA-83

Modified:
    incubator/kafka/trunk/core/src/main/scala/kafka/producer/ProducerPool.scala

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/producer/ProducerPool.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/producer/ProducerPool.scala?rev=1154804&r1=1154803&r2=1154804&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/producer/ProducerPool.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/producer/ProducerPool.scala Sun Aug  7 23:56:05 2011
@@ -69,24 +69,15 @@ class ProducerPool[V](private val config
    * @param port the port of the broker
    */
   def addProducer(broker: Broker) {
+    val props = new Properties()
+    props.put("host", broker.host)
+    props.put("port", broker.port.toString)
+    props.putAll(config.props)
     if(sync) {
-        val props = new Properties()
-        props.put("host", broker.host)
-        props.put("port", broker.port.toString)
-        props.put("buffer.size", config.bufferSize.toString)
-        props.put("connect.timeout.ms", config.connectTimeoutMs.toString)
-        props.put("reconnect.interval", config.reconnectInterval.toString)
         val producer = new SyncProducer(new SyncProducerConfig(props))
         logger.info("Creating sync producer for broker id = " + broker.id + " at " + broker.host + ":" + broker.port)
         syncProducers.put(broker.id, producer)
     } else {
-        val props = new Properties()
-        props.put("host", broker.host)
-        props.put("port", broker.port.toString)
-        props.put("queue.time", config.queueTime.toString)
-        props.put("queue.size", config.queueSize.toString)
-        props.put("batch.size", config.batchSize.toString)
-        props.put("serializer.class", config.serializerClass)
         val producer = new AsyncProducer[V](new AsyncProducerConfig(props),
                                             new SyncProducer(new SyncProducerConfig(props)),
                                             serializer,



Mime
View raw message