kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject [kafka] branch 1.0 updated: KAFKA-7467; NoSuchElementException is raised because controlBatch is empty (#5727)
Date Fri, 05 Oct 2018 19:34:47 GMT
This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 1.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/1.0 by this push:
     new 8ef64b7  KAFKA-7467; NoSuchElementException is raised because controlBatch is empty
(#5727)
8ef64b7 is described below

commit 8ef64b7619e06bcb26b52bdd2981688e69e72d49
Author: Bob Barrett <bob.barrett@outlook.com>
AuthorDate: Fri Oct 5 11:58:59 2018 -0700

    KAFKA-7467; NoSuchElementException is raised because controlBatch is empty (#5727)
    
    This patch adds checks before reading the first record of a control batch. If the batch
    is empty, it is treated as having already been cleaned. In the case of LogCleaner this means
    it is safe to discard. In the case of ProducerStateManager it means it shouldn't cause state
    to be stored because the relevant transaction has already been cleaned. In the case of Fetcher,
    it just preempts the check for an abort. In the case of GroupMetadataManager, it doesn't process
    the offset as a commit [...]
---
 .../kafka/clients/consumer/internals/Fetcher.java  |  3 +-
 .../clients/consumer/internals/FetcherTest.java    | 58 ++++++++++++++++++++++
 .../coordinator/group/GroupMetadataManager.scala   | 23 +++++----
 core/src/main/scala/kafka/log/LogCleaner.scala     | 38 ++++++++------
 .../scala/kafka/log/ProducerStateManager.scala     | 14 ++++--
 .../main/scala/kafka/tools/DumpLogSegments.scala   |  3 +-
 .../group/GroupMetadataManagerTest.scala           | 56 +++++++++++++++++++++
 .../test/scala/unit/kafka/log/LogCleanerTest.scala | 31 ++++++++++++
 .../unit/kafka/log/ProducerStateManagerTest.scala  | 32 +++++++++++-
 9 files changed, 224 insertions(+), 34 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index 4c68f1f..efcc5d8 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -1156,8 +1156,7 @@ public class Fetcher<K, V> implements SubscriptionState.Listener,
Closeable {
 
             Iterator<Record> batchIterator = batch.iterator();
             if (!batchIterator.hasNext())
-                throw new InvalidRecordException("Invalid batch for partition " + partition
+ " at offset " +
-                        batch.baseOffset() + " with control sequence set, but no records");
+                return false;
 
             Record firstRecord = batchIterator.next();
             return ControlRecordType.ABORT == ControlRecordType.parse(firstRecord.key());
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index cab385c..789c0fa 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -1898,6 +1898,64 @@ public class FetcherTest {
         assertEquals(currentOffset, (long) subscriptions.position(tp0));
     }
 
+    @Test
+    public void testEmptyControlBatch() {
+        Fetcher<byte[], byte[]> fetcher = createFetcher(subscriptions, new Metrics(),
new ByteArrayDeserializer(),
+                new ByteArrayDeserializer(), Integer.MAX_VALUE, IsolationLevel.READ_COMMITTED);
+        ByteBuffer buffer = ByteBuffer.allocate(1024);
+        int currentOffset = 1;
+
+        // Empty control batch should not cause an exception
+        DefaultRecordBatch.writeEmptyHeader(buffer, RecordBatch.MAGIC_VALUE_V2, 1L,
+                (short) 0, -1, 0, 0,
+                RecordBatch.NO_PARTITION_LEADER_EPOCH, TimestampType.CREATE_TIME, time.milliseconds(),
+                true, true);
+
+        currentOffset += appendTransactionalRecords(buffer, 1L, currentOffset,
+                new SimpleRecord(time.milliseconds(), "key".getBytes(), "value".getBytes()),
+                new SimpleRecord(time.milliseconds(), "key".getBytes(), "value".getBytes()));
+
+        commitTransaction(buffer, 1L, currentOffset);
+        buffer.flip();
+
+        List<FetchResponse.AbortedTransaction> abortedTransactions = new ArrayList<>();
+        MemoryRecords records = MemoryRecords.readableRecords(buffer);
+        subscriptions.assignFromUser(singleton(tp0));
+
+        subscriptions.seek(tp0, 0);
+
+        // normal fetch
+        assertEquals(1, fetcher.sendFetches());
+        assertFalse(fetcher.hasCompletedFetches());
+        client.prepareResponse(new MockClient.RequestMatcher() {
+            @Override
+            public boolean matches(AbstractRequest body) {
+                FetchRequest request = (FetchRequest) body;
+                assertEquals(IsolationLevel.READ_COMMITTED, request.isolationLevel());
+                return true;
+            }
+        }, fullFetchResponseWithAbortedTransactions(records, abortedTransactions, Errors.NONE,
100L, 100L, 0));
+
+        consumerClient.poll(0);
+        assertTrue(fetcher.hasCompletedFetches());
+
+        Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> fetchedRecords
= fetcher.fetchedRecords();
+        assertTrue(fetchedRecords.containsKey(tp0));
+        assertEquals(fetchedRecords.get(tp0).size(), 2);
+    }
+
+    private FetchResponse fullFetchResponseWithAbortedTransactions(MemoryRecords records,
+                                                                   List<FetchResponse.AbortedTransaction>
abortedTransactions,
+                                                                   Errors error,
+                                                                   long lastStableOffset,
+                                                                   long hw,
+                                                                   int throttleTime) {
+
+        LinkedHashMap<TopicPartition, FetchResponse.PartitionData> partitions = new
LinkedHashMap<>();
+        partitions.put(tp0, new FetchResponse.PartitionData(error, hw, lastStableOffset,
0L, abortedTransactions, records));
+        return new FetchResponse(partitions, throttleTime);
+    }
+
     private int appendTransactionalRecords(ByteBuffer buffer, long pid, long baseOffset,
int baseSequence, SimpleRecord... records) {
         MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, RecordBatch.CURRENT_MAGIC_VALUE,
CompressionType.NONE,
                 TimestampType.CREATE_TIME, baseOffset, time.milliseconds(), pid, (short)
0, baseSequence, true,
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
index e22623f..d6fdc8e 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
@@ -498,17 +498,20 @@ class GroupMetadataManager(brokerId: Int,
           memRecords.batches.asScala.foreach { batch =>
             val isTxnOffsetCommit = batch.isTransactional
             if (batch.isControlBatch) {
-              val record = batch.iterator.next()
-              val controlRecord = ControlRecordType.parse(record.key)
-              if (controlRecord == ControlRecordType.COMMIT) {
-                pendingOffsets.getOrElse(batch.producerId, mutable.Map[GroupTopicPartition,
CommitRecordMetadataAndOffset]())
-                  .foreach {
-                    case (groupTopicPartition, commitRecordMetadataAndOffset) =>
-                      if (!loadedOffsets.contains(groupTopicPartition) || loadedOffsets(groupTopicPartition).olderThan(commitRecordMetadataAndOffset))
-                        loadedOffsets.put(groupTopicPartition, commitRecordMetadataAndOffset)
-                  }
+              val recordIterator = batch.iterator
+              if (recordIterator.hasNext) {
+                val record = recordIterator.next()
+                val controlRecord = ControlRecordType.parse(record.key)
+                if (controlRecord == ControlRecordType.COMMIT) {
+                  pendingOffsets.getOrElse(batch.producerId, mutable.Map[GroupTopicPartition,
CommitRecordMetadataAndOffset]())
+                    .foreach {
+                      case (groupTopicPartition, commitRecordMetadataAndOffset) =>
+                        if (!loadedOffsets.contains(groupTopicPartition) || loadedOffsets(groupTopicPartition).olderThan(commitRecordMetadataAndOffset))
+                          loadedOffsets.put(groupTopicPartition, commitRecordMetadataAndOffset)
+                    }
+                }
+                pendingOffsets.remove(batch.producerId)
               }
-              pendingOffsets.remove(batch.producerId)
             } else {
               var batchBaseOffset: Option[Long] = None
               for (record <- batch.asScala) {
diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index 88bf4a7..29c3af9 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -890,24 +890,30 @@ private[log] class CleanedTransactionMetadata(val abortedTransactions:
mutable.P
   def onControlBatchRead(controlBatch: RecordBatch): Boolean = {
     consumeAbortedTxnsUpTo(controlBatch.lastOffset)
 
-    val controlRecord = controlBatch.iterator.next()
-    val controlType = ControlRecordType.parse(controlRecord.key)
-    val producerId = controlBatch.producerId
-    controlType match {
-      case ControlRecordType.ABORT =>
-        ongoingAbortedTxns.remove(producerId) match {
-          // Retain the marker until all batches from the transaction have been removed
-          case Some(abortedTxnMetadata) if abortedTxnMetadata.lastObservedBatchOffset.isDefined
=>
-            transactionIndex.foreach(_.append(abortedTxnMetadata.abortedTxn))
-            false
-          case _ => true
-        }
+    val controlRecordIterator = controlBatch.iterator
+    if (controlRecordIterator.hasNext) {
+      val controlRecord = controlRecordIterator.next()
+      val controlType = ControlRecordType.parse(controlRecord.key)
+      val producerId = controlBatch.producerId
+      controlType match {
+        case ControlRecordType.ABORT =>
+          ongoingAbortedTxns.remove(producerId) match {
+            // Retain the marker until all batches from the transaction have been removed
+            case Some(abortedTxnMetadata) if abortedTxnMetadata.lastObservedBatchOffset.isDefined
=>
+              transactionIndex.foreach(_.append(abortedTxnMetadata.abortedTxn))
+              false
+            case _ => true
+          }
 
-      case ControlRecordType.COMMIT =>
-        // This marker is eligible for deletion if we didn't traverse any batches from the
transaction
-        !ongoingCommittedTxns.remove(producerId)
+        case ControlRecordType.COMMIT =>
+          // This marker is eligible for deletion if we didn't traverse any batches from
the transaction
+          !ongoingCommittedTxns.remove(producerId)
 
-      case _ => false
+        case _ => false
+      }
+    } else {
+      // An empty control batch was already cleaned, so it's safe to discard
+      true
     }
   }
 
diff --git a/core/src/main/scala/kafka/log/ProducerStateManager.scala b/core/src/main/scala/kafka/log/ProducerStateManager.scala
index d4ce104..642b24d 100644
--- a/core/src/main/scala/kafka/log/ProducerStateManager.scala
+++ b/core/src/main/scala/kafka/log/ProducerStateManager.scala
@@ -250,10 +250,16 @@ private[log] class ProducerAppendInfo(val producerId: Long,
 
   def append(batch: RecordBatch): Option[CompletedTxn] = {
     if (batch.isControlBatch) {
-      val record = batch.iterator.next()
-      val endTxnMarker = EndTransactionMarker.deserialize(record)
-      val completedTxn = appendEndTxnMarker(endTxnMarker, batch.producerEpoch, batch.baseOffset,
record.timestamp)
-      Some(completedTxn)
+      val recordIterator = batch.iterator
+      if (recordIterator.hasNext) {
+        val record = recordIterator.next()
+        val endTxnMarker = EndTransactionMarker.deserialize(record)
+        val completedTxn = appendEndTxnMarker(endTxnMarker, batch.producerEpoch, batch.baseOffset,
record.timestamp)
+        Some(completedTxn)
+      } else {
+        // An empty control batch means the entire transaction has been cleaned from the
log, so no need to append
+        None
+      }
     } else {
       append(batch.producerEpoch, batch.baseSequence, batch.lastSequence, batch.maxTimestamp,
batch.lastOffset,
         batch.isTransactional)
diff --git a/core/src/main/scala/kafka/tools/DumpLogSegments.scala b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
index 3261906..77ad14e 100755
--- a/core/src/main/scala/kafka/tools/DumpLogSegments.scala
+++ b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
@@ -422,7 +422,8 @@ object DumpLogSegments {
           print("baseOffset: " + batch.baseOffset + " lastOffset: " + batch.lastOffset +
             " baseSequence: " + batch.baseSequence + " lastSequence: " + batch.lastSequence
+
             " producerId: " + batch.producerId + " producerEpoch: " + batch.producerEpoch
+
-            " partitionLeaderEpoch: " + batch.partitionLeaderEpoch + " isTransactional: "
+ batch.isTransactional)
+            " partitionLeaderEpoch: " + batch.partitionLeaderEpoch + " isTransactional: "
+ batch.isTransactional +
+            " isControl: " + batch.isControlBatch)
         else
           print("offset: " + batch.lastOffset)
 
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
index 3bdffbd..3b98e2a 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
@@ -33,6 +33,7 @@ import org.easymock.{Capture, EasyMock, IAnswer}
 import org.junit.Assert.{assertEquals, assertFalse, assertTrue, assertNull}
 import org.junit.{Before, Test}
 import java.nio.ByteBuffer
+import java.util.Collections
 
 import org.apache.kafka.common.internals.Topic
 
@@ -1389,6 +1390,61 @@ class GroupMetadataManagerTest {
     EasyMock.verify(replicaManager)
   }
 
+  @Test
+  def testLoadOffsetsWithEmptyControlBatch() {
+    val groupMetadataTopicPartition = groupTopicPartition
+    val startOffset = 15L
+    val generation = 15
+
+    val committedOffsets = Map(
+      new TopicPartition("foo", 0) -> 23L,
+      new TopicPartition("foo", 1) -> 455L,
+      new TopicPartition("bar", 0) -> 8992L
+    )
+
+    val offsetCommitRecords = createCommittedOffsetRecords(committedOffsets)
+    val groupMetadataRecord = buildEmptyGroupRecord(generation, protocolType)
+    val records = MemoryRecords.withRecords(startOffset, CompressionType.NONE,
+      offsetCommitRecords ++ Seq(groupMetadataRecord): _*)
+
+    // Prepend empty control batch to valid records
+    val mockBatch = EasyMock.createMock(classOf[MutableRecordBatch])
+    EasyMock.expect(mockBatch.iterator).andReturn(Collections.emptyIterator[Record])
+    EasyMock.expect(mockBatch.isControlBatch).andReturn(true)
+    EasyMock.expect(mockBatch.isTransactional).andReturn(true)
+    EasyMock.expect(mockBatch.nextOffset).andReturn(16L)
+    EasyMock.replay(mockBatch)
+    val mockRecords = EasyMock.createMock(classOf[MemoryRecords])
+    EasyMock.expect(mockRecords.batches).andReturn((Iterable[MutableRecordBatch](mockBatch)
++ records.batches.asScala).asJava).anyTimes()
+    EasyMock.expect(mockRecords.records).andReturn(records.records()).anyTimes()
+    EasyMock.expect(mockRecords.sizeInBytes()).andReturn(DefaultRecordBatch.RECORD_BATCH_OVERHEAD
+ records.sizeInBytes()).anyTimes()
+    EasyMock.replay(mockRecords)
+
+    val logMock = EasyMock.mock(classOf[Log])
+    EasyMock.expect(logMock.logStartOffset).andReturn(startOffset).anyTimes()
+    EasyMock.expect(logMock.read(EasyMock.eq(startOffset), EasyMock.anyInt(), EasyMock.eq(None),
+      EasyMock.eq(true), EasyMock.eq(IsolationLevel.READ_UNCOMMITTED)))
+      .andReturn(FetchDataInfo(LogOffsetMetadata(startOffset), mockRecords))
+    EasyMock.expect(replicaManager.getLog(groupMetadataTopicPartition)).andStubReturn(Some(logMock))
+    EasyMock.expect(replicaManager.getLogEndOffset(groupMetadataTopicPartition)).andStubReturn(Some(18))
+    EasyMock.replay(logMock)
+    EasyMock.replay(replicaManager)
+
+    groupMetadataManager.loadGroupsAndOffsets(groupMetadataTopicPartition, _ => ())
+
+    // Empty control batch should not have caused the load to fail
+    val group = groupMetadataManager.getGroup(groupId).getOrElse(fail("Group was not loaded
into the cache"))
+    assertEquals(groupId, group.groupId)
+    assertEquals(Empty, group.currentState)
+    assertEquals(generation, group.generationId)
+    assertEquals(Some(protocolType), group.protocolType)
+    assertNull(group.leaderOrNull)
+    assertNull(group.protocolOrNull)
+    committedOffsets.foreach { case (topicPartition, offset) =>
+      assertEquals(Some(offset), group.offset(topicPartition).map(_.offset))
+    }
+  }
+
   private def appendAndCaptureCallback(): Capture[Map[TopicPartition, PartitionResponse]
=> Unit] = {
     val capturedArgument: Capture[Map[TopicPartition, PartitionResponse] => Unit] = EasyMock.newCapture()
     EasyMock.expect(replicaManager.appendRecords(EasyMock.anyLong(),
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
index 2f9d8d1..ab7901a 100755
--- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
@@ -381,6 +381,37 @@ class LogCleanerTest extends JUnitSuite {
   }
 
   @Test
+  def testCleanEmptyControlBatch(): Unit = {
+    val tp = new TopicPartition("test", 0)
+    val cleaner = makeCleaner(Int.MaxValue)
+    val logProps = new Properties()
+    logProps.put(LogConfig.SegmentBytesProp, 256: java.lang.Integer)
+    val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
+
+    val producerEpoch = 0.toShort
+
+    // [{Producer1: Commit}, {2}, {3}]
+    log.appendAsLeader(commitMarker(1L, producerEpoch), leaderEpoch = 0, isFromClient = false)
// offset 0
+    log.appendAsLeader(record(2, 2), leaderEpoch = 0) // offset 1
+    log.appendAsLeader(record(3, 3), leaderEpoch = 0) // offset 2
+    log.roll()
+
+    // first time through the control batch is retained as an empty batch
+    // Expected State: [{Producer1: EmptyBatch}], [{2}, {3}]
+    var dirtyOffset = cleaner.doClean(LogToClean(tp, log, 0L, 100L), deleteHorizonMs = Long.MaxValue)._1
+    assertEquals(List(2, 3), keysInLog(log))
+    assertEquals(List(1, 2), offsetsInLog(log))
+    assertEquals(List(0, 1, 2), lastOffsetsPerBatchInLog(log))
+
+    // the empty control batch does not cause an exception when cleaned
+    // Expected State: [{Producer1: EmptyBatch}], [{2}, {3}]
+    dirtyOffset = cleaner.doClean(LogToClean(tp, log, dirtyOffset, 100L), deleteHorizonMs
= Long.MaxValue)._1
+    assertEquals(List(2, 3), keysInLog(log))
+    assertEquals(List(1, 2), offsetsInLog(log))
+    assertEquals(List(0, 1, 2), lastOffsetsPerBatchInLog(log))
+  }
+
+  @Test
   def testAbortMarkerRemoval(): Unit = {
     val tp = new TopicPartition("test", 0)
     val cleaner = makeCleaner(Int.MaxValue)
diff --git a/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala b/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
index 053aed7..a4ffb46 100644
--- a/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
@@ -21,14 +21,16 @@ import java.io.File
 import java.nio.ByteBuffer
 import java.nio.channels.FileChannel
 import java.nio.file.StandardOpenOption
+import java.util.Collections
 
 import kafka.server.LogOffsetMetadata
 import kafka.utils.TestUtils
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.errors._
 import org.apache.kafka.common.internals.Topic
-import org.apache.kafka.common.record.{ControlRecordType, EndTransactionMarker, RecordBatch}
+import org.apache.kafka.common.record._
 import org.apache.kafka.common.utils.{MockTime, Utils}
+import org.easymock.EasyMock
 import org.junit.Assert._
 import org.junit.{After, Before, Test}
 import org.scalatest.junit.JUnitSuite
@@ -717,6 +719,22 @@ class ProducerStateManagerTest extends JUnitSuite {
     }
   }
 
+  @Test
+  def testAppendEmptyControlBatch(): Unit = {
+    val producerId = 23423L
+    val producerEpoch = 145.toShort
+    val baseOffset = 15
+
+    val batch = EasyMock.createMock(classOf[RecordBatch])
+    EasyMock.expect(batch.isControlBatch).andReturn(true).once
+    EasyMock.expect(batch.iterator).andReturn(Collections.emptyIterator[Record]).once
+    EasyMock.replay(batch)
+
+    // Appending the empty control batch should not throw and a new transaction shouldn't
be started
+    append(stateManager, producerId, producerEpoch, baseOffset, batch, isFromClient = true)
+    assertEquals(None, stateManager.lastEntry(producerId).get.currentTxnFirstOffset)
+  }
+
   private def testLoadFromCorruptSnapshot(makeFileCorrupt: FileChannel => Unit): Unit
= {
     val epoch = 0.toShort
     val producerId = 1L
@@ -777,6 +795,18 @@ class ProducerStateManagerTest extends JUnitSuite {
     stateManager.updateMapEndOffset(offset + 1)
   }
 
+  private def append(stateManager: ProducerStateManager,
+                     producerId: Long,
+                     producerEpoch: Short,
+                     offset: Long,
+                     batch: RecordBatch,
+                     isFromClient : Boolean): Unit = {
+    val producerAppendInfo = stateManager.prepareUpdate(producerId, isFromClient)
+    producerAppendInfo.append(batch)
+    stateManager.update(producerAppendInfo)
+    stateManager.updateMapEndOffset(offset + 1)
+  }
+
   private def currentSnapshotOffsets =
     logDir.listFiles.map(Log.offsetFromFile).toSet
 


Mime
View raw message