kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gwens...@apache.org
Subject kafka git commit: KAFKA-2336: Changing offsets.topic.num.partitions after the offset topic is created breaks consumer group partition assignment; Reviewed by Jiangjie Qin, Gwen Shapira
Date Tue, 11 Aug 2015 22:08:56 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 1fdb758f2 -> 96534a7d5


KAFKA-2336: Changing offsets.topic.num.partitions after the offset topic is created breaks
consumer group partition assignment; Reviewed by Jiangjie Qin, Gwen Shapira


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/96534a7d
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/96534a7d
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/96534a7d

Branch: refs/heads/trunk
Commit: 96534a7d502be58026feec3c2012f022bf330049
Parents: 1fdb758
Author: Grant Henke <granthenke@gmail.com>
Authored: Tue Aug 11 15:07:40 2015 -0700
Committer: Gwen Shapira <cshapi@gmail.com>
Committed: Tue Aug 11 15:07:40 2015 -0700

----------------------------------------------------------------------
 .../src/main/scala/kafka/server/OffsetManager.scala | 16 ++++++++++++++--
 1 file changed, 14 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/96534a7d/core/src/main/scala/kafka/server/OffsetManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/OffsetManager.scala b/core/src/main/scala/kafka/server/OffsetManager.scala
index 47b6ce9..0e613e7 100755
--- a/core/src/main/scala/kafka/server/OffsetManager.scala
+++ b/core/src/main/scala/kafka/server/OffsetManager.scala
@@ -96,6 +96,7 @@ class OffsetManager(val config: OffsetManagerConfig,
   private val loadingPartitions: mutable.Set[Int] = mutable.Set()
   private val cleanupOrLoadMutex = new Object
   private val shuttingDown = new AtomicBoolean(false)
+  private val offsetsTopicPartitionCount = getOffsetsTopicPartitionCount
 
   this.logIdent = "[Offset Manager on Broker " + replicaManager.config.brokerId + "]: "
 
@@ -170,7 +171,7 @@ class OffsetManager(val config: OffsetManagerConfig,
   }
 
 
-  def partitionFor(group: String): Int = Utils.abs(group.hashCode) % config.offsetsTopicNumPartitions
+  def partitionFor(group: String): Int = Utils.abs(group.hashCode) % offsetsTopicPartitionCount
 
   /**
    * Fetch the current offset for the given group/topic/partition from the underlying offsets storage.
@@ -436,13 +437,24 @@ class OffsetManager(val config: OffsetManagerConfig,
 
     if (numRemoved > 0) info("Removed %d cached offsets for %s on follower transition."
                              .format(numRemoved, TopicAndPartition(ConsumerCoordinator.OffsetsTopicName, offsetsPartition)))
-
   }
 
   def shutdown() {
     shuttingDown.set(true)
   }
 
+  /**
+   * Gets the partition count of the offsets topic from ZooKeeper.
+   * If the topic does not exist, the configured partition count is returned.
+   */
+  private def getOffsetsTopicPartitionCount = {
+    val topic = ConsumerCoordinator.OffsetsTopicName
+    val topicData = ZkUtils.getPartitionAssignmentForTopics(zkClient, Seq(topic))
+    if (topicData(topic).nonEmpty)
+      topicData(topic).size
+    else
+      config.offsetsTopicNumPartitions
+  }
 }
 
 object OffsetManager {


Mime
View raw message