kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rsiva...@apache.org
Subject [kafka] branch 2.2 updated: KAFKA-8091; Wait for processor shutdown before testing removed listeners (#6425)
Date Mon, 11 Mar 2019 18:51:42 GMT
This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch 2.2
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.2 by this push:
     new f6c0630  KAFKA-8091; Wait for processor shutdown before testing removed listeners
(#6425)
f6c0630 is described below

commit f6c0630d2ffea2398844e6fa2b25a924088e276d
Author: Rajini Sivaram <rajinisivaram@googlemail.com>
AuthorDate: Mon Mar 11 18:43:46 2019 +0000

    KAFKA-8091; Wait for processor shutdown before testing removed listeners (#6425)
    
    DynamicBrokerReconfigurationTest.testAddRemoveSaslListeners removes a listener, waits
for the config to be propagated to all brokers and then validates that connections to the
removed listener fail. But there is a small timing window between config update and Processor
shutdown. Before validating that connections to a removed listener fail, this commit waits
for all metrics of the removed listener to be deleted, ensuring that the Processors of the
listener have been shutdown.
    
    Reviewers: Manikumar Reddy <manikumar.reddy@gmail.com>
---
 .../kafka/server/DynamicBrokerReconfigurationTest.scala        | 10 ++++++++++
 1 file changed, 10 insertions(+)

diff --git a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
index 80ed131..798961e 100644
--- a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
+++ b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
@@ -908,6 +908,8 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with
SaslSet
   private def verifyAddListener(listenerName: String, securityProtocol: SecurityProtocol,
                                 saslMechanisms: Seq[String]): Unit = {
     addListener(servers, listenerName, securityProtocol, saslMechanisms)
+    TestUtils.waitUntilTrue(() => servers.forall(hasListenerMetric(_, listenerName)),
+      "Processors not started for new listener")
     if (saslMechanisms.nonEmpty)
       saslMechanisms.foreach { mechanism =>
         verifyListener(securityProtocol, Some(mechanism), s"add-listener-group-$securityProtocol-$mechanism")
@@ -954,6 +956,10 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with
SaslSet
 
     TestUtils.waitUntilTrue(() => servers.forall(server => server.config.listeners.size
== existingListenerCount - 1),
       "Listeners not updated")
+    // Wait until metrics of the listener have been removed to ensure that processors have
been shutdown before
+    // verifying that connections to the removed listener fail.
+    TestUtils.waitUntilTrue(() => !servers.exists(hasListenerMetric(_, listenerName)),
+      "Processors not shutdown for removed listener")
 
     // Test that connections using deleted listener don't work
     val producerFuture = verifyConnectionFailure(producer1)
@@ -992,6 +998,10 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with
SaslSet
     verifyProduceConsume(producer, consumer, numRecords = 10, topic)
   }
 
+  private def hasListenerMetric(server: KafkaServer, listenerName: String): Boolean = {
+    server.socketServer.metrics.metrics.keySet.asScala.exists(_.tags.get("listener") == listenerName)
+  }
+
   private def fetchBrokerConfigsFromZooKeeper(server: KafkaServer): Properties = {
     val props = adminZkClient.fetchEntityConfig(ConfigType.Broker, server.config.brokerId.toString)
     server.config.dynamicConfig.fromPersistentProps(props, perBrokerConfig = true)


Mime
View raw message