kafka-commits mailing list archives

From: j...@apache.org
Subject: kafka git commit: KAFKA-5128; Check inter broker version in transactional methods
Date: Fri, 26 May 2017 16:54:23 GMT
Repository: kafka
Updated Branches:
  refs/heads/0.11.0 1b15adde1 -> 012332042


KAFKA-5128; Check inter broker version in transactional methods

Add a check in `KafkaApis` that the inter-broker protocol version is at least `KAFKA_0_11_0_IV0`,
i.e., a version that supports transactions, before handling transactional requests.
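
For illustration, here is a minimal, self-contained Scala sketch of the comparison this check
relies on (the object and method names are hypothetical and not part of the diff; `ApiVersion`
is an ordered type, which is what makes the `<` comparison in `ensureInterBrokerVersion` below work):

    import kafka.api.{ApiVersion, KAFKA_0_10_2_IV0, KAFKA_0_11_0_IV0}
    import org.apache.kafka.common.errors.UnsupportedVersionException

    // Hypothetical standalone object; mirrors the ensureInterBrokerVersion helper
    // added to KafkaApis.scala in this commit.
    object InterBrokerVersionCheckSketch {
      def ensureAtLeast(current: ApiVersion, required: ApiVersion): Unit =
        if (current < required) // ApiVersion is Ordered, so versions compare directly
          throw new UnsupportedVersionException(s"inter.broker.protocol.version: ${current.version} " +
            s"is less than the required version: ${required.version}")

      def main(args: Array[String]): Unit = {
        ensureAtLeast(KAFKA_0_11_0_IV0, KAFKA_0_11_0_IV0) // passes: transactions supported
        ensureAtLeast(KAFKA_0_10_2_IV0, KAFKA_0_11_0_IV0) // throws UnsupportedVersionException
      }
    }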

Author: Damian Guy <damian.guy@gmail.com>

Reviewers: Ismael Juma <ismael@juma.me.uk>, Jason Gustafson <jason@confluent.io>

Closes #3103 from dguy/kafka-5128

(cherry picked from commit 7892b4e6c7c32be09d78a8bbbeeaa823d3197aaa)
Signed-off-by: Jason Gustafson <jason@confluent.io>


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/01233204
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/01233204
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/01233204

Branch: refs/heads/0.11.0
Commit: 0123320426341fe50d4124f0ef398d7f5aaee909
Parents: 1b15add
Author: Damian Guy <damian.guy@gmail.com>
Authored: Fri May 26 09:52:47 2017 -0700
Committer: Jason Gustafson <jason@confluent.io>
Committed: Fri May 26 09:54:21 2017 -0700

----------------------------------------------------------------------
 .../apache/kafka/common/protocol/ApiKeys.java   |  71 +++++++-----
 .../common/requests/ApiVersionsResponse.java    |  18 +--
 .../clients/consumer/internals/FetcherTest.java |   2 +-
 .../clients/producer/internals/SenderTest.java  |   2 +-
 .../requests/ApiVersionsResponseTest.java       |  68 ++++++++++++
 .../transaction/TransactionStateManager.scala   |   1 -
 .../src/main/scala/kafka/server/KafkaApis.scala |  14 ++-
 .../scala/unit/kafka/server/KafkaApisTest.scala | 111 +++++++++++++++++++
 8 files changed, 246 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/01233204/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
index 36f6403..721a610 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
@@ -19,6 +19,7 @@ package org.apache.kafka.common.protocol;
 import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.SchemaException;
 import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.record.RecordBatch;
 
 import java.nio.ByteBuffer;
 
@@ -26,25 +27,25 @@ import java.nio.ByteBuffer;
  * Identifiers for all the Kafka APIs
  */
 public enum ApiKeys {
-    PRODUCE(0, "Produce", false),
-    FETCH(1, "Fetch", false),
-    LIST_OFFSETS(2, "Offsets", false),
-    METADATA(3, "Metadata", false),
+    PRODUCE(0, "Produce"),
+    FETCH(1, "Fetch"),
+    LIST_OFFSETS(2, "Offsets"),
+    METADATA(3, "Metadata"),
     LEADER_AND_ISR(4, "LeaderAndIsr", true),
     STOP_REPLICA(5, "StopReplica", true),
     UPDATE_METADATA_KEY(6, "UpdateMetadata", true),
     CONTROLLED_SHUTDOWN_KEY(7, "ControlledShutdown", true),
-    OFFSET_COMMIT(8, "OffsetCommit", false),
-    OFFSET_FETCH(9, "OffsetFetch", false),
-    FIND_COORDINATOR(10, "FindCoordinator", false),
-    JOIN_GROUP(11, "JoinGroup", false),
-    HEARTBEAT(12, "Heartbeat", false),
-    LEAVE_GROUP(13, "LeaveGroup", false),
-    SYNC_GROUP(14, "SyncGroup", false),
-    DESCRIBE_GROUPS(15, "DescribeGroups", false),
-    LIST_GROUPS(16, "ListGroups", false),
-    SASL_HANDSHAKE(17, "SaslHandshake", false),
-    API_VERSIONS(18, "ApiVersions", false) {
+    OFFSET_COMMIT(8, "OffsetCommit"),
+    OFFSET_FETCH(9, "OffsetFetch"),
+    FIND_COORDINATOR(10, "FindCoordinator"),
+    JOIN_GROUP(11, "JoinGroup"),
+    HEARTBEAT(12, "Heartbeat"),
+    LEAVE_GROUP(13, "LeaveGroup"),
+    SYNC_GROUP(14, "SyncGroup"),
+    DESCRIBE_GROUPS(15, "DescribeGroups"),
+    LIST_GROUPS(16, "ListGroups"),
+    SASL_HANDSHAKE(17, "SaslHandshake"),
+    API_VERSIONS(18, "ApiVersions") {
         @Override
         public Struct parseResponse(short version, ByteBuffer buffer) {
             // Fallback to version 0 for ApiVersions response. If a client sends an ApiVersionsRequest
@@ -53,21 +54,21 @@ public enum ApiKeys {
             return parseResponse(version, buffer, (short) 0);
         }
     },
-    CREATE_TOPICS(19, "CreateTopics", false),
-    DELETE_TOPICS(20, "DeleteTopics", false),
-    DELETE_RECORDS(21, "DeleteRecords", false),
-    INIT_PRODUCER_ID(22, "InitProducerId", false),
+    CREATE_TOPICS(19, "CreateTopics"),
+    DELETE_TOPICS(20, "DeleteTopics"),
+    DELETE_RECORDS(21, "DeleteRecords"),
+    INIT_PRODUCER_ID(22, "InitProducerId"),
     OFFSET_FOR_LEADER_EPOCH(23, "OffsetForLeaderEpoch", true),
-    ADD_PARTITIONS_TO_TXN(24, "AddPartitionsToTxn", false),
-    ADD_OFFSETS_TO_TXN(25, "AddOffsetsToTxn", false),
-    END_TXN(26, "EndTxn", false),
-    WRITE_TXN_MARKERS(27, "WriteTxnMarkers", true),
-    TXN_OFFSET_COMMIT(28, "TxnOffsetCommit", false),
-    DESCRIBE_ACLS(29, "DescribeAcls", false),
-    CREATE_ACLS(30, "CreateAcls", false),
-    DELETE_ACLS(31, "DeleteAcls", false),
-    DESCRIBE_CONFIGS(32, "DescribeConfigs", false),
-    ALTER_CONFIGS(33, "AlterConfigs", false);
+    ADD_PARTITIONS_TO_TXN(24, "AddPartitionsToTxn", false, RecordBatch.MAGIC_VALUE_V2),
+    ADD_OFFSETS_TO_TXN(25, "AddOffsetsToTxn", false, RecordBatch.MAGIC_VALUE_V2),
+    END_TXN(26, "EndTxn", false, RecordBatch.MAGIC_VALUE_V2),
+    WRITE_TXN_MARKERS(27, "WriteTxnMarkers", true, RecordBatch.MAGIC_VALUE_V2),
+    TXN_OFFSET_COMMIT(28, "TxnOffsetCommit", false, RecordBatch.MAGIC_VALUE_V2),
+    DESCRIBE_ACLS(29, "DescribeAcls"),
+    CREATE_ACLS(30, "CreateAcls"),
+    DELETE_ACLS(31, "DeleteAcls"),
+    DESCRIBE_CONFIGS(32, "DescribeConfigs"),
+    ALTER_CONFIGS(33, "AlterConfigs");
 
     private static final ApiKeys[] ID_TO_TYPE;
     private static final int MIN_API_KEY = 0;
@@ -93,12 +94,24 @@ public enum ApiKeys {
     /** indicates if this is a ClusterAction request used only by brokers */
     public final boolean clusterAction;
 
+    /** indicates the minimum required inter broker magic required to support the API */
+    public final byte minRequiredInterBrokerMagic;
+
+    ApiKeys(int id, String name) {
+        this(id, name, false);
+    }
+
     ApiKeys(int id, String name, boolean clusterAction) {
+        this(id, name, clusterAction, RecordBatch.MAGIC_VALUE_V0);
+    }
+
+    ApiKeys(int id, String name, boolean clusterAction, byte minRequiredInterBrokerMagic) {
         if (id < 0)
             throw new IllegalArgumentException("id must not be negative, id: " + id);
         this.id = (short) id;
         this.name = name;
         this.clusterAction = clusterAction;
+        this.minRequiredInterBrokerMagic = minRequiredInterBrokerMagic;
     }
 
     public static ApiKeys forId(int id) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/01233204/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java
index 6f921a7..e9d5023 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java
@@ -20,6 +20,7 @@ import org.apache.kafka.common.network.Send;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.record.RecordBatch;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -31,7 +32,8 @@ import java.util.Map;
 
 public class ApiVersionsResponse extends AbstractResponse {
 
-    public static final ApiVersionsResponse API_VERSIONS_RESPONSE = createApiVersionsResponse(DEFAULT_THROTTLE_TIME);
+    public static final ApiVersionsResponse API_VERSIONS_RESPONSE = createApiVersionsResponse(DEFAULT_THROTTLE_TIME, RecordBatch.CURRENT_MAGIC_VALUE);
+    private static final String THROTTLE_TIME_KEY_NAME = "throttle_time_ms";
     public static final String ERROR_CODE_KEY_NAME = "error_code";
     public static final String API_VERSIONS_KEY_NAME = "api_versions";
     public static final String API_KEY_NAME = "api_key";
@@ -114,11 +116,11 @@ public class ApiVersionsResponse extends AbstractResponse {
         return struct;
     }
 
-    public static ApiVersionsResponse apiVersionsResponse(short version, int throttleTimeMs) {
-        if (throttleTimeMs == 0 || version == 0)
+    public static ApiVersionsResponse apiVersionsResponse(int throttleTimeMs, byte maxMagic) {
+        if (maxMagic == RecordBatch.CURRENT_MAGIC_VALUE && throttleTimeMs == DEFAULT_THROTTLE_TIME) {
             return API_VERSIONS_RESPONSE;
-        else
-            return createApiVersionsResponse(throttleTimeMs);
+        }
+        return createApiVersionsResponse(throttleTimeMs, maxMagic);
     }
 
     /**
@@ -150,10 +152,12 @@ public class ApiVersionsResponse extends AbstractResponse {
         return new ApiVersionsResponse(ApiKeys.API_VERSIONS.parseResponse(version, buffer));
     }
 
-    public static ApiVersionsResponse createApiVersionsResponse(int throttleTimeMs) {
+    public static ApiVersionsResponse createApiVersionsResponse(int throttleTimeMs, final byte minMagic) {
         List<ApiVersion> versionList = new ArrayList<>();
         for (ApiKeys apiKey : ApiKeys.values()) {
-            versionList.add(new ApiVersion(apiKey));
+            if (apiKey.minRequiredInterBrokerMagic <= minMagic) {
+                versionList.add(new ApiVersion(apiKey));
+            }
         }
         return new ApiVersionsResponse(throttleTimeMs, Errors.NONE, versionList);
     }
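
A hypothetical Scala sketch of the resulting behavior (assuming `AbstractResponse.DEFAULT_THROTTLE_TIME`
is accessible, as in the new test below): the cached response is reused only in the common case, and
older magic values get a filtered copy:

    import org.apache.kafka.common.record.RecordBatch
    import org.apache.kafka.common.requests.{AbstractResponse, ApiVersionsResponse}

    // Hypothetical demo, not part of the commit.
    object ApiVersionsCachingSketch {
      def main(args: Array[String]): Unit = {
        // Common case: current magic and default throttle reuse the cached instance.
        val cached = ApiVersionsResponse.apiVersionsResponse(
          AbstractResponse.DEFAULT_THROTTLE_TIME, RecordBatch.CURRENT_MAGIC_VALUE)
        assert(cached eq ApiVersionsResponse.API_VERSIONS_RESPONSE)

        // Older magic: a fresh response is built with the transactional APIs filtered out.
        val filtered = ApiVersionsResponse.apiVersionsResponse(10, RecordBatch.MAGIC_VALUE_V1)
        assert(filtered.throttleTimeMs == 10)
      }
    }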

http://git-wip-us.apache.org/repos/asf/kafka/blob/01233204/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index a81dc58..ba5b7d5 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -1051,7 +1051,7 @@ public class FetcherTest {
                 time, true, new ApiVersions(), throttleTimeSensor);
 
         short apiVersionsResponseVersion = ApiKeys.API_VERSIONS.latestVersion();
-        ByteBuffer buffer = ApiVersionsResponse.createApiVersionsResponse(400).serialize(apiVersionsResponseVersion, new ResponseHeader(0));
+        ByteBuffer buffer = ApiVersionsResponse.createApiVersionsResponse(400, RecordBatch.CURRENT_MAGIC_VALUE).serialize(apiVersionsResponseVersion, new ResponseHeader(0));
         selector.delayedReceive(new DelayedReceive(node.idString(), new NetworkReceive(node.idString(), buffer)));
         while (!client.ready(node, time.milliseconds()))
             client.poll(1, time.milliseconds());

http://git-wip-us.apache.org/repos/asf/kafka/blob/01233204/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
index 50c4cd4..c08ea57 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
@@ -229,7 +229,7 @@ public class SenderTest {
                 time, true, new ApiVersions(), throttleTimeSensor);
 
         short apiVersionsResponseVersion = ApiKeys.API_VERSIONS.latestVersion();
-        ByteBuffer buffer = ApiVersionsResponse.createApiVersionsResponse(400).serialize(apiVersionsResponseVersion, new ResponseHeader(0));
+        ByteBuffer buffer = ApiVersionsResponse.createApiVersionsResponse(400, RecordBatch.CURRENT_MAGIC_VALUE).serialize(apiVersionsResponseVersion, new ResponseHeader(0));
         selector.delayedReceive(new DelayedReceive(node.idString(), new NetworkReceive(node.idString(), buffer)));
         while (!client.ready(node, time.milliseconds()))
             client.poll(1, time.milliseconds());

http://git-wip-us.apache.org/repos/asf/kafka/blob/01233204/clients/src/test/java/org/apache/kafka/common/requests/ApiVersionsResponseTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/ApiVersionsResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/ApiVersionsResponseTest.java
new file mode 100644
index 0000000..1e8e3b4
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/requests/ApiVersionsResponseTest.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.utils.Utils;
+import org.junit.Test;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+
+public class ApiVersionsResponseTest {
+
+    @Test
+    public void shouldCreateApiResponseOnlyWithKeysSupportedByMagicValue() throws Exception {
+        final ApiVersionsResponse response = ApiVersionsResponse.apiVersionsResponse(10, RecordBatch.MAGIC_VALUE_V1);
+        verifyApiKeysForMagic(response, RecordBatch.MAGIC_VALUE_V1);
+        assertEquals(10, response.throttleTimeMs());
+    }
+
+    @Test
+    public void shouldCreateApiResponseThatHasAllApiKeysSupportedByBroker() throws Exception {
+        assertEquals(apiKeysInResponse(ApiVersionsResponse.API_VERSIONS_RESPONSE), Utils.mkSet(ApiKeys.values()));
+    }
+
+    @Test
+    public void shouldReturnAllKeysWhenMagicIsCurrentValueAndThrottleMsIsDefaultThrottle() throws Exception {
+        ApiVersionsResponse response = ApiVersionsResponse.apiVersionsResponse(AbstractResponse.DEFAULT_THROTTLE_TIME, RecordBatch.CURRENT_MAGIC_VALUE);
+        assertEquals(Utils.mkSet(ApiKeys.values()), apiKeysInResponse(response));
+        assertEquals(AbstractResponse.DEFAULT_THROTTLE_TIME, response.throttleTimeMs());
+    }
+    
+    private void verifyApiKeysForMagic(final ApiVersionsResponse response, final byte maxMagic) {
+        for (final ApiVersionsResponse.ApiVersion version : response.apiVersions()) {
+            assertTrue(ApiKeys.forId(version.apiKey).minRequiredInterBrokerMagic <= maxMagic);
+        }
+    }
+
+    private Set<ApiKeys> apiKeysInResponse(final ApiVersionsResponse apiVersions) {
+        final Set<ApiKeys> apiKeys = new HashSet<>();
+        for (final ApiVersionsResponse.ApiVersion version : apiVersions.apiVersions()) {
+            apiKeys.add(ApiKeys.forId(version.apiKey));
+        }
+        return apiKeys;
+    }
+
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/01233204/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
index 0d7b5c4..19b9b91 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
@@ -439,7 +439,6 @@ class TransactionStateManager(brokerId: Int,
      throw new KafkaException(s"Transaction topic number of partitions has changed from $transactionTopicPartitionCount to $curTransactionTopicPartitionCount")
   }
 
-  // TODO: check broker message format and error if < V2
   def appendTransactionToLog(transactionalId: String,
                              coordinatorEpoch: Int,
                              newMetadata: TxnTransitMetadata,

http://git-wip-us.apache.org/repos/asf/kafka/blob/01233204/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 380685f..473d108 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -25,7 +25,7 @@ import java.util.concurrent.ConcurrentHashMap
 import java.util.concurrent.atomic.AtomicInteger
 
 import kafka.admin.{AdminUtils, RackAwareMode}
-import kafka.api.{ControlledShutdownRequest, ControlledShutdownResponse}
+import kafka.api.{ApiVersion, ControlledShutdownRequest, ControlledShutdownResponse, KAFKA_0_11_0_IV0}
 import kafka.cluster.Partition
 import kafka.common.{KafkaStorageException, OffsetAndMetadata, OffsetMetadata, TopicAndPartition}
 import kafka.server.QuotaFactory.{QuotaManagers, UnboundedQuota}
@@ -1277,7 +1277,7 @@ class KafkaApis(val requestChannel: RequestChannel,
     def sendResponseCallback(requestThrottleMs: Int) {
       val responseSend =
         if (Protocol.apiVersionSupported(ApiKeys.API_VERSIONS.id, request.header.apiVersion))
-          ApiVersionsResponse.apiVersionsResponse(request.header.apiVersion, requestThrottleMs).toSend(request.connectionId, request.header)
+          ApiVersionsResponse.apiVersionsResponse(requestThrottleMs, config.interBrokerProtocolVersion.messageFormatVersion).toSend(request.connectionId, request.header)
         else ApiVersionsResponse.unsupportedVersionSend(request.connectionId, request.header)
       requestChannel.sendResponse(RequestChannel.Response(request, responseSend))
     }
@@ -1453,6 +1453,7 @@ class KafkaApis(val requestChannel: RequestChannel,
   }
 
   def handleEndTxnRequest(request: RequestChannel.Request): Unit = {
+    ensureInterBrokerVersion(KAFKA_0_11_0_IV0)
     val endTxnRequest = request.body[EndTxnRequest]
     val transactionalId = endTxnRequest.transactionalId
 
@@ -1477,6 +1478,7 @@ class KafkaApis(val requestChannel: RequestChannel,
   }
 
   def handleWriteTxnMarkersRequest(request: RequestChannel.Request): Unit = {
+    ensureInterBrokerVersion(KAFKA_0_11_0_IV0)
     authorizeClusterAction(request)
     val writeTxnMarkersRequest = request.body[WriteTxnMarkersRequest]
     val errors = new ConcurrentHashMap[java.lang.Long, java.util.Map[TopicPartition, Errors]]()
@@ -1538,7 +1540,13 @@ class KafkaApis(val requestChannel: RequestChannel,
     }
   }
 
+  def ensureInterBrokerVersion(version: ApiVersion): Unit = {
+    if (config.interBrokerProtocolVersion < version)
+      throw new UnsupportedVersionException(s"inter.broker.protocol.version: ${config.interBrokerProtocolVersion.version} is less than the required version: ${version.version}")
+  }
+
   def handleAddPartitionToTxnRequest(request: RequestChannel.Request): Unit = {
+    ensureInterBrokerVersion(KAFKA_0_11_0_IV0)
     val addPartitionsToTxnRequest = request.body[AddPartitionsToTxnRequest]
     val transactionalId = addPartitionsToTxnRequest.transactionalId
     val partitionsToAdd = addPartitionsToTxnRequest.partitions
@@ -1593,6 +1601,7 @@ class KafkaApis(val requestChannel: RequestChannel,
   }
 
   def handleAddOffsetsToTxnRequest(request: RequestChannel.Request): Unit = {
+    ensureInterBrokerVersion(KAFKA_0_11_0_IV0)
     val addOffsetsToTxnRequest = request.body[AddOffsetsToTxnRequest]
     val transactionalId = addOffsetsToTxnRequest.transactionalId
     val groupId = addOffsetsToTxnRequest.consumerGroupId
@@ -1624,6 +1633,7 @@ class KafkaApis(val requestChannel: RequestChannel,
   }
 
   def handleTxnOffsetCommitRequest(request: RequestChannel.Request): Unit = {
+    ensureInterBrokerVersion(KAFKA_0_11_0_IV0)
     val header = request.header
     val txnOffsetCommitRequest = request.body[TxnOffsetCommitRequest]
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/01233204/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
new file mode 100644
index 0000000..d22a5a0
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package unit.kafka.server
+
+
+import kafka.api.{ApiVersion, KAFKA_0_10_2_IV0, KAFKA_0_11_0_IV0}
+import kafka.controller.KafkaController
+import kafka.coordinator.group.GroupCoordinator
+import kafka.coordinator.transaction.TransactionCoordinator
+import kafka.network.RequestChannel
+import kafka.security.auth.Authorizer
+import kafka.server.QuotaFactory.QuotaManagers
+import kafka.server._
+import kafka.utils.{MockTime, TestUtils, ZkUtils}
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.errors.UnsupportedVersionException
+import org.apache.kafka.common.metrics.Metrics
+import org.apache.kafka.common.protocol.ApiKeys
+import org.apache.kafka.common.requests.{AbstractRequestResponse, AddPartitionsToTxnRequest, RequestHeader}
+import org.apache.kafka.common.security.auth.KafkaPrincipal
+import org.apache.kafka.common.utils.Utils
+import org.easymock.EasyMock
+import org.junit.{Before, Test}
+
+import scala.collection.JavaConverters
+
+
+class KafkaApisTest {
+
+  private val requestChannel = EasyMock.createNiceMock(classOf[RequestChannel])
+  private val replicaManager = EasyMock.createNiceMock(classOf[ReplicaManager])
+  private val groupCoordinator = EasyMock.createNiceMock(classOf[GroupCoordinator])
+  private val adminManager = EasyMock.createNiceMock(classOf[AdminManager])
+  private val txnCoordinator = EasyMock.createNiceMock(classOf[TransactionCoordinator])
+  private val controller = EasyMock.createNiceMock(classOf[KafkaController])
+  private val zkUtils = EasyMock.createNiceMock(classOf[ZkUtils])
+  private val metadataCache = EasyMock.createNiceMock(classOf[MetadataCache])
+  private val metrics = new Metrics()
+  private val brokerId = 1
+  private val authorizer: Option[Authorizer] = None
+  private val quotas = EasyMock.createNiceMock(classOf[QuotaManagers])
+  private val brokerTopicStats = new BrokerTopicStats
+  private val clusterId = "clusterId"
+  private val time = new MockTime
+
+
+
+  def createKafkaApis(interBrokerProtocolVersion: ApiVersion): KafkaApis = {
+    val properties = TestUtils.createBrokerConfig(brokerId, "zk")
+    properties.put(KafkaConfig.InterBrokerProtocolVersionProp, interBrokerProtocolVersion.toString)
+    properties.put(KafkaConfig.LogMessageFormatVersionProp, interBrokerProtocolVersion.toString)
+    new KafkaApis(requestChannel,
+      replicaManager,
+      adminManager,
+      groupCoordinator,
+      txnCoordinator,
+      controller,
+      zkUtils,
+      brokerId,
+      new KafkaConfig(properties),
+      metadataCache,
+      metrics,
+      authorizer,
+      quotas,
+      brokerTopicStats,
+      clusterId,
+      time
+    )
+  }
+
+  @Test(expected = classOf[UnsupportedVersionException])
+  def shouldThrowUnsupportedVersionExceptionOnHandleAddOffsetToTxnRequestWhenInterBrokerProtocolNotSupported(): Unit = {
+    createKafkaApis(KAFKA_0_10_2_IV0).handleAddOffsetsToTxnRequest(null)
+  }
+
+  @Test(expected = classOf[UnsupportedVersionException])
+  def shouldThrowUnsupportedVersionExceptionOnHandleAddPartitionsToTxnRequestWhenInterBrokerProtocolNotSupported(): Unit = {
+    createKafkaApis(KAFKA_0_10_2_IV0).handleAddPartitionToTxnRequest(null)
+  }
+
+  @Test(expected = classOf[UnsupportedVersionException])
+  def shouldThrowUnsupportedVersionExceptionOnHandleTxnOffsetCommitRequestWhenInterBrokerProtocolNotSupported(): Unit = {
+    createKafkaApis(KAFKA_0_10_2_IV0).handleTxnOffsetCommitRequest(null)
+  }
+
+  @Test(expected = classOf[UnsupportedVersionException])
+  def shouldThrowUnsupportedVersionExceptionOnHandleEndTxnRequestWhenInterBrokerProtocolNotSupported(): Unit = {
+    createKafkaApis(KAFKA_0_10_2_IV0).handleEndTxnRequest(null)
+  }
+
+  @Test(expected = classOf[UnsupportedVersionException])
+  def shouldThrowUnsupportedVersionExceptionOnHandleWriteTxnMarkersRequestWhenInterBrokerProtocolNotSupported(): Unit = {
+    createKafkaApis(KAFKA_0_10_2_IV0).handleWriteTxnMarkersRequest(null)
+  }
+  
+}

