kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From joest...@apache.org
Subject [1/3] kafka git commit: KAFKA-1845 KafkaConfig should use ConfigDef patch by Andrii Biletskyi reviewed by Gwen Shapira
Date Thu, 05 Mar 2015 14:53:40 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 3a9f4b833 -> 8f0003f9b


http://git-wip-us.apache.org/repos/asf/kafka/blob/8f0003f9/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
index 1bf2667..8bc1785 100644
--- a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
@@ -55,10 +55,10 @@ class AddPartitionsTest extends JUnit3Suite with ZooKeeperTestHarness {
   override def setUp() {
     super.setUp()
     // start all the servers
-    val server1 = TestUtils.createServer(new KafkaConfig(configProps1))
-    val server2 = TestUtils.createServer(new KafkaConfig(configProps2))
-    val server3 = TestUtils.createServer(new KafkaConfig(configProps3))
-    val server4 = TestUtils.createServer(new KafkaConfig(configProps4))
+    val server1 = TestUtils.createServer(KafkaConfig.fromProps(configProps1))
+    val server2 = TestUtils.createServer(KafkaConfig.fromProps(configProps2))
+    val server3 = TestUtils.createServer(KafkaConfig.fromProps(configProps3))
+    val server4 = TestUtils.createServer(KafkaConfig.fromProps(configProps4))
 
     servers ++= List(server1, server2, server3, server4)
     brokers = servers.map(s => new Broker(s.config.brokerId, s.config.hostName, s.config.port))

http://git-wip-us.apache.org/repos/asf/kafka/blob/8f0003f9/core/src/test/scala/unit/kafka/admin/AdminTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/AdminTest.scala b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
index e289798..ee0b21e 100644
--- a/core/src/test/scala/unit/kafka/admin/AdminTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
@@ -145,7 +145,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
     val expectedReplicaAssignment = Map(0  -> List(0, 1, 2))
     val topic = "test"
     // create brokers
-    val servers = TestUtils.createBrokerConfigs(4, false).map(b => TestUtils.createServer(new KafkaConfig(b)))
+    val servers = TestUtils.createBrokerConfigs(4, false).map(b => TestUtils.createServer(KafkaConfig.fromProps(b)))
     // create the topic
     AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, expectedReplicaAssignment)
     // reassign partition 0
@@ -176,7 +176,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
     val expectedReplicaAssignment = Map(0  -> List(0, 1, 2))
     val topic = "test"
     // create brokers
-    val servers = TestUtils.createBrokerConfigs(4, false).map(b => TestUtils.createServer(new KafkaConfig(b)))
+    val servers = TestUtils.createBrokerConfigs(4, false).map(b => TestUtils.createServer(KafkaConfig.fromProps(b)))
     // create the topic
     AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, expectedReplicaAssignment)
     // reassign partition 0
@@ -207,7 +207,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
     val expectedReplicaAssignment = Map(0  -> List(0, 1))
     val topic = "test"
     // create brokers
-    val servers = TestUtils.createBrokerConfigs(4, false).map(b => TestUtils.createServer(new KafkaConfig(b)))
+    val servers = TestUtils.createBrokerConfigs(4, false).map(b => TestUtils.createServer(KafkaConfig.fromProps(b)))
     // create the topic
     AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, expectedReplicaAssignment)
     // reassign partition 0
@@ -236,7 +236,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
   def testReassigningNonExistingPartition() {
     val topic = "test"
     // create brokers
-    val servers = TestUtils.createBrokerConfigs(4, false).map(b => TestUtils.createServer(new KafkaConfig(b)))
+    val servers = TestUtils.createBrokerConfigs(4, false).map(b => TestUtils.createServer(KafkaConfig.fromProps(b)))
     // reassign partition 0
     val newReplicas = Seq(2, 3)
     val partitionToBeReassigned = 0
@@ -262,7 +262,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
     val reassignPartitionsCommand = new ReassignPartitionsCommand(zkClient, Map(topicAndPartition -> newReplicas))
     reassignPartitionsCommand.reassignPartitions
     // create brokers
-    val servers = TestUtils.createBrokerConfigs(2, false).map(b => TestUtils.createServer(new KafkaConfig(b)))
+    val servers = TestUtils.createBrokerConfigs(2, false).map(b => TestUtils.createServer(KafkaConfig.fromProps(b)))
 
     // wait until reassignment completes
     TestUtils.waitUntilTrue(() => !checkIfReassignPartitionPathExists(zkClient),
@@ -298,7 +298,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
     val partition = 1
     val preferredReplica = 0
     // create brokers
-    val serverConfigs = TestUtils.createBrokerConfigs(3, false).map(new KafkaConfig(_))
+    val serverConfigs = TestUtils.createBrokerConfigs(3, false).map(KafkaConfig.fromProps)
     // create the topic
     AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, expectedReplicaAssignment)
     val servers = serverConfigs.reverse.map(s => TestUtils.createServer(s))
@@ -318,7 +318,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
     val topic = "test"
     val partition = 1
     // create brokers
-    val serverConfigs = TestUtils.createBrokerConfigs(3, false).map(new KafkaConfig(_))
+    val serverConfigs = TestUtils.createBrokerConfigs(3, false).map(KafkaConfig.fromProps)
     val servers = serverConfigs.reverse.map(s => TestUtils.createServer(s))
     // create the topic
     TestUtils.createTopic(zkClient, topic, partitionReplicaAssignment = expectedReplicaAssignment, servers = servers)
@@ -365,7 +365,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
   def testTopicConfigChange() {
     val partitions = 3
     val topic = "my-topic"
-    val server = TestUtils.createServer(new KafkaConfig(TestUtils.createBrokerConfig(0)))
+    val server = TestUtils.createServer(KafkaConfig.fromProps(TestUtils.createBrokerConfig(0)))
 
     def makeConfig(messageSize: Int, retentionMs: Long) = {
       var props = new Properties()

http://git-wip-us.apache.org/repos/asf/kafka/blob/8f0003f9/core/src/test/scala/unit/kafka/admin/DeleteConsumerGroupTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/DeleteConsumerGroupTest.scala b/core/src/test/scala/unit/kafka/admin/DeleteConsumerGroupTest.scala
index d530338..1baff0e 100644
--- a/core/src/test/scala/unit/kafka/admin/DeleteConsumerGroupTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/DeleteConsumerGroupTest.scala
@@ -26,7 +26,7 @@ import kafka.integration.KafkaServerTestHarness
 
 
 class DeleteConsumerGroupTest extends JUnit3Suite with KafkaServerTestHarness {
-  val configs = TestUtils.createBrokerConfigs(3, false, true).map(new KafkaConfig(_))
+  val configs = TestUtils.createBrokerConfigs(3, false, true).map(KafkaConfig.fromProps)
 
   @Test
   def testGroupWideDeleteInZK() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/8f0003f9/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
index c8f336a..6258983 100644
--- a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
@@ -99,7 +99,7 @@ class DeleteTopicTest extends JUnit3Suite with ZooKeeperTestHarness {
     val brokerConfigs = TestUtils.createBrokerConfigs(4, false)
     brokerConfigs.foreach(p => p.setProperty("delete.topic.enable", "true"))
     // create brokers
-    val allServers = brokerConfigs.map(b => TestUtils.createServer(new KafkaConfig(b)))
+    val allServers = brokerConfigs.map(b => TestUtils.createServer(KafkaConfig.fromProps(b)))
     val servers = allServers.filter(s => expectedReplicaAssignment(0).contains(s.config.brokerId))
     // create the topic
     AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, expectedReplicaAssignment)
@@ -263,7 +263,7 @@ class DeleteTopicTest extends JUnit3Suite with ZooKeeperTestHarness {
     val expectedReplicaAssignment = Map(0 -> List(0, 1, 2))
     val topicAndPartition = TopicAndPartition(topic, 0)
     // create brokers
-    val servers = brokerConfigs.map(b => TestUtils.createServer(new KafkaConfig(b)))
+    val servers = brokerConfigs.map(b => TestUtils.createServer(KafkaConfig.fromProps(b)))
     // create the topic
     AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, expectedReplicaAssignment)
     // wait until replica log is created on every broker

http://git-wip-us.apache.org/repos/asf/kafka/blob/8f0003f9/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala b/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala
index c0355cc..995397b 100644
--- a/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala
+++ b/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala
@@ -18,6 +18,7 @@
 
 package kafka.consumer
 
+import java.util.Properties
 import java.util.concurrent._
 import java.util.concurrent.atomic._
 import scala.collection._
@@ -36,11 +37,14 @@ import kafka.integration.KafkaServerTestHarness
 class ConsumerIteratorTest extends JUnit3Suite with KafkaServerTestHarness {
 
   val numNodes = 1
+
+  val overridingProps = new Properties()
+  overridingProps.put(KafkaConfig.ZkConnectProp, TestZKUtils.zookeeperConnect)
+
   val configs =
     for(props <- TestUtils.createBrokerConfigs(numNodes))
-    yield new KafkaConfig(props) {
-      override val zkConnect = TestZKUtils.zookeeperConnect
-    }
+    yield KafkaConfig.fromProps(props, overridingProps)
+
   val messages = new mutable.HashMap[Int, Seq[Message]]
   val topic = "topic"
   val group = "group1"

http://git-wip-us.apache.org/repos/asf/kafka/blob/8f0003f9/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala b/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
index a17e853..19640cc 100644
--- a/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
+++ b/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
@@ -42,12 +42,14 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
   val numNodes = 2
   val numParts = 2
   val topic = "topic1"
+  val overridingProps = new Properties()
+  overridingProps.put(KafkaConfig.ZkConnectProp, zookeeperConnect)
+  overridingProps.put(KafkaConfig.NumPartitionsProp, numParts.toString)
+
   val configs =
-    for(props <- TestUtils.createBrokerConfigs(numNodes))
-    yield new KafkaConfig(props) {
-      override val zkConnect = zookeeperConnect
-      override val numPartitions = numParts
-    }
+    for (props <- TestUtils.createBrokerConfigs(numNodes))
+    yield KafkaConfig.fromProps(props, overridingProps)
+
   val group = "group1"
   val consumer0 = "consumer0"
   val consumer1 = "consumer1"

http://git-wip-us.apache.org/repos/asf/kafka/blob/8f0003f9/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala b/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala
index 95303e0..ffa6c30 100644
--- a/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala
@@ -31,7 +31,7 @@ import junit.framework.Assert._
 
 class AutoOffsetResetTest extends JUnit3Suite with KafkaServerTestHarness with Logging {
 
-  val configs = List(new KafkaConfig(TestUtils.createBrokerConfig(0)))
+  val configs = List(KafkaConfig.fromProps(TestUtils.createBrokerConfig(0)))
 
   val topic = "test_topic"
   val group = "default_group"

http://git-wip-us.apache.org/repos/asf/kafka/blob/8f0003f9/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/FetcherTest.scala b/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
index 25845ab..3093e45 100644
--- a/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
@@ -36,7 +36,7 @@ class FetcherTest extends JUnit3Suite with KafkaServerTestHarness {
   val numNodes = 1
   val configs =
     for(props <- TestUtils.createBrokerConfigs(numNodes))
-    yield new KafkaConfig(props)
+    yield KafkaConfig.fromProps(props)
   val messages = new mutable.HashMap[Int, Seq[Array[Byte]]]
   val topic = "topic"
   val cluster = new Cluster(configs.map(c => new Broker(c.brokerId, "localhost", c.port)))

http://git-wip-us.apache.org/repos/asf/kafka/blob/8f0003f9/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala b/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
index aeb7a19..30deaf4 100644
--- a/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
@@ -42,7 +42,7 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with
 
   val port = TestUtils.choosePort()
   val props = TestUtils.createBrokerConfig(0, port)
-  val config = new KafkaConfig(props)
+  val config = KafkaConfig.fromProps(props)
   val configs = List(config)
 
   def testFetchRequestCanProperlySerialize() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/8f0003f9/core/src/test/scala/unit/kafka/integration/RollingBounceTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/RollingBounceTest.scala b/core/src/test/scala/unit/kafka/integration/RollingBounceTest.scala
index eab4b5f..4d27e41 100644
--- a/core/src/test/scala/unit/kafka/integration/RollingBounceTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/RollingBounceTest.scala
@@ -49,10 +49,10 @@ class RollingBounceTest extends JUnit3Suite with ZooKeeperTestHarness {
   override def setUp() {
     super.setUp()
     // start all the servers
-    val server1 = TestUtils.createServer(new KafkaConfig(configProps1))
-    val server2 = TestUtils.createServer(new KafkaConfig(configProps2))
-    val server3 = TestUtils.createServer(new KafkaConfig(configProps3))
-    val server4 = TestUtils.createServer(new KafkaConfig(configProps4))
+    val server1 = TestUtils.createServer(KafkaConfig.fromProps(configProps1))
+    val server2 = TestUtils.createServer(KafkaConfig.fromProps(configProps2))
+    val server3 = TestUtils.createServer(KafkaConfig.fromProps(configProps3))
+    val server4 = TestUtils.createServer(KafkaConfig.fromProps(configProps4))
 
     servers ++= List(server1, server2, server3, server4)
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/8f0003f9/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala b/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
index 35dc071..a671af4 100644
--- a/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
@@ -32,7 +32,7 @@ import kafka.client.ClientUtils
 
 class TopicMetadataTest extends JUnit3Suite with ZooKeeperTestHarness {
   val props = createBrokerConfigs(1)
-  val configs = props.map(p => new KafkaConfig(p))
+  val configs = props.map(p => KafkaConfig.fromProps(p))
   private var server1: KafkaServer = null
   val brokers = configs.map(c => new Broker(c.brokerId,c.hostName,c.port))
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/8f0003f9/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala b/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
index ba3bcdc..8342cae 100644
--- a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
@@ -91,7 +91,7 @@ class UncleanLeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness {
 
   private def startBrokers(cluster: Seq[Properties]) {
     for (props <- cluster) {
-      val config = new KafkaConfig(props)
+      val config = KafkaConfig.fromProps(props)
       val server = createServer(config)
       configs ++= List(config)
       servers ++= List(server)

http://git-wip-us.apache.org/repos/asf/kafka/blob/8f0003f9/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala b/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala
index d6248b0..3d0fc9d 100644
--- a/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala
+++ b/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala
@@ -17,6 +17,8 @@
 
 package kafka.javaapi.consumer
 
+import java.util.Properties
+
 import kafka.server._
 import kafka.message._
 import kafka.serializer._
@@ -42,12 +44,15 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
   val numNodes = 2
   val numParts = 2
   val topic = "topic1"
+
+  val overridingProps = new Properties()
+  overridingProps.put(KafkaConfig.ZkConnectProp, zookeeperConnect)
+  overridingProps.put(KafkaConfig.NumPartitionsProp, numParts.toString)
+
   val configs =
-    for(props <- TestUtils.createBrokerConfigs(numNodes))
-    yield new KafkaConfig(props) {
-      override val numPartitions = numParts
-      override val zkConnect = zookeeperConnect
-    }
+    for (props <- TestUtils.createBrokerConfigs(numNodes))
+    yield KafkaConfig.fromProps(props, overridingProps)
+
   val group = "group1"
   val consumer1 = "consumer1"
   val nMessages = 2

http://git-wip-us.apache.org/repos/asf/kafka/blob/8f0003f9/core/src/test/scala/unit/kafka/log/LogTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index 1a4be70..8cd5f2f 100644
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -38,7 +38,7 @@ class LogTest extends JUnitSuite {
   def setUp() {
     logDir = TestUtils.tempDir()
     val props = TestUtils.createBrokerConfig(0, -1)
-    config = new KafkaConfig(props)
+    config = KafkaConfig.fromProps(props)
   }
 
   @After

http://git-wip-us.apache.org/repos/asf/kafka/blob/8f0003f9/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala b/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala
index 4ea0489..36db917 100644
--- a/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala
+++ b/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala
@@ -57,7 +57,7 @@ class KafkaLog4jAppenderTest extends JUnit3Suite with ZooKeeperTestHarness with
     val propsZk = TestUtils.createBrokerConfig(brokerZk, portZk)
     val logDirZkPath = propsZk.getProperty("log.dir")
     logDirZk = new File(logDirZkPath)
-    config = new KafkaConfig(propsZk)
+    config = KafkaConfig.fromProps(propsZk)
     server = TestUtils.createServer(config)
     simpleConsumerZk = new SimpleConsumer("localhost", portZk, 1000000, 64 * 1024, "")
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/8f0003f9/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala b/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala
index 111e4a2..0f58ad8 100644
--- a/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala
+++ b/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala
@@ -17,6 +17,8 @@
 
 package kafka.consumer
 
+import java.util.Properties
+
 import com.yammer.metrics.Metrics
 import com.yammer.metrics.core.MetricPredicate
 import org.junit.Test
@@ -38,12 +40,15 @@ class MetricsTest extends JUnit3Suite with KafkaServerTestHarness with Logging {
   val numNodes = 2
   val numParts = 2
   val topic = "topic1"
+
+  val overridingProps = new Properties()
+  overridingProps.put(KafkaConfig.ZkConnectProp, zookeeperConnect)
+  overridingProps.put(KafkaConfig.NumPartitionsProp, numParts.toString)
+
   val configs =
-    for (props <- TestUtils.createBrokerConfigs(numNodes, enableDeleteTopic=true))
-    yield new KafkaConfig(props) {
-      override val zkConnect = zookeeperConnect
-      override val numPartitions = numParts
-    }
+    for (props <- TestUtils.createBrokerConfigs(numNodes, enableDeleteTopic = true))
+    yield KafkaConfig.fromProps(props, overridingProps)
+
   val nMessages = 2
 
   override def tearDown() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/8f0003f9/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala b/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
index 1db6ac3..be90c5b 100644
--- a/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
+++ b/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
@@ -37,7 +37,7 @@ import kafka.utils._
 
 class AsyncProducerTest extends JUnit3Suite {
   val props = createBrokerConfigs(1)
-  val configs = props.map(p => new KafkaConfig(p))
+  val configs = props.map(p => KafkaConfig.fromProps(p))
 
   override def setUp() {
     super.setUp()

http://git-wip-us.apache.org/repos/asf/kafka/blob/8f0003f9/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
index ce65dab..d2f3851 100644
--- a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
+++ b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
@@ -51,10 +51,10 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{
 
   private val props1 = TestUtils.createBrokerConfig(brokerId1, port1, false)
   props1.put("num.partitions", "4")
-  private val config1 = new KafkaConfig(props1)
+  private val config1 = KafkaConfig.fromProps(props1)
   private val props2 = TestUtils.createBrokerConfig(brokerId2, port2, false)
   props2.put("num.partitions", "4")
-  private val config2 = new KafkaConfig(props2)
+  private val config2 = KafkaConfig.fromProps(props2)
 
   override def setUp() {
     super.setUp()

http://git-wip-us.apache.org/repos/asf/kafka/blob/8f0003f9/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala b/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
index d60d8e0..b5208a5 100644
--- a/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
+++ b/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
@@ -33,7 +33,7 @@ import kafka.common.{TopicAndPartition, ErrorMapping}
 class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness {
   private var messageBytes =  new Array[Byte](2);
   // turning off controlled shutdown since testProducerCanTimeout() explicitly shuts down request handler pool.
-  val configs = List(new KafkaConfig(TestUtils.createBrokerConfigs(1, false).head))
+  val configs = List(KafkaConfig.fromProps(TestUtils.createBrokerConfigs(1, false).head))
   val zookeeperConnect = TestZKUtils.zookeeperConnect
 
   @Test

http://git-wip-us.apache.org/repos/asf/kafka/blob/8f0003f9/core/src/test/scala/unit/kafka/server/AdvertiseBrokerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/AdvertiseBrokerTest.scala b/core/src/test/scala/unit/kafka/server/AdvertiseBrokerTest.scala
index f0c4a56..296e2b5 100644
--- a/core/src/test/scala/unit/kafka/server/AdvertiseBrokerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AdvertiseBrokerTest.scala
@@ -34,7 +34,7 @@ class AdvertiseBrokerTest extends JUnit3Suite with ZooKeeperTestHarness {
     props.put("advertised.host.name", advertisedHostName)
     props.put("advertised.port", advertisedPort.toString)
     
-    server = TestUtils.createServer(new KafkaConfig(props))
+    server = TestUtils.createServer(KafkaConfig.fromProps(props))
   }
 
   override def tearDown() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/8f0003f9/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
index ad12116..93182ae 100644
--- a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
@@ -27,7 +27,7 @@ import org.scalatest.junit.JUnit3Suite
 
 class DynamicConfigChangeTest extends JUnit3Suite with KafkaServerTestHarness {
   
-  override val configs = List(new KafkaConfig(TestUtils.createBrokerConfig(0, TestUtils.choosePort)))
+  override val configs = List(KafkaConfig.fromProps(TestUtils.createBrokerConfig(0, TestUtils.choosePort)))
 
   @Test
   def testConfigChange() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/8f0003f9/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
index 8913fc1..0bdbc2f 100644
--- a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
+++ b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
@@ -30,7 +30,7 @@ import java.util.concurrent.atomic.AtomicBoolean
 
 class HighwatermarkPersistenceTest extends JUnit3Suite {
 
-  val configs = TestUtils.createBrokerConfigs(2).map(new KafkaConfig(_))
+  val configs = TestUtils.createBrokerConfigs(2).map(KafkaConfig.fromProps)
   val topic = "foo"
   val logManagers = configs map { config =>
     TestUtils.createLogManager(

http://git-wip-us.apache.org/repos/asf/kafka/blob/8f0003f9/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala b/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
index a703d27..9215235 100644
--- a/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
@@ -16,6 +16,8 @@
 */
 package kafka.server
 
+import java.util.Properties
+
 import org.scalatest.junit.JUnit3Suite
 import collection.mutable.HashMap
 import collection.mutable.Map
@@ -29,11 +31,15 @@ import java.util.concurrent.atomic.AtomicBoolean
 class IsrExpirationTest extends JUnit3Suite {
 
   var topicPartitionIsr: Map[(String, Int), Seq[Int]] = new HashMap[(String, Int), Seq[Int]]()
-  val configs = TestUtils.createBrokerConfigs(2).map(new KafkaConfig(_) {
-    override val replicaLagTimeMaxMs = 100L
-    override val replicaFetchWaitMaxMs = 100
-    override val replicaLagMaxMessages = 10L
-  })
+  val replicaLagTimeMaxMs = 100L
+  val replicaFetchWaitMaxMs = 100
+  val replicaLagMaxMessages = 10L
+
+  val overridingProps = new Properties()
+  overridingProps.put(KafkaConfig.ReplicaLagTimeMaxMsProp, replicaLagTimeMaxMs.toString)
+  overridingProps.put(KafkaConfig.ReplicaFetchWaitMaxMsProp, replicaFetchWaitMaxMs.toString)
+  overridingProps.put(KafkaConfig.ReplicaLagMaxMessagesProp, replicaLagMaxMessages.toString)
+  val configs = TestUtils.createBrokerConfigs(2).map(KafkaConfig.fromProps(_, overridingProps))
   val topic = "foo"
 
   val time = new MockTime

http://git-wip-us.apache.org/repos/asf/kafka/blob/8f0003f9/core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala
new file mode 100644
index 0000000..c124c8d
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala
@@ -0,0 +1,403 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package unit.kafka.server
+
+import java.util.Properties
+
+import kafka.message._
+import kafka.server.{Defaults, KafkaConfig}
+import org.apache.kafka.common.config.ConfigException
+import org.junit.{Assert, Test}
+import org.scalatest.junit.JUnit3Suite
+
+import scala.collection.Map
+import scala.util.Random._
+
+class KafkaConfigConfigDefTest extends JUnit3Suite {
+
+  @Test
+  def testFromPropsDefaults() {
+    val defaults = new Properties()
+    defaults.put(KafkaConfig.ZkConnectProp, "127.0.0.1:2181")
+
+    // some ordinary setting
+    defaults.put(KafkaConfig.AdvertisedPortProp, "1818")
+
+    val props = new Properties(defaults)
+
+    val config = KafkaConfig.fromProps(props)
+
+    Assert.assertEquals(1818, config.advertisedPort)
+    Assert.assertEquals("KafkaConfig defaults should be retained", Defaults.ConnectionsMaxIdleMs, config.connectionsMaxIdleMs)
+  }
+
+  @Test
+  def testFromPropsEmpty() {
+    // only required
+    val p = new Properties()
+    p.put(KafkaConfig.ZkConnectProp, "127.0.0.1:2181")
+    val actualConfig = KafkaConfig.fromProps(p)
+
+    val expectedConfig = new KafkaConfig(zkConnect = "127.0.0.1:2181")
+
+    Assert.assertEquals(expectedConfig.zkConnect, actualConfig.zkConnect)
+    Assert.assertEquals(expectedConfig.zkSessionTimeoutMs, actualConfig.zkSessionTimeoutMs)
+    Assert.assertEquals(expectedConfig.zkConnectionTimeoutMs, actualConfig.zkConnectionTimeoutMs)
+    Assert.assertEquals(expectedConfig.zkSyncTimeMs, actualConfig.zkSyncTimeMs)
+    Assert.assertEquals(expectedConfig.maxReservedBrokerId, actualConfig.maxReservedBrokerId)
+    Assert.assertEquals(expectedConfig.brokerId, actualConfig.brokerId)
+    Assert.assertEquals(expectedConfig.messageMaxBytes, actualConfig.messageMaxBytes)
+    Assert.assertEquals(expectedConfig.numNetworkThreads, actualConfig.numNetworkThreads)
+    Assert.assertEquals(expectedConfig.numIoThreads, actualConfig.numIoThreads)
+    Assert.assertEquals(expectedConfig.backgroundThreads, actualConfig.backgroundThreads)
+    Assert.assertEquals(expectedConfig.queuedMaxRequests, actualConfig.queuedMaxRequests)
+
+    Assert.assertEquals(expectedConfig.port, actualConfig.port)
+    Assert.assertEquals(expectedConfig.hostName, actualConfig.hostName)
+    Assert.assertEquals(expectedConfig.advertisedHostName, actualConfig.advertisedHostName)
+    Assert.assertEquals(expectedConfig.advertisedPort, actualConfig.advertisedPort)
+    Assert.assertEquals(expectedConfig.socketSendBufferBytes, actualConfig.socketSendBufferBytes)
+    Assert.assertEquals(expectedConfig.socketReceiveBufferBytes, actualConfig.socketReceiveBufferBytes)
+    Assert.assertEquals(expectedConfig.socketRequestMaxBytes, actualConfig.socketRequestMaxBytes)
+    Assert.assertEquals(expectedConfig.maxConnectionsPerIp, actualConfig.maxConnectionsPerIp)
+    Assert.assertEquals(expectedConfig.maxConnectionsPerIpOverrides, actualConfig.maxConnectionsPerIpOverrides)
+    Assert.assertEquals(expectedConfig.connectionsMaxIdleMs, actualConfig.connectionsMaxIdleMs)
+
+    Assert.assertEquals(expectedConfig.numPartitions, actualConfig.numPartitions)
+    Assert.assertEquals(expectedConfig.logDirs, actualConfig.logDirs)
+
+    Assert.assertEquals(expectedConfig.logSegmentBytes, actualConfig.logSegmentBytes)
+
+    Assert.assertEquals(expectedConfig.logRollTimeMillis, actualConfig.logRollTimeMillis)
+    Assert.assertEquals(expectedConfig.logRollTimeJitterMillis, actualConfig.logRollTimeJitterMillis)
+    Assert.assertEquals(expectedConfig.logRetentionTimeMillis, actualConfig.logRetentionTimeMillis)
+
+    Assert.assertEquals(expectedConfig.logRetentionBytes, actualConfig.logRetentionBytes)
+    Assert.assertEquals(expectedConfig.logCleanupIntervalMs, actualConfig.logCleanupIntervalMs)
+    Assert.assertEquals(expectedConfig.logCleanupPolicy, actualConfig.logCleanupPolicy)
+    Assert.assertEquals(expectedConfig.logCleanerThreads, actualConfig.logCleanerThreads)
+    Assert.assertEquals(expectedConfig.logCleanerIoMaxBytesPerSecond, actualConfig.logCleanerIoMaxBytesPerSecond, 0.0)
+    Assert.assertEquals(expectedConfig.logCleanerDedupeBufferSize, actualConfig.logCleanerDedupeBufferSize)
+    Assert.assertEquals(expectedConfig.logCleanerIoBufferSize, actualConfig.logCleanerIoBufferSize)
+    Assert.assertEquals(expectedConfig.logCleanerDedupeBufferLoadFactor, actualConfig.logCleanerDedupeBufferLoadFactor, 0.0)
+    Assert.assertEquals(expectedConfig.logCleanerBackoffMs, actualConfig.logCleanerBackoffMs)
+    Assert.assertEquals(expectedConfig.logCleanerMinCleanRatio, actualConfig.logCleanerMinCleanRatio, 0.0)
+    Assert.assertEquals(expectedConfig.logCleanerEnable, actualConfig.logCleanerEnable)
+    Assert.assertEquals(expectedConfig.logCleanerDeleteRetentionMs, actualConfig.logCleanerDeleteRetentionMs)
+    Assert.assertEquals(expectedConfig.logIndexSizeMaxBytes, actualConfig.logIndexSizeMaxBytes)
+    Assert.assertEquals(expectedConfig.logIndexIntervalBytes, actualConfig.logIndexIntervalBytes)
+    Assert.assertEquals(expectedConfig.logFlushIntervalMessages, actualConfig.logFlushIntervalMessages)
+    Assert.assertEquals(expectedConfig.logDeleteDelayMs, actualConfig.logDeleteDelayMs)
+    Assert.assertEquals(expectedConfig.logFlushSchedulerIntervalMs, actualConfig.logFlushSchedulerIntervalMs)
+    Assert.assertEquals(expectedConfig.logFlushIntervalMs, actualConfig.logFlushIntervalMs)
+    Assert.assertEquals(expectedConfig.logFlushOffsetCheckpointIntervalMs, actualConfig.logFlushOffsetCheckpointIntervalMs)
+    Assert.assertEquals(expectedConfig.numRecoveryThreadsPerDataDir, actualConfig.numRecoveryThreadsPerDataDir)
+    Assert.assertEquals(expectedConfig.autoCreateTopicsEnable, actualConfig.autoCreateTopicsEnable)
+
+    Assert.assertEquals(expectedConfig.minInSyncReplicas, actualConfig.minInSyncReplicas)
+
+    Assert.assertEquals(expectedConfig.controllerSocketTimeoutMs, actualConfig.controllerSocketTimeoutMs)
+    Assert.assertEquals(expectedConfig.controllerMessageQueueSize, actualConfig.controllerMessageQueueSize)
+    Assert.assertEquals(expectedConfig.defaultReplicationFactor, actualConfig.defaultReplicationFactor)
+    Assert.assertEquals(expectedConfig.replicaLagTimeMaxMs, actualConfig.replicaLagTimeMaxMs)
+    Assert.assertEquals(expectedConfig.replicaLagMaxMessages, actualConfig.replicaLagMaxMessages)
+    Assert.assertEquals(expectedConfig.replicaSocketTimeoutMs, actualConfig.replicaSocketTimeoutMs)
+    Assert.assertEquals(expectedConfig.replicaSocketReceiveBufferBytes, actualConfig.replicaSocketReceiveBufferBytes)
+    Assert.assertEquals(expectedConfig.replicaFetchMaxBytes, actualConfig.replicaFetchMaxBytes)
+    Assert.assertEquals(expectedConfig.replicaFetchWaitMaxMs, actualConfig.replicaFetchWaitMaxMs)
+    Assert.assertEquals(expectedConfig.replicaFetchMinBytes, actualConfig.replicaFetchMinBytes)
+    Assert.assertEquals(expectedConfig.numReplicaFetchers, actualConfig.numReplicaFetchers)
+    Assert.assertEquals(expectedConfig.replicaHighWatermarkCheckpointIntervalMs, actualConfig.replicaHighWatermarkCheckpointIntervalMs)
+    Assert.assertEquals(expectedConfig.fetchPurgatoryPurgeIntervalRequests, actualConfig.fetchPurgatoryPurgeIntervalRequests)
+    Assert.assertEquals(expectedConfig.producerPurgatoryPurgeIntervalRequests, actualConfig.producerPurgatoryPurgeIntervalRequests)
+    Assert.assertEquals(expectedConfig.autoLeaderRebalanceEnable, actualConfig.autoLeaderRebalanceEnable)
+    Assert.assertEquals(expectedConfig.leaderImbalancePerBrokerPercentage, actualConfig.leaderImbalancePerBrokerPercentage)
+    Assert.assertEquals(expectedConfig.leaderImbalanceCheckIntervalSeconds, actualConfig.leaderImbalanceCheckIntervalSeconds)
+    Assert.assertEquals(expectedConfig.uncleanLeaderElectionEnable, actualConfig.uncleanLeaderElectionEnable)
+
+    Assert.assertEquals(expectedConfig.controlledShutdownMaxRetries, actualConfig.controlledShutdownMaxRetries)
+    Assert.assertEquals(expectedConfig.controlledShutdownRetryBackoffMs, actualConfig.controlledShutdownRetryBackoffMs)
+    Assert.assertEquals(expectedConfig.controlledShutdownEnable, actualConfig.controlledShutdownEnable)
+
+    Assert.assertEquals(expectedConfig.offsetMetadataMaxSize, actualConfig.offsetMetadataMaxSize)
+    Assert.assertEquals(expectedConfig.offsetsLoadBufferSize, actualConfig.offsetsLoadBufferSize)
+    Assert.assertEquals(expectedConfig.offsetsTopicReplicationFactor, actualConfig.offsetsTopicReplicationFactor)
+    Assert.assertEquals(expectedConfig.offsetsTopicPartitions, actualConfig.offsetsTopicPartitions)
+    Assert.assertEquals(expectedConfig.offsetsTopicSegmentBytes, actualConfig.offsetsTopicSegmentBytes)
+    Assert.assertEquals(expectedConfig.offsetsTopicCompressionCodec, actualConfig.offsetsTopicCompressionCodec)
+    Assert.assertEquals(expectedConfig.offsetsRetentionMinutes, actualConfig.offsetsRetentionMinutes)
+    Assert.assertEquals(expectedConfig.offsetsRetentionCheckIntervalMs, actualConfig.offsetsRetentionCheckIntervalMs)
+    Assert.assertEquals(expectedConfig.offsetCommitTimeoutMs, actualConfig.offsetCommitTimeoutMs)
+    Assert.assertEquals(expectedConfig.offsetCommitRequiredAcks, actualConfig.offsetCommitRequiredAcks)
+
+    Assert.assertEquals(expectedConfig.deleteTopicEnable, actualConfig.deleteTopicEnable)
+    Assert.assertEquals(expectedConfig.compressionType, actualConfig.compressionType)
+  }
+
+  private def atLeastXIntProp(x: Int): String = (nextInt(Int.MaxValue - 1) + x).toString
+
+  private def atLeastOneIntProp: String = atLeastXIntProp(1)
+
+  private def inRangeIntProp(fromInc: Int, toInc: Int): String = (nextInt(toInc + 1 - fromInc) + fromInc).toString
+
+  @Test
+  def testFromPropsToProps() {
+    import scala.util.Random._
+    val expected = new Properties()
+    KafkaConfig.configNames().foreach(name => {
+      name match {
+        case KafkaConfig.ZkConnectProp => expected.setProperty(name, "127.0.0.1:2181")
+        case KafkaConfig.ZkSessionTimeoutMsProp => expected.setProperty(name, atLeastOneIntProp)
+        case KafkaConfig.ZkConnectionTimeoutMsProp => expected.setProperty(name, atLeastOneIntProp)
+        case KafkaConfig.ZkSyncTimeMsProp => expected.setProperty(name, atLeastOneIntProp)
+
+        case KafkaConfig.NumNetworkThreadsProp => expected.setProperty(name, atLeastOneIntProp)
+        case KafkaConfig.NumIoThreadsProp => expected.setProperty(name, atLeastOneIntProp)
+        case KafkaConfig.BackgroundThreadsProp => expected.setProperty(name, atLeastOneIntProp)
+        case KafkaConfig.QueuedMaxRequestsProp => expected.setProperty(name, atLeastOneIntProp)
+
+        case KafkaConfig.PortProp => expected.setProperty(name, "1234")
+        case KafkaConfig.HostNameProp => expected.setProperty(name, nextString(10))
+        case KafkaConfig.AdvertisedHostNameProp => expected.setProperty(name, nextString(10))
+        case KafkaConfig.AdvertisedPortProp => expected.setProperty(name, "4321")
+        case KafkaConfig.SocketRequestMaxBytesProp => expected.setProperty(name, atLeastOneIntProp)
+        case KafkaConfig.MaxConnectionsPerIpProp => expected.setProperty(name, atLeastOneIntProp)
+        case KafkaConfig.MaxConnectionsPerIpOverridesProp => expected.setProperty(name, "127.0.0.1:2, 127.0.0.2:3")
+
+        case KafkaConfig.NumPartitionsProp => expected.setProperty(name, "2")
+        case KafkaConfig.LogDirsProp => expected.setProperty(name, "/tmp/logs,/tmp/logs2")
+        case KafkaConfig.LogDirProp => expected.setProperty(name, "/tmp/log")
+        case KafkaConfig.LogSegmentBytesProp => expected.setProperty(name, atLeastXIntProp(Message.MinHeaderSize))
+
+        case KafkaConfig.LogRollTimeMillisProp => expected.setProperty(name, atLeastOneIntProp)
+        case KafkaConfig.LogRollTimeHoursProp => expected.setProperty(name, atLeastOneIntProp)
+
+        case KafkaConfig.LogRetentionTimeMillisProp => expected.setProperty(name, atLeastOneIntProp)
+        case KafkaConfig.LogRetentionTimeMinutesProp => expected.setProperty(name, atLeastOneIntProp)
+        case KafkaConfig.LogRetentionTimeHoursProp => expected.setProperty(name, atLeastOneIntProp)
+
+        case KafkaConfig.LogCleanupIntervalMsProp => expected.setProperty(name, atLeastOneIntProp)
+        case KafkaConfig.LogCleanupPolicyProp => expected.setProperty(name, randFrom(Defaults.Compact, Defaults.Delete))
+        case KafkaConfig.LogCleanerIoMaxBytesPerSecondProp => expected.setProperty(name, "%.1f".format(nextDouble * .9 + .1))
+        case KafkaConfig.LogCleanerDedupeBufferLoadFactorProp => expected.setProperty(name, "%.1f".format(nextDouble * .9 + .1))
+        case KafkaConfig.LogCleanerMinCleanRatioProp => expected.setProperty(name, "%.1f".format(nextDouble * .9 + .1))
+        case KafkaConfig.LogCleanerEnableProp => expected.setProperty(name, randFrom("true", "false"))
+        case KafkaConfig.LogIndexSizeMaxBytesProp => expected.setProperty(name, atLeastXIntProp(4))
+        case KafkaConfig.LogFlushIntervalMessagesProp => expected.setProperty(name, atLeastOneIntProp)
+
+        case KafkaConfig.NumRecoveryThreadsPerDataDirProp => expected.setProperty(name, atLeastOneIntProp)
+        case KafkaConfig.AutoCreateTopicsEnableProp => expected.setProperty(name, randFrom("true", "false"))
+        case KafkaConfig.MinInSyncReplicasProp => expected.setProperty(name, atLeastOneIntProp)
+        case KafkaConfig.AutoLeaderRebalanceEnableProp => expected.setProperty(name, randFrom("true", "false"))
+        case KafkaConfig.UncleanLeaderElectionEnableProp => expected.setProperty(name, randFrom("true", "false"))
+        case KafkaConfig.ControlledShutdownEnableProp => expected.setProperty(name, randFrom("true", "false"))
+        case KafkaConfig.OffsetsLoadBufferSizeProp => expected.setProperty(name, atLeastOneIntProp)
+        case KafkaConfig.OffsetsTopicPartitionsProp => expected.setProperty(name, atLeastOneIntProp)
+        case KafkaConfig.OffsetsTopicSegmentBytesProp => expected.setProperty(name, atLeastOneIntProp)
+        case KafkaConfig.OffsetsTopicCompressionCodecProp => expected.setProperty(name, randFrom(GZIPCompressionCodec.codec.toString,
+          SnappyCompressionCodec.codec.toString, LZ4CompressionCodec.codec.toString, NoCompressionCodec.codec.toString))
+        case KafkaConfig.OffsetsRetentionMinutesProp => expected.setProperty(name, atLeastOneIntProp)
+        case KafkaConfig.OffsetsRetentionCheckIntervalMsProp => expected.setProperty(name, atLeastOneIntProp)
+        case KafkaConfig.OffsetCommitTimeoutMsProp => expected.setProperty(name, atLeastOneIntProp)
+        case KafkaConfig.DeleteTopicEnableProp => expected.setProperty(name, randFrom("true", "false"))
+
+        // explicit, non trivial validations or with transient dependencies
+
+        // require(brokerId >= -1 && brokerId <= maxReservedBrokerId)
+        case KafkaConfig.MaxReservedBrokerIdProp => expected.setProperty(name, "100")
+        case KafkaConfig.BrokerIdProp => expected.setProperty(name, inRangeIntProp(0, 100))
+        // require(logCleanerDedupeBufferSize / logCleanerThreads > 1024 * 1024)
+        case KafkaConfig.LogCleanerThreadsProp =>  expected.setProperty(name, "2")
+        case KafkaConfig.LogCleanerDedupeBufferSizeProp => expected.setProperty(name, (1024 * 1024 * 3 + 1).toString)
+        // require(replicaFetchWaitMaxMs <= replicaSocketTimeoutMs)
+        case KafkaConfig.ReplicaFetchWaitMaxMsProp => expected.setProperty(name, "321")
+        case KafkaConfig.ReplicaSocketTimeoutMsProp => expected.setProperty(name, atLeastXIntProp(321))
+        // require(replicaFetchMaxBytes >= messageMaxBytes)
+        case KafkaConfig.MessageMaxBytesProp => expected.setProperty(name, "1234")
+        case KafkaConfig.ReplicaFetchMaxBytesProp => expected.setProperty(name, atLeastXIntProp(1234))
+        // require(replicaFetchWaitMaxMs <= replicaLagTimeMaxMs)
+        case KafkaConfig.ReplicaLagTimeMaxMsProp => expected.setProperty(name, atLeastXIntProp(321))
+        //require(offsetCommitRequiredAcks >= -1 && offsetCommitRequiredAcks <= offsetsTopicReplicationFactor)
+        case KafkaConfig.OffsetCommitRequiredAcksProp => expected.setProperty(name, "-1")
+        case KafkaConfig.OffsetsTopicReplicationFactorProp => expected.setProperty(name, inRangeIntProp(-1, Short.MaxValue))
+        //BrokerCompressionCodec.isValid(compressionType)
+        case KafkaConfig.CompressionTypeProp => expected.setProperty(name, randFrom(BrokerCompressionCodec.brokerCompressionOptions))
+
+        case nonNegativeIntProperty => expected.setProperty(name, nextInt(Int.MaxValue).toString)
+      }
+    })
+
+    val actual = KafkaConfig.fromProps(expected).toProps
+    Assert.assertEquals(expected, actual)
+  }
+
+  @Test
+  def testFromPropsInvalid() {
+    def getBaseProperties(): Properties = {
+      val validRequiredProperties = new Properties()
+      validRequiredProperties.put(KafkaConfig.ZkConnectProp, "127.0.0.1")
+      validRequiredProperties
+    }
+    // to ensure a basis is valid - bootstraps all needed validation
+    KafkaConfig.fromProps(getBaseProperties())
+
+    KafkaConfig.configNames().foreach(name => {
+      name match {
+        case KafkaConfig.ZkConnectProp => // ignore string
+        case KafkaConfig.ZkSessionTimeoutMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
+        case KafkaConfig.ZkConnectionTimeoutMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
+        case KafkaConfig.ZkSyncTimeMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
+
+        case KafkaConfig.BrokerIdProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
+        case KafkaConfig.NumNetworkThreadsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0")
+        case KafkaConfig.NumIoThreadsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0")
+        case KafkaConfig.BackgroundThreadsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0")
+        case KafkaConfig.QueuedMaxRequestsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0")
+
+        case KafkaConfig.PortProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
+        case KafkaConfig.HostNameProp => // ignore string
+        case KafkaConfig.AdvertisedHostNameProp => //ignore string
+        case KafkaConfig.AdvertisedPortProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
+        case KafkaConfig.SocketSendBufferBytesProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
+        case KafkaConfig.SocketReceiveBufferBytesProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
+        case KafkaConfig.MaxConnectionsPerIpOverridesProp =>
+          assertPropertyInvalid(getBaseProperties(), name, "127.0.0.1:not_a_number")
+        case KafkaConfig.ConnectionsMaxIdleMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
+
+        case KafkaConfig.NumPartitionsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0")
+        case KafkaConfig.LogDirsProp => // ignore string
+        case KafkaConfig.LogDirProp => // ignore string
+        case KafkaConfig.LogSegmentBytesProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", Message.MinHeaderSize - 1)
+
+        case KafkaConfig.LogRollTimeMillisProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0")
+        case KafkaConfig.LogRollTimeHoursProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0")
+
+        case KafkaConfig.LogRetentionTimeMillisProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0")
+        case KafkaConfig.LogRetentionTimeMinutesProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0")
+        case KafkaConfig.LogRetentionTimeHoursProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0")
+
+        case KafkaConfig.LogRetentionBytesProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
+        case KafkaConfig.LogCleanupIntervalMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0")
+        case KafkaConfig.LogCleanupPolicyProp => assertPropertyInvalid(getBaseProperties(), name, "unknown_policy", "0")
+        case KafkaConfig.LogCleanerIoMaxBytesPerSecondProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
+        case KafkaConfig.LogCleanerDedupeBufferSizeProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "1024")
+        case KafkaConfig.LogCleanerDedupeBufferLoadFactorProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
+        case KafkaConfig.LogCleanerEnableProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_boolean")
+        case KafkaConfig.LogCleanerDeleteRetentionMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
+        case KafkaConfig.LogCleanerMinCleanRatioProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
+        case KafkaConfig.LogIndexSizeMaxBytesProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "3")
+        case KafkaConfig.LogFlushIntervalMessagesProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0")
+        case KafkaConfig.LogFlushSchedulerIntervalMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
+        case KafkaConfig.LogFlushIntervalMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
+        case KafkaConfig.NumRecoveryThreadsPerDataDirProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0")
+        case KafkaConfig.AutoCreateTopicsEnableProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_boolean", "0")
+        case KafkaConfig.MinInSyncReplicasProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0")
+        case KafkaConfig.ControllerSocketTimeoutMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
+        case KafkaConfig.ControllerMessageQueueSizeProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
+        case KafkaConfig.DefaultReplicationFactorProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
+        case KafkaConfig.ReplicaLagTimeMaxMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
+        case KafkaConfig.ReplicaLagMaxMessagesProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
+        case KafkaConfig.ReplicaSocketTimeoutMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "-2")
+        case KafkaConfig.ReplicaSocketReceiveBufferBytesProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
+        case KafkaConfig.ReplicaFetchMaxBytesProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
+        case KafkaConfig.ReplicaFetchWaitMaxMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
+        case KafkaConfig.ReplicaFetchMinBytesProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
+        case KafkaConfig.NumReplicaFetchersProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
+        case KafkaConfig.ReplicaHighWatermarkCheckpointIntervalMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
+        case KafkaConfig.FetchPurgatoryPurgeIntervalRequestsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
+        case KafkaConfig.ProducerPurgatoryPurgeIntervalRequestsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
+        case KafkaConfig.AutoLeaderRebalanceEnableProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_boolean", "0")
+        case KafkaConfig.LeaderImbalancePerBrokerPercentageProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
+        case KafkaConfig.LeaderImbalanceCheckIntervalSecondsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
+        case KafkaConfig.UncleanLeaderElectionEnableProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_boolean", "0")
+        case KafkaConfig.ControlledShutdownMaxRetriesProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
+        case KafkaConfig.ControlledShutdownRetryBackoffMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
+        case KafkaConfig.ControlledShutdownEnableProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_boolean", "0")
+        case KafkaConfig.OffsetMetadataMaxSizeProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
+        case KafkaConfig.OffsetsLoadBufferSizeProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0")
+        case KafkaConfig.OffsetsTopicReplicationFactorProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0")
+        case KafkaConfig.OffsetsTopicPartitionsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0")
+        case KafkaConfig.OffsetsTopicSegmentBytesProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0")
+        case KafkaConfig.OffsetsTopicCompressionCodecProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "-1")
+        case KafkaConfig.OffsetsRetentionMinutesProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0")
+        case KafkaConfig.OffsetsRetentionCheckIntervalMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0")
+        case KafkaConfig.OffsetCommitTimeoutMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0")
+        case KafkaConfig.OffsetCommitRequiredAcksProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "-2")
+
+        case KafkaConfig.DeleteTopicEnableProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_boolean", "0")
+
+        case nonNegativeIntProperty => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "-1")
+      }
+    })
+  }
+
+  @Test
+  def testSpecificProperties(): Unit = {
+    val defaults = new Properties()
+    defaults.put(KafkaConfig.ZkConnectProp, "127.0.0.1:2181")
+    // For ZkConnectionTimeoutMs
+    defaults.put(KafkaConfig.ZkSessionTimeoutMsProp, "1234")
+    defaults.put(KafkaConfig.MaxReservedBrokerIdProp, "1")
+    defaults.put(KafkaConfig.BrokerIdProp, "1")
+    defaults.put(KafkaConfig.HostNameProp, "127.0.0.1")
+    defaults.put(KafkaConfig.PortProp, "1122")
+    defaults.put(KafkaConfig.MaxConnectionsPerIpOverridesProp, "127.0.0.1:2, 127.0.0.2:3")
+    defaults.put(KafkaConfig.LogDirProp, "/tmp1,/tmp2")
+    defaults.put(KafkaConfig.LogRollTimeHoursProp, "12")
+    defaults.put(KafkaConfig.LogRollTimeJitterHoursProp, "11")
+    defaults.put(KafkaConfig.LogRetentionTimeHoursProp, "10")
+    //For LogFlushIntervalMsProp
+    defaults.put(KafkaConfig.LogFlushSchedulerIntervalMsProp, "123")
+    defaults.put(KafkaConfig.OffsetsTopicCompressionCodecProp, SnappyCompressionCodec.codec.toString)
+
+    val config = KafkaConfig.fromProps(defaults)
+    Assert.assertEquals("127.0.0.1:2181", config.zkConnect)
+    Assert.assertEquals(1234, config.zkConnectionTimeoutMs)
+    Assert.assertEquals(1, config.maxReservedBrokerId)
+    Assert.assertEquals(1, config.brokerId)
+    Assert.assertEquals("127.0.0.1", config.hostName)
+    Assert.assertEquals(1122, config.advertisedPort)
+    Assert.assertEquals("127.0.0.1", config.advertisedHostName)
+    Assert.assertEquals(Map("127.0.0.1" -> 2, "127.0.0.2" -> 3), config.maxConnectionsPerIpOverrides)
+    Assert.assertEquals(List("/tmp1", "/tmp2"), config.logDirs)
+    Assert.assertEquals(12 * 60L * 1000L * 60, config.logRollTimeMillis)
+    Assert.assertEquals(11 * 60L * 1000L * 60, config.logRollTimeJitterMillis)
+    Assert.assertEquals(10 * 60L * 1000L * 60, config.logRetentionTimeMillis)
+    Assert.assertEquals(123L, config.logFlushIntervalMs)
+    Assert.assertEquals(SnappyCompressionCodec, config.offsetsTopicCompressionCodec)
+  }
+
+  private def assertPropertyInvalid(validRequiredProps: => Properties, name: String, values: Any*) {
+    values.foreach((value) => {
+      val props = validRequiredProps
+      props.setProperty(name, value.toString)
+      intercept[Exception] {
+        KafkaConfig.fromProps(props)
+      }
+    })
+  }
+
+  private def randFrom[T](choices: T*): T = {
+    import scala.util.Random
+    choices(Random.nextInt(choices.size))
+  }
+
+  private def randFrom[T](choices: List[T]): T = {
+    import scala.util.Random
+    choices(Random.nextInt(choices.size))
+  }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/8f0003f9/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index 82dce80..7f47e6f 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -17,6 +17,7 @@
 
 package kafka.server
 
+import org.apache.kafka.common.config.ConfigException
 import org.junit.Test
 import junit.framework.Assert._
 import org.scalatest.junit.JUnit3Suite
@@ -31,7 +32,7 @@ class KafkaConfigTest extends JUnit3Suite {
     val props = TestUtils.createBrokerConfig(0, 8181)
     props.put("log.retention.hours", "1")
 
-    val cfg = new KafkaConfig(props)
+    val cfg = KafkaConfig.fromProps(props)
     assertEquals(60L * 60L * 1000L, cfg.logRetentionTimeMillis)
 
   }
@@ -41,7 +42,7 @@ class KafkaConfigTest extends JUnit3Suite {
     val props = TestUtils.createBrokerConfig(0, 8181)
     props.put("log.retention.minutes", "30")
 
-    val cfg = new KafkaConfig(props)
+    val cfg = KafkaConfig.fromProps(props)
     assertEquals(30 * 60L * 1000L, cfg.logRetentionTimeMillis)
 
   }
@@ -51,7 +52,7 @@ class KafkaConfigTest extends JUnit3Suite {
     val props = TestUtils.createBrokerConfig(0, 8181)
     props.put("log.retention.ms", "1800000")
 
-    val cfg = new KafkaConfig(props)
+    val cfg = KafkaConfig.fromProps(props)
     assertEquals(30 * 60L * 1000L, cfg.logRetentionTimeMillis)
 
   }
@@ -60,7 +61,7 @@ class KafkaConfigTest extends JUnit3Suite {
   def testLogRetentionTimeNoConfigProvided() {
     val props = TestUtils.createBrokerConfig(0, 8181)
 
-    val cfg = new KafkaConfig(props)
+    val cfg = KafkaConfig.fromProps(props)
     assertEquals(24 * 7 * 60L * 60L * 1000L, cfg.logRetentionTimeMillis)
 
   }
@@ -71,7 +72,7 @@ class KafkaConfigTest extends JUnit3Suite {
     props.put("log.retention.minutes", "30")
     props.put("log.retention.hours", "1")
 
-    val cfg = new KafkaConfig(props)
+    val cfg = KafkaConfig.fromProps(props)
     assertEquals( 30 * 60L * 1000L, cfg.logRetentionTimeMillis)
 
   }
@@ -82,7 +83,7 @@ class KafkaConfigTest extends JUnit3Suite {
     props.put("log.retention.ms", "1800000")
     props.put("log.retention.minutes", "10")
 
-    val cfg = new KafkaConfig(props)
+    val cfg = KafkaConfig.fromProps(props)
     assertEquals( 30 * 60L * 1000L, cfg.logRetentionTimeMillis)
 
   }
@@ -95,7 +96,7 @@ class KafkaConfigTest extends JUnit3Suite {
     val props = TestUtils.createBrokerConfig(0, port)
     props.put("host.name", hostName)
     
-    val serverConfig = new KafkaConfig(props)
+    val serverConfig = KafkaConfig.fromProps(props)
     
     assertEquals(serverConfig.advertisedHostName, hostName)
     assertEquals(serverConfig.advertisedPort, port)
@@ -111,7 +112,7 @@ class KafkaConfigTest extends JUnit3Suite {
     props.put("advertised.host.name", advertisedHostName)
     props.put("advertised.port", advertisedPort.toString)
     
-    val serverConfig = new KafkaConfig(props)
+    val serverConfig = KafkaConfig.fromProps(props)
     
     assertEquals(serverConfig.advertisedHostName, advertisedHostName)
     assertEquals(serverConfig.advertisedPort, advertisedPort)
@@ -120,7 +121,7 @@ class KafkaConfigTest extends JUnit3Suite {
   @Test
   def testUncleanLeaderElectionDefault() {
     val props = TestUtils.createBrokerConfig(0, 8181)
-    val serverConfig = new KafkaConfig(props)
+    val serverConfig = KafkaConfig.fromProps(props)
 
     assertEquals(serverConfig.uncleanLeaderElectionEnable, true)
   }
@@ -129,7 +130,7 @@ class KafkaConfigTest extends JUnit3Suite {
   def testUncleanElectionDisabled() {
     val props = TestUtils.createBrokerConfig(0, 8181)
     props.put("unclean.leader.election.enable", String.valueOf(false))
-    val serverConfig = new KafkaConfig(props)
+    val serverConfig = KafkaConfig.fromProps(props)
 
     assertEquals(serverConfig.uncleanLeaderElectionEnable, false)
   }
@@ -138,7 +139,7 @@ class KafkaConfigTest extends JUnit3Suite {
   def testUncleanElectionEnabled() {
     val props = TestUtils.createBrokerConfig(0, 8181)
     props.put("unclean.leader.election.enable", String.valueOf(true))
-    val serverConfig = new KafkaConfig(props)
+    val serverConfig = KafkaConfig.fromProps(props)
 
     assertEquals(serverConfig.uncleanLeaderElectionEnable, true)
   }
@@ -148,8 +149,8 @@ class KafkaConfigTest extends JUnit3Suite {
     val props = TestUtils.createBrokerConfig(0, 8181)
     props.put("unclean.leader.election.enable", "invalid")
 
-    intercept[IllegalArgumentException] {
-      new KafkaConfig(props)
+    intercept[ConfigException] {
+      KafkaConfig.fromProps(props)
     }
   }
   
@@ -158,7 +159,7 @@ class KafkaConfigTest extends JUnit3Suite {
     val props = TestUtils.createBrokerConfig(0, 8181)
     props.put("log.roll.ms", "1800000")
 
-    val cfg = new KafkaConfig(props)
+    val cfg = KafkaConfig.fromProps(props)
     assertEquals(30 * 60L * 1000L, cfg.logRollTimeMillis)
 
   }
@@ -169,7 +170,7 @@ class KafkaConfigTest extends JUnit3Suite {
     props.put("log.roll.ms", "1800000")
     props.put("log.roll.hours", "1")
 
-    val cfg = new KafkaConfig(props)
+    val cfg = KafkaConfig.fromProps(props)
     assertEquals( 30 * 60L * 1000L, cfg.logRollTimeMillis)
 
   }
@@ -178,7 +179,7 @@ class KafkaConfigTest extends JUnit3Suite {
   def testLogRollTimeNoConfigProvided() {
     val props = TestUtils.createBrokerConfig(0, 8181)
 
-    val cfg = new KafkaConfig(props)
+    val cfg = KafkaConfig.fromProps(props)
     assertEquals(24 * 7 * 60L * 60L * 1000L, cfg.logRollTimeMillis)
 
   }
@@ -186,7 +187,7 @@ class KafkaConfigTest extends JUnit3Suite {
   @Test
   def testDefaultCompressionType() {
     val props = TestUtils.createBrokerConfig(0, 8181)
-    val serverConfig = new KafkaConfig(props)
+    val serverConfig = KafkaConfig.fromProps(props)
 
     assertEquals(serverConfig.compressionType, "producer")
   }
@@ -195,7 +196,7 @@ class KafkaConfigTest extends JUnit3Suite {
   def testValidCompressionType() {
     val props = TestUtils.createBrokerConfig(0, 8181)
     props.put("compression.type", "gzip")
-    val serverConfig = new KafkaConfig(props)
+    val serverConfig = KafkaConfig.fromProps(props)
 
     assertEquals(serverConfig.compressionType, "gzip")
   }
@@ -205,7 +206,7 @@ class KafkaConfigTest extends JUnit3Suite {
     val props = TestUtils.createBrokerConfig(0, 8181)
     props.put("compression.type", "abc")
     intercept[IllegalArgumentException] {
-      new KafkaConfig(props)
+      KafkaConfig.fromProps(props)
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/8f0003f9/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
index c2ba07c..f252805 100644
--- a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
@@ -43,8 +43,8 @@ class LeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness {
   override def setUp() {
     super.setUp()
     // start both servers
-    val server1 = TestUtils.createServer(new KafkaConfig(configProps1))
-    val server2 = TestUtils.createServer(new KafkaConfig(configProps2))
+    val server1 = TestUtils.createServer(KafkaConfig.fromProps(configProps1))
+    val server2 = TestUtils.createServer(KafkaConfig.fromProps(configProps2))
     servers ++= List(server1, server2)
   }
 
@@ -117,7 +117,7 @@ class LeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness {
 
     // start another controller
     val controllerId = 2
-    val controllerConfig = new KafkaConfig(TestUtils.createBrokerConfig(controllerId, TestUtils.choosePort()))
+    val controllerConfig = KafkaConfig.fromProps(TestUtils.createBrokerConfig(controllerId, TestUtils.choosePort()))
     val brokers = servers.map(s => new Broker(s.config.brokerId, "localhost", s.config.port))
     val controllerContext = new ControllerContext(zkClient, 6000)
     controllerContext.liveBrokers = brokers.toSet

http://git-wip-us.apache.org/repos/asf/kafka/blob/8f0003f9/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
index c06ee75..8c9f9e7 100644
--- a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
@@ -50,7 +50,7 @@ class LogOffsetTest extends JUnit3Suite with ZooKeeperTestHarness {
     val logDirPath = config.getProperty("log.dir")
     logDir = new File(logDirPath)
     time = new MockTime()
-    server = TestUtils.createServer(new KafkaConfig(config), time)
+    server = TestUtils.createServer(KafkaConfig.fromProps(config), time)
     simpleConsumer = new SimpleConsumer("localhost", brokerPort, 1000000, 64*1024, "")
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/8f0003f9/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
index d5d351c..92d6b2c 100644
--- a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
@@ -16,6 +16,8 @@
 */
 package kafka.server
 
+import java.util.Properties
+
 import kafka.utils.TestUtils._
 import kafka.utils.IntEncoder
 import kafka.utils.{Utils, TestUtils}
@@ -31,12 +33,18 @@ import org.junit.Assert._
 
 class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness {
 
-  val configs = TestUtils.createBrokerConfigs(2, false).map(new KafkaConfig(_) {
-    override val replicaLagTimeMaxMs = 5000L
-    override val replicaLagMaxMessages = 10L
-    override val replicaFetchWaitMaxMs = 1000
-    override val replicaFetchMinBytes = 20
-  })
+  val replicaLagTimeMaxMs = 5000L
+  val replicaLagMaxMessages = 10L
+  val replicaFetchWaitMaxMs = 1000
+  val replicaFetchMinBytes = 20
+
+  val overridingProps = new Properties()
+  overridingProps.put(KafkaConfig.ReplicaLagTimeMaxMsProp, replicaLagTimeMaxMs.toString)
+  overridingProps.put(KafkaConfig.ReplicaLagMaxMessagesProp, replicaLagMaxMessages.toString)
+  overridingProps.put(KafkaConfig.ReplicaFetchWaitMaxMsProp, replicaFetchWaitMaxMs.toString)
+  overridingProps.put(KafkaConfig.ReplicaFetchMinBytesProp, replicaFetchMinBytes.toString)
+
+  val configs = TestUtils.createBrokerConfigs(2, false).map(KafkaConfig.fromProps(_, overridingProps))
   val topic = "new-topic"
   val partitionId = 0
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/8f0003f9/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala b/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
index a37a74d..ea9b315 100644
--- a/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
+++ b/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
@@ -49,7 +49,7 @@ class OffsetCommitTest extends JUnit3Suite with ZooKeeperTestHarness {
     val logDirPath = config.getProperty("log.dir")
     logDir = new File(logDirPath)
     time = new MockTime()
-    server = TestUtils.createServer(new KafkaConfig(config), time)
+    server = TestUtils.createServer(KafkaConfig.fromProps(config), time)
     simpleConsumer = new SimpleConsumer("localhost", brokerPort, 1000000, 64*1024, "test-client")
     val consumerMetadataRequest = ConsumerMetadataRequest(group)
     Stream.continually {

http://git-wip-us.apache.org/repos/asf/kafka/blob/8f0003f9/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala
index da4bafc..1e64faf 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala
@@ -28,7 +28,7 @@ import kafka.common._
 
 class ReplicaFetchTest extends JUnit3Suite with ZooKeeperTestHarness  {
   val props = createBrokerConfigs(2,false)
-  val configs = props.map(p => new KafkaConfig(p))
+  val configs = props.map(p => KafkaConfig.fromProps(p))
   var brokers: Seq[KafkaServer] = null
   val topic1 = "foo"
   val topic2 = "bar"

http://git-wip-us.apache.org/repos/asf/kafka/blob/8f0003f9/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index d1ed5c2..2849a5e 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -39,7 +39,7 @@ class ReplicaManagerTest extends JUnit3Suite {
   @Test
   def testHighWaterMarkDirectoryMapping() {
     val props = TestUtils.createBrokerConfig(1)
-    val config = new KafkaConfig(props)
+    val config = KafkaConfig.fromProps(props)
     val zkClient = EasyMock.createMock(classOf[ZkClient])
     val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)).toArray)
     val time: MockTime = new MockTime()
@@ -56,7 +56,7 @@ class ReplicaManagerTest extends JUnit3Suite {
   def testHighwaterMarkRelativeDirectoryMapping() {
     val props = TestUtils.createBrokerConfig(1)
     props.put("log.dir", TestUtils.tempRelativeDir("data").getAbsolutePath)
-    val config = new KafkaConfig(props)
+    val config = KafkaConfig.fromProps(props)
     val zkClient = EasyMock.createMock(classOf[ZkClient])
     val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)).toArray)
     val time: MockTime = new MockTime()
@@ -72,7 +72,7 @@ class ReplicaManagerTest extends JUnit3Suite {
   @Test
   def testIllegalRequiredAcks() {
     val props = TestUtils.createBrokerConfig(1)
-    val config = new KafkaConfig(props)
+    val config = KafkaConfig.fromProps(props)
     val zkClient = EasyMock.createMock(classOf[ZkClient])
     val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)).toArray)
     val time: MockTime = new MockTime()

http://git-wip-us.apache.org/repos/asf/kafka/blob/8f0003f9/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala b/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala
index cf2dd94..96a8a5a 100644
--- a/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala
@@ -25,9 +25,9 @@ import java.io.File
 
 class ServerGenerateBrokerIdTest extends JUnit3Suite with ZooKeeperTestHarness {
   var props1 = TestUtils.createBrokerConfig(-1, TestUtils.choosePort)
-  var config1 = new KafkaConfig(props1)
+  var config1 = KafkaConfig.fromProps(props1)
   var props2 = TestUtils.createBrokerConfig(0, TestUtils.choosePort)
-  var config2 = new KafkaConfig(props2)
+  var config2 = KafkaConfig.fromProps(props2)
   val brokerMetaPropsFile = "meta.properties"
 
 
@@ -52,7 +52,7 @@ class ServerGenerateBrokerIdTest extends JUnit3Suite with ZooKeeperTestHarness {
     val server1 = new KafkaServer(config1)
     val server2 = new KafkaServer(config2)
     val props3 = TestUtils.createBrokerConfig(-1, TestUtils.choosePort)
-    val config3 = new KafkaConfig(props3)
+    val config3 = KafkaConfig.fromProps(props3)
     val server3 = new KafkaServer(config3)
     server1.startup()
     assertEquals(server1.config.brokerId,1001)
@@ -78,7 +78,7 @@ class ServerGenerateBrokerIdTest extends JUnit3Suite with ZooKeeperTestHarness {
     val logDirs = props1.getProperty("log.dir")+ "," + TestUtils.tempDir().getAbsolutePath +
     "," + TestUtils.tempDir().getAbsolutePath
     props1.setProperty("log.dir",logDirs)
-    config1 = new KafkaConfig(props1)
+    config1 = KafkaConfig.fromProps(props1)
     var server1 = new KafkaServer(config1)
     server1.startup()
     server1.shutdown()
@@ -86,7 +86,7 @@ class ServerGenerateBrokerIdTest extends JUnit3Suite with ZooKeeperTestHarness {
     // addition to log.dirs after generation of a broker.id from zk should be copied over
     val newLogDirs = props1.getProperty("log.dir") + "," + TestUtils.tempDir().getAbsolutePath
     props1.setProperty("log.dir",newLogDirs)
-    config1 = new KafkaConfig(props1)
+    config1 = KafkaConfig.fromProps(props1)
     server1 = new KafkaServer(config1)
     server1.startup()
     server1.shutdown()

http://git-wip-us.apache.org/repos/asf/kafka/blob/8f0003f9/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
index 82fa4cf..b46daa4 100644
--- a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
@@ -34,7 +34,7 @@ import junit.framework.Assert._
 class ServerShutdownTest extends JUnit3Suite with ZooKeeperTestHarness {
   val port = TestUtils.choosePort
   val props = TestUtils.createBrokerConfig(0, port)
-  val config = new KafkaConfig(props)
+  val config = KafkaConfig.fromProps(props)
 
   val host = "localhost"
   val topic = "test"
@@ -105,7 +105,7 @@ class ServerShutdownTest extends JUnit3Suite with ZooKeeperTestHarness {
   def testCleanShutdownWithDeleteTopicEnabled() {
     val newProps = TestUtils.createBrokerConfig(0, port)
     newProps.setProperty("delete.topic.enable", "true")
-    val newConfig = new KafkaConfig(newProps)
+    val newConfig = KafkaConfig.fromProps(newProps)
     val server = new KafkaServer(newConfig)
     server.startup()
     server.shutdown()
@@ -118,7 +118,7 @@ class ServerShutdownTest extends JUnit3Suite with ZooKeeperTestHarness {
   def testCleanShutdownAfterFailedStartup() {
     val newProps = TestUtils.createBrokerConfig(0, port)
     newProps.setProperty("zookeeper.connect", "fakehostthatwontresolve:65535")
-    val newConfig = new KafkaConfig(newProps)
+    val newConfig = KafkaConfig.fromProps(newProps)
     val server = new KafkaServer(newConfig)
     try {
       server.startup()

http://git-wip-us.apache.org/repos/asf/kafka/blob/8f0003f9/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala b/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala
index 93af7df..60021ef 100644
--- a/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala
@@ -33,7 +33,7 @@ class ServerStartupTest extends JUnit3Suite with ZooKeeperTestHarness {
     val props = TestUtils.createBrokerConfig(brokerId, TestUtils.choosePort())
     val zooKeeperConnect = props.get("zookeeper.connect")
     props.put("zookeeper.connect", zooKeeperConnect + zookeeperChroot)
-    val server = TestUtils.createServer(new KafkaConfig(props))
+    val server = TestUtils.createServer(KafkaConfig.fromProps(props))
 
     val pathExists = ZkUtils.pathExists(zkClient, zookeeperChroot)
     assertTrue(pathExists)
@@ -48,12 +48,12 @@ class ServerStartupTest extends JUnit3Suite with ZooKeeperTestHarness {
 
     val brokerId = 0
     val props1 = TestUtils.createBrokerConfig(brokerId)
-    val server1 = TestUtils.createServer(new KafkaConfig(props1))
+    val server1 = TestUtils.createServer(KafkaConfig.fromProps(props1))
     val brokerRegistration = ZkUtils.readData(zkClient, ZkUtils.BrokerIdsPath + "/" + brokerId)._1
 
     val props2 = TestUtils.createBrokerConfig(brokerId)
     try {
-      TestUtils.createServer(new KafkaConfig(props2))
+      TestUtils.createServer(KafkaConfig.fromProps(props2))
       fail("Registering a broker with a conflicting id should fail")
     } catch {
       case e : RuntimeException =>

http://git-wip-us.apache.org/repos/asf/kafka/blob/8f0003f9/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
index fd8f32c..efb4573 100644
--- a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
+++ b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
@@ -24,7 +24,7 @@ import kafka.log.Log
 import kafka.message.{ByteBufferMessageSet, Message}
 
 import scala.Some
-import java.util.Collections
+import java.util.{Properties, Collections}
 import java.util.concurrent.atomic.AtomicBoolean
 import collection.JavaConversions._
 
@@ -35,11 +35,16 @@ import junit.framework.Assert._
 
 class SimpleFetchTest extends JUnit3Suite {
 
-  val configs = TestUtils.createBrokerConfigs(2).map(new KafkaConfig(_) {
-    override val replicaLagTimeMaxMs = 100L
-    override val replicaFetchWaitMaxMs = 100
-    override val replicaLagMaxMessages = 10L
-  })
+  val replicaLagTimeMaxMs = 100L
+  val replicaFetchWaitMaxMs = 100
+  val replicaLagMaxMessages = 10L
+
+  val overridingProps = new Properties()
+  overridingProps.put(KafkaConfig.ReplicaLagTimeMaxMsProp, replicaLagTimeMaxMs.toString)
+  overridingProps.put(KafkaConfig.ReplicaFetchWaitMaxMsProp, replicaFetchWaitMaxMs.toString)
+  overridingProps.put(KafkaConfig.ReplicaLagMaxMessagesProp, replicaLagMaxMessages.toString)
+
+  val configs = TestUtils.createBrokerConfigs(2).map(KafkaConfig.fromProps(_, overridingProps))
 
   // set the replica manager with the partition
   val time = new MockTime

http://git-wip-us.apache.org/repos/asf/kafka/blob/8f0003f9/core/src/test/scala/unit/kafka/utils/ReplicationUtilsTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/ReplicationUtilsTest.scala b/core/src/test/scala/unit/kafka/utils/ReplicationUtilsTest.scala
index 305498a..2edc814 100644
--- a/core/src/test/scala/unit/kafka/utils/ReplicationUtilsTest.scala
+++ b/core/src/test/scala/unit/kafka/utils/ReplicationUtilsTest.scala
@@ -53,7 +53,7 @@ class ReplicationUtilsTest extends JUnit3Suite with ZooKeeperTestHarness {
 
   @Test
   def testUpdateLeaderAndIsr() {
-    val configs = TestUtils.createBrokerConfigs(1).map(new KafkaConfig(_))
+    val configs = TestUtils.createBrokerConfigs(1).map(KafkaConfig.fromProps)
     val log = EasyMock.createMock(classOf[kafka.log.Log])
     EasyMock.expect(log.logEndOffset).andReturn(20).anyTimes()
     EasyMock.expect(log)


Mime
View raw message