kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: KAFKA-5161: add code in reassign-partitions to check broker existence
Date Fri, 05 May 2017 05:12:32 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 2448343ed -> 95b48b157


KAFKA-5161: add code in reassign-partitions to check broker existence

Added code to check existence of the brokers in the proposed plan.

Author: amethystic <huxi_2b@hotmail.com>

Reviewers: Guozhang Wang <wangguoz@gmail.com>

Closes #2962 from amethystic/kafka5161_reassign_check_invalid_brokerID


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/95b48b15
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/95b48b15
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/95b48b15

Branch: refs/heads/trunk
Commit: 95b48b157aca44beec4335e62a59f37097fe7499
Parents: 2448343
Author: amethystic <huxi_2b@hotmail.com>
Authored: Thu May 4 22:12:28 2017 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Thu May 4 22:12:28 2017 -0700

----------------------------------------------------------------------
 .../scala/kafka/admin/ReassignPartitionsCommand.scala    |  8 +++++++-
 .../unit/kafka/admin/ReassignPartitionsClusterTest.scala | 11 +++++++++++
 2 files changed, 18 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/95b48b15/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
index 436fb8e..3addb77 100755
--- a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
+++ b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
@@ -201,7 +201,7 @@ object ReassignPartitionsCommand extends Logging {
         .mkString(". ")
       throw new AdminCommandFailedException("Partition replica lists may not contain duplicate
entries: %s".format(duplicatesMsg))
     }
-    //Check that all partitions in the proposed assignment exist in the cluster
+    // check that all partitions in the proposed assignment exist in the cluster
     val proposedTopics = partitionsToBeReassigned.map { case (tp, _) => tp.topic }.distinct
     val existingAssignment = zkUtils.getReplicaAssignmentForTopics(proposedTopics)
     val nonExistentPartitions = partitionsToBeReassigned.map { case (tp, _) => tp }.filterNot(existingAssignment.contains)
@@ -209,6 +209,12 @@ object ReassignPartitionsCommand extends Logging {
       throw new AdminCommandFailedException("The proposed assignment contains non-existent
partitions: " +
         nonExistentPartitions)
 
+    // check that all brokers in the proposed assignment exist in the cluster
+    val existingBrokerIDs = zkUtils.getSortedBrokerList()
+    val nonExistingBrokerIDs = partitionsToBeReassigned.toMap.values.flatten.filterNot(existingBrokerIDs.contains).toSet
+    if (nonExistingBrokerIDs.nonEmpty)
+      throw new AdminCommandFailedException("The proposed assignment contains non-existent
brokerIDs: " + nonExistingBrokerIDs.mkString(","))
+
     partitionsToBeReassigned
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/95b48b15/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala
index 1bc90a5..05a3f83 100644
--- a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala
@@ -301,6 +301,17 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with
Logging {
     ReassignPartitionsCommand.executeAssignment(zkUtils, topicJson, NoThrottle)
   }
 
+  @Test(expected = classOf[AdminCommandFailedException])
+  def shouldFailIfProposedHasInvalidBrokerID() {
+    //Given a single replica on server 100
+    startBrokers(Seq(100, 101))
+    createTopic(zkUtils, topicName, Map(0 -> Seq(100)), servers = servers)
+
+    //When we execute an assignment that specifies an invalid brokerID (102: invalid broker
ID in this case)
+    val topicJson = s"""{"version":1,"partitions":[{"topic":"$topicName","partition":0,"replicas":[101,
102]}]}"""
+    ReassignPartitionsCommand.executeAssignment(zkUtils, topicJson, NoThrottle)
+  }
+
   @Test
   def shouldPerformThrottledReassignmentOverVariousTopics() {
     val throttle = Throttle(1000L)


Mime
View raw message