kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sriram...@apache.org
Subject [15/19] git commit: Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into trunk
Date Tue, 25 Feb 2014 08:27:25 GMT
Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into trunk

Conflicts:
	core/src/main/scala/kafka/controller/KafkaController.scala


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/3f473566
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/3f473566
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/3f473566

Branch: refs/heads/trunk
Commit: 3f473566ab2ef51f17034dc02f632df5d38fe307
Parents: 72610d1 26a02c3
Author: Sriram Subramanian <sriram.sub@gmail.com>
Authored: Mon Jan 27 13:25:06 2014 -0800
Committer: Sriram Subramanian <sriram.sub@gmail.com>
Committed: Mon Jan 27 13:25:06 2014 -0800

----------------------------------------------------------------------
 bin/zookeeper-shell.sh                          |   6 +-
 .../main/scala/kafka/admin/TopicCommand.scala   |  10 +-
 .../kafka/api/ControlledShutdownRequest.scala   |  16 +-
 .../kafka/api/ControlledShutdownResponse.scala  |   3 +
 .../src/main/scala/kafka/api/FetchRequest.scala |  26 ++-
 .../scala/kafka/api/LeaderAndIsrRequest.scala   |  23 +-
 .../scala/kafka/api/LeaderAndIsrResponse.scala  |   3 +
 .../scala/kafka/api/OffsetCommitRequest.scala   |  16 ++
 .../scala/kafka/api/OffsetCommitResponse.scala  |   3 +
 .../scala/kafka/api/OffsetFetchRequest.scala    |  18 +-
 .../scala/kafka/api/OffsetFetchResponse.scala   |   4 +-
 .../main/scala/kafka/api/OffsetRequest.scala    |  23 +-
 .../main/scala/kafka/api/OffsetResponse.scala   |   1 +
 .../main/scala/kafka/api/ProducerRequest.scala  |  24 ++-
 .../main/scala/kafka/api/ProducerResponse.scala |   2 +
 .../scala/kafka/api/RequestOrResponse.scala     |   8 +-
 .../scala/kafka/api/StopReplicaRequest.scala    |  25 ++-
 .../scala/kafka/api/StopReplicaResponse.scala   |   2 +
 .../scala/kafka/api/TopicMetadataRequest.scala  |  21 +-
 .../scala/kafka/api/TopicMetadataResponse.scala |   2 +
 .../scala/kafka/api/UpdateMetadataRequest.scala |  19 +-
 .../kafka/api/UpdateMetadataResponse.scala      |   2 +
 .../main/scala/kafka/cluster/Partition.scala    |  19 +-
 .../common/LogCleaningAbortedException.scala    |  24 +++
 .../common/OptimisticLockFailureException.scala |  23 --
 .../kafka/common/ThreadShutdownException.scala  |  24 +++
 .../controller/ControllerChannelManager.scala   |  53 +++--
 .../kafka/controller/KafkaController.scala      | 198 +++++++++++-------
 .../controller/PartitionLeaderSelector.scala    |  40 ++--
 .../controller/PartitionStateMachine.scala      |  22 +-
 .../kafka/controller/ReplicaStateMachine.scala  |  35 ++--
 .../kafka/javaapi/TopicMetadataRequest.scala    |  25 +++
 core/src/main/scala/kafka/log/Log.scala         |  37 ++--
 core/src/main/scala/kafka/log/LogCleaner.scala  | 208 +++++++------------
 .../scala/kafka/log/LogCleanerManager.scala     | 188 +++++++++++++++++
 core/src/main/scala/kafka/log/LogManager.scala  |  76 ++++---
 .../scala/kafka/network/RequestChannel.scala    |   6 +-
 .../scala/kafka/producer/SyncProducer.scala     |   5 -
 .../kafka/server/AbstractFetcherThread.scala    |  10 +-
 .../src/main/scala/kafka/server/KafkaApis.scala |   5 +-
 .../kafka/server/KafkaRequestHandler.scala      |   1 +
 .../kafka/server/ReplicaFetcherThread.scala     |   7 +-
 .../scala/kafka/server/ReplicaManager.scala     |  18 +-
 .../kafka/server/ZookeeperLeaderElector.scala   |  36 ++--
 core/src/main/scala/kafka/utils/ZkUtils.scala   |  43 ----
 .../test/scala/unit/kafka/admin/AdminTest.scala |  15 +-
 .../test/scala/unit/kafka/log/CleanerTest.scala |  39 ++--
 examples/README                                 |   6 +-
 system_test/utils/kafka_system_test_utils.py    |   6 +-
 49 files changed, 913 insertions(+), 513 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/3f473566/core/src/main/scala/kafka/controller/KafkaController.scala
----------------------------------------------------------------------
diff --cc core/src/main/scala/kafka/controller/KafkaController.scala
index ca2f09b,a0267ae..f4f00b2
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@@ -488,7 -546,7 +546,7 @@@ class KafkaController(val config : Kafk
      }
    }
  
--  def onPreferredReplicaElection(partitions: Set[TopicAndPartition]) {
++  def onPreferredReplicaElection(partitions: Set[TopicAndPartition], isTriggeredByAutoRebalance:
Boolean = true) {
      info("Starting preferred replica leader election for partitions %s".format(partitions.mkString(",")))
      try {
        controllerContext.partitionsUndergoingPreferredReplicaElection ++= partitions
@@@ -496,7 -554,7 +554,7 @@@
      } catch {
        case e: Throwable => error("Error completing preferred replica leader election
for partitions %s".format(partitions.mkString(",")), e)
      } finally {
--      removePartitionsFromPreferredReplicaElection(partitions)
++      removePartitionsFromPreferredReplicaElection(partitions, isTriggeredByAutoRebalance)
      }
    }
  
@@@ -765,7 -823,7 +823,8 @@@
      }
    }
  
--  def removePartitionsFromPreferredReplicaElection(partitionsToBeRemoved: Set[TopicAndPartition])
{
++  def removePartitionsFromPreferredReplicaElection(partitionsToBeRemoved: Set[TopicAndPartition],
++                                                   isTriggeredByAutoRebalance : Boolean)
{
      for(partition <- partitionsToBeRemoved) {
        // check the status
        val currentLeader = controllerContext.partitionLeadershipInfo(partition).leaderAndIsr.leader
@@@ -776,7 -834,7 +835,8 @@@
          warn("Partition %s failed to complete preferred replica leader election. Leader
is %d".format(partition, currentLeader))
        }
      }
--    ZkUtils.deletePath(zkClient, ZkUtils.PreferredReplicaLeaderElectionPath)
++    if (!isTriggeredByAutoRebalance)
++      ZkUtils.deletePath(zkClient, ZkUtils.PreferredReplicaLeaderElectionPath)
      controllerContext.partitionsUndergoingPreferredReplicaElection --= partitionsToBeRemoved
    }
  
@@@ -966,26 -1018,26 +1020,16 @@@
            // check ratio and if greater than desired ratio, trigger a rebalance for the
topic partitions
            // that need to be on this broker
            if (imbalanceRatio > (config.leaderImbalancePerBrokerPercentage.toDouble /
100)) {
--            controllerContext.controllerLock synchronized {
--              // do this check only if the broker is live and there are no partitions being
reassigned currently
--              // and preferred replica election is not in progress
--              if (controllerContext.liveBrokerIds.contains(leaderBroker) &&
--                  controllerContext.partitionsBeingReassigned.size == 0 &&
--                  controllerContext.partitionsUndergoingPreferredReplicaElection.size ==
0) {
--                val zkPath = ZkUtils.PreferredReplicaLeaderElectionPath
--                val partitionsList = topicsNotInPreferredReplica.keys.map(e => Map("topic"
-> e.topic, "partition" -> e.partition))
--                val jsonData = Json.encode(Map("version" -> 1, "partitions" -> partitionsList))
--                try {
--                  ZkUtils.createPersistentPath(zkClient, zkPath, jsonData)
--                  info("Created preferred replica election path with %s".format(jsonData))
--                } catch {
--                  case e2: ZkNodeExistsException =>
--                    val partitionsUndergoingPreferredReplicaElection =
--                      PreferredReplicaLeaderElectionCommand.parsePreferredReplicaElectionData(ZkUtils.readData(zkClient,
zkPath)._1)
--                    error("Preferred replica leader election currently in progress for "
+
--                          "%s. Aborting operation".format(partitionsUndergoingPreferredReplicaElection));
--                  case e3: Throwable =>
--                    error("Error while trying to auto rebalance topics %s".format(topicsNotInPreferredReplica.keys))
++            topicsNotInPreferredReplica.foreach {
++              case(topicPartition, replicas) => {
++                controllerContext.controllerLock synchronized {
++                  // do this check only if the broker is live and there are no partitions
being reassigned currently
++                  // and preferred replica election is not in progress
++                  if (controllerContext.liveBrokerIds.contains(leaderBroker) &&
++                    controllerContext.partitionsBeingReassigned.size == 0 &&
++                    controllerContext.partitionsUndergoingPreferredReplicaElection.size
== 0) {
++                    onPreferredReplicaElection(Set(topicPartition), false)
++                  }
                  }
                }
              }


Mime
View raw message