kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mj...@apache.org
Subject [kafka] branch trunk updated: MINOR: improve Streams error message (#5975)
Date Mon, 17 Dec 2018 12:57:14 GMT
This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new c441528  MINOR: improve Streams error message (#5975)
c441528 is described below

commit c441528b93a05da58ad1a8599c4de35e2c14ff16
Author: Matthias J. Sax <mjsax@apache.org>
AuthorDate: Mon Dec 17 13:57:01 2018 +0100

    MINOR: improve Streams error message (#5975)
    
    Reviewers: Bill Bejeck <bill@confluent.io>, Guozhang Wang <guozhang@confluent.io>
---
 .../apache/kafka/streams/state/internals/RocksDBStore.java  | 13 ++++++++++---
 1 file changed, 10 insertions(+), 3 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
index 8c28115..e19bb6d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
@@ -42,6 +42,8 @@ import org.rocksdb.RocksDBException;
 import org.rocksdb.RocksIterator;
 import org.rocksdb.WriteBatch;
 import org.rocksdb.WriteOptions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
@@ -66,6 +68,8 @@ import java.util.regex.Pattern;
  */
 public class RocksDBStore implements KeyValueStore<Bytes, byte[]> {
 
+    private static final Logger log = LoggerFactory.getLogger(RocksDBStore.class);
+
     private static final Pattern SST_FILE_EXTENSION = Pattern.compile(".*\\.sst");
 
     private static final CompressionType COMPRESSION_TYPE = CompressionType.NO_COMPRESSION;
@@ -422,8 +426,11 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>
{
         synchronized (openIterators) {
             iterators = new HashSet<>(openIterators);
         }
-        for (final KeyValueIterator iterator : iterators) {
-            iterator.close();
+        if (iterators.size() != 0) {
+            log.warn("Closing {} open iterators for store {}", iterators.size(), name);
+            for (final KeyValueIterator iterator : iterators) {
+                iterator.close();
+            }
         }
     }
 
@@ -444,7 +451,7 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>
{
         @Override
         public synchronized boolean hasNext() {
             if (!open) {
-                throw new InvalidStateStoreException(String.format("RocksDB store %s has
closed", storeName));
+                throw new InvalidStateStoreException(String.format("RocksDB iterator for
store %s has closed", storeName));
             }
             return super.hasNext();
         }


Mime
View raw message