This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new af98326 MINOR: Removed deprecated schedule function (#4908) af98326 is described below commit af983267be7a2d0f81527f5a348af377f30caee4 Author: Guozhang Wang AuthorDate: Fri May 4 08:42:01 2018 -0700 MINOR: Removed deprecated schedule function (#4908) While working on this, I also refactored the MockProcessor out of the MockProcessorSupplier to cleanup the unit test paths. Reviewers: John Roesler , Bill Bejeck , Matthias J. Sax --- .../examples/wordcount/WordCountProcessorDemo.java | 4 - .../kafka/streams/kstream/KGroupedStream.java | 4 +- .../kafka/streams/kstream/TimeWindowedKStream.java | 4 +- .../apache/kafka/streams/kstream/Transformer.java | 21 ---- .../kstream/internals/KStreamTransform.java | 9 -- .../kstream/internals/KStreamTransformValues.java | 14 --- .../kafka/streams/processor/AbstractProcessor.java | 17 +-- .../apache/kafka/streams/processor/Processor.java | 11 -- .../kafka/streams/processor/ProcessorContext.java | 12 --- .../internals/GlobalProcessorContextImpl.java | 10 -- .../processor/internals/ProcessorContextImpl.java | 11 -- .../streams/processor/internals/ProcessorNode.java | 4 +- .../processor/internals/StandbyContextImpl.java | 9 -- .../apache/kafka/streams/StreamsBuilderTest.java | 8 +- .../org/apache/kafka/streams/TopologyTest.java | 4 - .../streams/integration/EosIntegrationTest.java | 5 - .../integration/RestoreIntegrationTest.java | 9 +- .../kafka/streams/kstream/KStreamBuilderTest.java | 8 +- .../kstream/internals/AbstractStreamTest.java | 6 +- .../kstream/internals/KStreamBranchTest.java | 16 +-- .../kstream/internals/KStreamFilterTest.java | 16 +-- .../kstream/internals/KStreamFlatMapTest.java | 10 +- .../internals/KStreamFlatMapValuesTest.java | 12 +-- .../internals/KStreamGlobalKTableJoinTest.java | 12 ++- .../internals/KStreamGlobalKTableLeftJoinTest.java | 16 +-- .../streams/kstream/internals/KStreamImplTest.java | 27 +++-- .../kstream/internals/KStreamKStreamJoinTest.java | 40 +++---- .../internals/KStreamKStreamLeftJoinTest.java | 17 +-- .../kstream/internals/KStreamKTableJoinTest.java | 13 ++- .../internals/KStreamKTableLeftJoinTest.java | 10 +- .../streams/kstream/internals/KStreamMapTest.java | 10 +- .../kstream/internals/KStreamMapValuesTest.java | 13 +-- .../kstream/internals/KStreamSelectKeyTest.java | 8 +- .../kstream/internals/KStreamTransformTest.java | 120 ++++++++++----------- .../internals/KStreamTransformValuesTest.java | 19 ++-- .../internals/KStreamWindowAggregateTest.java | 44 ++++---- .../kstream/internals/KTableAggregateTest.java | 46 ++++---- .../kstream/internals/KTableFilterTest.java | 81 +++++++------- .../streams/kstream/internals/KTableImplTest.java | 24 ++--- .../internals/KTableKTableInnerJoinTest.java | 41 +++---- .../internals/KTableKTableLeftJoinTest.java | 24 +++-- .../internals/KTableKTableOuterJoinTest.java | 25 +++-- .../kstream/internals/KTableMapKeysTest.java | 8 +- .../kstream/internals/KTableMapValuesTest.java | 31 +++--- .../kstream/internals/KTableSourceTest.java | 20 ++-- .../streams/processor/TopologyBuilderTest.java | 4 - .../internals/AbstractProcessorContextTest.java | 3 - .../processor/internals/GlobalStateTaskTest.java | 4 +- .../internals/InternalTopologyBuilderTest.java | 3 - .../processor/internals/ProcessorNodeTest.java | 10 -- .../processor/internals/ProcessorTopologyTest.java | 37 +------ .../processor/internals/PunctuationQueueTest.java | 96 ++++++----------- .../processor/internals/StreamTaskTest.java | 18 ++-- .../processor/internals/StreamThreadTest.java | 4 - .../kafka/test/InternalMockProcessorContext.java | 3 - .../org/apache/kafka/test/KStreamTestDriver.java | 15 --- ...ckProcessorSupplier.java => MockProcessor.java} | 87 ++++++--------- .../org/apache/kafka/test/MockProcessorNode.java | 23 ++-- .../apache/kafka/test/MockProcessorSupplier.java | 85 +++------------ .../apache/kafka/test/NoOpProcessorContext.java | 4 - .../streams/scala/kstream/KGroupedStream.scala | 1 - .../kafka/streams/scala/kstream/KStream.scala | 8 -- .../scala/kstream/TimeWindowedKStream.scala | 1 - .../streams/processor/MockProcessorContext.java | 8 -- .../kafka/streams/MockProcessorContextTest.java | 4 - .../kafka/streams/TopologyTestDriverTest.java | 14 --- 66 files changed, 490 insertions(+), 815 deletions(-) diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java index dbf2b70..523bb46 100644 --- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java +++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java @@ -98,10 +98,6 @@ public class WordCountProcessorDemo { } @Override - @Deprecated - public void punctuate(long timestamp) {} - - @Override public void close() {} }; } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java index 29de64c..d8589e2 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java @@ -18,7 +18,6 @@ package org.apache.kafka.streams.kstream; import org.apache.kafka.common.annotation.InterfaceStability; import org.apache.kafka.common.serialization.Serde; -import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.KeyValue; @@ -177,7 +176,8 @@ public interface KGroupedStream { * query the value of the key on a parallel running instance of your Kafka Streams application. * * @param materialized an instance of {@link Materialized} used to materialize a state store. Cannot be {@code null}. - * Note: the valueSerde will be automatically set to {@link Serdes#Long()} if there is no valueSerde provided + * Note: the valueSerde will be automatically set to {@link org.apache.kafka.common.serialization.Serdes#Long() Serdes#Long()} + * if there is no valueSerde provided * @return a {@link KTable} that contains "update" records with unmodified keys and {@link Long} values that * represent the latest (rolling) count (i.e., number of records) for each key */ diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedKStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedKStream.java index 8ef0bd7..7f9752b 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedKStream.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedKStream.java @@ -16,7 +16,6 @@ */ package org.apache.kafka.streams.kstream; -import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.KeyValue; @@ -101,7 +100,8 @@ public interface TimeWindowedKStream { * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to * query the value of the key on a parallel running instance of your Kafka Streams application. ** @param materialized an instance of {@link Materialized} used to materialize a state store. Cannot be {@code null}. - * Note: the valueSerde will be automatically set to {@link Serdes#Long()} if there is no valueSerde provided + * Note: the valueSerde will be automatically set to {@link org.apache.kafka.common.serialization.Serdes#Long() Serdes#Long()} + * if there is no valueSerde provided * @return a {@link KTable} that contains "update" records with unmodified keys and {@link Long} values that * represent the latest (rolling) count (i.e., number of records) for each key */ diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Transformer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Transformer.java index a83b4a3..bbf8c25 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/Transformer.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Transformer.java @@ -21,7 +21,6 @@ import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.PunctuationType; import org.apache.kafka.streams.processor.Punctuator; import org.apache.kafka.streams.processor.StateStore; -import org.apache.kafka.streams.processor.TimestampExtractor; import org.apache.kafka.streams.processor.To; /** @@ -82,26 +81,6 @@ public interface Transformer { R transform(final K key, final V value); /** - * Perform any periodic operations and possibly generate new {@link KeyValue} pairs if this processor - * {@link ProcessorContext#schedule(long) schedules itself} with the context during - * {@link #init(ProcessorContext) initialization}. - *

- * To generate new {@link KeyValue} pairs {@link ProcessorContext#forward(Object, Object)} and - * {@link ProcessorContext#forward(Object, Object, To)} can be used. - *

- * Note that {@code punctuate} is called based on stream time (i.e., time progresses with regard to - * timestamps return by the used {@link TimestampExtractor}) - * and not based on wall-clock time. - * - * @deprecated Please use {@link Punctuator} functional interface instead. - * - * @param timestamp the stream time when {@code punctuate} is being called - * @return new {@link KeyValue} pair to be forwarded to down stream—if {@code null} will not be forwarded - */ - @Deprecated - R punctuate(final long timestamp); - - /** * Close this processor and clean up any resources. *

* To generate new {@link KeyValue} pairs {@link ProcessorContext#forward(Object, Object)} and diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransform.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransform.java index 0afadbb..1ae8ede 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransform.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransform.java @@ -59,15 +59,6 @@ public class KStreamTransform implements ProcessorSupplier { context().forward(pair.key, pair.value); } - @SuppressWarnings("deprecation") - @Override - public void punctuate(long timestamp) { - KeyValue pair = transformer.punctuate(timestamp); - - if (pair != null) - context().forward(pair.key, pair.value); - } - @Override public void close() { transformer.close(); diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValues.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValues.java index e644597..d09fae2 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValues.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValues.java @@ -106,12 +106,6 @@ public class KStreamTransformValues implements ProcessorSupplier return context.schedule(interval, type, callback); } - @SuppressWarnings("deprecation") - @Override - public void schedule(final long interval) { - context.schedule(interval); - } - @Override public void forward(final K key, final V value) { throw new StreamsException("ProcessorContext#forward() must not be called within TransformValues."); @@ -177,14 +171,6 @@ public class KStreamTransformValues implements ProcessorSupplier context.forward(key, valueTransformer.transform(key, value)); } - @SuppressWarnings("deprecation") - @Override - public void punctuate(long timestamp) { - if (valueTransformer.punctuate(timestamp) != null) { - throw new StreamsException("ValueTransformer#punctuate must return null."); - } - } - @Override public void close() { valueTransformer.close(); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/AbstractProcessor.java b/streams/src/main/java/org/apache/kafka/streams/processor/AbstractProcessor.java index 14e6c2a..83abfca 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/AbstractProcessor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/AbstractProcessor.java @@ -18,7 +18,7 @@ package org.apache.kafka.streams.processor; /** * An abstract implementation of {@link Processor} that manages the {@link ProcessorContext} instance and provides default no-op - * implementations of {@link #punctuate(long)} and {@link #close()}. + * implementation of {@link #close()}. * * @param the type of keys * @param the type of values @@ -36,21 +36,6 @@ public abstract class AbstractProcessor implements Processor { } /** - * Perform any periodic operations, if this processor {@link ProcessorContext#schedule(long) schedule itself} with the context - * during {@link #init(ProcessorContext) initialization}. - *

- * This method does nothing by default; if desired, subclasses should override it with custom functionality. - *

- * - * @param timestamp the wallclock time when this method is being called - */ - @SuppressWarnings("deprecation") - @Override - public void punctuate(final long timestamp) { - // do nothing - } - - /** * Close this processor and clean up any resources. *

* This method does nothing by default; if desired, subclasses should override it with custom functionality. diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/Processor.java b/streams/src/main/java/org/apache/kafka/streams/processor/Processor.java index 2ed17df..bcdb2f0 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/Processor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/Processor.java @@ -48,17 +48,6 @@ public interface Processor { void process(K key, V value); /** - * Perform any periodic operations, if this processor {@link ProcessorContext#schedule(long) schedule itself} with the context - * during {@link #init(ProcessorContext) initialization}. - * - * @deprecated Please use {@link Punctuator} functional interface instead. - * - * @param timestamp the stream time when this method is being called - */ - @Deprecated - void punctuate(long timestamp); - - /** * Close this processor and clean up any resources. Be aware that {@link #close()} is called after an internal cleanup. * Thus, it is not possible to write anything to Kafka as underlying clients are already closed. *

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java index 404b225..93a1455 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java @@ -132,18 +132,6 @@ public interface ProcessorContext { final Punctuator callback); /** - * Schedules a periodic operation for processors. A processor may call this method during - * {@link Processor#init(ProcessorContext) initialization} to - * schedule a periodic call - called a punctuation - to {@link Processor#punctuate(long)}. - * - * @deprecated Please use {@link #schedule(long, PunctuationType, Punctuator)} instead. - * - * @param interval the time interval between punctuations - */ - @Deprecated - void schedule(final long interval); - - /** * Forwards a key/value pair to all downstream processors. * Used the input record's timestamp as timestamp for the output record. * diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java index 6bc4121..717e6a7 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java @@ -96,14 +96,4 @@ public class GlobalProcessorContextImpl extends AbstractProcessorContext { throw new UnsupportedOperationException("this should not happen: schedule() not supported in global processor context."); } - - /** - * @throws UnsupportedOperationException on every invocation - */ - @SuppressWarnings("deprecation") - @Override - public void schedule(long interval) { - throw new UnsupportedOperationException("this should not happen: schedule() not supported in global processor context."); - } - } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java index 178937f..a539a1b 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java @@ -152,15 +152,4 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re return task.schedule(interval, type, callback); } - @Override - @Deprecated - public void schedule(final long interval) { - schedule(interval, PunctuationType.STREAM_TIME, new Punctuator() { - @Override - public void punctuate(final long timestamp) { - currentNode().processor().punctuate(timestamp); - } - }); - } - } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java index 0854b67..a0a7041 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java @@ -37,9 +37,9 @@ public class ProcessorNode { private final List> children; private final Map> childByName; - private final String name; - private final Processor processor; private NodeMetrics nodeMetrics; + private final Processor processor; + private final String name; private final Time time; private K key; diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java index ef4585a..6aeca44 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java @@ -188,15 +188,6 @@ class StandbyContextImpl extends AbstractProcessorContext implements RecordColle * @throws UnsupportedOperationException on every invocation */ @Override - @Deprecated - public void schedule(final long interval) { - throw new UnsupportedOperationException("this should not happen: schedule() not supported in standby tasks."); - } - - /** - * @throws UnsupportedOperationException on every invocation - */ - @Override public RecordContext recordContext() { throw new UnsupportedOperationException("this should not happen: recordContext not supported in standby tasks."); } diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java index d3e01fa..15e55d8 100644 --- a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java @@ -189,7 +189,7 @@ public class StreamsBuilderTest { driver.pipeInput(recordFactory.create("topic-source", "A", "aa")); // no exception was thrown - assertEquals(Utils.mkList("A:aa"), processorSupplier.processed); + assertEquals(Utils.mkList("A:aa"), processorSupplier.theCapturedProcessor().processed); } @Test @@ -208,8 +208,8 @@ public class StreamsBuilderTest { final ConsumerRecordFactory recordFactory = new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer()); driver.pipeInput(recordFactory.create("topic-source", "A", "aa")); - assertEquals(Utils.mkList("A:aa"), sourceProcessorSupplier.processed); - assertEquals(Utils.mkList("A:aa"), throughProcessorSupplier.processed); + assertEquals(Utils.mkList("A:aa"), sourceProcessorSupplier.theCapturedProcessor().processed); + assertEquals(Utils.mkList("A:aa"), throughProcessorSupplier.theCapturedProcessor().processed); } @Test @@ -232,7 +232,7 @@ public class StreamsBuilderTest { driver.pipeInput(recordFactory.create(topic2, "C", "cc")); driver.pipeInput(recordFactory.create(topic1, "D", "dd")); - assertEquals(Utils.mkList("A:aa", "B:bb", "C:cc", "D:dd"), processorSupplier.processed); + assertEquals(Utils.mkList("A:aa", "B:bb", "C:cc", "D:dd"), processorSupplier.theCapturedProcessor().processed); } @Test diff --git a/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java index eee3386..0c34723 100644 --- a/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java @@ -296,10 +296,6 @@ public class TopologyTest { @Override public void process(Object key, Object value) { } - @SuppressWarnings("deprecation") - @Override - public void punctuate(long timestamp) { } - @Override public void close() { } }; diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java index c4ea964..30c90c2 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java @@ -644,11 +644,6 @@ public class EosIntegrationTest { } @Override - public KeyValue punctuate(final long timestamp) { - return null; - } - - @Override public void close() { } }; } }, storeNames) diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java index 19ddedf..12b0d97 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java @@ -57,7 +57,6 @@ import org.junit.ClassRule; import org.junit.Test; import org.junit.experimental.categories.Category; -import java.io.IOException; import java.util.Arrays; import java.util.HashMap; import java.util.List; @@ -106,13 +105,12 @@ public class RestoreIntegrationTest { } @After - public void shutdown() throws IOException { + public void shutdown() { if (kafkaStreams != null) { kafkaStreams.close(30, TimeUnit.SECONDS); } } - @Test public void shouldRestoreState() throws ExecutionException, InterruptedException { final AtomicInteger numReceived = new AtomicInteger(0); @@ -276,11 +274,6 @@ public class RestoreIntegrationTest { } @Override - public void punctuate(final long timestamp) { - - } - - @Override public void close() { } diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java index 81bdb31..b63f2de 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java @@ -117,7 +117,7 @@ public class KStreamBuilderTest { driver.pipeInput(recordFactory.create("topic-source", "A", "aa")); // no exception was thrown - assertEquals(Utils.mkList("A:aa"), processorSupplier.processed); + assertEquals(Utils.mkList("A:aa"), processorSupplier.theCapturedProcessor().processed); } @Test @@ -134,8 +134,8 @@ public class KStreamBuilderTest { driver = new TopologyTestDriverWrapper(builder.internalTopologyBuilder, props); driver.pipeInput(recordFactory.create("topic-source", "A", "aa")); - assertEquals(Utils.mkList("A:aa"), sourceProcessorSupplier.processed); - assertEquals(Utils.mkList("A:aa"), throughProcessorSupplier.processed); + assertEquals(Utils.mkList("A:aa"), sourceProcessorSupplier.theCapturedProcessor().processed); + assertEquals(Utils.mkList("A:aa"), throughProcessorSupplier.theCapturedProcessor().processed); } @Test @@ -170,7 +170,7 @@ public class KStreamBuilderTest { driver.pipeInput(recordFactory.create(topic2, "C", "cc")); driver.pipeInput(recordFactory.create(topic1, "D", "dd")); - assertEquals(Utils.mkList("A:aa", "B:bb", "C:cc", "D:dd"), processorSupplier.processed); + assertEquals(Utils.mkList("A:aa", "B:bb", "C:cc", "D:dd"), processorSupplier.theCapturedProcessor().processed); } @Test diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/AbstractStreamTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/AbstractStreamTest.java index 2aa07f3..1f9bcba 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/AbstractStreamTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/AbstractStreamTest.java @@ -76,12 +76,12 @@ public class AbstractStreamTest { public void testShouldBeExtensible() { final StreamsBuilder builder = new StreamsBuilder(); final int[] expectedKeys = new int[]{1, 2, 3, 4, 5, 6, 7}; - final MockProcessorSupplier processor = new MockProcessorSupplier<>(); + final MockProcessorSupplier supplier = new MockProcessorSupplier<>(); final String topicName = "topic"; ExtendedKStream stream = new ExtendedKStream<>(builder.stream(topicName, Consumed.with(Serdes.Integer(), Serdes.String()))); - stream.randomFilter().process(processor); + stream.randomFilter().process(supplier); final Properties props = new Properties(); props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "abstract-stream-test"); @@ -94,7 +94,7 @@ public class AbstractStreamTest { driver.pipeInput(recordFactory.create(topicName, expectedKey, "V" + expectedKey)); } - assertTrue(processor.processed.size() <= expectedKeys.length); + assertTrue(supplier.theCapturedProcessor().processed.size() <= expectedKeys.length); } private class ExtendedKStream extends AbstractStream { diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamBranchTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamBranchTest.java index a70bc37..bd3d60b 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamBranchTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamBranchTest.java @@ -26,13 +26,14 @@ import org.apache.kafka.streams.TopologyTestDriver; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.Predicate; import org.apache.kafka.streams.test.ConsumerRecordFactory; +import org.apache.kafka.test.MockProcessor; import org.apache.kafka.test.MockProcessorSupplier; import org.apache.kafka.test.TestUtils; import org.junit.After; import org.junit.Before; import org.junit.Test; -import java.lang.reflect.Array; +import java.util.List; import java.util.Properties; import static org.junit.Assert.assertEquals; @@ -90,17 +91,15 @@ public class KStreamBranchTest { KStream stream; KStream[] branches; - MockProcessorSupplier[] processors; stream = builder.stream(topicName, Consumed.with(Serdes.Integer(), Serdes.String())); branches = stream.branch(isEven, isMultipleOfThree, isOdd); assertEquals(3, branches.length); - processors = (MockProcessorSupplier[]) Array.newInstance(MockProcessorSupplier.class, branches.length); + final MockProcessorSupplier supplier = new MockProcessorSupplier<>(); for (int i = 0; i < branches.length; i++) { - processors[i] = new MockProcessorSupplier<>(); - branches[i].process(processors[i]); + branches[i].process(supplier); } driver = new TopologyTestDriver(builder.build(), props); @@ -108,9 +107,10 @@ public class KStreamBranchTest { driver.pipeInput(recordFactory.create(topicName, expectedKey, "V" + expectedKey)); } - assertEquals(3, processors[0].processed.size()); - assertEquals(1, processors[1].processed.size()); - assertEquals(2, processors[2].processed.size()); + final List> processors = supplier.capturedProcessors(3); + assertEquals(3, processors.get(0).processed.size()); + assertEquals(1, processors.get(1).processed.size()); + assertEquals(2, processors.get(2).processed.size()); } @Test diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFilterTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFilterTest.java index a67d688..d338fe3 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFilterTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFilterTest.java @@ -74,18 +74,18 @@ public class KStreamFilterTest { final int[] expectedKeys = new int[]{1, 2, 3, 4, 5, 6, 7}; KStream stream; - MockProcessorSupplier processor; + MockProcessorSupplier supplier; - processor = new MockProcessorSupplier<>(); + supplier = new MockProcessorSupplier<>(); stream = builder.stream(topicName, Consumed.with(Serdes.Integer(), Serdes.String())); - stream.filter(isMultipleOfThree).process(processor); + stream.filter(isMultipleOfThree).process(supplier); driver = new TopologyTestDriver(builder.build(), props); for (int expectedKey : expectedKeys) { driver.pipeInput(recordFactory.create(topicName, expectedKey, "V" + expectedKey)); } - assertEquals(2, processor.processed.size()); + assertEquals(2, supplier.theCapturedProcessor().processed.size()); } @Test @@ -94,18 +94,18 @@ public class KStreamFilterTest { final int[] expectedKeys = new int[]{1, 2, 3, 4, 5, 6, 7}; KStream stream; - MockProcessorSupplier processor; + MockProcessorSupplier supplier; - processor = new MockProcessorSupplier<>(); + supplier = new MockProcessorSupplier<>(); stream = builder.stream(topicName, Consumed.with(Serdes.Integer(), Serdes.String())); - stream.filterNot(isMultipleOfThree).process(processor); + stream.filterNot(isMultipleOfThree).process(supplier); driver = new TopologyTestDriver(builder.build(), props); for (int expectedKey : expectedKeys) { driver.pipeInput(recordFactory.create(topicName, expectedKey, "V" + expectedKey)); } - assertEquals(5, processor.processed.size()); + assertEquals(5, supplier.theCapturedProcessor().processed.size()); } @Test diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapTest.java index e414218..9ce24b5 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapTest.java @@ -82,23 +82,23 @@ public class KStreamFlatMapTest { final int[] expectedKeys = {0, 1, 2, 3}; KStream stream; - MockProcessorSupplier processor; + MockProcessorSupplier supplier; - processor = new MockProcessorSupplier<>(); + supplier = new MockProcessorSupplier<>(); stream = builder.stream(topicName, Consumed.with(Serdes.Integer(), Serdes.String())); - stream.flatMap(mapper).process(processor); + stream.flatMap(mapper).process(supplier); driver = new TopologyTestDriver(builder.build(), props); for (int expectedKey : expectedKeys) { driver.pipeInput(recordFactory.create(topicName, expectedKey, "V" + expectedKey)); } - assertEquals(6, processor.processed.size()); + assertEquals(6, supplier.theCapturedProcessor().processed.size()); String[] expected = {"10:V1", "20:V2", "21:V2", "30:V3", "31:V3", "32:V3"}; for (int i = 0; i < expected.length; i++) { - assertEquals(expected[i], processor.processed.get(i)); + assertEquals(expected[i], supplier.theCapturedProcessor().processed.get(i)); } } } diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValuesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValuesTest.java index 14213c9..221b02b 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValuesTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValuesTest.java @@ -80,8 +80,8 @@ public class KStreamFlatMapValuesTest { final int[] expectedKeys = {0, 1, 2, 3}; final KStream stream = builder.stream(topicName, Consumed.with(Serdes.Integer(), Serdes.Integer())); - final MockProcessorSupplier processor = new MockProcessorSupplier<>(); - stream.flatMapValues(mapper).process(processor); + final MockProcessorSupplier supplier = new MockProcessorSupplier<>(); + stream.flatMapValues(mapper).process(supplier); driver = new TopologyTestDriver(builder.build(), props); for (final int expectedKey : expectedKeys) { @@ -91,7 +91,7 @@ public class KStreamFlatMapValuesTest { String[] expected = {"0:v0", "0:V0", "1:v1", "1:V1", "2:v2", "2:V2", "3:v3", "3:V3"}; - assertArrayEquals(expected, processor.processed.toArray()); + assertArrayEquals(expected, supplier.theCapturedProcessor().processed.toArray()); } @@ -113,9 +113,9 @@ public class KStreamFlatMapValuesTest { final int[] expectedKeys = {0, 1, 2, 3}; final KStream stream = builder.stream(topicName, Consumed.with(Serdes.Integer(), Serdes.Integer())); - final MockProcessorSupplier processor = new MockProcessorSupplier<>(); + final MockProcessorSupplier supplier = new MockProcessorSupplier<>(); - stream.flatMapValues(mapper).process(processor); + stream.flatMapValues(mapper).process(supplier); driver = new TopologyTestDriver(builder.build(), props); for (final int expectedKey : expectedKeys) { @@ -125,6 +125,6 @@ public class KStreamFlatMapValuesTest { String[] expected = {"0:v0", "0:k0", "1:v1", "1:k1", "2:v2", "2:k2", "3:v3", "3:k3"}; - assertArrayEquals(expected, processor.processed.toArray()); + assertArrayEquals(expected, supplier.theCapturedProcessor().processed.toArray()); } } diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableJoinTest.java index 2936f5f..6e5b816 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableJoinTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableJoinTest.java @@ -28,6 +28,7 @@ import org.apache.kafka.streams.TopologyTestDriver; import org.apache.kafka.streams.kstream.GlobalKTable; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KeyValueMapper; +import org.apache.kafka.test.MockProcessor; import org.apache.kafka.streams.test.ConsumerRecordFactory; import org.apache.kafka.test.MockProcessorSupplier; import org.apache.kafka.test.MockValueJoiner; @@ -36,7 +37,6 @@ import org.junit.After; import org.junit.Before; import org.junit.Test; -import java.io.IOException; import java.util.Collection; import java.util.Properties; import java.util.Set; @@ -50,19 +50,19 @@ public class KStreamGlobalKTableJoinTest { private final Serde intSerde = Serdes.Integer(); private final Serde stringSerde = Serdes.String(); private TopologyTestDriver driver; - private MockProcessorSupplier processor; + private MockProcessor processor; private final int[] expectedKeys = {0, 1, 2, 3}; private StreamsBuilder builder; @Before - public void setUp() throws IOException { + public void setUp() { builder = new StreamsBuilder(); final KStream stream; final GlobalKTable table; // value of stream optionally contains key of table final KeyValueMapper keyMapper; - processor = new MockProcessorSupplier<>(); + final MockProcessorSupplier supplier = new MockProcessorSupplier<>(); final Consumed streamConsumed = Consumed.with(intSerde, stringSerde); final Consumed tableConsumed = Consumed.with(stringSerde, stringSerde); stream = builder.stream(streamTopic, streamConsumed); @@ -76,7 +76,7 @@ public class KStreamGlobalKTableJoinTest { return tokens.length > 1 ? tokens[1] : null; } }; - stream.join(table, keyMapper, MockValueJoiner.TOSTRING_JOINER).process(processor); + stream.join(table, keyMapper, MockValueJoiner.TOSTRING_JOINER).process(supplier); final Properties props = new Properties(); props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "kstream-global-ktable-join-test"); @@ -86,6 +86,8 @@ public class KStreamGlobalKTableJoinTest { props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); driver = new TopologyTestDriver(builder.build(), props); + + processor = supplier.theCapturedProcessor(); } @After diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableLeftJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableLeftJoinTest.java index 8882113..b3551ba 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableLeftJoinTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableLeftJoinTest.java @@ -29,13 +29,13 @@ import org.apache.kafka.streams.kstream.GlobalKTable; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KeyValueMapper; import org.apache.kafka.streams.test.ConsumerRecordFactory; +import org.apache.kafka.test.MockProcessor; import org.apache.kafka.test.MockProcessorSupplier; import org.apache.kafka.test.MockValueJoiner; import org.apache.kafka.test.TestUtils; import org.junit.Before; import org.junit.Test; -import java.io.IOException; import java.util.Collection; import java.util.Properties; import java.util.Set; @@ -48,20 +48,22 @@ public class KStreamGlobalKTableLeftJoinTest { final private String globalTableTopic = "globalTableTopic"; final private Serde intSerde = Serdes.Integer(); final private Serde stringSerde = Serdes.String(); + + private MockProcessor processor; private TopologyTestDriver driver; - private MockProcessorSupplier processor; - private final int[] expectedKeys = {0, 1, 2, 3}; private StreamsBuilder builder; + private final int[] expectedKeys = {0, 1, 2, 3}; + @Before - public void setUp() throws IOException { + public void setUp() { builder = new StreamsBuilder(); final KStream stream; final GlobalKTable table; // value of stream optionally contains key of table final KeyValueMapper keyMapper; - processor = new MockProcessorSupplier<>(); + final MockProcessorSupplier supplier = new MockProcessorSupplier<>(); final Consumed streamConsumed = Consumed.with(intSerde, stringSerde); final Consumed tableConsumed = Consumed.with(stringSerde, stringSerde); stream = builder.stream(streamTopic, streamConsumed); @@ -75,7 +77,7 @@ public class KStreamGlobalKTableLeftJoinTest { return tokens.length > 1 ? tokens[1] : null; } }; - stream.leftJoin(table, keyMapper, MockValueJoiner.TOSTRING_JOINER).process(processor); + stream.leftJoin(table, keyMapper, MockValueJoiner.TOSTRING_JOINER).process(supplier); final Properties props = new Properties(); props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "kstream-global-ktable-left-join-test"); @@ -85,6 +87,8 @@ public class KStreamGlobalKTableLeftJoinTest { props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); driver = new TopologyTestDriver(builder.build(), props); + + processor = supplier.theCapturedProcessor(); } private void pushToStream(final int messageCount, final String valuePrefix, final boolean includeForeignKey) { diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java index f397246..797575d 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java @@ -69,12 +69,15 @@ import static org.junit.Assert.fail; public class KStreamImplTest { - final private Serde stringSerde = Serdes.String(); - final private Serde intSerde = Serdes.Integer(); + private final Serde stringSerde = Serdes.String(); + private final Serde intSerde = Serdes.Integer(); + private final Consumed consumed = Consumed.with(stringSerde, stringSerde); private final Consumed stringConsumed = Consumed.with(Serdes.String(), Serdes.String()); + + private final MockProcessorSupplier processorSupplier = new MockProcessorSupplier<>(); + private KStream testStream; private StreamsBuilder builder; - private final Consumed consumed = Consumed.with(stringSerde, stringSerde); private final ConsumerRecordFactory recordFactory = new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer()); private TopologyTestDriver driver; @@ -222,12 +225,11 @@ public class KStreamImplTest { final StreamsBuilder builder = new StreamsBuilder(); final String input = "topic"; final KStream stream = builder.stream(input, consumed); - final MockProcessorSupplier processorSupplier = new MockProcessorSupplier<>(); stream.through("through-topic", Produced.with(stringSerde, stringSerde)).process(processorSupplier); driver = new TopologyTestDriver(builder.build(), props); driver.pipeInput(recordFactory.create(input, "a", "b")); - assertThat(processorSupplier.processed, equalTo(Collections.singletonList("a:b"))); + assertThat(processorSupplier.theCapturedProcessor().processed, equalTo(Collections.singletonList("a:b"))); } @Test @@ -235,13 +237,12 @@ public class KStreamImplTest { final StreamsBuilder builder = new StreamsBuilder(); final String input = "topic"; final KStream stream = builder.stream(input, consumed); - final MockProcessorSupplier processorSupplier = new MockProcessorSupplier<>(); stream.to("to-topic", Produced.with(stringSerde, stringSerde)); builder.stream("to-topic", consumed).process(processorSupplier); driver = new TopologyTestDriver(builder.build(), props); driver.pipeInput(recordFactory.create(input, "e", "f")); - assertThat(processorSupplier.processed, equalTo(Collections.singletonList("e:f"))); + assertThat(processorSupplier.theCapturedProcessor().processed, equalTo(Collections.singletonList("e:f"))); } @Test @@ -519,7 +520,6 @@ public class KStreamImplTest { final KStream source2 = builder.stream(topic2); final KStream merged = source1.merge(source2); - final MockProcessorSupplier processorSupplier = new MockProcessorSupplier<>(); merged.process(processorSupplier); driver = new TopologyTestDriver(builder.build(), props); @@ -529,7 +529,7 @@ public class KStreamImplTest { driver.pipeInput(recordFactory.create(topic2, "C", "cc")); driver.pipeInput(recordFactory.create(topic1, "D", "dd")); - assertEquals(Utils.mkList("A:aa", "B:bb", "C:cc", "D:dd"), processorSupplier.processed); + assertEquals(Utils.mkList("A:aa", "B:bb", "C:cc", "D:dd"), processorSupplier.theCapturedProcessor().processed); } @Test @@ -545,7 +545,6 @@ public class KStreamImplTest { final KStream source4 = builder.stream(topic4); final KStream merged = source1.merge(source2).merge(source3).merge(source4); - final MockProcessorSupplier processorSupplier = new MockProcessorSupplier<>(); merged.process(processorSupplier); driver = new TopologyTestDriver(builder.build(), props); @@ -560,14 +559,13 @@ public class KStreamImplTest { driver.pipeInput(recordFactory.create(topic1, "H", "hh")); assertEquals(Utils.mkList("A:aa", "B:bb", "C:cc", "D:dd", "E:ee", "F:ff", "G:gg", "H:hh"), - processorSupplier.processed); + processorSupplier.theCapturedProcessor().processed); } @Test public void shouldProcessFromSourceThatMatchPattern() { final KStream pattern2Source = builder.stream(Pattern.compile("topic-\\d")); - final MockProcessorSupplier processorSupplier = new MockProcessorSupplier<>(); pattern2Source.process(processorSupplier); driver = new TopologyTestDriver(builder.build(), props); @@ -579,7 +577,7 @@ public class KStreamImplTest { driver.pipeInput(recordFactory.create("topic-7", "E", "ee")); assertEquals(Utils.mkList("A:aa", "B:bb", "C:cc", "D:dd", "E:ee"), - processorSupplier.processed); + processorSupplier.theCapturedProcessor().processed); } @Test @@ -591,7 +589,6 @@ public class KStreamImplTest { final KStream source3 = builder.stream(topic3); final KStream merged = pattern2Source1.merge(pattern2Source2).merge(source3); - final MockProcessorSupplier processorSupplier = new MockProcessorSupplier<>(); merged.process(processorSupplier); driver = new TopologyTestDriver(builder.build(), props); @@ -603,6 +600,6 @@ public class KStreamImplTest { driver.pipeInput(recordFactory.create(topic3, "E", "ee")); assertEquals(Utils.mkList("A:aa", "B:bb", "C:cc", "D:dd", "E:ee"), - processorSupplier.processed); + processorSupplier.theCapturedProcessor().processed); } } diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java index 63a040a..5d849ee 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java @@ -28,6 +28,7 @@ import org.apache.kafka.streams.TopologyTestDriver; import org.apache.kafka.streams.kstream.JoinWindows; import org.apache.kafka.streams.kstream.Joined; import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.test.MockProcessor; import org.apache.kafka.streams.kstream.ValueJoiner; import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender; import org.apache.kafka.streams.test.ConsumerRecordFactory; @@ -119,9 +120,7 @@ public class KStreamKStreamJoinTest { final KStream stream1; final KStream stream2; final KStream joined; - final MockProcessorSupplier processor; - - processor = new MockProcessorSupplier<>(); + final MockProcessorSupplier supplier = new MockProcessorSupplier<>(); stream1 = builder.stream(topic1, consumed); stream2 = builder.stream(topic2, consumed); joined = stream1.join( @@ -129,7 +128,7 @@ public class KStreamKStreamJoinTest { MockValueJoiner.TOSTRING_JOINER, JoinWindows.of(100), Joined.with(intSerde, stringSerde, stringSerde)); - joined.process(processor); + joined.process(supplier); final Collection> copartitionGroups = StreamsBuilderTest.getCopartitionedGroups(builder); @@ -138,6 +137,8 @@ public class KStreamKStreamJoinTest { driver = new TopologyTestDriver(builder.build(), props); + final MockProcessor processor = supplier.theCapturedProcessor(); + // push two items to the primary stream. the other window is empty // w1 = {} // w2 = {} @@ -220,9 +221,7 @@ public class KStreamKStreamJoinTest { final KStream stream1; final KStream stream2; final KStream joined; - final MockProcessorSupplier processor; - - processor = new MockProcessorSupplier<>(); + final MockProcessorSupplier supplier = new MockProcessorSupplier<>(); stream1 = builder.stream(topic1, consumed); stream2 = builder.stream(topic2, consumed); @@ -231,7 +230,7 @@ public class KStreamKStreamJoinTest { MockValueJoiner.TOSTRING_JOINER, JoinWindows.of(100), Joined.with(intSerde, stringSerde, stringSerde)); - joined.process(processor); + joined.process(supplier); final Collection> copartitionGroups = StreamsBuilderTest.getCopartitionedGroups(builder); assertEquals(1, copartitionGroups.size()); @@ -239,6 +238,8 @@ public class KStreamKStreamJoinTest { driver = new TopologyTestDriver(builder.build(), props, 0L); + final MockProcessor processor = supplier.theCapturedProcessor(); + // push two items to the primary stream. the other window is empty.this should produce two items // w1 = {} // w2 = {} @@ -323,9 +324,7 @@ public class KStreamKStreamJoinTest { final KStream stream1; final KStream stream2; final KStream joined; - final MockProcessorSupplier processor; - - processor = new MockProcessorSupplier<>(); + final MockProcessorSupplier supplier = new MockProcessorSupplier<>(); stream1 = builder.stream(topic1, consumed); stream2 = builder.stream(topic2, consumed); @@ -334,7 +333,7 @@ public class KStreamKStreamJoinTest { MockValueJoiner.TOSTRING_JOINER, JoinWindows.of(100), Joined.with(intSerde, stringSerde, stringSerde)); - joined.process(processor); + joined.process(supplier); final Collection> copartitionGroups = StreamsBuilderTest.getCopartitionedGroups(builder); @@ -352,6 +351,8 @@ public class KStreamKStreamJoinTest { driver.pipeInput(recordFactory.create(topic1, expectedKeys[i], "X" + expectedKeys[i], time)); } + final MockProcessor processor = supplier.theCapturedProcessor(); + processor.checkAndClearProcessResult(); // push two items to the other stream. this should produce two items. @@ -543,9 +544,7 @@ public class KStreamKStreamJoinTest { final KStream stream1; final KStream stream2; final KStream joined; - final MockProcessorSupplier processor; - - processor = new MockProcessorSupplier<>(); + final MockProcessorSupplier supplier = new MockProcessorSupplier<>(); stream1 = builder.stream(topic1, consumed); stream2 = builder.stream(topic2, consumed); @@ -556,7 +555,7 @@ public class KStreamKStreamJoinTest { Joined.with(intSerde, stringSerde, stringSerde)); - joined.process(processor); + joined.process(supplier); final Collection> copartitionGroups = StreamsBuilderTest.getCopartitionedGroups(builder); @@ -565,6 +564,8 @@ public class KStreamKStreamJoinTest { driver = new TopologyTestDriver(builder.build(), props, time); + final MockProcessor processor = supplier.theCapturedProcessor(); + for (int i = 0; i < expectedKeys.length; i++) { driver.pipeInput(recordFactory.create(topic1, expectedKeys[i], "X" + expectedKeys[i], time + i)); } @@ -653,9 +654,8 @@ public class KStreamKStreamJoinTest { final KStream stream1; final KStream stream2; final KStream joined; - final MockProcessorSupplier processor; + final MockProcessorSupplier supplier = new MockProcessorSupplier<>(); - processor = new MockProcessorSupplier<>(); stream1 = builder.stream(topic1, consumed); stream2 = builder.stream(topic2, consumed); @@ -664,7 +664,7 @@ public class KStreamKStreamJoinTest { MockValueJoiner.TOSTRING_JOINER, JoinWindows.of(0).before(100), Joined.with(intSerde, stringSerde, stringSerde)); - joined.process(processor); + joined.process(supplier); final Collection> copartitionGroups = StreamsBuilderTest.getCopartitionedGroups(builder); @@ -673,6 +673,8 @@ public class KStreamKStreamJoinTest { driver = new TopologyTestDriver(builder.build(), props, time); + final MockProcessor processor = supplier.theCapturedProcessor(); + for (int i = 0; i < expectedKeys.length; i++) { driver.pipeInput(recordFactory.create(topic1, expectedKeys[i], "X" + expectedKeys[i], time + i)); } diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java index cb1aaf1..c67e13d 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java @@ -28,6 +28,7 @@ import org.apache.kafka.streams.TopologyTestDriver; import org.apache.kafka.streams.kstream.JoinWindows; import org.apache.kafka.streams.kstream.Joined; import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.test.MockProcessor; import org.apache.kafka.streams.test.ConsumerRecordFactory; import org.apache.kafka.test.MockProcessorSupplier; import org.apache.kafka.test.MockValueJoiner; @@ -83,9 +84,7 @@ public class KStreamKStreamLeftJoinTest { final KStream stream1; final KStream stream2; final KStream joined; - final MockProcessorSupplier processor; - - processor = new MockProcessorSupplier<>(); + final MockProcessorSupplier supplier = new MockProcessorSupplier<>(); stream1 = builder.stream(topic1, consumed); stream2 = builder.stream(topic2, consumed); @@ -93,7 +92,7 @@ public class KStreamKStreamLeftJoinTest { MockValueJoiner.TOSTRING_JOINER, JoinWindows.of(100), Joined.with(intSerde, stringSerde, stringSerde)); - joined.process(processor); + joined.process(supplier); final Collection> copartitionGroups = StreamsBuilderTest.getCopartitionedGroups(builder); @@ -102,6 +101,8 @@ public class KStreamKStreamLeftJoinTest { driver = new TopologyTestDriver(builder.build(), props, 0L); + final MockProcessor processor = supplier.theCapturedProcessor(); + // push two items to the primary stream. the other window is empty // w1 {} // w2 {} @@ -168,9 +169,7 @@ public class KStreamKStreamLeftJoinTest { final KStream stream1; final KStream stream2; final KStream joined; - final MockProcessorSupplier processor; - - processor = new MockProcessorSupplier<>(); + final MockProcessorSupplier supplier = new MockProcessorSupplier<>(); stream1 = builder.stream(topic1, consumed); stream2 = builder.stream(topic2, consumed); @@ -178,7 +177,7 @@ public class KStreamKStreamLeftJoinTest { MockValueJoiner.TOSTRING_JOINER, JoinWindows.of(100), Joined.with(intSerde, stringSerde, stringSerde)); - joined.process(processor); + joined.process(supplier); final Collection> copartitionGroups = StreamsBuilderTest.getCopartitionedGroups(builder); @@ -187,6 +186,8 @@ public class KStreamKStreamLeftJoinTest { driver = new TopologyTestDriver(builder.build(), props, time); + final MockProcessor processor = supplier.theCapturedProcessor(); + // push two items to the primary stream. the other window is empty. this should produce two items // w1 = {} // w2 = {} diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java index 5b2a797..ec31b5a 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java @@ -27,6 +27,7 @@ import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.TopologyTestDriver; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KTable; +import org.apache.kafka.test.MockProcessor; import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender; import org.apache.kafka.streams.test.ConsumerRecordFactory; import org.apache.kafka.test.MockProcessorSupplier; @@ -54,9 +55,11 @@ public class KStreamKTableJoinTest { private final Serde intSerde = Serdes.Integer(); private final Serde stringSerde = Serdes.String(); private final ConsumerRecordFactory recordFactory = new ConsumerRecordFactory<>(new IntegerSerializer(), new StringSerializer()); - private TopologyTestDriver driver; - private MockProcessorSupplier processor; + private final int[] expectedKeys = {0, 1, 2, 3}; + + private MockProcessor processor; + private TopologyTestDriver driver; private StreamsBuilder builder; @Before @@ -66,11 +69,11 @@ public class KStreamKTableJoinTest { final KStream stream; final KTable table; - processor = new MockProcessorSupplier<>(); + final MockProcessorSupplier supplier = new MockProcessorSupplier<>(); final Consumed consumed = Consumed.with(intSerde, stringSerde); stream = builder.stream(streamTopic, consumed); table = builder.table(tableTopic, consumed); - stream.join(table, MockValueJoiner.TOSTRING_JOINER).process(processor); + stream.join(table, MockValueJoiner.TOSTRING_JOINER).process(supplier); final Properties props = new Properties(); props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "kstream-ktable-join-test"); @@ -80,6 +83,8 @@ public class KStreamKTableJoinTest { props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); driver = new TopologyTestDriver(builder.build(), props, 0L); + + processor = supplier.theCapturedProcessor(); } private void pushToStream(final int messageCount, final String valuePrefix) { diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java index 669f4c7..735f71c 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java @@ -28,6 +28,7 @@ import org.apache.kafka.streams.TopologyTestDriver; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KTable; import org.apache.kafka.streams.test.ConsumerRecordFactory; +import org.apache.kafka.test.MockProcessor; import org.apache.kafka.test.MockProcessorSupplier; import org.apache.kafka.test.MockValueJoiner; import org.apache.kafka.test.TestUtils; @@ -51,7 +52,8 @@ public class KStreamKTableLeftJoinTest { final private Serde stringSerde = Serdes.String(); private final ConsumerRecordFactory recordFactory = new ConsumerRecordFactory<>(new IntegerSerializer(), new StringSerializer()); private TopologyTestDriver driver; - private MockProcessorSupplier processor; + private MockProcessor processor; + private final int[] expectedKeys = {0, 1, 2, 3}; private StreamsBuilder builder; @@ -63,11 +65,11 @@ public class KStreamKTableLeftJoinTest { final KStream stream; final KTable table; - processor = new MockProcessorSupplier<>(); + final MockProcessorSupplier supplier = new MockProcessorSupplier<>(); final Consumed consumed = Consumed.with(intSerde, stringSerde); stream = builder.stream(streamTopic, consumed); table = builder.table(tableTopic, consumed); - stream.leftJoin(table, MockValueJoiner.TOSTRING_JOINER).process(processor); + stream.leftJoin(table, MockValueJoiner.TOSTRING_JOINER).process(supplier); final Properties props = new Properties(); props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "kstream-ktable-left-join-test"); @@ -77,6 +79,8 @@ public class KStreamKTableLeftJoinTest { props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); driver = new TopologyTestDriver(builder.build(), props, 0L); + + processor = supplier.theCapturedProcessor(); } private void pushToStream(final int messageCount, final String valuePrefix) { diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapTest.java index bb22204..b0a383b 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapTest.java @@ -81,22 +81,22 @@ public class KStreamMapTest { final int[] expectedKeys = new int[]{0, 1, 2, 3}; KStream stream = builder.stream(topicName, Consumed.with(intSerde, stringSerde)); - MockProcessorSupplier processor; + MockProcessorSupplier supplier; - processor = new MockProcessorSupplier<>(); - stream.map(mapper).process(processor); + supplier = new MockProcessorSupplier<>(); + stream.map(mapper).process(supplier); driver = new TopologyTestDriver(builder.build(), props); for (int expectedKey : expectedKeys) { driver.pipeInput(recordFactory.create(topicName, expectedKey, "V" + expectedKey)); } - assertEquals(4, processor.processed.size()); + assertEquals(4, supplier.theCapturedProcessor().processed.size()); String[] expected = new String[]{"V0:0", "V1:1", "V2:2", "V3:3"}; for (int i = 0; i < expected.length; i++) { - assertEquals(expected[i], processor.processed.get(i)); + assertEquals(expected[i], supplier.theCapturedProcessor().processed.get(i)); } } diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapValuesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapValuesTest.java index 17a13e0..ed11038 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapValuesTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapValuesTest.java @@ -44,6 +44,9 @@ public class KStreamMapValuesTest { final private Serde intSerde = Serdes.Integer(); final private Serde stringSerde = Serdes.String(); + final private MockProcessorSupplier supplier = new MockProcessorSupplier<>(); + + private final ConsumerRecordFactory recordFactory = new ConsumerRecordFactory<>(new IntegerSerializer(), new StringSerializer()); private TopologyTestDriver driver; private final Properties props = new Properties(); @@ -81,9 +84,8 @@ public class KStreamMapValuesTest { final int[] expectedKeys = {1, 10, 100, 1000}; KStream stream; - MockProcessorSupplier processor = new MockProcessorSupplier<>(); stream = builder.stream(topicName, Consumed.with(intSerde, stringSerde)); - stream.mapValues(mapper).process(processor); + stream.mapValues(mapper).process(supplier); driver = new TopologyTestDriver(builder.build(), props); for (int expectedKey : expectedKeys) { @@ -91,7 +93,7 @@ public class KStreamMapValuesTest { } String[] expected = {"1:1", "10:2", "100:3", "1000:4"}; - assertArrayEquals(expected, processor.processed.toArray()); + assertArrayEquals(expected, supplier.theCapturedProcessor().processed.toArray()); } @Test @@ -109,9 +111,8 @@ public class KStreamMapValuesTest { final int[] expectedKeys = {1, 10, 100, 1000}; KStream stream; - MockProcessorSupplier processor = new MockProcessorSupplier<>(); stream = builder.stream(topicName, Consumed.with(intSerde, stringSerde)); - stream.mapValues(mapper).process(processor); + stream.mapValues(mapper).process(supplier); driver = new TopologyTestDriver(builder.build(), props); for (int expectedKey : expectedKeys) { @@ -119,7 +120,7 @@ public class KStreamMapValuesTest { } String[] expected = {"1:2", "10:12", "100:103", "1000:1004"}; - assertArrayEquals(expected, processor.processed.toArray()); + assertArrayEquals(expected, supplier.theCapturedProcessor().processed.toArray()); } } diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSelectKeyTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSelectKeyTest.java index 0bf6452..1abc0b9 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSelectKeyTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSelectKeyTest.java @@ -90,9 +90,9 @@ public class KStreamSelectKeyTest { KStream stream = builder.stream(topicName, Consumed.with(stringSerde, integerSerde)); - MockProcessorSupplier processor = new MockProcessorSupplier<>(); + MockProcessorSupplier supplier = new MockProcessorSupplier<>(); - stream.selectKey(selector).process(processor); + stream.selectKey(selector).process(supplier); driver = new TopologyTestDriver(builder.build(), props); @@ -100,10 +100,10 @@ public class KStreamSelectKeyTest { driver.pipeInput(recordFactory.create(expectedValue)); } - assertEquals(3, processor.processed.size()); + assertEquals(3, supplier.theCapturedProcessor().processed.size()); for (int i = 0; i < expected.length; i++) { - assertEquals(expected[i], processor.processed.get(i)); + assertEquals(expected[i], supplier.theCapturedProcessor().processed.get(i)); } } diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java index aa0cf7e..1567fe1 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java @@ -48,6 +48,7 @@ public class KStreamTransformTest { private String topicName = "topic"; final private Serde intSerde = Serdes.Integer(); + private final ConsumerRecordFactory recordFactory = new ConsumerRecordFactory<>(new IntegerSerializer(), new IntegerSerializer()); private TopologyTestDriver driver; private final Properties props = new Properties(); @@ -77,34 +78,26 @@ public class KStreamTransformTest { public void testTransform() { StreamsBuilder builder = new StreamsBuilder(); - TransformerSupplier> transformerSupplier = - new TransformerSupplier>() { - public Transformer> get() { - return new Transformer>() { - - private int total = 0; + final TransformerSupplier> transformerSupplier = new TransformerSupplier>() { + public Transformer> get() { + return new Transformer>() { - @Override - public void init(ProcessorContext context) { - } + private int total = 0; - @Override - public KeyValue transform(Number key, Number value) { - total += value.intValue(); - return KeyValue.pair(key.intValue() * 2, total); - } + @Override + public void init(final ProcessorContext context) {} - @Override - public KeyValue punctuate(long timestamp) { - return KeyValue.pair(-1, (int) timestamp); - } + @Override + public KeyValue transform(final Number key, final Number value) { + total += value.intValue(); + return KeyValue.pair(key.intValue() * 2, total); + } - @Override - public void close() { - } - }; - } - }; + @Override + public void close() {} + }; + } + }; final int[] expectedKeys = {1, 10, 100, 1000}; @@ -117,15 +110,18 @@ public class KStreamTransformTest { kstreamDriver.process(topicName, expectedKey, expectedKey * 10); } - kstreamDriver.punctuate(2); - kstreamDriver.punctuate(3); + // TODO: un-comment after replaced with TopologyTestDriver + //kstreamDriver.punctuate(2); + //kstreamDriver.punctuate(3); - assertEquals(6, processor.processed.size()); + //assertEquals(6, processor.theCapturedProcessor().processed.size()); - String[] expected = {"2:10", "20:110", "200:1110", "2000:11110", "-1:2", "-1:3"}; + //String[] expected = {"2:10", "20:110", "200:1110", "2000:11110", "-1:2", "-1:3"}; + + String[] expected = {"2:10", "20:110", "200:1110", "2000:11110"}; for (int i = 0; i < expected.length; i++) { - assertEquals(expected[i], processor.processed.get(i)); + assertEquals(expected[i], processor.theCapturedProcessor().processed.get(i)); } } @@ -133,40 +129,34 @@ public class KStreamTransformTest { public void testTransformWithNewDriverAndPunctuator() { StreamsBuilder builder = new StreamsBuilder(); - TransformerSupplier> transformerSupplier = - new TransformerSupplier>() { - public Transformer> get() { - return new Transformer>() { - - private int total = 0; - - @Override - public void init(final ProcessorContext context) { - context.schedule(1, PunctuationType.WALL_CLOCK_TIME, new Punctuator() { - @Override - public void punctuate(long timestamp) { - context.forward(-1, (int) timestamp); - } - }); - } - - @Override - public KeyValue transform(Number key, Number value) { - total += value.intValue(); - return KeyValue.pair(key.intValue() * 2, total); - } - - @Override - public KeyValue punctuate(long timestamp) { - return null; - } - - @Override - public void close() { - } - }; - } - }; + TransformerSupplier> transformerSupplier = new TransformerSupplier>() { + public Transformer> get() { + return new Transformer>() { + + private int total = 0; + + @Override + public void init(final ProcessorContext context) { + context.schedule(1, PunctuationType.WALL_CLOCK_TIME, new Punctuator() { + @Override + public void punctuate(long timestamp) { + context.forward(-1, (int) timestamp); + } + }); + } + + @Override + public KeyValue transform(final Number key, final Number value) { + total += value.intValue(); + return KeyValue.pair(key.intValue() * 2, total); + } + + @Override + public void close() {} + }; + } + }; + final int[] expectedKeys = {1, 10, 100, 1000}; @@ -184,12 +174,12 @@ public class KStreamTransformTest { // This tick further advances the clock to 3, which leads to the "-1:3" result driver.advanceWallClockTime(1); - assertEquals(6, processor.processed.size()); + assertEquals(6, processor.theCapturedProcessor().processed.size()); String[] expected = {"2:10", "20:110", "200:1110", "2000:11110", "-1:2", "-1:3"}; for (int i = 0; i < expected.length; i++) { - assertEquals(expected[i], processor.processed.get(i)); + assertEquals(expected[i], processor.theCapturedProcessor().processed.get(i)); } } diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java index 59a6a21..6bfc813 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java @@ -49,6 +49,8 @@ public class KStreamTransformValuesTest { private String topicName = "topic"; final private Serde intSerde = Serdes.Integer(); + final private MockProcessorSupplier supplier = new MockProcessorSupplier<>(); + private final ConsumerRecordFactory recordFactory = new ConsumerRecordFactory<>(new IntegerSerializer(), new IntegerSerializer()); private TopologyTestDriver driver; private final Properties props = new Properties(); @@ -107,9 +109,8 @@ public class KStreamTransformValuesTest { final int[] expectedKeys = {1, 10, 100, 1000}; KStream stream; - MockProcessorSupplier processor = new MockProcessorSupplier<>(); stream = builder.stream(topicName, Consumed.with(intSerde, intSerde)); - stream.transformValues(valueTransformerSupplier).process(processor); + stream.transformValues(valueTransformerSupplier).process(supplier); driver = new TopologyTestDriver(builder.build(), props); @@ -118,7 +119,7 @@ public class KStreamTransformValuesTest { } String[] expected = {"1:10", "10:110", "100:1110", "1000:11110"}; - assertArrayEquals(expected, processor.processed.toArray()); + assertArrayEquals(expected, supplier.theCapturedProcessor().processed.toArray()); } @Test @@ -151,9 +152,8 @@ public class KStreamTransformValuesTest { final int[] expectedKeys = {1, 10, 100, 1000}; KStream stream; - MockProcessorSupplier processor = new MockProcessorSupplier<>(); stream = builder.stream(topicName, Consumed.with(intSerde, intSerde)); - stream.transformValues(valueTransformerSupplier).process(processor); + stream.transformValues(valueTransformerSupplier).process(supplier); driver = new TopologyTestDriver(builder.build(), props); @@ -162,7 +162,7 @@ public class KStreamTransformValuesTest { } String[] expected = {"1:11", "10:121", "100:1221", "1000:12221"}; - assertArrayEquals(expected, processor.processed.toArray()); + assertArrayEquals(expected, supplier.theCapturedProcessor().processed.toArray()); } @@ -226,13 +226,6 @@ public class KStreamTransformValuesTest { } catch (final StreamsException e) { // expected } - - try { - transformValueProcessor.punctuate(0); - fail("should not allow ValueTransformer#puntuate() to return not-null value"); - } catch (final StreamsException e) { - // expected - } } private static final class BadValueTransformer implements ValueTransformerWithKey { diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java index fc31db9..9050edb 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java @@ -37,12 +37,14 @@ import org.apache.kafka.streams.state.WindowStore; import org.apache.kafka.streams.test.ConsumerRecordFactory; import org.apache.kafka.test.MockAggregator; import org.apache.kafka.test.MockInitializer; +import org.apache.kafka.test.MockProcessor; import org.apache.kafka.test.MockProcessorSupplier; import org.apache.kafka.test.TestUtils; import org.junit.After; import org.junit.Before; import org.junit.Test; +import java.util.List; import java.util.Properties; import static org.apache.kafka.test.StreamsTestUtils.getMetricByName; @@ -85,8 +87,8 @@ public class KStreamWindowAggregateTest { .groupByKey(Serialized.with(strSerde, strSerde)) .aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, TimeWindows.of(10).advanceBy(5), strSerde, "topic1-Canonized"); - final MockProcessorSupplier, String> proc2 = new MockProcessorSupplier<>(); - table2.toStream().process(proc2); + final MockProcessorSupplier, String> supplier = new MockProcessorSupplier<>(); + table2.toStream().process(supplier); driver = new TopologyTestDriver(builder.build(), props, 0L); @@ -128,7 +130,7 @@ public class KStreamWindowAggregateTest { "[B@5/15]:0+2+2+2+2", "[B@10/20]:0+2+2", "[C@5/15]:0+3+3", "[C@10/20]:0+3" ), - proc2.processed + supplier.theCapturedProcessor().processed ); } @@ -143,24 +145,22 @@ public class KStreamWindowAggregateTest { .groupByKey(Serialized.with(strSerde, strSerde)) .aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, TimeWindows.of(10).advanceBy(5), strSerde, "topic1-Canonized"); - final MockProcessorSupplier, String> proc1 = new MockProcessorSupplier<>(); - table1.toStream().process(proc1); + final MockProcessorSupplier, String> supplier = new MockProcessorSupplier<>(); + table1.toStream().process(supplier); final KTable, String> table2 = builder .stream(topic2, Consumed.with(strSerde, strSerde)).groupByKey(Serialized.with(strSerde, strSerde)) .aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, TimeWindows.of(10).advanceBy(5), strSerde, "topic2-Canonized"); - final MockProcessorSupplier, String> proc2 = new MockProcessorSupplier<>(); - table2.toStream().process(proc2); + table2.toStream().process(supplier); - final MockProcessorSupplier, String> proc3 = new MockProcessorSupplier<>(); table1.join(table2, new ValueJoiner() { @Override public String apply(final String p1, final String p2) { return p1 + "%" + p2; } - }).toStream().process(proc3); + }).toStream().process(supplier); driver = new TopologyTestDriver(builder.build(), props, 0L); @@ -170,15 +170,17 @@ public class KStreamWindowAggregateTest { driver.pipeInput(recordFactory.create(topic1, "D", "4", 3L)); driver.pipeInput(recordFactory.create(topic1, "A", "1", 4L)); - proc1.checkAndClearProcessResult( + final List, String>> processors = supplier.capturedProcessors(3); + + processors.get(0).checkAndClearProcessResult( "[A@0/10]:0+1", "[B@0/10]:0+2", "[C@0/10]:0+3", "[D@0/10]:0+4", "[A@0/10]:0+1+1" ); - proc2.checkAndClearProcessResult(); - proc3.checkAndClearProcessResult(); + processors.get(1).checkAndClearProcessResult(); + processors.get(2).checkAndClearProcessResult(); driver.pipeInput(recordFactory.create(topic1, "A", "1", 5L)); driver.pipeInput(recordFactory.create(topic1, "B", "2", 6L)); @@ -186,15 +188,15 @@ public class KStreamWindowAggregateTest { driver.pipeInput(recordFactory.create(topic1, "B", "2", 8L)); driver.pipeInput(recordFactory.create(topic1, "C", "3", 9L)); - proc1.checkAndClearProcessResult( + processors.get(0).checkAndClearProcessResult( "[A@0/10]:0+1+1+1", "[A@5/15]:0+1", "[B@0/10]:0+2+2", "[B@5/15]:0+2", "[D@0/10]:0+4+4", "[D@5/15]:0+4", "[B@0/10]:0+2+2+2", "[B@5/15]:0+2+2", "[C@0/10]:0+3+3", "[C@5/15]:0+3" ); - proc2.checkAndClearProcessResult(); - proc3.checkAndClearProcessResult(); + processors.get(1).checkAndClearProcessResult(); + processors.get(2).checkAndClearProcessResult(); driver.pipeInput(recordFactory.create(topic2, "A", "a", 0L)); driver.pipeInput(recordFactory.create(topic2, "B", "b", 1L)); @@ -202,15 +204,15 @@ public class KStreamWindowAggregateTest { driver.pipeInput(recordFactory.create(topic2, "D", "d", 3L)); driver.pipeInput(recordFactory.create(topic2, "A", "a", 4L)); - proc1.checkAndClearProcessResult(); - proc2.checkAndClearProcessResult( + processors.get(0).checkAndClearProcessResult(); + processors.get(1).checkAndClearProcessResult( "[A@0/10]:0+a", "[B@0/10]:0+b", "[C@0/10]:0+c", "[D@0/10]:0+d", "[A@0/10]:0+a+a" ); - proc3.checkAndClearProcessResult( + processors.get(2).checkAndClearProcessResult( "[A@0/10]:0+1+1+1%0+a", "[B@0/10]:0+2+2+2%0+b", "[C@0/10]:0+3+3%0+c", @@ -223,15 +225,15 @@ public class KStreamWindowAggregateTest { driver.pipeInput(recordFactory.create(topic2, "B", "b", 8L)); driver.pipeInput(recordFactory.create(topic2, "C", "c", 9L)); - proc1.checkAndClearProcessResult(); - proc2.checkAndClearProcessResult( + processors.get(0).checkAndClearProcessResult(); + processors.get(1).checkAndClearProcessResult( "[A@0/10]:0+a+a+a", "[A@5/15]:0+a", "[B@0/10]:0+b+b", "[B@5/15]:0+b", "[D@0/10]:0+d+d", "[D@5/15]:0+d", "[B@0/10]:0+b+b+b", "[B@5/15]:0+b+b", "[C@0/10]:0+c+c", "[C@5/15]:0+c" ); - proc3.checkAndClearProcessResult( + processors.get(2).checkAndClearProcessResult( "[A@0/10]:0+1+1+1%0+a+a+a", "[A@5/15]:0+1%0+a", "[B@0/10]:0+2+2+2%0+b+b", "[B@5/15]:0+2+2%0+b", "[D@0/10]:0+4+4%0+d+d", "[D@5/15]:0+4%0+d", diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java index df8d292..a769b49 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java @@ -36,6 +36,7 @@ import org.apache.kafka.test.KStreamTestDriver; import org.apache.kafka.test.MockAggregator; import org.apache.kafka.test.MockInitializer; import org.apache.kafka.test.MockMapper; +import org.apache.kafka.test.MockProcessor; import org.apache.kafka.test.MockProcessorSupplier; import org.apache.kafka.test.TestUtils; import org.junit.Before; @@ -50,9 +51,10 @@ import static org.junit.Assert.assertEquals; public class KTableAggregateTest { - final private Serde stringSerde = Serdes.String(); + private final Serde stringSerde = Serdes.String(); private final Consumed consumed = Consumed.with(stringSerde, stringSerde); private final Serialized stringSerialzied = Serialized.with(stringSerde, stringSerde); + private final MockProcessorSupplier supplier = new MockProcessorSupplier<>(); private File stateDir = null; @@ -70,7 +72,7 @@ public class KTableAggregateTest { public void testAggBasic() { final StreamsBuilder builder = new StreamsBuilder(); final String topic1 = "topic1"; - final MockProcessorSupplier proc = new MockProcessorSupplier<>(); + KTable table1 = builder.table(topic1, consumed); KTable table2 = table1.groupBy(MockMapper.noOpKeyValueMapper(), @@ -81,7 +83,7 @@ public class KTableAggregateTest { stringSerde, "topic1-Canonized"); - table2.toStream().process(proc); + table2.toStream().process(supplier); driver.setUp(builder, stateDir, Serdes.String(), Serdes.String()); @@ -110,7 +112,7 @@ public class KTableAggregateTest { "C:0+5", "D:0+6", "B:0+2-2+4-4+7", - "C:0+5-5+8"), proc.processed); + "C:0+5-5+8"), supplier.theCapturedProcessor().processed); } @@ -118,7 +120,6 @@ public class KTableAggregateTest { public void testAggCoalesced() { final StreamsBuilder builder = new StreamsBuilder(); final String topic1 = "topic1"; - final MockProcessorSupplier proc = new MockProcessorSupplier<>(); KTable table1 = builder.table(topic1, consumed); KTable table2 = table1.groupBy(MockMapper.noOpKeyValueMapper(), @@ -129,7 +130,7 @@ public class KTableAggregateTest { stringSerde, "topic1-Canonized"); - table2.toStream().process(proc); + table2.toStream().process(supplier); driver.setUp(builder, stateDir); @@ -138,7 +139,7 @@ public class KTableAggregateTest { driver.process(topic1, "A", "4"); driver.flushState(); assertEquals(Utils.mkList( - "A:0+4"), proc.processed); + "A:0+4"), supplier.theCapturedProcessor().processed); } @@ -146,7 +147,6 @@ public class KTableAggregateTest { public void testAggRepartition() { final StreamsBuilder builder = new StreamsBuilder(); final String topic1 = "topic1"; - final MockProcessorSupplier proc = new MockProcessorSupplier<>(); KTable table1 = builder.table(topic1, consumed); KTable table2 = table1.groupBy(new KeyValueMapper>() { @@ -170,7 +170,7 @@ public class KTableAggregateTest { stringSerde, "topic1-Canonized"); - table2.toStream().process(proc); + table2.toStream().process(supplier); driver.setUp(builder, stateDir); @@ -200,10 +200,10 @@ public class KTableAggregateTest { "2:0+2-2", "4:0+4", //noop "4:0+4-4", "7:0+7" - ), proc.processed); + ), supplier.theCapturedProcessor().processed); } - private void testCountHelper(final StreamsBuilder builder, final String input, final MockProcessorSupplier proc) { + private void testCountHelper(final StreamsBuilder builder, final String input, final MockProcessorSupplier supplier) { driver.setUp(builder, stateDir); driver.process(input, "A", "green"); @@ -225,53 +225,53 @@ public class KTableAggregateTest { "green:1", "blue:1", "yellow:1", "green:2" - ), proc.processed); + ), supplier.theCapturedProcessor().processed); } @Test public void testCount() { final StreamsBuilder builder = new StreamsBuilder(); final String input = "count-test-input"; - final MockProcessorSupplier proc = new MockProcessorSupplier<>(); builder.table(input, consumed) .groupBy(MockMapper.selectValueKeyValueMapper(), stringSerialzied) .count("count") .toStream() - .process(proc); + .process(supplier); - testCountHelper(builder, input, proc); + testCountHelper(builder, input, supplier); } @Test public void testCountWithInternalStore() { final StreamsBuilder builder = new StreamsBuilder(); final String input = "count-test-input"; - final MockProcessorSupplier proc = new MockProcessorSupplier<>(); builder.table(input, consumed) .groupBy(MockMapper.selectValueKeyValueMapper(), stringSerialzied) .count() .toStream() - .process(proc); + .process(supplier); - testCountHelper(builder, input, proc); + testCountHelper(builder, input, supplier); } @Test public void testCountCoalesced() { final StreamsBuilder builder = new StreamsBuilder(); final String input = "count-test-input"; - final MockProcessorSupplier proc = new MockProcessorSupplier<>(); + final MockProcessorSupplier supplier = new MockProcessorSupplier<>(); builder.table(input, consumed) .groupBy(MockMapper.selectValueKeyValueMapper(), stringSerialzied) .count("count") .toStream() - .process(proc); + .process(supplier); driver.setUp(builder, stateDir); + final MockProcessor proc = supplier.theCapturedProcessor(); + driver.process(input, "A", "green"); driver.process(input, "B", "green"); driver.process(input, "A", "blue"); @@ -291,7 +291,7 @@ public class KTableAggregateTest { public void testRemoveOldBeforeAddNew() { final StreamsBuilder builder = new StreamsBuilder(); final String input = "count-test-input"; - final MockProcessorSupplier proc = new MockProcessorSupplier<>(); + final MockProcessorSupplier supplier = new MockProcessorSupplier<>(); builder.table(input, consumed) .groupBy(new KeyValueMapper>() { @@ -321,10 +321,12 @@ public class KTableAggregateTest { } }, Serdes.String(), "someStore") .toStream() - .process(proc); + .process(supplier); driver.setUp(builder, stateDir); + final MockProcessor proc = supplier.theCapturedProcessor(); + driver.process(input, "11", "A"); driver.flushState(); driver.process(input, "12", "B"); diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java index 657e05d..bde771b 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java @@ -21,11 +21,13 @@ import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.Consumed; import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.Topology; import org.apache.kafka.streams.kstream.KTable; import org.apache.kafka.streams.kstream.Materialized; import org.apache.kafka.streams.kstream.Predicate; import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.test.KStreamTestDriver; +import org.apache.kafka.test.MockProcessor; import org.apache.kafka.test.MockProcessorSupplier; import org.apache.kafka.test.MockReducer; import org.apache.kafka.test.MockMapper; @@ -35,6 +37,7 @@ import org.junit.Rule; import org.junit.Test; import java.io.File; +import java.util.List; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; @@ -57,10 +60,9 @@ public class KTableFilterTest { final KTable table2, final KTable table3, final String topic) { - MockProcessorSupplier proc2 = new MockProcessorSupplier<>(); - MockProcessorSupplier proc3 = new MockProcessorSupplier<>(); - table2.toStream().process(proc2); - table3.toStream().process(proc3); + MockProcessorSupplier supplier = new MockProcessorSupplier<>(); + table2.toStream().process(supplier); + table3.toStream().process(supplier); driver.setUp(builder, stateDir, Serdes.String(), Serdes.Integer()); @@ -73,8 +75,10 @@ public class KTableFilterTest { driver.process(topic, "B", null); driver.flushState(); - proc2.checkAndClearProcessResult("A:null", "B:2", "C:null", "D:4", "A:null", "B:null"); - proc3.checkAndClearProcessResult("A:1", "B:null", "C:3", "D:null", "A:null", "B:null"); + final List> processors = supplier.capturedProcessors(2); + + processors.get(0).checkAndClearProcessResult("A:null", "B:2", "C:null", "D:4", "A:null", "B:null"); + processors.get(1).checkAndClearProcessResult("A:1", "B:null", "C:3", "D:null", "A:null", "B:null"); } @Test @@ -269,11 +273,10 @@ public class KTableFilterTest { final KTableImpl table1, final KTableImpl table2, final String topic1) { - MockProcessorSupplier proc1 = new MockProcessorSupplier<>(); - MockProcessorSupplier proc2 = new MockProcessorSupplier<>(); + MockProcessorSupplier supplier = new MockProcessorSupplier<>(); - builder.build().addProcessor("proc1", proc1, table1.name); - builder.build().addProcessor("proc2", proc2, table2.name); + builder.build().addProcessor("proc1", supplier, table1.name); + builder.build().addProcessor("proc2", supplier, table2.name); driver.setUp(builder, stateDir, Serdes.String(), Serdes.Integer()); @@ -282,25 +285,27 @@ public class KTableFilterTest { driver.process(topic1, "C", 1); driver.flushState(); - proc1.checkAndClearProcessResult("A:(1<-null)", "B:(1<-null)", "C:(1<-null)"); - proc2.checkAndClearProcessResult("A:(null<-null)", "B:(null<-null)", "C:(null<-null)"); + final List> processors = supplier.capturedProcessors(2); + + processors.get(0).checkAndClearProcessResult("A:(1<-null)", "B:(1<-null)", "C:(1<-null)"); + processors.get(1).checkAndClearProcessResult("A:(null<-null)", "B:(null<-null)", "C:(null<-null)"); driver.process(topic1, "A", 2); driver.process(topic1, "B", 2); driver.flushState(); - proc1.checkAndClearProcessResult("A:(2<-null)", "B:(2<-null)"); - proc2.checkAndClearProcessResult("A:(2<-null)", "B:(2<-null)"); + processors.get(0).checkAndClearProcessResult("A:(2<-null)", "B:(2<-null)"); + processors.get(1).checkAndClearProcessResult("A:(2<-null)", "B:(2<-null)"); driver.process(topic1, "A", 3); driver.flushState(); - proc1.checkAndClearProcessResult("A:(3<-null)"); - proc2.checkAndClearProcessResult("A:(null<-null)"); + processors.get(0).checkAndClearProcessResult("A:(3<-null)"); + processors.get(1).checkAndClearProcessResult("A:(null<-null)"); driver.process(topic1, "A", null); driver.process(topic1, "B", null); driver.flushState(); - proc1.checkAndClearProcessResult("A:(null<-null)", "B:(null<-null)"); - proc2.checkAndClearProcessResult("A:(null<-null)", "B:(null<-null)"); + processors.get(0).checkAndClearProcessResult("A:(null<-null)", "B:(null<-null)"); + processors.get(1).checkAndClearProcessResult("A:(null<-null)", "B:(null<-null)"); } @@ -348,11 +353,11 @@ public class KTableFilterTest { final String topic1) { table2.enableSendingOldValues(); - MockProcessorSupplier proc1 = new MockProcessorSupplier<>(); - MockProcessorSupplier proc2 = new MockProcessorSupplier<>(); + final MockProcessorSupplier supplier = new MockProcessorSupplier<>(); + final Topology topology = builder.build(); - builder.build().addProcessor("proc1", proc1, table1.name); - builder.build().addProcessor("proc2", proc2, table2.name); + topology.addProcessor("proc1", supplier, table1.name); + topology.addProcessor("proc2", supplier, table2.name); driver.setUp(builder, stateDir, Serdes.String(), Serdes.Integer()); @@ -361,25 +366,27 @@ public class KTableFilterTest { driver.process(topic1, "C", 1); driver.flushState(); - proc1.checkAndClearProcessResult("A:(1<-null)", "B:(1<-null)", "C:(1<-null)"); - proc2.checkEmptyAndClearProcessResult(); + final List> processors = supplier.capturedProcessors(2); + + processors.get(0).checkAndClearProcessResult("A:(1<-null)", "B:(1<-null)", "C:(1<-null)"); + processors.get(1).checkEmptyAndClearProcessResult(); driver.process(topic1, "A", 2); driver.process(topic1, "B", 2); driver.flushState(); - proc1.checkAndClearProcessResult("A:(2<-1)", "B:(2<-1)"); - proc2.checkAndClearProcessResult("A:(2<-null)", "B:(2<-null)"); + processors.get(0).checkAndClearProcessResult("A:(2<-1)", "B:(2<-1)"); + processors.get(1).checkAndClearProcessResult("A:(2<-null)", "B:(2<-null)"); driver.process(topic1, "A", 3); driver.flushState(); - proc1.checkAndClearProcessResult("A:(3<-2)"); - proc2.checkAndClearProcessResult("A:(null<-2)"); + processors.get(0).checkAndClearProcessResult("A:(3<-2)"); + processors.get(1).checkAndClearProcessResult("A:(null<-2)"); driver.process(topic1, "A", null); driver.process(topic1, "B", null); driver.flushState(); - proc1.checkAndClearProcessResult("A:(null<-3)", "B:(null<-2)"); - proc2.checkAndClearProcessResult("B:(null<-2)"); + processors.get(0).checkAndClearProcessResult("A:(null<-3)", "B:(null<-2)"); + processors.get(1).checkAndClearProcessResult("B:(null<-2)"); } @Test @@ -424,11 +431,11 @@ public class KTableFilterTest { final KTableImpl table1, final KTableImpl table2, final String topic1) { - MockProcessorSupplier proc1 = new MockProcessorSupplier<>(); - MockProcessorSupplier proc2 = new MockProcessorSupplier<>(); + final MockProcessorSupplier supplier = new MockProcessorSupplier<>(); + final Topology topology = builder.build(); - builder.build().addProcessor("proc1", proc1, table1.name); - builder.build().addProcessor("proc2", proc2, table2.name); + topology.addProcessor("proc1", supplier, table1.name); + topology.addProcessor("proc2", supplier, table2.name); driver.setUp(builder, stateDir, stringSerde, stringSerde); @@ -436,8 +443,10 @@ public class KTableFilterTest { driver.process(topic1, "B", "reject"); driver.process(topic1, "C", "reject"); driver.flushState(); - proc1.checkAndClearProcessResult("A:(reject<-null)", "B:(reject<-null)", "C:(reject<-null)"); - proc2.checkEmptyAndClearProcessResult(); + + final List> processors = supplier.capturedProcessors(2); + processors.get(0).checkAndClearProcessResult("A:(reject<-null)", "B:(reject<-null)", "C:(reject<-null)"); + processors.get(1).checkEmptyAndClearProcessResult(); } @Test diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java index a7aed2e..ae1e285 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java @@ -37,6 +37,7 @@ import org.apache.kafka.test.KStreamTestDriver; import org.apache.kafka.test.MockAggregator; import org.apache.kafka.test.MockInitializer; import org.apache.kafka.test.MockMapper; +import org.apache.kafka.test.MockProcessor; import org.apache.kafka.test.MockProcessorSupplier; import org.apache.kafka.test.MockReducer; import org.apache.kafka.test.MockValueJoiner; @@ -47,6 +48,7 @@ import org.junit.Test; import java.io.File; import java.lang.reflect.Field; +import java.util.List; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; @@ -80,8 +82,8 @@ public class KTableImplTest { KTable table1 = builder.table(topic1, consumed); - MockProcessorSupplier proc1 = new MockProcessorSupplier<>(); - table1.toStream().process(proc1); + MockProcessorSupplier supplier = new MockProcessorSupplier<>(); + table1.toStream().process(supplier); KTable table2 = table1.mapValues(new ValueMapper() { @Override @@ -90,8 +92,7 @@ public class KTableImplTest { } }); - MockProcessorSupplier proc2 = new MockProcessorSupplier<>(); - table2.toStream().process(proc2); + table2.toStream().process(supplier); KTable table3 = table2.filter(new Predicate() { @Override @@ -100,13 +101,11 @@ public class KTableImplTest { } }); - MockProcessorSupplier proc3 = new MockProcessorSupplier<>(); - table3.toStream().process(proc3); + table3.toStream().process(supplier); KTable table4 = table1.through(stringSerde, stringSerde, topic2, storeName2); - MockProcessorSupplier proc4 = new MockProcessorSupplier<>(); - table4.toStream().process(proc4); + table4.toStream().process(supplier); driver.setUp(builder, stateDir); @@ -120,10 +119,11 @@ public class KTableImplTest { driver.flushState(); driver.flushState(); - assertEquals(Utils.mkList("A:01", "B:02", "C:03", "D:04"), proc1.processed); - assertEquals(Utils.mkList("A:1", "B:2", "C:3", "D:4"), proc2.processed); - assertEquals(Utils.mkList("A:null", "B:2", "C:null", "D:4"), proc3.processed); - assertEquals(Utils.mkList("A:01", "B:02", "C:03", "D:04"), proc4.processed); + final List> processors = supplier.capturedProcessors(4); + assertEquals(Utils.mkList("A:01", "B:02", "C:03", "D:04"), processors.get(0).processed); + assertEquals(Utils.mkList("A:1", "B:2", "C:3", "D:4"), processors.get(1).processed); + assertEquals(Utils.mkList("A:null", "B:2", "C:null", "D:4"), processors.get(2).processed); + assertEquals(Utils.mkList("A:01", "B:02", "C:03", "D:04"), processors.get(3).processed); } @Test diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoinTest.java index 9f5603b..0ca388f 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoinTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoinTest.java @@ -27,6 +27,7 @@ import org.apache.kafka.streams.processor.MockProcessorContext; import org.apache.kafka.streams.processor.Processor; import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender; import org.apache.kafka.test.KStreamTestDriver; +import org.apache.kafka.test.MockProcessor; import org.apache.kafka.test.MockProcessorSupplier; import org.apache.kafka.test.MockValueJoiner; import org.apache.kafka.test.TestUtils; @@ -67,7 +68,7 @@ public class KTableKTableInnerJoinTest { private void doTestJoin(final StreamsBuilder builder, final int[] expectedKeys, - final MockProcessorSupplier processor, + final MockProcessorSupplier supplier, final KTable joined) { final Collection> copartitionGroups = StreamsBuilderTest.getCopartitionedGroups(builder); @@ -79,6 +80,8 @@ public class KTableKTableInnerJoinTest { driver.setUp(builder, stateDir, Serdes.Integer(), Serdes.String()); driver.setTime(0L); + final MockProcessor processor = supplier.theCapturedProcessor(); + final KTableValueGetter getter = getterSupplier.get(); getter.init(driver.context()); @@ -168,15 +171,13 @@ public class KTableKTableInnerJoinTest { final KTable table1; final KTable table2; final KTable joined; - final MockProcessorSupplier processor; - - processor = new MockProcessorSupplier<>(); + final MockProcessorSupplier supplier = new MockProcessorSupplier<>(); table1 = builder.table(topic1, consumed); table2 = builder.table(topic2, consumed); joined = table1.join(table2, MockValueJoiner.TOSTRING_JOINER); - joined.toStream().process(processor); + joined.toStream().process(supplier); - doTestJoin(builder, expectedKeys, processor, joined); + doTestJoin(builder, expectedKeys, supplier, joined); } @Test @@ -203,13 +204,15 @@ public class KTableKTableInnerJoinTest { final int[] expectedKeys, final KTable table1, final KTable table2, - final MockProcessorSupplier proc, + final MockProcessorSupplier supplier, final KTable joined, final boolean sendOldValues) { driver.setUp(builder, stateDir, Serdes.Integer(), Serdes.String()); driver.setTime(0L); + final MockProcessor proc = supplier.theCapturedProcessor(); + if (!sendOldValues) { assertFalse(((KTableImpl) table1).sendingOldValueEnabled()); assertFalse(((KTableImpl) table2).sendingOldValueEnabled()); @@ -288,15 +291,15 @@ public class KTableKTableInnerJoinTest { final KTable table1; final KTable table2; final KTable joined; - final MockProcessorSupplier proc; + final MockProcessorSupplier supplier; table1 = builder.table(topic1, consumed); table2 = builder.table(topic2, consumed); joined = table1.join(table2, MockValueJoiner.TOSTRING_JOINER); - proc = new MockProcessorSupplier<>(); - builder.build().addProcessor("proc", proc, ((KTableImpl) joined).name); + supplier = new MockProcessorSupplier<>(); + builder.build().addProcessor("proc", supplier, ((KTableImpl) joined).name); - doTestSendingOldValues(builder, expectedKeys, table1, table2, proc, joined, false); + doTestSendingOldValues(builder, expectedKeys, table1, table2, supplier, joined, false); } @@ -309,15 +312,15 @@ public class KTableKTableInnerJoinTest { final KTable table1; final KTable table2; final KTable joined; - final MockProcessorSupplier proc; + final MockProcessorSupplier supplier; table1 = builder.table(topic1, consumed); table2 = builder.table(topic2, consumed); joined = table1.join(table2, MockValueJoiner.TOSTRING_JOINER, Serdes.String(), "anyQueryableName"); - proc = new MockProcessorSupplier<>(); - builder.build().addProcessor("proc", proc, ((KTableImpl) joined).name); + supplier = new MockProcessorSupplier<>(); + builder.build().addProcessor("proc", supplier, ((KTableImpl) joined).name); - doTestSendingOldValues(builder, expectedKeys, table1, table2, proc, joined, false); + doTestSendingOldValues(builder, expectedKeys, table1, table2, supplier, joined, false); } @@ -330,16 +333,16 @@ public class KTableKTableInnerJoinTest { final KTable table1; final KTable table2; final KTable joined; - final MockProcessorSupplier proc; + final MockProcessorSupplier supplier; table1 = builder.table(topic1, consumed); table2 = builder.table(topic2, consumed); joined = table1.join(table2, MockValueJoiner.TOSTRING_JOINER); - proc = new MockProcessorSupplier<>(); - builder.build().addProcessor("proc", proc, ((KTableImpl) joined).name); + supplier = new MockProcessorSupplier<>(); + builder.build().addProcessor("proc", supplier, ((KTableImpl) joined).name); - doTestSendingOldValues(builder, expectedKeys, table1, table2, proc, joined, true); + doTestSendingOldValues(builder, expectedKeys, table1, table2, supplier, joined, true); } diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java index 6331b57..2eef302 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java @@ -30,6 +30,7 @@ import org.apache.kafka.streams.processor.MockProcessorContext; import org.apache.kafka.streams.processor.Processor; import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender; import org.apache.kafka.test.KStreamTestDriver; +import org.apache.kafka.test.MockProcessor; import org.apache.kafka.test.MockProcessorSupplier; import org.apache.kafka.test.MockReducer; import org.apache.kafka.test.MockValueJoiner; @@ -80,9 +81,8 @@ public class KTableKTableLeftJoinTest { final KTable table1 = builder.table(topic1, consumed); final KTable table2 = builder.table(topic2, consumed); final KTable joined = table1.leftJoin(table2, MockValueJoiner.TOSTRING_JOINER); - final MockProcessorSupplier processor; - processor = new MockProcessorSupplier<>(); - joined.toStream().process(processor); + final MockProcessorSupplier supplier = new MockProcessorSupplier<>(); + joined.toStream().process(supplier); final Collection> copartitionGroups = StreamsBuilderTest.getCopartitionedGroups(builder); @@ -94,6 +94,8 @@ public class KTableKTableLeftJoinTest { driver.setUp(builder, stateDir); driver.setTime(0L); + final MockProcessor processor = supplier.theCapturedProcessor(); + final KTableValueGetter getter = getterSupplier.get(); getter.init(driver.context()); @@ -174,18 +176,20 @@ public class KTableKTableLeftJoinTest { final KTable table1; final KTable table2; final KTable joined; - final MockProcessorSupplier proc; + final MockProcessorSupplier supplier; table1 = builder.table(topic1, consumed); table2 = builder.table(topic2, consumed); joined = table1.leftJoin(table2, MockValueJoiner.TOSTRING_JOINER); - proc = new MockProcessorSupplier<>(); - builder.build().addProcessor("proc", proc, ((KTableImpl) joined).name); + supplier = new MockProcessorSupplier<>(); + builder.build().addProcessor("proc", supplier, ((KTableImpl) joined).name); driver.setUp(builder, stateDir); driver.setTime(0L); + final MockProcessor proc = supplier.theCapturedProcessor(); + assertTrue(((KTableImpl) table1).sendingOldValueEnabled()); assertFalse(((KTableImpl) table2).sendingOldValueEnabled()); assertFalse(((KTableImpl) joined).sendingOldValueEnabled()); @@ -255,7 +259,7 @@ public class KTableKTableLeftJoinTest { final KTable table1; final KTable table2; final KTable joined; - final MockProcessorSupplier proc; + final MockProcessorSupplier supplier; table1 = builder.table(topic1, consumed); table2 = builder.table(topic2, consumed); @@ -263,12 +267,14 @@ public class KTableKTableLeftJoinTest { ((KTableImpl) joined).enableSendingOldValues(); - proc = new MockProcessorSupplier<>(); - builder.build().addProcessor("proc", proc, ((KTableImpl) joined).name); + supplier = new MockProcessorSupplier<>(); + builder.build().addProcessor("proc", supplier, ((KTableImpl) joined).name); driver.setUp(builder, stateDir); driver.setTime(0L); + final MockProcessor proc = supplier.theCapturedProcessor(); + assertTrue(((KTableImpl) table1).sendingOldValueEnabled()); assertTrue(((KTableImpl) table2).sendingOldValueEnabled()); assertTrue(((KTableImpl) joined).sendingOldValueEnabled()); diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java index 16694d8..cf3321f 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java @@ -27,6 +27,7 @@ import org.apache.kafka.streams.processor.MockProcessorContext; import org.apache.kafka.streams.processor.Processor; import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender; import org.apache.kafka.test.KStreamTestDriver; +import org.apache.kafka.test.MockProcessor; import org.apache.kafka.test.MockProcessorSupplier; import org.apache.kafka.test.MockValueJoiner; import org.apache.kafka.test.TestUtils; @@ -74,13 +75,13 @@ public class KTableKTableOuterJoinTest { final KTable table1; final KTable table2; final KTable joined; - final MockProcessorSupplier processor; + final MockProcessorSupplier supplier; - processor = new MockProcessorSupplier<>(); + supplier = new MockProcessorSupplier<>(); table1 = builder.table(topic1, consumed); table2 = builder.table(topic2, consumed); joined = table1.outerJoin(table2, MockValueJoiner.TOSTRING_JOINER); - joined.toStream().process(processor); + joined.toStream().process(supplier); final Collection> copartitionGroups = StreamsBuilderTest.getCopartitionedGroups(builder); @@ -91,6 +92,8 @@ public class KTableKTableOuterJoinTest { driver.setUp(builder, stateDir); + final MockProcessor processor = supplier.theCapturedProcessor(); + final KTableValueGetter getter = getterSupplier.get(); getter.init(driver.context()); @@ -179,17 +182,19 @@ public class KTableKTableOuterJoinTest { final KTable table1; final KTable table2; final KTable joined; - final MockProcessorSupplier proc; + final MockProcessorSupplier supplier; table1 = builder.table(topic1, consumed); table2 = builder.table(topic2, consumed); joined = table1.outerJoin(table2, MockValueJoiner.TOSTRING_JOINER); - proc = new MockProcessorSupplier<>(); - builder.build().addProcessor("proc", proc, ((KTableImpl) joined).name); + supplier = new MockProcessorSupplier<>(); + builder.build().addProcessor("proc", supplier, ((KTableImpl) joined).name); driver.setUp(builder, stateDir); + final MockProcessor proc = supplier.theCapturedProcessor(); + assertTrue(((KTableImpl) table1).sendingOldValueEnabled()); assertTrue(((KTableImpl) table2).sendingOldValueEnabled()); assertFalse(((KTableImpl) joined).sendingOldValueEnabled()); @@ -267,7 +272,7 @@ public class KTableKTableOuterJoinTest { final KTable table1; final KTable table2; final KTable joined; - final MockProcessorSupplier proc; + final MockProcessorSupplier supplier; table1 = builder.table(topic1, consumed); table2 = builder.table(topic2, consumed); @@ -275,11 +280,13 @@ public class KTableKTableOuterJoinTest { ((KTableImpl) joined).enableSendingOldValues(); - proc = new MockProcessorSupplier<>(); - builder.build().addProcessor("proc", proc, ((KTableImpl) joined).name); + supplier = new MockProcessorSupplier<>(); + builder.build().addProcessor("proc", supplier, ((KTableImpl) joined).name); driver.setUp(builder, stateDir); + final MockProcessor proc = supplier.theCapturedProcessor(); + assertTrue(((KTableImpl) table1).sendingOldValueEnabled()); assertTrue(((KTableImpl) table2).sendingOldValueEnabled()); assertTrue(((KTableImpl) joined).sendingOldValueEnabled()); diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapKeysTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapKeysTest.java index 81797cb..78c7902 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapKeysTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapKeysTest.java @@ -78,9 +78,9 @@ public class KTableMapKeysTest { final int[] originalKeys = new int[]{1, 2, 3}; final String[] values = new String[]{"V_ONE", "V_TWO", "V_THREE"}; - MockProcessorSupplier processor = new MockProcessorSupplier<>(); + MockProcessorSupplier supplier = new MockProcessorSupplier<>(); - convertedStream.process(processor); + convertedStream.process(supplier); driver.setUp(builder, stateDir); for (int i = 0; i < originalKeys.length; i++) { @@ -88,10 +88,10 @@ public class KTableMapKeysTest { } driver.flushState(); - assertEquals(3, processor.processed.size()); + assertEquals(3, supplier.theCapturedProcessor().processed.size()); for (int i = 0; i < expected.length; i++) { - assertEquals(expected[i], processor.processed.get(i)); + assertEquals(expected[i], supplier.theCapturedProcessor().processed.get(i)); } } } \ No newline at end of file diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java index 5d92846..3cd7701 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java @@ -28,6 +28,7 @@ import org.apache.kafka.streams.kstream.Predicate; import org.apache.kafka.streams.kstream.ValueMapper; import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.test.KStreamTestDriver; +import org.apache.kafka.test.MockProcessor; import org.apache.kafka.test.MockProcessorSupplier; import org.apache.kafka.test.TestUtils; import org.junit.Before; @@ -54,7 +55,7 @@ public class KTableMapValuesTest { stateDir = TestUtils.tempDirectory("kafka-test"); } - private void doTestKTable(final StreamsBuilder builder, final String topic1, final MockProcessorSupplier proc2) { + private void doTestKTable(final StreamsBuilder builder, final String topic1, final MockProcessorSupplier supplier) { driver.setUp(builder, stateDir, Serdes.String(), Serdes.String()); driver.process(topic1, "A", "1"); @@ -62,7 +63,7 @@ public class KTableMapValuesTest { driver.process(topic1, "C", "3"); driver.process(topic1, "D", "4"); driver.flushState(); - assertEquals(Utils.mkList("A:1", "B:2", "C:3", "D:4"), proc2.processed); + assertEquals(Utils.mkList("A:1", "B:2", "C:3", "D:4"), supplier.theCapturedProcessor().processed); } @Test @@ -79,10 +80,10 @@ public class KTableMapValuesTest { } }); - MockProcessorSupplier proc2 = new MockProcessorSupplier<>(); - table2.toStream().process(proc2); + MockProcessorSupplier supplier = new MockProcessorSupplier<>(); + table2.toStream().process(supplier); - doTestKTable(builder, topic1, proc2); + doTestKTable(builder, topic1, supplier); } @Test @@ -99,10 +100,10 @@ public class KTableMapValuesTest { } }, Materialized.>as("anyName").withValueSerde(Serdes.Integer())); - MockProcessorSupplier proc2 = new MockProcessorSupplier<>(); - table2.toStream().process(proc2); + MockProcessorSupplier supplier = new MockProcessorSupplier<>(); + table2.toStream().process(supplier); - doTestKTable(builder, topic1, proc2); + doTestKTable(builder, topic1, supplier); } private void doTestValueGetter(final StreamsBuilder builder, @@ -282,11 +283,14 @@ public class KTableMapValuesTest { } }); - MockProcessorSupplier proc = new MockProcessorSupplier<>(); + final MockProcessorSupplier supplier = new MockProcessorSupplier<>(); - builder.build().addProcessor("proc", proc, table2.name); + builder.build().addProcessor("proc", supplier, table2.name); driver.setUp(builder, stateDir); + + final MockProcessor proc = supplier.theCapturedProcessor(); + assertFalse(table1.sendingOldValueEnabled()); assertFalse(table2.sendingOldValueEnabled()); @@ -332,11 +336,14 @@ public class KTableMapValuesTest { table2.enableSendingOldValues(); - MockProcessorSupplier proc = new MockProcessorSupplier<>(); + MockProcessorSupplier supplier = new MockProcessorSupplier<>(); - builder.build().addProcessor("proc", proc, table2.name); + builder.build().addProcessor("proc", supplier, table2.name); driver.setUp(builder, stateDir); + + final MockProcessor proc = supplier.theCapturedProcessor(); + assertTrue(table1.sendingOldValueEnabled()); assertTrue(table2.sendingOldValueEnabled()); diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java index 97c9c7f..70efb41 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java @@ -24,6 +24,7 @@ import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.kstream.KTable; import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender; import org.apache.kafka.test.KStreamTestDriver; +import org.apache.kafka.test.MockProcessor; import org.apache.kafka.test.MockProcessorSupplier; import org.apache.kafka.test.TestUtils; import org.junit.Before; @@ -61,8 +62,8 @@ public class KTableSourceTest { final KTable table1 = builder.table(topic1, Consumed.with(stringSerde, intSerde)); - final MockProcessorSupplier proc1 = new MockProcessorSupplier<>(); - table1.toStream().process(proc1); + final MockProcessorSupplier supplier = new MockProcessorSupplier<>(); + table1.toStream().process(supplier); driver.setUp(builder, stateDir); driver.process(topic1, "A", 1); @@ -74,7 +75,7 @@ public class KTableSourceTest { driver.process(topic1, "B", null); driver.flushState(); - assertEquals(Utils.mkList("A:1", "B:2", "C:3", "D:4", "A:null", "B:null"), proc1.processed); + assertEquals(Utils.mkList("A:1", "B:2", "C:3", "D:4", "A:null", "B:null"), supplier.theCapturedProcessor().processed); } @Test @@ -145,11 +146,14 @@ public class KTableSourceTest { final KTableImpl table1 = (KTableImpl) builder.table(topic1, stringConsumed); - final MockProcessorSupplier proc1 = new MockProcessorSupplier<>(); + final MockProcessorSupplier supplier = new MockProcessorSupplier<>(); - builder.build().addProcessor("proc1", proc1, table1.name); + builder.build().addProcessor("proc1", supplier, table1.name); driver.setUp(builder, stateDir); + + final MockProcessor proc1 = supplier.theCapturedProcessor(); + driver.process(topic1, "A", "01"); driver.process(topic1, "B", "01"); driver.process(topic1, "C", "01"); @@ -187,12 +191,14 @@ public class KTableSourceTest { assertTrue(table1.sendingOldValueEnabled()); - final MockProcessorSupplier proc1 = new MockProcessorSupplier<>(); + final MockProcessorSupplier supplier = new MockProcessorSupplier<>(); - builder.build().addProcessor("proc1", proc1, table1.name); + builder.build().addProcessor("proc1", supplier, table1.name); driver.setUp(builder, stateDir); + final MockProcessor proc1 = supplier.theCapturedProcessor(); + driver.process(topic1, "A", "01"); driver.process(topic1, "B", "01"); driver.process(topic1, "C", "01"); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java index e3b888d..d1d25e9 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java @@ -638,10 +638,6 @@ public class TopologyBuilderTest { } @Override - public void punctuate(long timestamp) { - } - - @Override public void close() { } }; diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java index aac275d..43dc38e 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java @@ -177,9 +177,6 @@ public class AbstractProcessorContextTest { } @Override - public void schedule(final long interval) {} - - @Override public void forward(final K key, final V value) {} @Override diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java index 5637dab..f3e369f 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java @@ -64,8 +64,8 @@ public class GlobalStateTaskTest { new String[]{topic2}, new IntegerDeserializer(), new IntegerDeserializer()); - private final MockProcessorNode processorOne = new MockProcessorNode<>(-1); - private final MockProcessorNode processorTwo = new MockProcessorNode<>(-1); + private final MockProcessorNode processorOne = new MockProcessorNode<>(); + private final MockProcessorNode processorTwo = new MockProcessorNode<>(); private final Map offsets = new HashMap<>(); private final NoOpProcessorContext context = new NoOpProcessorContext(); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java index b3663fa..149a158 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java @@ -607,9 +607,6 @@ public class InternalTopologyBuilderTest { public void process(final Object key, final Object value) { } @Override - public void punctuate(final long timestamp) { } - - @Override public void close() { } }; diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java index a7a2610..0992063 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java @@ -62,11 +62,6 @@ public class ProcessorNodeTest { } @Override - public void punctuate(final long timestamp) { - throw new RuntimeException(); - } - - @Override public void close() { throw new RuntimeException(); } @@ -84,11 +79,6 @@ public class ProcessorNodeTest { } @Override - public void punctuate(final long timestamp) { - - } - - @Override public void close() { } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java index a80b25d..51d4e05 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java @@ -34,7 +34,6 @@ import org.apache.kafka.streams.processor.StreamPartitioner; import org.apache.kafka.streams.processor.TimestampExtractor; import org.apache.kafka.streams.processor.To; import org.apache.kafka.streams.processor.TopologyBuilder; -import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.Stores; import org.apache.kafka.streams.test.ConsumerRecordFactory; @@ -465,11 +464,6 @@ public class ProcessorTopologyTest { public void process(final String key, final String value) { context().forward(key, value); } - - @Override - public void punctuate(final long streamTime) { - context().forward(Long.toString(streamTime), "punctuate"); - } } /** @@ -510,14 +504,6 @@ public class ProcessorTopologyTest { context().forward(key, value + "(" + (i + 1) + ")", i); } } - - @SuppressWarnings("deprecation") - @Override - public void punctuate(final long streamTime) { - for (int i = 0; i != numChildren; ++i) { - context().forward(Long.toString(streamTime), "punctuate(" + (i + 1) + ")", i); - } - } } /** @@ -538,19 +524,10 @@ public class ProcessorTopologyTest { context().forward(key, value + "(" + (i + 1) + ")", "sink" + i); } } - - @SuppressWarnings("deprecation") - @Override - public void punctuate(final long streamTime) { - for (int i = 0; i != numChildren; ++i) { - context().forward(Long.toString(streamTime), "punctuate(" + (i + 1) + ")", "sink" + i); - } - } } /** - * A processor that stores each key-value pair in an in-memory key-value store registered with the context. When - * {@link #punctuate(long)} is called, it outputs the total number of entries in the store. + * A processor that stores each key-value pair in an in-memory key-value store registered with the context. */ protected static class StatefulProcessor extends AbstractProcessor { private KeyValueStore store; @@ -573,18 +550,6 @@ public class ProcessorTopologyTest { } @Override - public void punctuate(final long streamTime) { - int count = 0; - try (KeyValueIterator iter = store.all()) { - while (iter.hasNext()) { - iter.next(); - ++count; - } - } - context().forward(Long.toString(streamTime), count); - } - - @Override public void close() { store.close(); } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PunctuationQueueTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PunctuationQueueTest.java index e799688..ee0d5a3 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PunctuationQueueTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PunctuationQueueTest.java @@ -21,26 +21,24 @@ import org.apache.kafka.streams.processor.Cancellable; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.PunctuationType; import org.apache.kafka.streams.processor.Punctuator; +import org.apache.kafka.test.MockProcessorNode; import org.junit.Test; -import java.util.ArrayList; - import static org.junit.Assert.assertEquals; public class PunctuationQueueTest { + private final MockProcessorNode node = new MockProcessorNode<>(); + private final PunctuationQueue queue = new PunctuationQueue(); + private final Punctuator punctuator = new Punctuator() { + @Override + public void punctuate(final long timestamp) { + node.mockProcessor.punctuatedStreamTime.add(timestamp); + } + }; + @Test public void testPunctuationInterval() { - final TestProcessor processor = new TestProcessor(); - final ProcessorNode node = new ProcessorNode<>("test", processor, null); - final PunctuationQueue queue = new PunctuationQueue(); - final Punctuator punctuator = new Punctuator() { - @Override - public void punctuate(long timestamp) { - node.processor().punctuate(timestamp); - } - }; - final PunctuationSchedule sched = new PunctuationSchedule(node, 0L, 100L, punctuator); final long now = sched.timestamp - 100L; @@ -54,42 +52,32 @@ public class PunctuationQueueTest { }; queue.mayPunctuate(now, PunctuationType.STREAM_TIME, processorNodePunctuator); - assertEquals(0, processor.punctuatedAt.size()); + assertEquals(0, node.mockProcessor.punctuatedStreamTime.size()); queue.mayPunctuate(now + 99L, PunctuationType.STREAM_TIME, processorNodePunctuator); - assertEquals(0, processor.punctuatedAt.size()); + assertEquals(0, node.mockProcessor.punctuatedStreamTime.size()); queue.mayPunctuate(now + 100L, PunctuationType.STREAM_TIME, processorNodePunctuator); - assertEquals(1, processor.punctuatedAt.size()); + assertEquals(1, node.mockProcessor.punctuatedStreamTime.size()); queue.mayPunctuate(now + 199L, PunctuationType.STREAM_TIME, processorNodePunctuator); - assertEquals(1, processor.punctuatedAt.size()); + assertEquals(1, node.mockProcessor.punctuatedStreamTime.size()); queue.mayPunctuate(now + 200L, PunctuationType.STREAM_TIME, processorNodePunctuator); - assertEquals(2, processor.punctuatedAt.size()); + assertEquals(2, node.mockProcessor.punctuatedStreamTime.size()); queue.mayPunctuate(now + 1001L, PunctuationType.STREAM_TIME, processorNodePunctuator); - assertEquals(3, processor.punctuatedAt.size()); + assertEquals(3, node.mockProcessor.punctuatedStreamTime.size()); queue.mayPunctuate(now + 1002L, PunctuationType.STREAM_TIME, processorNodePunctuator); - assertEquals(3, processor.punctuatedAt.size()); + assertEquals(3, node.mockProcessor.punctuatedStreamTime.size()); queue.mayPunctuate(now + 1100L, PunctuationType.STREAM_TIME, processorNodePunctuator); - assertEquals(4, processor.punctuatedAt.size()); + assertEquals(4, node.mockProcessor.punctuatedStreamTime.size()); } @Test public void testPunctuationIntervalCustomAlignment() { - final TestProcessor processor = new TestProcessor(); - final ProcessorNode node = new ProcessorNode<>("test", processor, null); - final PunctuationQueue queue = new PunctuationQueue(); - final Punctuator punctuator = new Punctuator() { - @Override - public void punctuate(long timestamp) { - node.processor().punctuate(timestamp); - } - }; - final PunctuationSchedule sched = new PunctuationSchedule(node, 50L, 100L, punctuator); final long now = sched.timestamp - 50L; @@ -103,42 +91,32 @@ public class PunctuationQueueTest { }; queue.mayPunctuate(now, PunctuationType.STREAM_TIME, processorNodePunctuator); - assertEquals(0, processor.punctuatedAt.size()); + assertEquals(0, node.mockProcessor.punctuatedStreamTime.size()); queue.mayPunctuate(now + 49L, PunctuationType.STREAM_TIME, processorNodePunctuator); - assertEquals(0, processor.punctuatedAt.size()); + assertEquals(0, node.mockProcessor.punctuatedStreamTime.size()); queue.mayPunctuate(now + 50L, PunctuationType.STREAM_TIME, processorNodePunctuator); - assertEquals(1, processor.punctuatedAt.size()); + assertEquals(1, node.mockProcessor.punctuatedStreamTime.size()); queue.mayPunctuate(now + 149L, PunctuationType.STREAM_TIME, processorNodePunctuator); - assertEquals(1, processor.punctuatedAt.size()); + assertEquals(1, node.mockProcessor.punctuatedStreamTime.size()); queue.mayPunctuate(now + 150L, PunctuationType.STREAM_TIME, processorNodePunctuator); - assertEquals(2, processor.punctuatedAt.size()); + assertEquals(2, node.mockProcessor.punctuatedStreamTime.size()); queue.mayPunctuate(now + 1051L, PunctuationType.STREAM_TIME, processorNodePunctuator); - assertEquals(3, processor.punctuatedAt.size()); + assertEquals(3, node.mockProcessor.punctuatedStreamTime.size()); queue.mayPunctuate(now + 1052L, PunctuationType.STREAM_TIME, processorNodePunctuator); - assertEquals(3, processor.punctuatedAt.size()); + assertEquals(3, node.mockProcessor.punctuatedStreamTime.size()); queue.mayPunctuate(now + 1150L, PunctuationType.STREAM_TIME, processorNodePunctuator); - assertEquals(4, processor.punctuatedAt.size()); + assertEquals(4, node.mockProcessor.punctuatedStreamTime.size()); } @Test public void testPunctuationIntervalCancelFromPunctuator() { - final TestProcessor processor = new TestProcessor(); - final ProcessorNode node = new ProcessorNode<>("test", processor, null); - final PunctuationQueue queue = new PunctuationQueue(); - final Punctuator punctuator = new Punctuator() { - @Override - public void punctuate(long timestamp) { - node.processor().punctuate(timestamp); - } - }; - final PunctuationSchedule sched = new PunctuationSchedule(node, 0L, 100L, punctuator); final long now = sched.timestamp - 100L; @@ -154,35 +132,25 @@ public class PunctuationQueueTest { }; queue.mayPunctuate(now, PunctuationType.STREAM_TIME, processorNodePunctuator); - assertEquals(0, processor.punctuatedAt.size()); + assertEquals(0, node.mockProcessor.punctuatedStreamTime.size()); queue.mayPunctuate(now + 100L, PunctuationType.STREAM_TIME, processorNodePunctuator); - assertEquals(1, processor.punctuatedAt.size()); + assertEquals(1, node.mockProcessor.punctuatedStreamTime.size()); queue.mayPunctuate(now + 200L, PunctuationType.STREAM_TIME, processorNodePunctuator); - assertEquals(1, processor.punctuatedAt.size()); + assertEquals(1, node.mockProcessor.punctuatedStreamTime.size()); } private static class TestProcessor extends AbstractProcessor { - public final ArrayList punctuatedAt = new ArrayList<>(); - @Override - public void init(ProcessorContext context) { - } + public void init(ProcessorContext context) {} @Override - public void process(String key, String value) { - } + public void process(String key, String value) {} @Override - public void punctuate(long streamTime) { - punctuatedAt.add(streamTime); - } - - @Override - public void close() { - } + public void close() {} } } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java index 598e47e..3a0fc4e 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java @@ -393,7 +393,7 @@ public class StreamTaskTest { assertFalse(task.process()); assertFalse(task.maybePunctuateStreamTime()); - processorStreamTime.supplier.checkAndClearPunctuateResult(PunctuationType.STREAM_TIME, 0L, 20L, 32L, 40L, 60L); + processorStreamTime.mockProcessor.checkAndClearPunctuateResult(PunctuationType.STREAM_TIME, 0L, 20L, 32L, 40L, 60L); } @SuppressWarnings("unchecked") @@ -479,7 +479,7 @@ public class StreamTaskTest { assertFalse(task.process()); assertFalse(task.maybePunctuateStreamTime()); - processorStreamTime.supplier.checkAndClearPunctuateResult(PunctuationType.STREAM_TIME, 20L, 142L, 155L, 160L); + processorStreamTime.mockProcessor.checkAndClearPunctuateResult(PunctuationType.STREAM_TIME, 20L, 142L, 155L, 160L); } @SuppressWarnings("unchecked") @@ -509,11 +509,11 @@ public class StreamTaskTest { assertTrue(task.process()); - processorStreamTime.supplier.scheduleCancellable.cancel(); + processorStreamTime.mockProcessor.scheduleCancellable.cancel(); assertFalse(task.maybePunctuateStreamTime()); - processorStreamTime.supplier.checkAndClearPunctuateResult(PunctuationType.STREAM_TIME, 20L); + processorStreamTime.mockProcessor.checkAndClearPunctuateResult(PunctuationType.STREAM_TIME, 20L); } @Test @@ -533,7 +533,7 @@ public class StreamTaskTest { time.sleep(20); assertTrue(task.maybePunctuateSystemTime()); assertFalse(task.maybePunctuateSystemTime()); - processorSystemTime.supplier.checkAndClearPunctuateResult(PunctuationType.WALL_CLOCK_TIME, now + 10, now + 20, now + 30, now + 50); + processorSystemTime.mockProcessor.checkAndClearPunctuateResult(PunctuationType.WALL_CLOCK_TIME, now + 10, now + 20, now + 30, now + 50); } @Test @@ -544,7 +544,7 @@ public class StreamTaskTest { assertFalse(task.maybePunctuateSystemTime()); time.sleep(9); assertFalse(task.maybePunctuateSystemTime()); - processorSystemTime.supplier.checkAndClearPunctuateResult(PunctuationType.WALL_CLOCK_TIME); + processorSystemTime.mockProcessor.checkAndClearPunctuateResult(PunctuationType.WALL_CLOCK_TIME); } @Test @@ -570,7 +570,7 @@ public class StreamTaskTest { time.sleep(5); // punctuate at now + 240, still aligned on the initial punctuation assertTrue(task.maybePunctuateSystemTime()); assertFalse(task.maybePunctuateSystemTime()); - processorSystemTime.supplier.checkAndClearPunctuateResult(PunctuationType.WALL_CLOCK_TIME, now + 100, now + 110, now + 122, now + 130, now + 235, now + 240); + processorSystemTime.mockProcessor.checkAndClearPunctuateResult(PunctuationType.WALL_CLOCK_TIME, now + 100, now + 110, now + 122, now + 130, now + 235, now + 240); } @Test @@ -581,10 +581,10 @@ public class StreamTaskTest { final long now = time.milliseconds(); time.sleep(10); assertTrue(task.maybePunctuateSystemTime()); - processorSystemTime.supplier.scheduleCancellable.cancel(); + processorSystemTime.mockProcessor.scheduleCancellable.cancel(); time.sleep(10); assertFalse(task.maybePunctuateSystemTime()); - processorSystemTime.supplier.checkAndClearPunctuateResult(PunctuationType.WALL_CLOCK_TIME, now + 10); + processorSystemTime.mockProcessor.checkAndClearPunctuateResult(PunctuationType.WALL_CLOCK_TIME, now + 10); } @Test diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java index 3ae7acb..5bc1934 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java @@ -894,10 +894,6 @@ public class StreamThreadTest { @Override public void process(final Object key, final Object value) {} - @SuppressWarnings("deprecation") - @Override - public void punctuate(final long timestamp) {} - @Override public void close() {} }; diff --git a/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java index 5e61910..27a0094 100644 --- a/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java +++ b/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java @@ -198,9 +198,6 @@ public class InternalMockProcessorContext extends AbstractProcessorContext imple } @Override - public void schedule(final long interval) { } - - @Override public void commit() { } @Override diff --git a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java index c93a306..cf4460d 100644 --- a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java +++ b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java @@ -205,21 +205,6 @@ public class KStreamTestDriver extends ExternalResource { return topicNode; } - public void punctuate(final long timestamp) { - final ProcessorNode prevNode = context.currentNode(); - for (final ProcessorNode processor : topology.processors()) { - if (processor.processor() != null) { - context.setRecordContext(createRecordContext(context.topic(), timestamp)); - context.setCurrentNode(processor); - try { - processor.processor().punctuate(timestamp); - } finally { - context.setCurrentNode(prevNode); - } - } - } - } - public void setTime(final long timestamp) { context.setTime(timestamp); } diff --git a/streams/src/test/java/org/apache/kafka/test/MockProcessorSupplier.java b/streams/src/test/java/org/apache/kafka/test/MockProcessor.java similarity index 53% copy from streams/src/test/java/org/apache/kafka/test/MockProcessorSupplier.java copy to streams/src/test/java/org/apache/kafka/test/MockProcessor.java index bdc8d40..927be0b 100644 --- a/streams/src/test/java/org/apache/kafka/test/MockProcessorSupplier.java +++ b/streams/src/test/java/org/apache/kafka/test/MockProcessor.java @@ -18,9 +18,7 @@ package org.apache.kafka.test; import org.apache.kafka.streams.processor.AbstractProcessor; import org.apache.kafka.streams.processor.Cancellable; -import org.apache.kafka.streams.processor.Processor; import org.apache.kafka.streams.processor.ProcessorContext; -import org.apache.kafka.streams.processor.ProcessorSupplier; import org.apache.kafka.streams.processor.PunctuationType; import org.apache.kafka.streams.processor.Punctuator; @@ -28,7 +26,7 @@ import java.util.ArrayList; import static org.junit.Assert.assertEquals; -public class MockProcessorSupplier implements ProcessorSupplier { +public class MockProcessor extends AbstractProcessor { public final ArrayList processed = new ArrayList<>(); public final ArrayList processedKeys = new ArrayList<>(); @@ -37,67 +35,50 @@ public class MockProcessorSupplier implements ProcessorSupplier { public final ArrayList punctuatedStreamTime = new ArrayList<>(); public final ArrayList punctuatedSystemTime = new ArrayList<>(); - private final long scheduleInterval; - private final PunctuationType punctuationType; public Cancellable scheduleCancellable; - public MockProcessorSupplier() { - this(-1L); - } - - public MockProcessorSupplier(long scheduleInterval) { - this(scheduleInterval, PunctuationType.STREAM_TIME); - } + private final PunctuationType punctuationType; + private final long scheduleInterval; - public MockProcessorSupplier(long scheduleInterval, PunctuationType punctuationType) { - this.scheduleInterval = scheduleInterval; + public MockProcessor(final PunctuationType punctuationType, final long scheduleInterval) { this.punctuationType = punctuationType; + this.scheduleInterval = scheduleInterval; } - @Override - public Processor get() { - return new MockProcessor(punctuationType); + public MockProcessor() { + this(PunctuationType.STREAM_TIME, -1); } - public class MockProcessor extends AbstractProcessor { - - PunctuationType punctuationType; - - public MockProcessor(PunctuationType punctuationType) { - this.punctuationType = punctuationType; - } - - @Override - public void init(ProcessorContext context) { - super.init(context); - if (scheduleInterval > 0L) { - scheduleCancellable = context.schedule(scheduleInterval, punctuationType, new Punctuator() { - @Override - public void punctuate(long timestamp) { - if (punctuationType == PunctuationType.STREAM_TIME) { - assertEquals(timestamp, context().timestamp()); - } - assertEquals(-1, context().partition()); - assertEquals(-1L, context().offset()); - - (punctuationType == PunctuationType.STREAM_TIME ? punctuatedStreamTime : punctuatedSystemTime) - .add(timestamp); + @Override + public void init(final ProcessorContext context) { + super.init(context); + if (scheduleInterval > 0L) { + scheduleCancellable = context.schedule(scheduleInterval, punctuationType, new Punctuator() { + @Override + public void punctuate(final long timestamp) { + if (punctuationType == PunctuationType.STREAM_TIME) { + assertEquals(timestamp, context().timestamp()); } - }); - } + assertEquals(-1, context().partition()); + assertEquals(-1L, context().offset()); + + (punctuationType == PunctuationType.STREAM_TIME ? punctuatedStreamTime : punctuatedSystemTime) + .add(timestamp); + } + }); } + } - @Override - public void process(K key, V value) { - processedKeys.add(key); - processedValues.add(value); - processed.add((key == null ? "null" : key) + ":" + - (value == null ? "null" : value)); + @Override + public void process(final K key, final V value) { + processedKeys.add(key); + processedValues.add(value); + processed.add((key == null ? "null" : key) + ":" + + (value == null ? "null" : value)); - } } - public void checkAndClearProcessResult(String... expected) { + public void checkAndClearProcessResult(final String... expected) { assertEquals("the number of outputs:" + processed, expected.length, processed.size()); for (int i = 0; i < expected.length; i++) { assertEquals("output[" + i + "]:", expected[i], processed.get(i)); @@ -107,13 +88,12 @@ public class MockProcessorSupplier implements ProcessorSupplier { } public void checkEmptyAndClearProcessResult() { - assertEquals("the number of outputs:", 0, processed.size()); processed.clear(); } - public void checkAndClearPunctuateResult(PunctuationType type, long... expected) { - ArrayList punctuated = type == PunctuationType.STREAM_TIME ? punctuatedStreamTime : punctuatedSystemTime; + public void checkAndClearPunctuateResult(final PunctuationType type, final long... expected) { + final ArrayList punctuated = type == PunctuationType.STREAM_TIME ? punctuatedStreamTime : punctuatedSystemTime; assertEquals("the number of outputs:", expected.length, punctuated.size()); for (int i = 0; i < expected.length; i++) { @@ -122,5 +102,4 @@ public class MockProcessorSupplier implements ProcessorSupplier { processed.clear(); } - } diff --git a/streams/src/test/java/org/apache/kafka/test/MockProcessorNode.java b/streams/src/test/java/org/apache/kafka/test/MockProcessorNode.java index a526bfd..094cb03 100644 --- a/streams/src/test/java/org/apache/kafka/test/MockProcessorNode.java +++ b/streams/src/test/java/org/apache/kafka/test/MockProcessorNode.java @@ -17,7 +17,6 @@ package org.apache.kafka.test; import org.apache.kafka.streams.processor.PunctuationType; -import org.apache.kafka.streams.processor.Punctuator; import org.apache.kafka.streams.processor.internals.InternalProcessorContext; import org.apache.kafka.streams.processor.internals.ProcessorNode; @@ -29,9 +28,9 @@ public class MockProcessorNode extends ProcessorNode { private static final String NAME = "MOCK-PROCESS-"; private static final AtomicInteger INDEX = new AtomicInteger(1); - public final MockProcessorSupplier supplier; + public final MockProcessor mockProcessor; + public boolean closed; - public long punctuatedAt; public boolean initialized; public MockProcessorNode(long scheduleInterval) { @@ -39,13 +38,17 @@ public class MockProcessorNode extends ProcessorNode { } public MockProcessorNode(long scheduleInterval, PunctuationType punctuationType) { - this(new MockProcessorSupplier(scheduleInterval, punctuationType)); + this(new MockProcessor(punctuationType, scheduleInterval)); + } + + public MockProcessorNode() { + this(new MockProcessor()); } - private MockProcessorNode(MockProcessorSupplier supplier) { - super(NAME + INDEX.getAndIncrement(), supplier.get(), Collections.emptySet()); + private MockProcessorNode(final MockProcessor mockProcessor) { + super(NAME + INDEX.getAndIncrement(), mockProcessor, Collections.emptySet()); - this.supplier = supplier; + this.mockProcessor = mockProcessor; } @Override @@ -60,12 +63,6 @@ public class MockProcessorNode extends ProcessorNode { } @Override - public void punctuate(final long timestamp, final Punctuator punctuator) { - super.punctuate(timestamp, punctuator); - this.punctuatedAt = timestamp; - } - - @Override public void close() { super.close(); this.closed = true; diff --git a/streams/src/test/java/org/apache/kafka/test/MockProcessorSupplier.java b/streams/src/test/java/org/apache/kafka/test/MockProcessorSupplier.java index bdc8d40..aec47a4 100644 --- a/streams/src/test/java/org/apache/kafka/test/MockProcessorSupplier.java +++ b/streams/src/test/java/org/apache/kafka/test/MockProcessorSupplier.java @@ -16,30 +16,20 @@ */ package org.apache.kafka.test; -import org.apache.kafka.streams.processor.AbstractProcessor; -import org.apache.kafka.streams.processor.Cancellable; import org.apache.kafka.streams.processor.Processor; -import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.ProcessorSupplier; import org.apache.kafka.streams.processor.PunctuationType; -import org.apache.kafka.streams.processor.Punctuator; import java.util.ArrayList; +import java.util.List; import static org.junit.Assert.assertEquals; public class MockProcessorSupplier implements ProcessorSupplier { - public final ArrayList processed = new ArrayList<>(); - public final ArrayList processedKeys = new ArrayList<>(); - public final ArrayList processedValues = new ArrayList<>(); - - public final ArrayList punctuatedStreamTime = new ArrayList<>(); - public final ArrayList punctuatedSystemTime = new ArrayList<>(); - private final long scheduleInterval; private final PunctuationType punctuationType; - public Cancellable scheduleCancellable; + private final List> processors = new ArrayList<>(); public MockProcessorSupplier() { this(-1L); @@ -56,71 +46,20 @@ public class MockProcessorSupplier implements ProcessorSupplier { @Override public Processor get() { - return new MockProcessor(punctuationType); - } - - public class MockProcessor extends AbstractProcessor { - - PunctuationType punctuationType; - - public MockProcessor(PunctuationType punctuationType) { - this.punctuationType = punctuationType; - } - - @Override - public void init(ProcessorContext context) { - super.init(context); - if (scheduleInterval > 0L) { - scheduleCancellable = context.schedule(scheduleInterval, punctuationType, new Punctuator() { - @Override - public void punctuate(long timestamp) { - if (punctuationType == PunctuationType.STREAM_TIME) { - assertEquals(timestamp, context().timestamp()); - } - assertEquals(-1, context().partition()); - assertEquals(-1L, context().offset()); - - (punctuationType == PunctuationType.STREAM_TIME ? punctuatedStreamTime : punctuatedSystemTime) - .add(timestamp); - } - }); - } - } - - @Override - public void process(K key, V value) { - processedKeys.add(key); - processedValues.add(value); - processed.add((key == null ? "null" : key) + ":" + - (value == null ? "null" : value)); - - } - } - - public void checkAndClearProcessResult(String... expected) { - assertEquals("the number of outputs:" + processed, expected.length, processed.size()); - for (int i = 0; i < expected.length; i++) { - assertEquals("output[" + i + "]:", expected[i], processed.get(i)); - } - - processed.clear(); + final MockProcessor processor = new MockProcessor<>(punctuationType, scheduleInterval); + processors.add(processor); + return processor; } - public void checkEmptyAndClearProcessResult() { - - assertEquals("the number of outputs:", 0, processed.size()); - processed.clear(); + // get the captured processor assuming that only one processor gets returned from this supplier + public MockProcessor theCapturedProcessor() { + return capturedProcessors(1).get(0); } - public void checkAndClearPunctuateResult(PunctuationType type, long... expected) { - ArrayList punctuated = type == PunctuationType.STREAM_TIME ? punctuatedStreamTime : punctuatedSystemTime; - assertEquals("the number of outputs:", expected.length, punctuated.size()); + // get the captured processors with the expected number + public List> capturedProcessors(final int expectedNumberOfProcessors) { + assertEquals(expectedNumberOfProcessors, processors.size()); - for (int i = 0; i < expected.length; i++) { - assertEquals("output[" + i + "]:", expected[i], (long) punctuated.get(i)); - } - - processed.clear(); + return processors; } - } diff --git a/streams/src/test/java/org/apache/kafka/test/NoOpProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/NoOpProcessorContext.java index 6b5d47a..e931c7e 100644 --- a/streams/src/test/java/org/apache/kafka/test/NoOpProcessorContext.java +++ b/streams/src/test/java/org/apache/kafka/test/NoOpProcessorContext.java @@ -57,10 +57,6 @@ public class NoOpProcessorContext extends AbstractProcessorContext { } @Override - public void schedule(final long interval) { - } - - @Override public void forward(final K key, final V value) { forwardedValues.put(key, value); } diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KGroupedStream.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KGroupedStream.scala index 8f0ae93..4cee0ac 100644 --- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KGroupedStream.scala +++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KGroupedStream.scala @@ -21,7 +21,6 @@ package org.apache.kafka.streams.scala package kstream import org.apache.kafka.streams.kstream.{KGroupedStream => KGroupedStreamJ, _} -import org.apache.kafka.common.serialization.Serde import org.apache.kafka.streams.scala.ImplicitConversions._ import org.apache.kafka.streams.scala.FunctionConversions._ diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala index 94c36ad..d3ccaec 100644 --- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala +++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala @@ -284,14 +284,6 @@ class KStream[K, V](val inner: KStreamJ[K, V]) { override def init(context: ProcessorContext): Unit = transformerSupplier.init(context) - @deprecated ("Please use Punctuator functional interface at https://kafka.apache.org/10/javadoc/org/apache/kafka/streams/processor/Punctuator.html instead", "0.1.3") // scalastyle:ignore - override def punctuate(timestamp: Long): KeyValue[K1, V1] = { - transformerSupplier.punctuate(timestamp) match { - case (k1, v1) => KeyValue.pair[K1, V1](k1, v1) - case _ => null - } - } - override def close(): Unit = transformerSupplier.close() } } diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/TimeWindowedKStream.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/TimeWindowedKStream.scala index 1aa1978..226192f 100644 --- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/TimeWindowedKStream.scala +++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/TimeWindowedKStream.scala @@ -23,7 +23,6 @@ package kstream import org.apache.kafka.streams.kstream.{TimeWindowedKStream => TimeWindowedKStreamJ, _} import org.apache.kafka.streams.state.WindowStore import org.apache.kafka.common.utils.Bytes -import org.apache.kafka.common.serialization.Serde import org.apache.kafka.streams.scala.ImplicitConversions._ import org.apache.kafka.streams.scala.FunctionConversions._ diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java index 15b2da6..c387c36 100644 --- a/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java +++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java @@ -361,14 +361,6 @@ public class MockProcessorContext implements ProcessorContext, RecordCollector.S }; } - @Override - public void schedule(final long interval) { - throw new UnsupportedOperationException( - "schedule() is deprecated and not supported in Mock. " + - "Use schedule(final long intervalMs, final PunctuationType type, final Punctuator callback) instead." - ); - } - /** * Get the punctuators scheduled so far. The returned list is not affected by subsequent calls to {@code schedule(...)}. * diff --git a/streams/test-utils/src/test/java/org/apache/kafka/streams/MockProcessorContextTest.java b/streams/test-utils/src/test/java/org/apache/kafka/streams/MockProcessorContextTest.java index 8c5ec46..934e043 100644 --- a/streams/test-utils/src/test/java/org/apache/kafka/streams/MockProcessorContextTest.java +++ b/streams/test-utils/src/test/java/org/apache/kafka/streams/MockProcessorContextTest.java @@ -361,10 +361,6 @@ public class MockProcessorContextTest { } @Override - public void punctuate(final long timestamp) { - } - - @Override public void close() { } }; diff --git a/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java b/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java index 077b8ca..5259ef2 100644 --- a/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java +++ b/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java @@ -185,10 +185,6 @@ public class TopologyTestDriverTest { private boolean closed = false; private final List processedRecords = new ArrayList<>(); - MockProcessor() { - this(Collections.emptySet()); - } - MockProcessor(final Collection punctuations) { this.punctuations = punctuations; } @@ -208,10 +204,6 @@ public class TopologyTestDriverTest { context.forward(key, value); } - @SuppressWarnings("deprecation") - @Override - public void punctuate(long timestamp) {} // deprecated - @Override public void close() { closed = true; @@ -840,9 +832,6 @@ public class TopologyTestDriverTest { } @Override - public void punctuate(final long timestamp) {} - - @Override public void close() {} } @@ -870,9 +859,6 @@ public class TopologyTestDriverTest { } @Override - public void punctuate(final long timestamp) {} - - @Override public void close() {} }; } -- To stop receiving notification emails like this one, please contact guozhang@apache.org.