kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject kafka git commit: KAFKA-5091; ReassignPartitionsCommand should protect against empty replica list assignment
Date Mon, 01 May 2017 19:24:22 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 0104b657a -> c96656efb


KAFKA-5091; ReassignPartitionsCommand should protect against empty replica list assignment

ReassignPartitionsCommand should protect against empty replica list assignment.

Author: amethystic <huxi_2b@hotmail.com>

Reviewers: Jason Gustafson <jason@confluent.io>

Closes #2904 from amethystic/kafka-5901_ReassignPartitionsCommand_protect_against_empty_replica_list


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/c96656ef
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/c96656ef
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/c96656ef

Branch: refs/heads/trunk
Commit: c96656efb3718f8b2badc30e56c7f179c0658de8
Parents: 0104b65
Author: amethystic <huxi_2b@hotmail.com>
Authored: Mon May 1 11:17:30 2017 -0700
Committer: Jason Gustafson <jason@confluent.io>
Committed: Mon May 1 11:17:30 2017 -0700

----------------------------------------------------------------------
 .../scala/kafka/admin/ReassignPartitionsCommand.scala    |  3 +++
 .../unit/kafka/admin/ReassignPartitionsClusterTest.scala | 11 +++++++++++
 2 files changed, 14 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/c96656ef/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
index c167633..436fb8e 100755
--- a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
+++ b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
@@ -186,6 +186,9 @@ object ReassignPartitionsCommand extends Logging {
 
     if (partitionsToBeReassigned.isEmpty)
       throw new AdminCommandFailedException("Partition reassignment data file is empty")
+    if (partitionsToBeReassigned.exists(_._2.isEmpty)) {
+      throw new AdminCommandFailedException("Partition replica list cannot be empty")
+    }
     val duplicateReassignedPartitions = CoreUtils.duplicates(partitionsToBeReassigned.map { case (tp, _) => tp })
     if (duplicateReassignedPartitions.nonEmpty)
       throw new AdminCommandFailedException("Partition reassignment contains duplicate topic partitions: %s".format(duplicateReassignedPartitions.mkString(",")))

http://git-wip-us.apache.org/repos/asf/kafka/blob/c96656ef/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala
index c576a5c..1bc90a5 100644
--- a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala
@@ -290,6 +290,17 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging {
     ReassignPartitionsCommand.executeAssignment(zkUtils, topicJson, NoThrottle)
   }
 
+  @Test(expected = classOf[AdminCommandFailedException])
+  def shouldFailIfProposedHasEmptyReplicaList() {
+    //Given a single replica on server 100
+    startBrokers(Seq(100, 101))
+    createTopic(zkUtils, topicName, Map(0 -> Seq(100)), servers = servers)
+
+    //When we execute an assignment that specifies an empty replica list (0: empty list in this case)
+    val topicJson = s"""{"version":1,"partitions":[{"topic":"$topicName","partition":0,"replicas":[]}]}"""
+    ReassignPartitionsCommand.executeAssignment(zkUtils, topicJson, NoThrottle)
+  }
+
   @Test
   def shouldPerformThrottledReassignmentOverVariousTopics() {
     val throttle = Throttle(1000L)


Mime
View raw message