kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ewe...@apache.org
Subject kafka git commit: KAFKA-5704: Corrected Connect distributed startup behavior to allow older brokers to auto-create topics
Date Wed, 09 Aug 2017 03:20:49 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk b8cf97686 -> 1a653c813

KAFKA-5704: Corrected Connect distributed startup behavior to allow older brokers to auto-create

When a Connect distributed worker starts up talking with broker versions and later,
it will use the AdminClient to look for the internal topics and attempt to create them if
they are missing. Although the AdminClient was added in, the AdminClient uses APIs
to create topics that existed in and later. This feature works as expected when Connect
uses a broker version or later.

However, when a Connect distributed worker starts up using a broker older than, the
AdminClient is not able to find the required APIs and thus will throw an UnsupportedVersionException.
Unfortunately, this exception is not caught and instead causes the Connect worker to fail
even when the topics already exist.

This change handles the UnsupportedVersionException by logging a debug message and doing nothing.
The existing producer logic will get information about the topics, which will cause the broker
to create them if they don’t exist and broker auto-creation of topics is enabled. This is
the same behavior that existed prior to, and so this change restores that behavior
for brokers older than

This change also adds a system test that verifies Connect works with a variety of brokers
and is able to run source and sink connectors. The test verifies that Connect can read from
the internal topics when the connectors are restarted.

Author: Randall Hauch <rhauch@gmail.com>

Reviewers: Ewen Cheslack-Postava <ewen@confluent.io>

Closes #3641 from rhauch/kafka-5704

Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/1a653c81
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/1a653c81
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/1a653c81

Branch: refs/heads/trunk
Commit: 1a653c813c842c0b67f26fb119d7727e272cf834
Parents: b8cf976
Author: Randall Hauch <rhauch@gmail.com>
Authored: Tue Aug 8 20:20:41 2017 -0700
Committer: Ewen Cheslack-Postava <me@ewencp.org>
Committed: Tue Aug 8 20:20:41 2017 -0700

 .../apache/kafka/connect/util/TopicAdmin.java   | 10 +++---
 .../kafka/connect/util/TopicAdminTest.java      | 15 +++-----
 .../tests/connect/connect_distributed_test.py   | 36 ++++++++++++++++++--
 3 files changed, 44 insertions(+), 17 deletions(-)

diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java
index adc3378..5da4f2d 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java
@@ -195,13 +195,14 @@ public class TopicAdmin implements AutoCloseable {
      * are excluded from the result.
      * <p>
      * If multiple topic definitions have the same topic name, the last one with that name
will be used.
-     * </p>
+     * <p>
+     * Apache Kafka added support for creating topics in, so this method works as
expected with that and later versions.
+     * With brokers older than, this method is unable to create topics and always
returns an empty set.
      * @param topics the specifications of the topics
      * @return the names of the topics that were created by this operation; never null but
possibly empty
      * @throws ConnectException            if an error occurs, the operation takes too long,
or the thread is interrupted while
      *                                     attempting to perform this operation
-     * @throws UnsupportedVersionException if the broker does not support the necessary APIs
to perform this request
     public Set<String> createTopics(NewTopic... topics) {
         Map<String, NewTopic> topicsByName = new HashMap<>();
@@ -233,8 +234,9 @@ public class TopicAdmin implements AutoCloseable {
                 if (cause instanceof UnsupportedVersionException) {
-                    log.error("Unable to use Kafka admin client to create topic descriptions
for '{}' using the brokers at {}", topicNameList, bootstrapServers);
-                    throw (UnsupportedVersionException) cause;
+                    log.debug("Unable to use Kafka admin client to create topic descriptions
for '{}' using the brokers at {}," +
+                                      "falling back to assume topic(s) exist or will be auto-created
by the broker", topicNameList, bootstrapServers);
+                    return Collections.emptySet();
                 if (cause instanceof TimeoutException) {
                     // Timed out waiting for the operation to complete

diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java
index f90b77f..0a61d3e 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java
@@ -22,7 +22,6 @@ import org.apache.kafka.clients.admin.NewTopic;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
-import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.requests.ApiError;
 import org.apache.kafka.common.requests.CreateTopicsResponse;
@@ -36,15 +35,13 @@ import java.util.Set;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
 public class TopicAdminTest {
-     * 0.10.x clients can't talk with 0.9.x brokers, and introduced the new protocol
with API versions.
-     * That means we can simulate an API version mismatch.
-     *
-     * @throws Exception
+     * clients can talk with older brokers, but the CREATE_TOPIC API was added in That means,
+     * if our TopicAdmin talks to a pre 0.10.1 broker, it should receive an UnsupportedVersionException,
+     * create no topics, and return false.
     public void returnNullWithApiVersionMismatch() {
@@ -56,10 +53,8 @@ public class TopicAdminTest {
             env.kafkaClient().prepareMetadataUpdate(env.cluster(), Collections.<String>emptySet());
             TopicAdmin admin = new TopicAdmin(null, env.adminClient());
-            admin.createTopic(newTopic);
-            fail();
-        } catch (UnsupportedVersionException e) {
-            // expected
+            boolean created = admin.createTopic(newTopic);
+            assertFalse(created);

diff --git a/tests/kafkatest/tests/connect/connect_distributed_test.py b/tests/kafkatest/tests/connect/connect_distributed_test.py
index 5c7793a..da7d1de 100644
--- a/tests/kafkatest/tests/connect/connect_distributed_test.py
+++ b/tests/kafkatest/tests/connect/connect_distributed_test.py
@@ -24,6 +24,7 @@ from kafkatest.services.kafka import KafkaService, config_property
 from kafkatest.services.connect import ConnectDistributedService, VerifiableSource, VerifiableSink,
ConnectRestError, MockSink, MockSource
 from kafkatest.services.console_consumer import ConsoleConsumer
 from kafkatest.services.security.security_config import SecurityConfig
+from kafkatest.version import DEV_BRANCH, LATEST_0_11_0, LATEST_0_10_2, LATEST_0_10_1, LATEST_0_10_0,
LATEST_0_9, LATEST_0_8_2, KafkaVersion
 from collections import Counter, namedtuple
 import itertools
@@ -75,12 +76,12 @@ class ConnectDistributedTest(Test):
         self.key_converter = "org.apache.kafka.connect.json.JsonConverter"
         self.value_converter = "org.apache.kafka.connect.json.JsonConverter"
         self.schemas = True
-        self.broker_config_overrides = [["auto.create.topics.enable", "false"]]
-    def setup_services(self, security_protocol=SecurityConfig.PLAINTEXT, timestamp_type=None):
+    def setup_services(self, security_protocol=SecurityConfig.PLAINTEXT, timestamp_type=None,
broker_version=DEV_BRANCH, auto_create_topics=False):
         self.kafka = KafkaService(self.test_context, self.num_brokers, self.zk,
                                   security_protocol=security_protocol, interbroker_security_protocol=security_protocol,
-                                  topics=self.topics, server_prop_overides=self.broker_config_overrides)
+                                  topics=self.topics, version=broker_version,
+                                  server_prop_overides=[["auto.create.topics.enable", str(auto_create_topics)]])
         if timestamp_type is not None:
             for node in self.kafka.nodes:
                 node.config[config_property.MESSAGE_TIMESTAMP_TYPE] = timestamp_type
@@ -502,6 +503,35 @@ class ConnectDistributedTest(Test):
             assert obj['payload']['content'] in self.FIRST_INPUT_LIST
             assert obj['payload'][ts_fieldname] == ts
+    @cluster(num_nodes=5)
+    @parametrize(broker_version=str(DEV_BRANCH), auto_create_topics=False, security_protocol=SecurityConfig.PLAINTEXT)
+    @parametrize(broker_version=str(LATEST_0_11_0), auto_create_topics=False, security_protocol=SecurityConfig.PLAINTEXT)
+    @parametrize(broker_version=str(LATEST_0_10_2), auto_create_topics=False, security_protocol=SecurityConfig.PLAINTEXT)
+    @parametrize(broker_version=str(LATEST_0_10_1), auto_create_topics=False, security_protocol=SecurityConfig.PLAINTEXT)
+    @parametrize(broker_version=str(LATEST_0_10_0), auto_create_topics=True, security_protocol=SecurityConfig.PLAINTEXT)
+    @parametrize(broker_version=str(LATEST_0_9), auto_create_topics=True, security_protocol=SecurityConfig.PLAINTEXT)
+    def test_broker_compatibility(self, broker_version, auto_create_topics, security_protocol):
+        """
+        Verify that Connect will start up with various broker versions with various configurations.

+        When Connect distributed starts up, it either creates internal topics (v0.10.1.0
and after) 
+        or relies upon the broker to auto-create the topics (v0.10.0.x and before).
+        """
+        self.setup_services(broker_version=broker_version, auto_create_topics=auto_create_topics,
+        self.cc.set_configs(lambda node: self.render("connect-distributed.properties", node=node))
+        self.cc.start()
+        self.logger.info("Creating connectors")
+        self._start_connector("connect-file-source.properties")
+        self._start_connector("connect-file-sink.properties")
+        # Generating data on the source node should generate new records and create new output
on the sink node. Timeouts
+        # here need to be more generous than they are for standalone mode because a) it takes
longer to write configs,
+        # do rebalancing of the group, etc, and b) without explicit leave group support,
rebalancing takes awhile
+        for node in self.cc.nodes:
+            node.account.ssh("echo -e -n " + repr(self.FIRST_INPUTS) + " >> " + self.INPUT_FILE)
+        wait_until(lambda: self._validate_file_output(self.FIRST_INPUT_LIST), timeout_sec=70,
err_msg="Data added to input file was not seen in the output file in a reasonable amount of
     def _validate_file_output(self, input):
         input_set = set(input)
         # Output needs to be collected from all nodes because we can't be sure where the
tasks will be scheduled.

View raw message