kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ewe...@apache.org
Subject kafka git commit: KAFKA-3659: Handle coordinator disconnects more gracefully in client
Date Thu, 05 May 2016 19:03:58 GMT
Repository: kafka
Updated Branches:
  refs/heads/0.10.0 e9d10108b -> 1fb0d796b


KAFKA-3659: Handle coordinator disconnects more gracefully in client

Author: Jason Gustafson <jason@confluent.io>

Reviewers: Grant Henke <granthenke@gmail.com>, Ewen Cheslack-Postava <ewen@confluent.io>

Closes #1322 from hachikuji/KAFKA-3659

(cherry picked from commit 32bf83e5a792c5ee9eb88660da71b73aad5bbc02)
Signed-off-by: Ewen Cheslack-Postava <me@ewencp.org>


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/1fb0d796
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/1fb0d796
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/1fb0d796

Branch: refs/heads/0.10.0
Commit: 1fb0d796bb93afe21a035764dc5e70292a76e061
Parents: e9d1010
Author: Jason Gustafson <jason@confluent.io>
Authored: Thu May 5 12:03:28 2016 -0700
Committer: Ewen Cheslack-Postava <me@ewencp.org>
Committed: Thu May 5 12:03:51 2016 -0700

----------------------------------------------------------------------
 .../kafka/clients/consumer/KafkaConsumer.java   |   2 +-
 .../consumer/internals/AbstractCoordinator.java |  12 +-
 .../consumer/internals/ConsumerCoordinator.java |   4 +-
 .../org/apache/kafka/clients/MockClient.java    |  24 +++-
 .../internals/AbstractCoordinatorTest.java      | 137 +++++++++++++++++++
 .../internals/ConsumerCoordinatorTest.java      |  94 ++++++-------
 .../runtime/distributed/WorkerGroupMember.java  |   4 +-
 .../distributed/WorkerCoordinatorTest.java      |   8 +-
 8 files changed, 225 insertions(+), 60 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/1fb0d796/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 7290a38..2373a13 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -938,7 +938,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      */
     private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollOnce(long timeout) {
         // TODO: Sub-requests should take into account the poll timeout (KAFKA-1894)
-        coordinator.ensureCoordinatorKnown();
+        coordinator.ensureCoordinatorReady();
 
         // ensure we have partitions assigned if we expect to
         if (subscriptions.partitionsAutoAssigned())

http://git-wip-us.apache.org/repos/asf/kafka/blob/1fb0d796/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index 15185d7..6bb4406 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -171,9 +171,9 @@ public abstract class AbstractCoordinator implements Closeable {
                                            ByteBuffer memberAssignment);
 
     /**
-     * Block until the coordinator for this group is known.
+     * Block until the coordinator for this group is known and is ready to receive requests.
      */
-    public void ensureCoordinatorKnown() {
+    public void ensureCoordinatorReady() {
         while (coordinatorUnknown()) {
             RequestFuture<Void> future = sendGroupCoordinatorRequest();
             client.poll(future);
@@ -183,7 +183,13 @@ public abstract class AbstractCoordinator implements Closeable {
                     client.awaitMetadataUpdate();
                 else
                     throw future.exception();
+            } else if (coordinator != null && client.connectionFailed(coordinator)) {
+                // we found the coordinator, but the connection has failed, so mark
+                // it dead and backoff before retrying discovery
+                coordinatorDead();
+                time.sleep(retryBackoffMs);
             }
+
         }
     }
 
@@ -208,7 +214,7 @@ public abstract class AbstractCoordinator implements Closeable {
         }
 
         while (needRejoin()) {
-            ensureCoordinatorKnown();
+            ensureCoordinatorReady();
 
             // ensure that there are no pending requests to the coordinator. This is important
             // in particular to avoid resending a pending JoinGroup request.

http://git-wip-us.apache.org/repos/asf/kafka/blob/1fb0d796/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index d44d8eb..c1f373f 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -321,7 +321,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
      */
     public Map<TopicPartition, OffsetAndMetadata> fetchCommittedOffsets(Set<TopicPartition> partitions) {
         while (true) {
-            ensureCoordinatorKnown();
+            ensureCoordinatorReady();
 
             // contact coordinator to fetch committed offsets
             RequestFuture<Map<TopicPartition, OffsetAndMetadata>> future = sendOffsetFetchRequest(partitions);
@@ -397,7 +397,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
             return;
 
         while (true) {
-            ensureCoordinatorKnown();
+            ensureCoordinatorReady();
 
             RequestFuture<Void> future = sendOffsetCommitRequest(offsets);
             client.poll(future);

http://git-wip-us.apache.org/repos/asf/kafka/blob/1fb0d796/clients/src/test/java/org/apache/kafka/clients/MockClient.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/MockClient.java b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
index 1c3efd4..527d283 100644
--- a/clients/src/test/java/org/apache/kafka/clients/MockClient.java
+++ b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
@@ -18,9 +18,11 @@ package org.apache.kafka.clients;
 
 import java.util.ArrayDeque;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.Queue;
 import java.util.Set;
 
@@ -58,6 +60,7 @@ public class MockClient implements KafkaClient {
     private int correlation = 0;
     private Node node = null;
     private final Set<String> ready = new HashSet<>();
+    private final Map<Node, Long> blackedOut = new HashMap<>();
     private final Queue<ClientRequest> requests = new ArrayDeque<>();
     private final Queue<ClientResponse> responses = new ArrayDeque<>();
     private final Queue<FutureResponse> futureResponses = new ArrayDeque<>();
@@ -73,6 +76,8 @@ public class MockClient implements KafkaClient {
 
     @Override
     public boolean ready(Node node, long now) {
+        if (isBlackedOut(node))
+            return false;
         ready.add(node.idString());
         return true;
     }
@@ -82,9 +87,26 @@ public class MockClient implements KafkaClient {
         return 0;
     }
 
+    public void blackout(Node node, long duration) {
+        blackedOut.put(node, time.milliseconds() + duration);
+    }
+
+    private boolean isBlackedOut(Node node) {
+        if (blackedOut.containsKey(node)) {
+            long expiration = blackedOut.get(node);
+            if (time.milliseconds() > expiration) {
+                blackedOut.remove(node);
+                return false;
+            } else {
+                return true;
+            }
+        }
+        return false;
+    }
+
     @Override
     public boolean connectionFailed(Node node) {
-        return false;
+        return isBlackedOut(node);
     }
 
     public void disconnect(String node) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/1fb0d796/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
new file mode 100644
index 0000000..7a05eb1
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.Metadata;
+import org.apache.kafka.clients.MockClient;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.requests.GroupCoordinatorResponse;
+import org.apache.kafka.common.requests.JoinGroupRequest;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.test.TestUtils;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertTrue;
+
+public class AbstractCoordinatorTest {
+
+    private static final ByteBuffer EMPTY_DATA = ByteBuffer.wrap(new byte[0]);
+    private static final int SESSION_TIMEOUT_MS = 30000;
+    private static final int HEARTBEAT_INTERVAL_MS = 3000;
+    private static final long RETRY_BACKOFF_MS = 100;
+    private static final long REQUEST_TIMEOUT_MS = 40000;
+    private static final String GROUP_ID = "dummy-group";
+    private static final String METRIC_GROUP_PREFIX = "consumer";
+
+    private MockClient mockClient;
+    private MockTime mockTime;
+    private Node node;
+    private Node coordinatorNode;
+    private ConsumerNetworkClient consumerClient;
+    private DummyCoordinator coordinator;
+
+    @Before
+    public void setupCoordinator() {
+        this.mockTime = new MockTime();
+        this.mockClient = new MockClient(mockTime);
+
+        Metadata metadata = new Metadata();
+        this.consumerClient = new ConsumerNetworkClient(mockClient, metadata, mockTime,
+                RETRY_BACKOFF_MS, REQUEST_TIMEOUT_MS);
+        Metrics metrics = new Metrics();
+
+        Cluster cluster = TestUtils.singletonCluster("topic", 1);
+        metadata.update(cluster, mockTime.milliseconds());
+        this.node = cluster.nodes().get(0);
+        mockClient.setNode(node);
+
+        this.coordinatorNode = new Node(Integer.MAX_VALUE - node.id(), node.host(), node.port());
+        this.coordinator = new DummyCoordinator(consumerClient, metrics, mockTime);
+    }
+
+    @Test
+    public void testCoordinatorDiscoveryBackoff() {
+        mockClient.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code()));
+        mockClient.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code()));
+
+        // blackout the coordinator for 50 milliseconds to simulate a disconnect.
+        // after backing off, we should be able to connect.
+        mockClient.blackout(coordinatorNode, 50L);
+
+        long initialTime = mockTime.milliseconds();
+        coordinator.ensureCoordinatorReady();
+        long endTime = mockTime.milliseconds();
+
+        assertTrue(endTime - initialTime >= RETRY_BACKOFF_MS);
+    }
+
+    private Struct groupCoordinatorResponse(Node node, short error) {
+        GroupCoordinatorResponse response = new GroupCoordinatorResponse(error, node);
+        return response.toStruct();
+    }
+
+    public class DummyCoordinator extends AbstractCoordinator {
+
+        public DummyCoordinator(ConsumerNetworkClient client,
+                                Metrics metrics,
+                                Time time) {
+            super(client, GROUP_ID, SESSION_TIMEOUT_MS, HEARTBEAT_INTERVAL_MS, metrics,
+                    METRIC_GROUP_PREFIX, time, RETRY_BACKOFF_MS);
+        }
+
+        @Override
+        protected String protocolType() {
+            return "dummy";
+        }
+
+        @Override
+        protected List<JoinGroupRequest.ProtocolMetadata> metadata() {
+            return Collections.singletonList(new JoinGroupRequest.ProtocolMetadata("dummy-subprotocol", EMPTY_DATA));
+        }
+
+        @Override
+        protected Map<String, ByteBuffer> performAssignment(String leaderId, String protocol, Map<String, ByteBuffer> allMemberMetadata) {
+            Map<String, ByteBuffer> assignment = new HashMap<>();
+            for (Map.Entry<String, ByteBuffer> metadata : allMemberMetadata.entrySet())
+                assignment.put(metadata.getKey(), EMPTY_DATA);
+            return assignment;
+        }
+
+        @Override
+        protected void onJoinPrepare(int generation, String memberId) {
+
+        }
+
+        @Override
+        protected void onJoinComplete(int generation, String memberId, String protocol, ByteBuffer memberAssignment) {
+
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/1fb0d796/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index bb31acf..82a854a 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -125,7 +125,7 @@ public class ConsumerCoordinatorTest {
     @Test
     public void testNormalHeartbeat() {
         client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
-        coordinator.ensureCoordinatorKnown();
+        coordinator.ensureCoordinatorReady();
 
         // normal heartbeat
         time.sleep(sessionTimeoutMs);
@@ -143,7 +143,7 @@ public class ConsumerCoordinatorTest {
     @Test(expected = GroupAuthorizationException.class)
     public void testGroupDescribeUnauthorized() {
         client.prepareResponse(consumerMetadataResponse(node, Errors.GROUP_AUTHORIZATION_FAILED.code()));
-        coordinator.ensureCoordinatorKnown();
+        coordinator.ensureCoordinatorReady();
     }
 
     @Test(expected = GroupAuthorizationException.class)
@@ -151,7 +151,7 @@ public class ConsumerCoordinatorTest {
         subscriptions.subscribe(Arrays.asList(topicName), rebalanceListener);
 
         client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
-        coordinator.ensureCoordinatorKnown();
+        coordinator.ensureCoordinatorReady();
 
         client.prepareResponse(joinGroupLeaderResponse(0, "memberId", Collections.<String, List<String>>emptyMap(),
                 Errors.GROUP_AUTHORIZATION_FAILED.code()));
@@ -161,7 +161,7 @@ public class ConsumerCoordinatorTest {
     @Test
     public void testCoordinatorNotAvailable() {
         client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
-        coordinator.ensureCoordinatorKnown();
+        coordinator.ensureCoordinatorReady();
 
         // GROUP_COORDINATOR_NOT_AVAILABLE will mark coordinator as unknown
         time.sleep(sessionTimeoutMs);
@@ -182,7 +182,7 @@ public class ConsumerCoordinatorTest {
     @Test
     public void testNotCoordinator() {
         client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
-        coordinator.ensureCoordinatorKnown();
+        coordinator.ensureCoordinatorReady();
 
         // not_coordinator will mark coordinator as unknown
         time.sleep(sessionTimeoutMs);
@@ -203,7 +203,7 @@ public class ConsumerCoordinatorTest {
     @Test
     public void testIllegalGeneration() {
         client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
-        coordinator.ensureCoordinatorKnown();
+        coordinator.ensureCoordinatorReady();
 
         // illegal_generation will cause re-partition
         subscriptions.subscribe(Arrays.asList(topicName), rebalanceListener);
@@ -227,7 +227,7 @@ public class ConsumerCoordinatorTest {
     @Test
     public void testUnknownConsumerId() {
         client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
-        coordinator.ensureCoordinatorKnown();
+        coordinator.ensureCoordinatorReady();
 
         // illegal_generation will cause re-partition
         subscriptions.subscribe(Arrays.asList(topicName), rebalanceListener);
@@ -251,7 +251,7 @@ public class ConsumerCoordinatorTest {
     @Test
     public void testCoordinatorDisconnect() {
         client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
-        coordinator.ensureCoordinatorKnown();
+        coordinator.ensureCoordinatorReady();
 
         // coordinator disconnect will mark coordinator as unknown
         time.sleep(sessionTimeoutMs);
@@ -281,7 +281,7 @@ public class ConsumerCoordinatorTest {
         metadata.update(cluster, time.milliseconds());
 
         client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
-        coordinator.ensureCoordinatorKnown();
+        coordinator.ensureCoordinatorReady();
 
         client.prepareResponse(joinGroupLeaderResponse(0, consumerId, Collections.<String, List<String>>emptyMap(),
                 Errors.INVALID_GROUP_ID.code()));
@@ -300,7 +300,7 @@ public class ConsumerCoordinatorTest {
         metadata.update(cluster, time.milliseconds());
 
         client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
-        coordinator.ensureCoordinatorKnown();
+        coordinator.ensureCoordinatorReady();
 
         // normal join group
         Map<String, List<String>> memberSubscriptions = Collections.singletonMap(consumerId, Arrays.asList(topicName));
@@ -338,7 +338,7 @@ public class ConsumerCoordinatorTest {
         metadata.update(cluster, time.milliseconds());
 
         client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
-        coordinator.ensureCoordinatorKnown();
+        coordinator.ensureCoordinatorReady();
 
         Map<String, List<String>> memberSubscriptions = Collections.singletonMap(consumerId, Arrays.asList(topicName));
         partitionAssignor.prepare(Collections.singletonMap(consumerId, Arrays.asList(tp)));
@@ -373,7 +373,7 @@ public class ConsumerCoordinatorTest {
         subscriptions.needReassignment();
 
         client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
-        coordinator.ensureCoordinatorKnown();
+        coordinator.ensureCoordinatorReady();
 
         // normal join group
         client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE.code()));
@@ -404,7 +404,7 @@ public class ConsumerCoordinatorTest {
         subscriptions.needReassignment();
 
         client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
-        coordinator.ensureCoordinatorKnown();
+        coordinator.ensureCoordinatorReady();
 
         client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE.code()));
         client.prepareResponse(syncGroupResponse(Arrays.asList(tp), Errors.NONE.code()));
@@ -432,7 +432,7 @@ public class ConsumerCoordinatorTest {
         subscriptions.needReassignment();
 
         client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
-        coordinator.ensureCoordinatorKnown();
+        coordinator.ensureCoordinatorReady();
 
         client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE.code()));
         client.prepareResponse(syncGroupResponse(Arrays.asList(tp), Errors.NONE.code()));
@@ -462,7 +462,7 @@ public class ConsumerCoordinatorTest {
         subscriptions.needReassignment();
 
         client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
-        coordinator.ensureCoordinatorKnown();
+        coordinator.ensureCoordinatorReady();
 
         // join initially, but let coordinator rebalance on sync
         client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE.code()));
@@ -478,7 +478,7 @@ public class ConsumerCoordinatorTest {
         subscriptions.needReassignment();
 
         client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
-        coordinator.ensureCoordinatorKnown();
+        coordinator.ensureCoordinatorReady();
 
         // join initially, but let coordinator returns unknown member id
         client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE.code()));
@@ -508,7 +508,7 @@ public class ConsumerCoordinatorTest {
         subscriptions.needReassignment();
 
         client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
-        coordinator.ensureCoordinatorKnown();
+        coordinator.ensureCoordinatorReady();
 
         // join initially, but let coordinator rebalance on sync
         client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE.code()));
@@ -532,7 +532,7 @@ public class ConsumerCoordinatorTest {
         subscriptions.needReassignment();
 
         client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
-        coordinator.ensureCoordinatorKnown();
+        coordinator.ensureCoordinatorReady();
 
         // join initially, but let coordinator rebalance on sync
         client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE.code()));
@@ -562,7 +562,7 @@ public class ConsumerCoordinatorTest {
         subscriptions.needReassignment();
 
         client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
-        coordinator.ensureCoordinatorKnown();
+        coordinator.ensureCoordinatorReady();
 
         client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE.code()));
         client.prepareResponse(syncGroupResponse(Arrays.asList(tp), Errors.NONE.code()));
@@ -597,7 +597,7 @@ public class ConsumerCoordinatorTest {
         metadata.update(TestUtils.singletonCluster(topic1, 1), time.milliseconds());
 
         client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
-        coordinator.ensureCoordinatorKnown();
+        coordinator.ensureCoordinatorReady();
 
         // prepare initial rebalance
         Map<String, List<String>> memberSubscriptions = Collections.singletonMap(consumerId, topics);
@@ -658,7 +658,7 @@ public class ConsumerCoordinatorTest {
         subscriptions.needReassignment();
 
         client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
-        coordinator.ensureCoordinatorKnown();
+        coordinator.ensureCoordinatorReady();
 
         // join the group once
         client.prepareResponse(joinGroupFollowerResponse(1, "consumer", "leader", Errors.NONE.code()));
@@ -686,7 +686,7 @@ public class ConsumerCoordinatorTest {
         subscriptions.needReassignment();
 
         client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
-        coordinator.ensureCoordinatorKnown();
+        coordinator.ensureCoordinatorReady();
 
         // disconnected from original coordinator will cause re-discover and join again
         client.prepareResponse(joinGroupFollowerResponse(1, "consumer", "leader", Errors.NONE.code()), true);
@@ -707,7 +707,7 @@ public class ConsumerCoordinatorTest {
         subscriptions.needReassignment();
 
         client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
-        coordinator.ensureCoordinatorKnown();
+        coordinator.ensureCoordinatorReady();
 
         // coordinator doesn't like the session timeout
         client.prepareResponse(joinGroupFollowerResponse(0, "consumer", "", Errors.INVALID_SESSION_TIMEOUT.code()));
@@ -719,7 +719,7 @@ public class ConsumerCoordinatorTest {
         subscriptions.assignFromUser(Arrays.asList(tp));
 
         client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
-        coordinator.ensureCoordinatorKnown();
+        coordinator.ensureCoordinatorReady();
 
         client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NONE.code())));
 
@@ -741,7 +741,7 @@ public class ConsumerCoordinatorTest {
         subscriptions.needReassignment();
 
         client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
-        coordinator.ensureCoordinatorKnown();
+        coordinator.ensureCoordinatorReady();
 
         client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE.code()));
         client.prepareResponse(syncGroupResponse(Arrays.asList(tp), Errors.NONE.code()));
@@ -767,7 +767,7 @@ public class ConsumerCoordinatorTest {
         subscriptions.needReassignment();
 
         client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
-        coordinator.ensureCoordinatorKnown();
+        coordinator.ensureCoordinatorReady();
 
         // haven't joined, so should not cause a commit
         time.sleep(autoCommitIntervalMs);
@@ -795,7 +795,7 @@ public class ConsumerCoordinatorTest {
         subscriptions.seek(tp, 100);
 
         client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
-        coordinator.ensureCoordinatorKnown();
+        coordinator.ensureCoordinatorReady();
 
         client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NONE.code())));
         time.sleep(autoCommitIntervalMs);
@@ -821,7 +821,7 @@ public class ConsumerCoordinatorTest {
 
         // now find the coordinator
         client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
-        coordinator.ensureCoordinatorKnown();
+        coordinator.ensureCoordinatorReady();
 
         // sleep only for the retry backoff
         time.sleep(retryBackoffMs);
@@ -836,7 +836,7 @@ public class ConsumerCoordinatorTest {
         subscriptions.assignFromUser(Arrays.asList(tp));
 
         client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
-        coordinator.ensureCoordinatorKnown();
+        coordinator.ensureCoordinatorReady();
 
         client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NONE.code())));
 
@@ -852,7 +852,7 @@ public class ConsumerCoordinatorTest {
     public void testCommitOffsetAsyncWithDefaultCallback() {
         int invokedBeforeTest = defaultOffsetCommitCallback.invoked;
         client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
-        coordinator.ensureCoordinatorKnown();
+        coordinator.ensureCoordinatorReady();
         client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NONE.code())));
         coordinator.commitOffsetsAsync(Collections.singletonMap(tp, new OffsetAndMetadata(100L)), null);
         assertEquals(invokedBeforeTest + 1, defaultOffsetCommitCallback.invoked);
@@ -865,7 +865,7 @@ public class ConsumerCoordinatorTest {
         subscriptions.subscribe(Arrays.asList(topicName), rebalanceListener);
 
         client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
-        coordinator.ensureCoordinatorKnown();
+        coordinator.ensureCoordinatorReady();
 
         client.prepareResponse(joinGroupFollowerResponse(1, "consumer", "leader", Errors.NONE.code()));
         client.prepareResponse(syncGroupResponse(Arrays.asList(tp), Errors.NONE.code()));
@@ -896,7 +896,7 @@ public class ConsumerCoordinatorTest {
     public void testCommitOffsetAsyncFailedWithDefaultCallback() {
         int invokedBeforeTest = defaultOffsetCommitCallback.invoked;
         client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
-        coordinator.ensureCoordinatorKnown();
+        coordinator.ensureCoordinatorReady();
         client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code())));
         coordinator.commitOffsetsAsync(Collections.singletonMap(tp, new OffsetAndMetadata(100L)), null);
         assertEquals(invokedBeforeTest + 1, defaultOffsetCommitCallback.invoked);
@@ -906,7 +906,7 @@ public class ConsumerCoordinatorTest {
     @Test
     public void testCommitOffsetAsyncCoordinatorNotAvailable() {
         client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
-        coordinator.ensureCoordinatorKnown();
+        coordinator.ensureCoordinatorReady();
 
         // async commit with coordinator not available
         MockCommitCallback cb = new MockCommitCallback();
@@ -921,7 +921,7 @@ public class ConsumerCoordinatorTest {
     @Test
     public void testCommitOffsetAsyncNotCoordinator() {
         client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
-        coordinator.ensureCoordinatorKnown();
+        coordinator.ensureCoordinatorReady();
 
         // async commit with not coordinator
         MockCommitCallback cb = new MockCommitCallback();
@@ -936,7 +936,7 @@ public class ConsumerCoordinatorTest {
     @Test
     public void testCommitOffsetAsyncDisconnected() {
         client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
-        coordinator.ensureCoordinatorKnown();
+        coordinator.ensureCoordinatorReady();
 
         // async commit with coordinator disconnected
         MockCommitCallback cb = new MockCommitCallback();
@@ -951,7 +951,7 @@ public class ConsumerCoordinatorTest {
     @Test
     public void testCommitOffsetSyncNotCoordinator() {
         client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
-        coordinator.ensureCoordinatorKnown();
+        coordinator.ensureCoordinatorReady();
 
         // sync commit with coordinator disconnected (should connect, get metadata, and then submit the commit request)
         client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NOT_COORDINATOR_FOR_GROUP.code())));
@@ -963,7 +963,7 @@ public class ConsumerCoordinatorTest {
     @Test
     public void testCommitOffsetSyncCoordinatorNotAvailable() {
         client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
-        coordinator.ensureCoordinatorKnown();
+        coordinator.ensureCoordinatorReady();
 
         // sync commit with coordinator disconnected (should connect, get metadata, and then submit the commit request)
         client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code())));
@@ -975,7 +975,7 @@ public class ConsumerCoordinatorTest {
     @Test
     public void testCommitOffsetSyncCoordinatorDisconnected() {
         client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
-        coordinator.ensureCoordinatorKnown();
+        coordinator.ensureCoordinatorReady();
 
         // sync commit with coordinator disconnected (should connect, get metadata, and then submit the commit request)
         client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NONE.code())), true);
@@ -988,7 +988,7 @@ public class ConsumerCoordinatorTest {
     public void testCommitOffsetMetadataTooLarge() {
         // since offset metadata is provided by the user, we have to propagate the exception so they can handle it
         client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
-        coordinator.ensureCoordinatorKnown();
+        coordinator.ensureCoordinatorReady();
 
         client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.OFFSET_METADATA_TOO_LARGE.code())));
         coordinator.commitOffsetsSync(Collections.singletonMap(tp, new OffsetAndMetadata(100L, "metadata")));
@@ -998,7 +998,7 @@ public class ConsumerCoordinatorTest {
     public void testCommitOffsetIllegalGeneration() {
         // we cannot retry if a rebalance occurs before the commit completed
         client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
-        coordinator.ensureCoordinatorKnown();
+        coordinator.ensureCoordinatorReady();
 
         client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.ILLEGAL_GENERATION.code())));
         coordinator.commitOffsetsSync(Collections.singletonMap(tp, new OffsetAndMetadata(100L, "metadata")));
@@ -1008,7 +1008,7 @@ public class ConsumerCoordinatorTest {
     public void testCommitOffsetUnknownMemberId() {
         // we cannot retry if a rebalance occurs before the commit completed
         client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
-        coordinator.ensureCoordinatorKnown();
+        coordinator.ensureCoordinatorReady();
 
         client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.UNKNOWN_MEMBER_ID.code())));
         coordinator.commitOffsetsSync(Collections.singletonMap(tp, new OffsetAndMetadata(100L, "metadata")));
@@ -1018,7 +1018,7 @@ public class ConsumerCoordinatorTest {
     public void testCommitOffsetRebalanceInProgress() {
         // we cannot retry if a rebalance occurs before the commit completed
         client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
-        coordinator.ensureCoordinatorKnown();
+        coordinator.ensureCoordinatorReady();
 
         client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.REBALANCE_IN_PROGRESS.code())));
         coordinator.commitOffsetsSync(Collections.singletonMap(tp, new OffsetAndMetadata(100L, "metadata")));
@@ -1027,7 +1027,7 @@ public class ConsumerCoordinatorTest {
     @Test(expected = KafkaException.class)
     public void testCommitOffsetSyncCallbackWithNonRetriableException() {
         client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
-        coordinator.ensureCoordinatorKnown();
+        coordinator.ensureCoordinatorReady();
 
         // sync commit with invalid partitions should throw if we have no callback
         client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.UNKNOWN.code())), false);
@@ -1037,7 +1037,7 @@ public class ConsumerCoordinatorTest {
     @Test
     public void testRefreshOffset() {
         client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
-        coordinator.ensureCoordinatorKnown();
+        coordinator.ensureCoordinatorReady();
 
         subscriptions.assignFromUser(Arrays.asList(tp));
         subscriptions.needRefreshCommits();
@@ -1050,7 +1050,7 @@ public class ConsumerCoordinatorTest {
     @Test
     public void testRefreshOffsetLoadInProgress() {
         client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
-        coordinator.ensureCoordinatorKnown();
+        coordinator.ensureCoordinatorReady();
 
         subscriptions.assignFromUser(Arrays.asList(tp));
         subscriptions.needRefreshCommits();
@@ -1064,7 +1064,7 @@ public class ConsumerCoordinatorTest {
     @Test
     public void testRefreshOffsetNotCoordinatorForConsumer() {
         client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
-        coordinator.ensureCoordinatorKnown();
+        coordinator.ensureCoordinatorReady();
 
         subscriptions.assignFromUser(Arrays.asList(tp));
         subscriptions.needRefreshCommits();
@@ -1079,7 +1079,7 @@ public class ConsumerCoordinatorTest {
     @Test
     public void testRefreshOffsetWithNoFetchableOffsets() {
         client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
-        coordinator.ensureCoordinatorKnown();
+        coordinator.ensureCoordinatorReady();
 
         subscriptions.assignFromUser(Arrays.asList(tp));
         subscriptions.needRefreshCommits();

http://git-wip-us.apache.org/repos/asf/kafka/blob/1fb0d796/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
index 85af549..c21b9bf 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
@@ -131,7 +131,7 @@ public class WorkerGroupMember {
     }
 
     public void ensureActive() {
-        coordinator.ensureCoordinatorKnown();
+        coordinator.ensureCoordinatorReady();
         coordinator.ensureActiveGroup();
     }
 
@@ -143,7 +143,7 @@ public class WorkerGroupMember {
         long remaining = timeout;
         while (remaining >= 0) {
             long start = time.milliseconds();
-            coordinator.ensureCoordinatorKnown();
+            coordinator.ensureCoordinatorReady();
             coordinator.ensureActiveGroup();
             client.poll(remaining);
             remaining -= time.milliseconds() - start;

http://git-wip-us.apache.org/repos/asf/kafka/blob/1fb0d796/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java
index f7423ec..4c2ac40 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java
@@ -171,7 +171,7 @@ public class WorkerCoordinatorTest {
         final String consumerId = "leader";
 
         client.prepareResponse(groupMetadataResponse(node, Errors.NONE.code()));
-        coordinator.ensureCoordinatorKnown();
+        coordinator.ensureCoordinatorReady();
 
         // normal join group
         Map<String, Long> memberConfigOffsets = new HashMap<>();
@@ -211,7 +211,7 @@ public class WorkerCoordinatorTest {
         final String memberId = "member";
 
         client.prepareResponse(groupMetadataResponse(node, Errors.NONE.code()));
-        coordinator.ensureCoordinatorKnown();
+        coordinator.ensureCoordinatorReady();
 
         // normal join group
         client.prepareResponse(joinGroupFollowerResponse(1, memberId, "leader", Errors.NONE.code()));
@@ -252,7 +252,7 @@ public class WorkerCoordinatorTest {
         final String memberId = "member";
 
         client.prepareResponse(groupMetadataResponse(node, Errors.NONE.code()));
-        coordinator.ensureCoordinatorKnown();
+        coordinator.ensureCoordinatorReady();
 
         // config mismatch results in assignment error
         client.prepareResponse(joinGroupFollowerResponse(1, memberId, "leader", Errors.NONE.code()));
@@ -283,7 +283,7 @@ public class WorkerCoordinatorTest {
         PowerMock.replayAll();
 
         client.prepareResponse(groupMetadataResponse(node, Errors.NONE.code()));
-        coordinator.ensureCoordinatorKnown();
+        coordinator.ensureCoordinatorReady();
 
         // join the group once
         client.prepareResponse(joinGroupFollowerResponse(1, "consumer", "leader", Errors.NONE.code()));


Mime
View raw message