kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [kafka] branch trunk updated: MINOR: Remove deprecated ZkUtils usage from EmbeddedKafkaCluster (#5324)
Date Thu, 19 Jul 2018 21:28:20 GMT
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 96c53e9  MINOR: Remove deprecated ZkUtils usage from EmbeddedKafkaCluster (#5324)
96c53e9 is described below

commit 96c53e96b8834c550e6db54b2748c08e545f9150
Author: Manikumar Reddy O <manikumar.reddy@gmail.com>
AuthorDate: Fri Jul 20 02:58:12 2018 +0530

    MINOR: Remove deprecated ZkUtils usage from EmbeddedKafkaCluster (#5324)
    
    Reviewers: Matthias J. Sax <mjsax@apache.org>, Ismael Juma <ismael@juma.me.uk>,
Guozhang Wang <wangguoz@gmail.com>
---
 checkstyle/import-control.xml                      |  6 +--
 .../integration/AbstractJoinIntegrationTest.java   |  2 +-
 .../integration/InternalTopicIntegrationTest.java  | 44 +++++++++++--------
 .../integration/utils/EmbeddedKafkaCluster.java    | 39 +++++++++++------
 .../streams/integration/utils/KafkaEmbedded.java   | 50 ++++++++++++++--------
 5 files changed, 86 insertions(+), 55 deletions(-)

diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index 106ad0a..35f42e3 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -211,8 +211,6 @@
     <allow pkg="org.apache.kafka.clients"/>
     <allow pkg="org.apache.kafka.clients.producer" exact-match="true"/>
     <allow pkg="org.apache.kafka.clients.consumer" exact-match="true"/>
-    <!-- Temporary until EosTestDriver migrates to the Java AdminClient -->
-    <allow pkg="kafka.admin" exact-match="true"/>
 
     <allow pkg="org.apache.kafka.streams"/>
 
@@ -231,11 +229,9 @@
       <allow pkg="kafka.server" />
       <allow pkg="kafka.tools" />
       <allow pkg="kafka.utils" />
-      <allow pkg="kafka.zk" />
-      <allow pkg="kafka.zookeeper" />
       <allow pkg="kafka.log" />
       <allow pkg="scala" />
-      <allow pkg="scala.collection" />
+      <allow class="kafka.zk.EmbeddedZookeeper"/>
     </subpackage>
 
     <subpackage name="test">
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/AbstractJoinIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/AbstractJoinIntegrationTest.java
index 80ab606..3e29fc2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/AbstractJoinIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/AbstractJoinIntegrationTest.java
@@ -163,7 +163,7 @@ public abstract class AbstractJoinIntegrationTest {
 
     @After
     public void cleanup() throws InterruptedException {
-        CLUSTER.deleteTopicsAndWait(120000, INPUT_TOPIC_LEFT, INPUT_TOPIC_RIGHT, OUTPUT_TOPIC);
+        CLUSTER.deleteAllTopicsAndWait(120000);
     }
 
     private void checkResult(final String outputTopic, final List<String> expectedResult)
throws InterruptedException {
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
index d379e0d..e120999 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
@@ -18,13 +18,15 @@ package org.apache.kafka.streams.integration;
 
 import kafka.log.LogConfig;
 import kafka.utils.MockTime;
-import kafka.zk.AdminZkClient;
-import kafka.zk.KafkaZkClient;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.Config;
+import org.apache.kafka.clients.admin.ConfigEntry;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.config.ConfigResource;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
@@ -46,14 +48,14 @@ import org.junit.BeforeClass;
 import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
-import scala.collection.JavaConverters;
 
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.Locale;
-import java.util.Map;
 import java.util.Properties;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 
 import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForCompletion;
@@ -70,8 +72,6 @@ public class InternalTopicIntegrationTest {
 
     private static final String APP_ID = "internal-topics-integration-test";
     private static final String DEFAULT_INPUT_TOPIC = "inputTopic";
-    private static final int DEFAULT_ZK_SESSION_TIMEOUT_MS = 10 * 1000;
-    private static final int DEFAULT_ZK_CONNECTION_TIMEOUT_MS = 8 * 1000;
 
     private final MockTime mockTime = CLUSTER.time;
 
@@ -113,23 +113,29 @@ public class InternalTopicIntegrationTest {
     }
 
     private Properties getTopicProperties(final String changelog) {
-        try (KafkaZkClient kafkaZkClient = KafkaZkClient.apply(CLUSTER.zKConnectString(),
false,
-                DEFAULT_ZK_SESSION_TIMEOUT_MS, DEFAULT_ZK_CONNECTION_TIMEOUT_MS, Integer.MAX_VALUE,
-                Time.SYSTEM, "testMetricGroup", "testMetricType")) {
-            final AdminZkClient adminZkClient = new AdminZkClient(kafkaZkClient);
-            final Map<String, Properties> topicConfigs =
-                JavaConverters.mapAsJavaMapConverter(adminZkClient.getAllTopicConfigs()).asJava();
-
-            for (Map.Entry<String, Properties> topicConfig : topicConfigs.entrySet())
{
-                if (topicConfig.getKey().equals(changelog)) {
-                    return topicConfig.getValue();
+        try (final AdminClient adminClient = createAdminClient()) {
+            final ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC,
changelog);
+            try {
+                final Config config = adminClient.describeConfigs(Collections.singletonList(configResource)).values().get(configResource).get();
+                final Properties properties = new Properties();
+                for (final ConfigEntry configEntry : config.entries()) {
+                    if (configEntry.source() == ConfigEntry.ConfigSource.DYNAMIC_TOPIC_CONFIG)
{
+                        properties.put(configEntry.name(), configEntry.value());
+                    }
                 }
+                return properties;
+            } catch (final InterruptedException | ExecutionException e) {
+                throw new RuntimeException(e);
             }
-
-            return new Properties();
         }
     }
 
+    private AdminClient createAdminClient() {
+        final Properties adminClientConfig = new Properties();
+        adminClientConfig.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        return AdminClient.create(adminClientConfig);
+    }
+
     @Test
     public void shouldCompactTopicsForKeyValueStoreChangelogs() throws Exception {
         final String appID = APP_ID + "-compact";
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java
b/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java
index 3ea365c..daa1103 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java
@@ -19,11 +19,9 @@ package org.apache.kafka.streams.integration.utils;
 import kafka.server.KafkaConfig$;
 import kafka.server.KafkaServer;
 import kafka.utils.MockTime;
-import kafka.utils.ZkUtils;
 import kafka.zk.EmbeddedZookeeper;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
-import org.apache.kafka.common.security.JaasUtils;
 import org.apache.kafka.test.TestCondition;
 import org.apache.kafka.test.TestUtils;
 import org.junit.rules.ExternalResource;
@@ -33,6 +31,7 @@ import scala.collection.JavaConverters;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
@@ -40,7 +39,7 @@ import java.util.Properties;
 import java.util.Set;
 
 /**
- * Runs an in-memory, "embedded" Kafka cluster with 1 ZooKeeper instance and 1 Kafka broker.
+ * Runs an in-memory, "embedded" Kafka cluster with 1 ZooKeeper instance and supplied number
of Kafka brokers.
  */
 public class EmbeddedKafkaCluster extends ExternalResource {
 
@@ -50,7 +49,6 @@ public class EmbeddedKafkaCluster extends ExternalResource {
     private static final int TOPIC_DELETION_TIMEOUT = 30000;
     private EmbeddedZookeeper zookeeper = null;
     private final KafkaEmbedded[] brokers;
-    private ZkUtils zkUtils = null;
 
     private final Properties brokerConfig;
     public final MockTime time;
@@ -88,12 +86,6 @@ public class EmbeddedKafkaCluster extends ExternalResource {
         zookeeper = new EmbeddedZookeeper();
         log.debug("ZooKeeper instance is running at {}", zKConnectString());
 
-        zkUtils = ZkUtils.apply(
-            zKConnectString(),
-            30000,
-            30000,
-            JaasUtils.isZkSecurityEnabled());
-
         brokerConfig.put(KafkaConfig$.MODULE$.ZkConnectProp(), zKConnectString());
         brokerConfig.put(KafkaConfig$.MODULE$.PortProp(), DEFAULT_BROKER_PORT);
         putIfAbsent(brokerConfig, KafkaConfig$.MODULE$.DeleteTopicEnableProp(), true);
@@ -126,7 +118,6 @@ public class EmbeddedKafkaCluster extends ExternalResource {
         for (final KafkaEmbedded broker : brokers) {
             broker.stop();
         }
-        zkUtils.close();
         zookeeper.shutdown();
     }
 
@@ -274,6 +265,24 @@ public class EmbeddedKafkaCluster extends ExternalResource {
         }
     }
 
+    /**
+     * Deletes all topics and blocks until all topics got deleted.
+     *
+     * @param timeoutMs the max time to wait for the topics to be deleted (does not block
if {@code <= 0})
+     */
+    public void deleteAllTopicsAndWait(final long timeoutMs) throws InterruptedException
{
+        final List<String> topics = JavaConverters.seqAsJavaListConverter(brokers[0].kafkaServer().zkClient().getAllTopicsInCluster()).asJava();
+        for (final String topic : topics) {
+            try {
+                brokers[0].deleteTopic(topic);
+            } catch (final UnknownTopicOrPartitionException e) { }
+        }
+
+        if (timeoutMs > 0) {
+            TestUtils.waitForCondition(new TopicsDeletedCondition(topics), timeoutMs, "Topics
not deleted after " + timeoutMs + " milli seconds.");
+        }
+    }
+
     public void deleteAndRecreateTopics(final String... topics) throws InterruptedException
{
         deleteTopicsAndWait(TOPIC_DELETION_TIMEOUT, topics);
         createTopics(topics);
@@ -295,10 +304,14 @@ public class EmbeddedKafkaCluster extends ExternalResource {
             Collections.addAll(deletedTopics, topics);
         }
 
+        private TopicsDeletedCondition(final Collection<String> topics) {
+            deletedTopics.addAll(topics);
+        }
+
         @Override
         public boolean conditionMet() {
             final Set<String> allTopics = new HashSet<>(
-                    JavaConverters.seqAsJavaListConverter(zkUtils.getAllTopics()).asJava());
+                    JavaConverters.seqAsJavaListConverter(brokers[0].kafkaServer().zkClient().getAllTopicsInCluster()).asJava());
             return !allTopics.removeAll(deletedTopics);
         }
     }
@@ -313,7 +326,7 @@ public class EmbeddedKafkaCluster extends ExternalResource {
         @Override
         public boolean conditionMet() {
             final Set<String> allTopics = new HashSet<>(
-                    JavaConverters.seqAsJavaListConverter(zkUtils.getAllTopics()).asJava());
+                    JavaConverters.seqAsJavaListConverter(brokers[0].kafkaServer().zkClient().getAllTopicsInCluster()).asJava());
             return allTopics.equals(remainingTopics);
         }
     }
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/KafkaEmbedded.java
b/streams/src/test/java/org/apache/kafka/streams/integration/utils/KafkaEmbedded.java
index c884320..dcacf68 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/KafkaEmbedded.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/KafkaEmbedded.java
@@ -16,16 +16,19 @@
  */
 package org.apache.kafka.streams.integration.utils;
 
-import kafka.admin.RackAwareMode;
 import kafka.server.KafkaConfig;
 import kafka.server.KafkaConfig$;
 import kafka.server.KafkaServer;
 import kafka.utils.MockTime;
 import kafka.utils.TestUtils;
-import kafka.zk.AdminZkClient;
-import kafka.zk.KafkaZkClient;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.common.config.SslConfigs;
+import org.apache.kafka.common.config.types.Password;
+import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
 import org.apache.kafka.common.network.ListenerName;
-import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
 import org.junit.rules.TemporaryFolder;
 import org.slf4j.Logger;
@@ -33,7 +36,10 @@ import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.Collections;
+import java.util.Map;
 import java.util.Properties;
+import java.util.concurrent.ExecutionException;
 
 /**
  * Runs an in-memory, "embedded" instance of a Kafka broker, which listens at `127.0.0.1:9092`
by
@@ -46,8 +52,6 @@ public class KafkaEmbedded {
     private static final Logger log = LoggerFactory.getLogger(KafkaEmbedded.class);
 
     private static final String DEFAULT_ZK_CONNECT = "127.0.0.1:2181";
-    private static final int DEFAULT_ZK_SESSION_TIMEOUT_MS = 10 * 1000;
-    private static final int DEFAULT_ZK_CONNECTION_TIMEOUT_MS = 8 * 1000;
 
     private final Properties effectiveConfig;
     private final File logDir;
@@ -93,6 +97,7 @@ public class KafkaEmbedded {
         effectiveConfig.put(KafkaConfig$.MODULE$.AutoCreateTopicsEnableProp(), true);
         effectiveConfig.put(KafkaConfig$.MODULE$.MessageMaxBytesProp(), 1000000);
         effectiveConfig.put(KafkaConfig$.MODULE$.ControlledShutdownEnableProp(), true);
+        effectiveConfig.put(KafkaConfig$.MODULE$.ZkSessionTimeoutMsProp(), 10000);
 
         effectiveConfig.putAll(initialConfig);
         effectiveConfig.setProperty(KafkaConfig$.MODULE$.LogDirProp(), logDir.getAbsolutePath());
@@ -171,24 +176,35 @@ public class KafkaEmbedded {
                             final Properties topicConfig) {
         log.debug("Creating topic { name: {}, partitions: {}, replication: {}, config: {}
}",
             topic, partitions, replication, topicConfig);
-        try (KafkaZkClient kafkaZkClient = createZkClient()) {
-            final AdminZkClient adminZkClient = new AdminZkClient(kafkaZkClient);
-            adminZkClient.createTopic(topic, partitions, replication, topicConfig, RackAwareMode.Enforced$.MODULE$);
+        final NewTopic newTopic = new NewTopic(topic, partitions, (short) replication);
+        newTopic.configs((Map) topicConfig);
+
+        try (final AdminClient adminClient = createAdminClient()) {
+            adminClient.createTopics(Collections.singletonList(newTopic)).all().get();
+        } catch (final InterruptedException | ExecutionException e) {
+            throw new RuntimeException(e);
         }
     }
 
-    private KafkaZkClient createZkClient() {
-        return KafkaZkClient.apply(zookeeperConnect(), false, DEFAULT_ZK_SESSION_TIMEOUT_MS,
-                DEFAULT_ZK_CONNECTION_TIMEOUT_MS, Integer.MAX_VALUE, Time.SYSTEM, "testMetricGroup",
"testMetricType");
+    public AdminClient createAdminClient() {
+        final Properties adminClientConfig = new Properties();
+        adminClientConfig.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList());
+        final Object listeners = effectiveConfig.get(KafkaConfig$.MODULE$.ListenersProp());
+        if (listeners != null && listeners.toString().contains("SSL")) {
+            adminClientConfig.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, effectiveConfig.get(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG));
+            adminClientConfig.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, ((Password)
effectiveConfig.get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG)).value());
+            adminClientConfig.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
+        }
+        return AdminClient.create(adminClientConfig);
     }
 
     public void deleteTopic(final String topic) {
         log.debug("Deleting topic { name: {} }", topic);
-
-        try (KafkaZkClient kafkaZkClient = createZkClient()) {
-            final AdminZkClient adminZkClient = new AdminZkClient(kafkaZkClient);
-            adminZkClient.deleteTopic(topic);
-            kafkaZkClient.close();
+        try (final AdminClient adminClient = createAdminClient()) {
+            adminClient.deleteTopics(Collections.singletonList(topic)).all().get();
+        } catch (final InterruptedException | ExecutionException e) {
+            if (!(e.getCause() instanceof UnknownTopicOrPartitionException))
+                throw new RuntimeException(e);
         }
     }
 


Mime
View raw message