kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject [kafka] branch 2.1 updated: KAFKA-8069; Fix early expiration of offsets due to invalid loading of expire timestamp (#6401)
Date Fri, 08 Mar 2019 17:21:56 GMT
This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.1 by this push:
     new 5626d03  KAFKA-8069; Fix early expiration of offsets due to invalid loading of expire timestamp (#6401)
5626d03 is described below

commit 5626d03e36d6168d73cb421e9b384ba79df5b9d5
Author: Zhanxiang (Patrick) Huang <hzxa21@hotmail.com>
AuthorDate: Fri Mar 8 09:04:04 2019 -0800

    KAFKA-8069; Fix early expiration of offsets due to invalid loading of expire timestamp (#6401)
    
    After the 2.1 release, if the broker hasn't been upgraded to the latest inter-broker protocol
    version, the committed offsets stored in the __consumer_offsets topic will get cleaned up way
    earlier than they should be when the offsets are loaded back from the __consumer_offsets topic
    in GroupCoordinator, which happens during leadership transition or after a broker bounce.
    This patch fixes the bug by setting expireTimestamp to None if it is the default value after
    loading v1 offset records  [...]
    
    Reviewers: Ismael Juma <ismael@juma.me.uk>, Jason Gustafson <jason@confluent.io>
---
 .../coordinator/group/GroupMetadataManager.scala   |  5 +++-
 .../group/GroupMetadataManagerTest.scala           | 28 ++++++++++++++++++++++
 2 files changed, 32 insertions(+), 1 deletion(-)

diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
index 626aaad..de8adab 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
@@ -1273,7 +1273,10 @@ object GroupMetadataManager {
         val commitTimestamp = value.get(OFFSET_VALUE_COMMIT_TIMESTAMP_FIELD_V1).asInstanceOf[Long]
         val expireTimestamp = value.get(OFFSET_VALUE_EXPIRE_TIMESTAMP_FIELD_V1).asInstanceOf[Long]
 
-        OffsetAndMetadata(offset, metadata, commitTimestamp, expireTimestamp)
+        if (expireTimestamp == OffsetCommitRequest.DEFAULT_TIMESTAMP)
+          OffsetAndMetadata(offset, metadata, commitTimestamp)
+        else
+          OffsetAndMetadata(offset, metadata, commitTimestamp, expireTimestamp)
       } else if (version == 2) {
         val offset = value.get(OFFSET_VALUE_OFFSET_FIELD_V2).asInstanceOf[Long]
         val metadata = value.get(OFFSET_VALUE_METADATA_FIELD_V2).asInstanceOf[String]
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
index 3ab4a13..0287888 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
@@ -1768,6 +1768,34 @@ class GroupMetadataManagerTest {
   }
 
   @Test
+  def testSerdeOffsetCommitValueWithNoneExpireTimestamp(): Unit = {
+    val offsetAndMetadata = OffsetAndMetadata(
+      offset = 537L,
+      leaderEpoch = Optional.empty(),
+      metadata = "metadata",
+      commitTimestamp = time.milliseconds(),
+      expireTimestamp = None)
+
+    def verifySerde(apiVersion: ApiVersion): Unit = {
+      val bytes = GroupMetadataManager.offsetCommitValue(offsetAndMetadata, apiVersion)
+      val buffer = ByteBuffer.wrap(bytes)
+      val version = buffer.getShort(0).toInt
+      if (apiVersion < KAFKA_2_1_IV0)
+        assertEquals(1, version)
+      else if (apiVersion < KAFKA_2_1_IV1)
+        assertEquals(2, version)
+      else
+        assertEquals(3, version)
+
+      val deserializedOffsetAndMetadata = GroupMetadataManager.readOffsetMessageValue(buffer)
+      assertEquals(offsetAndMetadata, deserializedOffsetAndMetadata)
+    }
+
+    for (version <- ApiVersion.allVersions)
+      verifySerde(version)
+  }
+
+  @Test
   def testLoadOffsetsWithEmptyControlBatch() {
     val groupMetadataTopicPartition = groupTopicPartition
     val startOffset = 15L


Mime
View raw message