kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ij...@apache.org
Subject kafka git commit: KAFKA-5374; Set allow auto topic creation to false when requesting node information only
Date Sat, 03 Jun 2017 05:26:21 GMT
Repository: kafka
Updated Branches:
  refs/heads/0.11.0 054e9740f -> a30513a53


KAFKA-5374; Set allow auto topic creation to false when requesting node information only

It avoids the need to handle protocol downgrades and it's safe (i.e. it will never cause
the auto creation of topics).

Author: Colin P. Mccabe <cmccabe@confluent.io>

Reviewers: Ismael Juma <ismael@juma.me.uk>

Closes #3220 from ijuma/kafka-5374-admin-client-metadata

(cherry picked from commit f389b715707e4e53eaf6ce476a218a4a06c427ee)
Signed-off-by: Ismael Juma <ismael@juma.me.uk>


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/a30513a5
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/a30513a5
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/a30513a5

Branch: refs/heads/0.11.0
Commit: a30513a53961cb494ec919b2c7cf6c7ad8927eee
Parents: 054e974
Author: Colin P. Mccabe <cmccabe@confluent.io>
Authored: Sat Jun 3 06:26:06 2017 +0100
Committer: Ismael Juma <ismael@juma.me.uk>
Committed: Sat Jun 3 06:26:33 2017 +0100

----------------------------------------------------------------------
 checkstyle/import-control.xml                   |  1 +
 .../kafka/clients/admin/KafkaAdminClient.java   |  8 +++--
 .../client_compatibility_features_test.py       |  2 ++
 .../kafka/tools/ClientCompatibilityTest.java    | 35 ++++++++++++++++++++
 4 files changed, 44 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/a30513a5/checkstyle/import-control.xml
----------------------------------------------------------------------
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index ab4f15d..f83f6e7 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -162,6 +162,7 @@
 
   <subpackage name="tools">
     <allow pkg="org.apache.kafka.common"/>
+    <allow pkg="org.apache.kafka.clients.admin" />
     <allow pkg="org.apache.kafka.clients.producer" />
     <allow pkg="org.apache.kafka.clients.consumer" />
     <allow pkg="com.fasterxml.jackson" />

http://git-wip-us.apache.org/repos/asf/kafka/blob/a30513a5/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index 199b07a..da76032 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -281,8 +281,10 @@ public class KafkaAdminClient extends AdminClient {
         ApiVersions apiVersions = new ApiVersions();
 
         try {
+            // Since we only request node information, it's safe to pass true for allowAutoTopicCreation (and it
+            // simplifies communication with older brokers)
             metadata = new Metadata(config.getLong(AdminClientConfig.RETRY_BACKOFF_MS_CONFIG),
-                    config.getLong(AdminClientConfig.METADATA_MAX_AGE_CONFIG), false);
+                    config.getLong(AdminClientConfig.METADATA_MAX_AGE_CONFIG), true);
             List<MetricsReporter> reporters = config.getConfiguredInstances(AdminClientConfig.METRIC_REPORTER_CLASSES_CONFIG,
                 MetricsReporter.class);
             Map<String, String> metricTags = Collections.singletonMap("client-id", clientId);
@@ -1222,7 +1224,9 @@ public class KafkaAdminClient extends AdminClient {
 
             @Override
             AbstractRequest.Builder createRequest(int timeoutMs) {
-                return new MetadataRequest.Builder(Collections.<String>emptyList(), false);
+                // Since this only requests node information, it's safe to pass true for allowAutoTopicCreation (and it
+                // simplifies communication with older brokers)
+                return new MetadataRequest.Builder(Collections.<String>emptyList(), true);
             }
 
             @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/a30513a5/tests/kafkatest/tests/client/client_compatibility_features_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/client/client_compatibility_features_test.py b/tests/kafkatest/tests/client/client_compatibility_features_test.py
index 1b6519d..1b32540 100644
--- a/tests/kafkatest/tests/client/client_compatibility_features_test.py
+++ b/tests/kafkatest/tests/client/client_compatibility_features_test.py
@@ -77,11 +77,13 @@ class ClientCompatibilityFeaturesTest(Test):
                "--offsets-for-times-supported %s "
                "--cluster-id-supported %s "
                "--expect-record-too-large-exception %s "
+               "--num-cluster-nodes %d "
                "--topic %s " % (self.zk.path.script("kafka-run-class.sh", node),
                                self.kafka.bootstrap_servers(),
                                features["offsets-for-times-supported"],
                                features["cluster-id-supported"],
                                features["expect-record-too-large-exception"],
+                               len(self.kafka.nodes),
                                self.topics.keys()[0]))
         results_dir = TestContext.results_dir(self.test_context, 0)
         os.makedirs(results_dir)

http://git-wip-us.apache.org/repos/asf/kafka/blob/a30513a5/tools/src/main/java/org/apache/kafka/tools/ClientCompatibilityTest.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/tools/ClientCompatibilityTest.java b/tools/src/main/java/org/apache/kafka/tools/ClientCompatibilityTest.java
index 2a7d3e6..4fce8ce 100644
--- a/tools/src/main/java/org/apache/kafka/tools/ClientCompatibilityTest.java
+++ b/tools/src/main/java/org/apache/kafka/tools/ClientCompatibilityTest.java
@@ -20,6 +20,8 @@ import net.sourceforge.argparse4j.ArgumentParsers;
 import net.sourceforge.argparse4j.inf.ArgumentParser;
 import net.sourceforge.argparse4j.inf.ArgumentParserException;
 import net.sourceforge.argparse4j.inf.Namespace;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.AdminClientConfig;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
@@ -31,6 +33,8 @@ import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.ClusterResource;
 import org.apache.kafka.common.ClusterResourceListener;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.RecordTooLargeException;
@@ -44,6 +48,7 @@ import org.slf4j.LoggerFactory;
 
 import java.nio.ByteBuffer;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedList;
@@ -69,6 +74,7 @@ public class ClientCompatibilityTest {
         final boolean offsetsForTimesSupported;
         final boolean expectClusterId;
         final boolean expectRecordTooLargeException;
+        final int numClusterNodes;
 
         TestConfig(Namespace res) {
             this.bootstrapServer = res.getString("bootstrapServer");
@@ -76,6 +82,7 @@ public class ClientCompatibilityTest {
             this.offsetsForTimesSupported = res.getBoolean("offsetsForTimesSupported");
             this.expectClusterId = res.getBoolean("clusterIdSupported");
             this.expectRecordTooLargeException = res.getBoolean("expectRecordTooLargeException");
+            this.numClusterNodes = res.getInt("numClusterNodes");
         }
     }
 
@@ -121,6 +128,14 @@ public class ClientCompatibilityTest {
             .help("True if we should expect a RecordTooLargeException when trying to read from a topic " +
                   "that contains a message that is bigger than " + ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG +
                   ".  This is pre-KIP-74 behavior.");
+        parser.addArgument("--num-cluster-nodes")
+            .action(store())
+            .required(true)
+            .type(Integer.class)
+            .dest("numClusterNodes")
+            .metavar("NUM_CLUSTER_NODES")
+            .help("The number of cluster nodes we should expect to see from the AdminClient.");
+
         Namespace res = null;
         try {
             res = parser.parseArgs(args);
@@ -183,6 +198,7 @@ public class ClientCompatibilityTest {
 
     void run() throws Exception {
         long prodTimeMs = Time.SYSTEM.milliseconds();
+        testAdminClient();
         testProduce();
         testConsume(prodTimeMs);
     }
@@ -202,6 +218,25 @@ public class ClientCompatibilityTest {
         producer.close();
     }
 
+    void testAdminClient() throws Exception {
+        Properties adminProps = new Properties();
+        adminProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, testConfig.bootstrapServer);
+        try (AdminClient client = AdminClient.create(adminProps)) {
+            while (true) {
+                Collection<Node> nodes = client.describeCluster().nodes().get();
+                if (nodes.size() == testConfig.numClusterNodes) {
+                    break;
+                } else if (nodes.size() > testConfig.numClusterNodes) {
+                    throw new KafkaException("Expected to see " + testConfig.numClusterNodes +
+                        " nodes, but saw " + nodes.size());
+                }
+                Thread.sleep(1);
+                log.info("Saw only {} cluster nodes.  Waiting to see {}.",
+                    nodes.size(), testConfig.numClusterNodes);
+            }
+        }
+    }
+
     private static class OffsetsForTime {
         Map<TopicPartition, OffsetAndTimestamp> result;
 


Mime
View raw message