kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jun...@apache.org
Subject kafka git commit: KAFKA-3721; Put UpdateMetadataRequest V2 in 0.10.0-IV1
Date Wed, 18 May 2016 04:16:57 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 9a44d938d -> 2bd7b6450


KAFKA-3721; Put UpdateMetadataRequest V2 in 0.10.0-IV1

Author: Jiangjie Qin <becket.qin@gmail.com>

Reviewers: Ismael Juma <ismael@juma.me.uk>, Gwen Shapira <cshapi@gmail.com>, Jun Rao <junrao@gmail.com>

Closes #1400 from becketqin/KAFKA-3721


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/2bd7b645
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/2bd7b645
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/2bd7b645

Branch: refs/heads/trunk
Commit: 2bd7b64506a2a7ecef562f5b7db8a34e28d4e957
Parents: 9a44d93
Author: Jiangjie Qin <becket.qin@gmail.com>
Authored: Tue May 17 21:16:52 2016 -0700
Committer: Jun Rao <junrao@gmail.com>
Committed: Tue May 17 21:16:52 2016 -0700

----------------------------------------------------------------------
 core/src/main/scala/kafka/api/ApiVersion.scala           | 11 ++++++++++-
 .../kafka/controller/ControllerChannelManager.scala      |  2 +-
 core/src/main/scala/kafka/server/KafkaConfig.scala       |  4 ++--
 core/src/main/scala/kafka/utils/ZkUtils.scala            |  4 ++--
 4 files changed, 15 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/2bd7b645/core/src/main/scala/kafka/api/ApiVersion.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/ApiVersion.scala b/core/src/main/scala/kafka/api/ApiVersion.scala
index e2cadd1..2417d79 100644
--- a/core/src/main/scala/kafka/api/ApiVersion.scala
+++ b/core/src/main/scala/kafka/api/ApiVersion.scala
@@ -47,8 +47,11 @@ object ApiVersion {
     "0.8.1" -> KAFKA_0_8_1,
     "0.8.2" -> KAFKA_0_8_2,
     "0.9.0" -> KAFKA_0_9_0,
+    // 0.10.0-IV0 is introduced for KIP-31/32 which changes the message format.
     "0.10.0-IV0" -> KAFKA_0_10_0_IV0,
-    "0.10.0" -> KAFKA_0_10_0_IV0
+    // 0.10.0-IV1 is introduced for KIP-36(rack awareness) and KIP-43(SASL handshake).
+    "0.10.0-IV1" -> KAFKA_0_10_0_IV1,
+    "0.10.0" -> KAFKA_0_10_0_IV1
   )
 
   private val versionPattern = "\\.".r
@@ -102,3 +105,9 @@ case object KAFKA_0_10_0_IV0 extends ApiVersion {
   val messageFormatVersion: Byte = Message.MagicValue_V1
   val id: Int = 4
 }
+
+case object KAFKA_0_10_0_IV1 extends ApiVersion {
+  val version: String = "0.10.0-IV1"
+  val messageFormatVersion: Byte = Message.MagicValue_V1
+  val id: Int = 5
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/2bd7b645/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index 018946e..65b7096 100755
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -386,7 +386,7 @@ class ControllerBrokerRequestBatch(controller: KafkaController) extends Logging
           topicPartition -> partitionState
         }
 
-        val version = if (controller.config.interBrokerProtocolVersion >= KAFKA_0_10_0_IV0) 2: Short
+        val version = if (controller.config.interBrokerProtocolVersion >= KAFKA_0_10_0_IV1) 2: Short
                      else if (controller.config.interBrokerProtocolVersion >= KAFKA_0_9_0) 1: Short
                       else 0: Short
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/2bd7b645/core/src/main/scala/kafka/server/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index dff2b66..a664484 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -19,7 +19,7 @@ package kafka.server
 
 import java.util.Properties
 
-import kafka.api.{ApiVersion, KAFKA_0_10_0_IV0}
+import kafka.api.{ApiVersion, KAFKA_0_10_0_IV1}
 import kafka.cluster.EndPoint
 import kafka.consumer.ConsumerConfig
 import kafka.coordinator.OffsetConfig
@@ -912,7 +912,7 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean) extends Abstra
   val saslKerberosTicketRenewJitter = getDouble(KafkaConfig.SaslKerberosTicketRenewJitterProp)
   val saslKerberosMinTimeBeforeRelogin = getLong(KafkaConfig.SaslKerberosMinTimeBeforeReloginProp)
   val saslKerberosPrincipalToLocalRules = getList(KafkaConfig.SaslKerberosPrincipalToLocalRulesProp)
-  val saslInterBrokerHandshakeRequestEnable = interBrokerProtocolVersion >= KAFKA_0_10_0_IV0
+  val saslInterBrokerHandshakeRequestEnable = interBrokerProtocolVersion >= KAFKA_0_10_0_IV1
 
   /** ********* Quota Configuration **************/
   val producerQuotaBytesPerSecondDefault = getLong(KafkaConfig.ProducerQuotaBytesPerSecondDefaultProp)

http://git-wip-us.apache.org/repos/asf/kafka/blob/2bd7b645/core/src/main/scala/kafka/utils/ZkUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index ec72029..1278a70 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -20,7 +20,7 @@ package kafka.utils
 import java.util.concurrent.CountDownLatch
 
 import kafka.admin._
-import kafka.api.{ApiVersion, KAFKA_0_10_0_IV0, LeaderAndIsr}
+import kafka.api.{ApiVersion, KAFKA_0_10_0_IV1, LeaderAndIsr}
 import kafka.cluster._
 import kafka.common.{KafkaException, NoEpochForPartitionException, TopicAndPartition}
 import kafka.consumer.{ConsumerThreadId, TopicCount}
@@ -277,7 +277,7 @@ class ZkUtils(val zkClient: ZkClient,
     val brokerIdPath = BrokerIdsPath + "/" + id
     val timestamp = SystemTime.milliseconds.toString
 
-    val version = if (apiVersion >= KAFKA_0_10_0_IV0) 3 else 2
+    val version = if (apiVersion >= KAFKA_0_10_0_IV1) 3 else 2
     var jsonMap = Map("version" -> version,
                       "host" -> host,
                       "port" -> port,


Mime
View raw message