This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 954be11 KAFKA-6978: make window retention time strict (#5218)
954be11 is described below
commit 954be11bf2d3dc9fa11a69830d2ef5ff580ff533
Author: John Roesler <vvcephei@users.noreply.github.com>
AuthorDate: Fri Jun 22 19:08:00 2018 -0500
KAFKA-6978: make window retention time strict (#5218)
Enforce window retention times strictly:
* records for expired windows are dropped
* queries for timestamps old enough to be expired are immediately answered with null
Reviewers: Bill Bejeck <bill@confluent.io>, Damian Guy <damian@confluent.io>, Matthias J. Sax <matthias@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
---
.../kstream/internals/KStreamWindowAggregate.java | 39 ++++--
.../kstream/internals/KStreamWindowReduce.java | 129 ++------------------
.../internals/GlobalProcessorContextImpl.java | 2 +-
.../internals/InternalProcessorContext.java | 2 +-
.../internals/InternalTopologyBuilder.java | 108 ++++++-----------
.../processor/internals/PartitionGroup.java | 66 ++++++-----
.../processor/internals/ProcessorContextImpl.java | 11 +-
.../processor/internals/StandbyContextImpl.java | 2 +-
.../streams/processor/internals/StreamTask.java | 1 -
.../processor/internals/TimestampSupplier.java | 21 ++++
.../apache/kafka/streams/state/WindowStore.java | 2 +-
.../state/internals/RocksDBWindowStore.java | 2 +-
.../internals/KStreamWindowAggregateTest.java | 131 +++++++++++++++------
.../kstream/internals/KStreamWindowReduceTest.java | 59 ++++++++--
.../internals/AbstractProcessorContextTest.java | 2 +-
.../processor/internals/PartitionGroupTest.java | 39 +++++-
.../kafka/test/InternalMockProcessorContext.java | 16 +--
.../apache/kafka/test/NoOpProcessorContext.java | 4 +-
.../kafka/streams/TopologyTestDriverTest.java | 17 +--
19 files changed, 332 insertions(+), 321 deletions(-)
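As the commit message notes, retention is now enforced on the write path. Below is a minimal DSL sketch of what that means for an application, modeled on the test added in KStreamWindowAggregateTest further down; the topic names, window sizes, and store name are illustrative only.

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Serialized;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.state.WindowStore;

public class StrictRetentionExample {
    public static Topology build() {
        final StreamsBuilder builder = new StreamsBuilder();
        builder
            .stream("input", Consumed.with(Serdes.String(), Serdes.String()))
            .groupByKey(Serialized.with(Serdes.String(), Serdes.String()))
            // 10ms windows advancing by 5ms, retained for 100ms
            .windowedBy(TimeWindows.of(10).advanceBy(5).until(100))
            .aggregate(
                () -> "",
                (key, value, agg) -> agg + "+" + value,
                Materialized.<String, String, WindowStore<Bytes, byte[]>>as("strict-store")
                    .withValueSerde(Serdes.String()))
            .toStream()
            .map((key, value) -> new KeyValue<>(key.toString(), value))
            .to("output");
        // With this patch, once stream time reaches 100, a record stamped 0 only matches
        // windows whose start is <= streamTime - retention (= 0), so it is dropped,
        // logged at WARN, and counted in skipped-records-total instead of re-creating
        // the expired window.
        return builder.build();
    }
}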
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
index a774762..4be1880 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
@@ -24,6 +24,7 @@ import org.apache.kafka.streams.kstream.Windows;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.WindowStore;
import org.slf4j.Logger;
@@ -32,7 +33,7 @@ import org.slf4j.LoggerFactory;
import java.util.Map;
public class KStreamWindowAggregate<K, V, T, W extends Window> implements KStreamAggProcessorSupplier<K, Windowed<K>, V, T> {
- private static final Logger LOG = LoggerFactory.getLogger(KStreamWindowAggregate.class);
+ private final Logger log = LoggerFactory.getLogger(getClass());
private final String storeName;
private final Windows<W> windows;
@@ -66,11 +67,14 @@ public class KStreamWindowAggregate<K, V, T, W extends Window> implements KStrea
private WindowStore<K, T> windowStore;
private TupleForwarder<Windowed<K>, T> tupleForwarder;
private StreamsMetricsImpl metrics;
+ private InternalProcessorContext internalProcessorContext;
@SuppressWarnings("unchecked")
@Override
public void init(final ProcessorContext context) {
super.init(context);
+ this.internalProcessorContext = (InternalProcessorContext) context;
+
metrics = (StreamsMetricsImpl) context.metrics();
windowStore = (WindowStore<K, T>) context.getStateStore(storeName);
@@ -82,7 +86,7 @@ public class KStreamWindowAggregate<K, V, T, W extends Window> implements KStrea
// if the key is null, we do not need proceed aggregating the record
// the record with the table
if (key == null) {
- LOG.warn(
+ log.warn(
"Skipping record due to null key. value=[{}] topic=[{}] partition=[{}] offset=[{}]",
value, context().topic(), context().partition(), context().offset()
);
@@ -92,21 +96,32 @@ public class KStreamWindowAggregate<K, V, T, W extends Window> implements KStrea
// first get the matching windows
final long timestamp = context().timestamp();
+ final long expiryTime = internalProcessorContext.streamTime() - windows.maintainMs();
+
final Map<Long, W> matchedWindows = windows.windowsFor(timestamp);
// try update the window, and create the new window for the rest of unmatched window that do not exist yet
for (final Map.Entry<Long, W> entry : matchedWindows.entrySet()) {
- T oldAgg = windowStore.fetch(key, entry.getKey());
-
- if (oldAgg == null) {
- oldAgg = initializer.apply();
+ final Long windowStart = entry.getKey();
+ if (windowStart > expiryTime) {
+ T oldAgg = windowStore.fetch(key, windowStart);
+
+ if (oldAgg == null) {
+ oldAgg = initializer.apply();
+ }
+
+ final T newAgg = aggregator.apply(key, value, oldAgg);
+
+ // update the store with the new value
+ windowStore.put(key, newAgg, windowStart);
+ tupleForwarder.maybeForward(new Windowed<>(key, entry.getValue()), newAgg, oldAgg);
+ } else {
+ log.warn(
+ "Skipping record for expired window. key=[{}] topic=[{}] partition=[{}] offset=[{}] timestamp=[{}] window=[{}] expiration=[{}]",
+ key, context().topic(), context().partition(), context().offset(), context().timestamp(), windowStart, expiryTime
+ );
+ metrics.skippedRecordsSensor().record();
}
-
- final T newAgg = aggregator.apply(key, value, oldAgg);
-
- // update the store with the new value
- windowStore.put(key, newAgg, entry.getKey());
- tupleForwarder.maybeForward(new Windowed<>(key, entry.getValue()), newAgg, oldAgg);
}
}
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduce.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduce.java
index 9db861d..babe3eb 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduce.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduce.java
@@ -18,132 +18,17 @@ package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.streams.kstream.Reducer;
import org.apache.kafka.streams.kstream.Window;
-import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.Windows;
-import org.apache.kafka.streams.processor.AbstractProcessor;
-import org.apache.kafka.streams.processor.Processor;
-import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
-import org.apache.kafka.streams.state.WindowStore;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Map;
-
-public class KStreamWindowReduce<K, V, W extends Window> implements KStreamAggProcessorSupplier<K, Windowed<K>, V, V> {
- private static final Logger LOG = LoggerFactory.getLogger(KStreamWindowReduce.class);
-
- private final String storeName;
- private final Windows<W> windows;
- private final Reducer<V> reducer;
-
- private boolean sendOldValues = false;
+class KStreamWindowReduce<K, V, W extends Window> extends KStreamWindowAggregate<K, V, V, W> {
KStreamWindowReduce(final Windows<W> windows,
final String storeName,
final Reducer<V> reducer) {
- this.windows = windows;
- this.storeName = storeName;
- this.reducer = reducer;
- }
-
- @Override
- public Processor<K, V> get() {
- return new KStreamWindowReduceProcessor();
- }
-
- @Override
- public void enableSendingOldValues() {
- sendOldValues = true;
- }
-
- private class KStreamWindowReduceProcessor extends AbstractProcessor<K, V> {
-
- private WindowStore<K, V> windowStore;
- private TupleForwarder<Windowed<K>, V> tupleForwarder;
- private StreamsMetricsImpl metrics;
-
- @SuppressWarnings("unchecked")
- @Override
- public void init(final ProcessorContext context) {
- super.init(context);
- metrics = (StreamsMetricsImpl) context.metrics();
- windowStore = (WindowStore<K, V>) context.getStateStore(storeName);
- tupleForwarder = new TupleForwarder<>(windowStore, context, new ForwardingCacheFlushListener<Windowed<K>, V>(context, sendOldValues), sendOldValues);
- }
-
- @Override
- public void process(final K key, final V value) {
- // if the key is null, we do not need proceed aggregating
- // the record with the table
- if (key == null) {
- LOG.warn(
- "Skipping record due to null key. value=[{}] topic=[{}] partition=[{}] offset=[{}]",
- value, context().topic(), context().partition(), context().offset()
- );
- metrics.skippedRecordsSensor().record();
- return;
- }
-
- // first get the matching windows
- final long timestamp = context().timestamp();
- final Map<Long, W> matchedWindows = windows.windowsFor(timestamp);
-
- // try update the window, and create the new window for the rest of unmatched window that do not exist yet
- for (final Map.Entry<Long, W> entry : matchedWindows.entrySet()) {
- final V oldAgg = windowStore.fetch(key, entry.getKey());
-
- final V newAgg;
- if (oldAgg == null) {
- newAgg = value;
- } else {
- newAgg = reducer.apply(oldAgg, value);
- }
-
- // update the store with the new value
- windowStore.put(key, newAgg, entry.getKey());
- tupleForwarder.maybeForward(new Windowed<>(key, entry.getValue()), newAgg, oldAgg);
- }
- }
- }
-
- @Override
- public KTableValueGetterSupplier<Windowed<K>, V> view() {
-
- return new KTableValueGetterSupplier<Windowed<K>, V>() {
-
- public KTableValueGetter<Windowed<K>, V> get() {
- return new KStreamWindowReduceValueGetter();
- }
-
- @Override
- public String[] storeNames() {
- return new String[]{storeName};
- }
- };
- }
-
- private class KStreamWindowReduceValueGetter implements KTableValueGetter<Windowed<K>, V> {
-
- private WindowStore<K, V> windowStore;
-
- @SuppressWarnings("unchecked")
- @Override
- public void init(final ProcessorContext context) {
- windowStore = (WindowStore<K, V>) context.getStateStore(storeName);
- }
-
- @SuppressWarnings("unchecked")
- @Override
- public V get(final Windowed<K> windowedKey) {
- final K key = windowedKey.key();
- final W window = (W) windowedKey.window();
-
- return windowStore.fetch(key, window.start());
- }
-
- @Override
- public void close() {
- }
+ super(
+ windows,
+ storeName,
+ () -> null,
+ (key, newValue, oldValue) -> oldValue == null ? newValue : reducer.apply(oldValue, newValue)
+ );
}
}
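KStreamWindowReduce is now a thin subclass of KStreamWindowAggregate. A small sketch of the equivalence it relies on (types from org.apache.kafka.streams.kstream; the String parameters are illustrative): a Reducer becomes an Aggregator whose Initializer yields null, with the aggregator falling back to the new value when no previous aggregate exists.

final Reducer<String> reducer = (value1, value2) -> value1 + "+" + value2;

// Reducer semantics expressed as the initializer/aggregator pair passed to super():
final Initializer<String> initializer = () -> null;
final Aggregator<String, String, String> aggregator =
    (key, newValue, oldValue) -> oldValue == null ? newValue : reducer.apply(oldValue, newValue);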
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java
index c469be9..30599e2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java
@@ -97,7 +97,7 @@ public class GlobalProcessorContextImpl extends AbstractProcessorContext {
}
@Override
- public Long streamTime() {
+ public long streamTime() {
throw new RuntimeException("Stream time is not implemented for the global processor context.");
}
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContext.java
index cfc1970..6db7a3d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContext.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContext.java
@@ -64,5 +64,5 @@ public interface InternalProcessorContext extends ProcessorContext {
*/
void uninitialize();
- Long streamTime();
+ long streamTime();
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
index 5b4b4d73..ed51754 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
@@ -123,83 +123,51 @@ public class InternalTopologyBuilder {
private Map<Integer, Set<String>> nodeGroups = null;
// TODO: this is only temporary for 2.0 and should be removed
- public final Map<StoreBuilder, String> storeToSourceChangelogTopic = new HashMap<>();
+ private final Map<StoreBuilder, String> storeToSourceChangelogTopic = new HashMap<>();
- public interface StateStoreFactory {
- Set<String> users();
- boolean loggingEnabled();
- StateStore build();
- String name();
- boolean isWindowStore();
- Map<String, String> logConfig();
- long retentionPeriod();
- }
-
- private static abstract class AbstractStateStoreFactory implements StateStoreFactory {
+ public static class StateStoreFactory {
+ private final StoreBuilder builder;
private final Set<String> users = new HashSet<>();
- private final String name;
- private final boolean loggingEnabled;
- private final boolean windowStore;
- private final Map<String, String> logConfig;
-
- AbstractStateStoreFactory(final String name,
- final boolean loggingEnabled,
- final boolean windowStore,
- final Map<String, String> logConfig) {
- this.name = name;
- this.loggingEnabled = loggingEnabled;
- this.windowStore = windowStore;
- this.logConfig = logConfig;
- }
- @Override
- public Set<String> users() {
- return users;
+ private StateStoreFactory(final StoreBuilder<?> builder) {
+ this.builder = builder;
}
- @Override
- public boolean loggingEnabled() {
- return loggingEnabled;
+ public StateStore build() {
+ return builder.build();
}
- @Override
- public String name() {
- return name;
+ long retentionPeriod() {
+ if (isWindowStore()) {
+ return ((WindowStoreBuilder) builder).retentionPeriod();
+ } else {
+ throw new IllegalStateException("retentionPeriod is not supported when not a window store");
+ }
}
- @Override
- public boolean isWindowStore() {
- return windowStore;
+ private Set<String> users() {
+ return users;
}
- @Override
- public Map<String, String> logConfig() {
- return logConfig;
+ /** Visible for testing */
+ public boolean loggingEnabled() {
+ return builder.loggingEnabled();
}
- }
- private static class StoreBuilderFactory extends AbstractStateStoreFactory {
- private final StoreBuilder builder;
-
- StoreBuilderFactory(final StoreBuilder<?> builder) {
- super(builder.name(),
- builder.loggingEnabled(),
- builder instanceof WindowStoreBuilder,
- builder.logConfig());
- this.builder = builder;
+ private String name() {
+ return builder.name();
}
- @Override
- public StateStore build() {
- return builder.build();
+ private boolean isWindowStore() {
+ return builder instanceof WindowStoreBuilder;
}
- @Override
- public long retentionPeriod() {
- if (!isWindowStore()) {
- throw new IllegalStateException("retentionPeriod is not supported when not a window store");
- }
- return ((WindowStoreBuilder) builder).retentionPeriod();
+ // Apparently Java strips the generics from this method because we're using the raw type for builder,
+ // even though this method doesn't use builder's (missing) type parameter. Our usage seems obviously
+ // correct, though, hence the suppression.
+ @SuppressWarnings("unchecked")
+ private Map<String, String> logConfig() {
+ return builder.logConfig();
}
}
@@ -526,7 +494,7 @@ public class InternalTopologyBuilder {
throw new TopologyException("StateStore " + storeBuilder.name() + " is already added.");
}
- stateFactories.put(storeBuilder.name(), new StoreBuilderFactory(storeBuilder));
+ stateFactories.put(storeBuilder.name(), new StateStoreFactory(storeBuilder));
if (processorNames != null) {
for (final String processorName : processorNames) {
@@ -1124,12 +1092,12 @@ public class InternalTopologyBuilder {
private InternalTopicConfig createChangelogTopicConfig(final StateStoreFactory factory,
final String name) {
- if (!factory.isWindowStore()) {
- return new UnwindowedChangelogTopicConfig(name, factory.logConfig());
- } else {
+ if (factory.isWindowStore()) {
final WindowedChangelogTopicConfig config = new WindowedChangelogTopicConfig(name, factory.logConfig());
config.setRetentionMs(factory.retentionPeriod());
return config;
+ } else {
+ return new UnwindowedChangelogTopicConfig(name, factory.logConfig());
}
}
@@ -1744,18 +1712,18 @@ public class InternalTopologyBuilder {
public String toString() {
final StringBuilder sb = new StringBuilder();
sb.append("Topologies:\n ");
- final TopologyDescription.Subtopology[] sortedSubtopologies =
+ final TopologyDescription.Subtopology[] sortedSubtopologies =
subtopologies.descendingSet().toArray(new TopologyDescription.Subtopology[subtopologies.size()]);
- final TopologyDescription.GlobalStore[] sortedGlobalStores =
+ final TopologyDescription.GlobalStore[] sortedGlobalStores =
globalStores.descendingSet().toArray(new TopologyDescription.GlobalStore[globalStores.size()]);
int expectedId = 0;
int subtopologiesIndex = sortedSubtopologies.length - 1;
int globalStoresIndex = sortedGlobalStores.length - 1;
while (subtopologiesIndex != -1 && globalStoresIndex != -1) {
sb.append(" ");
- final TopologyDescription.Subtopology subtopology =
+ final TopologyDescription.Subtopology subtopology =
sortedSubtopologies[subtopologiesIndex];
- final TopologyDescription.GlobalStore globalStore =
+ final TopologyDescription.GlobalStore globalStore =
sortedGlobalStores[globalStoresIndex];
if (subtopology.id() == expectedId) {
sb.append(subtopology);
@@ -1767,14 +1735,14 @@ public class InternalTopologyBuilder {
expectedId++;
}
while (subtopologiesIndex != -1) {
- final TopologyDescription.Subtopology subtopology =
+ final TopologyDescription.Subtopology subtopology =
sortedSubtopologies[subtopologiesIndex];
sb.append(" ");
sb.append(subtopology);
subtopologiesIndex--;
}
while (globalStoresIndex != -1) {
- final TopologyDescription.GlobalStore globalStore =
+ final TopologyDescription.GlobalStore globalStore =
sortedGlobalStores[globalStoresIndex];
sb.append(" ");
sb.append(globalStore);
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
index dcaa755..c809da9 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
@@ -33,7 +33,9 @@ public class PartitionGroup {
private final Map<TopicPartition, RecordQueue> partitionQueues;
- private final PriorityQueue<RecordQueue> queuesByTime;
+ private final PriorityQueue<RecordQueue> nonEmptyQueuesByTime;
+ private long streamTime;
+ private int totalBuffered;
public static class RecordInfo {
RecordQueue queue;
@@ -51,30 +53,12 @@ public class PartitionGroup {
}
}
- // since task is thread-safe, we do not need to synchronize on local variables
- private int totalBuffered;
PartitionGroup(final Map<TopicPartition, RecordQueue> partitionQueues) {
- queuesByTime = new PriorityQueue<>(partitionQueues.size(), new Comparator<RecordQueue>() {
-
- @Override
- public int compare(final RecordQueue queue1, final RecordQueue queue2) {
- final long time1 = queue1.timestamp();
- final long time2 = queue2.timestamp();
-
- if (time1 < time2) {
- return -1;
- }
- if (time1 > time2) {
- return 1;
- }
- return 0;
- }
- });
-
+ nonEmptyQueuesByTime = new PriorityQueue<>(partitionQueues.size(), Comparator.comparingLong(RecordQueue::timestamp));
this.partitionQueues = partitionQueues;
-
totalBuffered = 0;
+ streamTime = -1;
}
/**
@@ -85,19 +69,27 @@ public class PartitionGroup {
StampedRecord nextRecord(final RecordInfo info) {
StampedRecord record = null;
- final RecordQueue queue = queuesByTime.poll();
+ final RecordQueue queue = nonEmptyQueuesByTime.poll();
+ info.queue = queue;
+
if (queue != null) {
// get the first record from this queue.
record = queue.poll();
- if (!queue.isEmpty()) {
- queuesByTime.offer(queue);
+ if (record != null) {
+ --totalBuffered;
+
+ if (!queue.isEmpty()) {
+ nonEmptyQueuesByTime.offer(queue);
+ }
+
+ // Since this was previously a queue with min timestamp,
+ // streamTime could only advance if this queue's time did.
+ if (queue.timestamp() > streamTime) {
+ computeStreamTime();
+ }
}
- }
- info.queue = queue;
- if (record != null) {
- --totalBuffered;
}
return record;
@@ -114,11 +106,17 @@ public class PartitionGroup {
final RecordQueue recordQueue = partitionQueues.get(partition);
final int oldSize = recordQueue.size();
+ final long oldTimestamp = recordQueue.timestamp();
final int newSize = recordQueue.addRawRecords(rawRecords);
// add this record queue to be considered for processing in the future if it was empty before
if (oldSize == 0 && newSize > 0) {
- queuesByTime.offer(recordQueue);
+ nonEmptyQueuesByTime.offer(recordQueue);
+ }
+
+ // Adding to this queue could only advance streamTime if it was previously the queue with min timestamp (= streamTime)
+ if (oldTimestamp <= streamTime && recordQueue.timestamp() > streamTime) {
+ computeStreamTime();
}
totalBuffered += newSize - oldSize;
@@ -135,15 +133,19 @@ public class PartitionGroup {
* partition timestamp among all its partitions
*/
public long timestamp() {
+ return streamTime;
+ }
+
+ private void computeStreamTime() {
// we should always return the smallest timestamp of all partitions
// to avoid group partition time goes backward
long timestamp = Long.MAX_VALUE;
for (final RecordQueue queue : partitionQueues.values()) {
- if (timestamp > queue.timestamp()) {
+ if (queue.timestamp() < timestamp) {
timestamp = queue.timestamp();
}
}
- return timestamp;
+ this.streamTime = timestamp;
}
/**
@@ -168,7 +170,7 @@ public class PartitionGroup {
}
public void clear() {
- queuesByTime.clear();
+ nonEmptyQueuesByTime.clear();
for (final RecordQueue queue : partitionQueues.values()) {
queue.clear();
}
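The net effect of the PartitionGroup changes: timestamp() now returns a cached stream time that is recomputed (an O(number-of-partitions) scan) only when the minimum head timestamp could have moved, and it never goes backwards. A hedged sketch of the observable guarantee, assuming group is a PartitionGroup already populated via addRawRecords:

final PartitionGroup.RecordInfo info = new PartitionGroup.RecordInfo();
long last = group.timestamp();        // cached min timestamp across partition queues
while (group.numBuffered() > 0) {
    group.nextRecord(info);           // recomputes stream time only if the min queue advanced
    assert group.timestamp() >= last; // stream time is non-decreasing
    last = group.timestamp();
}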
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
index 36a309c..beab35f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
@@ -28,13 +28,12 @@ import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.internals.ThreadCache;
import java.util.List;
-import java.util.function.Supplier;
public class ProcessorContextImpl extends AbstractProcessorContext implements RecordCollector.Supplier {
private final StreamTask task;
private final RecordCollector collector;
- private Supplier<Long> streamTimeSupplier;
+ private TimestampSupplier streamTimeSupplier;
private final ToInternal toInternal = new ToInternal();
private final static To SEND_TO_ALL = To.all();
@@ -56,7 +55,7 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
@Override
public RecordCollector recordCollector() {
- return this.collector;
+ return collector;
}
/**
@@ -155,13 +154,13 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
return task.schedule(interval, type, callback);
}
- void setStreamTimeSupplier(final Supplier<Long> streamTimeSupplier) {
+ void setStreamTimeSupplier(final TimestampSupplier streamTimeSupplier) {
this.streamTimeSupplier = streamTimeSupplier;
}
@Override
- public Long streamTime() {
- return this.streamTimeSupplier.get();
+ public long streamTime() {
+ return streamTimeSupplier.get();
}
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
index 58c2e2c..acc6ad6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
@@ -217,7 +217,7 @@ class StandbyContextImpl extends AbstractProcessorContext implements RecordColle
}
@Override
- public Long streamTime() {
+ public long streamTime() {
throw new RuntimeException("Stream time is not implemented for the standby context.");
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 6993ef8..6af4c4b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -635,7 +635,6 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
* @param partition the partition
* @param records the records
*/
- @SuppressWarnings("unchecked")
public void addRecords(final TopicPartition partition, final Iterable<ConsumerRecord<byte[], byte[]>> records) {
final int newQueueSize = partitionGroup.addRawRecords(partition, records);
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TimestampSupplier.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TimestampSupplier.java
new file mode 100644
index 0000000..a6a7a42
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TimestampSupplier.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+public interface TimestampSupplier {
+ long get();
+}
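TimestampSupplier replaces Supplier<Long> so that streamTime() can stay primitive. A hypothetical wiring sketch follows; the caller that actually installs the supplier is not shown in this diff, and partitionGroup::timestamp is an assumption used purely for illustration (processorContext here is a ProcessorContextImpl).

final TimestampSupplier streamTimeSupplier = partitionGroup::timestamp; // assumed source of stream time
processorContext.setStreamTimeSupplier(streamTimeSupplier);
final long streamTime = processorContext.streamTime();                  // delegates to supplier.get()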
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java
index d2f6ad8..ee4c19a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java
@@ -27,7 +27,7 @@ import org.apache.kafka.streams.processor.StateStore;
public interface WindowStore<K, V> extends StateStore, ReadOnlyWindowStore<K, V> {
/**
- * Put a key-value pair with the current wall-clock time as the timestamp
+ * Put a key-value pair with the current record time as the timestamp
* into the corresponding window
* @param key The key to associate the value to
* @param value The value to update, it can be null;
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
index a9af36d..4c0e01f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
@@ -102,7 +102,7 @@ public class RocksDBWindowStore<K, V> extends WrappedStateStore.AbstractStateSto
final KeyValueIterator<Bytes, byte[]> bytesIterator = bytesStore.all();
return new WindowStoreIteratorWrapper<>(bytesIterator, serdes, windowSize).keyValueIterator();
}
-
+
@Override
public KeyValueIterator<Windowed<K>, V> fetchAll(final long timeFrom, final long timeTo) {
final KeyValueIterator<Bytes, byte[]> bytesIterator = bytesStore.fetchAll(timeFrom, timeTo);
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
index d134bd1..dd2cf05 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
@@ -16,23 +16,26 @@
*/
package org.apache.kafka.streams.kstream.internals;
+import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Serialized;
import org.apache.kafka.streams.kstream.TimeWindows;
-import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.test.ConsumerRecordFactory;
+import org.apache.kafka.streams.test.OutputVerifier;
import org.apache.kafka.test.MockAggregator;
import org.apache.kafka.test.MockInitializer;
import org.apache.kafka.test.MockProcessor;
@@ -44,7 +47,10 @@ import java.util.List;
import java.util.Properties;
import static org.apache.kafka.test.StreamsTestUtils.getMetricByName;
+import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.hasItem;
+import static org.hamcrest.CoreMatchers.hasItems;
+import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertEquals;
@@ -134,12 +140,7 @@ public class KStreamWindowAggregateTest {
table2.toStream().process(supplier);
- table1.join(table2, new ValueJoiner<String, String, String>() {
- @Override
- public String apply(final String p1, final String p2) {
- return p1 + "%" + p2;
- }
- }).toStream().process(supplier);
+ table1.join(table2, (p1, p2) -> p1 + "%" + p2).toStream().process(supplier);
try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props, 0L)) {
driver.pipeInput(recordFactory.create(topic1, "A", "1", 0L));
@@ -151,11 +152,11 @@ public class KStreamWindowAggregateTest {
final List<MockProcessor<Windowed<String>, String>> processors = supplier.capturedProcessors(3);
processors.get(0).checkAndClearProcessResult(
- "[A@0/10]:0+1",
- "[B@0/10]:0+2",
- "[C@0/10]:0+3",
- "[D@0/10]:0+4",
- "[A@0/10]:0+1+1"
+ "[A@0/10]:0+1",
+ "[B@0/10]:0+2",
+ "[C@0/10]:0+3",
+ "[D@0/10]:0+4",
+ "[A@0/10]:0+1+1"
);
processors.get(1).checkAndClearProcessResult();
processors.get(2).checkAndClearProcessResult();
@@ -167,11 +168,11 @@ public class KStreamWindowAggregateTest {
driver.pipeInput(recordFactory.create(topic1, "C", "3", 9L));
processors.get(0).checkAndClearProcessResult(
- "[A@0/10]:0+1+1+1", "[A@5/15]:0+1",
- "[B@0/10]:0+2+2", "[B@5/15]:0+2",
- "[D@0/10]:0+4+4", "[D@5/15]:0+4",
- "[B@0/10]:0+2+2+2", "[B@5/15]:0+2+2",
- "[C@0/10]:0+3+3", "[C@5/15]:0+3"
+ "[A@0/10]:0+1+1+1", "[A@5/15]:0+1",
+ "[B@0/10]:0+2+2", "[B@5/15]:0+2",
+ "[D@0/10]:0+4+4", "[D@5/15]:0+4",
+ "[B@0/10]:0+2+2+2", "[B@5/15]:0+2+2",
+ "[C@0/10]:0+3+3", "[C@5/15]:0+3"
);
processors.get(1).checkAndClearProcessResult();
processors.get(2).checkAndClearProcessResult();
@@ -184,18 +185,18 @@ public class KStreamWindowAggregateTest {
processors.get(0).checkAndClearProcessResult();
processors.get(1).checkAndClearProcessResult(
- "[A@0/10]:0+a",
- "[B@0/10]:0+b",
- "[C@0/10]:0+c",
- "[D@0/10]:0+d",
- "[A@0/10]:0+a+a"
+ "[A@0/10]:0+a",
+ "[B@0/10]:0+b",
+ "[C@0/10]:0+c",
+ "[D@0/10]:0+d",
+ "[A@0/10]:0+a+a"
);
processors.get(2).checkAndClearProcessResult(
- "[A@0/10]:0+1+1+1%0+a",
- "[B@0/10]:0+2+2+2%0+b",
- "[C@0/10]:0+3+3%0+c",
- "[D@0/10]:0+4+4%0+d",
- "[A@0/10]:0+1+1+1%0+a+a");
+ "[A@0/10]:0+1+1+1%0+a",
+ "[B@0/10]:0+2+2+2%0+b",
+ "[C@0/10]:0+3+3%0+c",
+ "[D@0/10]:0+4+4%0+d",
+ "[A@0/10]:0+1+1+1%0+a+a");
driver.pipeInput(recordFactory.create(topic2, "A", "a", 5L));
driver.pipeInput(recordFactory.create(topic2, "B", "b", 6L));
@@ -205,18 +206,18 @@ public class KStreamWindowAggregateTest {
processors.get(0).checkAndClearProcessResult();
processors.get(1).checkAndClearProcessResult(
- "[A@0/10]:0+a+a+a", "[A@5/15]:0+a",
- "[B@0/10]:0+b+b", "[B@5/15]:0+b",
- "[D@0/10]:0+d+d", "[D@5/15]:0+d",
- "[B@0/10]:0+b+b+b", "[B@5/15]:0+b+b",
- "[C@0/10]:0+c+c", "[C@5/15]:0+c"
+ "[A@0/10]:0+a+a+a", "[A@5/15]:0+a",
+ "[B@0/10]:0+b+b", "[B@5/15]:0+b",
+ "[D@0/10]:0+d+d", "[D@5/15]:0+d",
+ "[B@0/10]:0+b+b+b", "[B@5/15]:0+b+b",
+ "[C@0/10]:0+c+c", "[C@5/15]:0+c"
);
processors.get(2).checkAndClearProcessResult(
- "[A@0/10]:0+1+1+1%0+a+a+a", "[A@5/15]:0+1%0+a",
- "[B@0/10]:0+2+2+2%0+b+b", "[B@5/15]:0+2+2%0+b",
- "[D@0/10]:0+4+4%0+d+d", "[D@5/15]:0+4%0+d",
- "[B@0/10]:0+2+2+2%0+b+b+b", "[B@5/15]:0+2+2%0+b+b",
- "[C@0/10]:0+3+3%0+c+c", "[C@5/15]:0+3%0+c"
+ "[A@0/10]:0+1+1+1%0+a+a+a", "[A@5/15]:0+1%0+a",
+ "[B@0/10]:0+2+2+2%0+b+b", "[B@5/15]:0+2+2%0+b",
+ "[D@0/10]:0+4+4%0+d+d", "[D@5/15]:0+4%0+d",
+ "[B@0/10]:0+2+2+2%0+b+b+b", "[B@5/15]:0+2+2%0+b+b",
+ "[C@0/10]:0+3+3%0+c+c", "[C@5/15]:0+3%0+c"
);
}
}
@@ -231,7 +232,7 @@ public class KStreamWindowAggregateTest {
.windowedBy(TimeWindows.of(10).advanceBy(5))
.aggregate(
MockInitializer.STRING_INIT,
- MockAggregator.<String, String>toStringInstance("+"),
+ MockAggregator.toStringInstance("+"),
Materialized.<String, String, WindowStore<Bytes, byte[]>>as("topic1-Canonicalized").withValueSerde(Serdes.String())
);
@@ -244,4 +245,56 @@ public class KStreamWindowAggregateTest {
assertThat(appender.getMessages(), hasItem("Skipping record due to null key. value=[1] topic=[topic] partition=[0] offset=[0]"));
}
}
+
+ @Test
+ public void shouldLogAndMeterWhenSkippingExpiredWindow() {
+ final StreamsBuilder builder = new StreamsBuilder();
+ final String topic = "topic";
+
+ final KStream<String, String> stream1 = builder.stream(topic, Consumed.with(Serdes.String(), Serdes.String()));
+ stream1.groupByKey(Serialized.with(Serdes.String(), Serdes.String()))
+ .windowedBy(TimeWindows.of(10).advanceBy(5).until(100))
+ .aggregate(
+ () -> "",
+ MockAggregator.toStringInstance("+"),
+ Materialized.<String, String, WindowStore<Bytes, byte[]>>as("topic1-Canonicalized").withValueSerde(Serdes.String()).withCachingDisabled().withLoggingDisabled()
+ )
+ .toStream()
+ .map((key, value) -> new KeyValue<>(key.toString(), value))
+ .to("output");
+
+ final LogCaptureAppender appender = LogCaptureAppender.createAndRegister();
+ try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props, 0L)) {
+ driver.pipeInput(recordFactory.create(topic, "k", "100", 100L));
+ driver.pipeInput(recordFactory.create(topic, "k", "0", 0L));
+ driver.pipeInput(recordFactory.create(topic, "k", "1", 1L));
+ driver.pipeInput(recordFactory.create(topic, "k", "2", 2L));
+ driver.pipeInput(recordFactory.create(topic, "k", "3", 3L));
+ driver.pipeInput(recordFactory.create(topic, "k", "4", 4L));
+ driver.pipeInput(recordFactory.create(topic, "k", "5", 5L));
+ driver.pipeInput(recordFactory.create(topic, "k", "6", 6L));
+ LogCaptureAppender.unregister(appender);
+
+ assertThat(getMetricByName(driver.metrics(), "skipped-records-total", "stream-metrics").metricValue(), equalTo(7.0));
+ assertThat(appender.getMessages(), hasItems(
+ "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[1] timestamp=[0] window=[0] expiration=[0]",
+ "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[2] timestamp=[1] window=[0] expiration=[0]",
+ "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[3] timestamp=[2] window=[0] expiration=[0]",
+ "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[4] timestamp=[3] window=[0] expiration=[0]",
+ "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[5] timestamp=[4] window=[0] expiration=[0]",
+ "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[6] timestamp=[5] window=[0] expiration=[0]",
+ "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[7] timestamp=[6] window=[0] expiration=[0]"
+ ));
+
+ OutputVerifier.compareKeyValueTimestamp(getOutput(driver), "[k@95/105]", "+100", 100);
+ OutputVerifier.compareKeyValueTimestamp(getOutput(driver), "[k@100/110]", "+100", 100);
+ OutputVerifier.compareKeyValueTimestamp(getOutput(driver), "[k@5/15]", "+5", 5);
+ OutputVerifier.compareKeyValueTimestamp(getOutput(driver), "[k@5/15]", "+5+6", 6);
+ assertThat(driver.readOutput("output"), nullValue());
+ }
+ }
+
+ private ProducerRecord<String, String> getOutput(final TopologyTestDriver driver) {
+ return driver.readOutput("output", new StringDeserializer(), new StringDeserializer());
+ }
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduceTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduceTest.java
index 4ae2f76..4cf9324 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduceTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduceTest.java
@@ -16,23 +16,29 @@
*/
package org.apache.kafka.streams.kstream.internals;
+import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
-import org.apache.kafka.streams.kstream.Reducer;
import org.apache.kafka.streams.kstream.Serialized;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
import org.apache.kafka.streams.test.ConsumerRecordFactory;
+import org.apache.kafka.streams.test.OutputVerifier;
import org.apache.kafka.test.StreamsTestUtils;
import org.junit.Test;
import java.util.Properties;
import static org.apache.kafka.test.StreamsTestUtils.getMetricByName;
+import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.hasItem;
+import static org.hamcrest.CoreMatchers.hasItems;
+import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertEquals;
@@ -49,12 +55,7 @@ public class KStreamWindowReduceTest {
.stream("TOPIC", Consumed.with(Serdes.String(), Serdes.String()))
.groupByKey(Serialized.with(Serdes.String(), Serdes.String()))
.windowedBy(TimeWindows.of(500L))
- .reduce(new Reducer<String>() {
- @Override
- public String apply(final String value1, final String value2) {
- return value1 + "+" + value2;
- }
- });
+ .reduce((value1, value2) -> value1 + "+" + value2);
try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
@@ -66,4 +67,48 @@ public class KStreamWindowReduceTest {
assertThat(appender.getMessages(), hasItem("Skipping record due to null key. value=[asdf] topic=[TOPIC] partition=[0] offset=[0]"));
}
}
+
+ @Test
+ public void shouldLogAndMeterOnExpiredEvent() {
+
+ final StreamsBuilder builder = new StreamsBuilder();
+ builder
+ .stream("TOPIC", Consumed.with(Serdes.String(), Serdes.String()))
+ .groupByKey(Serialized.with(Serdes.String(), Serdes.String()))
+ .windowedBy(TimeWindows.of(5L).until(100))
+ .reduce((value1, value2) -> value1 + "+" + value2)
+ .toStream()
+ .map((key, value) -> new KeyValue<>(key.toString(), value))
+ .to("output");
+
+
+ try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+ final LogCaptureAppender appender = LogCaptureAppender.createAndRegister();
+ driver.pipeInput(recordFactory.create("TOPIC", "k", "100", 100L));
+ driver.pipeInput(recordFactory.create("TOPIC", "k", "0", 0L));
+ driver.pipeInput(recordFactory.create("TOPIC", "k", "1", 1L));
+ driver.pipeInput(recordFactory.create("TOPIC", "k", "2", 2L));
+ driver.pipeInput(recordFactory.create("TOPIC", "k", "3", 3L));
+ driver.pipeInput(recordFactory.create("TOPIC", "k", "4", 4L));
+ driver.pipeInput(recordFactory.create("TOPIC", "k", "5", 5L));
+ LogCaptureAppender.unregister(appender);
+
+ assertThat(getMetricByName(driver.metrics(), "skipped-records-total", "stream-metrics").metricValue(), equalTo(5.0));
+ assertThat(appender.getMessages(), hasItems(
+ "Skipping record for expired window. key=[k] topic=[TOPIC] partition=[0] offset=[1] timestamp=[0] window=[0] expiration=[0]",
+ "Skipping record for expired window. key=[k] topic=[TOPIC] partition=[0] offset=[2] timestamp=[1] window=[0] expiration=[0]",
+ "Skipping record for expired window. key=[k] topic=[TOPIC] partition=[0] offset=[3] timestamp=[2] window=[0] expiration=[0]",
+ "Skipping record for expired window. key=[k] topic=[TOPIC] partition=[0] offset=[4] timestamp=[3] window=[0] expiration=[0]",
+ "Skipping record for expired window. key=[k] topic=[TOPIC] partition=[0] offset=[5] timestamp=[4] window=[0] expiration=[0]"
+ ));
+
+ OutputVerifier.compareKeyValueTimestamp(getOutput(driver), "[k@100/105]", "100", 100);
+ OutputVerifier.compareKeyValueTimestamp(getOutput(driver), "[k@5/10]", "5", 5);
+ assertThat(driver.readOutput("output"), nullValue());
+ }
+ }
+
+ private ProducerRecord<String, String> getOutput(final TopologyTestDriver driver) {
+ return driver.readOutput("output", new StringDeserializer(), new StringDeserializer());
+ }
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java
index 1ef3f5d..f25f38f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java
@@ -218,7 +218,7 @@ public class AbstractProcessorContextTest {
public void commit() {}
@Override
- public Long streamTime() {
+ public long streamTime() {
throw new RuntimeException("not implemented");
}
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java
index 1d2f613..9823ae1 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java
@@ -86,6 +86,9 @@ public class PartitionGroupTest {
new ConsumerRecord<>("topic", 2, 6L, recordKey, recordValue));
group.addRawRecords(partition2, list2);
+ // 1:[1, 3, 5]
+ // 2:[2, 4, 6]
+ // st: 1
assertEquals(6, group.numBuffered());
assertEquals(3, group.numBuffered(partition1));
@@ -97,6 +100,9 @@ public class PartitionGroupTest {
// get one record, now the time should be advanced
record = group.nextRecord(info);
+ // 1:[3, 5]
+ // 2:[2, 4, 6]
+ // st: 2
assertEquals(partition1, info.partition());
assertEquals(1L, record.timestamp);
assertEquals(5, group.numBuffered());
@@ -106,6 +112,9 @@ public class PartitionGroupTest {
// get one record, now the time should be advanced
record = group.nextRecord(info);
+ // 1:[3, 5]
+ // 2:[4, 6]
+ // st: 3
assertEquals(partition2, info.partition());
assertEquals(2L, record.timestamp);
assertEquals(4, group.numBuffered());
@@ -113,13 +122,15 @@ public class PartitionGroupTest {
assertEquals(2, group.numBuffered(partition2));
assertEquals(3L, group.timestamp());
- // add three 3 records with timestamp 2, 4, 6 to partition-1 again
+ // add 2 more records with timestamp 2, 4 to partition-1
final List<ConsumerRecord<byte[], byte[]>> list3 = Arrays.asList(
new ConsumerRecord<>("topic", 1, 2L, recordKey, recordValue),
new ConsumerRecord<>("topic", 1, 4L, recordKey, recordValue));
group.addRawRecords(partition1, list3);
-
+ // 1:[3, 5, 2, 4]
+ // 2:[4, 6]
+ // st: 3 (non-decreasing, so adding 2 doesn't change it)
assertEquals(6, group.numBuffered());
assertEquals(4, group.numBuffered(partition1));
assertEquals(2, group.numBuffered(partition2));
@@ -127,6 +138,9 @@ public class PartitionGroupTest {
// get one record, time should not be advanced
record = group.nextRecord(info);
+ // 1:[5, 2, 4]
+ // 2:[4, 6]
+ // st: 3 (2's presence prevents it from advancing to 4)
assertEquals(partition1, info.partition());
assertEquals(3L, record.timestamp);
assertEquals(5, group.numBuffered());
@@ -134,8 +148,11 @@ public class PartitionGroupTest {
assertEquals(2, group.numBuffered(partition2));
assertEquals(3L, group.timestamp());
- // get one more record, now time should be advanced
+ // get one record, time should not be advanced
record = group.nextRecord(info);
+ // 1:[2, 4]
+ // 2:[4, 6]
+ // st: 3 (2's presence prevents it from advancing to 4)
assertEquals(partition1, info.partition());
assertEquals(5L, record.timestamp);
assertEquals(4, group.numBuffered());
@@ -143,8 +160,11 @@ public class PartitionGroupTest {
assertEquals(2, group.numBuffered(partition2));
assertEquals(3L, group.timestamp());
- // get one more record, time should not be advanced
+ // get one more record, now time should be advanced
record = group.nextRecord(info);
+ // 1:[4]
+ // 2:[4, 6]
+ // st: 4
assertEquals(partition1, info.partition());
assertEquals(2L, record.timestamp);
assertEquals(3, group.numBuffered());
@@ -152,8 +172,11 @@ public class PartitionGroupTest {
assertEquals(2, group.numBuffered(partition2));
assertEquals(4L, group.timestamp());
- // get one more record, now time should be advanced
+ // get one more record, time should not be advanced
record = group.nextRecord(info);
+ // 1:[4]
+ // 2:[6]
+ // st: 4
assertEquals(partition2, info.partition());
assertEquals(4L, record.timestamp);
assertEquals(2, group.numBuffered());
@@ -163,6 +186,9 @@ public class PartitionGroupTest {
// get one more record, time should not be advanced
record = group.nextRecord(info);
+ // 1:[]
+ // 2:[6]
+ // st: 4 (doesn't advance because 1 is empty, so it's still reporting the last-known time of 4)
assertEquals(partition1, info.partition());
assertEquals(4L, record.timestamp);
assertEquals(1, group.numBuffered());
@@ -172,6 +198,9 @@ public class PartitionGroupTest {
// get one more record, time should not be advanced
record = group.nextRecord(info);
+ // 1:[]
+ // 2:[]
+ // st: 4 (1 and 2 are empty, so they are still reporting the last-known times of 4 and 6.)
assertEquals(partition2, info.partition());
assertEquals(6L, record.timestamp);
assertEquals(0, group.numBuffered());
diff --git a/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java
index 3251489..ca6d5a7 100644
--- a/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java
+++ b/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java
@@ -61,7 +61,7 @@ public class InternalMockProcessorContext extends AbstractProcessorContext imple
private Serde<?> keySerde;
private Serde<?> valSerde;
private long timestamp = -1L;
- private long streamTime = -1;
+ private long streamTime = -1L;
public InternalMockProcessorContext() {
this(null,
@@ -119,12 +119,7 @@ public class InternalMockProcessorContext extends AbstractProcessorContext imple
valSerde,
new StreamsMetricsImpl(new Metrics(), "mock"),
new StreamsConfig(StreamsTestUtils.minimalStreamsConfig()),
- new RecordCollector.Supplier() {
- @Override
- public RecordCollector recordCollector() {
- return collector;
- }
- },
+ () -> collector,
cache
);
}
@@ -180,12 +175,12 @@ public class InternalMockProcessorContext extends AbstractProcessorContext imple
@Override
public void initialized() {}
- public void setStreamTime(final long time) {
- streamTime = time;
+ public void setStreamTime(final long currentTime) {
+ streamTime = currentTime;
}
@Override
- public Long streamTime() {
+ public long streamTime() {
return streamTime;
}
@@ -346,5 +341,4 @@ public class InternalMockProcessorContext extends AbstractProcessorContext imple
return new WrappedBatchingStateRestoreCallback(restoreCallback);
}
-
}
diff --git a/streams/src/test/java/org/apache/kafka/test/NoOpProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/NoOpProcessorContext.java
index 7b695c4..521a50e 100644
--- a/streams/src/test/java/org/apache/kafka/test/NoOpProcessorContext.java
+++ b/streams/src/test/java/org/apache/kafka/test/NoOpProcessorContext.java
@@ -86,8 +86,8 @@ public class NoOpProcessorContext extends AbstractProcessorContext {
}
@Override
- public Long streamTime() {
- throw new RuntimeException("not implemented");
+ public long streamTime() {
+ throw new RuntimeException("streamTime is not implemented for NoOpProcessorContext");
}
@Override
diff --git a/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java b/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
index d0d4ed1..497a6c3 100644
--- a/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
+++ b/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
@@ -54,6 +54,7 @@ import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
@@ -63,9 +64,11 @@ import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.Set;
-import java.util.Arrays;
import java.util.regex.Pattern;
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.apache.kafka.common.utils.Utils.mkProperties;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
@@ -97,13 +100,11 @@ public class TopologyTestDriverTest {
private final ConsumerRecord<byte[], byte[]> consumerRecord2 = consumerRecordFactory.create(SOURCE_TOPIC_2, key2, value2, timestamp2);
private TopologyTestDriver testDriver;
- private final Properties config = new Properties() {
- {
- put(StreamsConfig.APPLICATION_ID_CONFIG, "test-TopologyTestDriver");
- put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
- put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
- }
- };
+ private final Properties config = mkProperties(mkMap(
+ mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, "test-TopologyTestDriver"),
+ mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234"),
+ mkEntry(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath())
+ ));
private KeyValueStore<String, Long> store;
private final StringDeserializer stringDeserializer = new StringDeserializer();
|