kafka-commits mailing list archives

From rsiva...@apache.org
Subject [kafka] branch 2.0 updated: MINOR: Fix timing issue in advertised listener update test (#5256)
Date Mon, 25 Jun 2018 10:02:29 GMT
This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch 2.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.0 by this push:
     new da526a3  MINOR: Fix timing issue in advertised listener update test (#5256)
da526a3 is described below

commit da526a3bd09bb32e847c11c1df37544f24c4e45d
Author: Rajini Sivaram <rajinisivaram@googlemail.com>
AuthorDate: Mon Jun 25 10:52:31 2018 +0100

    MINOR: Fix timing issue in advertised listener update test (#5256)
    
    Wait for produce to fail before updating listener to avoid send succeeding after the listener
    update. Also use different topics in tests with connection failures where one is expected
    to fail and the other is expected to succeed.
    
    Reviewers: Ismael Juma <ismael@juma.me.uk>
---
 .../server/DynamicBrokerReconfigurationTest.scala  | 28 +++++++++++++++-------
 1 file changed, 19 insertions(+), 9 deletions(-)

diff --git a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
index 96cca23..38b9f45 100644
--- a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
+++ b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
@@ -733,17 +733,25 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
     // Verify that producer connections fail since advertised listener is invalid
     val bootstrap = TestUtils.bootstrapServers(servers, new ListenerName(SecureExternal))
       .replaceAll(invalidHost, "localhost") // allow bootstrap connection to succeed
-    val producer1 = ProducerBuilder().trustStoreProps(sslProperties1).maxRetries(0).bootstrapServers(bootstrap).build()
+    val producer1 = ProducerBuilder().trustStoreProps(sslProperties1)
+      .maxRetries(0)
+      .requestTimeoutMs(1000)
+      .bootstrapServers(bootstrap)
+      .build()
 
-    val sendFuture = verifyConnectionFailure(producer1)
+    assertTrue(intercept[ExecutionException] {
+      producer1.send(new ProducerRecord(topic, "key", "value")).get(2, TimeUnit.SECONDS)
+    }.getCause.isInstanceOf[org.apache.kafka.common.errors.TimeoutException])
 
     alterAdvertisedListener(adminClient, externalAdminClient, invalidHost, "localhost")
     servers.foreach(validateEndpointsInZooKeeper(_, endpoints => !endpoints.contains(invalidHost)))
 
     // Verify that produce/consume work now
+    val topic2 = "testtopic2"
+    TestUtils.createTopic(zkClient, topic2, numPartitions, replicationFactor = numServers, servers)
     val producer = ProducerBuilder().trustStoreProps(sslProperties1).maxRetries(0).build()
-    val consumer = ConsumerBuilder("group2").trustStoreProps(sslProperties1).topic(topic).build()
-    verifyProduceConsume(producer, consumer, 10, topic)
+    val consumer = ConsumerBuilder("group2").trustStoreProps(sslProperties1).topic(topic2).build()
+    verifyProduceConsume(producer, consumer, 10, topic2)
 
     // Verify updating inter-broker listener
     val props = new Properties
@@ -756,9 +764,6 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
         assertTrue(s"Unexpected exception ${e.getCause}", e.getCause.isInstanceOf[InvalidRequestException])
         servers.foreach(server => assertEquals(SecureInternal, server.config.interBrokerListenerName.value))
     }
-
-    // Verify that the other send did not complete
-    verifyTimeout(sendFuture)
   }
 
   @Test
@@ -938,13 +943,15 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
     val consumerFuture = verifyConnectionFailure(consumer1)
 
     // Test that other listeners still work
+    val topic2 = "testtopic2"
+    TestUtils.createTopic(zkClient, topic2, numPartitions, replicationFactor = numServers, servers)
     val producer2 = ProducerBuilder().trustStoreProps(sslProperties1).maxRetries(0).build()
     val consumer2 = ConsumerBuilder(s"remove-listener-group2-$securityProtocol")
       .trustStoreProps(sslProperties1)
-      .topic(topic)
+      .topic(topic2)
       .autoOffsetReset("latest")
       .build()
-    verifyProduceConsume(producer2, consumer2, numRecords = 10, topic)
+    verifyProduceConsume(producer2, consumer2, numRecords = 10, topic2)
 
     // Verify that producer/consumer using old listener don't work
     verifyTimeout(producerFuture)
@@ -1361,13 +1368,16 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
   private case class ProducerBuilder() extends ClientBuilder[KafkaProducer[String, String]] {
     private var _retries = 0
     private var _acks = -1
+    private var _requestTimeoutMs = 30000L
 
     def maxRetries(retries: Int): ProducerBuilder = { _retries = retries; this }
     def acks(acks: Int): ProducerBuilder = { _acks = acks; this }
+    def requestTimeoutMs(timeoutMs: Long): ProducerBuilder = { _requestTimeoutMs = timeoutMs; this }
 
     override def build(): KafkaProducer[String, String] = {
       val producer = TestUtils.createProducer(bootstrapServers,
         acks = _acks,
+        requestTimeoutMs = _requestTimeoutMs,
         retries = _retries,
         securityProtocol = _securityProtocol,
         trustStoreFile = Some(trustStoreFile1),
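
For context outside the Kafka test harness, the same wait-for-failure-before-reconfigure pattern can be sketched with only the public kafka-clients API. The sketch below is illustrative and not part of the commit: the object and method names are hypothetical, the bootstrap address and topic are assumed to exist, and the producer settings mirror maxRetries(0) / requestTimeoutMs(1000) from the diff above.

import java.util.Properties
import java.util.concurrent.{ExecutionException, TimeUnit}

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer

object AdvertisedListenerUpdateSketch {

  // Producer with retries disabled and a short request timeout, mirroring
  // ProducerBuilder().maxRetries(0).requestTimeoutMs(1000) in the test above.
  def shortTimeoutProducer(bootstrap: String): KafkaProducer[String, String] = {
    val props = new Properties()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap)
    props.put(ProducerConfig.RETRIES_CONFIG, "0")
    props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "1000")
    props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "5000")
    new KafkaProducer[String, String](props, new StringSerializer, new StringSerializer)
  }

  // Block until the send has demonstrably failed with a TimeoutException, so the
  // caller only reconfigures the advertised listener after observing the failure.
  def awaitProduceFailure(producer: KafkaProducer[String, String], topic: String): Unit = {
    try {
      producer.send(new ProducerRecord(topic, "key", "value")).get(2, TimeUnit.SECONDS)
      throw new AssertionError("send unexpectedly succeeded before the listener update")
    } catch {
      case e: ExecutionException =>
        assert(e.getCause.isInstanceOf[org.apache.kafka.common.errors.TimeoutException],
          s"expected TimeoutException, got ${e.getCause}")
    }
  }
}

In the test itself this check runs between building producer1 and calling alterAdvertisedListener, which is what closes the race that made the test flaky.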

