kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ij...@apache.org
Subject [1/8] kafka git commit: MINOR: Move request/response schemas to the corresponding object representation
Date Tue, 19 Sep 2017 04:13:22 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk a64fe2ed8 -> 0cf770800


http://git-wip-us.apache.org/repos/asf/kafka/blob/0cf77080/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index 067fc27..cc65003 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -484,7 +484,7 @@ public class RequestResponseTest {
     @Test
     public void testControlledShutdownResponse() {
         ControlledShutdownResponse response = createControlledShutdownResponse();
-        short version = ApiKeys.CONTROLLED_SHUTDOWN_KEY.latestVersion();
+        short version = ApiKeys.CONTROLLED_SHUTDOWN.latestVersion();
         Struct struct = response.toStruct(version);
         ByteBuffer buffer = toBuffer(struct);
         ControlledShutdownResponse deserialized = ControlledShutdownResponse.parse(buffer, version);

http://git-wip-us.apache.org/repos/asf/kafka/blob/0cf77080/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
index d41d61a..c59f2c9 100644
--- a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
@@ -60,6 +60,8 @@ import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
+import javax.security.auth.Subject;
+import javax.security.auth.login.Configuration;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
@@ -74,9 +76,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Random;
 
-import javax.security.auth.Subject;
-import javax.security.auth.login.Configuration;
-
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
@@ -953,7 +952,7 @@ public class SaslAuthenticatorTest {
 
                     @Override
                     protected ApiVersionsResponse apiVersionsResponse() {
-                        List<ApiVersion> apiVersions = new ArrayList<>(ApiVersionsResponse.API_VERSIONS_RESPONSE.apiVersions());
+                        List<ApiVersion> apiVersions = new ArrayList<>(ApiVersionsResponse.defaultApiVersionsResponse().apiVersions());
                         for (Iterator<ApiVersion> it = apiVersions.iterator(); it.hasNext(); ) {
                             ApiVersion apiVersion = it.next();
                             if (apiVersion.apiKey == ApiKeys.SASL_AUTHENTICATE.id) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/0cf77080/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index 6976f7c..58e5543 100755
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -242,7 +242,7 @@ class RequestSendThread(val controllerId: Int,
       if (clientResponse != null) {
         val requestHeader = clientResponse.requestHeader
         val api = requestHeader.apiKey
-        if (api != ApiKeys.LEADER_AND_ISR && api != ApiKeys.STOP_REPLICA && api != ApiKeys.UPDATE_METADATA_KEY)
+        if (api != ApiKeys.LEADER_AND_ISR && api != ApiKeys.STOP_REPLICA && api != ApiKeys.UPDATE_METADATA)
           throw new KafkaException(s"Unexpected apiKey received: $apiKey")
 
         val response = clientResponse.responseBody
@@ -455,7 +455,7 @@ class ControllerBrokerRequestBatch(controller: KafkaController, stateChangeLogge
       }
 
       updateMetadataRequestBrokerSet.foreach { broker =>
-        controller.sendRequest(broker, ApiKeys.UPDATE_METADATA_KEY, updateMetadataRequest, null)
+        controller.sendRequest(broker, ApiKeys.UPDATE_METADATA, updateMetadataRequest, null)
       }
       updateMetadataRequestBrokerSet.clear()
       updateMetadataRequestPartitionInfoMap.clear()

http://git-wip-us.apache.org/repos/asf/kafka/blob/0cf77080/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
index c2ebbad..aba0249 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
@@ -36,7 +36,7 @@ import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.internals.Topic
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.protocol.types.Type._
-import org.apache.kafka.common.protocol.types.{ArrayOf, Field, Schema, Struct}
+import org.apache.kafka.common.protocol.types._
 import org.apache.kafka.common.record._
 import org.apache.kafka.common.requests.{IsolationLevel, OffsetFetchResponse}
 import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse

http://git-wip-us.apache.org/repos/asf/kafka/blob/0cf77080/core/src/main/scala/kafka/coordinator/transaction/TransactionLog.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionLog.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionLog.scala
index efd315b..2c7178e 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionLog.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionLog.scala
@@ -20,7 +20,7 @@ import kafka.common.{KafkaException, MessageFormatter}
 import org.apache.kafka.clients.consumer.ConsumerRecord
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.protocol.types.Type._
-import org.apache.kafka.common.protocol.types.{ArrayOf, Field, Schema, Struct}
+import org.apache.kafka.common.protocol.types._
 import java.io.PrintStream
 import java.nio.ByteBuffer
 import java.nio.charset.StandardCharsets

http://git-wip-us.apache.org/repos/asf/kafka/blob/0cf77080/core/src/main/scala/kafka/log/ProducerStateManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/ProducerStateManager.scala b/core/src/main/scala/kafka/log/ProducerStateManager.scala
index 4c3d1a1..69d4e36 100644
--- a/core/src/main/scala/kafka/log/ProducerStateManager.scala
+++ b/core/src/main/scala/kafka/log/ProducerStateManager.scala
@@ -17,7 +17,7 @@
 package kafka.log
 
 import java.io._
-import java.nio.{BufferUnderflowException, ByteBuffer}
+import java.nio.ByteBuffer
 import java.nio.file.Files
 
 import kafka.common.KafkaException

http://git-wip-us.apache.org/repos/asf/kafka/blob/0cf77080/core/src/main/scala/kafka/network/RequestChannel.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala
index 2d6370a..1128fd3 100644
--- a/core/src/main/scala/kafka/network/RequestChannel.scala
+++ b/core/src/main/scala/kafka/network/RequestChannel.scala
@@ -28,7 +28,7 @@ import kafka.server.QuotaId
 import kafka.utils.{Logging, NotNothing}
 import org.apache.kafka.common.memory.MemoryPool
 import org.apache.kafka.common.network.Send
-import org.apache.kafka.common.protocol.{ApiKeys, Protocol}
+import org.apache.kafka.common.protocol.ApiKeys
 import org.apache.kafka.common.requests._
 import org.apache.kafka.common.security.auth.KafkaPrincipal
 import org.apache.kafka.common.utils.Time
@@ -71,7 +71,7 @@ object RequestChannel extends Logging {
     //most request types are parsed entirely into objects at this point. for those we can release the underlying buffer.
     //some (like produce, or any time the schema contains fields of types BYTES or NULLABLE_BYTES) retain a reference
     //to the buffer. for those requests we cannot release the buffer early, but only when request processing is done.
-    if (!Protocol.requiresDelayedDeallocation(header.apiKey.id)) {
+    if (!header.apiKey.requiresDelayedAllocation) {
       releaseBuffer()
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/0cf77080/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 62e8abf..e07e689 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -102,8 +102,8 @@ class KafkaApis(val requestChannel: RequestChannel,
         case ApiKeys.METADATA => handleTopicMetadataRequest(request)
         case ApiKeys.LEADER_AND_ISR => handleLeaderAndIsrRequest(request)
         case ApiKeys.STOP_REPLICA => handleStopReplicaRequest(request)
-        case ApiKeys.UPDATE_METADATA_KEY => handleUpdateMetadataRequest(request)
-        case ApiKeys.CONTROLLED_SHUTDOWN_KEY => handleControlledShutdownRequest(request)
+        case ApiKeys.UPDATE_METADATA => handleUpdateMetadataRequest(request)
+        case ApiKeys.CONTROLLED_SHUTDOWN => handleControlledShutdownRequest(request)
         case ApiKeys.OFFSET_COMMIT => handleOffsetCommitRequest(request)
         case ApiKeys.OFFSET_FETCH => handleOffsetFetchRequest(request)
         case ApiKeys.FIND_COORDINATOR => handleFindCoordinatorRequest(request)
@@ -1126,7 +1126,7 @@ class KafkaApis(val requestChannel: RequestChannel,
   def handleListGroupsRequest(request: RequestChannel.Request) {
     if (!authorize(request.session, Describe, Resource.ClusterResource)) {
       sendResponseMaybeThrottle(request, requestThrottleMs =>
-        ListGroupsResponse.fromError(requestThrottleMs, Errors.CLUSTER_AUTHORIZATION_FAILED))
+        request.body[ListGroupsRequest].getErrorResponse(requestThrottleMs, Errors.CLUSTER_AUTHORIZATION_FAILED.exception))
     } else {
       val (error, groups) = groupCoordinator.handleListGroups()
       val allGroups = groups.map { group => new ListGroupsResponse.Group(group.groupId, group.protocolType) }

http://git-wip-us.apache.org/repos/asf/kafka/blob/0cf77080/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index dcd2038..7aeffb5 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -117,7 +117,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
       ApiKeys.OFFSET_COMMIT -> classOf[requests.OffsetCommitResponse],
       ApiKeys.OFFSET_FETCH -> classOf[requests.OffsetFetchResponse],
       ApiKeys.FIND_COORDINATOR -> classOf[FindCoordinatorResponse],
-      ApiKeys.UPDATE_METADATA_KEY -> classOf[requests.UpdateMetadataResponse],
+      ApiKeys.UPDATE_METADATA -> classOf[requests.UpdateMetadataResponse],
       ApiKeys.JOIN_GROUP -> classOf[JoinGroupResponse],
       ApiKeys.SYNC_GROUP -> classOf[SyncGroupResponse],
       ApiKeys.DESCRIBE_GROUPS -> classOf[DescribeGroupsResponse],
@@ -125,7 +125,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
       ApiKeys.LEAVE_GROUP -> classOf[LeaveGroupResponse],
       ApiKeys.LEADER_AND_ISR -> classOf[requests.LeaderAndIsrResponse],
       ApiKeys.STOP_REPLICA -> classOf[requests.StopReplicaResponse],
-      ApiKeys.CONTROLLED_SHUTDOWN_KEY -> classOf[requests.ControlledShutdownResponse],
+      ApiKeys.CONTROLLED_SHUTDOWN -> classOf[requests.ControlledShutdownResponse],
       ApiKeys.CREATE_TOPICS -> classOf[CreateTopicsResponse],
       ApiKeys.DELETE_TOPICS -> classOf[requests.DeleteTopicsResponse],
       ApiKeys.OFFSET_FOR_LEADER_EPOCH -> classOf[OffsetsForLeaderEpochResponse],
@@ -152,7 +152,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
     ApiKeys.OFFSET_COMMIT -> ((resp: requests.OffsetCommitResponse) => resp.responseData.asScala.find(_._1 == tp).get._2),
     ApiKeys.OFFSET_FETCH -> ((resp: requests.OffsetFetchResponse) => resp.error),
     ApiKeys.FIND_COORDINATOR -> ((resp: FindCoordinatorResponse) => resp.error),
-    ApiKeys.UPDATE_METADATA_KEY -> ((resp: requests.UpdateMetadataResponse) => resp.error),
+    ApiKeys.UPDATE_METADATA -> ((resp: requests.UpdateMetadataResponse) => resp.error),
     ApiKeys.JOIN_GROUP -> ((resp: JoinGroupResponse) => resp.error),
     ApiKeys.SYNC_GROUP -> ((resp: SyncGroupResponse) => resp.error),
     ApiKeys.DESCRIBE_GROUPS -> ((resp: DescribeGroupsResponse) => resp.groups.get(group).error),
@@ -160,7 +160,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
     ApiKeys.LEAVE_GROUP -> ((resp: LeaveGroupResponse) => resp.error),
     ApiKeys.LEADER_AND_ISR -> ((resp: requests.LeaderAndIsrResponse) => resp.responses.asScala.find(_._1 == tp).get._2),
     ApiKeys.STOP_REPLICA -> ((resp: requests.StopReplicaResponse) => resp.responses.asScala.find(_._1 == tp).get._2),
-    ApiKeys.CONTROLLED_SHUTDOWN_KEY -> ((resp: requests.ControlledShutdownResponse) => resp.error),
+    ApiKeys.CONTROLLED_SHUTDOWN -> ((resp: requests.ControlledShutdownResponse) => resp.error),
     ApiKeys.CREATE_TOPICS -> ((resp: CreateTopicsResponse) => resp.errors.asScala.find(_._1 == createTopic).get._2.error),
     ApiKeys.DELETE_TOPICS -> ((resp: requests.DeleteTopicsResponse) => resp.errors.asScala.find(_._1 == deleteTopic).get._2),
     ApiKeys.OFFSET_FOR_LEADER_EPOCH -> ((resp: OffsetsForLeaderEpochResponse) => resp.responses.get(tp).error),
@@ -190,7 +190,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
     ApiKeys.OFFSET_COMMIT -> (topicReadAcl ++ groupReadAcl),
     ApiKeys.OFFSET_FETCH -> (topicReadAcl ++ groupDescribeAcl),
     ApiKeys.FIND_COORDINATOR -> (topicReadAcl ++ groupDescribeAcl ++ transactionalIdDescribeAcl),
-    ApiKeys.UPDATE_METADATA_KEY -> clusterAcl,
+    ApiKeys.UPDATE_METADATA -> clusterAcl,
     ApiKeys.JOIN_GROUP -> groupReadAcl,
     ApiKeys.SYNC_GROUP -> groupReadAcl,
     ApiKeys.DESCRIBE_GROUPS -> groupDescribeAcl,
@@ -198,7 +198,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
     ApiKeys.LEAVE_GROUP -> groupReadAcl,
     ApiKeys.LEADER_AND_ISR -> clusterAcl,
     ApiKeys.STOP_REPLICA -> clusterAcl,
-    ApiKeys.CONTROLLED_SHUTDOWN_KEY -> clusterAcl,
+    ApiKeys.CONTROLLED_SHUTDOWN -> clusterAcl,
     ApiKeys.CREATE_TOPICS -> clusterCreateAcl,
     ApiKeys.DELETE_TOPICS -> topicDeleteAcl,
     ApiKeys.OFFSET_FOR_LEADER_EPOCH -> clusterAcl,
@@ -293,7 +293,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
     val brokers = Set(new requests.UpdateMetadataRequest.Broker(brokerId,
       Seq(new requests.UpdateMetadataRequest.EndPoint("localhost", 0, securityProtocol,
         ListenerName.forSecurityProtocol(securityProtocol))).asJava, null)).asJava
-    val version = ApiKeys.UPDATE_METADATA_KEY.latestVersion
+    val version = ApiKeys.UPDATE_METADATA.latestVersion
     new requests.UpdateMetadataRequest.Builder(version, brokerId, Int.MaxValue, partitionState, brokers).build()
   }
 
@@ -373,7 +373,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
       ApiKeys.LIST_OFFSETS -> createListOffsetsRequest,
       ApiKeys.OFFSET_FETCH -> createOffsetFetchRequest,
       ApiKeys.FIND_COORDINATOR -> createFindCoordinatorRequest,
-      ApiKeys.UPDATE_METADATA_KEY -> createUpdateMetadataRequest,
+      ApiKeys.UPDATE_METADATA -> createUpdateMetadataRequest,
       ApiKeys.JOIN_GROUP -> createJoinGroupRequest,
       ApiKeys.SYNC_GROUP -> createSyncGroupRequest,
       ApiKeys.DESCRIBE_GROUPS -> createDescribeGroupsRequest,
@@ -382,7 +382,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
       ApiKeys.LEAVE_GROUP -> leaveGroupRequest,
       ApiKeys.LEADER_AND_ISR -> leaderAndIsrRequest,
       ApiKeys.STOP_REPLICA -> stopReplicaRequest,
-      ApiKeys.CONTROLLED_SHUTDOWN_KEY -> controlledShutdownRequest,
+      ApiKeys.CONTROLLED_SHUTDOWN -> controlledShutdownRequest,
       ApiKeys.CREATE_TOPICS -> createTopicsRequest,
       ApiKeys.DELETE_TOPICS -> deleteTopicsRequest,
       ApiKeys.OFFSET_FOR_LEADER_EPOCH -> offsetsForLeaderEpochRequest,

http://git-wip-us.apache.org/repos/asf/kafka/blob/0cf77080/core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala b/core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala
index c52020c..83f7111 100644
--- a/core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala
@@ -28,7 +28,7 @@ import scala.collection.JavaConverters._
 object ApiVersionsRequestTest {
   def validateApiVersionsResponse(apiVersionsResponse: ApiVersionsResponse) {
     assertEquals("API keys in ApiVersionsResponse must match API keys supported by broker.", ApiKeys.values.length, apiVersionsResponse.apiVersions.size)
-    for (expectedApiVersion: ApiVersion <- ApiVersionsResponse.API_VERSIONS_RESPONSE.apiVersions.asScala) {
+    for (expectedApiVersion: ApiVersion <- ApiVersionsResponse.defaultApiVersionsResponse().apiVersions.asScala) {
       val actualApiVersion = apiVersionsResponse.apiVersion(expectedApiVersion.apiKey)
       assertNotNull(s"API key ${actualApiVersion.apiKey} is supported by broker, but not received in ApiVersionsResponse.", actualApiVersion)
       assertEquals("API key must be supported by the broker.", expectedApiVersion.apiKey, actualApiVersion.apiKey)

http://git-wip-us.apache.org/repos/asf/kafka/blob/0cf77080/core/src/test/scala/unit/kafka/server/ApiVersionsTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ApiVersionsTest.scala b/core/src/test/scala/unit/kafka/server/ApiVersionsTest.scala
index 262686a..3ba7cd5 100644
--- a/core/src/test/scala/unit/kafka/server/ApiVersionsTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ApiVersionsTest.scala
@@ -18,7 +18,7 @@
 package kafka.server
 
 import org.apache.kafka.common.requests.ApiVersionsResponse
-import org.apache.kafka.common.protocol.{ApiKeys, Protocol}
+import org.apache.kafka.common.protocol.ApiKeys
 import org.junit.Assert._
 import org.junit.Test
 
@@ -26,25 +26,25 @@ class ApiVersionsTest {
 
   @Test
   def testApiVersions(): Unit = {
-    val apiVersions = ApiVersionsResponse.API_VERSIONS_RESPONSE.apiVersions
+    val apiVersions = ApiVersionsResponse.defaultApiVersionsResponse().apiVersions
     assertEquals("API versions for all API keys must be maintained.", apiVersions.size, ApiKeys.values().length)
 
     for (key <- ApiKeys.values) {
-      val version = ApiVersionsResponse.API_VERSIONS_RESPONSE.apiVersion(key.id)
+      val version = ApiVersionsResponse.defaultApiVersionsResponse().apiVersion(key.id)
       assertNotNull(s"Could not find ApiVersion for API ${key.name}", version)
       assertEquals(s"Incorrect min version for Api ${key.name}.", version.minVersion, key.oldestVersion)
       assertEquals(s"Incorrect max version for Api ${key.name}.", version.maxVersion, key.latestVersion)
 
       // Check if versions less than min version are indeed set as null, i.e., deprecated.
       for (i <- 0 until version.minVersion) {
-        assertNull(s"Request version $i for API ${version.apiKey} must be null.", Protocol.REQUESTS(version.apiKey)(i))
-        assertNull(s"Response version $i for API ${version.apiKey} must be null.", Protocol.RESPONSES(version.apiKey)(i))
+        assertNull(s"Request version $i for API ${version.apiKey} must be null.", key.requestSchemas(i))
+        assertNull(s"Response version $i for API ${version.apiKey} must be null.", key.responseSchemas(i))
       }
 
       // Check if versions between min and max versions are non null, i.e., valid.
       for (i <- version.minVersion.toInt to version.maxVersion) {
-        assertNotNull(s"Request version $i for API ${version.apiKey} must not be null.", Protocol.REQUESTS(version.apiKey)(i))
-        assertNotNull(s"Response version $i for API ${version.apiKey} must not be null.", Protocol.RESPONSES(version.apiKey)(i))
+        assertNotNull(s"Request version $i for API ${version.apiKey} must not be null.", key.requestSchemas(i))
+        assertNotNull(s"Response version $i for API ${version.apiKey} must not be null.", key.responseSchemas(i))
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/0cf77080/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala b/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
index de3ccdb..6b3c6c0 100644
--- a/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
+++ b/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
@@ -69,7 +69,7 @@ class MetadataCacheTest {
       new TopicPartition(topic0, 1) -> new UpdateMetadataRequest.PartitionState(controllerEpoch, 1, 1, asList(1, 0), zkVersion, asList(1, 2, 0, 4), asList()),
       new TopicPartition(topic1, 0) -> new UpdateMetadataRequest.PartitionState(controllerEpoch, 2, 2, asList(2, 1), zkVersion, asList(2, 1, 3), asList()))
 
-    val version = ApiKeys.UPDATE_METADATA_KEY.latestVersion
+    val version = ApiKeys.UPDATE_METADATA.latestVersion
     val updateMetadataRequest = new UpdateMetadataRequest.Builder(version, controllerId, controllerEpoch,
       partitionStates.asJava, brokers.asJava).build()
     cache.updateCache(15, updateMetadataRequest)
@@ -127,7 +127,7 @@ class MetadataCacheTest {
     val partitionStates = Map(
       new TopicPartition(topic, 0) -> new UpdateMetadataRequest.PartitionState(controllerEpoch, leader, leaderEpoch, asList(0), zkVersion, asList(0), asList()))
 
-    val version = ApiKeys.UPDATE_METADATA_KEY.latestVersion
+    val version = ApiKeys.UPDATE_METADATA.latestVersion
     val updateMetadataRequest = new UpdateMetadataRequest.Builder(version, controllerId, controllerEpoch,
       partitionStates.asJava, brokers.asJava).build()
     cache.updateCache(15, updateMetadataRequest)
@@ -171,7 +171,7 @@ class MetadataCacheTest {
     val partitionStates = Map(
       new TopicPartition(topic, 0) -> new UpdateMetadataRequest.PartitionState(controllerEpoch, leader, leaderEpoch, isr, zkVersion, replicas, asList()))
 
-    val version = ApiKeys.UPDATE_METADATA_KEY.latestVersion
+    val version = ApiKeys.UPDATE_METADATA.latestVersion
     val updateMetadataRequest = new UpdateMetadataRequest.Builder(version, controllerId, controllerEpoch,
       partitionStates.asJava, brokers.asJava).build()
     cache.updateCache(15, updateMetadataRequest)
@@ -231,7 +231,7 @@ class MetadataCacheTest {
     val partitionStates = Map(
       new TopicPartition(topic, 0) -> new UpdateMetadataRequest.PartitionState(controllerEpoch, leader, leaderEpoch, isr, zkVersion, replicas, asList()))
 
-    val version = ApiKeys.UPDATE_METADATA_KEY.latestVersion
+    val version = ApiKeys.UPDATE_METADATA.latestVersion
     val updateMetadataRequest = new UpdateMetadataRequest.Builder(version, controllerId, controllerEpoch,
       partitionStates.asJava, brokers.asJava).build()
     cache.updateCache(15, updateMetadataRequest)
@@ -283,7 +283,7 @@ class MetadataCacheTest {
     val isr = asList[Integer](0, 1)
     val partitionStates = Map(
       new TopicPartition(topic, 0) -> new UpdateMetadataRequest.PartitionState(controllerEpoch, leader, leaderEpoch, isr, 3, replicas, asList()))
-    val version = ApiKeys.UPDATE_METADATA_KEY.latestVersion
+    val version = ApiKeys.UPDATE_METADATA.latestVersion
     val updateMetadataRequest = new UpdateMetadataRequest.Builder(version, 2, controllerEpoch, partitionStates.asJava,
       brokers.asJava).build()
     cache.updateCache(15, updateMetadataRequest)
@@ -316,7 +316,7 @@ class MetadataCacheTest {
       val isr = asList[Integer](0, 1)
       val partitionStates = Map(
         new TopicPartition(topic, 0) -> new UpdateMetadataRequest.PartitionState(controllerEpoch, leader, leaderEpoch, isr, 3, replicas, asList()))
-      val version = ApiKeys.UPDATE_METADATA_KEY.latestVersion
+      val version = ApiKeys.UPDATE_METADATA.latestVersion
       val updateMetadataRequest = new UpdateMetadataRequest.Builder(version, 2, controllerEpoch, partitionStates.asJava,
         brokers.asJava).build()
       cache.updateCache(15, updateMetadataRequest)

http://git-wip-us.apache.org/repos/asf/kafka/blob/0cf77080/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
index bb9f82e..1a3b5c2 100644
--- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
+++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
@@ -187,16 +187,16 @@ class RequestQuotaTest extends BaseRequestTest {
         case ApiKeys.STOP_REPLICA =>
           new StopReplicaRequest.Builder(brokerId, Int.MaxValue, true, Set(tp).asJava)
 
-        case ApiKeys.UPDATE_METADATA_KEY =>
+        case ApiKeys.UPDATE_METADATA =>
           val partitionState = Map(tp -> new UpdateMetadataRequest.PartitionState(
             Int.MaxValue, brokerId, Int.MaxValue, List(brokerId).asJava, 2, Seq(brokerId).asJava, Seq.empty[Integer].asJava)).asJava
           val securityProtocol = SecurityProtocol.PLAINTEXT
           val brokers = Set(new UpdateMetadataRequest.Broker(brokerId,
             Seq(new UpdateMetadataRequest.EndPoint("localhost", 0, securityProtocol,
             ListenerName.forSecurityProtocol(securityProtocol))).asJava, null)).asJava
-          new UpdateMetadataRequest.Builder(ApiKeys.UPDATE_METADATA_KEY.latestVersion, brokerId, Int.MaxValue, partitionState, brokers)
+          new UpdateMetadataRequest.Builder(ApiKeys.UPDATE_METADATA.latestVersion, brokerId, Int.MaxValue, partitionState, brokers)
 
-        case ApiKeys.CONTROLLED_SHUTDOWN_KEY =>
+        case ApiKeys.CONTROLLED_SHUTDOWN =>
           new ControlledShutdownRequest.Builder(brokerId)
 
         case ApiKeys.OFFSET_COMMIT =>


Mime
View raw message