kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jun...@apache.org
Subject svn commit: r1207645 - in /incubator/kafka/trunk/core/src/main/scala/kafka/consumer: ConsumerConfig.scala ZookeeperConsumerConnector.scala
Date Mon, 28 Nov 2011 21:56:14 GMT
Author: junrao
Date: Mon Nov 28 21:56:14 2011
New Revision: 1207645

URL: http://svn.apache.org/viewvc?rev=1207645&view=rev
Log:
make # of consumer rebalance retries configurable; patched by Jun Rao; reviewed by Neha Narkhede;
KAFKA-213

Modified:
    incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsumerConfig.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsumerConfig.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsumerConfig.scala?rev=1207645&r1=1207644&r2=1207645&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsumerConfig.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsumerConfig.scala Mon Nov
28 21:56:14 2011
@@ -30,6 +30,7 @@ object ConsumerConfig {
   val AutoCommit = true
   val AutoCommitInterval = 10 * 1000
   val MaxQueuedChunks = 100
+  val MaxRebalanceRetries = 4
   val AutoOffsetReset = OffsetRequest.SmallestTimeString
   val ConsumerTimeoutMs = -1
   val MirrorTopicsWhitelist = ""
@@ -77,6 +78,9 @@ class ConsumerConfig(props: Properties) 
   /** max number of messages buffered for consumption */
   val maxQueuedChunks = Utils.getInt(props, "queuedchunks.max", MaxQueuedChunks)
 
+  /** max number of retries during rebalance */
+  val maxRebalanceRetries = Utils.getInt(props, "rebalance.retries.max", MaxRebalanceRetries)
+
   /* what to do if an offset is out of range.
      smallest : automatically reset the offset to the smallest offset
      largest : automatically reset the offset to the largest offset

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala?rev=1207645&r1=1207644&r2=1207645&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
(original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
Mon Nov 28 21:56:14 2011
@@ -67,7 +67,6 @@ import kafka.common.InvalidConfigExcepti
  *
  */
 private[kafka] object ZookeeperConsumerConnector {
-  val MAX_N_RETRIES = 4
   val shutdownCommand: FetchedDataChunk = new FetchedDataChunk(null, null, -1L)
 }
 
@@ -424,7 +423,7 @@ private[kafka] class ZookeeperConsumerCo
 
     def syncedRebalance() {
       rebalanceLock synchronized {
-        for (i <- 0 until ZookeeperConsumerConnector.MAX_N_RETRIES) {
+        for (i <- 0 until config.maxRebalanceRetries) {
           info("begin rebalancing consumer " + consumerIdString + " try #" + i)
           var done = false
           try {
@@ -447,7 +446,7 @@ private[kafka] class ZookeeperConsumerCo
         }
       }
 
-      throw new RuntimeException(consumerIdString + " can't rebalance after " + ZookeeperConsumerConnector.MAX_N_RETRIES
+" retires")
+      throw new RuntimeException(consumerIdString + " can't rebalance after " + config.maxRebalanceRetries
+" retries")
     }
 
     private def rebalance(): Boolean = {



Mime
View raw message