kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ij...@apache.org
Subject kafka git commit: KAFKA-3875; Transient test failure: kafka.api.SslProducerSendTest.testSendNonCompressedMessageWithCreateTime
Date Thu, 04 Aug 2016 11:30:43 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 2e3722a23 -> 6fb33afff


KAFKA-3875; Transient test failure: kafka.api.SslProducerSendTest.testSendNonCompressedMessageWithCreateTime

1. The IllegalStateException is actually thrown from testCloseWithZeroTimeoutFromSenderThread()
due to a bug. We call producer.close() in the callback. Once the first callback is called,
producing records in the callback will hit the IllegalStateException. This only pollutes the
output, but doesn't fail the test. I fixed this by only calling producer.send() in the first
callback.
2. It's not clear which test throws TimeoutException and it's not reproducible locally. One
thing is that the error message in TimeoutException is mis-leading since the timeout is not
necessarily due to metadata. Improved this by making the error message in TimeoutException
clearer.
3. It's not clear what actually failed testSendNonCompressedMessageWithCreateTime(). One thing
I found is that since we set the linger time to MAX_LONG and are sending small messages, those
produced messages won't be drained until we call producer.close(10000L, TimeUnit.MILLISECONDS).
Normally, 10 secs should be enough for the records to be sent. My only hypothesis is that
since SSL is more expensive, occasionally, 10 secs is still not enough. So, I bumped up the
timeout from 10 secs to 20 secs.

Author: Jun Rao <junrao@gmail.com>

Reviewers: Guozhang Wang <wangguoz@gmail.com>, Ismael Juma <ismael@juma.me.uk>

Closes #1703 from junrao/kafka-3875


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/6fb33aff
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/6fb33aff
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/6fb33aff

Branch: refs/heads/trunk
Commit: 6fb33afff976e467bfa8e0b29eb82770a2a3aaec
Parents: 2e3722a
Author: Jun Rao <junrao@gmail.com>
Authored: Thu Aug 4 12:30:24 2016 +0100
Committer: Ismael Juma <ismael@juma.me.uk>
Committed: Thu Aug 4 12:30:24 2016 +0100

----------------------------------------------------------------------
 .../kafka/clients/producer/internals/RecordBatch.java | 14 ++++++++++----
 .../integration/kafka/api/BaseProducerSendTest.scala  | 11 +++++++----
 2 files changed, 17 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/6fb33aff/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
index e6cd68f..6706bfd 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
@@ -142,17 +142,23 @@ public final class RecordBatch {
      */
     public boolean maybeExpire(int requestTimeoutMs, long retryBackoffMs, long now, long
lingerMs, boolean isFull) {
         boolean expire = false;
+        String errorMessage = null;
 
-        if (!this.inRetry() && isFull && requestTimeoutMs < (now - this.lastAppendTime))
+        if (!this.inRetry() && isFull && requestTimeoutMs < (now - this.lastAppendTime))
{
             expire = true;
-        else if (!this.inRetry() && requestTimeoutMs < (now - (this.createdMs
+ lingerMs)))
+            errorMessage = (now - this.lastAppendTime) + " ms has passed since last append";
+        } else if (!this.inRetry() && requestTimeoutMs < (now - (this.createdMs
+ lingerMs))) {
             expire = true;
-        else if (this.inRetry() && requestTimeoutMs < (now - (this.lastAttemptMs
+ retryBackoffMs)))
+            errorMessage = (now - (this.createdMs + lingerMs)) + " ms has passed since batch
creation plus linger time";
+        } else if (this.inRetry() && requestTimeoutMs < (now - (this.lastAttemptMs
+ retryBackoffMs))) {
             expire = true;
+            errorMessage = (now - (this.lastAttemptMs + retryBackoffMs)) + " ms has passed
since last attempt plus backoff time";
+        }
 
         if (expire) {
             this.records.close();
-            this.done(-1L, Record.NO_TIMESTAMP, new TimeoutException("Batch containing "
+ recordCount + " record(s) expired due to timeout while requesting metadata from brokers
for " + topicPartition));
+            this.done(-1L, Record.NO_TIMESTAMP,
+                      new TimeoutException("Expiring " + recordCount + " record(s) for "
+ topicPartition + " due to " + errorMessage));
         }
 
         return expire;

http://git-wip-us.apache.org/repos/asf/kafka/blob/6fb33aff/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
index 8eaf827..b5a1284 100644
--- a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
@@ -224,7 +224,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
         val record = new ProducerRecord[Array[Byte], Array[Byte]](topic, partition, baseTimestamp
+ i, "key".getBytes, "value".getBytes)
         producer.send(record, callback)
       }
-      producer.close(10000L, TimeUnit.MILLISECONDS)
+      producer.close(20000L, TimeUnit.MILLISECONDS)
       assertEquals(s"Should have offset $numRecords but only successfully sent ${callback.offset}",
numRecords, callback.offset)
     } finally {
       producer.close()
@@ -408,11 +408,12 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
     val record = new ProducerRecord[Array[Byte], Array[Byte]](topic, 0, null, "value".getBytes)
 
     // Test closing from sender thread.
-    class CloseCallback(producer: KafkaProducer[Array[Byte], Array[Byte]]) extends Callback
{
+    class CloseCallback(producer: KafkaProducer[Array[Byte], Array[Byte]], sendRecords: Boolean)
extends Callback {
       override def onCompletion(metadata: RecordMetadata, exception: Exception) {
         // Trigger another batch in accumulator before close the producer. These messages
should
         // not be sent.
-        (0 until numRecords) map (i => producer.send(record))
+        if (sendRecords)
+          (0 until numRecords) foreach (i => producer.send(record))
         // The close call will be called by all the message callbacks. This tests idempotence
of the close call.
         producer.close(0, TimeUnit.MILLISECONDS)
         // Test close with non zero timeout. Should not block at all.
@@ -423,7 +424,9 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
       val producer = createProducer(brokerList, lingerMs = Long.MaxValue)
       try {
         // send message to partition 0
-        val responses = (0 until numRecords) map (i => producer.send(record, new CloseCallback(producer)))
+        // Only send the records in the first callback since we close the producer in the
callback and no records
+        // can be sent afterwards.
+        val responses = (0 until numRecords) map (i => producer.send(record, new CloseCallback(producer,
i == 0)))
         assertTrue("No request is complete.", responses.forall(!_.isDone()))
         // flush the messages.
         producer.flush()


Mime
View raw message