kafka-commits mailing list archives

From guozh...@apache.org
Subject [kafka] branch trunk updated: KAFKA-3514: Part II, Choose tasks with data on all partitions to process (#5398)
Date Fri, 03 Aug 2018 01:35:02 GMT
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new afe00ef  KAFKA-3514: Part II, Choose tasks with data on all partitions to process (#5398)
afe00ef is described below

commit afe00effe2d79e89e159972cf1f5f7ffeb0c6e97
Author: Guozhang Wang <wangguoz@gmail.com>
AuthorDate: Thu Aug 2 18:34:53 2018 -0700

    KAFKA-3514: Part II, Choose tasks with data on all partitions to process (#5398)
    
    1. In each iteration, decide that a task is processable only if all of its partitions contain data, so that it can decide which record to process next.
    
    1.a Add one exception: if the task does have data on some but not all of its partitions, we only consider it not processable for a finite number of iterations.
    1.b Add a task-level metric to record whenever we are forced to process a task whose data is only partially available, since this may lead to non-determinism.
    
    2. Decouple the main loop's put-raw-data and process steps, since not all data put into the queue will now be processed completely within a single iteration.
    
    3. NOTE that within an iteration, if a task has exhausted one of its queues it will still be processed, since we only update the processable list once per iteration; I'm improving on this in the follow-up Part III PR.
    
    4. Found and fixed a bug in metrics recording: the taskName and sensorName parameters were swapped.
    
    5. Optimized the task stream time computation again, since the partition stream time reasoning has now been simplified.
    
    6. Added unit tests.
    
    Reviewers: Matthias J. Sax <matthias@confluent.io>, John Roesler <vvcephei@users.noreply.github.com>, Bill Bejeck <bbejeck@gmail.com>
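
A minimal sketch of the gating logic in items 1, 1.a and 1.b (illustrative class, field and method names only, not the Kafka Streams classes; the real implementation is in the StreamTask and PartitionGroup changes below). A task is picked for processing only when every input partition has buffered data, except that after a bounded number of skipped rounds it is processed anyway and the enforcement is counted:

// Hypothetical, simplified sketch; names here are illustrative, not Kafka's API.
import java.util.HashMap;
import java.util.Map;

final class ProcessableTaskSketch {
    private static final int WAIT_ON_PARTIAL_INPUT = 5;    // finite number of rounds to wait (item 1.a)

    private final Map<String, Integer> bufferedPerPartition = new HashMap<>();
    private int waits = WAIT_ON_PARTIAL_INPUT;
    private long enforcedProcessCount = 0L;                 // stands in for the task-level metric (item 1.b)

    ProcessableTaskSketch(final String... partitions) {
        for (final String partition : partitions) {
            bufferedPerPartition.put(partition, 0);
        }
    }

    void addRecords(final String partition, final int count) {
        bufferedPerPartition.merge(partition, count, Integer::sum);
    }

    // Processable if every partition has buffered data, or if we have waited too long on partial data.
    boolean isProcessable() {
        final boolean allBuffered = bufferedPerPartition.values().stream().allMatch(n -> n > 0);
        final boolean anyBuffered = bufferedPerPartition.values().stream().anyMatch(n -> n > 0);
        if (allBuffered) {
            return true;
        } else if (anyBuffered && --waits < 0) {
            enforcedProcessCount++;                         // record the enforced processing
            waits = WAIT_ON_PARTIAL_INPUT;                  // reset the countdown
            return true;
        }
        return false;
    }

    long enforcedProcessCount() {
        return enforcedProcessCount;
    }
}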
---
 .../processor/internals/AssignedStreamsTasks.java  |   5 +-
 .../processor/internals/PartitionGroup.java        |  55 +++---
 .../streams/processor/internals/StreamTask.java    |  33 +++-
 .../streams/processor/internals/StreamThread.java  |  13 +-
 .../internals/metrics/StreamsMetricsImpl.java      |   8 +-
 .../kafka/streams/state/internals/NamedCache.java  |   2 +-
 .../integration/utils/IntegrationTestUtils.java    |  41 ++++
 .../internals/AssignedStreamsTasksTest.java        |  27 +++
 .../processor/internals/PartitionGroupTest.java    |  16 +-
 .../processor/internals/StreamTaskTest.java        | 212 +++++++++------------
 ...StreamToTableJoinScalaIntegrationTestBase.scala |  10 +-
 ...bleJoinScalaIntegrationTestImplicitSerdes.scala |   4 -
 12 files changed, 242 insertions(+), 184 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasks.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasks.java
index f98e635..0a83965 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasks.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasks.java
@@ -87,11 +87,13 @@ class AssignedStreamsTasks extends AssignedTasks<StreamTask> implements Restorin
      */
     int process() {
         int processed = 0;
+
         final Iterator<Map.Entry<TaskId, StreamTask>> it = running.entrySet().iterator();
         while (it.hasNext()) {
             final StreamTask task = it.next().getValue();
+
             try {
-                if (task.process()) {
+                if (task.isProcessable() && task.process()) {
                     processed++;
                 }
             } catch (final TaskMigratedException e) {
@@ -108,6 +110,7 @@ class AssignedStreamsTasks extends AssignedTasks<StreamTask> implements Restorin
                 throw e;
             }
         }
+
         return processed;
     }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
index 34252bf..f17c63a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
@@ -27,15 +27,23 @@ import java.util.Set;
 
 /**
  * A PartitionGroup is composed from a set of partitions. It also maintains the timestamp of this
- * group, hence the associated task as the min timestamp across all partitions in the group.
+ * group, a.k.a. the stream time of the associated task. It is defined as the maximum timestamp of
+ * all the records having been retrieved for processing from this PartitionGroup so far.
+ *
+ * We decide from which partition to retrieve the next record to process based on partitions' timestamps.
+ * The timestamp of a specific partition is initialized as UNKNOWN (-1), and is updated with the head record's timestamp
+ * if it is smaller (i.e. it should be monotonically increasing); when the partition's buffer becomes empty and there is
+ * no head record, the partition's timestamp will not be updated any more.
  */
 public class PartitionGroup {
 
     private final Map<TopicPartition, RecordQueue> partitionQueues;
-
     private final PriorityQueue<RecordQueue> nonEmptyQueuesByTime;
+
     private long streamTime;
     private int totalBuffered;
+    private boolean allBuffered;
+
 
     public static class RecordInfo {
         RecordQueue queue;
@@ -53,11 +61,11 @@ public class PartitionGroup {
         }
     }
 
-
     PartitionGroup(final Map<TopicPartition, RecordQueue> partitionQueues) {
         nonEmptyQueuesByTime = new PriorityQueue<>(partitionQueues.size(), Comparator.comparingLong(RecordQueue::timestamp));
         this.partitionQueues = partitionQueues;
         totalBuffered = 0;
+        allBuffered = false;
         streamTime = RecordQueue.NOT_KNOWN;
     }
 
@@ -79,17 +87,16 @@ public class PartitionGroup {
             if (record != null) {
                 --totalBuffered;
 
-                if (!queue.isEmpty()) {
+                if (queue.isEmpty()) {
+                    // if a certain queue has been drained, reset the flag
+                    allBuffered = false;
+                } else {
                     nonEmptyQueuesByTime.offer(queue);
                 }
 
-                // Since this was previously a queue with min timestamp,
-                // streamTime could only advance if this queue's time did.
-                if (queue.timestamp() > streamTime) {
-                    computeStreamTime();
-                }
+                // always update the stream time to the record's timestamp yet to be processed if it is larger
+                streamTime = Math.max(streamTime, record.timestamp);
             }
-
         }
 
         return record;
@@ -106,17 +113,18 @@ public class PartitionGroup {
         final RecordQueue recordQueue = partitionQueues.get(partition);
 
         final int oldSize = recordQueue.size();
-        final long oldTimestamp = recordQueue.timestamp();
         final int newSize = recordQueue.addRawRecords(rawRecords);
 
         // add this record queue to be considered for processing in the future if it was empty before
         if (oldSize == 0 && newSize > 0) {
             nonEmptyQueuesByTime.offer(recordQueue);
-        }
 
-        // Adding to this queue could only advance streamTime if it was previously the queue with min timestamp (= streamTime)
-        if (oldTimestamp <= streamTime && recordQueue.timestamp() > streamTime) {
-            computeStreamTime();
+            // if all partitions now are non-empty, set the flag
+            // we do not need to update the stream time here since this task will definitely be
+            // processed next, and hence the stream time will be updated when we retrieved records by then
+            if (nonEmptyQueuesByTime.size() == this.partitionQueues.size()) {
+                allBuffered = true;
+            }
         }
 
         totalBuffered += newSize - oldSize;
@@ -136,18 +144,6 @@ public class PartitionGroup {
         return streamTime;
     }
 
-    private void computeStreamTime() {
-        // we should always return the smallest timestamp of all partitions
-        // to avoid group partition time goes backward
-        long timestamp = Long.MAX_VALUE;
-        for (final RecordQueue queue : partitionQueues.values()) {
-            if (queue.timestamp() < timestamp) {
-                timestamp = queue.timestamp();
-            }
-        }
-        this.streamTime = timestamp;
-    }
-
     /**
      * @throws IllegalStateException if the record's partition does not belong to this partition group
      */
@@ -165,7 +161,12 @@ public class PartitionGroup {
         return totalBuffered;
     }
 
+    boolean allPartitionsBuffered() {
+        return allBuffered;
+    }
+
     public void close() {
+        clear();
         partitionQueues.clear();
     }
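
To make the new PartitionGroup behavior easier to follow, here is a hedged sketch using plain collections. The names are hypothetical and the record ordering is simplified to the head record's timestamp; the point is the two rules from the Javadoc above: stream time only ever advances, as the max timestamp of records handed out so far, and a flag tracks whether every partition queue currently has data.

// Illustrative sketch only (hypothetical names, plain collections); mirrors the behavior described
// in the PartitionGroup Javadoc above, not the actual Kafka Streams implementation.
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

final class PartitionGroupSketch {
    private static final long NOT_KNOWN = -1L;

    private final Map<String, Deque<Long>> queues = new HashMap<>();   // partition -> buffered record timestamps
    private long streamTime = NOT_KNOWN;
    private boolean allBuffered = false;

    PartitionGroupSketch(final String... partitions) {
        for (final String partition : partitions) {
            queues.put(partition, new ArrayDeque<>());
        }
    }

    void addRecord(final String partition, final long timestamp) {
        queues.get(partition).addLast(timestamp);
        // once every partition has at least one buffered record, mark the group as fully buffered
        allBuffered = queues.values().stream().noneMatch(Deque::isEmpty);
    }

    // Hand out the buffered record with the smallest head timestamp and advance stream time monotonically.
    Long nextRecordTimestamp() {
        Deque<Long> chosen = null;
        for (final Deque<Long> queue : queues.values()) {
            if (!queue.isEmpty() && (chosen == null || queue.peekFirst() < chosen.peekFirst())) {
                chosen = queue;
            }
        }
        if (chosen == null) {
            return null;                                   // nothing buffered
        }
        final long timestamp = chosen.pollFirst();
        streamTime = Math.max(streamTime, timestamp);      // max timestamp of records retrieved so far
        if (chosen.isEmpty()) {
            allBuffered = false;                           // a queue drained: reset the flag
        }
        return timestamp;
    }

    long streamTime() {
        return streamTime;
    }

    boolean allPartitionsBuffered() {
        return allBuffered;
    }
}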
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 7f121fe..6f3b031 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -59,6 +59,8 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
 
     private static final ConsumerRecord<Object, Object> DUMMY_RECORD = new ConsumerRecord<>(ProcessorContextImpl.NONEXIST_TOPIC, -1, -1L, null, null);
 
+    private static final int WAIT_ON_PARTIAL_INPUT = 5;
+
     private final PartitionGroup partitionGroup;
     private final PartitionGroup.RecordInfo recordInfo;
     private final PunctuationQueue streamTimePunctuationQueue;
@@ -72,12 +74,14 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
     private boolean commitRequested = false;
     private boolean commitOffsetNeeded = false;
     private boolean transactionInFlight = false;
+    private int waits = WAIT_ON_PARTIAL_INPUT;
     private final Time time;
     private final TaskMetrics taskMetrics;
 
     protected static final class TaskMetrics {
         final StreamsMetricsImpl metrics;
         final Sensor taskCommitTimeSensor;
+        final Sensor taskEnforcedProcessSensor;
         private final String taskName;
 
 
@@ -108,7 +112,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
 
             // add the operation metrics with additional tags
             final Map<String, String> tagMap = metrics.tagMap("task-id", taskName);
-            taskCommitTimeSensor = metrics.taskLevelSensor("commit", taskName, Sensor.RecordingLevel.DEBUG, parent);
+            taskCommitTimeSensor = metrics.taskLevelSensor(taskName, "commit", Sensor.RecordingLevel.DEBUG, parent);
             taskCommitTimeSensor.add(
                new MetricName("commit-latency-avg", group, "The average latency of commit operation.", tagMap),
                 new Avg()
@@ -125,6 +129,18 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
                new MetricName("commit-total", group, "The total number of occurrence of commit operations.", tagMap),
                 new Count()
             );
+
+            // add the metrics for enforced processing
+            taskEnforcedProcessSensor = metrics.taskLevelSensor(taskName, "enforced-process", Sensor.RecordingLevel.DEBUG, parent);
+            taskEnforcedProcessSensor.add(
+                    new MetricName("enforced-process-rate", group, "The average number of occurrence of enforced-process per second.", tagMap),
+                    new Rate(TimeUnit.SECONDS, new Count())
+            );
+            taskEnforcedProcessSensor.add(
+                    new MetricName("enforced-process-total", group, "The total number of occurrence of enforced-process operations.", tagMap),
+                    new Count()
+            );
+
         }
 
         void removeAllSensors() {
@@ -264,6 +280,21 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
     }
 
     /**
+     * An active task is processable if its buffer contains data for all of its input source topic partitions
+     */
+    public boolean isProcessable() {
+        if (partitionGroup.allPartitionsBuffered()) {
+            return true;
+        } else if (partitionGroup.numBuffered() > 0 && --waits < 0) {
+            taskMetrics.taskEnforcedProcessSensor.record();
+            waits = WAIT_ON_PARTIAL_INPUT;
+            return true;
+        }
+
+        return false;
+    }
+
+    /**
      * Process one record.
      *
      * @return true if this method processes a record, false if it does not process a record.
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 428aa1d..42f55ef 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -831,18 +831,21 @@ public class StreamThread extends Thread {
             }
         }
 
-        if (records != null && !records.isEmpty() && taskManager.hasActiveRunningTasks()) {
+        if (records != null && !records.isEmpty()) {
             streamsMetrics.pollTimeSensor.record(computeLatency(), timerStartedMs);
             addRecordsToTasks(records);
+        }
+
+        if (taskManager.hasActiveRunningTasks()) {
             final long totalProcessed = processAndMaybeCommit(recordsProcessedBeforeCommit);
             if (totalProcessed > 0) {
                 final long processLatency = computeLatency();
                streamsMetrics.processTimeSensor.record(processLatency / (double) totalProcessed, timerStartedMs);
                 processedBeforeCommit = adjustRecordsProcessedBeforeCommit(
-                    recordsProcessedBeforeCommit,
-                    totalProcessed,
-                    processLatency,
-                    commitTimeMs);
+                        recordsProcessedBeforeCommit,
+                        totalProcessed,
+                        processLatency,
+                        commitTimeMs);
             }
         }
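
The StreamThread hunk above implements item 2 of the commit message: adding polled records and processing are no longer gated together, because data buffered in one iteration may not be fully processed in that same iteration. A simplified, hypothetical sketch of the resulting loop shape (not the actual StreamThread code):

// Simplified, hypothetical shape of the decoupled poll/process loop; names are illustrative only.
import java.util.List;

final class RunLoopSketch {
    interface Task {
        boolean isProcessable();
        boolean process();          // processes one buffered record, returns true if it did
    }

    private final List<Task> tasks;

    RunLoopSketch(final List<Task> tasks) {
        this.tasks = tasks;
    }

    // One loop iteration: buffer whatever was polled, then process tasks that are ready.
    int runOnce(final List<String> polledRecords) {
        if (!polledRecords.isEmpty()) {
            addRecordsToTasks(polledRecords);               // buffering no longer requires running tasks
        }

        // processing runs even when this poll returned nothing, because earlier polls
        // may have left unprocessed data sitting in the task buffers
        int processed = 0;
        for (final Task task : tasks) {
            if (task.isProcessable() && task.process()) {
                processed++;
            }
        }
        return processed;
    }

    private void addRecordsToTasks(final List<String> records) {
        // route each record to the owning task's partition buffer (omitted in this sketch)
    }
}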
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
index 662ded5..e99a5b3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
@@ -88,13 +88,13 @@ public class StreamsMetricsImpl implements StreamsMetrics {
     }
 
     public final Sensor taskLevelSensor(final String taskName,
-                                         final String sensorName,
-                                         final Sensor.RecordingLevel recordingLevel,
-                                         final Sensor... parents) {
+                                        final String sensorName,
+                                        final Sensor.RecordingLevel recordingLevel,
+                                        final Sensor... parents) {
         final String key = threadName + "." + taskName;
         synchronized (taskLevelSensors) {
             if (!taskLevelSensors.containsKey(key)) {
-                taskLevelSensors.put(key, new LinkedList<String>());
+                taskLevelSensors.put(key, new LinkedList<>());
             }
 
             final String fullSensorName = key + "." + sensorName;
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java
index 77b9c1e..12b4cf3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java
@@ -370,7 +370,7 @@ class NamedCache {
                 "record-cache-id", "all",
                 "task-id", taskName
             );
-            final Sensor taskLevelHitRatioSensor = metrics.taskLevelSensor("hitRatio", taskName, Sensor.RecordingLevel.DEBUG);
+            final Sensor taskLevelHitRatioSensor = metrics.taskLevelSensor(taskName, "hitRatio", Sensor.RecordingLevel.DEBUG);
             taskLevelHitRatioSensor.add(
                 new MetricName("hitRatio-avg", group, "The average cache hit ratio.", allMetricTags),
                 new Avg()
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
index 749d748..1a78ed3 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
@@ -50,7 +50,9 @@ import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
@@ -350,6 +352,45 @@ public class IntegrationTestUtils {
         return accumData;
     }
 
+    public static <K, V> List<KeyValue<K, V>> waitUntilFinalKeyValueRecordsReceived(final Properties consumerConfig,
+                                                                                    final String topic,
+                                                                                    final List<KeyValue<K, V>> expectedRecords) throws InterruptedException {
+        return waitUntilFinalKeyValueRecordsReceived(consumerConfig, topic, expectedRecords, DEFAULT_TIMEOUT);
+    }
+
+    public static <K, V> List<KeyValue<K, V>> waitUntilFinalKeyValueRecordsReceived(final Properties consumerConfig,
+                                                                                    final String topic,
+                                                                                    final List<KeyValue<K, V>> expectedRecords,
+                                                                                    final long waitTime) throws InterruptedException {
+        final List<KeyValue<K, V>> accumData = new ArrayList<>();
+        try (final Consumer<K, V> consumer = createConsumer(consumerConfig)) {
+            final TestCondition valuesRead = new TestCondition() {
+                @Override
+                public boolean conditionMet() {
+                    final List<KeyValue<K, V>> readData =
+                            readKeyValues(topic, consumer, waitTime, expectedRecords.size());
+                    accumData.addAll(readData);
+
+                    final Map<K, V> finalData = new HashMap<>();
+
+                    for (final KeyValue<K, V> keyValue : accumData) {
+                        finalData.put(keyValue.key, keyValue.value);
+                    }
+
+                    for (final KeyValue<K, V> keyValue : expectedRecords) {
+                        if (!keyValue.value.equals(finalData.get(keyValue.key)))
+                            return false;
+                    }
+
+                    return true;
+                }
+            };
+            final String conditionDetails = "Did not receive all " + expectedRecords + " records from topic " + topic;
+            TestUtils.waitForCondition(valuesRead, waitTime, conditionDetails);
+        }
+        return accumData;
+    }
+
     public static <K, V> List<ConsumerRecord<K, V>> waitUntilMinRecordsReceived(final Properties consumerConfig,
                                                                                  final String topic,
                                                                                  final int expectedNumRecords,
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasksTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasksTest.java
index 8a8d625..7efe653 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasksTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasksTest.java
@@ -337,6 +337,7 @@ public class AssignedStreamsTasksTest {
     @Test
     public void shouldCloseTaskOnProcessesIfTaskMigratedException() {
         mockTaskInitialization();
+        EasyMock.expect(t1.isProcessable()).andReturn(true);
         t1.process();
         EasyMock.expectLastCall().andThrow(new TaskMigratedException());
         t1.close(false, true);
@@ -354,6 +355,32 @@ public class AssignedStreamsTasksTest {
     }
 
     @Test
+    public void shouldNotProcessUnprocessableTasks() {
+        mockTaskInitialization();
+        EasyMock.expect(t1.isProcessable()).andReturn(false);
+        EasyMock.replay(t1);
+        addAndInitTask();
+
+        assertThat(assignedTasks.process(), equalTo(0));
+
+        EasyMock.verify(t1);
+    }
+
+    @Test
+    public void shouldAlwaysProcessProcessableTasks() {
+        mockTaskInitialization();
+        EasyMock.expect(t1.isProcessable()).andReturn(true);
+        EasyMock.expect(t1.process()).andReturn(true).once();
+        EasyMock.replay(t1);
+
+        addAndInitTask();
+
+        assertThat(assignedTasks.process(), equalTo(1));
+
+        EasyMock.verify(t1);
+    }
+
+    @Test
     public void shouldPunctuateRunningTasks() {
         mockTaskInitialization();
         EasyMock.expect(t1.maybePunctuateStreamTime()).andReturn(true);
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java
index b3123e4..2df4f66 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java
@@ -88,12 +88,12 @@ public class PartitionGroupTest {
         group.addRawRecords(partition2, list2);
         // 1:[1, 3, 5]
         // 2:[2, 4, 6]
-        // st: 1
+        // st: -1 since no records was being processed yet
 
         assertEquals(6, group.numBuffered());
         assertEquals(3, group.numBuffered(partition1));
         assertEquals(3, group.numBuffered(partition2));
-        assertEquals(1L, group.timestamp());
+        assertEquals(-1L, group.timestamp());
 
         StampedRecord record;
         final PartitionGroup.RecordInfo info = new PartitionGroup.RecordInfo();
@@ -108,7 +108,7 @@ public class PartitionGroupTest {
         assertEquals(5, group.numBuffered());
         assertEquals(2, group.numBuffered(partition1));
         assertEquals(3, group.numBuffered(partition2));
-        assertEquals(2L, group.timestamp());
+        assertEquals(1L, group.timestamp());
 
         // get one record, now the time should be advanced
         record = group.nextRecord(info);
@@ -120,7 +120,7 @@ public class PartitionGroupTest {
         assertEquals(4, group.numBuffered());
         assertEquals(2, group.numBuffered(partition1));
         assertEquals(2, group.numBuffered(partition2));
-        assertEquals(3L, group.timestamp());
+        assertEquals(2L, group.timestamp());
 
         // add 2 more records with timestamp 2, 4 to partition-1
         final List<ConsumerRecord<byte[], byte[]>> list3 = Arrays.asList(
@@ -134,7 +134,7 @@ public class PartitionGroupTest {
         assertEquals(6, group.numBuffered());
         assertEquals(4, group.numBuffered(partition1));
         assertEquals(2, group.numBuffered(partition2));
-        assertEquals(3L, group.timestamp());
+        assertEquals(2L, group.timestamp());
 
         // get one record, time should not be advanced
         record = group.nextRecord(info);
@@ -146,7 +146,7 @@ public class PartitionGroupTest {
         assertEquals(5, group.numBuffered());
         assertEquals(3, group.numBuffered(partition1));
         assertEquals(2, group.numBuffered(partition2));
-        assertEquals(4L, group.timestamp());
+        assertEquals(3L, group.timestamp());
 
         // get one record, time should not be advanced
         record = group.nextRecord(info);
@@ -158,7 +158,7 @@ public class PartitionGroupTest {
         assertEquals(4, group.numBuffered());
         assertEquals(3, group.numBuffered(partition1));
         assertEquals(1, group.numBuffered(partition2));
-        assertEquals(5L, group.timestamp());
+        assertEquals(4L, group.timestamp());
 
         // get one more record, now time should be advanced
         record = group.nextRecord(info);
@@ -206,7 +206,7 @@ public class PartitionGroupTest {
         assertEquals(0, group.numBuffered());
         assertEquals(0, group.numBuffered(partition1));
         assertEquals(0, group.numBuffered(partition2));
-        assertEquals(5L, group.timestamp());
+        assertEquals(6L, group.timestamp());
 
     }
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index bfbb2a0..146bcb3 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -57,6 +57,7 @@ import org.junit.Test;
 
 import java.io.File;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.time.Duration;
 import java.util.Arrays;
 import java.util.Collections;
@@ -111,7 +112,7 @@ public class StreamTaskTest {
 
     private final ProcessorTopology topology = ProcessorTopology.withSources(
         Utils.<ProcessorNode>mkList(source1, source2, processorStreamTime, processorSystemTime),
-        mkMap(mkEntry(topic1, (SourceNode) source1), mkEntry(topic2, (SourceNode) source2))
+        mkMap(mkEntry(topic1, source1), mkEntry(topic2, source2))
     );
 
     private final MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
@@ -309,97 +310,6 @@ public class StreamTaskTest {
 
     @SuppressWarnings("unchecked")
     @Test
-    public void testMaybePunctuateStreamTime() {
-        task = createStatelessTask(createConfig(false));
-        task.initializeStateStores();
-        task.initializeTopology();
-
-        task.addRecords(partition1, Arrays.asList(
-            getConsumerRecord(partition1, 0),
-            getConsumerRecord(partition1, 20),
-            getConsumerRecord(partition1, 32),
-            getConsumerRecord(partition1, 40),
-            getConsumerRecord(partition1, 60)
-        ));
-
-        task.addRecords(partition2, Arrays.asList(
-            getConsumerRecord(partition2, 25),
-            getConsumerRecord(partition2, 35),
-            getConsumerRecord(partition2, 45),
-            getConsumerRecord(partition2, 61)
-        ));
-
-        assertTrue(task.maybePunctuateStreamTime());
-
-        assertTrue(task.process());
-        assertEquals(8, task.numBuffered());
-        assertEquals(1, source1.numReceived);
-        assertEquals(0, source2.numReceived);
-
-        assertTrue(task.maybePunctuateStreamTime());
-
-        assertTrue(task.process());
-        assertEquals(7, task.numBuffered());
-        assertEquals(2, source1.numReceived);
-        assertEquals(0, source2.numReceived);
-
-        assertFalse(task.maybePunctuateStreamTime());
-
-        assertTrue(task.process());
-        assertEquals(6, task.numBuffered());
-        assertEquals(2, source1.numReceived);
-        assertEquals(1, source2.numReceived);
-
-        assertTrue(task.maybePunctuateStreamTime());
-
-        assertTrue(task.process());
-        assertEquals(5, task.numBuffered());
-        assertEquals(3, source1.numReceived);
-        assertEquals(1, source2.numReceived);
-
-        assertFalse(task.maybePunctuateStreamTime());
-
-        assertTrue(task.process());
-        assertEquals(4, task.numBuffered());
-        assertEquals(3, source1.numReceived);
-        assertEquals(2, source2.numReceived);
-
-        assertTrue(task.maybePunctuateStreamTime());
-
-        assertTrue(task.process());
-        assertEquals(3, task.numBuffered());
-        assertEquals(4, source1.numReceived);
-        assertEquals(2, source2.numReceived);
-
-        assertFalse(task.maybePunctuateStreamTime());
-
-        assertTrue(task.process());
-        assertEquals(2, task.numBuffered());
-        assertEquals(4, source1.numReceived);
-        assertEquals(3, source2.numReceived);
-
-        assertTrue(task.maybePunctuateStreamTime());
-
-        assertTrue(task.process());
-        assertEquals(1, task.numBuffered());
-        assertEquals(5, source1.numReceived);
-        assertEquals(3, source2.numReceived);
-
-        assertFalse(task.maybePunctuateStreamTime());
-
-        assertTrue(task.process());
-        assertEquals(0, task.numBuffered());
-        assertEquals(5, source1.numReceived);
-        assertEquals(4, source2.numReceived);
-
-        assertFalse(task.process());
-        assertFalse(task.maybePunctuateStreamTime());
-
-        processorStreamTime.mockProcessor.checkAndClearPunctuateResult(PunctuationType.STREAM_TIME, 0L, 20L, 32L, 40L, 60L);
-    }
-
-    @SuppressWarnings("unchecked")
-    @Test
     public void shouldPunctuateOnceStreamTimeAfterGap() {
         task = createStatelessTask(createConfig(false));
         task.initializeStateStores();
@@ -419,64 +329,67 @@ public class StreamTaskTest {
             getConsumerRecord(partition2, 161)
         ));
 
-        assertTrue(task.maybePunctuateStreamTime()); // punctuate at 20
+        // st: -1
+        assertFalse(task.maybePunctuateStreamTime()); // punctuate at 20
 
+        // st: 20
         assertTrue(task.process());
         assertEquals(7, task.numBuffered());
         assertEquals(1, source1.numReceived);
         assertEquals(0, source2.numReceived);
+        assertTrue(task.maybePunctuateStreamTime());
 
-        assertFalse(task.maybePunctuateStreamTime());
-
+        // st: 25
         assertTrue(task.process());
         assertEquals(6, task.numBuffered());
         assertEquals(1, source1.numReceived);
         assertEquals(1, source2.numReceived);
-
-        assertTrue(task.maybePunctuateStreamTime()); // punctuate at 142
-
-        // only one punctuation after 100ms gap
         assertFalse(task.maybePunctuateStreamTime());
 
+        // st: 142
+        // punctuate at 142
         assertTrue(task.process());
         assertEquals(5, task.numBuffered());
         assertEquals(2, source1.numReceived);
         assertEquals(1, source2.numReceived);
+        assertTrue(task.maybePunctuateStreamTime());
 
-        assertFalse(task.maybePunctuateStreamTime());
-
+        // st: 145
+        // only one punctuation after 100ms gap
         assertTrue(task.process());
         assertEquals(4, task.numBuffered());
         assertEquals(2, source1.numReceived);
         assertEquals(2, source2.numReceived);
+        assertFalse(task.maybePunctuateStreamTime());
 
-        assertTrue(task.maybePunctuateStreamTime()); // punctuate at 155
-
+        // st: 155
+        // punctuate at 155
         assertTrue(task.process());
         assertEquals(3, task.numBuffered());
         assertEquals(3, source1.numReceived);
         assertEquals(2, source2.numReceived);
+        assertTrue(task.maybePunctuateStreamTime());
 
-        assertFalse(task.maybePunctuateStreamTime());
-
+        // st: 159
         assertTrue(task.process());
         assertEquals(2, task.numBuffered());
         assertEquals(3, source1.numReceived);
         assertEquals(3, source2.numReceived);
+        assertFalse(task.maybePunctuateStreamTime());
 
-        assertTrue(task.maybePunctuateStreamTime()); // punctuate at 160, still aligned on the initial punctuation
-
+        // st: 160, aligned at 0
         assertTrue(task.process());
         assertEquals(1, task.numBuffered());
         assertEquals(4, source1.numReceived);
         assertEquals(3, source2.numReceived);
+        assertTrue(task.maybePunctuateStreamTime());
 
-        assertFalse(task.maybePunctuateStreamTime());
-
+        // st: 161
         assertTrue(task.process());
         assertEquals(0, task.numBuffered());
         assertEquals(4, source1.numReceived);
         assertEquals(4, source2.numReceived);
+        assertFalse(task.maybePunctuateStreamTime());
 
         assertFalse(task.process());
         assertFalse(task.maybePunctuateStreamTime());
@@ -484,9 +397,8 @@ public class StreamTaskTest {
         processorStreamTime.mockProcessor.checkAndClearPunctuateResult(PunctuationType.STREAM_TIME, 20L, 142L, 155L, 160L);
     }
 
-    @SuppressWarnings("unchecked")
     @Test
-    public void testCancelPunctuateStreamTime() {
+    public void shouldRespectPunctuateCancellationStreamTime() {
         task = createStatelessTask(createConfig(false));
         task.initializeStateStores();
         task.initializeTopology();
@@ -503,12 +415,19 @@ public class StreamTaskTest {
             getConsumerRecord(partition2, 45)
         ));
 
+        assertFalse(task.maybePunctuateStreamTime());
+
+        // st is now 20
+        assertTrue(task.process());
+
         assertTrue(task.maybePunctuateStreamTime());
 
+        // st is now 25
         assertTrue(task.process());
 
         assertFalse(task.maybePunctuateStreamTime());
 
+        // st is now 30
         assertTrue(task.process());
 
         processorStreamTime.mockProcessor.scheduleCancellable.cancel();
@@ -519,6 +438,61 @@ public class StreamTaskTest {
     }
 
     @Test
+    public void shouldRespectPunctuateCancellationSystemTime() {
+        task = createStatelessTask(createConfig(false));
+        task.initializeStateStores();
+        task.initializeTopology();
+        final long now = time.milliseconds();
+        time.sleep(10);
+        assertTrue(task.maybePunctuateSystemTime());
+        processorSystemTime.mockProcessor.scheduleCancellable.cancel();
+        time.sleep(10);
+        assertFalse(task.maybePunctuateSystemTime());
+        processorSystemTime.mockProcessor.checkAndClearPunctuateResult(PunctuationType.WALL_CLOCK_TIME, now + 10);
+    }
+
+    @Test
+    public void shouldBeProcessableIfAllPartitionsBuffered() {
+        task = createStatelessTask(createConfig(false));
+        task.initializeStateStores();
+        task.initializeTopology();
+
+        assertFalse(task.isProcessable());
+
+        final byte[] bytes = ByteBuffer.allocate(4).putInt(1).array();
+
+        task.addRecords(partition1, Collections.singleton(new ConsumerRecord<>(topic1, 1, 0, bytes, bytes)));
+
+        assertFalse(task.isProcessable());
+
+        task.addRecords(partition2, Collections.singleton(new ConsumerRecord<>(topic2, 1, 0, bytes, bytes)));
+
+        assertTrue(task.isProcessable());
+    }
+
+    @Test
+    public void shouldBeProcessableIfWaitedForTooLong() {
+        task = createStatelessTask(createConfig(false));
+        task.initializeStateStores();
+        task.initializeTopology();
+
+        assertFalse(task.isProcessable());
+
+        final byte[] bytes = ByteBuffer.allocate(4).putInt(1).array();
+
+        task.addRecords(partition1, Collections.singleton(new ConsumerRecord<>(topic1, 1, 0, bytes, bytes)));
+
+        assertFalse(task.isProcessable());
+        assertFalse(task.isProcessable());
+        assertFalse(task.isProcessable());
+        assertFalse(task.isProcessable());
+        assertFalse(task.isProcessable());
+
+        assertTrue(task.isProcessable());
+    }
+
+
+    @Test
     public void shouldPunctuateSystemTimeWhenIntervalElapsed() {
         task = createStatelessTask(createConfig(false));
         task.initializeStateStores();
@@ -576,20 +550,6 @@ public class StreamTaskTest {
     }
 
     @Test
-    public void testCancelPunctuateSystemTime() {
-        task = createStatelessTask(createConfig(false));
-        task.initializeStateStores();
-        task.initializeTopology();
-        final long now = time.milliseconds();
-        time.sleep(10);
-        assertTrue(task.maybePunctuateSystemTime());
-        processorSystemTime.mockProcessor.scheduleCancellable.cancel();
-        time.sleep(10);
-        assertFalse(task.maybePunctuateSystemTime());
-        processorSystemTime.mockProcessor.checkAndClearPunctuateResult(PunctuationType.WALL_CLOCK_TIME, now + 10);
-    }
-
-    @Test
     public void shouldWrapKafkaExceptionsWithStreamsExceptionAndAddContext() {
         task = createTaskThatThrowsException();
         task.initializeStateStores();
@@ -1110,7 +1070,7 @@ public class StreamTaskTest {
     private StreamTask createStatelessTask(final StreamsConfig streamsConfig) {
         final ProcessorTopology topology = ProcessorTopology.withSources(
             Utils.<ProcessorNode>mkList(source1, source2, processorStreamTime, processorSystemTime),
-            mkMap(mkEntry(topic1, (SourceNode) source1), mkEntry(topic2, (SourceNode) source2))
+            mkMap(mkEntry(topic1, source1), mkEntry(topic2, source2))
         );
 
         source1.addChild(processorStreamTime);
diff --git a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestBase.scala b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestBase.scala
index 32ad793..cf87eb5 100644
--- a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestBase.scala
+++ b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestBase.scala
@@ -24,11 +24,7 @@ import org.apache.kafka.common.serialization._
 import org.apache.kafka.common.utils.MockTime
 import org.apache.kafka.streams._
 import org.apache.kafka.streams.integration.utils.{EmbeddedKafkaCluster, IntegrationTestUtils}
-import org.apache.kafka.streams.processor.internals.StreamThread
-import org.apache.kafka.streams.scala.ImplicitConversions._
-import org.apache.kafka.streams.scala.kstream._
 import org.apache.kafka.test.TestUtils
-import org.junit.Assert._
 import org.junit._
 import org.junit.rules.TemporaryFolder
 import org.scalatest.junit.JUnitSuite
@@ -129,9 +125,9 @@ class StreamToTableJoinScalaIntegrationTestBase extends JUnitSuite with StreamTo
       // consume and verify result
       val consumerConfig = getConsumerConfig()
 
-      IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig,
-                                                               outputTopic,
-                                                               expectedClicksPerRegion.size)
+      IntegrationTestUtils.waitUntilFinalKeyValueRecordsReceived(consumerConfig,
+                                                                 outputTopic,
+                                                                 expectedClicksPerRegion.asJava)
     } else {
       java.util.Collections.emptyList()
     }
diff --git a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala
index e5253f9..3d1bab5 100644
--- a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala
+++ b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala
@@ -82,9 +82,6 @@ class StreamToTableJoinScalaIntegrationTestImplicitSerdes extends StreamToTableJ
     val actualClicksPerRegion: java.util.List[KeyValue[String, Long]] =
       produceNConsume(userClicksTopic, userRegionsTopic, outputTopic)
     streams.close()
-
-    import collection.JavaConverters._
-    assertEquals(actualClicksPerRegion.asScala.sortBy(_.key), expectedClicksPerRegion.sortBy(_.key))
   }
 
   @Test def testShouldCountClicksPerRegionJava(): Unit = {
@@ -149,6 +146,5 @@ class StreamToTableJoinScalaIntegrationTestImplicitSerdes extends StreamToTableJ
       produceNConsume(userClicksTopicJ, userRegionsTopicJ, outputTopicJ)
 
     streams.close()
-    assertEquals(actualClicksPerRegion.asScala.sortBy(_.key), expectedClicksPerRegion.sortBy(_.key))
   }
 }

