kafka-commits mailing list archives

From: gwens...@apache.org
Subject: kafka git commit: KAFKA-2958: Remove duplicate API key mapping functionality
Date: Tue, 08 Dec 2015 17:55:57 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 268392f5e -> 5bc90d454


KAFKA-2958: Remove duplicate API key mapping functionality

Author: Grant Henke <granthenke@gmail.com>

Reviewers: Gwen Shapira

Closes #637 from granthenke/api-keys
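
The change in a nutshell: kafka.api.RequestKeys duplicated the protocol ids and
names already defined by the ApiKeys enum in org.apache.kafka.common.protocol,
so every key existed in two places. This commit deletes RequestKeys and points
all call sites at ApiKeys. A minimal sketch of the mapping, assuming the
0.9-era kafka-clients jar is on the classpath:

    import org.apache.kafka.common.protocol.ApiKeys

    object ApiKeysSketch extends App {
      // Each enum constant carries the wire id and a display name, the two
      // things RequestKeys previously redefined as Short constants and strings.
      val produceId: Short = ApiKeys.PRODUCE.id   // 0, formerly RequestKeys.ProduceKey
      val fetchName: String = ApiKeys.FETCH.name  // formerly RequestKeys.nameForKey(1)

      // Reverse lookup from a wire id back to the enum constant:
      println(ApiKeys.forId(produceId).name)
    }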


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/5bc90d45
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/5bc90d45
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/5bc90d45

Branch: refs/heads/trunk
Commit: 5bc90d4542ca9d6cea3eac1e0a7e1bfa1fed15e7
Parents: 268392f
Author: Grant Henke <granthenke@gmail.com>
Authored: Tue Dec 8 09:55:46 2015 -0800
Committer: Gwen Shapira <cshapi@gmail.com>
Committed: Tue Dec 8 09:55:46 2015 -0800

----------------------------------------------------------------------
 .../kafka/api/ControlledShutdownRequest.scala   |   3 +-
 .../src/main/scala/kafka/api/FetchRequest.scala |   6 +-
 .../kafka/api/GroupCoordinatorRequest.scala     |   5 +-
 .../scala/kafka/api/LeaderAndIsrRequest.scala   |   9 +-
 .../scala/kafka/api/OffsetCommitRequest.scala   |   7 +-
 .../scala/kafka/api/OffsetFetchRequest.scala    |   9 +-
 .../main/scala/kafka/api/OffsetRequest.scala    |   7 +-
 .../main/scala/kafka/api/ProducerRequest.scala  |   5 +-
 core/src/main/scala/kafka/api/RequestKeys.scala |  73 -----------
 .../scala/kafka/api/StopReplicaRequest.scala    |   5 +-
 .../scala/kafka/api/TopicMetadataRequest.scala  |   3 +-
 .../scala/kafka/api/UpdateMetadataRequest.scala |   8 +-
 .../kafka/javaapi/TopicMetadataRequest.scala    |   3 +-
 .../scala/kafka/network/RequestChannel.scala    |  40 ++++--
 .../scala/kafka/producer/SyncProducer.scala     |   5 +-
 .../main/scala/kafka/server/ConfigHandler.scala |   8 +-
 .../src/main/scala/kafka/server/KafkaApis.scala |  51 ++++----
 .../kafka/api/AuthorizerIntegrationTest.scala   | 122 +++++++++----------
 .../integration/kafka/api/QuotasTest.scala      |  13 +-
 .../test/scala/unit/kafka/admin/AdminTest.scala |   6 +-
 .../kafka/server/DynamicConfigChangeTest.scala  |   6 +-
 21 files changed, 175 insertions(+), 219 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/5bc90d45/core/src/main/scala/kafka/api/ControlledShutdownRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/ControlledShutdownRequest.scala b/core/src/main/scala/kafka/api/ControlledShutdownRequest.scala
index 6fb9e22..f827d54 100644
--- a/core/src/main/scala/kafka/api/ControlledShutdownRequest.scala
+++ b/core/src/main/scala/kafka/api/ControlledShutdownRequest.scala
@@ -24,6 +24,7 @@ import kafka.api.ApiUtils._
 import kafka.network.{RequestOrResponseSend, RequestChannel}
 import kafka.network.RequestChannel.Response
 import kafka.utils.Logging
+import org.apache.kafka.common.protocol.ApiKeys
 
 object ControlledShutdownRequest extends Logging {
   val CurrentVersion = 1.shortValue
@@ -43,7 +44,7 @@ case class ControlledShutdownRequest(versionId: Short,
                                      correlationId: Int,
                                      clientId: Option[String],
                                      brokerId: Int)
-  extends RequestOrResponse(Some(RequestKeys.ControlledShutdownKey)){
+  extends RequestOrResponse(Some(ApiKeys.CONTROLLED_SHUTDOWN_KEY.id)){
 
   if (versionId > 0 && clientId.isEmpty)
     throw new IllegalArgumentException("`clientId` must be defined if `versionId` > 0")

http://git-wip-us.apache.org/repos/asf/kafka/blob/5bc90d45/core/src/main/scala/kafka/api/FetchRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/FetchRequest.scala b/core/src/main/scala/kafka/api/FetchRequest.scala
index 36e288f..ca47e75 100644
--- a/core/src/main/scala/kafka/api/FetchRequest.scala
+++ b/core/src/main/scala/kafka/api/FetchRequest.scala
@@ -5,7 +5,7 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
- * 
+ *
  *    http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
@@ -26,6 +26,8 @@ import kafka.message.MessageSet
 
 import java.util.concurrent.atomic.AtomicInteger
 import java.nio.ByteBuffer
+import org.apache.kafka.common.protocol.ApiKeys
+
 import scala.collection.immutable.Map
 
 case class PartitionFetchInfo(offset: Long, fetchSize: Int)
@@ -65,7 +67,7 @@ case class FetchRequest(versionId: Short = FetchRequest.CurrentVersion,
                         maxWait: Int = FetchRequest.DefaultMaxWait,
                         minBytes: Int = FetchRequest.DefaultMinBytes,
                         requestInfo: Map[TopicAndPartition, PartitionFetchInfo])
-        extends RequestOrResponse(Some(RequestKeys.FetchKey)) {
+        extends RequestOrResponse(Some(ApiKeys.FETCH.id)) {
 
   /**
    * Partitions the request info into a map of maps (one for each topic).

http://git-wip-us.apache.org/repos/asf/kafka/blob/5bc90d45/core/src/main/scala/kafka/api/GroupCoordinatorRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/GroupCoordinatorRequest.scala b/core/src/main/scala/kafka/api/GroupCoordinatorRequest.scala
index 43e78f5..7e7b55c 100644
--- a/core/src/main/scala/kafka/api/GroupCoordinatorRequest.scala
+++ b/core/src/main/scala/kafka/api/GroupCoordinatorRequest.scala
@@ -22,6 +22,7 @@ import java.nio.ByteBuffer
 import kafka.common.ErrorMapping
 import kafka.network.{RequestOrResponseSend, RequestChannel}
 import kafka.network.RequestChannel.Response
+import org.apache.kafka.common.protocol.ApiKeys
 
 object GroupCoordinatorRequest {
   val CurrentVersion = 0.shortValue
@@ -44,7 +45,7 @@ case class GroupCoordinatorRequest(group: String,
                                    versionId: Short = GroupCoordinatorRequest.CurrentVersion,
                                    correlationId: Int = 0,
                                    clientId: String = GroupCoordinatorRequest.DefaultClientId)
-  extends RequestOrResponse(Some(RequestKeys.GroupCoordinatorKey)) {
+  extends RequestOrResponse(Some(ApiKeys.GROUP_COORDINATOR.id)) {
 
   def sizeInBytes =
     2 + /* versionId */
@@ -77,4 +78,4 @@ case class GroupCoordinatorRequest(group: String,
     consumerMetadataRequest.append("; Group: " + group)
     consumerMetadataRequest.toString()
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/5bc90d45/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala b/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala
index c2584e0..95451eb 100644
--- a/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala
+++ b/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala
@@ -5,7 +5,7 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
- * 
+ *
  *    http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
@@ -27,6 +27,7 @@ import kafka.controller.LeaderIsrAndControllerEpoch
 import kafka.network.{RequestOrResponseSend, RequestChannel}
 import kafka.network.RequestChannel.Response
 import kafka.utils._
+import org.apache.kafka.common.protocol.ApiKeys
 
 import scala.collection.Set
 
@@ -88,7 +89,7 @@ case class PartitionStateInfo(leaderIsrAndControllerEpoch: LeaderIsrAndControlle
       allReplicas.size * 4
     size
   }
-  
+
   override def toString(): String = {
     val partitionStateInfo = new StringBuilder
     partitionStateInfo.append("(LeaderAndIsrInfo:" + leaderIsrAndControllerEpoch.toString)
@@ -137,7 +138,7 @@ case class LeaderAndIsrRequest (versionId: Short,
                                 controllerEpoch: Int,
                                 partitionStateInfos: Map[(String, Int), PartitionStateInfo],
                                 leaders: Set[BrokerEndPoint])
-    extends RequestOrResponse(Some(RequestKeys.LeaderAndIsrKey)) {
+    extends RequestOrResponse(Some(ApiKeys.LEADER_AND_ISR.id)) {
 
   def this(partitionStateInfos: Map[(String, Int), PartitionStateInfo], leaders: Set[BrokerEndPoint], controllerId: Int,
            controllerEpoch: Int, correlationId: Int, clientId: String) = {
@@ -164,7 +165,7 @@ case class LeaderAndIsrRequest (versionId: Short,
   def sizeInBytes(): Int = {
     var size =
       2 /* version id */ +
-      4 /* correlation id */ + 
+      4 /* correlation id */ +
       (2 + clientId.length) /* client id */ +
       4 /* controller id */ +
       4 /* controller epoch */ +

http://git-wip-us.apache.org/repos/asf/kafka/blob/5bc90d45/core/src/main/scala/kafka/api/OffsetCommitRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/OffsetCommitRequest.scala b/core/src/main/scala/kafka/api/OffsetCommitRequest.scala
index 75067cf..534eedf 100644
--- a/core/src/main/scala/kafka/api/OffsetCommitRequest.scala
+++ b/core/src/main/scala/kafka/api/OffsetCommitRequest.scala
@@ -5,7 +5,7 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
- * 
+ *
  *    http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
@@ -24,6 +24,7 @@ import kafka.common.{ErrorMapping, OffsetAndMetadata, TopicAndPartition}
 import kafka.network.{RequestOrResponseSend, RequestChannel}
 import kafka.network.RequestChannel.Response
 import kafka.utils.Logging
+import org.apache.kafka.common.protocol.ApiKeys
 
 import scala.collection._
 
@@ -40,7 +41,7 @@ object OffsetCommitRequest extends Logging {
     val correlationId = buffer.getInt
     val clientId = readShortString(buffer)
 
-    // Read the OffsetRequest 
+    // Read the OffsetRequest
     val groupId = readShortString(buffer)
 
     // version 1 and 2 specific fields
@@ -95,7 +96,7 @@ case class OffsetCommitRequest(groupId: String,
                                groupGenerationId: Int = org.apache.kafka.common.requests.OffsetCommitRequest.DEFAULT_GENERATION_ID,
                                memberId: String =  org.apache.kafka.common.requests.OffsetCommitRequest.DEFAULT_MEMBER_ID,
                                retentionMs: Long = org.apache.kafka.common.requests.OffsetCommitRequest.DEFAULT_RETENTION_TIME)
-    extends RequestOrResponse(Some(RequestKeys.OffsetCommitKey)) {
+    extends RequestOrResponse(Some(ApiKeys.OFFSET_COMMIT.id)) {
 
   assert(versionId == 0 || versionId == 1 || versionId == 2,
          "Version " + versionId + " is invalid for OffsetCommitRequest. Valid versions are 0, 1 or 2.")

http://git-wip-us.apache.org/repos/asf/kafka/blob/5bc90d45/core/src/main/scala/kafka/api/OffsetFetchRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/OffsetFetchRequest.scala b/core/src/main/scala/kafka/api/OffsetFetchRequest.scala
index a83e147..f0a3c9c 100644
--- a/core/src/main/scala/kafka/api/OffsetFetchRequest.scala
+++ b/core/src/main/scala/kafka/api/OffsetFetchRequest.scala
@@ -5,7 +5,7 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
- * 
+ *
  *    http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
@@ -24,6 +24,7 @@ import kafka.common.{TopicAndPartition, _}
 import kafka.network.{RequestOrResponseSend, RequestChannel}
 import kafka.network.RequestChannel.Response
 import kafka.utils.Logging
+import org.apache.kafka.common.protocol.ApiKeys
 
 object OffsetFetchRequest extends Logging {
   val CurrentVersion: Short = 1
@@ -55,10 +56,10 @@ case class OffsetFetchRequest(groupId: String,
                               versionId: Short = OffsetFetchRequest.CurrentVersion,
                               correlationId: Int = 0,
                               clientId: String = OffsetFetchRequest.DefaultClientId)
-    extends RequestOrResponse(Some(RequestKeys.OffsetFetchKey)) {
+    extends RequestOrResponse(Some(ApiKeys.OFFSET_FETCH.id)) {
 
   lazy val requestInfoGroupedByTopic = requestInfo.groupBy(_.topic)
-  
+
   def writeTo(buffer: ByteBuffer) {
     // Write envelope
     buffer.putShort(versionId)
@@ -81,7 +82,7 @@ case class OffsetFetchRequest(groupId: String,
     2 + /* versionId */
     4 + /* correlationId */
     shortStringLength(clientId) +
-    shortStringLength(groupId) + 
+    shortStringLength(groupId) +
     4 + /* topic count */
     requestInfoGroupedByTopic.foldLeft(0)((count, t) => {
       count + shortStringLength(t._1) + /* topic */

http://git-wip-us.apache.org/repos/asf/kafka/blob/5bc90d45/core/src/main/scala/kafka/api/OffsetRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/OffsetRequest.scala b/core/src/main/scala/kafka/api/OffsetRequest.scala
index d2c1c95..a2ef7eb 100644
--- a/core/src/main/scala/kafka/api/OffsetRequest.scala
+++ b/core/src/main/scala/kafka/api/OffsetRequest.scala
@@ -5,7 +5,7 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
- * 
+ *
  *    http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
@@ -23,6 +23,7 @@ import kafka.api.ApiUtils._
 import kafka.common.{ErrorMapping, TopicAndPartition}
 import kafka.network.{RequestOrResponseSend, RequestChannel}
 import kafka.network.RequestChannel.Response
+import org.apache.kafka.common.protocol.ApiKeys
 
 
 object OffsetRequest {
@@ -61,7 +62,7 @@ case class OffsetRequest(requestInfo: Map[TopicAndPartition, PartitionOffsetRequ
                          correlationId: Int = 0,
                          clientId: String = OffsetRequest.DefaultClientId,
                          replicaId: Int = Request.OrdinaryConsumerId)
-    extends RequestOrResponse(Some(RequestKeys.OffsetsKey)) {
+    extends RequestOrResponse(Some(ApiKeys.LIST_OFFSETS.id)) {
 
   def this(requestInfo: Map[TopicAndPartition, PartitionOffsetRequestInfo], correlationId: Int, replicaId: Int) = this(requestInfo, OffsetRequest.CurrentVersion, correlationId, OffsetRequest.DefaultClientId, replicaId)
 
@@ -132,4 +133,4 @@ case class OffsetRequest(requestInfo: Map[TopicAndPartition, PartitionOffsetRequ
       offsetRequest.append("; RequestInfo: " + requestInfo.mkString(","))
     offsetRequest.toString()
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/5bc90d45/core/src/main/scala/kafka/api/ProducerRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/ProducerRequest.scala b/core/src/main/scala/kafka/api/ProducerRequest.scala
index 7fb143e..a697dc6 100644
--- a/core/src/main/scala/kafka/api/ProducerRequest.scala
+++ b/core/src/main/scala/kafka/api/ProducerRequest.scala
@@ -5,7 +5,7 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
- * 
+ *
  *    http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
@@ -24,6 +24,7 @@ import kafka.common._
 import kafka.message._
 import kafka.network.{RequestOrResponseSend, RequestChannel}
 import kafka.network.RequestChannel.Response
+import org.apache.kafka.common.protocol.ApiKeys
 
 object ProducerRequest {
   val CurrentVersion = 1.shortValue
@@ -59,7 +60,7 @@ case class ProducerRequest(versionId: Short = ProducerRequest.CurrentVersion,
                            requiredAcks: Short,
                            ackTimeoutMs: Int,
                            data: collection.mutable.Map[TopicAndPartition, ByteBufferMessageSet])
-    extends RequestOrResponse(Some(RequestKeys.ProduceKey)) {
+    extends RequestOrResponse(Some(ApiKeys.PRODUCE.id)) {
 
   /**
    * Partitions the data into a map of maps (one for each topic).

http://git-wip-us.apache.org/repos/asf/kafka/blob/5bc90d45/core/src/main/scala/kafka/api/RequestKeys.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/RequestKeys.scala b/core/src/main/scala/kafka/api/RequestKeys.scala
deleted file mode 100644
index 2363099..0000000
--- a/core/src/main/scala/kafka/api/RequestKeys.scala
+++ /dev/null
@@ -1,73 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * 
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.api
-
-import kafka.common.KafkaException
-import java.nio.ByteBuffer
-
-import kafka.network.InvalidRequestException
-
-object RequestKeys {
-  val ProduceKey: Short = 0
-  val FetchKey: Short = 1
-  val OffsetsKey: Short = 2
-  val MetadataKey: Short = 3
-  val LeaderAndIsrKey: Short = 4
-  val StopReplicaKey: Short = 5
-  val UpdateMetadataKey: Short = 6
-  val ControlledShutdownKey: Short = 7
-  val OffsetCommitKey: Short = 8
-  val OffsetFetchKey: Short = 9
-  val GroupCoordinatorKey: Short = 10
-  val JoinGroupKey: Short = 11
-  val HeartbeatKey: Short = 12
-  val LeaveGroupKey: Short = 13
-  val SyncGroupKey: Short = 14
-  val DescribeGroupsKey: Short = 15
-  val ListGroupsKey: Short = 16
-
-  // NOTE: this map only includes the server-side request/response handlers. Newer
-  // request types should only use the client-side versions which are parsed with
-  // o.a.k.common.requests.AbstractRequest.getRequest()
-  val keyToNameAndDeserializerMap: Map[Short, (String, (ByteBuffer) => RequestOrResponse)]=
-    Map(ProduceKey -> ("Produce", ProducerRequest.readFrom),
-        FetchKey -> ("Fetch", FetchRequest.readFrom),
-        OffsetsKey -> ("Offsets", OffsetRequest.readFrom),
-        MetadataKey -> ("Metadata", TopicMetadataRequest.readFrom),
-        LeaderAndIsrKey -> ("LeaderAndIsr", LeaderAndIsrRequest.readFrom),
-        StopReplicaKey -> ("StopReplica", StopReplicaRequest.readFrom),
-        UpdateMetadataKey -> ("UpdateMetadata", UpdateMetadataRequest.readFrom),
-        ControlledShutdownKey -> ("ControlledShutdown", ControlledShutdownRequest.readFrom),
-        OffsetCommitKey -> ("OffsetCommit", OffsetCommitRequest.readFrom),
-        OffsetFetchKey -> ("OffsetFetch", OffsetFetchRequest.readFrom)
-    )
-
-  def nameForKey(key: Short): String = {
-    keyToNameAndDeserializerMap.get(key) match {
-      case Some(nameAndSerializer) => nameAndSerializer._1
-      case None => throw new KafkaException("Wrong request type %d".format(key))
-    }
-  }
-
-  def deserializerForKey(key: Short): (ByteBuffer) => RequestOrResponse = {
-    keyToNameAndDeserializerMap.get(key) match {
-      case Some(nameAndSerializer) => nameAndSerializer._2
-      case None => throw new InvalidRequestException("Wrong request type %d".format(key))
-    }
-  }
-}
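
With the object gone, its two helpers survive in spirit: nameForKey(key)
becomes ApiKeys.forId(key).name, and the deserializer half of
keyToNameAndDeserializerMap moves into kafka.network.RequestChannel.Request
(see the RequestChannel.scala hunk further down). A standalone sketch of the
equivalents, using a placeholder payload type since the real readFrom
functions live in kafka core:

    import java.nio.ByteBuffer
    import org.apache.kafka.common.protocol.ApiKeys

    object RequestKeysReplacements {
      // Was RequestKeys.nameForKey. Note the failure mode for an unknown key
      // is now whatever ApiKeys.forId throws, not kafka.common.KafkaException.
      def nameForKey(key: Short): String = ApiKeys.forId(key).name

      // Was RequestKeys.deserializerForKey; the name half of each pair is
      // dropped because ApiKeys already knows it.
      val deserializers: Map[Short, ByteBuffer => String] = Map(
        ApiKeys.PRODUCE.id -> (buf => s"ProducerRequest(${buf.remaining} bytes)")
      )

      def deserialize(key: Short, buffer: ByteBuffer): Option[String] =
        deserializers.get(key).map(readFrom => readFrom(buffer))
    }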

http://git-wip-us.apache.org/repos/asf/kafka/blob/5bc90d45/core/src/main/scala/kafka/api/StopReplicaRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/StopReplicaRequest.scala b/core/src/main/scala/kafka/api/StopReplicaRequest.scala
index 4441fc6..03c7f3d 100644
--- a/core/src/main/scala/kafka/api/StopReplicaRequest.scala
+++ b/core/src/main/scala/kafka/api/StopReplicaRequest.scala
@@ -23,6 +23,7 @@ import kafka.network.{RequestOrResponseSend, RequestChannel, InvalidRequestExcep
 import kafka.common.{TopicAndPartition, ErrorMapping}
 import kafka.network.RequestChannel.Response
 import kafka.utils.Logging
+import org.apache.kafka.common.protocol.ApiKeys
 import collection.Set
 
 
@@ -60,7 +61,7 @@ case class StopReplicaRequest(versionId: Short,
                               controllerEpoch: Int,
                               deletePartitions: Boolean,
                               partitions: Set[TopicAndPartition])
-        extends RequestOrResponse(Some(RequestKeys.StopReplicaKey)) {
+        extends RequestOrResponse(Some(ApiKeys.STOP_REPLICA.id)) {
 
   def this(deletePartitions: Boolean, partitions: Set[TopicAndPartition], controllerId: Int, controllerEpoch: Int, correlationId: Int) = {
     this(StopReplicaRequest.CurrentVersion, correlationId, StopReplicaRequest.DefaultClientId,
@@ -122,4 +123,4 @@ case class StopReplicaRequest(versionId: Short,
       stopReplicaRequest.append("; Partitions: " + partitions.mkString(","))
     stopReplicaRequest.toString()
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/5bc90d45/core/src/main/scala/kafka/api/TopicMetadataRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/TopicMetadataRequest.scala b/core/src/main/scala/kafka/api/TopicMetadataRequest.scala
index 401c583..656ff9f 100644
--- a/core/src/main/scala/kafka/api/TopicMetadataRequest.scala
+++ b/core/src/main/scala/kafka/api/TopicMetadataRequest.scala
@@ -24,6 +24,7 @@ import kafka.common.ErrorMapping
 import kafka.network.{RequestOrResponseSend, RequestChannel}
 import kafka.network.RequestChannel.Response
 import kafka.utils.Logging
+import org.apache.kafka.common.protocol.ApiKeys
 
 import scala.collection.mutable.ListBuffer
 
@@ -52,7 +53,7 @@ case class TopicMetadataRequest(versionId: Short,
                                 correlationId: Int,
                                 clientId: String,
                                 topics: Seq[String])
- extends RequestOrResponse(Some(RequestKeys.MetadataKey)){
+ extends RequestOrResponse(Some(ApiKeys.METADATA.id)){
 
   def this(topics: Seq[String], correlationId: Int) =
     this(TopicMetadataRequest.CurrentVersion, correlationId, TopicMetadataRequest.DefaultClientId, topics)

http://git-wip-us.apache.org/repos/asf/kafka/blob/5bc90d45/core/src/main/scala/kafka/api/UpdateMetadataRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/UpdateMetadataRequest.scala b/core/src/main/scala/kafka/api/UpdateMetadataRequest.scala
index 11c32cd..059c03e 100644
--- a/core/src/main/scala/kafka/api/UpdateMetadataRequest.scala
+++ b/core/src/main/scala/kafka/api/UpdateMetadataRequest.scala
@@ -5,7 +5,7 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
- * 
+ *
  *    http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
@@ -23,7 +23,7 @@ import kafka.cluster.{Broker, BrokerEndPoint}
 import kafka.common.{ErrorMapping, KafkaException, TopicAndPartition}
 import kafka.network.{RequestOrResponseSend, RequestChannel}
 import kafka.network.RequestChannel.Response
-import org.apache.kafka.common.protocol.SecurityProtocol
+import org.apache.kafka.common.protocol.{ApiKeys, SecurityProtocol}
 
 import scala.collection.Set
 
@@ -70,7 +70,7 @@ case class UpdateMetadataRequest (versionId: Short,
                                   controllerEpoch: Int,
                                   partitionStateInfos: Map[TopicAndPartition, PartitionStateInfo],
                                   aliveBrokers: Set[Broker])
-  extends RequestOrResponse(Some(RequestKeys.UpdateMetadataKey)) {
+  extends RequestOrResponse(Some(ApiKeys.UPDATE_METADATA_KEY.id)) {
 
   def this(controllerId: Int, controllerEpoch: Int, correlationId: Int, clientId: String,
            partitionStateInfos: Map[TopicAndPartition, PartitionStateInfo], aliveBrokers: Set[Broker]) = {
@@ -144,4 +144,4 @@ case class UpdateMetadataRequest (versionId: Short,
       updateMetadataRequest.append(";PartitionState:" + partitionStateInfos.mkString(","))
     updateMetadataRequest.toString()
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/5bc90d45/core/src/main/scala/kafka/javaapi/TopicMetadataRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/javaapi/TopicMetadataRequest.scala b/core/src/main/scala/kafka/javaapi/TopicMetadataRequest.scala
index 568d0ac..92d9073 100644
--- a/core/src/main/scala/kafka/javaapi/TopicMetadataRequest.scala
+++ b/core/src/main/scala/kafka/javaapi/TopicMetadataRequest.scala
@@ -19,6 +19,7 @@ package kafka.javaapi
 import java.nio.ByteBuffer
 
 import kafka.api._
+import org.apache.kafka.common.protocol.ApiKeys
 
 import scala.collection.mutable
 
@@ -26,7 +27,7 @@ class TopicMetadataRequest(val versionId: Short,
                            val correlationId: Int,
                            val clientId: String,
                            val topics: java.util.List[String])
-    extends RequestOrResponse(Some(kafka.api.RequestKeys.MetadataKey)) {
+    extends RequestOrResponse(Some(ApiKeys.METADATA.id)) {
 
   val underlying: kafka.api.TopicMetadataRequest = {
     import scala.collection.JavaConversions._

http://git-wip-us.apache.org/repos/asf/kafka/blob/5bc90d45/core/src/main/scala/kafka/network/RequestChannel.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala
index 4044f62..2fce621 100644
--- a/core/src/main/scala/kafka/network/RequestChannel.scala
+++ b/core/src/main/scala/kafka/network/RequestChannel.scala
@@ -41,7 +41,7 @@ object RequestChannel extends Logging {
   def getShutdownReceive() = {
     val emptyProducerRequest = new ProducerRequest(0, 0, "", 0, 0, collection.mutable.Map[TopicAndPartition, ByteBufferMessageSet]())
     val byteBuffer = ByteBuffer.allocate(emptyProducerRequest.sizeInBytes + 2)
-    byteBuffer.putShort(RequestKeys.ProduceKey)
+    byteBuffer.putShort(ApiKeys.PRODUCE.id)
     emptyProducerRequest.writeTo(byteBuffer)
     byteBuffer.rewind()
     byteBuffer
@@ -59,13 +59,29 @@ object RequestChannel extends Logging {
     @volatile var apiRemoteCompleteTimeMs = -1L
 
     val requestId = buffer.getShort()
+
+    // TODO: this will be removed once we migrated to client-side format
     // for server-side request / response format
+    // NOTE: this map only includes the server-side request/response handlers. Newer
+    // request types should only use the client-side versions which are parsed with
+    // o.a.k.common.requests.AbstractRequest.getRequest()
+    private val keyToNameAndDeserializerMap: Map[Short, (ByteBuffer) => RequestOrResponse]=
+      Map(ApiKeys.PRODUCE.id -> ProducerRequest.readFrom,
+        ApiKeys.FETCH.id -> FetchRequest.readFrom,
+        ApiKeys.LIST_OFFSETS.id -> OffsetRequest.readFrom,
+        ApiKeys.METADATA.id -> TopicMetadataRequest.readFrom,
+        ApiKeys.LEADER_AND_ISR.id -> LeaderAndIsrRequest.readFrom,
+        ApiKeys.STOP_REPLICA.id -> StopReplicaRequest.readFrom,
+        ApiKeys.UPDATE_METADATA_KEY.id -> UpdateMetadataRequest.readFrom,
+        ApiKeys.CONTROLLED_SHUTDOWN_KEY.id -> ControlledShutdownRequest.readFrom,
+        ApiKeys.OFFSET_COMMIT.id -> OffsetCommitRequest.readFrom,
+        ApiKeys.OFFSET_FETCH.id -> OffsetFetchRequest.readFrom
+      )
+
     // TODO: this will be removed once we migrated to client-side format
     val requestObj =
-      if ( RequestKeys.keyToNameAndDeserializerMap.contains(requestId))
-        RequestKeys.deserializerForKey(requestId)(buffer)
-      else
-        null
+      keyToNameAndDeserializerMap.get(requestId).map(readFrom => readFrom(buffer)).orNull
+
     // if we failed to find a server-side mapping, then try using the
     // client-side request / response format
     val header: RequestHeader =
@@ -113,7 +129,7 @@ object RequestChannel extends Logging {
       val responseSendTime = (endTimeMs - responseDequeueTimeMs).max(0L)
       val totalTime = endTimeMs - startTimeMs
       var metricsList = List(RequestMetrics.metricsMap(ApiKeys.forId(requestId).name))
-      if (requestId == RequestKeys.FetchKey) {
+      if (requestId == ApiKeys.FETCH.id) {
         val isFromFollower = requestObj.asInstanceOf[FetchRequest].isFromFollower
         metricsList ::= ( if (isFromFollower)
                             RequestMetrics.metricsMap(RequestMetrics.followFetchMetricName)
@@ -139,7 +155,7 @@ object RequestChannel extends Logging {
           .format(requestDesc(false), connectionId, totalTime, requestQueueTime, apiLocalTime, apiRemoteTime, responseQueueTime, responseSendTime, securityProtocol, session.principal))
     }
   }
-  
+
   case class Response(processor: Int, request: Request, responseSend: Send, responseAction: ResponseAction) {
     request.responseCompleteTimeMs = SystemTime.milliseconds
 
@@ -187,8 +203,8 @@ class RequestChannel(val numProcessors: Int, val queueSize: Int) extends KafkaMe
   def sendRequest(request: RequestChannel.Request) {
     requestQueue.put(request)
   }
-  
-  /** Send a response back to the socket server to be sent over the network */ 
+
+  /** Send a response back to the socket server to be sent over the network */
   def sendResponse(response: RequestChannel.Response) {
     responseQueues(response.processor).put(response)
     for(onResponse <- responseListeners)
@@ -225,7 +241,7 @@ class RequestChannel(val numProcessors: Int, val queueSize: Int) extends KafkaMe
     response
   }
 
-  def addResponseListener(onResponse: Int => Unit) { 
+  def addResponseListener(onResponse: Int => Unit) {
     responseListeners ::= onResponse
   }
 
@@ -236,8 +252,8 @@ class RequestChannel(val numProcessors: Int, val queueSize: Int) extends KafkaMe
 
 object RequestMetrics {
   val metricsMap = new scala.collection.mutable.HashMap[String, RequestMetrics]
-  val consumerFetchMetricName = RequestKeys.nameForKey(RequestKeys.FetchKey) + "Consumer"
-  val followFetchMetricName = RequestKeys.nameForKey(RequestKeys.FetchKey) + "Follower"
+  val consumerFetchMetricName = ApiKeys.FETCH.name + "Consumer"
+  val followFetchMetricName = ApiKeys.FETCH.name + "Follower"
   (ApiKeys.values().toList.map(e => e.name)
     ++ List(consumerFetchMetricName, followFetchMetricName)).foreach(name => metricsMap.put(name, new RequestMetrics(name)))
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/5bc90d45/core/src/main/scala/kafka/producer/SyncProducer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/SyncProducer.scala b/core/src/main/scala/kafka/producer/SyncProducer.scala
index 73db2b1..ec3c4ab 100644
--- a/core/src/main/scala/kafka/producer/SyncProducer.scala
+++ b/core/src/main/scala/kafka/producer/SyncProducer.scala
@@ -24,6 +24,7 @@ import kafka.api._
 import kafka.network.{RequestOrResponseSend, BlockingChannel}
 import kafka.utils._
 import org.apache.kafka.common.network.NetworkReceive
+import org.apache.kafka.common.protocol.ApiKeys
 import org.apache.kafka.common.utils.Utils._
 
 object SyncProducer {
@@ -55,7 +56,7 @@ class SyncProducer(val config: SyncProducerConfig) extends Logging {
       val buffer = new RequestOrResponseSend("", request).buffer
       trace("verifying sendbuffer of size " + buffer.limit)
       val requestTypeId = buffer.getShort()
-      if(requestTypeId == RequestKeys.ProduceKey) {
+      if(requestTypeId == ApiKeys.PRODUCE.id) {
         val request = ProducerRequest.readFrom(buffer)
         trace(request.toString)
       }
@@ -161,4 +162,4 @@ class SyncProducer(val config: SyncProducerConfig) extends Logging {
       connect()
     }
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/5bc90d45/core/src/main/scala/kafka/server/ConfigHandler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ConfigHandler.scala b/core/src/main/scala/kafka/server/ConfigHandler.scala
index 606156a..bc599a0 100644
--- a/core/src/main/scala/kafka/server/ConfigHandler.scala
+++ b/core/src/main/scala/kafka/server/ConfigHandler.scala
@@ -21,8 +21,8 @@ import java.util.Properties
 
 import kafka.common.TopicAndPartition
 import kafka.log.{Log, LogConfig, LogManager}
-import kafka.api.RequestKeys
 import org.apache.kafka.common.metrics.Quota
+import org.apache.kafka.common.protocol.ApiKeys
 
 import scala.collection.mutable
 import scala.collection.Map
@@ -71,13 +71,13 @@ class ClientIdConfigHandler(private val quotaManagers: Map[Short, ClientQuotaMan
 
   def processConfigChanges(clientId: String, clientConfig: Properties) = {
     if (clientConfig.containsKey(ClientConfigOverride.ProducerOverride)) {
-      quotaManagers(RequestKeys.ProduceKey).updateQuota(clientId,
+      quotaManagers(ApiKeys.PRODUCE.id).updateQuota(clientId,
         new Quota(clientConfig.getProperty(ClientConfigOverride.ProducerOverride).toLong, true))
     }
 
     if (clientConfig.containsKey(ClientConfigOverride.ConsumerOverride)) {
-      quotaManagers(RequestKeys.FetchKey).updateQuota(clientId,
+      quotaManagers(ApiKeys.FETCH.id).updateQuota(clientId,
         new Quota(clientConfig.getProperty(ClientConfigOverride.ConsumerOverride).toLong, true))
     }
   }
-}
\ No newline at end of file
+}
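
ConfigHandler keeps addressing the quota-manager map by raw Short id; only the
source of the constant changes. A sketch of the keying convention with a
stand-in for ClientQuotaManager (the real class takes quota configuration and
metrics, which this commit leaves unchanged):

    import org.apache.kafka.common.protocol.ApiKeys

    object QuotaKeying extends App {
      // Stand-in for kafka.server.ClientQuotaManager, reduced to one method.
      final class StubQuotaManager(name: String) {
        def updateQuota(clientId: String, bound: Long): Unit =
          println(s"$name quota for $clientId -> $bound")
      }

      val quotaManagers: Map[Short, StubQuotaManager] = Map(
        ApiKeys.PRODUCE.id -> new StubQuotaManager(ApiKeys.PRODUCE.name),
        ApiKeys.FETCH.id   -> new StubQuotaManager(ApiKeys.FETCH.name)
      )

      // Same call shape as ClientIdConfigHandler.processConfigChanges:
      quotaManagers(ApiKeys.PRODUCE.id).updateQuota("client-1", 1000L)
    }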

http://git-wip-us.apache.org/repos/asf/kafka/blob/5bc90d45/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index ade879b..ce5d2c6 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -18,7 +18,6 @@
 package kafka.server
 
 import java.nio.ByteBuffer
-import java.util
 
 import kafka.admin.AdminUtils
 import kafka.api._
@@ -33,7 +32,7 @@ import kafka.network.RequestChannel.{Session, Response}
 import kafka.security.auth.{Authorizer, ClusterAction, Group, Create, Describe, Operation, Read, Resource, Topic, Write}
 import kafka.utils.{Logging, SystemTime, ZKGroupTopicDirs, ZkUtils}
 import org.apache.kafka.common.metrics.Metrics
-import org.apache.kafka.common.protocol.{Errors, SecurityProtocol}
+import org.apache.kafka.common.protocol.{ApiKeys, Errors, SecurityProtocol}
 import org.apache.kafka.common.requests.{GroupCoordinatorRequest, GroupCoordinatorResponse, ListGroupsResponse, DescribeGroupsRequest, DescribeGroupsResponse, HeartbeatRequest, HeartbeatResponse, JoinGroupRequest, JoinGroupResponse, LeaveGroupRequest, LeaveGroupResponse, ResponseHeader, ResponseSend, SyncGroupRequest, SyncGroupResponse}
 import org.apache.kafka.common.utils.Utils
 import org.apache.kafka.common.Node
@@ -64,24 +63,24 @@ class KafkaApis(val requestChannel: RequestChannel,
     try{
       trace("Handling request:%s from connection %s;securityProtocol:%s,principal:%s".
         format(request.requestObj, request.connectionId, request.securityProtocol, request.session.principal))
-      request.requestId match {
-        case RequestKeys.ProduceKey => handleProducerRequest(request)
-        case RequestKeys.FetchKey => handleFetchRequest(request)
-        case RequestKeys.OffsetsKey => handleOffsetRequest(request)
-        case RequestKeys.MetadataKey => handleTopicMetadataRequest(request)
-        case RequestKeys.LeaderAndIsrKey => handleLeaderAndIsrRequest(request)
-        case RequestKeys.StopReplicaKey => handleStopReplicaRequest(request)
-        case RequestKeys.UpdateMetadataKey => handleUpdateMetadataRequest(request)
-        case RequestKeys.ControlledShutdownKey => handleControlledShutdownRequest(request)
-        case RequestKeys.OffsetCommitKey => handleOffsetCommitRequest(request)
-        case RequestKeys.OffsetFetchKey => handleOffsetFetchRequest(request)
-        case RequestKeys.GroupCoordinatorKey => handleGroupCoordinatorRequest(request)
-        case RequestKeys.JoinGroupKey => handleJoinGroupRequest(request)
-        case RequestKeys.HeartbeatKey => handleHeartbeatRequest(request)
-        case RequestKeys.LeaveGroupKey => handleLeaveGroupRequest(request)
-        case RequestKeys.SyncGroupKey => handleSyncGroupRequest(request)
-        case RequestKeys.DescribeGroupsKey => handleDescribeGroupRequest(request)
-        case RequestKeys.ListGroupsKey => handleListGroupsRequest(request)
+      ApiKeys.forId(request.requestId) match {
+        case ApiKeys.PRODUCE => handleProducerRequest(request)
+        case ApiKeys.FETCH => handleFetchRequest(request)
+        case ApiKeys.LIST_OFFSETS => handleOffsetRequest(request)
+        case ApiKeys.METADATA => handleTopicMetadataRequest(request)
+        case ApiKeys.LEADER_AND_ISR => handleLeaderAndIsrRequest(request)
+        case ApiKeys.STOP_REPLICA => handleStopReplicaRequest(request)
+        case ApiKeys.UPDATE_METADATA_KEY => handleUpdateMetadataRequest(request)
+        case ApiKeys.CONTROLLED_SHUTDOWN_KEY => handleControlledShutdownRequest(request)
+        case ApiKeys.OFFSET_COMMIT => handleOffsetCommitRequest(request)
+        case ApiKeys.OFFSET_FETCH => handleOffsetFetchRequest(request)
+        case ApiKeys.GROUP_COORDINATOR => handleGroupCoordinatorRequest(request)
+        case ApiKeys.JOIN_GROUP => handleJoinGroupRequest(request)
+        case ApiKeys.HEARTBEAT => handleHeartbeatRequest(request)
+        case ApiKeys.LEAVE_GROUP => handleLeaveGroupRequest(request)
+        case ApiKeys.SYNC_GROUP => handleSyncGroupRequest(request)
+        case ApiKeys.DESCRIBE_GROUPS => handleDescribeGroupRequest(request)
+        case ApiKeys.LIST_GROUPS => handleListGroupsRequest(request)
         case requestId => throw new KafkaException("Unknown api code " + requestId)
       }
     } catch {
@@ -350,7 +349,7 @@ class KafkaApis(val requestChannel: RequestChannel,
       // When this callback is triggered, the remote API call has completed
       request.apiRemoteCompleteTimeMs = SystemTime.milliseconds
 
-      quotaManagers(RequestKeys.ProduceKey).recordAndMaybeThrottle(produceRequest.clientId,
+      quotaManagers(ApiKeys.PRODUCE.id).recordAndMaybeThrottle(produceRequest.clientId,
                                                                    numBytesAppended,
                                                                    produceResponseCallback)
     }
@@ -419,7 +418,7 @@ class KafkaApis(val requestChannel: RequestChannel,
       if (fetchRequest.isFromFollower) {
         fetchResponseCallback(0)
       } else {
-        quotaManagers(RequestKeys.FetchKey).recordAndMaybeThrottle(fetchRequest.clientId,
+        quotaManagers(ApiKeys.FETCH.id).recordAndMaybeThrottle(fetchRequest.clientId,
                                                                    FetchResponse.responseSize(responsePartitionData
                                                                                                       .groupBy(_._1.topic),
                                                                                               fetchRequest.versionId),
@@ -869,10 +868,10 @@ class KafkaApis(val requestChannel: RequestChannel,
     )
 
     val quotaManagers = Map[Short, ClientQuotaManager](
-      RequestKeys.ProduceKey ->
-              new ClientQuotaManager(producerQuotaManagerCfg, metrics, RequestKeys.nameForKey(RequestKeys.ProduceKey), new org.apache.kafka.common.utils.SystemTime),
-      RequestKeys.FetchKey ->
-              new ClientQuotaManager(consumerQuotaManagerCfg, metrics, RequestKeys.nameForKey(RequestKeys.FetchKey), new org.apache.kafka.common.utils.SystemTime)
+      ApiKeys.PRODUCE.id ->
+              new ClientQuotaManager(producerQuotaManagerCfg, metrics, ApiKeys.PRODUCE.name, new org.apache.kafka.common.utils.SystemTime),
+      ApiKeys.FETCH.id ->
+              new ClientQuotaManager(consumerQuotaManagerCfg, metrics, ApiKeys.FETCH.name, new org.apache.kafka.common.utils.SystemTime)
     )
     quotaManagers
   }
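
One nuance in the new dispatch: ApiKeys.forId runs before the match, so a wire
id outside the enum's range now fails inside forId rather than reaching the
catch-all case, which can only fire for keys the enum defines but KafkaApis
does not handle. A reduced sketch of the shape (IllegalStateException standing
in for kafka.common.KafkaException, and most handlers elided):

    import org.apache.kafka.common.protocol.ApiKeys

    object DispatchSketch extends App {
      def handle(requestId: Short): String =
        ApiKeys.forId(requestId) match {
          case ApiKeys.PRODUCE => "handleProducerRequest"
          case ApiKeys.FETCH   => "handleFetchRequest"
          case other           => throw new IllegalStateException("Unknown api code " + other)
        }

      println(handle(ApiKeys.FETCH.id))  // handleFetchRequest
    }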

http://git-wip-us.apache.org/repos/asf/kafka/blob/5bc90d45/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index 4aa8438..fccfdb6 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -72,59 +72,59 @@ class AuthorizerIntegrationTest extends KafkaServerTestHarness {
   overridingProps.put(KafkaConfig.OffsetsTopicPartitionsProp, "1")
 
   val endPoint = new EndPoint("localhost", 0, SecurityProtocol.PLAINTEXT)
-  
+
   val RequestKeyToResponseDeserializer: Map[Short, Class[_ <: Any]] =
-    Map(RequestKeys.MetadataKey -> classOf[requests.MetadataResponse],
-      RequestKeys.ProduceKey -> classOf[requests.ProduceResponse],
-      RequestKeys.FetchKey -> classOf[requests.FetchResponse],
-      RequestKeys.OffsetsKey -> classOf[requests.ListOffsetResponse],
-      RequestKeys.OffsetCommitKey -> classOf[requests.OffsetCommitResponse],
-      RequestKeys.OffsetFetchKey -> classOf[requests.OffsetFetchResponse],
-      RequestKeys.GroupCoordinatorKey -> classOf[requests.GroupCoordinatorResponse],
-      RequestKeys.UpdateMetadataKey -> classOf[requests.UpdateMetadataResponse],
-      RequestKeys.JoinGroupKey -> classOf[JoinGroupResponse],
-      RequestKeys.SyncGroupKey -> classOf[SyncGroupResponse],
-      RequestKeys.HeartbeatKey -> classOf[HeartbeatResponse],
-      RequestKeys.LeaveGroupKey -> classOf[LeaveGroupResponse],
-      RequestKeys.LeaderAndIsrKey -> classOf[requests.LeaderAndIsrResponse],
-      RequestKeys.StopReplicaKey -> classOf[requests.StopReplicaResponse],
-      RequestKeys.ControlledShutdownKey -> classOf[requests.ControlledShutdownResponse]
+    Map(ApiKeys.METADATA.id -> classOf[requests.MetadataResponse],
+      ApiKeys.PRODUCE.id -> classOf[requests.ProduceResponse],
+      ApiKeys.FETCH.id -> classOf[requests.FetchResponse],
+      ApiKeys.LIST_OFFSETS.id -> classOf[requests.ListOffsetResponse],
+      ApiKeys.OFFSET_COMMIT.id -> classOf[requests.OffsetCommitResponse],
+      ApiKeys.OFFSET_FETCH.id -> classOf[requests.OffsetFetchResponse],
+      ApiKeys.GROUP_COORDINATOR.id -> classOf[requests.GroupCoordinatorResponse],
+      ApiKeys.UPDATE_METADATA_KEY.id -> classOf[requests.UpdateMetadataResponse],
+      ApiKeys.JOIN_GROUP.id -> classOf[JoinGroupResponse],
+      ApiKeys.SYNC_GROUP.id -> classOf[SyncGroupResponse],
+      ApiKeys.HEARTBEAT.id -> classOf[HeartbeatResponse],
+      ApiKeys.LEAVE_GROUP.id -> classOf[LeaveGroupResponse],
+      ApiKeys.LEADER_AND_ISR.id -> classOf[requests.LeaderAndIsrResponse],
+      ApiKeys.STOP_REPLICA.id -> classOf[requests.StopReplicaResponse],
+      ApiKeys.CONTROLLED_SHUTDOWN_KEY.id -> classOf[requests.ControlledShutdownResponse]
     )
 
   val RequestKeyToErrorCode = Map[Short, (Nothing) => Short](
-    RequestKeys.MetadataKey -> ((resp: requests.MetadataResponse) => resp.errors().asScala.find(_._1 == topic).getOrElse(("test", Errors.NONE))._2.code()),
-    RequestKeys.ProduceKey -> ((resp: requests.ProduceResponse) => resp.responses().asScala.find(_._1 == tp).get._2.errorCode),
-    RequestKeys.FetchKey -> ((resp: requests.FetchResponse) => resp.responseData().asScala.find(_._1 == tp).get._2.errorCode),
-    RequestKeys.OffsetsKey -> ((resp: requests.ListOffsetResponse) => resp.responseData().asScala.find(_._1 == tp).get._2.errorCode),
-    RequestKeys.OffsetCommitKey -> ((resp: requests.OffsetCommitResponse) => resp.responseData().asScala.find(_._1 == tp).get._2),
-    RequestKeys.OffsetFetchKey -> ((resp: requests.OffsetFetchResponse) => resp.responseData().asScala.find(_._1 == tp).get._2.errorCode),
-    RequestKeys.GroupCoordinatorKey -> ((resp: requests.GroupCoordinatorResponse) => resp.errorCode()),
-    RequestKeys.UpdateMetadataKey -> ((resp: requests.UpdateMetadataResponse) => resp.errorCode()),
-    RequestKeys.JoinGroupKey -> ((resp: JoinGroupResponse) => resp.errorCode()),
-    RequestKeys.SyncGroupKey -> ((resp: SyncGroupResponse) => resp.errorCode()),
-    RequestKeys.HeartbeatKey -> ((resp: HeartbeatResponse) => resp.errorCode()),
-    RequestKeys.LeaveGroupKey -> ((resp: LeaveGroupResponse) => resp.errorCode()),
-    RequestKeys.LeaderAndIsrKey -> ((resp: requests.LeaderAndIsrResponse) => resp.responses().asScala.find(_._1 == tp).get._2),
-    RequestKeys.StopReplicaKey -> ((resp: requests.StopReplicaResponse) => resp.responses().asScala.find(_._1 == tp).get._2),
-    RequestKeys.ControlledShutdownKey -> ((resp: requests.ControlledShutdownResponse) => resp.errorCode())
+    ApiKeys.METADATA.id -> ((resp: requests.MetadataResponse) => resp.errors().asScala.find(_._1 == topic).getOrElse(("test", Errors.NONE))._2.code()),
+    ApiKeys.PRODUCE.id -> ((resp: requests.ProduceResponse) => resp.responses().asScala.find(_._1 == tp).get._2.errorCode),
+    ApiKeys.FETCH.id -> ((resp: requests.FetchResponse) => resp.responseData().asScala.find(_._1 == tp).get._2.errorCode),
+    ApiKeys.LIST_OFFSETS.id -> ((resp: requests.ListOffsetResponse) => resp.responseData().asScala.find(_._1 == tp).get._2.errorCode),
+    ApiKeys.OFFSET_COMMIT.id -> ((resp: requests.OffsetCommitResponse) => resp.responseData().asScala.find(_._1 == tp).get._2),
+    ApiKeys.OFFSET_FETCH.id -> ((resp: requests.OffsetFetchResponse) => resp.responseData().asScala.find(_._1 == tp).get._2.errorCode),
+    ApiKeys.GROUP_COORDINATOR.id -> ((resp: requests.GroupCoordinatorResponse) => resp.errorCode()),
+    ApiKeys.UPDATE_METADATA_KEY.id -> ((resp: requests.UpdateMetadataResponse) => resp.errorCode()),
+    ApiKeys.JOIN_GROUP.id -> ((resp: JoinGroupResponse) => resp.errorCode()),
+    ApiKeys.SYNC_GROUP.id -> ((resp: SyncGroupResponse) => resp.errorCode()),
+    ApiKeys.HEARTBEAT.id -> ((resp: HeartbeatResponse) => resp.errorCode()),
+    ApiKeys.LEAVE_GROUP.id -> ((resp: LeaveGroupResponse) => resp.errorCode()),
+    ApiKeys.LEADER_AND_ISR.id -> ((resp: requests.LeaderAndIsrResponse) => resp.responses().asScala.find(_._1 == tp).get._2),
+    ApiKeys.STOP_REPLICA.id -> ((resp: requests.StopReplicaResponse) => resp.responses().asScala.find(_._1 == tp).get._2),
+    ApiKeys.CONTROLLED_SHUTDOWN_KEY.id -> ((resp: requests.ControlledShutdownResponse) => resp.errorCode())
   )
 
   val RequestKeysToAcls = Map[Short, Map[Resource, Set[Acl]]](
-    RequestKeys.MetadataKey -> TopicDescribeAcl,
-    RequestKeys.ProduceKey -> TopicWriteAcl,
-    RequestKeys.FetchKey -> TopicReadAcl,
-    RequestKeys.OffsetsKey -> TopicDescribeAcl,
-    RequestKeys.OffsetCommitKey -> (TopicReadAcl ++ GroupReadAcl),
-    RequestKeys.OffsetFetchKey -> (TopicReadAcl ++ GroupReadAcl),
-    RequestKeys.GroupCoordinatorKey -> (TopicReadAcl ++ GroupReadAcl),
-    RequestKeys.UpdateMetadataKey -> ClusterAcl,
-    RequestKeys.JoinGroupKey -> GroupReadAcl,
-    RequestKeys.SyncGroupKey -> GroupReadAcl,
-    RequestKeys.HeartbeatKey -> GroupReadAcl,
-    RequestKeys.LeaveGroupKey -> GroupReadAcl,
-    RequestKeys.LeaderAndIsrKey -> ClusterAcl,
-    RequestKeys.StopReplicaKey -> ClusterAcl,
-    RequestKeys.ControlledShutdownKey -> ClusterAcl
+    ApiKeys.METADATA.id -> TopicDescribeAcl,
+    ApiKeys.PRODUCE.id -> TopicWriteAcl,
+    ApiKeys.FETCH.id -> TopicReadAcl,
+    ApiKeys.LIST_OFFSETS.id -> TopicDescribeAcl,
+    ApiKeys.OFFSET_COMMIT.id -> (TopicReadAcl ++ GroupReadAcl),
+    ApiKeys.OFFSET_FETCH.id -> (TopicReadAcl ++ GroupReadAcl),
+    ApiKeys.GROUP_COORDINATOR.id -> (TopicReadAcl ++ GroupReadAcl),
+    ApiKeys.UPDATE_METADATA_KEY.id -> ClusterAcl,
+    ApiKeys.JOIN_GROUP.id -> GroupReadAcl,
+    ApiKeys.SYNC_GROUP.id -> GroupReadAcl,
+    ApiKeys.HEARTBEAT.id -> GroupReadAcl,
+    ApiKeys.LEAVE_GROUP.id -> GroupReadAcl,
+    ApiKeys.LEADER_AND_ISR.id -> ClusterAcl,
+    ApiKeys.STOP_REPLICA.id -> ClusterAcl,
+    ApiKeys.CONTROLLED_SHUTDOWN_KEY.id -> ClusterAcl
   )
 
   // configure the servers and clients
@@ -227,21 +227,21 @@ class AuthorizerIntegrationTest extends KafkaServerTestHarness {
   @Test
   def testAuthorization() {
     val requestKeyToRequest = mutable.LinkedHashMap[Short, AbstractRequest](
-      RequestKeys.MetadataKey -> createMetadataRequest,
-      RequestKeys.ProduceKey -> createProduceRequest,
-      RequestKeys.FetchKey -> createFetchRequest,
-      RequestKeys.OffsetsKey -> createListOffsetsRequest,
-      RequestKeys.OffsetFetchKey -> createOffsetFetchRequest,
-      RequestKeys.GroupCoordinatorKey -> createGroupCoordinatorRequest,
-      RequestKeys.UpdateMetadataKey -> createUpdateMetadataRequest,
-      RequestKeys.JoinGroupKey -> createJoinGroupRequest,
-      RequestKeys.SyncGroupKey -> createSyncGroupRequest,
-      RequestKeys.OffsetCommitKey -> createOffsetCommitRequest,
-      RequestKeys.HeartbeatKey -> createHeartbeatRequest,
-      RequestKeys.LeaveGroupKey -> createLeaveGroupRequest,
-      RequestKeys.LeaderAndIsrKey -> createLeaderAndIsrRequest,
-      RequestKeys.StopReplicaKey -> createStopReplicaRequest,
-      RequestKeys.ControlledShutdownKey -> createControlledShutdownRequest
+      ApiKeys.METADATA.id -> createMetadataRequest,
+      ApiKeys.PRODUCE.id -> createProduceRequest,
+      ApiKeys.FETCH.id -> createFetchRequest,
+      ApiKeys.LIST_OFFSETS.id -> createListOffsetsRequest,
+      ApiKeys.OFFSET_FETCH.id -> createOffsetFetchRequest,
+      ApiKeys.GROUP_COORDINATOR.id -> createGroupCoordinatorRequest,
+      ApiKeys.UPDATE_METADATA_KEY.id -> createUpdateMetadataRequest,
+      ApiKeys.JOIN_GROUP.id -> createJoinGroupRequest,
+      ApiKeys.SYNC_GROUP.id -> createSyncGroupRequest,
+      ApiKeys.OFFSET_COMMIT.id -> createOffsetCommitRequest,
+      ApiKeys.HEARTBEAT.id -> createHeartbeatRequest,
+      ApiKeys.LEAVE_GROUP.id -> createLeaveGroupRequest,
+      ApiKeys.LEADER_AND_ISR.id -> createLeaderAndIsrRequest,
+      ApiKeys.STOP_REPLICA.id -> createStopReplicaRequest,
+      ApiKeys.CONTROLLED_SHUTDOWN_KEY.id -> createControlledShutdownRequest
     )
 
     val socket = new Socket("localhost", servers.head.boundPort())

http://git-wip-us.apache.org/repos/asf/kafka/blob/5bc90d45/core/src/test/scala/integration/kafka/api/QuotasTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/QuotasTest.scala b/core/src/test/scala/integration/kafka/api/QuotasTest.scala
index 7990020..cc1f821 100644
--- a/core/src/test/scala/integration/kafka/api/QuotasTest.scala
+++ b/core/src/test/scala/integration/kafka/api/QuotasTest.scala
@@ -26,6 +26,7 @@ import org.apache.kafka.clients.producer._
 import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback
 import org.apache.kafka.common.MetricName
 import org.apache.kafka.common.metrics.{Quota, KafkaMetric}
+import org.apache.kafka.common.protocol.ApiKeys
 import org.junit.Assert.assertEquals
 import org.junit.Assert.assertTrue
 import org.junit.{After, Before, Test}
@@ -125,7 +126,7 @@ class QuotasTest extends KafkaServerTestHarness {
     produce(producers.head, numRecords)
 
     val producerMetricName = new MetricName("throttle-time",
-                                    RequestKeys.nameForKey(RequestKeys.ProduceKey),
+                                    ApiKeys.PRODUCE.name,
                                     "Tracking throttle-time per client",
                                     "client-id", producerId1)
     assertTrue("Should have been throttled", allMetrics(producerMetricName).value() > 0)
@@ -136,7 +137,7 @@ class QuotasTest extends KafkaServerTestHarness {
     val request = new FetchRequestBuilder().addFetch(topic1, 0, 0, 1024*1024).replicaId(followerNode.config.brokerId).build()
     replicaConsumers.head.fetch(request)
     val consumerMetricName = new MetricName("throttle-time",
-                                            RequestKeys.nameForKey(RequestKeys.FetchKey),
+                                            ApiKeys.FETCH.name,
                                             "Tracking throttle-time per client",
                                             "client-id", consumerId1)
     assertTrue("Should have been throttled", allMetrics(consumerMetricName).value() > 0)
@@ -154,8 +155,8 @@ class QuotasTest extends KafkaServerTestHarness {
 
     TestUtils.retry(10000) {
       val quotaManagers: Map[Short, ClientQuotaManager] = leaderNode.apis.quotaManagers
-      val overrideProducerQuota = quotaManagers.get(RequestKeys.ProduceKey).get.quota(producerId2)
-      val overrideConsumerQuota = quotaManagers.get(RequestKeys.FetchKey).get.quota(consumerId2)
+      val overrideProducerQuota = quotaManagers.get(ApiKeys.PRODUCE.id).get.quota(producerId2)
+      val overrideConsumerQuota = quotaManagers.get(ApiKeys.FETCH.id).get.quota(consumerId2)
 
       assertEquals(s"ClientId $producerId2 must have unlimited producer quota", Quota.upperBound(Long.MaxValue), overrideProducerQuota)
       assertEquals(s"ClientId $consumerId2 must have unlimited consumer quota", Quota.upperBound(Long.MaxValue), overrideConsumerQuota)
@@ -166,7 +167,7 @@ class QuotasTest extends KafkaServerTestHarness {
     val numRecords = 1000
     produce(producers(1), numRecords)
     val producerMetricName = new MetricName("throttle-time",
-                                            RequestKeys.nameForKey(RequestKeys.ProduceKey),
+                                            ApiKeys.PRODUCE.name,
                                             "Tracking throttle-time per client",
                                             "client-id", producerId2)
     assertEquals("Should not have been throttled", 0.0, allMetrics(producerMetricName).value(), 0.0)
@@ -177,7 +178,7 @@ class QuotasTest extends KafkaServerTestHarness {
     val request = new FetchRequestBuilder().addFetch(topic1, 0, 0, 1024*1024).replicaId(followerNode.config.brokerId).build()
     replicaConsumers(1).fetch(request)
     val consumerMetricName = new MetricName("throttle-time",
-                                            RequestKeys.nameForKey(RequestKeys.FetchKey),
+                                            ApiKeys.FETCH.name,
                                             "Tracking throttle-time per client",
                                             "client-id", consumerId2)
     assertEquals("Should not have been throttled", 0.0, allMetrics(consumerMetricName).value(), 0.0)

http://git-wip-us.apache.org/repos/asf/kafka/blob/5bc90d45/core/src/test/scala/unit/kafka/admin/AdminTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/AdminTest.scala b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
index 0570c79..47afad1 100755
--- a/core/src/test/scala/unit/kafka/admin/AdminTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
@@ -17,8 +17,8 @@
 package kafka.admin
 
 import junit.framework.Assert._
-import kafka.api.RequestKeys
 import org.apache.kafka.common.metrics.Quota
+import org.apache.kafka.common.protocol.ApiKeys
 import org.junit.Test
 import java.util.Properties
 import kafka.utils._
@@ -444,8 +444,8 @@ class AdminTest extends ZooKeeperTestHarness with Logging {
     // Test that the existing clientId overrides are read
     val server = TestUtils.createServer(KafkaConfig.fromProps(TestUtils.createBrokerConfig(0, zkConnect)))
     try {
-      assertEquals(new Quota(1000, true), server.apis.quotaManagers(RequestKeys.ProduceKey).quota(clientId));
-      assertEquals(new Quota(2000, true), server.apis.quotaManagers(RequestKeys.FetchKey).quota(clientId));
+      assertEquals(new Quota(1000, true), server.apis.quotaManagers(ApiKeys.PRODUCE.id).quota(clientId))
+      assertEquals(new Quota(2000, true), server.apis.quotaManagers(ApiKeys.FETCH.id).quota(clientId))
     } finally {
       server.shutdown()
       server.config.logDirs.foreach(CoreUtils.rm(_))

http://git-wip-us.apache.org/repos/asf/kafka/blob/5bc90d45/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
index 6b49c4e..7cc8773 100644
--- a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
@@ -18,8 +18,8 @@ package kafka.server
 
 import java.util.Properties
 
+import org.apache.kafka.common.protocol.ApiKeys
 import org.junit.Assert._
-import kafka.api.RequestKeys
 import org.apache.kafka.common.metrics.Quota
 import org.easymock.{Capture, EasyMock}
 import org.junit.Test
@@ -69,8 +69,8 @@ class DynamicConfigChangeTest extends KafkaServerTestHarness {
     TestUtils.retry(10000) {
       val configHandler = this.servers(0).dynamicConfigHandlers(ConfigType.Client).asInstanceOf[ClientIdConfigHandler]
       val quotaManagers: Map[Short, ClientQuotaManager] = servers(0).apis.quotaManagers
-      val overrideProducerQuota = quotaManagers.get(RequestKeys.ProduceKey).get.quota(clientId)
-      val overrideConsumerQuota = quotaManagers.get(RequestKeys.FetchKey).get.quota(clientId)
+      val overrideProducerQuota = quotaManagers.get(ApiKeys.PRODUCE.id).get.quota(clientId)
+      val overrideConsumerQuota = quotaManagers.get(ApiKeys.FETCH.id).get.quota(clientId)
 
       assertEquals(s"ClientId $clientId must have overridden producer quota of 1000",
         Quota.upperBound(1000), overrideProducerQuota)

