kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jun...@apache.org
Subject kafka git commit: kafka-2115; Error updating metrics in RequestChannel; patched by Gwen Shapira; reviewed by Jun Rao
Date Mon, 13 Apr 2015 14:22:11 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk fb86cf633 -> aa365639b


kafka-2115; Error updating metrics in RequestChannel; patched by Gwen Shapira; reviewed by Jun Rao


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/aa365639
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/aa365639
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/aa365639

Branch: refs/heads/trunk
Commit: aa365639b2dd36d8feff761f09208154ce71964d
Parents: fb86cf6
Author: Gwen Shapira <cshapi@gmail.com>
Authored: Mon Apr 13 09:21:53 2015 -0500
Committer: Jun Rao <junrao@gmail.com>
Committed: Mon Apr 13 09:21:53 2015 -0500

----------------------------------------------------------------------
 .../apache/kafka/common/protocol/ApiKeys.java   | 24 +++++++++++---------
 .../apache/kafka/common/protocol/Protocol.java  |  4 ++++
 .../scala/kafka/network/RequestChannel.scala    |  6 ++---
 3 files changed, 20 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/aa365639/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
index 07aba71..b39e9bb 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
@@ -20,17 +20,19 @@ package org.apache.kafka.common.protocol;
  * Identifiers for all the Kafka APIs
  */
 public enum ApiKeys {
-    PRODUCE(0, "produce"),
-    FETCH(1, "fetch"),
-    LIST_OFFSETS(2, "list_offsets"),
-    METADATA(3, "metadata"),
-    LEADER_AND_ISR(4, "leader_and_isr"),
-    STOP_REPLICA(5, "stop_replica"),
-    OFFSET_COMMIT(8, "offset_commit"),
-    OFFSET_FETCH(9, "offset_fetch"),
-    CONSUMER_METADATA(10, "consumer_metadata"),
-    JOIN_GROUP(11, "join_group"),
-    HEARTBEAT(12, "heartbeat");
+    PRODUCE(0, "Produce"),
+    FETCH(1, "Fetch"),
+    LIST_OFFSETS(2, "Offsets"),
+    METADATA(3, "Metadata"),
+    LEADER_AND_ISR(4, "LeaderAndIsr"),
+    STOP_REPLICA(5, "StopReplica"),
+    UPDATE_METADATA_KEY(6, "UpdateMetadata"),
+    CONTROLLED_SHUTDOWN_KEY(7, "ControlledShutdown"),
+    OFFSET_COMMIT(8, "OffsetCommit"),
+    OFFSET_FETCH(9, "OffsetFetch"),
+    CONSUMER_METADATA(10, "ConsumerMetadata"),
+    JOIN_GROUP(11, "JoinGroup"),
+    HEARTBEAT(12, "Heartbeat");
 
     private static ApiKeys[] codeToType;
     public static final int MAX_API_KEY;

http://git-wip-us.apache.org/repos/asf/kafka/blob/aa365639/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
index 9c4518e..d53fe45 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
@@ -404,6 +404,8 @@ public class Protocol {
         REQUESTS[ApiKeys.METADATA.id] = METADATA_REQUEST;
         REQUESTS[ApiKeys.LEADER_AND_ISR.id] = new Schema[] {};
         REQUESTS[ApiKeys.STOP_REPLICA.id] = new Schema[] {};
+        REQUESTS[ApiKeys.UPDATE_METADATA_KEY.id] = new Schema[] {};
+        REQUESTS[ApiKeys.CONTROLLED_SHUTDOWN_KEY.id] = new Schema[] {};
         REQUESTS[ApiKeys.OFFSET_COMMIT.id] = OFFSET_COMMIT_REQUEST;
         REQUESTS[ApiKeys.OFFSET_FETCH.id] = OFFSET_FETCH_REQUEST;
         REQUESTS[ApiKeys.CONSUMER_METADATA.id] = CONSUMER_METADATA_REQUEST;
@@ -416,6 +418,8 @@ public class Protocol {
         RESPONSES[ApiKeys.METADATA.id] = METADATA_RESPONSE;
         RESPONSES[ApiKeys.LEADER_AND_ISR.id] = new Schema[] {};
         RESPONSES[ApiKeys.STOP_REPLICA.id] = new Schema[] {};
+        RESPONSES[ApiKeys.UPDATE_METADATA_KEY.id] = new Schema[] {};
+        RESPONSES[ApiKeys.CONTROLLED_SHUTDOWN_KEY.id] = new Schema[] {};
         RESPONSES[ApiKeys.OFFSET_COMMIT.id] = OFFSET_COMMIT_RESPONSE;
         RESPONSES[ApiKeys.OFFSET_FETCH.id] = OFFSET_FETCH_RESPONSE;
         RESPONSES[ApiKeys.CONSUMER_METADATA.id] = CONSUMER_METADATA_RESPONSE;

http://git-wip-us.apache.org/repos/asf/kafka/blob/aa365639/core/src/main/scala/kafka/network/RequestChannel.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala
index 1d9c57b..1d0024c 100644
--- a/core/src/main/scala/kafka/network/RequestChannel.scala
+++ b/core/src/main/scala/kafka/network/RequestChannel.scala
@@ -26,7 +26,7 @@ import kafka.common.TopicAndPartition
 import kafka.utils.{Logging, SystemTime}
 import kafka.message.ByteBufferMessageSet
 import java.net._
-import org.apache.kafka.common.protocol.SecurityProtocol
+import org.apache.kafka.common.protocol.{ApiKeys, SecurityProtocol}
 import org.apache.kafka.common.requests.{AbstractRequest, RequestHeader}
 import org.apache.log4j.Logger
 
@@ -82,7 +82,7 @@ object RequestChannel extends Logging {
       val responseQueueTime = (responseDequeueTimeMs - responseCompleteTimeMs).max(0L)
       val responseSendTime = (endTimeMs - responseDequeueTimeMs).max(0L)
       val totalTime = endTimeMs - startTimeMs
-      var metricsList = List(RequestMetrics.metricsMap(RequestKeys.nameForKey(requestId)))
+      var metricsList = List(RequestMetrics.metricsMap(ApiKeys.forId(requestId).name))
       if (requestId == RequestKeys.FetchKey) {
         val isFromFollower = requestObj.asInstanceOf[FetchRequest].isFromFollower
         metricsList ::= ( if (isFromFollower)
@@ -207,7 +207,7 @@ object RequestMetrics {
   val metricsMap = new scala.collection.mutable.HashMap[String, RequestMetrics]
   val consumerFetchMetricName = RequestKeys.nameForKey(RequestKeys.FetchKey) + "Consumer"
   val followFetchMetricName = RequestKeys.nameForKey(RequestKeys.FetchKey) + "Follower"
-  (RequestKeys.keyToNameAndDeserializerMap.values.map(e => e._1)
+  (ApiKeys.values().toList.map(e => e.name)
     ++ List(consumerFetchMetricName, followFetchMetricName)).foreach(name => metricsMap.put(name, new RequestMetrics(name)))
 }
 


Mime
View raw message