kafka-commits mailing list archives

From: j...@apache.org
Subject: [kafka] branch trunk updated: MINOR: Use distinct consumer groups in dynamic listener tests (#4870)
Date: Fri, 20 Apr 2018 16:47:01 GMT
This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 9e062b3e MINOR: Use distinct consumer groups in dynamic listener tests (#4870)
9e062b3e is described below

commit 9e062b3e651bd41ad3275fb78ef577b49a108f7a
Author: Rajini Sivaram <rajinisivaram@googlemail.com>
AuthorDate: Fri Apr 20 17:46:57 2018 +0100

    MINOR: Use distinct consumer groups in dynamic listener tests (#4870)
---
 .../server/DynamicBrokerReconfigurationTest.scala     | 19 +++++++++++--------
 1 file changed, 11 insertions(+), 8 deletions(-)

diff --git a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
index bb62fb7..79dec26 100644
--- a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
+++ b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
@@ -40,7 +40,7 @@ import kafka.zk.{ConfigEntityChangeNotificationZNode, ZooKeeperTestHarness}
 import org.apache.kafka.clients.CommonClientConfigs
 import org.apache.kafka.clients.admin.ConfigEntry.{ConfigSource, ConfigSynonym}
 import org.apache.kafka.clients.admin._
-import org.apache.kafka.clients.consumer.{ConsumerRecord, ConsumerRecords, KafkaConsumer}
+import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord, ConsumerRecords, KafkaConsumer}
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
 import org.apache.kafka.common.{ClusterResource, ClusterResourceListener, Reconfigurable, TopicPartition}
 import org.apache.kafka.common.config.{ConfigException, ConfigResource, SslConfigs}
@@ -700,7 +700,7 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
       }
     }
 
-    verifyListener(SecurityProtocol.SSL, None)
+    verifyListener(SecurityProtocol.SSL, None, "add-ssl-listener-group2")
     createAdminClient(SecurityProtocol.SSL, SecureInternal)
     verifyRemoveListener("SSL", SecurityProtocol.SSL, Seq.empty)
   }
@@ -759,9 +759,11 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
     }), "Listener not created")
 
     if (saslMechanisms.nonEmpty)
-      saslMechanisms.foreach(mechanism => verifyListener(securityProtocol, Some(mechanism)))
+      saslMechanisms.foreach { mechanism =>
+        verifyListener(securityProtocol, Some(mechanism), s"add-listener-group-$securityProtocol-$mechanism")
+      }
     else
-      verifyListener(securityProtocol, None)
+      verifyListener(securityProtocol, None, s"add-listener-group-$securityProtocol")
 
     val brokerConfigs = describeConfig(adminClients.head).entries.asScala
     props.asScala.foreach { case (name, value) =>
@@ -818,12 +820,11 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
     verifyTimeout(consumerFuture)
   }
 
-  private def verifyListener(securityProtocol: SecurityProtocol, saslMechanism: Option[String]): Unit = {
+  private def verifyListener(securityProtocol: SecurityProtocol, saslMechanism: Option[String], groupId: String): Unit = {
     val mechanism = saslMechanism.getOrElse("")
     val retries = 1000 // since it may take time for metadata to be updated on all brokers
     val producer = createProducer(securityProtocol.name, securityProtocol, mechanism, retries)
-    val consumer = createConsumer(securityProtocol.name, securityProtocol, mechanism,
-      s"add-listener-group-$securityProtocol-$mechanism")
+    val consumer = createConsumer(securityProtocol.name, securityProtocol, mechanism, groupId)
     verifyProduceConsume(producer, consumer, numRecords = 10, topic)
   }
 
@@ -917,12 +918,14 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
   private def createConsumer(listenerName: String, securityProtocol: SecurityProtocol,
                              saslMechanism: String, group: String): KafkaConsumer[String, String] = {
     val bootstrapServers =  TestUtils.bootstrapServers(servers, new ListenerName(listenerName))
+    val consumerProps = clientProps(securityProtocol, saslMechanism)
+    consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
     val consumer = TestUtils.createNewConsumer(bootstrapServers, group,
       autoOffsetReset = "latest",
       securityProtocol = securityProtocol,
       keyDeserializer = new StringDeserializer,
       valueDeserializer = new StringDeserializer,
-      props = Some(clientProps(securityProtocol, saslMechanism)))
+      props = Some(consumerProps))
     consumer.subscribe(Collections.singleton(topic))
     awaitInitialPositions(consumer)
     consumers += consumer
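
For context, here is a minimal, self-contained sketch of the consumer setup pattern this commit applies: a distinct, caller-supplied group id per listener verification, with auto-commit disabled so the short-lived test consumers do not commit offsets. It is not the test harness code itself; the object name, bootstrap address, and example group id below are placeholders.

import java.util.Properties
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
import org.apache.kafka.common.serialization.StringDeserializer

object DistinctGroupConsumerSketch {
  // Build a consumer whose group id is supplied by the caller, so each
  // listener verification can use its own group; auto-commit is disabled,
  // mirroring the createConsumer change in the diff above.
  def createConsumer(bootstrapServers: String, groupId: String): KafkaConsumer[String, String] = {
    val props = new Properties()
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
    props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId)
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")
    new KafkaConsumer[String, String](props, new StringDeserializer, new StringDeserializer)
  }

  def main(args: Array[String]): Unit = {
    // Hypothetical usage: one group per security protocol / SASL mechanism under test.
    val consumer = createConsumer("localhost:9092", "add-listener-group-SASL_SSL-SCRAM-SHA-256")
    consumer.close()
  }
}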

-- 
To stop receiving notification emails like this one, please contact
jgus@apache.org.
