kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jun...@apache.org
Subject git commit: kafka-1052; integrate add-partitions command into topicCommand; patched by Sriram Subramanian; reviewed by Jun Rao
Date Thu, 10 Oct 2013 03:45:05 GMT
Updated Branches:
  refs/heads/trunk 343a1e266 -> 8d4dbe60f


kafka-1052; integrate add-partitions command into topicCommand; patched by Sriram Subramanian;
reviewed by Jun Rao


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/8d4dbe60
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/8d4dbe60
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/8d4dbe60

Branch: refs/heads/trunk
Commit: 8d4dbe60f188c48a7f0d552b7b1109fb8e126521
Parents: 343a1e2
Author: Sriram Subramanian <sriram@gmail.com>
Authored: Wed Oct 9 20:44:49 2013 -0700
Committer: Jun Rao <junrao@gmail.com>
Committed: Wed Oct 9 20:44:49 2013 -0700

----------------------------------------------------------------------
 .../kafka/admin/AddPartitionsCommand.scala      | 127 -------------------
 .../src/main/scala/kafka/admin/AdminUtils.scala |  56 ++++++++
 .../main/scala/kafka/admin/TopicCommand.scala   |  23 ++--
 .../unit/kafka/admin/AddPartitionsTest.scala    |  10 +-
 4 files changed, 76 insertions(+), 140 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/8d4dbe60/core/src/main/scala/kafka/admin/AddPartitionsCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/AddPartitionsCommand.scala b/core/src/main/scala/kafka/admin/AddPartitionsCommand.scala
deleted file mode 100644
index c74d9c2..0000000
--- a/core/src/main/scala/kafka/admin/AddPartitionsCommand.scala
+++ /dev/null
@@ -1,127 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.admin
-
-import joptsimple.OptionParser
-import kafka.utils._
-import org.I0Itec.zkclient.ZkClient
-import scala.collection.mutable
-import kafka.common.TopicAndPartition
-
-object AddPartitionsCommand extends Logging {
-
-  def main(args: Array[String]): Unit = {
-    val parser = new OptionParser
-    val topicOpt = parser.accepts("topic", "REQUIRED: The topic for which partitions need
to be added.")
-      .withRequiredArg
-      .describedAs("topic")
-      .ofType(classOf[String])
-    val zkConnectOpt = parser.accepts("zookeeper", "REQUIRED: The connection string for the
zookeeper connection in the form host:port. " +
-      "Multiple URLS can be given to allow fail-over.")
-      .withRequiredArg
-      .describedAs("urls")
-      .ofType(classOf[String])
-    val nPartitionsOpt = parser.accepts("partition", "REQUIRED: Number of partitions to add
to the topic")
-      .withRequiredArg
-      .describedAs("# of partitions")
-      .ofType(classOf[java.lang.Integer])
-    val replicaAssignmentOpt = parser.accepts("replica-assignment-list", "For manually assigning
replicas to brokers for the new partitions")
-      .withRequiredArg
-      .describedAs("broker_id_for_part1_replica1 : broker_id_for_part1_replica2, " +
-      "broker_id_for_part2_replica1 : broker_id_for_part2_replica2, ...")
-      .ofType(classOf[String])
-      .defaultsTo("")
-
-    val options = parser.parse(args : _*)
-
-    for(arg <- List(topicOpt, zkConnectOpt, nPartitionsOpt)) {
-      if(!options.has(arg)) {
-        System.err.println("***Please note that this tool can only be used to add partitions
when data for a topic does not use a key.***\n" +
-          "Missing required argument. " + " \"" + arg + "\"")
-        parser.printHelpOn(System.err)
-        System.exit(1)
-      }
-    }
-
-    val topic = options.valueOf(topicOpt)
-    val zkConnect = options.valueOf(zkConnectOpt)
-    val nPartitions = options.valueOf(nPartitionsOpt).intValue
-    val replicaAssignmentStr = options.valueOf(replicaAssignmentOpt)
-    var zkClient: ZkClient = null
-    try {
-      zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer)
-      addPartitions(zkClient, topic, nPartitions, replicaAssignmentStr)
-      println("adding partitions succeeded!")
-    } catch {
-      case e: Throwable =>
-        println("adding partitions failed because of " + e.getMessage)
-        println(Utils.stackTrace(e))
-    } finally {
-      if (zkClient != null)
-        zkClient.close()
-    }
-  }
-
-  def addPartitions(zkClient: ZkClient, topic: String, numPartitions: Int = 1, replicaAssignmentStr:
String = "") {
-    val existingPartitionsReplicaList = ZkUtils.getReplicaAssignmentForTopics(zkClient, List(topic))
-    if (existingPartitionsReplicaList.size == 0)
-      throw new AdminOperationException("The topic %s does not exist".format(topic))
-
-    val existingReplicaList = existingPartitionsReplicaList.get(TopicAndPartition(topic,0)).get
-
-    // create the new partition replication list
-    val brokerList = ZkUtils.getSortedBrokerList(zkClient)
-    val newPartitionReplicaList = if (replicaAssignmentStr == "")
-      AdminUtils.assignReplicasToBrokers(brokerList, numPartitions, existingReplicaList.size,
existingReplicaList.head, existingPartitionsReplicaList.size)
-    else
-      getManualReplicaAssignment(replicaAssignmentStr, brokerList.toSet, existingPartitionsReplicaList.size)
-
-    // check if manual assignment has the right replication factor
-    val unmatchedRepFactorList = newPartitionReplicaList.values.filter(p => (p.size !=
existingReplicaList.size))
-    if (unmatchedRepFactorList.size != 0)
-      throw new AdminOperationException("The replication factor in manual replication assignment
" +
-        " is not equal to the existing replication factor for the topic " + existingReplicaList.size)
-
-    info("Add partition list for %s is %s".format(topic, newPartitionReplicaList))
-    val partitionReplicaList = existingPartitionsReplicaList.map(p => p._1.partition ->
p._2)
-    // add the new list
-    partitionReplicaList ++= newPartitionReplicaList
-    AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, partitionReplicaList,
update = true)
-  }
-
-  def getManualReplicaAssignment(replicaAssignmentList: String, availableBrokerList: Set[Int],
startPartitionId: Int): Map[Int, List[Int]] = {
-    val partitionList = replicaAssignmentList.split(",")
-    val ret = new mutable.HashMap[Int, List[Int]]()
-    var partitionId = startPartitionId
-    for (i <- 0 until partitionList.size) {
-      val brokerList = partitionList(i).split(":").map(s => s.trim().toInt)
-      if (brokerList.size <= 0)
-        throw new AdminOperationException("replication factor must be larger than 0")
-      if (brokerList.size != brokerList.toSet.size)
-        throw new AdminOperationException("duplicate brokers in replica assignment: " + brokerList)
-      if (!brokerList.toSet.subsetOf(availableBrokerList))
-        throw new AdminOperationException("some specified brokers not available. specified
brokers: " + brokerList.toString +
-          "available broker:" + availableBrokerList.toString)
-      ret.put(partitionId, brokerList.toList)
-      if (ret(partitionId).size != ret(startPartitionId).size)
-        throw new AdminOperationException("partition " + i + " has different replication
factor: " + brokerList)
-      partitionId = partitionId + 1
-    }
-    ret.toMap
-  }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/8d4dbe60/core/src/main/scala/kafka/admin/AdminUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/AdminUtils.scala b/core/src/main/scala/kafka/admin/AdminUtils.scala
index 6560fc6..8107a64 100644
--- a/core/src/main/scala/kafka/admin/AdminUtils.scala
+++ b/core/src/main/scala/kafka/admin/AdminUtils.scala
@@ -29,6 +29,10 @@ import scala.collection._
 import mutable.ListBuffer
 import scala.collection.mutable
 import kafka.common._
+import scala.Predef._
+import collection.Map
+import scala.Some
+import collection.Set
 
 object AdminUtils extends Logging {
   val rand = new Random
@@ -82,6 +86,58 @@ object AdminUtils extends Logging {
     }
     ret.toMap
   }
+
+  def addPartitions(zkClient: ZkClient, topic: String, numPartitions: Int = 1, replicaAssignmentStr:
String = "") {
+    val existingPartitionsReplicaList = ZkUtils.getReplicaAssignmentForTopics(zkClient, List(topic))
+    if (existingPartitionsReplicaList.size == 0)
+      throw new AdminOperationException("The topic %s does not exist".format(topic))
+
+    val existingReplicaList = existingPartitionsReplicaList.head._2
+    val partitionsToAdd = numPartitions - existingPartitionsReplicaList.size
+    if (partitionsToAdd <= 0)
+      throw new AdminOperationException("The number of partitions for a topic can only be
increased")
+
+    // create the new partition replication list
+    val brokerList = ZkUtils.getSortedBrokerList(zkClient)
+    val newPartitionReplicaList = if (replicaAssignmentStr == null || replicaAssignmentStr
== "")
+      AdminUtils.assignReplicasToBrokers(brokerList, partitionsToAdd, existingReplicaList.size,
existingReplicaList.head, existingPartitionsReplicaList.size)
+    else
+      getManualReplicaAssignment(replicaAssignmentStr, brokerList.toSet, existingPartitionsReplicaList.size)
+
+    // check if manual assignment has the right replication factor
+    val unmatchedRepFactorList = newPartitionReplicaList.values.filter(p => (p.size !=
existingReplicaList.size))
+    if (unmatchedRepFactorList.size != 0)
+      throw new AdminOperationException("The replication factor in manual replication assignment
" +
+        " is not equal to the existing replication factor for the topic " + existingReplicaList.size)
+
+    info("Add partition list for %s is %s".format(topic, newPartitionReplicaList))
+    val partitionReplicaList = existingPartitionsReplicaList.map(p => p._1.partition ->
p._2)
+    // add the new list
+    partitionReplicaList ++= newPartitionReplicaList
+    AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, partitionReplicaList,
update = true)
+  }
+
+  def getManualReplicaAssignment(replicaAssignmentList: String, availableBrokerList: Set[Int],
startPartitionId: Int): Map[Int, List[Int]] = {
+    var partitionList = replicaAssignmentList.split(",")
+    val ret = new mutable.HashMap[Int, List[Int]]()
+    var partitionId = startPartitionId
+    partitionList = partitionList.takeRight(partitionList.size - partitionId)
+    for (i <- 0 until partitionList.size) {
+      val brokerList = partitionList(i).split(":").map(s => s.trim().toInt)
+      if (brokerList.size <= 0)
+        throw new AdminOperationException("replication factor must be larger than 0")
+      if (brokerList.size != brokerList.toSet.size)
+        throw new AdminOperationException("duplicate brokers in replica assignment: " + brokerList)
+      if (!brokerList.toSet.subsetOf(availableBrokerList))
+        throw new AdminOperationException("some specified brokers not available. specified
brokers: " + brokerList.toString +
+          "available broker:" + availableBrokerList.toString)
+      ret.put(partitionId, brokerList.toList)
+      if (ret(partitionId).size != ret(startPartitionId).size)
+        throw new AdminOperationException("partition " + i + " has different replication
factor: " + brokerList)
+      partitionId = partitionId + 1
+    }
+    ret.toMap
+  }
   
   def deleteTopic(zkClient: ZkClient, topic: String) {
     zkClient.deleteRecursive(ZkUtils.getTopicPath(topic))

http://git-wip-us.apache.org/repos/asf/kafka/blob/8d4dbe60/core/src/main/scala/kafka/admin/TopicCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/TopicCommand.scala b/core/src/main/scala/kafka/admin/TopicCommand.scala
index 06bbd37..56f3177 100644
--- a/core/src/main/scala/kafka/admin/TopicCommand.scala
+++ b/core/src/main/scala/kafka/admin/TopicCommand.scala
@@ -78,16 +78,22 @@ object TopicCommand {
   
   def alterTopic(zkClient: ZkClient, opts: TopicCommandOptions) {
     CommandLineUtils.checkRequiredArgs(opts.parser, opts.options, opts.topicOpt)
-    val topics = opts.options.valuesOf(opts.topicOpt)
-    val configs = parseTopicConfigs(opts)
-    if(opts.options.has(opts.partitionsOpt))
-      Utils.croak("Changing the number of partitions is not supported.")
-    if(opts.options.has(opts.replicationFactorOpt))
-      Utils.croak("Changing the replication factor is not supported.")
-    for(topic <- topics) {
+    val topic = opts.options.valueOf(opts.topicOpt)
+    if(opts.options.has(opts.configOpt)) {
+      val configs = parseTopicConfigs(opts)
       AdminUtils.changeTopicConfig(zkClient, topic, configs)
       println("Updated config for topic \"%s\".".format(topic))
     }
+    if(opts.options.has(opts.partitionsOpt)) {
+      println("WARNING: If partitions are increased for a topic that has a key, the partition
" +
+        "logic or ordering of the messages will be affected")
+      val nPartitions = opts.options.valueOf(opts.partitionsOpt).intValue
+      val replicaAssignmentStr = opts.options.valueOf(opts.replicaAssignmentOpt)
+      AdminUtils.addPartitions(zkClient, topic, nPartitions, replicaAssignmentStr)
+      println("adding partitions succeeded!")
+    }
+    if(opts.options.has(opts.replicationFactorOpt))
+      Utils.croak("Changing the replication factor is not supported.")
   }
   
   def deleteTopic(zkClient: ZkClient, opts: TopicCommandOptions) {
@@ -182,7 +188,8 @@ object TopicCommand {
                           .withRequiredArg
                           .describedAs("name=value")
                           .ofType(classOf[String])
-    val partitionsOpt = parser.accepts("partitions", "The number of partitions for the topic
being created.")
+    val partitionsOpt = parser.accepts("partitions", "The number of partitions for the topic
being created or " +
+      "altered (WARNING: If partitions are increased for a topic that has a key, the partition
logic or ordering of the messages will be affected")
                            .withRequiredArg
                            .describedAs("# of partitions")
                            .ofType(classOf[java.lang.Integer])

http://git-wip-us.apache.org/repos/asf/kafka/blob/8d4dbe60/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
index 09254cc..115e203 100644
--- a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
@@ -100,7 +100,7 @@ class AddPartitionsTest extends JUnit3Suite with ZooKeeperTestHarness
{
 
   def testTopicDoesNotExist {
     try {
-      AddPartitionsCommand.addPartitions(zkClient, "Blah", 1)
+      AdminUtils.addPartitions(zkClient, "Blah", 1)
       fail("Topic should not exist")
     } catch {
       case e: AdminOperationException => //this is good
@@ -110,7 +110,7 @@ class AddPartitionsTest extends JUnit3Suite with ZooKeeperTestHarness
{
 
   def testWrongReplicaCount {
     try {
-      AddPartitionsCommand.addPartitions(zkClient, topic1, 2, "0:1:2")
+      AdminUtils.addPartitions(zkClient, topic1, 2, "0:1,0:1:2")
       fail("Add partitions should fail")
     } catch {
       case e: AdminOperationException => //this is good
@@ -119,7 +119,7 @@ class AddPartitionsTest extends JUnit3Suite with ZooKeeperTestHarness
{
   }
 
   def testIncrementPartitions {
-    AddPartitionsCommand.addPartitions(zkClient, topic1, 2)
+    AdminUtils.addPartitions(zkClient, topic1, 3)
     // wait until leader is elected
     var leader1 = waitUntilLeaderIsElectedOrChanged(zkClient, topic1, 1, 500)
     var leader2 = waitUntilLeaderIsElectedOrChanged(zkClient, topic1, 2, 500)
@@ -144,7 +144,7 @@ class AddPartitionsTest extends JUnit3Suite with ZooKeeperTestHarness
{
   }
 
   def testManualAssignmentOfReplicas {
-    AddPartitionsCommand.addPartitions(zkClient, topic2, 2, "0:1,2:3")
+    AdminUtils.addPartitions(zkClient, topic2, 3, "1:2,0:1,2:3")
     // wait until leader is elected
     var leader1 = waitUntilLeaderIsElectedOrChanged(zkClient, topic2, 1, 500)
     var leader2 = waitUntilLeaderIsElectedOrChanged(zkClient, topic2, 2, 500)
@@ -170,7 +170,7 @@ class AddPartitionsTest extends JUnit3Suite with ZooKeeperTestHarness
{
   }
 
   def testReplicaPlacement {
-    AddPartitionsCommand.addPartitions(zkClient, topic3, 6)
+    AdminUtils.addPartitions(zkClient, topic3, 7)
     // wait until leader is elected
     var leader1 = waitUntilLeaderIsElectedOrChanged(zkClient, topic3, 1, 500)
     var leader2 = waitUntilLeaderIsElectedOrChanged(zkClient, topic3, 2, 500)


Mime
View raw message