kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject [kafka] branch 2.4 updated: KAFKA-9293; Fix NPE in DumpLogSegments offsets parser and display tombstone keys (#7820)
Date Fri, 03 Jan 2020 21:13:59 GMT
This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 2.4
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.4 by this push:
     new 0e1c00f  KAFKA-9293; Fix NPE in DumpLogSegments offsets parser and display tombstone keys (#7820)
0e1c00f is described below

commit 0e1c00fb1ade36143c166931a515111913ff6b81
Author: Jason Gustafson <jason@confluent.io>
AuthorDate: Fri Jan 3 13:04:10 2020 -0800

    KAFKA-9293; Fix NPE in DumpLogSegments offsets parser and display tombstone keys (#7820)
    
    Fixes an NPE when UserData in a member's subscription is null and adds similar checks for transaction log parser. Also modifies the output logic so that we show the keys of tombstones for both group and transaction metadata.
    
    Reviewers: David Arthur <mumrah@gmail.com>
---
 .../coordinator/group/GroupMetadataManager.scala   | 100 +++++++++++++++---
 .../coordinator/transaction/TransactionLog.scala   |  53 +++++++---
 .../transaction/TransactionStateManager.scala      |  10 +-
 .../main/scala/kafka/tools/DumpLogSegments.scala   | 112 ++++-----------------
 .../coordinator/group/GroupCoordinatorTest.scala   |   2 +-
 .../group/GroupMetadataManagerTest.scala           |  72 +++++++++++--
 .../transaction/TransactionLogTest.scala           |  40 +++++++-
 .../unit/kafka/tools/DumpLogSegmentsTest.scala     |   2 +-
 8 files changed, 252 insertions(+), 139 deletions(-)

diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
index a143669..a952164 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
@@ -34,18 +34,18 @@ import kafka.utils.CoreUtils.inLock
 import kafka.utils._
 import kafka.zk.KafkaZkClient
 import org.apache.kafka.clients.consumer.ConsumerRecord
-import org.apache.kafka.common.{KafkaException, TopicPartition}
+import org.apache.kafka.clients.consumer.internals.ConsumerProtocol
 import org.apache.kafka.common.internals.Topic
-import org.apache.kafka.common.metrics.stats.Meter
-import org.apache.kafka.common.metrics.stats.{Avg, Max}
 import org.apache.kafka.common.metrics.Metrics
+import org.apache.kafka.common.metrics.stats.{Avg, Max, Meter}
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.protocol.types.Type._
 import org.apache.kafka.common.protocol.types._
 import org.apache.kafka.common.record._
-import org.apache.kafka.common.requests.{OffsetCommitRequest, OffsetFetchResponse}
 import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
+import org.apache.kafka.common.requests.{OffsetCommitRequest, OffsetFetchResponse}
 import org.apache.kafka.common.utils.{Time, Utils}
+import org.apache.kafka.common.{KafkaException, TopicPartition}
 
 import scala.collection.JavaConverters._
 import scala.collection._
@@ -1146,8 +1146,7 @@ object GroupMetadataManager {
    *
    * @return key for offset commit message
    */
-  private[group] def offsetCommitKey(group: String,
-                                     topicPartition: TopicPartition): Array[Byte] = {
+  def offsetCommitKey(group: String, topicPartition: TopicPartition): Array[Byte] = {
     val key = new Struct(CURRENT_OFFSET_KEY_SCHEMA)
     key.set(OFFSET_KEY_GROUP_FIELD, group)
     key.set(OFFSET_KEY_TOPIC_FIELD, topicPartition.topic)
@@ -1164,7 +1163,7 @@ object GroupMetadataManager {
    *
    * @return key bytes for group metadata message
    */
-  private[group] def groupMetadataKey(group: String): Array[Byte] = {
+  def groupMetadataKey(group: String): Array[Byte] = {
     val key = new Struct(CURRENT_GROUP_KEY_SCHEMA)
     key.set(GROUP_KEY_GROUP_FIELD, group)
 
@@ -1181,8 +1180,8 @@ object GroupMetadataManager {
    * @param apiVersion the api version
    * @return payload for offset commit message
    */
-  private[group] def offsetCommitValue(offsetAndMetadata: OffsetAndMetadata,
-                                       apiVersion: ApiVersion): Array[Byte] = {
+  def offsetCommitValue(offsetAndMetadata: OffsetAndMetadata,
+                        apiVersion: ApiVersion): Array[Byte] = {
     // generate commit value according to schema version
     val (version, value) = {
       if (apiVersion < KAFKA_2_1_IV0 || offsetAndMetadata.expireTimestamp.nonEmpty) {
@@ -1226,9 +1225,9 @@ object GroupMetadataManager {
    * @param apiVersion the api version
    * @return payload for offset commit message
    */
-  private[group] def groupMetadataValue(groupMetadata: GroupMetadata,
-                                        assignment: Map[String, Array[Byte]],
-                                        apiVersion: ApiVersion): Array[Byte] = {
+  def groupMetadataValue(groupMetadata: GroupMetadata,
+                         assignment: Map[String, Array[Byte]],
+                         apiVersion: ApiVersion): Array[Byte] = {
 
     val (version, value) = {
       if (apiVersion < KAFKA_0_10_1_IV0)
@@ -1467,6 +1466,83 @@ object GroupMetadataManager {
     }
   }
 
+  /**
+   * Exposed for printing records using [[kafka.tools.DumpLogSegments]]
+   */
+  def formatRecordKeyAndValue(record: Record): (Option[String], Option[String]) = {
+    if (!record.hasKey) {
+      throw new KafkaException("Failed to decode message using offset topic decoder (message
had a missing key)")
+    } else {
+      GroupMetadataManager.readMessageKey(record.key) match {
+        case offsetKey: OffsetKey => parseOffsets(offsetKey, record.value)
+        case groupMetadataKey: GroupMetadataKey => parseGroupMetadata(groupMetadataKey,
record.value)
+        case _ => throw new KafkaException("Failed to decode message using offset topic
decoder (message had an invalid key)")
+      }
+    }
+  }
+
+  private def parseOffsets(offsetKey: OffsetKey, payload: ByteBuffer): (Option[String], Option[String])
= {
+    val groupId = offsetKey.key.group
+    val topicPartition = offsetKey.key.topicPartition
+    val keyString = s"offset_commit::group=$groupId,partition=$topicPartition"
+
+    val offset = GroupMetadataManager.readOffsetMessageValue(payload)
+    val valueString = if (offset == null) {
+      "<DELETE>"
+    } else {
+      if (offset.metadata.isEmpty)
+        s"offset=${offset.offset}"
+      else
+        s"offset=${offset.offset},metadata=${offset.metadata}"
+    }
+
+    (Some(keyString), Some(valueString))
+  }
+
+  private def parseGroupMetadata(groupMetadataKey: GroupMetadataKey, payload: ByteBuffer):
(Option[String], Option[String]) = {
+    val groupId = groupMetadataKey.key
+    val keyString = s"group_metadata::group=$groupId"
+
+    val group = GroupMetadataManager.readGroupMessageValue(groupId, payload, Time.SYSTEM)
+    val valueString = if (group == null)
+      "<DELETE>"
+    else {
+      val protocolType = group.protocolType.getOrElse("")
+
+      val assignment = group.allMemberMetadata.map { member =>
+        if (protocolType == ConsumerProtocol.PROTOCOL_TYPE) {
+          val partitionAssignment = ConsumerProtocol.deserializeAssignment(ByteBuffer.wrap(member.assignment))
+          val userData = Option(partitionAssignment.userData)
+            .map(Utils.toArray)
+            .map(hex)
+            .getOrElse("")
+
+          if (userData.isEmpty)
+            s"${member.memberId}=${partitionAssignment.partitions()}"
+          else
+            s"${member.memberId}=${partitionAssignment.partitions()}:$userData"
+        } else {
+          s"${member.memberId}=${hex(member.assignment)}"
+        }
+      }.mkString("{", ",", "}")
+
+      Json.encodeAsString(Map(
+        "protocolType" -> protocolType,
+        "protocol" -> group.protocolOrNull,
+        "generationId" -> group.generationId,
+        "assignment" -> assignment
+      ).asJava)
+    }
+    (Some(keyString), Some(valueString))
+  }
+
+  private def hex(bytes: Array[Byte]): String = {
+    if (bytes.isEmpty)
+      ""
+    else
+      "%X".format(BigInt(1, bytes))
+  }
+
 }
 
 case class GroupTopicPartition(group: String, topicPartition: TopicPartition) {
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionLog.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionLog.scala
index ef5b59f..7c7d055 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionLog.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionLog.scala
@@ -16,16 +16,16 @@
  */
 package kafka.coordinator.transaction
 
-import kafka.common.MessageFormatter
-import org.apache.kafka.clients.consumer.ConsumerRecord
-import org.apache.kafka.common.{KafkaException, TopicPartition}
-import org.apache.kafka.common.protocol.types.Type._
-import org.apache.kafka.common.protocol.types._
 import java.io.PrintStream
 import java.nio.ByteBuffer
 import java.nio.charset.StandardCharsets
 
-import org.apache.kafka.common.record.CompressionType
+import kafka.common.MessageFormatter
+import org.apache.kafka.clients.consumer.ConsumerRecord
+import org.apache.kafka.common.protocol.types.Type._
+import org.apache.kafka.common.protocol.types._
+import org.apache.kafka.common.record.{CompressionType, Record}
+import org.apache.kafka.common.{KafkaException, TopicPartition}
 
 import scala.collection.mutable
 
@@ -130,7 +130,7 @@ object TransactionLog {
     *
     * @return key bytes
     */
-  private[coordinator] def keyToBytes(transactionalId: String): Array[Byte] = {
+  private[transaction] def keyToBytes(transactionalId: String): Array[Byte] = {
     import KeySchema._
     val key = new Struct(CURRENT)
     key.set(TXN_ID_FIELD, transactionalId)
@@ -146,7 +146,7 @@ object TransactionLog {
     *
     * @return value payload bytes
     */
-  private[coordinator] def valueToBytes(txnMetadata: TxnTransitMetadata): Array[Byte] = {
+  private[transaction] def valueToBytes(txnMetadata: TxnTransitMetadata): Array[Byte] = {
     import ValueSchema._
     val value = new Struct(Current)
     value.set(ProducerIdField, txnMetadata.producerId)
@@ -204,9 +204,9 @@ object TransactionLog {
     *
     * @return a transaction metadata object from the message
     */
-  def readTxnRecordValue(transactionalId: String, buffer: ByteBuffer): TransactionMetadata
= {
+  def readTxnRecordValue(transactionalId: String, buffer: ByteBuffer): Option[TransactionMetadata]
= {
     if (buffer == null) { // tombstone
-      null
+      None
     } else {
       import ValueSchema._
       val version = buffer.getShort
@@ -243,7 +243,7 @@ object TransactionLog {
           }
         }
 
-        transactionMetadata
+        Some(transactionMetadata)
       } else {
         throw new IllegalStateException(s"Unknown version $version from the transaction log
message value")
       }
@@ -256,16 +256,39 @@ object TransactionLog {
       Option(consumerRecord.key).map(key => readTxnRecordKey(ByteBuffer.wrap(key))).foreach
{ txnKey =>
         val transactionalId = txnKey.transactionalId
         val value = consumerRecord.value
-        val producerIdMetadata =
-          if (value == null) "NULL"
-          else readTxnRecordValue(transactionalId, ByteBuffer.wrap(value))
+        val producerIdMetadata = if (value == null)
+          None
+        else
+          readTxnRecordValue(transactionalId, ByteBuffer.wrap(value))
         output.write(transactionalId.getBytes(StandardCharsets.UTF_8))
         output.write("::".getBytes(StandardCharsets.UTF_8))
-        output.write(producerIdMetadata.toString.getBytes(StandardCharsets.UTF_8))
+        output.write(producerIdMetadata.getOrElse("NULL").toString.getBytes(StandardCharsets.UTF_8))
         output.write("\n".getBytes(StandardCharsets.UTF_8))
       }
     }
   }
+
+  /**
+   * Exposed for printing records using [[kafka.tools.DumpLogSegments]]
+   */
+  def formatRecordKeyAndValue(record: Record): (Option[String], Option[String]) = {
+    val txnKey = TransactionLog.readTxnRecordKey(record.key)
+    val keyString = s"transaction_metadata::transactionalId=${txnKey.transactionalId}"
+
+    val valueString = TransactionLog.readTxnRecordValue(txnKey.transactionalId, record.value)
match {
+      case None => "<DELETE>"
+
+      case Some(txnMetadata) => s"producerId:${txnMetadata.producerId}," +
+        s"producerEpoch:${txnMetadata.producerEpoch}," +
+        s"state=${txnMetadata.state}," +
+        s"partitions=${txnMetadata.topicPartitions}," +
+        s"txnLastUpdateTimestamp=${txnMetadata.txnLastUpdateTimestamp}," +
+        s"txnTimeoutMs=${txnMetadata.txnTimeoutMs}"
+    }
+
+    (Some(keyString), Some(valueString))
+  }
+
 }
 
 case class TxnKey(version: Short, transactionalId: String) {
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
index 9a9ed73..79f01e3 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
@@ -343,11 +343,11 @@ class TransactionStateManager(brokerId: Int,
                 val txnKey = TransactionLog.readTxnRecordKey(record.key)
                 // load transaction metadata along with transaction state
                 val transactionalId = txnKey.transactionalId
-                if (!record.hasValue) {
-                  loadedTransactions.remove(transactionalId)
-                } else {
-                  val txnMetadata = TransactionLog.readTxnRecordValue(transactionalId, record.value)
-                  loadedTransactions.put(transactionalId, txnMetadata)
+                TransactionLog.readTxnRecordValue(transactionalId, record.value) match {
+                  case None =>
+                    loadedTransactions.remove(transactionalId)
+                  case Some(txnMetadata) =>
+                    loadedTransactions.put(transactionalId, txnMetadata)
                 }
                 currOffset = batch.nextOffset
               }
diff --git a/core/src/main/scala/kafka/tools/DumpLogSegments.scala b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
index 35521b5..5af4a47 100755
--- a/core/src/main/scala/kafka/tools/DumpLogSegments.scala
+++ b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
@@ -18,21 +18,18 @@
 package kafka.tools
 
 import java.io._
-import java.nio.ByteBuffer
 
-import kafka.coordinator.group.{GroupMetadataKey, GroupMetadataManager, OffsetKey}
+import kafka.coordinator.group.GroupMetadataManager
 import kafka.coordinator.transaction.TransactionLog
 import kafka.log._
 import kafka.serializer.Decoder
 import kafka.utils._
-import org.apache.kafka.clients.consumer.internals.ConsumerProtocol
-import org.apache.kafka.common.KafkaException
 import org.apache.kafka.common.record._
-import org.apache.kafka.common.utils.{Time, Utils}
+import org.apache.kafka.common.utils.Utils
 
-import scala.collection.{Map, mutable}
-import scala.collection.mutable.ArrayBuffer
 import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
 
 object DumpLogSegments {
 
@@ -212,7 +209,7 @@ object DumpLogSegments {
     }
   }
 
-  private trait MessageParser[K, V] {
+  private[kafka] trait MessageParser[K, V] {
     def parse(record: Record): (Option[K], Option[V])
   }
 
@@ -233,93 +230,6 @@ object DumpLogSegments {
     }
   }
 
-  private class TransactionLogMessageParser extends MessageParser[String, String] {
-
-    override def parse(record: Record): (Option[String], Option[String]) = {
-      val txnKey = TransactionLog.readTxnRecordKey(record.key)
-      val txnMetadata = TransactionLog.readTxnRecordValue(txnKey.transactionalId, record.value)
-
-      val keyString = s"transactionalId=${txnKey.transactionalId}"
-      val valueString = s"producerId:${txnMetadata.producerId}," +
-        s"producerEpoch:${txnMetadata.producerEpoch}," +
-        s"state=${txnMetadata.state}," +
-        s"partitions=${txnMetadata.topicPartitions}," +
-        s"txnLastUpdateTimestamp=${txnMetadata.txnLastUpdateTimestamp}," +
-        s"txnTimeoutMs=${txnMetadata.txnTimeoutMs}"
-
-      (Some(keyString), Some(valueString))
-    }
-
-  }
-
-  private class OffsetsMessageParser extends MessageParser[String, String] {
-    private def hex(bytes: Array[Byte]): String = {
-      if (bytes.isEmpty)
-        ""
-      else
-        "%X".format(BigInt(1, bytes))
-    }
-
-    private def parseOffsets(offsetKey: OffsetKey, payload: ByteBuffer) = {
-      val group = offsetKey.key.group
-      val topicPartition = offsetKey.key.topicPartition
-      val offset = GroupMetadataManager.readOffsetMessageValue(payload)
-
-      val keyString = s"offset::$group:${topicPartition.topic}:${topicPartition.partition}"
-      val valueString = if (offset.metadata.isEmpty)
-        String.valueOf(offset.offset)
-      else
-        s"${offset.offset}:${offset.metadata}"
-
-      (Some(keyString), Some(valueString))
-    }
-
-    private def parseGroupMetadata(groupMetadataKey: GroupMetadataKey, payload: ByteBuffer)
= {
-      val groupId = groupMetadataKey.key
-      val group = GroupMetadataManager.readGroupMessageValue(groupId, payload, Time.SYSTEM)
-      val protocolType = group.protocolType.getOrElse("")
-
-      val assignment = group.allMemberMetadata.map { member =>
-        if (protocolType == ConsumerProtocol.PROTOCOL_TYPE) {
-          val partitionAssignment = ConsumerProtocol.deserializeAssignment(ByteBuffer.wrap(member.assignment))
-          val userData = hex(Utils.toArray(partitionAssignment.userData()))
-
-          if (userData.isEmpty)
-            s"${member.memberId}=${partitionAssignment.partitions()}"
-          else
-            s"${member.memberId}=${partitionAssignment.partitions()}:$userData"
-        } else {
-          s"${member.memberId}=${hex(member.assignment)}"
-        }
-      }.mkString("{", ",", "}")
-
-      val keyString = Json.encodeAsString(Map("metadata" -> groupId).asJava)
-
-      val valueString = Json.encodeAsString(Map(
-        "protocolType" -> protocolType,
-        "protocol" -> group.protocolOrNull,
-        "generationId" -> group.generationId,
-        "assignment" -> assignment
-      ).asJava)
-
-      (Some(keyString), Some(valueString))
-    }
-
-    override def parse(record: Record): (Option[String], Option[String]) = {
-      if (!record.hasValue)
-        (None, None)
-      else if (!record.hasKey) {
-        throw new KafkaException("Failed to decode message using offset topic decoder (message
had a missing key)")
-      } else {
-        GroupMetadataManager.readMessageKey(record.key) match {
-          case offsetKey: OffsetKey => parseOffsets(offsetKey, record.value)
-          case groupMetadataKey: GroupMetadataKey => parseGroupMetadata(groupMetadataKey,
record.value)
-          case _ => throw new KafkaException("Failed to decode message using offset topic
decoder (message had an invalid key)")
-        }
-      }
-    }
-  }
-
   /* print out the contents of the log */
   private def dumpLog(file: File,
                       printContents: Boolean,
@@ -450,6 +360,18 @@ object DumpLogSegments {
     }
   }
 
+  private class OffsetsMessageParser extends MessageParser[String, String] {
+    override def parse(record: Record): (Option[String], Option[String]) = {
+      GroupMetadataManager.formatRecordKeyAndValue(record)
+    }
+  }
+
+  private class TransactionLogMessageParser extends MessageParser[String, String] {
+    override def parse(record: Record): (Option[String], Option[String]) = {
+      TransactionLog.formatRecordKeyAndValue(record)
+    }
+  }
+
   private class DumpLogSegmentsOptions(args: Array[String]) extends CommandDefaultOptions(args)
{
     val printOpt = parser.accepts("print-data-log", "if set, printing the messages content
when dumping data logs. Automatically set if any decoder option is specified.")
     val verifyOpt = parser.accepts("verify-index-only", "if set, just verify the index log
without printing its content.")
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
index 50dc0db..cb67c70 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
@@ -420,7 +420,7 @@ class GroupCoordinatorTest {
 
     val deadGroupId = "deadGroupId"
 
-    groupCoordinator.groupManager.addGroup(new GroupMetadata(deadGroupId, Dead, new MockTime()))
+    groupCoordinator.groupManager.addGroup(new  GroupMetadata(deadGroupId, Dead, new MockTime()))
     val joinGroupResult = dynamicJoinGroup(deadGroupId, memberId, protocolType, protocols)
     assertEquals(Errors.COORDINATOR_NOT_AVAILABLE, joinGroupResult.error)
   }
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
index 516180d..947e4b3 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
@@ -17,13 +17,13 @@
 
 package kafka.coordinator.group
 
-import com.yammer.metrics.Metrics
-import com.yammer.metrics.core.Gauge
 import java.lang.management.ManagementFactory
 import java.nio.ByteBuffer
-import java.util.Collections
-import java.util.Optional
 import java.util.concurrent.locks.ReentrantLock
+import java.util.{Collections, Optional}
+
+import com.yammer.metrics.Metrics
+import com.yammer.metrics.core.Gauge
 import javax.management.ObjectName
 import kafka.api._
 import kafka.cluster.Partition
@@ -32,6 +32,7 @@ import kafka.log.{Log, LogAppendInfo}
 import kafka.server.{FetchDataInfo, FetchLogEnd, HostedPartition, KafkaConfig, LogOffsetMetadata,
ReplicaManager}
 import kafka.utils.{KafkaScheduler, MockTime, TestUtils}
 import kafka.zk.KafkaZkClient
+import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor
 import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.Subscription
 import org.apache.kafka.clients.consumer.internals.ConsumerProtocol
 import org.apache.kafka.common.TopicPartition
@@ -41,6 +42,7 @@ import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.record._
 import org.apache.kafka.common.requests.OffsetFetchResponse
 import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
+import org.apache.kafka.common.utils.Utils
 import org.easymock.{Capture, EasyMock, IAnswer}
 import org.junit.Assert.{assertEquals, assertFalse, assertNull, assertTrue}
 import org.junit.{Before, Test}
@@ -667,7 +669,7 @@ class GroupMetadataManagerTest {
 
     val offsetCommitRecords = createCommittedOffsetRecords(committedOffsets)
     val groupMetadataRecord = buildStableGroupRecordWithMember(generation = 15,
-      protocolType = "consumer", protocol = "range", memberId, assignmentSize)
+      protocolType = "consumer", protocol = "range", memberId, new Array[Byte](assignmentSize))
     val records = MemoryRecords.withRecords(startOffset, CompressionType.NONE,
       (offsetCommitRecords ++ Seq(groupMetadataRecord)).toArray: _*)
 
@@ -2109,6 +2111,62 @@ class GroupMetadataManagerTest {
     }
   }
 
+  @Test
+  def testCommittedOffsetParsing(): Unit = {
+    val groupId = "group"
+    val topicPartition = new TopicPartition("topic", 0)
+    val offsetCommitRecord = TestUtils.records(Seq(
+      new SimpleRecord(
+        GroupMetadataManager.offsetCommitKey(groupId, topicPartition),
+        GroupMetadataManager.offsetCommitValue(OffsetAndMetadata(35L, "", time.milliseconds()),
ApiVersion.latestVersion)
+      )
+    )).records.asScala.head
+    val (keyStringOpt, valueStringOpt) = GroupMetadataManager.formatRecordKeyAndValue(offsetCommitRecord)
+    assertEquals(Some(s"offset_commit::group=$groupId,partition=$topicPartition"), keyStringOpt)
+    assertEquals(Some("offset=35"), valueStringOpt)
+  }
+
+  @Test
+  def testCommittedOffsetTombstoneParsing(): Unit = {
+    val groupId = "group"
+    val topicPartition = new TopicPartition("topic", 0)
+    val offsetCommitRecord = TestUtils.records(Seq(
+      new SimpleRecord(GroupMetadataManager.offsetCommitKey(groupId, topicPartition), null)
+    )).records.asScala.head
+    val (keyStringOpt, valueStringOpt) = GroupMetadataManager.formatRecordKeyAndValue(offsetCommitRecord)
+    assertEquals(Some(s"offset_commit::group=$groupId,partition=$topicPartition"), keyStringOpt)
+    assertEquals(Some("<DELETE>"), valueStringOpt)
+  }
+
+  @Test
+  def testGroupMetadataParsingWithNullUserData(): Unit = {
+    val generation = 935
+    val protocolType = "consumer"
+    val protocol = "range"
+    val memberId = "98098230493"
+    val assignmentBytes = Utils.toArray(ConsumerProtocol.serializeAssignment(
+      new ConsumerPartitionAssignor.Assignment(List(new TopicPartition("topic", 0)).asJava,
null)
+    ))
+    val groupMetadataRecord = TestUtils.records(Seq(
+      buildStableGroupRecordWithMember(generation, protocolType, protocol, memberId, assignmentBytes)
+    )).records.asScala.head
+    val (keyStringOpt, valueStringOpt) = GroupMetadataManager.formatRecordKeyAndValue(groupMetadataRecord)
+    assertEquals(Some(s"group_metadata::group=$groupId"), keyStringOpt)
+    assertEquals(Some("{\"protocolType\":\"consumer\",\"protocol\":\"range\"," +
+      "\"generationId\":935,\"assignment\":\"{98098230493=[topic-0]}\"}"), valueStringOpt)
+  }
+
+  @Test
+  def testGroupMetadataTombstoneParsing(): Unit = {
+    val groupId = "group"
+    val groupMetadataRecord = TestUtils.records(Seq(
+      new SimpleRecord(GroupMetadataManager.groupMetadataKey(groupId), null)
+    )).records.asScala.head
+    val (keyStringOpt, valueStringOpt) = GroupMetadataManager.formatRecordKeyAndValue(groupMetadataRecord)
+    assertEquals(Some(s"group_metadata::group=$groupId"), keyStringOpt)
+    assertEquals(Some("<DELETE>"), valueStringOpt)
+  }
+
   private def appendAndCaptureCallback(): Capture[Map[TopicPartition, PartitionResponse]
=> Unit] = {
     val capturedArgument: Capture[Map[TopicPartition, PartitionResponse] => Unit] = EasyMock.newCapture()
     EasyMock.expect(replicaManager.appendRecords(EasyMock.anyLong(),
@@ -2149,14 +2207,14 @@ class GroupMetadataManagerTest {
                                                protocolType: String,
                                                protocol: String,
                                                memberId: String,
-                                               assignmentSize: Int = 0,
+                                               assignmentBytes: Array[Byte] = Array.emptyByteArray,
                                                apiVersion: ApiVersion = ApiVersion.latestVersion):
SimpleRecord = {
     val memberProtocols = List((protocol, Array.emptyByteArray))
     val member = new MemberMetadata(memberId, groupId, groupInstanceId, "clientId", "clientHost",
30000, 10000, protocolType, memberProtocols)
     val group = GroupMetadata.loadGroup(groupId, Stable, generation, protocolType, protocol,
memberId,
       if (apiVersion >= KAFKA_2_1_IV0) Some(time.milliseconds()) else None, Seq(member),
time)
     val groupMetadataKey = GroupMetadataManager.groupMetadataKey(groupId)
-    val groupMetadataValue = GroupMetadataManager.groupMetadataValue(group, Map(memberId
-> new Array[Byte](assignmentSize)), apiVersion)
+    val groupMetadataValue = GroupMetadataManager.groupMetadataValue(group, Map(memberId
-> assignmentBytes), apiVersion)
     new SimpleRecord(groupMetadataKey, groupMetadataValue)
   }
 
diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionLogTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionLogTest.scala
index 5df399e..e9bf6d5 100644
--- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionLogTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionLogTest.scala
@@ -17,9 +17,9 @@
 package kafka.coordinator.transaction
 
 
+import kafka.utils.TestUtils
 import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.record.{CompressionType, SimpleRecord, MemoryRecords}
-
+import org.apache.kafka.common.record.{CompressionType, MemoryRecords, SimpleRecord}
 import org.junit.Assert.assertEquals
 import org.junit.Test
 import org.scalatest.Assertions.intercept
@@ -86,7 +86,7 @@ class TransactionLogTest {
     for (record <- records.records.asScala) {
       val txnKey = TransactionLog.readTxnRecordKey(record.key)
       val transactionalId = txnKey.transactionalId
-      val txnMetadata = TransactionLog.readTxnRecordValue(transactionalId, record.value)
+      val txnMetadata = TransactionLog.readTxnRecordValue(transactionalId, record.value).get
 
       assertEquals(pidMappings(transactionalId), txnMetadata.producerId)
       assertEquals(producerEpoch, txnMetadata.producerEpoch)
@@ -104,4 +104,38 @@ class TransactionLogTest {
     assertEquals(pidMappings.size, count)
   }
 
+  @Test
+  def testTransactionMetadataParsing(): Unit = {
+    val transactionalId = "id"
+    val producerId = 1334L
+    val topicPartition = new TopicPartition("topic", 0)
+
+    val txnMetadata = TransactionMetadata(transactionalId, producerId, producerEpoch,
+      transactionTimeoutMs, Ongoing, 0)
+    txnMetadata.addPartitions(Set(topicPartition))
+
+    val keyBytes = TransactionLog.keyToBytes(transactionalId)
+    val valueBytes = TransactionLog.valueToBytes(txnMetadata.prepareNoTransit())
+    val transactionMetadataRecord = TestUtils.records(Seq(
+      new SimpleRecord(keyBytes, valueBytes)
+    )).records.asScala.head
+
+    val (keyStringOpt, valueStringOpt) = TransactionLog.formatRecordKeyAndValue(transactionMetadataRecord)
+    assertEquals(Some(s"transaction_metadata::transactionalId=$transactionalId"), keyStringOpt)
+    assertEquals(Some(s"producerId:$producerId,producerEpoch:$producerEpoch,state=Ongoing,"
+
+      s"partitions=Set($topicPartition),txnLastUpdateTimestamp=0,txnTimeoutMs=$transactionTimeoutMs"),
valueStringOpt)
+  }
+
+  @Test
+  def testTransactionMetadataTombstoneParsing(): Unit = {
+    val transactionalId = "id"
+    val transactionMetadataRecord = TestUtils.records(Seq(
+      new SimpleRecord(TransactionLog.keyToBytes(transactionalId), null)
+    )).records.asScala.head
+
+    val (keyStringOpt, valueStringOpt) = TransactionLog.formatRecordKeyAndValue(transactionMetadataRecord)
+    assertEquals(Some(s"transaction_metadata::transactionalId=$transactionalId"), keyStringOpt)
+    assertEquals(Some("<DELETE>"), valueStringOpt)
+  }
+
 }
diff --git a/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala b/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala
index 0109feb..7bfbeab 100644
--- a/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala
+++ b/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala
@@ -20,7 +20,6 @@ package kafka.tools
 import java.io.{ByteArrayOutputStream, File}
 import java.util.Properties
 
-import scala.collection.mutable
 import kafka.log.{Log, LogConfig, LogManager}
 import kafka.server.{BrokerTopicStats, LogDirFailureChannel}
 import kafka.tools.DumpLogSegments.TimeIndexDumpErrors
@@ -31,6 +30,7 @@ import org.junit.Assert._
 import org.junit.{After, Before, Test}
 import org.scalatest.Assertions.fail
 
+import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
 
 case class BatchInfo(records: Seq[SimpleRecord], hasKeys: Boolean, hasValues: Boolean)


Mime
View raw message