kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject kafka git commit: KAFKA-5248; Remove unused/unneeded retention time in TxnOffsetCommitRequest
Date Mon, 15 May 2017 19:59:07 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk c26545ea5 -> 018679410


KAFKA-5248; Remove unused/unneeded retention time in TxnOffsetCommitRequest

Author: Jason Gustafson <jason@confluent.io>

Reviewers: Guozhang Wang <wangguoz@gmail.com>, Apurva Mehta <apurva@confluent.io>, Ismael Juma <ismael@juma.me.uk>

Closes #3058 from hachikuji/KAFKA-5248


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/01867941
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/01867941
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/01867941

Branch: refs/heads/trunk
Commit: 0186794104b7106fe426024d65730fec79ad999a
Parents: c26545e
Author: Jason Gustafson <jason@confluent.io>
Authored: Mon May 15 12:58:36 2017 -0700
Committer: Jason Gustafson <jason@confluent.io>
Committed: Mon May 15 12:58:50 2017 -0700

----------------------------------------------------------------------
 .../producer/internals/TransactionManager.java     |  3 +--
 .../org/apache/kafka/common/protocol/Protocol.java |  3 ---
 .../common/requests/TxnOffsetCommitRequest.java    | 17 +++--------------
 .../kafka/common/requests/RequestResponseTest.java |  2 +-
 .../scala/unit/kafka/server/RequestQuotaTest.scala |  2 +-
 5 files changed, 6 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/01867941/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
index 7e2f813..f3ed252 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
@@ -37,7 +37,6 @@ import org.apache.kafka.common.requests.FindCoordinatorRequest;
 import org.apache.kafka.common.requests.FindCoordinatorResponse;
 import org.apache.kafka.common.requests.InitProducerIdRequest;
 import org.apache.kafka.common.requests.InitProducerIdResponse;
-import org.apache.kafka.common.requests.OffsetCommitRequest;
 import org.apache.kafka.common.requests.TransactionResult;
 import org.apache.kafka.common.requests.TxnOffsetCommitRequest;
 import org.apache.kafka.common.requests.TxnOffsetCommitRequest.CommittedOffset;
@@ -461,7 +460,7 @@ public class TransactionManager {
             pendingTxnOffsetCommits.put(entry.getKey(), committedOffset);
         }
         TxnOffsetCommitRequest.Builder builder = new TxnOffsetCommitRequest.Builder(consumerGroupId,
-                producerIdAndEpoch.producerId, producerIdAndEpoch.epoch, OffsetCommitRequest.DEFAULT_RETENTION_TIME,
+                producerIdAndEpoch.producerId, producerIdAndEpoch.epoch,
                 pendingTxnOffsetCommits);
         return new TxnOffsetCommitHandler(result, builder);
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/01867941/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
index fb3c8c9..5e05738 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
@@ -1521,9 +1521,6 @@ public class Protocol {
             new Field("producer_epoch",
                     INT16,
                     "Current epoch associated with the producer id."),
-            new Field("retention_time",
-                    INT64,
-                    "The time in ms to retain the offset."),
             new Field("topics",
                     new ArrayOf(new Schema(
                             new Field("topic", STRING),

http://git-wip-us.apache.org/repos/asf/kafka/blob/01867941/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitRequest.java
index 3f3024f..f5334f2 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitRequest.java
@@ -30,7 +30,6 @@ public class TxnOffsetCommitRequest extends AbstractRequest {
     private static final String CONSUMER_GROUP_ID_KEY_NAME = "consumer_group_id";
     private static final String PRODUCER_ID_KEY_NAME = "producer_id";
     private static final String PRODUCER_EPOCH_KEY_NAME = "producer_epoch";
-    private static final String RETENTION_TIME_KEY_NAME = "retention_time";
     private static final String TOPIC_PARTITIONS_KEY_NAME = "topics";
     private static final String TOPIC_KEY_NAME = "topic";
     private static final String PARTITIONS_KEY_NAME = "partitions";
@@ -42,16 +41,14 @@ public class TxnOffsetCommitRequest extends AbstractRequest {
         private final String consumerGroupId;
         private final long producerId;
         private final short producerEpoch;
-        private final long retentionTimeMs;
         private final Map<TopicPartition, CommittedOffset> offsets;
 
-        public Builder(String consumerGroupId, long producerId, short producerEpoch, long retentionTimeMs,
+        public Builder(String consumerGroupId, long producerId, short producerEpoch,
                        Map<TopicPartition, CommittedOffset> offsets) {
             super(ApiKeys.TXN_OFFSET_COMMIT);
             this.consumerGroupId = consumerGroupId;
             this.producerId = producerId;
             this.producerEpoch = producerEpoch;
-            this.retentionTimeMs = retentionTimeMs;
             this.offsets = offsets;
         }
 
@@ -61,23 +58,21 @@ public class TxnOffsetCommitRequest extends AbstractRequest {
 
         @Override
         public TxnOffsetCommitRequest build(short version) {
-            return new TxnOffsetCommitRequest(version, consumerGroupId, producerId, producerEpoch, retentionTimeMs, offsets);
+            return new TxnOffsetCommitRequest(version, consumerGroupId, producerId, producerEpoch, offsets);
         }
     }
 
     private final String consumerGroupId;
     private final long producerId;
     private final short producerEpoch;
-    private final long retentionTimeMs;
     private final Map<TopicPartition, CommittedOffset> offsets;
 
     public TxnOffsetCommitRequest(short version, String consumerGroupId, long producerId, short producerEpoch,
-                                  long retentionTimeMs, Map<TopicPartition, CommittedOffset> offsets) {
+                                  Map<TopicPartition, CommittedOffset> offsets) {
         super(version);
         this.consumerGroupId = consumerGroupId;
         this.producerId = producerId;
         this.producerEpoch = producerEpoch;
-        this.retentionTimeMs = retentionTimeMs;
         this.offsets = offsets;
     }
 
@@ -86,7 +81,6 @@ public class TxnOffsetCommitRequest extends AbstractRequest {
         this.consumerGroupId = struct.getString(CONSUMER_GROUP_ID_KEY_NAME);
         this.producerId = struct.getLong(PRODUCER_ID_KEY_NAME);
         this.producerEpoch = struct.getShort(PRODUCER_EPOCH_KEY_NAME);
-        this.retentionTimeMs = struct.getLong(RETENTION_TIME_KEY_NAME);
 
         Map<TopicPartition, CommittedOffset> offsets = new HashMap<>();
         Object[] topicPartitionsArray = struct.getArray(TOPIC_PARTITIONS_KEY_NAME);
@@ -116,10 +110,6 @@ public class TxnOffsetCommitRequest extends AbstractRequest {
         return producerEpoch;
     }
 
-    public long retentionTimeMs() {
-        return retentionTimeMs;
-    }
-
     public Map<TopicPartition, CommittedOffset> offsets() {
         return offsets;
     }
@@ -130,7 +120,6 @@ public class TxnOffsetCommitRequest extends AbstractRequest {
         struct.set(CONSUMER_GROUP_ID_KEY_NAME, consumerGroupId);
         struct.set(PRODUCER_ID_KEY_NAME, producerId);
         struct.set(PRODUCER_EPOCH_KEY_NAME, producerEpoch);
-        struct.set(RETENTION_TIME_KEY_NAME, retentionTimeMs);
 
         Map<String, Map<Integer, CommittedOffset>> mappedPartitionOffsets = CollectionUtils.groupDataByTopic(offsets);
         Object[] partitionsArray = new Object[mappedPartitionOffsets.size()];

http://git-wip-us.apache.org/repos/asf/kafka/blob/01867941/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index cbfb6a9..1cfd6a3 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -951,7 +951,7 @@ public class RequestResponseTest {
         final Map<TopicPartition, TxnOffsetCommitRequest.CommittedOffset> offsets = new HashMap<>();
         offsets.put(new TopicPartition("topic", 73),
                     new TxnOffsetCommitRequest.CommittedOffset(100, null));
-        return new TxnOffsetCommitRequest.Builder("gid", 21L, (short) 42, 73, offsets).build();
+        return new TxnOffsetCommitRequest.Builder("gid", 21L, (short) 42, offsets).build();
     }
 
     private TxnOffsetCommitResponse createTxnOffsetCommitResponse() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/01867941/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
index 425b9f1..4cf3e7d 100644
--- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
+++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
@@ -270,7 +270,7 @@ class RequestQuotaTest extends BaseRequestTest {
           new WriteTxnMarkersRequest.Builder(List.empty.asJava)
 
         case ApiKeys.TXN_OFFSET_COMMIT =>
-          new TxnOffsetCommitRequest.Builder("test-txn-group", 2, 0, 3600, Map.empty.asJava)
+          new TxnOffsetCommitRequest.Builder("test-txn-group", 2, 0, Map.empty.asJava)
 
         case key =>
           throw new IllegalArgumentException("Unsupported API key " + apiKey)


Mime
View raw message