kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bbej...@apache.org
Subject [kafka] branch trunk updated: MINOR: fix bypasses in ChangeLogging stores (#6266)
Date Thu, 14 Feb 2019 18:39:24 GMT
This is an automated email from the ASF dual-hosted git repository.

bbejeck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new af91eee  MINOR: fix bypasses in ChangeLogging stores (#6266)
af91eee is described below

commit af91eeeb793af2f5b873c48ef8ff636e8b485308
Author: John Roesler <vvcephei@users.noreply.github.com>
AuthorDate: Thu Feb 14 12:39:10 2019 -0600

    MINOR: fix bypasses in ChangeLogging stores (#6266)
    
    The change-logging stores should not bypass methods in underlying stores.
    
    If some of you have a minute, can you take a quick look at this? I happened to notice
    during some other refactoring that the change-logging store layer sometimes bypasses the
    underlying store and instead calls across to a different layer.
    
    It seems unexpected that it should do so, and it might actually cause problems. There
    was one spot where it's impossible to avoid it (in the windowed store), but I added a
    note justifying why we bypass the underlying store.
    
    Thanks,
    -John
    
    * MINOR: fix bypasses in ChangeLogging stores
    
    * fix test
    
    Reviewers: Guozhang Wang <wangguoz@gmail.com>, Matthias J. Sax <mjsax@apache.org>, Bill Bejeck <bbejeck@gmail.com>
---
 .../streams/state/internals/ChangeLoggingKeyValueBytesStore.java   | 7 ++++---
 .../streams/state/internals/ChangeLoggingSessionBytesStore.java    | 4 ++--
 .../streams/state/internals/ChangeLoggingWindowBytesStore.java     | 4 ++++
 .../state/internals/ChangeLoggingSessionBytesStoreTest.java        | 4 ++--
 4 files changed, 12 insertions(+), 7 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java
index d5f5ad2..7567e78 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java
@@ -40,7 +40,7 @@ public class ChangeLoggingKeyValueBytesStore extends WrappedStateStore<KeyValueS
                      final StateStore root) {
         super.init(context, root);
         final String topic = ProcessorStateManager.storeChangelogTopic(context.applicationId(), name());
-        this.changeLogger = new StoreChangeLogger<>(name(), context, new StateSerdes<>(topic, Serdes.Bytes(), Serdes.ByteArray()));
+        changeLogger = new StoreChangeLogger<>(name(), context, new StateSerdes<>(topic, Serdes.Bytes(), Serdes.ByteArray()));
 
         // if the inner store is an LRU cache, add the eviction listener to log removed record
         if (wrapped() instanceof MemoryLRUCache) {
@@ -66,9 +66,10 @@ public class ChangeLoggingKeyValueBytesStore extends WrappedStateStore<KeyValueS
     @Override
     public byte[] putIfAbsent(final Bytes key,
                               final byte[] value) {
-        final byte[] previous = get(key);
+        final byte[] previous = wrapped().putIfAbsent(key, value);
         if (previous == null) {
-            put(key, value);
+            // then it was absent
+            changeLogger.logChange(key, value);
         }
         return previous;
     }
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStore.java
index 1ed163b..8fe8609 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStore.java
@@ -81,11 +81,11 @@ class ChangeLoggingSessionBytesStore extends WrappedStateStore<SessionStore<Byte
 
     @Override
     public KeyValueIterator<Windowed<Bytes>, byte[]> fetch(final Bytes key) {
-        return findSessions(key, 0, Long.MAX_VALUE);
+        return wrapped().fetch(key);
     }
 
     @Override
     public KeyValueIterator<Windowed<Bytes>, byte[]> fetch(final Bytes from,
final Bytes to) {
-        return findSessions(from, to, 0, Long.MAX_VALUE);
+        return wrapped().fetch(from, to);
     }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java
index a614f92..3cddb33 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java
@@ -74,6 +74,10 @@ class ChangeLoggingWindowBytesStore extends WrappedStateStore<WindowStore<Bytes,
 
     @Override
     public void put(final Bytes key, final byte[] value) {
+        // Note: It's incorrect to bypass the wrapped store here by delegating to another method,
+        // but we have no alternative. We must send a timestamped key to the changelog, which means
+        // we need to know what timestamp gets used for the record. Hopefully, we can deprecate this
+        // method in the future to resolve the situation.
         put(key, value, context.timestamp());
     }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStoreTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStoreTest.java
index 6c1ab19..94eee76 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStoreTest.java
@@ -113,7 +113,7 @@ public class ChangeLoggingSessionBytesStoreTest {
 
     @Test
     public void shouldDelegateToUnderlyingStoreWhenFetching() {
-        EasyMock.expect(inner.findSessions(bytesKey, 0, Long.MAX_VALUE)).andReturn(KeyValueIterators.<Windowed<Bytes>,
byte[]>emptyIterator());
+        EasyMock.expect(inner.fetch(bytesKey)).andReturn(KeyValueIterators.<Windowed<Bytes>,
byte[]>emptyIterator());
 
         init();
 
@@ -123,7 +123,7 @@ public class ChangeLoggingSessionBytesStoreTest {
 
     @Test
     public void shouldDelegateToUnderlyingStoreWhenFetchingRange() {
-        EasyMock.expect(inner.findSessions(bytesKey, bytesKey, 0, Long.MAX_VALUE)).andReturn(KeyValueIterators.<Windowed<Bytes>,
byte[]>emptyIterator());
+        EasyMock.expect(inner.fetch(bytesKey, bytesKey)).andReturn(KeyValueIterators.<Windowed<Bytes>,
byte[]>emptyIterator());
 
         init();
 


Mime
View raw message