kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ij...@apache.org
Subject [kafka] branch 2.6 updated: KAFKA-10158: Fix flaky testDescribeUnderReplicatedPartitionsWhenReassignmentIsInProgress (#9022)
Date Sun, 26 Jul 2020 15:32:49 GMT
This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a commit to branch 2.6
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.6 by this push:
     new 8698b7a  KAFKA-10158: Fix flaky testDescribeUnderReplicatedPartitionsWhenReassignmentIsInProgress
(#9022)
8698b7a is described below

commit 8698b7afcb432e7049403c2a556d1867cea1fe26
Author: Brian Byrne <bbyrne@confluent.io>
AuthorDate: Sun Jul 26 08:28:47 2020 -0700

    KAFKA-10158: Fix flaky testDescribeUnderReplicatedPartitionsWhenReassignmentIsInProgress
(#9022)
    
    Set `replica.fetch.max.bytes` to `1` and produce multiple record batches to allow
    for throttling to take place. This helps avoid a race condition where the
    reassignment would complete more quickly than expected causing an
    assertion to fail some times.
    
    Reviewers: Lucas Bradstreet <lucas@confluent.io>, Jason Gustafson <jason@confluent.io>,
Chia-Ping Tsai <chia7712@gmail.com>, Ismael Juma <ismael@juma.me.uk>
---
 .../kafka/admin/TopicCommandWithAdminClientTest.scala | 19 +++++++++++++++++--
 .../org/apache/kafka/jmh/server/CheckpointBench.java  |  3 ++-
 2 files changed, 19 insertions(+), 3 deletions(-)

diff --git a/core/src/test/scala/unit/kafka/admin/TopicCommandWithAdminClientTest.scala b/core/src/test/scala/unit/kafka/admin/TopicCommandWithAdminClientTest.scala
index a1e49e4..28323f5 100644
--- a/core/src/test/scala/unit/kafka/admin/TopicCommandWithAdminClientTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/TopicCommandWithAdminClientTest.scala
@@ -50,14 +50,20 @@ class TopicCommandWithAdminClientTest extends KafkaServerTestHarness with
Loggin
   /**
     * Implementations must override this method to return a set of KafkaConfigs. This method
will be invoked for every
     * test and should not reuse previous configurations unless they select their ports randomly
when servers are started.
+    *
+    * Note the replica fetch max bytes is set to `1` in order to throttle the rate of replication
for test
+    * `testDescribeUnderReplicatedPartitionsWhenReassignmentIsInProgress`.
     */
   override def generateConfigs: Seq[KafkaConfig] = TestUtils.createBrokerConfigs(
     numConfigs = 6,
     zkConnect = zkConnect,
     rackInfo = Map(0 -> "rack1", 1 -> "rack2", 2 -> "rack2", 3 -> "rack1", 4
-> "rack3", 5 -> "rack3"),
     numPartitions = numPartitions,
-    defaultReplicationFactor = defaultReplicationFactor
-  ).map(KafkaConfig.fromProps)
+    defaultReplicationFactor = defaultReplicationFactor,
+  ).map { props =>
+    props.put(KafkaConfig.ReplicaFetchMaxBytesProp, "1")
+    KafkaConfig.fromProps(props)
+  }
 
   private val numPartitions = 1
   private val defaultReplicationFactor = 1.toShort
@@ -672,8 +678,13 @@ class TopicCommandWithAdminClientTest extends KafkaServerTestHarness
with Loggin
     adminClient.createTopics(
       Collections.singletonList(new NewTopic(testTopicName, partitions, replicationFactor).configs(configMap))).all().get()
     waitForTopicCreated(testTopicName)
+
+    // Produce multiple batches.
+    TestUtils.generateAndProduceMessages(servers, testTopicName, numMessages = 10, acks =
-1)
     TestUtils.generateAndProduceMessages(servers, testTopicName, numMessages = 10, acks =
-1)
 
+    // Enable throttling. Note the broker config sets the replica max fetch bytes to `1`
upon to minimize replication
+    // throughput so the reassignment doesn't complete quickly.
     val brokerIds = servers.map(_.config.brokerId)
     TestUtils.setReplicationThrottleForPartitions(adminClient, brokerIds, Set(tp), throttleBytes
= 1)
 
@@ -703,6 +714,10 @@ class TopicCommandWithAdminClientTest extends KafkaServerTestHarness
with Loggin
       topicService.describeTopic(new TopicCommandOptions(Array("--under-replicated-partitions"))))
     assertEquals(s"--under-replicated-partitions shouldn't return anything: '$underReplicatedOutput'",
"", underReplicatedOutput)
 
+    // Verify reassignment is still ongoing.
+    val reassignments = adminClient.listPartitionReassignments(Collections.singleton(tp)).reassignments.get().get(tp)
+    assertFalse(Option(reassignments).forall(_.addingReplicas.isEmpty))
+
     TestUtils.removeReplicationThrottleForPartitions(adminClient, brokerIds, Set(tp))
     TestUtils.waitForAllReassignmentsToComplete(adminClient)
   }
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/CheckpointBench.java
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/CheckpointBench.java
index e103f13..7e24857 100644
--- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/CheckpointBench.java
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/CheckpointBench.java
@@ -97,7 +97,8 @@ public class CheckpointBench {
         this.scheduler = new KafkaScheduler(1, "scheduler-thread", true);
         this.brokerProperties = KafkaConfig.fromProps(TestUtils.createBrokerConfig(
                 0, TestUtils.MockZkConnect(), true, true, 9092, Option.empty(), Option.empty(),
-                Option.empty(), true, false, 0, false, 0, false, 0, Option.empty(), 1, true,
1, (short) 1));
+                Option.empty(), true, false, 0, false, 0, false, 0, Option.empty(), 1, true,
1,
+                (short) 1));
         this.metrics = new Metrics();
         this.time = new MockTime();
         this.failureChannel = new LogDirFailureChannel(brokerProperties.logDirs().size());


Mime
View raw message