kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject kafka git commit: KAFKA-5627; Reduce classes needed for LeaderAndIsrPartitionState and MetadataPartitionState
Date Wed, 26 Jul 2017 03:38:45 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 91c207c2c -> f15cdbc91


KAFKA-5627; Reduce classes needed for LeaderAndIsrPartitionState and MetadataPartitionState

Author: Dong Lin <lindong28@gmail.com>

Reviewers: Jiangjie Qin <becket.qin@gmail.com>, Ismael Juma <ismael@juma.me.uk>

Closes #3565 from lindong28/KAFKA-5627


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/f15cdbc9
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/f15cdbc9
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/f15cdbc9

Branch: refs/heads/trunk
Commit: f15cdbc91b240e656d9a2aeb6877e94624b21f8d
Parents: 91c207c
Author: Dong Lin <lindong28@gmail.com>
Authored: Tue Jul 25 20:38:30 2017 -0700
Committer: Jiangjie Qin <becket.qin@gmail.com>
Committed: Tue Jul 25 20:38:30 2017 -0700

----------------------------------------------------------------------
 .../common/requests/BasePartitionState.java     | 40 +++++++++++++++
 .../common/requests/LeaderAndIsrRequest.java    | 41 ++++++++++++---
 .../kafka/common/requests/PartitionState.java   | 53 --------------------
 .../common/requests/UpdateMetadataRequest.java  | 40 ++++++---------
 .../common/requests/RequestResponseTest.java    |  8 +--
 .../src/main/scala/kafka/api/LeaderAndIsr.scala | 27 ----------
 .../main/scala/kafka/cluster/Partition.scala    | 29 +++++------
 .../controller/ControllerChannelManager.scala   | 49 +++++++++---------
 .../kafka/server/DelayedCreateTopics.scala      |  4 +-
 .../main/scala/kafka/server/MetadataCache.scala | 34 +++++--------
 .../scala/kafka/server/ReplicaManager.scala     | 26 +++++-----
 .../kafka/api/AuthorizerIntegrationTest.scala   |  3 +-
 .../test/scala/unit/kafka/admin/AdminTest.scala | 14 +++---
 .../unit/kafka/server/LeaderElectionTest.scala  |  2 +-
 .../unit/kafka/server/MetadataCacheTest.scala   |  6 +--
 .../unit/kafka/server/ReplicaManagerTest.scala  | 12 ++---
 .../unit/kafka/server/RequestQuotaTest.scala    |  2 +-
 .../epoch/LeaderEpochIntegrationTest.scala      |  4 +-
 .../test/scala/unit/kafka/utils/TestUtils.scala |  2 +-
 .../integration/utils/IntegrationTestUtils.java |  8 +--
 20 files changed, 187 insertions(+), 217 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/f15cdbc9/clients/src/main/java/org/apache/kafka/common/requests/BasePartitionState.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/BasePartitionState.java b/clients/src/main/java/org/apache/kafka/common/requests/BasePartitionState.java
new file mode 100644
index 0000000..c9a4610
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/BasePartitionState.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import java.util.List;
+
+// This class contains the common fields shared between LeaderAndIsrRequest.PartitionState and UpdateMetadataRequest.PartitionState
+public class BasePartitionState {
+
+    public final int controllerEpoch;
+    public final int leader;
+    public final int leaderEpoch;
+    public final List<Integer> isr;
+    public final int zkVersion;
+    public final List<Integer> replicas;
+
+    BasePartitionState(int controllerEpoch, int leader, int leaderEpoch, List<Integer> isr, int zkVersion, List<Integer> replicas) {
+        this.controllerEpoch = controllerEpoch;
+        this.leader = leader;
+        this.leaderEpoch = leaderEpoch;
+        this.isr = isr;
+        this.zkVersion = zkVersion;
+        this.replicas = replicas;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/f15cdbc9/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java
index 733c9af..bd379b8 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java
@@ -91,7 +91,7 @@ public class LeaderAndIsrRequest extends AbstractRequest {
     private final Set<Node> liveLeaders;
 
     private LeaderAndIsrRequest(int controllerId, int controllerEpoch, Map<TopicPartition, PartitionState> partitionStates,
-                               Set<Node> liveLeaders, short version) {
+                                Set<Node> liveLeaders, short version) {
         super(version);
         this.controllerId = controllerId;
         this.controllerEpoch = controllerEpoch;
@@ -157,12 +157,12 @@ public class LeaderAndIsrRequest extends AbstractRequest {
             partitionStateData.set(TOPIC_KEY_NAME, topicPartition.topic());
             partitionStateData.set(PARTITION_KEY_NAME, topicPartition.partition());
             PartitionState partitionState = entry.getValue();
-            partitionStateData.set(CONTROLLER_EPOCH_KEY_NAME, partitionState.controllerEpoch);
-            partitionStateData.set(LEADER_KEY_NAME, partitionState.leader);
-            partitionStateData.set(LEADER_EPOCH_KEY_NAME, partitionState.leaderEpoch);
-            partitionStateData.set(ISR_KEY_NAME, partitionState.isr.toArray());
-            partitionStateData.set(ZK_VERSION_KEY_NAME, partitionState.zkVersion);
-            partitionStateData.set(REPLICAS_KEY_NAME, partitionState.replicas.toArray());
+            partitionStateData.set(CONTROLLER_EPOCH_KEY_NAME, partitionState.basePartitionState.controllerEpoch);
+            partitionStateData.set(LEADER_KEY_NAME, partitionState.basePartitionState.leader);
+            partitionStateData.set(LEADER_EPOCH_KEY_NAME, partitionState.basePartitionState.leaderEpoch);
+            partitionStateData.set(ISR_KEY_NAME, partitionState.basePartitionState.isr.toArray());
+            partitionStateData.set(ZK_VERSION_KEY_NAME, partitionState.basePartitionState.zkVersion);
+            partitionStateData.set(REPLICAS_KEY_NAME, partitionState.basePartitionState.replicas.toArray());
             if (partitionStateData.hasField(IS_NEW_KEY_NAME))
                 partitionStateData.set(IS_NEW_KEY_NAME, partitionState.isNew);
             partitionStatesData.add(partitionStateData);
@@ -219,4 +219,31 @@ public class LeaderAndIsrRequest extends AbstractRequest {
         return new LeaderAndIsrRequest(ApiKeys.LEADER_AND_ISR.parseRequest(version, buffer), version);
     }
 
+    public static final class PartitionState {
+        public final BasePartitionState basePartitionState;
+        public final boolean isNew;
+
+        public PartitionState(int controllerEpoch,
+                              int leader,
+                              int leaderEpoch,
+                              List<Integer> isr,
+                              int zkVersion,
+                              List<Integer> replicas,
+                              boolean isNew) {
+            this.basePartitionState = new BasePartitionState(controllerEpoch, leader, leaderEpoch, isr, zkVersion, replicas);
+            this.isNew = isNew;
+        }
+
+        @Override
+        public String toString() {
+            return "PartitionState(controllerEpoch=" + basePartitionState.controllerEpoch +
+                ", leader=" + basePartitionState.leader +
+                ", leaderEpoch=" + basePartitionState.leaderEpoch +
+                ", isr=" + Utils.join(basePartitionState.isr, ",") +
+                ", zkVersion=" + basePartitionState.zkVersion +
+                ", replicas=" + Utils.join(basePartitionState.replicas, ",") +
+                ", isNew=" + isNew + ")";
+        }
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/f15cdbc9/clients/src/main/java/org/apache/kafka/common/requests/PartitionState.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/PartitionState.java b/clients/src/main/java/org/apache/kafka/common/requests/PartitionState.java
deleted file mode 100644
index be303d9..0000000
--- a/clients/src/main/java/org/apache/kafka/common/requests/PartitionState.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.common.requests;
-
-import org.apache.kafka.common.utils.Utils;
-
-import java.util.List;
-
-// This is used to describe per-partition state in the LeaderAndIsrRequest
-public class PartitionState {
-    public final int controllerEpoch;
-    public final int leader;
-    public final int leaderEpoch;
-    public final List<Integer> isr;
-    public final int zkVersion;
-    public final List<Integer> replicas;
-    public final boolean isNew;
-
-    public PartitionState(int controllerEpoch, int leader, int leaderEpoch, List<Integer> isr, int zkVersion, List<Integer> replicas, boolean isNew) {
-        this.controllerEpoch = controllerEpoch;
-        this.leader = leader;
-        this.leaderEpoch = leaderEpoch;
-        this.isr = isr;
-        this.zkVersion = zkVersion;
-        this.replicas = replicas;
-        this.isNew = isNew;
-    }
-
-    @Override
-    public String toString() {
-        return "PartitionState(controllerEpoch=" + controllerEpoch +
-                ", leader=" + leader +
-                ", leaderEpoch=" + leaderEpoch +
-                ", isr=" + Utils.join(isr, ",") +
-                ", zkVersion=" + zkVersion +
-                ", replicas=" + Utils.join(replicas, ",") +
-                ", isNew=" + isNew + ")";
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/f15cdbc9/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java
index 1e20866..5f17d0b 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java
@@ -76,12 +76,7 @@ public class UpdateMetadataRequest extends AbstractRequest {
     }
 
     public static final class PartitionState {
-        public final int controllerEpoch;
-        public final int leader;
-        public final int leaderEpoch;
-        public final List<Integer> isr;
-        public final int zkVersion;
-        public final List<Integer> replicas;
+        public final BasePartitionState basePartitionState;
         public final List<Integer> offlineReplicas;
 
         public PartitionState(int controllerEpoch,
@@ -91,24 +86,19 @@ public class UpdateMetadataRequest extends AbstractRequest {
                               int zkVersion,
                               List<Integer> replicas,
                               List<Integer> offlineReplicas) {
-            this.controllerEpoch = controllerEpoch;
-            this.leader = leader;
-            this.leaderEpoch = leaderEpoch;
-            this.isr = isr;
-            this.zkVersion = zkVersion;
-            this.replicas = replicas;
+            this.basePartitionState = new BasePartitionState(controllerEpoch, leader, leaderEpoch, isr, zkVersion, replicas);
             this.offlineReplicas = offlineReplicas;
         }
 
         @Override
         public String toString() {
-            return "PartitionState(controllerEpoch=" + controllerEpoch +
-                ", leader=" + leader +
-                ", leaderEpoch=" + leaderEpoch +
-                ", isr=" + Arrays.toString(isr.toArray()) +
-                ", zkVersion=" + zkVersion +
-                ", replicas=" + Arrays.toString(replicas.toArray()) +
-                ", offlineReplicas=" + Arrays.toString(replicas.toArray()) + ")";
+            return "PartitionState(controllerEpoch=" + basePartitionState.controllerEpoch +
+                ", leader=" + basePartitionState.leader +
+                ", leaderEpoch=" + basePartitionState.leaderEpoch +
+                ", isr=" + Arrays.toString(basePartitionState.isr.toArray()) +
+                ", zkVersion=" + basePartitionState.zkVersion +
+                ", replicas=" + Arrays.toString(basePartitionState.replicas.toArray()) +
+                ", offlineReplicas=" + Arrays.toString(offlineReplicas.toArray()) + ")";
         }
     }
 
@@ -285,12 +275,12 @@ public class UpdateMetadataRequest extends AbstractRequest {
             partitionStateData.set(TOPIC_KEY_NAME, topicPartition.topic());
             partitionStateData.set(PARTITION_KEY_NAME, topicPartition.partition());
             PartitionState partitionState = entry.getValue();
-            partitionStateData.set(CONTROLLER_EPOCH_KEY_NAME, partitionState.controllerEpoch);
-            partitionStateData.set(LEADER_KEY_NAME, partitionState.leader);
-            partitionStateData.set(LEADER_EPOCH_KEY_NAME, partitionState.leaderEpoch);
-            partitionStateData.set(ISR_KEY_NAME, partitionState.isr.toArray());
-            partitionStateData.set(ZK_VERSION_KEY_NAME, partitionState.zkVersion);
-            partitionStateData.set(REPLICAS_KEY_NAME, partitionState.replicas.toArray());
+            partitionStateData.set(CONTROLLER_EPOCH_KEY_NAME, partitionState.basePartitionState.controllerEpoch);
+            partitionStateData.set(LEADER_KEY_NAME, partitionState.basePartitionState.leader);
+            partitionStateData.set(LEADER_EPOCH_KEY_NAME, partitionState.basePartitionState.leaderEpoch);
+            partitionStateData.set(ISR_KEY_NAME, partitionState.basePartitionState.isr.toArray());
+            partitionStateData.set(ZK_VERSION_KEY_NAME, partitionState.basePartitionState.zkVersion);
+            partitionStateData.set(REPLICAS_KEY_NAME, partitionState.basePartitionState.replicas.toArray());
             if (partitionStateData.hasField(OFFLINE_REPLICAS_KEY_NAME))
               partitionStateData.set(OFFLINE_REPLICAS_KEY_NAME, partitionState.offlineReplicas.toArray());
             partitionStatesData.add(partitionStateData);

http://git-wip-us.apache.org/repos/asf/kafka/blob/f15cdbc9/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index a1c2a83..c2c9a4d 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -804,15 +804,15 @@ public class RequestResponseTest {
     }
 
     private LeaderAndIsrRequest createLeaderAndIsrRequest() {
-        Map<TopicPartition, PartitionState> partitionStates = new HashMap<>();
+        Map<TopicPartition, LeaderAndIsrRequest.PartitionState> partitionStates = new HashMap<>();
         List<Integer> isr = asList(1, 2);
         List<Integer> replicas = asList(1, 2, 3, 4);
         partitionStates.put(new TopicPartition("topic5", 105),
-                new PartitionState(0, 2, 1, new ArrayList<>(isr), 2, replicas, false));
+                new LeaderAndIsrRequest.PartitionState(0, 2, 1, new ArrayList<>(isr), 2, replicas, false));
         partitionStates.put(new TopicPartition("topic5", 1),
-                new PartitionState(1, 1, 1, new ArrayList<>(isr), 2, replicas, false));
+                new LeaderAndIsrRequest.PartitionState(1, 1, 1, new ArrayList<>(isr), 2, replicas, false));
         partitionStates.put(new TopicPartition("topic20", 1),
-                new PartitionState(1, 0, 1, new ArrayList<>(isr), 2, replicas, false));
+                new LeaderAndIsrRequest.PartitionState(1, 0, 1, new ArrayList<>(isr), 2, replicas, false));
 
         Set<Node> leaders = Utils.mkSet(
                 new Node(0, "test0", 1223),

http://git-wip-us.apache.org/repos/asf/kafka/blob/f15cdbc9/core/src/main/scala/kafka/api/LeaderAndIsr.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/LeaderAndIsr.scala b/core/src/main/scala/kafka/api/LeaderAndIsr.scala
index e92dc33..029d476 100644
--- a/core/src/main/scala/kafka/api/LeaderAndIsr.scala
+++ b/core/src/main/scala/kafka/api/LeaderAndIsr.scala
@@ -17,11 +17,8 @@
 
 package kafka.api
 
-import kafka.controller.LeaderIsrAndControllerEpoch
 import kafka.utils._
 
-import scala.collection.Set
-
 object LeaderAndIsr {
   val initialLeaderEpoch: Int = 0
   val initialZKVersion: Int = 0
@@ -49,27 +46,3 @@ case class LeaderAndIsr(leader: Int,
     Json.encode(Map("leader" -> leader, "leader_epoch" -> leaderEpoch, "isr" -> isr))
   }
 }
-
-case class LeaderAndIsrPartitionState(leaderIsrAndControllerEpoch: LeaderIsrAndControllerEpoch, allReplicas: Seq[Int], isNew: Boolean) {
-
-  override def toString: String = {
-    val partitionStateInfo = new StringBuilder
-    partitionStateInfo.append("(LeaderAndIsrInfo:" + leaderIsrAndControllerEpoch.toString)
-    partitionStateInfo.append(",ReplicationFactor:" + allReplicas.size + ")")
-    partitionStateInfo.append(",AllReplicas:" + allReplicas.mkString(",") + ")")
-    partitionStateInfo.append(",isNew:" + isNew + ")")
-    partitionStateInfo.toString()
-  }
-}
-
-case class MetadataPartitionState(leaderIsrAndControllerEpoch: LeaderIsrAndControllerEpoch, allReplicas: Seq[Int], offlineReplicas: Seq[Int]) {
-
-  override def toString: String = {
-    val partitionStateInfo = new StringBuilder
-    partitionStateInfo.append("(LeaderAndIsrInfo:" + leaderIsrAndControllerEpoch.toString)
-    partitionStateInfo.append(",ReplicationFactor:" + allReplicas.size + ")")
-    partitionStateInfo.append(",AllReplicas:" + allReplicas.mkString(",") + ")")
-    partitionStateInfo.append(",OfflineReplicas:" + offlineReplicas.mkString(",") + ")")
-    partitionStateInfo.toString()
-  }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/f15cdbc9/core/src/main/scala/kafka/cluster/Partition.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 2c4767b..1d7f7ff 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -18,6 +18,7 @@ package kafka.cluster
 
 
 import java.util.concurrent.locks.ReentrantReadWriteLock
+
 import com.yammer.metrics.core.Gauge
 import kafka.admin.AdminUtils
 import kafka.api.LeaderAndIsr
@@ -34,7 +35,7 @@ import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.protocol.Errors._
 import org.apache.kafka.common.record.MemoryRecords
 import org.apache.kafka.common.requests.EpochEndOffset._
-import org.apache.kafka.common.requests.{EpochEndOffset, PartitionState}
+import org.apache.kafka.common.requests.{EpochEndOffset, LeaderAndIsrRequest}
 import org.apache.kafka.common.utils.Time
 
 import scala.collection.JavaConverters._
@@ -168,25 +169,25 @@ class Partition(val topic: String,
    * from the time when this broker was the leader last time) and setting the new leader and ISR.
    * If the leader replica id does not change, return false to indicate the replica manager.
    */
-  def makeLeader(controllerId: Int, partitionStateInfo: PartitionState, correlationId: Int): Boolean = {
+  def makeLeader(controllerId: Int, partitionStateInfo: LeaderAndIsrRequest.PartitionState, correlationId: Int): Boolean = {
     val (leaderHWIncremented, isNewLeader) = inWriteLock(leaderIsrUpdateLock) {
-      val allReplicas = partitionStateInfo.replicas.asScala.map(_.toInt)
+      val allReplicas = partitionStateInfo.basePartitionState.replicas.asScala.map(_.toInt)
       // record the epoch of the controller that made the leadership decision. This is useful while updating the isr
       // to maintain the decision maker controller's epoch in the zookeeper path
-      controllerEpoch = partitionStateInfo.controllerEpoch
+      controllerEpoch = partitionStateInfo.basePartitionState.controllerEpoch
       // add replicas that are new
-      val newInSyncReplicas = partitionStateInfo.isr.asScala.map(r => getOrCreateReplica(r, partitionStateInfo.isNew)).toSet
+      val newInSyncReplicas = partitionStateInfo.basePartitionState.isr.asScala.map(r => getOrCreateReplica(r, partitionStateInfo.isNew)).toSet
       // remove assigned replicas that have been removed by the controller
       (assignedReplicas.map(_.brokerId) -- allReplicas).foreach(removeReplica)
       inSyncReplicas = newInSyncReplicas
 
-      info(s"$topicPartition starts at Leader Epoch ${partitionStateInfo.leaderEpoch} from offset ${getReplica().get.logEndOffset.messageOffset}. Previous Leader Epoch was: $leaderEpoch")
+      info(s"$topicPartition starts at Leader Epoch ${partitionStateInfo.basePartitionState.leaderEpoch} from offset ${getReplica().get.logEndOffset.messageOffset}. Previous Leader Epoch was: $leaderEpoch")
 
       //We cache the leader epoch here, persisting it only if it's local (hence having a log dir)
-      leaderEpoch = partitionStateInfo.leaderEpoch
+      leaderEpoch = partitionStateInfo.basePartitionState.leaderEpoch
       allReplicas.foreach(id => getOrCreateReplica(id, partitionStateInfo.isNew))
 
-      zkVersion = partitionStateInfo.zkVersion
+      zkVersion = partitionStateInfo.basePartitionState.zkVersion
       val isNewLeader =
         if (leaderReplicaIdOpt.isDefined && leaderReplicaIdOpt.get == localBrokerId) {
           false
@@ -221,20 +222,20 @@ class Partition(val topic: String,
    *  Make the local replica the follower by setting the new leader and ISR to empty
    *  If the leader replica id does not change, return false to indicate the replica manager
    */
-  def makeFollower(controllerId: Int, partitionStateInfo: PartitionState, correlationId: Int): Boolean = {
+  def makeFollower(controllerId: Int, partitionStateInfo: LeaderAndIsrRequest.PartitionState, correlationId: Int): Boolean = {
     inWriteLock(leaderIsrUpdateLock) {
-      val allReplicas = partitionStateInfo.replicas.asScala.map(_.toInt)
-      val newLeaderBrokerId: Int = partitionStateInfo.leader
+      val allReplicas = partitionStateInfo.basePartitionState.replicas.asScala.map(_.toInt)
+      val newLeaderBrokerId: Int = partitionStateInfo.basePartitionState.leader
       // record the epoch of the controller that made the leadership decision. This is useful while updating the isr
       // to maintain the decision maker controller's epoch in the zookeeper path
-      controllerEpoch = partitionStateInfo.controllerEpoch
+      controllerEpoch = partitionStateInfo.basePartitionState.controllerEpoch
       // add replicas that are new
       allReplicas.foreach(r => getOrCreateReplica(r, partitionStateInfo.isNew))
       // remove assigned replicas that have been removed by the controller
       (assignedReplicas.map(_.brokerId) -- allReplicas).foreach(removeReplica)
       inSyncReplicas = Set.empty[Replica]
-      leaderEpoch = partitionStateInfo.leaderEpoch
-      zkVersion = partitionStateInfo.zkVersion
+      leaderEpoch = partitionStateInfo.basePartitionState.leaderEpoch
+      zkVersion = partitionStateInfo.basePartitionState.zkVersion
 
       if (leaderReplicaIdOpt.isDefined && leaderReplicaIdOpt.get == newLeaderBrokerId) {
         false

http://git-wip-us.apache.org/repos/asf/kafka/blob/f15cdbc9/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index bb3b14b..8f9630e 100755
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -34,12 +34,13 @@ import org.apache.kafka.common.requests.UpdateMetadataRequest.EndPoint
 import org.apache.kafka.common.requests._
 import org.apache.kafka.common.security.JaasContext
 import org.apache.kafka.common.utils.Time
-import org.apache.kafka.common.{Node, TopicPartition, requests}
+import org.apache.kafka.common.{Node, TopicPartition}
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable.HashMap
 import scala.collection.{Set, mutable}
 
+
 object ControllerChannelManager {
   val QueueSizeMetricName = "QueueSize"
 }
@@ -281,10 +282,10 @@ class RequestSendThread(val controllerId: Int,
 class ControllerBrokerRequestBatch(controller: KafkaController) extends  Logging {
   val controllerContext = controller.controllerContext
   val controllerId: Int = controller.config.brokerId
-  val leaderAndIsrRequestMap = mutable.Map.empty[Int, mutable.Map[TopicPartition, LeaderAndIsrPartitionState]]
+  val leaderAndIsrRequestMap = mutable.Map.empty[Int, mutable.Map[TopicPartition, LeaderAndIsrRequest.PartitionState]]
   val stopReplicaRequestMap = mutable.Map.empty[Int, Seq[StopReplicaRequestInfo]]
   val updateMetadataRequestBrokerSet = mutable.Set.empty[Int]
-  val updateMetadataRequestPartitionInfoMap = mutable.Map.empty[TopicPartition, MetadataPartitionState]
+  val updateMetadataRequestPartitionInfoMap = mutable.Map.empty[TopicPartition, UpdateMetadataRequest.PartitionState]
   private val stateChangeLogger = KafkaController.stateChangeLogger
 
   def newBatch() {
@@ -316,7 +317,13 @@ class ControllerBrokerRequestBatch(controller: KafkaController) extends  Logging
     brokerIds.filter(_ >= 0).foreach { brokerId =>
       val result = leaderAndIsrRequestMap.getOrElseUpdate(brokerId, mutable.Map.empty)
       val alreadyNew = result.get(topicPartition).exists(_.isNew)
-      result.put(topicPartition, LeaderAndIsrPartitionState(leaderIsrAndControllerEpoch, replicas, isNew || alreadyNew))
+      result.put(topicPartition, new LeaderAndIsrRequest.PartitionState(leaderIsrAndControllerEpoch.controllerEpoch,
+        leaderIsrAndControllerEpoch.leaderAndIsr.leader,
+        leaderIsrAndControllerEpoch.leaderAndIsr.leaderEpoch,
+        leaderIsrAndControllerEpoch.leaderAndIsr.isr.map(Integer.valueOf).asJava,
+        leaderIsrAndControllerEpoch.leaderAndIsr.zkVersion,
+        replicas.map(Integer.valueOf).asJava,
+        isNew || alreadyNew))
     }
 
     addUpdateMetadataRequestForBrokers(controllerContext.liveOrShuttingDownBrokerIds.toSeq,
@@ -354,7 +361,13 @@ class ControllerBrokerRequestBatch(controller: KafkaController) extends  Logging
             l
           }
 
-          val partitionStateInfo = MetadataPartitionState(leaderIsrAndControllerEpoch, replicas, offlineReplicas)
+          val partitionStateInfo = new UpdateMetadataRequest.PartitionState(leaderIsrAndControllerEpoch.controllerEpoch,
+            leaderIsrAndControllerEpoch.leaderAndIsr.leader,
+            leaderIsrAndControllerEpoch.leaderAndIsr.leaderEpoch,
+            leaderIsrAndControllerEpoch.leaderAndIsr.isr.map(Integer.valueOf).asJava,
+            leaderIsrAndControllerEpoch.leaderAndIsr.zkVersion,
+            replicas.map(Integer.valueOf).asJava,
+            offlineReplicas.map(Integer.valueOf).asJava)
           updateMetadataRequestPartitionInfoMap.put(new TopicPartition(partition.topic, partition.partition), partitionStateInfo)
 
         case None =>
@@ -387,42 +400,30 @@ class ControllerBrokerRequestBatch(controller: KafkaController) extends  Logging
       leaderAndIsrRequestMap.foreach { case (broker, leaderAndIsrPartitionStates) =>
         leaderAndIsrPartitionStates.foreach { case (topicPartition, state) =>
           val typeOfRequest =
-            if (broker == state.leaderIsrAndControllerEpoch.leaderAndIsr.leader) "become-leader"
+            if (broker == state.basePartitionState.leader) "become-leader"
             else "become-follower"
           stateChangeLogger.trace(("Controller %d epoch %d sending %s LeaderAndIsr request %s to broker %d " +
                                    "for partition [%s,%d]").format(controllerId, controllerEpoch, typeOfRequest,
-                                                                   state.leaderIsrAndControllerEpoch, broker,
+                                                                   state, broker,
                                                                    topicPartition.topic, topicPartition.partition))
         }
-        val leaderIds = leaderAndIsrPartitionStates.map(_._2.leaderIsrAndControllerEpoch.leaderAndIsr.leader).toSet
+        val leaderIds = leaderAndIsrPartitionStates.map(_._2.basePartitionState.leader).toSet
         val leaders = controllerContext.liveOrShuttingDownBrokers.filter(b => leaderIds.contains(b.id)).map {
           _.getNode(controller.config.interBrokerListenerName)
         }
-        val partitionStates = leaderAndIsrPartitionStates.map { case (topicPartition, leaderAndIsrPartitionState) =>
-          val LeaderIsrAndControllerEpoch(leaderIsr, controllerEpoch) = leaderAndIsrPartitionState.leaderIsrAndControllerEpoch
-          val partitionState = new requests.PartitionState(controllerEpoch, leaderIsr.leader,
-            leaderIsr.leaderEpoch, leaderIsr.isr.map(Integer.valueOf).asJava, leaderIsr.zkVersion,
-            leaderAndIsrPartitionState.allReplicas.map(Integer.valueOf).asJava, leaderAndIsrPartitionState.isNew)
-          topicPartition -> partitionState
-        }
         val leaderAndIsrRequestBuilder = new LeaderAndIsrRequest.Builder(leaderAndIsrRequestVersion, controllerId,
-          controllerEpoch, partitionStates.asJava, leaders.asJava)
+          controllerEpoch, leaderAndIsrPartitionStates.asJava, leaders.asJava)
         controller.sendRequest(broker, ApiKeys.LEADER_AND_ISR, leaderAndIsrRequestBuilder,
           (r: AbstractResponse) => controller.eventManager.put(controller.LeaderAndIsrResponseReceived(r, broker)))
       }
       leaderAndIsrRequestMap.clear()
 
       updateMetadataRequestPartitionInfoMap.foreach(p => stateChangeLogger.trace(("Controller %d epoch %d sending UpdateMetadata request %s " +
-        "to brokers %s for partition %s").format(controllerId, controllerEpoch, p._2.leaderIsrAndControllerEpoch,
+        "to brokers %s for partition %s").format(controllerId, controllerEpoch, p._2,
         updateMetadataRequestBrokerSet.toString(), p._1)))
-      val partitionStates = updateMetadataRequestPartitionInfoMap.map { case (topicPartition, partitionStateInfo) =>
-        val LeaderIsrAndControllerEpoch(leaderIsr, controllerEpoch) = partitionStateInfo.leaderIsrAndControllerEpoch
-        val partitionState = new UpdateMetadataRequest.PartitionState(controllerEpoch, leaderIsr.leader,
-          leaderIsr.leaderEpoch, leaderIsr.isr.map(Integer.valueOf).asJava, leaderIsr.zkVersion,
-          partitionStateInfo.allReplicas.map(Integer.valueOf).asJava, partitionStateInfo.offlineReplicas.map(Integer.valueOf).asJava)
-        topicPartition -> partitionState
-      }
+      // Copy the updateMetadataRequestPartitionInfoMap
 
+      val partitionStates = Map(updateMetadataRequestPartitionInfoMap.toArray:_*)
       val updateMetadataRequestVersion: Short =
         if (controller.config.interBrokerProtocolVersion >= KAFKA_0_11_1_IV0) 4
         else if (controller.config.interBrokerProtocolVersion >= KAFKA_0_10_2_IV0) 3

http://git-wip-us.apache.org/repos/asf/kafka/blob/f15cdbc9/core/src/main/scala/kafka/server/DelayedCreateTopics.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/DelayedCreateTopics.scala b/core/src/main/scala/kafka/server/DelayedCreateTopics.scala
index 83cdd67..50b8143 100644
--- a/core/src/main/scala/kafka/server/DelayedCreateTopics.scala
+++ b/core/src/main/scala/kafka/server/DelayedCreateTopics.scala
@@ -19,7 +19,7 @@ package kafka.server
 
 import kafka.api.LeaderAndIsr
 import org.apache.kafka.common.protocol.Errors
-import org.apache.kafka.common.requests.{ApiError, CreateTopicsResponse}
+import org.apache.kafka.common.requests.ApiError
 
 import scala.collection._
 
@@ -87,6 +87,6 @@ class DelayedCreateTopics(delayMs: Long,
 
   private def isMissingLeader(topic: String, partition: Int): Boolean = {
     val partitionInfo = adminManager.metadataCache.getPartitionInfo(topic, partition)
-    partitionInfo.isEmpty || partitionInfo.get.leaderIsrAndControllerEpoch.leaderAndIsr.leader == LeaderAndIsr.NoLeader
+    partitionInfo.isEmpty || partitionInfo.get.basePartitionState.leader == LeaderAndIsr.NoLeader
   }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/f15cdbc9/core/src/main/scala/kafka/server/MetadataCache.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/MetadataCache.scala b/core/src/main/scala/kafka/server/MetadataCache.scala
index 2c28df7..9464941 100755
--- a/core/src/main/scala/kafka/server/MetadataCache.scala
+++ b/core/src/main/scala/kafka/server/MetadataCache.scala
@@ -24,7 +24,7 @@ import scala.collection.JavaConverters._
 import kafka.cluster.{Broker, EndPoint}
 import kafka.api._
 import kafka.common.{BrokerEndPointNotAvailableException, TopicAndPartition}
-import kafka.controller.{KafkaController, LeaderIsrAndControllerEpoch}
+import kafka.controller.KafkaController
 import kafka.utils.CoreUtils._
 import kafka.utils.Logging
 import org.apache.kafka.common.internals.Topic
@@ -39,7 +39,7 @@ import org.apache.kafka.common.requests.{MetadataResponse, UpdateMetadataRequest
  */
 class MetadataCache(brokerId: Int) extends Logging {
   private val stateChangeLogger = KafkaController.stateChangeLogger
-  private val cache = mutable.Map[String, mutable.Map[Int, MetadataPartitionState]]()
+  private val cache = mutable.Map[String, mutable.Map[Int, UpdateMetadataRequest.PartitionState]]()
   private var controllerId: Option[Int] = None
   private val aliveBrokers = mutable.Map[Int, Broker]()
   private val aliveNodes = mutable.Map[Int, collection.Map[ListenerName, Node]]()
@@ -67,13 +67,10 @@ class MetadataCache(brokerId: Int) extends Logging {
     cache.get(topic).map { partitions =>
       partitions.map { case (partitionId, partitionState) =>
         val topicPartition = TopicAndPartition(topic, partitionId)
-
-        val leaderAndIsr = partitionState.leaderIsrAndControllerEpoch.leaderAndIsr
-        val maybeLeader = getAliveEndpoint(leaderAndIsr.leader, listenerName)
-
-        val replicas = partitionState.allReplicas
+        val maybeLeader = getAliveEndpoint(partitionState.basePartitionState.leader, listenerName)
+        val replicas = partitionState.basePartitionState.replicas.asScala.map(_.toInt)
         val replicaInfo = getEndpoints(replicas, listenerName, errorUnavailableEndpoints)
-        val offlineReplicaInfo = getEndpoints(partitionState.offlineReplicas, listenerName, errorUnavailableEndpoints)
+        val offlineReplicaInfo = getEndpoints(partitionState.offlineReplicas.asScala.map(_.toInt), listenerName, errorUnavailableEndpoints)
 
         maybeLeader match {
           case None =>
@@ -82,7 +79,7 @@ class MetadataCache(brokerId: Int) extends Logging {
               replicaInfo.asJava, java.util.Collections.emptyList(), offlineReplicaInfo.asJava)
 
           case Some(leader) =>
-            val isr = leaderAndIsr.isr
+            val isr = partitionState.basePartitionState.isr.asScala.map(_.toInt)
             val isrInfo = getEndpoints(isr, listenerName, errorUnavailableEndpoints)
 
             if (replicaInfo.size < replicas.size) {
@@ -148,14 +145,14 @@ class MetadataCache(brokerId: Int) extends Logging {
 
   private def addOrUpdatePartitionInfo(topic: String,
                                        partitionId: Int,
-                                       stateInfo: MetadataPartitionState) {
+                                       stateInfo: UpdateMetadataRequest.PartitionState) {
     inWriteLock(partitionMetadataLock) {
       val infos = cache.getOrElseUpdate(topic, mutable.Map())
       infos(partitionId) = stateInfo
     }
   }
 
-  def getPartitionInfo(topic: String, partitionId: Int): Option[MetadataPartitionState] = {
+  def getPartitionInfo(topic: String, partitionId: Int): Option[UpdateMetadataRequest.PartitionState] = {
     inReadLock(partitionMetadataLock) {
       cache.get(topic).flatMap(_.get(partitionId))
     }
@@ -167,7 +164,7 @@ class MetadataCache(brokerId: Int) extends Logging {
   def getPartitionLeaderEndpoint(topic: String, partitionId: Int, listenerName: ListenerName): Option[Node] = {
     inReadLock(partitionMetadataLock) {
       cache.get(topic).flatMap(_.get(partitionId)) map { partitionInfo =>
-        val leaderId = partitionInfo.leaderIsrAndControllerEpoch.leaderAndIsr.leader
+        val leaderId = partitionInfo.basePartitionState.leader
 
         aliveNodes.get(leaderId) match {
           case Some(nodeMap) =>
@@ -208,15 +205,14 @@ class MetadataCache(brokerId: Int) extends Logging {
       updateMetadataRequest.partitionStates.asScala.foreach { case (tp, info) =>
         val controllerId = updateMetadataRequest.controllerId
         val controllerEpoch = updateMetadataRequest.controllerEpoch
-        if (info.leader == LeaderAndIsr.LeaderDuringDelete) {
+        if (info.basePartitionState.leader == LeaderAndIsr.LeaderDuringDelete) {
           removePartitionInfo(tp.topic, tp.partition)
           stateChangeLogger.trace(s"Broker $brokerId deleted partition $tp from metadata cache in response to UpdateMetadata " +
             s"request sent by controller $controllerId epoch $controllerEpoch with correlation id $correlationId")
           deletedPartitions += tp
         } else {
-          val partitionInfo = partitionStateToPartitionStateInfo(info)
-          addOrUpdatePartitionInfo(tp.topic, tp.partition, partitionInfo)
-          stateChangeLogger.trace(s"Broker $brokerId cached leader info $partitionInfo for partition $tp in response to " +
+          addOrUpdatePartitionInfo(tp.topic, tp.partition, info)
+          stateChangeLogger.trace(s"Broker $brokerId cached leader info $info for partition $tp in response to " +
             s"UpdateMetadata request sent by controller $controllerId epoch $controllerEpoch with correlation id $correlationId")
         }
       }
@@ -224,12 +220,6 @@ class MetadataCache(brokerId: Int) extends Logging {
     }
   }
 
-  private def partitionStateToPartitionStateInfo(partitionState: UpdateMetadataRequest.PartitionState): MetadataPartitionState = {
-    val leaderAndIsr = LeaderAndIsr(partitionState.leader, partitionState.leaderEpoch, partitionState.isr.asScala.map(_.toInt).toList, partitionState.zkVersion)
-    val leaderInfo = LeaderIsrAndControllerEpoch(leaderAndIsr, partitionState.controllerEpoch)
-    MetadataPartitionState(leaderInfo, partitionState.replicas.asScala.map(_.toInt), partitionState.offlineReplicas.asScala.map(_.toInt))
-  }
-
   def contains(topic: String): Boolean = {
     inReadLock(partitionMetadataLock) {
       cache.contains(topic)

http://git-wip-us.apache.org/repos/asf/kafka/blob/f15cdbc9/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 40887be..6cf2540 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -41,7 +41,7 @@ import org.apache.kafka.common.requests.EpochEndOffset._
 import org.apache.kafka.common.requests.FetchRequest.PartitionData
 import org.apache.kafka.common.requests.FetchResponse.AbortedTransaction
 import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
-import org.apache.kafka.common.requests.{DeleteRecordsRequest, DeleteRecordsResponse, LeaderAndIsrRequest, PartitionState, StopReplicaRequest, UpdateMetadataRequest, _}
+import org.apache.kafka.common.requests.{DeleteRecordsRequest, DeleteRecordsResponse, LeaderAndIsrRequest, StopReplicaRequest, UpdateMetadataRequest, _}
 import org.apache.kafka.common.utils.Time
 
 import scala.collection.JavaConverters._
@@ -897,7 +897,7 @@ class ReplicaManager(val config: KafkaConfig,
         controllerEpoch = leaderAndISRRequest.controllerEpoch
 
         // First check partition's leader epoch
-        val partitionState = new mutable.HashMap[Partition, PartitionState]()
+        val partitionState = new mutable.HashMap[Partition, LeaderAndIsrRequest.PartitionState]()
         leaderAndISRRequest.partitionStates.asScala.foreach { case (topicPartition, stateInfo) =>
           val partition = getOrCreatePartition(topicPartition)
           val partitionLeaderEpoch = partition.getLeaderEpoch
@@ -906,16 +906,16 @@ class ReplicaManager(val config: KafkaConfig,
               "epoch %d for partition [%s,%d] as the local replica for the partition is in an offline log directory")
               .format(localBrokerId, controllerId, correlationId, leaderAndISRRequest.controllerEpoch, topicPartition.topic, topicPartition.partition))
             responseMap.put(topicPartition, Errors.KAFKA_STORAGE_ERROR)
-          } else if (partitionLeaderEpoch < stateInfo.leaderEpoch) {
+          } else if (partitionLeaderEpoch < stateInfo.basePartitionState.leaderEpoch) {
             // If the leader epoch is valid record the epoch of the controller that made the leadership decision.
             // This is useful while updating the isr to maintain the decision maker controller's epoch in the zookeeper path
-            if(stateInfo.replicas.contains(localBrokerId))
+            if(stateInfo.basePartitionState.replicas.contains(localBrokerId))
               partitionState.put(partition, stateInfo)
             else {
               stateChangeLogger.warn(("Broker %d ignoring LeaderAndIsr request from controller %d with correlation id %d " +
                 "epoch %d for partition [%s,%d] as itself is not in assigned replica list %s")
                 .format(localBrokerId, controllerId, correlationId, leaderAndISRRequest.controllerEpoch,
-                  topicPartition.topic, topicPartition.partition, stateInfo.replicas.asScala.mkString(",")))
+                  topicPartition.topic, topicPartition.partition, stateInfo.basePartitionState.replicas.asScala.mkString(",")))
               responseMap.put(topicPartition, Errors.UNKNOWN_TOPIC_OR_PARTITION)
             }
           } else {
@@ -923,13 +923,13 @@ class ReplicaManager(val config: KafkaConfig,
             stateChangeLogger.warn(("Broker %d ignoring LeaderAndIsr request from controller %d with correlation id %d " +
               "epoch %d for partition [%s,%d] since its associated leader epoch %d is not higher than the current leader epoch %d")
               .format(localBrokerId, controllerId, correlationId, leaderAndISRRequest.controllerEpoch,
-                topicPartition.topic, topicPartition.partition, stateInfo.leaderEpoch, partitionLeaderEpoch))
+                topicPartition.topic, topicPartition.partition, stateInfo.basePartitionState.leaderEpoch, partitionLeaderEpoch))
             responseMap.put(topicPartition, Errors.STALE_CONTROLLER_EPOCH)
           }
         }
 
         val partitionsTobeLeader = partitionState.filter { case (_, stateInfo) =>
-          stateInfo.leader == localBrokerId
+          stateInfo.basePartitionState.leader == localBrokerId
         }
         val partitionsToBeFollower = partitionState -- partitionsTobeLeader.keys
 
@@ -981,7 +981,7 @@ class ReplicaManager(val config: KafkaConfig,
    */
   private def makeLeaders(controllerId: Int,
                           epoch: Int,
-                          partitionState: Map[Partition, PartitionState],
+                          partitionState: Map[Partition, LeaderAndIsrRequest.PartitionState],
                           correlationId: Int,
                           responseMap: mutable.Map[TopicPartition, Errors]): Set[Partition] = {
     partitionState.keys.foreach { partition =>
@@ -1061,7 +1061,7 @@ class ReplicaManager(val config: KafkaConfig,
    */
   private def makeFollowers(controllerId: Int,
                             epoch: Int,
-                            partitionState: Map[Partition, PartitionState],
+                            partitionState: Map[Partition, LeaderAndIsrRequest.PartitionState],
                             correlationId: Int,
                             responseMap: mutable.Map[TopicPartition, Errors]) : Set[Partition] = {
     partitionState.keys.foreach { partition =>
@@ -1080,7 +1080,7 @@ class ReplicaManager(val config: KafkaConfig,
       // TODO: Delete leaders from LeaderAndIsrRequest
       partitionState.foreach{ case (partition, partitionStateInfo) =>
         try {
-          val newLeaderBrokerId = partitionStateInfo.leader
+          val newLeaderBrokerId = partitionStateInfo.basePartitionState.leader
           metadataCache.getAliveBrokers.find(_.id == newLeaderBrokerId) match {
             // Only change partition state when the leader is available
             case Some(_) =>
@@ -1089,14 +1089,14 @@ class ReplicaManager(val config: KafkaConfig,
               else
                 stateChangeLogger.info(("Broker %d skipped the become-follower state change after marking its partition as follower with correlation id %d from " +
                   "controller %d epoch %d for partition %s since the new leader %d is the same as the old leader")
-                  .format(localBrokerId, correlationId, controllerId, partitionStateInfo.controllerEpoch,
+                  .format(localBrokerId, correlationId, controllerId, partitionStateInfo.basePartitionState.controllerEpoch,
                     partition.topicPartition, newLeaderBrokerId))
             case None =>
               // The leader broker should always be present in the metadata cache.
               // If not, we should record the error message and abort the transition process for this partition
               stateChangeLogger.error(("Broker %d received LeaderAndIsrRequest with correlation id %d from controller" +
                 " %d epoch %d for partition %s but cannot become follower since the new leader %d is unavailable.")
-                .format(localBrokerId, correlationId, controllerId, partitionStateInfo.controllerEpoch,
+                .format(localBrokerId, correlationId, controllerId, partitionStateInfo.basePartitionState.controllerEpoch,
                   partition.topicPartition, newLeaderBrokerId))
               // Create the local replica even if the leader is unavailable. This is required to ensure that we include
               // the partition's high watermark in the checkpoint file (see KAFKA-1647)
@@ -1106,7 +1106,7 @@ class ReplicaManager(val config: KafkaConfig,
           case e: KafkaStorageException =>
             stateChangeLogger.error(("Broker %d skipped the become-follower state change with correlation id %d from " +
               "controller %d epoch %d for partition [%s,%d] since the replica for the partition is offline due to disk error %s")
-              .format(localBrokerId, correlationId, controllerId, partitionStateInfo.controllerEpoch, partition.topic, partition.partitionId, e))
+              .format(localBrokerId, correlationId, controllerId, partitionStateInfo.basePartitionState.controllerEpoch, partition.topic, partition.partitionId, e))
             val dirOpt = getLogDir(new TopicPartition(partition.topic, partition.partitionId))
             error(s"Error while making broker the follower for partition $partition in dir $dirOpt", e)
             responseMap.put(new TopicPartition(partition.topic, partition.partitionId), Errors.KAFKA_STORAGE_ERROR)

http://git-wip-us.apache.org/repos/asf/kafka/blob/f15cdbc9/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index 2b134fe..1a4de99 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -44,6 +44,7 @@ import org.apache.kafka.common.security.auth.KafkaPrincipal
 import org.apache.kafka.common.{Node, TopicPartition, requests}
 import org.junit.Assert._
 import org.junit.{After, Assert, Before, Test}
+
 import scala.collection.JavaConverters._
 import scala.collection.mutable
 import scala.collection.mutable.Buffer
@@ -304,7 +305,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
 
   private def leaderAndIsrRequest = {
     new requests.LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, brokerId, Int.MaxValue,
-      Map(tp -> new PartitionState(Int.MaxValue, brokerId, Int.MaxValue, List(brokerId).asJava, 2, Seq(brokerId).asJava, false)).asJava,
+      Map(tp -> new LeaderAndIsrRequest.PartitionState(Int.MaxValue, brokerId, Int.MaxValue, List(brokerId).asJava, 2, Seq(brokerId).asJava, false)).asJava,
       Set(new Node(brokerId, "localhost", 0)).asJava).build()
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/f15cdbc9/core/src/test/scala/unit/kafka/admin/AdminTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/AdminTest.scala b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
index 377501c..68e2c48 100755
--- a/core/src/test/scala/unit/kafka/admin/AdminTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
@@ -376,29 +376,29 @@ class AdminTest extends ZooKeeperTestHarness with Logging with RackAwareTest {
     var activeServers = servers.filter(s => s.config.brokerId != 2)
     // wait for the update metadata request to trickle to the brokers
     TestUtils.waitUntilTrue(() =>
-      activeServers.forall(_.apis.metadataCache.getPartitionInfo(topic,partition).get.leaderIsrAndControllerEpoch.leaderAndIsr.isr.size != 3),
+      activeServers.forall(_.apis.metadataCache.getPartitionInfo(topic,partition).get.basePartitionState.isr.size != 3),
       "Topic test not created after timeout")
     assertEquals(0, partitionsRemaining.size)
     var partitionStateInfo = activeServers.head.apis.metadataCache.getPartitionInfo(topic,partition).get
-    var leaderAfterShutdown = partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.leader
+    var leaderAfterShutdown = partitionStateInfo.basePartitionState.leader
     assertEquals(0, leaderAfterShutdown)
-    assertEquals(2, partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.isr.size)
-    assertEquals(List(0,1), partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.isr)
+    assertEquals(2, partitionStateInfo.basePartitionState.isr.size)
+    assertEquals(List(0,1), partitionStateInfo.basePartitionState.isr.asScala)
 
     controller.shutdownBroker(1, controlledShutdownCallback)
     partitionsRemaining = resultQueue.take().get
     assertEquals(0, partitionsRemaining.size)
     activeServers = servers.filter(s => s.config.brokerId == 0)
     partitionStateInfo = activeServers.head.apis.metadataCache.getPartitionInfo(topic,partition).get
-    leaderAfterShutdown = partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.leader
+    leaderAfterShutdown = partitionStateInfo.basePartitionState.leader
     assertEquals(0, leaderAfterShutdown)
 
-    assertTrue(servers.forall(_.apis.metadataCache.getPartitionInfo(topic,partition).get.leaderIsrAndControllerEpoch.leaderAndIsr.leader == 0))
+    assertTrue(servers.forall(_.apis.metadataCache.getPartitionInfo(topic,partition).get.basePartitionState.leader == 0))
     controller.shutdownBroker(0, controlledShutdownCallback)
     partitionsRemaining = resultQueue.take().get
     assertEquals(1, partitionsRemaining.size)
     // leader doesn't change since all the replicas are shut down
-    assertTrue(servers.forall(_.apis.metadataCache.getPartitionInfo(topic,partition).get.leaderIsrAndControllerEpoch.leaderAndIsr.leader == 0))
+    assertTrue(servers.forall(_.apis.metadataCache.getPartitionInfo(topic,partition).get.basePartitionState.leader == 0))
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/f15cdbc9/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
index 20526a1..0118a39 100755
--- a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
@@ -143,7 +143,7 @@ class LeaderElectionTest extends ZooKeeperTestHarness {
     try {
       val staleControllerEpoch = 0
       val partitionStates = Map(
-        new TopicPartition(topic, partitionId) -> new PartitionState(2, brokerId2, LeaderAndIsr.initialLeaderEpoch,
+        new TopicPartition(topic, partitionId) -> new LeaderAndIsrRequest.PartitionState(2, brokerId2, LeaderAndIsr.initialLeaderEpoch,
           Seq(brokerId1, brokerId2).map(Integer.valueOf).asJava, LeaderAndIsr.initialZKVersion,
           Seq(0, 1).map(Integer.valueOf).asJava, false)
       )

http://git-wip-us.apache.org/repos/asf/kafka/blob/f15cdbc9/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala b/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
index 0e90121..de3ccdb 100644
--- a/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
+++ b/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
@@ -94,9 +94,9 @@ class MetadataCacheTest {
           assertEquals(partitionId, partitionMetadata.partition)
           val leader = partitionMetadata.leader
           val partitionState = topicPartitionStates(new TopicPartition(topic, partitionId))
-          assertEquals(partitionState.leader, leader.id)
-          assertEquals(partitionState.isr, partitionMetadata.isr.asScala.map(_.id).asJava)
-          assertEquals(partitionState.replicas, partitionMetadata.replicas.asScala.map(_.id).asJava)
+          assertEquals(partitionState.basePartitionState.leader, leader.id)
+          assertEquals(partitionState.basePartitionState.isr, partitionMetadata.isr.asScala.map(_.id).asJava)
+          assertEquals(partitionState.basePartitionState.replicas, partitionMetadata.replicas.asScala.map(_.id).asJava)
           val endPoint = endPoints(partitionMetadata.leader.id).find(_.listenerName == listenerName).get
           assertEquals(endPoint.host, leader.host)
           assertEquals(endPoint.port, leader.port)

http://git-wip-us.apache.org/repos/asf/kafka/blob/f15cdbc9/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 6f2a6a7..f5581f8 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -29,7 +29,7 @@ import org.I0Itec.zkclient.ZkClient
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.record._
-import org.apache.kafka.common.requests.{IsolationLevel, LeaderAndIsrRequest, PartitionState}
+import org.apache.kafka.common.requests.{IsolationLevel, LeaderAndIsrRequest}
 import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
 import org.apache.kafka.common.requests.FetchRequest.PartitionData
 import org.apache.kafka.common.requests.FetchResponse.AbortedTransaction
@@ -146,7 +146,7 @@ class ReplicaManagerTest {
       partition.getOrCreateReplica(0)
       // Make this replica the leader.
       val leaderAndIsrRequest1 = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0,
-        collection.immutable.Map(new TopicPartition(topic, 0) -> new PartitionState(0, 0, 0, brokerList, 0, brokerList, false)).asJava,
+        collection.immutable.Map(new TopicPartition(topic, 0) -> new LeaderAndIsrRequest.PartitionState(0, 0, 0, brokerList, 0, brokerList, false)).asJava,
         Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
       rm.becomeLeaderOrFollower(0, leaderAndIsrRequest1, (_, _) => ())
       rm.getLeaderReplicaIfLocal(new TopicPartition(topic, 0))
@@ -163,7 +163,7 @@ class ReplicaManagerTest {
 
       // Make this replica the follower
       val leaderAndIsrRequest2 = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0,
-        collection.immutable.Map(new TopicPartition(topic, 0) -> new PartitionState(0, 1, 1, brokerList, 0, brokerList, false)).asJava,
+        collection.immutable.Map(new TopicPartition(topic, 0) -> new LeaderAndIsrRequest.PartitionState(0, 1, 1, brokerList, 0, brokerList, false)).asJava,
         Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
       rm.becomeLeaderOrFollower(1, leaderAndIsrRequest2, (_, _) => ())
 
@@ -187,7 +187,7 @@ class ReplicaManagerTest {
 
       // Make this replica the leader.
       val leaderAndIsrRequest1 = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0,
-        collection.immutable.Map(new TopicPartition(topic, 0) -> new PartitionState(0, 0, 0, brokerList, 0, brokerList, true)).asJava,
+        collection.immutable.Map(new TopicPartition(topic, 0) -> new LeaderAndIsrRequest.PartitionState(0, 0, 0, brokerList, 0, brokerList, true)).asJava,
         Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
       replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest1, (_, _) => ())
       replicaManager.getLeaderReplicaIfLocal(new TopicPartition(topic, 0))
@@ -276,7 +276,7 @@ class ReplicaManagerTest {
 
       // Make this replica the leader.
       val leaderAndIsrRequest1 = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0,
-        collection.immutable.Map(new TopicPartition(topic, 0) -> new PartitionState(0, 0, 0, brokerList, 0, brokerList, true)).asJava,
+        collection.immutable.Map(new TopicPartition(topic, 0) -> new LeaderAndIsrRequest.PartitionState(0, 0, 0, brokerList, 0, brokerList, true)).asJava,
         Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
       replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest1, (_, _) => ())
       replicaManager.getLeaderReplicaIfLocal(new TopicPartition(topic, 0))
@@ -354,7 +354,7 @@ class ReplicaManagerTest {
 
       // Make this replica the leader.
       val leaderAndIsrRequest1 = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0,
-        collection.immutable.Map(new TopicPartition(topic, 0) -> new PartitionState(0, 0, 0, brokerList, 0, brokerList, false)).asJava,
+        collection.immutable.Map(new TopicPartition(topic, 0) -> new LeaderAndIsrRequest.PartitionState(0, 0, 0, brokerList, 0, brokerList, false)).asJava,
         Set(new Node(0, "host1", 0), new Node(1, "host2", 1), new Node(2, "host2", 2)).asJava).build()
       rm.becomeLeaderOrFollower(0, leaderAndIsrRequest1, (_, _) => ())
       rm.getLeaderReplicaIfLocal(new TopicPartition(topic, 0))

http://git-wip-us.apache.org/repos/asf/kafka/blob/f15cdbc9/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
index ecb75ce..4b33dcb 100644
--- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
+++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
@@ -181,7 +181,7 @@ class RequestQuotaTest extends BaseRequestTest {
 
         case ApiKeys.LEADER_AND_ISR =>
           new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, brokerId, Int.MaxValue,
-            Map(tp -> new PartitionState(Int.MaxValue, brokerId, Int.MaxValue, List(brokerId).asJava, 2, Seq(brokerId).asJava, true)).asJava,
+            Map(tp -> new LeaderAndIsrRequest.PartitionState(Int.MaxValue, brokerId, Int.MaxValue, List(brokerId).asJava, 2, Seq(brokerId).asJava, true)).asJava,
             Set(new Node(brokerId, "localhost", 0)).asJava)
 
         case ApiKeys.STOP_REPLICA =>

http://git-wip-us.apache.org/repos/asf/kafka/blob/f15cdbc9/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala b/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala
index ecc13d9..9d9b1c7 100644
--- a/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala
@@ -20,7 +20,7 @@ import java.util.{Map => JMap}
 
 import kafka.admin.AdminUtils
 import kafka.server.KafkaConfig._
-import kafka.server.{BlockingSend, KafkaConfig, KafkaServer, ReplicaFetcherBlockingSend}
+import kafka.server.{BlockingSend, KafkaServer, ReplicaFetcherBlockingSend}
 import kafka.utils.TestUtils._
 import kafka.utils.{Logging, TestUtils}
 import kafka.zk.ZooKeeperTestHarness
@@ -222,7 +222,7 @@ class LeaderEpochIntegrationTest extends ZooKeeperTestHarness with Logging {
   private def waitForEpochChangeTo(topic: String, partition: Int, epoch: Int): Unit = {
     TestUtils.waitUntilTrue(() => {
       brokers(0).metadataCache.getPartitionInfo(topic, partition) match {
-        case Some(m) => m.leaderIsrAndControllerEpoch.leaderAndIsr.leaderEpoch == epoch
+        case Some(m) => m.basePartitionState.leaderEpoch == epoch
         case None => false
       }
     }, "Epoch didn't change")

http://git-wip-us.apache.org/repos/asf/kafka/blob/f15cdbc9/core/src/test/scala/unit/kafka/utils/TestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index d4f00a6..ad641c0 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -925,7 +925,7 @@ object TestUtils extends Logging {
           partitionStateOpt match {
             case None => false
             case Some(partitionState) =>
-              leader = partitionState.leaderIsrAndControllerEpoch.leaderAndIsr.leader
+              leader = partitionState.basePartitionState.leader
               result && Request.isValidBrokerId(leader)
           }
       },

http://git-wip-us.apache.org/repos/asf/kafka/blob/f15cdbc9/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
index eb844d3..4dfd075 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
@@ -16,7 +16,6 @@
  */
 package org.apache.kafka.streams.integration.utils;
 
-import kafka.api.MetadataPartitionState;
 import kafka.api.Request;
 import kafka.server.KafkaServer;
 import kafka.server.MetadataCache;
@@ -30,6 +29,7 @@ import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.requests.UpdateMetadataRequest;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.KeyValue;
@@ -225,13 +225,13 @@ public class IntegrationTestUtils {
             public boolean conditionMet() {
                 for (final KafkaServer server : servers) {
                     final MetadataCache metadataCache = server.apis().metadataCache();
-                    final Option<MetadataPartitionState> partitionInfo =
+                    final Option<UpdateMetadataRequest.PartitionState> partitionInfo =
                             metadataCache.getPartitionInfo(topic, partition);
                     if (partitionInfo.isEmpty()) {
                         return false;
                     }
-                    final MetadataPartitionState metadataPartitionState = partitionInfo.get();
-                    if (!Request.isValidBrokerId(metadataPartitionState.leaderIsrAndControllerEpoch().leaderAndIsr().leader())) {
+                    final UpdateMetadataRequest.PartitionState metadataPartitionState = partitionInfo.get();
+                    if (!Request.isValidBrokerId(metadataPartitionState.basePartitionState.leader)) {
                         return false;
                     }
                 }


Mime
View raw message