kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ewe...@apache.org
Subject kafka git commit: KAFKA-2070; Replace Offset{Request,Response} with o.a.k.c requests equivalent
Date Fri, 11 Dec 2015 17:37:32 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 1dcafadef -> 01f3e59ef


KAFKA-2070; Replace Offset{Request,Response} with o.a.k.c requests equivalent

…uivalent

Author: Grant Henke <granthenke@gmail.com>

Reviewers: Ewen Cheslack-Postava <ewen@confluent.io>

Closes #663 from granthenke/offset-list


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/01f3e59e
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/01f3e59e
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/01f3e59e

Branch: refs/heads/trunk
Commit: 01f3e59ef91b348a5fab805bc6915591dd490a16
Parents: 1dcafad
Author: Grant Henke <granthenke@gmail.com>
Authored: Fri Dec 11 09:37:09 2015 -0800
Committer: Ewen Cheslack-Postava <me@ewencp.org>
Committed: Fri Dec 11 09:37:09 2015 -0800

----------------------------------------------------------------------
 .../common/requests/ListOffsetRequest.java      |  9 ++-
 .../scala/kafka/network/RequestChannel.scala    |  1 -
 .../src/main/scala/kafka/server/KafkaApis.scala | 66 ++++++++++++--------
 .../scala/unit/kafka/server/LogOffsetTest.scala | 11 ++--
 4 files changed, 51 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/01f3e59e/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java
b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java
index 8dfd811..2a91637 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java
@@ -34,7 +34,10 @@ public class ListOffsetRequest extends AbstractRequest {
 
     public static final long EARLIEST_TIMESTAMP = -2L;
     public static final long LATEST_TIMESTAMP = -1L;
-    
+
+    public static final int CONSUMER_REPLICA_ID = -1;
+    public static final int DEBUGGING_REPLICA_ID = -2;
+
     private static final Schema CURRENT_SCHEMA = ProtoUtils.currentRequestSchema(ApiKeys.LIST_OFFSETS.id);
     private static final String REPLICA_ID_KEY_NAME = "replica_id";
     private static final String TOPICS_KEY_NAME = "topics";
@@ -60,9 +63,9 @@ public class ListOffsetRequest extends AbstractRequest {
             this.maxNumOffsets = maxNumOffsets;
         }
     }
-    
+
     public ListOffsetRequest(Map<TopicPartition, PartitionData> offsetData) {
-        this(-1, offsetData);
+        this(CONSUMER_REPLICA_ID, offsetData);
     }
 
     public ListOffsetRequest(int replicaId, Map<TopicPartition, PartitionData> offsetData)
{

http://git-wip-us.apache.org/repos/asf/kafka/blob/01f3e59e/core/src/main/scala/kafka/network/RequestChannel.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala
index 2fce621..719588d 100644
--- a/core/src/main/scala/kafka/network/RequestChannel.scala
+++ b/core/src/main/scala/kafka/network/RequestChannel.scala
@@ -68,7 +68,6 @@ object RequestChannel extends Logging {
     private val keyToNameAndDeserializerMap: Map[Short, (ByteBuffer) => RequestOrResponse]=
       Map(ApiKeys.PRODUCE.id -> ProducerRequest.readFrom,
         ApiKeys.FETCH.id -> FetchRequest.readFrom,
-        ApiKeys.LIST_OFFSETS.id -> OffsetRequest.readFrom,
         ApiKeys.METADATA.id -> TopicMetadataRequest.readFrom,
         ApiKeys.LEADER_AND_ISR.id -> LeaderAndIsrRequest.readFrom,
         ApiKeys.STOP_REPLICA.id -> StopReplicaRequest.readFrom,

http://git-wip-us.apache.org/repos/asf/kafka/blob/01f3e59e/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index ce5d2c6..2f619a4 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -18,6 +18,7 @@
 package kafka.server
 
 import java.nio.ByteBuffer
+import java.lang.{Long => JLong}
 
 import kafka.admin.AdminUtils
 import kafka.api._
@@ -33,11 +34,15 @@ import kafka.security.auth.{Authorizer, ClusterAction, Group, Create,
Describe,
 import kafka.utils.{Logging, SystemTime, ZKGroupTopicDirs, ZkUtils}
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.protocol.{ApiKeys, Errors, SecurityProtocol}
-import org.apache.kafka.common.requests.{GroupCoordinatorRequest, GroupCoordinatorResponse,
ListGroupsResponse, DescribeGroupsRequest, DescribeGroupsResponse, HeartbeatRequest, HeartbeatResponse,
JoinGroupRequest, JoinGroupResponse, LeaveGroupRequest, LeaveGroupResponse, ResponseHeader,
ResponseSend, SyncGroupRequest, SyncGroupResponse}
+import org.apache.kafka.common.requests.{ListOffsetRequest, ListOffsetResponse, GroupCoordinatorRequest,
GroupCoordinatorResponse, ListGroupsResponse,
+DescribeGroupsRequest, DescribeGroupsResponse, HeartbeatRequest, HeartbeatResponse, JoinGroupRequest,
JoinGroupResponse,
+LeaveGroupRequest, LeaveGroupResponse, ResponseHeader, ResponseSend, SyncGroupRequest, SyncGroupResponse}
 import org.apache.kafka.common.utils.Utils
-import org.apache.kafka.common.Node
+import org.apache.kafka.common.{TopicPartition, Node}
 
 import scala.collection._
+import scala.collection.JavaConverters._
+
 /**
  * Logic to handle the various Kafka requests
  */
@@ -443,28 +448,32 @@ class KafkaApis(val requestChannel: RequestChannel,
    * Handle an offset request
    */
   def handleOffsetRequest(request: RequestChannel.Request) {
-    val offsetRequest = request.requestObj.asInstanceOf[OffsetRequest]
+    val correlationId = request.header.correlationId
+    val clientId = request.header.clientId
+    val offsetRequest = request.body.asInstanceOf[ListOffsetRequest]
 
-    val (authorizedRequestInfo, unauthorizedRequestInfo) = offsetRequest.requestInfo.partition
 {
-      case (topicAndPartition, _) => authorize(request.session, Describe, new Resource(Topic,
topicAndPartition.topic))
+    val (authorizedRequestInfo, unauthorizedRequestInfo) = offsetRequest.offsetData.asScala.partition
{
+      case (topicPartition, _) => authorize(request.session, Describe, new Resource(Topic,
topicPartition.topic))
     }
 
-    val unauthorizedResponseStatus = unauthorizedRequestInfo.mapValues(_ => PartitionOffsetsResponse(ErrorMapping.TopicAuthorizationCode,
Nil))
+    val unauthorizedResponseStatus = unauthorizedRequestInfo.mapValues(_ =>
+      new ListOffsetResponse.PartitionData(Errors.TOPIC_AUTHORIZATION_FAILED.code, List[JLong]().asJava)
+    )
 
     val responseMap = authorizedRequestInfo.map(elem => {
-      val (topicAndPartition, partitionOffsetRequestInfo) = elem
+      val (topicPartition, partitionData) = elem
       try {
         // ensure leader exists
-        val localReplica = if (!offsetRequest.isFromDebuggingClient)
-          replicaManager.getLeaderReplicaIfLocal(topicAndPartition.topic, topicAndPartition.partition)
+        val localReplica = if (offsetRequest.replicaId != ListOffsetRequest.DEBUGGING_REPLICA_ID)
+          replicaManager.getLeaderReplicaIfLocal(topicPartition.topic, topicPartition.partition)
         else
-          replicaManager.getReplicaOrException(topicAndPartition.topic, topicAndPartition.partition)
+          replicaManager.getReplicaOrException(topicPartition.topic, topicPartition.partition)
         val offsets = {
           val allOffsets = fetchOffsets(replicaManager.logManager,
-                                        topicAndPartition,
-                                        partitionOffsetRequestInfo.time,
-                                        partitionOffsetRequestInfo.maxNumOffsets)
-          if (!offsetRequest.isFromOrdinaryClient) {
+                                        topicPartition,
+                                        partitionData.timestamp,
+                                        partitionData.maxNumOffsets)
+          if (offsetRequest.replicaId != ListOffsetRequest.CONSUMER_REPLICA_ID) {
             allOffsets
           } else {
             val hw = localReplica.highWatermark.messageOffset
@@ -474,35 +483,38 @@ class KafkaApis(val requestChannel: RequestChannel,
               allOffsets
           }
         }
-        (topicAndPartition, PartitionOffsetsResponse(ErrorMapping.NoError, offsets))
+        (topicPartition, new ListOffsetResponse.PartitionData(Errors.NONE.code, offsets.map(new
JLong(_)).asJava))
       } catch {
         // NOTE: UnknownTopicOrPartitionException and NotLeaderForPartitionException are
special cased since these error messages
         // are typically transient and there is no value in logging the entire stack trace
for the same
         case utpe: UnknownTopicOrPartitionException =>
           debug("Offset request with correlation id %d from client %s on partition %s failed
due to %s".format(
-               offsetRequest.correlationId, offsetRequest.clientId, topicAndPartition, utpe.getMessage))
-          (topicAndPartition, PartitionOffsetsResponse(ErrorMapping.codeFor(utpe.getClass.asInstanceOf[Class[Throwable]]),
Nil) )
+               correlationId, clientId, topicPartition, utpe.getMessage))
+          (topicPartition,  new ListOffsetResponse.PartitionData(ErrorMapping.codeFor(utpe.getClass.asInstanceOf[Class[Throwable]]),
List[JLong]().asJava))
         case nle: NotLeaderForPartitionException =>
           debug("Offset request with correlation id %d from client %s on partition %s failed
due to %s".format(
-               offsetRequest.correlationId, offsetRequest.clientId, topicAndPartition,nle.getMessage))
-          (topicAndPartition, PartitionOffsetsResponse(ErrorMapping.codeFor(nle.getClass.asInstanceOf[Class[Throwable]]),
Nil) )
+               correlationId, clientId, topicPartition,nle.getMessage))
+          (topicPartition,  new ListOffsetResponse.PartitionData(ErrorMapping.codeFor(nle.getClass.asInstanceOf[Class[Throwable]]),
List[JLong]().asJava))
         case e: Throwable =>
           error("Error while responding to offset request", e)
-          (topicAndPartition, PartitionOffsetsResponse(ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]),
Nil) )
+          (topicPartition,  new ListOffsetResponse.PartitionData(ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]),
List[JLong]().asJava))
       }
     })
 
     val mergedResponseMap = responseMap ++ unauthorizedResponseStatus
-    val response = OffsetResponse(offsetRequest.correlationId, mergedResponseMap)
-    requestChannel.sendResponse(new RequestChannel.Response(request, new RequestOrResponseSend(request.connectionId,
response)))
+
+    val responseHeader = new ResponseHeader(correlationId)
+    val response = new ListOffsetResponse(mergedResponseMap.asJava)
+
+    requestChannel.sendResponse(new RequestChannel.Response(request, new ResponseSend(request.connectionId,
responseHeader, response)))
   }
 
-  def fetchOffsets(logManager: LogManager, topicAndPartition: TopicAndPartition, timestamp:
Long, maxNumOffsets: Int): Seq[Long] = {
-    logManager.getLog(topicAndPartition) match {
+  def fetchOffsets(logManager: LogManager, topicPartition: TopicPartition, timestamp: Long,
maxNumOffsets: Int): Seq[Long] = {
+    logManager.getLog(TopicAndPartition(topicPartition.topic, topicPartition.partition))
match {
       case Some(log) =>
         fetchOffsetsBefore(log, timestamp, maxNumOffsets)
       case None =>
-        if (timestamp == OffsetRequest.LatestTime || timestamp == OffsetRequest.EarliestTime)
+        if (timestamp == ListOffsetRequest.LATEST_TIMESTAMP || timestamp == ListOffsetRequest.EARLIEST_TIMESTAMP)
           Seq(0L)
         else
           Nil
@@ -524,9 +536,9 @@ class KafkaApis(val requestChannel: RequestChannel,
 
     var startIndex = -1
     timestamp match {
-      case OffsetRequest.LatestTime =>
+      case ListOffsetRequest.LATEST_TIMESTAMP =>
         startIndex = offsetTimeArray.length - 1
-      case OffsetRequest.EarliestTime =>
+      case ListOffsetRequest.EARLIEST_TIMESTAMP =>
         startIndex = 0
       case _ =>
         var isFound = false

http://git-wip-us.apache.org/repos/asf/kafka/blob/01f3e59e/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
index babf6fb..3f89a8a 100755
--- a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
@@ -5,7 +5,7 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
- * 
+ *
  *    http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
@@ -19,6 +19,7 @@ package kafka.server
 
 import java.io.File
 import kafka.utils._
+import org.apache.kafka.common.TopicPartition
 import org.junit.Assert._
 import java.util.{Random, Properties}
 import kafka.consumer.SimpleConsumer
@@ -33,7 +34,7 @@ import org.junit.Before
 import org.junit.Test
 
 class LogOffsetTest extends ZooKeeperTestHarness {
-  val random = new Random() 
+  val random = new Random()
   var logDir: File = null
   var topicLogDir: File = null
   var server: KafkaServer = null
@@ -89,7 +90,7 @@ class LogOffsetTest extends ZooKeeperTestHarness {
       log.append(new ByteBufferMessageSet(NoCompressionCodec, message))
     log.flush()
 
-    val offsets = server.apis.fetchOffsets(logManager, TopicAndPartition(topic, part), OffsetRequest.LatestTime,
10)
+    val offsets = server.apis.fetchOffsets(logManager, new TopicPartition(topic, part), OffsetRequest.LatestTime,
10)
     assertEquals(Seq(20L, 18L, 15L, 12L, 9L, 6L, 3L, 0), offsets)
 
     waitUntilTrue(() => isLeaderLocalOnBroker(topic, part, server), "Leader should be
elected")
@@ -152,7 +153,7 @@ class LogOffsetTest extends ZooKeeperTestHarness {
 
     val now = time.milliseconds + 30000 // pretend it is the future to avoid race conditions
with the fs
 
-    val offsets = server.apis.fetchOffsets(logManager, TopicAndPartition(topic, part), now,
10)
+    val offsets = server.apis.fetchOffsets(logManager, new TopicPartition(topic, part), now,
10)
     assertEquals(Seq(20L, 18L, 15L, 12L, 9L, 6L, 3L, 0L), offsets)
 
     waitUntilTrue(() => isLeaderLocalOnBroker(topic, part, server), "Leader should be
elected")
@@ -179,7 +180,7 @@ class LogOffsetTest extends ZooKeeperTestHarness {
       log.append(new ByteBufferMessageSet(NoCompressionCodec, message))
     log.flush()
 
-    val offsets = server.apis.fetchOffsets(logManager, TopicAndPartition(topic, part), OffsetRequest.EarliestTime,
10)
+    val offsets = server.apis.fetchOffsets(logManager, new TopicPartition(topic, part), OffsetRequest.EarliestTime,
10)
 
     assertEquals(Seq(0L), offsets)
 


Mime
View raw message