This is an automated email from the ASF dual-hosted git repository. ijuma pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new e08d721 KAFKA-7605; Retry async commit failures in integration test cases to fix flaky tests (#5890) e08d721 is described below commit e08d721b048c70a0eea82d6c8a57faa69ed40163 Author: Jason Gustafson AuthorDate: Mon Nov 12 22:21:21 2018 -0800 KAFKA-7605; Retry async commit failures in integration test cases to fix flaky tests (#5890) We are seeing some timeouts in tests which depend on the awaitCommitCallback (e.g. SaslMultiMechanismConsumerTest.testCoordinatorFailover). After some investigation, we found that it is caused by a disconnect when attempting the async commit. To fix the problem, we have added simple retry logic to the test utility. Reviewers: Stanislav Kozlovski, Ismael Juma --- .../integration/kafka/api/BaseConsumerTest.scala | 51 ++++++++++++++++------ .../kafka/api/PlaintextConsumerTest.scala | 21 +++++---- .../kafka/api/SaslMultiMechanismConsumerTest.scala | 11 ++--- 3 files changed, 51 insertions(+), 32 deletions(-) diff --git a/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala b/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala index 4d2e1a9..a6a81b0 100644 --- a/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala +++ b/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala @@ -84,9 +84,7 @@ abstract class BaseConsumerTest extends IntegrationTestHarness { consumeAndVerifyRecords(consumer = consumer, numRecords = numRecords, startingOffset = 0) // check async commit callbacks - val commitCallback = new CountConsumerCommitCallback() - consumer.commitAsync(commitCallback) - awaitCommitCallback(consumer, commitCallback) + sendAndAwaitAsyncCommit(consumer) } @Test @@ -191,12 +189,38 @@ abstract class BaseConsumerTest extends IntegrationTestHarness { records } - 
protected def awaitCommitCallback[K, V](consumer: Consumer[K, V], - commitCallback: CountConsumerCommitCallback, - count: Int = 1): Unit = { - TestUtils.pollUntilTrue(consumer, () => commitCallback.successCount >= count, + protected def sendAndAwaitAsyncCommit[K, V](consumer: Consumer[K, V], + offsetsOpt: Option[Map[TopicPartition, OffsetAndMetadata]] = None): Unit = { + + def sendAsyncCommit(callback: OffsetCommitCallback) = { + offsetsOpt match { + case Some(offsets) => consumer.commitAsync(offsets.asJava, callback) + case None => consumer.commitAsync(callback) + } + } + + class RetryCommitCallback extends OffsetCommitCallback { + var isComplete = false + var error: Option[Exception] = None + + override def onComplete(offsets: util.Map[TopicPartition, OffsetAndMetadata], exception: Exception): Unit = { + exception match { + case e: RetriableCommitFailedException => + sendAsyncCommit(this) + case e => + isComplete = true + error = Option(e) + } + } + } + + val commitCallback = new RetryCommitCallback + + sendAsyncCommit(commitCallback) + TestUtils.pollUntilTrue(consumer, () => commitCallback.isComplete, "Failed to observe commit callback before timeout", waitTimeMs = 10000) - assertEquals(count, commitCallback.successCount) + + assertEquals(None, commitCallback.error) } protected def awaitRebalance(consumer: Consumer[_, _], rebalanceListener: TestConsumerReassignmentListener): Unit = { @@ -209,21 +233,22 @@ abstract class BaseConsumerTest extends IntegrationTestHarness { // The best way to verify that the current membership is still active is to commit offsets. // This would fail if the group had rebalanced. 
val initialRevokeCalls = rebalanceListener.callsToRevoked - val commitCallback = new CountConsumerCommitCallback - consumer.commitAsync(commitCallback) - awaitCommitCallback(consumer, commitCallback) + sendAndAwaitAsyncCommit(consumer) assertEquals(initialRevokeCalls, rebalanceListener.callsToRevoked) } protected class CountConsumerCommitCallback extends OffsetCommitCallback { var successCount = 0 var failCount = 0 + var lastError: Option[Exception] = None override def onComplete(offsets: util.Map[TopicPartition, OffsetAndMetadata], exception: Exception): Unit = { - if (exception == null) + if (exception == null) { successCount += 1 - else + } else { failCount += 1 + lastError = Some(exception) + } } } diff --git a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala index 2aee15a..c06a796 100644 --- a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala +++ b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala @@ -480,14 +480,12 @@ class PlaintextConsumerTest extends BaseConsumerTest { // async commit val asyncMetadata = new OffsetAndMetadata(10, "bar") - val callback = new CountConsumerCommitCallback - consumer.commitAsync(Map((tp, asyncMetadata)).asJava, callback) - awaitCommitCallback(consumer, callback) + sendAndAwaitAsyncCommit(consumer, Some(Map(tp -> asyncMetadata))) assertEquals(asyncMetadata, consumer.committed(tp)) // handle null metadata val nullMetadata = new OffsetAndMetadata(5, null) - consumer.commitSync(Map((tp, nullMetadata)).asJava) + consumer.commitSync(Map(tp -> nullMetadata).asJava) assertEquals(nullMetadata, consumer.committed(tp)) } @@ -498,10 +496,15 @@ class PlaintextConsumerTest extends BaseConsumerTest { val callback = new CountConsumerCommitCallback val count = 5 + for (i <- 1 to count) consumer.commitAsync(Map(tp -> new OffsetAndMetadata(i)).asJava, callback) - awaitCommitCallback(consumer, callback, count=count) 
+ TestUtils.pollUntilTrue(consumer, () => callback.successCount >= count || callback.lastError.isDefined, + "Failed to observe commit callback before timeout", waitTimeMs = 10000) + + assertEquals(None, callback.lastError) + assertEquals(count, callback.successCount) assertEquals(new OffsetAndMetadata(count), consumer.committed(tp)) } @@ -1021,9 +1024,7 @@ class PlaintextConsumerTest extends BaseConsumerTest { assertEquals(commitCountBefore + 1, MockConsumerInterceptor.ON_COMMIT_COUNT.intValue) // commit async and verify onCommit is called - val commitCallback = new CountConsumerCommitCallback() - testConsumer.commitAsync(Map[TopicPartition, OffsetAndMetadata]((tp, new OffsetAndMetadata(5L))).asJava, commitCallback) - awaitCommitCallback(testConsumer, commitCallback) + sendAndAwaitAsyncCommit(testConsumer, Some(Map(tp -> new OffsetAndMetadata(5L)))) assertEquals(5, testConsumer.committed(tp).offset) assertEquals(commitCountBefore + 2, MockConsumerInterceptor.ON_COMMIT_COUNT.intValue) @@ -1327,9 +1328,7 @@ class PlaintextConsumerTest extends BaseConsumerTest { assertEquals(5, consumer.committed(tp2).offset) // Using async should pick up the committed changes after commit completes - val commitCallback = new CountConsumerCommitCallback() - consumer.commitAsync(Map[TopicPartition, OffsetAndMetadata]((tp2, new OffsetAndMetadata(7L))).asJava, commitCallback) - awaitCommitCallback(consumer, commitCallback) + sendAndAwaitAsyncCommit(consumer, Some(Map(tp2 -> new OffsetAndMetadata(7L)))) assertEquals(7, consumer.committed(tp2).offset) } diff --git a/core/src/test/scala/integration/kafka/api/SaslMultiMechanismConsumerTest.scala b/core/src/test/scala/integration/kafka/api/SaslMultiMechanismConsumerTest.scala index 2e51bff..94b5e6f 100644 --- a/core/src/test/scala/integration/kafka/api/SaslMultiMechanismConsumerTest.scala +++ b/core/src/test/scala/integration/kafka/api/SaslMultiMechanismConsumerTest.scala @@ -45,7 +45,6 @@ class SaslMultiMechanismConsumerTest extends 
BaseConsumerTest with SaslSetup { @Test def testMultipleBrokerMechanisms() { - val plainSaslProducer = createProducer() val plainSaslConsumer = createConsumer() @@ -60,9 +59,7 @@ class SaslMultiMechanismConsumerTest extends BaseConsumerTest with SaslSetup { plainSaslConsumer.assign(List(tp).asJava) plainSaslConsumer.seek(tp, 0) consumeAndVerifyRecords(consumer = plainSaslConsumer, numRecords = numRecords, startingOffset = startingOffset) - val plainCommitCallback = new CountConsumerCommitCallback() - plainSaslConsumer.commitAsync(plainCommitCallback) - awaitCommitCallback(plainSaslConsumer, plainCommitCallback) + sendAndAwaitAsyncCommit(plainSaslConsumer) startingOffset += numRecords // Test SASL/GSSAPI producer and consumer @@ -70,9 +67,7 @@ class SaslMultiMechanismConsumerTest extends BaseConsumerTest with SaslSetup { gssapiSaslConsumer.assign(List(tp).asJava) gssapiSaslConsumer.seek(tp, startingOffset) consumeAndVerifyRecords(consumer = gssapiSaslConsumer, numRecords = numRecords, startingOffset = startingOffset) - val gssapiCommitCallback = new CountConsumerCommitCallback() - gssapiSaslConsumer.commitAsync(gssapiCommitCallback) - awaitCommitCallback(gssapiSaslConsumer, gssapiCommitCallback) + sendAndAwaitAsyncCommit(gssapiSaslConsumer) startingOffset += numRecords // Test SASL/PLAIN producer and SASL/GSSAPI consumer @@ -87,6 +82,6 @@ class SaslMultiMechanismConsumerTest extends BaseConsumerTest with SaslSetup { plainSaslConsumer.assign(List(tp).asJava) plainSaslConsumer.seek(tp, startingOffset) consumeAndVerifyRecords(consumer = plainSaslConsumer, numRecords = numRecords, startingOffset = startingOffset) - } + }