kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sriram...@apache.org
Subject [01/19] git commit: Move AddPartitions into TopicCommand
Date Tue, 25 Feb 2014 08:27:11 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 3955915a5 -> 00afb619c


Move AddPartitions into TopicCommand


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/c304a92a
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/c304a92a
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/c304a92a

Branch: refs/heads/trunk
Commit: c304a92a1ce39cc7b7e329fde0b97f93cb8caee3
Parents: 343a1e2
Author: Sriram Subramanian <sriram.sub@gmail.com>
Authored: Tue Oct 8 23:42:34 2013 -0700
Committer: Sriram Subramanian <sriram.sub@gmail.com>
Committed: Tue Oct 8 23:42:34 2013 -0700

----------------------------------------------------------------------
 .../kafka/admin/AddPartitionsCommand.scala      | 127 -------------------
 .../src/main/scala/kafka/admin/AdminUtils.scala |  57 +++++++++
 .../main/scala/kafka/admin/TopicCommand.scala   |  22 ++--
 .../unit/kafka/admin/AddPartitionsTest.scala    |  10 +-
 4 files changed, 76 insertions(+), 140 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/c304a92a/core/src/main/scala/kafka/admin/AddPartitionsCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/AddPartitionsCommand.scala b/core/src/main/scala/kafka/admin/AddPartitionsCommand.scala
deleted file mode 100644
index c74d9c2..0000000
--- a/core/src/main/scala/kafka/admin/AddPartitionsCommand.scala
+++ /dev/null
@@ -1,127 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.admin
-
-import joptsimple.OptionParser
-import kafka.utils._
-import org.I0Itec.zkclient.ZkClient
-import scala.collection.mutable
-import kafka.common.TopicAndPartition
-
-object AddPartitionsCommand extends Logging {
-
-  def main(args: Array[String]): Unit = {
-    val parser = new OptionParser
-    val topicOpt = parser.accepts("topic", "REQUIRED: The topic for which partitions need
to be added.")
-      .withRequiredArg
-      .describedAs("topic")
-      .ofType(classOf[String])
-    val zkConnectOpt = parser.accepts("zookeeper", "REQUIRED: The connection string for the
zookeeper connection in the form host:port. " +
-      "Multiple URLS can be given to allow fail-over.")
-      .withRequiredArg
-      .describedAs("urls")
-      .ofType(classOf[String])
-    val nPartitionsOpt = parser.accepts("partition", "REQUIRED: Number of partitions to add
to the topic")
-      .withRequiredArg
-      .describedAs("# of partitions")
-      .ofType(classOf[java.lang.Integer])
-    val replicaAssignmentOpt = parser.accepts("replica-assignment-list", "For manually assigning
replicas to brokers for the new partitions")
-      .withRequiredArg
-      .describedAs("broker_id_for_part1_replica1 : broker_id_for_part1_replica2, " +
-      "broker_id_for_part2_replica1 : broker_id_for_part2_replica2, ...")
-      .ofType(classOf[String])
-      .defaultsTo("")
-
-    val options = parser.parse(args : _*)
-
-    for(arg <- List(topicOpt, zkConnectOpt, nPartitionsOpt)) {
-      if(!options.has(arg)) {
-        System.err.println("***Please note that this tool can only be used to add partitions
when data for a topic does not use a key.***\n" +
-          "Missing required argument. " + " \"" + arg + "\"")
-        parser.printHelpOn(System.err)
-        System.exit(1)
-      }
-    }
-
-    val topic = options.valueOf(topicOpt)
-    val zkConnect = options.valueOf(zkConnectOpt)
-    val nPartitions = options.valueOf(nPartitionsOpt).intValue
-    val replicaAssignmentStr = options.valueOf(replicaAssignmentOpt)
-    var zkClient: ZkClient = null
-    try {
-      zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer)
-      addPartitions(zkClient, topic, nPartitions, replicaAssignmentStr)
-      println("adding partitions succeeded!")
-    } catch {
-      case e: Throwable =>
-        println("adding partitions failed because of " + e.getMessage)
-        println(Utils.stackTrace(e))
-    } finally {
-      if (zkClient != null)
-        zkClient.close()
-    }
-  }
-
-  def addPartitions(zkClient: ZkClient, topic: String, numPartitions: Int = 1, replicaAssignmentStr:
String = "") {
-    val existingPartitionsReplicaList = ZkUtils.getReplicaAssignmentForTopics(zkClient, List(topic))
-    if (existingPartitionsReplicaList.size == 0)
-      throw new AdminOperationException("The topic %s does not exist".format(topic))
-
-    val existingReplicaList = existingPartitionsReplicaList.get(TopicAndPartition(topic,0)).get
-
-    // create the new partition replication list
-    val brokerList = ZkUtils.getSortedBrokerList(zkClient)
-    val newPartitionReplicaList = if (replicaAssignmentStr == "")
-      AdminUtils.assignReplicasToBrokers(brokerList, numPartitions, existingReplicaList.size,
existingReplicaList.head, existingPartitionsReplicaList.size)
-    else
-      getManualReplicaAssignment(replicaAssignmentStr, brokerList.toSet, existingPartitionsReplicaList.size)
-
-    // check if manual assignment has the right replication factor
-    val unmatchedRepFactorList = newPartitionReplicaList.values.filter(p => (p.size !=
existingReplicaList.size))
-    if (unmatchedRepFactorList.size != 0)
-      throw new AdminOperationException("The replication factor in manual replication assignment
" +
-        " is not equal to the existing replication factor for the topic " + existingReplicaList.size)
-
-    info("Add partition list for %s is %s".format(topic, newPartitionReplicaList))
-    val partitionReplicaList = existingPartitionsReplicaList.map(p => p._1.partition ->
p._2)
-    // add the new list
-    partitionReplicaList ++= newPartitionReplicaList
-    AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, partitionReplicaList,
update = true)
-  }
-
-  def getManualReplicaAssignment(replicaAssignmentList: String, availableBrokerList: Set[Int],
startPartitionId: Int): Map[Int, List[Int]] = {
-    val partitionList = replicaAssignmentList.split(",")
-    val ret = new mutable.HashMap[Int, List[Int]]()
-    var partitionId = startPartitionId
-    for (i <- 0 until partitionList.size) {
-      val brokerList = partitionList(i).split(":").map(s => s.trim().toInt)
-      if (brokerList.size <= 0)
-        throw new AdminOperationException("replication factor must be larger than 0")
-      if (brokerList.size != brokerList.toSet.size)
-        throw new AdminOperationException("duplicate brokers in replica assignment: " + brokerList)
-      if (!brokerList.toSet.subsetOf(availableBrokerList))
-        throw new AdminOperationException("some specified brokers not available. specified
brokers: " + brokerList.toString +
-          "available broker:" + availableBrokerList.toString)
-      ret.put(partitionId, brokerList.toList)
-      if (ret(partitionId).size != ret(startPartitionId).size)
-        throw new AdminOperationException("partition " + i + " has different replication
factor: " + brokerList)
-      partitionId = partitionId + 1
-    }
-    ret.toMap
-  }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/c304a92a/core/src/main/scala/kafka/admin/AdminUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/AdminUtils.scala b/core/src/main/scala/kafka/admin/AdminUtils.scala
index 6560fc6..1360955 100644
--- a/core/src/main/scala/kafka/admin/AdminUtils.scala
+++ b/core/src/main/scala/kafka/admin/AdminUtils.scala
@@ -29,6 +29,11 @@ import scala.collection._
 import mutable.ListBuffer
 import scala.collection.mutable
 import kafka.common._
+import scala.Predef._
+import collection.Map
+import scala.Some
+import collection.Set
+import kafka.common.TopicAndPartition
 
 object AdminUtils extends Logging {
   val rand = new Random
@@ -82,6 +87,58 @@ object AdminUtils extends Logging {
     }
     ret.toMap
   }
+
+  def addPartitions(zkClient: ZkClient, topic: String, numPartitions: Int = 1, replicaAssignmentStr:
String = "") {
+    val existingPartitionsReplicaList = ZkUtils.getReplicaAssignmentForTopics(zkClient, List(topic))
+    if (existingPartitionsReplicaList.size == 0)
+      throw new AdminOperationException("The topic %s does not exist".format(topic))
+
+    val existingReplicaList = existingPartitionsReplicaList.head._2
+    val partitionsToAdd = numPartitions - existingPartitionsReplicaList.size
+    if (partitionsToAdd <= 0)
+      throw new AdminOperationException("The number of partitions for a topic can only be
increased")
+
+    // create the new partition replication list
+    val brokerList = ZkUtils.getSortedBrokerList(zkClient)
+    val newPartitionReplicaList = if (replicaAssignmentStr == "")
+      AdminUtils.assignReplicasToBrokers(brokerList, partitionsToAdd, existingReplicaList.size,
existingReplicaList.head, existingPartitionsReplicaList.size)
+    else
+      getManualReplicaAssignment(replicaAssignmentStr, brokerList.toSet, existingPartitionsReplicaList.size)
+
+    // check if manual assignment has the right replication factor
+    val unmatchedRepFactorList = newPartitionReplicaList.values.filter(p => (p.size !=
existingReplicaList.size))
+    if (unmatchedRepFactorList.size != 0)
+      throw new AdminOperationException("The replication factor in manual replication assignment
" +
+        " is not equal to the existing replication factor for the topic " + existingReplicaList.size)
+
+    info("Add partition list for %s is %s".format(topic, newPartitionReplicaList))
+    val partitionReplicaList = existingPartitionsReplicaList.map(p => p._1.partition ->
p._2)
+    // add the new list
+    partitionReplicaList ++= newPartitionReplicaList
+    AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, partitionReplicaList,
update = true)
+  }
+
+  def getManualReplicaAssignment(replicaAssignmentList: String, availableBrokerList: Set[Int],
startPartitionId: Int): Map[Int, List[Int]] = {
+    var partitionList = replicaAssignmentList.split(",")
+    val ret = new mutable.HashMap[Int, List[Int]]()
+    var partitionId = startPartitionId
+    partitionList = partitionList.takeRight(partitionList.size - partitionId)
+    for (i <- 0 until partitionList.size) {
+      val brokerList = partitionList(i).split(":").map(s => s.trim().toInt)
+      if (brokerList.size <= 0)
+        throw new AdminOperationException("replication factor must be larger than 0")
+      if (brokerList.size != brokerList.toSet.size)
+        throw new AdminOperationException("duplicate brokers in replica assignment: " + brokerList)
+      if (!brokerList.toSet.subsetOf(availableBrokerList))
+        throw new AdminOperationException("some specified brokers not available. specified
brokers: " + brokerList.toString +
+          "available broker:" + availableBrokerList.toString)
+      ret.put(partitionId, brokerList.toList)
+      if (ret(partitionId).size != ret(startPartitionId).size)
+        throw new AdminOperationException("partition " + i + " has different replication
factor: " + brokerList)
+      partitionId = partitionId + 1
+    }
+    ret.toMap
+  }
   
   def deleteTopic(zkClient: ZkClient, topic: String) {
     zkClient.deleteRecursive(ZkUtils.getTopicPath(topic))

http://git-wip-us.apache.org/repos/asf/kafka/blob/c304a92a/core/src/main/scala/kafka/admin/TopicCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/TopicCommand.scala b/core/src/main/scala/kafka/admin/TopicCommand.scala
index 06bbd37..ee940e9 100644
--- a/core/src/main/scala/kafka/admin/TopicCommand.scala
+++ b/core/src/main/scala/kafka/admin/TopicCommand.scala
@@ -78,16 +78,21 @@ object TopicCommand {
   
   def alterTopic(zkClient: ZkClient, opts: TopicCommandOptions) {
     CommandLineUtils.checkRequiredArgs(opts.parser, opts.options, opts.topicOpt)
-    val topics = opts.options.valuesOf(opts.topicOpt)
-    val configs = parseTopicConfigs(opts)
-    if(opts.options.has(opts.partitionsOpt))
-      Utils.croak("Changing the number of partitions is not supported.")
-    if(opts.options.has(opts.replicationFactorOpt))
-      Utils.croak("Changing the replication factor is not supported.")
-    for(topic <- topics) {
+    val topic = opts.options.valueOf(opts.topicOpt)
+    if(opts.options.has(opts.configOpt)) {
+      val configs = parseTopicConfigs(opts)
       AdminUtils.changeTopicConfig(zkClient, topic, configs)
       println("Updated config for topic \"%s\".".format(topic))
     }
+    if(opts.options.has(opts.partitionsOpt)) {
+      println("partitions can only be added when topic has no key")
+      val nPartitions = opts.options.valueOf(opts.partitionsOpt).intValue
+      val replicaAssignmentStr = opts.options.valueOf(opts.replicaAssignmentOpt)
+      AdminUtils.addPartitions(zkClient, topic, nPartitions, replicaAssignmentStr)
+      println("adding partitions succeeded!")
+    }
+    if(opts.options.has(opts.replicationFactorOpt))
+      Utils.croak("Changing the replication factor is not supported.")
   }
   
   def deleteTopic(zkClient: ZkClient, opts: TopicCommandOptions) {
@@ -182,7 +187,8 @@ object TopicCommand {
                           .withRequiredArg
                           .describedAs("name=value")
                           .ofType(classOf[String])
-    val partitionsOpt = parser.accepts("partitions", "The number of partitions for the topic
being created.")
+    val partitionsOpt = parser.accepts("partitions", "The number of partitions for the topic
being created or " +
+      "altered (Partitions can only be added for a topic which has no key. Partitions cannot
be decreased")
                            .withRequiredArg
                            .describedAs("# of partitions")
                            .ofType(classOf[java.lang.Integer])

http://git-wip-us.apache.org/repos/asf/kafka/blob/c304a92a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
index 09254cc..115e203 100644
--- a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
@@ -100,7 +100,7 @@ class AddPartitionsTest extends JUnit3Suite with ZooKeeperTestHarness
{
 
   def testTopicDoesNotExist {
     try {
-      AddPartitionsCommand.addPartitions(zkClient, "Blah", 1)
+      AdminUtils.addPartitions(zkClient, "Blah", 1)
       fail("Topic should not exist")
     } catch {
       case e: AdminOperationException => //this is good
@@ -110,7 +110,7 @@ class AddPartitionsTest extends JUnit3Suite with ZooKeeperTestHarness
{
 
   def testWrongReplicaCount {
     try {
-      AddPartitionsCommand.addPartitions(zkClient, topic1, 2, "0:1:2")
+      AdminUtils.addPartitions(zkClient, topic1, 2, "0:1,0:1:2")
       fail("Add partitions should fail")
     } catch {
       case e: AdminOperationException => //this is good
@@ -119,7 +119,7 @@ class AddPartitionsTest extends JUnit3Suite with ZooKeeperTestHarness
{
   }
 
   def testIncrementPartitions {
-    AddPartitionsCommand.addPartitions(zkClient, topic1, 2)
+    AdminUtils.addPartitions(zkClient, topic1, 3)
     // wait until leader is elected
     var leader1 = waitUntilLeaderIsElectedOrChanged(zkClient, topic1, 1, 500)
     var leader2 = waitUntilLeaderIsElectedOrChanged(zkClient, topic1, 2, 500)
@@ -144,7 +144,7 @@ class AddPartitionsTest extends JUnit3Suite with ZooKeeperTestHarness
{
   }
 
   def testManualAssignmentOfReplicas {
-    AddPartitionsCommand.addPartitions(zkClient, topic2, 2, "0:1,2:3")
+    AdminUtils.addPartitions(zkClient, topic2, 3, "1:2,0:1,2:3")
     // wait until leader is elected
     var leader1 = waitUntilLeaderIsElectedOrChanged(zkClient, topic2, 1, 500)
     var leader2 = waitUntilLeaderIsElectedOrChanged(zkClient, topic2, 2, 500)
@@ -170,7 +170,7 @@ class AddPartitionsTest extends JUnit3Suite with ZooKeeperTestHarness
{
   }
 
   def testReplicaPlacement {
-    AddPartitionsCommand.addPartitions(zkClient, topic3, 6)
+    AdminUtils.addPartitions(zkClient, topic3, 7)
     // wait until leader is elected
     var leader1 = waitUntilLeaderIsElectedOrChanged(zkClient, topic3, 1, 500)
     var leader2 = waitUntilLeaderIsElectedOrChanged(zkClient, topic3, 2, 500)


Mime
View raw message