kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mj...@apache.org
Subject [kafka] branch trunk updated: KAFKA-7420: Global store surrounded by read only implementation (#5865)
Date Wed, 05 Dec 2018 19:26:12 GMT
This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new ec501f3  KAFKA-7420: Global store surrounded by read only implementation (#5865)
ec501f3 is described below

commit ec501f305e53a09072580fb3824048c170d32a48
Author: Nikolay <nizhikov@apache.org>
AuthorDate: Wed Dec 5 22:25:52 2018 +0300

    KAFKA-7420: Global store surrounded by read only implementation (#5865)
    
    Reviewers: Matthias J. Sax <matthias@confluent.io>, Kamal Chandraprakash (@kamalcph), Bill Bejeck <bill@confluent.io>
---
 .../processor/internals/ProcessorContextImpl.java  | 181 ++++++++++++++
 .../internals/ProcessorContextImplTest.java        | 260 +++++++++++++++++++++
 2 files changed, 441 insertions(+)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
index 913e34e..c79ec35 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
@@ -19,13 +19,20 @@ package org.apache.kafka.streams.processor.internals;
+import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.internals.ApiUtils;
+import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.processor.Cancellable;
+import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.PunctuationType;
 import org.apache.kafka.streams.processor.Punctuator;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.To;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.SessionStore;
+import org.apache.kafka.streams.state.WindowStore;
+import org.apache.kafka.streams.state.WindowStoreIterator;
 import org.apache.kafka.streams.state.internals.ThreadCache;
 
 import java.time.Duration;
@@ -65,6 +72,7 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
     /**
      * @throws StreamsException if an attempt is made to access this state store from an unknown node
      */
+    @SuppressWarnings("unchecked")
     @Override
     public StateStore getStateStore(final String name) {
         if (currentNode() == null) {
@@ -73,6 +81,14 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
 
         final StateStore global = stateManager.getGlobalStore(name);
         if (global != null) {
+            if (global instanceof KeyValueStore) {
+                return new KeyValueStoreReadOnlyDecorator((KeyValueStore) global);
+            } else if (global instanceof WindowStore) {
+                return new WindowStoreReadOnlyDecorator((WindowStore) global);
+            } else if (global instanceof SessionStore) {
+                return new SessionStoreReadOnlyDecorator((SessionStore) global);
+            }
+
             return global;
         }
 
@@ -180,4 +196,169 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
         return streamTimeSupplier.get();
     }
 
+    /**
+     * Base class for the wrappers handed out for global state stores. Metadata
+     * and lifecycle calls are forwarded to the wrapped store, while {@link #flush()}
+     * is rejected with an {@link UnsupportedOperationException}.
+     *
+     * @param <T> type of the wrapped store
+     */
+    private abstract static class StateStoreReadOnlyDecorator<T extends StateStore> implements StateStore {
+        static final String ERROR_MESSAGE = "Global store is read only";
+
+        final T underlying;
+
+        StateStoreReadOnlyDecorator(final T store) {
+            underlying = store;
+        }
+
+        @Override
+        public String name() {
+            return underlying.name();
+        }
+
+        @Override
+        public boolean persistent() {
+            return underlying.persistent();
+        }
+
+        @Override
+        public boolean isOpen() {
+            return underlying.isOpen();
+        }
+
+        @Override
+        public void init(final ProcessorContext context, final StateStore root) {
+            underlying.init(context, root);
+        }
+
+        @Override
+        public void close() {
+            underlying.close();
+        }
+
+        /** Flushing is not permitted through the read-only view. */
+        @Override
+        public void flush() {
+            throw new UnsupportedOperationException(ERROR_MESSAGE);
+        }
+    }
+
+    /**
+     * Read-only view of a {@link KeyValueStore}: lookups and iteration are forwarded
+     * to the wrapped store, every mutating method throws
+     * {@link UnsupportedOperationException}.
+     *
+     * @param <K> key type of the wrapped store
+     * @param <V> value type of the wrapped store
+     */
+    private static class KeyValueStoreReadOnlyDecorator<K, V> extends StateStoreReadOnlyDecorator<KeyValueStore<K, V>> implements KeyValueStore<K, V> {
+        KeyValueStoreReadOnlyDecorator(final KeyValueStore<K, V> underlying) {
+            super(underlying);
+        }
+
+        @Override
+        public V get(final K key) {
+            return underlying.get(key);
+        }
+
+        @Override
+        public KeyValueIterator<K, V> range(final K from, final K to) {
+            return underlying.range(from, to);
+        }
+
+        @Override
+        public KeyValueIterator<K, V> all() {
+            return underlying.all();
+        }
+
+        @Override
+        public long approximateNumEntries() {
+            return underlying.approximateNumEntries();
+        }
+
+        @Override
+        public void put(final K key, final V value) {
+            throw new UnsupportedOperationException(ERROR_MESSAGE);
+        }
+
+        @Override
+        public V putIfAbsent(final K key, final V value) {
+            throw new UnsupportedOperationException(ERROR_MESSAGE);
+        }
+
+        // Parameterized to match KeyValueStore#putAll instead of overriding it with a raw List.
+        @Override
+        public void putAll(final List<KeyValue<K, V>> entries) {
+            throw new UnsupportedOperationException(ERROR_MESSAGE);
+        }
+
+        @Override
+        public V delete(final K key) {
+            throw new UnsupportedOperationException(ERROR_MESSAGE);
+        }
+    }
+
+    /**
+     * Read-only view of a {@link WindowStore}. The fetch/iteration methods are
+     * forwarded to the wrapped store; both {@code put} variants throw
+     * {@link UnsupportedOperationException}.
+     *
+     * @param <K> key type of the wrapped store
+     * @param <V> value type of the wrapped store
+     */
+    private static class WindowStoreReadOnlyDecorator<K, V> extends StateStoreReadOnlyDecorator<WindowStore<K, V>> implements WindowStore<K, V> {
+        WindowStoreReadOnlyDecorator(final WindowStore<K, V> store) {
+            super(store);
+        }
+
+        // ---- reads: forwarded unchanged ----
+
+        @Override
+        public V fetch(final K key, final long time) {
+            return underlying.fetch(key, time);
+        }
+
+        @Deprecated
+        @Override
+        public WindowStoreIterator<V> fetch(final K key, final long timeFrom, final long timeTo) {
+            return underlying.fetch(key, timeFrom, timeTo);
+        }
+
+        @Deprecated
+        @Override
+        public KeyValueIterator<Windowed<K>, V> fetch(final K from, final K to, final long timeFrom, final long timeTo) {
+            return underlying.fetch(from, to, timeFrom, timeTo);
+        }
+
+        @Deprecated
+        @Override
+        public KeyValueIterator<Windowed<K>, V> fetchAll(final long timeFrom, final long timeTo) {
+            return underlying.fetchAll(timeFrom, timeTo);
+        }
+
+        @Override
+        public KeyValueIterator<Windowed<K>, V> all() {
+            return underlying.all();
+        }
+
+        // ---- writes: always rejected ----
+
+        @Override
+        public void put(final K key, final V value) {
+            throw new UnsupportedOperationException(ERROR_MESSAGE);
+        }
+
+        @Override
+        public void put(final K key, final V value, final long windowStartTimestamp) {
+            throw new UnsupportedOperationException(ERROR_MESSAGE);
+        }
+    }
+
+    /**
+     * Read-only view of a {@link SessionStore}: session lookups are forwarded to the
+     * wrapped store, while {@code put} and {@code remove} throw
+     * {@link UnsupportedOperationException}.
+     *
+     * @param <K>   key type of the wrapped store
+     * @param <AGG> aggregate type of the wrapped store
+     */
+    private static class SessionStoreReadOnlyDecorator<K, AGG> extends StateStoreReadOnlyDecorator<SessionStore<K, AGG>> implements SessionStore<K, AGG> {
+        SessionStoreReadOnlyDecorator(final SessionStore<K, AGG> underlying) {
+            super(underlying);
+        }
+
+        @Override
+        public KeyValueIterator<Windowed<K>, AGG> findSessions(final K key, final long earliestSessionEndTime, final long latestSessionStartTime) {
+            return underlying.findSessions(key, earliestSessionEndTime, latestSessionStartTime);
+        }
+
+        @Override
+        public KeyValueIterator<Windowed<K>, AGG> findSessions(final K keyFrom, final K keyTo, final long earliestSessionEndTime, final long latestSessionStartTime) {
+            return underlying.findSessions(keyFrom, keyTo, earliestSessionEndTime, latestSessionStartTime);
+        }
+
+        // Parameterized like put(Windowed<K>, AGG) below instead of using a raw Windowed.
+        @Override
+        public void remove(final Windowed<K> sessionKey) {
+            throw new UnsupportedOperationException(ERROR_MESSAGE);
+        }
+
+        @Override
+        public void put(final Windowed<K> sessionKey, final AGG aggregate) {
+            throw new UnsupportedOperationException(ERROR_MESSAGE);
+        }
+
+        @Override
+        public KeyValueIterator<Windowed<K>, AGG> fetch(final K key) {
+            return underlying.fetch(key);
+        }
+
+        @Override
+        public KeyValueIterator<Windowed<K>, AGG> fetch(final K from, final K to) {
+            return underlying.fetch(from, to);
+        }
+    }
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextImplTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextImplTest.java
new file mode 100644
index 0000000..fa5f597
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextImplTest.java
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.function.Consumer;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.SessionStore;
+import org.apache.kafka.streams.state.WindowStore;
+import org.apache.kafka.streams.state.WindowStoreIterator;
+import org.apache.kafka.streams.state.internals.ThreadCache;
+import org.junit.Before;
+import org.junit.Test;
+
+import static java.util.Collections.emptySet;
+import static org.easymock.EasyMock.anyLong;
+import static org.easymock.EasyMock.anyString;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.expectLastCall;
+import static org.easymock.EasyMock.mock;
+import static org.easymock.EasyMock.replay;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class ProcessorContextImplTest {
+    private ProcessorContextImpl context;
+
+    private static final String KEY = "key";
+    private static final long VAL = 42L;
+    private static final String STORE_NAME = "underlying-store";
+
+    private boolean initExecuted;
+    private boolean closeExecuted;
+    private KeyValueIterator<String, Long> rangeIter;
+    private KeyValueIterator<String, Long> allIter;
+
+    private List<KeyValueIterator<Windowed<String>, Long>> iters = new
ArrayList<>(7);
+    private WindowStoreIterator<Long> windowStoreIter;
+
+    /**
+     * Creates the iterator mocks, a mocked config and a mocked state manager that
+     * serves one global store per store type, then builds the context under test
+     * and gives it a current node.
+     */
+    @Before
+    public void setup() {
+        // iterators returned by the mocked stores; the tests compare results against them
+        rangeIter = mock(KeyValueIterator.class);
+        allIter = mock(KeyValueIterator.class);
+        windowStoreIter = mock(WindowStoreIterator.class);
+
+        for (int slot = 0; slot < 7; slot++) {
+            iters.add(slot, mock(KeyValueIterator.class));
+        }
+
+        final StreamsConfig config = mock(StreamsConfig.class);
+        expect(config.getString(StreamsConfig.APPLICATION_ID_CONFIG)).andReturn("add-id");
+        expect(config.defaultValueSerde()).andReturn(Serdes.ByteArray());
+        expect(config.defaultKeySerde()).andReturn(Serdes.ByteArray());
+        replay(config);
+
+        // the store mocks are looked up by the names the tests pass to getStateStore
+        final ProcessorStateManager globalStores = mock(ProcessorStateManager.class);
+        expect(globalStores.getGlobalStore("KeyValueStore")).andReturn(keyValueStoreMock());
+        expect(globalStores.getGlobalStore("WindowStore")).andReturn(windowStoreMock());
+        expect(globalStores.getGlobalStore("SessionStore")).andReturn(sessionStoreMock());
+        replay(globalStores);
+
+        context = new ProcessorContextImpl(
+            mock(TaskId.class),
+            mock(StreamTask.class),
+            config,
+            mock(RecordCollector.class),
+            globalStores,
+            mock(StreamsMetricsImpl.class),
+            mock(ThreadCache.class)
+        );
+
+        // getStateStore checks currentNode() for null, so a node has to be set
+        final ProcessorNode<String, Long> node = new ProcessorNode<String, Long>("fake", null, emptySet());
+        context.setCurrentNode(node);
+    }
+
+    /** Writes to the global key-value store must fail; reads must be forwarded. */
+    @Test
+    public void testKeyValueStore() {
+        doTest("KeyValueStore", (Consumer<KeyValueStore<String, Long>>) kvStore -> {
+            // mutating calls are expected to be refused
+            checkThrowsUnsupportedOperation(() -> kvStore.put("1", 1L), "put");
+            checkThrowsUnsupportedOperation(() -> kvStore.putIfAbsent("1", 1L), "putIfAbsent");
+            checkThrowsUnsupportedOperation(() -> kvStore.putAll(Collections.emptyList()), "putAll");
+            checkThrowsUnsupportedOperation(() -> kvStore.delete("1"), "delete");
+
+            // reads return what the mocked store was set up to return
+            final Long fetched = kvStore.get(KEY);
+            assertEquals((Long) VAL, fetched);
+            assertEquals(rangeIter, kvStore.range("one", "two"));
+            assertEquals(allIter, kvStore.all());
+            assertEquals(VAL, kvStore.approximateNumEntries());
+        });
+    }
+
+    /** Writes to the global window store must fail; fetches must be forwarded. */
+    @Test
+    public void testWindowStore() {
+        doTest("WindowStore", (Consumer<WindowStore<String, Long>>) winStore -> {
+            // both put variants are expected to be refused
+            checkThrowsUnsupportedOperation(() -> winStore.put("1", 1L, 1L), "put");
+            checkThrowsUnsupportedOperation(() -> winStore.put("1", 1L), "put");
+
+            // each read overload returns the object its mock expectation supplies
+            assertEquals(iters.get(0), winStore.fetchAll(0L, 0L));
+            assertEquals(windowStoreIter, winStore.fetch(KEY, 0L, 1L));
+            assertEquals(iters.get(1), winStore.fetch(KEY, KEY, 0L, 1L));
+            final Long single = winStore.fetch(KEY, 1L);
+            assertEquals((Long) VAL, single);
+            assertEquals(iters.get(2), winStore.all());
+        });
+    }
+
+    /** Writes to the global session store must fail; lookups must be forwarded. */
+    @Test
+    public void testSessionStore() {
+        doTest("SessionStore", (Consumer<SessionStore<String, Long>>) sessions -> {
+            // remove and put are expected to be refused
+            checkThrowsUnsupportedOperation(() -> sessions.remove(null), "remove");
+            checkThrowsUnsupportedOperation(() -> sessions.put(null, null), "put");
+
+            // each lookup overload returns the iterator its mock expectation supplies
+            assertEquals(iters.get(3), sessions.findSessions(KEY, 1L, 2L));
+            assertEquals(iters.get(4), sessions.findSessions(KEY, KEY, 1L, 2L));
+            assertEquals(iters.get(5), sessions.fetch(KEY));
+            assertEquals(iters.get(6), sessions.fetch(KEY, KEY));
+        });
+    }
+
+    /**
+     * Builds a replayed key-value store mock with the common state-store
+     * expectations plus canned answers for get, approximateNumEntries, range and all.
+     */
+    private KeyValueStore<String, Long> keyValueStoreMock() {
+        final KeyValueStore<String, Long> store = mock(KeyValueStore.class);
+        initStateStoreMock(store);
+
+        // single-value reads
+        expect(store.get(KEY)).andReturn(VAL);
+        expect(store.approximateNumEntries()).andReturn(VAL);
+
+        // iterator-returning reads
+        expect(store.range("one", "two")).andReturn(rangeIter);
+        expect(store.all()).andReturn(allIter);
+
+        replay(store);
+        return store;
+    }
+
+    /**
+     * Builds a replayed window store mock with the common state-store
+     * expectations plus canned answers for every read method the test calls.
+     */
+    private WindowStore<String, Long> windowStoreMock() {
+        final WindowStore<String, Long> store = mock(WindowStore.class);
+        initStateStoreMock(store);
+
+        // iters slots 0..2 are used by the window store
+        expect(store.fetchAll(anyLong(), anyLong())).andReturn(iters.get(0));
+        expect(store.fetch(anyString(), anyString(), anyLong(), anyLong())).andReturn(iters.get(1));
+        expect(store.fetch(anyString(), anyLong(), anyLong())).andReturn(windowStoreIter);
+        expect(store.fetch(anyString(), anyLong())).andReturn(VAL);
+        expect(store.all()).andReturn(iters.get(2));
+
+        replay(store);
+        return store;
+    }
+
+    /**
+     * Builds a replayed session store mock with the common state-store
+     * expectations plus canned answers for findSessions and fetch.
+     */
+    private SessionStore<String, Long> sessionStoreMock() {
+        final SessionStore<String, Long> store = mock(SessionStore.class);
+        initStateStoreMock(store);
+
+        // iters slots 3..6 are used by the session store
+        expect(store.findSessions(anyString(), anyLong(), anyLong())).andReturn(iters.get(3));
+        expect(store.findSessions(anyString(), anyString(), anyLong(), anyLong())).andReturn(iters.get(4));
+        expect(store.fetch(anyString())).andReturn(iters.get(5));
+        expect(store.fetch(anyString(), anyString())).andReturn(iters.get(6));
+
+        replay(store);
+        return store;
+    }
+
+    /**
+     * Records the expectations shared by all store mocks: fixed answers for
+     * name/persistent/isOpen, and init/close callbacks that raise the
+     * {@code initExecuted} / {@code closeExecuted} flags.
+     */
+    private void initStateStoreMock(final StateStore store) {
+        expect(store.name()).andReturn(STORE_NAME);
+        expect(store.persistent()).andReturn(true);
+        expect(store.isOpen()).andReturn(true);
+
+        // void methods: invoke first, then attach the answer to the last call
+        store.init(null, null);
+        expectLastCall().andAnswer(() -> {
+            initExecuted = true;
+            return null;
+        });
+
+        store.close();
+        expectLastCall().andAnswer(() -> {
+            closeExecuted = true;
+            return null;
+        });
+    }
+
+    /**
+     * Fetches the global store registered under {@code name} from inside a
+     * processor's {@code init} callback, runs the common {@link StateStore} checks
+     * on it and then passes it to the store-specific {@code checker}.
+     *
+     * @param name    store name to request from the context
+     * @param checker assertions specific to the concrete store type
+     * @param <T>     store type the returned store is cast to
+     */
+    private <T extends StateStore> void doTest(final String name, final Consumer<T> checker) {
+        // declared with its type arguments rather than as a raw Processor
+        final Processor<String, Long> processor = new Processor<String, Long>() {
+            @Override
+            @SuppressWarnings("unchecked")
+            public void init(final ProcessorContext context) {
+                final T store = (T) context.getStateStore(name);
+
+                checkStateStoreMethods(store);
+
+                checker.accept(store);
+
+            }
+
+            @Override
+            public void process(final String k, final Long v) {
+                //No-op.
+            }
+
+            @Override
+            public void close() {
+                //No-op.
+            }
+        };
+
+        processor.init(context);
+    }
+
+    private void checkStateStoreMethods(final StateStore store) {
+        checkThrowsUnsupportedOperation(store::flush, "flush");
+
+        assertEquals(STORE_NAME, store.name());
+        assertTrue(store.persistent());
+        assertTrue(store.isOpen());
+
+        store.init(null, null);
+        assertTrue(initExecuted);
+
+        store.close();
+        assertTrue(closeExecuted);
+    }
+
+    /**
+     * Runs {@code check} and fails the test unless it throws
+     * {@link UnsupportedOperationException}.
+     *
+     * @param check operation expected to be rejected
+     * @param name  operation name used in the failure message
+     */
+    private void checkThrowsUnsupportedOperation(final Runnable check, final String name) {
+        try {
+            check.run();
+        } catch (final UnsupportedOperationException expected) {
+            // this is the outcome the caller asked for
+            return;
+        }
+        fail(name + " should throw exception");
+    }
+}


Mime
View raw message