kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ij...@apache.org
Subject kafka git commit: KAFKA-4623; Default unclean.leader.election.enabled to false (KIP-106)
Date Tue, 02 May 2017 15:28:59 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 64ea193ce -> 46da01a4a


KAFKA-4623; Default unclean.leader.election.enabled to false (KIP-106)

Author: sharad.develop <sharad.develop@gmail.com>

Reviewers: Ismael Juma <ismael@juma.me.uk>

Closes #2625 from sharad-develop/KAFKA-4623


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/46da01a4
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/46da01a4
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/46da01a4

Branch: refs/heads/trunk
Commit: 46da01a4a7d12243a7f8b1d5a51f7815a9162320
Parents: 64ea193
Author: sharad-develop <sharad.develop@gmail.com>
Authored: Tue May 2 16:07:25 2017 +0100
Committer: Ismael Juma <ismael@juma.me.uk>
Committed: Tue May 2 16:27:28 2017 +0100

----------------------------------------------------------------------
 .../main/scala/kafka/server/KafkaConfig.scala   |  2 +-
 .../kafka/api/ConsumerBounceTest.scala          |  1 +
 .../integration/UncleanLeaderElectionTest.scala | 27 ++++++++++----------
 .../unit/kafka/server/KafkaConfigTest.scala     |  2 +-
 .../unit/kafka/server/LeaderElectionTest.scala  | 11 +++++---
 docs/upgrade.html                               |  2 ++
 6 files changed, 25 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/46da01a4/core/src/main/scala/kafka/server/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 675e22c..47046ce 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -131,7 +131,7 @@ object Defaults {
   val AutoLeaderRebalanceEnable = true
   val LeaderImbalancePerBrokerPercentage = 10
   val LeaderImbalanceCheckIntervalSeconds = 300
-  val UncleanLeaderElectionEnable = true
+  val UncleanLeaderElectionEnable = false
   val InterBrokerSecurityProtocol = SecurityProtocol.PLAINTEXT.toString
   val InterBrokerProtocolVersion = ApiVersion.latestVersion.toString
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/46da01a4/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
index 1ce95fb..fcfe0da 100644
--- a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
@@ -49,6 +49,7 @@ class ConsumerBounceTest extends IntegrationTestHarness with Logging {
   this.serverConfig.setProperty(KafkaConfig.OffsetsTopicReplicationFactorProp, "3") // don't
want to lose offset
   this.serverConfig.setProperty(KafkaConfig.OffsetsTopicPartitionsProp, "1")
   this.serverConfig.setProperty(KafkaConfig.GroupMinSessionTimeoutMsProp, "10") // set small
enough session timeout
+  this.serverConfig.setProperty(KafkaConfig.UncleanLeaderElectionEnableProp, "true")
   this.serverConfig.setProperty(KafkaConfig.AutoCreateTopicsEnableProp, "false")
   this.producerConfig.setProperty(ProducerConfig.ACKS_CONFIG, "all")
   this.consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "my-test")

http://git-wip-us.apache.org/repos/asf/kafka/blob/46da01a4/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala b/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
index a8ba283..2597e81 100755
--- a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
@@ -26,7 +26,6 @@ import java.util.Properties
 import java.util.concurrent.ExecutionException
 
 import kafka.admin.AdminUtils
-import kafka.common.FailedToSendMessageException
 import kafka.consumer.{Consumer, ConsumerConfig}
 import kafka.serializer.StringDecoder
 import kafka.server.{KafkaConfig, KafkaServer}
@@ -67,9 +66,9 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness {
     configProps2 = createBrokerConfig(brokerId2, zkConnect)
 
     for (configProps <- List(configProps1, configProps2)) {
-      configProps.put("controlled.shutdown.enable", String.valueOf(enableControlledShutdown))
-      configProps.put("controlled.shutdown.max.retries", String.valueOf(1))
-      configProps.put("controlled.shutdown.retry.backoff.ms", String.valueOf(1000))
+      configProps.put("controlled.shutdown.enable", enableControlledShutdown.toString)
+      configProps.put("controlled.shutdown.max.retries", "1")
+      configProps.put("controlled.shutdown.retry.backoff.ms", "1000")
     }
 
     // temporarily set loggers to a higher level so that tests run quietly
@@ -104,7 +103,9 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness {
 
   @Test
   def testUncleanLeaderElectionEnabled {
-    // unclean leader election is enabled by default
+    // enable unclean leader election
+    configProps1.put("unclean.leader.election.enable", "true")
+    configProps2.put("unclean.leader.election.enable", "true")
     startBrokers(Seq(configProps1, configProps2))
 
     // create topic with 1 partition, 2 replicas, one on each broker
@@ -115,9 +116,7 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness {
 
   @Test
   def testUncleanLeaderElectionDisabled {
-	  // disable unclean leader election
-	  configProps1.put("unclean.leader.election.enable", String.valueOf(false))
-  	configProps2.put("unclean.leader.election.enable", String.valueOf(false))
+    // unclean leader election is disabled by default
     startBrokers(Seq(configProps1, configProps2))
 
     // create topic with 1 partition, 2 replicas, one on each broker
@@ -129,13 +128,13 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness {
   @Test
   def testUncleanLeaderElectionEnabledByTopicOverride {
     // disable unclean leader election globally, but enable for our specific test topic
-    configProps1.put("unclean.leader.election.enable", String.valueOf(false))
-    configProps2.put("unclean.leader.election.enable", String.valueOf(false))
+    configProps1.put("unclean.leader.election.enable", "false")
+    configProps2.put("unclean.leader.election.enable", "false")
     startBrokers(Seq(configProps1, configProps2))
 
     // create topic with 1 partition, 2 replicas, one on each broker, and unclean leader
election enabled
     val topicProps = new Properties()
-    topicProps.put("unclean.leader.election.enable", String.valueOf(true))
+    topicProps.put("unclean.leader.election.enable", "true")
     AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, Map(partitionId
-> Seq(brokerId1, brokerId2)),
       topicProps)
 
@@ -145,13 +144,13 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness {
   @Test
   def testCleanLeaderElectionDisabledByTopicOverride {
     // enable unclean leader election globally, but disable for our specific test topic
-    configProps1.put("unclean.leader.election.enable", String.valueOf(true))
-    configProps2.put("unclean.leader.election.enable", String.valueOf(true))
+    configProps1.put("unclean.leader.election.enable", "true")
+    configProps2.put("unclean.leader.election.enable", "true")
     startBrokers(Seq(configProps1, configProps2))
 
     // create topic with 1 partition, 2 replicas, one on each broker, and unclean leader
election disabled
     val topicProps = new Properties()
-    topicProps.put("unclean.leader.election.enable", String.valueOf(false))
+    topicProps.put("unclean.leader.election.enable", "false")
     AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, Map(partitionId
-> Seq(brokerId1, brokerId2)),
       topicProps)
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/46da01a4/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index b54d5d1..23c11aa 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -410,7 +410,7 @@ class KafkaConfigTest {
     val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
     val serverConfig = KafkaConfig.fromProps(props)
 
-    assertEquals(serverConfig.uncleanLeaderElectionEnable, true)
+    assertEquals(serverConfig.uncleanLeaderElectionEnable, false)
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/kafka/blob/46da01a4/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
index 4eeb515..6ffe314 100755
--- a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
@@ -49,6 +49,9 @@ class LeaderElectionTest extends ZooKeeperTestHarness {
     val configProps1 = TestUtils.createBrokerConfig(brokerId1, zkConnect, enableControlledShutdown
= false)
     val configProps2 = TestUtils.createBrokerConfig(brokerId2, zkConnect, enableControlledShutdown
= false)
 
+    configProps1.put("unclean.leader.election.enable", "true")
+    configProps2.put("unclean.leader.election.enable", "true")
+
     // start both servers
     val server1 = TestUtils.createServer(KafkaConfig.fromProps(configProps1))
     val server2 = TestUtils.createServer(KafkaConfig.fromProps(configProps2))
@@ -72,7 +75,7 @@ class LeaderElectionTest extends ZooKeeperTestHarness {
     val leader1 = createTopic(zkUtils, topic, partitionReplicaAssignment = Map(0 -> Seq(0,
1)), servers = servers)(0)
 
     val leaderEpoch1 = zkUtils.getEpochForPartition(topic, partitionId)
-    debug("leader Epoc: " + leaderEpoch1)
+    debug("leader Epoch: " + leaderEpoch1)
     debug("Leader is elected to be: %s".format(leader1.getOrElse(-1)))
     assertTrue("Leader should get elected", leader1.isDefined)
     // NOTE: this is to avoid transient test failures
@@ -86,7 +89,7 @@ class LeaderElectionTest extends ZooKeeperTestHarness {
                                                     oldLeaderOpt = if(leader1.get == 0) None
else leader1)
     val leaderEpoch2 = zkUtils.getEpochForPartition(topic, partitionId)
     debug("Leader is elected to be: %s".format(leader1.getOrElse(-1)))
-    debug("leader Epoc: " + leaderEpoch2)
+    debug("leader Epoch: " + leaderEpoch2)
     assertEquals("Leader must move to broker 0", 0, leader2.getOrElse(-1))
     if(leader1.get == leader2.get)
       assertEquals("Second epoch value should be " + leaderEpoch1+1, leaderEpoch1+1, leaderEpoch2)
@@ -99,7 +102,7 @@ class LeaderElectionTest extends ZooKeeperTestHarness {
     val leader3 = waitUntilLeaderIsElectedOrChanged(zkUtils, topic, partitionId,
                                                     oldLeaderOpt = if(leader2.get == 1) None
else leader2)
     val leaderEpoch3 = zkUtils.getEpochForPartition(topic, partitionId)
-    debug("leader Epoc: " + leaderEpoch3)
+    debug("leader Epoch: " + leaderEpoch3)
     debug("Leader is elected to be: %s".format(leader3.getOrElse(-1)))
     assertEquals("Leader must return to 1", 1, leader3.getOrElse(-1))
     if(leader2.get == leader3.get)
@@ -118,7 +121,7 @@ class LeaderElectionTest extends ZooKeeperTestHarness {
     val leader1 = createTopic(zkUtils, topic, partitionReplicaAssignment = Map(0 -> Seq(0,
1)), servers = servers)(0)
 
     val leaderEpoch1 = zkUtils.getEpochForPartition(topic, partitionId)
-    debug("leader Epoc: " + leaderEpoch1)
+    debug("leader Epoch: " + leaderEpoch1)
     debug("Leader is elected to be: %s".format(leader1.getOrElse(-1)))
     assertTrue("Leader should get elected", leader1.isDefined)
     // NOTE: this is to avoid transient test failures

http://git-wip-us.apache.org/repos/asf/kafka/blob/46da01a4/docs/upgrade.html
----------------------------------------------------------------------
diff --git a/docs/upgrade.html b/docs/upgrade.html
index cc07728..ee8ab0e 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -53,6 +53,8 @@
 
 <h5><a id="upgrade_1100_notable" href="#upgrade_1100_notable">Notable changes
in 0.11.0.0</a></h5>
 <ul>
+    <li>Unclean leader election is now disabled by default. The new default favors
durability over availability. Users who wish to
+        to retain the previous behavior should set the broker config <code>unclean.leader.election.enabled</code>
to <code>false</code>.</li>
     <li>The <code>offsets.topic.replication.factor</code> broker config
is now enforced upon auto topic creation. Internal
         auto topic creation will fail with a GROUP_COORDINATOR_NOT_AVAILABLE error until
the cluster size meets this
         replication factor requirement.</li>


Mime
View raw message