kafka-commits mailing list archives

From: j...@apache.org
Subject: kafka git commit: KAFKA-4340; Change default message.timestamp.difference.max.ms to the same as log.retention.ms
Date: Sun, 12 Feb 2017 05:34:03 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 601d4040d -> 8bc9d5839


KAFKA-4340; Change default message.timestamp.difference.max.ms to the same as log.retention.ms

Author: Jiangjie Qin <becket.qin@gmail.com>
Author: Jiangjie (Becket) Qin <becket.qin@gmail.com>

Reviewers: Jun Rao <junrao@gmail.com>, Ismael Juma <ismael@juma.me.uk>, Ewen Cheslack-Postava <ewen@confluent.io>

Closes #2071 from becketqin/KAFKA-4340
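
In practical terms, this changes the default for message.timestamp.difference.max.ms
from Long.MaxValue to the broker's retention window. A minimal sketch of the
arithmetic, assuming Kafka's stock default of log.retention.hours = 168 (7 days):

    // Sketch only: mirrors the new Defaults.LogMessageTimestampDifferenceMaxMs derivation.
    object TimestampDefaultSketch extends App {
      val logRetentionHours = 168  // assumed stock log.retention.hours
      val messageTimestampDifferenceMaxMs = logRetentionHours * 60 * 60 * 1000L
      println(messageTimestampDifferenceMaxMs)  // 604800000 ms = 7 days
    }

Under this default, a CreateTime record whose timestamp is more than seven days away
from the broker's clock is rejected up front instead of landing in the log and
triggering unnecessary segment rolls.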


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/8bc9d583
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/8bc9d583
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/8bc9d583

Branch: refs/heads/trunk
Commit: 8bc9d583930110cf209f8ab20f9ff640ace3ed35
Parents: 601d404
Author: Jiangjie Qin <becket.qin@gmail.com>
Authored: Sat Feb 11 21:33:55 2017 -0800
Committer: Jiangjie Qin <becket.qin@gmail.com>
Committed: Sat Feb 11 21:33:55 2017 -0800

----------------------------------------------------------------------
 .../main/scala/kafka/server/ConfigHandler.scala | 34 ++++++---
 .../main/scala/kafka/server/KafkaConfig.scala   |  5 +-
 .../kafka/log/LogCleanerIntegrationTest.scala   |  1 +
 .../scala/unit/kafka/log/LogCleanerTest.scala   |  1 +
 .../scala/unit/kafka/log/LogManagerTest.scala   |  1 +
 .../src/test/scala/unit/kafka/log/LogTest.scala | 79 +++++++++++---------
 .../scala/unit/kafka/server/LogOffsetTest.scala |  1 +
 .../unit/kafka/server/ReplicaManagerTest.scala  | 10 ++-
 .../test/scala/unit/kafka/utils/TestUtils.scala |  1 +
 9 files changed, 84 insertions(+), 49 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/8bc9d583/core/src/main/scala/kafka/server/ConfigHandler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ConfigHandler.scala b/core/src/main/scala/kafka/server/ConfigHandler.scala
index 56d53ff..e8f13d7 100644
--- a/core/src/main/scala/kafka/server/ConfigHandler.scala
+++ b/core/src/main/scala/kafka/server/ConfigHandler.scala
@@ -30,7 +30,9 @@ import org.apache.kafka.common.config.ConfigDef.Validator
 import org.apache.kafka.common.config.ConfigException
 import org.apache.kafka.common.metrics.Quota
 import org.apache.kafka.common.metrics.Quota._
+
 import scala.collection.JavaConverters._
+import scala.collection.mutable
 
 /**
   * The ConfigHandler is used to process config change notifications received by the DynamicConfigManager
@@ -46,15 +48,8 @@ trait ConfigHandler {
 class TopicConfigHandler(private val logManager: LogManager, kafkaConfig: KafkaConfig, val quotas: QuotaManagers) extends ConfigHandler with Logging  {
 
   def processConfigChanges(topic: String, topicConfig: Properties) {
-    // Validate the compatibility of message format version.
-    val configNameToExclude = Option(topicConfig.getProperty(LogConfig.MessageFormatVersionProp)).flatMap { versionString =>
-      if (kafkaConfig.interBrokerProtocolVersion < ApiVersion(versionString)) {
-        warn(s"Log configuration ${LogConfig.MessageFormatVersionProp} is ignored for `$topic`
because `$versionString` " +
-          s"is not compatible with Kafka inter-broker protocol version `${kafkaConfig.interBrokerProtocolVersionString}`")
-        Some(LogConfig.MessageFormatVersionProp)
-      } else
-        None
-    }
+    // Validate the configurations.
+    val configNamesToExclude = excludedConfigs(topic, topicConfig)
 
     val logs = logManager.logsByTopicPartition.filterKeys(_.topic == topic).values.toBuffer
     if (logs.nonEmpty) {
@@ -62,9 +57,15 @@ class TopicConfigHandler(private val logManager: LogManager, kafkaConfig: KafkaC
       val props = new Properties()
       props.putAll(logManager.defaultConfig.originals)
       topicConfig.asScala.foreach { case (key, value) =>
-        if (key != configNameToExclude) props.put(key, value)
+        if (!configNamesToExclude.contains(key)) props.put(key, value)
       }
       val logConfig = LogConfig(props)
+      if ((topicConfig.containsKey(LogConfig.RetentionMsProp) 
+        || topicConfig.containsKey(LogConfig.MessageTimestampDifferenceMaxMsProp))
+        && logConfig.retentionMs < logConfig.messageTimestampDifferenceMaxMs)
+        warn(s"${LogConfig.RetentionMsProp} for topic $topic is set to ${logConfig.retentionMs}.
It is smaller than " + 
+          s"${LogConfig.MessageTimestampDifferenceMaxMsProp}'s value ${logConfig.messageTimestampDifferenceMaxMs}.
" +
+          s"This may result in potential frequent log rolling.")
       logs.foreach(_.config = logConfig)
     }
 
@@ -95,6 +96,19 @@ class TopicConfigHandler(private val logManager: LogManager, kafkaConfig: KafkaC
         .map(_ (0).toInt).toSeq //convert to list of partition ids
     }
   }
+  
+  def excludedConfigs(topic: String, topicConfig: Properties): Set[String] = {
+    val excludeConfigs: mutable.Set[String] = new mutable.HashSet[String]
+    // Verify message format version
+    Option(topicConfig.getProperty(LogConfig.MessageFormatVersionProp)).foreach { versionString =>
+      if (kafkaConfig.interBrokerProtocolVersion < ApiVersion(versionString)) {
+        warn(s"Log configuration ${LogConfig.MessageFormatVersionProp} is ignored for `$topic`
because `$versionString` " +
+          s"is not compatible with Kafka inter-broker protocol version `${kafkaConfig.interBrokerProtocolVersionString}`")
+        excludeConfigs += LogConfig.MessageFormatVersionProp
+      }
+    }
+    excludeConfigs.toSet
+  }
 }
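
For illustration, a topic-level override like the following would now trip the new
warning, since retention.ms (1 hour) is smaller than message.timestamp.difference.max.ms
(7 days). A minimal sketch, assuming a hypothetical topic named "foo":

    import java.util.Properties

    val topicConfig = new Properties()
    topicConfig.put("retention.ms", (60 * 60 * 1000L).toString)  // 1 hour
    topicConfig.put("message.timestamp.difference.max.ms", (7 * 24 * 60 * 60 * 1000L).toString)  // 7 days
    // Passed to TopicConfigHandler.processConfigChanges("foo", topicConfig), this
    // combination logs the new "frequent log rolling" warning shown above.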
 
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/8bc9d583/core/src/main/scala/kafka/server/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 7946475..ab7e1ae 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -105,7 +105,7 @@ object Defaults {
   // lazy val as `InterBrokerProtocolVersion` is defined later
   lazy val LogMessageFormatVersion = InterBrokerProtocolVersion
   val LogMessageTimestampType = "CreateTime"
-  val LogMessageTimestampDifferenceMaxMs = Long.MaxValue
+  val LogMessageTimestampDifferenceMaxMs = LogRetentionHours * 60 * 60 * 1000L
   val NumRecoveryThreadsPerDataDir = 1
   val AutoCreateTopicsEnable = true
   val MinInSyncReplicas = 1
@@ -478,7 +478,8 @@ object KafkaConfig {
 
   val LogMessageTimestampDifferenceMaxMsDoc = "The maximum difference allowed between the timestamp when a broker receives " +
     "a message and the timestamp specified in the message. If log.message.timestamp.type=CreateTime, a message will be rejected " +
-    "if the difference in timestamp exceeds this threshold. This configuration is ignored if log.message.timestamp.type=LogAppendTime."
+    "if the difference in timestamp exceeds this threshold. This configuration is ignored if log.message.timestamp.type=LogAppendTime." +
+    "The maximum timestamp difference allowed should be no greater than log.retention.ms to avoid unnecessarily frequent log rolling."
   val NumRecoveryThreadsPerDataDirDoc = "The number of threads per data directory to be used for log recovery at startup and flushing at shutdown"
   val AutoCreateTopicsEnableDoc = "Enable auto creation of topic on the server"
   val MinInSyncReplicasDoc = "When a producer sets acks to \"all\" (or \"-1\"), " +
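
As the updated doc string advises, an operator who lowers log.retention.ms should lower
log.message.timestamp.difference.max.ms with it. A minimal sketch of a consistent
broker-level configuration, assuming properties assembled in code:

    import java.util.Properties

    val brokerProps = new Properties()
    val retentionMs = 24 * 60 * 60 * 1000L  // 1 day of retention
    brokerProps.put("log.retention.ms", retentionMs.toString)
    brokerProps.put("log.message.timestamp.difference.max.ms", retentionMs.toString)  // keep <= retention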

http://git-wip-us.apache.org/repos/asf/kafka/blob/8bc9d583/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
index 74a1828..6a097f8 100755
--- a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
@@ -303,6 +303,7 @@ class LogCleanerIntegrationTest(compressionCodec: String) {
     props.put(LogConfig.FileDeleteDelayMsProp, deleteDelay: java.lang.Integer)
     props.put(LogConfig.CleanupPolicyProp, LogConfig.Compact)
     props.put(LogConfig.MinCleanableDirtyRatioProp, minCleanableDirtyRatio: java.lang.Float)
+    props.put(LogConfig.MessageTimestampDifferenceMaxMsProp, Long.MaxValue.toString)
     props.putAll(propertyOverrides)
     props
   }
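
The same opt-out recurs throughout the test changes below: tests that append records
with stale or mocked timestamps pin the maximum difference back to Long.MaxValue so
the new default cannot reject them. As a standalone sketch of the pattern:

    import java.util.Properties
    import kafka.log.LogConfig

    val logProps = new Properties()
    logProps.put(LogConfig.MessageTimestampDifferenceMaxMsProp, Long.MaxValue.toString)
    val logConfig = LogConfig(logProps)  // restores the pre-KAFKA-4340 unlimited behavior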

http://git-wip-us.apache.org/repos/asf/kafka/blob/8bc9d583/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
index 40691b9..cf1e4cb 100755
--- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
@@ -45,6 +45,7 @@ class LogCleanerTest extends JUnitSuite {
   logProps.put(LogConfig.SegmentBytesProp, 1024: java.lang.Integer)
   logProps.put(LogConfig.SegmentIndexBytesProp, 1024: java.lang.Integer)
   logProps.put(LogConfig.CleanupPolicyProp, LogConfig.Compact)
+  logProps.put(LogConfig.MessageTimestampDifferenceMaxMsProp, Long.MaxValue.toString)
   val logConfig = LogConfig(logProps)
   val time = new MockTime()
   val throttler = new Throttler(desiredRatePerSec = Double.MaxValue, checkIntervalMs = Long.MaxValue, time = time)

http://git-wip-us.apache.org/repos/asf/kafka/blob/8bc9d583/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
index ab577ce..a8e953a 100755
--- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
@@ -38,6 +38,7 @@ class LogManagerTest {
   logProps.put(LogConfig.SegmentBytesProp, 1024: java.lang.Integer)
   logProps.put(LogConfig.SegmentIndexBytesProp, 4096: java.lang.Integer)
   logProps.put(LogConfig.RetentionMsProp, maxLogAgeMs: java.lang.Integer)
+  logProps.put(LogConfig.MessageTimestampDifferenceMaxMsProp, Long.MaxValue.toString)
   val logConfig = LogConfig(logProps)
   var logDir: File = null
   var logManager: LogManager = null

http://git-wip-us.apache.org/repos/asf/kafka/blob/8bc9d583/core/src/test/scala/unit/kafka/log/LogTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index 698f8e5..1831a49 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -68,6 +68,7 @@ class LogTest extends JUnitSuite {
 
     val logProps = new Properties()
     logProps.put(LogConfig.SegmentMsProp, (1 * 60 * 60L): java.lang.Long)
+    logProps.put(LogConfig.MessageTimestampDifferenceMaxMsProp, Long.MaxValue.toString)
 
     // create a log
     val log = new Log(logDir,
@@ -122,7 +123,7 @@ class LogTest extends JUnitSuite {
    */
   @Test
   def testTimeBasedLogRollJitter() {
-    val set = TestUtils.singletonRecords("test".getBytes)
+    var set = TestUtils.singletonRecords(value = "test".getBytes, timestamp = time.milliseconds)
     val maxJitter = 20 * 60L
 
     val logProps = new Properties()
@@ -138,9 +139,11 @@ class LogTest extends JUnitSuite {
     log.append(set)
 
     time.sleep(log.config.segmentMs - maxJitter)
+    set = TestUtils.singletonRecords(value = "test".getBytes, timestamp = time.milliseconds)
     log.append(set)
     assertEquals("Log does not roll on this append because it occurs earlier than max jitter",
1, log.numberOfSegments)
     time.sleep(maxJitter - log.activeSegment.rollJitterMs + 1)
+    set = TestUtils.singletonRecords(value = "test".getBytes, timestamp = time.milliseconds)
     log.append(set)
     assertEquals("Log should roll after segmentMs adjusted by random jitter", 2, log.numberOfSegments)
   }
@@ -150,7 +153,7 @@ class LogTest extends JUnitSuite {
    */
   @Test
   def testSizeBasedLogRoll() {
-    val set = TestUtils.singletonRecords("test".getBytes)
+    val set = TestUtils.singletonRecords(value = "test".getBytes, timestamp = time.milliseconds)
     val setSize = set.sizeInBytes
     val msgPerSeg = 10
     val segmentSize = msgPerSeg * (setSize - 1) // each segment will be 10 messages
@@ -177,7 +180,7 @@ class LogTest extends JUnitSuite {
   def testLoadEmptyLog() {
     createEmptyLogs(logDir, 0)
     val log = new Log(logDir, logConfig, recoveryPoint = 0L, time.scheduler, time = time)
-    log.append(TestUtils.singletonRecords("test".getBytes))
+    log.append(TestUtils.singletonRecords(value = "test".getBytes, timestamp = time.milliseconds))
   }
 
   /**
@@ -190,7 +193,7 @@ class LogTest extends JUnitSuite {
     // We need to use magic value 1 here because the test is message size sensitive.
     logProps.put(LogConfig.MessageFormatVersionProp, ApiVersion.latestVersion.toString)
     val log = new Log(logDir, LogConfig(logProps), recoveryPoint = 0L, time.scheduler, time = time)
-    val records = (0 until 100 by 2).map(id => Record.create(id.toString.getBytes)).toArray
+    val records = (0 until 100 by 2).map(id => Record.create(time.milliseconds, null, id.toString.getBytes)).toArray
 
     for(i <- records.indices)
       log.append(MemoryRecords.withRecords(records(i)))
@@ -213,7 +216,7 @@ class LogTest extends JUnitSuite {
     logProps.put(LogConfig.SegmentBytesProp, 71: java.lang.Integer)
     val log = new Log(logDir,  LogConfig(logProps), recoveryPoint = 0L, time.scheduler, time = time)
     val messageIds = ((0 until 50) ++ (50 until 200 by 7)).toArray
-    val records = messageIds.map(id => Record.create(id.toString.getBytes))
+    val records = messageIds.map(id => Record.create(time.milliseconds, null, id.toString.getBytes))
 
     // now test the case that we give the offsets and use non-sequential offsets
     for(i <- records.indices)
@@ -240,7 +243,7 @@ class LogTest extends JUnitSuite {
 
     // keep appending until we have two segments with only a single message in the second segment
     while(log.numberOfSegments == 1)
-      log.append(MemoryRecords.withRecords(Record.create("42".getBytes)))
+      log.append(MemoryRecords.withRecords(Record.create(time.milliseconds, null, "42".getBytes)))
 
     // now manually truncate off all but one message from the first segment to create a gap in the messages
     log.logSegments.head.truncateTo(1)
@@ -255,7 +258,7 @@ class LogTest extends JUnitSuite {
     logProps.put(LogConfig.SegmentBytesProp, 71: java.lang.Integer)
     val log = new Log(logDir,  LogConfig(logProps), recoveryPoint = 0L, time.scheduler, time = time)
     val messageIds = ((0 until 50) ++ (50 until 200 by 7)).toArray
-    val records = messageIds.map(id => Record.create(id.toString.getBytes))
+    val records = messageIds.map(id => Record.create(time.milliseconds, null, id.toString.getBytes))
 
     // now test the case that we give the offsets and use non-sequential offsets
     for (i <- records.indices)
@@ -284,7 +287,7 @@ class LogTest extends JUnitSuite {
     logProps.put(LogConfig.SegmentBytesProp, 71: java.lang.Integer)
     val log = new Log(logDir,  LogConfig(logProps), recoveryPoint = 0L, time.scheduler, time = time)
     val messageIds = ((0 until 50) ++ (50 until 200 by 7)).toArray
-    val records = messageIds.map(id => Record.create(id.toString.getBytes))
+    val records = messageIds.map(id => Record.create(time.milliseconds, null, id.toString.getBytes))
 
     // now test the case that we give the offsets and use non-sequential offsets
     for (i <- records.indices)
@@ -319,7 +322,7 @@ class LogTest extends JUnitSuite {
     // set up replica log starting with offset 1024 and with one message (at offset 1024)
     logProps.put(LogConfig.SegmentBytesProp, 1024: java.lang.Integer)
     val log = new Log(logDir, LogConfig(logProps), recoveryPoint = 0L, time.scheduler, time = time)
-    log.append(MemoryRecords.withRecords(Record.create("42".getBytes)))
+    log.append(MemoryRecords.withRecords(Record.create(time.milliseconds, null, "42".getBytes)))
 
     assertEquals("Reading at the log end offset should produce 0 byte read.", 0, log.read(1025,
1000).records.sizeInBytes)
 
@@ -351,7 +354,8 @@ class LogTest extends JUnitSuite {
     logProps.put(LogConfig.SegmentBytesProp, 100: java.lang.Integer)
     val log = new Log(logDir, LogConfig(logProps), recoveryPoint = 0L, time.scheduler, time = time)
     val numMessages = 100
-    val messageSets = (0 until numMessages).map(i => TestUtils.singletonRecords(i.toString.getBytes))
+    val messageSets = (0 until numMessages).map(i => TestUtils.singletonRecords(value = i.toString.getBytes, 
+                                                                                timestamp = time.milliseconds))
     messageSets.foreach(log.append(_))
     log.flush()
 
@@ -385,8 +389,12 @@ class LogTest extends JUnitSuite {
     val log = new Log(logDir, LogConfig(logProps), recoveryPoint = 0L, time.scheduler, time = time)
 
     /* append 2 compressed message sets, each with two messages giving offsets 0, 1, 2, 3 */
-    log.append(MemoryRecords.withRecords(CompressionType.GZIP, Record.create("hello".getBytes), Record.create("there".getBytes)))
-    log.append(MemoryRecords.withRecords(CompressionType.GZIP, Record.create("alpha".getBytes), Record.create("beta".getBytes)))
+    log.append(MemoryRecords.withRecords(CompressionType.GZIP, 
+                                         Record.create(time.milliseconds, null, "hello".getBytes), 
+                                         Record.create(time.milliseconds, null, "there".getBytes)))
+    log.append(MemoryRecords.withRecords(CompressionType.GZIP, 
+                                         Record.create(time.milliseconds, null, "alpha".getBytes), 
+                                         Record.create(time.milliseconds, null, "beta".getBytes)))
 
     def read(offset: Int) = log.read(offset, 4096).records.deepEntries.iterator
 
@@ -424,7 +432,7 @@ class LogTest extends JUnitSuite {
       assertEquals("Still no change in the logEndOffset", currOffset, log.logEndOffset)
       assertEquals("Should still be able to append and should get the logEndOffset assigned
to the new append",
                    currOffset,
-                   log.append(TestUtils.singletonRecords("hello".getBytes)).firstOffset)
+                   log.append(TestUtils.singletonRecords(value = "hello".getBytes, timestamp = time.milliseconds)).firstOffset)
 
       // cleanup the log
       log.delete()
@@ -437,7 +445,8 @@ class LogTest extends JUnitSuite {
    */
   @Test
   def testMessageSetSizeCheck() {
-    val messageSet = MemoryRecords.withRecords(Record.create("You".getBytes), Record.create("bethe".getBytes))
+    val messageSet = MemoryRecords.withRecords(Record.create(time.milliseconds, null, "You".getBytes), 
+                                               Record.create(time.milliseconds, null, "bethe".getBytes))
     // append messages to log
     val configSegmentSize = messageSet.sizeInBytes - 1
     val logProps = new Properties()
@@ -456,8 +465,8 @@ class LogTest extends JUnitSuite {
 
   @Test
   def testCompactedTopicConstraints() {
-    val keyedMessage = Record.create(Record.CURRENT_MAGIC_VALUE, Record.NO_TIMESTAMP, "and here it is".getBytes, "this message has a key".getBytes)
-    val anotherKeyedMessage = Record.create(Record.CURRENT_MAGIC_VALUE, Record.NO_TIMESTAMP, "another key".getBytes, "this message also has a key".getBytes)
+    val keyedMessage = Record.create(Record.CURRENT_MAGIC_VALUE, time.milliseconds, "and here it is".getBytes, "this message has a key".getBytes)
+    val anotherKeyedMessage = Record.create(Record.CURRENT_MAGIC_VALUE, time.milliseconds, "another key".getBytes, "this message also has a key".getBytes)
     val unkeyedMessage = Record.create("this message does not have a key".getBytes)
 
     val messageSetWithUnkeyedMessage = MemoryRecords.withRecords(CompressionType.NONE, unkeyedMessage, keyedMessage)
@@ -504,8 +513,11 @@ class LogTest extends JUnitSuite {
    */
   @Test
   def testMessageSizeCheck() {
-    val first = MemoryRecords.withRecords(CompressionType.NONE, Record.create("You".getBytes), Record.create("bethe".getBytes))
-    val second = MemoryRecords.withRecords(CompressionType.NONE, Record.create("change (I need more bytes)".getBytes))
+    val first = MemoryRecords.withRecords(CompressionType.NONE, 
+                                          Record.create(time.milliseconds, null, "You".getBytes), 
+                                          Record.create(time.milliseconds, null, "bethe".getBytes))
+    val second = MemoryRecords.withRecords(CompressionType.NONE, 
+                                           Record.create(time.milliseconds, null, "change (I need more bytes)".getBytes))
 
     // append messages to log
     val maxMessageSize = second.sizeInBytes - 1
@@ -717,7 +729,7 @@ class LogTest extends JUnitSuite {
    */
   @Test
   def testTruncateTo() {
-    val set = TestUtils.singletonRecords("test".getBytes)
+    val set = TestUtils.singletonRecords(value = "test".getBytes, timestamp = time.milliseconds)
     val setSize = set.sizeInBytes
     val msgPerSeg = 10
     val segmentSize = msgPerSeg * setSize  // each segment will be 10 messages
@@ -818,7 +830,7 @@ class LogTest extends JUnitSuite {
     val bogusIndex2 = Log.indexFilename(logDir, 5)
     val bogusTimeIndex2 = Log.timeIndexFilename(logDir, 5)
 
-    val set = TestUtils.singletonRecords("test".getBytes)
+    val set = TestUtils.singletonRecords(value = "test".getBytes, timestamp = time.milliseconds)
     val logProps = new Properties()
     logProps.put(LogConfig.SegmentBytesProp, set.sizeInBytes * 5: java.lang.Integer)
     logProps.put(LogConfig.SegmentIndexBytesProp, 1000: java.lang.Integer)
@@ -846,7 +858,7 @@ class LogTest extends JUnitSuite {
    */
   @Test
   def testReopenThenTruncate() {
-    val set = TestUtils.singletonRecords("test".getBytes)
+    val set = TestUtils.singletonRecords(value = "test".getBytes, timestamp = time.milliseconds)
     val logProps = new Properties()
     logProps.put(LogConfig.SegmentBytesProp, set.sizeInBytes * 5: java.lang.Integer)
     logProps.put(LogConfig.SegmentIndexBytesProp, 1000: java.lang.Integer)
@@ -879,14 +891,14 @@ class LogTest extends JUnitSuite {
    */
   @Test
   def testAsyncDelete() {
-    val set = TestUtils.singletonRecords("test".getBytes)
+    val set = TestUtils.singletonRecords(value = "test".getBytes, timestamp = time.milliseconds - 1000L)
     val asyncDeleteMs = 1000
     val logProps = new Properties()
     logProps.put(LogConfig.SegmentBytesProp, set.sizeInBytes * 5: java.lang.Integer)
     logProps.put(LogConfig.SegmentIndexBytesProp, 1000: java.lang.Integer)
     logProps.put(LogConfig.IndexIntervalBytesProp, 10000: java.lang.Integer)
     logProps.put(LogConfig.FileDeleteDelayMsProp, asyncDeleteMs: java.lang.Integer)
-    logProps.put(LogConfig.RetentionMsProp, 0: java.lang.Integer)
+    logProps.put(LogConfig.RetentionMsProp, 999: java.lang.Integer)
     val config = LogConfig(logProps)
 
     val log = new Log(logDir,
@@ -902,8 +914,6 @@ class LogTest extends JUnitSuite {
     // files should be renamed
     val segments = log.logSegments.toArray
     val oldFiles = segments.map(_.log.file) ++ segments.map(_.index.file)
-    // expire all segments
-    log.logSegments.foreach(_.lastModified = time.milliseconds - 1000L)
 
     log.deleteOldSegments()
 
@@ -925,11 +935,11 @@ class LogTest extends JUnitSuite {
    */
   @Test
   def testOpenDeletesObsoleteFiles() {
-    val set = TestUtils.singletonRecords("test".getBytes)
+    val set = TestUtils.singletonRecords(value = "test".getBytes, timestamp = time.milliseconds - 1000)
     val logProps = new Properties()
     logProps.put(LogConfig.SegmentBytesProp, set.sizeInBytes * 5: java.lang.Integer)
     logProps.put(LogConfig.SegmentIndexBytesProp, 1000: java.lang.Integer)
-    logProps.put(LogConfig.RetentionMsProp, 0: java.lang.Integer)
+    logProps.put(LogConfig.RetentionMsProp, 999: java.lang.Integer)
     val config = LogConfig(logProps)
     var log = new Log(logDir,
                       config,
@@ -942,7 +952,6 @@ class LogTest extends JUnitSuite {
       log.append(set)
 
     // expire all segments
-    log.logSegments.foreach(_.lastModified = time.milliseconds - 1000)
     log.deleteOldSegments()
     log.close()
 
@@ -961,7 +970,7 @@ class LogTest extends JUnitSuite {
                       recoveryPoint = 0L,
                       time.scheduler,
                       time)
-    log.append(MemoryRecords.withRecords(Record.create(null)))
+    log.append(MemoryRecords.withRecords(Record.create(time.milliseconds, null, null)))
     val head = log.read(0, 4096, None).records.shallowEntries().iterator.next()
     assertEquals(0, head.offset)
     assertTrue("Message payload should be null.", head.record.hasNullValue)
@@ -974,7 +983,7 @@ class LogTest extends JUnitSuite {
       recoveryPoint = 0L,
       time.scheduler,
       time)
-    val messages = (0 until 2).map(id => Record.create(id.toString.getBytes)).toArray
+    val messages = (0 until 2).map(id => Record.create(time.milliseconds, null, id.toString.getBytes)).toArray
     messages.foreach(record => log.append(MemoryRecords.withRecords(record)))
     val invalidMessage = MemoryRecords.withRecords(Record.create(1.toString.getBytes))
     log.append(invalidMessage, assignOffsets = false)
@@ -988,7 +997,7 @@ class LogTest extends JUnitSuite {
     logProps.put(LogConfig.IndexIntervalBytesProp, 1: java.lang.Integer)
     logProps.put(LogConfig.MaxMessageBytesProp, 64*1024: java.lang.Integer)
     val config = LogConfig(logProps)
-    val set = TestUtils.singletonRecords("test".getBytes)
+    val set = TestUtils.singletonRecords(value = "test".getBytes, timestamp = time.milliseconds)
     val recoveryPoint = 50L
     for (_ <- 0 until 50) {
       // create a log and write some messages to it
@@ -1065,7 +1074,7 @@ class LogTest extends JUnitSuite {
     logProps.put(LogConfig.MaxMessageBytesProp, 64*1024: java.lang.Integer)
     logProps.put(LogConfig.IndexIntervalBytesProp, 1: java.lang.Integer)
     val config = LogConfig(logProps)
-    val set = TestUtils.singletonRecords("test".getBytes)
+    val set = TestUtils.singletonRecords(value = "test".getBytes, timestamp = time.milliseconds)
     val parentLogDir = logDir.getParentFile
     assertTrue("Data directory %s must exist", parentLogDir.isDirectory)
     val cleanShutdownFile = new File(parentLogDir, Log.CleanShutdownFile)
@@ -1166,11 +1175,11 @@ class LogTest extends JUnitSuite {
 
   @Test
   def testDeleteOldSegmentsMethod() {
-    val set = TestUtils.singletonRecords("test".getBytes)
+    val set = TestUtils.singletonRecords(value = "test".getBytes, timestamp = time.milliseconds - 1000)
     val logProps = new Properties()
     logProps.put(LogConfig.SegmentBytesProp, set.sizeInBytes * 5: java.lang.Integer)
     logProps.put(LogConfig.SegmentIndexBytesProp, 1000: java.lang.Integer)
-    logProps.put(LogConfig.RetentionMsProp, 0: java.lang.Integer)
+    logProps.put(LogConfig.RetentionMsProp, 999: java.lang.Integer)
     val config = LogConfig(logProps)
     val log = new Log(logDir,
       config,
@@ -1183,7 +1192,6 @@ class LogTest extends JUnitSuite {
       log.append(set)
 
     // expire all segments
-    log.logSegments.foreach(_.lastModified = time.milliseconds - 1000)
     log.deleteOldSegments()
     assertEquals("The deleted segments should be gone.", 1, log.numberOfSegments)
 
@@ -1290,6 +1298,7 @@ class LogTest extends JUnitSuite {
     logProps.put(LogConfig.RetentionMsProp, retentionMs: Integer)
     logProps.put(LogConfig.RetentionBytesProp, retentionBytes: Integer)
     logProps.put(LogConfig.CleanupPolicyProp, cleanupPolicy)
+    logProps.put(LogConfig.MessageTimestampDifferenceMaxMsProp, Long.MaxValue.toString)
     val config = LogConfig(logProps)
     val log = new Log(logDir,
       config,

http://git-wip-us.apache.org/repos/asf/kafka/blob/8bc9d583/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
index 4075a07..ac766dc 100755
--- a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
@@ -50,6 +50,7 @@ class LogOffsetTest extends ZooKeeperTestHarness {
   override def setUp() {
     super.setUp()
     val config: Properties = createBrokerConfig(1)
+    config.put(KafkaConfig.LogMessageTimestampDifferenceMaxMsProp, Long.MaxValue.toString)
     val logDirPath = config.getProperty("log.dir")
     logDir = new File(logDirPath)
     time = new MockTime()

http://git-wip-us.apache.org/repos/asf/kafka/blob/8bc9d583/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 0259bad..c481ac4 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -18,9 +18,11 @@
 package kafka.server
 
 import java.io.File
+import java.util.Properties
 import java.util.concurrent.atomic.AtomicBoolean
 
 import kafka.cluster.Broker
+import kafka.log.LogConfig
 import kafka.utils.{MockScheduler, MockTime, TestUtils, ZkUtils}
 import TestUtils.createBroker
 import org.I0Itec.zkclient.ZkClient
@@ -121,7 +123,9 @@ class ReplicaManagerTest {
     val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect)
     props.put("log.dir", TestUtils.tempRelativeDir("data").getAbsolutePath)
     val config = KafkaConfig.fromProps(props)
-    val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)).toArray)
+    val logProps = new Properties()
+    logProps.put(LogConfig.MessageTimestampDifferenceMaxMsProp, Long.MaxValue.toString)
+    val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)).toArray, LogConfig(logProps))
     val rm = new ReplicaManager(config, metrics, time, zkUtils, new MockScheduler(time), mockLogMgr,
       new AtomicBoolean(false), QuotaFactory.instantiate(config, metrics, time).follower)
 
@@ -194,7 +198,9 @@ class ReplicaManagerTest {
     props.put("log.dir", TestUtils.tempRelativeDir("data").getAbsolutePath)
     props.put("broker.id", Int.box(0))
     val config = KafkaConfig.fromProps(props)
-    val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)).toArray)
+    val logProps = new Properties()
+    logProps.put(LogConfig.MessageTimestampDifferenceMaxMsProp, Long.MaxValue.toString)
+    val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)).toArray, LogConfig(logProps))
     val rm = new ReplicaManager(config, metrics, time, zkUtils, new MockScheduler(time), mockLogMgr,
       new AtomicBoolean(false), QuotaFactory.instantiate(config, metrics, time).follower, Option(this.getClass.getName))
     try {

http://git-wip-us.apache.org/repos/asf/kafka/blob/8bc9d583/core/src/test/scala/unit/kafka/utils/TestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index b0f8a43..c16618b 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -217,6 +217,7 @@ object TestUtils extends Logging {
     props.put(KafkaConfig.DeleteTopicEnableProp, enableDeleteTopic.toString)
     props.put(KafkaConfig.ControlledShutdownRetryBackoffMsProp, "100")
     props.put(KafkaConfig.LogCleanerDedupeBufferSizeProp, "2097152")
+    props.put(KafkaConfig.LogMessageTimestampDifferenceMaxMsProp, Long.MaxValue.toString)
     props.put(KafkaConfig.OffsetsTopicReplicationFactorProp, "1")
     rack.foreach(props.put(KafkaConfig.RackProp, _))
 

