kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cmcc...@apache.org
Subject [kafka] branch 2.4 updated: MINOR: AbstractRequestResponse should be an interface (#7513)
Date Thu, 17 Oct 2019 16:26:01 GMT
This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch 2.4
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.4 by this push:
     new f666cf3  MINOR: AbstractRequestResponse should be an interface (#7513)
f666cf3 is described below

commit f666cf37a5de88cac0842cd3e15613bb4940db88
Author: Colin Patrick McCabe <cmccabe@apache.org>
AuthorDate: Thu Oct 17 09:21:34 2019 -0700

    MINOR: AbstractRequestResponse should be an interface (#7513)
    
    AbstractRequestResponse should be an interface, since it has no concrete elements or implementation.
    Move AbstractRequestResponse#serialize to RequestUtils#serialize and make it package-private,
    since it doesn't need to be public.
    
    Reviewers: Ismael Juma <ismael@juma.me.uk>
    (cherry picked from commit 3cb8ccf63a3f88b4de5a2133b39fb4bf175f3532)
---
 .../apache/kafka/common/requests/AbstractRequest.java    |  4 ++--
 .../kafka/common/requests/AbstractRequestResponse.java   | 16 +---------------
 .../apache/kafka/common/requests/AbstractResponse.java   | 10 +++++-----
 .../org/apache/kafka/common/requests/RequestHeader.java  |  2 +-
 .../org/apache/kafka/common/requests/RequestUtils.java   | 11 ++++++++++-
 .../org/apache/kafka/common/requests/ResponseHeader.java |  2 +-
 .../kafka/clients/consumer/internals/FetcherTest.java    |  6 ++++--
 .../kafka/clients/producer/internals/SenderTest.java     |  5 +++--
 .../test/scala/unit/kafka/server/BaseRequestTest.scala   |  4 ++--
 9 files changed, 29 insertions(+), 31 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
index b737b49..97bc728 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
@@ -26,7 +26,7 @@ import org.apache.kafka.common.protocol.types.Struct;
 import java.nio.ByteBuffer;
 import java.util.Map;
 
-public abstract class AbstractRequest extends AbstractRequestResponse {
+public abstract class AbstractRequest implements AbstractRequestResponse {
 
     public static abstract class Builder<T extends AbstractRequest> {
         private final ApiKeys apiKey;
@@ -100,7 +100,7 @@ public abstract class AbstractRequest extends AbstractRequestResponse {
      * Use with care, typically {@link #toSend(String, RequestHeader)} should be used instead.
      */
     public ByteBuffer serialize(RequestHeader header) {
-        return serialize(header.toStruct(), toStruct());
+        return RequestUtils.serialize(header.toStruct(), toStruct());
     }
 
     protected abstract Struct toStruct();
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequestResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequestResponse.java
index 0ba373d..b02659d 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequestResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequestResponse.java
@@ -16,19 +16,5 @@
  */
 package org.apache.kafka.common.requests;
 
-import org.apache.kafka.common.protocol.types.Struct;
-
-import java.nio.ByteBuffer;
-
-public abstract class AbstractRequestResponse {
-    /**
-     * Visible for testing.
-     */
-    public static ByteBuffer serialize(Struct headerStruct, Struct bodyStruct) {
-        ByteBuffer buffer = ByteBuffer.allocate(headerStruct.sizeOf() + bodyStruct.sizeOf());
-        headerStruct.writeTo(buffer);
-        bodyStruct.writeTo(buffer);
-        buffer.rewind();
-        return buffer;
-    }
+public interface AbstractRequestResponse {
 }
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
index 8a6edf1..6470152 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
@@ -28,18 +28,18 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 
-public abstract class AbstractResponse extends AbstractRequestResponse {
+public abstract class AbstractResponse implements AbstractRequestResponse {
     public static final int DEFAULT_THROTTLE_TIME = 0;
 
     protected Send toSend(String destination, ResponseHeader header, short apiVersion) {
-        return new NetworkSend(destination, serialize(header.toStruct(), toStruct(apiVersion)));
+        return new NetworkSend(destination, RequestUtils.serialize(header.toStruct(), toStruct(apiVersion)));
     }
 
     /**
      * Visible for testing, typically {@link #toSend(String, ResponseHeader, short)} should be used instead.
      */
-    public ByteBuffer serialize(ApiKeys apiKey, int correlationId) {
-        return serialize(apiKey, apiKey.latestVersion(), correlationId);
+    public ByteBuffer serialize(short version, ResponseHeader responseHeader) {
+        return RequestUtils.serialize(responseHeader.toStruct(), toStruct(version));
     }
 
     /**
@@ -48,7 +48,7 @@ public abstract class AbstractResponse extends AbstractRequestResponse {
     public ByteBuffer serialize(ApiKeys apiKey, short version, int correlationId) {
         ResponseHeader header =
             new ResponseHeader(correlationId, apiKey.responseHeaderVersion(version));
-        return serialize(header.toStruct(), toStruct(version));
+        return RequestUtils.serialize(header.toStruct(), toStruct(version));
     }
 
     public abstract Map<Errors, Integer> errorCounts();
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/RequestHeader.java b/clients/src/main/java/org/apache/kafka/common/requests/RequestHeader.java
index 8da1eb5..3d80c4e 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/RequestHeader.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/RequestHeader.java
@@ -28,7 +28,7 @@ import java.nio.ByteBuffer;
 /**
  * The header for a request in the Kafka protocol
  */
-public class RequestHeader extends AbstractRequestResponse {
+public class RequestHeader implements AbstractRequestResponse {
     private final RequestHeaderData data;
     private final short headerVersion;
 
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/RequestUtils.java b/clients/src/main/java/org/apache/kafka/common/requests/RequestUtils.java
index b4a2420..c3dfaa1 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/RequestUtils.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/RequestUtils.java
@@ -28,6 +28,7 @@ import org.apache.kafka.common.resource.ResourcePatternFilter;
 import org.apache.kafka.common.protocol.types.Struct;
 import org.apache.kafka.common.resource.ResourceType;
 
+import java.nio.ByteBuffer;
 import java.util.Optional;
 
 import static org.apache.kafka.common.protocol.CommonFields.HOST;
@@ -42,7 +43,7 @@ import static org.apache.kafka.common.protocol.CommonFields.RESOURCE_PATTERN_TYP
 import static org.apache.kafka.common.protocol.CommonFields.RESOURCE_PATTERN_TYPE_FILTER;
 import static org.apache.kafka.common.protocol.CommonFields.RESOURCE_TYPE;
 
-final class RequestUtils {
+public final class RequestUtils {
 
     private RequestUtils() {}
 
@@ -122,4 +123,12 @@ final class RequestUtils {
             Optional.empty() : Optional.of(leaderEpoch);
         return leaderEpochOpt;
     }
+
+    public static ByteBuffer serialize(Struct headerStruct, Struct bodyStruct) {
+        ByteBuffer buffer = ByteBuffer.allocate(headerStruct.sizeOf() + bodyStruct.sizeOf());
+        headerStruct.writeTo(buffer);
+        bodyStruct.writeTo(buffer);
+        buffer.rewind();
+        return buffer;
+    }
 }
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ResponseHeader.java b/clients/src/main/java/org/apache/kafka/common/requests/ResponseHeader.java
index 249b5d0..118e5d3 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ResponseHeader.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ResponseHeader.java
@@ -25,7 +25,7 @@ import java.nio.ByteBuffer;
 /**
  * A response header in the kafka protocol.
  */
-public class ResponseHeader extends AbstractRequestResponse {
+public class ResponseHeader implements AbstractRequestResponse {
     private final ResponseHeaderData data;
     private final short headerVersion;
 
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index d3fcbc3..fd1be37 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -1999,7 +1999,7 @@ public class FetcherTest {
 
         ByteBuffer buffer = ApiVersionsResponse.
             createApiVersionsResponse(400, RecordBatch.CURRENT_MAGIC_VALUE).
-                serialize(ApiKeys.API_VERSIONS, 0);
+                serialize(ApiKeys.API_VERSIONS, ApiKeys.API_VERSIONS.latestVersion(), 0);
         selector.delayedReceive(new DelayedReceive(node.idString(), new NetworkReceive(node.idString(), buffer)));
         while (!client.ready(node, time.milliseconds())) {
             client.poll(1, time.milliseconds());
@@ -2016,7 +2016,9 @@ public class FetcherTest {
             client.send(request, time.milliseconds());
             client.poll(1, time.milliseconds());
             FetchResponse response = fullFetchResponse(tp0, nextRecords, Errors.NONE, i, throttleTimeMs);
-            buffer = response.serialize(ApiKeys.FETCH, request.correlationId());
+            buffer = response.serialize(ApiKeys.FETCH,
+                    ApiKeys.FETCH.latestVersion(),
+                    request.correlationId());
             selector.completeReceive(new NetworkReceive(node.idString(), buffer));
             client.poll(1, time.milliseconds());
             // If a throttled response is received, advance the time to ensure progress.
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
index 71180bd..1b35e0b 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
@@ -275,7 +275,7 @@ public class SenderTest {
                 time, true, new ApiVersions(), throttleTimeSensor, logContext);
 
         ByteBuffer buffer = ApiVersionsResponse.createApiVersionsResponse(400, RecordBatch.CURRENT_MAGIC_VALUE).
-            serialize(ApiKeys.API_VERSIONS, 0);
+            serialize(ApiKeys.API_VERSIONS, ApiKeys.API_VERSIONS.latestVersion(), 0);
         selector.delayedReceive(new DelayedReceive(node.idString(), new NetworkReceive(node.idString(), buffer)));
         while (!client.ready(node, time.milliseconds())) {
             client.poll(1, time.milliseconds());
@@ -292,7 +292,8 @@ public class SenderTest {
             client.send(request, time.milliseconds());
             client.poll(1, time.milliseconds());
             ProduceResponse response = produceResponse(tp0, i, Errors.NONE, throttleTimeMs);
-            buffer = response.serialize(ApiKeys.PRODUCE, request.correlationId());
+            buffer = response.
+                serialize(ApiKeys.PRODUCE, ApiKeys.PRODUCE.latestVersion(), request.correlationId());
             selector.completeReceive(new NetworkReceive(node.idString(), buffer));
             client.poll(1, time.milliseconds());
             // If a throttled response is received, advance the time to ensure progress.
diff --git a/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala b/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala
index 5d00d54..2cc3553 100644
--- a/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala
@@ -29,7 +29,7 @@ import kafka.network.SocketServer
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.protocol.types.Struct
 import org.apache.kafka.common.protocol.ApiKeys
-import org.apache.kafka.common.requests.{AbstractRequest, AbstractRequestResponse, RequestHeader, ResponseHeader}
+import org.apache.kafka.common.requests.{AbstractRequest, RequestHeader, RequestUtils, ResponseHeader}
 import org.apache.kafka.common.security.auth.SecurityProtocol
 
 abstract class BaseRequestTest extends IntegrationTestHarness {
@@ -170,7 +170,7 @@ abstract class BaseRequestTest extends IntegrationTestHarness {
     */
   def sendStructAndReceive(requestStruct: Struct, apiKey: ApiKeys, socket: Socket, apiVersion: Short): ByteBuffer = {
     val header = nextRequestHeader(apiKey, apiVersion)
-    val serializedBytes = AbstractRequestResponse.serialize(header.toStruct, requestStruct).array
+    val serializedBytes = RequestUtils.serialize(header.toStruct, requestStruct).array
     val response = requestAndReceive(socket, serializedBytes)
     skipResponseHeader(response, apiKey.responseHeaderVersion(apiVersion))
   }


Mime
View raw message