kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cmcc...@apache.org
Subject [kafka] branch 2.4 updated: MINOR: Check against empty replicas in AlterPartitionReassignments (#7574)
Date Tue, 22 Oct 2019 22:31:21 GMT
This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch 2.4
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.4 by this push:
     new 78e7c90  MINOR: Check against empty replicas in AlterPartitionReassignments (#7574)
78e7c90 is described below

commit 78e7c90e90efa18b2a5b298e49154834d8d5bf67
Author: Stanislav Kozlovski <familyguyuser192@windowslive.com>
AuthorDate: Tue Oct 22 23:26:50 2019 +0100

    MINOR: Check against empty replicas in AlterPartitionReassignments (#7574)
    
    Do not allow an empty replica set to be passed into the reassignment API.
    
    Reviewers: Colin P. McCabe <cmccabe@apache.org>, José Armando García Sancio <jsancio@gmail.com>
    (cherry picked from commit 2efee34b74d4895b504ab541b716edab72b320d1)
---
 core/src/main/scala/kafka/controller/KafkaController.scala     |  2 +-
 .../integration/kafka/api/AdminClientIntegrationTest.scala     | 10 +++++++---
 2 files changed, 8 insertions(+), 4 deletions(-)

diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 0dd2ec7..d45fea8 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -1716,7 +1716,7 @@ class KafkaController(val config: KafkaConfig,
       case Some(replicas) =>
         val replicaSet = replicas.toSet
 
-        if (replicas.size != replicaSet.size)
+        if (replicas.isEmpty || replicas.size != replicaSet.size)
           false
         else if (replicas.exists(_ < 0))
           false
diff --git a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
index 6ff4f0e..eb15529 100644
--- a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
@@ -2018,12 +2018,13 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with
Logging {
     val tp1 = new TopicPartition(topic, 0)
     val tp2 = new TopicPartition(topic, 1)
     val tp3 = new TopicPartition(topic, 2)
-    createTopic(topic, numPartitions = 3)
+    val tp4 = new TopicPartition(topic, 3)
+    createTopic(topic, numPartitions = 4)
 
     val validAssignment = new NewPartitionReassignment((0 until brokerCount).map(_.asInstanceOf[Integer]).asJava)
 
     val nonExistentTp1 = new TopicPartition("topicA", 0)
-    val nonExistentTp2 = new TopicPartition(topic, 3)
+    val nonExistentTp2 = new TopicPartition(topic, 4)
     val nonExistentPartitionsResult = client.alterPartitionReassignments(Map(
       tp1 -> java.util.Optional.of(validAssignment),
       tp2 -> java.util.Optional.of(validAssignment),
@@ -2037,14 +2038,17 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with
Logging {
     val extraNonExistentReplica = new NewPartitionReassignment((0 until brokerCount + 1).map(_.asInstanceOf[Integer]).asJava)
     val negativeIdReplica = new NewPartitionReassignment(Seq(-3, -2, -1).map(_.asInstanceOf[Integer]).asJava)
     val duplicateReplica = new NewPartitionReassignment(Seq(0, 1, 1).map(_.asInstanceOf[Integer]).asJava)
+    val noReplicas = new NewPartitionReassignment(Seq().map(_.asInstanceOf[Integer]).asJava)
     val invalidReplicaResult = client.alterPartitionReassignments(Map(
       tp1 -> java.util.Optional.of(extraNonExistentReplica),
       tp2 -> java.util.Optional.of(negativeIdReplica),
-      tp3 -> java.util.Optional.of(duplicateReplica)
+      tp3 -> java.util.Optional.of(duplicateReplica),
+      tp4 -> java.util.Optional.of(noReplicas)
     ).asJava).values()
     assertFutureExceptionTypeEquals(invalidReplicaResult.get(tp1), classOf[InvalidReplicaAssignmentException])
     assertFutureExceptionTypeEquals(invalidReplicaResult.get(tp2), classOf[InvalidReplicaAssignmentException])
     assertFutureExceptionTypeEquals(invalidReplicaResult.get(tp3), classOf[InvalidReplicaAssignmentException])
+    assertFutureExceptionTypeEquals(invalidReplicaResult.get(tp4), classOf[InvalidReplicaAssignmentException])
   }
 
   @Test


Mime
View raw message