kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jun...@apache.org
Subject kafka git commit: kafka-2265; creating a topic with large number of partitions takes a long time; patched by Manikumar Reddy; reviewed by Jun Rao
Date Thu, 18 Jun 2015 22:59:17 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 5c9040745 -> d9c0ad685


kafka-2265; creating a topic with large number of partitions takes a long time; patched by
Manikumar Reddy; reviewed by Jun Rao


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/d9c0ad68
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/d9c0ad68
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/d9c0ad68

Branch: refs/heads/trunk
Commit: d9c0ad6855b4871694ddf17c9d4546b021302ee8
Parents: 5c90407
Author: Manikumar Reddy <manikumar.reddy@gmail.com>
Authored: Thu Jun 18 15:59:11 2015 -0700
Committer: Jun Rao <junrao@gmail.com>
Committed: Thu Jun 18 15:59:11 2015 -0700

----------------------------------------------------------------------
 .../scala/kafka/controller/PartitionStateMachine.scala | 13 +------------
 1 file changed, 1 insertion(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/d9c0ad68/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
index 92fd92d..b4e7c88 100755
--- a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
@@ -188,7 +188,6 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
         case NewPartition =>
           // pre: partition did not exist before this
           assertValidPreviousStates(topicAndPartition, List(NonExistentPartition), NewPartition)
-          assignReplicasToPartitions(topic, partition)
           partitionState.put(topicAndPartition, NewPartition)
           val assignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition).mkString(",")
           stateChangeLogger.trace("Controller %d epoch %d changed partition %s state from %s to %s with assigned replicas %s"
@@ -266,17 +265,6 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
   }
 
   /**
-   * Invoked on the NonExistentPartition->NewPartition state transition to update the controller's cache with the
-   * partition's replica assignment.
-   * @param topic     The topic of the partition whose replica assignment is to be cached
-   * @param partition The partition whose replica assignment is to be cached
-   */
-  private def assignReplicasToPartitions(topic: String, partition: Int) {
-    val assignedReplicas = ZkUtils.getReplicasForPartition(controllerContext.zkClient, topic, partition)
-    controllerContext.partitionReplicaAssignment += TopicAndPartition(topic, partition) -> assignedReplicas
-  }
-
-  /**
    * Invoked on the NewPartition->OnlinePartition state change. When a partition is in the New state, it does not have
    * a leader and isr path in zookeeper. Once the partition moves to the OnlinePartition state, it's leader and isr
    * path gets initialized and it never goes back to the NewPartition state. From here, it can only go to the
@@ -526,6 +514,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
           else {
             if (partitionsToBeAdded.size > 0) {
               info("New partitions to be added %s".format(partitionsToBeAdded))
+              controllerContext.partitionReplicaAssignment.++=(partitionsToBeAdded)
               controller.onNewPartitionCreation(partitionsToBeAdded.keySet.toSet)
             }
           }


Mime
View raw message