kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: KAFKA-4468: Correctly calculate the window end timestamp after read from state stores
Date Tue, 12 Sep 2017 23:42:10 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 9ed8bf273 -> 90b4b07e6


KAFKA-4468: Correctly calculate the window end timestamp after read from state stores

I used the following approach to fix this bug:

1) Because WindowedDeserializer previously had no way of knowing the window size, I added a
`windowSize` field and a constructor that accepts it. The deserializer can then compute the window end timestamp.

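2) The default value of `windowSize` is `Long.MAX_VALUE`. With that default, `deserialize` returns an
`UnlimitedWindow`. Otherwise it returns a `TimeWindow` ending at `start + windowSize`, capped at
`Long.MAX_VALUE` on overflow. To support this, `WindowStoreUtils.timeWindowForSize` was made public.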

3) The TemperatureDemo example was updated to show how to use the new constructor when the
window size is known.

Author: Richard Yu <richardyu@Richards-Air.attlocal.net>

Reviewers: Matthias J. Sax <matthias@confluent.io>, Bill Bejeck <bill@confluent.io>,
Guozhang Wang <wangguoz@gmail.com>

Closes #3745 from ConcurrencyPractitioner/trunk


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/90b4b07e
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/90b4b07e
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/90b4b07e

Branch: refs/heads/trunk
Commit: 90b4b07e6e3b44edfa41eebea83a674cb6f61922
Parents: 9ed8bf2
Author: Richard Yu <richardyu@Richards-Air.attlocal.net>
Authored: Tue Sep 12 16:42:06 2017 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Tue Sep 12 16:42:06 2017 -0700

----------------------------------------------------------------------
 .../examples/temperature/TemperatureDemo.java   |  2 +-
 .../kstream/internals/WindowedDeserializer.java | 40 +++++++++++++++-----
 .../state/internals/WindowStoreUtils.java       |  2 +-
 .../WindowedStreamPartitionerTest.java          | 24 +++++++++---
 4 files changed, 51 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/90b4b07e/streams/examples/src/main/java/org/apache/kafka/streams/examples/temperature/TemperatureDemo.java
----------------------------------------------------------------------
diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/temperature/TemperatureDemo.java
b/streams/examples/src/main/java/org/apache/kafka/streams/examples/temperature/TemperatureDemo.java
index 1c2045e..2039ca5 100644
--- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/temperature/TemperatureDemo.java
+++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/temperature/TemperatureDemo.java
@@ -114,7 +114,7 @@ public class TemperatureDemo {
                 });
 
         WindowedSerializer<String> windowedSerializer = new WindowedSerializer<>(Serdes.String().serializer());
-        WindowedDeserializer<String> windowedDeserializer = new WindowedDeserializer<>(Serdes.String().deserializer());
+        WindowedDeserializer<String> windowedDeserializer = new WindowedDeserializer<>(Serdes.String().deserializer(),
TEMPERATURE_WINDOW_SIZE);
         Serde<Windowed<String>> windowedSerde = Serdes.serdeFrom(windowedSerializer,
windowedDeserializer);
 
         // need to override key serde to Windowed<String> type

http://git-wip-us.apache.org/repos/asf/kafka/blob/90b4b07e/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedDeserializer.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedDeserializer.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedDeserializer.java
index 1cb8d9c..67fee49 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedDeserializer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedDeserializer.java
@@ -19,7 +19,9 @@ package org.apache.kafka.streams.kstream.internals;
 import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.kstream.Window;
 import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.state.internals.WindowStoreUtils;
 
 import java.nio.ByteBuffer;
 import java.util.Map;
@@ -33,16 +35,30 @@ import java.util.Map;
 public class WindowedDeserializer<T> implements Deserializer<Windowed<T>>
{
 
     private static final int TIMESTAMP_SIZE = 8;
-
+    private final Long windowSize;
+    
     private Deserializer<T> inner;
-
+    
     // Default constructor needed by Kafka
-    public WindowedDeserializer() {}
+    public WindowedDeserializer() {
+        this(null, Long.MAX_VALUE);
+    }
+    
+    public WindowedDeserializer(final Long windowSize) {
+       this(null, windowSize);
+    }
+    
+    public WindowedDeserializer(final Deserializer<T> inner) {
+        this(inner, Long.MAX_VALUE);
+    }
 
-    public WindowedDeserializer(Deserializer<T> inner) {
+    public WindowedDeserializer(final Deserializer<T> inner,
+                                final long windowSize) {
         this.inner = inner;
+        this.windowSize = windowSize;
     }
-
+    
+    @SuppressWarnings("unchecked")
     @Override
     public void configure(Map<String, ?> configs, boolean isKey) {
         if (inner == null) {
@@ -66,11 +82,11 @@ public class WindowedDeserializer<T> implements Deserializer<Windowed<T>>
{
         byte[] bytes = new byte[data.length - TIMESTAMP_SIZE];
 
         System.arraycopy(data, 0, bytes, 0, bytes.length);
-
+        
         long start = ByteBuffer.wrap(data).getLong(data.length - TIMESTAMP_SIZE);
-
-        // always read as unlimited window
-        return new Windowed<T>(inner.deserialize(topic, bytes), new UnlimitedWindow(start));
+        
+        Window timeWindow = windowSize != Long.MAX_VALUE ? WindowStoreUtils.timeWindowForSize(start,
windowSize) : new UnlimitedWindow(start);
+        return new Windowed<T>(inner.deserialize(topic, bytes), timeWindow);
     }
 
 
@@ -78,9 +94,13 @@ public class WindowedDeserializer<T> implements Deserializer<Windowed<T>>
{
     public void close() {
         inner.close();
     }
-
+    
     // Only for testing
     public Deserializer<T> innerDeserializer() {
         return inner;
     }
+    
+    public Long getWindowSize() {
+        return this.windowSize;
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/90b4b07e/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreUtils.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreUtils.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreUtils.java
index ed79947..317ce22 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreUtils.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreUtils.java
@@ -79,7 +79,7 @@ public class WindowStoreUtils {
      * Safely construct a time window of the given size,
      * taking care of bounding endMs to Long.MAX_VALUE if necessary
      */
-    static TimeWindow timeWindowForSize(final long startMs, final long windowSize) {
+    public static TimeWindow timeWindowForSize(final long startMs, final long windowSize)
{
         final long endMs = startMs + windowSize;
         return new TimeWindow(startMs, endMs < 0 ? Long.MAX_VALUE : endMs);
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/90b4b07e/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitionerTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitionerTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitionerTest.java
index 98bb346..d3510a0 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitionerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitionerTest.java
@@ -20,8 +20,8 @@ import org.apache.kafka.clients.producer.internals.DefaultPartitioner;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
-import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.serialization.StringSerializer;
@@ -92,7 +92,7 @@ public class WindowedStreamPartitionerTest {
 
         defaultPartitioner.close();
     }
-
+    
     @Test
     public void testWindowedSerializerNoArgConstructors() {
         Map<String, String> props = new HashMap<>();
@@ -118,11 +118,10 @@ public class WindowedStreamPartitionerTest {
         windowedSerializer.close();
         windowedSerializer1.close();
     }
-
+    
     @Test
     public void testWindowedDeserializerNoArgConstructors() {
         Map<String, String> props = new HashMap<>();
-        // test key[value].deserializer.inner.class takes precedence over serializer.inner.class
         WindowedDeserializer<StringSerializer> windowedDeserializer = new WindowedDeserializer<>();
         props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "host:1");
         props.put(StreamsConfig.APPLICATION_ID_CONFIG, "appId");
@@ -138,10 +137,25 @@ public class WindowedStreamPartitionerTest {
         props.remove("value.deserializer.inner.class");
         WindowedDeserializer<?> windowedDeserializer1 = new WindowedDeserializer<>();
         windowedDeserializer1.configure(props, false);
-        Deserializer<?> inner1 = windowedDeserializer1.innerDeserializer();
+        final Deserializer<?> inner1 = windowedDeserializer1.innerDeserializer();
         assertNotNull("Inner deserializer should be not null", inner1);
         assertTrue("Inner deserializer type should be ByteArrayDeserializer", inner1 instanceof
ByteArrayDeserializer);
         windowedDeserializer.close();
         windowedDeserializer1.close();
     }
+    
+    @Test
+    public void testWindowDeserializeExpectedWindowSize() {
+        final long randomLong = 5000000;
+        final Map<String, String> props = new HashMap<>();
+        final WindowedDeserializer<StringSerializer> windowedDeserializer = new WindowedDeserializer<>(randomLong);
+        props.put("key.deserializer.inner.class", "org.apache.kafka.common.serialization.StringDeserializer");
+        windowedDeserializer.configure(props, true);
+        //test for deserializer expected window end time
+        final byte[] byteValues = stringSerializer.serialize(topicName, "dummy string");
//dummy string, serves no real purpose
+        final Windowed<?> windowed = windowedDeserializer.deserialize(topicName, byteValues);
+        final long actualSize = windowed.window().end() - windowed.window().start(); //find
actual window time
+        assertEquals(randomLong, actualSize); //testing if window size matches up with expected
one
+        windowedDeserializer.close();
+    }
 }


Mime
View raw message