kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: KAFKA-5862: Remove ZK dependency from Streams reset tool, Part I
Date Sat, 23 Sep 2017 04:05:29 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 1fd70c7c9 -> 271f6b5ae


KAFKA-5862: Remove ZK dependency from Streams reset tool, Part I

Author: Bill Bejeck <bill@confluent.io>
Author: bbejeck <bbejeck@gmail.com>

Reviewers: Matthias J. Sax <matthias@confluent.io>, Ted Yu <yuzhihong@gmail.com>,
Guozhang Wang <wangguoz@gmail.com>

Closes #3927 from bbejeck/KAFKA-5862_remove_zk_dependency_from_streams_reset_tool


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/271f6b5a
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/271f6b5a
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/271f6b5a

Branch: refs/heads/trunk
Commit: 271f6b5aec885d2eb348dea4de637ac269d3e1ca
Parents: 1fd70c7
Author: Bill Bejeck <bill@confluent.io>
Authored: Sat Sep 23 12:05:16 2017 +0800
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Sat Sep 23 12:05:16 2017 +0800

----------------------------------------------------------------------
 checkstyle/import-control-core.xml              |   1 +
 .../main/scala/kafka/tools/StreamsResetter.java | 123 +++++++++++--------
 .../integration/ResetIntegrationTest.java       |  59 ++++-----
 3 files changed, 103 insertions(+), 80 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/271f6b5a/checkstyle/import-control-core.xml
----------------------------------------------------------------------
diff --git a/checkstyle/import-control-core.xml b/checkstyle/import-control-core.xml
index 856df58..bf06a19 100644
--- a/checkstyle/import-control-core.xml
+++ b/checkstyle/import-control-core.xml
@@ -53,6 +53,7 @@
   </subpackage>
 
   <subpackage name="tools">
+    <allow pkg="org.apache.kafka.clients.admin" />
     <allow pkg="kafka.admin" />
     <allow pkg="kafka.javaapi" />
     <allow pkg="kafka.producer" />

http://git-wip-us.apache.org/repos/asf/kafka/blob/271f6b5a/core/src/main/scala/kafka/tools/StreamsResetter.java
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/StreamsResetter.java b/core/src/main/scala/kafka/tools/StreamsResetter.java
index 9cf0e5c..09d0d75 100644
--- a/core/src/main/scala/kafka/tools/StreamsResetter.java
+++ b/core/src/main/scala/kafka/tools/StreamsResetter.java
@@ -17,19 +17,14 @@
 package kafka.tools;
 
 
-import joptsimple.OptionException;
-import joptsimple.OptionParser;
-import joptsimple.OptionSet;
-import joptsimple.OptionSpec;
-import joptsimple.OptionSpecBuilder;
-import kafka.admin.AdminClient;
-import kafka.admin.TopicCommand;
-import kafka.utils.ZkUtils;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.DeleteTopicsResult;
+import org.apache.kafka.clients.admin.KafkaAdminClient;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.KafkaFuture;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.annotation.InterfaceStability;
-import org.apache.kafka.common.security.JaasUtils;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.apache.kafka.common.utils.Exit;
 
@@ -38,8 +33,16 @@ import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import joptsimple.OptionException;
+import joptsimple.OptionParser;
+import joptsimple.OptionSet;
+import joptsimple.OptionSpec;
+import joptsimple.OptionSpecBuilder;
 
 /**
  * {@link StreamsResetter} resets the processing state of a Kafka Streams application so
that, for example, you can reprocess its input from scratch.
@@ -68,7 +71,7 @@ public class StreamsResetter {
     private static final int EXIT_CODE_ERROR = 1;
 
     private static OptionSpec<String> bootstrapServerOption;
-    private static OptionSpec<String> zookeeperOption;
+    private static OptionSpecBuilder zookeeperOption;
     private static OptionSpec<String> applicationIdOption;
     private static OptionSpec<String> inputTopicsOption;
     private static OptionSpec<String> intermediateTopicsOption;
@@ -89,52 +92,57 @@ public class StreamsResetter {
 
         int exitCode = EXIT_CODE_SUCCESS;
 
-        AdminClient adminClient = null;
-        ZkUtils zkUtils = null;
+        KafkaAdminClient kafkaAdminClient = null;
+
         try {
             parseArguments(args);
             dryRun = options.has(dryRunOption);
 
-            adminClient = AdminClient.createSimplePlaintext(options.valueOf(bootstrapServerOption));
             final String groupId = options.valueOf(applicationIdOption);
 
+            validateNoActiveConsumers(groupId);
 
-            zkUtils = ZkUtils.apply(options.valueOf(zookeeperOption),
-                30000,
-                30000,
-                JaasUtils.isZkSecurityEnabled());
+            final Properties adminClientProperties = new Properties();
+            adminClientProperties.put("bootstrap.servers", options.valueOf(bootstrapServerOption));
+            kafkaAdminClient = (KafkaAdminClient) AdminClient.create(adminClientProperties);
 
             allTopics.clear();
-            allTopics.addAll(scala.collection.JavaConversions.seqAsJavaList(zkUtils.getAllTopics()));
-
-
-            if (!adminClient.describeConsumerGroup(groupId, 0).consumers().get().isEmpty())
{
-                throw new IllegalStateException("Consumer group '" + groupId + "' is still
active. " +
-                            "Make sure to stop all running application instances before running
the reset tool.");
-            }
+            allTopics.addAll(kafkaAdminClient.listTopics().names().get(60, TimeUnit.SECONDS));
 
             if (dryRun) {
                 System.out.println("----Dry run displays the actions which will be performed
when running Streams Reset Tool----");
             }
             maybeResetInputAndSeekToEndIntermediateTopicOffsets();
-            maybeDeleteInternalTopics(zkUtils);
+            maybeDeleteInternalTopics(kafkaAdminClient);
 
         } catch (final Throwable e) {
             exitCode = EXIT_CODE_ERROR;
             System.err.println("ERROR: " + e);
             e.printStackTrace(System.err);
         } finally {
-            if (adminClient != null) {
-                adminClient.close();
-            }
-            if (zkUtils != null) {
-                zkUtils.close();
+            if (kafkaAdminClient != null) {
+                kafkaAdminClient.close(60, TimeUnit.SECONDS);
             }
         }
 
         return exitCode;
     }
 
+    private void validateNoActiveConsumers(final String groupId) {
+        kafka.admin.AdminClient olderAdminClient = null;
+        try {
+            olderAdminClient = kafka.admin.AdminClient.createSimplePlaintext(options.valueOf(bootstrapServerOption));
+            if (!olderAdminClient.describeConsumerGroup(groupId, 0).consumers().get().isEmpty())
{
+                throw new IllegalStateException("Consumer group '" + groupId + "' is still
active. "
+                                                + "Make sure to stop all running application
instances before running the reset tool.");
+            }
+        } finally {
+            if (olderAdminClient != null) {
+                olderAdminClient.close();
+            }
+        }
+    }
+
     private void parseArguments(final String[] args) throws IOException {
 
         final OptionParser optionParser = new OptionParser(false);
@@ -148,11 +156,8 @@ public class StreamsResetter {
             .ofType(String.class)
             .defaultsTo("localhost:9092")
             .describedAs("urls");
-        zookeeperOption = optionParser.accepts("zookeeper", "Zookeeper url with format: HOST:POST")
-            .withRequiredArg()
-            .ofType(String.class)
-            .defaultsTo("localhost:2181")
-            .describedAs("url");
+        zookeeperOption = optionParser.accepts("zookeeper", "Zookeeper option is deprecated
by bootstrap.servers, as the reset tool would no longer access Zookeeper directly.");
+
         inputTopicsOption = optionParser.accepts("input-topics", "Comma-separated list of
user input topics. For these topics, the tool will reset the offset to the earliest available
offset.")
             .withRequiredArg()
             .ofType(String.class)
@@ -314,30 +319,46 @@ public class StreamsResetter {
         return options.valuesOf(intermediateTopicsOption).contains(topic);
     }
 
-    private void maybeDeleteInternalTopics(final ZkUtils zkUtils) {
+    private void maybeDeleteInternalTopics(final KafkaAdminClient adminClient) {
 
         System.out.println("Deleting all internal/auto-created topics for application " +
options.valueOf(applicationIdOption));
-
-        for (final String topic : allTopics) {
-            if (isInternalTopic(topic)) {
-                try {
-                    if (!dryRun) {
-                        final TopicCommand.TopicCommandOptions commandOptions = new TopicCommand.TopicCommandOptions(new
String[]{
-                            "--zookeeper", options.valueOf(zookeeperOption),
-                            "--delete", "--topic", topic});
-                        TopicCommand.deleteTopic(zkUtils, commandOptions);
-                    } else {
-                        System.out.println("Topic: " + topic);
-                    }
-                } catch (final RuntimeException e) {
-                    System.err.println("ERROR: Deleting topic " + topic + " failed.");
-                    throw e;
+        List<String> topicsToDelete = new ArrayList<>();
+        for (final String listing : allTopics) {
+            if (isInternalTopic(listing)) {
+                if (!dryRun) {
+                    topicsToDelete.add(listing);
+                } else {
+                    System.out.println("Topic: " + listing);
                 }
             }
         }
+        if (!dryRun) {
+            doDelete(topicsToDelete, adminClient);
+        }
         System.out.println("Done.");
     }
 
+    private void doDelete(final List<String> topicsToDelete,
+                          final KafkaAdminClient adminClient) {
+        boolean hasDeleteErrors = false;
+        final DeleteTopicsResult deleteTopicsResult = adminClient.deleteTopics(topicsToDelete);
+        final Map<String, KafkaFuture<Void>> results = deleteTopicsResult.values();
+
+        for (final Map.Entry<String, KafkaFuture<Void>> entry : results.entrySet())
{
+            try {
+                entry.getValue().get(30, TimeUnit.SECONDS);
+            } catch (Exception e) {
+                System.err.println("ERROR: deleting topic " + entry.getKey());
+                e.printStackTrace(System.err);
+                hasDeleteErrors = true;
+            }
+        }
+        if (hasDeleteErrors) {
+            throw new RuntimeException("Encountered an error deleting one or more topics");
+        }
+    }
+
+
     private boolean isInternalTopic(final String topicName) {
         return topicName.startsWith(options.valueOf(applicationIdOption) + "-")
             && (topicName.endsWith("-changelog") || topicName.endsWith("-repartition"));

http://git-wip-us.apache.org/repos/asf/kafka/blob/271f6b5a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
index 897028d..d76f5da 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
@@ -16,19 +16,14 @@
  */
 package org.apache.kafka.streams.integration;
 
-import kafka.admin.AdminClient;
-import kafka.server.KafkaConfig$;
-import kafka.tools.StreamsResetter;
-import kafka.utils.MockTime;
-import kafka.utils.ZkUtils;
+import org.apache.kafka.clients.admin.KafkaAdminClient;
+import org.apache.kafka.clients.admin.ListTopicsOptions;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.common.errors.TimeoutException;
-import org.apache.kafka.common.security.JaasUtils;
 import org.apache.kafka.common.serialization.LongDeserializer;
 import org.apache.kafka.common.serialization.LongSerializer;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
@@ -56,6 +51,12 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Properties;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import kafka.admin.AdminClient;
+import kafka.server.KafkaConfig$;
+import kafka.tools.StreamsResetter;
+import kafka.utils.MockTime;
 
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.MatcherAssert.assertThat;
@@ -94,6 +95,7 @@ public class ResetIntegrationTest {
 
     private static int testNo = 0;
     private static AdminClient adminClient = null;
+    private static KafkaAdminClient kafkaAdminClient = null;
 
     private final MockTime mockTime = CLUSTER.time;
     private final WaitUntilConsumerGroupGotClosed consumerGroupInactive = new WaitUntilConsumerGroupGotClosed();
@@ -104,6 +106,11 @@ public class ResetIntegrationTest {
             adminClient.close();
             adminClient = null;
         }
+
+        if (kafkaAdminClient != null) {
+            kafkaAdminClient.close(10, TimeUnit.SECONDS);
+            kafkaAdminClient = null;
+        }
     }
 
     @Before
@@ -114,6 +121,12 @@ public class ResetIntegrationTest {
             adminClient = AdminClient.createSimplePlaintext(CLUSTER.bootstrapServers());
         }
 
+        if (kafkaAdminClient == null) {
+            Properties props = new Properties();
+            props.put("bootstrap.servers", CLUSTER.bootstrapServers());
+            kafkaAdminClient =  (KafkaAdminClient) org.apache.kafka.clients.admin.AdminClient.create(props);
+        }
+
         // busy wait until cluster (ie, ConsumerGroupCoordinator) is available
         while (true) {
             Thread.sleep(50);
@@ -338,20 +351,20 @@ public class ResetIntegrationTest {
     }
 
     private void cleanGlobal(final String intermediateUserTopic) {
+        // leaving --zookeeper arg here to ensure tool works if users add it
         final String[] parameters;
         if (intermediateUserTopic != null) {
             parameters = new String[]{
                 "--application-id", APP_ID + testNo,
                 "--bootstrap-servers", CLUSTER.bootstrapServers(),
-                "--zookeeper", CLUSTER.zKConnectString(),
                 "--input-topics", INPUT_TOPIC,
-                "--intermediate-topics", INTERMEDIATE_USER_TOPIC
+                "--intermediate-topics", INTERMEDIATE_USER_TOPIC,
+                "--zookeeper", "localhost:2181"
             };
         } else {
             parameters = new String[]{
                 "--application-id", APP_ID + testNo,
                 "--bootstrap-servers", CLUSTER.bootstrapServers(),
-                "--zookeeper", CLUSTER.zKConnectString(),
                 "--input-topics", INPUT_TOPIC
             };
         }
@@ -363,7 +376,7 @@ public class ResetIntegrationTest {
         Assert.assertEquals(0, exitCode);
     }
 
-    private void assertInternalTopicsGotDeleted(final String intermediateUserTopic) {
+    private void assertInternalTopicsGotDeleted(final String intermediateUserTopic) throws
Exception {
         final Set<String> expectedRemainingTopicsAfterCleanup = new HashSet<>();
         expectedRemainingTopicsAfterCleanup.add(INPUT_TOPIC);
         if (intermediateUserTopic != null) {
@@ -374,25 +387,13 @@ public class ResetIntegrationTest {
         expectedRemainingTopicsAfterCleanup.add(OUTPUT_TOPIC_2_RERUN);
         expectedRemainingTopicsAfterCleanup.add("__consumer_offsets");
 
-        Set<String> allTopics;
-        ZkUtils zkUtils = null;
-        try {
-            zkUtils = ZkUtils.apply(CLUSTER.zKConnectString(),
-                30000,
-                30000,
-                JaasUtils.isZkSecurityEnabled());
-
-            do {
-                Utils.sleep(100);
-                allTopics = new HashSet<>();
-                allTopics.addAll(scala.collection.JavaConversions.seqAsJavaList(zkUtils.getAllTopics()));
-            } while (allTopics.size() != expectedRemainingTopicsAfterCleanup.size());
-        } finally {
-            if (zkUtils != null) {
-                zkUtils.close();
-            }
-        }
+        final Set<String> allTopics = new HashSet<>();
+
+        final ListTopicsOptions listTopicsOptions = new ListTopicsOptions();
+        listTopicsOptions.listInternal(true);
+        allTopics.addAll(kafkaAdminClient.listTopics(listTopicsOptions).names().get(30000,
TimeUnit.MILLISECONDS));
         assertThat(allTopics, equalTo(expectedRemainingTopicsAfterCleanup));
+
     }
 
     private class WaitUntilConsumerGroupGotClosed implements TestCondition {


Mime
View raw message