kafka-commits mailing list archives

From ij...@apache.org
Subject kafka git commit: KAFKA-3594; After calling MemoryRecords.close() method, hasRoomFor() method should return false
Date Fri, 24 Jun 2016 08:57:35 GMT
Repository: kafka
Updated Branches:
  refs/heads/0.9.0 ec276b38f -> cc884ee6a


KAFKA-3594; After calling MemoryRecords.close() method, hasRoomFor() method should return false

The exception reported in KAFKA-3594 occurs when the producer tries to append a record to a
re-enqueued record batch in the accumulator. Appending to a re-enqueued record batch should
not be allowed. The cause is a bug in the hasRoomFor() method of MemoryRecords.java: after
MemoryRecords.close() has been called, hasRoomFor() should return false.
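
The root cause visible in the diff is operator precedence: in Java, && binds tighter than ?:,
so the old expression was parsed as (writable && numRecordsWritten == 0) ? A : B, and a closed
(non-writable) batch fell through to the write-limit check B. The following is a minimal,
self-contained sketch of that behaviour, not the actual MemoryRecords code: the class name
HasRoomForSketch and the boolean parameters fitsEmpty and fitsWithinLimit are hypothetical
stand-ins for the two size comparisons.

```java
public class HasRoomForSketch {

    // Old shape of the expression: parsed as (writable && numRecordsWritten == 0) ? ... : ...
    // When writable is false, the condition is false and the second branch is returned.
    static boolean buggy(boolean writable, int numRecordsWritten,
                         boolean fitsEmpty, boolean fitsWithinLimit) {
        return writable && numRecordsWritten == 0 ? fitsEmpty : fitsWithinLimit;
    }

    // New shape: a closed (non-writable) batch never reports room.
    static boolean fixed(boolean writable, int numRecordsWritten,
                         boolean fitsEmpty, boolean fitsWithinLimit) {
        if (!writable)
            return false;
        return numRecordsWritten == 0 ? fitsEmpty : fitsWithinLimit;
    }

    public static void main(String[] args) {
        // A closed batch that already holds one record and is still under its write limit.
        System.out.println("old: " + buggy(false, 1, true, true));
        System.out.println("new: " + fixed(false, 1, true, true));
    }
}
```

For a closed batch that still has spare capacity, the old form reports room and the new form
does not, which matches the assertFalse check added to MemoryRecordsTest in this commit.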

This is a backport to the 0.9.0 branch.

Author: Manikumar reddy O <manikumar.reddy@gmail.com>

Reviewers: Guozhang Wang <wangguoz@gmail.com>, Ismael Juma <ismael@juma.me.uk>

Closes #1547 from ijuma/kafka-3594-for-0.9.0


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/cc884ee6
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/cc884ee6
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/cc884ee6

Branch: refs/heads/0.9.0
Commit: cc884ee6a264d3552a99876b199d09bb3d640839
Parents: ec276b3
Author: Manikumar reddy O <manikumar.reddy@gmail.com>
Authored: Fri Jun 24 10:57:07 2016 +0200
Committer: Ismael Juma <ismael@juma.me.uk>
Committed: Fri Jun 24 10:57:07 2016 +0200

----------------------------------------------------------------------
 .../org/apache/kafka/common/record/MemoryRecords.java    |  5 ++++-
 .../apache/kafka/common/record/MemoryRecordsTest.java    | 11 +++++++++++
 2 files changed, 15 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/cc884ee6/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
index 971f0a2..de27d5f 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
@@ -112,7 +112,10 @@ public class MemoryRecords implements Records {
      * to accept this single record.
      */
     public boolean hasRoomFor(byte[] key, byte[] value) {
-        return this.writable && this.compressor.numRecordsWritten() == 0 ?
+        if (!this.writable)
+            return false;
+
+        return this.compressor.numRecordsWritten() == 0 ?
             this.initialCapacity >= Records.LOG_OVERHEAD + Record.recordSize(key, value) :
             this.writeLimit >= this.compressor.estimatedBytesWritten() + Records.LOG_OVERHEAD + Record.recordSize(key, value);
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/cc884ee6/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
index e343327..7338503 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
@@ -67,6 +67,17 @@ public class MemoryRecordsTest {
         }
     }
 
+    @Test
+    public void testHasRoomForMethod() {
+        MemoryRecords recs1 = MemoryRecords.emptyRecords(ByteBuffer.allocate(1024), compression);
+        recs1.append(0, new Record("a".getBytes(), "1".getBytes()));
+
+        assertTrue(recs1.hasRoomFor("b".getBytes(), "2".getBytes()));
+        recs1.close();
+        assertFalse(recs1.hasRoomFor("b".getBytes(), "2".getBytes()));
+
+    }
+
     @Parameterized.Parameters
     public static Collection<Object[]> data() {
         List<Object[]> values = new ArrayList<Object[]>();

