kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cmcc...@apache.org
Subject [kafka] branch 2.4 updated: MINOR: Rework NewPartitionReassignment public API (#7638)
Date Tue, 05 Nov 2019 18:41:28 GMT
This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch 2.4
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.4 by this push:
     new 5b7afab  MINOR: Rework NewPartitionReassignment public API (#7638)
5b7afab is described below

commit 5b7afab9cc5d04e63efcdf950fe2f2f9c3efacd6
Author: Stanislav Kozlovski <stanislav_kozlovski@outlook.com>
AuthorDate: Tue Nov 5 18:34:11 2019 +0000

    MINOR: Rework NewPartitionReassignment public API (#7638)
    
    This patch removes the NewPartitionReassignment#of() method in favor of a simple constructor.
Said method was confusing due to breaking two conventions - always returning a non-empty Optional
and thus not being used as a static factory method.
    
    Reviewers: Ismael Juma <ismael@juma.me.uk>, Colin P. McCabe <cmccabe@apache.org>
    (cherry picked from commit be58580e14be93618f11e609389ff6bb16317702)
---
 .../apache/kafka/clients/admin/NewPartitionReassignment.java |  9 ++-------
 .../org/apache/kafka/clients/admin/KafkaAdminClientTest.java |  8 ++++----
 .../integration/kafka/api/AdminClientIntegrationTest.scala   | 12 ++++++------
 .../unit/kafka/admin/ReassignPartitionsClusterTest.scala     |  4 ++--
 4 files changed, 14 insertions(+), 19 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/NewPartitionReassignment.java
b/clients/src/main/java/org/apache/kafka/clients/admin/NewPartitionReassignment.java
index 111514a..f9a7008 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/NewPartitionReassignment.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/NewPartitionReassignment.java
@@ -21,7 +21,6 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
-import java.util.Optional;
 
 /**
  * A new partition reassignment, which can be applied via {@link AdminClient#alterPartitionReassignments(Map,
AlterPartitionReassignmentsOptions)}.
@@ -32,13 +31,9 @@ public class NewPartitionReassignment {
     /**
      * @throws IllegalArgumentException if no replicas are supplied
      */
-    public static Optional<NewPartitionReassignment> of(List<Integer> replicas)
{
-        if (replicas == null || replicas.size() == 0)
+    public NewPartitionReassignment(List<Integer> targetReplicas) {
+        if (targetReplicas == null || targetReplicas.size() == 0)
             throw new IllegalArgumentException("Cannot create a new partition reassignment
without any replicas");
-        return Optional.of(new NewPartitionReassignment(replicas));
-    }
-
-    private NewPartitionReassignment(List<Integer> targetReplicas) {
         this.targetReplicas = Collections.unmodifiableList(new ArrayList<>(targetReplicas));
     }
 
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index 9fd0c1c..df28f00 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -2000,7 +2000,7 @@ public class KafkaAdminClientTest {
             TopicPartition tp2 = new TopicPartition("B", 0);
             Map<TopicPartition, Optional<NewPartitionReassignment>> reassignments
= new HashMap<>();
             reassignments.put(tp1, Optional.empty());
-            reassignments.put(tp2, NewPartitionReassignment.of(Arrays.asList(1, 2, 3)));
+            reassignments.put(tp2, Optional.of(new NewPartitionReassignment(Arrays.asList(1,
2, 3))));
 
             // 1. server returns less responses than number of partitions we sent
             AlterPartitionReassignmentsResponseData responseData1 = new AlterPartitionReassignmentsResponseData();
@@ -2091,9 +2091,9 @@ public class KafkaAdminClientTest {
             TopicPartition invalidTopicTP = new TopicPartition("", 0);
             TopicPartition invalidPartitionTP = new TopicPartition("ABC", -1);
             Map<TopicPartition, Optional<NewPartitionReassignment>> invalidTopicReassignments
= new HashMap<>();
-            invalidTopicReassignments.put(invalidPartitionTP, NewPartitionReassignment.of(Arrays.asList(1,
2, 3)));
-            invalidTopicReassignments.put(invalidTopicTP, NewPartitionReassignment.of(Arrays.asList(1,
2, 3)));
-            invalidTopicReassignments.put(tp1, NewPartitionReassignment.of(Arrays.asList(1,
2, 3)));
+            invalidTopicReassignments.put(invalidPartitionTP, Optional.of(new NewPartitionReassignment(Arrays.asList(1,
2, 3))));
+            invalidTopicReassignments.put(invalidTopicTP, Optional.of(new NewPartitionReassignment(Arrays.asList(1,
2, 3))));
+            invalidTopicReassignments.put(tp1, Optional.of(new NewPartitionReassignment(Arrays.asList(1,
2, 3))));
 
             AlterPartitionReassignmentsResponseData singlePartResponseData =
                     new AlterPartitionReassignmentsResponseData()
diff --git a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
index 87cc42f..ad0a78e 100644
--- a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
@@ -22,7 +22,7 @@ import java.time.{Duration => JDuration}
 import java.util.Arrays.asList
 import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
 import java.util.concurrent.{CountDownLatch, ExecutionException, TimeUnit}
-import java.util.{Collections, Properties}
+import java.util.{Collections, Optional, Properties}
 import java.{time, util}
 
 import kafka.log.LogConfig
@@ -1996,9 +1996,9 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with
Logging {
     createTopic(topic, numPartitions = 4)
 
 
-    val validAssignment = NewPartitionReassignment.of(
+    val validAssignment = Optional.of(new NewPartitionReassignment(
       (0 until brokerCount).map(_.asInstanceOf[Integer]).asJava
-    )
+    ))
 
     val nonExistentTp1 = new TopicPartition("topicA", 0)
     val nonExistentTp2 = new TopicPartition(topic, 4)
@@ -2012,9 +2012,9 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with
Logging {
     assertFutureExceptionTypeEquals(nonExistentPartitionsResult.get(nonExistentTp1), classOf[UnknownTopicOrPartitionException])
     assertFutureExceptionTypeEquals(nonExistentPartitionsResult.get(nonExistentTp2), classOf[UnknownTopicOrPartitionException])
 
-    val extraNonExistentReplica = NewPartitionReassignment.of((0 until brokerCount + 1).map(_.asInstanceOf[Integer]).asJava)
-    val negativeIdReplica = NewPartitionReassignment.of(Seq(-3, -2, -1).map(_.asInstanceOf[Integer]).asJava)
-    val duplicateReplica = NewPartitionReassignment.of(Seq(0, 1, 1).map(_.asInstanceOf[Integer]).asJava)
+    val extraNonExistentReplica = Optional.of(new NewPartitionReassignment((0 until brokerCount
+ 1).map(_.asInstanceOf[Integer]).asJava))
+    val negativeIdReplica = Optional.of(new NewPartitionReassignment(Seq(-3, -2, -1).map(_.asInstanceOf[Integer]).asJava))
+    val duplicateReplica = Optional.of(new NewPartitionReassignment(Seq(0, 1, 1).map(_.asInstanceOf[Integer]).asJava))
     val invalidReplicaResult = client.alterPartitionReassignments(Map(
       tp1 -> extraNonExistentReplica,
       tp2 -> negativeIdReplica,
diff --git a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala
index 5331559..9312be3 100644
--- a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala
@@ -28,7 +28,7 @@ import scala.collection.JavaConverters._
 import scala.collection.{Map, Seq}
 import scala.util.Random
 import java.io.File
-import java.util.{Collections, Properties}
+import java.util.{Collections, Optional, Properties}
 import java.util.concurrent.ExecutionException
 
 import kafka.controller.ReplicaAssignment
@@ -1261,7 +1261,7 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with
Logging {
     }.mkString(",")
 
   def reassignmentEntry(tp: TopicPartition, replicas: Seq[Int]): (TopicPartition, java.util.Optional[NewPartitionReassignment])
=
-    tp -> NewPartitionReassignment.of(replicas.map(_.asInstanceOf[Integer]).asJava)
+    tp -> Optional.of(new NewPartitionReassignment((replicas.map(_.asInstanceOf[Integer]).asJava)))
 
   def cancelReassignmentEntry(tp: TopicPartition): (TopicPartition, java.util.Optional[NewPartitionReassignment])
=
     tp -> java.util.Optional.empty()


Mime
View raw message