kafka-commits mailing list archives

From ij...@apache.org
Subject kafka git commit: KAFKA-5316; LogCleaner should account for larger record sets after cleaning
Date Thu, 01 Jun 2017 10:02:29 GMT
Repository: kafka
Updated Branches:
  refs/heads/0.10.2 8b306ca2a -> 386a8d041


KAFKA-5316; LogCleaner should account for larger record sets after cleaning

Author: Jason Gustafson <jason@confluent.io>

Reviewers: Ismael Juma <ismael@juma.me.uk>

Closes #3165, #3187 from ijuma/kafka-5316-log-cleaner-0.10.2


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/386a8d04
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/386a8d04
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/386a8d04

Branch: refs/heads/0.10.2
Commit: 386a8d041c474b8ba66f3abd815338347f944b79
Parents: 8b306ca
Author: Jason Gustafson <jason@confluent.io>
Authored: Thu Jun 1 11:01:07 2017 +0100
Committer: Ismael Juma <ismael@juma.me.uk>
Committed: Thu Jun 1 11:01:24 2017 +0100

----------------------------------------------------------------------
 .../common/record/ByteBufferOutputStream.java   | 125 ++++++++++++++-----
 .../kafka/common/record/MemoryRecords.java      |  58 +++++++--
 .../common/record/MemoryRecordsBuilder.java     |  50 +++++---
 .../org/apache/kafka/common/record/Record.java  |   2 +-
 .../clients/consumer/internals/FetcherTest.java |   3 +-
 .../record/ByteBufferOutputStreamTest.java      | 101 +++++++++++++++
 .../kafka/common/record/MemoryRecordsTest.java  |  58 ++++++++-
 core/src/main/scala/kafka/log/LogCleaner.scala  |  11 +-
 8 files changed, 334 insertions(+), 74 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/386a8d04/clients/src/main/java/org/apache/kafka/common/record/ByteBufferOutputStream.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/ByteBufferOutputStream.java b/clients/src/main/java/org/apache/kafka/common/record/ByteBufferOutputStream.java
index 3fb7f49..39c1cfe 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/ByteBufferOutputStream.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/ByteBufferOutputStream.java
@@ -16,54 +16,111 @@
  */
 package org.apache.kafka.common.record;
 
-import java.io.DataOutputStream;
 import java.io.OutputStream;
 import java.nio.ByteBuffer;
 
 /**
- * A byte buffer backed output outputStream
+ * A ByteBuffer-backed OutputStream that expands the internal ByteBuffer as required. Given this, the caller should
+ * always access the underlying ByteBuffer via the {@link #buffer()} method until all writes are completed.
+ *
+ * This class is typically used for 2 purposes:
+ *
+ * 1. Write to a ByteBuffer when there is a chance that we may need to expand it in order to fit all the desired data
+ * 2. Write to a ByteBuffer via methods that expect an OutputStream interface
+ *
+ * Hard to track bugs can happen when this class is used for the second reason and unexpected buffer expansion happens.
+ * So, it's best to assume that buffer expansion can always happen. An improvement would be to create a separate class
+ * that throws an error if buffer expansion is required to avoid the issue altogether.
  */
-public class ByteBufferOutputStream extends DataOutputStream {
+public class ByteBufferOutputStream extends OutputStream {
 
     private static final float REALLOCATION_FACTOR = 1.1f;
 
+    private final int initialCapacity;
+    private final int initialPosition;
+    private ByteBuffer buffer;
+
+    /**
+     * Creates an instance of this class that will write to the received `buffer` up to its `limit`. If necessary to
+     * satisfy `write` or `position` calls, larger buffers will be allocated so the {@link #buffer()} method may return
+     * a different buffer than the received `buffer` parameter.
+     *
+     * Prefer one of the constructors that allocate the internal buffer for clearer semantics.
+     */
     public ByteBufferOutputStream(ByteBuffer buffer) {
-        super(new UnderlyingOutputStream(buffer));
+        this.buffer = buffer;
+        this.initialPosition = buffer.position();
+        this.initialCapacity = buffer.capacity();
+    }
+
+    public ByteBufferOutputStream(int initialCapacity) {
+        this(initialCapacity, false);
+    }
+
+    public ByteBufferOutputStream(int initialCapacity, boolean directBuffer) {
+        this(directBuffer ? ByteBuffer.allocateDirect(initialCapacity) : ByteBuffer.allocate(initialCapacity));
+    }
+
+    public void write(int b) {
+        maybeExpandBuffer(1);
+        buffer.put((byte) b);
+    }
+
+    public void write(byte[] bytes, int off, int len) {
+        maybeExpandBuffer(len);
+        buffer.put(bytes, off, len);
+    }
+
+    public void write(ByteBuffer sourceBuffer) {
+        maybeExpandBuffer(sourceBuffer.remaining());
+        buffer.put(sourceBuffer);
     }
 
     public ByteBuffer buffer() {
-        return ((UnderlyingOutputStream) out).buffer;
+        return buffer;
+    }
+
+    public int position() {
+        return buffer.position();
+    }
+
+    public int remaining() {
+        return buffer.remaining();
+    }
+
+    public int limit() {
+        return buffer.limit();
+    }
+
+    public void position(int position) {
+        maybeExpandBuffer(position - buffer.position());
+        buffer.position(position);
+    }
+
+    /**
+     * The capacity of the first internal ByteBuffer used by this class. This is useful in cases where a pooled
+     * ByteBuffer was passed via the constructor and it needs to be returned to the pool.
+     */
+    public int initialCapacity() {
+        return initialCapacity;
+    }
+
+    private void maybeExpandBuffer(int remainingRequired) {
+        if (remainingRequired > buffer.remaining())
+            expandBuffer(remainingRequired);
     }
 
-    public static class UnderlyingOutputStream extends OutputStream {
-        private ByteBuffer buffer;
-
-        public UnderlyingOutputStream(ByteBuffer buffer) {
-            this.buffer = buffer;
-        }
-
-        public void write(int b) {
-            if (buffer.remaining() < 1)
-                expandBuffer(buffer.capacity() + 1);
-            buffer.put((byte) b);
-        }
-
-        public void write(byte[] bytes, int off, int len) {
-            if (buffer.remaining() < len)
-                expandBuffer(buffer.capacity() + len);
-            buffer.put(bytes, off, len);
-        }
-
-        public ByteBuffer buffer() {
-            return buffer;
-        }
-
-        private void expandBuffer(int size) {
-            int expandSize = Math.max((int) (buffer.capacity() * REALLOCATION_FACTOR), size);
-            ByteBuffer temp = ByteBuffer.allocate(expandSize);
-            temp.put(buffer.array(), buffer.arrayOffset(), buffer.position());
-            buffer = temp;
-        }
+    private void expandBuffer(int remainingRequired) {
+        int expandSize = Math.max((int) (buffer.limit() * REALLOCATION_FACTOR), buffer.position() + remainingRequired);
+        ByteBuffer temp = ByteBuffer.allocate(expandSize);
+        int limit = limit();
+        buffer.flip();
+        temp.put(buffer);
+        buffer.limit(limit);
+        // reset the old buffer's position so that the partial data in the new buffer cannot be mistakenly consumed
+        // we should ideally only do this for the original buffer, but the additional complexity doesn't seem worth it
+        buffer.position(initialPosition);
+        buffer = temp;
     }
 
 }

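----------------------------------------------------------------------
Illustrative usage sketch (not part of this commit), based on the ByteBufferOutputStream API
in the diff above. The key point of the reworked class: once a write (or position() call)
exceeds the current capacity, buffer() returns a new, larger ByteBuffer, so callers must
re-read buffer() after writing instead of holding on to the original reference.

import java.nio.ByteBuffer;

import org.apache.kafka.common.record.ByteBufferOutputStream;

public class ByteBufferOutputStreamSketch {
    public static void main(String[] args) {
        ByteBuffer original = ByteBuffer.allocate(16);
        ByteBufferOutputStream out = new ByteBufferOutputStream(original);

        byte[] payload = new byte[64];          // larger than the 16-byte initial capacity
        out.write(payload, 0, payload.length);  // forces an internal reallocation

        ByteBuffer current = out.buffer();      // a different instance than `original`
        System.out.println(current != original);    // true: the old reference is now stale
        System.out.println(current.position());     // 64
        System.out.println(out.initialCapacity());  // 16: handy when returning a pooled buffer
    }
}
----------------------------------------------------------------------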
http://git-wip-us.apache.org/repos/asf/kafka/blob/386a8d04/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
index 6c31b25..f1a6e43 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
@@ -12,7 +12,10 @@
  */
 package org.apache.kafka.common.record;
 
+import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.record.ByteBufferLogInputStream.ByteBufferLogEntry;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -28,7 +31,7 @@ import java.util.List;
  * or one of the {@link #builder(ByteBuffer, byte, CompressionType, TimestampType) builder} variants.
  */
 public class MemoryRecords extends AbstractRecords {
-
+    private static final Logger log = LoggerFactory.getLogger(MemoryRecords.class);
     public final static MemoryRecords EMPTY = MemoryRecords.readableRecords(ByteBuffer.allocate(0));
 
     private final ByteBuffer buffer;
@@ -107,12 +110,16 @@ public class MemoryRecords extends AbstractRecords {
      * @param destinationBuffer The byte buffer to write the filtered records to
      * @return A FilterResult with a summary of the output (for metrics)
      */
-    public FilterResult filterTo(LogEntryFilter filter, ByteBuffer destinationBuffer) {
-        return filterTo(shallowEntries(), filter, destinationBuffer);
+    public FilterResult filterTo(TopicPartition partition, LogEntryFilter filter, ByteBuffer destinationBuffer,
+                                 int maxRecordSize) {
+        return filterTo(partition, shallowEntries(), filter, destinationBuffer, maxRecordSize);
     }
 
-    private static FilterResult filterTo(Iterable<ByteBufferLogEntry> fromShallowEntries, LogEntryFilter filter,
-                                       ByteBuffer destinationBuffer) {
+    private static FilterResult filterTo(TopicPartition partition,
+                                         Iterable<ByteBufferLogEntry> fromShallowEntries,
+                                         LogEntryFilter filter,
+                                         ByteBuffer destinationBuffer,
+                                         int maxRecordSize) {
         long maxTimestamp = Record.NO_TIMESTAMP;
         long maxOffset = -1L;
         long shallowOffsetOfMaxTimestamp = -1L;
@@ -121,6 +128,8 @@ public class MemoryRecords extends AbstractRecords {
         int messagesRetained = 0;
         int bytesRetained = 0;
 
+        ByteBufferOutputStream bufferOutputStream = new ByteBufferOutputStream(destinationBuffer);
+
         for (ByteBufferLogEntry shallowEntry : fromShallowEntries) {
             bytesRead += shallowEntry.sizeInBytes();
 
@@ -155,7 +164,7 @@ public class MemoryRecords extends AbstractRecords {
 
             if (writeOriginalEntry) {
                 // There are no messages compacted out and no message format conversion, write the original message set back
-                shallowEntry.writeTo(destinationBuffer);
+                bufferOutputStream.write(shallowEntry.buffer());
                 messagesRetained += retainedEntries.size();
                 bytesRetained += shallowEntry.sizeInBytes();
 
@@ -164,23 +173,45 @@ public class MemoryRecords extends AbstractRecords {
                     shallowOffsetOfMaxTimestamp = shallowEntry.offset();
                 }
             } else if (!retainedEntries.isEmpty()) {
-                ByteBuffer slice = destinationBuffer.slice();
-                MemoryRecordsBuilder builder = builderWithEntries(slice, shallowRecord.timestampType(), shallowRecord.compressionType(),
-                        shallowRecord.timestamp(), retainedEntries);
+                LogEntry firstEntry = retainedEntries.iterator().next();
+                long firstOffset = firstEntry.offset();
+                byte magic = firstEntry.record().magic();
+
+                MemoryRecordsBuilder builder = new MemoryRecordsBuilder(bufferOutputStream, magic,
+                        shallowRecord.compressionType(), shallowRecord.timestampType(),
+                        firstOffset, shallowRecord.timestamp(), bufferOutputStream.buffer().remaining());
+                for (LogEntry entry : retainedEntries)
+                    builder.appendWithOffset(entry.offset(), entry.record());
+
                 MemoryRecords records = builder.build();
-                destinationBuffer.position(destinationBuffer.position() + slice.position());
+                int filteredSizeInBytes = records.sizeInBytes();
+
                 messagesRetained += retainedEntries.size();
                 bytesRetained += records.sizeInBytes();
 
+                if (filteredSizeInBytes > shallowEntry.sizeInBytes() && filteredSizeInBytes > maxRecordSize)
+                    log.warn("Record batch from {} with first offset {} exceeded max record size {} after cleaning " +
+                                    "(new size is {}). Consumers with version earlier than 0.10.1.0 may need to " +
+                                    "increase their fetch sizes.",
+                            partition, firstOffset, maxRecordSize, filteredSizeInBytes);
+
                 MemoryRecordsBuilder.RecordsInfo info = builder.info();
                 if (info.maxTimestamp > maxTimestamp) {
                     maxTimestamp = info.maxTimestamp;
                     shallowOffsetOfMaxTimestamp = info.shallowOffsetOfMaxTimestamp;
                 }
             }
+
+            // If we had to allocate a new buffer to fit the filtered output (see KAFKA-5316), return early to
+            // avoid the need for additional allocations.
+            ByteBuffer outputBuffer = bufferOutputStream.buffer();
+            if (outputBuffer != destinationBuffer)
+                return new FilterResult(outputBuffer, messagesRead, bytesRead, messagesRetained, bytesRetained,
+                    maxOffset, maxTimestamp, shallowOffsetOfMaxTimestamp);
         }
 
-        return new FilterResult(messagesRead, bytesRead, messagesRetained, bytesRetained, maxOffset, maxTimestamp, shallowOffsetOfMaxTimestamp);
+        return new FilterResult(destinationBuffer, messagesRead, bytesRead, messagesRetained, bytesRetained,
+                maxOffset, maxTimestamp, shallowOffsetOfMaxTimestamp);
     }
 
     /**
@@ -259,6 +290,7 @@ public class MemoryRecords extends AbstractRecords {
     }
 
     public static class FilterResult {
+        public final ByteBuffer output;
         public final int messagesRead;
         public final int bytesRead;
         public final int messagesRetained;
@@ -267,13 +299,15 @@ public class MemoryRecords extends AbstractRecords {
         public final long maxTimestamp;
         public final long shallowOffsetOfMaxTimestamp;
 
-        public FilterResult(int messagesRead,
+        public FilterResult(ByteBuffer output,
+                            int messagesRead,
                             int bytesRead,
                             int messagesRetained,
                             int bytesRetained,
                             long maxOffset,
                             long maxTimestamp,
                             long shallowOffsetOfMaxTimestamp) {
+            this.output = output;
             this.messagesRead = messagesRead;
             this.bytesRead = bytesRead;
             this.messagesRetained = messagesRetained;

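----------------------------------------------------------------------
Sketch (not part of this commit) of how a caller consumes the new filterTo contract:
FilterResult.output is the destination buffer unless the cleaned data did not fit, in which
case it refers to a larger buffer allocated during filtering and must be used instead. The
filter argument is assumed to be a LogEntryFilter implementation supplied by the caller (for
example, the RetainNonNullKeysFilter helper used in MemoryRecordsTest below); the nested type
name MemoryRecords.LogEntryFilter is assumed from the signature above, and the TopicPartition
is only used for the warning log message.

import java.nio.ByteBuffer;

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.record.MemoryRecords;

public class FilterToSketch {
    static MemoryRecords clean(MemoryRecords records, MemoryRecords.LogEntryFilter filter,
                               ByteBuffer writeBuffer, int maxMessageSize) {
        MemoryRecords.FilterResult result =
                records.filterTo(new TopicPartition("foo", 0), filter, writeBuffer, maxMessageSize);

        // Before this patch the retained data was always in writeBuffer; now it may live in a
        // larger buffer allocated while filtering (see KAFKA-5316), exposed via result.output.
        ByteBuffer output = result.output;
        output.flip();
        return MemoryRecords.readableRecords(output);
    }
}
----------------------------------------------------------------------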
http://git-wip-us.apache.org/repos/asf/kafka/blob/386a8d04/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
index 02bfc24..a46c1c6 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
@@ -105,6 +105,34 @@ public class MemoryRecordsBuilder {
 
     private MemoryRecords builtRecords;
 
+
+    public MemoryRecordsBuilder(ByteBufferOutputStream bufferStream,
+                                byte magic,
+                                CompressionType compressionType,
+                                TimestampType timestampType,
+                                long baseOffset,
+                                long logAppendTime,
+                                int writeLimit) {
+        this.magic = magic;
+        this.timestampType = timestampType;
+        this.compressionType = compressionType;
+        this.baseOffset = baseOffset;
+        this.logAppendTime = logAppendTime;
+        this.initPos = bufferStream.position();
+        this.writeLimit = writeLimit;
+        this.initialCapacity = bufferStream.initialCapacity();
+
+        if (compressionType != CompressionType.NONE) {
+            // for compressed records, leave space for the header and the shallow message metadata
+            // and move the starting position to the value payload offset
+            bufferStream.position(initPos + Records.LOG_OVERHEAD + Record.recordOverhead(magic));
+        }
+
+        // create the stream
+        this.bufferStream = bufferStream;
+        appendStream = wrapForOutput(bufferStream, compressionType, magic, COMPRESSION_DEFAULT_BUFFER_SIZE);
+    }
+
     /**
      * Construct a new builder.
      *
@@ -126,24 +154,8 @@ public class MemoryRecordsBuilder {
                                 long baseOffset,
                                 long logAppendTime,
                                 int writeLimit) {
-        this.magic = magic;
-        this.timestampType = timestampType;
-        this.compressionType = compressionType;
-        this.baseOffset = baseOffset;
-        this.logAppendTime = logAppendTime;
-        this.initPos = buffer.position();
-        this.writeLimit = writeLimit;
-        this.initialCapacity = buffer.capacity();
-
-        if (compressionType != CompressionType.NONE) {
-            // for compressed records, leave space for the header and the shallow message metadata
-            // and move the starting position to the value payload offset
-            buffer.position(initPos + Records.LOG_OVERHEAD + Record.recordOverhead(magic));
-        }
-
-        // create the stream
-        bufferStream = new ByteBufferOutputStream(buffer);
-        appendStream = wrapForOutput(bufferStream, compressionType, magic, COMPRESSION_DEFAULT_BUFFER_SIZE);
+        this(new ByteBufferOutputStream(buffer), magic, compressionType, timestampType, baseOffset, logAppendTime,
+                writeLimit);
     }
 
     public ByteBuffer buffer() {
@@ -400,7 +412,7 @@ public class MemoryRecordsBuilder {
         try {
             switch (type) {
                 case NONE:
-                    return buffer;
+                    return new DataOutputStream(buffer);
                 case GZIP:
                     return new DataOutputStream(new GZIPOutputStream(buffer, bufferSize));
                 case SNAPPY:

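----------------------------------------------------------------------
Sketch (not part of this commit) of the new MemoryRecordsBuilder constructor that accepts a
ByteBufferOutputStream directly. Because the builder and the caller now share the same stream,
the caller can pick up the possibly expanded buffer from the stream after build(), which is
what the filterTo change above relies on. The 64-byte capacity and the appended sizes are
illustrative values only.

import java.nio.ByteBuffer;

import org.apache.kafka.common.record.ByteBufferOutputStream;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.MemoryRecordsBuilder;
import org.apache.kafka.common.record.Record;
import org.apache.kafka.common.record.TimestampType;

public class BuilderSketch {
    public static void main(String[] args) {
        ByteBufferOutputStream stream = new ByteBufferOutputStream(ByteBuffer.allocate(64));
        MemoryRecordsBuilder builder = new MemoryRecordsBuilder(stream, Record.CURRENT_MAGIC_VALUE,
                CompressionType.NONE, TimestampType.CREATE_TIME,
                0L, Record.NO_TIMESTAMP, stream.buffer().remaining());

        // Appending more data than the 64-byte write limit expands the shared stream's buffer
        // instead of overflowing, mirroring what the log cleaner needs after cleaning.
        builder.append(System.currentTimeMillis(), "key".getBytes(), new byte[128]);
        MemoryRecords records = builder.build();

        System.out.println(records.sizeInBytes());
        System.out.println(stream.buffer().capacity());  // greater than 64 after expansion
    }
}
----------------------------------------------------------------------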
http://git-wip-us.apache.org/repos/asf/kafka/blob/386a8d04/clients/src/main/java/org/apache/kafka/common/record/Record.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/Record.java b/clients/src/main/java/org/apache/kafka/common/record/Record.java
index 0c0fa3c..de092c7 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/Record.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/Record.java
@@ -469,7 +469,7 @@ public final class Record {
                               CompressionType compressionType,
                               TimestampType timestampType) {
         try {
-            ByteBufferOutputStream out = new ByteBufferOutputStream(buffer);
+            DataOutputStream out = new DataOutputStream(new ByteBufferOutputStream(buffer));
             write(out, magic, timestamp, key, value, compressionType, timestampType);
         } catch (IOException e) {
             throw new KafkaException(e);

http://git-wip-us.apache.org/repos/asf/kafka/blob/386a8d04/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index 24ba434..063c232 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -63,6 +63,7 @@ import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
+import java.io.DataOutputStream;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -219,7 +220,7 @@ public class FetcherTest {
     @Test
     public void testParseInvalidRecord() throws Exception {
         ByteBuffer buffer = ByteBuffer.allocate(1024);
-        ByteBufferOutputStream out = new ByteBufferOutputStream(buffer);
+        DataOutputStream out = new DataOutputStream(new ByteBufferOutputStream(buffer));
 
         byte magic = Record.CURRENT_MAGIC_VALUE;
         byte[] key = "foo".getBytes();

http://git-wip-us.apache.org/repos/asf/kafka/blob/386a8d04/clients/src/test/java/org/apache/kafka/common/record/ByteBufferOutputStreamTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/record/ByteBufferOutputStreamTest.java b/clients/src/test/java/org/apache/kafka/common/record/ByteBufferOutputStreamTest.java
new file mode 100644
index 0000000..6e7b81b
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/record/ByteBufferOutputStreamTest.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.record;
+
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+public class ByteBufferOutputStreamTest {
+
+    @Test
+    public void testExpandByteBufferOnPositionIncrease() throws Exception {
+        testExpandByteBufferOnPositionIncrease(ByteBuffer.allocate(16));
+    }
+
+    @Test
+    public void testExpandDirectByteBufferOnPositionIncrease() throws Exception {
+        testExpandByteBufferOnPositionIncrease(ByteBuffer.allocateDirect(16));
+    }
+
+    private void testExpandByteBufferOnPositionIncrease(ByteBuffer initialBuffer) throws Exception {
+        ByteBufferOutputStream output = new ByteBufferOutputStream(initialBuffer);
+        output.write("hello".getBytes());
+        output.position(32);
+        assertEquals(32, output.position());
+        assertEquals(0, initialBuffer.position());
+
+        ByteBuffer buffer = output.buffer();
+        assertEquals(32, buffer.limit());
+        buffer.position(0);
+        buffer.limit(5);
+        byte[] bytes = new byte[5];
+        buffer.get(bytes);
+        assertArrayEquals("hello".getBytes(), bytes);
+    }
+
+    @Test
+    public void testExpandByteBufferOnWrite() throws Exception {
+        testExpandByteBufferOnWrite(ByteBuffer.allocate(16));
+    }
+
+    @Test
+    public void testExpandDirectByteBufferOnWrite() throws Exception {
+        testExpandByteBufferOnWrite(ByteBuffer.allocateDirect(16));
+    }
+
+    private void testExpandByteBufferOnWrite(ByteBuffer initialBuffer) throws Exception {
+        ByteBufferOutputStream output = new ByteBufferOutputStream(initialBuffer);
+        output.write("hello".getBytes());
+        output.write(new byte[27]);
+        assertEquals(32, output.position());
+        assertEquals(0, initialBuffer.position());
+
+        ByteBuffer buffer = output.buffer();
+        assertEquals(32, buffer.limit());
+        buffer.position(0);
+        buffer.limit(5);
+        byte[] bytes = new byte[5];
+        buffer.get(bytes);
+        assertArrayEquals("hello".getBytes(), bytes);
+    }
+
+    @Test
+    public void testWriteByteBuffer() {
+        testWriteByteBuffer(ByteBuffer.allocate(16));
+    }
+
+    @Test
+    public void testWriteDirectByteBuffer() {
+        testWriteByteBuffer(ByteBuffer.allocateDirect(16));
+    }
+
+    private void testWriteByteBuffer(ByteBuffer input) {
+        long value = 234239230L;
+        input.putLong(value);
+        input.flip();
+
+        ByteBufferOutputStream output = new ByteBufferOutputStream(ByteBuffer.allocate(32));
+        output.write(input);
+        assertEquals(8, output.position());
+        assertEquals(value, output.buffer().getLong(0));
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/386a8d04/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
index 9271a3f..5f668de 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.common.record;
 
+import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.test.TestUtils;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -31,6 +32,7 @@ import static java.util.Arrays.asList;
 import static org.apache.kafka.common.utils.Utils.toNullableArray;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
 @RunWith(value = Parameterized.class)
@@ -93,6 +95,56 @@ public class MemoryRecordsTest {
     }
 
     @Test
+    public void testFilterToWithUndersizedBuffer() {
+        ByteBuffer buffer = ByteBuffer.allocate(1024);
+        MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, magic, compression, TimestampType.CREATE_TIME, 0L);
+        builder.append(10L, null, "a".getBytes());
+        builder.close();
+
+        builder = MemoryRecords.builder(buffer, magic, compression, TimestampType.CREATE_TIME, 1L);
+        builder.append(11L, "1".getBytes(), new byte[128]);
+        builder.append(12L, "2".getBytes(), "c".getBytes());
+        builder.append(13L, null, "d".getBytes());
+        builder.close();
+
+        builder = MemoryRecords.builder(buffer, magic, compression, TimestampType.CREATE_TIME, 4L);
+        builder.append(14L, null, "e".getBytes());
+        builder.append(15L, "5".getBytes(), "f".getBytes());
+        builder.append(16L, "6".getBytes(), "g".getBytes());
+        builder.close();
+
+        builder = MemoryRecords.builder(buffer, magic, compression, TimestampType.CREATE_TIME, 7L);
+        builder.append(17L, "7".getBytes(), new byte[128]);
+        builder.close();
+
+        buffer.flip();
+
+        ByteBuffer output = ByteBuffer.allocate(64);
+
+        List<Record> records = new ArrayList<>();
+        while (buffer.hasRemaining()) {
+            output.rewind();
+
+            MemoryRecords.FilterResult result = MemoryRecords.readableRecords(buffer)
+                    .filterTo(new TopicPartition("foo", 0), new RetainNonNullKeysFilter(), output, Integer.MAX_VALUE);
+
+            buffer.position(buffer.position() + result.bytesRead);
+            result.output.flip();
+
+            if (output != result.output)
+                assertEquals(0, output.position());
+
+            MemoryRecords filtered = MemoryRecords.readableRecords(result.output);
+            records.addAll(TestUtils.toList(filtered.records()));
+        }
+
+        assertEquals(5, records.size());
+        for (Record record : records)
+            assertNotNull(record.key());
+    }
+
+
+    @Test
     public void testFilterTo() {
         ByteBuffer buffer = ByteBuffer.allocate(2048);
         MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, magic, compression, TimestampType.CREATE_TIME, 0L);
@@ -117,7 +169,8 @@ public class MemoryRecordsTest {
         buffer.flip();
 
         ByteBuffer filtered = ByteBuffer.allocate(2048);
-        MemoryRecords.FilterResult result = MemoryRecords.readableRecords(buffer).filterTo(new RetainNonNullKeysFilter(), filtered);
+        MemoryRecords.FilterResult result = MemoryRecords.readableRecords(buffer).filterTo(
+                new TopicPartition("foo", 0), new RetainNonNullKeysFilter(), filtered, Integer.MAX_VALUE);
 
         filtered.flip();
 
@@ -192,7 +245,8 @@ public class MemoryRecordsTest {
         buffer.flip();
 
         ByteBuffer filtered = ByteBuffer.allocate(2048);
-        MemoryRecords.readableRecords(buffer).filterTo(new RetainNonNullKeysFilter(), filtered);
+        MemoryRecords.readableRecords(buffer).filterTo(new TopicPartition("foo", 0),
+                new RetainNonNullKeysFilter(), filtered, Integer.MAX_VALUE);
 
         filtered.flip();
         MemoryRecords filteredRecords = MemoryRecords.readableRecords(filtered);

http://git-wip-us.apache.org/repos/asf/kafka/blob/386a8d04/core/src/main/scala/kafka/log/LogCleaner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index 5f06a73..6c1f13d 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -465,22 +465,23 @@ private[log] class Cleaner(val id: Int,
       source.log.readInto(readBuffer, position)
       val records = MemoryRecords.readableRecords(readBuffer)
       throttler.maybeThrottle(records.sizeInBytes)
-      val result = records.filterTo(logCleanerFilter, writeBuffer)
+      val result = records.filterTo(topicPartition, logCleanerFilter, writeBuffer, maxLogMessageSize)
       stats.readMessages(result.messagesRead, result.bytesRead)
       stats.recopyMessages(result.messagesRetained, result.bytesRetained)
 
       position += result.bytesRead
 
       // if any messages are to be retained, write them out
-      if (writeBuffer.position > 0) {
-        writeBuffer.flip()
-        val retained = MemoryRecords.readableRecords(writeBuffer)
+      val outputBuffer = result.output
+      if (outputBuffer.position > 0) {
+        outputBuffer.flip()
+        val retained = MemoryRecords.readableRecords(outputBuffer)
         dest.append(firstOffset = retained.deepEntries.iterator.next().offset,
           largestOffset = result.maxOffset,
           largestTimestamp = result.maxTimestamp,
           shallowOffsetOfMaxTimestamp = result.shallowOffsetOfMaxTimestamp,
           records = retained)
-        throttler.maybeThrottle(writeBuffer.limit)
+        throttler.maybeThrottle(outputBuffer.limit)
       }
       
       // if we read bytes but didn't get even one complete message, our I/O buffer is too small, grow it and try again

