kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: KAFKA-5172: Fix fetchPrevious to find the correct session
Date Mon, 08 May 2017 05:15:43 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk e71dce89c -> e472ee7b6


KAFKA-5172: Fix fetchPrevious to find the correct session

Change fetchPrevious to use findSessions with the proper key and timestamps rather than using
fetch.

Author: Kyle Winkelman <kyle.winkelman@optum.com>

Reviewers: Damian Guy, Guozhang Wang

Closes #2972 from KyleWinkelman/CachingSessionStore-fetchPrevious


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/e472ee7b
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/e472ee7b
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/e472ee7b

Branch: refs/heads/trunk
Commit: e472ee7b613dbcab2ba1f5b6b384fa713f3906d0
Parents: e71dce8
Author: Kyle Winkelman <kyle.winkelman@optum.com>
Authored: Sun May 7 22:15:40 2017 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Sun May 7 22:15:40 2017 -0700

----------------------------------------------------------------------
 .../state/internals/CachingSessionStore.java    | 12 ++++++----
 .../internals/CachingSessionStoreTest.java      | 25 ++++++++++++++++++++
 2 files changed, 32 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/e472ee7b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
index bebd118..00d4a4a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
@@ -18,6 +18,7 @@ package org.apache.kafka.streams.state.internals;
 
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.kstream.Window;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.kstream.internals.CacheFlushListener;
 import org.apache.kafka.streams.kstream.internals.SessionKeySerde;
@@ -56,7 +57,6 @@ class CachingSessionStore<K, AGG> extends WrappedStateStore.AbstractStateStore
i
         this.keySchema = new SessionKeySchema();
     }
 
-    @SuppressWarnings("unchecked")
     public void init(final ProcessorContext context, final StateStore root) {
         topic = ProcessorStateManager.storeChangelogTopic(context.applicationId(), root.name());
         bytesStore.init(context, root);
@@ -128,21 +128,23 @@ class CachingSessionStore<K, AGG> extends WrappedStateStore.AbstractStateStore
i
         context.setRecordContext(entry.recordContext());
         try {
             final Windowed<K> key = SessionKeySerde.from(binaryKey.get(), keySerde.deserializer(),
topic);
+            final Bytes rawKey = Bytes.wrap(serdes.rawKey(key.key()));
             if (flushListener != null) {
                 final AGG newValue = serdes.valueFrom(entry.newValue());
-                final AGG oldValue = fetchPrevious(binaryKey);
+                final AGG oldValue = fetchPrevious(rawKey, key.window());
                 if (!(newValue == null && oldValue == null)) {
                     flushListener.apply(key, newValue, oldValue);
                 }
             }
-            bytesStore.put(new Windowed<>(Bytes.wrap(serdes.rawKey(key.key())), key.window()),
entry.newValue());
+            bytesStore.put(new Windowed<>(rawKey, key.window()), entry.newValue());
         } finally {
             context.setRecordContext(current);
         }
     }
 
-    private AGG fetchPrevious(final Bytes key) {
-        try (final KeyValueIterator<Windowed<Bytes>, byte[]> iterator = bytesStore.fetch(key))
{
+    private AGG fetchPrevious(final Bytes rawKey, final Window window) {
+        try (final KeyValueIterator<Windowed<Bytes>, byte[]> iterator = bytesStore
+                .findSessions(rawKey, window.start(), window.end())) {
             if (!iterator.hasNext()) {
                 return null;
             }

http://git-wip-us.apache.org/repos/asf/kafka/blob/e472ee7b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java
index d316ae2..f8eec1c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java
@@ -22,6 +22,8 @@ import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.errors.InvalidStateStoreException;
 import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.internals.CacheFlushListener;
+import org.apache.kafka.streams.kstream.internals.Change;
 import org.apache.kafka.streams.kstream.internals.SessionKeySerde;
 import org.apache.kafka.streams.kstream.internals.SessionWindow;
 import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
@@ -162,6 +164,29 @@ public class CachingSessionStoreTest {
     }
 
     @Test
+    public void shouldForwardChangedValuesDuringFlush() throws Exception {
+        final Windowed<String> a = new Windowed<>("a", new SessionWindow(0, 0));
+        final List<KeyValue<Windowed<String>, Change<Long>>> flushed
= new ArrayList<>();
+        cachingStore.setFlushListener(new CacheFlushListener<Windowed<String>, Long>()
{
+                @Override
+                public void apply(final Windowed<String> key, final Long newValue,
final Long oldValue) {
+                    flushed.add(KeyValue.pair(key, new Change<>(newValue, oldValue)));
+                }
+            });
+        
+        cachingStore.put(a, 1L);
+        cachingStore.flush();
+        
+        cachingStore.put(a, 2L);
+        cachingStore.flush();
+
+        cachingStore.remove(a);
+        cachingStore.flush();
+
+        assertEquals(flushed, Arrays.asList(KeyValue.pair(a, new Change<>(1L, null)),
KeyValue.pair(a, new Change<>(2L, 1L)), KeyValue.pair(a, new Change<>(null, 2L))));
+    }
+
+    @Test
     public void shouldClearNamespaceCacheOnClose() throws Exception {
         final Windowed<String> a1 = new Windowed<>("a", new SessionWindow(0,
0));
         cachingStore.put(a1, 1L);


Mime
View raw message