kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rsiva...@apache.org
Subject [kafka] branch 2.2 updated: KAFKA-7976 - Fix DynamicBrokerReconfigurationTest.testUncleanLeaderElectionEnable (#6374)
Date Wed, 06 Mar 2019 12:59:50 GMT
This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch 2.2
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.2 by this push:
     new c87b180  KAFKA-7976 - Fix DynamicBrokerReconfigurationTest.testUncleanLeaderElectionEnable (#6374)
c87b180 is described below

commit c87b180b08ade2b08c674d8879e2c21fb0785617
Author: Rajini Sivaram <rajinisivaram@googlemail.com>
AuthorDate: Wed Mar 6 09:55:05 2019 +0000

    KAFKA-7976 - Fix DynamicBrokerReconfigurationTest.testUncleanLeaderElectionEnable (#6374)
    
    Ensure that controller is not shutdown in the test.
    
    Reviewers: Manikumar Reddy <manikumar.reddy@gmail.com>
---
 .../kafka/server/DynamicBrokerReconfigurationTest.scala    | 14 +++++++++-----
 1 file changed, 9 insertions(+), 5 deletions(-)

diff --git a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
index c13b0a3..80ed131 100644
--- a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
+++ b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
@@ -32,7 +32,6 @@ import com.yammer.metrics.Metrics
 import com.yammer.metrics.core.MetricName
 import kafka.admin.ConfigCommand
 import kafka.api.{KafkaSasl, SaslSetup}
-import kafka.coordinator.group.OffsetConfig
 import kafka.log.LogConfig
 import kafka.message.ProducerCompressionCodec
 import kafka.network.{Processor, RequestChannel}
@@ -133,7 +132,7 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
     }
 
     TestUtils.createTopic(zkClient, topic, numPartitions, replicationFactor = numServers, servers)
-    TestUtils.createTopic(zkClient, Topic.GROUP_METADATA_TOPIC_NAME, OffsetConfig.DefaultOffsetsTopicNumPartitions,
+    TestUtils.createTopic(zkClient, Topic.GROUP_METADATA_TOPIC_NAME, servers.head.config.offsetsTopicPartitions,
       replicationFactor = numServers, servers, servers.head.groupCoordinator.offsetsTopicConfigs)
 
     createAdminClient(SecurityProtocol.SSL, SecureInternal)
@@ -445,8 +444,14 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
 
   @Test
   def testUncleanLeaderElectionEnable(): Unit = {
+    val controller = servers.find(_.config.brokerId == TestUtils.waitUntilControllerElected(zkClient)).get
+    val controllerId = controller.config.brokerId
+
+    // Create a topic with two replicas on brokers other than the controller
     val topic = "testtopic2"
-    TestUtils.createTopic(zkClient, topic, 1, replicationFactor = 2, servers)
+    val assignment = Map(0 -> Seq((controllerId + 1) % servers.size, (controllerId + 2) % servers.size))
+    TestUtils.createTopic(zkClient, topic, assignment, servers)
+
     val producer = ProducerBuilder().acks(1).build()
     val consumer = ConsumerBuilder("unclean-leader-test").enableAutoCommit(false).topic(topic).build()
     verifyProduceConsume(producer, consumer, numRecords = 10, topic)
@@ -472,7 +477,6 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
     leaderBroker.shutdown()
     leaderBroker.awaitShutdown()
     followerBroker.startup()
-    val controller = servers.find(_.config.brokerId == TestUtils.waitUntilControllerElected(zkClient)).get
 
     // Verify that new leader is not elected with unclean leader disabled since there are no ISRs
     TestUtils.waitUntilTrue(() => partitionInfo.leader == null, "Unclean leader elected")
@@ -928,7 +932,7 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
       .build()
     verifyProduceConsume(producer1, consumer1, numRecords = 10, topic)
     // send another message to check consumer later
-    producer1.send(new ProducerRecord(topic, "key", "value")).get(100, TimeUnit.MILLISECONDS)
+    producer1.send(new ProducerRecord(topic, "key", "value")).get(1, TimeUnit.SECONDS)
 
     val config = servers.head.config
     val existingListenerCount = config.listeners.size


Mime
View raw message