kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: KAFKA-4716: Fix case when controller cannot be reached
Date Thu, 09 Feb 2017 22:02:37 GMT
Repository: kafka
Updated Branches:
  refs/heads/0.10.2 39f1a7d8c -> bad631a7c


KAFKA-4716: Fix case when controller cannot be reached

Author: Eno Thereska <eno@confluent.io>

Reviewers: Dan Norwood, Ismael Juma, Guozhang Wang

Closes #2526 from enothereska/0.10.2-KAFKA-4716


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/bad631a7
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/bad631a7
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/bad631a7

Branch: refs/heads/0.10.2
Commit: bad631a7c6d06a9bf3985af0881538a5dda60d59
Parents: 39f1a7d
Author: Eno Thereska <eno@confluent.io>
Authored: Thu Feb 9 14:02:33 2017 -0800
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Thu Feb 9 14:02:33 2017 -0800

----------------------------------------------------------------------
 .../internals/InternalTopicManager.java         | 13 ++---
 .../processor/internals/StreamsKafkaClient.java | 52 ++++++++++++++------
 .../internals/InternalTopicManagerTest.java     | 13 +++--
 3 files changed, 52 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/bad631a7/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
index a9bd5d7..52536e1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
@@ -58,9 +58,10 @@ public class InternalTopicManager {
     public void makeReady(final Map<InternalTopicConfig, Integer> topics) {
         for (int i = 0; i < MAX_TOPIC_READY_TRY; i++) {
             try {
-                final Map<String, Integer> existingTopicPartitions = fetchExistingPartitionCountByTopic();
+                final MetadataResponse metadata = streamsKafkaClient.fetchMetadata();
+                final Map<String, Integer> existingTopicPartitions = fetchExistingPartitionCountByTopic(metadata);
                 final Map<InternalTopicConfig, Integer> topicsToBeCreated = validateTopicPartitions(topics,
existingTopicPartitions);
-                streamsKafkaClient.createTopics(topicsToBeCreated, replicationFactor, windowChangeLogAdditionalRetention);
+                streamsKafkaClient.createTopics(topicsToBeCreated, replicationFactor, windowChangeLogAdditionalRetention,
metadata);
                 return;
             } catch (StreamsException ex) {
                 log.warn("Could not create internal topics: " + ex.getMessage() + " Retry
#" + i);
@@ -73,7 +74,8 @@ public class InternalTopicManager {
      * Get the number of partitions for the given topics
      */
     public Map<String, Integer> getNumPartitions(final Set<String> topics) {
-        final Map<String, Integer> existingTopicPartitions = fetchExistingPartitionCountByTopic();
+        final MetadataResponse metadata = streamsKafkaClient.fetchMetadata();
+        final Map<String, Integer> existingTopicPartitions = fetchExistingPartitionCountByTopic(metadata);
         existingTopicPartitions.keySet().retainAll(topics);
 
         return existingTopicPartitions;
@@ -108,11 +110,10 @@ public class InternalTopicManager {
         return topicsToBeCreated;
     }
 
-    private Map<String, Integer> fetchExistingPartitionCountByTopic() {
+    private Map<String, Integer> fetchExistingPartitionCountByTopic(final MetadataResponse
metadata) {
         // The names of existing topics and corresponding partition counts
         final Map<String, Integer> existingPartitionCountByTopic = new HashMap<>();
-
-        Collection<MetadataResponse.TopicMetadata> topicsMetadata = streamsKafkaClient.fetchTopicsMetadata();
+        final Collection<MetadataResponse.TopicMetadata> topicsMetadata = metadata.topicMetadata();
 
         for (MetadataResponse.TopicMetadata topicMetadata: topicsMetadata) {
             existingPartitionCountByTopic.put(topicMetadata.topic(), topicMetadata.partitionMetadata().size());

http://git-wip-us.apache.org/repos/asf/kafka/blob/bad631a7/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClient.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClient.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClient.java
index 8a9b255..94d8854 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClient.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClient.java
@@ -47,7 +47,7 @@ import org.apache.kafka.streams.errors.StreamsException;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
-import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
@@ -137,7 +137,8 @@ public class StreamsKafkaClient {
     /**
      * Create a set of new topics using batch request.
      */
-    public void createTopics(final Map<InternalTopicConfig, Integer> topicsMap, final
int replicationFactor, final long windowChangeLogAdditionalRetention) {
+    public void createTopics(final Map<InternalTopicConfig, Integer> topicsMap, final
int replicationFactor,
+                             final long windowChangeLogAdditionalRetention, final MetadataResponse
metadata) {
 
         final Map<String, CreateTopicsRequest.TopicDetails> topicRequestDetails = new
HashMap<>();
         for (InternalTopicConfig internalTopicConfig:topicsMap.keySet()) {
@@ -155,7 +156,7 @@ public class StreamsKafkaClient {
         }
 
         final ClientRequest clientRequest = kafkaClient.newClientRequest(
-            getBrokerId(),
+            getControllerReadyBrokerId(metadata),
             new CreateTopicsRequest.Builder(
                 topicRequestDetails,
                 streamsConfig.getInt(StreamsConfig.REQUEST_TIMEOUT_MS_CONFIG)),
@@ -180,15 +181,13 @@ public class StreamsKafkaClient {
         }
     }
 
-    private String getBrokerId() {
+    /**
+     *
+     * @param nodes List of nodes to pick from.
+     * @return The first node that is ready to accept requests.
+     */
+    private String ensureOneNodeIsReady(final List<Node> nodes) {
         String brokerId = null;
-        final Metadata metadata = new Metadata(
-            streamsConfig.getLong(StreamsConfig.RETRY_BACKOFF_MS_CONFIG),
-            streamsConfig.getLong(StreamsConfig.METADATA_MAX_AGE_CONFIG));
-        final List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(streamsConfig.getList(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG));
-        metadata.update(Cluster.bootstrap(addresses), Time.SYSTEM.milliseconds());
-
-        final List<Node> nodes = metadata.fetch().nodes();
         final long readyTimeout = Time.SYSTEM.milliseconds() + streamsConfig.getInt(StreamsConfig.REQUEST_TIMEOUT_MS_CONFIG);
         boolean foundNode = false;
         while (!foundNode && (Time.SYSTEM.milliseconds() < readyTimeout)) {
@@ -207,6 +206,29 @@ public class StreamsKafkaClient {
         return brokerId;
     }
 
+    /**
+     *
+     * @return the Id of the controller node, or throws an exception if no controller is found or
+     * the controller is not ready
+     */
+    private String getControllerReadyBrokerId(final MetadataResponse metadata) {
+        return ensureOneNodeIsReady(Collections.singletonList(metadata.controller()));
+    }
+
+    /**
+     * @return the Id of any broker that is ready, or throws an exception if no broker is ready.
+     */
+    private String getAnyReadyBrokerId() {
+        final Metadata metadata = new Metadata(
+            streamsConfig.getLong(StreamsConfig.RETRY_BACKOFF_MS_CONFIG),
+            streamsConfig.getLong(StreamsConfig.METADATA_MAX_AGE_CONFIG));
+        final List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(streamsConfig.getList(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG));
+        metadata.update(Cluster.bootstrap(addresses), Time.SYSTEM.milliseconds());
+
+        final List<Node> nodes = metadata.fetch().nodes();
+        return ensureOneNodeIsReady(nodes);
+    }
+
     private ClientResponse sendRequest(final ClientRequest clientRequest) {
         kafkaClient.send(clientRequest, Time.SYSTEM.milliseconds());
         final long responseTimeout = Time.SYSTEM.milliseconds() + streamsConfig.getInt(StreamsConfig.REQUEST_TIMEOUT_MS_CONFIG);
@@ -234,10 +256,10 @@ public class StreamsKafkaClient {
     /**
      * Fetch the metadata for all topics
      */
-    public Collection<MetadataResponse.TopicMetadata> fetchTopicsMetadata() {
+    public MetadataResponse fetchMetadata() {
 
         final ClientRequest clientRequest = kafkaClient.newClientRequest(
-            getBrokerId(),
+            getAnyReadyBrokerId(),
             new MetadataRequest.Builder(null),
             Time.SYSTEM.milliseconds(),
             true);
@@ -250,7 +272,7 @@ public class StreamsKafkaClient {
                 "Expected MetadataResponse but received " + clientResponse.responseBody().getClass().getName());
         }
         final MetadataResponse metadataResponse = (MetadataResponse) clientResponse.responseBody();
-        return metadataResponse.topicMetadata();
+        return metadataResponse;
     }
 
     /**
@@ -263,7 +285,7 @@ public class StreamsKafkaClient {
      */
     public void checkBrokerCompatibility() throws StreamsException {
         final ClientRequest clientRequest = kafkaClient.newClientRequest(
-            getBrokerId(),
+            getAnyReadyBrokerId(),
             new ApiVersionsRequest.Builder(),
             Time.SYSTEM.milliseconds(),
             true);

http://git-wip-us.apache.org/repos/asf/kafka/blob/bad631a7/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java
index 548cb45..69b3c46 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java
@@ -30,7 +30,6 @@ import org.junit.Test;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.Map;
 import java.util.Properties;
@@ -96,15 +95,19 @@ public class InternalTopicManagerTest {
         }
 
         @Override
-        public void createTopics(final Map<InternalTopicConfig, Integer> topicsMap,
final int replicationFactor, final long windowChangeLogAdditionalRetention) {
+        public void createTopics(final Map<InternalTopicConfig, Integer> topicsMap,
final int replicationFactor,
+                                 final long windowChangeLogAdditionalRetention, final MetadataResponse
metadata) {
             // do nothing
         }
 
         @Override
-        public Collection<MetadataResponse.TopicMetadata> fetchTopicsMetadata() {
-            MetadataResponse.PartitionMetadata partitionMetadata = new MetadataResponse.PartitionMetadata(Errors.NONE,
1, null, new ArrayList<Node>(), new ArrayList<Node>());
+        public MetadataResponse fetchMetadata() {
+            Node node = new Node(1, "host1", 1001);
+            MetadataResponse.PartitionMetadata partitionMetadata = new MetadataResponse.PartitionMetadata(Errors.NONE,
1, node, new ArrayList<Node>(), new ArrayList<Node>());
             MetadataResponse.TopicMetadata topicMetadata = new MetadataResponse.TopicMetadata(Errors.NONE,
topic, true, Collections.singletonList(partitionMetadata));
-            return Collections.singleton(topicMetadata);
+            MetadataResponse response = new MetadataResponse(Collections.<Node>emptyList(),
null, MetadataResponse.NO_CONTROLLER_ID,
+                Collections.singletonList(topicMetadata), 0);
+            return response;
         }
     }
 }
\ No newline at end of file


Mime
View raw message