kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jun...@apache.org
Subject kafka git commit: kafka-2234; Partition reassignment of a nonexistent topic prevents future reassignments; patched by Manikumar Reddy; reviewed by Jun Rao
Date Thu, 18 Jun 2015 23:37:31 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk d9c0ad685 -> 5c2ca30f2


kafka-2234; Partition reassignment of a nonexistent topic prevents future reassignments; patched
by Manikumar Reddy; reviewed by Jun Rao


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/5c2ca30f
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/5c2ca30f
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/5c2ca30f

Branch: refs/heads/trunk
Commit: 5c2ca30f229c7f39fca65aed6bd45c382aacda77
Parents: d9c0ad6
Author: Manikumar Reddy <manikumar.reddy@gmail.com>
Authored: Thu Jun 18 16:37:25 2015 -0700
Committer: Jun Rao <junrao@gmail.com>
Committed: Thu Jun 18 16:37:25 2015 -0700

----------------------------------------------------------------------
 .../scala/kafka/admin/ReassignPartitionsCommand.scala    | 11 ++++++++---
 1 file changed, 8 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/5c2ca30f/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
index 912b718..ea34589 100755
--- a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
+++ b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
@@ -208,9 +208,14 @@ class ReassignPartitionsCommand(zkClient: ZkClient, partitions: collection.Map[T
   def reassignPartitions(): Boolean = {
     try {
       val validPartitions = partitions.filter(p => validatePartition(zkClient, p._1.topic,
p._1.partition))
-      val jsonReassignmentData = ZkUtils.getPartitionReassignmentZkData(validPartitions)
-      ZkUtils.createPersistentPath(zkClient, ZkUtils.ReassignPartitionsPath, jsonReassignmentData)
-      true
+      if(validPartitions.isEmpty) {
+        false
+      }
+      else {
+        val jsonReassignmentData = ZkUtils.getPartitionReassignmentZkData(validPartitions)
+        ZkUtils.createPersistentPath(zkClient, ZkUtils.ReassignPartitionsPath, jsonReassignmentData)
+        true
+      }
     } catch {
       case ze: ZkNodeExistsException =>
         val partitionsBeingReassigned = ZkUtils.getPartitionsBeingReassigned(zkClient)


Mime
View raw message