kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: KAFKA-2068 Phase 1; Merge in KAFKA-1841; reviewed by Jun Rao
Date Mon, 04 May 2015 00:26:34 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk f148d8659 -> 161b1aa16


KAFKA-2068 Phase 1; Merge in KAFKA-1841; reviewed by Jun Rao


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/161b1aa1
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/161b1aa1
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/161b1aa1

Branch: refs/heads/trunk
Commit: 161b1aa16ef0821c15fc7283d87cd83336b28d1a
Parents: f148d86
Author: Guozhang Wang <wangguoz@gmail.com>
Authored: Sun May 3 17:26:20 2015 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Sun May 3 17:26:20 2015 -0700

----------------------------------------------------------------------
 .../apache/kafka/common/protocol/Protocol.java  |  42 +++++-
 .../common/requests/OffsetCommitRequest.java    |   7 +-
 .../scala/kafka/api/OffsetCommitRequest.scala   |   9 +-
 .../scala/kafka/api/OffsetFetchRequest.scala    |   2 +-
 .../kafka/common/OffsetMetadataAndError.scala   |   4 +-
 .../src/main/scala/kafka/server/KafkaApis.scala | 145 +++++++++++++------
 .../api/RequestResponseSerializationTest.scala  |   4 +-
 .../unit/kafka/server/OffsetCommitTest.scala    |  23 +--
 8 files changed, 161 insertions(+), 75 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/161b1aa1/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
index d53fe45..3dc8b01 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
@@ -118,6 +118,16 @@ public class Protocol {
                                                                                new Field("offset",
                                                                                         
INT64,
                                                                                         
"Message offset to be committed."),
+                                                                               new Field("metadata",
+                                                                                        
STRING,
+                                                                                        
"Any associated metadata the client wants to keep."));
+
+    public static final Schema OFFSET_COMMIT_REQUEST_PARTITION_V1 = new Schema(new Field("partition",
+                                                                                        
INT32,
+                                                                                        
"Topic partition id."),
+                                                                               new Field("offset",
+                                                                                        
INT64,
+                                                                                        
"Message offset to be committed."),
                                                                                new Field("timestamp",
                                                                                         
INT64,
                                                                                         
"Timestamp of the commit"),
@@ -125,7 +135,7 @@ public class Protocol {
                                                                                         
STRING,
                                                                                         
"Any associated metadata the client wants to keep."));
 
-    public static final Schema OFFSET_COMMIT_REQUEST_PARTITION_V1 = new Schema(new Field("partition",
+    public static final Schema OFFSET_COMMIT_REQUEST_PARTITION_V2 = new Schema(new Field("partition",
                                                                                         
INT32,
                                                                                         
"Topic partition id."),
                                                                                new Field("offset",
@@ -149,6 +159,13 @@ public class Protocol {
                                                                                      new
ArrayOf(OFFSET_COMMIT_REQUEST_PARTITION_V1),
                                                                                      "Partitions
to commit offsets."));
 
+    public static final Schema OFFSET_COMMIT_REQUEST_TOPIC_V2 = new Schema(new Field("topic",
+                                                                                     STRING,
+                                                                                     "Topic
to commit."),
+                                                                           new Field("partitions",
+                                                                                     new
ArrayOf(OFFSET_COMMIT_REQUEST_PARTITION_V2),
+                                                                                     "Partitions
to commit offsets."));
+
     public static final Schema OFFSET_COMMIT_REQUEST_V0 = new Schema(new Field("group_id",
                                                                                STRING,
                                                                                "The consumer
group id."),
@@ -166,7 +183,7 @@ public class Protocol {
                                                                                STRING,
                                                                                "The consumer
id assigned by the group coordinator."),
                                                                      new Field("topics",
-                                                                               new ArrayOf(OFFSET_COMMIT_REQUEST_TOPIC_V0),
+                                                                               new ArrayOf(OFFSET_COMMIT_REQUEST_TOPIC_V1),
                                                                                "Topics to
commit offsets."));
 
     public static final Schema OFFSET_COMMIT_REQUEST_V2 = new Schema(new Field("group_id",
@@ -182,7 +199,7 @@ public class Protocol {
                                                                                INT64,
                                                                                "Time period
in ms to retain the offset."),
                                                                      new Field("topics",
-                                                                               new ArrayOf(OFFSET_COMMIT_REQUEST_TOPIC_V1),
+                                                                               new ArrayOf(OFFSET_COMMIT_REQUEST_TOPIC_V2),
                                                                                "Topics to
commit offsets."));
 
     public static final Schema OFFSET_COMMIT_RESPONSE_PARTITION_V0 = new Schema(new Field("partition",
@@ -199,10 +216,20 @@ public class Protocol {
                                                                                 new ArrayOf(OFFSET_COMMIT_RESPONSE_TOPIC_V0)));
 
     public static final Schema[] OFFSET_COMMIT_REQUEST = new Schema[] {OFFSET_COMMIT_REQUEST_V0, OFFSET_COMMIT_REQUEST_V1, OFFSET_COMMIT_REQUEST_V2};
+
     /* The response types for V0, V1 and V2 of OFFSET_COMMIT_REQUEST are the same. */
-    public static final Schema[] OFFSET_COMMIT_RESPONSE = new Schema[] {OFFSET_COMMIT_RESPONSE_V0, OFFSET_COMMIT_RESPONSE_V0, OFFSET_COMMIT_RESPONSE_V0};
+    public static final Schema OFFSET_COMMIT_RESPONSE_V1 = OFFSET_COMMIT_RESPONSE_V0;
+    public static final Schema OFFSET_COMMIT_RESPONSE_V2 = OFFSET_COMMIT_RESPONSE_V0;
+
+    public static final Schema[] OFFSET_COMMIT_RESPONSE = new Schema[] {OFFSET_COMMIT_RESPONSE_V0, OFFSET_COMMIT_RESPONSE_V1, OFFSET_COMMIT_RESPONSE_V2};
 
     /* Offset fetch api */
+
+    /*
+     * Wire formats of version 0 and 1 are the same, but with different functionality.
+     * Version 0 will read the offsets from ZK;
+     * Version 1 will read the offsets from Kafka.
+     */
     public static final Schema OFFSET_FETCH_REQUEST_PARTITION_V0 = new Schema(new Field("partition",
                                                                                         INT32,
                                                                                         "Topic
partition id."));
@@ -239,8 +266,11 @@ public class Protocol {
     public static final Schema OFFSET_FETCH_RESPONSE_V0 = new Schema(new Field("responses",
                                                                                new ArrayOf(OFFSET_FETCH_RESPONSE_TOPIC_V0)));
 
-    public static final Schema[] OFFSET_FETCH_REQUEST = new Schema[] {OFFSET_FETCH_REQUEST_V0};
-    public static final Schema[] OFFSET_FETCH_RESPONSE = new Schema[] {OFFSET_FETCH_RESPONSE_V0};
+    public static final Schema OFFSET_FETCH_REQUEST_V1 = OFFSET_FETCH_REQUEST_V0;
+    public static final Schema OFFSET_FETCH_RESPONSE_V1 = OFFSET_FETCH_RESPONSE_V0;
+
+    public static final Schema[] OFFSET_FETCH_REQUEST = new Schema[] {OFFSET_FETCH_REQUEST_V0, OFFSET_FETCH_REQUEST_V1};
+    public static final Schema[] OFFSET_FETCH_RESPONSE = new Schema[] {OFFSET_FETCH_RESPONSE_V0, OFFSET_FETCH_RESPONSE_V1};
 
     /* List offset api */
     public static final Schema LIST_OFFSET_REQUEST_PARTITION_V0 = new Schema(new Field("partition",

http://git-wip-us.apache.org/repos/asf/kafka/blob/161b1aa1/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
index a0e1976..8bf6cbb 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
@@ -68,7 +68,7 @@ public class OffsetCommitRequest extends AbstractRequest {
 
     public static final class PartitionData {
         @Deprecated
-        public final long timestamp;                // for V0, V1
+        public final long timestamp;                // for V1
 
         public final long offset;
         public final String metadata;
@@ -93,6 +93,7 @@ public class OffsetCommitRequest extends AbstractRequest {
     @Deprecated
     public OffsetCommitRequest(String groupId, Map<TopicPartition, PartitionData> offsetData) {
         super(new Struct(ProtoUtils.requestSchema(ApiKeys.OFFSET_COMMIT.id, 0)));
+
         initCommonFields(groupId, offsetData);
         this.groupId = groupId;
         this.generationId = DEFAULT_GENERATION_ID;
@@ -159,7 +160,7 @@ public class OffsetCommitRequest extends AbstractRequest {
                 Struct partitionData = topicData.instance(PARTITIONS_KEY_NAME);
                 partitionData.set(PARTITION_KEY_NAME, partitionEntry.getKey());
                 partitionData.set(COMMIT_OFFSET_KEY_NAME, fetchPartitionData.offset);
-                // Only for v0 and v1
+                // Only for v1
                 if (partitionData.hasField(TIMESTAMP_KEY_NAME))
                     partitionData.set(TIMESTAMP_KEY_NAME, fetchPartitionData.timestamp);
                 partitionData.set(METADATA_KEY_NAME, fetchPartitionData.metadata);
@@ -203,7 +204,7 @@ public class OffsetCommitRequest extends AbstractRequest {
                 long offset = partitionDataStruct.getLong(COMMIT_OFFSET_KEY_NAME);
                 String metadata = partitionDataStruct.getString(METADATA_KEY_NAME);
                 PartitionData partitionOffset;
-                // This field only exists in v0 and v1
+                // This field only exists in v1
                 if (partitionDataStruct.hasField(TIMESTAMP_KEY_NAME)) {
                     long timestamp = partitionDataStruct.getLong(TIMESTAMP_KEY_NAME);
                     partitionOffset = new PartitionData(offset, timestamp, metadata);

http://git-wip-us.apache.org/repos/asf/kafka/blob/161b1aa1/core/src/main/scala/kafka/api/OffsetCommitRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/OffsetCommitRequest.scala b/core/src/main/scala/kafka/api/OffsetCommitRequest.scala
index cf8e6ac..317daed 100644
--- a/core/src/main/scala/kafka/api/OffsetCommitRequest.scala
+++ b/core/src/main/scala/kafka/api/OffsetCommitRequest.scala
@@ -69,7 +69,8 @@ object OffsetCommitRequest extends Logging {
         val partitionId = buffer.getInt
         val offset = buffer.getLong
         val timestamp = {
-          if (versionId <= 1)
+          // version 1 specific field
+          if (versionId == 1)
             buffer.getLong
           else
             org.apache.kafka.common.requests.OffsetCommitRequest.DEFAULT_TIMESTAMP
@@ -126,8 +127,8 @@ case class OffsetCommitRequest(groupId: String,
       t1._2.foreach( t2 => {
         buffer.putInt(t2._1.partition)
         buffer.putLong(t2._2.offset)
-        // version 0 and 1 specific data
-        if (versionId <= 1)
+        // version 1 specific data
+        if (versionId == 1)
           buffer.putLong(t2._2.commitTimestamp)
         writeShortString(buffer, t2._2.metadata)
       })
@@ -151,7 +152,7 @@ case class OffsetCommitRequest(groupId: String,
         innerCount +
         4 /* partition */ +
         8 /* offset */ +
-        (if (versionId <= 1) 8 else 0) /* timestamp */ +
+        (if (versionId == 1) 8 else 0) /* timestamp */ +
         shortStringLength(offsetAndMetadata._2.metadata)
       })
     })

http://git-wip-us.apache.org/repos/asf/kafka/blob/161b1aa1/core/src/main/scala/kafka/api/OffsetFetchRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/OffsetFetchRequest.scala b/core/src/main/scala/kafka/api/OffsetFetchRequest.scala
index 67811a7..fa8bd6a 100644
--- a/core/src/main/scala/kafka/api/OffsetFetchRequest.scala
+++ b/core/src/main/scala/kafka/api/OffsetFetchRequest.scala
@@ -29,7 +29,7 @@ import scala.Some
 import java.nio.ByteBuffer
 
 object OffsetFetchRequest extends Logging {
-  val CurrentVersion: Short = 0
+  val CurrentVersion: Short = 1
   val DefaultClientId = ""
 
   def readFrom(buffer: ByteBuffer): OffsetFetchRequest = {

http://git-wip-us.apache.org/repos/asf/kafka/blob/161b1aa1/core/src/main/scala/kafka/common/OffsetMetadataAndError.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/common/OffsetMetadataAndError.scala b/core/src/main/scala/kafka/common/OffsetMetadataAndError.scala
index 139913f..6b4242c 100644
--- a/core/src/main/scala/kafka/common/OffsetMetadataAndError.scala
+++ b/core/src/main/scala/kafka/common/OffsetMetadataAndError.scala
@@ -34,9 +34,9 @@ case class OffsetAndMetadata(offsetMetadata: OffsetMetadata,
                              commitTimestamp: Long = org.apache.kafka.common.requests.OffsetCommitRequest.DEFAULT_TIMESTAMP,
                              expireTimestamp: Long = org.apache.kafka.common.requests.OffsetCommitRequest.DEFAULT_TIMESTAMP)
{
 
-  def offset() = offsetMetadata.offset
+  def offset = offsetMetadata.offset
 
-  def metadata() = offsetMetadata.metadata
+  def metadata = offsetMetadata.metadata
 
   override def toString = "[%s,CommitTime %d,ExpirationTime %d]".format(offsetMetadata, commitTimestamp, expireTimestamp)
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/161b1aa1/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index b4004aa..417960d 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -28,7 +28,7 @@ import kafka.coordinator.ConsumerCoordinator
 import kafka.log._
 import kafka.network._
 import kafka.network.RequestChannel.Response
-import kafka.utils.{SystemTime, Logging}
+import kafka.utils.{ZkUtils, ZKGroupTopicDirs, SystemTime, Logging}
 
 import scala.collection._
 
@@ -161,44 +161,70 @@ class KafkaApis(val requestChannel: RequestChannel,
       requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response)))
     }
 
-    // compute the retention time based on the request version:
-    // if it is before v2 or not specified by user, we can use the default retention
-    val offsetRetention =
-      if (offsetCommitRequest.versionId <= 1 ||
-        offsetCommitRequest.retentionMs == org.apache.kafka.common.requests.OffsetCommitRequest.DEFAULT_RETENTION_TIME) {
-        offsetManager.config.offsetsRetentionMs
-      } else {
-        offsetCommitRequest.retentionMs
+    if (offsetCommitRequest.versionId == 0) {
+      // for version 0 always store offsets to ZK
+      val responseInfo = offsetCommitRequest.requestInfo.map {
+        case (topicAndPartition, metaAndError) => {
+          val topicDirs = new ZKGroupTopicDirs(offsetCommitRequest.groupId, topicAndPartition.topic)
+          try {
+            if (metadataCache.getTopicMetadata(Set(topicAndPartition.topic), request.securityProtocol).size <= 0) {
+              (topicAndPartition, ErrorMapping.UnknownTopicOrPartitionCode)
+            } else if (metaAndError.metadata != null && metaAndError.metadata.length > config.offsetMetadataMaxSize) {
+              (topicAndPartition, ErrorMapping.OffsetMetadataTooLargeCode)
+            } else {
+              ZkUtils.updatePersistentPath(zkClient, topicDirs.consumerOffsetDir + "/" +
+                topicAndPartition.partition, metaAndError.offset.toString)
+              (topicAndPartition, ErrorMapping.NoError)
+            }
+          } catch {
+            case e: Throwable => (topicAndPartition, ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]))
+          }
+        }
       }
 
-    // commit timestamp is always set to now.
-    // "default" expiration timestamp is now + retention (and retention may be overridden if v2)
-    // expire timestamp is computed differently for v1 and v2.
-    //   - If v1 and no explicit commit timestamp is provided we use default expiration timestamp.
-    //   - If v1 and explicit commit timestamp is provided we calculate retention from that explicit commit timestamp
-    //   - If v2 we use the default expiration timestamp
-    val currentTimestamp = SystemTime.milliseconds
-    val defaultExpireTimestamp = offsetRetention + currentTimestamp
-
-    val offsetData = offsetCommitRequest.requestInfo.mapValues(offsetAndMetadata =>
-      offsetAndMetadata.copy(
-        commitTimestamp = currentTimestamp,
-        expireTimestamp = {
-          if (offsetAndMetadata.commitTimestamp == org.apache.kafka.common.requests.OffsetCommitRequest.DEFAULT_TIMESTAMP)
-            defaultExpireTimestamp
-          else
-            offsetRetention + offsetAndMetadata.commitTimestamp
+      sendResponseCallback(responseInfo)
+    } else {
+      // for version 1 and beyond store offsets in offset manager
+
+      // compute the retention time based on the request version:
+      // if it is v1 or not specified by user, we can use the default retention
+      val offsetRetention =
+        if (offsetCommitRequest.versionId <= 1 ||
+          offsetCommitRequest.retentionMs == org.apache.kafka.common.requests.OffsetCommitRequest.DEFAULT_RETENTION_TIME) {
+          offsetManager.config.offsetsRetentionMs
+        } else {
+          offsetCommitRequest.retentionMs
         }
+
+      // commit timestamp is always set to now.
+      // "default" expiration timestamp is now + retention (and retention may be overridden if v2)
+      // expire timestamp is computed differently for v1 and v2.
+      //   - If v1 and no explicit commit timestamp is provided we use default expiration timestamp.
+      //   - If v1 and explicit commit timestamp is provided we calculate retention from that explicit commit timestamp
+      //   - If v2 we use the default expiration timestamp
+      val currentTimestamp = SystemTime.milliseconds
+      val defaultExpireTimestamp = offsetRetention + currentTimestamp
+
+      val offsetData = offsetCommitRequest.requestInfo.mapValues(offsetAndMetadata =>
+        offsetAndMetadata.copy(
+          commitTimestamp = currentTimestamp,
+          expireTimestamp = {
+            if (offsetAndMetadata.commitTimestamp == org.apache.kafka.common.requests.OffsetCommitRequest.DEFAULT_TIMESTAMP)
+              defaultExpireTimestamp
+            else
+              offsetRetention + offsetAndMetadata.commitTimestamp
+          }
+        )
       )
-    )
-
-    // call offset manager to store offsets
-    offsetManager.storeOffsets(
-      offsetCommitRequest.groupId,
-      offsetCommitRequest.consumerId,
-      offsetCommitRequest.groupGenerationId,
-      offsetData,
-      sendResponseCallback)
+
+      // call offset manager to store offsets
+      offsetManager.storeOffsets(
+        offsetCommitRequest.groupId,
+        offsetCommitRequest.consumerId,
+        offsetCommitRequest.groupGenerationId,
+        offsetData,
+        sendResponseCallback)
+    }
   }
 
   /**
@@ -449,21 +475,46 @@ class KafkaApis(val requestChannel: RequestChannel,
   def handleOffsetFetchRequest(request: RequestChannel.Request) {
     val offsetFetchRequest = request.requestObj.asInstanceOf[OffsetFetchRequest]
 
-    val (unknownTopicPartitions, knownTopicPartitions) = offsetFetchRequest.requestInfo.partition(topicAndPartition =>
-      metadataCache.getPartitionInfo(topicAndPartition.topic, topicAndPartition.partition).isEmpty
-    )
-    val unknownStatus = unknownTopicPartitions.map(topicAndPartition => (topicAndPartition, OffsetMetadataAndError.UnknownTopicOrPartition)).toMap
-    val knownStatus =
-      if (knownTopicPartitions.size > 0)
-        offsetManager.getOffsets(offsetFetchRequest.groupId, knownTopicPartitions).toMap
-      else
-        Map.empty[TopicAndPartition, OffsetMetadataAndError]
-    val status = unknownStatus ++ knownStatus
+    val response = if (offsetFetchRequest.versionId == 0) {
+      // version 0 reads offsets from ZK
+      val responseInfo = offsetFetchRequest.requestInfo.map( topicAndPartition => {
+        val topicDirs = new ZKGroupTopicDirs(offsetFetchRequest.groupId, topicAndPartition.topic)
+        try {
+          if (metadataCache.getTopicMetadata(Set(topicAndPartition.topic), request.securityProtocol).size <= 0) {
+            (topicAndPartition, OffsetMetadataAndError.UnknownTopicOrPartition)
+          } else {
+            val payloadOpt = ZkUtils.readDataMaybeNull(zkClient, topicDirs.consumerOffsetDir + "/" + topicAndPartition.partition)._1
+            payloadOpt match {
+              case Some(payload) => (topicAndPartition, OffsetMetadataAndError(payload.toLong))
+              case None => (topicAndPartition, OffsetMetadataAndError.UnknownTopicOrPartition)
+            }
+          }
+        } catch {
+          case e: Throwable =>
+            (topicAndPartition, OffsetMetadataAndError(OffsetMetadata.InvalidOffsetMetadata,
+              ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]])))
+        }
+      })
 
-    val response = OffsetFetchResponse(status, offsetFetchRequest.correlationId)
+      OffsetFetchResponse(collection.immutable.Map(responseInfo: _*), offsetFetchRequest.correlationId)
+    } else {
+      // version 1 reads offsets from Kafka
+      val (unknownTopicPartitions, knownTopicPartitions) = offsetFetchRequest.requestInfo.partition(topicAndPartition =>
+        metadataCache.getPartitionInfo(topicAndPartition.topic, topicAndPartition.partition).isEmpty
+      )
+      val unknownStatus = unknownTopicPartitions.map(topicAndPartition => (topicAndPartition, OffsetMetadataAndError.UnknownTopicOrPartition)).toMap
+      val knownStatus =
+        if (knownTopicPartitions.size > 0)
+          offsetManager.getOffsets(offsetFetchRequest.groupId, knownTopicPartitions).toMap
+        else
+          Map.empty[TopicAndPartition, OffsetMetadataAndError]
+      val status = unknownStatus ++ knownStatus
+
+      OffsetFetchResponse(status, offsetFetchRequest.correlationId)
+    }
 
     trace("Sending offset fetch response %s for correlation id %d to client %s."
-          .format(response, offsetFetchRequest.correlationId, offsetFetchRequest.clientId))
+      .format(response, offsetFetchRequest.correlationId, offsetFetchRequest.clientId))
     requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response)))
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/161b1aa1/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
index dbf9f48..5717165 100644
--- a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
+++ b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
@@ -204,8 +204,8 @@ object SerializationTestUtils {
       versionId = 0,
       groupId = "group 1",
       requestInfo = collection.immutable.Map(
-        TopicAndPartition(topic1, 0) -> OffsetAndMetadata(42L, "some metadata", SystemTime.milliseconds),
-        TopicAndPartition(topic1, 1) -> OffsetAndMetadata(100L, OffsetMetadata.NoMetadata, SystemTime.milliseconds)
+        TopicAndPartition(topic1, 0) -> OffsetAndMetadata(42L, "some metadata"),
+        TopicAndPartition(topic1, 1) -> OffsetAndMetadata(100L, OffsetMetadata.NoMetadata)
       ))
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/161b1aa1/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala b/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
index 652208a..528525b 100755
--- a/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
+++ b/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
@@ -85,7 +85,7 @@ class OffsetCommitTest extends JUnit3Suite with ZooKeeperTestHarness {
     // create the topic
     createTopic(zkClient, topic, partitionReplicaAssignment = expectedReplicaAssignment, servers = Seq(server))
 
-    val commitRequest = OffsetCommitRequest(group, immutable.Map(topicAndPartition -> OffsetAndMetadata(offset=42L)))
+    val commitRequest = OffsetCommitRequest(group, immutable.Map(topicAndPartition -> OffsetAndMetadata(offset = 42L)))
     val commitResponse = simpleConsumer.commitOffsets(commitRequest)
 
     assertEquals(ErrorMapping.NoError, commitResponse.commitStatus.get(topicAndPartition).get)
@@ -227,19 +227,22 @@ class OffsetCommitTest extends JUnit3Suite with ZooKeeperTestHarness {
 
     val fetchRequest = OffsetFetchRequest(group, Seq(TopicAndPartition(topic, 0)))
 
-    // v0 version commit request with commit timestamp set to -1
-    // should not expire
+    // v0 version commit request
+    // committed offset should not exist with fetch version 1 since it was stored in ZK
     val commitRequest0 = OffsetCommitRequest(
       groupId = group,
-      requestInfo = immutable.Map(topicPartition -> OffsetAndMetadata(1L, "metadata", -1L)),
+      requestInfo = immutable.Map(topicPartition -> OffsetAndMetadata(1L, "metadata")),
       versionId = 0
     )
     assertEquals(ErrorMapping.NoError, simpleConsumer.commitOffsets(commitRequest0).commitStatus.get(topicPartition).get)
-    Thread.sleep(retentionCheckInterval * 2)
-    assertEquals(1L, simpleConsumer.fetchOffsets(fetchRequest).requestInfo.get(topicPartition).get.offset)
+    assertEquals(-1L, simpleConsumer.fetchOffsets(fetchRequest).requestInfo.get(topicPartition).get.offset)
+
+    // committed offset should exist with fetch version 0
+    assertEquals(1L, simpleConsumer.fetchOffsets(OffsetFetchRequest(group, Seq(TopicAndPartition(topic, 0)), versionId = 0)).requestInfo.get(topicPartition).get.offset)
+
 
     // v1 version commit request with commit timestamp set to -1
-    // should not expire
+    // committed offset should not expire
     val commitRequest1 = OffsetCommitRequest(
       groupId = group,
       requestInfo = immutable.Map(topicPartition -> OffsetAndMetadata(2L, "metadata", -1L)),
@@ -250,7 +253,7 @@ class OffsetCommitTest extends JUnit3Suite with ZooKeeperTestHarness {
     assertEquals(2L, simpleConsumer.fetchOffsets(fetchRequest).requestInfo.get(topicPartition).get.offset)
 
     // v1 version commit request with commit timestamp set to now - two days
-    // should expire
+    // committed offset should expire
     val commitRequest2 = OffsetCommitRequest(
       groupId = group,
       requestInfo = immutable.Map(topicPartition -> OffsetAndMetadata(3L, "metadata", SystemTime.milliseconds - 2*24*60*60*1000L)),
@@ -261,7 +264,7 @@ class OffsetCommitTest extends JUnit3Suite with ZooKeeperTestHarness {
     assertEquals(-1L, simpleConsumer.fetchOffsets(fetchRequest).requestInfo.get(topicPartition).get.offset)
 
     // v2 version commit request with retention time set to 1 hour
-    // should not expire
+    // committed offset should not expire
     val commitRequest3 = OffsetCommitRequest(
       groupId = group,
       requestInfo = immutable.Map(topicPartition -> OffsetAndMetadata(4L, "metadata", -1L)),
@@ -273,7 +276,7 @@ class OffsetCommitTest extends JUnit3Suite with ZooKeeperTestHarness {
     assertEquals(4L, simpleConsumer.fetchOffsets(fetchRequest).requestInfo.get(topicPartition).get.offset)
 
     // v2 version commit request with retention time set to 0 second
-    // should expire
+    // committed offset should expire
     val commitRequest4 = OffsetCommitRequest(
       groupId = "test-group",
       requestInfo = immutable.Map(TopicAndPartition(topic, 0) -> OffsetAndMetadata(5L, "metadata", -1L)),


Mime
View raw message