kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ij...@apache.org
Subject kafka git commit: KAFKA-3525; getSequenceId should return 1 for first data node creation
Date Wed, 11 May 2016 00:52:57 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk fe0335ea1 -> 3e89d2bc5


KAFKA-3525; getSequenceId should return 1 for first data node creation

The ZkUtils.getSequenceId() method is used to generate broker id sequence numbers. During
startup, each broker updates the data at the /brokers/seqid zk path and returns
stat.getVersion as the next sequence id.

stat.getVersion returns "1" for the first data update, so ZkUtils.getSequenceId() should
also return "1" the first time it is called, i.e. when the data node is first created.

Author: Manikumar reddy O <manikumar.reddy@gmail.com>

Reviewers: Flavio Junqueira <fpj@apache.org>, Ismael Juma <ismael@juma.me.uk>

Closes #1224 from omkreddy/KAFKA-3525


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/3e89d2bc
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/3e89d2bc
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/3e89d2bc

Branch: refs/heads/trunk
Commit: 3e89d2bc59b1bf6c73b262a36019df01af7a958c
Parents: fe0335e
Author: Manikumar reddy O <manikumar.reddy@gmail.com>
Authored: Wed May 11 01:52:43 2016 +0100
Committer: Ismael Juma <ismael@juma.me.uk>
Committed: Wed May 11 01:52:43 2016 +0100

----------------------------------------------------------------------
 .../main/scala/kafka/server/KafkaConfig.scala   |  2 +-
 core/src/main/scala/kafka/utils/ZkUtils.scala   | 20 ++++++--------------
 .../server/ServerGenerateBrokerIdTest.scala     |  8 ++++++++
 3 files changed, 15 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/3e89d2bc/core/src/main/scala/kafka/server/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 596cc58..dff2b66 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -356,7 +356,7 @@ object KafkaConfig {
   val ZkSyncTimeMsDoc = "How far a ZK follower can be behind a ZK leader"
   val ZkEnableSecureAclsDoc = "Set client to use secure ACLs"
   /** ********* General Configuration ***********/
-  val BrokerIdGenerationEnableDoc = s"Enable automatic broker id generation on the server? When enabled the value configured for $MaxReservedBrokerIdProp should be reviewed."
+  val BrokerIdGenerationEnableDoc = s"Enable automatic broker id generation on the server. When enabled the value configured for $MaxReservedBrokerIdProp should be reviewed."
   val MaxReservedBrokerIdDoc = "Max number that can be used for a broker.id"
   val BrokerIdDoc = "The broker id for this server. If unset, a unique broker id will be generated." +
   "To avoid conflicts between zookeeper generated broker id's and user configured broker id's, generated broker ids" +

http://git-wip-us.apache.org/repos/asf/kafka/blob/3e89d2bc/core/src/main/scala/kafka/utils/ZkUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index 81eb24a..ec72029 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -792,24 +792,16 @@ class ZkUtils(val zkClient: ZkClient,
   /**
     * This API produces a sequence number by creating / updating given path in zookeeper
     * It uses the stat returned by the zookeeper and return the version. Every time
-    * client updates the path stat.version gets incremented
+    * client updates the path stat.version gets incremented. Starting value of sequence number is 1.
     */
   def getSequenceId(path: String, acls: java.util.List[ACL] = DefaultAcls): Int = {
+    def writeToZk: Int = zkClient.writeDataReturnStat(path, "", -1).getVersion
     try {
-      val stat = zkClient.writeDataReturnStat(path, "", -1)
-      stat.getVersion
+      writeToZk
     } catch {
-      case e: ZkNoNodeException => {
-        createParentPath(BrokerSequenceIdPath, acls)
-        try {
-          zkClient.createPersistent(BrokerSequenceIdPath, "", acls)
-          0
-        } catch {
-          case e: ZkNodeExistsException =>
-            val stat = zkClient.writeDataReturnStat(BrokerSequenceIdPath, "", -1)
-            stat.getVersion
-        }
-      }
+      case e1: ZkNoNodeException =>
+        makeSurePersistentPathExists(path)
+        writeToZk
     }
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/3e89d2bc/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala b/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala
index 8e25366..312edd4 100755
--- a/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala
@@ -187,4 +187,12 @@ class ServerGenerateBrokerIdTest extends ZooKeeperTestHarness {
     }
     true
   }
+
+  @Test
+  def testGetSequenceIdMethod() {
+    val path = "/test/seqid"
+    (1 to 10).foreach { seqid =>
+      assertEquals(seqid, zkUtils.getSequenceId(path))
+    }
+  }
 }


Mime
View raw message