kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jun...@apache.org
Subject svn commit: r1401775 - in /incubator/kafka/branches/0.8/core/src/main/scala/kafka: producer/ProducerConfig.scala producer/SyncProducer.scala producer/SyncProducerConfig.scala server/KafkaConfig.scala
Date Wed, 24 Oct 2012 16:46:32 GMT
Author: junrao
Date: Wed Oct 24 16:46:32 2012
New Revision: 1401775

URL: http://svn.apache.org/viewvc?rev=1401775&view=rev
Log:
remove connection timeout in SyncProducer; patched by Swapnil Ghike; reviewed by Jun Rao;
kafka-579

Modified:
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ProducerConfig.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducer.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducerConfig.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaConfig.scala

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ProducerConfig.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ProducerConfig.scala?rev=1401775&r1=1401774&r2=1401775&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ProducerConfig.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ProducerConfig.scala Wed Oct 24 16:46:32 2012
@@ -38,12 +38,6 @@ class ProducerConfig private (val props:
    */
   val brokerList = props.getString("broker.list")
 
-  /**
-   * If DefaultEventHandler is used, this specifies the number of times to
-   * retry if an error is encountered during send.
-   */
-  val numRetries = props.getInt("num.retries", 0)
-
   /** the partitioner class for partitioning events amongst sub-topics */
   val partitionerClass = props.getString("partitioner.class", "kafka.producer.DefaultPartitioner")
 

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducer.scala?rev=1401775&r1=1401774&r2=1401775&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducer.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducer.scala Wed Oct 24 16:46:32 2012
@@ -37,7 +37,6 @@ class SyncProducer(val config: SyncProdu
   
   private val MaxConnectBackoffMs = 60000
   private var sentOnConnection = 0
-  private var lastConnectionTime = -1L
 
   private val lock = new Object()
   @volatile private var shutdown: Boolean = false
@@ -82,13 +81,6 @@ class SyncProducer(val config: SyncProdu
           throw e
         case e => throw e
       }
-      // TODO: do we still need this?
-      sentOnConnection += 1
-
-      if(sentOnConnection >= config.reconnectInterval || (config.reconnectTimeInterval >= 0 && System.currentTimeMillis - lastConnectionTime >= config.reconnectTimeInterval)) {
-        reconnect()
-        sentOnConnection = 0
-      }
       response
     }
   }
@@ -138,24 +130,15 @@ class SyncProducer(val config: SyncProdu
   }
     
   private def connect(): BlockingChannel = {
-    var connectBackoffMs = 1
-    val beginTimeMs = SystemTime.milliseconds
-    while(!blockingChannel.isConnected && !shutdown) {
+    if (!blockingChannel.isConnected && !shutdown) {
       try {
         blockingChannel.connect()
-        lastConnectionTime = System.currentTimeMillis
         info("Connected to " + config.host + ":" + config.port + " for producing")
       } catch {
         case e: Exception => {
           disconnect()
-          val endTimeMs = SystemTime.milliseconds
-          if ( (endTimeMs - beginTimeMs + connectBackoffMs) > config.connectTimeoutMs ) {
-            error("Producer connection to " +  config.host + ":" + config.port + " timing out after " + config.connectTimeoutMs + " ms", e)
-            throw e
-          }
-          error("Connection attempt to " +  config.host + ":" + config.port + " failed, next attempt in " + connectBackoffMs + " ms", e)
-          SystemTime.sleep(connectBackoffMs)
-          connectBackoffMs = math.min(10 * connectBackoffMs, MaxConnectBackoffMs)
+          error("Producer connection to " +  config.host + ":" + config.port + " unsuccessful", e)
+          throw e
         }
       }
     }

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducerConfig.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducerConfig.scala?rev=1401775&r1=1401774&r2=1401775&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducerConfig.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducerConfig.scala Wed Oct 24 16:46:32 2012
@@ -38,13 +38,6 @@ trait SyncProducerConfigShared {
   
   val bufferSize = props.getInt("buffer.size", 100*1024)
 
-  val connectTimeoutMs = props.getInt("connect.timeout.ms", 5000)
-
-  val reconnectInterval = props.getInt("reconnect.interval", 30000)
-
-  /** negative reconnect time interval means disabling this time-based reconnect feature */
-  var reconnectTimeInterval = props.getInt("reconnect.time.interval.ms", 1000*1000*10)
-
   val maxMessageSize = props.getInt("max.message.size", 1000000)
 
   /* the client application sending the producer requests */

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaConfig.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaConfig.scala?rev=1401775&r1=1401774&r2=1401775&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaConfig.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaConfig.scala Wed Oct 24 16:46:32 2012
@@ -135,10 +135,6 @@ class KafkaConfig private (val props: Ve
   /* default replication factors for automatically created topics */
   val defaultReplicationFactor = props.getInt("default.replication.factor", 1)
 
-  /* wait time in ms to allow the preferred replica for a partition to become the leader. This property is used during
-  * leader election on all replicas minus the preferred replica */
-  val preferredReplicaWaitTime = props.getLong("preferred.replica.wait.time", 300)
-
   val replicaMaxLagTimeMs = props.getLong("replica.max.lag.time.ms", 10000)
 
   val replicaMaxLagBytes = props.getLong("replica.max.lag.bytes", 4000)



Mime
View raw message