kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gwens...@apache.org
Subject [16/21] kafka git commit: KAFKA-2998: log warnings when client is disconnected from bootstrap brokers
Date Wed, 06 Apr 2016 00:09:23 GMT
KAFKA-2998: log warnings when client is disconnected from bootstrap brokers

Author: Jason Gustafson <jason@confluent.io>

Reviewers: Grant Henke, Guozhang Wang

Closes #769 from hachikuji/KAFKA-2998


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/c36268f7
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/c36268f7
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/c36268f7

Branch: refs/heads/0.10.0
Commit: c36268f77fbf7f6a47a1e09ec3e38c20173a06c5
Parents: 9897813
Author: Jason Gustafson <jason@confluent.io>
Authored: Mon Apr 4 21:28:59 2016 -0700
Committer: Gwen Shapira <cshapi@gmail.com>
Committed: Tue Apr 5 17:08:53 2016 -0700

----------------------------------------------------------------------
 .../org/apache/kafka/clients/NetworkClient.java    |  8 ++++++++
 .../main/java/org/apache/kafka/common/Cluster.java | 17 +++++++++++++++--
 2 files changed, 23 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/c36268f7/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
index d22b508..d2eaace 100644
--- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
@@ -556,6 +556,14 @@ public class NetworkClient implements KafkaClient {
             ApiKeys requestKey = ApiKeys.forId(request.request().header().apiKey());
 
             if (requestKey == ApiKeys.METADATA) {
+                Cluster cluster = metadata.fetch();
+                if (cluster.isBootstrapConfigured()) {
+                    int nodeId = Integer.parseInt(request.request().destination());
+                    Node node = cluster.nodeById(nodeId);
+                    if (node != null)
+                        log.warn("Bootstrap broker {}:{} disconnected", node.host(), node.port());
+                }
+
                 metadataFetchInProgress = false;
                 return true;
             }

http://git-wip-us.apache.org/repos/asf/kafka/blob/c36268f7/clients/src/main/java/org/apache/kafka/common/Cluster.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/Cluster.java b/clients/src/main/java/org/apache/kafka/common/Cluster.java
index 8e85df8..e1bf581 100644
--- a/clients/src/main/java/org/apache/kafka/common/Cluster.java
+++ b/clients/src/main/java/org/apache/kafka/common/Cluster.java
@@ -29,6 +29,7 @@ import java.util.Set;
  */
 public final class Cluster {
 
+    private final boolean isBootstrapConfigured;
     private final List<Node> nodes;
     private final Set<String> unauthorizedTopics;
     private final Map<TopicPartition, PartitionInfo> partitionsByTopicPartition;
@@ -45,11 +46,19 @@ public final class Cluster {
     public Cluster(Collection<Node> nodes,
                    Collection<PartitionInfo> partitions,
                    Set<String> unauthorizedTopics) {
+        this(false, nodes, partitions, unauthorizedTopics);
+    }
+
+    private Cluster(boolean isBootstrapConfigured,
+                    Collection<Node> nodes,
+                    Collection<PartitionInfo> partitions,
+                    Set<String> unauthorizedTopics) {
+        this.isBootstrapConfigured = isBootstrapConfigured;
+
         // make a randomized, unmodifiable copy of the nodes
         List<Node> copy = new ArrayList<>(nodes);
         Collections.shuffle(copy);
         this.nodes = Collections.unmodifiableList(copy);
-        
         this.nodesById = new HashMap<>();
         for (Node node : nodes)
             this.nodesById.put(node.id(), node);
@@ -115,7 +124,7 @@ public final class Cluster {
         int nodeId = -1;
         for (InetSocketAddress address : addresses)
             nodes.add(new Node(nodeId--, address.getHostString(), address.getPort()));
-        return new Cluster(nodes, new ArrayList<PartitionInfo>(0), Collections.<String>emptySet());
+        return new Cluster(true, nodes, new ArrayList<PartitionInfo>(0), Collections.<String>emptySet());
     }
 
     /**
@@ -214,6 +223,10 @@ public final class Cluster {
         return unauthorizedTopics;
     }
 
+    public boolean isBootstrapConfigured() {
+        return isBootstrapConfigured;
+    }
+
     @Override
     public String toString() {
         return "Cluster(nodes = " + this.nodes + ", partitions = " + this.partitionsByTopicPartition.values()
+ ")";


Mime
View raw message