kafka-commits mailing list archives

From ij...@apache.org
Subject [1/2] kafka git commit: KAFKA-4990; Request/response classes for transactions (KIP-98)
Date Fri, 07 Apr 2017 11:08:45 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 2f4f3b957 -> 865d82af2


http://git-wip-us.apache.org/repos/asf/kafka/blob/865d82af/clients/src/main/java/org/apache/kafka/common/requests/WriteTxnMarkersResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/WriteTxnMarkersResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/WriteTxnMarkersResponse.java
new file mode 100644
index 0000000..916dbab
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/WriteTxnMarkersResponse.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.utils.CollectionUtils;
+
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+
+public class WriteTxnMarkersResponse extends AbstractResponse {
+    private static final String TXN_MARKER_ENTRY_KEY_NAME = "transaction_markers";
+
+    private static final String PID_KEY_NAME = "producer_id";
+    private static final String TOPIC_PARTITIONS_KEY_NAME = "topics";
+    private static final String PARTITIONS_KEY_NAME = "partitions";
+    private static final String TOPIC_KEY_NAME = "topic";
+    private static final String PARTITION_KEY_NAME = "partition";
+    private static final String ERROR_CODE_KEY_NAME = "error_code";
+
+    // Possible error codes:
+    //   CorruptRecord
+    //   InvalidProducerEpoch
+    //   UnknownTopicOrPartition
+    //   NotLeaderForPartition
+    //   MessageTooLarge
+    //   RecordListTooLarge
+    //   NotEnoughReplicas
+    //   NotEnoughReplicasAfterAppend
+    //   InvalidRequiredAcks
+
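+    // Per-partition results of the marker writes, keyed by the producer id on whose
+    // behalf the markers were written.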
+    private final Map<Long, Map<TopicPartition, Errors>> errors;
+
+    public WriteTxnMarkersResponse(Map<Long, Map<TopicPartition, Errors>> errors) {
+        this.errors = errors;
+    }
+
+    public WriteTxnMarkersResponse(Struct struct) {
+        Map<Long, Map<TopicPartition, Errors>> errors = new HashMap<>();
+
+        Object[] responseArray = struct.getArray(TXN_MARKER_ENTRY_KEY_NAME);
+        for (Object responseObj : responseArray) {
+            Struct responseStruct = (Struct) responseObj;
+
+            long producerId = responseStruct.getLong(PID_KEY_NAME);
+
+            Map<TopicPartition, Errors> errorPerPartition = new HashMap<>();
+            Object[] topicPartitionsArray = responseStruct.getArray(TOPIC_PARTITIONS_KEY_NAME);
+            for (Object topicPartitionObj : topicPartitionsArray) {
+                Struct topicPartitionStruct = (Struct) topicPartitionObj;
+                String topic = topicPartitionStruct.getString(TOPIC_KEY_NAME);
+                for (Object partitionObj : topicPartitionStruct.getArray(PARTITIONS_KEY_NAME)) {
+                    Struct partitionStruct = (Struct) partitionObj;
+                    Integer partition = partitionStruct.getInt(PARTITION_KEY_NAME);
+                    Errors error = Errors.forCode(partitionStruct.getShort(ERROR_CODE_KEY_NAME));
+                    errorPerPartition.put(new TopicPartition(topic, partition), error);
+                }
+            }
+            errors.put(producerId, errorPerPartition);
+        }
+
+        this.errors = errors;
+    }
+
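+    // Serializes the nested producer-id -> topic -> partition -> error structure; partitions
+    // are grouped by topic via CollectionUtils.groupDataByTopic before being written out.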
+    @Override
+    protected Struct toStruct(short version) {
+        Struct struct = new Struct(ApiKeys.WRITE_TXN_MARKERS.responseSchema(version));
+
+        Object[] responsesArray = new Object[errors.size()];
+        int k = 0;
+        for (Map.Entry<Long, Map<TopicPartition, Errors>> responseEntry : errors.entrySet()) {
+            Struct responseStruct = struct.instance(TXN_MARKER_ENTRY_KEY_NAME);
+            responseStruct.set(PID_KEY_NAME, responseEntry.getKey());
+
+            Map<TopicPartition, Errors> partitionAndErrors = responseEntry.getValue();
+            Map<String, Map<Integer, Errors>> mappedPartitions = CollectionUtils.groupDataByTopic(partitionAndErrors);
+            Object[] partitionsArray = new Object[mappedPartitions.size()];
+            int i = 0;
+            for (Map.Entry<String, Map<Integer, Errors>> topicAndPartitions : mappedPartitions.entrySet()) {
+                Struct topicPartitionsStruct = responseStruct.instance(TOPIC_PARTITIONS_KEY_NAME);
+                topicPartitionsStruct.set(TOPIC_KEY_NAME, topicAndPartitions.getKey());
+                Map<Integer, Errors> partitionIdAndErrors = topicAndPartitions.getValue();
+
+                Object[] partitionAndErrorsArray = new Object[partitionIdAndErrors.size()];
+                int j = 0;
+                for (Map.Entry<Integer, Errors> partitionAndError : partitionIdAndErrors.entrySet()) {
+                    Struct partitionAndErrorStruct = topicPartitionsStruct.instance(PARTITIONS_KEY_NAME);
+                    partitionAndErrorStruct.set(PARTITION_KEY_NAME, partitionAndError.getKey());
+                    partitionAndErrorStruct.set(ERROR_CODE_KEY_NAME, partitionAndError.getValue().code());
+                    partitionAndErrorsArray[j++] = partitionAndErrorStruct;
+                }
+                topicPartitionsStruct.set(PARTITIONS_KEY_NAME, partitionAndErrorsArray);
+                partitionsArray[i++] = topicPartitionsStruct;
+            }
+            responseStruct.set(TOPIC_PARTITIONS_KEY_NAME, partitionsArray);
+
+            responsesArray[k++] = responseStruct;
+        }
+
+        struct.set(TXN_MARKER_ENTRY_KEY_NAME, responsesArray);
+        return struct;
+    }
+
+    public Map<TopicPartition, Errors> errors(long producerId) {
+        return errors.get(producerId);
+    }
+
+    public static WriteTxnMarkersResponse parse(ByteBuffer buffer, short version) {
+        return new WriteTxnMarkersResponse(ApiKeys.WRITE_TXN_MARKERS.parseResponse(version, buffer));
+    }
+
+}
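
For reference, a minimal usage sketch of the new class (illustrative values; it uses only the
constructor and the errors(long) accessor added above):

    Map<TopicPartition, Errors> partitionErrors = new HashMap<>();
    partitionErrors.put(new TopicPartition("topic", 0), Errors.NONE);

    Map<Long, Map<TopicPartition, Errors>> errorsByProducer = new HashMap<>();
    errorsByProducer.put(21L, partitionErrors);  // results for producer id 21

    WriteTxnMarkersResponse response = new WriteTxnMarkersResponse(errorsByProducer);
    Errors error = response.errors(21L).get(new TopicPartition("topic", 0));  // Errors.NONE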

http://git-wip-us.apache.org/repos/asf/kafka/blob/865d82af/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index 2995882..2e1a79d 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -30,10 +30,10 @@ import org.apache.kafka.common.protocol.SecurityProtocol;
 import org.apache.kafka.common.protocol.types.Struct;
 import org.apache.kafka.common.record.CompressionType;
 import org.apache.kafka.common.record.InvalidRecordException;
-import org.apache.kafka.common.record.SimpleRecord;
-import org.apache.kafka.common.record.RecordBatch;
 import org.apache.kafka.common.record.MemoryRecords;
 import org.apache.kafka.common.record.MemoryRecordsBuilder;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.record.SimpleRecord;
 import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.common.utils.Utils;
 import org.junit.Test;
@@ -136,9 +136,27 @@ public class RequestResponseTest {
         checkRequest(createDeleteTopicsRequest());
         checkErrorResponse(createDeleteTopicsRequest(), new UnknownServerException());
         checkResponse(createDeleteTopicsResponse(), 0);
+
         checkRequest(createInitPidRequest());
         checkErrorResponse(createInitPidRequest(), new UnknownServerException());
         checkResponse(createInitPidResponse(), 0);
+
+        checkRequest(createAddPartitionsToTxnRequest());
+        checkResponse(createAddPartitionsToTxnResponse(), 0);
+        checkErrorResponse(createAddPartitionsToTxnRequest(), new UnknownServerException());
+        checkRequest(createAddOffsetsToTxnRequest());
+        checkResponse(createAddOffsetsToTxnResponse(), 0);
+        checkErrorResponse(createAddOffsetsToTxnRequest(), new UnknownServerException());
+        checkRequest(createEndTxnRequest());
+        checkResponse(createEndTxnResponse(), 0);
+        checkErrorResponse(createEndTxnRequest(), new UnknownServerException());
+        checkRequest(createWriteTxnMarkersRequest());
+        checkResponse(createWriteTxnMarkersResponse(), 0);
+        checkErrorResponse(createWriteTxnMarkersRequest(), new UnknownServerException());
+        checkRequest(createTxnOffsetCommitRequest());
+        checkResponse(createTxnOffsetCommitResponse(), 0);
+        checkErrorResponse(createTxnOffsetCommitRequest(), new UnknownServerException());
+
         checkOlderFetchVersions();
         checkResponse(createMetadataResponse(), 0);
         checkResponse(createMetadataResponse(), 1);
@@ -166,6 +184,21 @@ public class RequestResponseTest {
         checkRequest(createLeaderEpochRequest());
         checkResponse(createLeaderEpochResponse(), 0);
         checkErrorResponse(createLeaderEpochRequest(), new UnknownServerException());
+        checkRequest(createAddPartitionsToTxnRequest());
+        checkErrorResponse(createAddPartitionsToTxnRequest(), new UnknownServerException());
+        checkResponse(createAddPartitionsToTxnResponse(), 0);
+        checkRequest(createAddOffsetsToTxnRequest());
+        checkErrorResponse(createAddOffsetsToTxnRequest(), new UnknownServerException());
+        checkResponse(createAddOffsetsToTxnResponse(), 0);
+        checkRequest(createEndTxnRequest());
+        checkErrorResponse(createEndTxnRequest(), new UnknownServerException());
+        checkResponse(createEndTxnResponse(), 0);
+        checkRequest(createWriteTxnMarkersRequest());
+        checkErrorResponse(createWriteTxnMarkersRequest(), new UnknownServerException());
+        checkResponse(createWriteTxnMarkersResponse(), 0);
+        checkRequest(createTxnOffsetCommitRequest());
+        checkErrorResponse(createTxnOffsetCommitRequest(), new UnknownServerException());
+        checkResponse(createTxnOffsetCommitResponse(), 0);
     }
 
     @Test
@@ -821,6 +854,58 @@ public class RequestResponseTest {
         return new OffsetsForLeaderEpochResponse(epochs);
     }
 
+    private AddPartitionsToTxnRequest createAddPartitionsToTxnRequest() {
+        return new AddPartitionsToTxnRequest.Builder("tid", 21L, (short) 42,
+            Collections.singletonList(new TopicPartition("topic", 73))).build();
+    }
+
+    private AddPartitionsToTxnResponse createAddPartitionsToTxnResponse() {
+        return new AddPartitionsToTxnResponse(Errors.NONE);
+    }
+
+    private AddOffsetsToTxnRequest createAddOffsetsToTxnRequest() {
+        return new AddOffsetsToTxnRequest.Builder("tid", 21L, (short) 42, "gid").build();
+    }
+
+    private AddOffsetsToTxnResponse createAddOffsetsToTxnResponse() {
+        return new AddOffsetsToTxnResponse(Errors.NONE);
+    }
+
+    private EndTxnRequest createEndTxnRequest() {
+        return new EndTxnRequest.Builder("tid", 21L, (short) 42, TransactionResult.COMMIT).build();
+    }
+
+    private EndTxnResponse createEndTxnResponse() {
+        return new EndTxnResponse(Errors.NONE);
+    }
+
+    private WriteTxnMarkersRequest createWriteTxnMarkersRequest() {
+        return new WriteTxnMarkersRequest.Builder(73,
+            Collections.singletonList(new WriteTxnMarkersRequest.TxnMarkerEntry(21L, (short) 42, TransactionResult.ABORT,
+                Collections.singletonList(new TopicPartition("topic", 73))))).build();
+    }
+
+    private WriteTxnMarkersResponse createWriteTxnMarkersResponse() {
+        final Map<TopicPartition, Errors> errorPerPartitions = new HashMap<>();
+        errorPerPartitions.put(new TopicPartition("topic", 73), Errors.NONE);
+        final Map<Long, Map<TopicPartition, Errors>> response = new HashMap<>();
+        response.put(21L, errorPerPartitions);
+        return new WriteTxnMarkersResponse(response);
+    }
+
+    private TxnOffsetCommitRequest createTxnOffsetCommitRequest() {
+        final Map<TopicPartition, TxnOffsetCommitRequest.CommittedOffset> offsets = new HashMap<>();
+        offsets.put(new TopicPartition("topic", 73),
+                    new TxnOffsetCommitRequest.CommittedOffset(100, null));
+        return new TxnOffsetCommitRequest.Builder("gid", 21L, (short) 42, 73, offsets).build();
+    }
+
+    private TxnOffsetCommitResponse createTxnOffsetCommitResponse() {
+        final Map<TopicPartition, Errors> errorPerPartitions = new HashMap<>();
+        errorPerPartitions.put(new TopicPartition("topic", 73), Errors.NONE);
+        return new TxnOffsetCommitResponse(errorPerPartitions);
+    }
+
     private static class ByteBufferChannel implements GatheringByteChannel {
         private final ByteBuffer buf;
         private boolean closed = false;
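
The check* helpers above exercise a serialize/parse round trip for each new message. A minimal
sketch of that round trip for the new response, assuming Struct's sizeOf()/writeTo(ByteBuffer)
pair and JUnit's assertEquals (toStruct is callable here because the test lives in the same
package as the response classes):

    WriteTxnMarkersResponse original = createWriteTxnMarkersResponse();
    short version = 0;
    Struct struct = original.toStruct(version);
    ByteBuffer buffer = ByteBuffer.allocate(struct.sizeOf());
    struct.writeTo(buffer);
    buffer.flip();
    WriteTxnMarkersResponse parsed = WriteTxnMarkersResponse.parse(buffer, version);
    assertEquals(original.errors(21L), parsed.errors(21L));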

http://git-wip-us.apache.org/repos/asf/kafka/blob/865d82af/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 1e8900b..c75e1b9 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -31,7 +31,7 @@ import kafka.controller.KafkaController
 import kafka.coordinator.{GroupCoordinator, InitPidResult, JoinGroupResult, TransactionCoordinator}
 import kafka.log._
 import kafka.network._
-import kafka.network.RequestChannel.{Response, Session}
+import kafka.network.RequestChannel.{Request, Response, Session}
 import kafka.security.auth
 import kafka.security.auth.{Authorizer, ClusterAction, Create, Delete, Describe, Group, Operation, Read, Resource, Write}
 import kafka.utils.{Exit, Logging, ZKGroupTopicDirs, ZkUtils}
@@ -40,7 +40,7 @@ import org.apache.kafka.common.internals.FatalExitError
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.protocol.{ApiKeys, Errors, Protocol}
-import org.apache.kafka.common.record.{RecordBatch, MemoryRecords, TimestampType}
+import org.apache.kafka.common.record.{MemoryRecords, RecordBatch, TimestampType}
 import org.apache.kafka.common.requests._
 import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
 import org.apache.kafka.common.utils.{Time, Utils}
@@ -104,7 +104,11 @@ class KafkaApis(val requestChannel: RequestChannel,
         case ApiKeys.DELETE_RECORDS => handleDeleteRecordsRequest(request)
         case ApiKeys.INIT_PRODUCER_ID => handleInitPidRequest(request)
         case ApiKeys.OFFSET_FOR_LEADER_EPOCH => handleOffsetForLeaderEpochRequest(request)
-        case requestId => throw new KafkaException("Unknown api code " + requestId)
+        case ApiKeys.ADD_PARTITIONS_TO_TXN => handleAddPartitionToTxnRequest(request)
+        case ApiKeys.ADD_OFFSETS_TO_TXN => handleAddOffsetsToTxnRequest(request)
+        case ApiKeys.END_TXN => handleEndTxnRequest(request)
+        case ApiKeys.WRITE_TXN_MARKERS => handleWriteTxnMarkersRequest(request)
+        case ApiKeys.TXN_OFFSET_COMMIT => handleTxnOffsetCommitRequest(request)
       }
     } catch {
       case e: FatalExitError => throw e
@@ -1323,6 +1327,28 @@ class KafkaApis(val requestChannel: RequestChannel,
     txnCoordinator.handleInitPid(initPidRequest.transactionalId, sendResponseCallback)
   }
 
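+  // The transactional request handlers below are placeholders: each immediately sends a stub
+  // response (UNSUPPORTED_VERSION or an empty result map) pending the broker-side transaction
+  // handling described in KIP-98.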
+  def handleEndTxnRequest(request: Request): Unit = {
+    requestChannel.sendResponse(new RequestChannel.Response(request, new EndTxnResponse(Errors.UNSUPPORTED_VERSION)))
+  }
+
+  def handleAddPartitionToTxnRequest(request: Request): Unit = {
+    requestChannel.sendResponse(new RequestChannel.Response(request, new AddPartitionsToTxnResponse(Errors.UNSUPPORTED_VERSION)))
+  }
+
+  def handleAddOffsetsToTxnRequest(request: Request): Unit = {
+    requestChannel.sendResponse(new RequestChannel.Response(request, new AddOffsetsToTxnResponse(Errors.UNSUPPORTED_VERSION)))
+  }
+
+  def handleWriteTxnMarkersRequest(request: Request): Unit = {
+    val emptyResponse = new java.util.HashMap[java.lang.Long, java.util.Map[TopicPartition, Errors]]()
+    requestChannel.sendResponse(new RequestChannel.Response(request, new WriteTxnMarkersResponse(emptyResponse)))
+  }
+
+  def handleTxnOffsetCommitRequest(request: Request): Unit = {
+    val emptyResponse = new java.util.HashMap[TopicPartition, Errors]()
+    requestChannel.sendResponse(new RequestChannel.Response(request, new TxnOffsetCommitResponse(emptyResponse)))
+  }
+
   def handleOffsetForLeaderEpochRequest(request: RequestChannel.Request): Unit = {
     val offsetForEpoch = request.body[OffsetsForLeaderEpochRequest]
     val requestInfo = offsetForEpoch.epochsByTopicPartition()

