kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [kafka] branch 1.1 updated: KAFKA-6878 Switch the order of underlying.init and initInternal (#4988)
Date Thu, 10 May 2018 00:07:38 GMT
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch 1.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/1.1 by this push:
     new a7664f2  KAFKA-6878 Switch the order of underlying.init and initInternal (#4988)
a7664f2 is described below

commit a7664f22fa7f22cc724864f073e2074f4ff0ba91
Author: tedyu <yuzhihong@gmail.com>
AuthorDate: Wed May 9 17:06:47 2018 -0700

    KAFKA-6878 Switch the order of underlying.init and initInternal (#4988)
    
    This is continuation of #4978.
    From Guozhang:
    
    I think to fix this issue, in init we could consider switching the steps of 1 and 2:
    
    initInternal(context);
    underlying.init(context, root);
    
    since
    
    volatile boolean open = false;
    
    it should be sufficient. In this case the check on step 3) will fail if underlying.init
is not completed and we will throw InvalidStateStoreException.
    
    Reviewers: Guozhang Wang <wangguoz@gmail.com>
---
 .../kafka/streams/state/internals/CachingKeyValueStore.java    |  2 +-
 .../kafka/streams/state/internals/CachingSessionStore.java     |  2 +-
 .../kafka/streams/state/internals/CachingWindowStore.java      | 10 +++++++---
 3 files changed, 9 insertions(+), 5 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
index 9eebc16..5db3f68 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
@@ -60,8 +60,8 @@ class CachingKeyValueStore<K, V> extends WrappedStateStore.AbstractStateStore
im
 
     @Override
     public void init(final ProcessorContext context, final StateStore root) {
-        underlying.init(context, root);
         initInternal(context);
+        underlying.init(context, root);
         // save the stream thread as we only ever want to trigger a flush
         // when the stream thread is the current thread.
         streamThread = Thread.currentThread();
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
index 31b9d75..022f6f3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
@@ -64,8 +64,8 @@ class CachingSessionStore<K, AGG> extends WrappedStateStore.AbstractStateStore
i
 
     public void init(final ProcessorContext context, final StateStore root) {
         topic = ProcessorStateManager.storeChangelogTopic(context.applicationId(), root.name());
-        bytesStore.init(context, root);
         initInternal((InternalProcessorContext) context);
+        bytesStore.init(context, root);
     }
 
     @SuppressWarnings("unchecked")
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
index ad0bd99..a78978b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
@@ -67,8 +67,8 @@ class CachingWindowStore<K, V> extends WrappedStateStore.AbstractStateStore
impl
 
     @Override
     public void init(final ProcessorContext context, final StateStore root) {
-        underlying.init(context, root);
         initInternal(context);
+        underlying.init(context, root);
         keySchema.init(context.applicationId());
     }
 
@@ -163,7 +163,9 @@ class CachingWindowStore<K, V> extends WrappedStateStore.AbstractStateStore
impl
         validateStoreOpen();
 
         final WindowStoreIterator<byte[]> underlyingIterator = underlying.fetch(key,
timeFrom, timeTo);
-
+        if (cache == null) {
+            return underlyingIterator;
+        }
         final Bytes cacheKeyFrom = cacheFunction.cacheKey(keySchema.lowerRangeFixedSize(key,
timeFrom));
         final Bytes cacheKeyTo = cacheFunction.cacheKey(keySchema.upperRangeFixedSize(key,
timeTo));
         final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = cache.range(name, cacheKeyFrom,
cacheKeyTo);
@@ -186,7 +188,9 @@ class CachingWindowStore<K, V> extends WrappedStateStore.AbstractStateStore
impl
         validateStoreOpen();
 
         final KeyValueIterator<Windowed<Bytes>, byte[]> underlyingIterator =
underlying.fetch(from, to, timeFrom, timeTo);
-
+        if (cache == null) {
+            return underlyingIterator;
+        }
         final Bytes cacheKeyFrom = cacheFunction.cacheKey(keySchema.lowerRange(from, timeFrom));
         final Bytes cacheKeyTo = cacheFunction.cacheKey(keySchema.upperRange(to, timeTo));
         final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = cache.range(name, cacheKeyFrom,
cacheKeyTo);

-- 
To stop receiving notification emails like this one, please contact
guozhang@apache.org.

Mime
View raw message