This is an automated email from the ASF dual-hosted git repository. ijuma pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 99472c5 KAFKA-10158: Fix flaky testDescribeUnderReplicatedPartitionsWhenReassignmentIsInProgress (#9022) 99472c5 is described below commit 99472c54f01e1f654a3ccb774a7857f10d69e2e3 Author: Brian Byrne AuthorDate: Sun Jul 26 08:28:47 2020 -0700 KAFKA-10158: Fix flaky testDescribeUnderReplicatedPartitionsWhenReassignmentIsInProgress (#9022) Set `replica.fetch.max.bytes` to `1` and produce multiple record batches to allow for throttling to take place. This helps avoid a race condition where the reassignment would complete more quickly than expected, causing an assertion to fail sometimes. Reviewers: Lucas Bradstreet, Jason Gustafson, Chia-Ping Tsai, Ismael Juma --- .../kafka/admin/TopicCommandWithAdminClientTest.scala | 19 +++++++++++++++++-- .../org/apache/kafka/jmh/server/CheckpointBench.java | 3 ++- 2 files changed, 19 insertions(+), 3 deletions(-) diff --git a/core/src/test/scala/unit/kafka/admin/TopicCommandWithAdminClientTest.scala b/core/src/test/scala/unit/kafka/admin/TopicCommandWithAdminClientTest.scala index a1e49e4..28323f5 100644 --- a/core/src/test/scala/unit/kafka/admin/TopicCommandWithAdminClientTest.scala +++ b/core/src/test/scala/unit/kafka/admin/TopicCommandWithAdminClientTest.scala @@ -50,14 +50,20 @@ class TopicCommandWithAdminClientTest extends KafkaServerTestHarness with Loggin /** * Implementations must override this method to return a set of KafkaConfigs. This method will be invoked for every * test and should not reuse previous configurations unless they select their ports randomly when servers are started. + * + * Note the replica fetch max bytes is set to `1` in order to throttle the rate of replication for test + * `testDescribeUnderReplicatedPartitionsWhenReassignmentIsInProgress`. 
*/ override def generateConfigs: Seq[KafkaConfig] = TestUtils.createBrokerConfigs( numConfigs = 6, zkConnect = zkConnect, rackInfo = Map(0 -> "rack1", 1 -> "rack2", 2 -> "rack2", 3 -> "rack1", 4 -> "rack3", 5 -> "rack3"), numPartitions = numPartitions, - defaultReplicationFactor = defaultReplicationFactor - ).map(KafkaConfig.fromProps) + defaultReplicationFactor = defaultReplicationFactor, + ).map { props => + props.put(KafkaConfig.ReplicaFetchMaxBytesProp, "1") + KafkaConfig.fromProps(props) + } private val numPartitions = 1 private val defaultReplicationFactor = 1.toShort @@ -672,8 +678,13 @@ class TopicCommandWithAdminClientTest extends KafkaServerTestHarness with Loggin adminClient.createTopics( Collections.singletonList(new NewTopic(testTopicName, partitions, replicationFactor).configs(configMap))).all().get() waitForTopicCreated(testTopicName) + + // Produce multiple batches. + TestUtils.generateAndProduceMessages(servers, testTopicName, numMessages = 10, acks = -1) TestUtils.generateAndProduceMessages(servers, testTopicName, numMessages = 10, acks = -1) + // Enable throttling. Note the broker config sets the replica max fetch bytes to `1` upon to minimize replication + // throughput so the reassignment doesn't complete quickly. val brokerIds = servers.map(_.config.brokerId) TestUtils.setReplicationThrottleForPartitions(adminClient, brokerIds, Set(tp), throttleBytes = 1) @@ -703,6 +714,10 @@ class TopicCommandWithAdminClientTest extends KafkaServerTestHarness with Loggin topicService.describeTopic(new TopicCommandOptions(Array("--under-replicated-partitions")))) assertEquals(s"--under-replicated-partitions shouldn't return anything: '$underReplicatedOutput'", "", underReplicatedOutput) + // Verify reassignment is still ongoing. 
+ val reassignments = adminClient.listPartitionReassignments(Collections.singleton(tp)).reassignments.get().get(tp) + assertFalse(Option(reassignments).forall(_.addingReplicas.isEmpty)) + TestUtils.removeReplicationThrottleForPartitions(adminClient, brokerIds, Set(tp)) TestUtils.waitForAllReassignmentsToComplete(adminClient) } diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/CheckpointBench.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/CheckpointBench.java index e103f13..7e24857 100644 --- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/CheckpointBench.java +++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/CheckpointBench.java @@ -97,7 +97,8 @@ public class CheckpointBench { this.scheduler = new KafkaScheduler(1, "scheduler-thread", true); this.brokerProperties = KafkaConfig.fromProps(TestUtils.createBrokerConfig( 0, TestUtils.MockZkConnect(), true, true, 9092, Option.empty(), Option.empty(), - Option.empty(), true, false, 0, false, 0, false, 0, Option.empty(), 1, true, 1, (short) 1)); + Option.empty(), true, false, 0, false, 0, false, 0, Option.empty(), 1, true, 1, + (short) 1)); this.metrics = new Metrics(); this.time = new MockTime(); this.failureChannel = new LogDirFailureChannel(brokerProperties.logDirs().size());