kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject [kafka] branch trunk updated: KAFKA-8069; Fix early expiration of offsets due to invalid loading of expire timestamp (#6401)
Date Fri, 08 Mar 2019 17:04:17 GMT
This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 44be5d2  KAFKA-8069; Fix early expiration of offsets due to invalid loading of expire
timestamp (#6401)
44be5d2 is described below

commit 44be5d2221c9faff2ecb17878d6193504b7289aa
Author: Zhanxiang (Patrick) Huang <hzxa21@hotmail.com>
AuthorDate: Fri Mar 8 09:04:04 2019 -0800

    KAFKA-8069; Fix early expiration of offsets due to invalid loading of expire timestamp
(#6401)
    
    After the 2.1 release, if the broker hasn't been upgraded to the latest inter-broker protocol
version, the committed offsets stored in the __consumer_offsets topic will get cleaned up way
earlier than they should be when the offsets are loaded back from the __consumer_offsets topic
in GroupCoordinator, which will happen during leadership transition or after a broker bounce.
This patch fixes the bug by setting expireTimestamp to None if it is the default value after
loading v1 offset records  [...]
    
    Reviewers: Ismael Juma <ismael@juma.me.uk>, Jason Gustafson <jason@confluent.io>
---
 .../coordinator/group/GroupMetadataManager.scala   |  5 +++-
 .../group/GroupMetadataManagerTest.scala           | 28 ++++++++++++++++++++++
 2 files changed, 32 insertions(+), 1 deletion(-)

diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
index edec228..7b24498 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
@@ -1273,7 +1273,10 @@ object GroupMetadataManager {
         val commitTimestamp = value.get(OFFSET_VALUE_COMMIT_TIMESTAMP_FIELD_V1).asInstanceOf[Long]
         val expireTimestamp = value.get(OFFSET_VALUE_EXPIRE_TIMESTAMP_FIELD_V1).asInstanceOf[Long]
 
-        OffsetAndMetadata(offset, metadata, commitTimestamp, expireTimestamp)
+        if (expireTimestamp == OffsetCommitRequest.DEFAULT_TIMESTAMP)
+          OffsetAndMetadata(offset, metadata, commitTimestamp)
+        else
+          OffsetAndMetadata(offset, metadata, commitTimestamp, expireTimestamp)
       } else if (version == 2) {
         val offset = value.get(OFFSET_VALUE_OFFSET_FIELD_V2).asInstanceOf[Long]
         val metadata = value.get(OFFSET_VALUE_METADATA_FIELD_V2).asInstanceOf[String]
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
index 3ab4a13..0287888 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
@@ -1768,6 +1768,34 @@ class GroupMetadataManagerTest {
   }
 
   @Test
+  def testSerdeOffsetCommitValueWithNoneExpireTimestamp(): Unit = {
+    val offsetAndMetadata = OffsetAndMetadata(
+      offset = 537L,
+      leaderEpoch = Optional.empty(),
+      metadata = "metadata",
+      commitTimestamp = time.milliseconds(),
+      expireTimestamp = None)
+
+    def verifySerde(apiVersion: ApiVersion): Unit = {
+      val bytes = GroupMetadataManager.offsetCommitValue(offsetAndMetadata, apiVersion)
+      val buffer = ByteBuffer.wrap(bytes)
+      val version = buffer.getShort(0).toInt
+      if (apiVersion < KAFKA_2_1_IV0)
+        assertEquals(1, version)
+      else if (apiVersion < KAFKA_2_1_IV1)
+        assertEquals(2, version)
+      else
+        assertEquals(3, version)
+
+      val deserializedOffsetAndMetadata = GroupMetadataManager.readOffsetMessageValue(buffer)
+      assertEquals(offsetAndMetadata, deserializedOffsetAndMetadata)
+    }
+
+    for (version <- ApiVersion.allVersions)
+      verifySerde(version)
+  }
+
+  @Test
   def testLoadOffsetsWithEmptyControlBatch() {
     val groupMetadataTopicPartition = groupTopicPartition
     val startOffset = 15L


Mime
View raw message