kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jjko...@apache.org
Subject kafka git commit: KAFKA-1729; Add constructor to javaapi to allow constructing explicitly versioned offset commit requests; reviewed by Jun Rao
Date Tue, 03 Feb 2015 02:17:29 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 75a286e43 -> f1ba4ff87


KAFKA-1729; Add constructor to javaapi to allow constructing explicitly versioned offset commit
requests; reviewed by Jun Rao


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/f1ba4ff8
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/f1ba4ff8
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/f1ba4ff8

Branch: refs/heads/trunk
Commit: f1ba4ff87e60a6dc06fb3d16e84fa94f8c4a7d9e
Parents: 75a286e
Author: Joel Koshy <jjkoshy@gmail.com>
Authored: Mon Feb 2 18:17:15 2015 -0800
Committer: Joel Koshy <jjkoshy@gmail.com>
Committed: Mon Feb 2 18:17:15 2015 -0800

----------------------------------------------------------------------
 .../main/scala/kafka/api/OffsetCommitResponse.scala   |  4 +++-
 .../kafka/javaapi/ConsumerMetadataResponse.scala      |  6 ++++++
 .../scala/kafka/javaapi/OffsetCommitRequest.scala     | 14 ++++++++++++--
 .../scala/kafka/javaapi/OffsetCommitResponse.scala    |  9 +++++++++
 .../scala/kafka/javaapi/OffsetFetchResponse.scala     |  5 +++++
 5 files changed, 35 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/f1ba4ff8/core/src/main/scala/kafka/api/OffsetCommitResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/OffsetCommitResponse.scala b/core/src/main/scala/kafka/api/OffsetCommitResponse.scala
index 624a1c1..116547a 100644
--- a/core/src/main/scala/kafka/api/OffsetCommitResponse.scala
+++ b/core/src/main/scala/kafka/api/OffsetCommitResponse.scala
@@ -20,7 +20,7 @@ package kafka.api
 import java.nio.ByteBuffer
 
 import kafka.utils.Logging
-import kafka.common.TopicAndPartition
+import kafka.common.{ErrorMapping, TopicAndPartition}
 
 object OffsetCommitResponse extends Logging {
   val CurrentVersion: Short = 0
@@ -47,6 +47,8 @@ case class OffsetCommitResponse(commitStatus: Map[TopicAndPartition, Short],
 
   lazy val commitStatusGroupedByTopic = commitStatus.groupBy(_._1.topic)
 
+  def hasError = commitStatus.exists{ case (topicAndPartition, errorCode) => errorCode != ErrorMapping.NoError }
+
   def writeTo(buffer: ByteBuffer) {
     buffer.putInt(correlationId)
     buffer.putInt(commitStatusGroupedByTopic.size)

http://git-wip-us.apache.org/repos/asf/kafka/blob/f1ba4ff8/core/src/main/scala/kafka/javaapi/ConsumerMetadataResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/javaapi/ConsumerMetadataResponse.scala b/core/src/main/scala/kafka/javaapi/ConsumerMetadataResponse.scala
index 1b28861..d281bb3 100644
--- a/core/src/main/scala/kafka/javaapi/ConsumerMetadataResponse.scala
+++ b/core/src/main/scala/kafka/javaapi/ConsumerMetadataResponse.scala
@@ -17,6 +17,8 @@
 
 package kafka.javaapi
 
+import java.nio.ByteBuffer
+
 import kafka.cluster.Broker
 
 class ConsumerMetadataResponse(private val underlying: kafka.api.ConsumerMetadataResponse) {
@@ -40,3 +42,7 @@ class ConsumerMetadataResponse(private val underlying: kafka.api.ConsumerMetadat
   override def toString = underlying.toString
 
 }
+
+object ConsumerMetadataResponse {
+  def readFrom(buffer: ByteBuffer) = new ConsumerMetadataResponse(kafka.api.ConsumerMetadataResponse.readFrom(buffer))
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/f1ba4ff8/core/src/main/scala/kafka/javaapi/OffsetCommitRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/javaapi/OffsetCommitRequest.scala b/core/src/main/scala/kafka/javaapi/OffsetCommitRequest.scala
index 873f575..456c3c4 100644
--- a/core/src/main/scala/kafka/javaapi/OffsetCommitRequest.scala
+++ b/core/src/main/scala/kafka/javaapi/OffsetCommitRequest.scala
@@ -22,7 +22,8 @@ import kafka.common.{OffsetAndMetadata, TopicAndPartition}
 class OffsetCommitRequest(groupId: String,
                           requestInfo: java.util.Map[TopicAndPartition, OffsetAndMetadata],
                           correlationId: Int,
-                          clientId: String) {
+                          clientId: String,
+                          versionId: Short) {
   val underlying = {
     val scalaMap: collection.immutable.Map[TopicAndPartition, OffsetAndMetadata] = {
       import collection.JavaConversions._
@@ -32,12 +33,21 @@ class OffsetCommitRequest(groupId: String,
     kafka.api.OffsetCommitRequest(
       groupId = groupId,
       requestInfo = scalaMap,
-      versionId = 0, // binds to version 0 so that it commits to Zookeeper
+      versionId = versionId,
       correlationId = correlationId,
       clientId = clientId
     )
   }
 
+  def this(groupId: String,
+           requestInfo: java.util.Map[TopicAndPartition, OffsetAndMetadata],
+           correlationId: Int,
+           clientId: String) {
+
+    // by default bind to version 0 so that it commits to Zookeeper
+    this(groupId, requestInfo, correlationId, clientId, 0)
+  }
+
 
   override def toString = underlying.toString
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/f1ba4ff8/core/src/main/scala/kafka/javaapi/OffsetCommitResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/javaapi/OffsetCommitResponse.scala b/core/src/main/scala/kafka/javaapi/OffsetCommitResponse.scala
index c2d3d11..b222329 100644
--- a/core/src/main/scala/kafka/javaapi/OffsetCommitResponse.scala
+++ b/core/src/main/scala/kafka/javaapi/OffsetCommitResponse.scala
@@ -17,6 +17,8 @@
 
 package kafka.javaapi
 
+import java.nio.ByteBuffer
+
 import kafka.common.TopicAndPartition
 import collection.JavaConversions
 
@@ -27,5 +29,12 @@ class OffsetCommitResponse(private val underlying: kafka.api.OffsetCommitRespons
     underlying.commitStatus
   }
 
+  def hasError = underlying.hasError
+
+  def errorCode(topicAndPartition: TopicAndPartition) = underlying.commitStatus(topicAndPartition)
+
 }
 
+object OffsetCommitResponse {
+  def readFrom(buffer: ByteBuffer) = new OffsetCommitResponse(kafka.api.OffsetCommitResponse.readFrom(buffer))
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/f1ba4ff8/core/src/main/scala/kafka/javaapi/OffsetFetchResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/javaapi/OffsetFetchResponse.scala b/core/src/main/scala/kafka/javaapi/OffsetFetchResponse.scala
index 60924d2..c4bdb12 100644
--- a/core/src/main/scala/kafka/javaapi/OffsetFetchResponse.scala
+++ b/core/src/main/scala/kafka/javaapi/OffsetFetchResponse.scala
@@ -17,6 +17,8 @@
 
 package kafka.javaapi
 
+import java.nio.ByteBuffer
+
 import kafka.common.{TopicAndPartition, OffsetMetadataAndError}
 import collection.JavaConversions
 
@@ -29,3 +31,6 @@ class OffsetFetchResponse(private val underlying: kafka.api.OffsetFetchResponse)
 
 }
 
+object OffsetFetchResponse {
+  def readFrom(buffer: ByteBuffer) = new OffsetFetchResponse(kafka.api.OffsetFetchResponse.readFrom(buffer))
+}


Mime
View raw message