kafka-commits mailing list archives

From guozh...@apache.org
Subject kafka git commit: KAFKA-1634; Bump up Offset Commit Request to v2 to add global retention and remove per-partition commit timestamp; reviewed by Joel Koshy and Jun Rao
Date Fri, 27 Mar 2015 00:16:55 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk d2f50fc38 -> c5df2a8e3


KAFKA-1634; Bump up Offset Commit Request to v2 to add global retention and remove per-partition commit timestamp; reviewed by Joel Koshy and Jun Rao


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/c5df2a8e
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/c5df2a8e
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/c5df2a8e

Branch: refs/heads/trunk
Commit: c5df2a8e3acca1e2c905fa6b78e73e09b1dd0cd7
Parents: d2f50fc
Author: Guozhang Wang <wangguoz@gmail.com>
Authored: Thu Mar 26 17:16:33 2015 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Thu Mar 26 17:16:33 2015 -0700

----------------------------------------------------------------------
 .../clients/consumer/internals/Coordinator.java |   8 +-
 .../apache/kafka/common/protocol/Protocol.java  |  77 ++++++++----
 .../kafka/common/protocol/types/Struct.java     |   6 +-
 .../common/requests/OffsetCommitRequest.java    | 103 +++++++++++++---
 .../common/requests/RequestResponseTest.java    |   4 +-
 .../scala/kafka/api/OffsetCommitRequest.scala   |  73 +++++++----
 .../scala/kafka/api/OffsetFetchRequest.scala    |  13 +-
 .../kafka/common/OffsetMetadataAndError.scala   |  64 ++++++----
 .../consumer/ZookeeperConsumerConnector.scala   |   2 +-
 .../src/main/scala/kafka/server/KafkaApis.scala |  34 +++++-
 .../main/scala/kafka/server/KafkaConfig.scala   |   2 +-
 .../main/scala/kafka/server/KafkaServer.scala   |   1 +
 .../main/scala/kafka/server/OffsetManager.scala |  97 +++++++++++----
 .../scala/kafka/server/ReplicaManager.scala     |   2 +-
 .../api/RequestResponseSerializationTest.scala  |  50 +++++---
 .../unit/kafka/server/OffsetCommitTest.scala    | 122 +++++++++++++++----
 .../unit/kafka/server/ServerShutdownTest.scala  |   4 +-
 17 files changed, 490 insertions(+), 172 deletions(-)
----------------------------------------------------------------------
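
For illustration only, not part of this commit: a minimal sketch of building a
v2 request through the updated core API (all identifiers and values below are
placeholders). v2 drops the per-partition commit timestamp and adds a single
retention time for the whole request; passing DEFAULT_RETENTION_TIME instead
lets the broker fall back to its configured offsets.retention.minutes.

    import kafka.api.OffsetCommitRequest
    import kafka.common.{OffsetAndMetadata, TopicAndPartition}

    // v2 commit: one global retention time, no per-partition timestamps
    val request = OffsetCommitRequest(
      groupId = "group1",
      requestInfo = Map(TopicAndPartition("test", 0) -> OffsetAndMetadata(100L, "")),
      versionId = 2,
      correlationId = 1,
      clientId = "client1",
      groupGenerationId = 5,
      consumerId = "consumer1",
      retentionMs = 24 * 60 * 60 * 1000L)  // retain this commit for one day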


http://git-wip-us.apache.org/repos/asf/kafka/blob/c5df2a8e/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
index 436f9b2..8d44814 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
@@ -162,8 +162,12 @@ public final class Coordinator {
             Map<TopicPartition, OffsetCommitRequest.PartitionData> offsetData;
             offsetData = new HashMap<TopicPartition, OffsetCommitRequest.PartitionData>(offsets.size());
             for (Map.Entry<TopicPartition, Long> entry : offsets.entrySet())
-                offsetData.put(entry.getKey(), new OffsetCommitRequest.PartitionData(entry.getValue(), now, ""));
-            OffsetCommitRequest req = new OffsetCommitRequest(this.groupId, this.generation, this.consumerId, offsetData);
+                offsetData.put(entry.getKey(), new OffsetCommitRequest.PartitionData(entry.getValue(), ""));
+            OffsetCommitRequest req = new OffsetCommitRequest(this.groupId,
+                this.generation,
+                this.consumerId,
+                OffsetCommitRequest.DEFAULT_RETENTION_TIME,
+                offsetData);
 
             // send request and possibly wait for response if it is blocking
             RequestCompletionHandler handler = new CommitOffsetCompletionHandler(offsets);
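
A condensed sketch of the client-side change above, re-expressed in Scala with
illustrative values: the new PartitionData constructor omits the timestamp, and
the new request constructor takes the retention time explicitly.

    import org.apache.kafka.common.TopicPartition
    import org.apache.kafka.common.requests.OffsetCommitRequest
    import scala.collection.JavaConverters._

    val offsets = Map(
      new TopicPartition("test", 0) -> new OffsetCommitRequest.PartitionData(100L, "")).asJava

    // DEFAULT_RETENTION_TIME (-1) asks the broker to apply its own default retention
    val req = new OffsetCommitRequest("group1", 5, "consumer1",
      OffsetCommitRequest.DEFAULT_RETENTION_TIME, offsets)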

http://git-wip-us.apache.org/repos/asf/kafka/blob/c5df2a8e/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
index 101f382..9c4518e 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
@@ -89,31 +89,24 @@ public class Protocol {
     /* Produce api */
 
     public static final Schema TOPIC_PRODUCE_DATA_V0 = new Schema(new Field("topic", STRING),
-                                                                  new Field("data",
-                                                                            new ArrayOf(new Schema(new Field("partition",
-                                                                                                             INT32),
-                                                                                                   new Field("record_set",
-                                                                                                             BYTES)))));
+                                                                  new Field("data", new ArrayOf(new Schema(new Field("partition", INT32),
+                                                                                                     new Field("record_set", BYTES)))));
 
     public static final Schema PRODUCE_REQUEST_V0 = new Schema(new Field("acks",
-                                                                         INT16,
-                                                                         "The number of nodes that should replicate the produce before returning. -1 indicates the full ISR."),
-                                                               new Field("timeout",
-                                                                         INT32,
-                                                                         "The time to await a response in ms."),
-                                                               new Field("topic_data",
-                                                                         new ArrayOf(TOPIC_PRODUCE_DATA_V0)));
+                                                                   INT16,
+                                                                   "The number of nodes that should replicate the produce before returning. -1 indicates the full ISR."),
+                                                               new Field("timeout", INT32, "The time to await a response in ms."),
+                                                               new Field("topic_data", new ArrayOf(TOPIC_PRODUCE_DATA_V0)));
 
     public static final Schema PRODUCE_RESPONSE_V0 = new Schema(new Field("responses",
-                                                                          new ArrayOf(new Schema(new Field("topic",
-                                                                                                           STRING),
-                                                                                                 new Field("partition_responses",
-                                                                                                           new ArrayOf(new Schema(new Field("partition",
-                                                                                                                                            INT32),
-                                                                                                                                  new Field("error_code",
-                                                                                                                                            INT16),
-                                                                                                                                  new Field("base_offset",
-                                                                                                                                            INT64))))))));
+                                                                    new ArrayOf(new Schema(new Field("topic", STRING),
+                                                                                           new Field("partition_responses",
+                                                                                                     new ArrayOf(new Schema(new Field("partition",
+                                                                                                                                      INT32),
+                                                                                                                            new Field("error_code",
+                                                                                                                                      INT16),
+                                                                                                                            new Field("base_offset",
+                                                                                                                                      INT64))))))));
 
     public static final Schema[] PRODUCE_REQUEST = new Schema[] {PRODUCE_REQUEST_V0};
     public static final Schema[] PRODUCE_RESPONSE = new Schema[] {PRODUCE_RESPONSE_V0};
@@ -132,6 +125,16 @@ public class Protocol {
                                                                                          STRING,
                                                                                          "Any associated metadata the client wants to keep."));
 
+    public static final Schema OFFSET_COMMIT_REQUEST_PARTITION_V1 = new Schema(new Field("partition",
+                                                                                         INT32,
+                                                                                         "Topic partition id."),
+                                                                               new Field("offset",
+                                                                                         INT64,
+                                                                                         "Message offset to be committed."),
+                                                                               new Field("metadata",
+                                                                                         STRING,
+                                                                                         "Any associated metadata the client wants to keep."));
+
     public static final Schema OFFSET_COMMIT_REQUEST_TOPIC_V0 = new Schema(new Field("topic",
                                                                                      STRING,
                                                                                      "Topic to commit."),
@@ -139,6 +142,13 @@ public class Protocol {
                                                                                      new ArrayOf(OFFSET_COMMIT_REQUEST_PARTITION_V0),
                                                                                      "Partitions to commit offsets."));
 
+    public static final Schema OFFSET_COMMIT_REQUEST_TOPIC_V1 = new Schema(new Field("topic",
+                                                                                     STRING,
+                                                                                     "Topic to commit."),
+                                                                           new Field("partitions",
+                                                                                     new ArrayOf(OFFSET_COMMIT_REQUEST_PARTITION_V1),
+                                                                                     "Partitions to commit offsets."));
+
     public static final Schema OFFSET_COMMIT_REQUEST_V0 = new Schema(new Field("group_id",
                                                                                STRING,
                                                                                "The consumer group id."),
@@ -159,10 +169,27 @@ public class Protocol {
                                                                                new ArrayOf(OFFSET_COMMIT_REQUEST_TOPIC_V0),
                                                                                "Topics to commit offsets."));
 
+    public static final Schema OFFSET_COMMIT_REQUEST_V2 = new Schema(new Field("group_id",
+                                                                               STRING,
+                                                                               "The consumer group id."),
+                                                                     new Field("group_generation_id",
+                                                                               INT32,
+                                                                               "The generation of the consumer group."),
+                                                                     new Field("consumer_id",
+                                                                               STRING,
+                                                                               "The consumer id assigned by the group coordinator."),
+                                                                     new Field("retention_time",
+                                                                               INT64,
+                                                                               "Time period in ms to retain the offset."),
+                                                                     new Field("topics",
+                                                                               new ArrayOf(OFFSET_COMMIT_REQUEST_TOPIC_V1),
+                                                                               "Topics to commit offsets."));
+
     public static final Schema OFFSET_COMMIT_RESPONSE_PARTITION_V0 = new Schema(new Field("partition",
                                                                                           INT32,
                                                                                           "Topic partition id."),
-                                                                                new Field("error_code", INT16));
+                                                                                new Field("error_code",
+                                                                                          INT16));
 
     public static final Schema OFFSET_COMMIT_RESPONSE_TOPIC_V0 = new Schema(new Field("topic", STRING),
                                                                             new Field("partition_responses",
@@ -171,9 +198,9 @@ public class Protocol {
     public static final Schema OFFSET_COMMIT_RESPONSE_V0 = new Schema(new Field("responses",
                                                                                 new ArrayOf(OFFSET_COMMIT_RESPONSE_TOPIC_V0)));
 
-    public static final Schema[] OFFSET_COMMIT_REQUEST = new Schema[] {OFFSET_COMMIT_REQUEST_V0, OFFSET_COMMIT_REQUEST_V1};
-    /* The response types for both V0 and V1 of OFFSET_COMMIT_REQUEST are the same. */
-    public static final Schema[] OFFSET_COMMIT_RESPONSE = new Schema[] {OFFSET_COMMIT_RESPONSE_V0, OFFSET_COMMIT_RESPONSE_V0};
+    public static final Schema[] OFFSET_COMMIT_REQUEST = new Schema[] {OFFSET_COMMIT_REQUEST_V0, OFFSET_COMMIT_REQUEST_V1, OFFSET_COMMIT_REQUEST_V2};
+    /* The response types for V0, V1 and V2 of OFFSET_COMMIT_REQUEST are the same. */
+    public static final Schema[] OFFSET_COMMIT_RESPONSE = new Schema[] {OFFSET_COMMIT_RESPONSE_V0, OFFSET_COMMIT_RESPONSE_V0, OFFSET_COMMIT_RESPONSE_V0};
 
     /* Offset fetch api */
     public static final Schema OFFSET_FETCH_REQUEST_PARTITION_V0 = new Schema(new Field("partition",
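
For context, how the per-version schema arrays above are consumed (both calls
appear elsewhere in this commit): ProtoUtils indexes OFFSET_COMMIT_REQUEST by
version, so the "current" request schema is now the v2 one.

    import org.apache.kafka.common.protocol.{ApiKeys, ProtoUtils}

    val v1Schema = ProtoUtils.requestSchema(ApiKeys.OFFSET_COMMIT.id, 1)      // OFFSET_COMMIT_REQUEST_V1
    val current  = ProtoUtils.currentRequestSchema(ApiKeys.OFFSET_COMMIT.id)  // now OFFSET_COMMIT_REQUEST_V2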

http://git-wip-us.apache.org/repos/asf/kafka/blob/c5df2a8e/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java b/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java
index 7672a3a..92de6a9 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java
@@ -52,7 +52,7 @@ public class Struct {
         else if (field.defaultValue != Field.NO_DEFAULT)
             return field.defaultValue;
         else
-            throw new SchemaException("Missing value for field '" + field.name + " which has no default value.");
+            throw new SchemaException("Missing value for field '" + field.name + "' which has no default value.");
     }
 
     /**
@@ -191,7 +191,7 @@ public class Struct {
             ArrayOf array = (ArrayOf) field.type();
             return new Struct((Schema) array.type());
         } else {
-            throw new SchemaException("Field " + field.name + " is not a container type, it is of type " + field.type());
+            throw new SchemaException("Field '" + field.name + "' is not a container type, it is of type " + field.type());
         }
     }
 
@@ -234,7 +234,7 @@ public class Struct {
      */
     private void validateField(Field field) {
         if (this.schema != field.schema)
-            throw new SchemaException("Attempt to access field '" + field.name + " from a different schema instance.");
+            throw new SchemaException("Attempt to access field '" + field.name + "' from a different schema instance.");
         if (field.index > values.length)
             throw new SchemaException("Invalid field index: " + field.index);
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/c5df2a8e/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
index 94e9d37..b92f670 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
@@ -29,12 +29,12 @@ import org.apache.kafka.common.utils.CollectionUtils;
- * This wrapper supports both v0 and v1 of OffsetCommitRequest.
+ * This wrapper supports v0, v1 and v2 of OffsetCommitRequest.
  */
 public class OffsetCommitRequest extends AbstractRequestResponse {
-    
     private static final Schema CURRENT_SCHEMA = ProtoUtils.currentRequestSchema(ApiKeys.OFFSET_COMMIT.id);
     private static final String GROUP_ID_KEY_NAME = "group_id";
     private static final String GENERATION_ID_KEY_NAME = "group_generation_id";
     private static final String CONSUMER_ID_KEY_NAME = "consumer_id";
     private static final String TOPICS_KEY_NAME = "topics";
+    private static final String RETENTION_TIME_KEY_NAME = "retention_time";
 
     // topic level field names
     private static final String TOPIC_KEY_NAME = "topic";
@@ -43,27 +43,44 @@ public class OffsetCommitRequest extends AbstractRequestResponse {
     // partition level field names
     private static final String PARTITION_KEY_NAME = "partition";
     private static final String COMMIT_OFFSET_KEY_NAME = "offset";
-    private static final String TIMESTAMP_KEY_NAME = "timestamp";
     private static final String METADATA_KEY_NAME = "metadata";
 
+    @Deprecated
+    private static final String TIMESTAMP_KEY_NAME = "timestamp";         // for v0, v1
+
+    // default values for the current version
     public static final int DEFAULT_GENERATION_ID = -1;
     public static final String DEFAULT_CONSUMER_ID = "";
+    public static final long DEFAULT_RETENTION_TIME = -1L;
+
+    // default values for old versions,
+    // will be removed after these versions are deprecated
+    @Deprecated
+    public static final long DEFAULT_TIMESTAMP = -1L;            // for V0, V1
 
     private final String groupId;
-    private final int generationId;
     private final String consumerId;
+    private final int generationId;
+    private final long retentionTime;
     private final Map<TopicPartition, PartitionData> offsetData;
 
     public static final class PartitionData {
+        @Deprecated
+        public final long timestamp;                // for V0, V1
+
         public final long offset;
-        public final long timestamp;
         public final String metadata;
 
+        @Deprecated
         public PartitionData(long offset, long timestamp, String metadata) {
             this.offset = offset;
             this.timestamp = timestamp;
             this.metadata = metadata;
         }
+
+        public PartitionData(long offset, String metadata) {
+            this(offset, DEFAULT_TIMESTAMP, metadata);
+        }
     }
 
     /**
@@ -78,6 +95,7 @@ public class OffsetCommitRequest extends AbstractRequestResponse {
         this.groupId = groupId;
         this.generationId = DEFAULT_GENERATION_ID;
         this.consumerId = DEFAULT_CONSUMER_ID;
+        this.retentionTime = DEFAULT_RETENTION_TIME;
         this.offsetData = offsetData;
     }
 
@@ -88,15 +106,39 @@ public class OffsetCommitRequest extends AbstractRequestResponse {
      * @param consumerId
      * @param offsetData
      */
+    @Deprecated
     public OffsetCommitRequest(String groupId, int generationId, String consumerId, Map<TopicPartition, PartitionData> offsetData) {
+        super(new Struct(ProtoUtils.requestSchema(ApiKeys.OFFSET_COMMIT.id, 1)));
+
+        initCommonFields(groupId, offsetData);
+        struct.set(GENERATION_ID_KEY_NAME, generationId);
+        struct.set(CONSUMER_ID_KEY_NAME, consumerId);
+        this.groupId = groupId;
+        this.generationId = generationId;
+        this.consumerId = consumerId;
+        this.retentionTime = DEFAULT_RETENTION_TIME;
+        this.offsetData = offsetData;
+    }
+
+    /**
+     * Constructor for version 2.
+     * @param groupId
+     * @param generationId
+     * @param consumerId
+     * @param retentionTime
+     * @param offsetData
+     */
+    public OffsetCommitRequest(String groupId, int generationId, String consumerId, long retentionTime, Map<TopicPartition, PartitionData> offsetData) {
         super(new Struct(CURRENT_SCHEMA));
 
         initCommonFields(groupId, offsetData);
         struct.set(GENERATION_ID_KEY_NAME, generationId);
         struct.set(CONSUMER_ID_KEY_NAME, consumerId);
+        struct.set(RETENTION_TIME_KEY_NAME, retentionTime);
         this.groupId = groupId;
         this.generationId = generationId;
         this.consumerId = consumerId;
+        this.retentionTime = retentionTime;
         this.offsetData = offsetData;
     }
 
@@ -105,7 +147,8 @@ public class OffsetCommitRequest extends AbstractRequestResponse {
 
         struct.set(GROUP_ID_KEY_NAME, groupId);
         List<Struct> topicArray = new ArrayList<Struct>();
-        for (Map.Entry<String, Map<Integer, PartitionData>> topicEntry : topicsData.entrySet()) {
+
+        for (Map.Entry<String, Map<Integer, PartitionData>> topicEntry: topicsData.entrySet()) {
             Struct topicData = struct.instance(TOPICS_KEY_NAME);
             topicData.set(TOPIC_KEY_NAME, topicEntry.getKey());
             List<Struct> partitionArray = new ArrayList<Struct>();
@@ -114,7 +157,9 @@ public class OffsetCommitRequest extends AbstractRequestResponse {
                 Struct partitionData = topicData.instance(PARTITIONS_KEY_NAME);
                 partitionData.set(PARTITION_KEY_NAME, partitionEntry.getKey());
                 partitionData.set(COMMIT_OFFSET_KEY_NAME, fetchPartitionData.offset);
-                partitionData.set(TIMESTAMP_KEY_NAME, fetchPartitionData.timestamp);
+                // Only for v0 and v1
+                if (partitionData.hasField(TIMESTAMP_KEY_NAME))
+                    partitionData.set(TIMESTAMP_KEY_NAME, fetchPartitionData.timestamp);
                 partitionData.set(METADATA_KEY_NAME, fetchPartitionData.metadata);
                 partitionArray.add(partitionData);
             }
@@ -126,20 +171,7 @@ public class OffsetCommitRequest extends AbstractRequestResponse {
 
     public OffsetCommitRequest(Struct struct) {
         super(struct);
-        offsetData = new HashMap<TopicPartition, PartitionData>();
-        for (Object topicResponseObj : struct.getArray(TOPICS_KEY_NAME)) {
-            Struct topicResponse = (Struct) topicResponseObj;
-            String topic = topicResponse.getString(TOPIC_KEY_NAME);
-            for (Object partitionResponseObj : topicResponse.getArray(PARTITIONS_KEY_NAME)) {
-                Struct partitionResponse = (Struct) partitionResponseObj;
-                int partition = partitionResponse.getInt(PARTITION_KEY_NAME);
-                long offset = partitionResponse.getLong(COMMIT_OFFSET_KEY_NAME);
-                long timestamp = partitionResponse.getLong(TIMESTAMP_KEY_NAME);
-                String metadata = partitionResponse.getString(METADATA_KEY_NAME);
-                PartitionData partitionData = new PartitionData(offset, timestamp, metadata);
-                offsetData.put(new TopicPartition(topic, partition), partitionData);
-            }
-        }
+
         groupId = struct.getString(GROUP_ID_KEY_NAME);
         // This field only exists in v1.
         if (struct.hasField(GENERATION_ID_KEY_NAME))
@@ -152,6 +184,33 @@ public class OffsetCommitRequest extends AbstractRequestResponse {
             consumerId = struct.getString(CONSUMER_ID_KEY_NAME);
         else
             consumerId = DEFAULT_CONSUMER_ID;
+
+        // This field only exists in v2
+        if (struct.hasField(RETENTION_TIME_KEY_NAME))
+            retentionTime = struct.getLong(RETENTION_TIME_KEY_NAME);
+        else
+            retentionTime = DEFAULT_RETENTION_TIME;
+
+        offsetData = new HashMap<TopicPartition, PartitionData>();
+        for (Object topicDataObj : struct.getArray(TOPICS_KEY_NAME)) {
+            Struct topicData = (Struct) topicDataObj;
+            String topic = topicData.getString(TOPIC_KEY_NAME);
+            for (Object partitionDataObj : topicData.getArray(PARTITIONS_KEY_NAME)) {
+                Struct partitionDataStruct = (Struct) partitionDataObj;
+                int partition = partitionDataStruct.getInt(PARTITION_KEY_NAME);
+                long offset = partitionDataStruct.getLong(COMMIT_OFFSET_KEY_NAME);
+                String metadata = partitionDataStruct.getString(METADATA_KEY_NAME);
+                PartitionData partitionOffset;
+                // This field only exists in v0 and v1
+                if (partitionDataStruct.hasField(TIMESTAMP_KEY_NAME)) {
+                    long timestamp = partitionDataStruct.getLong(TIMESTAMP_KEY_NAME);
+                    partitionOffset = new PartitionData(offset, timestamp, metadata);
+                } else {
+                    partitionOffset = new PartitionData(offset, metadata);
+                }
+                offsetData.put(new TopicPartition(topic, partition), partitionOffset);
+            }
+        }
     }
 
     public String groupId() {
@@ -166,6 +225,10 @@ public class OffsetCommitRequest extends AbstractRequestResponse {
         return consumerId;
     }
 
+    public long retentionTime() {
+        return retentionTime;
+    }
+
     public Map<TopicPartition, PartitionData> offsetData() {
         return offsetData;
     }
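
The constructor above parses all three versions by probing the struct rather
than branching on an explicit version number. A condensed sketch of that
pattern (retentionOf is a hypothetical helper; key name and default as defined
in this file):

    import org.apache.kafka.common.protocol.types.Struct

    // optional fields are probed with hasField and defaulted when absent,
    // so one code path can deserialize v0, v1 and v2 structs alike
    def retentionOf(struct: Struct): Long =
      if (struct.hasField("retention_time")) struct.getLong("retention_time")
      else -1L  // DEFAULT_RETENTION_TIME: the broker substitutes its configured default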

http://git-wip-us.apache.org/repos/asf/kafka/blob/c5df2a8e/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index 13237fd..61a767a 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -140,8 +140,8 @@ public class RequestResponseTest {
 
     private AbstractRequestResponse createOffsetCommitRequest() {
         Map<TopicPartition, OffsetCommitRequest.PartitionData> commitData = new HashMap<TopicPartition, OffsetCommitRequest.PartitionData>();
-        commitData.put(new TopicPartition("test", 0), new OffsetCommitRequest.PartitionData(100, 1000000, ""));
-        return new OffsetCommitRequest("group1", 100, "consumer1", commitData);
+        commitData.put(new TopicPartition("test", 0), new OffsetCommitRequest.PartitionData(100, ""));
+        return new OffsetCommitRequest("group1", 100, "consumer1", 1000000, commitData);
     }
 
     private AbstractRequestResponse createOffsetCommitResponse() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/c5df2a8e/core/src/main/scala/kafka/api/OffsetCommitRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/OffsetCommitRequest.scala b/core/src/main/scala/kafka/api/OffsetCommitRequest.scala
index 050615c..cf8e6ac 100644
--- a/core/src/main/scala/kafka/api/OffsetCommitRequest.scala
+++ b/core/src/main/scala/kafka/api/OffsetCommitRequest.scala
@@ -21,21 +21,19 @@ import java.nio.ByteBuffer
 import kafka.api.ApiUtils._
 import kafka.utils.{SystemTime, Logging}
 import kafka.network.{RequestChannel, BoundedByteBufferSend}
-import kafka.common.{OffsetAndMetadata, ErrorMapping, TopicAndPartition}
+import kafka.common.{OffsetMetadata, OffsetAndMetadata, ErrorMapping, TopicAndPartition}
 import kafka.network.RequestChannel.Response
 import scala.collection._
 
 object OffsetCommitRequest extends Logging {
-  val CurrentVersion: Short = 1
+  val CurrentVersion: Short = 2
   val DefaultClientId = ""
 
   def readFrom(buffer: ByteBuffer): OffsetCommitRequest = {
-    val now = SystemTime.milliseconds
-
     // Read values from the envelope
     val versionId = buffer.getShort
-    assert(versionId == 0 || versionId == 1,
-           "Version " + versionId + " is invalid for OffsetCommitRequest. Valid versions are 0 or 1.")
+    assert(versionId == 0 || versionId == 1 || versionId == 2,
+           "Version " + versionId + " is invalid for OffsetCommitRequest. Valid versions are 0, 1 or 2.")
 
     val correlationId = buffer.getInt
     val clientId = readShortString(buffer)
@@ -43,13 +41,25 @@ object OffsetCommitRequest extends Logging {
-    // Read the OffsetRequest 
+    // Read the OffsetCommitRequest
     val consumerGroupId = readShortString(buffer)
 
-    // version 1 specific fields
-    var groupGenerationId: Int = org.apache.kafka.common.requests.OffsetCommitRequest.DEFAULT_GENERATION_ID
-    var consumerId: String = org.apache.kafka.common.requests.OffsetCommitRequest.DEFAULT_CONSUMER_ID
-    if (versionId == 1) {
-      groupGenerationId = buffer.getInt
-      consumerId = readShortString(buffer)
-    }
+    // version 1 and 2 specific fields
+    val groupGenerationId: Int =
+      if (versionId >= 1)
+        buffer.getInt
+      else
+        org.apache.kafka.common.requests.OffsetCommitRequest.DEFAULT_GENERATION_ID
+
+    val consumerId: String =
+      if (versionId >= 1)
+        readShortString(buffer)
+      else
+        org.apache.kafka.common.requests.OffsetCommitRequest.DEFAULT_CONSUMER_ID
+
+    // version 2 specific fields
+    val retentionMs: Long =
+      if (versionId >= 2)
+        buffer.getLong
+      else
+        org.apache.kafka.common.requests.OffsetCommitRequest.DEFAULT_RETENTION_TIME
 
     val topicCount = buffer.getInt
     val pairs = (1 to topicCount).flatMap(_ => {
@@ -59,14 +69,18 @@ object OffsetCommitRequest extends Logging {
         val partitionId = buffer.getInt
         val offset = buffer.getLong
         val timestamp = {
-          val given = buffer.getLong
-          if (given == -1L) now else given
+          if (versionId <= 1)
+            buffer.getLong
+          else
+            org.apache.kafka.common.requests.OffsetCommitRequest.DEFAULT_TIMESTAMP
         }
         val metadata = readShortString(buffer)
+
         (TopicAndPartition(topic, partitionId), OffsetAndMetadata(offset, metadata, timestamp))
       })
     })
-    OffsetCommitRequest(consumerGroupId, immutable.Map(pairs:_*), versionId, correlationId, clientId, groupGenerationId, consumerId)
+
+    OffsetCommitRequest(consumerGroupId, immutable.Map(pairs:_*), versionId, correlationId, clientId, groupGenerationId, consumerId, retentionMs)
   }
 }
 
@@ -76,11 +90,12 @@ case class OffsetCommitRequest(groupId: String,
                                correlationId: Int = 0,
                                clientId: String = OffsetCommitRequest.DefaultClientId,
                                groupGenerationId: Int = org.apache.kafka.common.requests.OffsetCommitRequest.DEFAULT_GENERATION_ID,
-                               consumerId: String =  org.apache.kafka.common.requests.OffsetCommitRequest.DEFAULT_CONSUMER_ID)
+                               consumerId: String =  org.apache.kafka.common.requests.OffsetCommitRequest.DEFAULT_CONSUMER_ID,
+                               retentionMs: Long = org.apache.kafka.common.requests.OffsetCommitRequest.DEFAULT_RETENTION_TIME)
     extends RequestOrResponse(Some(RequestKeys.OffsetCommitKey)) {
 
-  assert(versionId == 0 || versionId == 1,
-         "Version " + versionId + " is invalid for OffsetCommitRequest. Valid versions are 0 or 1.")
+  assert(versionId == 0 || versionId == 1 || versionId == 2,
+         "Version " + versionId + " is invalid for OffsetCommitRequest. Valid versions are 0, 1 or 2.")
 
   lazy val requestInfoGroupedByTopic = requestInfo.groupBy(_._1.topic)
 
@@ -93,11 +108,17 @@ case class OffsetCommitRequest(groupId: String,
     // Write OffsetCommitRequest
     writeShortString(buffer, groupId)             // consumer group
 
-    // version 1 specific data
-    if (versionId == 1) {
+    // version 1 and 2 specific data
+    if (versionId >= 1) {
       buffer.putInt(groupGenerationId)
       writeShortString(buffer, consumerId)
     }
+
+    // version 2 or above specific data
+    if (versionId >= 2) {
+      buffer.putLong(retentionMs)
+    }
+
     buffer.putInt(requestInfoGroupedByTopic.size) // number of topics
     requestInfoGroupedByTopic.foreach( t1 => { // topic -> Map[TopicAndPartition, OffsetMetadataAndError]
       writeShortString(buffer, t1._1) // topic
@@ -105,7 +126,9 @@ case class OffsetCommitRequest(groupId: String,
       t1._2.foreach( t2 => {
         buffer.putInt(t2._1.partition)
         buffer.putLong(t2._2.offset)
-        buffer.putLong(t2._2.timestamp)
+        // version 0 and 1 specific data
+        if (versionId <= 1)
+          buffer.putLong(t2._2.commitTimestamp)
         writeShortString(buffer, t2._2.metadata)
       })
     })
@@ -116,7 +139,8 @@ case class OffsetCommitRequest(groupId: String,
     4 + /* correlationId */
     shortStringLength(clientId) +
     shortStringLength(groupId) +
-    (if (versionId == 1) 4 /* group generation id */ + shortStringLength(consumerId) else 0) +
+    (if (versionId >= 1) 4 /* group generation id */ + shortStringLength(consumerId) else 0) +
+    (if (versionId >= 2) 8 /* retention time */ else 0) +
     4 + /* topic count */
     requestInfoGroupedByTopic.foldLeft(0)((count, topicAndOffsets) => {
       val (topic, offsets) = topicAndOffsets
@@ -127,7 +151,7 @@ case class OffsetCommitRequest(groupId: String,
         innerCount +
         4 /* partition */ +
         8 /* offset */ +
-        8 /* timestamp */ +
+        (if (versionId <= 1) 8 else 0) /* timestamp */ +
         shortStringLength(offsetAndMetadata._2.metadata)
       })
     })
@@ -149,6 +173,7 @@ case class OffsetCommitRequest(groupId: String,
     offsetCommitRequest.append("; GroupId: " + groupId)
     offsetCommitRequest.append("; GroupGenerationId: " + groupGenerationId)
     offsetCommitRequest.append("; ConsumerId: " + consumerId)
+    offsetCommitRequest.append("; RetentionMs: " + retentionMs)
     if(details)
       offsetCommitRequest.append("; RequestInfo: " + requestInfo.mkString(","))
     offsetCommitRequest.toString()
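
A back-of-the-envelope consequence of the sizeInBytes change above
(sizeDeltaV1ToV2 is a hypothetical helper, not in the codebase): v2 adds 8
bytes for the global retention time but drops 8 bytes of timestamp per
partition, so any commit covering more than one partition shrinks on the wire.

    // wire-size delta of a v2 request relative to v1, all else equal
    def sizeDeltaV1ToV2(numPartitions: Int): Int = 8 - 8 * numPartitions

    sizeDeltaV1ToV2(1)    // 0: break-even at a single partition
    sizeDeltaV1ToV2(100)  // -792: a 100-partition commit is 792 bytes smaller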

http://git-wip-us.apache.org/repos/asf/kafka/blob/c5df2a8e/core/src/main/scala/kafka/api/OffsetFetchRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/OffsetFetchRequest.scala b/core/src/main/scala/kafka/api/OffsetFetchRequest.scala
index c7604b9..67811a7 100644
--- a/core/src/main/scala/kafka/api/OffsetFetchRequest.scala
+++ b/core/src/main/scala/kafka/api/OffsetFetchRequest.scala
@@ -17,13 +17,17 @@
 
 package kafka.api
 
-import java.nio.ByteBuffer
-
 import kafka.api.ApiUtils._
 import kafka.utils.Logging
 import kafka.network.{BoundedByteBufferSend, RequestChannel}
+import kafka.common._
+import kafka.common.TopicAndPartition
 import kafka.network.RequestChannel.Response
-import kafka.common.{OffsetAndMetadata, ErrorMapping, OffsetMetadataAndError, TopicAndPartition}
+
+import scala.Some
+
+import java.nio.ByteBuffer
+
 object OffsetFetchRequest extends Logging {
   val CurrentVersion: Short = 0
   val DefaultClientId = ""
@@ -91,8 +95,7 @@ case class OffsetFetchRequest(groupId: String,
   override  def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = {
     val responseMap = requestInfo.map {
       case (topicAndPartition) => (topicAndPartition, OffsetMetadataAndError(
-        offset = OffsetAndMetadata.InvalidOffset,
-        error = ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]])
+        ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]])
       ))
     }.toMap
     val errorResponse = OffsetFetchResponse(requestInfo=responseMap, correlationId=correlationId)

http://git-wip-us.apache.org/repos/asf/kafka/blob/c5df2a8e/core/src/main/scala/kafka/common/OffsetMetadataAndError.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/common/OffsetMetadataAndError.scala b/core/src/main/scala/kafka/common/OffsetMetadataAndError.scala
index 1584a92..139913f 100644
--- a/core/src/main/scala/kafka/common/OffsetMetadataAndError.scala
+++ b/core/src/main/scala/kafka/common/OffsetMetadataAndError.scala
@@ -17,40 +17,60 @@
 
 package kafka.common
 
-case class OffsetAndMetadata(offset: Long,
-                             metadata: String = OffsetAndMetadata.NoMetadata,
-                             timestamp: Long = -1L) {
-  override def toString = "OffsetAndMetadata[%d,%s%s]"
-                          .format(offset,
-                                  if (metadata != null && metadata.length > 0) metadata else "NO_METADATA",
-                                  if (timestamp == -1) "" else "," + timestamp.toString)
+case class OffsetMetadata(offset: Long, metadata: String = OffsetMetadata.NoMetadata) {
+  override def toString = "OffsetMetadata[%d,%s]"
+    .format(offset,
+    if (metadata != null && metadata.length > 0) metadata else "NO_METADATA")
 }
 
-object OffsetAndMetadata {
+object OffsetMetadata {
   val InvalidOffset: Long = -1L
   val NoMetadata: String = ""
-  val InvalidTime: Long = -1L
+
+  val InvalidOffsetMetadata = OffsetMetadata(OffsetMetadata.InvalidOffset, OffsetMetadata.NoMetadata)
+}
+
+case class OffsetAndMetadata(offsetMetadata: OffsetMetadata,
+                             commitTimestamp: Long = org.apache.kafka.common.requests.OffsetCommitRequest.DEFAULT_TIMESTAMP,
+                             expireTimestamp: Long = org.apache.kafka.common.requests.OffsetCommitRequest.DEFAULT_TIMESTAMP) {
+
+  def offset() = offsetMetadata.offset
+
+  def metadata() = offsetMetadata.metadata
+
+  override def toString = "[%s,CommitTime %d,ExpirationTime %d]".format(offsetMetadata, commitTimestamp, expireTimestamp)
 }
 
-case class OffsetMetadataAndError(offset: Long,
-                                  metadata: String = OffsetAndMetadata.NoMetadata,
-                                  error: Short = ErrorMapping.NoError) {
+object OffsetAndMetadata {
+  def apply(offset: Long, metadata: String, commitTimestamp: Long, expireTimestamp: Long) = new OffsetAndMetadata(OffsetMetadata(offset, metadata), commitTimestamp, expireTimestamp)
+
+  def apply(offset: Long, metadata: String, timestamp: Long) = new OffsetAndMetadata(OffsetMetadata(offset, metadata), timestamp)
+
+  def apply(offset: Long, metadata: String) = new OffsetAndMetadata(OffsetMetadata(offset, metadata))
 
-  def this(offsetMetadata: OffsetAndMetadata, error: Short) =
-    this(offsetMetadata.offset, offsetMetadata.metadata, error)
+  def apply(offset: Long) = new OffsetAndMetadata(OffsetMetadata(offset, OffsetMetadata.NoMetadata))
+}
 
-  def this(error: Short) =
-    this(OffsetAndMetadata.InvalidOffset, OffsetAndMetadata.NoMetadata, error)
+case class OffsetMetadataAndError(offsetMetadata: OffsetMetadata, error: Short = ErrorMapping.NoError) {
+  def offset = offsetMetadata.offset
 
-  def asTuple = (offset, metadata, error)
+  def metadata = offsetMetadata.metadata
 
-  override def toString = "OffsetMetadataAndError[%d,%s,%d]".format(offset, metadata, error)
+  override def toString = "[%s,ErrorCode %d]".format(offsetMetadata, error)
 }
 
 object OffsetMetadataAndError {
-  val NoOffset = OffsetMetadataAndError(OffsetAndMetadata.InvalidOffset, OffsetAndMetadata.NoMetadata, ErrorMapping.NoOffsetsCommittedCode)
-  val OffsetsLoading = OffsetMetadataAndError(OffsetAndMetadata.InvalidOffset, OffsetAndMetadata.NoMetadata, ErrorMapping.OffsetsLoadInProgressCode)
-  val NotOffsetManagerForGroup = OffsetMetadataAndError(OffsetAndMetadata.InvalidOffset, OffsetAndMetadata.NoMetadata, ErrorMapping.NotCoordinatorForConsumerCode)
-  val UnknownTopicOrPartition = OffsetMetadataAndError(OffsetAndMetadata.InvalidOffset, OffsetAndMetadata.NoMetadata, ErrorMapping.UnknownTopicOrPartitionCode)
+  val NoOffset = OffsetMetadataAndError(OffsetMetadata.InvalidOffsetMetadata, ErrorMapping.NoError)
+  val OffsetsLoading = OffsetMetadataAndError(OffsetMetadata.InvalidOffsetMetadata, ErrorMapping.OffsetsLoadInProgressCode)
+  val UnknownTopicOrPartition = OffsetMetadataAndError(OffsetMetadata.InvalidOffsetMetadata, ErrorMapping.UnknownTopicOrPartitionCode)
+  val NotOffsetManagerForGroup = OffsetMetadataAndError(OffsetMetadata.InvalidOffsetMetadata, ErrorMapping.NotCoordinatorForConsumerCode)
+
+  def apply(offset: Long) = new OffsetMetadataAndError(OffsetMetadata(offset, OffsetMetadata.NoMetadata), ErrorMapping.NoError)
+
+  def apply(error: Short) = new OffsetMetadataAndError(OffsetMetadata.InvalidOffsetMetadata, error)
+
+  def apply(offset: Long, metadata: String, error: Short) = new OffsetMetadataAndError(OffsetMetadata(offset, metadata), error)
 }
 
+
+

http://git-wip-us.apache.org/repos/asf/kafka/blob/c5df2a8e/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
index cca815a..b1cf0db 100644
--- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
+++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
@@ -400,7 +400,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
     val dirs = new ZKGroupTopicDirs(config.groupId, topicPartition.topic)
     val offsetString = readDataMaybeNull(zkClient, dirs.consumerOffsetDir + "/" + topicPartition.partition)._1
     offsetString match {
-      case Some(offsetStr) => (topicPartition, OffsetMetadataAndError(offsetStr.toLong, OffsetAndMetadata.NoMetadata, ErrorMapping.NoError))
+      case Some(offsetStr) => (topicPartition, OffsetMetadataAndError(offsetStr.toLong))
       case None => (topicPartition, OffsetMetadataAndError.NoOffset)
     }
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/c5df2a8e/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 35af98f..c33e848 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -149,12 +149,44 @@ class KafkaApis(val requestChannel: RequestChannel,
       val response = OffsetCommitResponse(commitStatus, offsetCommitRequest.correlationId)
       requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response)))
     }
+
+    // compute the retention time based on the request version:
+    // if it is before v2 or not specified by user, we can use the default retention
+    val offsetRetention =
+      if (offsetCommitRequest.versionId <= 1 ||
+        offsetCommitRequest.retentionMs == org.apache.kafka.common.requests.OffsetCommitRequest.DEFAULT_RETENTION_TIME) {
+        offsetManager.config.offsetsRetentionMs
+      } else {
+        offsetCommitRequest.retentionMs
+      }
+
+    // commit timestamp is always set to now.
+    // "default" expiration timestamp is now + retention (and retention may be overridden if v2)
+    // expire timestamp is computed differently for v1 and v2.
+    //   - If v1 and no explicit commit timestamp is provided we use default expiration timestamp.
+    //   - If v1 and explicit commit timestamp is provided we calculate retention from that explicit commit timestamp
+    //   - If v2 we use the default expiration timestamp
+    val currentTimestamp = SystemTime.milliseconds
+    val defaultExpireTimestamp = offsetRetention + currentTimestamp
+
+    val offsetData = offsetCommitRequest.requestInfo.mapValues(offsetAndMetadata =>
+      offsetAndMetadata.copy(
+        commitTimestamp = currentTimestamp,
+        expireTimestamp = {
+          if (offsetAndMetadata.commitTimestamp == org.apache.kafka.common.requests.OffsetCommitRequest.DEFAULT_TIMESTAMP)
+            defaultExpireTimestamp
+          else
+            offsetRetention + offsetAndMetadata.commitTimestamp
+        }
+      )
+    )
+
     // call offset manager to store offsets
     offsetManager.storeOffsets(
       offsetCommitRequest.groupId,
       offsetCommitRequest.consumerId,
       offsetCommitRequest.groupGenerationId,
-      offsetCommitRequest.requestInfo,
+      offsetData,
       sendResponseCallback)
   }
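
A worked sketch of the expiration rules implemented above (all timestamps
illustrative):

    val now                = 1427000000000L
    val defaultRetentionMs = 24 * 60 * 60 * 1000L  // offsets.retention.minutes, in ms

    // v2 with an explicit retention of 1h: expire = now + request retention
    val expireV2      = now + 60 * 60 * 1000L
    // v1 with an explicit per-partition commit timestamp: retention counts from it
    val expireV1      = 1426990000000L + defaultRetentionMs
    // v0/v1 without a timestamp (DEFAULT_TIMESTAMP): fall back to now + default
    val expireDefault = now + defaultRetentionMs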
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/c5df2a8e/core/src/main/scala/kafka/server/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 46d21c7..422451a 100644
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -336,7 +336,7 @@ object KafkaConfig {
   val OffsetsTopicPartitionsDoc = "The number of partitions for the offset commit topic (should not change after deployment)"
   val OffsetsTopicSegmentBytesDoc = "The offsets topic segment bytes should be kept relatively small in order to facilitate faster log compaction and cache loads"
   val OffsetsTopicCompressionCodecDoc = "Compression codec for the offsets topic - compression may be used to achieve \"atomic\" commits"
-  val OffsetsRetentionMinutesDoc = "Offsets older than this retention period will be discarded"
+  val OffsetsRetentionMinutesDoc = "Log retention window in minutes for offsets topic"
   val OffsetsRetentionCheckIntervalMsDoc = "Frequency at which to check for stale offsets"
   val OffsetCommitTimeoutMsDoc = "Offset commit will be delayed until all replicas for the offsets topic receive the commit " +
     "or this timeout is reached. This is similar to the producer request timeout."

http://git-wip-us.apache.org/repos/asf/kafka/blob/c5df2a8e/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index dddef93..4db3384 100644
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -402,6 +402,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
       maxMetadataSize = config.offsetMetadataMaxSize,
       loadBufferSize = config.offsetsLoadBufferSize,
       offsetsRetentionMs = config.offsetsRetentionMinutes * 60 * 1000L,
+      offsetsRetentionCheckIntervalMs = config.offsetsRetentionCheckIntervalMs,
       offsetsTopicNumPartitions = config.offsetsTopicPartitions,
       offsetsTopicReplicationFactor = config.offsetsTopicReplicationFactor,
       offsetCommitTimeoutMs = config.offsetCommitTimeoutMs,

http://git-wip-us.apache.org/repos/asf/kafka/blob/c5df2a8e/core/src/main/scala/kafka/server/OffsetManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/OffsetManager.scala b/core/src/main/scala/kafka/server/OffsetManager.scala
index d05e14d..395b1db 100644
--- a/core/src/main/scala/kafka/server/OffsetManager.scala
+++ b/core/src/main/scala/kafka/server/OffsetManager.scala
@@ -42,7 +42,6 @@ import java.util.concurrent.TimeUnit
 import com.yammer.metrics.core.Gauge
 import org.I0Itec.zkclient.ZkClient
 
-
 /**
  * Configuration settings for in-built offset management
  * @param maxMetadataSize The maximum allowed metadata for any offset commit.
@@ -62,7 +61,7 @@ import org.I0Itec.zkclient.ZkClient
  */
 case class OffsetManagerConfig(maxMetadataSize: Int = OffsetManagerConfig.DefaultMaxMetadataSize,
                                loadBufferSize: Int = OffsetManagerConfig.DefaultLoadBufferSize,
-                               offsetsRetentionMs: Long = 24*60*60000L,
+                               offsetsRetentionMs: Long = OffsetManagerConfig.DefaultOffsetRetentionMs,
                                offsetsRetentionCheckIntervalMs: Long = OffsetManagerConfig.DefaultOffsetsRetentionCheckIntervalMs,
                                offsetsTopicNumPartitions: Int = OffsetManagerConfig.DefaultOffsetsTopicNumPartitions,
                                offsetsTopicSegmentBytes: Int = OffsetManagerConfig.DefaultOffsetsTopicSegmentBytes,
@@ -74,6 +73,7 @@ case class OffsetManagerConfig(maxMetadataSize: Int = OffsetManagerConfig.Defaul
 object OffsetManagerConfig {
   val DefaultMaxMetadataSize = 4096
   val DefaultLoadBufferSize = 5*1024*1024
+  val DefaultOffsetRetentionMs = 24*60*60*1000L
   val DefaultOffsetsRetentionCheckIntervalMs = 600000L
   val DefaultOffsetsTopicNumPartitions = 50
   val DefaultOffsetsTopicSegmentBytes = 100*1024*1024
@@ -120,9 +120,11 @@ class OffsetManager(val config: OffsetManagerConfig,
     debug("Compacting offsets cache.")
     val startMs = SystemTime.milliseconds
 
-    val staleOffsets = offsetsCache.filter(startMs - _._2.timestamp > config.offsetsRetentionMs)
+    val staleOffsets = offsetsCache.filter { case (groupTopicPartition, offsetAndMetadata) =>
+      offsetAndMetadata.expireTimestamp < startMs
+    }
 
-    debug("Found %d stale offsets (older than %d ms).".format(staleOffsets.size, config.offsetsRetentionMs))
+    debug("Found %d expired offsets.".format(staleOffsets.size))
 
     // delete the stale offsets from the table and generate tombstone messages to remove them from the log
     val tombstonesForPartition = staleOffsets.map { case(groupTopicAndPartition, offsetAndMetadata) =>
@@ -380,8 +382,17 @@ class OffsetManager(val config: OffsetManagerConfig,
                   else
                     trace("Ignoring redundant tombstone for %s.".format(key))
                 } else {
+                  // special handling for version 0:
+                  // set the expiration time stamp as commit time stamp + server default retention time
                   val value = OffsetManager.readMessageValue(msgAndOffset.message.payload)
-                  putOffset(key, value)
+                  putOffset(key, value.copy (
+                    expireTimestamp = {
+                      if (value.expireTimestamp == org.apache.kafka.common.requests.OffsetCommitRequest.DEFAULT_TIMESTAMP)
+                        value.commitTimestamp + config.offsetsRetentionMs
+                      else
+                        value.expireTimestamp
+                    }
+                  ))
                   trace("Loaded offset %s for %s.".format(value, key))
                 }
                 currOffset = msgAndOffset.nextOffset
@@ -446,7 +457,7 @@ object OffsetManager {
 
   private case class KeyAndValueSchemas(keySchema: Schema, valueSchema: Schema)
 
-  private val CURRENT_OFFSET_SCHEMA_VERSION = 0.toShort
+  private val CURRENT_OFFSET_SCHEMA_VERSION = 1.toShort
 
   private val OFFSET_COMMIT_KEY_SCHEMA_V0 = new Schema(new Field("group", STRING),
                                                        new Field("topic", STRING),
@@ -458,12 +469,24 @@ object OffsetManager {
   private val OFFSET_COMMIT_VALUE_SCHEMA_V0 = new Schema(new Field("offset", INT64),
                                                          new Field("metadata", STRING, "Associated metadata.", ""),
                                                          new Field("timestamp", INT64))
-  private val VALUE_OFFSET_FIELD = OFFSET_COMMIT_VALUE_SCHEMA_V0.get("offset")
-  private val VALUE_METADATA_FIELD = OFFSET_COMMIT_VALUE_SCHEMA_V0.get("metadata")
-  private val VALUE_TIMESTAMP_FIELD = OFFSET_COMMIT_VALUE_SCHEMA_V0.get("timestamp")
+
+  private val OFFSET_COMMIT_VALUE_SCHEMA_V1 = new Schema(new Field("offset", INT64),
+                                                         new Field("metadata", STRING, "Associated metadata.", ""),
+                                                         new Field("commit_timestamp", INT64),
+                                                         new Field("expire_timestamp", INT64))
+
+  private val VALUE_OFFSET_FIELD_V0 = OFFSET_COMMIT_VALUE_SCHEMA_V0.get("offset")
+  private val VALUE_METADATA_FIELD_V0 = OFFSET_COMMIT_VALUE_SCHEMA_V0.get("metadata")
+  private val VALUE_TIMESTAMP_FIELD_V0 = OFFSET_COMMIT_VALUE_SCHEMA_V0.get("timestamp")
+
+  private val VALUE_OFFSET_FIELD_V1 = OFFSET_COMMIT_VALUE_SCHEMA_V1.get("offset")
+  private val VALUE_METADATA_FIELD_V1 = OFFSET_COMMIT_VALUE_SCHEMA_V1.get("metadata")
+  private val VALUE_COMMIT_TIMESTAMP_FIELD_V1 = OFFSET_COMMIT_VALUE_SCHEMA_V1.get("commit_timestamp")
+  private val VALUE_EXPIRE_TIMESTAMP_FIELD_V1 = OFFSET_COMMIT_VALUE_SCHEMA_V1.get("expire_timestamp")
 
   // map of versions to schemas
-  private val OFFSET_SCHEMAS = Map(0 -> KeyAndValueSchemas(OFFSET_COMMIT_KEY_SCHEMA_V0, OFFSET_COMMIT_VALUE_SCHEMA_V0))
+  private val OFFSET_SCHEMAS = Map(0 -> KeyAndValueSchemas(OFFSET_COMMIT_KEY_SCHEMA_V0, OFFSET_COMMIT_VALUE_SCHEMA_V0),
+                                   1 -> KeyAndValueSchemas(OFFSET_COMMIT_KEY_SCHEMA_V0, OFFSET_COMMIT_VALUE_SCHEMA_V1))
 
   private val CURRENT_SCHEMA = schemaFor(CURRENT_OFFSET_SCHEMA_VERSION)
 
@@ -480,7 +503,7 @@ object OffsetManager {
    *
    * @return key for offset commit message
    */
-  def offsetCommitKey(group: String, topic: String, partition: Int, versionId: Short = 0): Array[Byte] = {
+  private def offsetCommitKey(group: String, topic: String, partition: Int, versionId: Short = 0): Array[Byte] = {
     val key = new Struct(CURRENT_SCHEMA.keySchema)
     key.set(KEY_GROUP_FIELD, group)
     key.set(KEY_TOPIC_FIELD, topic)
@@ -498,12 +521,13 @@ object OffsetManager {
    * @param offsetAndMetadata consumer's current offset and metadata
    * @return payload for offset commit message
    */
-  def offsetCommitValue(offsetAndMetadata: OffsetAndMetadata): Array[Byte] = {
+  private def offsetCommitValue(offsetAndMetadata: OffsetAndMetadata): Array[Byte] = {
+    // generate commit value with schema version 1
     val value = new Struct(CURRENT_SCHEMA.valueSchema)
-    value.set(VALUE_OFFSET_FIELD, offsetAndMetadata.offset)
-    value.set(VALUE_METADATA_FIELD, offsetAndMetadata.metadata)
-    value.set(VALUE_TIMESTAMP_FIELD, offsetAndMetadata.timestamp)
-
+    value.set(VALUE_OFFSET_FIELD_V1, offsetAndMetadata.offset)
+    value.set(VALUE_METADATA_FIELD_V1, offsetAndMetadata.metadata)
+    value.set(VALUE_COMMIT_TIMESTAMP_FIELD_V1, offsetAndMetadata.commitTimestamp)
+    value.set(VALUE_EXPIRE_TIMESTAMP_FIELD_V1, offsetAndMetadata.expireTimestamp)
     val byteBuffer = ByteBuffer.allocate(2 /* version */ + value.sizeOf)
     byteBuffer.putShort(CURRENT_OFFSET_SCHEMA_VERSION)
     value.writeTo(byteBuffer)
@@ -516,7 +540,7 @@ object OffsetManager {
    * @param buffer input byte-buffer
   * @return a GroupTopicPartition object
    */
-  def readMessageKey(buffer: ByteBuffer): GroupTopicPartition = {
+  private def readMessageKey(buffer: ByteBuffer): GroupTopicPartition = {
     val version = buffer.getShort()
     val keySchema = schemaFor(version).keySchema
     val key = keySchema.read(buffer).asInstanceOf[Struct]
@@ -534,19 +558,40 @@ object OffsetManager {
    * @param buffer input byte-buffer
    * @return an offset-metadata object from the message
    */
-  def readMessageValue(buffer: ByteBuffer): OffsetAndMetadata = {
-    if(buffer == null) { // tombstone
+  private def readMessageValue(buffer: ByteBuffer): OffsetAndMetadata = {
+    val structAndVersion = readMessageValueStruct(buffer)
+
+    if (structAndVersion.value == null) { // tombstone
       null
     } else {
+      if (structAndVersion.version == 0) {
+        val offset = structAndVersion.value.get(VALUE_OFFSET_FIELD_V0).asInstanceOf[Long]
+        val metadata = structAndVersion.value.get(VALUE_METADATA_FIELD_V0).asInstanceOf[String]
+        val timestamp = structAndVersion.value.get(VALUE_TIMESTAMP_FIELD_V0).asInstanceOf[Long]
+
+        OffsetAndMetadata(offset, metadata, timestamp)
+      } else if (structAndVersion.version == 1) {
+        val offset = structAndVersion.value.get(VALUE_OFFSET_FIELD_V1).asInstanceOf[Long]
+        val metadata = structAndVersion.value.get(VALUE_METADATA_FIELD_V1).asInstanceOf[String]
+        val commitTimestamp = structAndVersion.value.get(VALUE_COMMIT_TIMESTAMP_FIELD_V1).asInstanceOf[Long]
+        val expireTimestamp = structAndVersion.value.get(VALUE_EXPIRE_TIMESTAMP_FIELD_V1).asInstanceOf[Long]
+
+        OffsetAndMetadata(offset, metadata, commitTimestamp, expireTimestamp)
+      } else {
+        throw new IllegalStateException("Unknown offset message version")
+      }
+    }
+  }
+
+  private def readMessageValueStruct(buffer: ByteBuffer): MessageValueStructAndVersion = {
+    if (buffer == null) { // tombstone
+      MessageValueStructAndVersion(null, -1)
+    } else {
       val version = buffer.getShort()
       val valueSchema = schemaFor(version).valueSchema
       val value = valueSchema.read(buffer).asInstanceOf[Struct]
 
-      val offset = value.get(VALUE_OFFSET_FIELD).asInstanceOf[Long]
-      val metadata = value.get(VALUE_METADATA_FIELD).asInstanceOf[String]
-      val timestamp = value.get(VALUE_TIMESTAMP_FIELD).asInstanceOf[Long]
-
-      OffsetAndMetadata(offset, metadata, timestamp)
+      MessageValueStructAndVersion(value, version)
     }
   }
 
@@ -555,7 +600,7 @@ object OffsetManager {
   class OffsetsMessageFormatter extends MessageFormatter {
     def writeTo(key: Array[Byte], value: Array[Byte], output: PrintStream) {
       val formattedKey = if (key == null) "NULL" else OffsetManager.readMessageKey(ByteBuffer.wrap(key)).toString
-      val formattedValue = if (value == null) "NULL" else OffsetManager.readMessageValue(ByteBuffer.wrap(value)).toString
+      val formattedValue = if (value == null) "NULL" else OffsetManager.readMessageValueStruct(ByteBuffer.wrap(value)).value.toString
       output.write(formattedKey.getBytes)
       output.write("::".getBytes)
       output.write(formattedValue.getBytes)
@@ -565,6 +610,8 @@ object OffsetManager {
 
 }
 
+case class MessageValueStructAndVersion(value: Struct, version: Short)
+
 case class GroupTopicPartition(group: String, topicPartition: TopicAndPartition) {
 
   def this(group: String, topic: String, partition: Int) =

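A note on the on-disk format change above: offsetCommitValue writes a two-byte schema version ahead of the struct, and readMessageValue reads that version first and dispatches on it, so v0 entries written before the upgrade remain readable alongside new v1 entries. Below is a minimal standalone sketch of that same version-prefix pattern, using plain ByteBuffer fields instead of Kafka's Schema/Struct machinery and omitting the metadata string for brevity (all names here are illustrative, not from the patch):

import java.nio.ByteBuffer

// Simplified stand-in for the versioned payload (illustrative only; the real
// code uses Kafka's Schema/Struct machinery and also carries a metadata string).
case class OffsetValue(offset: Long, commitTimestamp: Long, expireTimestamp: Long)

object VersionedOffsetCodec {
  val CurrentVersion: Short = 1

  // Encode: two-byte schema version first, then the fields, mirroring offsetCommitValue.
  def encode(v: OffsetValue): Array[Byte] = {
    val buf = ByteBuffer.allocate(2 + 3 * 8)
    buf.putShort(CurrentVersion)
    buf.putLong(v.offset)
    buf.putLong(v.commitTimestamp)
    buf.putLong(v.expireTimestamp)
    buf.array()
  }

  // Decode: read the version, then pick the field layout, mirroring readMessageValue.
  def decode(bytes: Array[Byte]): OffsetValue = {
    val buf = ByteBuffer.wrap(bytes)
    buf.getShort().toInt match {
      case 0 =>
        // a v0 entry carried a single timestamp; reuse it for both fields here
        val offset = buf.getLong()
        val ts = buf.getLong()
        OffsetValue(offset, ts, ts)
      case 1 =>
        OffsetValue(buf.getLong(), buf.getLong(), buf.getLong())
      case v =>
        throw new IllegalStateException("Unknown offset message version: " + v)
    }
  }
}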
http://git-wip-us.apache.org/repos/asf/kafka/blob/c5df2a8e/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index c527482..44f0026 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -65,7 +65,7 @@ object ReplicaManager {
 }
 
 class ReplicaManager(val config: KafkaConfig,
-                     time: Time,
+                     private val time: Time,
                      val zkClient: ZkClient,
                      scheduler: Scheduler,
                      val logManager: LogManager,

http://git-wip-us.apache.org/repos/asf/kafka/blob/c5df2a8e/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
index fba852a..4cb803f 100644
--- a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
+++ b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
@@ -17,20 +17,21 @@
 
 package kafka.api
 
-import org.junit._
-import org.scalatest.junit.JUnitSuite
-import junit.framework.Assert._
-import java.nio.ByteBuffer
-import kafka.message.{Message, ByteBufferMessageSet}
+import kafka.common._
 import kafka.cluster.Broker
-import kafka.common.{OffsetAndMetadata, ErrorMapping, OffsetMetadataAndError}
+import kafka.controller.LeaderIsrAndControllerEpoch
+import kafka.message.{Message, ByteBufferMessageSet}
 import kafka.utils.SystemTime
+
+import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.requests._
-import org.apache.kafka.common.protocol.ApiKeys
+
 import scala.Some
-import kafka.controller.LeaderIsrAndControllerEpoch
-import kafka.common.TopicAndPartition
-import org.apache.kafka.common.TopicPartition
+import java.nio.ByteBuffer
+
+import org.junit._
+import org.scalatest.junit.JUnitSuite
+import junit.framework.Assert._
 
 
 object SerializationTestUtils {
@@ -151,10 +152,23 @@ object SerializationTestUtils {
     new TopicMetadataResponse(brokers, Seq(topicmetaData1, topicmetaData2), 1)
   }
 
+  def createTestOffsetCommitRequestV2: OffsetCommitRequest = {
+    new OffsetCommitRequest(
+      groupId = "group 1",
+      retentionMs = SystemTime.milliseconds,
+      requestInfo = collection.immutable.Map(
+        TopicAndPartition(topic1, 0) -> OffsetAndMetadata(42L, "some metadata"),
+        TopicAndPartition(topic1, 1) -> OffsetAndMetadata(100L, OffsetMetadata.NoMetadata)
+      ))
+  }
+
   def createTestOffsetCommitRequestV1: OffsetCommitRequest = {
-    new OffsetCommitRequest("group 1", collection.immutable.Map(
-      TopicAndPartition(topic1, 0) -> OffsetAndMetadata(offset=42L, metadata="some metadata", timestamp=SystemTime.milliseconds),
-      TopicAndPartition(topic1, 1) -> OffsetAndMetadata(offset=100L, metadata=OffsetAndMetadata.NoMetadata, timestamp=SystemTime.milliseconds)
+    new OffsetCommitRequest(
+      versionId = 1,
+      groupId = "group 1",
+      requestInfo = collection.immutable.Map(
+      TopicAndPartition(topic1, 0) -> OffsetAndMetadata(42L, "some metadata", SystemTime.milliseconds),
+      TopicAndPartition(topic1, 1) -> OffsetAndMetadata(100L, OffsetMetadata.NoMetadata, SystemTime.milliseconds)
     ))
   }
 
@@ -163,8 +177,8 @@ object SerializationTestUtils {
       versionId = 0,
       groupId = "group 1",
       requestInfo = collection.immutable.Map(
-        TopicAndPartition(topic1, 0) -> OffsetAndMetadata(offset=42L, metadata="some metadata", timestamp=SystemTime.milliseconds),
-        TopicAndPartition(topic1, 1) -> OffsetAndMetadata(offset=100L, metadata=OffsetAndMetadata.NoMetadata, timestamp=SystemTime.milliseconds)
+        TopicAndPartition(topic1, 0) -> OffsetAndMetadata(42L, "some metadata", SystemTime.milliseconds),
+        TopicAndPartition(topic1, 1) -> OffsetAndMetadata(100L, OffsetMetadata.NoMetadata, SystemTime.milliseconds)
       ))
   }
 
@@ -183,7 +197,7 @@ object SerializationTestUtils {
   def createTestOffsetFetchResponse: OffsetFetchResponse = {
     new OffsetFetchResponse(collection.immutable.Map(
       TopicAndPartition(topic1, 0) -> OffsetMetadataAndError(42L, "some metadata", ErrorMapping.NoError),
-      TopicAndPartition(topic1, 1) -> OffsetMetadataAndError(100L, OffsetAndMetadata.NoMetadata, ErrorMapping.UnknownTopicOrPartitionCode)
+      TopicAndPartition(topic1, 1) -> OffsetMetadataAndError(100L, OffsetMetadata.NoMetadata, ErrorMapping.UnknownTopicOrPartitionCode)
     ))
   }
 
@@ -232,6 +246,7 @@ class RequestResponseSerializationTest extends JUnitSuite {
   private val topicMetadataResponse = SerializationTestUtils.createTestTopicMetadataResponse
   private val offsetCommitRequestV0 = SerializationTestUtils.createTestOffsetCommitRequestV0
   private val offsetCommitRequestV1 = SerializationTestUtils.createTestOffsetCommitRequestV1
+  private val offsetCommitRequestV2 = SerializationTestUtils.createTestOffsetCommitRequestV2
   private val offsetCommitResponse = SerializationTestUtils.createTestOffsetCommitResponse
   private val offsetFetchRequest = SerializationTestUtils.createTestOffsetFetchRequest
   private val offsetFetchResponse = SerializationTestUtils.createTestOffsetFetchResponse
@@ -250,7 +265,8 @@ class RequestResponseSerializationTest extends JUnitSuite {
       collection.immutable.Seq(leaderAndIsrRequest, leaderAndIsrResponse, stopReplicaRequest,
                                stopReplicaResponse, producerRequest, producerResponse,
                                fetchRequest, offsetRequest, offsetResponse, topicMetadataRequest,
-                               topicMetadataResponse, offsetCommitRequestV0, offsetCommitRequestV1,
+                               topicMetadataResponse,
+                               offsetCommitRequestV0, offsetCommitRequestV1, offsetCommitRequestV2,
                                offsetCommitResponse, offsetFetchRequest, offsetFetchResponse,
                                consumerMetadataRequest, consumerMetadataResponse,
                                consumerMetadataResponseNoCoordinator, heartbeatRequest,

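The new createTestOffsetCommitRequestV2 above shows the protocol change in miniature: retention travels once per request (retentionMs), and OffsetAndMetadata drops the per-partition timestamp that v0/v1 carried, so the broker rather than the client derives each partition's expiry at commit time. A rough sketch of that derivation follows; the helper name and the default-retention fallback are assumptions for illustration, and the real logic lives in the KafkaApis/OffsetManager hunks of this commit:

// Illustrative only: fold a request-level retention into a per-partition expiry.
//   receiveTimeMs - broker clock when the commit arrived
//   retentionMs   - request-level retention from a v2 commit, or -1 when absent
def expireTimestamp(receiveTimeMs: Long, retentionMs: Long, defaultRetentionMs: Long): Long =
  if (retentionMs >= 0) receiveTimeMs + retentionMs // v2: client-chosen retention
  else receiveTimeMs + defaultRetentionMs           // v0/v1: broker default applies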
http://git-wip-us.apache.org/repos/asf/kafka/blob/c5df2a8e/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala b/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
index e4d0435..7654275 100644
--- a/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
+++ b/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
@@ -17,28 +17,33 @@
 
 package kafka.server
 
-import java.io.File
-import kafka.utils._
-import junit.framework.Assert._
-import java.util.Properties
+import kafka.api.{ConsumerMetadataRequest, OffsetCommitRequest, OffsetFetchRequest}
 import kafka.consumer.SimpleConsumer
-import org.junit.{After, Before, Test}
+import kafka.common.{OffsetMetadata, OffsetMetadataAndError, OffsetAndMetadata, ErrorMapping, TopicAndPartition}
+import kafka.utils._
+import kafka.utils.TestUtils._
 import kafka.zk.ZooKeeperTestHarness
+
+import org.junit.{After, Before, Test}
 import org.scalatest.junit.JUnit3Suite
-import kafka.api.{ConsumerMetadataRequest, OffsetCommitRequest, OffsetFetchRequest}
-import kafka.utils.TestUtils._
-import kafka.common.{OffsetMetadataAndError, OffsetAndMetadata, ErrorMapping, TopicAndPartition}
+
+import java.util.Properties
+import java.io.File
+
 import scala.util.Random
 import scala.collection._
 
+import junit.framework.Assert._
+
 class OffsetCommitTest extends JUnit3Suite with ZooKeeperTestHarness {
   val random: Random = new Random()
+  val brokerPort: Int = 9099
+  val group = "test-group"
+  val retentionCheckInterval: Long = 100L
   var logDir: File = null
   var topicLogDir: File = null
   var server: KafkaServer = null
   var logSize: Int = 100
-  val brokerPort: Int = 9099
-  val group = "test-group"
   var simpleConsumer: SimpleConsumer = null
   var time: Time = new MockTime()
 
@@ -46,6 +51,8 @@ class OffsetCommitTest extends JUnit3Suite with ZooKeeperTestHarness {
   override def setUp() {
     super.setUp()
     val config: Properties = createBrokerConfig(1, brokerPort)
+    config.setProperty(KafkaConfig.OffsetsTopicReplicationFactorProp, "1")
+    config.setProperty(KafkaConfig.OffsetsRetentionCheckIntervalMsProp, retentionCheckInterval.toString)
     val logDirPath = config.getProperty("log.dir")
     logDir = new File(logDirPath)
     time = new MockTime()
@@ -89,7 +96,7 @@ class OffsetCommitTest extends JUnit3Suite with ZooKeeperTestHarness {
     val fetchResponse = simpleConsumer.fetchOffsets(fetchRequest)
 
     assertEquals(ErrorMapping.NoError, fetchResponse.requestInfo.get(topicAndPartition).get.error)
-    assertEquals(OffsetAndMetadata.NoMetadata, fetchResponse.requestInfo.get(topicAndPartition).get.metadata)
+    assertEquals(OffsetMetadata.NoMetadata, fetchResponse.requestInfo.get(topicAndPartition).get.metadata)
     assertEquals(42L, fetchResponse.requestInfo.get(topicAndPartition).get.offset)
 
     // Commit a new offset
@@ -155,31 +162,37 @@ class OffsetCommitTest extends JUnit3Suite with ZooKeeperTestHarness {
     val fetchResponse = simpleConsumer.fetchOffsets(fetchRequest)
 
     assertEquals(ErrorMapping.NoError, fetchResponse.requestInfo.get(TopicAndPartition(topic1, 0)).get.error)
+
     assertEquals(ErrorMapping.NoError, fetchResponse.requestInfo.get(TopicAndPartition(topic2, 0)).get.error)
-    assertEquals(ErrorMapping.NoError, fetchResponse.requestInfo.get(TopicAndPartition(topic3, 0)).get.error)
     assertEquals(ErrorMapping.NoError, fetchResponse.requestInfo.get(TopicAndPartition(topic2, 1)).get.error)
+
+    assertEquals(ErrorMapping.NoError, fetchResponse.requestInfo.get(TopicAndPartition(topic3, 0)).get.error)
     assertEquals(ErrorMapping.UnknownTopicOrPartitionCode, fetchResponse.requestInfo.get(TopicAndPartition(topic3, 1)).get.error)
-    assertEquals(ErrorMapping.NoOffsetsCommittedCode, fetchResponse.requestInfo.get(TopicAndPartition(topic4, 0)).get.error)
-    assertEquals(ErrorMapping.UnknownTopicOrPartitionCode, fetchResponse.requestInfo.get(TopicAndPartition(topic5, 0)).get.error)
     assertEquals(OffsetMetadataAndError.UnknownTopicOrPartition, fetchResponse.requestInfo.get(TopicAndPartition(topic3, 1)).get)
+
+    assertEquals(ErrorMapping.NoError, fetchResponse.requestInfo.get(TopicAndPartition(topic4, 0)).get.error)
     assertEquals(OffsetMetadataAndError.NoOffset, fetchResponse.requestInfo.get(TopicAndPartition(topic4, 0)).get)
+
+    assertEquals(ErrorMapping.UnknownTopicOrPartitionCode, fetchResponse.requestInfo.get(TopicAndPartition(topic5, 0)).get.error)
     assertEquals(OffsetMetadataAndError.UnknownTopicOrPartition, fetchResponse.requestInfo.get(TopicAndPartition(topic5, 0)).get)
 
     assertEquals("metadata one", fetchResponse.requestInfo.get(TopicAndPartition(topic1, 0)).get.metadata)
     assertEquals("metadata two", fetchResponse.requestInfo.get(TopicAndPartition(topic2, 0)).get.metadata)
     assertEquals("metadata three", fetchResponse.requestInfo.get(TopicAndPartition(topic3, 0)).get.metadata)
-    assertEquals(OffsetAndMetadata.NoMetadata, fetchResponse.requestInfo.get(TopicAndPartition(topic2, 1)).get.metadata)
-    assertEquals(OffsetAndMetadata.NoMetadata, fetchResponse.requestInfo.get(TopicAndPartition(topic3, 1)).get.metadata)
-    assertEquals(OffsetAndMetadata.NoMetadata, fetchResponse.requestInfo.get(TopicAndPartition(topic4, 0)).get.metadata)
-    assertEquals(OffsetAndMetadata.NoMetadata, fetchResponse.requestInfo.get(TopicAndPartition(topic5, 0)).get.metadata)
+
+    assertEquals(OffsetMetadata.NoMetadata, fetchResponse.requestInfo.get(TopicAndPartition(topic2, 1)).get.metadata)
+    assertEquals(OffsetMetadata.NoMetadata, fetchResponse.requestInfo.get(TopicAndPartition(topic3, 1)).get.metadata)
+    assertEquals(OffsetMetadata.NoMetadata, fetchResponse.requestInfo.get(TopicAndPartition(topic4, 0)).get.metadata)
+    assertEquals(OffsetMetadata.NoMetadata, fetchResponse.requestInfo.get(TopicAndPartition(topic5, 0)).get.metadata)
 
     assertEquals(42L, fetchResponse.requestInfo.get(TopicAndPartition(topic1, 0)).get.offset)
     assertEquals(43L, fetchResponse.requestInfo.get(TopicAndPartition(topic2, 0)).get.offset)
     assertEquals(44L, fetchResponse.requestInfo.get(TopicAndPartition(topic3, 0)).get.offset)
     assertEquals(45L, fetchResponse.requestInfo.get(TopicAndPartition(topic2, 1)).get.offset)
-    assertEquals(OffsetAndMetadata.InvalidOffset, fetchResponse.requestInfo.get(TopicAndPartition(topic3, 1)).get.offset)
-    assertEquals(OffsetAndMetadata.InvalidOffset, fetchResponse.requestInfo.get(TopicAndPartition(topic4, 0)).get.offset)
-    assertEquals(OffsetAndMetadata.InvalidOffset, fetchResponse.requestInfo.get(TopicAndPartition(topic5, 0)).get.offset)
+
+    assertEquals(OffsetMetadata.InvalidOffset, fetchResponse.requestInfo.get(TopicAndPartition(topic3, 1)).get.offset)
+    assertEquals(OffsetMetadata.InvalidOffset, fetchResponse.requestInfo.get(TopicAndPartition(topic4, 0)).get.offset)
+    assertEquals(OffsetMetadata.InvalidOffset, fetchResponse.requestInfo.get(TopicAndPartition(topic5, 0)).get.offset)
   }
 
   @Test
@@ -204,6 +217,73 @@ class OffsetCommitTest extends JUnit3Suite with ZooKeeperTestHarness {
     val commitResponse1 = simpleConsumer.commitOffsets(commitRequest1)
 
     assertEquals(ErrorMapping.OffsetMetadataTooLargeCode, commitResponse1.commitStatus.get(topicAndPartition).get)
+  }
+
+  @Test
+  def testOffsetExpiration() {
+    // set up topic partition
+    val topic = "topic"
+    val topicPartition = TopicAndPartition(topic, 0)
+    createTopic(zkClient, topic, servers = Seq(server), numPartitions = 1)
+
+    val fetchRequest = OffsetFetchRequest(group, Seq(TopicAndPartition(topic, 0)))
+
+    // v0 commit request with commit timestamp set to -1
+    // should not expire
+    val commitRequest0 = OffsetCommitRequest(
+      groupId = group,
+      requestInfo = immutable.Map(topicPartition -> OffsetAndMetadata(1L, "metadata", -1L)),
+      versionId = 0
+    )
+    assertEquals(ErrorMapping.NoError, simpleConsumer.commitOffsets(commitRequest0).commitStatus.get(topicPartition).get)
+    Thread.sleep(retentionCheckInterval * 2)
+    assertEquals(1L, simpleConsumer.fetchOffsets(fetchRequest).requestInfo.get(topicPartition).get.offset)
+
+    // v1 commit request with commit timestamp set to -1
+    // should not expire
+    val commitRequest1 = OffsetCommitRequest(
+      groupId = group,
+      requestInfo = immutable.Map(topicPartition -> OffsetAndMetadata(2L, "metadata", -1L)),
+      versionId = 1
+    )
+    assertEquals(ErrorMapping.NoError, simpleConsumer.commitOffsets(commitRequest1).commitStatus.get(topicPartition).get)
+    Thread.sleep(retentionCheckInterval * 2)
+    assertEquals(2L, simpleConsumer.fetchOffsets(fetchRequest).requestInfo.get(topicPartition).get.offset)
+
+    // v1 commit request with commit timestamp set to two days ago
+    // should expire
+    val commitRequest2 = OffsetCommitRequest(
+      groupId = group,
+      requestInfo = immutable.Map(topicPartition -> OffsetAndMetadata(3L, "metadata", SystemTime.milliseconds - 2 * 24 * 60 * 60 * 1000L)),
+      versionId = 1
+    )
+    assertEquals(ErrorMapping.NoError, simpleConsumer.commitOffsets(commitRequest2).commitStatus.get(topicPartition).get)
+    Thread.sleep(retentionCheckInterval * 2)
+    assertEquals(-1L, simpleConsumer.fetchOffsets(fetchRequest).requestInfo.get(topicPartition).get.offset)
+
+    // v2 commit request with retention time set to 1 hour
+    // should not expire
+    val commitRequest3 = OffsetCommitRequest(
+      groupId = group,
+      requestInfo = immutable.Map(topicPartition -> OffsetAndMetadata(4L, "metadata", -1L)),
+      versionId = 2,
+      retentionMs = 1000 * 60 * 60L
+    )
+    assertEquals(ErrorMapping.NoError, simpleConsumer.commitOffsets(commitRequest3).commitStatus.get(topicPartition).get)
+    Thread.sleep(retentionCheckInterval * 2)
+    assertEquals(4L, simpleConsumer.fetchOffsets(fetchRequest).requestInfo.get(topicPartition).get.offset)
+
+    // v2 commit request with retention time set to zero
+    // should expire
+    val commitRequest4 = OffsetCommitRequest(
+      groupId = "test-group",
+      requestInfo = immutable.Map(TopicAndPartition(topic, 0) -> OffsetAndMetadata(5L, "metadata", -1L)),
+      versionId = 2,
+      retentionMs = 0L
+    )
+    assertEquals(ErrorMapping.NoError, simpleConsumer.commitOffsets(commitRequest4).commitStatus.get(topicPartition).get)
+    Thread.sleep(retentionCheckInterval * 2)
+    assertEquals(-1L, simpleConsumer.fetchOffsets(fetchRequest).requestInfo.get(topicPartition).get.offset)
 
   }
 

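The new testOffsetExpiration case above leans on two knobs set in setUp: a single-replica offsets topic and offsets.retention.check.interval.ms shrunk to 100 ms, so sleeping past two intervals is enough for the broker's periodic cleanup to run between commits. A rough sketch of the scan those sleeps wait for; the names and the cache shape are illustrative only, and the real task is scheduled inside OffsetManager and also writes tombstones to the offsets topic:

import scala.collection.mutable

// Illustrative only: drop cached offsets whose expiry has passed; the key type
// stands in for the group/topic/partition key, the value for its expire timestamp.
def removeExpired[K](now: Long, cache: mutable.Map[K, Long]): Int = {
  val expiredKeys = cache.collect { case (k, expireTs) if expireTs <= now => k }
  expiredKeys.foreach(cache.remove)
  expiredKeys.size
}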
http://git-wip-us.apache.org/repos/asf/kafka/blob/c5df2a8e/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
index b46daa4..71317eb 100644
--- a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
@@ -163,10 +163,10 @@ class ServerShutdownTest extends JUnit3Suite with ZooKeeperTestHarness {
       server.shutdown()
       server.awaitShutdown()
       server.shutdown()
-      assertTrue(true);
+      assertTrue(true)
     }
     catch{
-      case ex => fail()
+      case ex: Throwable => fail()
     }
   }
 }

