kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [kafka] branch trunk updated: KAFKA-6318: StreamsResetter should return non-zero return code on error (#4305)
Date Tue, 02 Jan 2018 18:46:00 GMT
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new fd2bdbf  KAFKA-6318: StreamsResetter should return non-zero return code on error (#4305)
fd2bdbf is described below

commit fd2bdbfd700273699725f281a7abab4ff8e55a04
Author: Siva Santhalingam <siva.santhalingam@gmail.com>
AuthorDate: Tue Jan 2 10:45:57 2018 -0800

    KAFKA-6318: StreamsResetter should return non-zero return code on error (#4305)
    
    Reviewers: Matthias J. Sax <mjsax@apache.org>, Guozhang Wang <wangguoz@gmail.com>, Ted Yu <yuzhihong@gmail.com>, Bill Bejeck <bbejeck@gmail.com>
---
 .../main/scala/kafka/tools/StreamsResetter.java    |  9 ++--
 .../integration/AbstractResetIntegrationTest.java  | 25 +++++++++++
 .../streams/integration/ResetIntegrationTest.java  | 51 ++++++++++++++++++++++
 3 files changed, 82 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/kafka/tools/StreamsResetter.java b/core/src/main/scala/kafka/tools/StreamsResetter.java
index 4851a94..4496876 100644
--- a/core/src/main/scala/kafka/tools/StreamsResetter.java
+++ b/core/src/main/scala/kafka/tools/StreamsResetter.java
@@ -130,7 +130,7 @@ public class StreamsResetter {
 
             final HashMap<Object, Object> consumerConfig = new HashMap<>(config);
             consumerConfig.putAll(properties);
-            maybeResetInputAndSeekToEndIntermediateTopicOffsets(consumerConfig, dryRun);
+            exitCode = maybeResetInputAndSeekToEndIntermediateTopicOffsets(consumerConfig, dryRun);
             maybeDeleteInternalTopics(kafkaAdminClient, dryRun);
 
         } catch (final Throwable e) {
@@ -237,9 +237,10 @@ public class StreamsResetter {
         CommandLineUtils.checkInvalidArgs(optionParser, options, shiftByOption, allScenarioOptions.$minus(shiftByOption));
     }
 
-    private void maybeResetInputAndSeekToEndIntermediateTopicOffsets(final Map consumerConfig, final boolean dryRun) throws Exception {
+    private int maybeResetInputAndSeekToEndIntermediateTopicOffsets(final Map consumerConfig, final boolean dryRun) throws Exception {
         final List<String> inputTopics = options.valuesOf(inputTopicsOption);
         final List<String> intermediateTopics = options.valuesOf(intermediateTopicsOption);
+        int topicNotFound = EXIT_CODE_SUCCESS;
 
         final List<String> notFoundInputTopics = new ArrayList<>();
         final List<String> notFoundIntermediateTopics = new ArrayList<>();
@@ -248,7 +249,7 @@ public class StreamsResetter {
 
         if (inputTopics.size() == 0 && intermediateTopics.size() == 0) {
             System.out.println("No input or intermediate topics specified. Skipping seek.");
-            return;
+            return EXIT_CODE_SUCCESS;
         }
 
         if (inputTopics.size() != 0) {
@@ -316,6 +317,7 @@ public class StreamsResetter {
                 for (final String topic : notFoundInputTopics) {
                     System.out.println("Topic: " + topic);
                 }
+                topicNotFound = EXIT_CODE_ERROR;
             }
 
             if (notFoundIntermediateTopics.size() > 0) {
@@ -330,6 +332,7 @@ public class StreamsResetter {
             throw e;
         }
         System.out.println("Done.");
+        return topicNotFound;
     }
 
     // visible for testing
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java
index 9131007..26673ca 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java
@@ -73,6 +73,7 @@ abstract class AbstractResetIntegrationTest {
     private static final String OUTPUT_TOPIC_2 = "outputTopic2";
     private static final String OUTPUT_TOPIC_2_RERUN = "outputTopic2_rerun";
     private static final String INTERMEDIATE_USER_TOPIC = "userTopic";
+    private static final String NON_EXISTING_TOPIC = "nonExistingTopic2";
 
     private static final long STREAMS_CONSUMER_TIMEOUT = 2000L;
     private static final long CLEANUP_CONSUMER_TIMEOUT = 2000L;
@@ -612,6 +613,30 @@ abstract class AbstractResetIntegrationTest {
         Assert.assertEquals(0, exitCode);
     }
 
+    void shouldNotAllowToResetWhileStreamsIsRunning() throws Exception {
+
+        final Properties streamsConfiguration = prepareTest();
+        final List<String> parameterList = new ArrayList<>(
+                Arrays.asList("--application-id", APP_ID + testNo,
+                        "--bootstrap-servers", bootstrapServers,
+                        "--input-topics", NON_EXISTING_TOPIC));
+
+        final String[] parameters = parameterList.toArray(new String[parameterList.size()]);
+        final Properties cleanUpConfig = new Properties();
+        cleanUpConfig.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 100);
+        cleanUpConfig.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "" + CLEANUP_CONSUMER_TIMEOUT);
+
+        // RUN
+        KafkaStreams streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), streamsConfiguration);
+        streams.start();
+
+        final int exitCode = new StreamsResetter().run(parameters, cleanUpConfig);
+        Assert.assertEquals(1, exitCode);
+
+        streams.close();
+
+    }
+
     private void assertInternalTopicsGotDeleted(final String intermediateUserTopic) throws Exception {
         // do not use list topics request, but read from the embedded cluster's zookeeper path directly to confirm
         if (intermediateUserTopic != null) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
index d781d95..ef9a67d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
@@ -18,14 +18,21 @@ package org.apache.kafka.streams.integration;
 
 import kafka.server.KafkaConfig$;
 import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.test.IntegrationTest;
 import org.junit.AfterClass;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import kafka.tools.StreamsResetter;
 
 import java.util.Properties;
+import java.util.Arrays;
+import java.util.ArrayList;
+import java.util.List;
+
 
 /**
  * Tests local state store and global application cleanup.
@@ -35,6 +42,10 @@ public class ResetIntegrationTest extends AbstractResetIntegrationTest {
 
     @ClassRule
     public static final EmbeddedKafkaCluster CLUSTER;
+    private static final long CLEANUP_CONSUMER_TIMEOUT = 2000L;
+    private static final String APP_ID = "Integration-test";
+    private static final String NON_EXISTING_TOPIC = "nonExistingTopic";
+    private static int testNo = 1;
 
     static {
         final Properties props = new Properties();
@@ -81,4 +92,44 @@ public class ResetIntegrationTest extends AbstractResetIntegrationTest {
     public void testReprocessingByDurationAfterResetWithoutIntermediateUserTopic() throws Exception {
         super.testReprocessingByDurationAfterResetWithoutIntermediateUserTopic();
     }
+
+    @Test
+    public void shouldNotAllowToResetWhileStreamsRunning() throws Exception {
+        super.shouldNotAllowToResetWhileStreamsIsRunning();
+    }
+
+    @Test
+    public void shouldNotAllowToResetWhenInputTopicAbsent() throws Exception {
+
+        final List<String> parameterList = new ArrayList<>(
+                Arrays.asList("--application-id", APP_ID + testNo,
+                        "--bootstrap-servers", bootstrapServers,
+                        "--input-topics", NON_EXISTING_TOPIC));
+
+        final String[] parameters = parameterList.toArray(new String[parameterList.size()]);
+        final Properties cleanUpConfig = new Properties();
+        cleanUpConfig.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 100);
+        cleanUpConfig.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "" + CLEANUP_CONSUMER_TIMEOUT);
+
+        final int exitCode = new StreamsResetter().run(parameters, cleanUpConfig);
+        Assert.assertEquals(1, exitCode);
+    }
+
+    @Test
+    public void shouldNotAllowToResetWhenIntermediateTopicAbsent() throws Exception {
+
+        final List<String> parameterList = new ArrayList<>(
+                Arrays.asList("--application-id", APP_ID + testNo,
+                        "--bootstrap-servers", bootstrapServers,
+                        "--intermediate-topics", NON_EXISTING_TOPIC));
+
+        final String[] parameters = parameterList.toArray(new String[parameterList.size()]);
+        final Properties cleanUpConfig = new Properties();
+        cleanUpConfig.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 100);
+        cleanUpConfig.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "" + CLEANUP_CONSUMER_TIMEOUT);
+
+        final int exitCode = new StreamsResetter().run(parameters, cleanUpConfig);
+        Assert.assertEquals(1, exitCode);
+    }
+
 }

-- 
To stop receiving notification emails like this one, please contact commits@kafka.apache.org.

Mime
View raw message