kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject [1/3] kafka git commit: KAFKA-5494; Enable idempotence with max.in.flight.requests.per.connection > 1
Date Thu, 14 Sep 2017 23:11:40 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 8a5e86660 -> 5d2422258


http://git-wip-us.apache.org/repos/asf/kafka/blob/5d242225/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala b/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
index bb41380..976bbd7 100644
--- a/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
@@ -63,7 +63,7 @@ class ProducerStateManagerTest extends JUnitSuite {
     append(stateManager, producerId, epoch, 1, 0L, 1L)
 
     // Duplicate sequence number (matches previous sequence number)
-    assertThrows[DuplicateSequenceNumberException] {
+    assertThrows[DuplicateSequenceException] {
       append(stateManager, producerId, epoch, 1, 0L, 1L)
     }
 
@@ -95,7 +95,8 @@ class ProducerStateManagerTest extends JUnitSuite {
 
     val lastEntry = maybeLastEntry.get
     assertEquals(epoch, lastEntry.producerEpoch)
-    assertEquals(0, lastEntry.firstSeq)
+
+    assertEquals(Int.MaxValue, lastEntry.firstSeq)
     assertEquals(0, lastEntry.lastSeq)
   }
 
@@ -122,7 +123,7 @@ class ProducerStateManagerTest extends JUnitSuite {
     assertEquals(epoch, lastEntry.producerEpoch)
     assertEquals(sequence, lastEntry.firstSeq)
     assertEquals(sequence, lastEntry.lastSeq)
-    assertEquals(offset, lastEntry.lastOffset)
+    assertEquals(offset, lastEntry.lastDataOffset)
     assertEquals(offset, lastEntry.firstOffset)
   }
 
@@ -158,7 +159,7 @@ class ProducerStateManagerTest extends JUnitSuite {
     val producerEpoch = 0.toShort
     val offset = 992342L
     val seq = 0
-    val producerAppendInfo = new ProducerAppendInfo(producerId, ProducerIdEntry.Empty, validateSequenceNumbers = true,
+    val producerAppendInfo = new ProducerAppendInfo(producerId, ProducerIdEntry.empty(producerId), validateSequenceNumbers = true,
       loadingFromLog = false)
     producerAppendInfo.append(producerEpoch, seq, seq, time.milliseconds(), offset, isTransactional = true)
 
@@ -175,7 +176,7 @@ class ProducerStateManagerTest extends JUnitSuite {
     val producerEpoch = 0.toShort
     val offset = 992342L
     val seq = 0
-    val producerAppendInfo = new ProducerAppendInfo(producerId, ProducerIdEntry.Empty, validateSequenceNumbers = true,
+    val producerAppendInfo = new ProducerAppendInfo(producerId, ProducerIdEntry.empty(producerId), validateSequenceNumbers = true,
       loadingFromLog = false)
     producerAppendInfo.append(producerEpoch, seq, seq, time.milliseconds(), offset, isTransactional = true)
 
@@ -198,22 +199,22 @@ class ProducerStateManagerTest extends JUnitSuite {
 
     val appendInfo = stateManager.prepareUpdate(producerId, loadingFromLog = false)
     appendInfo.append(producerEpoch, 1, 5, time.milliseconds(), 20L, isTransactional = true)
-    var lastEntry = appendInfo.lastEntry
+    var lastEntry = appendInfo.latestEntry
     assertEquals(producerEpoch, lastEntry.producerEpoch)
-    assertEquals(1, lastEntry.firstSeq)
+    assertEquals(0, lastEntry.firstSeq)
     assertEquals(5, lastEntry.lastSeq)
-    assertEquals(16L, lastEntry.firstOffset)
-    assertEquals(20L, lastEntry.lastOffset)
+    assertEquals(9L, lastEntry.firstOffset)
+    assertEquals(20L, lastEntry.lastDataOffset)
     assertEquals(Some(16L), lastEntry.currentTxnFirstOffset)
     assertEquals(List(new TxnMetadata(producerId, 16L)), appendInfo.startedTransactions)
 
     appendInfo.append(producerEpoch, 6, 10, time.milliseconds(), 30L, isTransactional = true)
-    lastEntry = appendInfo.lastEntry
+    lastEntry = appendInfo.latestEntry
     assertEquals(producerEpoch, lastEntry.producerEpoch)
-    assertEquals(6, lastEntry.firstSeq)
+    assertEquals(0, lastEntry.firstSeq)
     assertEquals(10, lastEntry.lastSeq)
-    assertEquals(26L, lastEntry.firstOffset)
-    assertEquals(30L, lastEntry.lastOffset)
+    assertEquals(9L, lastEntry.firstOffset)
+    assertEquals(30L, lastEntry.lastDataOffset)
     assertEquals(Some(16L), lastEntry.currentTxnFirstOffset)
     assertEquals(List(new TxnMetadata(producerId, 16L)), appendInfo.startedTransactions)
 
@@ -224,12 +225,13 @@ class ProducerStateManagerTest extends JUnitSuite {
     assertEquals(40L, completedTxn.lastOffset)
     assertFalse(completedTxn.isAborted)
 
-    lastEntry = appendInfo.lastEntry
+    lastEntry = appendInfo.latestEntry
     assertEquals(producerEpoch, lastEntry.producerEpoch)
-    assertEquals(10, lastEntry.firstSeq)
+    // verify that appending the transaction marker doesn't affect the metadata of the cached record batches.
+    assertEquals(0, lastEntry.firstSeq)
     assertEquals(10, lastEntry.lastSeq)
-    assertEquals(40L, lastEntry.firstOffset)
-    assertEquals(40L, lastEntry.lastOffset)
+    assertEquals(9L, lastEntry.firstOffset)
+    assertEquals(30L, lastEntry.lastDataOffset)
     assertEquals(coordinatorEpoch, lastEntry.coordinatorEpoch)
     assertEquals(None, lastEntry.currentTxnFirstOffset)
     assertEquals(List(new TxnMetadata(producerId, 16L)), appendInfo.startedTransactions)
@@ -421,7 +423,7 @@ class ProducerStateManagerTest extends JUnitSuite {
 
     val maybeEntry = stateManager.lastEntry(anotherPid)
     assertTrue(maybeEntry.isDefined)
-    assertEquals(3L, maybeEntry.get.lastOffset)
+    assertEquals(3L, maybeEntry.get.lastDataOffset)
 
     stateManager.truncateHead(3)
     assertEquals(Set(anotherPid), stateManager.activeProducers.keySet)
@@ -452,7 +454,7 @@ class ProducerStateManagerTest extends JUnitSuite {
     val entry = stateManager.lastEntry(pid2)
     assertTrue(entry.isDefined)
     assertEquals(0, entry.get.lastSeq)
-    assertEquals(1L, entry.get.lastOffset)
+    assertEquals(1L, entry.get.lastDataOffset)
   }
 
   @Test
@@ -663,7 +665,7 @@ class ProducerStateManagerTest extends JUnitSuite {
     assertFalse(snapshotToTruncate.exists())
 
     val loadedProducerState = reloadedStateManager.activeProducers(producerId)
-    assertEquals(0L, loadedProducerState.lastOffset)
+    assertEquals(0L, loadedProducerState.lastDataOffset)
   }
 
   private def appendEndTxnMarker(mapping: ProducerStateManager,

http://git-wip-us.apache.org/repos/asf/kafka/blob/5d242225/core/src/test/scala/unit/kafka/utils/TestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index a52c83c..902d1c3 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -1379,11 +1379,12 @@ object TestUtils extends Logging {
     records
   }
 
-  def createTransactionalProducer(transactionalId: String, servers: Seq[KafkaServer]) = {
+  def createTransactionalProducer(transactionalId: String, servers: Seq[KafkaServer], batchSize: Int = 16384) = {
     val props = new Properties()
     props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId)
-    props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1")
+    props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "5")
     props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true")
+    props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize.toString)
     TestUtils.createNewProducer(TestUtils.getBrokerListStrFromServers(servers), retries = Integer.MAX_VALUE, acks = -1, props = Some(props))
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/5d242225/tools/src/main/java/org/apache/kafka/tools/TransactionalMessageCopier.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/tools/TransactionalMessageCopier.java b/tools/src/main/java/org/apache/kafka/tools/TransactionalMessageCopier.java
index 3903a3a..0d74645 100644
--- a/tools/src/main/java/org/apache/kafka/tools/TransactionalMessageCopier.java
+++ b/tools/src/main/java/org/apache/kafka/tools/TransactionalMessageCopier.java
@@ -149,6 +149,11 @@ public class TransactionalMessageCopier {
                 "org.apache.kafka.common.serialization.StringSerializer");
         props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                 "org.apache.kafka.common.serialization.StringSerializer");
+        // We set a small batch size to ensure that we have multiple inflight requests per transaction.
+        // If it is left at the default, each transaction will have only one batch per partition, hence not testing
+        // the case with multiple inflights.
+        props.put(ProducerConfig.BATCH_SIZE_CONFIG, "512");
+        props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "5");
 
         return new KafkaProducer<>(props);
     }
@@ -252,7 +257,6 @@ public class TransactionalMessageCopier {
         maxMessages = Math.min(messagesRemaining(consumer, inputPartition), maxMessages);
         final boolean enableRandomAborts = parsedArgs.getBoolean("enableRandomAborts");
 
-
         producer.initTransactions();
 
         final AtomicBoolean isShuttingDown = new AtomicBoolean(false);


Mime
View raw message