kafka-commits mailing list archives

From gwens...@apache.org
Subject kafka git commit: KAFKA-3373; add 'log' prefix to configurations in KIP-31/32
Date Tue, 15 Mar 2016 02:13:31 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk cf40acc2b -> ffbe624e6


KAFKA-3373; add 'log' prefix to configurations in KIP-31/32

Author: Jiangjie Qin <becket.qin@gmail.com>

Reviewers: Gwen Shapira

Closes #1049 from becketqin/KAFKA-3373


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/ffbe624e
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/ffbe624e
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/ffbe624e

Branch: refs/heads/trunk
Commit: ffbe624e6f4906c55080508fab19ac60dc93761e
Parents: cf40acc
Author: Jiangjie Qin <becket.qin@gmail.com>
Authored: Mon Mar 14 19:13:26 2016 -0700
Committer: Gwen Shapira <cshapi@gmail.com>
Committed: Mon Mar 14 19:13:26 2016 -0700

----------------------------------------------------------------------
 core/src/main/scala/kafka/log/LogConfig.scala   | 12 +++----
 .../main/scala/kafka/server/ConfigHandler.scala |  4 +--
 .../main/scala/kafka/server/KafkaConfig.scala   | 33 +++++++++++---------
 .../main/scala/kafka/server/KafkaServer.scala   |  6 ++--
 .../unit/kafka/server/KafkaConfigTest.scala     |  4 +--
 .../scala/unit/kafka/server/LogOffsetTest.scala | 24 +++++++-------
 docs/upgrade.html                               |  4 +--
 .../kafkatest/services/kafka/config_property.py |  2 +-
 tests/kafkatest/tests/upgrade_test.py           |  6 ++--
 9 files changed, 48 insertions(+), 47 deletions(-)
----------------------------------------------------------------------
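
For reference, the patch renames three broker-level configurations: message.format.version becomes log.message.format.version, message.timestamp.type becomes log.message.timestamp.type, and message.timestamp.difference.max.ms becomes log.message.timestamp.difference.max.ms. A minimal sketch of a broker configuration using the new names (the ZooKeeper address and all values are illustrative placeholders, not part of this commit):

    import java.util.Properties
    import kafka.server.KafkaConfig

    val props = new Properties()
    props.put("zookeeper.connect", "localhost:2181")        // placeholder address
    props.put("log.message.format.version", "0.10.0")       // was message.format.version
    props.put("log.message.timestamp.type", "CreateTime")   // was message.timestamp.type
    props.put("log.message.timestamp.difference.max.ms", Long.MaxValue.toString)  // was message.timestamp.difference.max.ms
    val config = KafkaConfig.fromProps(props)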


http://git-wip-us.apache.org/repos/asf/kafka/blob/ffbe624e/core/src/main/scala/kafka/log/LogConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogConfig.scala b/core/src/main/scala/kafka/log/LogConfig.scala
index a76dce7..ffec85a 100755
--- a/core/src/main/scala/kafka/log/LogConfig.scala
+++ b/core/src/main/scala/kafka/log/LogConfig.scala
@@ -47,9 +47,9 @@ object Defaults {
   val MinInSyncReplicas = kafka.server.Defaults.MinInSyncReplicas
   val CompressionType = kafka.server.Defaults.CompressionType
   val PreAllocateEnable = kafka.server.Defaults.LogPreAllocateEnable
-  val MessageFormatVersion = kafka.server.Defaults.MessageFormatVersion
-  val MessageTimestampType = kafka.server.Defaults.MessageTimestampType
-  val MessageTimestampDifferenceMaxMs = kafka.server.Defaults.MessageTimestampDifferenceMaxMs
+  val MessageFormatVersion = kafka.server.Defaults.LogMessageFormatVersion
+  val MessageTimestampType = kafka.server.Defaults.LogMessageTimestampType
+  val MessageTimestampDifferenceMaxMs = kafka.server.Defaults.LogMessageTimestampDifferenceMaxMs
 }
 
 case class LogConfig(props: java.util.Map[_, _]) extends AbstractConfig(LogConfig.configDef, props, false) {
@@ -110,9 +110,9 @@ object LogConfig {
   val MinInSyncReplicasProp = "min.insync.replicas"
   val CompressionTypeProp = "compression.type"
   val PreAllocateEnableProp = "preallocate"
-  val MessageFormatVersionProp = KafkaConfig.MessageFormatVersionProp
-  val MessageTimestampTypeProp = KafkaConfig.MessageTimestampTypeProp
-  val MessageTimestampDifferenceMaxMsProp = KafkaConfig.MessageTimestampDifferenceMaxMsProp
+  val MessageFormatVersionProp = "message.format.version"
+  val MessageTimestampTypeProp = "message.timestamp.type"
+  val MessageTimestampDifferenceMaxMsProp = "message.timestamp.difference.max.ms"
 
   val SegmentSizeDoc = "The hard maximum for the size of a segment file in the log"
  val SegmentMsDoc = "The soft maximum on the amount of time before a new log segment is rolled"

http://git-wip-us.apache.org/repos/asf/kafka/blob/ffbe624e/core/src/main/scala/kafka/server/ConfigHandler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ConfigHandler.scala b/core/src/main/scala/kafka/server/ConfigHandler.scala
index 4bdd308..ab1d782 100644
--- a/core/src/main/scala/kafka/server/ConfigHandler.scala
+++ b/core/src/main/scala/kafka/server/ConfigHandler.scala
@@ -48,8 +48,8 @@ class TopicConfigHandler(private val logManager: LogManager, kafkaConfig: KafkaC
        warn(s"Log configuration ${LogConfig.MessageFormatVersionProp} is ignored for `$topic` because `$versionString` " +
           s"is not compatible with Kafka inter-broker protocol version `${kafkaConfig.interBrokerProtocolVersionString}`")
         Some(LogConfig.MessageFormatVersionProp)
-      }
-      else None
+      } else
+        None
     }
 
     val logs = logManager.logsByTopicPartition.filterKeys(_.topic == topic).values.toBuffer

http://git-wip-us.apache.org/repos/asf/kafka/blob/ffbe624e/core/src/main/scala/kafka/server/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index d13c872..8d14edd 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -23,6 +23,7 @@ import kafka.api.ApiVersion
 import kafka.cluster.EndPoint
 import kafka.consumer.ConsumerConfig
 import kafka.coordinator.OffsetConfig
+import kafka.log.LogConfig
 import kafka.message.{BrokerCompressionCodec, CompressionCodec, Message, MessageSet}
 import kafka.utils.CoreUtils
 import org.apache.kafka.clients.CommonClientConfigs
@@ -95,12 +96,12 @@ object Defaults {
   val LogFlushOffsetCheckpointIntervalMs = 60000
   val LogPreAllocateEnable = false
   // lazy val as `InterBrokerProtocolVersion` is defined later
-  lazy val MessageFormatVersion = InterBrokerProtocolVersion
+  lazy val LogMessageFormatVersion = InterBrokerProtocolVersion
+  val LogMessageTimestampType = "CreateTime"
+  val LogMessageTimestampDifferenceMaxMs = Long.MaxValue
   val NumRecoveryThreadsPerDataDir = 1
   val AutoCreateTopicsEnable = true
   val MinInSyncReplicas = 1
-  val MessageTimestampType = "CreateTime"
-  val MessageTimestampDifferenceMaxMs = Long.MaxValue
 
   /** ********* Replication configuration ***********/
   val ControllerSocketTimeoutMs = RequestTimeoutMs
@@ -184,6 +185,8 @@ object Defaults {
 
 object KafkaConfig {
 
+  private val LogConfigPrefix = "log."
+
   def main(args: Array[String]) {
     System.out.println(configDef.toHtmlTable)
   }
@@ -255,12 +258,12 @@ object KafkaConfig {
   val LogFlushIntervalMsProp = "log.flush.interval.ms"
   val LogFlushOffsetCheckpointIntervalMsProp = "log.flush.offset.checkpoint.interval.ms"
   val LogPreAllocateProp = "log.preallocate"
-  val MessageFormatVersionProp = "message.format.version"
+  val LogMessageFormatVersionProp = LogConfigPrefix + LogConfig.MessageFormatVersionProp
+  val LogMessageTimestampTypeProp = LogConfigPrefix + LogConfig.MessageTimestampTypeProp
+  val LogMessageTimestampDifferenceMaxMsProp = LogConfigPrefix + LogConfig.MessageTimestampDifferenceMaxMsProp
   val NumRecoveryThreadsPerDataDirProp = "num.recovery.threads.per.data.dir"
   val AutoCreateTopicsEnableProp = "auto.create.topics.enable"
   val MinInSyncReplicasProp = "min.insync.replicas"
-  val MessageTimestampTypeProp = "message.timestamp.type"
-  val MessageTimestampDifferenceMaxMsProp = "message.timestamp.difference.max.ms"
   /** ********* Replication configuration ***********/
   val ControllerSocketTimeoutMsProp = "controller.socket.timeout.ms"
   val DefaultReplicationFactorProp = "default.replication.factor"
@@ -607,9 +610,9 @@ object KafkaConfig {
      .define(NumRecoveryThreadsPerDataDirProp, INT, Defaults.NumRecoveryThreadsPerDataDir, atLeast(1), HIGH, NumRecoveryThreadsPerDataDirDoc)
      .define(AutoCreateTopicsEnableProp, BOOLEAN, Defaults.AutoCreateTopicsEnable, HIGH, AutoCreateTopicsEnableDoc)
       .define(MinInSyncReplicasProp, INT, Defaults.MinInSyncReplicas, atLeast(1), HIGH, MinInSyncReplicasDoc)
-      .define(MessageFormatVersionProp, STRING, Defaults.MessageFormatVersion, MEDIUM, MessageFormatVersionDoc)
-      .define(MessageTimestampTypeProp, STRING, Defaults.MessageTimestampType, in("CreateTime", "LogAppendTime"), MEDIUM, MessageTimestampTypeDoc)
-      .define(MessageTimestampDifferenceMaxMsProp, LONG, Defaults.MessageTimestampDifferenceMaxMs, atLeast(0), MEDIUM, MessageTimestampDifferenceMaxMsDoc)
+      .define(LogMessageFormatVersionProp, STRING, Defaults.LogMessageFormatVersion, MEDIUM, MessageFormatVersionDoc)
+      .define(LogMessageTimestampTypeProp, STRING, Defaults.LogMessageTimestampType, in("CreateTime", "LogAppendTime"), MEDIUM, MessageTimestampTypeDoc)
+      .define(LogMessageTimestampDifferenceMaxMsProp, LONG, Defaults.LogMessageTimestampDifferenceMaxMs, atLeast(0), MEDIUM, MessageTimestampDifferenceMaxMsDoc)
 
       /** ********* Replication configuration ***********/
      .define(ControllerSocketTimeoutMsProp, INT, Defaults.ControllerSocketTimeoutMs, MEDIUM, ControllerSocketTimeoutMsDoc)
@@ -802,10 +805,10 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean) extends Abstra
   val logPreAllocateEnable: java.lang.Boolean = getBoolean(KafkaConfig.LogPreAllocateProp)
  // We keep the user-provided String as `ApiVersion.apply` can choose a slightly different version (eg if `0.10.0`
   // is passed, `0.10.0-IV0` may be picked)
-  val messageFormatVersionString = getString(KafkaConfig.MessageFormatVersionProp)
-  val messageFormatVersion = ApiVersion(messageFormatVersionString)
-  val messageTimestampType = TimestampType.forName(getString(KafkaConfig.MessageTimestampTypeProp))
-  val messageTimestampDifferenceMaxMs = getLong(KafkaConfig.MessageTimestampDifferenceMaxMsProp)
+  val logMessageFormatVersionString = getString(KafkaConfig.LogMessageFormatVersionProp)
+  val logMessageFormatVersion = ApiVersion(logMessageFormatVersionString)
+  val logMessageTimestampType = TimestampType.forName(getString(KafkaConfig.LogMessageTimestampTypeProp))
+  val logMessageTimestampDifferenceMaxMs = getLong(KafkaConfig.LogMessageTimestampDifferenceMaxMsProp)
 
   /** ********* Replication configuration ***********/
   val controllerSocketTimeoutMs: Int = getInt(KafkaConfig.ControllerSocketTimeoutMsProp)
@@ -986,7 +989,7 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean) extends Abstra
      s"${KafkaConfig.AdvertisedListenersProp} protocols must be equal to or a subset of ${KafkaConfig.ListenersProp} protocols. " +
      s"Found ${advertisedListeners.keySet}. The valid options based on currently configured protocols are ${listeners.keySet}"
     )
-    require(interBrokerProtocolVersion >= messageFormatVersion,
-      s"message.format.version $messageFormatVersionString cannot be used when inter.broker.protocol.version
is set to $interBrokerProtocolVersionString")
+    require(interBrokerProtocolVersion >= logMessageFormatVersion,
+      s"log.message.format.version $logMessageFormatVersionString cannot be used when inter.broker.protocol.version
is set to $interBrokerProtocolVersionString")
   }
 }
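
A small sketch of the relationship introduced in KafkaConfig.scala above, assuming the kafka core classes are on the classpath: the broker-level property names are now derived from the topic-level LogConfig names by prepending the "log." prefix, so the two sets of names can no longer drift apart.

    import kafka.log.LogConfig
    import kafka.server.KafkaConfig

    // Broker-level name = "log." + topic-level name, per the LogConfigPrefix change above.
    assert(KafkaConfig.LogMessageFormatVersionProp == "log." + LogConfig.MessageFormatVersionProp)
    assert(KafkaConfig.LogMessageTimestampTypeProp == "log." + LogConfig.MessageTimestampTypeProp)
    assert(KafkaConfig.LogMessageTimestampDifferenceMaxMsProp == "log." + LogConfig.MessageTimestampDifferenceMaxMsProp)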

http://git-wip-us.apache.org/repos/asf/kafka/blob/ffbe624e/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 2203df9..2f5441a 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -75,9 +75,9 @@ object KafkaServer {
     logProps.put(LogConfig.CompressionTypeProp, kafkaConfig.compressionType)
     logProps.put(LogConfig.UncleanLeaderElectionEnableProp, kafkaConfig.uncleanLeaderElectionEnable)
     logProps.put(LogConfig.PreAllocateEnableProp, kafkaConfig.logPreAllocateEnable)
-    logProps.put(LogConfig.MessageFormatVersionProp, kafkaConfig.messageFormatVersion.version)
-    logProps.put(LogConfig.MessageTimestampTypeProp, kafkaConfig.messageTimestampType.name)
-    logProps.put(LogConfig.MessageTimestampDifferenceMaxMsProp, kafkaConfig.messageTimestampDifferenceMaxMs)
+    logProps.put(LogConfig.MessageFormatVersionProp, kafkaConfig.logMessageFormatVersion.version)
+    logProps.put(LogConfig.MessageTimestampTypeProp, kafkaConfig.logMessageTimestampType.name)
+    logProps.put(LogConfig.MessageTimestampDifferenceMaxMsProp, kafkaConfig.logMessageTimestampDifferenceMaxMs)
     logProps
   }
 }
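
On the topic-level side the keys stay unprefixed; only the broker-level names gained the "log." prefix, and the KafkaServer change above copies the broker values under the topic-level names. A rough sketch of building a per-topic LogConfig with the unchanged topic-level keys (values are illustrative):

    import java.util.Properties
    import kafka.log.LogConfig

    val logProps = new Properties()
    logProps.put(LogConfig.MessageFormatVersionProp, "0.10.0")      // still "message.format.version" at topic level
    logProps.put(LogConfig.MessageTimestampTypeProp, "CreateTime")  // still "message.timestamp.type" at topic level
    val logConfig = LogConfig(logProps)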

http://git-wip-us.apache.org/repos/asf/kafka/blob/ffbe624e/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index c5a0079..7524e6a 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -283,14 +283,14 @@ class KafkaConfigTest {
 
     props.put(KafkaConfig.InterBrokerProtocolVersionProp, "0.8.2.0")
     // We need to set the message format version to make the configuration valid.
-    props.put(KafkaConfig.MessageFormatVersionProp, "0.8.2.0")
+    props.put(KafkaConfig.LogMessageFormatVersionProp, "0.8.2.0")
     val conf2 = KafkaConfig.fromProps(props)
     assertEquals(KAFKA_0_8_2, conf2.interBrokerProtocolVersion)
 
     // check that 0.8.2.0 is the same as 0.8.2.1
     props.put(KafkaConfig.InterBrokerProtocolVersionProp, "0.8.2.1")
     // We need to set the message format version to make the configuration valid
-    props.put(KafkaConfig.MessageFormatVersionProp, "0.8.2.1")
+    props.put(KafkaConfig.LogMessageFormatVersionProp, "0.8.2.1")
     val conf3 = KafkaConfig.fromProps(props)
     assertEquals(KAFKA_0_8_2, conf3.interBrokerProtocolVersion)
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/ffbe624e/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
index 5c2092c..8c86a7b 100755
--- a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
@@ -18,21 +18,20 @@
 package kafka.server
 
 import java.io.File
+import java.util.{Properties, Random}
+
+import kafka.admin.AdminUtils
+import kafka.api.{FetchRequestBuilder, OffsetRequest, PartitionOffsetRequestInfo}
+import kafka.common.TopicAndPartition
+import kafka.consumer.SimpleConsumer
+import kafka.message.{ByteBufferMessageSet, Message, NoCompressionCodec}
+import kafka.utils.TestUtils._
 import kafka.utils._
-import org.apache.kafka.common.protocol.Errors
+import kafka.zk.ZooKeeperTestHarness
 import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.protocol.Errors
 import org.junit.Assert._
-import java.util.{Random, Properties}
-import kafka.consumer.SimpleConsumer
-import kafka.message.{NoCompressionCodec, ByteBufferMessageSet, Message}
-import kafka.zk.ZooKeeperTestHarness
-import kafka.admin.AdminUtils
-import kafka.api.{ApiVersion, PartitionOffsetRequestInfo, FetchRequestBuilder, OffsetRequest}
-import kafka.utils.TestUtils._
-import kafka.common.TopicAndPartition
-import org.junit.After
-import org.junit.Before
-import org.junit.Test
+import org.junit.{After, Before, Test}
 
 class LogOffsetTest extends ZooKeeperTestHarness {
   val random = new Random() 
@@ -206,7 +205,6 @@ class LogOffsetTest extends ZooKeeperTestHarness {
     props.put("log.retention.check.interval.ms", (5*1000*60).toString)
     props.put("log.segment.bytes", logSize.toString)
     props.put("zookeeper.connect", zkConnect.toString)
-    props.put("message.format.version", "0.10.0")
     props
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/ffbe624e/docs/upgrade.html
----------------------------------------------------------------------
diff --git a/docs/upgrade.html b/docs/upgrade.html
index 863a6fa..15ea3ae 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -26,7 +26,7 @@ are introduced, it is important to upgrade your Kafka clusters before upgrading
 
 <ol>
    <li> Update server.properties file on all brokers and add the following property: inter.broker.protocol.version=CURRENT_KAFKA_VERSION (e.g. 0.8.2 or 0.9.0.0).
-         We recommend that users set message.format.version=CURRENT_KAFKA_VERSION as well to avoid a performance regression
+         We recommend that users set log.message.format.version=CURRENT_KAFKA_VERSION as well to avoid a performance regression
          during upgrade. See <a href="#upgrade_10_performance_impact">potential performance impact during upgrade</a> for the details.
     </li>
    <li> Upgrade the brokers. This can be done a broker at a time by simply bringing it down, updating the code, and restarting it. </li>
@@ -41,7 +41,7 @@ are introduced, it is important to upgrade your Kafka clusters before upgrading
 <h5><a id="upgrade_10_performance_impact" href="#upgrade_10_performance_impact">Potential performance impact during upgrade to 0.10.0.0</a></h5>
 <p>
    The message format in 0.10.0 includes a new timestamp field and uses relative offsets for compressed messages.
-    The on disk message format can be configured through message.format.version in the server.properties file.
+    The on disk message format can be configured through log.message.format.version in the server.properties file.
    The default on-disk message format is 0.10.0. If a consumer client is on a version before 0.10.0.0, it only understands
    message formats before 0.10.0. In this case, the broker is able to convert messages from the 0.10.0 format to an earlier format
    before sending the response to the consumer on an older version. However, the broker can't use zero-copy transfer in this case.
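
To make the revised recommendation above concrete, the first phase of the rolling upgrade amounts to pinning both settings to the pre-upgrade version; the snippet below builds the equivalent broker properties in code (version values are illustrative, assuming an upgrade from 0.9.0.0):

    import java.util.Properties

    // Phase 1: keep the wire protocol and the on-disk format at CURRENT_KAFKA_VERSION
    // until all brokers and consumers have been upgraded.
    val upgradeProps = new Properties()
    upgradeProps.put("inter.broker.protocol.version", "0.9.0.0")
    upgradeProps.put("log.message.format.version", "0.9.0.0")   // was message.format.version before this patch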

http://git-wip-us.apache.org/repos/asf/kafka/blob/ffbe624e/tests/kafkatest/services/kafka/config_property.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/kafka/config_property.py b/tests/kafkatest/services/kafka/config_property.py
index b2b1d05..8f30f13 100644
--- a/tests/kafkatest/services/kafka/config_property.py
+++ b/tests/kafkatest/services/kafka/config_property.py
@@ -40,7 +40,7 @@ AUTO_CREATE_TOPICS_ENABLE = "auto.create.topics.enable"
 ZOOKEEPER_CONNECT = "zookeeper.connect"
 ZOOKEEPER_CONNECTION_TIMEOUT_MS = "zookeeper.connection.timeout.ms"
 INTER_BROKER_PROTOCOL_VERSION = "inter.broker.protocol.version"
-MESSAGE_FORMAT_VERSION = "message.format.version"
+MESSAGE_FORMAT_VERSION = "log.message.format.version"
 
 
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/ffbe624e/tests/kafkatest/tests/upgrade_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/upgrade_test.py b/tests/kafkatest/tests/upgrade_test.py
index bec4b3f..9926f11 100644
--- a/tests/kafkatest/tests/upgrade_test.py
+++ b/tests/kafkatest/tests/upgrade_test.py
@@ -78,10 +78,10 @@ class TestUpgrade(ProduceConsumeValidateTest):
         - Start producer and consumer in the background
         - Perform two-phase rolling upgrade
            - First phase: upgrade brokers to 0.10 with inter.broker.protocol.version set to
-            from_kafka_version and message.format.version set to from_kafka_version
+            from_kafka_version and log.message.format.version set to from_kafka_version
            - Second phase: remove inter.broker.protocol.version config with rolling bounce; if
-            to_message_format_version is set to 0.9, set message.format.version to
-            to_message_format_version, otherwise remove message.format.version config
+            to_message_format_version is set to 0.9, set log.message.format.version to
+            to_message_format_version, otherwise remove log.message.format.version config
        - Finally, validate that every message acked by the producer was consumed by the consumer
         """
         self.kafka = KafkaService(self.test_context, num_nodes=3, zk=self.zk,

