kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ij...@apache.org
Subject kafka git commit: KAFKA-5003; StreamThread should catch InvalidTopicException
Date Thu, 06 Apr 2017 11:02:48 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk d5fb7364a -> afeadbef5


KAFKA-5003; StreamThread should catch InvalidOffsetException

We should catch `InvalidOffsetException` and not just its subclass
`NoOffsetForPartitionException`. Also, we need to step through
all partitions reported by the exception and reset each of them
according to its configured reset policy.

Author: Matthias J. Sax <matthias@confluent.io>

Reviewers: Bill Bejeck <bbejeck@gmail.com>, Eno Thereska <eno@confluent.io>, Damian Guy <damian.guy@gmail.com>, Ismael Juma <ismael@juma.me.uk>

Closes #2747 from mjsax/minor-fix-reset


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/afeadbef
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/afeadbef
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/afeadbef

Branch: refs/heads/trunk
Commit: afeadbef50ee8cb5c23de26c1b2a5ad2c7ad941e
Parents: d5fb736
Author: Matthias J. Sax <matthias@confluent.io>
Authored: Thu Apr 6 11:54:53 2017 +0100
Committer: Ismael Juma <ismael@juma.me.uk>
Committed: Thu Apr 6 12:00:38 2017 +0100

----------------------------------------------------------------------
 .../processor/internals/StreamThread.java       |  51 ++++--
 ...eamsFineGrainedAutoResetIntegrationTest.java | 172 ++++++++++++++-----
 2 files changed, 160 insertions(+), 63 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/afeadbef/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 8bd6d1a..c700cad 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -22,7 +22,7 @@ import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.NoOffsetForPartitionException;
+import org.apache.kafka.clients.consumer.InvalidOffsetException;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.TopicPartition;
@@ -544,38 +544,55 @@ public class StreamThread extends Thread {
 
         try {
             records = consumer.poll(pollTimeMs);
-        } catch (NoOffsetForPartitionException ex) {
-            TopicPartition partition = ex.partition();
+        } catch (final InvalidOffsetException e) {
+            resetInvalidOffsets(e);
+        }
+
+        return records;
+    }
+
+    private void resetInvalidOffsets(final InvalidOffsetException e) {
+        final Set<TopicPartition> partitions = e.partitions();
+        final Set<String> loggedTopics = new HashSet<>();
+        final Set<TopicPartition> seekToBeginning = new HashSet<>();
+        final Set<TopicPartition> seekToEnd = new HashSet<>();
+
+        for (final TopicPartition partition : partitions) {
             if (builder.earliestResetTopicsPattern().matcher(partition.topic()).matches())
{
-                log.info(String.format("stream-thread [%s] setting topic to consume from
earliest offset %s", this.getName(), partition.topic()));
-                consumer.seekToBeginning(ex.partitions());
+                addToResetList(partition, seekToBeginning, "stream-thread [%s] setting topic
%s to consume from %s offset", "earliest", loggedTopics);
             } else if (builder.latestResetTopicsPattern().matcher(partition.topic()).matches())
{
-                consumer.seekToEnd(ex.partitions());
-                log.info(String.format("stream-thread [%s] setting topic to consume from
latest offset %s", this.getName(), partition.topic()));
+                addToResetList(partition, seekToEnd, "stream-thread [%s] setting topic %s
to consume from %s offset", "latest", loggedTopics);
             } else {
-
                 if (originalReset == null || (!originalReset.equals("earliest") &&
!originalReset.equals("latest"))) {
                     setState(State.PENDING_SHUTDOWN);
-                    String errorMessage = "No valid committed offset found for input topic
%s (partition %s) and no valid reset policy configured." +
+                    final String errorMessage = "No valid committed offset found for input
topic %s (partition %s) and no valid reset policy configured." +
                         " You need to set configuration parameter \"auto.offset.reset\" or
specify a topic specific reset " +
                         "policy via KStreamBuilder#stream(StreamsConfig.AutoOffsetReset offsetReset,
...) or KStreamBuilder#table(StreamsConfig.AutoOffsetReset offsetReset, ...)";
-                    throw new StreamsException(String.format(errorMessage, partition.topic(),
partition.partition()), ex);
+                    throw new StreamsException(String.format(errorMessage, partition.topic(),
partition.partition()), e);
                 }
 
                 if (originalReset.equals("earliest")) {
-                    consumer.seekToBeginning(ex.partitions());
+                    addToResetList(partition, seekToBeginning, "stream-thread [%s] no custom
setting defined for topic %s using original config %s for offset reset", "earliest", loggedTopics);
                 } else if (originalReset.equals("latest")) {
-                    consumer.seekToEnd(ex.partitions());
+                    addToResetList(partition, seekToEnd, "stream-thread [%s] no custom setting
defined for topic %s using original config %s for offset reset", "latest", loggedTopics);
                 }
-                log.info(String.format("stream-thread [%s] no custom setting defined for
topic %s using original config %s for offset reset", this.getName(), partition.topic(), originalReset));
             }
-
         }
 
-        if (rebalanceException != null)
-            throw new StreamsException(logPrefix + " Failed to rebalance", rebalanceException);
+        if (!seekToBeginning.isEmpty()) {
+            consumer.seekToBeginning(seekToBeginning);
+        }
+        if (!seekToEnd.isEmpty()) {
+            consumer.seekToEnd(seekToEnd);
+        }
+    }
 
-        return records;
+    private void addToResetList(final TopicPartition partition, final Set<TopicPartition>
partitions, final String logMessage, final String resetPolicy, final Set<String> loggedTopics)
{
+        final String topic = partition.topic();
+        if (loggedTopics.add(topic)) {
+            log.info(String.format(logMessage, getName(), topic, resetPolicy));
+        }
+        partitions.add(partition);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/afeadbef/streams/src/test/java/org/apache/kafka/streams/integration/KStreamsFineGrainedAutoResetIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamsFineGrainedAutoResetIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamsFineGrainedAutoResetIntegrationTest.java
index 3028b6b..3594225 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamsFineGrainedAutoResetIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamsFineGrainedAutoResetIntegrationTest.java
@@ -19,12 +19,16 @@ package org.apache.kafka.streams.integration;
 
 import kafka.utils.MockTime;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.TopologyBuilderException;
 import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
 import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
@@ -43,7 +47,9 @@ import org.junit.experimental.categories.Category;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Properties;
 import java.util.regex.Pattern;
 
@@ -56,35 +62,71 @@ public class KStreamsFineGrainedAutoResetIntegrationTest {
 
     private static final int NUM_BROKERS = 1;
     private static final String DEFAULT_OUTPUT_TOPIC = "outputTopic";
+    private static final String OUTPUT_TOPIC_0 = "outputTopic_0";
+    private static final String OUTPUT_TOPIC_1 = "outputTopic_1";
+    private static final String OUTPUT_TOPIC_2 = "outputTopic_2";
 
     @ClassRule
     public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
     private final MockTime mockTime = CLUSTER.time;
 
-    private static final String TOPIC_1 = "topic-1";
-    private static final String TOPIC_2 = "topic-2";
-    private static final String TOPIC_A = "topic-A";
-    private static final String TOPIC_C = "topic-C";
-    private static final String TOPIC_Y = "topic-Y";
-    private static final String TOPIC_Z = "topic-Z";
+    private static final String TOPIC_1_0 = "topic-1_0";
+    private static final String TOPIC_2_0 = "topic-2_0";
+    private static final String TOPIC_A_0 = "topic-A_0";
+    private static final String TOPIC_C_0 = "topic-C_0";
+    private static final String TOPIC_Y_0 = "topic-Y_0";
+    private static final String TOPIC_Z_0 = "topic-Z_0";
+    private static final String TOPIC_1_1 = "topic-1_1";
+    private static final String TOPIC_2_1 = "topic-2_1";
+    private static final String TOPIC_A_1 = "topic-A_1";
+    private static final String TOPIC_C_1 = "topic-C_1";
+    private static final String TOPIC_Y_1 = "topic-Y_1";
+    private static final String TOPIC_Z_1 = "topic-Z_1";
+    private static final String TOPIC_1_2 = "topic-1_2";
+    private static final String TOPIC_2_2 = "topic-2_2";
+    private static final String TOPIC_A_2 = "topic-A_2";
+    private static final String TOPIC_C_2 = "topic-C_2";
+    private static final String TOPIC_Y_2 = "topic-Y_2";
+    private static final String TOPIC_Z_2 = "topic-Z_2";
     private static final String NOOP = "noop";
     private final Serde<String> stringSerde = Serdes.String();
 
     private static final String STRING_SERDE_CLASSNAME = Serdes.String().getClass().getName();
     private Properties streamsConfiguration;
 
+    private final String topic1TestMessage = "topic-1 test";
+    private final String topic2TestMessage = "topic-2 test";
+    private final String topicATestMessage = "topic-A test";
+    private final String topicCTestMessage = "topic-C test";
+    private final String topicYTestMessage = "topic-Y test";
+    private final String topicZTestMessage = "topic-Z test";
+
 
     @BeforeClass
     public static void startKafkaCluster() throws Exception {
-        CLUSTER.createTopic(TOPIC_1);
-        CLUSTER.createTopic(TOPIC_2);
-        CLUSTER.createTopic(TOPIC_A);
-        CLUSTER.createTopic(TOPIC_C);
-        CLUSTER.createTopic(TOPIC_Y);
-        CLUSTER.createTopic(TOPIC_Z);
+        CLUSTER.createTopic(TOPIC_1_0);
+        CLUSTER.createTopic(TOPIC_2_0);
+        CLUSTER.createTopic(TOPIC_A_0);
+        CLUSTER.createTopic(TOPIC_C_0);
+        CLUSTER.createTopic(TOPIC_Y_0);
+        CLUSTER.createTopic(TOPIC_Z_0);
+        CLUSTER.createTopic(TOPIC_1_1);
+        CLUSTER.createTopic(TOPIC_2_1);
+        CLUSTER.createTopic(TOPIC_A_1);
+        CLUSTER.createTopic(TOPIC_C_1);
+        CLUSTER.createTopic(TOPIC_Y_1);
+        CLUSTER.createTopic(TOPIC_Z_1);
+        CLUSTER.createTopic(TOPIC_1_2);
+        CLUSTER.createTopic(TOPIC_2_2);
+        CLUSTER.createTopic(TOPIC_A_2);
+        CLUSTER.createTopic(TOPIC_C_2);
+        CLUSTER.createTopic(TOPIC_Y_2);
+        CLUSTER.createTopic(TOPIC_Z_2);
         CLUSTER.createTopic(NOOP);
         CLUSTER.createTopic(DEFAULT_OUTPUT_TOPIC);
-
+        CLUSTER.createTopic(OUTPUT_TOPIC_0);
+        CLUSTER.createTopic(OUTPUT_TOPIC_1);
+        CLUSTER.createTopic(OUTPUT_TOPIC_2);
     }
 
     @Before
@@ -105,41 +147,64 @@ public class KStreamsFineGrainedAutoResetIntegrationTest {
     }
 
     @Test
-    public void shouldOnlyReadRecordsWhereEarliestSpecified() throws  Exception {
+    public void shouldOnlyReadRecordsWhereEarliestSpecifiedWithNoCommittedOffsetsWithGlobalAutoOffsetResetLatest()
throws  Exception {
+        streamsConfiguration.put(StreamsConfig.consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG),
"latest");
+
+        final List<String> expectedReceivedValues = Arrays.asList(topic1TestMessage,
topic2TestMessage);
+        shouldOnlyReadForEarliest("_0", TOPIC_1_0, TOPIC_2_0, TOPIC_A_0, TOPIC_C_0, TOPIC_Y_0,
TOPIC_Z_0, OUTPUT_TOPIC_0, expectedReceivedValues);
+    }
+
+    @Test
+    public void shouldOnlyReadRecordsWhereEarliestSpecifiedWithNoCommittedOffsetsWithDefaultGlobalAutoOffsetResetEarliest()
throws  Exception {
+        final List<String> expectedReceivedValues = Arrays.asList(topic1TestMessage,
topic2TestMessage, topicYTestMessage, topicZTestMessage);
+        shouldOnlyReadForEarliest("_1", TOPIC_1_1, TOPIC_2_1, TOPIC_A_1, TOPIC_C_1, TOPIC_Y_1,
TOPIC_Z_1, OUTPUT_TOPIC_1, expectedReceivedValues);
+    }
+
+    @Test
+    public void shouldOnlyReadRecordsWhereEarliestSpecifiedWithInvalidCommittedOffsets()
throws  Exception {
+        commitInvalidOffsets();
+
+        final List<String> expectedReceivedValues = Arrays.asList(topic1TestMessage,
topic2TestMessage, topicYTestMessage, topicZTestMessage);
+        shouldOnlyReadForEarliest("_2", TOPIC_1_2, TOPIC_2_2, TOPIC_A_2, TOPIC_C_2, TOPIC_Y_2,
TOPIC_Z_2, OUTPUT_TOPIC_2, expectedReceivedValues);
+    }
+
+    private void shouldOnlyReadForEarliest(
+        final String topicSuffix,
+        final String topic1,
+        final String topic2,
+        final String topicA,
+        final String topicC,
+        final String topicY,
+        final String topicZ,
+        final String outputTopic,
+        final List<String> expectedReceivedValues) throws Exception {
+
         final KStreamBuilder builder = new KStreamBuilder();
 
-        final KStream<String, String> pattern1Stream = builder.stream(KStreamBuilder.AutoOffsetReset.EARLIEST,
Pattern.compile("topic-\\d"));
-        final KStream<String, String> pattern2Stream = builder.stream(KStreamBuilder.AutoOffsetReset.LATEST,
Pattern.compile("topic-[A-D]"));
-        final KStream<String, String> namedTopicsStream = builder.stream(TOPIC_Y, TOPIC_Z);
+        final KStream<String, String> pattern1Stream = builder.stream(KStreamBuilder.AutoOffsetReset.EARLIEST,
Pattern.compile("topic-\\d" + topicSuffix));
+        final KStream<String, String> pattern2Stream = builder.stream(KStreamBuilder.AutoOffsetReset.LATEST,
Pattern.compile("topic-[A-D]" + topicSuffix));
+        final KStream<String, String> namedTopicsStream = builder.stream(topicY, topicZ);
 
-        pattern1Stream.to(stringSerde, stringSerde, DEFAULT_OUTPUT_TOPIC);
-        pattern2Stream.to(stringSerde, stringSerde, DEFAULT_OUTPUT_TOPIC);
-        namedTopicsStream.to(stringSerde, stringSerde, DEFAULT_OUTPUT_TOPIC);
+        pattern1Stream.to(stringSerde, stringSerde, outputTopic);
+        pattern2Stream.to(stringSerde, stringSerde, outputTopic);
+        namedTopicsStream.to(stringSerde, stringSerde, outputTopic);
 
         final Properties producerConfig = TestUtils.producerConfig(CLUSTER.bootstrapServers(),
StringSerializer.class, StringSerializer.class);
 
-        final String topic1TestMessage = "topic-1 test";
-        final String topic2TestMessage = "topic-2 test";
-        final String topicATestMessage = "topic-A test";
-        final String topicCTestMessage = "topic-C test";
-        final String topicYTestMessage = "topic-Y test";
-        final String topicZTestMessage = "topic-Z test";
-
-        IntegrationTestUtils.produceValuesSynchronously(TOPIC_1, Collections.singletonList(topic1TestMessage),
producerConfig, mockTime);
-        IntegrationTestUtils.produceValuesSynchronously(TOPIC_2, Collections.singletonList(topic2TestMessage),
producerConfig, mockTime);
-        IntegrationTestUtils.produceValuesSynchronously(TOPIC_A, Collections.singletonList(topicATestMessage),
producerConfig, mockTime);
-        IntegrationTestUtils.produceValuesSynchronously(TOPIC_C, Collections.singletonList(topicCTestMessage),
producerConfig, mockTime);
-        IntegrationTestUtils.produceValuesSynchronously(TOPIC_Y, Collections.singletonList(topicYTestMessage),
producerConfig, mockTime);
-        IntegrationTestUtils.produceValuesSynchronously(TOPIC_Z, Collections.singletonList(topicZTestMessage),
producerConfig, mockTime);
+        IntegrationTestUtils.produceValuesSynchronously(topic1, Collections.singletonList(topic1TestMessage),
producerConfig, mockTime);
+        IntegrationTestUtils.produceValuesSynchronously(topic2, Collections.singletonList(topic2TestMessage),
producerConfig, mockTime);
+        IntegrationTestUtils.produceValuesSynchronously(topicA, Collections.singletonList(topicATestMessage),
producerConfig, mockTime);
+        IntegrationTestUtils.produceValuesSynchronously(topicC, Collections.singletonList(topicCTestMessage),
producerConfig, mockTime);
+        IntegrationTestUtils.produceValuesSynchronously(topicY, Collections.singletonList(topicYTestMessage),
producerConfig, mockTime);
+        IntegrationTestUtils.produceValuesSynchronously(topicZ, Collections.singletonList(topicZTestMessage),
producerConfig, mockTime);
 
         final Properties consumerConfig = TestUtils.consumerConfig(CLUSTER.bootstrapServers(),
StringDeserializer.class, StringDeserializer.class);
 
         final KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
         streams.start();
 
-        final List<String> expectedReceivedValues = Arrays.asList(topic1TestMessage,
topic2TestMessage, topicYTestMessage, topicZTestMessage);
-        final List<KeyValue<String, String>> receivedKeyValues = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig,
DEFAULT_OUTPUT_TOPIC, 4);
-        final List<String> actualValues = new ArrayList<>(4);
+        final List<KeyValue<String, String>> receivedKeyValues = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig,
outputTopic, expectedReceivedValues.size());
+        final List<String> actualValues = new ArrayList<>(expectedReceivedValues.size());
 
         for (final KeyValue<String, String> receivedKeyValue : receivedKeyValues) {
             actualValues.add(receivedKeyValue.value);
@@ -149,35 +214,50 @@ public class KStreamsFineGrainedAutoResetIntegrationTest {
         Collections.sort(actualValues);
         Collections.sort(expectedReceivedValues);
         assertThat(actualValues, equalTo(expectedReceivedValues));
-
     }
 
+    private void commitInvalidOffsets() {
+        final KafkaConsumer consumer = new KafkaConsumer(TestUtils.consumerConfig(
+            CLUSTER.bootstrapServers(),
+            streamsConfiguration.getProperty(StreamsConfig.APPLICATION_ID_CONFIG),
+            StringDeserializer.class,
+            StringDeserializer.class));
+
+        final Map<TopicPartition, OffsetAndMetadata> invalidOffsets = new HashMap<>();
+        invalidOffsets.put(new TopicPartition(TOPIC_1_2, 0), new OffsetAndMetadata(5, null));
+        invalidOffsets.put(new TopicPartition(TOPIC_2_2, 0), new OffsetAndMetadata(5, null));
+        invalidOffsets.put(new TopicPartition(TOPIC_A_2, 0), new OffsetAndMetadata(5, null));
+        invalidOffsets.put(new TopicPartition(TOPIC_C_2, 0), new OffsetAndMetadata(5, null));
+        invalidOffsets.put(new TopicPartition(TOPIC_Y_2, 0), new OffsetAndMetadata(5, null));
+        invalidOffsets.put(new TopicPartition(TOPIC_Z_2, 0), new OffsetAndMetadata(5, null));
+
+        consumer.commitSync(invalidOffsets);
+
+        consumer.close();
+    }
 
     @Test(expected = TopologyBuilderException.class)
     public void shouldThrowExceptionOverlappingPattern() throws  Exception {
         final KStreamBuilder builder = new KStreamBuilder();
         //NOTE this would realistically get caught when building topology, the test is for
completeness
-        final KStream<String, String> pattern1Stream = builder.stream(KStreamBuilder.AutoOffsetReset.EARLIEST,
Pattern.compile("topic-[A-D]"));
-        final KStream<String, String> pattern2Stream = builder.stream(KStreamBuilder.AutoOffsetReset.LATEST,
Pattern.compile("topic-[A-D]"));
-        final KStream<String, String> namedTopicsStream = builder.stream(TOPIC_Y, TOPIC_Z);
+        builder.stream(KStreamBuilder.AutoOffsetReset.EARLIEST, Pattern.compile("topic-[A-D]_1"));
+        builder.stream(KStreamBuilder.AutoOffsetReset.LATEST, Pattern.compile("topic-[A-D]_1"));
+        builder.stream(TOPIC_Y_1, TOPIC_Z_1);
 
         builder.earliestResetTopicsPattern();
-
     }
 
     @Test(expected = TopologyBuilderException.class)
     public void shouldThrowExceptionOverlappingTopic() throws  Exception {
         final KStreamBuilder builder = new KStreamBuilder();
         //NOTE this would realistically get caught when building topology, the test is for
completeness
-        final KStream<String, String> pattern1Stream = builder.stream(KStreamBuilder.AutoOffsetReset.EARLIEST,
Pattern.compile("topic-[A-D]"));
-        final KStream<String, String> pattern2Stream = builder.stream(KStreamBuilder.AutoOffsetReset.LATEST,
Pattern.compile("topic-\\d]"));
-        final KStream<String, String> namedTopicsStream = builder.stream(KStreamBuilder.AutoOffsetReset.LATEST,
TOPIC_A, TOPIC_Z);
+        builder.stream(KStreamBuilder.AutoOffsetReset.EARLIEST, Pattern.compile("topic-[A-D]_1"));
+        builder.stream(KStreamBuilder.AutoOffsetReset.LATEST, Pattern.compile("topic-\\d_1"));
+        builder.stream(KStreamBuilder.AutoOffsetReset.LATEST, TOPIC_A_1, TOPIC_Z_1);
 
         builder.latestResetTopicsPattern();
-
     }
 
-
     @Test
     public void shouldThrowStreamsExceptionNoResetSpecified() throws Exception {
         Properties props = new Properties();


Mime
View raw message