Updated Branches:
refs/heads/trunk 343a1e266 -> 8d4dbe60f
kafka-1052; integrate add-partitions command into topicCommand; patched by Sriram Subramanian; reviewed by Jun Rao
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/8d4dbe60
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/8d4dbe60
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/8d4dbe60
Branch: refs/heads/trunk
Commit: 8d4dbe60f188c48a7f0d552b7b1109fb8e126521
Parents: 343a1e2
Author: Sriram Subramanian <sriram@gmail.com>
Authored: Wed Oct 9 20:44:49 2013 -0700
Committer: Jun Rao <junrao@gmail.com>
Committed: Wed Oct 9 20:44:49 2013 -0700
----------------------------------------------------------------------
.../kafka/admin/AddPartitionsCommand.scala | 127 -------------------
.../src/main/scala/kafka/admin/AdminUtils.scala | 56 ++++++++
.../main/scala/kafka/admin/TopicCommand.scala | 23 ++--
.../unit/kafka/admin/AddPartitionsTest.scala | 10 +-
4 files changed, 76 insertions(+), 140 deletions(-)
----------------------------------------------------------------------
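With this change the standalone AddPartitionsCommand is removed and partition expansion lives in AdminUtils.addPartitions, where numPartitions now means the desired total partition count rather than the number of partitions to add. Below is a minimal sketch of the new programmatic call based on the signature in this diff; the ZooKeeper connection string, topic name, and target count are placeholders, and the timeouts simply mirror the ones the removed command used.

import kafka.admin.AdminUtils
import kafka.utils.ZKStringSerializer
import org.I0Itec.zkclient.ZkClient

object AddPartitionsExample {
  def main(args: Array[String]): Unit = {
    // 30s session/connection timeouts mirror the removed AddPartitionsCommand;
    // "localhost:2181" and "my-topic" are illustrative placeholders.
    val zkClient = new ZkClient("localhost:2181", 30000, 30000, ZKStringSerializer)
    try {
      // Grow the topic to 4 partitions in total; replica placement is chosen
      // automatically because no manual assignment string is given.
      AdminUtils.addPartitions(zkClient, "my-topic", numPartitions = 4)
    } finally {
      zkClient.close()
    }
  }
}
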
http://git-wip-us.apache.org/repos/asf/kafka/blob/8d4dbe60/core/src/main/scala/kafka/admin/AddPartitionsCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/AddPartitionsCommand.scala b/core/src/main/scala/kafka/admin/AddPartitionsCommand.scala
deleted file mode 100644
index c74d9c2..0000000
--- a/core/src/main/scala/kafka/admin/AddPartitionsCommand.scala
+++ /dev/null
@@ -1,127 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.admin
-
-import joptsimple.OptionParser
-import kafka.utils._
-import org.I0Itec.zkclient.ZkClient
-import scala.collection.mutable
-import kafka.common.TopicAndPartition
-
-object AddPartitionsCommand extends Logging {
-
- def main(args: Array[String]): Unit = {
- val parser = new OptionParser
- val topicOpt = parser.accepts("topic", "REQUIRED: The topic for which partitions need to be added.")
- .withRequiredArg
- .describedAs("topic")
- .ofType(classOf[String])
- val zkConnectOpt = parser.accepts("zookeeper", "REQUIRED: The connection string for the zookeeper connection in the form host:port. " +
- "Multiple URLS can be given to allow fail-over.")
- .withRequiredArg
- .describedAs("urls")
- .ofType(classOf[String])
- val nPartitionsOpt = parser.accepts("partition", "REQUIRED: Number of partitions to add to the topic")
- .withRequiredArg
- .describedAs("# of partitions")
- .ofType(classOf[java.lang.Integer])
- val replicaAssignmentOpt = parser.accepts("replica-assignment-list", "For manually assigning replicas to brokers for the new partitions")
- .withRequiredArg
- .describedAs("broker_id_for_part1_replica1 : broker_id_for_part1_replica2, " +
- "broker_id_for_part2_replica1 : broker_id_for_part2_replica2, ...")
- .ofType(classOf[String])
- .defaultsTo("")
-
- val options = parser.parse(args : _*)
-
- for(arg <- List(topicOpt, zkConnectOpt, nPartitionsOpt)) {
- if(!options.has(arg)) {
- System.err.println("***Please note that this tool can only be used to add partitions when data for a topic does not use a key.***\n" +
- "Missing required argument. " + " \"" + arg + "\"")
- parser.printHelpOn(System.err)
- System.exit(1)
- }
- }
-
- val topic = options.valueOf(topicOpt)
- val zkConnect = options.valueOf(zkConnectOpt)
- val nPartitions = options.valueOf(nPartitionsOpt).intValue
- val replicaAssignmentStr = options.valueOf(replicaAssignmentOpt)
- var zkClient: ZkClient = null
- try {
- zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer)
- addPartitions(zkClient, topic, nPartitions, replicaAssignmentStr)
- println("adding partitions succeeded!")
- } catch {
- case e: Throwable =>
- println("adding partitions failed because of " + e.getMessage)
- println(Utils.stackTrace(e))
- } finally {
- if (zkClient != null)
- zkClient.close()
- }
- }
-
- def addPartitions(zkClient: ZkClient, topic: String, numPartitions: Int = 1, replicaAssignmentStr: String = "") {
- val existingPartitionsReplicaList = ZkUtils.getReplicaAssignmentForTopics(zkClient, List(topic))
- if (existingPartitionsReplicaList.size == 0)
- throw new AdminOperationException("The topic %s does not exist".format(topic))
-
- val existingReplicaList = existingPartitionsReplicaList.get(TopicAndPartition(topic,0)).get
-
- // create the new partition replication list
- val brokerList = ZkUtils.getSortedBrokerList(zkClient)
- val newPartitionReplicaList = if (replicaAssignmentStr == "")
- AdminUtils.assignReplicasToBrokers(brokerList, numPartitions, existingReplicaList.size, existingReplicaList.head, existingPartitionsReplicaList.size)
- else
- getManualReplicaAssignment(replicaAssignmentStr, brokerList.toSet, existingPartitionsReplicaList.size)
-
- // check if manual assignment has the right replication factor
- val unmatchedRepFactorList = newPartitionReplicaList.values.filter(p => (p.size != existingReplicaList.size))
- if (unmatchedRepFactorList.size != 0)
- throw new AdminOperationException("The replication factor in manual replication assignment
" +
- " is not equal to the existing replication factor for the topic " + existingReplicaList.size)
-
- info("Add partition list for %s is %s".format(topic, newPartitionReplicaList))
- val partitionReplicaList = existingPartitionsReplicaList.map(p => p._1.partition -> p._2)
- // add the new list
- partitionReplicaList ++= newPartitionReplicaList
- AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, partitionReplicaList, update = true)
- }
-
- def getManualReplicaAssignment(replicaAssignmentList: String, availableBrokerList: Set[Int], startPartitionId: Int): Map[Int, List[Int]] = {
- val partitionList = replicaAssignmentList.split(",")
- val ret = new mutable.HashMap[Int, List[Int]]()
- var partitionId = startPartitionId
- for (i <- 0 until partitionList.size) {
- val brokerList = partitionList(i).split(":").map(s => s.trim().toInt)
- if (brokerList.size <= 0)
- throw new AdminOperationException("replication factor must be larger than 0")
- if (brokerList.size != brokerList.toSet.size)
- throw new AdminOperationException("duplicate brokers in replica assignment: " + brokerList)
- if (!brokerList.toSet.subsetOf(availableBrokerList))
- throw new AdminOperationException("some specified brokers not available. specified
brokers: " + brokerList.toString +
- "available broker:" + availableBrokerList.toString)
- ret.put(partitionId, brokerList.toList)
- if (ret(partitionId).size != ret(startPartitionId).size)
- throw new AdminOperationException("partition " + i + " has different replication
factor: " + brokerList)
- partitionId = partitionId + 1
- }
- ret.toMap
- }
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/8d4dbe60/core/src/main/scala/kafka/admin/AdminUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/AdminUtils.scala b/core/src/main/scala/kafka/admin/AdminUtils.scala
index 6560fc6..8107a64 100644
--- a/core/src/main/scala/kafka/admin/AdminUtils.scala
+++ b/core/src/main/scala/kafka/admin/AdminUtils.scala
@@ -29,6 +29,10 @@ import scala.collection._
import mutable.ListBuffer
import scala.collection.mutable
import kafka.common._
+import scala.Predef._
+import collection.Map
+import scala.Some
+import collection.Set
object AdminUtils extends Logging {
val rand = new Random
@@ -82,6 +86,58 @@ object AdminUtils extends Logging {
}
ret.toMap
}
+
+ def addPartitions(zkClient: ZkClient, topic: String, numPartitions: Int = 1, replicaAssignmentStr: String = "") {
+ val existingPartitionsReplicaList = ZkUtils.getReplicaAssignmentForTopics(zkClient, List(topic))
+ if (existingPartitionsReplicaList.size == 0)
+ throw new AdminOperationException("The topic %s does not exist".format(topic))
+
+ val existingReplicaList = existingPartitionsReplicaList.head._2
+ val partitionsToAdd = numPartitions - existingPartitionsReplicaList.size
+ if (partitionsToAdd <= 0)
+ throw new AdminOperationException("The number of partitions for a topic can only be
increased")
+
+ // create the new partition replication list
+ val brokerList = ZkUtils.getSortedBrokerList(zkClient)
+ val newPartitionReplicaList = if (replicaAssignmentStr == null || replicaAssignmentStr == "")
+ AdminUtils.assignReplicasToBrokers(brokerList, partitionsToAdd, existingReplicaList.size, existingReplicaList.head, existingPartitionsReplicaList.size)
+ else
+ getManualReplicaAssignment(replicaAssignmentStr, brokerList.toSet, existingPartitionsReplicaList.size)
+
+ // check if manual assignment has the right replication factor
+ val unmatchedRepFactorList = newPartitionReplicaList.values.filter(p => (p.size != existingReplicaList.size))
+ if (unmatchedRepFactorList.size != 0)
+ throw new AdminOperationException("The replication factor in manual replication assignment
" +
+ " is not equal to the existing replication factor for the topic " + existingReplicaList.size)
+
+ info("Add partition list for %s is %s".format(topic, newPartitionReplicaList))
+ val partitionReplicaList = existingPartitionsReplicaList.map(p => p._1.partition -> p._2)
+ // add the new list
+ partitionReplicaList ++= newPartitionReplicaList
+ AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, partitionReplicaList, update = true)
+ }
+
+ def getManualReplicaAssignment(replicaAssignmentList: String, availableBrokerList: Set[Int], startPartitionId: Int): Map[Int, List[Int]] = {
+ var partitionList = replicaAssignmentList.split(",")
+ val ret = new mutable.HashMap[Int, List[Int]]()
+ var partitionId = startPartitionId
+ partitionList = partitionList.takeRight(partitionList.size - partitionId)
+ for (i <- 0 until partitionList.size) {
+ val brokerList = partitionList(i).split(":").map(s => s.trim().toInt)
+ if (brokerList.size <= 0)
+ throw new AdminOperationException("replication factor must be larger than 0")
+ if (brokerList.size != brokerList.toSet.size)
+ throw new AdminOperationException("duplicate brokers in replica assignment: " + brokerList)
+ if (!brokerList.toSet.subsetOf(availableBrokerList))
+ throw new AdminOperationException("some specified brokers not available. specified
brokers: " + brokerList.toString +
+ "available broker:" + availableBrokerList.toString)
+ ret.put(partitionId, brokerList.toList)
+ if (ret(partitionId).size != ret(startPartitionId).size)
+ throw new AdminOperationException("partition " + i + " has different replication
factor: " + brokerList)
+ partitionId = partitionId + 1
+ }
+ ret.toMap
+ }
def deleteTopic(zkClient: ZkClient, topic: String) {
zkClient.deleteRecursive(ZkUtils.getTopicPath(topic))
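The manual-assignment path is preserved in AdminUtils: getManualReplicaAssignment takes a comma-separated list of partitions, each a colon-separated list of broker ids, and drops the leading entries for partitions that already exist (the takeRight above), which is why the updated tests pass strings such as "1:2,0:1,2:3". A small sketch under those assumptions; the broker ids and the available-broker set are illustrative.

// Topic is assumed to already have 1 partition, so startPartitionId = 1 and the
// first entry in the assignment string is skipped.
val assignment = AdminUtils.getManualReplicaAssignment(
  "1:2,0:1,2:3",        // partition 0 -> brokers 1,2; partition 1 -> 0,1; partition 2 -> 2,3
  Set(0, 1, 2, 3),      // brokers assumed to be registered in ZooKeeper
  startPartitionId = 1)
// assignment == Map(1 -> List(0, 1), 2 -> List(2, 3))
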
http://git-wip-us.apache.org/repos/asf/kafka/blob/8d4dbe60/core/src/main/scala/kafka/admin/TopicCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/TopicCommand.scala b/core/src/main/scala/kafka/admin/TopicCommand.scala
index 06bbd37..56f3177 100644
--- a/core/src/main/scala/kafka/admin/TopicCommand.scala
+++ b/core/src/main/scala/kafka/admin/TopicCommand.scala
@@ -78,16 +78,22 @@ object TopicCommand {
def alterTopic(zkClient: ZkClient, opts: TopicCommandOptions) {
CommandLineUtils.checkRequiredArgs(opts.parser, opts.options, opts.topicOpt)
- val topics = opts.options.valuesOf(opts.topicOpt)
- val configs = parseTopicConfigs(opts)
- if(opts.options.has(opts.partitionsOpt))
- Utils.croak("Changing the number of partitions is not supported.")
- if(opts.options.has(opts.replicationFactorOpt))
- Utils.croak("Changing the replication factor is not supported.")
- for(topic <- topics) {
+ val topic = opts.options.valueOf(opts.topicOpt)
+ if(opts.options.has(opts.configOpt)) {
+ val configs = parseTopicConfigs(opts)
AdminUtils.changeTopicConfig(zkClient, topic, configs)
println("Updated config for topic \"%s\".".format(topic))
}
+ if(opts.options.has(opts.partitionsOpt)) {
+ println("WARNING: If partitions are increased for a topic that has a key, the partition
" +
+ "logic or ordering of the messages will be affected")
+ val nPartitions = opts.options.valueOf(opts.partitionsOpt).intValue
+ val replicaAssignmentStr = opts.options.valueOf(opts.replicaAssignmentOpt)
+ AdminUtils.addPartitions(zkClient, topic, nPartitions, replicaAssignmentStr)
+ println("adding partitions succeeded!")
+ }
+ if(opts.options.has(opts.replicationFactorOpt))
+ Utils.croak("Changing the replication factor is not supported.")
}
def deleteTopic(zkClient: ZkClient, opts: TopicCommandOptions) {
@@ -182,7 +188,8 @@ object TopicCommand {
.withRequiredArg
.describedAs("name=value")
.ofType(classOf[String])
- val partitionsOpt = parser.accepts("partitions", "The number of partitions for the topic being created.")
+ val partitionsOpt = parser.accepts("partitions", "The number of partitions for the topic being created or " +
+ "altered (WARNING: If partitions are increased for a topic that has a key, the partition
logic or ordering of the messages will be affected")
.withRequiredArg
.describedAs("# of partitions")
.ofType(classOf[java.lang.Integer])
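With alterTopic now delegating to AdminUtils.addPartitions, partition expansion is reachable through the regular topic tool. A rough sketch of the invocation follows; the --alter and --zookeeper flag names come from the existing TopicCommand options rather than this diff, and the connection string and topic name are placeholders.

import kafka.admin.TopicCommand

// Raise "my-topic" to 4 partitions in total via the alter path added above.
TopicCommand.main(Array(
  "--zookeeper", "localhost:2181",
  "--alter",
  "--topic", "my-topic",
  "--partitions", "4"))
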
http://git-wip-us.apache.org/repos/asf/kafka/blob/8d4dbe60/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
index 09254cc..115e203 100644
--- a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
@@ -100,7 +100,7 @@ class AddPartitionsTest extends JUnit3Suite with ZooKeeperTestHarness {
def testTopicDoesNotExist {
try {
- AddPartitionsCommand.addPartitions(zkClient, "Blah", 1)
+ AdminUtils.addPartitions(zkClient, "Blah", 1)
fail("Topic should not exist")
} catch {
case e: AdminOperationException => //this is good
@@ -110,7 +110,7 @@ class AddPartitionsTest extends JUnit3Suite with ZooKeeperTestHarness {
def testWrongReplicaCount {
try {
- AddPartitionsCommand.addPartitions(zkClient, topic1, 2, "0:1:2")
+ AdminUtils.addPartitions(zkClient, topic1, 2, "0:1,0:1:2")
fail("Add partitions should fail")
} catch {
case e: AdminOperationException => //this is good
@@ -119,7 +119,7 @@ class AddPartitionsTest extends JUnit3Suite with ZooKeeperTestHarness {
}
def testIncrementPartitions {
- AddPartitionsCommand.addPartitions(zkClient, topic1, 2)
+ AdminUtils.addPartitions(zkClient, topic1, 3)
// wait until leader is elected
var leader1 = waitUntilLeaderIsElectedOrChanged(zkClient, topic1, 1, 500)
var leader2 = waitUntilLeaderIsElectedOrChanged(zkClient, topic1, 2, 500)
@@ -144,7 +144,7 @@ class AddPartitionsTest extends JUnit3Suite with ZooKeeperTestHarness {
}
def testManualAssignmentOfReplicas {
- AddPartitionsCommand.addPartitions(zkClient, topic2, 2, "0:1,2:3")
+ AdminUtils.addPartitions(zkClient, topic2, 3, "1:2,0:1,2:3")
// wait until leader is elected
var leader1 = waitUntilLeaderIsElectedOrChanged(zkClient, topic2, 1, 500)
var leader2 = waitUntilLeaderIsElectedOrChanged(zkClient, topic2, 2, 500)
@@ -170,7 +170,7 @@ class AddPartitionsTest extends JUnit3Suite with ZooKeeperTestHarness {
}
def testReplicaPlacement {
- AddPartitionsCommand.addPartitions(zkClient, topic3, 6)
+ AdminUtils.addPartitions(zkClient, topic3, 7)
// wait until leader is elected
var leader1 = waitUntilLeaderIsElectedOrChanged(zkClient, topic3, 1, 500)
var leader2 = waitUntilLeaderIsElectedOrChanged(zkClient, topic3, 2, 500)