kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gwens...@apache.org
Subject kafka git commit: KAFKA-3341: Improve error handling on invalid requests
Date Mon, 07 Mar 2016 19:34:16 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 5761a9ec7 -> 94c234341


KAFKA-3341: Improve error handling on invalid requests

* Include request id when parsing of request header fails
* Don't mute selector on a connection that was closed due to an error (otherwise a second
exception is thrown)
* Throw appropriate exception from `ApiKeys.fromId` if invalid id is passed
* Fail fast in `AbstractRequest.getRequest` if we fail to handle an instance of `ApiKeys`
(if this happens, it's a programmer error and the code in `getRequest` needs to be updated)

I ran into the top two issues while trying to figure out why a connection from a producer
to a broker was failing (and it made things harder than necessary). While fixing them, I noticed
the third and fourth issues.

Author: Ismael Juma <ismael@juma.me.uk>

Reviewers: Gwen Shapira

Closes #1017 from ijuma/kafka-3341-improve-error-handling-invalid-requests


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/94c23434
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/94c23434
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/94c23434

Branch: refs/heads/trunk
Commit: 94c234341907b705ec34dd2b390899435286e242
Parents: 5761a9e
Author: Ismael Juma <ismael@juma.me.uk>
Authored: Mon Mar 7 11:34:12 2016 -0800
Committer: Gwen Shapira <cshapi@gmail.com>
Committed: Mon Mar 7 11:34:12 2016 -0800

----------------------------------------------------------------------
 .../apache/kafka/common/protocol/ApiKeys.java   | 19 ++++++-----
 .../kafka/common/requests/AbstractRequest.java  |  6 ++--
 .../kafka/common/protocol/ApiKeysTest.java      | 34 ++++++++++++++++++++
 .../scala/kafka/network/RequestChannel.scala    | 11 ++++---
 .../main/scala/kafka/network/SocketServer.scala |  2 +-
 5 files changed, 57 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/94c23434/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
index 5bd3c96..708e1f0 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
@@ -38,18 +38,18 @@ public enum ApiKeys {
     DESCRIBE_GROUPS(15, "DescribeGroups"),
     LIST_GROUPS(16, "ListGroups");
 
-    private static ApiKeys[] codeToType;
+    private static final ApiKeys[] ID_TO_TYPE;
+    private static final int MIN_API_KEY = 0;
     public static final int MAX_API_KEY;
 
     static {
         int maxKey = -1;
-        for (ApiKeys key : ApiKeys.values()) {
+        for (ApiKeys key : ApiKeys.values())
             maxKey = Math.max(maxKey, key.id);
-        }
-        codeToType = new ApiKeys[maxKey + 1];
-        for (ApiKeys key : ApiKeys.values()) {
-            codeToType[key.id] = key;
-        }
+        ApiKeys[] idToType = new ApiKeys[maxKey + 1];
+        for (ApiKeys key : ApiKeys.values())
+            idToType[key.id] = key;
+        ID_TO_TYPE = idToType;
         MAX_API_KEY = maxKey;
     }
 
@@ -65,6 +65,9 @@ public enum ApiKeys {
     }
 
     public static ApiKeys forId(int id) {
-        return codeToType[id];
+        if (id < MIN_API_KEY || id > MAX_API_KEY)
+            throw new IllegalArgumentException(String.format("Unexpected ApiKeys id `%s`,
it should be between `%s` " +
+                    "and `%s` (inclusive)", id, MIN_API_KEY, MAX_API_KEY));
+        return ID_TO_TYPE[id];
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/94c23434/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
index 8dfa3f6..5a40b7f 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
@@ -36,7 +36,8 @@ public abstract class AbstractRequest extends AbstractRequestResponse {
      * Factory method for getting a request object based on ApiKey ID and a buffer
      */
     public static AbstractRequest getRequest(int requestId, int versionId, ByteBuffer buffer)
{
-        switch (ApiKeys.forId(requestId)) {
+        ApiKeys apiKey = ApiKeys.forId(requestId);
+        switch (apiKey) {
             case PRODUCE:
                 return ProduceRequest.parse(buffer, versionId);
             case FETCH:
@@ -72,7 +73,8 @@ public abstract class AbstractRequest extends AbstractRequestResponse {
             case LIST_GROUPS:
                 return ListGroupsRequest.parse(buffer, versionId);
             default:
-                return null;
+                throw new AssertionError(String.format("ApiKey %s is not currently handled
in `getRequest`, the " +
+                        "code should be updated to do so.", apiKey));
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/94c23434/clients/src/test/java/org/apache/kafka/common/protocol/ApiKeysTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/protocol/ApiKeysTest.java b/clients/src/test/java/org/apache/kafka/common/protocol/ApiKeysTest.java
new file mode 100644
index 0000000..f177ae6
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/protocol/ApiKeysTest.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.protocol;
+
+import org.junit.Test;
+
+public class ApiKeysTest {
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testForIdWithInvalidIdLow() {
+        ApiKeys.forId(-1);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testForIdWithInvalidIdHigh() {
+        ApiKeys.forId(10000);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/94c23434/core/src/main/scala/kafka/network/RequestChannel.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala
index 2e63b53..219e2fb 100644
--- a/core/src/main/scala/kafka/network/RequestChannel.scala
+++ b/core/src/main/scala/kafka/network/RequestChannel.scala
@@ -78,14 +78,17 @@ object RequestChannel extends Logging {
     val header: RequestHeader =
       if (requestObj == null) {
         buffer.rewind
-        RequestHeader.parse(buffer)
+        try RequestHeader.parse(buffer)
+        catch {
+          case ex: Throwable =>
+            throw new InvalidRequestException(s"Error parsing request header. Our best guess
of the apiKey is: $requestId", ex)
+        }
       } else
         null
     val body: AbstractRequest =
       if (requestObj == null)
-        try {
-          AbstractRequest.getRequest(header.apiKey, header.apiVersion, buffer)
-        } catch {
+        try AbstractRequest.getRequest(header.apiKey, header.apiVersion, buffer)
+        catch {
           case ex: Throwable =>
             throw new InvalidRequestException(s"Error getting request for apiKey: ${header.apiKey}
and apiVersion: ${header.apiVersion}", ex)
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/94c23434/core/src/main/scala/kafka/network/SocketServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala
index c3ecd75..f6c3036 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -425,13 +425,13 @@ private[kafka] class Processor(val id: Int,
               channel.socketAddress)
             val req = RequestChannel.Request(processor = id, connectionId = receive.source,
session = session, buffer = receive.payload, startTimeMs = time.milliseconds, securityProtocol
= protocol)
             requestChannel.sendRequest(req)
+            selector.mute(receive.source)
           } catch {
             case e @ (_: InvalidRequestException | _: SchemaException) =>
               // note that even though we got an exception, we can assume that receive.source
is valid. Issues with constructing a valid receive object were handled earlier
               error("Closing socket for " + receive.source + " because of error", e)
               close(selector, receive.source)
           }
-          selector.mute(receive.source)
         }
 
         selector.completedSends.asScala.foreach { send =>


Mime
View raw message