kafka-commits mailing list archives

From ij...@apache.org
Subject kafka git commit: KAFKA-4340; Follow-up fixing system test failures and handling non default log.retention.ms
Date Fri, 17 Feb 2017 10:41:16 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk a88737929 -> 1f2ee5f0a


KAFKA-4340; Follow-up fixing system test failures and handling non default log.retention.ms

Author: Jiangjie Qin <becket.qin@gmail.com>

Reviewers: Jun Rao <junrao@gmail.com>, Ismael Juma <ismael@juma.me.uk>

Closes #2544 from becketqin/KAFKA-4340_follow_up


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/1f2ee5f0
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/1f2ee5f0
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/1f2ee5f0

Branch: refs/heads/trunk
Commit: 1f2ee5f0a9dfc33e34362f7cd4ac45edaabe421b
Parents: a887379
Author: Jiangjie Qin <becket.qin@gmail.com>
Authored: Fri Feb 17 02:19:33 2017 -0800
Committer: Ismael Juma <ismael@juma.me.uk>
Committed: Fri Feb 17 02:40:34 2017 -0800

----------------------------------------------------------------------
 .../src/main/scala/kafka/log/LogValidator.scala |  6 ++-
 .../main/scala/kafka/server/ConfigHandler.scala | 13 +++----
 .../main/scala/kafka/server/KafkaConfig.scala   | 15 ++++++--
 .../main/scala/kafka/server/KafkaServer.scala   |  2 +-
 .../src/test/scala/unit/kafka/log/LogTest.scala | 28 +++++++++-----
 .../unit/kafka/server/KafkaConfigTest.scala     |  2 +
 docs/upgrade.html                               | 39 +++++++++++++++++++-
 7 files changed, 82 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/1f2ee5f0/core/src/main/scala/kafka/log/LogValidator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogValidator.scala b/core/src/main/scala/kafka/log/LogValidator.scala
index 45e364c..26d6e8c 100644
--- a/core/src/main/scala/kafka/log/LogValidator.scala
+++ b/core/src/main/scala/kafka/log/LogValidator.scala
@@ -235,9 +235,11 @@ private[kafka] object LogValidator {
                                 now: Long,
                                 timestampType: TimestampType,
                                 timestampDiffMaxMs: Long) {
-    if (timestampType == TimestampType.CREATE_TIME && math.abs(record.timestamp - now) > timestampDiffMaxMs)
+    if (timestampType == TimestampType.CREATE_TIME
+      && record.timestamp != Record.NO_TIMESTAMP
+      && math.abs(record.timestamp - now) > timestampDiffMaxMs)
       throw new InvalidTimestampException(s"Timestamp ${record.timestamp} of message is out of range. " +
-        s"The timestamp should be within [${now - timestampDiffMaxMs}, ${now + timestampDiffMaxMs}")
+        s"The timestamp should be within [${now - timestampDiffMaxMs}, ${now + timestampDiffMaxMs}]")
     if (record.timestampType == TimestampType.LOG_APPEND_TIME)
       throw new InvalidTimestampException(s"Invalid timestamp type in message $record. Producer should not set " +
         s"timestamp type to LogAppendTime.")
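
The net effect of this hunk: a record that carries no timestamp (Record.NO_TIMESTAMP) now
bypasses the CreateTime range check, and the error message gains its missing closing bracket.
A minimal standalone sketch of the resulting rule, not the actual LogValidator code
(NO_TIMESTAMP is -1 in this message format):

    // Sketch: a CreateTime record is rejected only when it carries a real timestamp
    // that falls outside [now - diffMaxMs, now + diffMaxMs].
    val NoTimestamp = -1L  // Record.NO_TIMESTAMP

    def timestampOutOfRange(timestamp: Long, now: Long, diffMaxMs: Long): Boolean =
      timestamp != NoTimestamp && math.abs(timestamp - now) > diffMaxMs

    // timestampOutOfRange(NoTimestamp, now = 1000L, diffMaxMs = 100L) == false
    // timestampOutOfRange(5000L, now = 1000L, diffMaxMs = 100L)       == true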

http://git-wip-us.apache.org/repos/asf/kafka/blob/1f2ee5f0/core/src/main/scala/kafka/server/ConfigHandler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ConfigHandler.scala b/core/src/main/scala/kafka/server/ConfigHandler.scala
index e8f13d7..8d6de8c 100644
--- a/core/src/main/scala/kafka/server/ConfigHandler.scala
+++ b/core/src/main/scala/kafka/server/ConfigHandler.scala
@@ -65,7 +65,7 @@ class TopicConfigHandler(private val logManager: LogManager, kafkaConfig: KafkaC
         && logConfig.retentionMs < logConfig.messageTimestampDifferenceMaxMs)
        warn(s"${LogConfig.RetentionMsProp} for topic $topic is set to ${logConfig.retentionMs}. It is smaller than " +
          s"${LogConfig.MessageTimestampDifferenceMaxMsProp}'s value ${logConfig.messageTimestampDifferenceMaxMs}. " +
-          s"This may result in potential frequent log rolling.")
+          s"This may result in frequent log rolling.")
       logs.foreach(_.config = logConfig)
     }
 
@@ -98,16 +98,15 @@ class TopicConfigHandler(private val logManager: LogManager, kafkaConfig: KafkaC
   }
   
   def excludedConfigs(topic: String, topicConfig: Properties): Set[String] = {
-    val excludeConfigs: mutable.Set[String] = new mutable.HashSet[String]
     // Verify message format version
-    Option(topicConfig.getProperty(LogConfig.MessageFormatVersionProp)).foreach { versionString =>
+    Option(topicConfig.getProperty(LogConfig.MessageFormatVersionProp)).flatMap { versionString =>
       if (kafkaConfig.interBrokerProtocolVersion < ApiVersion(versionString)) {
         warn(s"Log configuration ${LogConfig.MessageFormatVersionProp} is ignored for `$topic` because `$versionString` " +
           s"is not compatible with Kafka inter-broker protocol version `${kafkaConfig.interBrokerProtocolVersionString}`")
-        excludeConfigs += LogConfig.MessageFormatVersionProp
-      }
-    }
-    excludeConfigs.toSet
+        Some(LogConfig.MessageFormatVersionProp)
+      } else
+        None
+    }.toSet
   }
 }
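
The refactor above replaces the mutable HashSet accumulator with a single expression:
flatMap over the Option yields Some(prop) only when the config must be ignored, and
toSet turns that Option into an empty or one-element Set. A self-contained sketch of
the idiom with hypothetical values (not the broker's real types):

    // Some(x).toSet == Set(x); None.toSet == Set.empty
    def excluded(versionString: Option[String], brokerVersion: Int): Set[String] =
      versionString.flatMap { v =>
        if (brokerVersion < v.toInt) Some("message.format.version") else None
      }.toSet

    // excluded(Some("3"), brokerVersion = 2) == Set("message.format.version")
    // excluded(Some("1"), brokerVersion = 2) == Set.empty
    // excluded(None, brokerVersion = 2)      == Set.empty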
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/1f2ee5f0/core/src/main/scala/kafka/server/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 0180a2c..a15f034 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -17,6 +17,7 @@
 
 package kafka.server
 
+import java.util
 import java.util.Properties
 
 import kafka.api.{ApiVersion, KAFKA_0_10_0_IV1}
@@ -479,7 +480,8 @@ object KafkaConfig {
   val LogMessageTimestampDifferenceMaxMsDoc = "The maximum difference allowed between the timestamp when a broker receives " +
     "a message and the timestamp specified in the message. If log.message.timestamp.type=CreateTime, a message will be rejected " +
     "if the difference in timestamp exceeds this threshold. This configuration is ignored if log.message.timestamp.type=LogAppendTime. " +
-    "The maximum timestamp difference allowed should be no greater than log.retention.ms to avoid unnecessarily frequent log rolling."
+    "The maximum timestamp difference allowed should be no greater than log.retention.ms to avoid unnecessarily frequent log rolling. For " +
+    "this reason, the default is the value of log.retention.ms."
   val NumRecoveryThreadsPerDataDirDoc = "The number of threads per data directory to be used for log recovery at startup and flushing at shutdown"
   val AutoCreateTopicsEnableDoc = "Enable auto creation of topic on the server"
   val MinInSyncReplicasDoc = "When a producer sets acks to \"all\" (or \"-1\"), " +
@@ -690,7 +692,7 @@ object KafkaConfig {
       .define(MinInSyncReplicasProp, INT, Defaults.MinInSyncReplicas, atLeast(1), HIGH, MinInSyncReplicasDoc)
       .define(LogMessageFormatVersionProp, STRING, Defaults.LogMessageFormatVersion, MEDIUM, LogMessageFormatVersionDoc)
       .define(LogMessageTimestampTypeProp, STRING, Defaults.LogMessageTimestampType, in("CreateTime", "LogAppendTime"), MEDIUM, LogMessageTimestampTypeDoc)
-      .define(LogMessageTimestampDifferenceMaxMsProp, LONG, Defaults.LogMessageTimestampDifferenceMaxMs, atLeast(0), MEDIUM, LogMessageTimestampDifferenceMaxMsDoc)
+      .define(LogMessageTimestampDifferenceMaxMsProp, LONG, null, MEDIUM, LogMessageTimestampDifferenceMaxMsDoc)
       .define(CreateTopicPolicyClassNameProp, CLASS, null, LOW, CreateTopicPolicyClassNameDoc)
 
       /** ********* Replication configuration ***********/
@@ -888,7 +890,7 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean) extends Abstra
   val logMessageFormatVersionString = getString(KafkaConfig.LogMessageFormatVersionProp)
   val logMessageFormatVersion = ApiVersion(logMessageFormatVersionString)
   val logMessageTimestampType = TimestampType.forName(getString(KafkaConfig.LogMessageTimestampTypeProp))
-  val logMessageTimestampDifferenceMaxMs = getLong(KafkaConfig.LogMessageTimestampDifferenceMaxMsProp)
+  val logMessageTimestampDifferenceMaxMs = getMessageTimestampDifferenceMaxMs
 
   /** ********* Replication configuration ***********/
   val controllerSocketTimeoutMs: Int = getInt(KafkaConfig.ControllerSocketTimeoutMsProp)
@@ -998,6 +1000,13 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean) extends Abstra
     millis
   }
 
+  private def getMessageTimestampDifferenceMaxMs: Long = {
+    Option(getLong(KafkaConfig.LogMessageTimestampDifferenceMaxMsProp)) match {
+      case Some(value) => value
+      case None => getLogRetentionTimeMillis
+    }
+  }
+
   private def getMap(propName: String, propValue: String): Map[String, String] = {
     try {
       CoreUtils.parseCsvMap(propValue)
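
Two pieces make the new default work: define(...) now passes null as the default so that an
unset log.message.timestamp.difference.max.ms is distinguishable from an explicit value, and
getMessageTimestampDifferenceMaxMs falls back to log.retention.ms in the None case. A sketch
of that resolution order (hypothetical signature, not AbstractConfig):

    // Explicit value wins; otherwise fall back to the retention time.
    // Option(null) == None, mirroring getLong on a config whose default is null.
    def timestampDiffMaxMs(explicit: java.lang.Long, retentionMs: Long): Long =
      Option(explicit).map(_.longValue).getOrElse(retentionMs)

    // timestampDiffMaxMs(null, retentionMs = 604800000L)  == 604800000L (the fallback)
    // timestampDiffMaxMs(1000L, retentionMs = 604800000L) == 1000L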

http://git-wip-us.apache.org/repos/asf/kafka/blob/1f2ee5f0/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 2a247ec..f5d8fc6 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -76,7 +76,7 @@ object KafkaServer {
     logProps.put(LogConfig.PreAllocateEnableProp, kafkaConfig.logPreAllocateEnable)
     logProps.put(LogConfig.MessageFormatVersionProp, kafkaConfig.logMessageFormatVersion.version)
     logProps.put(LogConfig.MessageTimestampTypeProp, kafkaConfig.logMessageTimestampType.name)
-    logProps.put(LogConfig.MessageTimestampDifferenceMaxMsProp, kafkaConfig.logMessageTimestampDifferenceMaxMs)
+    logProps.put(LogConfig.MessageTimestampDifferenceMaxMsProp, kafkaConfig.logMessageTimestampDifferenceMaxMs: java.lang.Long)
     logProps
   }
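
The `: java.lang.Long` ascription is needed because logMessageTimestampDifferenceMaxMs is now
a scala.Long (a primitive), whereas the old getLong call handed back a boxed java.lang.Long,
and put expects AnyRef, which a Scala primitive does not satisfy without explicit boxing. The
same idiom appears in LogTest below (`100: java.lang.Integer`). A two-line sketch:

    val props = new java.util.Properties()
    val maxMs: Long = 604800000L  // scala.Long, a primitive
    // props.put("message.timestamp.difference.max.ms", maxMs) would not compile;
    // the ascription triggers Predef.long2Long and boxes the value:
    props.put("message.timestamp.difference.max.ms", maxMs: java.lang.Long)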
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/1f2ee5f0/core/src/test/scala/unit/kafka/log/LogTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index 1831a49..5825ab7 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -354,7 +354,7 @@ class LogTest extends JUnitSuite {
     logProps.put(LogConfig.SegmentBytesProp, 100: java.lang.Integer)
     val log = new Log(logDir, LogConfig(logProps), recoveryPoint = 0L, time.scheduler, time = time)
     val numMessages = 100
-    val messageSets = (0 until numMessages).map(i => TestUtils.singletonRecords(value = i.toString.getBytes, 
+    val messageSets = (0 until numMessages).map(i => TestUtils.singletonRecords(value = i.toString.getBytes,
                                                                                 timestamp = time.milliseconds))
     messageSets.foreach(log.append(_))
     log.flush()
@@ -389,11 +389,11 @@ class LogTest extends JUnitSuite {
     val log = new Log(logDir, LogConfig(logProps), recoveryPoint = 0L, time.scheduler, time = time)
 
     /* append 2 compressed message sets, each with two messages giving offsets 0, 1, 2, 3 */
-    log.append(MemoryRecords.withRecords(CompressionType.GZIP, 
-                                         Record.create(time.milliseconds, null, "hello".getBytes), 
+    log.append(MemoryRecords.withRecords(CompressionType.GZIP,
+                                         Record.create(time.milliseconds, null, "hello".getBytes),
                                          Record.create(time.milliseconds, null, "there".getBytes)))
-    log.append(MemoryRecords.withRecords(CompressionType.GZIP, 
-                                         Record.create(time.milliseconds, null, "alpha".getBytes), 
+    log.append(MemoryRecords.withRecords(CompressionType.GZIP,
+                                         Record.create(time.milliseconds, null, "alpha".getBytes),
                                          Record.create(time.milliseconds, null, "beta".getBytes)))
 
     def read(offset: Int) = log.read(offset, 4096).records.deepEntries.iterator
@@ -445,7 +445,7 @@ class LogTest extends JUnitSuite {
    */
   @Test
   def testMessageSetSizeCheck() {
-    val messageSet = MemoryRecords.withRecords(Record.create(time.milliseconds, null, "You".getBytes), 
+    val messageSet = MemoryRecords.withRecords(Record.create(time.milliseconds, null, "You".getBytes),
                                                Record.create(time.milliseconds, null, "bethe".getBytes))
     // append messages to log
     val configSegmentSize = messageSet.sizeInBytes - 1
@@ -513,10 +513,10 @@ class LogTest extends JUnitSuite {
    */
   @Test
   def testMessageSizeCheck() {
-    val first = MemoryRecords.withRecords(CompressionType.NONE, 
-                                          Record.create(time.milliseconds, null, "You".getBytes), 
+    val first = MemoryRecords.withRecords(CompressionType.NONE,
+                                          Record.create(time.milliseconds, null, "You".getBytes),
                                           Record.create(time.milliseconds, null, "bethe".getBytes))
-    val second = MemoryRecords.withRecords(CompressionType.NONE, 
+    val second = MemoryRecords.withRecords(CompressionType.NONE,
                                            Record.create(time.milliseconds, null, "change (I need more bytes)".getBytes))
 
     // append messages to log
@@ -990,6 +990,16 @@ class LogTest extends JUnitSuite {
   }
 
   @Test
+  def testAppendWithNoTimestamp(): Unit = {
+    val log = new Log(logDir,
+      LogConfig(),
+      recoveryPoint = 0L,
+      time.scheduler,
+      time)
+    log.append(MemoryRecords.withRecords(Record.create(Record.NO_TIMESTAMP, "key".getBytes, "value".getBytes)))
+  }
+
+  @Test
   def testCorruptLog() {
     // append some messages to create some segments
     val logProps = new Properties()
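
testAppendWithNoTimestamp exercises the LogValidator guard added above: with the default
bound now finite (log.retention.ms instead of Long.MaxValue), a record carrying no timestamp
would otherwise always fail the range check. A back-of-the-envelope check (hedged sketch;
-1 stands in for Record.NO_TIMESTAMP, 168 hours for the default retention):

    val now = System.currentTimeMillis()
    val noTimestamp = -1L                     // Record.NO_TIMESTAMP
    val diffMaxMs = 7L * 24 * 60 * 60 * 1000  // 168 hours, the default log.retention.ms
    // |(-1) - now| is roughly `now`, i.e. decades expressed in milliseconds, so without
    // the guard every timestamp-less record would be rejected as out of range.
    assert(math.abs(noTimestamp - now) > diffMaxMs)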

http://git-wip-us.apache.org/repos/asf/kafka/blob/1f2ee5f0/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index 0f5ff5d..c89e626 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -586,6 +586,7 @@ class KafkaConfigTest {
        case KafkaConfig.LogFlushIntervalMessagesProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0")
        case KafkaConfig.LogFlushSchedulerIntervalMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
        case KafkaConfig.LogFlushIntervalMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
+        case KafkaConfig.LogMessageTimestampDifferenceMaxMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
        case KafkaConfig.NumRecoveryThreadsPerDataDirProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0")
        case KafkaConfig.AutoCreateTopicsEnableProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_boolean", "0")
        case KafkaConfig.MinInSyncReplicasProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0")
@@ -702,6 +703,7 @@ class KafkaConfigTest {
     assertEquals(12 * 60L * 1000L * 60, config.logRollTimeMillis)
     assertEquals(11 * 60L * 1000L * 60, config.logRollTimeJitterMillis)
     assertEquals(10 * 60L * 1000L * 60, config.logRetentionTimeMillis)
+    assertEquals(config.logRetentionTimeMillis, config.logMessageTimestampDifferenceMaxMs)
     assertEquals(123L, config.logFlushIntervalMs)
     assertEquals(SnappyCompressionCodec, config.offsetsTopicCompressionCodec)
     assertEquals(Sensor.RecordingLevel.DEBUG.toString, config.metricRecordingLevel)

http://git-wip-us.apache.org/repos/asf/kafka/blob/1f2ee5f0/docs/upgrade.html
----------------------------------------------------------------------
diff --git a/docs/upgrade.html b/docs/upgrade.html
index 5976054..ef9216e 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -15,6 +15,44 @@
  limitations under the License.
 -->
 
+<h4><a id="upgrade_10_3_0" href="#upgrade_10_3_0">Upgrading from 0.8.x, 0.9.x, 0.10.0.x, 0.10.1.x or 0.10.2.x to 0.10.3.0</a></h4>
+<p>0.10.3.0 has wire protocol changes. By following the recommended rolling upgrade plan below, you guarantee no downtime during the upgrade.
+    However, please review the <a href="#upgrade_1030_notable">notable changes in 0.10.3.0</a> before upgrading.
+</p>
+
+<p>Starting with version 0.10.2, Java clients (producer and consumer) have acquired the ability to communicate with older brokers. Version 0.10.3
+    clients can talk to version 0.10.0 or newer brokers. However, if your brokers are older than 0.10.0, you must upgrade all the brokers in the
+    Kafka cluster before upgrading your clients. Version 0.10.3 brokers support 0.8.x and newer clients.
+</p>
+
+<p><b>For a rolling upgrade:</b></p>
+
+<ol>
+    <li> Update server.properties file on all brokers and add the following properties:
+        <ul>
+            <li>inter.broker.protocol.version=CURRENT_KAFKA_VERSION (e.g. 0.8.2, 0.9.0, 0.10.0, 0.10.1 or 0.10.2).</li>
+            <li>log.message.format.version=CURRENT_KAFKA_VERSION  (See <a href="#upgrade_10_performance_impact">potential performance impact following the upgrade</a> for the details on what this configuration does.)</li>
+        </ul>
+    </li>
+    <li> Upgrade the brokers one at a time: shut down the broker, update the code, and restart it. </li>
+    <li> Once the entire cluster is upgraded, bump the protocol version by editing inter.broker.protocol.version and setting it to 0.10.3. </li>
+    <li> If your previous message format is 0.10.0, change log.message.format.version to 0.10.3 (this is a no-op as the message format is the same for 0.10.0, 0.10.1, 0.10.2 and 0.10.3).
+        If your previous message format version is lower than 0.10.0, do not change log.message.format.version yet - this parameter should only change once all consumers have been upgraded to 0.10.0.0 or later.</li>
+    <li> Restart the brokers one by one for the new protocol version to take effect. </li>
+    <li> If log.message.format.version is still lower than 0.10.0 at this point, wait until all consumers have been upgraded to 0.10.0 or later,
+        then change log.message.format.version to 0.10.3 on each broker and restart them one by one. </li>
+</ol>
+
+<p><b>Note:</b> If you are willing to accept downtime, you can simply take all the brokers down, update the code and start all of them. They will start with the new protocol by default.
+
+<p><b>Note:</b> Bumping the protocol version and restarting can be done any time after the brokers were upgraded. It does not have to be immediately after.
+
+<h5><a id="upgrade_1030_notable" href="#upgrade_1030_notable">Notable changes in 0.10.3.0</a></h5>
+<ul>
+    <li>The <code>offsets.topic.replication.factor</code> broker config is now enforced upon auto topic creation. Internal auto topic creation will fail with a GROUP_COORDINATOR_NOT_AVAILABLE error until the cluster size meets this replication factor requirement.</li>
+    <li>By default <code>message.timestamp.difference.max.ms</code> is the same as <code>retention.ms</code> instead of <code>Long.MAX_VALUE</code>.</li>
+</ul>
+
 <h4><a id="upgrade_10_2_0" href="#upgrade_10_2_0">Upgrading from 0.8.x, 0.9.x, 0.10.0.x or 0.10.1.x to 0.10.2.0</a></h4>
 <p>0.10.2.0 has wire protocol changes. By following the recommended rolling upgrade plan below, you guarantee no downtime during the upgrade.
 However, please review the <a href="#upgrade_1020_notable">notable changes in 0.10.2.0</a> before upgrading.
@@ -75,7 +113,6 @@ Kafka cluster before upgrading your clients. Version 0.10.2 brokers support 0.8.
         should not be set in the Streams app any more. If the Kafka cluster is secured, Streams apps must have the required security privileges to create new topics.</li>
     <li>Several new fields including "security.protocol", "connections.max.idle.ms", "retry.backoff.ms", "reconnect.backoff.ms" and "request.timeout.ms" were added to
         StreamsConfig class. User should pay attention to the default values and set these if needed. For more details please refer to <a href="/{{version}}/documentation/#streamsconfigs">3.5 Kafka Streams Configs</a>.</li>
-    <li>The <code>offsets.topic.replication.factor</code> broker config is now enforced upon auto topic creation. Internal auto topic creation will fail with a GROUP_COORDINATOR_NOT_AVAILABLE error until the cluster size meets this replication factor requirement.</li>
 </ul>
 
 <h5><a id="upgrade_1020_new_protocols" href="#upgrade_1020_new_protocols">New Protocol Versions</a></h5>
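
For reference, step 1 of the rolling upgrade documented above amounts to adding two lines to
each broker's server.properties before the rolling restart (illustrative values for a cluster
currently on 0.10.1):

    # server.properties additions during the rolling upgrade (illustrative)
    inter.broker.protocol.version=0.10.1
    log.message.format.version=0.10.1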

