kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rsiva...@apache.org
Subject [kafka] branch 2.1 updated: KAFKA-7755; Look up client host name since DNS entry may have changed (#6049)
Date Mon, 07 Jan 2019 13:37:18 GMT
This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.1 by this push:
     new 2227664  KAFKA-7755; Look up client host name since DNS entry may have changed (#6049)
2227664 is described below

commit 2227664845a537393a8f4640e59373da6ab91bdc
Author: hackerwin7 <hackerswin7@gmail.com>
AuthorDate: Mon Jan 7 21:33:58 2019 +0800

    KAFKA-7755; Look up client host name since DNS entry may have changed (#6049)
    
    Lookup client host name after every full iteration through the addresses returned.
    
    Reviewers: Loïc Monney <loicmonney@github.com>, Edoardo Comar <ecomar@uk.ibm.com>,
Rajini Sivaram <rajinisivaram@googlemail.com>
---
 .../kafka/clients/ClusterConnectionStates.java     | 16 +++++++++++-----
 .../kafka/clients/ClusterConnectionStatesTest.java | 22 ++++++++++++++++++++++
 2 files changed, 33 insertions(+), 5 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java b/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
index f198533..09274be 100644
--- a/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
+++ b/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
@@ -117,7 +117,7 @@ final class ClusterConnectionStates {
             connectionState.moveToNextAddress();
         } else {
             nodeState.put(id, new NodeConnectionState(ConnectionState.CONNECTING, now,
-                this.reconnectBackoffInitMs, ClientUtils.resolve(host, clientDnsLookup)));
+                this.reconnectBackoffInitMs, host, clientDnsLookup));
         }
     }
 
@@ -343,18 +343,22 @@ final class ClusterConnectionStates {
         long reconnectBackoffMs;
         // Connection is being throttled if current time < throttleUntilTimeMs.
         long throttleUntilTimeMs;
-        private final List<InetAddress> addresses;
+        private List<InetAddress> addresses;
         private int index = 0;
+        private final String host;
+        private final ClientDnsLookup clientDnsLookup;
 
         public NodeConnectionState(ConnectionState state, long lastConnectAttempt, long reconnectBackoffMs,

-                List<InetAddress> addresses) {
+                String host, ClientDnsLookup clientDnsLookup) throws UnknownHostException
{
             this.state = state;
-            this.addresses = addresses;
+            this.addresses = ClientUtils.resolve(host, clientDnsLookup);
             this.authenticationException = null;
             this.lastConnectAttemptMs = lastConnectAttempt;
             this.failedAttempts = 0;
             this.reconnectBackoffMs = reconnectBackoffMs;
             this.throttleUntilTimeMs = 0;
+            this.host = host;
+            this.clientDnsLookup = clientDnsLookup;
         }
 
         public InetAddress currentAddress() {
@@ -364,8 +368,10 @@ final class ClusterConnectionStates {
         /*
          * implementing a ring buffer with the addresses
          */
-        public void moveToNextAddress() {
+        public void moveToNextAddress() throws UnknownHostException {
             index = (index + 1) % addresses.size();
+            if (index == 0)
+                addresses = ClientUtils.resolve(host, clientDnsLookup);
         }
 
         public String toString() {
diff --git a/clients/src/test/java/org/apache/kafka/clients/ClusterConnectionStatesTest.java
b/clients/src/test/java/org/apache/kafka/clients/ClusterConnectionStatesTest.java
index 23edaa9..5f57482 100644
--- a/clients/src/test/java/org/apache/kafka/clients/ClusterConnectionStatesTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/ClusterConnectionStatesTest.java
@@ -24,6 +24,8 @@ import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertTrue;
 
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
 
@@ -275,4 +277,24 @@ public class ClusterConnectionStatesTest {
         InetAddress addr3 = connectionStates.currentAddress(nodeId1);
         assertSame(addr1, addr3);
     }
+
+    @Test
+    public void testHostResolveChange() throws UnknownHostException, ReflectiveOperationException
{
+        assertEquals(2, ClientUtils.resolve(hostTwoIps, ClientDnsLookup.USE_ALL_DNS_IPS).size());
+
+        connectionStates.connecting(nodeId1, time.milliseconds(), hostTwoIps, ClientDnsLookup.DEFAULT);
+        InetAddress addr1 = connectionStates.currentAddress(nodeId1);
+
+        // reflection to simulate host change in DNS lookup
+        Method nodeStateMethod = connectionStates.getClass().getDeclaredMethod("nodeState",
String.class);
+        nodeStateMethod.setAccessible(true);
+        Object nodeState = nodeStateMethod.invoke(connectionStates, nodeId1);
+        Field hostField = nodeState.getClass().getDeclaredField("host");
+        hostField.setAccessible(true);
+        hostField.set(nodeState, "localhost");
+
+        connectionStates.connecting(nodeId1, time.milliseconds(), hostTwoIps, ClientDnsLookup.DEFAULT);
+        InetAddress addr2 = connectionStates.currentAddress(nodeId1);
+        assertNotSame(addr1, addr2);
+    }
 }


Mime
View raw message