Repository: kafka
Updated Branches:
refs/heads/0.8.2 a6170d4f6 -> c2dadda3a
KAFKA-1653 Duplicate broker ids allowed in replica assignment; reviewed by Neha Narkhede
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/c2dadda3
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/c2dadda3
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/c2dadda3
Branch: refs/heads/0.8.2
Commit: c2dadda3acf1391ebac586c4b66f1845f7ff6cff
Parents: a6170d4
Author: Ewen Cheslack-Postava <me@ewencp.org>
Authored: Wed Oct 22 10:06:52 2014 -0700
Committer: Neha Narkhede <neha.narkhede@gmail.com>
Committed: Wed Oct 22 10:06:57 2014 -0700
----------------------------------------------------------------------
.../PreferredReplicaLeaderElectionCommand.scala | 11 ++++++---
.../kafka/admin/ReassignPartitionsCommand.scala | 24 +++++++++++++++++---
.../main/scala/kafka/admin/TopicCommand.scala | 4 ++++
.../kafka/tools/StateChangeLogMerger.scala | 7 +++++-
core/src/main/scala/kafka/utils/Utils.scala | 10 ++++++++
core/src/main/scala/kafka/utils/ZkUtils.scala | 15 ++++++++----
6 files changed, 59 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/c2dadda3/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala b/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala
index c791848..79b5e0a 100644
--- a/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala
+++ b/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala
@@ -78,12 +78,17 @@ object PreferredReplicaLeaderElectionCommand extends Logging {
case Some(m) =>
m.asInstanceOf[Map[String, Any]].get("partitions") match {
case Some(partitionsList) =>
- val partitions = partitionsList.asInstanceOf[List[Map[String, Any]]]
- partitions.map { p =>
+ val partitionsRaw = partitionsList.asInstanceOf[List[Map[String, Any]]]
+ val partitions = partitionsRaw.map { p =>
val topic = p.get("topic").get.asInstanceOf[String]
val partition = p.get("partition").get.asInstanceOf[Int]
TopicAndPartition(topic, partition)
- }.toSet
+ }
+ val duplicatePartitions = Utils.duplicates(partitions)
+ val partitionsSet = partitions.toSet
+ if (duplicatePartitions.nonEmpty)
+ throw new AdminOperationException("Preferred replica election data contains duplicate partitions: %s".format(duplicatePartitions.mkString(",")))
+ partitionsSet
case None => throw new AdminOperationException("Preferred replica election data is empty")
}
case None => throw new AdminOperationException("Preferred replica election data is empty")
http://git-wip-us.apache.org/repos/asf/kafka/blob/c2dadda3/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
index 691d69a..979992b 100644
--- a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
+++ b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
@@ -81,8 +81,14 @@ object ReassignPartitionsCommand extends Logging {
CommandLineUtils.printUsageAndDie(opts.parser, "If --generate option is used, command must include both --topics-to-move-json-file and --broker-list options")
val topicsToMoveJsonFile = opts.options.valueOf(opts.topicsToMoveJsonFileOpt)
val brokerListToReassign = opts.options.valueOf(opts.brokerListOpt).split(',').map(_.toInt)
+ val duplicateReassignments = Utils.duplicates(brokerListToReassign)
+ if (duplicateReassignments.nonEmpty)
+ throw new AdminCommandFailedException("Broker list contains duplicate entries: %s".format(duplicateReassignments.mkString(",")))
val topicsToMoveJsonString = Utils.readFileAsString(topicsToMoveJsonFile)
val topicsToReassign = ZkUtils.parseTopicsData(topicsToMoveJsonString)
+ val duplicateTopicsToReassign = Utils.duplicates(topicsToReassign)
+ if (duplicateTopicsToReassign.nonEmpty)
+ throw new AdminCommandFailedException("List of topics to reassign contains duplicate entries: %s".format(duplicateTopicsToReassign.mkString(",")))
val topicPartitionsToReassign = ZkUtils.getReplicaAssignmentForTopics(zkClient, topicsToReassign)
var partitionsToBeReassigned : Map[TopicAndPartition, Seq[Int]] = new mutable.HashMap[TopicAndPartition, List[Int]]()
@@ -103,17 +109,29 @@ object ReassignPartitionsCommand extends Logging {
CommandLineUtils.printUsageAndDie(opts.parser, "If --execute option is used, command must include --reassignment-json-file that was output " + "during the --generate option")
val reassignmentJsonFile = opts.options.valueOf(opts.reassignmentJsonFileOpt)
val reassignmentJsonString = Utils.readFileAsString(reassignmentJsonFile)
- val partitionsToBeReassigned = ZkUtils.parsePartitionReassignmentData(reassignmentJsonString)
+ val partitionsToBeReassigned = ZkUtils.parsePartitionReassignmentDataWithoutDedup(reassignmentJsonString)
if (partitionsToBeReassigned.isEmpty)
throw new AdminCommandFailedException("Partition reassignment data file %s is empty".format(reassignmentJsonFile))
- val reassignPartitionsCommand = new ReassignPartitionsCommand(zkClient, partitionsToBeReassigned)
+ val duplicateReassignedPartitions = Utils.duplicates(partitionsToBeReassigned.map{ case(tp,replicas) => tp})
+ if (duplicateReassignedPartitions.nonEmpty)
+ throw new AdminCommandFailedException("Partition reassignment contains duplicate topic partitions: %s".format(duplicateReassignedPartitions.mkString(",")))
+ val duplicateEntries= partitionsToBeReassigned
+ .map{ case(tp,replicas) => (tp, Utils.duplicates(replicas))}
+ .filter{ case (tp,duplicatedReplicas) => duplicatedReplicas.nonEmpty }
+ if (duplicateEntries.nonEmpty) {
+ val duplicatesMsg = duplicateEntries
.map{ case (tp,duplicateReplicas) => "%s contains multiple entries for %s".format(tp, duplicateReplicas.mkString(",")) }
+ .mkString(". ")
+ throw new AdminCommandFailedException("Partition replica lists may not contain duplicate entries: %s".format(duplicatesMsg))
+ }
+ val reassignPartitionsCommand = new ReassignPartitionsCommand(zkClient, partitionsToBeReassigned.toMap)
// before starting assignment, output the current replica assignment to facilitate rollback
val currentPartitionReplicaAssignment = ZkUtils.getReplicaAssignmentForTopics(zkClient, partitionsToBeReassigned.map(_._1.topic).toSeq)
println("Current partition replica assignment\n\n%s\n\nSave this to use as the --reassignment-json-file option during rollback"
.format(ZkUtils.getPartitionReassignmentZkData(currentPartitionReplicaAssignment)))
// start the reassignment
if(reassignPartitionsCommand.reassignPartitions())
- println("Successfully started reassignment of partitions %s".format(ZkUtils.getPartitionReassignmentZkData(partitionsToBeReassigned)))
+ println("Successfully started reassignment of partitions %s".format(ZkUtils.getPartitionReassignmentZkData(partitionsToBeReassigned.toMap)))
else
println("Failed to reassign partitions %s".format(partitionsToBeReassigned))
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/c2dadda3/core/src/main/scala/kafka/admin/TopicCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/TopicCommand.scala b/core/src/main/scala/kafka/admin/TopicCommand.scala
index 3b2166a..00b15a1 100644
--- a/core/src/main/scala/kafka/admin/TopicCommand.scala
+++ b/core/src/main/scala/kafka/admin/TopicCommand.scala
@@ -19,6 +19,7 @@ package kafka.admin
import joptsimple._
import java.util.Properties
+import kafka.common.AdminCommandFailedException
import kafka.utils._
import org.I0Itec.zkclient.ZkClient
import org.I0Itec.zkclient.exception.ZkNodeExistsException
@@ -222,6 +223,9 @@ object TopicCommand {
val ret = new mutable.HashMap[Int, List[Int]]()
for (i <- 0 until partitionList.size) {
val brokerList = partitionList(i).split(":").map(s => s.trim().toInt)
+ val duplicateBrokers = Utils.duplicates(brokerList)
+ if (duplicateBrokers.nonEmpty)
+ throw new AdminCommandFailedException("Partition replica lists may not contain duplicate entries: %s".format(duplicateBrokers.mkString(",")))
ret.put(i, brokerList.toList)
if (ret(i).size != ret(0).size)
throw new AdminOperationException("Partition " + i + " has different replication factor: " + brokerList)
http://git-wip-us.apache.org/repos/asf/kafka/blob/c2dadda3/core/src/main/scala/kafka/tools/StateChangeLogMerger.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/StateChangeLogMerger.scala b/core/src/main/scala/kafka/tools/StateChangeLogMerger.scala
index d298e7e..b34b8c7 100644
--- a/core/src/main/scala/kafka/tools/StateChangeLogMerger.scala
+++ b/core/src/main/scala/kafka/tools/StateChangeLogMerger.scala
@@ -22,7 +22,7 @@ import scala.util.matching.Regex
import collection.mutable
import java.util.Date
import java.text.SimpleDateFormat
-import kafka.utils.{Logging, CommandLineUtils}
+import kafka.utils.{Utils, Logging, CommandLineUtils}
import kafka.common.Topic
import java.io.{BufferedOutputStream, OutputStream}
@@ -115,6 +115,11 @@ object StateChangeLogMerger extends Logging {
}
if (options.has(partitionsOpt)) {
partitions = options.valueOf(partitionsOpt).split(",").toList.map(_.toInt)
+ val duplicatePartitions = Utils.duplicates(partitions)
+ if (duplicatePartitions.nonEmpty) {
+ System.err.println("The list of partitions contains repeated entries: %s".format(duplicatePartitions.mkString(",")))
+ System.exit(1)
+ }
}
startDate = dateFormat.parse(options.valueOf(startTimeOpt).replace('\"', ' ').trim)
endDate = dateFormat.parse(options.valueOf(endTimeOpt).replace('\"', ' ').trim)
http://git-wip-us.apache.org/repos/asf/kafka/blob/c2dadda3/core/src/main/scala/kafka/utils/Utils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/Utils.scala b/core/src/main/scala/kafka/utils/Utils.scala
index 29d5a17..23aefb4 100644
--- a/core/src/main/scala/kafka/utils/Utils.scala
+++ b/core/src/main/scala/kafka/utils/Utils.scala
@@ -566,4 +566,14 @@ object Utils extends Logging {
case c => c
}.mkString
}
+
+ /**
+ * Returns a list of duplicated items
+ */
+ def duplicates[T](s: Traversable[T]): Iterable[T] = {
+ s.groupBy(identity)
+ .map{ case (k,l) => (k,l.size)}
+ .filter{ case (k,l) => (l > 1) }
+ .keys
+ }
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/c2dadda3/core/src/main/scala/kafka/utils/ZkUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index a7b1fdc..56e3e88 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -575,23 +575,28 @@ object ZkUtils extends Logging {
}
}
- def parsePartitionReassignmentData(jsonData: String): Map[TopicAndPartition, Seq[Int]] = {
- val reassignedPartitions: mutable.Map[TopicAndPartition, Seq[Int]] = mutable.Map()
+ // Parses without deduplicating keys so the data can be checked before allowing reassignment to proceed
+ def parsePartitionReassignmentDataWithoutDedup(jsonData: String): Seq[(TopicAndPartition, Seq[Int])] = {
Json.parseFull(jsonData) match {
case Some(m) =>
m.asInstanceOf[Map[String, Any]].get("partitions") match {
case Some(partitionsSeq) =>
- partitionsSeq.asInstanceOf[Seq[Map[String, Any]]].foreach(p => {
+ partitionsSeq.asInstanceOf[Seq[Map[String, Any]]].map(p => {
val topic = p.get("topic").get.asInstanceOf[String]
val partition = p.get("partition").get.asInstanceOf[Int]
val newReplicas = p.get("replicas").get.asInstanceOf[Seq[Int]]
- reassignedPartitions += TopicAndPartition(topic, partition) -> newReplicas
+ TopicAndPartition(topic, partition) -> newReplicas
})
case None =>
+ Seq.empty
}
case None =>
+ Seq.empty
}
- reassignedPartitions
+ }
+
+ def parsePartitionReassignmentData(jsonData: String): Map[TopicAndPartition, Seq[Int]] = {
+ parsePartitionReassignmentDataWithoutDedup(jsonData).toMap
}
def parseTopicsData(jsonData: String): Seq[String] = {
|