kafka-commits mailing list archives

From guozh...@apache.org
Subject kafka git commit: KAFKA-4716: send create topics to controller in internaltopicmanager
Date Thu, 09 Feb 2017 22:01:16 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk f8c11eb0c -> b865a8b1d


KAFKA-4716: send create topics to controller in internaltopicmanager

This PR fixes a blocker issue where the Streams client code could not talk to the controller.
It also enables a system test that was previously failing.
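
In 0.10.2 a CreateTopicsRequest is only honored by the controller broker (other
brokers answer with a NOT_CONTROLLER error), so the client must first discover the
controller from a metadata response and only then issue the create call. As a reading
aid, here is a condensed sketch of the new flow in InternalTopicManager.makeReady(),
assembled from the diff below (an illustration of the patch, not additional API):

    // Any ready broker can serve the metadata request; the response names the controller.
    final MetadataResponse metadata = streamsKafkaClient.fetchMetadata();
    // The existence check and the create call share one metadata snapshot...
    final Map<String, Integer> existingTopicPartitions = fetchExistingPartitionCountByTopic(metadata);
    final Map<InternalTopicConfig, Integer> topicsToBeCreated = validateTopicPartitions(topics, existingTopicPartitions);
    // ...and createTopics() routes its request to metadata.controller().
    streamsKafkaClient.createTopics(topicsToBeCreated, replicationFactor, windowChangeLogAdditionalRetention, metadata);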

This PR is for trunk only. A separate PR with just the fix (but not the tests) will be created
for 0.10.2.

Author: Eno Thereska <eno@confluent.io>
Author: Eno Thereska <eno.thereska@gmail.com>

Reviewers: Damian Guy, Ismael Juma, Matthias J. Sax, Guozhang Wang

Closes #2522 from enothereska/KAFKA-4716-metadata


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b865a8b1
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b865a8b1
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b865a8b1

Branch: refs/heads/trunk
Commit: b865a8b1dcae642b80280b8a7b5e23e82666061b
Parents: f8c11eb
Author: Eno Thereska <eno@confluent.io>
Authored: Thu Feb 9 14:01:13 2017 -0800
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Thu Feb 9 14:01:13 2017 -0800

----------------------------------------------------------------------
 .../internals/InternalTopicManager.java         | 13 ++---
 .../processor/internals/StreamsKafkaClient.java | 52 ++++++++++++++------
 .../kafka/streams/perf/SimpleBenchmark.java     |  5 ++
 .../internals/InternalTopicManagerTest.java     | 13 +++--
 .../streams/streams_simple_benchmark_test.py    |  2 +-
 .../tests/streams/streams_smoke_test.py         |  4 +-
 6 files changed, 60 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/b865a8b1/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
index a9bd5d7..52536e1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
@@ -58,9 +58,10 @@ public class InternalTopicManager {
     public void makeReady(final Map<InternalTopicConfig, Integer> topics) {
         for (int i = 0; i < MAX_TOPIC_READY_TRY; i++) {
             try {
-                final Map<String, Integer> existingTopicPartitions = fetchExistingPartitionCountByTopic();
+                final MetadataResponse metadata = streamsKafkaClient.fetchMetadata();
+                final Map<String, Integer> existingTopicPartitions = fetchExistingPartitionCountByTopic(metadata);
                final Map<InternalTopicConfig, Integer> topicsToBeCreated = validateTopicPartitions(topics, existingTopicPartitions);
-                streamsKafkaClient.createTopics(topicsToBeCreated, replicationFactor, windowChangeLogAdditionalRetention);
+                streamsKafkaClient.createTopics(topicsToBeCreated, replicationFactor, windowChangeLogAdditionalRetention, metadata);
                 return;
             } catch (StreamsException ex) {
                 log.warn("Could not create internal topics: " + ex.getMessage() + " Retry
#" + i);
@@ -73,7 +74,8 @@ public class InternalTopicManager {
      * Get the number of partitions for the given topics
      */
     public Map<String, Integer> getNumPartitions(final Set<String> topics) {
-        final Map<String, Integer> existingTopicPartitions = fetchExistingPartitionCountByTopic();
+        final MetadataResponse metadata = streamsKafkaClient.fetchMetadata();
+        final Map<String, Integer> existingTopicPartitions = fetchExistingPartitionCountByTopic(metadata);
         existingTopicPartitions.keySet().retainAll(topics);
 
         return existingTopicPartitions;
@@ -108,11 +110,10 @@ public class InternalTopicManager {
         return topicsToBeCreated;
     }
 
-    private Map<String, Integer> fetchExistingPartitionCountByTopic() {
+    private Map<String, Integer> fetchExistingPartitionCountByTopic(final MetadataResponse metadata) {
         // The names of existing topics and corresponding partition counts
         final Map<String, Integer> existingPartitionCountByTopic = new HashMap<>();
-
-        Collection<MetadataResponse.TopicMetadata> topicsMetadata = streamsKafkaClient.fetchTopicsMetadata();
+        final Collection<MetadataResponse.TopicMetadata> topicsMetadata = metadata.topicMetadata();
 
         for (MetadataResponse.TopicMetadata topicMetadata: topicsMetadata) {
             existingPartitionCountByTopic.put(topicMetadata.topic(), topicMetadata.partitionMetadata().size());

http://git-wip-us.apache.org/repos/asf/kafka/blob/b865a8b1/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClient.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClient.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClient.java
index 25e4a36..362050f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClient.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClient.java
@@ -47,7 +47,7 @@ import org.apache.kafka.streams.errors.StreamsException;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
-import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
@@ -137,7 +137,8 @@ public class StreamsKafkaClient {
     /**
      * Create a set of new topics using batch request.
      */
-    public void createTopics(final Map<InternalTopicConfig, Integer> topicsMap, final int replicationFactor, final long windowChangeLogAdditionalRetention) {
+    public void createTopics(final Map<InternalTopicConfig, Integer> topicsMap, final int replicationFactor,
+                             final long windowChangeLogAdditionalRetention, final MetadataResponse metadata) {
 
        final Map<String, CreateTopicsRequest.TopicDetails> topicRequestDetails = new HashMap<>();
         for (InternalTopicConfig internalTopicConfig:topicsMap.keySet()) {
@@ -155,7 +156,7 @@ public class StreamsKafkaClient {
         }
 
         final ClientRequest clientRequest = kafkaClient.newClientRequest(
-            getBrokerId(),
+            getControllerReadyBrokerId(metadata),
             new CreateTopicsRequest.Builder(
                 topicRequestDetails,
                 streamsConfig.getInt(StreamsConfig.REQUEST_TIMEOUT_MS_CONFIG)),
@@ -180,15 +181,13 @@ public class StreamsKafkaClient {
         }
     }
 
-    private String getBrokerId() {
+    /**
+     *
+     * @param nodes List of nodes to pick from.
+     * @return The Id of the first node that is ready to accept requests.
+     */
+    private String ensureOneNodeIsReady(final List<Node> nodes) {
         String brokerId = null;
-        final Metadata metadata = new Metadata(
-            streamsConfig.getLong(StreamsConfig.RETRY_BACKOFF_MS_CONFIG),
-            streamsConfig.getLong(StreamsConfig.METADATA_MAX_AGE_CONFIG));
-        final List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(streamsConfig.getList(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG));
-        metadata.update(Cluster.bootstrap(addresses), Time.SYSTEM.milliseconds());
-
-        final List<Node> nodes = metadata.fetch().nodes();
         final long readyTimeout = Time.SYSTEM.milliseconds() + streamsConfig.getInt(StreamsConfig.REQUEST_TIMEOUT_MS_CONFIG);
         boolean foundNode = false;
         while (!foundNode && (Time.SYSTEM.milliseconds() < readyTimeout)) {
@@ -207,6 +206,29 @@ public class StreamsKafkaClient {
         return brokerId;
     }
 
+    /**
+     *
+     * @return the Id of the controller node; throws an exception if no controller is found or
+     * the controller is not ready
+     */
+    private String getControllerReadyBrokerId(final MetadataResponse metadata) {
+        return ensureOneNodeIsReady(Collections.singletonList(metadata.controller()));
+    }
+
+    /**
+     * @return the Id of any broker that is ready, or an exception if no broker is ready.
+     */
+    private String getAnyReadyBrokerId() {
+        final Metadata metadata = new Metadata(
+            streamsConfig.getLong(StreamsConfig.RETRY_BACKOFF_MS_CONFIG),
+            streamsConfig.getLong(StreamsConfig.METADATA_MAX_AGE_CONFIG));
+        final List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(streamsConfig.getList(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG));
+        metadata.update(Cluster.bootstrap(addresses), Time.SYSTEM.milliseconds());
+
+        final List<Node> nodes = metadata.fetch().nodes();
+        return ensureOneNodeIsReady(nodes);
+    }
+
     private ClientResponse sendRequest(final ClientRequest clientRequest) {
         kafkaClient.send(clientRequest, Time.SYSTEM.milliseconds());
         final long responseTimeout = Time.SYSTEM.milliseconds() + streamsConfig.getInt(StreamsConfig.REQUEST_TIMEOUT_MS_CONFIG);
@@ -234,10 +256,10 @@ public class StreamsKafkaClient {
     /**
      * Fetch the metadata for all topics
      */
-    public Collection<MetadataResponse.TopicMetadata> fetchTopicsMetadata() {
+    public MetadataResponse fetchMetadata() {
 
         final ClientRequest clientRequest = kafkaClient.newClientRequest(
-            getBrokerId(),
+            getAnyReadyBrokerId(),
             new MetadataRequest.Builder(null),
             Time.SYSTEM.milliseconds(),
             true);
@@ -250,7 +272,7 @@ public class StreamsKafkaClient {
                 "Expected MetadataResponse but received " + clientResponse.responseBody().getClass().getName());
         }
         final MetadataResponse metadataResponse = (MetadataResponse) clientResponse.responseBody();
-        return metadataResponse.topicMetadata();
+        return metadataResponse;
     }
 
     /**
@@ -263,7 +285,7 @@ public class StreamsKafkaClient {
      */
     public void checkBrokerCompatibility() throws StreamsException {
         final ClientRequest clientRequest = kafkaClient.newClientRequest(
-            getBrokerId(),
+            getAnyReadyBrokerId(),
             new ApiVersionsRequest.Builder(),
             Time.SYSTEM.milliseconds(),
             true);
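
Taken together, the two helpers above split request routing: getAnyReadyBrokerId()
keeps the old behavior of bootstrapping metadata from the configured brokers and is
used for requests any broker can answer (fetchMetadata(), checkBrokerCompatibility()),
while getControllerReadyBrokerId() takes the controller straight out of an
already-fetched MetadataResponse for the create-topics path. Both funnel into
ensureOneNodeIsReady(), which polls until a node accepts requests or the request
timeout expires. One edge case to be aware of: MetadataResponse.controller() can be
null when the response carries NO_CONTROLLER_ID (no controller elected at that
moment). A hedged variant of the new method with an explicit guard (the null check
is our illustration, not code from the patch):

    private String getControllerReadyBrokerId(final MetadataResponse metadata) {
        final Node controller = metadata.controller();
        if (controller == null) {
            // Illustrative only: fail fast if the metadata snapshot predates a
            // controller election, rather than polling a null node.
            throw new StreamsException("No controller found in metadata response.");
        }
        return ensureOneNodeIsReady(Collections.singletonList(controller));
    }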

http://git-wip-us.apache.org/repos/asf/kafka/blob/b865a8b1/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java b/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
index 29dcf51..21efe50 100644
--- a/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
+++ b/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
@@ -65,6 +65,11 @@ import java.util.Random;
 * 3. Run the stream processing step second: SimpleBenchmark localhost:9092 /tmp/statedir numRecords false "all"
 * Note that what changed is the 4th parameter, from "true" indicating that is a load phase, to "false" indicating
  * that this is a real run.
+ *
+ * Note that "all" is a convenience option when running this test locally and will not work
when running the test
+ * at scale (through tests/kafkatest/benchmarks/streams/streams_simple_benchmark_test.py).
That is due to exact syncronization
+ * needs for each test (e.g., you wouldn't want one instance to run "count" while another
+ * is still running "consume"
  */
 public class SimpleBenchmark {
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/b865a8b1/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java
index 548cb45..69b3c46 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java
@@ -30,7 +30,6 @@ import org.junit.Test;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.Map;
 import java.util.Properties;
@@ -96,15 +95,19 @@ public class InternalTopicManagerTest {
         }
 
         @Override
-        public void createTopics(final Map<InternalTopicConfig, Integer> topicsMap, final int replicationFactor, final long windowChangeLogAdditionalRetention) {
+        public void createTopics(final Map<InternalTopicConfig, Integer> topicsMap, final int replicationFactor,
+                                 final long windowChangeLogAdditionalRetention, final MetadataResponse metadata) {
             // do nothing
         }
 
         @Override
-        public Collection<MetadataResponse.TopicMetadata> fetchTopicsMetadata() {
-            MetadataResponse.PartitionMetadata partitionMetadata = new MetadataResponse.PartitionMetadata(Errors.NONE, 1, null, new ArrayList<Node>(), new ArrayList<Node>());
+        public MetadataResponse fetchMetadata() {
+            Node node = new Node(1, "host1", 1001);
+            MetadataResponse.PartitionMetadata partitionMetadata = new MetadataResponse.PartitionMetadata(Errors.NONE, 1, node, new ArrayList<Node>(), new ArrayList<Node>());
             MetadataResponse.TopicMetadata topicMetadata = new MetadataResponse.TopicMetadata(Errors.NONE, topic, true, Collections.singletonList(partitionMetadata));
-            return Collections.singleton(topicMetadata);
+            MetadataResponse response = new MetadataResponse(Collections.<Node>emptyList(), null, MetadataResponse.NO_CONTROLLER_ID,
+                Collections.singletonList(topicMetadata), 0);
+            return response;
         }
     }
 }
\ No newline at end of file
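
A side note on the mock above: it reports MetadataResponse.NO_CONTROLLER_ID, which is
harmless here because the overridden createTopics() is a no-op and never routes a
request. A test that did exercise controller routing would need the controller node
present in the broker list, roughly like this (constructor arguments mirror the mock
above; the broker id and address are illustrative):

    Node controller = new Node(1, "host1", 1001);
    MetadataResponse withController = new MetadataResponse(
        Collections.singletonList(controller),    // brokers: must include the controller node
        null,                                     // cluster id, as in the mock above
        1,                                        // controller id matching the node above
        Collections.singletonList(topicMetadata),
        0);                                       // trailing argument as in the mock above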

http://git-wip-us.apache.org/repos/asf/kafka/blob/b865a8b1/tests/kafkatest/benchmarks/streams/streams_simple_benchmark_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/benchmarks/streams/streams_simple_benchmark_test.py b/tests/kafkatest/benchmarks/streams/streams_simple_benchmark_test.py
index 6af7f11..c1db8c8 100644
--- a/tests/kafkatest/benchmarks/streams/streams_simple_benchmark_test.py
+++ b/tests/kafkatest/benchmarks/streams/streams_simple_benchmark_test.py
@@ -35,7 +35,7 @@ class StreamsSimpleBenchmarkTest(Test):
 
 
     @cluster(num_nodes=9)
-    @matrix(test=["all"], scale=[1])
+    @matrix(test=["produce", "consume", "count", "processstream", "processstreamwithsink",
"processstreamwithstatestore", "processstreamwithcachedstatestore", "kstreamktablejoin", "kstreamkstreamjoin",
"ktablektablejoin"], scale=[1, 2, 3])
     def test_simple_benchmark(self, test, scale):
         """
         Run simple Kafka Streams benchmark

http://git-wip-us.apache.org/repos/asf/kafka/blob/b865a8b1/tests/kafkatest/tests/streams/streams_smoke_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/streams/streams_smoke_test.py b/tests/kafkatest/tests/streams/streams_smoke_test.py
index a824d92..496c495 100644
--- a/tests/kafkatest/tests/streams/streams_smoke_test.py
+++ b/tests/kafkatest/tests/streams/streams_smoke_test.py
@@ -27,7 +27,7 @@ class StreamsSmokeTest(KafkaTest):
     """
 
     def __init__(self, test_context):
-        super(StreamsSmokeTest, self).__init__(test_context, num_zk=1, num_brokers=2, topics={
+        super(StreamsSmokeTest, self).__init__(test_context, num_zk=1, num_brokers=3, topics={
             'echo' : { 'partitions': 5, 'replication-factor': 1 },
             'data' : { 'partitions': 5, 'replication-factor': 1 },
             'min' : { 'partitions': 5, 'replication-factor': 1 },
@@ -46,7 +46,7 @@ class StreamsSmokeTest(KafkaTest):
         self.processor3 = StreamsSmokeTestJobRunnerService(test_context, self.kafka)
         self.processor4 = StreamsSmokeTestJobRunnerService(test_context, self.kafka)
 
-    @cluster(num_nodes=8)
+    @cluster(num_nodes=9)
     def test_streams(self):
         """
        Start a few smoke test clients, then repeat start a new one, stop (cleanly) running one a few times.

