kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cmcc...@apache.org
Subject [kafka] branch 2.4 updated: MINOR: Re-implement NewPartitionReassignment#of() (#7592)
Date Thu, 24 Oct 2019 22:32:36 GMT
This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch 2.4
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.4 by this push:
     new e20c228  MINOR: Re-implement NewPartitionReassignment#of() (#7592)
e20c228 is described below

commit e20c228b98f87cfe2eaa34078f53953be587e900
Author: Stanislav Kozlovski <stanislav_kozlovski@outlook.com>
AuthorDate: Thu Oct 24 23:23:54 2019 +0100

    MINOR: Re-implement NewPartitionReassignment#of() (#7592)
    
    Re-implement NewPartitionReassignment#of.  It now takes a list rather than a variable-length
list of arguments.
    
    Reviewers: Colin P. McCabe <cmccabe@apache.org>, Vikas Singh <vikas@confluent.io>
    (cherry picked from commit 28ef7f1d6d7f4cec7e81b8d0641debebefec9104)
---
 .../clients/admin/NewPartitionReassignment.java    | 12 ++++++++-
 .../kafka/clients/admin/KafkaAdminClientTest.java  | 12 +++------
 .../kafka/api/AdminClientIntegrationTest.scala     | 31 +++++++++++-----------
 .../admin/ReassignPartitionsClusterTest.scala      |  2 +-
 4 files changed, 31 insertions(+), 26 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/NewPartitionReassignment.java
b/clients/src/main/java/org/apache/kafka/clients/admin/NewPartitionReassignment.java
index 0f7a61c..111514a 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/NewPartitionReassignment.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/NewPartitionReassignment.java
@@ -21,6 +21,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 
 /**
  * A new partition reassignment, which can be applied via {@link AdminClient#alterPartitionReassignments(Map,
AlterPartitionReassignmentsOptions)}.
@@ -28,7 +29,16 @@ import java.util.Map;
 public class NewPartitionReassignment {
     private final List<Integer> targetReplicas;
 
-    public NewPartitionReassignment(List<Integer> targetReplicas) {
+    /**
+     * @throws IllegalArgumentException if no replicas are supplied
+     */
+    public static Optional<NewPartitionReassignment> of(List<Integer> replicas)
{
+        if (replicas == null || replicas.size() == 0)
+            throw new IllegalArgumentException("Cannot create a new partition reassignment
without any replicas");
+        return Optional.of(new NewPartitionReassignment(replicas));
+    }
+
+    private NewPartitionReassignment(List<Integer> targetReplicas) {
         this.targetReplicas = Collections.unmodifiableList(new ArrayList<>(targetReplicas));
     }
 
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index eac7063..cf50c8f 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -2011,7 +2011,7 @@ public class KafkaAdminClientTest {
             TopicPartition tp2 = new TopicPartition("B", 0);
             Map<TopicPartition, Optional<NewPartitionReassignment>> reassignments
= new HashMap<>();
             reassignments.put(tp1, Optional.empty());
-            reassignments.put(tp2, newPartitionReassignment(Arrays.asList(1, 2, 3)));
+            reassignments.put(tp2, NewPartitionReassignment.of(Arrays.asList(1, 2, 3)));
 
             // 1. server returns less responses than number of partitions we sent
             AlterPartitionReassignmentsResponseData responseData1 = new AlterPartitionReassignmentsResponseData();
@@ -2102,9 +2102,9 @@ public class KafkaAdminClientTest {
             TopicPartition invalidTopicTP = new TopicPartition("", 0);
             TopicPartition invalidPartitionTP = new TopicPartition("ABC", -1);
             Map<TopicPartition, Optional<NewPartitionReassignment>> invalidTopicReassignments
= new HashMap<>();
-            invalidTopicReassignments.put(invalidPartitionTP, newPartitionReassignment(Arrays.asList(1,
2, 3)));
-            invalidTopicReassignments.put(invalidTopicTP, newPartitionReassignment(Arrays.asList(1,
2, 3)));
-            invalidTopicReassignments.put(tp1, newPartitionReassignment(Arrays.asList(1,
2, 3)));
+            invalidTopicReassignments.put(invalidPartitionTP, NewPartitionReassignment.of(Arrays.asList(1,
2, 3)));
+            invalidTopicReassignments.put(invalidTopicTP, NewPartitionReassignment.of(Arrays.asList(1,
2, 3)));
+            invalidTopicReassignments.put(tp1, NewPartitionReassignment.of(Arrays.asList(1,
2, 3)));
 
             AlterPartitionReassignmentsResponseData singlePartResponseData =
                     new AlterPartitionReassignmentsResponseData()
@@ -2271,8 +2271,4 @@ public class KafkaAdminClientTest {
             }
         }
     }
-
-    private static Optional<NewPartitionReassignment> newPartitionReassignment(List<Integer>
targetReplicas) {
-        return Optional.of(new NewPartitionReassignment(targetReplicas));
-    }
 }
diff --git a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
index eb15529..bed6900 100644
--- a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
@@ -2018,37 +2018,36 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with
Logging {
     val tp1 = new TopicPartition(topic, 0)
     val tp2 = new TopicPartition(topic, 1)
     val tp3 = new TopicPartition(topic, 2)
-    val tp4 = new TopicPartition(topic, 3)
     createTopic(topic, numPartitions = 4)
 
-    val validAssignment = new NewPartitionReassignment((0 until brokerCount).map(_.asInstanceOf[Integer]).asJava)
+
+    val validAssignment = NewPartitionReassignment.of(
+      (0 until brokerCount).map(_.asInstanceOf[Integer]).asJava
+    )
 
     val nonExistentTp1 = new TopicPartition("topicA", 0)
     val nonExistentTp2 = new TopicPartition(topic, 4)
     val nonExistentPartitionsResult = client.alterPartitionReassignments(Map(
-      tp1 -> java.util.Optional.of(validAssignment),
-      tp2 -> java.util.Optional.of(validAssignment),
-      tp3 -> java.util.Optional.of(validAssignment),
-      nonExistentTp1 -> java.util.Optional.of(validAssignment),
-      nonExistentTp2 -> java.util.Optional.of(validAssignment)
+      tp1 -> validAssignment,
+      tp2 -> validAssignment,
+      tp3 -> validAssignment,
+      nonExistentTp1 -> validAssignment,
+      nonExistentTp2 -> validAssignment
     ).asJava).values()
     assertFutureExceptionTypeEquals(nonExistentPartitionsResult.get(nonExistentTp1), classOf[UnknownTopicOrPartitionException])
     assertFutureExceptionTypeEquals(nonExistentPartitionsResult.get(nonExistentTp2), classOf[UnknownTopicOrPartitionException])
 
-    val extraNonExistentReplica = new NewPartitionReassignment((0 until brokerCount + 1).map(_.asInstanceOf[Integer]).asJava)
-    val negativeIdReplica = new NewPartitionReassignment(Seq(-3, -2, -1).map(_.asInstanceOf[Integer]).asJava)
-    val duplicateReplica = new NewPartitionReassignment(Seq(0, 1, 1).map(_.asInstanceOf[Integer]).asJava)
-    val noReplicas = new NewPartitionReassignment(Seq().map(_.asInstanceOf[Integer]).asJava)
+    val extraNonExistentReplica = NewPartitionReassignment.of((0 until brokerCount + 1).map(_.asInstanceOf[Integer]).asJava)
+    val negativeIdReplica = NewPartitionReassignment.of(Seq(-3, -2, -1).map(_.asInstanceOf[Integer]).asJava)
+    val duplicateReplica = NewPartitionReassignment.of(Seq(0, 1, 1).map(_.asInstanceOf[Integer]).asJava)
     val invalidReplicaResult = client.alterPartitionReassignments(Map(
-      tp1 -> java.util.Optional.of(extraNonExistentReplica),
-      tp2 -> java.util.Optional.of(negativeIdReplica),
-      tp3 -> java.util.Optional.of(duplicateReplica),
-      tp4 -> java.util.Optional.of(noReplicas)
+      tp1 -> extraNonExistentReplica,
+      tp2 -> negativeIdReplica,
+      tp3 -> duplicateReplica
     ).asJava).values()
     assertFutureExceptionTypeEquals(invalidReplicaResult.get(tp1), classOf[InvalidReplicaAssignmentException])
     assertFutureExceptionTypeEquals(invalidReplicaResult.get(tp2), classOf[InvalidReplicaAssignmentException])
     assertFutureExceptionTypeEquals(invalidReplicaResult.get(tp3), classOf[InvalidReplicaAssignmentException])
-    assertFutureExceptionTypeEquals(invalidReplicaResult.get(tp4), classOf[InvalidReplicaAssignmentException])
   }
 
   @Test
diff --git a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala
index 10ed6bb..a84c611 100644
--- a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala
@@ -1182,7 +1182,7 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with
Logging {
     }.mkString(",")
 
   def reassignmentEntry(tp: TopicPartition, replicas: Seq[Int]): (TopicPartition, java.util.Optional[NewPartitionReassignment])
=
-    tp -> java.util.Optional.of(new NewPartitionReassignment(replicas.map(_.asInstanceOf[Integer]).asJava))
+    tp -> NewPartitionReassignment.of(replicas.map(_.asInstanceOf[Integer]).asJava)
 
   def cancelReassignmentEntry(tp: TopicPartition): (TopicPartition, java.util.Optional[NewPartitionReassignment])
=
     tp -> java.util.Optional.empty()


Mime
View raw message