kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jun...@apache.org
Subject [1/2] kafka git commit: kafka-2249; KafkaConfig does not preserve original Properties; patched by Gwen Shapira; reviewed by Jun Rao
Date Thu, 18 Jun 2015 21:07:39 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk ba86f0a25 -> 5c9040745


http://git-wip-us.apache.org/repos/asf/kafka/blob/5c904074/core/src/test/scala/unit/kafka/log/LogConfigTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogConfigTest.scala b/core/src/test/scala/unit/kafka/log/LogConfigTest.scala
index 3fd5a53..c31f884 100644
--- a/core/src/test/scala/unit/kafka/log/LogConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogConfigTest.scala
@@ -26,22 +26,9 @@ import org.scalatest.junit.JUnit3Suite
 class LogConfigTest extends JUnit3Suite {
 
   @Test
-  def testFromPropsDefaults() {
-    val defaults = new Properties()
-    defaults.put(LogConfig.SegmentBytesProp, "4242")
-    val props = new Properties(defaults)
-
-    val config = LogConfig.fromProps(props)
-
-    Assert.assertEquals(4242, config.segmentSize)
-    Assert.assertEquals("LogConfig defaults should be retained", Defaults.MaxMessageSize, config.maxMessageSize)
-    Assert.assertEquals("producer", config.compressionType)
-  }
-
-  @Test
   def testFromPropsEmpty() {
     val p = new Properties()
-    val config = LogConfig.fromProps(p)
+    val config = LogConfig(p)
     Assert.assertEquals(LogConfig(), config)
   }
 
@@ -62,7 +49,7 @@ class LogConfigTest extends JUnit3Suite {
       }
     })
 
-    val actual = LogConfig.fromProps(expected).toProps
+    val actual = LogConfig(expected).originals
     Assert.assertEquals(expected, actual)
   }
 
@@ -86,7 +73,7 @@ class LogConfigTest extends JUnit3Suite {
       val props = new Properties
       props.setProperty(name, value.toString)
       intercept[ConfigException] {
-        LogConfig.fromProps(props)
+        LogConfig(props)
       }
     })
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/5c904074/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
index 01dfbc4..a13f2be 100755
--- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
@@ -18,6 +18,7 @@
 package kafka.log
 
 import java.io._
+import java.util.Properties
 import junit.framework.Assert._
 import org.junit.Test
 import org.scalatest.junit.JUnit3Suite
@@ -30,7 +31,11 @@ class LogManagerTest extends JUnit3Suite {
   val time: MockTime = new MockTime()
   val maxRollInterval = 100
   val maxLogAgeMs = 10*60*60*1000
-  val logConfig = LogConfig(segmentSize = 1024, maxIndexSize = 4096, retentionMs = maxLogAgeMs)
+  val logProps = new Properties()
+  logProps.put(LogConfig.SegmentBytesProp, 1024: java.lang.Integer)
+  logProps.put(LogConfig.SegmentIndexBytesProp, 4096: java.lang.Integer)
+  logProps.put(LogConfig.RetentionMsProp, maxLogAgeMs: java.lang.Integer)
+  val logConfig = LogConfig(logProps)
   var logDir: File = null
   var logManager: LogManager = null
   val name = "kafka"
@@ -113,8 +118,11 @@ class LogManagerTest extends JUnit3Suite {
   def testCleanupSegmentsToMaintainSize() {
     val setSize = TestUtils.singleMessageSet("test".getBytes()).sizeInBytes
     logManager.shutdown()
+    val logProps = new Properties()
+    logProps.put(LogConfig.SegmentBytesProp, 10 * setSize: java.lang.Integer)
+    logProps.put(LogConfig.RetentionBytesProp, 5L * 10L * setSize + 10L: java.lang.Long)
+    val config = LogConfig.fromProps(logConfig.originals, logProps)
 
-    val config = logConfig.copy(segmentSize = 10 * setSize, retentionSize = 5L * 10L * setSize + 10L)
     logManager = createLogManager()
     logManager.startup
 
@@ -154,7 +162,10 @@ class LogManagerTest extends JUnit3Suite {
   @Test
   def testTimeBasedFlush() {
     logManager.shutdown()
-    val config = logConfig.copy(flushMs = 1000)
+    val logProps = new Properties()
+    logProps.put(LogConfig.FlushMsProp, 1000: java.lang.Integer)
+    val config = LogConfig.fromProps(logConfig.originals, logProps)
+
     logManager = createLogManager()
     logManager.startup
     val log = logManager.createLog(TopicAndPartition(name, 0), config)

http://git-wip-us.apache.org/repos/asf/kafka/blob/5c904074/core/src/test/scala/unit/kafka/log/LogTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index 8e095d6..a8e57c2 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -18,6 +18,7 @@
 package kafka.log
 
 import java.io._
+import java.util.Properties
 import java.util.concurrent.atomic._
 import junit.framework.Assert._
 import org.scalatest.junit.JUnitSuite
@@ -61,9 +62,12 @@ class LogTest extends JUnitSuite {
   def testTimeBasedLogRoll() {
     val set = TestUtils.singleMessageSet("test".getBytes())
 
+    val logProps = new Properties()
+    logProps.put(LogConfig.SegmentMsProp, 1 * 60 * 60: java.lang.Long)
+
     // create a log
     val log = new Log(logDir,
-                      logConfig.copy(segmentMs = 1 * 60 * 60L),
+                      LogConfig(logProps),
                       recoveryPoint = 0L,
                       scheduler = time.scheduler,
                       time = time)
@@ -96,9 +100,12 @@ class LogTest extends JUnitSuite {
     val set = TestUtils.singleMessageSet("test".getBytes())
     val maxJitter = 20 * 60L
 
+    val logProps = new Properties()
+    logProps.put(LogConfig.SegmentMsProp, 1 * 60 * 60: java.lang.Long)
+    logProps.put(LogConfig.SegmentJitterMsProp, maxJitter: java.lang.Long)
     // create a log
     val log = new Log(logDir,
-      logConfig.copy(segmentMs = 1 * 60 * 60L, segmentJitterMs = maxJitter),
+      LogConfig(logProps),
       recoveryPoint = 0L,
       scheduler = time.scheduler,
       time = time)
@@ -123,8 +130,10 @@ class LogTest extends JUnitSuite {
     val msgPerSeg = 10
     val segmentSize = msgPerSeg * (setSize - 1) // each segment will be 10 messages
 
+    val logProps = new Properties()
+    logProps.put(LogConfig.SegmentBytesProp, segmentSize: java.lang.Integer)
     // create a log
-    val log = new Log(logDir, logConfig.copy(segmentSize = segmentSize), recoveryPoint = 0L, time.scheduler, time = time)
+    val log = new Log(logDir, LogConfig(logProps), recoveryPoint = 0L, time.scheduler, time = time)
     assertEquals("There should be exactly 1 segment.", 1, log.numberOfSegments)
 
     // segments expire in size
@@ -149,7 +158,9 @@ class LogTest extends JUnitSuite {
    */
   @Test
   def testAppendAndReadWithSequentialOffsets() {
-    val log = new Log(logDir, logConfig.copy(segmentSize = 71), recoveryPoint = 0L, time.scheduler, time = time)
+    val logProps = new Properties()
+    logProps.put(LogConfig.SegmentBytesProp, 71: java.lang.Integer)
+    val log = new Log(logDir, LogConfig(logProps), recoveryPoint = 0L, time.scheduler, time = time)
     val messages = (0 until 100 by 2).map(id => new Message(id.toString.getBytes)).toArray
 
     for(i <- 0 until messages.length)
@@ -168,7 +179,9 @@ class LogTest extends JUnitSuite {
    */
   @Test
   def testAppendAndReadWithNonSequentialOffsets() {
-    val log = new Log(logDir, logConfig.copy(segmentSize = 71), recoveryPoint = 0L, time.scheduler, time = time)
+    val logProps = new Properties()
+    logProps.put(LogConfig.SegmentBytesProp, 71: java.lang.Integer)
+    val log = new Log(logDir,  LogConfig(logProps), recoveryPoint = 0L, time.scheduler, time = time)
     val messageIds = ((0 until 50) ++ (50 until 200 by 7)).toArray
     val messages = messageIds.map(id => new Message(id.toString.getBytes))
 
@@ -191,7 +204,9 @@ class LogTest extends JUnitSuite {
    */
   @Test
   def testReadAtLogGap() {
-    val log = new Log(logDir, logConfig.copy(segmentSize = 300), recoveryPoint = 0L, time.scheduler, time = time)
+    val logProps = new Properties()
+    logProps.put(LogConfig.SegmentBytesProp, 300: java.lang.Integer)
+    val log = new Log(logDir, LogConfig(logProps), recoveryPoint = 0L, time.scheduler, time = time)
 
     // keep appending until we have two segments with only a single message in the second segment
     while(log.numberOfSegments == 1)
@@ -211,7 +226,9 @@ class LogTest extends JUnitSuite {
   @Test
   def testReadOutOfRange() {
     createEmptyLogs(logDir, 1024)
-    val log = new Log(logDir, logConfig.copy(segmentSize = 1024), recoveryPoint = 0L, time.scheduler, time = time)
+    val logProps = new Properties()
+    logProps.put(LogConfig.SegmentBytesProp, 1024: java.lang.Integer)
+    val log = new Log(logDir, LogConfig(logProps), recoveryPoint = 0L, time.scheduler, time = time)
     assertEquals("Reading just beyond end of log should produce 0 byte read.", 0, log.read(1024, 1000).messageSet.sizeInBytes)
     try {
       log.read(0, 1024)
@@ -234,7 +251,9 @@ class LogTest extends JUnitSuite {
   @Test
   def testLogRolls() {
     /* create a multipart log with 100 messages */
-    val log = new Log(logDir, logConfig.copy(segmentSize = 100), recoveryPoint = 0L, time.scheduler, time = time)
+    val logProps = new Properties()
+    logProps.put(LogConfig.SegmentBytesProp, 100: java.lang.Integer)
+    val log = new Log(logDir, LogConfig(logProps), recoveryPoint = 0L, time.scheduler, time = time)
     val numMessages = 100
     val messageSets = (0 until numMessages).map(i => TestUtils.singleMessageSet(i.toString.getBytes))
     messageSets.foreach(log.append(_))
@@ -263,7 +282,9 @@ class LogTest extends JUnitSuite {
   @Test
   def testCompressedMessages() {
     /* this log should roll after every messageset */
-    val log = new Log(logDir, logConfig.copy(segmentSize = 100), recoveryPoint = 0L, time.scheduler, time = time)
+    val logProps = new Properties()
+    logProps.put(LogConfig.SegmentBytesProp, 100: java.lang.Integer)
+    val log = new Log(logDir, LogConfig(logProps), recoveryPoint = 0L, time.scheduler, time = time)
 
     /* append 2 compressed message sets, each with two messages giving offsets 0, 1, 2, 3 */
     log.append(new ByteBufferMessageSet(DefaultCompressionCodec, new Message("hello".getBytes), new Message("there".getBytes)))
@@ -286,7 +307,9 @@ class LogTest extends JUnitSuite {
     for(messagesToAppend <- List(0, 1, 25)) {
       logDir.mkdirs()
       // first test a log segment starting at 0
-      val log = new Log(logDir, logConfig.copy(segmentSize = 100), recoveryPoint = 0L, time.scheduler, time = time)
+      val logProps = new Properties()
+      logProps.put(LogConfig.SegmentBytesProp, 100: java.lang.Integer)
+      val log = new Log(logDir, LogConfig(logProps), recoveryPoint = 0L, time.scheduler, time = time)
       for(i <- 0 until messagesToAppend)
         log.append(TestUtils.singleMessageSet(i.toString.getBytes))
 
@@ -318,7 +341,9 @@ class LogTest extends JUnitSuite {
     val messageSet = new ByteBufferMessageSet(NoCompressionCodec, new Message ("You".getBytes), new Message("bethe".getBytes))
     // append messages to log
     val configSegmentSize = messageSet.sizeInBytes - 1
-    val log = new Log(logDir, logConfig.copy(segmentSize = configSegmentSize), recoveryPoint = 0L, time.scheduler, time = time)
+    val logProps = new Properties()
+    logProps.put(LogConfig.SegmentBytesProp, configSegmentSize: java.lang.Integer)
+    val log = new Log(logDir, LogConfig(logProps), recoveryPoint = 0L, time.scheduler, time = time)
 
     try {
       log.append(messageSet)
@@ -342,7 +367,10 @@ class LogTest extends JUnitSuite {
     val messageSetWithKeyedMessage = new ByteBufferMessageSet(NoCompressionCodec, keyedMessage)
     val messageSetWithKeyedMessages = new ByteBufferMessageSet(NoCompressionCodec, keyedMessage, anotherKeyedMessage)
 
-    val log = new Log(logDir, logConfig.copy(compact = true), recoveryPoint = 0L, time.scheduler, time)
+    val logProps = new Properties()
+    logProps.put(LogConfig.CleanupPolicyProp, LogConfig.Compact)
+
+    val log = new Log(logDir, LogConfig(logProps), recoveryPoint = 0L, time.scheduler, time)
 
     try {
       log.append(messageSetWithUnkeyedMessage)
@@ -380,7 +408,9 @@ class LogTest extends JUnitSuite {
 
     // append messages to log
     val maxMessageSize = second.sizeInBytes - 1
-    val log = new Log(logDir, logConfig.copy(maxMessageSize = maxMessageSize), recoveryPoint = 0L, time.scheduler, time = time)
+    val logProps = new Properties()
+    logProps.put(LogConfig.MaxMessageBytesProp, maxMessageSize: java.lang.Integer)
+    val log = new Log(logDir, LogConfig(logProps), recoveryPoint = 0L, time.scheduler, time = time)
 
     // should be able to append the small message
     log.append(first)
@@ -401,7 +431,11 @@ class LogTest extends JUnitSuite {
     val messageSize = 100
     val segmentSize = 7 * messageSize
     val indexInterval = 3 * messageSize
-    val config = logConfig.copy(segmentSize = segmentSize, indexInterval = indexInterval, maxIndexSize = 4096)
+    val logProps = new Properties()
+    logProps.put(LogConfig.SegmentBytesProp, segmentSize: java.lang.Integer)
+    logProps.put(LogConfig.IndexIntervalBytesProp, indexInterval: java.lang.Integer)
+    logProps.put(LogConfig.SegmentIndexBytesProp, 4096: java.lang.Integer)
+    val config = LogConfig(logProps)
     var log = new Log(logDir, config, recoveryPoint = 0L, time.scheduler, time)
     for(i <- 0 until numMessages)
       log.append(TestUtils.singleMessageSet(TestUtils.randomBytes(messageSize)))
@@ -432,7 +466,11 @@ class LogTest extends JUnitSuite {
   def testIndexRebuild() {
     // publish the messages and close the log
     val numMessages = 200
-    val config = logConfig.copy(segmentSize = 200, indexInterval = 1)
+    val logProps = new Properties()
+    logProps.put(LogConfig.SegmentBytesProp, 200: java.lang.Integer)
+    logProps.put(LogConfig.IndexIntervalBytesProp, 1: java.lang.Integer)
+
+    val config = LogConfig(logProps)
     var log = new Log(logDir, config, recoveryPoint = 0L, time.scheduler, time)
     for(i <- 0 until numMessages)
       log.append(TestUtils.singleMessageSet(TestUtils.randomBytes(10)))
@@ -460,8 +498,11 @@ class LogTest extends JUnitSuite {
     val msgPerSeg = 10
     val segmentSize = msgPerSeg * setSize  // each segment will be 10 messages
 
+    val logProps = new Properties()
+    logProps.put(LogConfig.SegmentBytesProp, segmentSize: java.lang.Integer)
+
     // create a log
-    val log = new Log(logDir, logConfig.copy(segmentSize = segmentSize), recoveryPoint = 0L, scheduler = time.scheduler, time = time)
+    val log = new Log(logDir, LogConfig(logProps), recoveryPoint = 0L, scheduler = time.scheduler, time = time)
     assertEquals("There should be exactly 1 segment.", 1, log.numberOfSegments)
 
     for (i<- 1 to msgPerSeg)
@@ -513,7 +554,9 @@ class LogTest extends JUnitSuite {
     val setSize = set.sizeInBytes
     val msgPerSeg = 10
     val segmentSize = msgPerSeg * setSize  // each segment will be 10 messages
-    val config = logConfig.copy(segmentSize = segmentSize)
+    val logProps = new Properties()
+    logProps.put(LogConfig.SegmentBytesProp, segmentSize: java.lang.Integer)
+    val config = LogConfig(logProps)
     val log = new Log(logDir, config, recoveryPoint = 0L, scheduler = time.scheduler, time = time)
     assertEquals("There should be exactly 1 segment.", 1, log.numberOfSegments)
     for (i<- 1 to msgPerSeg)
@@ -540,10 +583,12 @@ class LogTest extends JUnitSuite {
     val bogusIndex2 = Log.indexFilename(logDir, 5)
 
     val set = TestUtils.singleMessageSet("test".getBytes())
+    val logProps = new Properties()
+    logProps.put(LogConfig.SegmentBytesProp, set.sizeInBytes * 5: java.lang.Integer)
+    logProps.put(LogConfig.SegmentIndexBytesProp, 1000: java.lang.Integer)
+    logProps.put(LogConfig.IndexIntervalBytesProp, 1: java.lang.Integer)
     val log = new Log(logDir,
-                      logConfig.copy(segmentSize = set.sizeInBytes * 5,
-                                     maxIndexSize = 1000,
-                                     indexInterval = 1),
+                      LogConfig(logProps),
                       recoveryPoint = 0L,
                       time.scheduler,
                       time)
@@ -564,9 +609,11 @@ class LogTest extends JUnitSuite {
   @Test
   def testReopenThenTruncate() {
     val set = TestUtils.singleMessageSet("test".getBytes())
-    val config = logConfig.copy(segmentSize = set.sizeInBytes * 5,
-                                maxIndexSize = 1000,
-                                indexInterval = 10000)
+    val logProps = new Properties()
+    logProps.put(LogConfig.SegmentBytesProp, set.sizeInBytes * 5: java.lang.Integer)
+    logProps.put(LogConfig.SegmentIndexBytesProp, 1000: java.lang.Integer)
+    logProps.put(LogConfig.IndexIntervalBytesProp, 10000: java.lang.Integer)
+    val config = LogConfig(logProps)
 
     // create a log
     var log = new Log(logDir,
@@ -596,10 +643,13 @@ class LogTest extends JUnitSuite {
   def testAsyncDelete() {
     val set = TestUtils.singleMessageSet("test".getBytes())
     val asyncDeleteMs = 1000
-    val config = logConfig.copy(segmentSize = set.sizeInBytes * 5,
-                                fileDeleteDelayMs = asyncDeleteMs,
-                                maxIndexSize = 1000,
-                                indexInterval = 10000)
+    val logProps = new Properties()
+    logProps.put(LogConfig.SegmentBytesProp, set.sizeInBytes * 5: java.lang.Integer)
+    logProps.put(LogConfig.SegmentIndexBytesProp, 1000: java.lang.Integer)
+    logProps.put(LogConfig.IndexIntervalBytesProp, 10000: java.lang.Integer)
+    logProps.put(LogConfig.FileDeleteDelayMsProp, asyncDeleteMs: java.lang.Integer)
+    val config = LogConfig(logProps)
+
     val log = new Log(logDir,
                       config,
                       recoveryPoint = 0L,
@@ -634,7 +684,10 @@ class LogTest extends JUnitSuite {
   @Test
   def testOpenDeletesObsoleteFiles() {
     val set = TestUtils.singleMessageSet("test".getBytes())
-    val config = logConfig.copy(segmentSize = set.sizeInBytes * 5, maxIndexSize = 1000)
+    val logProps = new Properties()
+    logProps.put(LogConfig.SegmentBytesProp, set.sizeInBytes * 5: java.lang.Integer)
+    logProps.put(LogConfig.SegmentIndexBytesProp, 1000: java.lang.Integer)
+    val config = LogConfig(logProps)
     var log = new Log(logDir,
                       config,
                       recoveryPoint = 0L,
@@ -672,7 +725,11 @@ class LogTest extends JUnitSuite {
   @Test
   def testCorruptLog() {
     // append some messages to create some segments
-    val config = logConfig.copy(indexInterval = 1, maxMessageSize = 64*1024, segmentSize = 1000)
+    val logProps = new Properties()
+    logProps.put(LogConfig.SegmentBytesProp, 1000: java.lang.Integer)
+    logProps.put(LogConfig.IndexIntervalBytesProp, 1: java.lang.Integer)
+    logProps.put(LogConfig.MaxMessageBytesProp, 64*1024: java.lang.Integer)
+    val config = LogConfig(logProps)
     val set = TestUtils.singleMessageSet("test".getBytes())
     val recoveryPoint = 50L
     for(iteration <- 0 until 50) {
@@ -704,7 +761,11 @@ class LogTest extends JUnitSuite {
   @Test
   def testCleanShutdownFile() {
     // append some messages to create some segments
-    val config = logConfig.copy(indexInterval = 1, maxMessageSize = 64*1024, segmentSize = 1000)
+    val logProps = new Properties()
+    logProps.put(LogConfig.SegmentBytesProp, 1000: java.lang.Integer)
+    logProps.put(LogConfig.MaxMessageBytesProp, 64*1024: java.lang.Integer)
+    logProps.put(LogConfig.IndexIntervalBytesProp, 1: java.lang.Integer)
+    val config = LogConfig(logProps)
     val set = TestUtils.singleMessageSet("test".getBytes())
     val parentLogDir = logDir.getParentFile
     assertTrue("Data directory %s must exist", parentLogDir.isDirectory)

http://git-wip-us.apache.org/repos/asf/kafka/blob/5c904074/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
index 7877f6c..8a871cf 100644
--- a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
@@ -16,6 +16,8 @@
  */
 package kafka.server
 
+import java.util.Properties
+
 import junit.framework.Assert._
 import org.junit.Test
 import kafka.integration.KafkaServerTestHarness
@@ -30,16 +32,19 @@ class DynamicConfigChangeTest extends JUnit3Suite with KafkaServerTestHarness {
 
   @Test
   def testConfigChange() {
-    val oldVal = 100000
-    val newVal = 200000
+    val oldVal: java.lang.Long = 100000
+    val newVal: java.lang.Long = 200000
     val tp = TopicAndPartition("test", 0)
-    AdminUtils.createTopic(zkClient, tp.topic, 1, 1, LogConfig(flushInterval = oldVal).toProps)
+    val logProps = new Properties()
+    logProps.put(LogConfig.FlushMessagesProp, oldVal.toString)
+    AdminUtils.createTopic(zkClient, tp.topic, 1, 1, logProps)
     TestUtils.retry(10000) {
       val logOpt = this.servers(0).logManager.getLog(tp)
       assertTrue(logOpt.isDefined)
       assertEquals(oldVal, logOpt.get.config.flushInterval)
     }
-    AdminUtils.changeTopicConfig(zkClient, tp.topic, LogConfig(flushInterval = newVal).toProps)
+    logProps.put(LogConfig.FlushMessagesProp, newVal.toString)
+    AdminUtils.changeTopicConfig(zkClient, tp.topic, logProps)
     TestUtils.retry(10000) {
       assertEquals(newVal, this.servers(0).logManager.getLog(tp).get.config.flushInterval)
     }
@@ -49,7 +54,9 @@ class DynamicConfigChangeTest extends JUnit3Suite with KafkaServerTestHarness {
   def testConfigChangeOnNonExistingTopic() {
     val topic = TestUtils.tempTopic
     try {
-      AdminUtils.changeTopicConfig(zkClient, topic, LogConfig(flushInterval = 10000).toProps)
+      val logProps = new Properties()
+      logProps.put(LogConfig.FlushMessagesProp, 10000: java.lang.Integer)
+      AdminUtils.changeTopicConfig(zkClient, topic, logProps)
       fail("Should fail with AdminOperationException for topic doesn't exist")
     } catch {
       case e: AdminOperationException => // expected

http://git-wip-us.apache.org/repos/asf/kafka/blob/5c904074/core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala
index c487f36..8268852 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala
@@ -30,29 +30,13 @@ import scala.util.Random._
 class KafkaConfigConfigDefTest extends JUnit3Suite {
 
   @Test
-  def testFromPropsDefaults() {
-    val defaults = new Properties()
-    defaults.put(KafkaConfig.ZkConnectProp, "127.0.0.1:2181")
-
-    // some ordinary setting
-    defaults.put(KafkaConfig.AdvertisedPortProp, "1818")
-
-    val props = new Properties(defaults)
-
-    val config = KafkaConfig.fromProps(props)
-
-    Assert.assertEquals(1818, config.advertisedPort)
-    Assert.assertEquals("KafkaConfig defaults should be retained", Defaults.ConnectionsMaxIdleMs, config.connectionsMaxIdleMs)
-  }
-
-  @Test
   def testFromPropsEmpty() {
     // only required
     val p = new Properties()
     p.put(KafkaConfig.ZkConnectProp, "127.0.0.1:2181")
     val actualConfig = KafkaConfig.fromProps(p)
 
-    val expectedConfig = new KafkaConfig(zkConnect = "127.0.0.1:2181")
+    val expectedConfig = new KafkaConfig(p)
 
     Assert.assertEquals(expectedConfig.zkConnect, actualConfig.zkConnect)
     Assert.assertEquals(expectedConfig.zkSessionTimeoutMs, actualConfig.zkSessionTimeoutMs)
@@ -252,7 +236,7 @@ class KafkaConfigConfigDefTest extends JUnit3Suite {
       }
     })
 
-    val actual = KafkaConfig.fromProps(expected).toProps
+    val actual = KafkaConfig.fromProps(expected).originals
     Assert.assertEquals(expected, actual)
   }
 


Mime
View raw message