kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rsiva...@apache.org
Subject [kafka] branch trunk updated: KAFKA-7890: Invalidate ClusterConnectionState cache for a broker if the hostname of the broker changes. (#6215)
Date Tue, 05 Feb 2019 12:30:16 GMT
This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 7debda4  KAFKA-7890: Invalidate ClusterConnectionState cache for a broker if the hostname of the broker changes. (#6215)
7debda4 is described below

commit 7debda4d41e7c951fe54b8b916d4b87cc7843348
Author: Mark Cho <markcho.011@gmail.com>
AuthorDate: Tue Feb 5 04:29:54 2019 -0800

    KAFKA-7890: Invalidate ClusterConnectionState cache for a broker if the hostname of the broker changes. (#6215)
---
 .../kafka/clients/ClusterConnectionStates.java     | 25 ++++++++++++++++------
 .../org/apache/kafka/clients/NetworkClient.java    |  2 +-
 .../kafka/clients/ClusterConnectionStatesTest.java | 15 ++++++++++++-
 3 files changed, 34 insertions(+), 8 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java b/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
index 09274be..376b35d 100644
--- a/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
+++ b/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
@@ -19,6 +19,8 @@ package org.apache.kafka.clients;
 import java.util.concurrent.ThreadLocalRandom;
 
 import org.apache.kafka.common.errors.AuthenticationException;
+import org.apache.kafka.common.utils.LogContext;
+import org.slf4j.Logger;
 
 import java.net.InetAddress;
 import java.net.UnknownHostException;
@@ -36,8 +38,10 @@ final class ClusterConnectionStates {
     private final static int RECONNECT_BACKOFF_EXP_BASE = 2;
     private final double reconnectBackoffMaxExp;
     private final Map<String, NodeConnectionState> nodeState;
+    private final Logger log;
 
-    public ClusterConnectionStates(long reconnectBackoffMs, long reconnectBackoffMaxMs) {
+    public ClusterConnectionStates(long reconnectBackoffMs, long reconnectBackoffMaxMs, LogContext logContext) {
+        this.log = logContext.logger(ClusterConnectionStates.class);
         this.reconnectBackoffInitMs = reconnectBackoffMs;
         this.reconnectBackoffMaxMs = reconnectBackoffMaxMs;
         this.reconnectBackoffMaxExp = Math.log(this.reconnectBackoffMaxMs / (double) Math.max(reconnectBackoffMs, 1)) / Math.log(RECONNECT_BACKOFF_EXP_BASE);
@@ -110,15 +114,20 @@ final class ClusterConnectionStates {
      * @throws UnknownHostException 
      */
     public void connecting(String id, long now, String host, ClientDnsLookup clientDnsLookup) throws UnknownHostException {
-        if (nodeState.containsKey(id)) {
-            NodeConnectionState connectionState = nodeState.get(id);
+        NodeConnectionState connectionState = nodeState.get(id);
+        if (connectionState != null && connectionState.host().equals(host)) {
             connectionState.lastConnectAttemptMs = now;
             connectionState.state = ConnectionState.CONNECTING;
             connectionState.moveToNextAddress();
-        } else {
-            nodeState.put(id, new NodeConnectionState(ConnectionState.CONNECTING, now,
-                this.reconnectBackoffInitMs, host, clientDnsLookup));
+            return;
+        } else if (connectionState != null) {
+            log.info("Hostname for node {} changed from {} to {}.", id, connectionState.host(), host);
         }
+
+        // Create a new NodeConnectionState if nodeState does not already contain one
+        // for the specified id or if the hostname associated with the node id changed.
+        nodeState.put(id, new NodeConnectionState(ConnectionState.CONNECTING, now,
+            this.reconnectBackoffInitMs, host, clientDnsLookup));
     }
 
     public InetAddress currentAddress(String id) {
@@ -361,6 +370,10 @@ final class ClusterConnectionStates {
             this.clientDnsLookup = clientDnsLookup;
         }
 
+        public String host() {
+            return host;
+        }
+
         public InetAddress currentAddress() {
             return addresses.get(index);
         }
diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
index 3973701..e7ba0e1 100644
--- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
@@ -251,7 +251,7 @@ public class NetworkClient implements KafkaClient {
         this.selector = selector;
         this.clientId = clientId;
         this.inFlightRequests = new InFlightRequests(maxInFlightRequestsPerConnection);
-        this.connectionStates = new ClusterConnectionStates(reconnectBackoffMs, reconnectBackoffMax);
+        this.connectionStates = new ClusterConnectionStates(reconnectBackoffMs, reconnectBackoffMax, logContext);
         this.socketSendBuffer = socketSendBuffer;
         this.socketReceiveBuffer = socketReceiveBuffer;
         this.correlation = 0;
diff --git a/clients/src/test/java/org/apache/kafka/clients/ClusterConnectionStatesTest.java b/clients/src/test/java/org/apache/kafka/clients/ClusterConnectionStatesTest.java
index 5f57482..79afb75 100644
--- a/clients/src/test/java/org/apache/kafka/clients/ClusterConnectionStatesTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/ClusterConnectionStatesTest.java
@@ -30,6 +30,7 @@ import java.net.InetAddress;
 import java.net.UnknownHostException;
 
 import org.apache.kafka.common.errors.AuthenticationException;
+import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
 import org.junit.Before;
 import org.junit.Test;
@@ -48,7 +49,7 @@ public class ClusterConnectionStatesTest {
 
     @Before
     public void setup() {
-        this.connectionStates = new ClusterConnectionStates(reconnectBackoffMs, reconnectBackoffMax);
+        this.connectionStates = new ClusterConnectionStates(reconnectBackoffMs, reconnectBackoffMax, new LogContext());
     }
 
     @Test
@@ -293,8 +294,20 @@ public class ClusterConnectionStatesTest {
         hostField.setAccessible(true);
         hostField.set(nodeState, "localhost");
 
+        connectionStates.connecting(nodeId1, time.milliseconds(), "localhost", ClientDnsLookup.DEFAULT);
+        InetAddress addr2 = connectionStates.currentAddress(nodeId1);
+
+        assertNotSame(addr1, addr2);
+    }
+
+    @Test
+    public void testNodeWithNewHostname() throws UnknownHostException {
+        connectionStates.connecting(nodeId1, time.milliseconds(), "localhost", ClientDnsLookup.DEFAULT);
+        InetAddress addr1 = connectionStates.currentAddress(nodeId1);
+
         connectionStates.connecting(nodeId1, time.milliseconds(), hostTwoIps, ClientDnsLookup.DEFAULT);
         InetAddress addr2 = connectionStates.currentAddress(nodeId1);
+
         assertNotSame(addr1, addr2);
     }
 }


Mime
View raw message