kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [kafka] branch trunk updated: KAFKA-7778: Add KTable.suppress to Scala API (#6314)
Date Mon, 15 Apr 2019 23:27:30 GMT
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new b7d7f75  KAFKA-7778: Add KTable.suppress to Scala API (#6314)
b7d7f75 is described below

commit b7d7f7590de405d8d527c32ca9f067e073e9d83b
Author: Casey Green <greenc421@gmail.com>
AuthorDate: Mon Apr 15 18:27:19 2019 -0500

    KAFKA-7778: Add KTable.suppress to Scala API (#6314)
    
    Detailed description
    
    * Adds KTable.suppress to the Scala API.
    * Fixed count in KGroupedStream, SessionWindowedKStream, and TimeWindowedKStream so that
the value serde gets passed down to the KTable returned by the internal mapValues method.
    * Suppress API support for Java 1.8 + Scala 2.11
    
    Testing strategy
    
    I added unit tests covering:
    
    * Windowed KTable.count.suppress w/ Suppressed.untilTimeLimit
    * Windowed KTable.count.suppress w/ Suppressed.untilWindowCloses
    * Non-windowed KTable.count.suppress w/ Suppressed.untilTimeLimit
    * Session-windowed KTable.count.suppress w/ Suppressed.untilWindowCloses
    
    Reviewers: John Roesler <john@confluent.io>, Guozhang Wang <guozhang@confluent.io>
---
 .../apache/kafka/streams/kstream/Suppressed.java   |  15 +-
 .../internals/suppress/BufferConfigInternal.java   |   2 +-
 .../internals/suppress/EagerBufferConfigImpl.java  |   6 +-
 .../streams/scala/kstream/KGroupedStream.scala     |  11 +-
 .../kafka/streams/scala/kstream/KTable.scala       |  12 ++
 .../scala/kstream/SessionWindowedKStream.scala     |  11 +-
 .../kafka/streams/scala/kstream/Suppressed.scala   | 128 ++++++++++++
 .../scala/kstream/TimeWindowedKStream.scala        |  11 +-
 .../kafka/streams/scala/kstream/KTableTest.scala   | 224 +++++++++++++++++++++
 .../streams/scala/kstream/SuppressedTest.scala     |  96 +++++++++
 10 files changed, 500 insertions(+), 16 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Suppressed.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Suppressed.java
index b5d7937..c80eaec 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Suppressed.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Suppressed.java
@@ -33,11 +33,20 @@ public interface Suppressed<K> extends NamedOperation<Suppressed<K>>
{
 
     }
 
+    /**
+     * Marker interface for a buffer configuration that will strictly enforce size constraints
+     * (bytes and/or number of records) on the buffer, so it is suitable for reducing duplicate
+     * results downstream, but does not promise to eliminate them entirely.
+     */
+    interface EagerBufferConfig extends BufferConfig<EagerBufferConfig> {
+
+    }
+
     interface BufferConfig<BC extends BufferConfig<BC>> {
         /**
          * Create a size-constrained buffer in terms of the maximum number of keys it will
store.
          */
-        static BufferConfig<?> maxRecords(final long recordLimit) {
+        static EagerBufferConfig maxRecords(final long recordLimit) {
             return new EagerBufferConfigImpl(recordLimit, Long.MAX_VALUE);
         }
 
@@ -49,7 +58,7 @@ public interface Suppressed<K> extends NamedOperation<Suppressed<K>>
{
         /**
          * Create a size-constrained buffer in terms of the maximum number of bytes it will
use.
          */
-        static BufferConfig<?> maxBytes(final long byteLimit) {
+        static EagerBufferConfig maxBytes(final long byteLimit) {
             return new EagerBufferConfigImpl(Long.MAX_VALUE, byteLimit);
         }
 
@@ -108,7 +117,7 @@ public interface Suppressed<K> extends NamedOperation<Suppressed<K>>
{
          * This buffer is "not strict" in the sense that it may emit early, so it is suitable
for reducing
          * duplicate results downstream, but does not promise to eliminate them.
          */
-        BufferConfig emitEarlyWhenFull();
+        EagerBufferConfig emitEarlyWhenFull();
     }
 
     /**
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/BufferConfigInternal.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/BufferConfigInternal.java
index 10675ef..2087945 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/BufferConfigInternal.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/BufferConfigInternal.java
@@ -43,7 +43,7 @@ abstract class BufferConfigInternal<BC extends Suppressed.BufferConfig<BC>>
impl
     }
 
     @Override
-    public Suppressed.BufferConfig emitEarlyWhenFull() {
+    public Suppressed.EagerBufferConfig emitEarlyWhenFull() {
         return new EagerBufferConfigImpl(maxRecords(), maxBytes());
     }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/EagerBufferConfigImpl.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/EagerBufferConfigImpl.java
index e94abc1..1c1b30c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/EagerBufferConfigImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/EagerBufferConfigImpl.java
@@ -20,7 +20,7 @@ import org.apache.kafka.streams.kstream.Suppressed;
 
 import java.util.Objects;
 
-public class EagerBufferConfigImpl extends BufferConfigInternal {
+public class EagerBufferConfigImpl extends BufferConfigInternal<Suppressed.EagerBufferConfig>
implements Suppressed.EagerBufferConfig {
 
     private final long maxRecords;
     private final long maxBytes;
@@ -31,12 +31,12 @@ public class EagerBufferConfigImpl extends BufferConfigInternal {
     }
 
     @Override
-    public Suppressed.BufferConfig withMaxRecords(final long recordLimit) {
+    public Suppressed.EagerBufferConfig withMaxRecords(final long recordLimit) {
         return new EagerBufferConfigImpl(recordLimit, maxBytes);
     }
 
     @Override
-    public Suppressed.BufferConfig withMaxBytes(final long byteLimit) {
+    public Suppressed.EagerBufferConfig withMaxBytes(final long byteLimit) {
         return new EagerBufferConfigImpl(maxRecords, byteLimit);
     }
 
diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KGroupedStream.scala
b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KGroupedStream.scala
index 5168805..98fb12b 100644
--- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KGroupedStream.scala
+++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KGroupedStream.scala
@@ -20,7 +20,8 @@
 package org.apache.kafka.streams.scala
 package kstream
 
-import org.apache.kafka.streams.kstream.{KGroupedStream => KGroupedStreamJ, _}
+import org.apache.kafka.streams.kstream.internals.KTableImpl
+import org.apache.kafka.streams.kstream.{KGroupedStream => KGroupedStreamJ, KTable =>
KTableJ, _}
 import org.apache.kafka.streams.scala.ImplicitConversions._
 import org.apache.kafka.streams.scala.FunctionsCompatConversions._
 
@@ -46,9 +47,13 @@ class KGroupedStream[K, V](val inner: KGroupedStreamJ[K, V]) {
    * @see `org.apache.kafka.streams.kstream.KGroupedStream#count`
    */
   def count()(implicit materialized: Materialized[K, Long, ByteArrayKeyValueStore]): KTable[K,
Long] = {
-    val c: KTable[K, java.lang.Long] =
+    val javaCountTable: KTableJ[K, java.lang.Long] =
       inner.count(materialized.asInstanceOf[Materialized[K, java.lang.Long, ByteArrayKeyValueStore]])
-    c.mapValues[Long](Long2long _)
+    val tableImpl = javaCountTable.asInstanceOf[KTableImpl[K, ByteArrayKeyValueStore, java.lang.Long]]
+    javaCountTable.mapValues[Long](
+      ((l: java.lang.Long) => Long2long(l)).asValueMapper,
+      Materialized.`with`[K, Long, ByteArrayKeyValueStore](tableImpl.keySerde(), Serdes.Long)
+    )
   }
 
   /**
diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KTable.scala
b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KTable.scala
index 9ac27ee..20ee08b 100644
--- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KTable.scala
+++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KTable.scala
@@ -160,6 +160,18 @@ class KTable[K, V](val inner: KTableJ[K, V]) {
     inner.toStream[KR](mapper.asKeyValueMapper)
 
   /**
+   * Suppress some updates from this changelog stream, determined by the supplied [[Suppressed]]
configuration.
+   *
+   * This controls what updates downstream table and stream operations will receive.
+   *
+   * @param suppressed Configuration object determining what, if any, updates to suppress.
+   * @return A new KTable with the desired suppression characteristics.
+   * @see `org.apache.kafka.streams.kstream.KTable#suppress`
+   */
+  def suppress(suppressed: Suppressed[_ >: K]): KTable[K, V] =
+    inner.suppress(suppressed)
+
+  /**
    * Create a new `KTable` by transforming the value of each record in this `KTable` into
a new value, (with possibly new type).
    * Transform the value of each input record into a new value (with possible new type) of
the output record.
    * A `ValueTransformerWithKey` (provided by the given `ValueTransformerWithKeySupplier`)
is applied to each input
diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/SessionWindowedKStream.scala
b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/SessionWindowedKStream.scala
index 6571df9..a3e8ae2 100644
--- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/SessionWindowedKStream.scala
+++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/SessionWindowedKStream.scala
@@ -20,7 +20,8 @@
 package org.apache.kafka.streams.scala
 package kstream
 
-import org.apache.kafka.streams.kstream.{SessionWindowedKStream => SessionWindowedKStreamJ,
_}
+import org.apache.kafka.streams.kstream.internals.KTableImpl
+import org.apache.kafka.streams.kstream.{KTable => KTableJ, SessionWindowedKStream =>
SessionWindowedKStreamJ, _}
 import org.apache.kafka.streams.scala.ImplicitConversions._
 import org.apache.kafka.streams.scala.FunctionsCompatConversions._
 
@@ -60,9 +61,13 @@ class SessionWindowedKStream[K, V](val inner: SessionWindowedKStreamJ[K,
V]) {
    * @see `org.apache.kafka.streams.kstream.SessionWindowedKStream#count`
    */
   def count()(implicit materialized: Materialized[K, Long, ByteArraySessionStore]): KTable[Windowed[K],
Long] = {
-    val c: KTable[Windowed[K], java.lang.Long] =
+    val javaCountTable: KTableJ[Windowed[K], java.lang.Long] =
       inner.count(materialized.asInstanceOf[Materialized[K, java.lang.Long, ByteArraySessionStore]])
-    c.mapValues[Long](Long2long _)
+    val tableImpl = javaCountTable.asInstanceOf[KTableImpl[Windowed[K], ByteArraySessionStore,
java.lang.Long]]
+    javaCountTable.mapValues[Long](
+      ((l: java.lang.Long) => Long2long(l)).asValueMapper,
+      Materialized.`with`[Windowed[K], Long, ByteArrayKeyValueStore](tableImpl.keySerde(),
Serdes.Long)
+    )
   }
 
   /**
diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Suppressed.scala
b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Suppressed.scala
new file mode 100644
index 0000000..2fdc09d
--- /dev/null
+++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Suppressed.scala
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.scala.kstream
+import java.time.Duration
+
+import org.apache.kafka.streams.kstream.{Windowed, Suppressed => SupressedJ}
+import org.apache.kafka.streams.kstream.Suppressed.{
+  EagerBufferConfig,
+  StrictBufferConfig,
+  BufferConfig => BufferConfigJ
+}
+import org.apache.kafka.streams.kstream.internals.suppress.{
+  EagerBufferConfigImpl,
+  FinalResultsSuppressionBuilder,
+  StrictBufferConfigImpl,
+  SuppressedInternal
+}
+
+/**
+ * Duplicates the static factory methods inside the Java interface [[org.apache.kafka.streams.kstream.Suppressed]].
+ *
+ * This is required for compatibility w/ Scala 2.11 + Java 1.8 because the Scala 2.11 compiler
doesn't support the use
+ * of static methods inside Java interfaces.
+ *
+ * TODO: Deprecate this class if support for Scala 2.11 + Java 1.8 is dropped.
+ */
+object Suppressed {
+
+  /**
+   * Configure the suppression to emit only the "final results" from the window.
+   *
+   * By default all Streams operators emit results whenever new results are available.
+   * This includes windowed operations.
+   *
+   * This configuration will instead emit just one result per key for each window, guaranteeing
+   * to deliver only the final result. This option is suitable for use cases in which the
business logic
+   * requires a hard guarantee that only the final result is propagated. For example, sending
alerts.
+   *
+   * To accomplish this, the operator will buffer events from the window until the window
closes (that is,
+   * until the end-time passes, and additionally until the grace period expires). Since windowed
operators
+   * are required to reject late events for a window whose grace period is expired, there
is an additional
+   * guarantee that the final results emitted from this suppression will match any queryable
state upstream.
+   *
+   * @param bufferConfig A configuration specifying how much space to use for buffering intermediate
results.
+   *                     This is required to be a "strict" config, since it would violate
the "final results"
+   *                     property to emit early and then issue an update later.
+   * @tparam K The [[Windowed]] key type for the KTable to apply this suppression to.
+   * @return a "final results" mode suppression configuration
+   * @see [[org.apache.kafka.streams.kstream.Suppressed.untilWindowCloses]]
+   */
+  def untilWindowCloses[K](bufferConfig: StrictBufferConfig): SupressedJ[Windowed[K]] =
+    new FinalResultsSuppressionBuilder[Windowed[K]](null, bufferConfig)
+
+  /**
+   * Configure the suppression to wait `timeToWaitForMoreEvents` amount of time after receiving
a record
+   * before emitting it further downstream. If another record for the same key arrives in
the meantime, it replaces
+   * the first record in the buffer but does <em>not</em> re-start the timer.
+   *
+   * @param timeToWaitForMoreEvents The amount of time to wait, per record, for new events.
+   * @param bufferConfig A configuration specifying how much space to use for buffering intermediate
results.
+   * @tparam K The key type for the KTable to apply this suppression to.
+   * @return a suppression configuration
+   * @see [[org.apache.kafka.streams.kstream.Suppressed.untilTimeLimit]]
+   */
+  def untilTimeLimit[K](timeToWaitForMoreEvents: Duration, bufferConfig: BufferConfigJ[_]):
SupressedJ[K] =
+    new SuppressedInternal[K](null, timeToWaitForMoreEvents, bufferConfig, null, false)
+
+  /**
+   * Duplicates the static factory methods inside the Java interface
+   * [[org.apache.kafka.streams.kstream.Suppressed.BufferConfig]].
+   */
+  object BufferConfig {
+
+    /**
+     * Create a size-constrained buffer in terms of the maximum number of keys it will store.
+     *
+     * @param recordLimit maximum number of keys to buffer.
+     * @return size-constrained buffer in terms of the maximum number of keys it will store.
+     * @see [[org.apache.kafka.streams.kstream.Suppressed.BufferConfig.maxRecords]]
+     */
+    def maxRecords(recordLimit: Long): EagerBufferConfig =
+      new EagerBufferConfigImpl(recordLimit, Long.MaxValue)
+
+    /**
+     * Create a size-constrained buffer in terms of the maximum number of bytes it will use.
+     *
+     * @param byteLimit maximum number of bytes to buffer.
+     * @return size-constrained buffer in terms of the maximum number of bytes it will use.
+     * @see [[org.apache.kafka.streams.kstream.Suppressed.BufferConfig.maxBytes]]
+     */
+    def maxBytes(byteLimit: Long): EagerBufferConfig =
+      new EagerBufferConfigImpl(Long.MaxValue, byteLimit)
+
+    /**
+     * Create a buffer unconstrained by size (either keys or bytes).
+     *
+     * As a result, the buffer will consume as much memory as it needs, dictated by the time
bound.
+     *
+     * If there isn't enough heap available to meet the demand, the application will encounter
an
+     * [[OutOfMemoryError]] and shut down (not guaranteed to be a graceful exit). Also, note
that
+     * JVM processes under extreme memory pressure may exhibit poor GC behavior.
+     *
+     * This is a convenient option if you doubt that your buffer will be that large, but
also don't
+     * wish to pick particular constraints, such as in testing.
+     *
+     * This buffer is "strict" in the sense that it will enforce the time bound or crash.
+     * It will never emit early.
+     *
+     * @return a buffer unconstrained by size (either keys or bytes).
+     * @see [[org.apache.kafka.streams.kstream.Suppressed.BufferConfig.unbounded]]
+     */
+    def unbounded(): StrictBufferConfig = new StrictBufferConfigImpl()
+  }
+}
diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/TimeWindowedKStream.scala
b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/TimeWindowedKStream.scala
index d84416e..c160600 100644
--- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/TimeWindowedKStream.scala
+++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/TimeWindowedKStream.scala
@@ -20,7 +20,8 @@
 package org.apache.kafka.streams.scala
 package kstream
 
-import org.apache.kafka.streams.kstream.{TimeWindowedKStream => TimeWindowedKStreamJ,
_}
+import org.apache.kafka.streams.kstream.internals.KTableImpl
+import org.apache.kafka.streams.kstream.{KTable => KTableJ, TimeWindowedKStream =>
TimeWindowedKStreamJ, _}
 import org.apache.kafka.streams.scala.ImplicitConversions._
 import org.apache.kafka.streams.scala.FunctionsCompatConversions._
 
@@ -59,9 +60,13 @@ class TimeWindowedKStream[K, V](val inner: TimeWindowedKStreamJ[K, V])
{
    * @see `org.apache.kafka.streams.kstream.TimeWindowedKStream#count`
    */
   def count()(implicit materialized: Materialized[K, Long, ByteArrayWindowStore]): KTable[Windowed[K],
Long] = {
-    val c: KTable[Windowed[K], java.lang.Long] =
+    val javaCountTable: KTableJ[Windowed[K], java.lang.Long] =
       inner.count(materialized.asInstanceOf[Materialized[K, java.lang.Long, ByteArrayWindowStore]])
-    c.mapValues[Long](Long2long _)
+    val tableImpl = javaCountTable.asInstanceOf[KTableImpl[Windowed[K], ByteArrayWindowStore,
java.lang.Long]]
+    javaCountTable.mapValues[Long](
+      ((l: java.lang.Long) => Long2long(l)).asValueMapper,
+      Materialized.`with`[Windowed[K], Long, ByteArrayKeyValueStore](tableImpl.keySerde(),
Serdes.Long)
+    )
   }
 
   /**
diff --git a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KTableTest.scala
b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KTableTest.scala
index dc080f1..f2de3de 100644
--- a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KTableTest.scala
+++ b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KTableTest.scala
@@ -18,8 +18,12 @@
  */
 package org.apache.kafka.streams.scala.kstream
 
+import java.time.Duration
+
+import org.apache.kafka.streams.kstream.{SessionWindows, TimeWindows, Windowed}
 import org.apache.kafka.streams.scala.ImplicitConversions._
 import org.apache.kafka.streams.scala.Serdes._
+import org.apache.kafka.streams.scala.kstream.Suppressed.BufferConfig
 import org.apache.kafka.streams.scala.utils.TestDriver
 import org.apache.kafka.streams.scala.{ByteArrayKeyValueStore, StreamsBuilder}
 import org.junit.runner.RunWith
@@ -139,4 +143,224 @@ class KTableTest extends FlatSpec with Matchers with TestDriver {
 
     testDriver.close()
   }
+
+  "windowed KTable#suppress" should "correctly suppress results using Suppressed.untilTimeLimit"
in {
+    val builder = new StreamsBuilder()
+    val sourceTopic = "source"
+    val sinkTopic = "sink"
+    val window = TimeWindows.of(Duration.ofSeconds(1L))
+    val suppression = Suppressed.untilTimeLimit[Windowed[String]](Duration.ofSeconds(2L),
BufferConfig.unbounded())
+
+    val table: KTable[Windowed[String], Long] = builder
+      .stream[String, String](sourceTopic)
+      .groupByKey
+      .windowedBy(window)
+      .count
+      .suppress(suppression)
+
+    table.toStream((k, _) => s"${k.window().start()}:${k.window().end()}:${k.key()}").to(sinkTopic)
+
+    val testDriver = createTestDriver(builder)
+
+    {
+      // publish key=1 @ time 0 => count==1
+      testDriver.pipeRecord(sourceTopic, ("1", "value1"), 0L)
+      Option(testDriver.readRecord[String, Long](sinkTopic)) shouldBe None
+    }
+    {
+      // publish key=1 @ time 1 => count==2
+      testDriver.pipeRecord(sourceTopic, ("1", "value2"), 1L)
+      Option(testDriver.readRecord[String, Long](sinkTopic)) shouldBe None
+    }
+    {
+      // move event time past the first window, but before the suppression window
+      testDriver.pipeRecord(sourceTopic, ("2", "value1"), 1001L)
+      Option(testDriver.readRecord[String, Long](sinkTopic)) shouldBe None
+    }
+    {
+      // move event time riiiight before suppression window ends
+      testDriver.pipeRecord(sourceTopic, ("2", "value2"), 1999L)
+      Option(testDriver.readRecord[String, Long](sinkTopic)) shouldBe None
+    }
+    {
+      // publish a late event before suppression window terminates => count==3
+      testDriver.pipeRecord(sourceTopic, ("1", "value3"), 999L)
+      Option(testDriver.readRecord[String, Long](sinkTopic)) shouldBe None
+    }
+    {
+      // move event time right past the suppression window of the first window.
+      testDriver.pipeRecord(sourceTopic, ("2", "value3"), 2001L)
+      val record = testDriver.readRecord[String, Long](sinkTopic)
+      record.key shouldBe "0:1000:1"
+      record.value shouldBe 3L
+    }
+    testDriver.readRecord[String, Long](sinkTopic) shouldBe null
+
+    testDriver.close()
+  }
+
+  "windowed KTable#suppress" should "correctly suppress results using Suppressed.untilWindowCloses"
in {
+    val builder = new StreamsBuilder()
+    val sourceTopic = "source"
+    val sinkTopic = "sink"
+    val window = TimeWindows.of(Duration.ofSeconds(1L)).grace(Duration.ofSeconds(1L))
+    val suppression = Suppressed.untilWindowCloses[String](BufferConfig.unbounded())
+
+    val table: KTable[Windowed[String], Long] = builder
+      .stream[String, String](sourceTopic)
+      .groupByKey
+      .windowedBy(window)
+      .count
+      .suppress(suppression)
+
+    table.toStream((k, _) => s"${k.window().start()}:${k.window().end()}:${k.key()}").to(sinkTopic)
+
+    val testDriver = createTestDriver(builder)
+
+    {
+      // publish key=1 @ time 0 => count==1
+      testDriver.pipeRecord(sourceTopic, ("1", "value1"), 0L)
+      Option(testDriver.readRecord[String, Long](sinkTopic)) shouldBe None
+    }
+    {
+      // publish key=1 @ time 1 => count==2
+      testDriver.pipeRecord(sourceTopic, ("1", "value2"), 1L)
+      Option(testDriver.readRecord[String, Long](sinkTopic)) shouldBe None
+    }
+    {
+      // move event time past the window, but before the grace period
+      testDriver.pipeRecord(sourceTopic, ("2", "value1"), 1001L)
+      Option(testDriver.readRecord[String, Long](sinkTopic)) shouldBe None
+    }
+    {
+      // move event time riiiight before grace period ends
+      testDriver.pipeRecord(sourceTopic, ("2", "value2"), 1999L)
+      Option(testDriver.readRecord[String, Long](sinkTopic)) shouldBe None
+    }
+    {
+      // publish a late event before grace period terminates => count==3
+      testDriver.pipeRecord(sourceTopic, ("1", "value3"), 999L)
+      Option(testDriver.readRecord[String, Long](sinkTopic)) shouldBe None
+    }
+    {
+      // move event time right past the grace period of the first window.
+      testDriver.pipeRecord(sourceTopic, ("2", "value3"), 2001L)
+      val record = testDriver.readRecord[String, Long](sinkTopic)
+      record.key shouldBe "0:1000:1"
+      record.value shouldBe 3L
+    }
+    testDriver.readRecord[String, Long](sinkTopic) shouldBe null
+
+    testDriver.close()
+  }
+
+  "session windowed KTable#suppress" should "correctly suppress results using Suppressed.untilWindowCloses"
in {
+    val builder = new StreamsBuilder()
+    val sourceTopic = "source"
+    val sinkTopic = "sink"
+    // Very similar to SuppressScenarioTest.shouldSupportFinalResultsForSessionWindows
+    val window = SessionWindows.`with`(Duration.ofMillis(5L)).grace(Duration.ofMillis(10L))
+    val suppression = Suppressed.untilWindowCloses[String](BufferConfig.unbounded())
+
+    val table: KTable[Windowed[String], Long] = builder
+      .stream[String, String](sourceTopic)
+      .groupByKey
+      .windowedBy(window)
+      .count
+      .suppress(suppression)
+
+    table.toStream((k, _) => s"${k.window().start()}:${k.window().end()}:${k.key()}").to(sinkTopic)
+
+    val testDriver = createTestDriver(builder)
+
+    {
+      // first window
+      testDriver.pipeRecord(sourceTopic, ("k1", "v1"), 0L)
+      Option(testDriver.readRecord[String, Long](sinkTopic)) shouldBe None
+    }
+    {
+      // first window
+      testDriver.pipeRecord(sourceTopic, ("k1", "v1"), 1L)
+      Option(testDriver.readRecord[String, Long](sinkTopic)) shouldBe None
+    }
+    {
+      // new window, but grace period hasn't ended for first window
+      testDriver.pipeRecord(sourceTopic, ("k1", "v1"), 8L)
+      Option(testDriver.readRecord[String, Long](sinkTopic)) shouldBe None
+    }
+    {
+      // late event for first window, included since grace period hasn't passed
+      testDriver.pipeRecord(sourceTopic, ("k1", "v1"), 2L)
+      Option(testDriver.readRecord[String, Long](sinkTopic)) shouldBe None
+    }
+    {
+      // push stream time forward to flush other events through
+      testDriver.pipeRecord(sourceTopic, ("k1", "v1"), 30L)
+      // too-late event should get dropped from the stream
+      testDriver.pipeRecord(sourceTopic, ("k1", "v1"), 3L)
+      // should now have two results
+      val r1 = testDriver.readRecord[String, Long](sinkTopic)
+      r1.key shouldBe "0:2:k1"
+      r1.value shouldBe 3L
+      val r2 = testDriver.readRecord[String, Long](sinkTopic)
+      r2.key shouldBe "8:8:k1"
+      r2.value shouldBe 1
+    }
+    testDriver.readRecord[String, Long](sinkTopic) shouldBe null
+
+    testDriver.close()
+  }
+
+  "non-windowed KTable#suppress" should "correctly suppress results using Suppressed.untilTimeLimit"
in {
+    val builder = new StreamsBuilder()
+    val sourceTopic = "source"
+    val sinkTopic = "sink"
+    val suppression = Suppressed.untilTimeLimit[String](Duration.ofSeconds(2L), BufferConfig.unbounded())
+
+    val table: KTable[String, Long] = builder
+      .stream[String, String](sourceTopic)
+      .groupByKey
+      .count
+      .suppress(suppression)
+
+    table.toStream.to(sinkTopic)
+
+    val testDriver = createTestDriver(builder)
+
+    {
+      // publish key=1 @ time 0 => count==1
+      testDriver.pipeRecord(sourceTopic, ("1", "value1"), 0L)
+      Option(testDriver.readRecord[String, Long](sinkTopic)) shouldBe None
+    }
+    {
+      // publish key=1 @ time 1 => count==2
+      testDriver.pipeRecord(sourceTopic, ("1", "value2"), 1L)
+      Option(testDriver.readRecord[String, Long](sinkTopic)) shouldBe None
+    }
+    {
+      // move event time forward, but stay within the suppression time limit
+      testDriver.pipeRecord(sourceTopic, ("2", "value1"), 1001L)
+      Option(testDriver.readRecord[String, Long](sinkTopic)) shouldBe None
+    }
+    {
+      // move event time riiiight before the suppression time limit ends
+      testDriver.pipeRecord(sourceTopic, ("2", "value2"), 1999L)
+      Option(testDriver.readRecord[String, Long](sinkTopic)) shouldBe None
+    }
+    {
+      // publish a late event before the suppression time limit expires => count==3
+      testDriver.pipeRecord(sourceTopic, ("1", "value3"), 999L)
+      Option(testDriver.readRecord[String, Long](sinkTopic)) shouldBe None
+    }
+    {
+      // move event time right past the suppression time limit for key 1.
+      testDriver.pipeRecord(sourceTopic, ("2", "value3"), 2001L)
+      val record = testDriver.readRecord[String, Long](sinkTopic)
+      record.key shouldBe "1"
+      record.value shouldBe 3L
+    }
+    testDriver.readRecord[String, Long](sinkTopic) shouldBe null
+
+    testDriver.close()
+  }
 }
diff --git a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/SuppressedTest.scala
b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/SuppressedTest.scala
new file mode 100644
index 0000000..5df8347
--- /dev/null
+++ b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/SuppressedTest.scala
@@ -0,0 +1,96 @@
+/*
+ * Copyright (C) 2018 Joan Goyeau.
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.scala.kstream
+import java.time.Duration
+
+import org.apache.kafka.streams.kstream.internals.suppress.{
+  BufferFullStrategy,
+  EagerBufferConfigImpl,
+  FinalResultsSuppressionBuilder,
+  StrictBufferConfigImpl,
+  SuppressedInternal
+}
+import org.apache.kafka.streams.scala.kstream.Suppressed.BufferConfig
+import org.junit.runner.RunWith
+import org.scalatest.junit.JUnitRunner
+import org.scalatest.{FlatSpec, Matchers}
+
+@RunWith(classOf[JUnitRunner])
+class SuppressedTest extends FlatSpec with Matchers {
+
+  "Suppressed.untilWindowCloses" should "produce the correct suppression" in {
+    val bufferConfig = BufferConfig.unbounded()
+    val suppression = Suppressed.untilWindowCloses[String](bufferConfig)
+    suppression shouldEqual new FinalResultsSuppressionBuilder(null, bufferConfig)
+    suppression.withName("soup") shouldEqual new FinalResultsSuppressionBuilder("soup", bufferConfig)
+  }
+
+  "Suppressed.untilTimeLimit" should "produce the correct suppression" in {
+    val bufferConfig = BufferConfig.unbounded()
+    val duration = Duration.ofMillis(1)
+    Suppressed.untilTimeLimit[String](duration, bufferConfig) shouldEqual
+      new SuppressedInternal[String](null, duration, bufferConfig, null, false)
+  }
+
+  "BufferConfig.maxRecords" should "produce the correct buffer config" in {
+    BufferConfig.maxRecords(4) shouldEqual new EagerBufferConfigImpl(4, Long.MaxValue)
+    BufferConfig.maxRecords(4).withMaxBytes(5) shouldEqual new EagerBufferConfigImpl(4, 5)
+  }
+
+  "BufferConfig.maxBytes" should "produce the correct buffer config" in {
+    BufferConfig.maxBytes(4) shouldEqual new EagerBufferConfigImpl(Long.MaxValue, 4)
+    BufferConfig.maxBytes(4).withMaxRecords(5) shouldEqual new EagerBufferConfigImpl(5, 4)
+  }
+
+  "BufferConfig.unbounded" should "produce the correct buffer config" in {
+    BufferConfig.unbounded() shouldEqual
+      new StrictBufferConfigImpl(Long.MaxValue, Long.MaxValue, BufferFullStrategy.SHUT_DOWN)
+  }
+
+  "BufferConfig" should "support very long chains of factory methods" in {
+    val bc1 = BufferConfig
+      .unbounded()
+      .emitEarlyWhenFull()
+      .withMaxRecords(3L)
+      .withMaxBytes(4L)
+      .withMaxRecords(5L)
+      .withMaxBytes(6L)
+    bc1 shouldEqual new EagerBufferConfigImpl(5L, 6L)
+    bc1.shutDownWhenFull() shouldEqual new StrictBufferConfigImpl(5L, 6L, BufferFullStrategy.SHUT_DOWN)
+
+    val bc2 = BufferConfig
+      .maxBytes(4)
+      .withMaxRecords(5)
+      .withMaxBytes(6)
+      .withNoBound()
+      .withMaxBytes(7)
+      .withMaxRecords(8)
+
+    bc2 shouldEqual new StrictBufferConfigImpl(8L, 7L, BufferFullStrategy.SHUT_DOWN)
+    bc2.withNoBound() shouldEqual BufferConfig.unbounded()
+
+    val bc3 = BufferConfig
+      .maxRecords(5L)
+      .withMaxBytes(10L)
+      .emitEarlyWhenFull()
+      .withMaxRecords(11L)
+
+    bc3 shouldEqual new EagerBufferConfigImpl(11L, 10L)
+  }
+}


Mime
View raw message