kafka-commits mailing list archives

From ij...@apache.org
Subject kafka git commit: KAFKA-4420; Group StopReplicaRequests for partitions on the same broker into one StopReplicaRequest
Date Sat, 19 Nov 2016 01:27:13 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 0e7ba7000 -> edfa28067


KAFKA-4420; Group StopReplicaRequests for partitions on the same broker into one StopReplicaRequest

Author: Dong Lin <lindong28@gmail.com>

Reviewers: Onur Karaman <okaraman@linkedin.com>, Jun Rao <junrao@gmail.com>, Ismael Juma <ismael@juma.me.uk>

Closes #2148 from lindong28/KAFKA-4420


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/edfa2806
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/edfa2806
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/edfa2806

Branch: refs/heads/trunk
Commit: edfa2806712da9663998195c1a7eed0c96bfa509
Parents: 0e7ba70
Author: Dong Lin <lindong28@gmail.com>
Authored: Sat Nov 19 01:22:38 2016 +0000
Committer: Ismael Juma <ismael@juma.me.uk>
Committed: Sat Nov 19 01:26:53 2016 +0000

----------------------------------------------------------------------
 .../kafka/controller/ControllerChannelManager.scala      | 11 ++++++++++-
 1 file changed, 10 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/edfa2806/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index 1935ea2..0e17688 100755
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -411,7 +411,16 @@ class ControllerBrokerRequestBatch(controller: KafkaController) extends Logging
           .format(broker, stopReplicaWithDelete.mkString(",")))
         debug("The stop replica request (delete = false) sent to broker %d is %s"
           .format(broker, stopReplicaWithoutDelete.mkString(",")))
-        replicaInfoList.foreach { r =>
+
+        val (replicasToGroup, replicasToNotGroup) = replicaInfoList.partition(r => !r.deletePartition && r.callback == null)
+
+        // Send one StopReplicaRequest for all partitions that require neither delete nor callback. This potentially
+        // changes the order in which the requests are sent for the same partitions, but that's OK.
+        val stopReplicaRequest = new StopReplicaRequest(controllerId, controllerEpoch, false,
+          replicasToGroup.map(r => new TopicPartition(r.replica.topic, r.replica.partition)).toSet.asJava)
+        controller.sendRequest(broker, ApiKeys.STOP_REPLICA, None, stopReplicaRequest)
+
+        replicasToNotGroup.foreach { r =>
           val stopReplicaRequest = new StopReplicaRequest(controllerId, controllerEpoch, r.deletePartition,
             Set(new TopicPartition(r.replica.topic, r.replica.partition)).asJava)
           controller.sendRequest(broker, ApiKeys.STOP_REPLICA, None, stopReplicaRequest, r.callback)
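
The grouping in the hunk above can be tried in isolation with the sketch below. It is only an illustration under stated assumptions: TopicPartition, StopReplicaInfo, StopReplicaRequest and buildRequests are simplified, hypothetical stand-ins defined here, not the real Kafka controller classes, and the callback is modelled as an Option rather than a nullable reference.

```scala
// Hypothetical, simplified stand-ins for the controller types (assumptions, not Kafka's API).
final case class TopicPartition(topic: String, partition: Int)

final case class StopReplicaInfo(
  tp: TopicPartition,
  deletePartition: Boolean,
  callback: Option[String => Unit] = None)

final case class StopReplicaRequest(deletePartition: Boolean, partitions: Set[TopicPartition])

object GroupStopReplicaSketch {
  // Builds the requests destined for one broker: a single grouped request for
  // replicas that need neither delete nor callback, followed by one request
  // per remaining replica.
  def buildRequests(infos: Seq[StopReplicaInfo]): Seq[StopReplicaRequest] = {
    val (toGroup, toNotGroup) =
      infos.partition(r => !r.deletePartition && r.callback.isEmpty)

    // Like the diff, this emits the grouped request unconditionally,
    // even when toGroup is empty.
    val grouped = StopReplicaRequest(deletePartition = false, toGroup.map(_.tp).toSet)

    // Replicas that need delete or a callback keep their own request.
    val individual = toNotGroup.map(r => StopReplicaRequest(r.deletePartition, Set(r.tp)))

    grouped +: individual
  }

  def main(args: Array[String]): Unit = {
    val infos = Seq(
      StopReplicaInfo(TopicPartition("a", 0), deletePartition = false),
      StopReplicaInfo(TopicPartition("a", 1), deletePartition = true),
      StopReplicaInfo(TopicPartition("b", 0), deletePartition = false))
    buildRequests(infos).foreach(println)
  }
}
```

With the three replicas in main, the two non-delete partitions share one request and the delete partition gets its own, so the broker receives two requests instead of three. As the commit comment notes, the grouped request goes out first, which can change the relative order of requests compared with the ungrouped loop.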

