kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject kafka git commit: KAFKA-4761; Fix producer regression handling small or zero batch size
Date Mon, 13 Feb 2017 22:08:45 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk ed91af512 -> 3b36d5cff


KAFKA-4761; Fix producer regression handling small or zero batch size

Author: Jason Gustafson <jason@confluent.io>

Reviewers: Apurva Mehta <apurva.1618@gmail.com>, Vahid Hashemian <vahidhashemian@us.ibm.com>,
Ismael Juma <ismael@juma.me.uk>, Guozhang Wang <wangguoz@gmail.com>

Closes #2545 from hachikuji/KAFKA-4761


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/3b36d5cf
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/3b36d5cf
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/3b36d5cf

Branch: refs/heads/trunk
Commit: 3b36d5cff0b7a51e737a97144d6d479af708b2d7
Parents: ed91af5
Author: Jason Gustafson <jason@confluent.io>
Authored: Mon Feb 13 14:07:19 2017 -0800
Committer: Jason Gustafson <jason@confluent.io>
Committed: Mon Feb 13 14:07:19 2017 -0800

----------------------------------------------------------------------
 .../common/record/MemoryRecordsBuilder.java     | 18 +++++++++++-
 .../kafka/common/record/FileRecordsTest.java    |  4 +--
 .../common/record/MemoryRecordsBuilderTest.java | 29 ++++++++++++++++++
 .../kafka/common/record/MemoryRecordsTest.java  |  6 ++--
 .../java/org/apache/kafka/test/TestUtils.java   | 26 +++++-----------
 .../kafka/api/BaseProducerSendTest.scala        | 31 ++++++++++++++++++--
 .../kafka/api/PlaintextProducerSendTest.scala   |  8 +++++
 .../message/BaseMessageSetTestCases.scala       |  4 +--
 8 files changed, 96 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/3b36d5cf/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
index 0260f17..a17c0e9 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
@@ -105,6 +105,20 @@ public class MemoryRecordsBuilder {
 
     private MemoryRecords builtRecords;
 
+    /**
+     * Construct a new builder.
+     *
+     * @param buffer The underlying buffer to use (note that this class will allocate a new buffer if necessary
+     *               to fit the records appended)
+     * @param magic The magic value to use
+     * @param compressionType The compression codec to use
+     * @param timestampType The desired timestamp type. For magic > 0, this cannot be {@link TimestampType#NO_TIMESTAMP_TYPE}.
+     * @param baseOffset The initial offset to use for the records appended to this builder
+     * @param logAppendTime The log append time of this record set. Can be set to NO_TIMESTAMP if CREATE_TIME is used.
+     * @param writeLimit The desired limit on the total bytes for this record set (note that this can be exceeded
+     *                   when compression is used since size estimates are rough, and in the case that the first
+     *                   record added exceeds the size).
+     */
     public MemoryRecordsBuilder(ByteBuffer buffer,
                                 byte magic,
                                 CompressionType compressionType,
@@ -376,7 +390,9 @@ public class MemoryRecordsBuilder {
     }
 
     public boolean isFull() {
-        return isClosed() || this.writeLimit <= estimatedBytesWritten();
+        // note that the write limit is respected only after the first record is added which ensures we can always
+        // create non-empty batches (this is used to disable batching when the producer's batch size is set to 0).
+        return isClosed() || (this.numRecords > 0 && this.writeLimit <= estimatedBytesWritten());
     }
 
     public int sizeInBytes() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/3b36d5cf/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java b/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java
index d0ab427..d7df27d 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java
@@ -381,11 +381,11 @@ public class FileRecordsTest {
     }
 
     private static List<LogEntry> shallowEntries(Records buffer) {
-        return TestUtils.toList(buffer.shallowEntries().iterator());
+        return TestUtils.toList(buffer.shallowEntries());
     }
 
     private static List<LogEntry> deepEntries(Records buffer) {
-        return TestUtils.toList(buffer.deepEntries().iterator());
+        return TestUtils.toList(buffer.deepEntries());
     }
 
     private FileRecords createFileRecords(Record ... records) throws IOException {

http://git-wip-us.apache.org/repos/asf/kafka/blob/3b36d5cf/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java
b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java
index 034faf6..02ee75e 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java
@@ -16,6 +16,7 @@
  **/
 package org.apache.kafka.common.record;
 
+import org.apache.kafka.test.TestUtils;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
@@ -28,6 +29,7 @@ import java.util.List;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 
 @RunWith(value = Parameterized.class)
 public class MemoryRecordsBuilderTest {
@@ -178,6 +180,33 @@ public class MemoryRecordsBuilderTest {
     }
 
     @Test
+    public void testSmallWriteLimit() {
+        // with a small write limit, we always allow at least one record to be added
+
+        byte[] key = "foo".getBytes();
+        byte[] value = "bar".getBytes();
+        int writeLimit = 0;
+        ByteBuffer buffer = ByteBuffer.allocate(512);
+        MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, Record.CURRENT_MAGIC_VALUE, compressionType,
+                TimestampType.CREATE_TIME, 0L, Record.NO_TIMESTAMP, writeLimit);
+
+        assertFalse(builder.isFull());
+        assertTrue(builder.hasRoomFor(key, value));
+        builder.append(0L, key, value);
+
+        assertTrue(builder.isFull());
+        assertFalse(builder.hasRoomFor(key, value));
+
+        MemoryRecords memRecords = builder.build();
+        List<Record> records = TestUtils.toList(memRecords.records());
+        assertEquals(1, records.size());
+
+        Record record = records.get(0);
+        assertEquals(ByteBuffer.wrap(key), record.key());
+        assertEquals(ByteBuffer.wrap(value), record.value());
+    }
+
+    @Test
     public void writePastLimit() {
         ByteBuffer buffer = ByteBuffer.allocate(64);
         buffer.position(bufferOffset);

http://git-wip-us.apache.org/repos/asf/kafka/blob/3b36d5cf/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
index 9c8ca7f..9271a3f 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
@@ -135,7 +135,7 @@ public class MemoryRecordsTest {
 
         MemoryRecords filteredRecords = MemoryRecords.readableRecords(filtered);
 
-        List<ByteBufferLogInputStream.ByteBufferLogEntry> shallowEntries = TestUtils.toList(filteredRecords.shallowEntries().iterator());
+        List<ByteBufferLogInputStream.ByteBufferLogEntry> shallowEntries = TestUtils.toList(filteredRecords.shallowEntries());
         List<Long> expectedOffsets = compression == CompressionType.NONE ? asList(1L, 4L, 5L, 6L) : asList(1L, 5L, 6L);
         assertEquals(expectedOffsets.size(), shallowEntries.size());
 
@@ -148,7 +148,7 @@ public class MemoryRecordsTest {
                     shallowEntry.record().timestampType());
         }
 
-        List<LogEntry> deepEntries = TestUtils.toList(filteredRecords.deepEntries().iterator());
+        List<LogEntry> deepEntries = TestUtils.toList(filteredRecords.deepEntries());
         assertEquals(4, deepEntries.size());
 
         LogEntry first = deepEntries.get(0);
@@ -197,7 +197,7 @@ public class MemoryRecordsTest {
         filtered.flip();
         MemoryRecords filteredRecords = MemoryRecords.readableRecords(filtered);
 
-        List<ByteBufferLogInputStream.ByteBufferLogEntry> shallowEntries = TestUtils.toList(filteredRecords.shallowEntries().iterator());
+        List<ByteBufferLogInputStream.ByteBufferLogEntry> shallowEntries = TestUtils.toList(filteredRecords.shallowEntries());
         assertEquals(compression == CompressionType.NONE ? 3 : 2, shallowEntries.size());
 
         for (LogEntry shallowEntry : shallowEntries) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/3b36d5cf/clients/src/test/java/org/apache/kafka/test/TestUtils.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/test/TestUtils.java b/clients/src/test/java/org/apache/kafka/test/TestUtils.java
index c39f402..0cb32be 100644
--- a/clients/src/test/java/org/apache/kafka/test/TestUtils.java
+++ b/clients/src/test/java/org/apache/kafka/test/TestUtils.java
@@ -31,7 +31,6 @@ import java.nio.file.Files;
 import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
@@ -43,7 +42,6 @@ import java.util.regex.Pattern;
 
 import static java.util.Arrays.asList;
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
@@ -291,27 +289,17 @@ public class TestUtils {
     }
 
     /**
-     * Throw an exception if the two iterators are of differing lengths or contain
-     * different messages on their Nth element
-     */
-    public static <T> void checkEquals(Iterator<T> s1, Iterator<T> s2) {
-        while (s1.hasNext() && s2.hasNext())
-            assertEquals(s1.next(), s2.next());
-        assertFalse("Iterators have uneven length--first has more", s1.hasNext());
-        assertFalse("Iterators have uneven length--second has more", s2.hasNext());
-    }
-
-    /**
      * Checks the two iterables for equality by first converting both to a list.
      */
     public static <T> void checkEquals(Iterable<T> it1, Iterable<T> it2) {
-        assertEquals(toList(it1.iterator()), toList(it2.iterator()));
+        assertEquals(toList(it1), toList(it2));
     }
 
-    public static <T> List<T> toList(Iterator<? extends T> iterator) {
-        List<T> res = new ArrayList<>();
-        while (iterator.hasNext())
-            res.add(iterator.next());
-        return res;
+    public static <T> List<T> toList(Iterable<? extends T> iterable) {
+        List<T> list = new ArrayList<>();
+        for (T item : iterable)
+            list.add(item);
+        return list;
     }
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/3b36d5cf/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
index ad61a37..da3f651 100644
--- a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
@@ -96,7 +96,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
   @Test
   def testSendOffset() {
     val producer = createProducer(brokerList)
-    val partition = new Integer(0)
+    val partition = 0
 
     object callback extends Callback {
       var offset = 0L
@@ -175,8 +175,33 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
     sendAndVerifyTimestamp(producer, TimestampType.CREATE_TIME)
   }
 
+  protected def sendAndVerify(producer: KafkaProducer[Array[Byte], Array[Byte]],
+                              numRecords: Int = numRecords,
+                              timeoutMs: Long = 20000L) {
+    val partition = 0
+    try {
+      TestUtils.createTopic(zkUtils, topic, 1, 2, servers)
+
+      val recordAndFutures = for (i <- 1 to numRecords) yield {
+        val record = new ProducerRecord(topic, partition, s"key$i".getBytes, s"value$i".getBytes)
+        (record, producer.send(record))
+      }
+      producer.close(timeoutMs, TimeUnit.MILLISECONDS)
+      val lastOffset = recordAndFutures.foldLeft(0) { case (offset, (record, future)) =>
+        val recordMetadata = future.get
+        assertEquals(topic, recordMetadata.topic)
+        assertEquals(partition, recordMetadata.partition)
+        assertEquals(offset, recordMetadata.offset)
+        offset + 1
+      }
+      assertEquals(numRecords, lastOffset)
+    } finally {
+      producer.close()
+    }
+  }
+
   protected def sendAndVerifyTimestamp(producer: KafkaProducer[Array[Byte], Array[Byte]], timestampType: TimestampType) {
-    val partition = new Integer(0)
+    val partition = 0
 
     val baseTimestamp = 123456L
     val startTime = System.currentTimeMillis()
@@ -212,7 +237,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
       TestUtils.createTopic(zkUtils, topic, 1, 2, servers, topicProps)
 
       val recordAndFutures = for (i <- 1 to numRecords) yield {
-        val record = new ProducerRecord(topic, partition, baseTimestamp + i, "key".getBytes, "value".getBytes)
+        val record = new ProducerRecord(topic, partition, baseTimestamp + i, s"key$i".getBytes, s"value$i".getBytes)
         (record, producer.send(record, callback))
       }
       producer.close(20000L, TimeUnit.MILLISECONDS)

http://git-wip-us.apache.org/repos/asf/kafka/blob/3b36d5cf/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala
index 956fe61..10063a9 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala
@@ -42,6 +42,14 @@ class PlaintextProducerSendTest extends BaseProducerSendTest {
   }
 
   @Test
+  def testBatchSizeZero() {
+    val producerProps = new Properties()
+    producerProps.setProperty(ProducerConfig.BATCH_SIZE_CONFIG, "0")
+    val producer = createProducer(brokerList = brokerList, lingerMs = Long.MaxValue, props = Some(producerProps))
+    sendAndVerify(producer)
+  }
+
+  @Test
   def testSendCompressedMessageWithLogAppendTime() {
     val producerProps = new Properties()
     producerProps.setProperty(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip")

http://git-wip-us.apache.org/repos/asf/kafka/blob/3b36d5cf/core/src/test/scala/unit/kafka/javaapi/message/BaseMessageSetTestCases.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/javaapi/message/BaseMessageSetTestCases.scala
b/core/src/test/scala/unit/kafka/javaapi/message/BaseMessageSetTestCases.scala
index 3327a65..a53602d 100644
--- a/core/src/test/scala/unit/kafka/javaapi/message/BaseMessageSetTestCases.scala
+++ b/core/src/test/scala/unit/kafka/javaapi/message/BaseMessageSetTestCases.scala
@@ -40,14 +40,14 @@ trait BaseMessageSetTestCases extends JUnitSuite {
   def testIteratorIsConsistent() {
     val m = createMessageSet(messages)
     // two iterators over the same set should give the same results
-    TestUtils.checkEquals(m.iterator, m.iterator)
+    TestUtils.checkEquals(m, m)
   }
 
   @Test
   def testIteratorIsConsistentWithCompression() {
     val m = createMessageSet(messages, DefaultCompressionCodec)
     // two iterators over the same set should give the same results
-    TestUtils.checkEquals(m.iterator, m.iterator)
+    TestUtils.checkEquals(m, m)
   }
 
   @Test


Mime
View raw message