kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ij...@apache.org
Subject [kafka] branch trunk updated: KAFKA-7605; Retry async commit failures in integration test cases to fix flaky tests (#5890)
Date Tue, 13 Nov 2018 06:21:30 GMT
This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new e08d721  KAFKA-7605; Retry async commit failures in integration test cases to fix flaky tests (#5890)
e08d721 is described below

commit e08d721b048c70a0eea82d6c8a57faa69ed40163
Author: Jason Gustafson <jason@confluent.io>
AuthorDate: Mon Nov 12 22:21:21 2018 -0800

    KAFKA-7605; Retry async commit failures in integration test cases to fix flaky tests (#5890)
    
    We are seeing some timeouts in tests which depend on the awaitCommitCallback (e.g.
    SaslMultiMechanismConsumerTest.testCoordinatorFailover). After some investigation,
    we found that it is caused by a disconnect when attempting the async commit.
    To fix the problem, we have added simple retry logic to the test utility.
    
    Reviewers: Stanislav Kozlovski <stanislav_kozlovski@outlook.com>, Ismael Juma <ismael@juma.me.uk>
---
 .../integration/kafka/api/BaseConsumerTest.scala   | 51 ++++++++++++++++------
 .../kafka/api/PlaintextConsumerTest.scala          | 21 +++++----
 .../kafka/api/SaslMultiMechanismConsumerTest.scala | 11 ++---
 3 files changed, 51 insertions(+), 32 deletions(-)

diff --git a/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala b/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
index 4d2e1a9..a6a81b0 100644
--- a/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
@@ -84,9 +84,7 @@ abstract class BaseConsumerTest extends IntegrationTestHarness {
     consumeAndVerifyRecords(consumer = consumer, numRecords = numRecords, startingOffset = 0)
 
     // check async commit callbacks
-    val commitCallback = new CountConsumerCommitCallback()
-    consumer.commitAsync(commitCallback)
-    awaitCommitCallback(consumer, commitCallback)
+    sendAndAwaitAsyncCommit(consumer)
   }
 
   @Test
@@ -191,12 +189,38 @@ abstract class BaseConsumerTest extends IntegrationTestHarness {
     records
   }
 
-  protected def awaitCommitCallback[K, V](consumer: Consumer[K, V],
-                                          commitCallback: CountConsumerCommitCallback,
-                                          count: Int = 1): Unit = {
-    TestUtils.pollUntilTrue(consumer, () => commitCallback.successCount >= count,
+  protected def sendAndAwaitAsyncCommit[K, V](consumer: Consumer[K, V],
+                                              offsetsOpt: Option[Map[TopicPartition, OffsetAndMetadata]] = None): Unit = {
+
+    def sendAsyncCommit(callback: OffsetCommitCallback) = {
+      offsetsOpt match {
+        case Some(offsets) => consumer.commitAsync(offsets.asJava, callback)
+        case None => consumer.commitAsync(callback)
+      }
+    }
+
+    class RetryCommitCallback extends OffsetCommitCallback {
+      var isComplete = false
+      var error: Option[Exception] = None
+
+      override def onComplete(offsets: util.Map[TopicPartition, OffsetAndMetadata], exception: Exception): Unit = {
+        exception match {
+          case e: RetriableCommitFailedException =>
+            sendAsyncCommit(this)
+          case e =>
+            isComplete = true
+            error = Option(e)
+        }
+      }
+    }
+
+    val commitCallback = new RetryCommitCallback
+
+    sendAsyncCommit(commitCallback)
+    TestUtils.pollUntilTrue(consumer, () => commitCallback.isComplete,
       "Failed to observe commit callback before timeout", waitTimeMs = 10000)
-    assertEquals(count, commitCallback.successCount)
+
+    assertEquals(None, commitCallback.error)
   }
 
   protected def awaitRebalance(consumer: Consumer[_, _], rebalanceListener: TestConsumerReassignmentListener): Unit = {
@@ -209,21 +233,22 @@ abstract class BaseConsumerTest extends IntegrationTestHarness {
     // The best way to verify that the current membership is still active is to commit offsets.
     // This would fail if the group had rebalanced.
     val initialRevokeCalls = rebalanceListener.callsToRevoked
-    val commitCallback = new CountConsumerCommitCallback
-    consumer.commitAsync(commitCallback)
-    awaitCommitCallback(consumer, commitCallback)
+    sendAndAwaitAsyncCommit(consumer)
     assertEquals(initialRevokeCalls, rebalanceListener.callsToRevoked)
   }
 
   protected class CountConsumerCommitCallback extends OffsetCommitCallback {
     var successCount = 0
     var failCount = 0
+    var lastError: Option[Exception] = None
 
     override def onComplete(offsets: util.Map[TopicPartition, OffsetAndMetadata], exception: Exception): Unit = {
-      if (exception == null)
+      if (exception == null) {
         successCount += 1
-      else
+      } else {
         failCount += 1
+        lastError = Some(exception)
+      }
     }
   }
 
diff --git a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
index 2aee15a..c06a796 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
@@ -480,14 +480,12 @@ class PlaintextConsumerTest extends BaseConsumerTest {
 
     // async commit
     val asyncMetadata = new OffsetAndMetadata(10, "bar")
-    val callback = new CountConsumerCommitCallback
-    consumer.commitAsync(Map((tp, asyncMetadata)).asJava, callback)
-    awaitCommitCallback(consumer, callback)
+    sendAndAwaitAsyncCommit(consumer, Some(Map(tp -> asyncMetadata)))
     assertEquals(asyncMetadata, consumer.committed(tp))
 
     // handle null metadata
     val nullMetadata = new OffsetAndMetadata(5, null)
-    consumer.commitSync(Map((tp, nullMetadata)).asJava)
+    consumer.commitSync(Map(tp -> nullMetadata).asJava)
     assertEquals(nullMetadata, consumer.committed(tp))
   }
 
@@ -498,10 +496,15 @@ class PlaintextConsumerTest extends BaseConsumerTest {
 
     val callback = new CountConsumerCommitCallback
     val count = 5
+
     for (i <- 1 to count)
       consumer.commitAsync(Map(tp -> new OffsetAndMetadata(i)).asJava, callback)
 
-    awaitCommitCallback(consumer, callback, count=count)
+    TestUtils.pollUntilTrue(consumer, () => callback.successCount >= count || callback.lastError.isDefined,
+      "Failed to observe commit callback before timeout", waitTimeMs = 10000)
+
+    assertEquals(None, callback.lastError)
+    assertEquals(count, callback.successCount)
     assertEquals(new OffsetAndMetadata(count), consumer.committed(tp))
   }
 
@@ -1021,9 +1024,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     assertEquals(commitCountBefore + 1, MockConsumerInterceptor.ON_COMMIT_COUNT.intValue)
 
     // commit async and verify onCommit is called
-    val commitCallback = new CountConsumerCommitCallback()
-    testConsumer.commitAsync(Map[TopicPartition, OffsetAndMetadata]((tp, new OffsetAndMetadata(5L))).asJava, commitCallback)
-    awaitCommitCallback(testConsumer, commitCallback)
+    sendAndAwaitAsyncCommit(testConsumer, Some(Map(tp -> new OffsetAndMetadata(5L))))
     assertEquals(5, testConsumer.committed(tp).offset)
     assertEquals(commitCountBefore + 2, MockConsumerInterceptor.ON_COMMIT_COUNT.intValue)
 
@@ -1327,9 +1328,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     assertEquals(5, consumer.committed(tp2).offset)
 
     // Using async should pick up the committed changes after commit completes
-    val commitCallback = new CountConsumerCommitCallback()
-    consumer.commitAsync(Map[TopicPartition, OffsetAndMetadata]((tp2, new OffsetAndMetadata(7L))).asJava, commitCallback)
-    awaitCommitCallback(consumer, commitCallback)
+    sendAndAwaitAsyncCommit(consumer, Some(Map(tp2 -> new OffsetAndMetadata(7L))))
     assertEquals(7, consumer.committed(tp2).offset)
   }
 
diff --git a/core/src/test/scala/integration/kafka/api/SaslMultiMechanismConsumerTest.scala b/core/src/test/scala/integration/kafka/api/SaslMultiMechanismConsumerTest.scala
index 2e51bff..94b5e6f 100644
--- a/core/src/test/scala/integration/kafka/api/SaslMultiMechanismConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/SaslMultiMechanismConsumerTest.scala
@@ -45,7 +45,6 @@ class SaslMultiMechanismConsumerTest extends BaseConsumerTest with SaslSetup {
 
   @Test
   def testMultipleBrokerMechanisms() {
-
     val plainSaslProducer = createProducer()
     val plainSaslConsumer = createConsumer()
 
@@ -60,9 +59,7 @@ class SaslMultiMechanismConsumerTest extends BaseConsumerTest with SaslSetup {
     plainSaslConsumer.assign(List(tp).asJava)
     plainSaslConsumer.seek(tp, 0)
     consumeAndVerifyRecords(consumer = plainSaslConsumer, numRecords = numRecords, startingOffset = startingOffset)
-    val plainCommitCallback = new CountConsumerCommitCallback()
-    plainSaslConsumer.commitAsync(plainCommitCallback)
-    awaitCommitCallback(plainSaslConsumer, plainCommitCallback)
+    sendAndAwaitAsyncCommit(plainSaslConsumer)
     startingOffset += numRecords
 
     // Test SASL/GSSAPI producer and consumer
@@ -70,9 +67,7 @@ class SaslMultiMechanismConsumerTest extends BaseConsumerTest with SaslSetup {
     gssapiSaslConsumer.assign(List(tp).asJava)
     gssapiSaslConsumer.seek(tp, startingOffset)
     consumeAndVerifyRecords(consumer = gssapiSaslConsumer, numRecords = numRecords, startingOffset = startingOffset)
-    val gssapiCommitCallback = new CountConsumerCommitCallback()
-    gssapiSaslConsumer.commitAsync(gssapiCommitCallback)
-    awaitCommitCallback(gssapiSaslConsumer, gssapiCommitCallback)
+    sendAndAwaitAsyncCommit(gssapiSaslConsumer)
     startingOffset += numRecords
 
     // Test SASL/PLAIN producer and SASL/GSSAPI consumer
@@ -87,6 +82,6 @@ class SaslMultiMechanismConsumerTest extends BaseConsumerTest with SaslSetup {
     plainSaslConsumer.assign(List(tp).asJava)
     plainSaslConsumer.seek(tp, startingOffset)
     consumeAndVerifyRecords(consumer = plainSaslConsumer, numRecords = numRecords, startingOffset = startingOffset)
-
   }
+
 }


Mime
View raw message