kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: KAFKA-3805: Check if DB is null.
Date Thu, 16 Jun 2016 23:18:14 GMT
Repository: kafka
Updated Branches:
  refs/heads/0.10.0 9ba2fdf8b -> 8d38c115a


KAFKA-3805: Check if DB is null.

- Check if DB is null before flushing or closing. In some cases, a state store is closed twice.
This happens in `StreamTask.close()` where both `node.close()` and `super.close` (in `ProcessorManager`)
are called in a sequence. If the user's processor defines a `close` that closes the underlying
state store, then the second close will be redundant.

Author: Eno Thereska <eno.thereska@gmail.com>

Reviewers: Andrés Gómez, Ismael Juma, Guozhang Wang

Closes #1485 from enothereska/KAFKA-3805-locks

(cherry picked from commit 751fe9309011b99f60c1cb03c23a47d0444dce05)
Signed-off-by: Guozhang Wang <wangguoz@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/8d38c115
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/8d38c115
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/8d38c115

Branch: refs/heads/0.10.0
Commit: 8d38c115ab6e61496eee84f790220f1643a1a804
Parents: 9ba2fdf
Author: Eno Thereska <eno.thereska@gmail.com>
Authored: Thu Jun 16 16:18:02 2016 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Thu Jun 16 16:18:11 2016 -0700

----------------------------------------------------------------------
 .../java/org/apache/kafka/streams/processor/StateStore.java | 4 +++-
 .../streams/processor/internals/ProcessorStateManager.java  | 4 ++++
 .../apache/kafka/streams/state/internals/RocksDBStore.java  | 9 +++++++++
 3 files changed, 16 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/8d38c115/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java b/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java
index f79e6f6..68f3644 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java
@@ -46,7 +46,9 @@ public interface StateStore {
     void flush();
 
     /**
-     * Close the storage engine
+     * Close the storage engine.
+     * Note that this function needs to be idempotent since it may be called
+     * several times on the same state store.
      */
     void close();
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/8d38c115/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
index 1d97384..92b1069 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
@@ -134,6 +134,8 @@ public class ProcessorStateManager {
             retry--;
             lock = lockStateDirectory(channel);
         }
+        // TODO: closing the channel here risks releasing all locks on the file
+        // see {@link https://issues.apache.org/jira/browse/KAFKA-3812}
         if (lock == null) {
             channel.close();
         }
@@ -336,6 +338,8 @@ public class ProcessorStateManager {
      */
     public void close(Map<TopicPartition, Long> ackedOffsets) throws IOException {
         try {
+            // attempting to flush and close the stores, just in case they
+            // are not closed by a ProcessorNode yet
             if (!stores.isEmpty()) {
                 log.debug("Closing stores.");
                 for (Map.Entry<String, StateStore> entry : stores.entrySet()) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/8d38c115/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
index 37609a0..a6dc881 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
@@ -404,6 +404,10 @@ public class RocksDBStore<K, V> implements KeyValueStore<K,
V> {
 
     @Override
     public void flush() {
+        if (db == null) {
+            return;
+        }
+
         // flush of the cache entries if necessary
         flushCache();
 
@@ -424,6 +428,11 @@ public class RocksDBStore<K, V> implements KeyValueStore<K,
V> {
 
     @Override
     public void close() {
+
+        if (db == null) {
+            return;
+        }
+
         flush();
         db.close();
     }


Mime
View raw message