kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gwens...@apache.org
Subject kafka git commit: KAFKA-2509: Replace LeaderAndIsr{Request,Response} with o.a.k.c requests equivalent
Date Tue, 15 Dec 2015 21:48:41 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 3f3358b6d -> 3432e85f6


KAFKA-2509: Replace LeaderAndIsr{Request,Response} with o.a.k.c requests equivalent

Author: Grant Henke <granthenke@gmail.com>

Reviewers: Gwen Shapira

Closes #647 from granthenke/isr-request


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/3432e85f
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/3432e85f
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/3432e85f

Branch: refs/heads/trunk
Commit: 3432e85f664c5e3d8300171c314ea52b2f4b88fd
Parents: 3f3358b
Author: Grant Henke <granthenke@gmail.com>
Authored: Tue Dec 15 13:48:34 2015 -0800
Committer: Gwen Shapira <cshapi@gmail.com>
Committed: Tue Dec 15 13:48:34 2015 -0800

----------------------------------------------------------------------
 .../src/main/scala/kafka/api/LeaderAndIsr.scala |  93 +++++++++
 .../scala/kafka/api/LeaderAndIsrRequest.scala   | 206 -------------------
 .../scala/kafka/api/LeaderAndIsrResponse.scala  |  75 -------
 .../main/scala/kafka/cluster/Partition.scala    |  34 ++-
 .../scala/kafka/network/RequestChannel.scala    |   1 -
 .../src/main/scala/kafka/server/KafkaApis.scala |  24 ++-
 .../scala/kafka/server/ReplicaManager.scala     |  69 ++++---
 .../api/RequestResponseSerializationTest.scala  |  26 +--
 8 files changed, 162 insertions(+), 366 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/3432e85f/core/src/main/scala/kafka/api/LeaderAndIsr.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/LeaderAndIsr.scala b/core/src/main/scala/kafka/api/LeaderAndIsr.scala
new file mode 100644
index 0000000..5de527c
--- /dev/null
+++ b/core/src/main/scala/kafka/api/LeaderAndIsr.scala
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package kafka.api
+
+import java.nio._
+
+import kafka.controller.LeaderIsrAndControllerEpoch
+import kafka.utils._
+
+import scala.collection.Set
+
+object LeaderAndIsr {
+  val initialLeaderEpoch: Int = 0
+  val initialZKVersion: Int = 0
+  val NoLeader = -1
+  val LeaderDuringDelete = -2
+}
+
+case class LeaderAndIsr(var leader: Int, var leaderEpoch: Int, var isr: List[Int], var zkVersion:
Int) {
+  def this(leader: Int, isr: List[Int]) = this(leader, LeaderAndIsr.initialLeaderEpoch, isr,
LeaderAndIsr.initialZKVersion)
+
+  override def toString(): String = {
+    Json.encode(Map("leader" -> leader, "leader_epoch" -> leaderEpoch, "isr" ->
isr))
+  }
+}
+
+object PartitionStateInfo {
+  def readFrom(buffer: ByteBuffer): PartitionStateInfo = {
+    val controllerEpoch = buffer.getInt
+    val leader = buffer.getInt
+    val leaderEpoch = buffer.getInt
+    val isrSize = buffer.getInt
+    val isr = for(i <- 0 until isrSize) yield buffer.getInt
+    val zkVersion = buffer.getInt
+    val replicationFactor = buffer.getInt
+    val replicas = for(i <- 0 until replicationFactor) yield buffer.getInt
+    PartitionStateInfo(LeaderIsrAndControllerEpoch(LeaderAndIsr(leader, leaderEpoch, isr.toList,
zkVersion), controllerEpoch),
+                       replicas.toSet)
+  }
+}
+
+case class PartitionStateInfo(leaderIsrAndControllerEpoch: LeaderIsrAndControllerEpoch,
+                              allReplicas: Set[Int]) {
+  def replicationFactor = allReplicas.size
+
+  def writeTo(buffer: ByteBuffer) {
+    buffer.putInt(leaderIsrAndControllerEpoch.controllerEpoch)
+    buffer.putInt(leaderIsrAndControllerEpoch.leaderAndIsr.leader)
+    buffer.putInt(leaderIsrAndControllerEpoch.leaderAndIsr.leaderEpoch)
+    buffer.putInt(leaderIsrAndControllerEpoch.leaderAndIsr.isr.size)
+    leaderIsrAndControllerEpoch.leaderAndIsr.isr.foreach(buffer.putInt(_))
+    buffer.putInt(leaderIsrAndControllerEpoch.leaderAndIsr.zkVersion)
+    buffer.putInt(replicationFactor)
+    allReplicas.foreach(buffer.putInt(_))
+  }
+
+  def sizeInBytes(): Int = {
+    val size =
+      4 /* epoch of the controller that elected the leader */ +
+      4 /* leader broker id */ +
+      4 /* leader epoch */ +
+      4 /* number of replicas in isr */ +
+      4 * leaderIsrAndControllerEpoch.leaderAndIsr.isr.size /* replicas in isr */ +
+      4 /* zk version */ +
+      4 /* replication factor */ +
+      allReplicas.size * 4
+    size
+  }
+
+  override def toString(): String = {
+    val partitionStateInfo = new StringBuilder
+    partitionStateInfo.append("(LeaderAndIsrInfo:" + leaderIsrAndControllerEpoch.toString)
+    partitionStateInfo.append(",ReplicationFactor:" + replicationFactor + ")")
+    partitionStateInfo.append(",AllReplicas:" + allReplicas.mkString(",") + ")")
+    partitionStateInfo.toString()
+  }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/3432e85f/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala b/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala
deleted file mode 100644
index 95451eb..0000000
--- a/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala
+++ /dev/null
@@ -1,206 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package kafka.api
-
-import java.nio._
-
-import kafka.api.ApiUtils._
-import kafka.cluster.BrokerEndPoint
-import kafka.common.ErrorMapping
-import kafka.controller.LeaderIsrAndControllerEpoch
-import kafka.network.{RequestOrResponseSend, RequestChannel}
-import kafka.network.RequestChannel.Response
-import kafka.utils._
-import org.apache.kafka.common.protocol.ApiKeys
-
-import scala.collection.Set
-
-
-object LeaderAndIsr {
-  val initialLeaderEpoch: Int = 0
-  val initialZKVersion: Int = 0
-  val NoLeader = -1
-  val LeaderDuringDelete = -2
-}
-
-case class LeaderAndIsr(var leader: Int, var leaderEpoch: Int, var isr: List[Int], var zkVersion:
Int) {
-  def this(leader: Int, isr: List[Int]) = this(leader, LeaderAndIsr.initialLeaderEpoch, isr,
LeaderAndIsr.initialZKVersion)
-
-  override def toString(): String = {
-    Json.encode(Map("leader" -> leader, "leader_epoch" -> leaderEpoch, "isr" ->
isr))
-  }
-}
-
-object PartitionStateInfo {
-  def readFrom(buffer: ByteBuffer): PartitionStateInfo = {
-    val controllerEpoch = buffer.getInt
-    val leader = buffer.getInt
-    val leaderEpoch = buffer.getInt
-    val isrSize = buffer.getInt
-    val isr = for(i <- 0 until isrSize) yield buffer.getInt
-    val zkVersion = buffer.getInt
-    val replicationFactor = buffer.getInt
-    val replicas = for(i <- 0 until replicationFactor) yield buffer.getInt
-    PartitionStateInfo(LeaderIsrAndControllerEpoch(LeaderAndIsr(leader, leaderEpoch, isr.toList,
zkVersion), controllerEpoch),
-                       replicas.toSet)
-  }
-}
-
-case class PartitionStateInfo(leaderIsrAndControllerEpoch: LeaderIsrAndControllerEpoch,
-                              allReplicas: Set[Int]) {
-  def replicationFactor = allReplicas.size
-
-  def writeTo(buffer: ByteBuffer) {
-    buffer.putInt(leaderIsrAndControllerEpoch.controllerEpoch)
-    buffer.putInt(leaderIsrAndControllerEpoch.leaderAndIsr.leader)
-    buffer.putInt(leaderIsrAndControllerEpoch.leaderAndIsr.leaderEpoch)
-    buffer.putInt(leaderIsrAndControllerEpoch.leaderAndIsr.isr.size)
-    leaderIsrAndControllerEpoch.leaderAndIsr.isr.foreach(buffer.putInt(_))
-    buffer.putInt(leaderIsrAndControllerEpoch.leaderAndIsr.zkVersion)
-    buffer.putInt(replicationFactor)
-    allReplicas.foreach(buffer.putInt(_))
-  }
-
-  def sizeInBytes(): Int = {
-    val size =
-      4 /* epoch of the controller that elected the leader */ +
-      4 /* leader broker id */ +
-      4 /* leader epoch */ +
-      4 /* number of replicas in isr */ +
-      4 * leaderIsrAndControllerEpoch.leaderAndIsr.isr.size /* replicas in isr */ +
-      4 /* zk version */ +
-      4 /* replication factor */ +
-      allReplicas.size * 4
-    size
-  }
-
-  override def toString(): String = {
-    val partitionStateInfo = new StringBuilder
-    partitionStateInfo.append("(LeaderAndIsrInfo:" + leaderIsrAndControllerEpoch.toString)
-    partitionStateInfo.append(",ReplicationFactor:" + replicationFactor + ")")
-    partitionStateInfo.append(",AllReplicas:" + allReplicas.mkString(",") + ")")
-    partitionStateInfo.toString()
-  }
-}
-
-object LeaderAndIsrRequest {
-  val CurrentVersion = 0.shortValue
-  val IsInit: Boolean = true
-  val NotInit: Boolean = false
-  val DefaultAckTimeout: Int = 1000
-
-  def readFrom(buffer: ByteBuffer): LeaderAndIsrRequest = {
-    val versionId = buffer.getShort
-    val correlationId = buffer.getInt
-    val clientId = readShortString(buffer)
-    val controllerId = buffer.getInt
-    val controllerEpoch = buffer.getInt
-    val partitionStateInfosCount = buffer.getInt
-    val partitionStateInfos = new collection.mutable.HashMap[(String, Int), PartitionStateInfo]
-
-    for(i <- 0 until partitionStateInfosCount){
-      val topic = readShortString(buffer)
-      val partition = buffer.getInt
-      val partitionStateInfo = PartitionStateInfo.readFrom(buffer)
-
-      partitionStateInfos.put((topic, partition), partitionStateInfo)
-    }
-
-    val leadersCount = buffer.getInt
-    var leaders = Set[BrokerEndPoint]()
-    for (i <- 0 until leadersCount)
-      leaders += BrokerEndPoint.readFrom(buffer)
-
-    new LeaderAndIsrRequest(versionId, correlationId, clientId, controllerId, controllerEpoch,
partitionStateInfos.toMap, leaders)
-  }
-}
-
-case class LeaderAndIsrRequest (versionId: Short,
-                                correlationId: Int,
-                                clientId: String,
-                                controllerId: Int,
-                                controllerEpoch: Int,
-                                partitionStateInfos: Map[(String, Int), PartitionStateInfo],
-                                leaders: Set[BrokerEndPoint])
-    extends RequestOrResponse(Some(ApiKeys.LEADER_AND_ISR.id)) {
-
-  def this(partitionStateInfos: Map[(String, Int), PartitionStateInfo], leaders: Set[BrokerEndPoint],
controllerId: Int,
-           controllerEpoch: Int, correlationId: Int, clientId: String) = {
-    this(LeaderAndIsrRequest.CurrentVersion, correlationId, clientId,
-         controllerId, controllerEpoch, partitionStateInfos, leaders)
-  }
-
-  def writeTo(buffer: ByteBuffer) {
-    buffer.putShort(versionId)
-    buffer.putInt(correlationId)
-    writeShortString(buffer, clientId)
-    buffer.putInt(controllerId)
-    buffer.putInt(controllerEpoch)
-    buffer.putInt(partitionStateInfos.size)
-    for((key, value) <- partitionStateInfos){
-      writeShortString(buffer, key._1)
-      buffer.putInt(key._2)
-      value.writeTo(buffer)
-    }
-    buffer.putInt(leaders.size)
-    leaders.foreach(_.writeTo(buffer))
-  }
-
-  def sizeInBytes(): Int = {
-    var size =
-      2 /* version id */ +
-      4 /* correlation id */ +
-      (2 + clientId.length) /* client id */ +
-      4 /* controller id */ +
-      4 /* controller epoch */ +
-      4 /* number of partitions */
-    for((key, value) <- partitionStateInfos)
-      size += (2 + key._1.length) /* topic */ + 4 /* partition */ + value.sizeInBytes /*
partition state info */
-    size += 4 /* number of leader brokers */
-    for(broker <- leaders)
-      size += broker.sizeInBytes /* broker info */
-    size
-  }
-
-  override def toString(): String = {
-    describe(true)
-  }
-
-  override  def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request):
Unit = {
-    val responseMap = partitionStateInfos.map {
-      case (topicAndPartition, partitionAndState) => (topicAndPartition, ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]))
-    }
-    val errorResponse = LeaderAndIsrResponse(correlationId, responseMap)
-    requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(request.connectionId,
errorResponse)))
-  }
-
-  override def describe(details: Boolean): String = {
-    val leaderAndIsrRequest = new StringBuilder
-    leaderAndIsrRequest.append("Name:" + this.getClass.getSimpleName)
-    leaderAndIsrRequest.append(";Version:" + versionId)
-    leaderAndIsrRequest.append(";Controller:" + controllerId)
-    leaderAndIsrRequest.append(";ControllerEpoch:" + controllerEpoch)
-    leaderAndIsrRequest.append(";CorrelationId:" + correlationId)
-    leaderAndIsrRequest.append(";ClientId:" + clientId)
-    leaderAndIsrRequest.append(";Leaders:" + leaders.mkString(","))
-    if(details)
-      leaderAndIsrRequest.append(";PartitionState:" + partitionStateInfos.mkString(","))
-    leaderAndIsrRequest.toString()
-  }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/3432e85f/core/src/main/scala/kafka/api/LeaderAndIsrResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/LeaderAndIsrResponse.scala b/core/src/main/scala/kafka/api/LeaderAndIsrResponse.scala
deleted file mode 100644
index 22ce48a..0000000
--- a/core/src/main/scala/kafka/api/LeaderAndIsrResponse.scala
+++ /dev/null
@@ -1,75 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.api
-
-import kafka.common.ErrorMapping
-import java.nio.ByteBuffer
-import kafka.api.ApiUtils._
-import collection.mutable.HashMap
-import collection.Map
-
-
-object LeaderAndIsrResponse {
-  def readFrom(buffer: ByteBuffer): LeaderAndIsrResponse = {
-    val correlationId = buffer.getInt
-    val errorCode = buffer.getShort
-    val numEntries = buffer.getInt
-    val responseMap = new HashMap[(String, Int), Short]()
-    for (i<- 0 until numEntries){
-      val topic = readShortString(buffer)
-      val partition = buffer.getInt
-      val partitionErrorCode = buffer.getShort
-      responseMap.put((topic, partition), partitionErrorCode)
-    }
-    new LeaderAndIsrResponse(correlationId, responseMap, errorCode)
-  }
-}
-
-
-case class LeaderAndIsrResponse(correlationId: Int,
-                                responseMap: Map[(String, Int), Short],
-                                errorCode: Short = ErrorMapping.NoError)
-    extends RequestOrResponse() {
-  def sizeInBytes(): Int ={
-    var size =
-      4 /* correlation id */ + 
-      2 /* error code */ +
-      4 /* number of responses */
-    for ((key, value) <- responseMap) {
-      size +=
-        2 + key._1.length /* topic */ +
-        4 /* partition */ +
-        2 /* error code for this partition */
-    }
-    size
-  }
-
-  def writeTo(buffer: ByteBuffer) {
-    buffer.putInt(correlationId)
-    buffer.putShort(errorCode)
-    buffer.putInt(responseMap.size)
-    for ((key:(String, Int), value) <- responseMap) {
-      writeShortString(buffer, key._1)
-      buffer.putInt(key._2)
-      buffer.putShort(value)
-    }
-  }
-
-  override def describe(details: Boolean):String = { toString }
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/3432e85f/core/src/main/scala/kafka/cluster/Partition.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 3805dcc..916f3e7 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -20,7 +20,7 @@ import kafka.common._
 import kafka.utils._
 import kafka.utils.CoreUtils.{inReadLock,inWriteLock}
 import kafka.admin.AdminUtils
-import kafka.api.{PartitionStateInfo, LeaderAndIsr}
+import kafka.api.LeaderAndIsr
 import kafka.log.LogConfig
 import kafka.server._
 import kafka.metrics.KafkaMetricsGroup
@@ -29,7 +29,9 @@ import kafka.message.ByteBufferMessageSet
 
 import java.io.IOException
 import java.util.concurrent.locks.ReentrantReadWriteLock
-import scala.collection.immutable.Set
+import org.apache.kafka.common.requests.LeaderAndIsrRequest
+
+import scala.collection.JavaConverters._
 
 import com.yammer.metrics.core.Gauge
 
@@ -161,22 +163,20 @@ class Partition(val topic: String,
    * from the time when this broker was the leader last time) and setting the new leader
and ISR.
    * If the leader replica id does not change, return false to indicate the replica manager.
    */
-  def makeLeader(controllerId: Int, partitionStateInfo: PartitionStateInfo, correlationId:
Int): Boolean = {
+  def makeLeader(controllerId: Int, partitionStateInfo: LeaderAndIsrRequest.PartitionState,
correlationId: Int): Boolean = {
     val (leaderHWIncremented, isNewLeader) = inWriteLock(leaderIsrUpdateLock) {
-      val allReplicas = partitionStateInfo.allReplicas
-      val leaderIsrAndControllerEpoch = partitionStateInfo.leaderIsrAndControllerEpoch
-      val leaderAndIsr = leaderIsrAndControllerEpoch.leaderAndIsr
+      val allReplicas = partitionStateInfo.replicas.asScala.map(_.toInt)
       // record the epoch of the controller that made the leadership decision. This is useful
while updating the isr
       // to maintain the decision maker controller's epoch in the zookeeper path
-      controllerEpoch = leaderIsrAndControllerEpoch.controllerEpoch
+      controllerEpoch = partitionStateInfo.controllerEpoch
       // add replicas that are new
       allReplicas.foreach(replica => getOrCreateReplica(replica))
-      val newInSyncReplicas = leaderAndIsr.isr.map(r => getOrCreateReplica(r)).toSet
+      val newInSyncReplicas = partitionStateInfo.isr.asScala.map(r => getOrCreateReplica(r)).toSet
       // remove assigned replicas that have been removed by the controller
       (assignedReplicas().map(_.brokerId) -- allReplicas).foreach(removeReplica(_))
       inSyncReplicas = newInSyncReplicas
-      leaderEpoch = leaderAndIsr.leaderEpoch
-      zkVersion = leaderAndIsr.zkVersion
+      leaderEpoch = partitionStateInfo.leaderEpoch
+      zkVersion = partitionStateInfo.zkVersion
       val isNewLeader =
         if (leaderReplicaIdOpt.isDefined && leaderReplicaIdOpt.get == localBrokerId)
{
           false
@@ -204,22 +204,20 @@ class Partition(val topic: String,
    *  Make the local replica the follower by setting the new leader and ISR to empty
    *  If the leader replica id does not change, return false to indicate the replica manager
    */
-  def makeFollower(controllerId: Int, partitionStateInfo: PartitionStateInfo, correlationId:
Int): Boolean = {
+  def makeFollower(controllerId: Int, partitionStateInfo: LeaderAndIsrRequest.PartitionState,
correlationId: Int): Boolean = {
     inWriteLock(leaderIsrUpdateLock) {
-      val allReplicas = partitionStateInfo.allReplicas
-      val leaderIsrAndControllerEpoch = partitionStateInfo.leaderIsrAndControllerEpoch
-      val leaderAndIsr = leaderIsrAndControllerEpoch.leaderAndIsr
-      val newLeaderBrokerId: Int = leaderAndIsr.leader
+      val allReplicas = partitionStateInfo.replicas.asScala.map(_.toInt)
+      val newLeaderBrokerId: Int = partitionStateInfo.leader
       // record the epoch of the controller that made the leadership decision. This is useful
while updating the isr
       // to maintain the decision maker controller's epoch in the zookeeper path
-      controllerEpoch = leaderIsrAndControllerEpoch.controllerEpoch
+      controllerEpoch = partitionStateInfo.controllerEpoch
       // add replicas that are new
       allReplicas.foreach(r => getOrCreateReplica(r))
       // remove assigned replicas that have been removed by the controller
       (assignedReplicas().map(_.brokerId) -- allReplicas).foreach(removeReplica(_))
       inSyncReplicas = Set.empty[Replica]
-      leaderEpoch = leaderAndIsr.leaderEpoch
-      zkVersion = leaderAndIsr.zkVersion
+      leaderEpoch = partitionStateInfo.leaderEpoch
+      zkVersion = partitionStateInfo.zkVersion
 
       if (leaderReplicaIdOpt.isDefined && leaderReplicaIdOpt.get == newLeaderBrokerId)
{
         false

http://git-wip-us.apache.org/repos/asf/kafka/blob/3432e85f/core/src/main/scala/kafka/network/RequestChannel.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala
index 719588d..1ab51da 100644
--- a/core/src/main/scala/kafka/network/RequestChannel.scala
+++ b/core/src/main/scala/kafka/network/RequestChannel.scala
@@ -69,7 +69,6 @@ object RequestChannel extends Logging {
       Map(ApiKeys.PRODUCE.id -> ProducerRequest.readFrom,
         ApiKeys.FETCH.id -> FetchRequest.readFrom,
         ApiKeys.METADATA.id -> TopicMetadataRequest.readFrom,
-        ApiKeys.LEADER_AND_ISR.id -> LeaderAndIsrRequest.readFrom,
         ApiKeys.STOP_REPLICA.id -> StopReplicaRequest.readFrom,
         ApiKeys.UPDATE_METADATA_KEY.id -> UpdateMetadataRequest.readFrom,
         ApiKeys.CONTROLLED_SHUTDOWN_KEY.id -> ControlledShutdownRequest.readFrom,

http://git-wip-us.apache.org/repos/asf/kafka/blob/3432e85f/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 2f619a4..5a3b8af 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -18,7 +18,7 @@
 package kafka.server
 
 import java.nio.ByteBuffer
-import java.lang.{Long => JLong}
+import java.lang.{Long => JLong, Short => JShort}
 
 import kafka.admin.AdminUtils
 import kafka.api._
@@ -36,7 +36,7 @@ import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.protocol.{ApiKeys, Errors, SecurityProtocol}
 import org.apache.kafka.common.requests.{ListOffsetRequest, ListOffsetResponse, GroupCoordinatorRequest,
GroupCoordinatorResponse, ListGroupsResponse,
 DescribeGroupsRequest, DescribeGroupsResponse, HeartbeatRequest, HeartbeatResponse, JoinGroupRequest,
JoinGroupResponse,
-LeaveGroupRequest, LeaveGroupResponse, ResponseHeader, ResponseSend, SyncGroupRequest, SyncGroupResponse}
+LeaveGroupRequest, LeaveGroupResponse, ResponseHeader, ResponseSend, SyncGroupRequest, SyncGroupResponse,
LeaderAndIsrRequest, LeaderAndIsrResponse}
 import org.apache.kafka.common.utils.Utils
 import org.apache.kafka.common.{TopicPartition, Node}
 
@@ -112,9 +112,8 @@ class KafkaApis(val requestChannel: RequestChannel,
     // ensureTopicExists is only for client facing requests
     // We can't have the ensureTopicExists check here since the controller sends it as an
advisory to all brokers so they
     // stop serving data to clients for the topic being deleted
-    val leaderAndIsrRequest = request.requestObj.asInstanceOf[LeaderAndIsrRequest]
-
-    authorizeClusterAction(request)
+    val correlationId = request.header.correlationId
+    val leaderAndIsrRequest = request.body.asInstanceOf[LeaderAndIsrRequest]
 
     try {
       def onLeadershipChange(updatedLeaders: Iterable[Partition], updatedFollowers: Iterable[Partition])
{
@@ -131,10 +130,17 @@ class KafkaApis(val requestChannel: RequestChannel,
         }
       }
 
-      // call replica manager to handle updating partitions to become leader or follower
-      val result = replicaManager.becomeLeaderOrFollower(leaderAndIsrRequest, metadataCache,
onLeadershipChange)
-      val leaderAndIsrResponse = new LeaderAndIsrResponse(leaderAndIsrRequest.correlationId,
result.responseMap, result.errorCode)
-      requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(request.connectionId,
leaderAndIsrResponse)))
+      val responseHeader = new ResponseHeader(correlationId)
+      val leaderAndIsrResponse=
+        if (authorize(request.session, ClusterAction, Resource.ClusterResource)) {
+          val result = replicaManager.becomeLeaderOrFollower(correlationId, leaderAndIsrRequest,
metadataCache, onLeadershipChange)
+          new LeaderAndIsrResponse(result.errorCode, result.responseMap.mapValues(new JShort(_)).asJava)
+        } else {
+          val result = leaderAndIsrRequest.partitionStates.asScala.keys.map((_, Errors.CLUSTER_AUTHORIZATION_FAILED.code)).toMap
+          new LeaderAndIsrResponse(Errors.CLUSTER_AUTHORIZATION_FAILED.code, result.mapValues(new
JShort(_)).asJava)
+        }
+
+      requestChannel.sendResponse(new Response(request, new ResponseSend(request.connectionId,
responseHeader, leaderAndIsrResponse)))
     } catch {
       case e: KafkaStorageException =>
         fatal("Disk error during leadership change.", e)

http://git-wip-us.apache.org/repos/asf/kafka/blob/3432e85f/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 7a99aad..3c2fa36 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -22,18 +22,21 @@ import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
 
 import com.yammer.metrics.core.Gauge
 import kafka.api._
-import kafka.cluster.{BrokerEndPoint, Partition, Replica}
+import kafka.cluster.{Partition, Replica}
 import kafka.common._
 import kafka.controller.KafkaController
 import kafka.log.{LogAppendInfo, LogManager}
 import kafka.message.{ByteBufferMessageSet, MessageSet}
 import kafka.metrics.KafkaMetricsGroup
 import kafka.utils._
+import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.requests.LeaderAndIsrRequest
 import org.apache.kafka.common.utils.{Time => JTime}
 
 import scala.collection._
+import scala.collection.JavaConverters._
 
 /*
  * Result metadata of a log append operation on the log
@@ -79,7 +82,7 @@ object LogReadResult {
                                            false)
 }
 
-case class BecomeLeaderOrFollowerResult(responseMap: collection.Map[(String, Int), Short],
errorCode: Short) {
+case class BecomeLeaderOrFollowerResult(responseMap: collection.Map[TopicPartition, Short],
errorCode: Short) {
 
   override def toString = {
     "update results: [%s], global error: [%d]".format(responseMap, errorCode)
@@ -579,66 +582,65 @@ class ReplicaManager(val config: KafkaConfig,
     }
   }
 
-  def becomeLeaderOrFollower(leaderAndISRRequest: LeaderAndIsrRequest,
+  def becomeLeaderOrFollower(correlationId: Int,leaderAndISRRequest: LeaderAndIsrRequest,
                              metadataCache: MetadataCache,
                             onLeadershipChange: (Iterable[Partition], Iterable[Partition]) => Unit): BecomeLeaderOrFollowerResult = {
-    leaderAndISRRequest.partitionStateInfos.foreach { case ((topic, partition), stateInfo) =>
+    leaderAndISRRequest.partitionStates.asScala.foreach { case (topicPartition, stateInfo) =>
       stateChangeLogger.trace("Broker %d received LeaderAndIsr request %s correlation id
%d from controller %d epoch %d for partition [%s,%d]"
-                                .format(localBrokerId, stateInfo, leaderAndISRRequest.correlationId,
-                                        leaderAndISRRequest.controllerId, leaderAndISRRequest.controllerEpoch,
topic, partition))
+                                .format(localBrokerId, stateInfo, correlationId,
+                                        leaderAndISRRequest.controllerId, leaderAndISRRequest.controllerEpoch,
topicPartition.topic, topicPartition.partition))
     }
     replicaStateChangeLock synchronized {
-      val responseMap = new mutable.HashMap[(String, Int), Short]
+      val responseMap = new mutable.HashMap[TopicPartition, Short]
       if (leaderAndISRRequest.controllerEpoch < controllerEpoch) {
-        leaderAndISRRequest.partitionStateInfos.foreach { case ((topic, partition), stateInfo) =>
+        leaderAndISRRequest.partitionStates.asScala.foreach { case (topicPartition, stateInfo) =>
         stateChangeLogger.warn(("Broker %d ignoring LeaderAndIsr request from controller
%d with correlation id %d since " +
           "its controller epoch %d is old. Latest known controller epoch is %d").format(localBrokerId,
leaderAndISRRequest.controllerId,
-          leaderAndISRRequest.correlationId, leaderAndISRRequest.controllerEpoch, controllerEpoch))
+          correlationId, leaderAndISRRequest.controllerEpoch, controllerEpoch))
         }
         BecomeLeaderOrFollowerResult(responseMap, ErrorMapping.StaleControllerEpochCode)
       } else {
         val controllerId = leaderAndISRRequest.controllerId
-        val correlationId = leaderAndISRRequest.correlationId
         controllerEpoch = leaderAndISRRequest.controllerEpoch
 
         // First check partition's leader epoch
-        val partitionState = new mutable.HashMap[Partition, PartitionStateInfo]()
-        leaderAndISRRequest.partitionStateInfos.foreach { case ((topic, partitionId), partitionStateInfo)
=>
-          val partition = getOrCreatePartition(topic, partitionId)
+        val partitionState = new mutable.HashMap[Partition, LeaderAndIsrRequest.PartitionState]()
+        leaderAndISRRequest.partitionStates.asScala.foreach { case (topicPartition, stateInfo)
=>
+          val partition = getOrCreatePartition(topicPartition.topic, topicPartition.partition)
           val partitionLeaderEpoch = partition.getLeaderEpoch()
           // If the leader epoch is valid record the epoch of the controller that made the
leadership decision.
           // This is useful while updating the isr to maintain the decision maker controller's
epoch in the zookeeper path
-          if (partitionLeaderEpoch < partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.leaderEpoch)
{
-            if(partitionStateInfo.allReplicas.contains(config.brokerId))
-              partitionState.put(partition, partitionStateInfo)
+          if (partitionLeaderEpoch < stateInfo.leaderEpoch) {
+            if(stateInfo.replicas.contains(config.brokerId))
+              partitionState.put(partition, stateInfo)
             else {
               stateChangeLogger.warn(("Broker %d ignoring LeaderAndIsr request from controller
%d with correlation id %d " +
                 "epoch %d for partition [%s,%d] as itself is not in assigned replica list
%s")
                 .format(localBrokerId, controllerId, correlationId, leaderAndISRRequest.controllerEpoch,
-                topic, partition.partitionId, partitionStateInfo.allReplicas.mkString(",")))
-              responseMap.put((topic, partitionId), ErrorMapping.UnknownTopicOrPartitionCode)
+                  topicPartition.topic, topicPartition.partition, stateInfo.replicas.asScala.mkString(",")))
+              responseMap.put(topicPartition, ErrorMapping.UnknownTopicOrPartitionCode)
             }
           } else {
             // Otherwise record the error code in response
             stateChangeLogger.warn(("Broker %d ignoring LeaderAndIsr request from controller
%d with correlation id %d " +
               "epoch %d for partition [%s,%d] since its associated leader epoch %d is old.
Current leader epoch is %d")
               .format(localBrokerId, controllerId, correlationId, leaderAndISRRequest.controllerEpoch,
-              topic, partition.partitionId, partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.leaderEpoch,
partitionLeaderEpoch))
-            responseMap.put((topic, partitionId), ErrorMapping.StaleLeaderEpochCode)
+                topicPartition.topic, topicPartition.partition, stateInfo.leaderEpoch, partitionLeaderEpoch))
+            responseMap.put(topicPartition, ErrorMapping.StaleLeaderEpochCode)
           }
         }
 
-        val partitionsTobeLeader = partitionState.filter { case (partition, partitionStateInfo) =>
-          partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.leader == config.brokerId
+        val partitionsTobeLeader = partitionState.filter { case (partition, stateInfo) =>
+          stateInfo.leader == config.brokerId
         }
         val partitionsToBeFollower = (partitionState -- partitionsTobeLeader.keys)
 
         val partitionsBecomeLeader = if (!partitionsTobeLeader.isEmpty)
-          makeLeaders(controllerId, controllerEpoch, partitionsTobeLeader, leaderAndISRRequest.correlationId,
responseMap)
+          makeLeaders(controllerId, controllerEpoch, partitionsTobeLeader, correlationId,
responseMap)
         else
           Set.empty[Partition]
         val partitionsBecomeFollower = if (!partitionsToBeFollower.isEmpty)
-          makeFollowers(controllerId, controllerEpoch, partitionsToBeFollower, leaderAndISRRequest.correlationId,
responseMap, metadataCache)
+          makeFollowers(controllerId, controllerEpoch, partitionsToBeFollower, correlationId,
responseMap, metadataCache)
         else
           Set.empty[Partition]
 
@@ -671,16 +673,16 @@ class ReplicaManager(val config: KafkaConfig,
    */
   private def makeLeaders(controllerId: Int,
                           epoch: Int,
-                          partitionState: Map[Partition, PartitionStateInfo],
+                          partitionState: Map[Partition, LeaderAndIsrRequest.PartitionState],
                           correlationId: Int,
-                          responseMap: mutable.Map[(String, Int), Short]): Set[Partition] = {
+                          responseMap: mutable.Map[TopicPartition, Short]): Set[Partition] = {
     partitionState.foreach(state =>
       stateChangeLogger.trace(("Broker %d handling LeaderAndIsr request correlationId %d
from controller %d epoch %d " +
         "starting the become-leader transition for partition %s")
         .format(localBrokerId, correlationId, controllerId, epoch, TopicAndPartition(state._1.topic,
state._1.partitionId))))
 
     for (partition <- partitionState.keys)
-      responseMap.put((partition.topic, partition.partitionId), ErrorMapping.NoError)
+      responseMap.put(new TopicPartition(partition.topic, partition.partitionId), ErrorMapping.NoError)
 
     val partitionsToMakeLeaders: mutable.Set[Partition] = mutable.Set()
 
@@ -741,9 +743,9 @@ class ReplicaManager(val config: KafkaConfig,
    */
   private def makeFollowers(controllerId: Int,
                             epoch: Int,
-                            partitionState: Map[Partition, PartitionStateInfo],
+                            partitionState: Map[Partition, LeaderAndIsrRequest.PartitionState],
                             correlationId: Int,
-                            responseMap: mutable.Map[(String, Int), Short],
+                            responseMap: mutable.Map[TopicPartition, Short],
                             metadataCache: MetadataCache) : Set[Partition] = {
     partitionState.foreach { state =>
       stateChangeLogger.trace(("Broker %d handling LeaderAndIsr request correlationId %d
from controller %d epoch %d " +
@@ -752,7 +754,7 @@ class ReplicaManager(val config: KafkaConfig,
     }
 
     for (partition <- partitionState.keys)
-      responseMap.put((partition.topic, partition.partitionId), ErrorMapping.NoError)
+      responseMap.put(new TopicPartition(partition.topic, partition.partitionId), ErrorMapping.NoError)
 
     val partitionsToMakeFollower: mutable.Set[Partition] = mutable.Set()
 
@@ -760,8 +762,7 @@ class ReplicaManager(val config: KafkaConfig,
 
       // TODO: Delete leaders from LeaderAndIsrRequest
       partitionState.foreach{ case (partition, partitionStateInfo) =>
-        val leaderIsrAndControllerEpoch = partitionStateInfo.leaderIsrAndControllerEpoch
-        val newLeaderBrokerId = leaderIsrAndControllerEpoch.leaderAndIsr.leader
+        val newLeaderBrokerId = partitionStateInfo.leader
         metadataCache.getAliveBrokers.find(_.id == newLeaderBrokerId) match {
           // Only change partition state when the leader is available
           case Some(leaderBroker) =>
@@ -770,14 +771,14 @@ class ReplicaManager(val config: KafkaConfig,
             else
               stateChangeLogger.info(("Broker %d skipped the become-follower state change
after marking its partition as follower with correlation id %d from " +
                 "controller %d epoch %d for partition [%s,%d] since the new leader %d is
the same as the old leader")
-                .format(localBrokerId, correlationId, controllerId, leaderIsrAndControllerEpoch.controllerEpoch,
+                .format(localBrokerId, correlationId, controllerId, partitionStateInfo.controllerEpoch,
                 partition.topic, partition.partitionId, newLeaderBrokerId))
           case None =>
             // The leader broker should always be present in the metadata cache.
             // If not, we should record the error message and abort the transition process
for this partition
             stateChangeLogger.error(("Broker %d received LeaderAndIsrRequest with correlation
id %d from controller" +
               " %d epoch %d for partition [%s,%d] but cannot become follower since the new
leader %d is unavailable.")
-              .format(localBrokerId, correlationId, controllerId, leaderIsrAndControllerEpoch.controllerEpoch,
+              .format(localBrokerId, correlationId, controllerId, partitionStateInfo.controllerEpoch,
               partition.topic, partition.partitionId, newLeaderBrokerId))
             // Create the local replica even if the leader is unavailable. This is required
to ensure that we include
             // the partition's high watermark in the checkpoint file (see KAFKA-1647)

http://git-wip-us.apache.org/repos/asf/kafka/blob/3432e85f/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
index 90f629a..c645102 100644
--- a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
+++ b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
@@ -5,7 +5,7 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
- * 
+ *
  *    http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
@@ -17,13 +17,9 @@
 
 package kafka.api
 
-
-import java.nio.channels.GatheringByteChannel
-
-import kafka.cluster.{BrokerEndPoint, EndPoint, Broker}
+import kafka.cluster.{EndPoint, Broker}
 import kafka.common.{OffsetAndMetadata, ErrorMapping, OffsetMetadataAndError}
 import kafka.common._
-import kafka.consumer.FetchRequestAndResponseStatsRegistry
 import kafka.message.{Message, ByteBufferMessageSet}
 import kafka.utils.SystemTime
 
@@ -120,20 +116,6 @@ object SerializationTestUtils {
     TopicAndPartition(topic1,3) -> partitionStateInfo3
   )
 
-  def createTestLeaderAndIsrRequest() : LeaderAndIsrRequest = {
-    val leaderAndIsr1 = new LeaderIsrAndControllerEpoch(new LeaderAndIsr(leader1, 1, isr1,
1), 1)
-    val leaderAndIsr2 = new LeaderIsrAndControllerEpoch(new LeaderAndIsr(leader2, 1, isr2,
2), 1)
-    val map = Map(((topic1, 0), PartitionStateInfo(leaderAndIsr1, isr1.toSet)),
-                  ((topic2, 0), PartitionStateInfo(leaderAndIsr2, isr2.toSet)))
-    new LeaderAndIsrRequest(map.toMap, collection.immutable.Set[BrokerEndPoint](), 0, 1,
0, "")
-  }
-
-  def createTestLeaderAndIsrResponse() : LeaderAndIsrResponse = {
-    val responseMap = Map(((topic1, 0), ErrorMapping.NoError),
-                          ((topic2, 0), ErrorMapping.NoError))
-    new LeaderAndIsrResponse(1, responseMap)
-  }
-
   def createTestStopReplicaRequest() : StopReplicaRequest = {
     new StopReplicaRequest(controllerId = 0, controllerEpoch = 1, correlationId = 0, deletePartitions
= true,
                            partitions = collection.immutable.Set(TopicAndPartition(topic1,
0),TopicAndPartition(topic2, 0)))
@@ -257,8 +239,6 @@ object SerializationTestUtils {
 }
 
 class RequestResponseSerializationTest extends JUnitSuite {
-  private val leaderAndIsrRequest = SerializationTestUtils.createTestLeaderAndIsrRequest
-  private val leaderAndIsrResponse = SerializationTestUtils.createTestLeaderAndIsrResponse
   private val stopReplicaRequest = SerializationTestUtils.createTestStopReplicaRequest
   private val stopReplicaResponse = SerializationTestUtils.createTestStopReplicaResponse
   private val producerRequest = SerializationTestUtils.createTestProducerRequest
@@ -286,7 +266,7 @@ class RequestResponseSerializationTest extends JUnitSuite {
   def testSerializationAndDeserialization() {
 
     val requestsAndResponses =
-      collection.immutable.Seq(leaderAndIsrRequest, leaderAndIsrResponse, stopReplicaRequest,
+      collection.immutable.Seq(stopReplicaRequest,
                                stopReplicaResponse, producerRequest, producerResponse,
                                fetchRequest, offsetRequest, offsetResponse, topicMetadataRequest,
                                topicMetadataResponse,


Mime
View raw message