kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [kafka] branch trunk updated: KAFKA-6704: InvalidStateStoreException from IQ when StreamThread closes store (#4801)
Date Tue, 05 Jun 2018 18:49:42 GMT
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new ef41369  KAFKA-6704: InvalidStateStoreException from IQ when StreamThread closes store (#4801)
ef41369 is described below

commit ef413699b658c939fa0a3930ddbface69bedd26e
Author: Bill Bejeck <bbejeck@gmail.com>
AuthorDate: Tue Jun 5 14:49:36 2018 -0400

    KAFKA-6704: InvalidStateStoreException from IQ when StreamThread closes store (#4801)
    
    While using an iterator from IQ, it's possible to get an InvalidStateStoreException if the StreamThread closes the store during a range query.
    
    Added a unit test to SegmentIteratorTest for this condition.
    
    Reviewers: John Roesler <john@confluent.io>, Matthias J. Sax <matthias@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
---
 .../streams/state/internals/RocksDBStore.java      | 54 +++++++++++++---------
 .../streams/state/internals/SegmentIterator.java   | 12 ++++-
 .../state/internals/RocksDBWindowStoreTest.java    | 19 ++------
 .../state/internals/SegmentIteratorTest.java       | 14 ++++++
 4 files changed, 60 insertions(+), 39 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
index d2b8cd2..ff6c56a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.streams.state.internals;
 
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.AbstractIterator;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.KeyValue;
@@ -441,46 +442,46 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]> {
         }
     }
 
-    private class RocksDbIterator implements KeyValueIterator<Bytes, byte[]> {
+    private class RocksDbIterator extends AbstractIterator<KeyValue<Bytes, byte[]>> implements KeyValueIterator<Bytes, byte[]> {
         private final String storeName;
         private final RocksIterator iter;
 
         private volatile boolean open = true;
 
+        private KeyValue<Bytes, byte[]> next;
+
         RocksDbIterator(final String storeName,
                         final RocksIterator iter) {
             this.iter = iter;
             this.storeName = storeName;
         }
 
-        byte[] peekRawKey() {
-            return iter.key();
-        }
-
-        private KeyValue<Bytes, byte[]> getKeyValue() {
-            return new KeyValue<>(new Bytes(iter.key()), iter.value());
-        }
-
         @Override
         public synchronized boolean hasNext() {
             if (!open) {
                 throw new InvalidStateStoreException(String.format("RocksDB store %s has closed", storeName));
             }
-
-            return iter.isValid();
+            return super.hasNext();
         }
 
-        /**
-         * @throws NoSuchElementException if no next element exist
-         */
         @Override
         public synchronized KeyValue<Bytes, byte[]> next() {
-            if (!hasNext())
-                throw new NoSuchElementException();
+            return super.next();
+        }
 
-            final KeyValue<Bytes, byte[]> entry = this.getKeyValue();
-            iter.next();
-            return entry;
+        @Override
+        public KeyValue<Bytes, byte[]> makeNext() {
+            if (!iter.isValid()) {
+                return allDone();
+            } else {
+                next = this.getKeyValue();
+                iter.next();
+                return next;
+            }
+        }
+
+        private KeyValue<Bytes, byte[]> getKeyValue() {
+            return new KeyValue<>(new Bytes(iter.key()), iter.value());
         }
 
         @Override
@@ -500,7 +501,7 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]> {
             if (!hasNext()) {
                 throw new NoSuchElementException();
             }
-            return new Bytes(iter.key());
+            return next.key;
         }
     }
 
@@ -524,8 +525,17 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]> {
         }
 
         @Override
-        public synchronized boolean hasNext() {
-            return super.hasNext() && comparator.compare(super.peekRawKey(), this.rawToKey) <= 0;
+        public KeyValue<Bytes, byte[]> makeNext() {
+            final KeyValue<Bytes, byte[]> next = super.makeNext();
+
+            if (next == null) {
+                return allDone();
+            } else {
+                if (comparator.compare(next.key.get(), this.rawToKey) <= 0)
+                    return next;
+                else
+                    return allDone();
+            }
         }
     }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentIterator.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentIterator.java
index 099cba1..331ffdb 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentIterator.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentIterator.java
@@ -66,7 +66,7 @@ class SegmentIterator implements KeyValueIterator<Bytes, byte[]> {
     @Override
     public boolean hasNext() {
         boolean hasNext = false;
-        while ((currentIterator == null || !(hasNext = hasNextCondition.hasNext(currentIterator)) || !currentSegment.isOpen())
+        while ((currentIterator == null || !(hasNext = hasNextConditionHasNext()) || !currentSegment.isOpen())
                 && segments.hasNext()) {
             close();
             currentSegment = segments.next();
@@ -83,6 +83,16 @@ class SegmentIterator implements KeyValueIterator<Bytes, byte[]> {
         return currentIterator != null && hasNext;
     }
 
+    private boolean hasNextConditionHasNext() {
+        boolean hasNext = false;
+        try {
+            hasNext = hasNextCondition.hasNext(currentIterator);
+        } catch (InvalidStateStoreException e) {
+            //already closed so ignore
+        }
+        return hasNext;
+    }
+
     public KeyValue<Bytes, byte[]> next() {
         if (!hasNext()) {
             throw new NoSuchElementException();
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
index be4ede8..c436e9e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
@@ -27,7 +27,6 @@ import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.errors.DefaultProductionExceptionHandler;
-import org.apache.kafka.streams.errors.InvalidStateStoreException;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
@@ -61,7 +60,6 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
 
 @SuppressWarnings("PointlessArithmeticExpression")
 public class RocksDBWindowStoreTest {
@@ -747,7 +745,7 @@ public class RocksDBWindowStoreTest {
     }
 
     @Test
-    public void shouldCloseOpenIteratorsWhenStoreIsClosedAndThrowInvalidStateStoreExceptionOnHasNextAndNext() {
+    public void shouldCloseOpenIteratorsWhenStoreIsClosedAndNotThrowInvalidStateStoreExceptionOnHasNext() {
         windowStore = createWindowStore(context);
         context.setRecordContext(createRecordContext(0));
         windowStore.put(1, "one", 1L);
@@ -757,20 +755,9 @@ public class RocksDBWindowStoreTest {
         final WindowStoreIterator<String> iterator = windowStore.fetch(1, 1L, 3L);
         assertTrue(iterator.hasNext());
         windowStore.close();
-        try {
-            //noinspection ResultOfMethodCallIgnored
-            iterator.hasNext();
-            fail("should have thrown InvalidStateStoreException on closed store");
-        } catch (final InvalidStateStoreException e) {
-            // ok
-        }
 
-        try {
-            iterator.next();
-            fail("should have thrown InvalidStateStoreException on closed store");
-        } catch (final InvalidStateStoreException e) {
-            // ok
-        }
+        assertFalse(iterator.hasNext());
+
     }
 
     @Test
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentIteratorTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentIteratorTest.java
index 7a7b266..8045964 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentIteratorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentIteratorTest.java
@@ -31,6 +31,7 @@ import org.junit.Before;
 import org.junit.Test;
 
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.NoSuchElementException;
 
 import static org.junit.Assert.assertEquals;
@@ -105,6 +106,19 @@ public class SegmentIteratorTest {
     }
 
     @Test
+    public void shouldNotThrowExceptionOnHasNextWhenStoreClosed() {
+        iterator = new SegmentIterator(Collections.singletonList(segmentOne).iterator(),
+                                       hasNextCondition,
+                                       Bytes.wrap("a".getBytes()),
+                                       Bytes.wrap("z".getBytes()));
+
+
+        iterator.currentIterator = segmentOne.all();
+        segmentOne.close();
+        assertFalse(iterator.hasNext());
+    }
+
+    @Test
     public void shouldOnlyIterateOverSegmentsInRange() {
         iterator = new SegmentIterator(Arrays.asList(segmentOne, segmentTwo).iterator(),
                 hasNextCondition,

-- 
To stop receiving notification emails like this one, please contact
guozhang@apache.org.

Mime
View raw message