kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rsiva...@apache.org
Subject kafka git commit: KAFKA-5289: handleStopReplica should not send a second response
Date Mon, 22 May 2017 12:39:01 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk d3f1407f7 -> ceb10c533


KAFKA-5289: handleStopReplica should not send a second response

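For example, `shutdownIdleFetcherThreads()` can throw an InterruptedException.
The call is now wrapped in `CoreUtils.swallow` so that such an exception cannot
trigger a second response.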

Author: Ismael Juma <ismael@juma.me.uk>

Reviewers: Rajini Sivaram <rajinisivaram@googlemail.com>

Closes #3096 from ijuma/kafka-5289-stop-replica-should-not-send-two-responses


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/ceb10c53
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/ceb10c53
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/ceb10c53

Branch: refs/heads/trunk
Commit: ceb10c53302bd4824737e03ab86ca19e2aa64c7a
Parents: d3f1407
Author: Ismael Juma <ismael@juma.me.uk>
Authored: Mon May 22 13:37:29 2017 +0100
Committer: Rajini Sivaram <rajinisivaram@googlemail.com>
Committed: Mon May 22 13:37:29 2017 +0100

----------------------------------------------------------------------
 core/src/main/scala/kafka/server/KafkaApis.scala | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/ceb10c53/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 1346fb3..197298c 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -35,7 +35,7 @@ import kafka.coordinator.transaction.{InitProducerIdResult, TransactionCoordinat
 import kafka.log.{Log, LogManager, TimestampOffset}
 import kafka.network.{RequestChannel, RequestOrResponseSend}
 import kafka.security.auth._
-import kafka.utils.{Exit, Logging, ZKGroupTopicDirs, ZkUtils}
+import kafka.utils.{CoreUtils, Exit, Logging, ZKGroupTopicDirs, ZkUtils}
 import org.apache.kafka.common.errors._
 import org.apache.kafka.common.internals.FatalExitError
 import org.apache.kafka.common.internals.Topic.{GROUP_METADATA_TOPIC_NAME, TRANSACTION_STATE_TOPIC_NAME, isInternal}
@@ -195,7 +195,7 @@ class KafkaApis(val requestChannel: RequestChannel,
       // request to become a follower due to which cache for groups that belong to an offsets topic partition for which broker 1 was the leader,
       // is not cleared.
       result.foreach { case (topicPartition, error) =>
-        if (error == Errors.NONE && stopReplicaRequest.deletePartitions() && topicPartition.topic == GROUP_METADATA_TOPIC_NAME) {
+        if (error == Errors.NONE && stopReplicaRequest.deletePartitions && topicPartition.topic == GROUP_METADATA_TOPIC_NAME) {
           groupCoordinator.handleGroupEmigration(topicPartition.partition)
         }
       }
@@ -207,7 +207,7 @@ class KafkaApis(val requestChannel: RequestChannel,
         new StopReplicaResponse(Errors.CLUSTER_AUTHORIZATION_FAILED, result.asJava))
     }
 
-    replicaManager.replicaFetcherManager.shutdownIdleFetcherThreads()
+    CoreUtils.swallow(replicaManager.replicaFetcherManager.shutdownIdleFetcherThreads())
   }
 
   def handleUpdateMetadataRequest(request: RequestChannel.Request) {


Mime
View raw message