kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject kafka git commit: KAFKA-5213; Mark a MemoryRecordsBuilder as full as soon as the append stream is closed
Date Thu, 11 May 2017 01:18:58 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 0bede30ad -> 970c00eab


KAFKA-5213; Mark a MemoryRecordsBuilder as full as soon as the append stream is closed

Author: Apurva Mehta <apurva@confluent.io>

Reviewers: Ismael Juma <ismael@juma.me.uk>, Jason Gustafson <jason@confluent.io>

Closes #3015 from apurvam/KAFKA-5213-illegalstateexception-in-ensureOpenForAppend


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/970c00ea
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/970c00ea
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/970c00ea

Branch: refs/heads/trunk
Commit: 970c00eab80d82b97456e276ee0f5615cb1ccfa1
Parents: 0bede30
Author: Apurva Mehta <apurva@confluent.io>
Authored: Wed May 10 18:15:54 2017 -0700
Committer: Jason Gustafson <jason@confluent.io>
Committed: Wed May 10 18:15:54 2017 -0700

----------------------------------------------------------------------
 .../kafka/common/record/MemoryRecordsBuilder.java    |  2 +-
 .../producer/internals/ProducerBatchTest.java        | 15 +++++++++++++++
 2 files changed, 16 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/970c00ea/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
index f7451cf..025b402 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
@@ -664,7 +664,7 @@ public class MemoryRecordsBuilder {
     public boolean isFull() {
         // note that the write limit is respected only after the first record is added which
ensures we can always
         // create non-empty batches (this is used to disable batching when the producer's
batch size is set to 0).
-        return isClosed() || (this.numRecords > 0 && this.writeLimit <= estimatedBytesWritten());
+        return appendStreamIsClosed || (this.numRecords > 0 && this.writeLimit
<= estimatedBytesWritten());
     }
 
     public int sizeInBytes() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/970c00ea/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerBatchTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerBatchTest.java
b/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerBatchTest.java
index 6895fce..fede528 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerBatchTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerBatchTest.java
@@ -20,12 +20,16 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.record.CompressionType;
 import org.apache.kafka.common.record.MemoryRecords;
 import org.apache.kafka.common.record.MemoryRecordsBuilder;
+import org.apache.kafka.common.record.Record;
 import org.apache.kafka.common.record.TimestampType;
 import org.junit.Test;
 
 import java.nio.ByteBuffer;
 
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
 
 public class ProducerBatchTest {
 
@@ -70,4 +74,15 @@ public class ProducerBatchTest {
         // Set `now` to 2ms before the create time.
         assertFalse(batch.maybeExpire(10240, 10240L, now - 2L, 10240L, true));
     }
+
+    @Test
+    public void testShouldNotAttemptAppendOnceRecordsBuilderIsClosedForAppends() {
+        ProducerBatch batch = new ProducerBatch(new TopicPartition("topic", 1), memoryRecordsBuilder,
now);
+        FutureRecordMetadata result0 = batch.tryAppend(now, null, new byte[10], Record.EMPTY_HEADERS,
null, now);
+        assertNotNull(result0);
+        assertTrue(memoryRecordsBuilder.hasRoomFor(now, null, new byte[10]));
+        memoryRecordsBuilder.closeForRecordAppends();
+        assertFalse(memoryRecordsBuilder.hasRoomFor(now, null, new byte[10]));
+        assertEquals(null, batch.tryAppend(now + 1, null, new byte[10], Record.EMPTY_HEADERS,
null, now + 1));
+    }
 }


Mime
View raw message