kafka-commits mailing list archives

From nehanarkh...@apache.org
Subject git commit: KAFKA-1028 per topic configuration of preference for consistency over availability; reviewed by Neha Narkhede and Jay Kreps
Date Tue, 18 Mar 2014 01:41:53 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk a670537aa -> 616086b90


KAFKA-1028 per topic configuration of preference for consistency over availability; reviewed by Neha Narkhede and Jay Kreps
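
The new knob is exposed as a per-topic log config override named unclean.leader.election.enable (see the LogConfig changes below). As a rough sketch of opting a single topic into consistency over availability, assuming the existing AdminUtils.changeTopicConfig API and ZKStringSerializer from this codebase, with placeholder ZooKeeper address and topic name:

    import java.util.Properties
    import kafka.admin.AdminUtils
    import kafka.utils.ZKStringSerializer
    import org.I0Itec.zkclient.ZkClient

    object DisableUncleanElectionForTopic {
      def main(args: Array[String]) {
        // "localhost:2181" and "critical-topic" are placeholders.
        val zkClient = new ZkClient("localhost:2181", 30000, 30000, ZKStringSerializer)
        val overrides = new Properties()
        // Prefer consistency over availability for this topic only.
        overrides.put("unclean.leader.election.enable", "false")
        AdminUtils.changeTopicConfig(zkClient, "critical-topic", overrides)
        zkClient.close()
      }
    }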


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/616086b9
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/616086b9
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/616086b9

Branch: refs/heads/trunk
Commit: 616086b909252eeee2cb05f379659a2de2c7bd83
Parents: a670537
Author: Andrew Olson <andrew.olson@cerner.com>
Authored: Mon Mar 17 18:39:57 2014 -0700
Committer: Neha Narkhede <neha.narkhede@gmail.com>
Committed: Mon Mar 17 18:40:59 2014 -0700

----------------------------------------------------------------------
 .../kafka/common/NoReplicaOnlineException.scala |  3 +-
 .../kafka/controller/KafkaController.scala      | 17 +++-
 .../controller/PartitionLeaderSelector.scala    | 22 +++++-
 core/src/main/scala/kafka/log/LogConfig.scala   | 81 +++++++++++++-------
 .../main/scala/kafka/server/KafkaConfig.scala   |  3 +
 .../kafka/server/ReplicaFetcherThread.scala     | 22 +++++-
 .../kafka/integration/RollingBounceTest.scala   |  3 +-
 .../unit/kafka/server/KafkaConfigTest.scala     | 37 ++++++++-
 .../test/scala/unit/kafka/utils/TestUtils.scala |  4 +-
 9 files changed, 150 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/616086b9/core/src/main/scala/kafka/common/NoReplicaOnlineException.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/common/NoReplicaOnlineException.scala b/core/src/main/scala/kafka/common/NoReplicaOnlineException.scala
index a1e1279..b66c8fc 100644
--- a/core/src/main/scala/kafka/common/NoReplicaOnlineException.scala
+++ b/core/src/main/scala/kafka/common/NoReplicaOnlineException.scala
@@ -20,7 +20,8 @@ package kafka.common
 
 /**
  * This exception is thrown by the leader elector in the controller when leader election fails for a partition since
- * all the replicas for a partition are offline
+ * all the leader candidate replicas for a partition are offline; the set of candidates may or may not be limited
+ * to just the in sync replicas depending upon whether unclean leader election is allowed to occur.
  */
 class NoReplicaOnlineException(message: String, cause: Throwable) extends RuntimeException(message, cause) {
   def this(message: String) = this(message, null)

http://git-wip-us.apache.org/repos/asf/kafka/blob/616086b9/core/src/main/scala/kafka/controller/KafkaController.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 5db24a7..7dc2718 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -21,10 +21,12 @@ import collection.Set
 import com.yammer.metrics.core.Gauge
 import java.lang.{IllegalStateException, Object}
 import java.util.concurrent.TimeUnit
+import kafka.admin.AdminUtils
 import kafka.admin.PreferredReplicaLeaderElectionCommand
 import kafka.api._
 import kafka.cluster.Broker
 import kafka.common._
+import kafka.log.LogConfig
 import kafka.metrics.{KafkaTimer, KafkaMetricsGroup}
 import kafka.server.{ZookeeperLeaderElector, KafkaConfig}
 import kafka.utils.ZkUtils._
@@ -164,7 +166,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
   // kafka server
   private val autoRebalanceScheduler = new KafkaScheduler(1)
   var deleteTopicManager: TopicDeletionManager = null
-  val offlinePartitionSelector = new OfflinePartitionLeaderSelector(controllerContext)
+  val offlinePartitionSelector = new OfflinePartitionLeaderSelector(controllerContext, config)
   private val reassignedPartitionLeaderSelector = new ReassignedPartitionLeaderSelector(controllerContext)
   private val preferredReplicaPartitionLeaderSelector = new PreferredReplicaPartitionLeaderSelector(controllerContext)
   private val controlledShutdownPartitionLeaderSelector = new ControlledShutdownLeaderSelector(controllerContext)
@@ -972,8 +974,19 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
          if (leaderAndIsr.isr.contains(replicaId)) {
            // if the replica to be removed from the ISR is also the leader, set the new leader value to -1
            val newLeader = if(replicaId == leaderAndIsr.leader) -1 else leaderAndIsr.leader
+            var newIsr = leaderAndIsr.isr.filter(b => b != replicaId)
+
+            // if the replica to be removed from the ISR is the last surviving member of the ISR and unclean leader election
+            // is disallowed for the corresponding topic, then we must preserve the ISR membership so that the replica can
+            // eventually be restored as the leader.
+            if (newIsr.isEmpty && !LogConfig.fromProps(config.props.props, AdminUtils.fetchTopicConfig(zkClient,
+              topicAndPartition.topic)).uncleanLeaderElectionEnable) {
+              info("Retaining last ISR %d of partition %s since unclean leader election is disabled".format(replicaId, topicAndPartition))
+              newIsr = leaderAndIsr.isr
+            }
+
             val newLeaderAndIsr = new LeaderAndIsr(newLeader, leaderAndIsr.leaderEpoch + 1,
-              leaderAndIsr.isr.filter(b => b != replicaId), leaderAndIsr.zkVersion + 1)
+              newIsr, leaderAndIsr.zkVersion + 1)
             // update the new leadership decision in zookeeper or retry
             val (updateSucceeded, newVersion) = ZkUtils.conditionalUpdatePersistentPath(
               zkClient,
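
Taken in isolation, the ISR bookkeeping added above reduces to one rule: drop the failed replica from the ISR unless it is the last surviving member and unclean leader election is disabled for the topic, in which case the ISR is preserved so that replica can eventually be restored as leader. A self-contained sketch of that rule, with illustrative names rather than the controller's actual API:

    object IsrShrinkRule {
      // Mirrors the controller logic above: remove the failed replica from the ISR,
      // unless it is the last surviving member and unclean leader election is
      // disabled, in which case the old ISR is retained.
      def shrinkIsr(isr: List[Int], failedReplica: Int, uncleanElectionEnabled: Boolean): List[Int] = {
        val newIsr = isr.filter(_ != failedReplica)
        if (newIsr.isEmpty && !uncleanElectionEnabled) isr else newIsr
      }

      def main(args: Array[String]) {
        println(shrinkIsr(List(1, 2), 2, uncleanElectionEnabled = false)) // List(1)
        println(shrinkIsr(List(2), 2, uncleanElectionEnabled = false))    // List(2), retained
        println(shrinkIsr(List(2), 2, uncleanElectionEnabled = true))     // List(), unclean election may follow
      }
    }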

http://git-wip-us.apache.org/repos/asf/kafka/blob/616086b9/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala b/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
index fa29bbe..d3b25fa 100644
--- a/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
+++ b/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
@@ -16,9 +16,12 @@
  */
 package kafka.controller
 
+import kafka.admin.AdminUtils
 import kafka.api.LeaderAndIsr
+import kafka.log.LogConfig
 import kafka.utils.Logging
 import kafka.common.{LeaderElectionNotNeededException, TopicAndPartition, StateChangeFailedException, NoReplicaOnlineException}
+import kafka.server.KafkaConfig
 
 trait PartitionLeaderSelector {
 
@@ -37,12 +40,14 @@ trait PartitionLeaderSelector {
  * Select the new leader, new isr and receiving replicas (for the LeaderAndIsrRequest):
  * 1. If at least one broker from the isr is alive, it picks a broker from the live isr as the new leader and the live
  *    isr as the new isr.
- * 2. Else, it picks some alive broker from the assigned replica list as the new leader and the new isr.
- * 3. If no broker in the assigned replica list is alive, it throws NoReplicaOnlineException
+ * 2. Else, if unclean leader election for the topic is disabled, it throws a NoReplicaOnlineException.
+ * 3. Else, it picks some alive broker from the assigned replica list as the new leader and the new isr.
+ * 4. If no broker in the assigned replica list is alive, it throws a NoReplicaOnlineException
  * Replicas to receive LeaderAndIsr request = live assigned replicas
  * Once the leader is successfully registered in zookeeper, it updates the allLeaders cache
  */
-class OfflinePartitionLeaderSelector(controllerContext: ControllerContext) extends PartitionLeaderSelector with Logging {
+class OfflinePartitionLeaderSelector(controllerContext: ControllerContext, config: KafkaConfig)
+  extends PartitionLeaderSelector with Logging {
   this.logIdent = "[OfflinePartitionLeaderSelector]: "
 
   def selectLeader(topicAndPartition: TopicAndPartition, currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int]) = {
@@ -54,6 +59,15 @@ class OfflinePartitionLeaderSelector(controllerContext: ControllerContext) exten
         val currentLeaderIsrZkPathVersion = currentLeaderAndIsr.zkVersion
         val newLeaderAndIsr = liveBrokersInIsr.isEmpty match {
           case true =>
+            // Prior to electing an unclean (i.e. non-ISR) leader, ensure that doing so is not disallowed by the configuration
+            // for unclean leader election.
+            if (!LogConfig.fromProps(config.props.props, AdminUtils.fetchTopicConfig(controllerContext.zkClient,
+              topicAndPartition.topic)).uncleanLeaderElectionEnable) {
+              throw new NoReplicaOnlineException(("No broker in ISR for partition " +
+                "%s is alive. Live brokers are: [%s],".format(topicAndPartition, controllerContext.liveBrokerIds)) +
+                " ISR brokers are: [%s]".format(currentLeaderAndIsr.isr.mkString(",")))
+            }
+
             debug("No broker in ISR is alive for %s. Pick the leader from the alive assigned replicas: %s"
               .format(topicAndPartition, liveAssignedReplicas.mkString(",")))
             liveAssignedReplicas.isEmpty match {
@@ -77,7 +91,7 @@ class OfflinePartitionLeaderSelector(controllerContext: ControllerContext) exten
         info("Selected new leader and ISR %s for offline partition %s".format(newLeaderAndIsr.toString(), topicAndPartition))
         (newLeaderAndIsr, liveAssignedReplicas)
       case None =>
-        throw new NoReplicaOnlineException("Partition %s doesn't have".format(topicAndPartition) + "replicas assigned to it")
+        throw new NoReplicaOnlineException("Partition %s doesn't have replicas assigned to it".format(topicAndPartition))
     }
   }
 }
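
The four selection steps documented above read naturally as a small pure function. A sketch under illustrative names; the real selector also computes the new ISR, leader epoch and zkVersion:

    object OfflineLeaderRule {
      // Illustrative distillation of steps 1-4; throws when no candidate is permitted.
      def selectLeader(liveBrokers: Set[Int], isr: Seq[Int], assignedReplicas: Seq[Int],
                       uncleanElectionEnabled: Boolean): Int = {
        val liveIsr = isr.filter(liveBrokers.contains)
        if (liveIsr.nonEmpty)
          liveIsr.head                                // 1. prefer a live ISR member
        else if (!uncleanElectionEnabled)
          sys.error("no live broker in ISR and unclean leader election is disabled") // 2.
        else
          assignedReplicas.find(liveBrokers.contains) // 3. fall back to any live assigned replica
            .getOrElse(sys.error("no assigned replica is alive")) // 4.
      }
    }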

http://git-wip-us.apache.org/repos/asf/kafka/blob/616086b9/core/src/main/scala/kafka/log/LogConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogConfig.scala b/core/src/main/scala/kafka/log/LogConfig.scala
index 18c86fe..5746ad4 100644
--- a/core/src/main/scala/kafka/log/LogConfig.scala
+++ b/core/src/main/scala/kafka/log/LogConfig.scala
@@ -21,6 +21,23 @@ import java.util.Properties
 import scala.collection._
 import kafka.common._
 
+object Defaults {
+  val SegmentSize = 1024 * 1024
+  val SegmentMs = Long.MaxValue
+  val FlushInterval = Long.MaxValue
+  val FlushMs = Long.MaxValue
+  val RetentionSize = Long.MaxValue
+  val RetentionMs = Long.MaxValue
+  val MaxMessageSize = Int.MaxValue
+  val MaxIndexSize = 1024 * 1024
+  val IndexInterval = 4096
+  val FileDeleteDelayMs = 60 * 1000L
+  val DeleteRetentionMs = 24 * 60 * 60 * 1000L
+  val MinCleanableDirtyRatio = 0.5
+  val Compact = false
+  val UncleanLeaderElectionEnable = true
+}
+
 /**
  * Configuration settings for a log
  * @param segmentSize The soft maximum for the size of a segment file in the log
@@ -35,20 +52,23 @@ import kafka.common._
  * @param deleteRetentionMs The time to retain delete markers in the log. Only applicable for logs that are being compacted.
  * @param minCleanableRatio The ratio of bytes that are available for cleaning to the bytes already cleaned
  * @param compact Should old segments in this log be deleted or deduplicated?
+ * @param uncleanLeaderElectionEnable Indicates whether unclean leader election is enabled; actually a controller-level property
+ *   but included here for topic-specific configuration validation purposes
  */
-case class LogConfig(val segmentSize: Int = 1024*1024, 
-                     val segmentMs: Long = Long.MaxValue,
-                     val flushInterval: Long = Long.MaxValue, 
-                     val flushMs: Long = Long.MaxValue,
-                     val retentionSize: Long = Long.MaxValue,
-                     val retentionMs: Long = Long.MaxValue,
-                     val maxMessageSize: Int = Int.MaxValue,
-                     val maxIndexSize: Int = 1024*1024,
-                     val indexInterval: Int = 4096,
-                     val fileDeleteDelayMs: Long = 60*1000,
-                     val deleteRetentionMs: Long = 24 * 60 * 60 * 1000L,
-                     val minCleanableRatio: Double = 0.5,
-                     val compact: Boolean = false) {
+case class LogConfig(val segmentSize: Int = Defaults.SegmentSize,
+                     val segmentMs: Long = Defaults.SegmentMs,
+                     val flushInterval: Long = Defaults.FlushInterval,
+                     val flushMs: Long = Defaults.FlushMs,
+                     val retentionSize: Long = Defaults.RetentionSize,
+                     val retentionMs: Long = Defaults.RetentionMs,
+                     val maxMessageSize: Int = Defaults.MaxMessageSize,
+                     val maxIndexSize: Int = Defaults.MaxIndexSize,
+                     val indexInterval: Int = Defaults.IndexInterval,
+                     val fileDeleteDelayMs: Long = Defaults.FileDeleteDelayMs,
+                     val deleteRetentionMs: Long = Defaults.DeleteRetentionMs,
+                     val minCleanableRatio: Double = Defaults.MinCleanableDirtyRatio,
+                     val compact: Boolean = Defaults.Compact,
+                     val uncleanLeaderElectionEnable: Boolean = Defaults.UncleanLeaderElectionEnable) {
   
   def toProps: Properties = {
     val props = new Properties()
@@ -66,6 +86,7 @@ case class LogConfig(val segmentSize: Int = 1024*1024,
     props.put(FileDeleteDelayMsProp, fileDeleteDelayMs.toString)
     props.put(MinCleanableDirtyRatioProp, minCleanableRatio.toString)
     props.put(CleanupPolicyProp, if(compact) "compact" else "delete")
+    props.put(UncleanLeaderElectionEnableProp, uncleanLeaderElectionEnable.toString)
     props
   }
   
@@ -85,6 +106,7 @@ object LogConfig {
   val FileDeleteDelayMsProp = "file.delete.delay.ms"
   val MinCleanableDirtyRatioProp = "min.cleanable.dirty.ratio"
   val CleanupPolicyProp = "cleanup.policy"
+  val UncleanLeaderElectionEnableProp = "unclean.leader.election.enable"
   
   val ConfigNames = Set(SegmentBytesProp, 
                         SegmentMsProp, 
@@ -98,26 +120,31 @@ object LogConfig {
                         FileDeleteDelayMsProp,
                         DeleteRetentionMsProp,
                         MinCleanableDirtyRatioProp,
-                        CleanupPolicyProp)
+                        CleanupPolicyProp,
+                        UncleanLeaderElectionEnableProp)
     
   
   /**
    * Parse the given properties instance into a LogConfig object
    */
   def fromProps(props: Properties): LogConfig = {
-    new LogConfig(segmentSize = props.getProperty(SegmentBytesProp).toInt,
-                  segmentMs = props.getProperty(SegmentMsProp).toLong,
-                  maxIndexSize = props.getProperty(SegmentIndexBytesProp).toInt,
-                  flushInterval = props.getProperty(FlushMessagesProp).toLong,
-                  flushMs = props.getProperty(FlushMsProp).toLong,
-                  retentionSize = props.getProperty(RetentionBytesProp).toLong,
-                  retentionMs = props.getProperty(RententionMsProp).toLong,
-                  maxMessageSize = props.getProperty(MaxMessageBytesProp).toInt,
-                  indexInterval = props.getProperty(IndexIntervalBytesProp).toInt,
-                  fileDeleteDelayMs = props.getProperty(FileDeleteDelayMsProp).toInt,
-                  deleteRetentionMs = props.getProperty(DeleteRetentionMsProp).toLong,
-                  minCleanableRatio = props.getProperty(MinCleanableDirtyRatioProp).toDouble,
-                  compact = props.getProperty(CleanupPolicyProp).trim.toLowerCase != "delete")
+    new LogConfig(segmentSize = props.getProperty(SegmentBytesProp, Defaults.SegmentSize.toString).toInt,
+                  segmentMs = props.getProperty(SegmentMsProp, Defaults.SegmentMs.toString).toLong,
+                  maxIndexSize = props.getProperty(SegmentIndexBytesProp, Defaults.MaxIndexSize.toString).toInt,
+                  flushInterval = props.getProperty(FlushMessagesProp, Defaults.FlushInterval.toString).toLong,
+                  flushMs = props.getProperty(FlushMsProp, Defaults.FlushMs.toString).toLong,
+                  retentionSize = props.getProperty(RetentionBytesProp, Defaults.RetentionSize.toString).toLong,
+                  retentionMs = props.getProperty(RententionMsProp, Defaults.RetentionMs.toString).toLong,
+                  maxMessageSize = props.getProperty(MaxMessageBytesProp, Defaults.MaxMessageSize.toString).toInt,
+                  indexInterval = props.getProperty(IndexIntervalBytesProp, Defaults.IndexInterval.toString).toInt,
+                  fileDeleteDelayMs = props.getProperty(FileDeleteDelayMsProp, Defaults.FileDeleteDelayMs.toString).toInt,
+                  deleteRetentionMs = props.getProperty(DeleteRetentionMsProp, Defaults.DeleteRetentionMs.toString).toLong,
+                  minCleanableRatio = props.getProperty(MinCleanableDirtyRatioProp,
+                    Defaults.MinCleanableDirtyRatio.toString).toDouble,
+                  compact = props.getProperty(CleanupPolicyProp, if(Defaults.Compact) "compact" else "delete")
+                    .trim.toLowerCase != "delete",
+                  uncleanLeaderElectionEnable = props.getProperty(UncleanLeaderElectionEnableProp,
+                    Defaults.UncleanLeaderElectionEnable.toString).toBoolean)
   }
   
   /**
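
With the change above, every property lookup in fromProps falls back to a compiled-in default via getProperty(key, default), so a LogConfig can now be built from a sparse Properties instance; the controller layers topic overrides on top of broker defaults through the two-argument LogConfig.fromProps seen earlier in this commit. A minimal JDK-only sketch of the fallback idiom:

    import java.util.Properties

    object DefaultFallbackDemo {
      def main(args: Array[String]) {
        val sparse = new Properties()
        // No override present: the compiled-in default applies.
        println(sparse.getProperty("unclean.leader.election.enable", "true").toBoolean)  // true

        // Topic overrides chained over broker defaults, in the spirit of
        // LogConfig.fromProps(brokerProps, topicProps) used by the controller above.
        val brokerDefaults = new Properties()
        brokerDefaults.put("unclean.leader.election.enable", "true")
        val topicOverrides = new Properties(brokerDefaults)  // lookups fall through to defaults
        topicOverrides.put("unclean.leader.election.enable", "false")
        println(topicOverrides.getProperty("unclean.leader.election.enable", "true").toBoolean)  // false
      }
    }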

http://git-wip-us.apache.org/repos/asf/kafka/blob/616086b9/core/src/main/scala/kafka/server/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index d07796e..08de0ef 100644
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -231,6 +231,9 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro
   /* the frequency with which the partition rebalance check is triggered by the controller */
   val leaderImbalanceCheckIntervalSeconds = props.getInt("leader.imbalance.check.interval.seconds", 300)
 
+  /* indicates whether to enable replicas not in the ISR set to be elected as leader as a last resort, even though
+   * doing so may result in data loss */
+  val uncleanLeaderElectionEnable = props.getBoolean("unclean.leader.election.enable", true)
 
   /*********** Controlled shutdown configuration ***********/
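
At the broker level the new setting defaults to true, preserving the previous behavior of electing a non-ISR replica as a last resort. A sketch of flipping the cluster-wide default programmatically, mirroring the tests below; the broker.id and zookeeper.connect values are placeholders and a real broker needs its usual settings:

    import java.util.Properties
    import kafka.server.KafkaConfig

    object BrokerWideDefault {
      def main(args: Array[String]) {
        val props = new Properties()
        props.put("broker.id", "0")                          // placeholder
        props.put("zookeeper.connect", "localhost:2181")     // placeholder
        props.put("unclean.leader.election.enable", "false") // consistency over availability, cluster-wide
        val config = new KafkaConfig(props)
        println(config.uncleanLeaderElectionEnable)          // false
      }
    }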
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/616086b9/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index 73e605e..75ae1e1 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -17,7 +17,9 @@
 
 package kafka.server
 
+import kafka.admin.AdminUtils
 import kafka.cluster.Broker
+import kafka.log.LogConfig
 import kafka.message.ByteBufferMessageSet
 import kafka.api.{OffsetRequest, FetchResponsePartitionData}
 import kafka.common.{KafkaStorageException, TopicAndPartition}
@@ -81,9 +83,21 @@ class ReplicaFetcherThread(name:String,
      */
     val leaderEndOffset = simpleConsumer.earliestOrLatestOffset(topicAndPartition, OffsetRequest.LatestTime, brokerConfig.brokerId)
     if (leaderEndOffset < replica.logEndOffset) {
+      // Prior to truncating the follower's log, ensure that doing so is not disallowed by the configuration for unclean leader election.
+      // This situation could only happen if the unclean election configuration for a topic changes while a replica is down. Otherwise,
+      // we should never encounter this situation since a non-ISR leader cannot be elected if disallowed by the broker configuration.
+      if (!LogConfig.fromProps(brokerConfig.props.props, AdminUtils.fetchTopicConfig(replicaMgr.zkClient,
+        topicAndPartition.topic)).uncleanLeaderElectionEnable) {
+        // Log a fatal error and shutdown the broker to ensure that data loss does not unexpectedly occur.
+        fatal("Halting because log truncation is not allowed for topic %s,".format(topicAndPartition.topic) +
+          " Current leader %d's latest offset %d is less than replica %d's latest offset %d"
+          .format(sourceBroker.id, leaderEndOffset, brokerConfig.brokerId, replica.logEndOffset))
+        Runtime.getRuntime.halt(1)
+      }
+
       replicaMgr.logManager.truncateTo(Map(topicAndPartition -> leaderEndOffset))
-      warn("Replica %d for partition %s reset its fetch offset to current leader %d's latest offset %d"
-        .format(brokerConfig.brokerId, topicAndPartition, sourceBroker.id, leaderEndOffset))
+      warn("Replica %d for partition %s reset its fetch offset from %d to current leader %d's latest offset %d"
+        .format(brokerConfig.brokerId, topicAndPartition, replica.logEndOffset, sourceBroker.id, leaderEndOffset))
       leaderEndOffset
     } else {
       /**
@@ -94,8 +108,8 @@ class ReplicaFetcherThread(name:String,
        */
       val leaderStartOffset = simpleConsumer.earliestOrLatestOffset(topicAndPartition, OffsetRequest.EarliestTime, brokerConfig.brokerId)
       replicaMgr.logManager.truncateFullyAndStartAt(topicAndPartition, leaderStartOffset)
-      warn("Replica %d for partition %s reset its fetch offset to current leader %d's start offset %d"
-        .format(brokerConfig.brokerId, topicAndPartition, sourceBroker.id, leaderStartOffset))
+      warn("Replica %d for partition %s reset its fetch offset from %d to current leader %d's start offset %d"
+        .format(brokerConfig.brokerId, topicAndPartition, replica.logEndOffset, sourceBroker.id, leaderStartOffset))
       leaderStartOffset
     }
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/616086b9/core/src/test/scala/unit/kafka/integration/RollingBounceTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/RollingBounceTest.scala b/core/src/test/scala/unit/kafka/integration/RollingBounceTest.scala
index b585f0e..e86ee80 100644
--- a/core/src/test/scala/unit/kafka/integration/RollingBounceTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/RollingBounceTest.scala
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package kafka.server
+package kafka.integration
 
 import org.scalatest.junit.JUnit3Suite
 import kafka.zk.ZooKeeperTestHarness
@@ -27,6 +27,7 @@ import kafka.cluster.Broker
 import kafka.common.ErrorMapping
 import kafka.api._
 import kafka.admin.AdminUtils
+import kafka.server.{KafkaConfig, KafkaServer}
 
 class RollingBounceTest extends JUnit3Suite with ZooKeeperTestHarness {
   val brokerId1 = 0

http://git-wip-us.apache.org/repos/asf/kafka/blob/616086b9/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index 89c207a..6f4809d 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -93,5 +93,40 @@ class KafkaConfigTest extends JUnit3Suite {
     assertEquals(serverConfig.advertisedHostName, advertisedHostName)
     assertEquals(serverConfig.advertisedPort, advertisedPort)
   }
-  
+
+  @Test
+  def testUncleanLeaderElectionDefault() {
+    val props = TestUtils.createBrokerConfig(0, 8181)
+    val serverConfig = new KafkaConfig(props)
+
+    assertEquals(serverConfig.uncleanLeaderElectionEnable, true)
+  }
+
+  @Test
+  def testUncleanElectionDisabled() {
+    val props = TestUtils.createBrokerConfig(0, 8181)
+    props.put("unclean.leader.election.enable", String.valueOf(false))
+    val serverConfig = new KafkaConfig(props)
+
+    assertEquals(serverConfig.uncleanLeaderElectionEnable, false)
+  }
+
+  @Test
+  def testUncleanElectionEnabled() {
+    val props = TestUtils.createBrokerConfig(0, 8181)
+    props.put("unclean.leader.election.enable", String.valueOf(true))
+    val serverConfig = new KafkaConfig(props)
+
+    assertEquals(serverConfig.uncleanLeaderElectionEnable, true)
+  }
+
+  @Test
+  def testUncleanElectionInvalid() {
+    val props = TestUtils.createBrokerConfig(0, 8181)
+    props.put("unclean.leader.election.enable", "invalid")
+
+    intercept[IllegalArgumentException] {
+      new KafkaConfig(props)
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/616086b9/core/src/test/scala/unit/kafka/utils/TestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 772d214..2054c25 100644
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -312,8 +312,8 @@ object TestUtils extends Logging {
    */
   def createProducer[K, V](brokerList: String, 
                            encoder: Encoder[V] = new DefaultEncoder(), 
-                           keyEncoder: Encoder[K] = new DefaultEncoder()): Producer[K, V] = {
-    val props = new Properties()
+                           keyEncoder: Encoder[K] = new DefaultEncoder(),
+                           props: Properties = new Properties()): Producer[K, V] = {
     props.put("metadata.broker.list", brokerList)
     props.put("send.buffer.bytes", "65536")
     props.put("connect.timeout.ms", "100000")

