kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gwens...@apache.org
Subject kafka git commit: KAFKA-3012: Avoid reserved.broker.max.id collisions on upgrade
Date Tue, 19 Jan 2016 02:40:39 GMT
Repository: kafka
Updated Branches:
  refs/heads/0.9.0 a9395e746 -> ebd5f3360


KAFKA-3012: Avoid reserved.broker.max.id collisions on upgrade

Provides a configuration to opt out of broker id generation.

Author: Grant Henke <granthenke@gmail.com>

Reviewers: Gwen Shapira

Closes #762 from granthenke/id-generation

(cherry picked from commit 5c337d759892ae6d46d6901a24e9b97cebd2a4da)
Signed-off-by: Gwen Shapira <cshapi@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/ebd5f336
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/ebd5f336
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/ebd5f336

Branch: refs/heads/0.9.0
Commit: ebd5f33605c8891e606a9a4b793691aa62d7963c
Parents: a9395e7
Author: Grant Henke <granthenke@gmail.com>
Authored: Mon Jan 18 18:39:55 2016 -0800
Committer: Gwen Shapira <cshapi@gmail.com>
Committed: Mon Jan 18 18:40:35 2016 -0800

----------------------------------------------------------------------
 core/src/main/scala/kafka/server/KafkaConfig.scala  | 11 ++++++++++-
 core/src/main/scala/kafka/server/KafkaServer.scala  |  6 +++---
 .../scala/unit/kafka/server/KafkaConfigTest.scala   |  4 +++-
 .../kafka/server/ServerGenerateBrokerIdTest.scala   | 16 ++++++++++++++++
 docs/upgrade.html                                   |  8 +++++++-
 5 files changed, 39 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/ebd5f336/core/src/main/scala/kafka/server/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 4b46ef7..5cc10b4 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -42,6 +42,7 @@ object Defaults {
   val ZkEnableSecureAcls = false
 
   /** ********* General Configuration ***********/
+  val BrokerIdGenerationEnable = true
   val MaxReservedBrokerId = 1000
   val BrokerId = -1
   val MessageMaxBytes = 1000000 + MessageSet.LogOverhead
@@ -190,6 +191,7 @@ object KafkaConfig {
   val ZkSyncTimeMsProp = "zookeeper.sync.time.ms"
   val ZkEnableSecureAclsProp = "zookeeper.set.acl"
   /** ********* General Configuration ***********/
+  val BrokerIdGenerationEnableProp = "broker.id.generation.enable"
   val MaxReservedBrokerIdProp = "reserved.broker.max.id"
   val BrokerIdProp = "broker.id"
   val MessageMaxBytesProp = "message.max.bytes"
@@ -338,6 +340,7 @@ object KafkaConfig {
   val ZkSyncTimeMsDoc = "How far a ZK follower can be behind a ZK leader"
   val ZkEnableSecureAclsDoc = "Set client to use secure ACLs"
   /** ********* General Configuration ***********/
+  val BrokerIdGenerationEnableDoc = s"Enable automatic broker id generation on the server?
When enabled the value configured for $MaxReservedBrokerIdProp should be reviewed."
   val MaxReservedBrokerIdDoc = "Max number that can be used for a broker.id"
   val BrokerIdDoc = "The broker id for this server. " +
   "To avoid conflicts between zookeeper generated brokerId and user's config.brokerId " +
@@ -522,6 +525,7 @@ object KafkaConfig {
       .define(ZkEnableSecureAclsProp, BOOLEAN, Defaults.ZkEnableSecureAcls, HIGH, ZkEnableSecureAclsDoc)
 
       /** ********* General Configuration ***********/
+      .define(BrokerIdGenerationEnableProp, BOOLEAN, Defaults.BrokerIdGenerationEnable, MEDIUM,
BrokerIdGenerationEnableDoc)
       .define(MaxReservedBrokerIdProp, INT, Defaults.MaxReservedBrokerId, atLeast(0), MEDIUM,
MaxReservedBrokerIdDoc)
       .define(BrokerIdProp, INT, Defaults.BrokerId, HIGH, BrokerIdDoc)
       .define(MessageMaxBytesProp, INT, Defaults.MessageMaxBytes, atLeast(0), HIGH, MessageMaxBytesDoc)
@@ -720,6 +724,7 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean) extends
Abstra
   val zkEnableSecureAcls: Boolean = getBoolean(KafkaConfig.ZkEnableSecureAclsProp)
 
   /** ********* General Configuration ***********/
+  val brokerIdGenerationEnable: Boolean = getBoolean(KafkaConfig.BrokerIdGenerationEnableProp)
   val maxReservedBrokerId: Int = getInt(KafkaConfig.MaxReservedBrokerIdProp)
   var brokerId: Int = getInt(KafkaConfig.BrokerIdProp)
   val numNetworkThreads = getInt(KafkaConfig.NumNetworkThreadsProp)
@@ -929,7 +934,11 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean) extends
Abstra
   validateValues()
 
   private def validateValues() {
-    require(brokerId >= -1 && brokerId <= maxReservedBrokerId, "broker.id must
be equal or greater than -1 and not greater than reserved.broker.max.id")
+    if(brokerIdGenerationEnable) {
+      require(brokerId >= -1 && brokerId <= maxReservedBrokerId, "broker.id
must be equal or greater than -1 and not greater than reserved.broker.max.id")
+    } else {
+      require(brokerId >= 0, "broker.id must be equal or greater than 0")
+    }
     require(logRollTimeMillis >= 1, "log.roll.ms must be equal or greater than 1")
     require(logRollTimeJitterMillis >= 0, "log.roll.jitter.ms must be equal or greater
than 0")
     require(logRetentionTimeMillis >= 1 || logRetentionTimeMillis == -1, "log.retention.ms
must be unlimited (-1) or, equal or greater than 1")

http://git-wip-us.apache.org/repos/asf/kafka/blob/ebd5f336/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 9eedbe2..c0ae991 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -600,9 +600,9 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePr
   }
 
   /**
-    * Generates new brokerId or reads from meta.properties based on following conditions
+    * Generates new brokerId if enabled or reads from meta.properties based on following
conditions
     * <ol>
-    * <li> config has no broker.id provided , generates a broker.id based on Zookeeper's
sequence
+    * <li> config has no broker.id provided and broker id generation is enabled, generates
a broker.id based on Zookeeper's sequence
     * <li> stored broker.id in meta.properties doesn't match in all the log.dirs throws
InconsistentBrokerIdException
     * <li> config has broker.id and meta.properties contains broker.id if they don't
match throws InconsistentBrokerIdException
     * <li> config has broker.id and there is no meta.properties file, creates new meta.properties
and stores broker.id
@@ -628,7 +628,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePr
       throw new InconsistentBrokerIdException("Failed to match brokerId across logDirs")
     else if(brokerId >= 0 && brokerIdSet.size == 1 && brokerIdSet.last
!= brokerId)
       throw new InconsistentBrokerIdException("Configured brokerId %s doesn't match stored
brokerId %s in meta.properties".format(brokerId, brokerIdSet.last))
-    else if(brokerIdSet.size == 0 && brokerId < 0)  // generate a new brokerId
from Zookeeper
+    else if(brokerIdSet.size == 0 && brokerId < 0 && config.brokerIdGenerationEnable)
 // generate a new brokerId from Zookeeper
       brokerId = generateBrokerId
     else if(brokerIdSet.size == 1) // pick broker.id from meta.properties
       brokerId = brokerIdSet.last

http://git-wip-us.apache.org/repos/asf/kafka/blob/ebd5f336/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index 8a5038f..9ddc2c1 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -408,7 +408,7 @@ class KafkaConfigTest {
         case KafkaConfig.RequestTimeoutMsProp => assertPropertyInvalid(getBaseProperties(),
name, "not_a_number")
 
         case KafkaConfig.AuthorizerClassNameProp => //ignore string
-          
+
         case KafkaConfig.PortProp => assertPropertyInvalid(getBaseProperties(), name,
"not_a_number")
         case KafkaConfig.HostNameProp => // ignore string
         case KafkaConfig.AdvertisedHostNameProp => //ignore string
@@ -526,6 +526,7 @@ class KafkaConfigTest {
     defaults.put(KafkaConfig.ZkConnectProp, "127.0.0.1:2181")
     // For ZkConnectionTimeoutMs
     defaults.put(KafkaConfig.ZkSessionTimeoutMsProp, "1234")
+    defaults.put(KafkaConfig.BrokerIdGenerationEnableProp, "false")
     defaults.put(KafkaConfig.MaxReservedBrokerIdProp, "1")
     defaults.put(KafkaConfig.BrokerIdProp, "1")
     defaults.put(KafkaConfig.HostNameProp, "127.0.0.1")
@@ -542,6 +543,7 @@ class KafkaConfigTest {
     val config = KafkaConfig.fromProps(defaults)
     Assert.assertEquals("127.0.0.1:2181", config.zkConnect)
     Assert.assertEquals(1234, config.zkConnectionTimeoutMs)
+    Assert.assertEquals(false, config.brokerIdGenerationEnable)
     Assert.assertEquals(1, config.maxReservedBrokerId)
     Assert.assertEquals(1, config.brokerId)
     Assert.assertEquals("127.0.0.1", config.hostName)

http://git-wip-us.apache.org/repos/asf/kafka/blob/ebd5f336/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala b/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala
index 9afb2ca..60ec561 100755
--- a/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala
@@ -82,6 +82,22 @@ class ServerGenerateBrokerIdTest extends ZooKeeperTestHarness {
   }
 
   @Test
+  def testDisableGeneratedBrokerId() {
+    val props3 = TestUtils.createBrokerConfig(3, zkConnect)
+    props3.put(KafkaConfig.BrokerIdGenerationEnableProp, "false")
+    // Set reserved broker id to cause a collision and ensure disabling broker id generation
ignores the setting
+    props3.put(KafkaConfig.MaxReservedBrokerIdProp, "0")
+    val config3 = KafkaConfig.fromProps(props3)
+    val server3 = new KafkaServer(config3)
+    server3.startup()
+    assertEquals(server3.config.brokerId,3)
+    server3.shutdown()
+    assertTrue(verifyBrokerMetadata(server3.config.logDirs,3))
+    CoreUtils.rm(server3.config.logDirs)
+    TestUtils.verifyNonDaemonThreadsStatus(this.getClass.getName)
+  }
+
+  @Test
   def testMultipleLogDirsMetaProps() {
     // add multiple logDirs and check if the generate brokerId is stored in all of them
     val logDirs = props1.getProperty("log.dir")+ "," + TestUtils.tempDir().getAbsolutePath
+

http://git-wip-us.apache.org/repos/asf/kafka/blob/ebd5f336/docs/upgrade.html
----------------------------------------------------------------------
diff --git a/docs/upgrade.html b/docs/upgrade.html
index 98ac570..4ae1c27 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -39,7 +39,6 @@
     <li> Broker IDs above 1000 are now reserved by default to automatically assigned
broker IDs. If your cluster has existing broker IDs above that threshold make sure to increase
the reserved.broker.max.id broker configuration property accordingly. </li>
     <li> Configuration parameter replica.lag.max.messages was removed. Partition leaders
will no longer consider the number of lagging messages when deciding which replicas are in
sync. </li>
     <li> Configuration parameter replica.lag.time.max.ms now refers not just to the
time passed since last fetch request from replica, but also to time since the replica last
caught up. Replicas that are still fetching messages from leaders but did not catch up to
the latest messages in replica.lag.time.max.ms will be considered out of sync. </li>
-    <li> Configuration parameter log.cleaner.enable is now true by default. This means
topics with a cleanup.policy=compact will now be compacted by default, and 128 MB of heap
will be allocated to the cleaner process via log.cleaner.dedupe.buffer.size. You may want
to review log.cleaner.dedupe.buffer.size and the other log.cleaner configuration values based
on your usage of compacted topics. </li>
     <li> MirrorMaker no longer supports multiple target clusters. As a result it will
only accept a single --consumer.config parameter. To mirror multiple source clusters, you
will need at least one MirrorMaker instance per source cluster, each with its own consumer
configuration. </li>
     <li> Tools packaged under <em>org.apache.kafka.clients.tools.*</em>
have been moved to <em>org.apache.kafka.tools.*</em>. All included scripts will
still function as usual, only custom code directly importing these classes will be affected.
</li>
     <li> The default Kafka JVM performance options (KAFKA_JVM_PERFORMANCE_OPTS) have
been changed in kafka-run-class.sh. </li>
@@ -49,6 +48,13 @@
     <li> By default all command line tools will print all logging messages to stderr
instead of stout. </li>
 </ul>
 
+<h5><a id="upgrade_901_notable" href="#upgrade_901_notable">Notable changes in
0.9.0.1</a></h5>
+
+<ul>
+    <li> The new broker id generation feature can be disabled by setting broker.id.generation.enable
to false. </li>
+    <li> Configuration parameter log.cleaner.enable is now true by default. This means
topics with a cleanup.policy=compact will now be compacted by default, and 128 MB of heap
will be allocated to the cleaner process via log.cleaner.dedupe.buffer.size. You may want
to review log.cleaner.dedupe.buffer.size and the other log.cleaner configuration values based
on your usage of compacted topics. </li>
+</ul>
+
 <h5>Deprecations in 0.9.0.0</h5>
 
 <ul>


Mime
View raw message