kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ij...@apache.org
Subject kafka git commit: MINOR: Improve PartitionState logging and remove duplication of code
Date Thu, 21 Jul 2016 00:00:57 GMT
Repository: kafka
Updated Branches:
  refs/heads/0.10.0 039e89a6e -> e293ed145


MINOR: Improve PartitionState logging and remove duplication of code

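Currently, logs involving PartitionState are not very helpful: the class does not
override toString(), so only its default class-name-and-hash form is printed, as in
the examples below. This change adds a descriptive toString() and replaces the two
duplicated nested PartitionState classes (in LeaderAndIsrRequest and
UpdateMetadataRequest) with a single top-level class.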

```
	Broker 449 cached leader info org.apache.kafka.common.requests.UpdateMetadataRequest$PartitionState@3285d64a for partition <topic>-<partition> in response to UpdateMetadata request sent by controller 356 epoch 138 with correlation id 0

	TRACE state.change.logger: Broker 449 received LeaderAndIsr request org.apache.kafka.common.requests.LeaderAndIsrRequest$PartitionState@66d6a8eb correlation id 3 from controller 356 epoch 138 for partition [<topic>,<partition>]
```

Author: Ashish Singh <asingh@cloudera.com>

Reviewers: Ismael Juma <ismael@juma.me.uk>

Closes #1609 from SinghAsDev/partitionState

(cherry picked from commit 0e5700fb68671f3fb75bfdeceda40e84330aca69)
Signed-off-by: Ismael Juma <ismael@juma.me.uk>


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/e293ed14
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/e293ed14
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/e293ed14

Branch: refs/heads/0.10.0
Commit: e293ed1458a1bc2118fe37ed6a6002ffe05e3ecf
Parents: 039e89a
Author: Ashish Singh <asingh@cloudera.com>
Authored: Thu Jul 21 01:00:33 2016 +0100
Committer: Ismael Juma <ismael@juma.me.uk>
Committed: Thu Jul 21 01:00:51 2016 +0100

----------------------------------------------------------------------
 .../common/requests/LeaderAndIsrRequest.java    | 19 --------
 .../kafka/common/requests/PartitionState.java   | 46 ++++++++++++++++++++
 .../common/requests/UpdateMetadataRequest.java  | 18 --------
 .../common/requests/RequestResponseTest.java    | 16 +++----
 .../main/scala/kafka/cluster/Partition.scala    | 12 +++--
 .../controller/ControllerChannelManager.scala   |  5 ++-
 .../main/scala/kafka/server/MetadataCache.scala |  3 +-
 .../scala/kafka/server/ReplicaManager.scala     | 18 ++++----
 .../kafka/api/AuthorizerIntegrationTest.scala   |  4 +-
 .../unit/kafka/server/LeaderElectionTest.scala  |  3 +-
 .../unit/kafka/server/MetadataCacheTest.scala   |  4 +-
 .../unit/kafka/server/ReplicaManagerTest.scala  |  7 ++-
 12 files changed, 81 insertions(+), 74 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/e293ed14/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java
index fee3c21..52b9674 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java
@@ -35,25 +35,6 @@ import java.util.Set;
 
 public class LeaderAndIsrRequest extends AbstractRequest {
 
-    public static class PartitionState {
-        public final int controllerEpoch;
-        public final int leader;
-        public final int leaderEpoch;
-        public final List<Integer> isr;
-        public final int zkVersion;
-        public final Set<Integer> replicas;
-
-        public PartitionState(int controllerEpoch, int leader, int leaderEpoch, List<Integer> isr, int zkVersion, Set<Integer> replicas) {
-            this.controllerEpoch = controllerEpoch;
-            this.leader = leader;
-            this.leaderEpoch = leaderEpoch;
-            this.isr = isr;
-            this.zkVersion = zkVersion;
-            this.replicas = replicas;
-        }
-
-    }
-
     private static final Schema CURRENT_SCHEMA = ProtoUtils.currentRequestSchema(ApiKeys.LEADER_AND_ISR.id);
 
     private static final String CONTROLLER_ID_KEY_NAME = "controller_id";

http://git-wip-us.apache.org/repos/asf/kafka/blob/e293ed14/clients/src/main/java/org/apache/kafka/common/requests/PartitionState.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/PartitionState.java b/clients/src/main/java/org/apache/kafka/common/requests/PartitionState.java
new file mode 100644
index 0000000..e766632
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/PartitionState.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.kafka.common.requests;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+
+public class PartitionState {
+    public final int controllerEpoch;
+    public final int leader;
+    public final int leaderEpoch;
+    public final List<Integer> isr;
+    public final int zkVersion;
+    public final Set<Integer> replicas;
+
+    public PartitionState(int controllerEpoch, int leader, int leaderEpoch, List<Integer> isr, int zkVersion, Set<Integer> replicas) {
+        this.controllerEpoch = controllerEpoch;
+        this.leader = leader;
+        this.leaderEpoch = leaderEpoch;
+        this.isr = isr;
+        this.zkVersion = zkVersion;
+        this.replicas = replicas;
+    }
+
+    @Override
+    public String toString() {
+        return "PartitionState(controllerEpoch=" + controllerEpoch +
+                ", leader=" + leader +
+                ", leaderEpoch=" + leaderEpoch +
+                ", isr=" + Arrays.toString(isr.toArray()) +
+                ", zkVersion=" + zkVersion +
+                ", replicas=" + Arrays.toString(replicas.toArray()) + ")";
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/e293ed14/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java
index 27f89fa..1c21789 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java
@@ -33,24 +33,6 @@ import java.util.Set;
 
 public class UpdateMetadataRequest extends AbstractRequest {
 
-    public static final class PartitionState {
-        public final int controllerEpoch;
-        public final int leader;
-        public final int leaderEpoch;
-        public final List<Integer> isr;
-        public final int zkVersion;
-        public final Set<Integer> replicas;
-
-        public PartitionState(int controllerEpoch, int leader, int leaderEpoch, List<Integer> isr, int zkVersion, Set<Integer> replicas) {
-            this.controllerEpoch = controllerEpoch;
-            this.leader = leader;
-            this.leaderEpoch = leaderEpoch;
-            this.isr = isr;
-            this.zkVersion = zkVersion;
-            this.replicas = replicas;
-        }
-    }
-
     public static final class Broker {
         public final int id;
         public final Map<SecurityProtocol, EndPoint> endPoints;

http://git-wip-us.apache.org/repos/asf/kafka/blob/e293ed14/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index 345de3f..ecf9e53 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -372,15 +372,15 @@ public class RequestResponseTest {
     }
 
     private AbstractRequest createLeaderAndIsrRequest() {
-        Map<TopicPartition, LeaderAndIsrRequest.PartitionState> partitionStates = new HashMap<>();
+        Map<TopicPartition, PartitionState> partitionStates = new HashMap<>();
         List<Integer> isr = Arrays.asList(1, 2);
         List<Integer> replicas = Arrays.asList(1, 2, 3, 4);
         partitionStates.put(new TopicPartition("topic5", 105),
-                new LeaderAndIsrRequest.PartitionState(0, 2, 1, new ArrayList<>(isr), 2, new HashSet<>(replicas)));
+                new PartitionState(0, 2, 1, new ArrayList<>(isr), 2, new HashSet<>(replicas)));
         partitionStates.put(new TopicPartition("topic5", 1),
-                new LeaderAndIsrRequest.PartitionState(1, 1, 1, new ArrayList<>(isr), 2, new HashSet<>(replicas)));
+                new PartitionState(1, 1, 1, new ArrayList<>(isr), 2, new HashSet<>(replicas)));
         partitionStates.put(new TopicPartition("topic20", 1),
-                new LeaderAndIsrRequest.PartitionState(1, 0, 1, new ArrayList<>(isr), 2, new HashSet<>(replicas)));
+                new PartitionState(1, 0, 1, new ArrayList<>(isr), 2, new HashSet<>(replicas)));
 
         Set<Node> leaders = new HashSet<>(Arrays.asList(
                 new Node(0, "test0", 1223),
@@ -398,15 +398,15 @@ public class RequestResponseTest {
 
     @SuppressWarnings("deprecation")
     private AbstractRequest createUpdateMetadataRequest(int version, String rack) {
-        Map<TopicPartition, UpdateMetadataRequest.PartitionState> partitionStates = new HashMap<>();
+        Map<TopicPartition, PartitionState> partitionStates = new HashMap<>();
         List<Integer> isr = Arrays.asList(1, 2);
         List<Integer> replicas = Arrays.asList(1, 2, 3, 4);
         partitionStates.put(new TopicPartition("topic5", 105),
-                new UpdateMetadataRequest.PartitionState(0, 2, 1, new ArrayList<>(isr), 2, new HashSet<>(replicas)));
+                new PartitionState(0, 2, 1, new ArrayList<>(isr), 2, new HashSet<>(replicas)));
         partitionStates.put(new TopicPartition("topic5", 1),
-                new UpdateMetadataRequest.PartitionState(1, 1, 1, new ArrayList<>(isr), 2, new HashSet<>(replicas)));
+                new PartitionState(1, 1, 1, new ArrayList<>(isr), 2, new HashSet<>(replicas)));
         partitionStates.put(new TopicPartition("topic20", 1),
-                new UpdateMetadataRequest.PartitionState(1, 0, 1, new ArrayList<>(isr), 2, new HashSet<>(replicas)));
+                new PartitionState(1, 0, 1, new ArrayList<>(isr), 2, new HashSet<>(replicas)));
 
         if (version == 0) {
             Set<Node> liveBrokers = new HashSet<>(Arrays.asList(

http://git-wip-us.apache.org/repos/asf/kafka/blob/e293ed14/core/src/main/scala/kafka/cluster/Partition.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 4e79bdc..edf6619 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -18,7 +18,7 @@ package kafka.cluster
 
 import kafka.common._
 import kafka.utils._
-import kafka.utils.CoreUtils.{inReadLock,inWriteLock}
+import kafka.utils.CoreUtils.{inReadLock, inWriteLock}
 import kafka.admin.AdminUtils
 import kafka.api.LeaderAndIsr
 import kafka.log.LogConfig
@@ -26,17 +26,15 @@ import kafka.server._
 import kafka.metrics.KafkaMetricsGroup
 import kafka.controller.KafkaController
 import kafka.message.ByteBufferMessageSet
-
 import java.io.IOException
 import java.util.concurrent.locks.ReentrantReadWriteLock
+
 import org.apache.kafka.common.errors.{NotEnoughReplicasException, NotLeaderForPartitionException}
 import org.apache.kafka.common.protocol.Errors
-import org.apache.kafka.common.requests.LeaderAndIsrRequest
-
 
 import scala.collection.JavaConverters._
-
 import com.yammer.metrics.core.Gauge
+import org.apache.kafka.common.requests.PartitionState
 
 /**
  * Data structure that represents a topic partition. The leader maintains the AR, ISR, CUR, RAR
@@ -166,7 +164,7 @@ class Partition(val topic: String,
    * from the time when this broker was the leader last time) and setting the new leader and ISR.
    * If the leader replica id does not change, return false to indicate the replica manager.
    */
-  def makeLeader(controllerId: Int, partitionStateInfo: LeaderAndIsrRequest.PartitionState, correlationId: Int): Boolean = {
+  def makeLeader(controllerId: Int, partitionStateInfo: PartitionState, correlationId: Int): Boolean = {
     val (leaderHWIncremented, isNewLeader) = inWriteLock(leaderIsrUpdateLock) {
       val allReplicas = partitionStateInfo.replicas.asScala.map(_.toInt)
       // record the epoch of the controller that made the leadership decision. This is useful while updating the isr
@@ -207,7 +205,7 @@ class Partition(val topic: String,
    *  Make the local replica the follower by setting the new leader and ISR to empty
    *  If the leader replica id does not change, return false to indicate the replica manager
    */
-  def makeFollower(controllerId: Int, partitionStateInfo: LeaderAndIsrRequest.PartitionState, correlationId: Int): Boolean = {
+  def makeFollower(controllerId: Int, partitionStateInfo: PartitionState, correlationId: Int): Boolean = {
     inWriteLock(leaderIsrUpdateLock) {
       val allReplicas = partitionStateInfo.replicas.asScala.map(_.toInt)
       val newLeaderBrokerId: Int = partitionStateInfo.leader

http://git-wip-us.apache.org/repos/asf/kafka/blob/e293ed14/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index b4059a4..c19d35a 100755
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -28,6 +28,7 @@ import org.apache.kafka.clients.{ClientRequest, ClientResponse, ManualMetadataUp
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.network.{ChannelBuilders, LoginType, Mode, NetworkReceive, Selectable, Selector}
 import org.apache.kafka.common.protocol.{ApiKeys, SecurityProtocol}
+import org.apache.kafka.common.requests
 import org.apache.kafka.common.requests.{UpdateMetadataRequest, _}
 import org.apache.kafka.common.utils.Time
 import org.apache.kafka.common.{Node, TopicPartition}
@@ -362,7 +363,7 @@ class ControllerBrokerRequestBatch(controller: KafkaController) extends Logging
         }
         val partitionStates = partitionStateInfos.map { case (topicPartition, partitionStateInfo) =>
           val LeaderIsrAndControllerEpoch(leaderIsr, controllerEpoch) = partitionStateInfo.leaderIsrAndControllerEpoch
-          val partitionState = new LeaderAndIsrRequest.PartitionState(controllerEpoch, leaderIsr.leader,
+          val partitionState = new requests.PartitionState(controllerEpoch, leaderIsr.leader,
             leaderIsr.leaderEpoch, leaderIsr.isr.map(Integer.valueOf).asJava, leaderIsr.zkVersion,
             partitionStateInfo.allReplicas.map(Integer.valueOf).asJava
           )
@@ -379,7 +380,7 @@ class ControllerBrokerRequestBatch(controller: KafkaController) extends Logging
           broker, p._1)))
         val partitionStates = partitionStateInfos.map { case (topicPartition, partitionStateInfo) =>
           val LeaderIsrAndControllerEpoch(leaderIsr, controllerEpoch) = partitionStateInfo.leaderIsrAndControllerEpoch
-          val partitionState = new UpdateMetadataRequest.PartitionState(controllerEpoch, leaderIsr.leader,
+          val partitionState = new requests.PartitionState(controllerEpoch, leaderIsr.leader,
             leaderIsr.leaderEpoch, leaderIsr.isr.map(Integer.valueOf).asJava, leaderIsr.zkVersion,
             partitionStateInfo.allReplicas.map(Integer.valueOf).asJava
           )

http://git-wip-us.apache.org/repos/asf/kafka/blob/e293ed14/core/src/main/scala/kafka/server/MetadataCache.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/MetadataCache.scala b/core/src/main/scala/kafka/server/MetadataCache.scala
index b387f2e..f493e7d 100755
--- a/core/src/main/scala/kafka/server/MetadataCache.scala
+++ b/core/src/main/scala/kafka/server/MetadataCache.scala
@@ -30,8 +30,7 @@ import kafka.utils.CoreUtils._
 import kafka.utils.Logging
 import org.apache.kafka.common.Node
 import org.apache.kafka.common.protocol.{Errors, SecurityProtocol}
-import org.apache.kafka.common.requests.UpdateMetadataRequest.PartitionState
-import org.apache.kafka.common.requests.{MetadataResponse, UpdateMetadataRequest}
+import org.apache.kafka.common.requests.{MetadataResponse, PartitionState, UpdateMetadataRequest}
 
 /**
  *  A cache for the state (e.g., current leader) of each partition. This cache is updated through

http://git-wip-us.apache.org/repos/asf/kafka/blob/e293ed14/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 68f2385..8260643 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -19,6 +19,7 @@ package kafka.server
 import java.io.{File, IOException}
 import java.util.concurrent.TimeUnit
 import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
+
 import com.yammer.metrics.core.Gauge
 import kafka.api._
 import kafka.cluster.{Partition, Replica}
@@ -28,16 +29,17 @@ import kafka.log.{LogAppendInfo, LogManager}
 import kafka.message.{ByteBufferMessageSet, InvalidMessageException, Message, MessageSet}
 import kafka.metrics.KafkaMetricsGroup
 import kafka.utils._
-import org.I0Itec.zkclient.IZkChildListener
-import org.apache.kafka.common.errors.{OffsetOutOfRangeException, RecordBatchTooLargeException, ReplicaNotAvailableException, RecordTooLargeException,
-InvalidTopicException, ControllerMovedException, NotLeaderForPartitionException, CorruptRecordException, UnknownTopicOrPartitionException,
-InvalidTimestampException}
+import org.apache.kafka.common.errors.{ControllerMovedException, CorruptRecordException, InvalidTimestampException,
+                                        InvalidTopicException, NotLeaderForPartitionException, OffsetOutOfRangeException,
+                                        RecordBatchTooLargeException, RecordTooLargeException, ReplicaNotAvailableException,
+                                        UnknownTopicOrPartitionException}
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.protocol.Errors
-import org.apache.kafka.common.requests.{LeaderAndIsrRequest, StopReplicaRequest, UpdateMetadataRequest}
+import org.apache.kafka.common.requests.{LeaderAndIsrRequest, PartitionState, StopReplicaRequest, UpdateMetadataRequest}
 import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
 import org.apache.kafka.common.utils.{Time => JTime}
+
 import scala.collection._
 import scala.collection.JavaConverters._
 
@@ -610,7 +612,7 @@ class ReplicaManager(val config: KafkaConfig,
         controllerEpoch = leaderAndISRRequest.controllerEpoch
 
         // First check partition's leader epoch
-        val partitionState = new mutable.HashMap[Partition, LeaderAndIsrRequest.PartitionState]()
+        val partitionState = new mutable.HashMap[Partition, PartitionState]()
         leaderAndISRRequest.partitionStates.asScala.foreach { case (topicPartition, stateInfo) =>
           val partition = getOrCreatePartition(topicPartition.topic, topicPartition.partition)
           val partitionLeaderEpoch = partition.getLeaderEpoch()
@@ -679,7 +681,7 @@ class ReplicaManager(val config: KafkaConfig,
    */
   private def makeLeaders(controllerId: Int,
                           epoch: Int,
-                          partitionState: Map[Partition, LeaderAndIsrRequest.PartitionState],
+                          partitionState: Map[Partition, PartitionState],
                           correlationId: Int,
                           responseMap: mutable.Map[TopicPartition, Short]): Set[Partition] = {
     partitionState.foreach(state =>
@@ -750,7 +752,7 @@ class ReplicaManager(val config: KafkaConfig,
    */
   private def makeFollowers(controllerId: Int,
                             epoch: Int,
-                            partitionState: Map[Partition, LeaderAndIsrRequest.PartitionState],
+                            partitionState: Map[Partition, PartitionState],
                             correlationId: Int,
                             responseMap: mutable.Map[TopicPartition, Short],
                             metadataCache: MetadataCache) : Set[Partition] = {

http://git-wip-us.apache.org/repos/asf/kafka/blob/e293ed14/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index 2d5900f..60eb74c 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -186,7 +186,7 @@ class AuthorizerIntegrationTest extends KafkaServerTestHarness {
   }
 
   private def createUpdateMetadataRequest = {
-    val partitionState = Map(tp -> new requests.UpdateMetadataRequest.PartitionState(Int.MaxValue, brokerId, Int.MaxValue, List(brokerId).asJava, 2, Set(brokerId).asJava)).asJava
+    val partitionState = Map(tp -> new PartitionState(Int.MaxValue, brokerId, Int.MaxValue, List(brokerId).asJava, 2, Set(brokerId).asJava)).asJava
     val brokers = Set(new requests.UpdateMetadataRequest.Broker(brokerId,
       Map(SecurityProtocol.PLAINTEXT -> new requests.UpdateMetadataRequest.EndPoint("localhost", 0)).asJava, null)).asJava
     new requests.UpdateMetadataRequest(brokerId, Int.MaxValue, partitionState, brokers)
@@ -215,7 +215,7 @@ class AuthorizerIntegrationTest extends KafkaServerTestHarness {
 
   private def createLeaderAndIsrRequest = {
     new requests.LeaderAndIsrRequest(brokerId, Int.MaxValue,
-      Map(tp -> new requests.LeaderAndIsrRequest.PartitionState(Int.MaxValue, brokerId, Int.MaxValue, List(brokerId).asJava, 2, Set(brokerId).asJava)).asJava,
+      Map(tp -> new PartitionState(Int.MaxValue, brokerId, Int.MaxValue, List(brokerId).asJava, 2, Set(brokerId).asJava)).asJava,
       Set(new Node(brokerId, "localhost", 0)).asJava)
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/e293ed14/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
index 7258980..343a3e1 100755
--- a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
@@ -18,11 +18,10 @@
 package kafka.server
 
 import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.requests.LeaderAndIsrRequest.PartitionState
 
 import scala.collection.JavaConverters._
 import kafka.api.LeaderAndIsr
-import org.apache.kafka.common.requests.{AbstractRequestResponse, LeaderAndIsrRequest, LeaderAndIsrResponse}
+import org.apache.kafka.common.requests.{AbstractRequestResponse, LeaderAndIsrRequest, LeaderAndIsrResponse, PartitionState}
 import org.junit.Assert._
 import kafka.utils.{CoreUtils, TestUtils}
 import kafka.cluster.Broker

http://git-wip-us.apache.org/repos/asf/kafka/blob/e293ed14/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala b/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
index 770513c..b34c93d 100644
--- a/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
+++ b/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
@@ -22,8 +22,8 @@ import util.Arrays.asList
 import kafka.common.BrokerEndPointNotAvailableException
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.protocol.{Errors, SecurityProtocol}
-import org.apache.kafka.common.requests.UpdateMetadataRequest
-import org.apache.kafka.common.requests.UpdateMetadataRequest.{Broker, EndPoint, PartitionState}
+import org.apache.kafka.common.requests.{PartitionState, UpdateMetadataRequest}
+import org.apache.kafka.common.requests.UpdateMetadataRequest.{Broker, EndPoint}
 import org.junit.Test
 import org.junit.Assert._
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/e293ed14/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 5739856..bfb66b9 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -24,19 +24,18 @@ import java.util.concurrent.atomic.AtomicBoolean
 import kafka.api.{FetchResponsePartitionData, PartitionFetchInfo}
 import kafka.cluster.Broker
 import kafka.common.TopicAndPartition
-import kafka.message.{MessageSet, ByteBufferMessageSet, Message}
+import kafka.message.{ByteBufferMessageSet, Message, MessageSet}
 import kafka.utils.{MockScheduler, MockTime, TestUtils, ZkUtils}
 import org.I0Itec.zkclient.ZkClient
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.protocol.Errors
-import org.apache.kafka.common.requests.LeaderAndIsrRequest
-import org.apache.kafka.common.requests.LeaderAndIsrRequest.PartitionState
+import org.apache.kafka.common.requests.{LeaderAndIsrRequest, PartitionState}
 import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
 import org.apache.kafka.common.utils.{MockTime => JMockTime}
 import org.apache.kafka.common.{Node, TopicPartition}
 import org.easymock.EasyMock
 import org.junit.Assert.{assertEquals, assertTrue}
-import org.junit.{Test, Before, After}
+import org.junit.{After, Before, Test}
 
 import scala.collection.JavaConverters._
 import scala.collection.Map


Mime
View raw message