kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject [kafka] branch trunk updated: KAFKA-7403; Use default timestamp if no expire timestamp set in offset commit value (#5690)
Date Tue, 25 Sep 2018 16:06:43 GMT
This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git

The following commit(s) were added to refs/heads/trunk by this push:
     new 2933f21  KAFKA-7403; Use default timestamp if no expire timestamp set in offset commit
value (#5690)
2933f21 is described below

commit 2933f21374ed383e4db1dfb9c7df02f1707903ba
Author: Vahid Hashemian <vahid.hashemian@gmail.com>
AuthorDate: Tue Sep 25 09:06:30 2018 -0700

    KAFKA-7403; Use default timestamp if no expire timestamp set in offset commit value (#5690)
    This fixes a regression caused by KAFKA-4682 (KIP-211) which caused offset commit failures
after upgrading from an older version which used the v1 inter-broker format.
 .../src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala | 4 ++--
 core/src/main/scala/kafka/server/KafkaApis.scala                      | 3 +--
 2 files changed, 3 insertions(+), 4 deletions(-)

diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
index 8cf99fc..dba8b4e 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
@@ -40,7 +40,7 @@ import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.protocol.types.Type._
 import org.apache.kafka.common.protocol.types._
 import org.apache.kafka.common.record._
-import org.apache.kafka.common.requests.{IsolationLevel, OffsetFetchResponse}
+import org.apache.kafka.common.requests.{IsolationLevel, OffsetCommitRequest, OffsetFetchResponse}
 import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
 import org.apache.kafka.common.utils.{Time, Utils}
@@ -1129,7 +1129,7 @@ object GroupMetadataManager {
       value.set(OFFSET_VALUE_METADATA_FIELD_V1, offsetAndMetadata.metadata)
       value.set(OFFSET_VALUE_COMMIT_TIMESTAMP_FIELD_V1, offsetAndMetadata.commitTimestamp)
       // version 1 has a non empty expireTimestamp field
-      value.set(OFFSET_VALUE_EXPIRE_TIMESTAMP_FIELD_V1, offsetAndMetadata.expireTimestamp.get)
+      value.set(OFFSET_VALUE_EXPIRE_TIMESTAMP_FIELD_V1, offsetAndMetadata.expireTimestamp.getOrElse(OffsetCommitRequest.DEFAULT_TIMESTAMP))
     val byteBuffer = ByteBuffer.allocate(2 /* version */ + value.sizeOf)
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index afbe5b8..10119c8 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -332,11 +332,10 @@ class KafkaApis(val requestChannel: RequestChannel,
       } else {
         // for version 1 and beyond store offsets in offset manager
-        // commit timestamp is always set to now.
         // "default" expiration timestamp is now + retention (and retention may be overridden
if v2)
         // expire timestamp is computed differently for v1 and v2.
         //   - If v1 and no explicit commit timestamp is provided we treat it the same as
-        //   - If v1 and explicit commit timestamp is provided we calculate retention from
that explicit commit timestamp
+        //   - If v1 and explicit retention time is provided we calculate expiration timestamp
based on that
         //   - If v2/v3/v4 (no explicit commit timestamp) we treat it the same as v5.
         //   - For v5 and beyond there is no per partition expiration timestamp, so this
field is no longer in effect
         val currentTimestamp = time.milliseconds

View raw message