kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [kafka] branch 1.1 updated: MINOR: Fix Streams EOS tests (#5612)
Date Wed, 05 Sep 2018 17:50:33 GMT
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch 1.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/1.1 by this push:
     new 142fade  MINOR: Fix Streams EOS tests (#5612)
142fade is described below

commit 142fadeca2ce8ea1dcb1471db53c39a894ab71cb
Author: Matthias J. Sax <mjsax@apache.org>
AuthorDate: Wed Sep 5 10:50:22 2018 -0700

    MINOR: Fix Streams EOS tests (#5612)
    
    Back porting #5501 broke some tests.
    
    Reviewers: John Roesler <vvcephei@users.noreply.github.com>, Guozhang Wang <wangguoz@gmail.com>
---
 .../apache/kafka/streams/processor/internals/StreamTaskTest.java   | 7 ++++---
 1 file changed, 4 insertions(+), 3 deletions(-)

diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index 1406ee4..ad7ea11 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -1269,11 +1269,12 @@ public class StreamTaskTest {
 
     @Test
     public void shouldAlwaysCommitIfEosEnabled() {
+        task = createStatelessTask(true);
+
         final RecordCollectorImpl recordCollector =  new RecordCollectorImpl("StreamTask",
                 new LogContext("StreamTaskTest "), new DefaultProductionExceptionHandler());
         recordCollector.init(producer);
 
-        task = createStatelessTask(true);
         task.initializeStateStores();
         task.initializeTopology();
         task.punctuate(processorSystemTime, 5, PunctuationType.WALL_CLOCK_TIME, new Punctuator() {
@@ -1355,7 +1356,7 @@ public class StreamTaskTest {
     }
 
     // this task will throw exception when processing (on partition2), flushing, suspending and closing
-    private StreamTask createTaskThatThrowsException(final boolean enableEos) {
+    private StreamTask createTaskThatThrowsException(final boolean eosEnabled) {
         final ProcessorTopology topology = ProcessorTopology.withSources(
                 Utils.<ProcessorNode>mkList(source1, source3, processorStreamTime, processorSystemTime),
                 new HashMap<String, SourceNode>() {
@@ -1371,7 +1372,7 @@ public class StreamTaskTest {
         source1.addChild(processorSystemTime);
         source3.addChild(processorSystemTime);
 
-        return new StreamTask(taskId00, partitions, topology, consumer, changelogReader, config,
+        return new StreamTask(taskId00, partitions, topology, consumer, changelogReader, eosEnabled ? eosConfig : config,
             streamsMetrics, stateDirectory, null, time, new StreamTask.ProducerSupplier() {
                 @Override
                 public Producer<byte[], byte[]> get() {


Mime
View raw message