kafka-commits mailing list archives

From: guozh...@apache.org
Subject: kafka git commit: KAFKA-2459: Mark last committed timestamp to fix connection backoff
Date: Wed, 21 Oct 2015 16:59:53 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 44f6c4b94 -> 0785feeb0


KAFKA-2459: Mark last committed timestamp to fix connection backoff

This fix applies to three JIRAs, since they are all related.

KAFKA-2459: Connection backoff/blackout period should start when a connection is disconnected, not when the connection attempt was initiated.
The backoff/blackout window now starts when the connection is disconnected.

KAFKA-2615: Poll() method is broken with respect to time.
Added Time through the NetworkClient API; the change is minimal.

KAFKA-1843: Metadata fetch/refresh in the new producer should handle all node connection states gracefully.
I've partially addressed this for a specific failure case described in the JIRA.
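
For context, the KAFKA-2459 idea reduced to a self-contained sketch (illustrative
class and names, not the patched ClusterConnectionStates): the blackout window is
anchored to the most recent connect attempt or disconnect, whichever happened last.

    // Illustrative only: names and structure are simplified.
    final class BackoffWindow {
        private final long reconnectBackoffMs;
        private long lastAttemptOrDisconnectMs;

        BackoffWindow(long reconnectBackoffMs) {
            this.reconnectBackoffMs = reconnectBackoffMs;
        }

        void connecting(long nowMs)   { lastAttemptOrDisconnectMs = nowMs; }

        // Before the fix, a disconnect left the timestamp untouched, so a connection
        // that took a while to fail could be retried immediately; now the window restarts.
        void disconnected(long nowMs) { lastAttemptOrDisconnectMs = nowMs; }

        boolean isBlackedOut(long nowMs) {
            return nowMs - lastAttemptOrDisconnectMs < reconnectBackoffMs;
        }
    }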

Author: Eno Thereska <eno.thereska@gmail.com>

Reviewers: Ewen Cheslack-Postava, Jason Gustafson, Ismael Juma, Guozhang Wang

Closes #290 from enothereska/trunk


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/0785feeb
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/0785feeb
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/0785feeb

Branch: refs/heads/trunk
Commit: 0785feeb0fae2a4f75a59147197c5138109b1b39
Parents: 44f6c4b
Author: Eno Thereska <eno.thereska@gmail.com>
Authored: Wed Oct 21 10:04:49 2015 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Wed Oct 21 10:04:49 2015 -0700

----------------------------------------------------------------------
 .../kafka/clients/ClusterConnectionStates.java  |  4 +-
 .../org/apache/kafka/clients/NetworkClient.java | 88 ++++++++++++++++----
 .../kafka/clients/consumer/KafkaConsumer.java   |  2 +-
 .../internals/ConsumerNetworkClient.java        | 13 +--
 .../kafka/clients/producer/KafkaProducer.java   |  2 +-
 .../apache/kafka/clients/NetworkClientTest.java | 33 +++++++-
 .../org/apache/kafka/test/MockSelector.java     |  6 ++
 .../controller/ControllerChannelManager.scala   |  3 +-
 .../main/scala/kafka/server/KafkaServer.scala   |  3 +-
 .../kafka/server/ReplicaFetcherThread.scala     |  3 +-
 10 files changed, 129 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/0785feeb/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java b/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
index 6c58211..a8101da 100644
--- a/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
+++ b/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
@@ -115,10 +115,12 @@ final class ClusterConnectionStates {
     /**
      * Enter the disconnected state for the given node
      * @param id The connection we have disconnected
+     * @param now The current time
      */
-    public void disconnected(String id) {
+    public void disconnected(String id, long now) {
         NodeConnectionState nodeState = nodeState(id);
         nodeState.state = ConnectionState.DISCONNECTED;
+        nodeState.lastConnectAttemptMs = now;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/0785feeb/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
index 6f39ac9..4265004 100644
--- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
@@ -15,8 +15,10 @@ package org.apache.kafka.clients;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Random;
 import java.util.Set;
 
@@ -33,6 +35,7 @@ import org.apache.kafka.common.requests.MetadataResponse;
 import org.apache.kafka.common.requests.RequestHeader;
 import org.apache.kafka.common.requests.RequestSend;
 import org.apache.kafka.common.requests.ResponseHeader;
+import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -51,7 +54,13 @@ public class NetworkClient implements KafkaClient {
     private final Selectable selector;
     
     private final MetadataUpdater metadataUpdater;
-
+    
+    /* a list of nodes we've connected to in the past */
+    private final List<Integer> nodesEverSeen;
+    private final Map<Integer, Node> nodesEverSeenById;
+    /* random offset into nodesEverSeen list */
+    private final Random randOffset;
+    
     /* the state of each node's connection */
     private final ClusterConnectionStates connectionStates;
 
@@ -75,6 +84,8 @@ public class NetworkClient implements KafkaClient {
 
     /* max time in ms for the producer to wait for acknowledgement from server*/
     private final int requestTimeoutMs;
+    
+    private final Time time;
 
     public NetworkClient(Selectable selector,
                          Metadata metadata,
@@ -83,9 +94,10 @@ public class NetworkClient implements KafkaClient {
                          long reconnectBackoffMs,
                          int socketSendBuffer,
                          int socketReceiveBuffer,
-                         int requestTimeoutMs) {
+                         int requestTimeoutMs,
+                         Time time) {
         this(null, metadata, selector, clientId, maxInFlightRequestsPerConnection,
-                reconnectBackoffMs, socketSendBuffer, socketReceiveBuffer, requestTimeoutMs);
+                reconnectBackoffMs, socketSendBuffer, socketReceiveBuffer, requestTimeoutMs, time);
     }
 
     public NetworkClient(Selectable selector,
@@ -95,9 +107,10 @@ public class NetworkClient implements KafkaClient {
                          long reconnectBackoffMs,
                          int socketSendBuffer,
                          int socketReceiveBuffer,
-                         int requestTimeoutMs) {
+                         int requestTimeoutMs,
+                         Time time) {
         this(metadataUpdater, null, selector, clientId, maxInFlightRequestsPerConnection, reconnectBackoffMs,
-                socketSendBuffer, socketReceiveBuffer, requestTimeoutMs);
+                socketSendBuffer, socketReceiveBuffer, requestTimeoutMs, time);
     }
 
     private NetworkClient(MetadataUpdater metadataUpdater,
@@ -107,7 +120,9 @@ public class NetworkClient implements KafkaClient {
                           int maxInFlightRequestsPerConnection,
                           long reconnectBackoffMs,
                           int socketSendBuffer,
-                          int socketReceiveBuffer, int requestTimeoutMs) {
+                          int socketReceiveBuffer, 
+                          int requestTimeoutMs,
+                          Time time) {
 
         /* It would be better if we could pass `DefaultMetadataUpdater` from the public constructor, but it's not
          * possible because `DefaultMetadataUpdater` is an inner class and it can only be instantiated after the
@@ -127,8 +142,13 @@ public class NetworkClient implements KafkaClient {
         this.socketSendBuffer = socketSendBuffer;
         this.socketReceiveBuffer = socketReceiveBuffer;
         this.correlation = 0;
-        this.nodeIndexOffset = new Random().nextInt(Integer.MAX_VALUE);
+        this.randOffset = new Random();
+        this.nodeIndexOffset = this.randOffset.nextInt(Integer.MAX_VALUE);
         this.requestTimeoutMs = requestTimeoutMs;
+        this.nodesEverSeen = new ArrayList<>();
+        this.nodesEverSeenById = new HashMap<>();
+        
+        this.time = time;
     }
 
     /**
@@ -255,6 +275,7 @@ public class NetworkClient implements KafkaClient {
     @Override
     public List<ClientResponse> poll(long timeout, long now) {
         long metadataTimeout = metadataUpdater.maybeUpdate(now);
+        long updatedNow = now;
         try {
             this.selector.poll(Utils.min(timeout, metadataTimeout, requestTimeoutMs));
         } catch (IOException e) {
@@ -262,12 +283,13 @@ public class NetworkClient implements KafkaClient {
         }
 
         // process completed actions
+        updatedNow = this.time.milliseconds();
         List<ClientResponse> responses = new ArrayList<>();
-        handleCompletedSends(responses, now);
-        handleCompletedReceives(responses, now);
-        handleDisconnections(responses, now);
+        handleCompletedSends(responses, updatedNow);
+        handleCompletedReceives(responses, updatedNow);
+        handleDisconnections(responses, updatedNow);
         handleConnections();
-        handleTimedOutRequests(responses, now);
+        handleTimedOutRequests(responses, updatedNow);
 
         // invoke callbacks
         for (ClientResponse response : responses) {
@@ -364,6 +386,20 @@ public class NetworkClient implements KafkaClient {
                 found = node;
             }
         }
+
+        // if we found no node in the current list, try one from the nodes seen before
+        if (found == null && nodesEverSeen.size() > 0) {
+            int offset = randOffset.nextInt(nodesEverSeen.size());
+            for (int i = 0; i < nodesEverSeen.size(); i++) {
+                int idx = Utils.abs((offset + i) % nodesEverSeen.size());
+                Node node = nodesEverSeenById.get(nodesEverSeen.get(idx));
+                log.debug("No node found. Trying previously-seen node with ID {}", node.id());
+                if (!this.connectionStates.isBlackedOut(node.idString(), now)) {
+                    found = node;
+                }
+            }
+        }
+        
         return found;
     }
 
@@ -375,7 +411,7 @@ public class NetworkClient implements KafkaClient {
      * @param now The current time
      */
     private void processDisconnection(List<ClientResponse> responses, String nodeId, long now) {
-        connectionStates.disconnected(nodeId);
+        connectionStates.disconnected(nodeId, now);
         for (ClientRequest request : this.inFlightRequests.clearAll(nodeId)) {
             log.trace("Cancelled request {} due to node {} being disconnected", request, nodeId);
             if (!metadataUpdater.maybeHandleDisconnection(request))
@@ -489,7 +525,7 @@ public class NetworkClient implements KafkaClient {
                              this.socketReceiveBuffer);
         } catch (IOException e) {
             /* attempt failed, we'll try again after the backoff */
-            connectionStates.disconnected(nodeConnectionId);
+            connectionStates.disconnected(nodeConnectionId, now);
             /* maybe the problem is our metadata, update it */
             metadataUpdater.requestUpdate();
             log.debug("Error connecting to node {} at {}:{}:", node.id(), node.host(), node.port(), e);
@@ -532,7 +568,7 @@ public class NetworkClient implements KafkaClient {
             // if there is no node available to connect, back off refreshing metadata
             long metadataTimeout = Math.max(Math.max(timeToNextMetadataUpdate, timeToNextReconnectAttempt),
                     waitForMetadataFetch);
-
+ 
             if (metadataTimeout == 0) {
                 // Beware that the behavior of this method and the computation of timeouts for poll() are
                 // highly dependent on the behavior of leastLoadedNode.
@@ -570,6 +606,29 @@ public class NetworkClient implements KafkaClient {
             this.metadata.requestUpdate();
         }
 
+        /*
+         * Keep track of any nodes we've ever seen. Add current
+         * alive nodes to this tracking list. 
+         * @param nodes Current alive nodes
+         */
+        private void updateNodesEverSeen(List<Node> nodes) {
+            Node existing = null;
+            for (Node n : nodes) {
+                existing = nodesEverSeenById.get(n.id());
+                if (existing == null) {
+                    nodesEverSeenById.put(n.id(), n);
+                    log.debug("Adding node {} to nodes ever seen", n.id());
+                    nodesEverSeen.add(n.id());
+                } else {
+                    // check if the nodes are really equal. There could be a case
+                    // where node.id() is the same but node has moved to different host
+                    if (!existing.equals(n)) {
+                        nodesEverSeenById.put(n.id(), n);
+                    }
+                }
+            }
+        }
+
         private void handleResponse(RequestHeader header, Struct body, long now) {
             this.metadataFetchInProgress = false;
             MetadataResponse response = new MetadataResponse(body);
@@ -582,6 +641,7 @@ public class NetworkClient implements KafkaClient {
             // created which means we will get errors and no nodes until it exists
             if (cluster.nodes().size() > 0) {
                 this.metadata.update(cluster, now);
+                this.updateNodesEverSeen(cluster.nodes());
             } else {
                 log.trace("Ignoring empty metadata response with correlation id {}.", header.correlationId());
                 this.metadata.failedUpdate(now);
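
As a reading aid, the previously-seen-node fallback added to leastLoadedNode above,
reduced to a standalone sketch (generic names; the committed loop keeps scanning and
indexes nodes by ID, while this sketch simply returns the first eligible candidate):

    import java.util.List;
    import java.util.Random;
    import java.util.function.Predicate;

    final class FallbackPick {
        // When the current metadata yields no usable node, scan the "ever seen" list
        // from a random offset and return an entry not blacked out by the backoff.
        static <N> N pickPreviouslySeen(List<N> everSeen, Random rand, Predicate<N> blackedOut) {
            if (everSeen.isEmpty())
                return null;
            int offset = rand.nextInt(everSeen.size());
            for (int i = 0; i < everSeen.size(); i++) {
                N candidate = everSeen.get((offset + i) % everSeen.size());
                if (!blackedOut.test(candidate))
                    return candidate;   // first previously-seen node outside its backoff window
            }
            return null;                // every known node is still backing off
        }
    }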

http://git-wip-us.apache.org/repos/asf/kafka/blob/0785feeb/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 24051f2..2f7f153 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -527,7 +527,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
                     config.getLong(ConsumerConfig.RECONNECT_BACKOFF_MS_CONFIG),
                     config.getInt(ConsumerConfig.SEND_BUFFER_CONFIG),
                     config.getInt(ConsumerConfig.RECEIVE_BUFFER_CONFIG),
-                    config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG));
+                    config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG), time);
             this.client = new ConsumerNetworkClient(netClient, metadata, time, retryBackoffMs);
             OffsetResetStrategy offsetResetStrategy = OffsetResetStrategy.valueOf(config.getString(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG).toUpperCase());
             this.subscriptions = new SubscriptionState(offsetResetStrategy);

http://git-wip-us.apache.org/repos/asf/kafka/blob/0785feeb/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
index 0b611fb..4153eb3 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
@@ -175,7 +175,8 @@ public class ConsumerNetworkClient implements Closeable {
     private void poll(long timeout, long now) {
         // send all the requests we can send now
         pollUnsentRequests(now);
-
+        now = time.milliseconds();
+        
         // ensure we don't poll any longer than the deadline for
         // the next scheduled task
         timeout = Math.min(timeout, delayedTasks.nextTimeout(now));
@@ -190,7 +191,7 @@ public class ConsumerNetworkClient implements Closeable {
         pollUnsentRequests(now);
 
         // fail all requests that couldn't be sent
-        clearUnsentRequests(now);
+        clearUnsentRequests();
 
     }
 
@@ -228,11 +229,13 @@ public class ConsumerNetworkClient implements Closeable {
     }
 
     private void pollUnsentRequests(long now) {
-        while (trySend(now))
+        while (trySend(now)) {
             clientPoll(0, now);
+            now = time.milliseconds();
+        }
     }
 
-    private void clearUnsentRequests(long now) {
+    private void clearUnsentRequests() {
         // clear all unsent requests and fail their corresponding futures
         for (Map.Entry<Node, List<ClientRequest>> requestEntry: unsent.entrySet()) {
             Iterator<ClientRequest> iterator = requestEntry.getValue().iterator();
@@ -273,7 +276,7 @@ public class ConsumerNetworkClient implements Closeable {
     private void clientPoll(long timeout, long now) {
         client.poll(timeout, now);
         if (wakeup.get()) {
-            clearUnsentRequests(now);
+            clearUnsentRequests();
             wakeup.set(false);
             throw new ConsumerWakeupException();
         }
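
The time.milliseconds() re-reads above follow a simple pattern worth calling out
(hypothetical helper, not the committed code; only the Time interface is Kafka's):
a "now" captured before a blocking call is stale afterwards, so the clock is re-read
before it feeds any further timeout or backoff arithmetic.

    import org.apache.kafka.common.utils.Time;

    final class FreshClock {
        // Returns how long remains until a deadline, measured after the blocking call,
        // rather than reusing a timestamp taken before it.
        static long remainingAfterPoll(Time time, Runnable blockingPoll, long deadlineMs) {
            blockingPoll.run();                     // may block for a long while
            long now = time.milliseconds();         // re-read the clock after blocking
            return Math.max(0L, deadlineMs - now);  // timeout math uses the fresh value
        }
    }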

http://git-wip-us.apache.org/repos/asf/kafka/blob/0785feeb/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index 44280e0..ff3bfe6 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -276,7 +276,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
                     config.getLong(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG),
                     config.getInt(ProducerConfig.SEND_BUFFER_CONFIG),
                     config.getInt(ProducerConfig.RECEIVE_BUFFER_CONFIG),
-                    this.requestTimeoutMs);
+                    this.requestTimeoutMs, time);
             this.sender = new Sender(client,
                     this.metadata,
                     this.accumulator,

http://git-wip-us.apache.org/repos/asf/kafka/blob/0785feeb/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
index 2379896..12136d8 100644
--- a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
@@ -52,10 +52,12 @@ public class NetworkClientTest {
     private int nodeId = 1;
     private Cluster cluster = TestUtils.singletonCluster("test", nodeId);
     private Node node = cluster.nodes().get(0);
-    private NetworkClient client = new NetworkClient(selector, metadata, "mock", Integer.MAX_VALUE, 0, 64 * 1024, 64 * 1024, requestTimeoutMs);
-
+    private long reconnectBackoffMsTest = 10 * 1000;
+    private NetworkClient client = new NetworkClient(selector, metadata, "mock", Integer.MAX_VALUE, reconnectBackoffMsTest,
+            64 * 1024, 64 * 1024, requestTimeoutMs, time);
+    
     private NetworkClient clientWithStaticNodes = new NetworkClient(selector, new ManualMetadataUpdater(Arrays.asList(node)),
-            "mock-static", Integer.MAX_VALUE, 0, 64 * 1024, 64 * 1024, requestTimeoutMs);
+            "mock-static", Integer.MAX_VALUE, 0, 64 * 1024, 64 * 1024, requestTimeoutMs, time);
 
     @Before
     public void setup() {
@@ -149,6 +151,31 @@ public class NetworkClientTest {
         assertEquals(node.idString(), disconnectedNode);
     }
 
+    @Test
+    public void testLeastLoadedNode() {
+        Node leastNode = null;
+        client.ready(node, time.milliseconds());
+        awaitReady(client, node);
+        client.poll(1, time.milliseconds());
+        assertTrue("The client should be ready", client.isReady(node, time.milliseconds()));
+        
+        // leastloadednode should be our single node
+        leastNode = client.leastLoadedNode(time.milliseconds());
+        assertEquals("There should be one leastloadednode", leastNode.id(), node.id());
+        
+        // sleep for longer than reconnect backoff
+        time.sleep(reconnectBackoffMsTest);
+        
+        // CLOSE node 
+        selector.close(node.idString());
+        
+        client.poll(1, time.milliseconds());
+        assertFalse("After we forced the disconnection the client is no longer ready.", client.ready(node, time.milliseconds()));
+        leastNode = client.leastLoadedNode(time.milliseconds());
+        assertEquals("There should be NO leastloadednode", leastNode, null);
+        
+    }
+    
     private static class TestCallbackHandler implements RequestCompletionHandler {
         public boolean executed = false;
         public ClientResponse response;
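
The new testLeastLoadedNode above advances a mock clock past reconnectBackoffMsTest
instead of sleeping. A minimal stand-in clock for that style of deterministic test
could look like the sketch below (illustrative; it assumes the three-method Time
interface imported in the NetworkClient diff, and Kafka's own mock-time test utility
serves the same purpose):

    import org.apache.kafka.common.utils.Time;

    // Illustrative fake clock: sleep() advances virtual time instead of blocking,
    // so a test can jump past the reconnect backoff without real waiting.
    final class FakeTime implements Time {
        private long nowMs = 0L;
        @Override public long milliseconds() { return nowMs; }
        @Override public long nanoseconds() { return nowMs * 1000000L; }
        @Override public void sleep(long ms) { nowMs += ms; }
    }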

http://git-wip-us.apache.org/repos/asf/kafka/blob/0785feeb/clients/src/test/java/org/apache/kafka/test/MockSelector.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/test/MockSelector.java b/clients/src/test/java/org/apache/kafka/test/MockSelector.java
index 5a5f963..b39ff7e 100644
--- a/clients/src/test/java/org/apache/kafka/test/MockSelector.java
+++ b/clients/src/test/java/org/apache/kafka/test/MockSelector.java
@@ -55,6 +55,12 @@ public class MockSelector implements Selectable {
     @Override
     public void close(String id) {
         this.disconnected.add(id);
+        for (int i = 0; i < this.connected.size(); i++) {
+            if (this.connected.get(i).equals(id)) {
+                this.connected.remove(i);
+                break;
+            }
+        }
     }
 
     public void clear() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/0785feeb/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index 3756822..d86c8ce 100755
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -106,7 +106,8 @@ class ControllerChannelManager(controllerContext: ControllerContext, config: Kaf
         0,
         Selectable.USE_DEFAULT_BUFFER_SIZE,
         Selectable.USE_DEFAULT_BUFFER_SIZE,
-        config.requestTimeoutMs
+        config.requestTimeoutMs,
+        time
       )
     }
     val threadName = threadNamePrefix match {

http://git-wip-us.apache.org/repos/asf/kafka/blob/0785feeb/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 510957b..beea83a 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -317,7 +317,8 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePr
           0,
           Selectable.USE_DEFAULT_BUFFER_SIZE,
           Selectable.USE_DEFAULT_BUFFER_SIZE,
-          config.requestTimeoutMs)
+          config.requestTimeoutMs,
+          kafkaMetricsTime)
       }
 
       var shutdownSucceeded: Boolean = false

http://git-wip-us.apache.org/repos/asf/kafka/blob/0785feeb/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index 5993bbb..4affd89 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -84,7 +84,8 @@ class ReplicaFetcherThread(name: String,
       0,
       Selectable.USE_DEFAULT_BUFFER_SIZE,
       brokerConfig.replicaSocketReceiveBufferBytes,
-      brokerConfig.requestTimeoutMs
+      brokerConfig.requestTimeoutMs,
+      time
     )
   }
 

