kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [1/5] kafka git commit: KAFKA-3452 Follow-up: Optimize ByteStore Scenarios
Date Fri, 03 Feb 2017 19:12:54 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk d95f22c12 -> 7ebc5da60


http://git-wip-us.apache.org/repos/asf/kafka/blob/7ebc5da6/streams/src/test/java/org/apache/kafka/streams/state/internals/ThreadCacheTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ThreadCacheTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ThreadCacheTest.java
index 499d823..7b112e0 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ThreadCacheTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ThreadCacheTest.java
@@ -52,17 +52,17 @@ public class ThreadCacheTest {
                 toInsert.size() * memoryCacheEntrySize(kv.key.getBytes(), kv.value.getBytes(),
""),
             new MockStreamsMetrics(new Metrics()));
 
-        for (int i = 0; i < toInsert.size(); i++) {
-            byte[] key = toInsert.get(i).key.getBytes();
-            byte[] value = toInsert.get(i).value.getBytes();
+        for (KeyValue<String, String> kvToInsert : toInsert) {
+            Bytes key = Bytes.wrap(kvToInsert.key.getBytes());
+            byte[] value = kvToInsert.value.getBytes();
             cache.put(name, key, new LRUCacheEntry(value, true, 1L, 1L, 1, ""));
         }
 
-        for (int i = 0; i < toInsert.size(); i++) {
-            byte[] key = toInsert.get(i).key.getBytes();
+        for (KeyValue<String, String> kvToInsert : toInsert) {
+            Bytes key = Bytes.wrap(kvToInsert.key.getBytes());
             LRUCacheEntry entry = cache.get(name, key);
-            assertEquals(entry.isDirty, true);
-            assertEquals(new String(entry.value), toInsert.get(i).value);
+            assertEquals(entry.isDirty(), true);
+            assertEquals(new String(entry.value), kvToInsert.value);
         }
         assertEquals(cache.gets(), 5);
         assertEquals(cache.puts(), 5);
@@ -73,10 +73,8 @@ public class ThreadCacheTest {
     private void checkOverheads(double entryFactor, double systemFactor, long desiredCacheSize,
int keySizeBytes,
                             int valueSizeBytes) {
         Runtime runtime = Runtime.getRuntime();
-        byte[] key = new byte[keySizeBytes];
-        byte[] value = new byte[valueSizeBytes];
         final String name = "name";
-        long numElements = desiredCacheSize / memoryCacheEntrySize(key, value, "");
+        long numElements = desiredCacheSize / memoryCacheEntrySize(new byte[keySizeBytes],
new byte[valueSizeBytes], "");
 
         System.gc();
         long prevRuntimeMemory = runtime.totalMemory() - runtime.freeMemory();
@@ -86,8 +84,8 @@ public class ThreadCacheTest {
         assertEquals(size, 0);
         for (int i = 0; i < numElements; i++) {
             String keyStr = "K" + i;
-            key = keyStr.getBytes();
-            value = new byte[valueSizeBytes];
+            Bytes key = Bytes.wrap(keyStr.getBytes());
+            byte[] value = new byte[valueSizeBytes];
             cache.put(name, key, new LRUCacheEntry(value, true, 1L, 1L, 1, ""));
         }
 
@@ -144,7 +142,7 @@ public class ThreadCacheTest {
     @Test
     public void evict() throws IOException {
         final List<KeyValue<String, String>> received = new ArrayList<>();
-        List<KeyValue<String, String>> expected = Arrays.asList(
+        List<KeyValue<String, String>> expected = Collections.singletonList(
                 new KeyValue<>("K1", "V1"));
 
         List<KeyValue<String, String>> toInsert = Arrays.asList(
@@ -168,10 +166,9 @@ public class ThreadCacheTest {
 
         });
 
-
-        for (int i = 0; i < toInsert.size(); i++) {
-            byte[] key = toInsert.get(i).key.getBytes();
-            byte[] value = toInsert.get(i).value.getBytes();
+        for (KeyValue<String, String> kvToInsert : toInsert) {
+            final Bytes key = Bytes.wrap(kvToInsert.key.getBytes());
+            final byte[] value = kvToInsert.value.getBytes();
             cache.put(namespace, key, new LRUCacheEntry(value, true, 1, 1, 1, ""));
         }
 
@@ -186,16 +183,16 @@ public class ThreadCacheTest {
     @Test
     public void shouldDelete() throws Exception {
         final ThreadCache cache = new ThreadCache("testCache", 10000L, new MockStreamsMetrics(new
Metrics()));
-        final byte[] key = new byte[]{0};
+        final Bytes key = Bytes.wrap(new byte[]{0});
 
-        cache.put("name", key, dirtyEntry(key));
-        assertEquals(key, cache.delete("name", key).value);
+        cache.put("name", key, dirtyEntry(key.get()));
+        assertEquals(key.get(), cache.delete("name", key).value);
         assertNull(cache.get("name", key));
     }
 
     @Test
     public void shouldNotFlushAfterDelete() throws Exception {
-        final byte[] key = new byte[]{0};
+        final Bytes key = Bytes.wrap(new byte[]{0});
         final ThreadCache cache = new ThreadCache("testCache", 10000L, new MockStreamsMetrics(new
Metrics()));
         final List<ThreadCache.DirtyEntry> received = new ArrayList<>();
         final String namespace = "namespace";
@@ -205,8 +202,8 @@ public class ThreadCacheTest {
                 received.addAll(dirty);
             }
         });
-        cache.put(namespace, key, dirtyEntry(key));
-        assertEquals(key, cache.delete(namespace, key).value);
+        cache.put(namespace, key, dirtyEntry(key.get()));
+        assertEquals(key.get(), cache.delete(namespace, key).value);
 
         // flushing should have no further effect
         cache.flush(namespace);
@@ -216,63 +213,63 @@ public class ThreadCacheTest {
 
     @Test
     public void shouldNotBlowUpOnNonExistentKeyWhenDeleting() throws Exception {
+        final Bytes key = Bytes.wrap(new byte[]{0});
         final ThreadCache cache = new ThreadCache("testCache", 10000L, new MockStreamsMetrics(new
Metrics()));
-        final byte[] key = new byte[]{0};
 
-        cache.put("name", key, dirtyEntry(key));
-        assertNull(cache.delete("name", new byte[]{1}));
+        cache.put("name", key, dirtyEntry(key.get()));
+        assertNull(cache.delete("name", Bytes.wrap(new byte[]{1})));
     }
 
     @Test
     public void shouldNotBlowUpOnNonExistentNamespaceWhenDeleting() throws Exception {
         final ThreadCache cache = new ThreadCache("testCache", 10000L, new MockStreamsMetrics(new
Metrics()));
-        assertNull(cache.delete("name", new byte[]{1}));
+        assertNull(cache.delete("name", Bytes.wrap(new byte[]{1})));
     }
 
     @Test
     public void shouldNotClashWithOverlappingNames() throws Exception {
         final ThreadCache cache = new ThreadCache("testCache", 10000L, new MockStreamsMetrics(new
Metrics()));
-        final byte[] nameByte = new byte[]{0};
-        final byte[] name1Byte = new byte[]{1};
-        cache.put("name", nameByte, dirtyEntry(nameByte));
-        cache.put("name1", nameByte, dirtyEntry(name1Byte));
+        final Bytes nameByte = Bytes.wrap(new byte[]{0});
+        final Bytes name1Byte = Bytes.wrap(new byte[]{1});
+        cache.put("name", nameByte, dirtyEntry(nameByte.get()));
+        cache.put("name1", nameByte, dirtyEntry(name1Byte.get()));
 
-        assertArrayEquals(nameByte, cache.get("name", nameByte).value);
-        assertArrayEquals(name1Byte, cache.get("name1", nameByte).value);
+        assertArrayEquals(nameByte.get(), cache.get("name", nameByte).value);
+        assertArrayEquals(name1Byte.get(), cache.get("name1", nameByte).value);
     }
 
     @Test
     public void shouldPeekNextKey() throws Exception {
         final ThreadCache cache = new ThreadCache("testCache", 10000L, new MockStreamsMetrics(new
Metrics()));
-        final byte[] theByte = {0};
+        final Bytes theByte = Bytes.wrap(new byte[]{0});
         final String namespace = "streams";
-        cache.put(namespace, theByte, dirtyEntry(theByte));
-        final ThreadCache.MemoryLRUCacheBytesIterator iterator = cache.range(namespace, theByte,
new byte[]{1});
-        assertEquals(Bytes.wrap(theByte), iterator.peekNextKey());
-        assertEquals(Bytes.wrap(theByte), iterator.peekNextKey());
+        cache.put(namespace, theByte, dirtyEntry(theByte.get()));
+        final ThreadCache.MemoryLRUCacheBytesIterator iterator = cache.range(namespace, theByte,
Bytes.wrap(new byte[]{1}));
+        assertEquals(theByte, iterator.peekNextKey());
+        assertEquals(theByte, iterator.peekNextKey());
     }
 
     @Test
     public void shouldGetSameKeyAsPeekNext() throws Exception {
         final ThreadCache cache = new ThreadCache("testCache", 10000L, new MockStreamsMetrics(new
Metrics()));
-        final byte[] theByte = {0};
+        final Bytes theByte = Bytes.wrap(new byte[]{0});
         final String namespace = "streams";
-        cache.put(namespace, theByte, dirtyEntry(theByte));
-        final ThreadCache.MemoryLRUCacheBytesIterator iterator = cache.range(namespace, theByte,
new byte[]{1});
+        cache.put(namespace, theByte, dirtyEntry(theByte.get()));
+        final ThreadCache.MemoryLRUCacheBytesIterator iterator = cache.range(namespace, theByte,
Bytes.wrap(new byte[]{1}));
         assertEquals(iterator.peekNextKey(), iterator.next().key);
     }
 
     @Test(expected = NoSuchElementException.class)
     public void shouldThrowIfNoPeekNextKey() throws Exception {
         final ThreadCache cache = new ThreadCache("testCache", 10000L, new MockStreamsMetrics(new
Metrics()));
-        final ThreadCache.MemoryLRUCacheBytesIterator iterator = cache.range("", new byte[]{0},
new byte[]{1});
+        final ThreadCache.MemoryLRUCacheBytesIterator iterator = cache.range("", Bytes.wrap(new
byte[]{0}), Bytes.wrap(new byte[]{1}));
         iterator.peekNextKey();
     }
 
     @Test
     public void shouldReturnFalseIfNoNextKey() throws Exception {
         final ThreadCache cache = new ThreadCache("testCache", 10000L, new MockStreamsMetrics(new
Metrics()));
-        final ThreadCache.MemoryLRUCacheBytesIterator iterator = cache.range("", new byte[]{0},
new byte[]{1});
+        final ThreadCache.MemoryLRUCacheBytesIterator iterator = cache.range("", Bytes.wrap(new
byte[]{0}), Bytes.wrap(new byte[]{1}));
         assertFalse(iterator.hasNext());
     }
 
@@ -282,9 +279,9 @@ public class ThreadCacheTest {
         final byte[][] bytes = {{0}, {1}, {2}, {3}, {4}, {5}, {6}, {7}, {8}, {9}, {10}};
         final String namespace = "streams";
         for (final byte[] aByte : bytes) {
-            cache.put(namespace, aByte, dirtyEntry(aByte));
+            cache.put(namespace, Bytes.wrap(aByte), dirtyEntry(aByte));
         }
-        final ThreadCache.MemoryLRUCacheBytesIterator iterator = cache.range(namespace, new
byte[]{1}, new byte[]{4});
+        final ThreadCache.MemoryLRUCacheBytesIterator iterator = cache.range(namespace, Bytes.wrap(new
byte[]{1}), Bytes.wrap(new byte[]{4}));
         int bytesIndex = 1;
         while (iterator.hasNext()) {
             Bytes peekedKey = iterator.peekNextKey();
@@ -309,13 +306,13 @@ public class ThreadCacheTest {
         });
         byte[][] bytes = {{0}, {1}, {2}, {3}, {4}, {5}, {6}, {7}, {8}, {9}};
         for (int i = 0; i < 5; i++) {
-            cache.put(namespace, bytes[i], dirtyEntry(bytes[i]));
+            cache.put(namespace, Bytes.wrap(bytes[i]), dirtyEntry(bytes[i]));
         }
         assertEquals(5, cache.size());
 
-        final ThreadCache.MemoryLRUCacheBytesIterator range = cache.range(namespace, new
byte[]{0}, new byte[]{5});
+        final ThreadCache.MemoryLRUCacheBytesIterator range = cache.range(namespace, Bytes.wrap(new
byte[]{0}), Bytes.wrap(new byte[]{5}));
         // should evict byte[] {0}
-        cache.put(namespace, new byte[]{6}, dirtyEntry(new byte[]{6}));
+        cache.put(namespace, Bytes.wrap(new byte[]{6}), dirtyEntry(new byte[]{6}));
 
         assertEquals(Bytes.wrap(new byte[]{1}), range.peekNextKey());
     }
@@ -334,9 +331,9 @@ public class ThreadCacheTest {
         });
         final List<byte[]> expected = Arrays.asList(new byte[]{0}, new byte[]{1}, new
byte[]{2});
         for (byte[] bytes : expected) {
-            cache.put("1", bytes, dirtyEntry(bytes));
+            cache.put("1", Bytes.wrap(bytes), dirtyEntry(bytes));
         }
-        cache.put("2", new byte[]{4}, dirtyEntry(new byte[]{4}));
+        cache.put("2", Bytes.wrap(new byte[]{4}), dirtyEntry(new byte[]{4}));
 
         cache.flush("1");
         assertEquals(expected, received);
@@ -356,9 +353,9 @@ public class ThreadCacheTest {
         });
         final List<byte[]> toInsert =  Arrays.asList(new byte[]{0}, new byte[]{1},
new byte[]{2});
         for (byte[] bytes : toInsert) {
-            cache.put("1", bytes, cleanEntry(bytes));
+            cache.put("1", Bytes.wrap(bytes), cleanEntry(bytes));
         }
-        cache.put("2", new byte[]{4}, cleanEntry(new byte[]{4}));
+        cache.put("2", Bytes.wrap(new byte[]{4}), cleanEntry(new byte[]{4}));
 
         cache.flush("1");
         assertEquals(Collections.EMPTY_LIST, received);
@@ -374,7 +371,7 @@ public class ThreadCacheTest {
                 received.addAll(dirty);
             }
         });
-        cache.put(namespace, new byte[]{0}, dirtyEntry(new byte[]{0}));
+        cache.put(namespace, Bytes.wrap(new byte[]{0}), dirtyEntry(new byte[]{0}));
         assertEquals(1, received.size());
 
         // flushing should have no further effect
@@ -406,21 +403,22 @@ public class ThreadCacheTest {
             }
         });
 
-        cache.putAll(namespace, Arrays.asList(KeyValue.pair(new byte[]{0}, dirtyEntry(new
byte[]{5})),
-            KeyValue.pair(new byte[]{1}, dirtyEntry(new byte[]{6}))));
+        cache.putAll(namespace, Arrays.asList(KeyValue.pair(Bytes.wrap(new byte[]{0}), dirtyEntry(new
byte[]{5})),
+                                              KeyValue.pair(Bytes.wrap(new byte[]{1}), dirtyEntry(new
byte[]{6}))));
 
         assertEquals(cache.evicts(), 2);
+        assertEquals(received.size(), 2);
     }
 
     @Test
     public void shouldPutAll() throws Exception {
         final ThreadCache cache = new ThreadCache("testCache", 100000, new MockStreamsMetrics(new
Metrics()));
 
-        cache.putAll("name", Arrays.asList(KeyValue.pair(new byte[]{0}, dirtyEntry(new byte[]{5})),
-                                           KeyValue.pair(new byte[]{1}, dirtyEntry(new byte[]{6}))));
+        cache.putAll("name", Arrays.asList(KeyValue.pair(Bytes.wrap(new byte[]{0}), dirtyEntry(new
byte[]{5})),
+                                           KeyValue.pair(Bytes.wrap(new byte[]{1}), dirtyEntry(new
byte[]{6}))));
 
-        assertArrayEquals(new byte[]{5}, cache.get("name", new byte[]{0}).value);
-        assertArrayEquals(new byte[]{6}, cache.get("name", new byte[]{1}).value);
+        assertArrayEquals(new byte[]{5}, cache.get("name", Bytes.wrap(new byte[]{0})).value);
+        assertArrayEquals(new byte[]{6}, cache.get("name", Bytes.wrap(new byte[]{1})).value);
     }
 
     @Test
@@ -433,13 +431,13 @@ public class ThreadCacheTest {
                 received.addAll(dirty);
             }
         });
-        cache.put("name", new byte[] {1}, cleanEntry(new byte[]{0}));
+        cache.put("name", Bytes.wrap(new byte[]{1}), cleanEntry(new byte[]{0}));
         assertEquals(0, received.size());
     }
     @Test
     public void shouldPutIfAbsent() throws Exception {
         final ThreadCache cache = new ThreadCache("testCache", 100000, new MockStreamsMetrics(new
Metrics()));
-        final byte[] key = {10};
+        final Bytes key = Bytes.wrap(new byte[]{10});
         final byte[] value = {30};
         assertNull(cache.putIfAbsent("n", key, dirtyEntry(value)));
         assertArrayEquals(value, cache.putIfAbsent("n", key, dirtyEntry(new byte[]{8})).value);
@@ -458,11 +456,12 @@ public class ThreadCacheTest {
             }
         });
 
-        cache.putIfAbsent(namespace, new byte[]{0}, dirtyEntry(new byte[]{5}));
-        cache.putIfAbsent(namespace, new byte[]{1}, dirtyEntry(new byte[]{6}));
-        cache.putIfAbsent(namespace, new byte[]{1}, dirtyEntry(new byte[]{6}));
+        cache.putIfAbsent(namespace, Bytes.wrap(new byte[]{0}), dirtyEntry(new byte[]{5}));
+        cache.putIfAbsent(namespace, Bytes.wrap(new byte[]{1}), dirtyEntry(new byte[]{6}));
+        cache.putIfAbsent(namespace, Bytes.wrap(new byte[]{1}), dirtyEntry(new byte[]{6}));
 
         assertEquals(cache.evicts(), 3);
+        assertEquals(received.size(), 3);
     }
 
     @Test
@@ -475,7 +474,7 @@ public class ThreadCacheTest {
             public void apply(final List<ThreadCache.DirtyEntry> dirty) {
                 // put an item into an empty cache when the total cache size
                 // is already > than maxCacheSizeBytes
-                threadCache.put("other", new byte[]{0}, dirtyEntry(new byte[2]));
+                threadCache.put("other", Bytes.wrap(new byte[]{0}), dirtyEntry(new byte[2]));
             }
         });
         threadCache.addDirtyEntryFlushListener("other", new ThreadCache.DirtyEntryFlushListener()
{
@@ -491,29 +490,29 @@ public class ThreadCacheTest {
             }
         });
 
-        threadCache.put("another", new byte[]{1}, dirtyEntry(new byte[1]));
-        threadCache.put("name", new byte[]{1}, dirtyEntry(new byte[1]));
+        threadCache.put("another", Bytes.wrap(new byte[]{1}), dirtyEntry(new byte[1]));
+        threadCache.put("name", Bytes.wrap(new byte[]{1}), dirtyEntry(new byte[1]));
         // Put a large item such that when the eldest item is removed
         // cache sizeInBytes() > maxCacheSizeBytes
         int remaining = (int) (maxCacheSizeInBytes - threadCache.sizeBytes());
-        threadCache.put("name", new byte[]{2}, dirtyEntry(new byte[remaining + 100]));
+        threadCache.put("name", Bytes.wrap(new byte[]{2}), dirtyEntry(new byte[remaining
+ 100]));
     }
 
     @Test
     public void shouldCleanupNamedCacheOnClose() throws Exception {
         final ThreadCache cache = new ThreadCache("testCache", 100000, new MockStreamsMetrics(new
Metrics()));
-        cache.put("one", new byte[]{1}, cleanEntry(new byte[] {1}));
-        cache.put("two", new byte[]{1}, cleanEntry(new byte[] {1}));
+        cache.put("one", Bytes.wrap(new byte[]{1}), cleanEntry(new byte[] {1}));
+        cache.put("two", Bytes.wrap(new byte[]{1}), cleanEntry(new byte[] {1}));
         assertEquals(cache.size(), 2);
         cache.close("two");
         assertEquals(cache.size(), 1);
-        assertNull(cache.get("two", new byte[] {1}));
+        assertNull(cache.get("two", Bytes.wrap(new byte[]{1})));
     }
 
     @Test
     public void shouldReturnNullIfKeyIsNull() throws Exception {
         final ThreadCache threadCache = new ThreadCache("testCache", 10, new MockStreamsMetrics(new
Metrics()));
-        threadCache.put("one", new byte[]{1}, cleanEntry(new byte[] {1}));
+        threadCache.put("one", Bytes.wrap(new byte[]{1}), cleanEntry(new byte[] {1}));
         assertNull(threadCache.get("one", null));
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/7ebc5da6/streams/src/test/java/org/apache/kafka/streams/state/internals/WindowStoreUtilsTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/WindowStoreUtilsTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/WindowStoreUtilsTest.java
index e0cb3ae..3a40b44 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/WindowStoreUtilsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/WindowStoreUtilsTest.java
@@ -19,6 +19,7 @@
 package org.apache.kafka.streams.state.internals;
 
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.state.StateSerdes;
 import org.junit.Test;
 
@@ -32,13 +33,12 @@ public class WindowStoreUtilsTest {
         final String key = "key1";
         final long timestamp = 99L;
         final int seqNum = 3;
-        byte[] bytes = WindowStoreUtils.toBinaryKey(key, timestamp, seqNum, serdes);
-        final String parsedKey = WindowStoreUtils.keyFromBinaryKey(bytes, serdes);
-        final long parsedTs = WindowStoreUtils.timestampFromBinaryKey(bytes);
-        final int parsedSeqNum = WindowStoreUtils.sequenceNumberFromBinaryKey(bytes);
+        Bytes bytes = WindowStoreUtils.toBinaryKey(key, timestamp, seqNum, serdes);
+        final String parsedKey = WindowStoreUtils.keyFromBinaryKey(bytes.get(), serdes);
+        final long parsedTs = WindowStoreUtils.timestampFromBinaryKey(bytes.get());
+        final int parsedSeqNum = WindowStoreUtils.sequenceNumberFromBinaryKey(bytes.get());
         assertEquals(key, parsedKey);
         assertEquals(timestamp, parsedTs);
         assertEquals(seqNum, parsedSeqNum);
     }
-
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/7ebc5da6/streams/src/test/java/org/apache/kafka/test/InMemoryKeyValueStore.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/InMemoryKeyValueStore.java b/streams/src/test/java/org/apache/kafka/test/InMemoryKeyValueStore.java
deleted file mode 100644
index fc9b32b..0000000
--- a/streams/src/test/java/org/apache/kafka/test/InMemoryKeyValueStore.java
+++ /dev/null
@@ -1,152 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements.  See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.  You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-package org.apache.kafka.test;
-
-import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.StateStore;
-import org.apache.kafka.streams.state.KeyValueIterator;
-import org.apache.kafka.streams.state.KeyValueStore;
-import org.apache.kafka.streams.state.internals.DelegatingPeekingKeyValueIterator;
-
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.NoSuchElementException;
-import java.util.TreeMap;
-
-public class InMemoryKeyValueStore<K, V> implements KeyValueStore<K, V> {
-    private final TreeMap<K, V> map = new TreeMap<>();
-    private final String name;
-    private boolean open = true;
-
-    public InMemoryKeyValueStore(final String name) {
-        this.name = name;
-    }
-
-    @Override
-    public void put(final K key, final V value) {
-        map.put(key, value);
-    }
-
-    @Override
-    public V putIfAbsent(final K key, final V value) {
-        V orig = map.get(key);
-        if (orig == null) {
-            map.put(key, value);
-        }
-        return orig;
-    }
-
-    @Override
-    public void putAll(final List<KeyValue<K, V>> entries) {
-        for (KeyValue<K, V> entry : entries) {
-            map.put(entry.key, entry.value);
-        }
-    }
-
-    @Override
-    public V delete(final K key) {
-        return map.remove(key);
-    }
-
-    @Override
-    public long approximateNumEntries() {
-        return map.size();
-    }
-
-    @Override
-    public String name() {
-        return name;
-    }
-
-    @Override
-    public void init(final ProcessorContext context, final StateStore root) {
-        // no-op
-    }
-
-    @Override
-    public void flush() {
-        //no-op
-    }
-
-    @Override
-    public void close() {
-        open = false;
-    }
-
-    @Override
-    public boolean persistent() {
-        return false;
-    }
-
-    @Override
-    public boolean isOpen() {
-        return open;
-    }
-
-    @Override
-    public V get(final K key) {
-        return map.get(key);
-    }
-
-    @Override
-    public KeyValueIterator<K, V> range(final K from, final K to) {
-        return new DelegatingPeekingKeyValueIterator<>(name, new TheIterator(this.map.subMap(from,
true, to, true).entrySet().iterator()));
-    }
-
-    @Override
-    public KeyValueIterator<K, V> all() {
-        return new DelegatingPeekingKeyValueIterator<>(name(), new TheIterator(map.entrySet().iterator()));
-    }
-
-    private class TheIterator implements KeyValueIterator<K, V> {
-
-        private final Iterator<Map.Entry<K, V>> underlying;
-
-        public TheIterator(final Iterator<Map.Entry<K, V>> iterator) {
-            this.underlying = iterator;
-        }
-
-        @Override
-        public void close() {
-
-        }
-
-        @Override
-        public K peekNextKey() {
-            throw new UnsupportedOperationException("peekNextKey not supported");
-        }
-
-        @Override
-        public boolean hasNext() {
-            return underlying.hasNext();
-        }
-
-        @Override
-        public KeyValue<K, V> next() {
-            if (!hasNext()) {
-                throw new NoSuchElementException();
-            }
-            Map.Entry<K, V> next = underlying.next();
-            return new KeyValue<>(next.getKey(), next.getValue());
-        }
-
-        @Override
-        public void remove() {
-
-        }
-    }
-}


Mime
View raw message