kafka-commits mailing list archives

From ij...@apache.org
Subject [01/11] kafka git commit: KAFKA-4816; Message format changes for idempotent/transactional producer (KIP-98)
Date Fri, 24 Mar 2017 19:43:54 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk f615c9e9d -> 5bd06f1d5


http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
index 5b89bac..f856e52 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
@@ -24,7 +24,7 @@ import kafka.log.Log
 import kafka.utils._
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.metrics.Metrics
-import org.apache.kafka.common.record.{MemoryRecords, Record}
+import org.apache.kafka.common.record.{CompressionType, SimpleRecord, MemoryRecords}
 import org.apache.kafka.common.requests.FetchRequest.PartitionData
 import org.easymock.EasyMock
 import EasyMock._
@@ -37,7 +37,7 @@ class ReplicaManagerQuotasTest {
   val configs = TestUtils.createBrokerConfigs(2, TestUtils.MockZkConnect).map(KafkaConfig.fromProps(_, new Properties()))
   val time = new MockTime
   val metrics = new Metrics
-  val record = Record.create("some-data-in-a-message".getBytes())
+  val record = new SimpleRecord("some-data-in-a-message".getBytes())
   val topicPartition1 = new TopicPartition("test-topic", 1)
   val topicPartition2 = new TopicPartition("test-topic", 2)
   val fetchInfo = Seq(topicPartition1 -> new PartitionData(0, 100), topicPartition2 -> new PartitionData(0, 100))
@@ -62,10 +62,10 @@ class ReplicaManagerQuotasTest {
       readPartitionInfo = fetchInfo,
       quota = quota)
     assertEquals("Given two partitions, with only one throttled, we should get the first", 1,
-      fetch.find(_._1 == topicPartition1).get._2.info.records.shallowEntries.asScala.size)
+      fetch.find(_._1 == topicPartition1).get._2.info.records.batches.asScala.size)
 
     assertEquals("But we shouldn't get the second", 0,
-      fetch.find(_._1 == topicPartition2).get._2.info.records.shallowEntries.asScala.size)
+      fetch.find(_._1 == topicPartition2).get._2.info.records.batches.asScala.size)
   }
 
   @Test
@@ -87,9 +87,9 @@ class ReplicaManagerQuotasTest {
       readPartitionInfo = fetchInfo,
       quota = quota)
     assertEquals("Given two partitions, with both throttled, we should get no messages", 0,
-      fetch.find(_._1 == topicPartition1).get._2.info.records.shallowEntries.asScala.size)
+      fetch.find(_._1 == topicPartition1).get._2.info.records.batches.asScala.size)
     assertEquals("Given two partitions, with both throttled, we should get no messages", 0,
-      fetch.find(_._1 == topicPartition2).get._2.info.records.shallowEntries.asScala.size)
+      fetch.find(_._1 == topicPartition2).get._2.info.records.batches.asScala.size)
   }
 
   @Test
@@ -111,9 +111,9 @@ class ReplicaManagerQuotasTest {
       readPartitionInfo = fetchInfo,
       quota = quota)
     assertEquals("Given two partitions, with both non-throttled, we should get both messages", 1,
-      fetch.find(_._1 == topicPartition1).get._2.info.records.shallowEntries.asScala.size)
+      fetch.find(_._1 == topicPartition1).get._2.info.records.batches.asScala.size)
     assertEquals("Given two partitions, with both non-throttled, we should get both messages", 1,
-      fetch.find(_._1 == topicPartition2).get._2.info.records.shallowEntries.asScala.size)
+      fetch.find(_._1 == topicPartition2).get._2.info.records.batches.asScala.size)
   }
 
   @Test
@@ -135,13 +135,13 @@ class ReplicaManagerQuotasTest {
       readPartitionInfo = fetchInfo,
       quota = quota)
     assertEquals("Given two partitions, with only one throttled, we should get the first", 1,
-      fetch.find(_._1 == topicPartition1).get._2.info.records.shallowEntries.asScala.size)
+      fetch.find(_._1 == topicPartition1).get._2.info.records.batches.asScala.size)
 
     assertEquals("But we should get the second too since it's throttled but in sync", 1,
-      fetch.find(_._1 == topicPartition2).get._2.info.records.shallowEntries.asScala.size)
+      fetch.find(_._1 == topicPartition2).get._2.info.records.batches.asScala.size)
   }
 
-  def setUpMocks(fetchInfo: Seq[(TopicPartition, PartitionData)], record: Record = this.record, bothReplicasInSync: Boolean = false) {
+  def setUpMocks(fetchInfo: Seq[(TopicPartition, PartitionData)], record: SimpleRecord = this.record, bothReplicasInSync: Boolean = false) {
     val zkUtils = createNiceMock(classOf[ZkUtils])
     val scheduler = createNiceMock(classOf[KafkaScheduler])
 
@@ -154,7 +154,7 @@ class ReplicaManagerQuotasTest {
     expect(log.read(anyObject(), geq(1), anyObject(), anyObject())).andReturn(
       FetchDataInfo(
         new LogOffsetMetadata(0L, 0L, 0),
-        MemoryRecords.withRecords(record)
+        MemoryRecords.withRecords(CompressionType.NONE, record)
       )).anyTimes()
 
     //if we ask for len = 0, return 0 messages

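For illustration, the test changes above follow the KIP-98 message-format rework: Record.create and shallow-entry iteration give way to SimpleRecord, CompressionType-aware MemoryRecords factories, and batch-level iteration. A minimal sketch of the new API as this test now uses it:

    import org.apache.kafka.common.record.{CompressionType, MemoryRecords, SimpleRecord}
    import scala.collection.JavaConverters._

    // Old: Record.create(bytes) and records.shallowEntries; new: SimpleRecord plus an
    // explicit CompressionType, with iteration now happening at the batch level.
    val record = new SimpleRecord("some-data-in-a-message".getBytes)
    val memoryRecords = MemoryRecords.withRecords(CompressionType.NONE, record)
    val batchCount = memoryRecords.batches.asScala.size  // one batch wrapping the record
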
http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index c481ac4..d48e4f2 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -28,7 +28,7 @@ import TestUtils.createBroker
 import org.I0Itec.zkclient.ZkClient
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.protocol.Errors
-import org.apache.kafka.common.record.{MemoryRecords, Record, Records}
+import org.apache.kafka.common.record._
 import org.apache.kafka.common.requests.{LeaderAndIsrRequest, PartitionState}
 import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
 import org.apache.kafka.common.requests.FetchRequest.PartitionData
@@ -109,7 +109,8 @@ class ReplicaManagerTest {
         timeout = 0,
         requiredAcks = 3,
         internalTopicsAllowed = false,
-        entriesPerPartition = Map(new TopicPartition("test1", 0) -> MemoryRecords.withRecords(Record.create("first message".getBytes))),
+        entriesPerPartition = Map(new TopicPartition("test1", 0) -> MemoryRecords.withRecords(CompressionType.NONE,
+          new SimpleRecord("first message".getBytes))),
         responseCallback = callback)
     } finally {
       rm.shutdown(checkpointHW = false)
@@ -166,7 +167,8 @@ class ReplicaManagerTest {
         timeout = 1000,
         requiredAcks = -1,
         internalTopicsAllowed = false,
-        entriesPerPartition = Map(new TopicPartition(topic, 0) -> MemoryRecords.withRecords(Record.create("first message".getBytes()))),
+        entriesPerPartition = Map(new TopicPartition(topic, 0) -> MemoryRecords.withRecords(CompressionType.NONE,
+          new SimpleRecord("first message".getBytes()))),
         responseCallback = produceCallback)
 
       // Fetch some messages
@@ -230,7 +232,7 @@ class ReplicaManagerTest {
           timeout = 1000,
           requiredAcks = -1,
           internalTopicsAllowed = false,
-          entriesPerPartition = Map(new TopicPartition(topic, 0) -> MemoryRecords.withRecords(Record.create("message %d".format(i).getBytes))),
+          entriesPerPartition = Map(new TopicPartition(topic, 0) -> TestUtils.singletonRecords("message %d".format(i).getBytes)),
           responseCallback = produceCallback)
       
       var fetchCallbackFired = false
@@ -255,7 +257,7 @@ class ReplicaManagerTest {
       
       assertTrue(fetchCallbackFired)
       assertEquals("Should not give an exception", Errors.NONE, fetchError)
-      assertTrue("Should return some data", fetchedRecords.shallowEntries.iterator.hasNext)
+      assertTrue("Should return some data", fetchedRecords.batches.iterator.hasNext)
       fetchCallbackFired = false
       
       // Fetch a message above the high watermark as a consumer

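ReplicaManagerTest follows the same pattern on its produce path: records that were previously built with Record.create are now built with SimpleRecord and an explicit CompressionType, or delegated to TestUtils.singletonRecords (updated later in this patch), while "did the fetch return data" checks look at batches. A minimal sketch of building the per-partition payload the way the updated test does (the message text here is illustrative):

    import kafka.utils.TestUtils
    import org.apache.kafka.common.TopicPartition
    import org.apache.kafka.common.record.MemoryRecords

    // singletonRecords defaults to no compression and the current magic value
    // (see the TestUtils.scala hunk below).
    val entries: Map[TopicPartition, MemoryRecords] =
      Map(new TopicPartition("test1", 0) -> TestUtils.singletonRecords("message 1".getBytes))
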
http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
index cffb04d..f33c73a 100644
--- a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
+++ b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
@@ -19,7 +19,6 @@ package kafka.server
 import kafka.api._
 import kafka.utils._
 import kafka.cluster.Replica
-import kafka.common.TopicAndPartition
 import kafka.log.Log
 import kafka.server.QuotaFactory.UnboundedQuota
 import org.apache.kafka.common.metrics.Metrics
@@ -29,7 +28,7 @@ import java.util.Properties
 import java.util.concurrent.atomic.AtomicBoolean
 
 import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.record.{MemoryRecords, Record}
+import org.apache.kafka.common.record.{CompressionType, SimpleRecord, MemoryRecords}
 import org.easymock.EasyMock
 import org.junit.Assert._
 
@@ -53,8 +52,8 @@ class SimpleFetchTest {
   val partitionHW = 5
 
   val fetchSize = 100
-  val messagesToHW = Record.create("messageToHW".getBytes())
-  val messagesToLEO = Record.create("messageToLEO".getBytes())
+  val recordToHW = new SimpleRecord("recordToHW".getBytes())
+  val recordToLEO = new SimpleRecord("recordToLEO".getBytes())
 
   val topic = "test-topic"
   val partitionId = 0
@@ -81,12 +80,12 @@ class SimpleFetchTest {
     EasyMock.expect(log.read(0, fetchSize, Some(partitionHW), true)).andReturn(
       FetchDataInfo(
         new LogOffsetMetadata(0L, 0L, 0),
-        MemoryRecords.withRecords(messagesToHW)
+        MemoryRecords.withRecords(CompressionType.NONE, recordToHW)
       )).anyTimes()
     EasyMock.expect(log.read(0, fetchSize, None, true)).andReturn(
       FetchDataInfo(
         new LogOffsetMetadata(0L, 0L, 0),
-        MemoryRecords.withRecords(messagesToLEO)
+        MemoryRecords.withRecords(CompressionType.NONE, recordToLEO)
       )).anyTimes()
     EasyMock.replay(log)
 
@@ -149,24 +148,30 @@ class SimpleFetchTest {
     val initialTopicCount = BrokerTopicStats.getBrokerTopicStats(topic).totalFetchRequestRate.count()
     val initialAllTopicsCount = BrokerTopicStats.getBrokerAllTopicsStats().totalFetchRequestRate.count()
 
-    assertEquals("Reading committed data should return messages only up to high watermark", messagesToHW,
-      replicaManager.readFromLocalLog(
-        replicaId = Request.OrdinaryConsumerId,
-        fetchOnlyFromLeader = true,
-        readOnlyCommitted = true,
-        fetchMaxBytes = Int.MaxValue,
-        hardMaxBytesLimit = false,
-        readPartitionInfo = fetchInfo,
-        quota = UnboundedQuota).find(_._1 == topicPartition).get._2.info.records.shallowEntries.iterator.next().record)
-    assertEquals("Reading any data can return messages up to the end of the log", messagesToLEO,
-      replicaManager.readFromLocalLog(
-        replicaId = Request.OrdinaryConsumerId,
-        fetchOnlyFromLeader = true,
-        readOnlyCommitted = false,
-        fetchMaxBytes = Int.MaxValue,
-        hardMaxBytesLimit = false,
-        readPartitionInfo = fetchInfo,
-        quota = UnboundedQuota).find(_._1 == topicPartition).get._2.info.records.shallowEntries.iterator.next().record)
+    val readCommittedRecords = replicaManager.readFromLocalLog(
+      replicaId = Request.OrdinaryConsumerId,
+      fetchOnlyFromLeader = true,
+      readOnlyCommitted = true,
+      fetchMaxBytes = Int.MaxValue,
+      hardMaxBytesLimit = false,
+      readPartitionInfo = fetchInfo,
+      quota = UnboundedQuota).find(_._1 == topicPartition)
+    val firstReadRecord = readCommittedRecords.get._2.info.records.records.iterator.next()
+    assertEquals("Reading committed data should return messages only up to high watermark", recordToHW,
+      new SimpleRecord(firstReadRecord))
+
+    val readAllRecords = replicaManager.readFromLocalLog(
+      replicaId = Request.OrdinaryConsumerId,
+      fetchOnlyFromLeader = true,
+      readOnlyCommitted = false,
+      fetchMaxBytes = Int.MaxValue,
+      hardMaxBytesLimit = false,
+      readPartitionInfo = fetchInfo,
+      quota = UnboundedQuota).find(_._1 == topicPartition)
+
+    val firstRecord = readAllRecords.get._2.info.records.records.iterator.next()
+    assertEquals("Reading any data can return messages up to the end of the log", recordToLEO,
+      new SimpleRecord(firstRecord))
 
     assertEquals("Counts should increment after fetch", initialTopicCount+2, BrokerTopicStats.getBrokerTopicStats(topic).totalFetchRequestRate.count())
     assertEquals("Counts should increment after fetch", initialAllTopicsCount+2, BrokerTopicStats.getBrokerAllTopicsStats().totalFetchRequestRate.count())

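SimpleFetchTest now compares payloads through the deep records() view rather than shallowEntries: it takes the first Record of the fetched data, wraps it in a SimpleRecord, and relies on SimpleRecord equality. A minimal sketch of that comparison pattern, using a locally built MemoryRecords as a stand-in for the fetch result:

    import org.apache.kafka.common.record.{CompressionType, MemoryRecords, SimpleRecord}

    // Stand-in for the records returned by readFromLocalLog in the test above.
    val expected = new SimpleRecord("recordToHW".getBytes)
    val fetched = MemoryRecords.withRecords(CompressionType.NONE, expected)

    // records() iterates individual records inside each batch; wrapping the Record
    // in a SimpleRecord gives key/value/timestamp equality for the assertion.
    val firstRecord = fetched.records.iterator.next()
    assert(new SimpleRecord(firstRecord) == expected)
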
http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/core/src/test/scala/unit/kafka/tools/MirrorMakerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/tools/MirrorMakerTest.scala b/core/src/test/scala/unit/kafka/tools/MirrorMakerTest.scala
index d6a5470..8564f2d 100644
--- a/core/src/test/scala/unit/kafka/tools/MirrorMakerTest.scala
+++ b/core/src/test/scala/unit/kafka/tools/MirrorMakerTest.scala
@@ -18,7 +18,7 @@
 package kafka.tools
 
 import kafka.consumer.BaseConsumerRecord
-import org.apache.kafka.common.record.{Record, TimestampType}
+import org.apache.kafka.common.record.{RecordBatch, TimestampType}
 import org.junit.Assert._
 import org.junit.Test
 
@@ -42,7 +42,7 @@ class MirrorMakerTest {
 
   @Test
   def testDefaultMirrorMakerMessageHandlerWithNoTimestampInSourceMessage() {
-    val consumerRecord = BaseConsumerRecord("topic", 0, 1L, Record.NO_TIMESTAMP, TimestampType.CREATE_TIME, "key".getBytes, "value".getBytes)
+    val consumerRecord = BaseConsumerRecord("topic", 0, 1L, RecordBatch.NO_TIMESTAMP, TimestampType.CREATE_TIME, "key".getBytes, "value".getBytes)
 
     val result = MirrorMaker.defaultMirrorMakerMessageHandler.handle(consumerRecord)
     assertEquals(1, result.size)

http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/core/src/test/scala/unit/kafka/utils/TestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 4f6a204..1ffc7c3 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -312,10 +312,28 @@ object TestUtils extends Logging {
   def singletonRecords(value: Array[Byte],
                        key: Array[Byte] = null,
                        codec: CompressionType = CompressionType.NONE,
-                       timestamp: Long = Record.NO_TIMESTAMP,
-                       magicValue: Byte = Record.CURRENT_MAGIC_VALUE) = {
-    val record = Record.create(magicValue, timestamp, key, value)
-    MemoryRecords.withRecords(codec, record)
+                       timestamp: Long = RecordBatch.NO_TIMESTAMP,
+                       magicValue: Byte = RecordBatch.CURRENT_MAGIC_VALUE): MemoryRecords = {
+    records(Seq(new SimpleRecord(timestamp, key, value)), magicValue = magicValue, codec = codec)
+  }
+
+  def recordsWithValues(magicValue: Byte,
+                        codec: CompressionType,
+                        values: Array[Byte]*): MemoryRecords = {
+    records(values.map(value => new SimpleRecord(value)), magicValue, codec)
+  }
+
+  def records(records: Iterable[SimpleRecord],
+              magicValue: Byte = RecordBatch.CURRENT_MAGIC_VALUE,
+              codec: CompressionType = CompressionType.NONE,
+              pid: Long = RecordBatch.NO_PRODUCER_ID,
+              epoch: Short = RecordBatch.NO_PRODUCER_EPOCH,
+              sequence: Int = RecordBatch.NO_SEQUENCE): MemoryRecords = {
+    val buf = ByteBuffer.allocate(DefaultRecordBatch.sizeInBytes(records.asJava))
+    val builder = MemoryRecords.builder(buf, magicValue, codec, TimestampType.CREATE_TIME, 0L,
+      System.currentTimeMillis, pid, epoch, sequence)
+    records.foreach(builder.append)
+    builder.build()
   }
 
   /**
@@ -1022,7 +1040,7 @@ object TestUtils extends Logging {
           var i = 0
           while ((shouldGetAllMessages && iterator.hasNext()) || (i < nMessagesPerThread)) {
             assertTrue(iterator.hasNext)
-            val message = iterator.next.message // will throw a timeout exception if the message isn't there
+            val message = iterator.next().message // will throw a timeout exception if the message isn't there
             messages ::= message
             debug("received message: " + message)
             i += 1

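The new TestUtils.records helper above is what lets tests build batches carrying the KIP-98 idempotent-producer metadata (producer id, epoch and sequence) via MemoryRecords.builder. A minimal sketch of a call with explicit values (the pid/epoch/sequence shown are purely illustrative):

    import kafka.utils.TestUtils
    import org.apache.kafka.common.record.{CompressionType, RecordBatch, SimpleRecord}

    // Two records in a single batch stamped with illustrative producer metadata.
    val batch = TestUtils.records(
      records = Seq(new SimpleRecord("a".getBytes), new SimpleRecord("b".getBytes)),
      magicValue = RecordBatch.CURRENT_MAGIC_VALUE,
      codec = CompressionType.NONE,
      pid = 1L,
      epoch = 0.toShort,
      sequence = 0)
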
http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/docs/upgrade.html
----------------------------------------------------------------------
diff --git a/docs/upgrade.html b/docs/upgrade.html
index 2a6c1b7..ec29da7 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -19,14 +19,14 @@
 
 <script id="upgrade-template" type="text/x-handlebars-template">
 
-<h4><a id="upgrade_10_3_0" href="#upgrade_10_3_0">Upgrading from 0.8.x, 0.9.x, 0.10.0.x, 0.10.1.x or 0.10.2.x to 0.10.3.0</a></h4>
-<p>0.10.2.0 has wire protocol changes. By following the recommended rolling upgrade plan below, you guarantee no downtime during the upgrade.
-    However, please review the <a href="#upgrade_1030_notable">notable changes in 0.10.3.0</a> before upgrading.
+<h4><a id="upgrade_11_0_0" href="#upgrade_11_0_0">Upgrading from 0.8.x, 0.9.x, 0.10.0.x, 0.10.1.x or 0.10.2.x to 0.11.0.0</a></h4>
+<p>0.11.0.0 has wire protocol changes. By following the recommended rolling upgrade plan below, you guarantee no downtime during the upgrade.
+    However, please review the <a href="#upgrade_1100_notable">notable changes in 0.11.0.0</a> before upgrading.
 </p>
 
-<p>Starting with version 0.10.2, Java clients (producer and consumer) have acquired the ability to communicate with older brokers. Version 0.10.3
+<p>Starting with version 0.10.2, Java clients (producer and consumer) have acquired the ability to communicate with older brokers. Version 0.11.0
     clients can talk to version 0.10.0 or newer brokers. However, if your brokers are older than 0.10.0, you must upgrade all the brokers in the
-    Kafka cluster before upgrading your clients. Version 0.10.3 brokers support 0.8.x and newer clients.
+    Kafka cluster before upgrading your clients. Version 0.11.0 brokers support 0.8.x and newer clients.
 </p>
 
 <p><b>For a rolling upgrade:</b></p>
@@ -39,22 +39,30 @@
         </ul>
     </li>
     <li> Upgrade the brokers one at a time: shut down the broker, update the code, and restart it. </li>
-    <li> Once the entire cluster is upgraded, bump the protocol version by editing inter.broker.protocol.version and setting it to 0.10.3. </li>
-    <li> If your previous message format is 0.10.0, change log.message.format.version to 0.10.3 (this is a no-op as the message format is the same for 0.10.0, 0.10.1, 0.10.2 and 0.10.3).
+    <li> Once the entire cluster is upgraded, bump the protocol version by editing inter.broker.protocol.version and setting it to 0.11.0. </li>
+    <li> If your previous message format is 0.10.0, change log.message.format.version to 0.11.0 (this is a no-op as the message format is the same for 0.10.0, 0.10.1, 0.10.2 and 0.11.0).
         If your previous message format version is lower than 0.10.0, do not change log.message.format.version yet - this parameter should only change once all consumers have been upgraded to 0.10.0.0 or later.</li>
     <li> Restart the brokers one by one for the new protocol version to take effect. </li>
     <li> If log.message.format.version is still lower than 0.10.0 at this point, wait until all consumers have been upgraded to 0.10.0 or later,
-        then change log.message.format.version to 0.10.3 on each broker and restart them one by one. </li>
+        then change log.message.format.version to 0.11.0 on each broker and restart them one by one. </li>
 </ol>
 
 <p><b>Note:</b> If you are willing to accept downtime, you can simply take all the brokers down, update the code and start all of them. They will start with the new protocol by default.
 
 <p><b>Note:</b> Bumping the protocol version and restarting can be done any time after the brokers were upgraded. It does not have to be immediately after.
 
-<h5><a id="upgrade_1030_notable" href="#upgrade_1030_notable">Notable changes in 0.10.3.0</a></h5>
+<h5><a id="upgrade_1100_notable" href="#upgrade_1100_notable">Notable changes in 0.11.0.0</a></h5>
 <ul>
-    <li>The <code>offsets.topic.replication.factor</code> broker config is now enforced upon auto topic creation. Internal auto topic creation will fail with a GROUP_COORDINATOR_NOT_AVAILABLE error until the cluster size meets this replication factor requirement.</li>
-    <li>By default <code>message.timestamp.difference.max.ms</code> is the same as <code>retention.ms</code> instead of <code>Long.MAX_VALUE</code>.</li>
+    <li>The <code>offsets.topic.replication.factor</code> broker config is now enforced upon auto topic creation. Internal
+        auto topic creation will fail with a GROUP_COORDINATOR_NOT_AVAILABLE error until the cluster size meets this
+        replication factor requirement.</li>
+    <li>By default <code>message.timestamp.difference.max.ms</code> is the same as <code>retention.ms</code> instead of
+        <code>Long.MAX_VALUE</code>.</li>
+    <li>The broker configuration <code>max.message.bytes</code> now applies to the total size of a batch of messages.
+        Previously the setting applied to batches of compressed messages, or to non-compressed messages individually. In practice,
+        the change is minor since a message batch may consist of only a single message, so the limitation on the size of
+        individual messages is only reduced by the overhead of the batch format. This similarly affects the
+        producer's <code>batch.size</code> configuration.</li>
 </ul>
 
 <h4><a id="upgrade_10_2_0" href="#upgrade_10_2_0">Upgrading from 0.8.x, 0.9.x, 0.10.0.x or 0.10.1.x to 0.10.2.0</a></h4>

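To make the rolling-upgrade steps above concrete, the broker overrides involved might move through stages like the following (a sketch assuming a cluster coming from 0.10.2 whose messages are already on a 0.10.x format; exact values depend on your starting versions):

    # server.properties while rolling out the new binaries
    inter.broker.protocol.version=0.10.2
    log.message.format.version=0.10.2

    # after every broker runs the new code: bump the protocol and bounce again
    inter.broker.protocol.version=0.11.0

    # once all consumers are on 0.10.0 or later, the message format can follow
    log.message.format.version=0.11.0
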
http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClient.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClient.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClient.java
index c493d74..3217e46 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClient.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClient.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.processor.internals;
 
+import org.apache.kafka.clients.ApiVersions;
 import org.apache.kafka.clients.ClientRequest;
 import org.apache.kafka.clients.ClientResponse;
 import org.apache.kafka.clients.ClientUtils;
@@ -128,7 +129,8 @@ public class StreamsKafkaClient {
             streamsConfig.getInt(StreamsConfig.RECEIVE_BUFFER_CONFIG),
             streamsConfig.getInt(StreamsConfig.REQUEST_TIMEOUT_MS_CONFIG),
             time,
-            true);
+            true,
+            new ApiVersions());
     }
 
     public void close() throws IOException {

