kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: MINOR: rename SessionStore.findSessionsToMerge to findSessions
Date Tue, 10 Jan 2017 17:38:33 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 275c5e1df -> fe82330f0


MINOR: rename SessionStore.findSessionsToMerge to findSessions

Rename `SessionStore.findSessionsToMerge` to `findSessions`

Author: Damian Guy <damian.guy@gmail.com>

Reviewers: Guozhang Wang <wangguoz@gmail.com>

Closes #2339 from dguy/minor-findsession-rename


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/fe82330f
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/fe82330f
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/fe82330f

Branch: refs/heads/trunk
Commit: fe82330f0be1d478b134732a9d29c229b37afe3b
Parents: 275c5e1
Author: Damian Guy <damian.guy@gmail.com>
Authored: Tue Jan 10 09:38:30 2017 -0800
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Tue Jan 10 09:38:30 2017 -0800

----------------------------------------------------------------------
 .../internals/KStreamSessionWindowAggregate.java    |  6 +++---
 .../apache/kafka/streams/state/SessionStore.java    |  2 +-
 .../state/internals/CachingSessionStore.java        |  8 ++++----
 .../state/internals/RocksDBSessionStore.java        |  4 ++--
 .../KStreamSessionWindowAggregateProcessorTest.java | 16 ++++++++--------
 .../state/internals/CachingSessionStoreTest.java    | 12 ++++++------
 .../state/internals/RocksDBSessionStoreTest.java    | 10 +++++-----
 7 files changed, 29 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/fe82330f/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java
index 1af1f48..bb86f52 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java
@@ -92,8 +92,8 @@ class KStreamSessionWindowAggregate<K, V, T> implements KStreamAggProcessorSuppl
             TimeWindow mergedWindow = newTimeWindow;
             T agg = initializer.apply();
 
-            try (final KeyValueIterator<Windowed<K>, T> iterator = store.findSessionsToMerge(key, timestamp - windows.inactivityGap(),
-                                                                                             timestamp + windows.inactivityGap())) {
+            try (final KeyValueIterator<Windowed<K>, T> iterator = store.findSessions(key, timestamp - windows.inactivityGap(),
+                                                                                      timestamp + windows.inactivityGap())) {
                 while (iterator.hasNext()) {
                     final KeyValue<Windowed<K>, T> next = iterator.next();
                     merged.add(next);
@@ -149,7 +149,7 @@ class KStreamSessionWindowAggregate<K, V, T> implements KStreamAggProcessorSuppl
 
         @Override
         public T get(final Windowed<K> key) {
-            try (KeyValueIterator<Windowed<K>, T> iter = store.findSessionsToMerge(key.key(), key.window().end(), key.window().end())) {
+            try (KeyValueIterator<Windowed<K>, T> iter = store.findSessions(key.key(), key.window().end(), key.window().end())) {
                 if (!iter.hasNext()) {
                     return null;
                 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/fe82330f/streams/src/main/java/org/apache/kafka/streams/state/SessionStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/SessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/SessionStore.java
index 39658a3..bb82f6d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/SessionStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/SessionStore.java
@@ -30,7 +30,7 @@ public interface SessionStore<K, AGG> extends StateStore, ReadOnlySessionStore<K
      * Fetch any sessions with the matching key and the sessions end is &le earliestEndTime and the sessions
      * start is &ge latestStartTime
      */
-    KeyValueIterator<Windowed<K>, AGG> findSessionsToMerge(final K key, long earliestSessionEndTime, final long latestSessionStartTime);
+    KeyValueIterator<Windowed<K>, AGG> findSessions(final K key, long earliestSessionEndTime, final long latestSessionStartTime);
 
     /**
      * Remove the session aggregated with provided {@link Windowed} key from the store

http://git-wip-us.apache.org/repos/asf/kafka/blob/fe82330f/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
index a012c63..17c4ee0 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
@@ -56,9 +56,9 @@ class CachingSessionStore<K, AGG>  implements SessionStore<K, AGG>, CachedStateS
         this.keySchema = new SessionKeySchema();
     }
 
-    public KeyValueIterator<Windowed<K>, AGG> findSessionsToMerge(final K key,
-                                                                  final long earliestSessionEndTime,
-                                                                  final long latestSessionStartTime) {
+    public KeyValueIterator<Windowed<K>, AGG> findSessions(final K key,
+                                                           final long earliestSessionEndTime,
+                                                           final long latestSessionStartTime) {
         validateStoreOpen();
         final Bytes binarySessionId = Bytes.wrap(keySerde.serializer().serialize(name, key));
         final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = cache.range(name,
@@ -89,7 +89,7 @@ class CachingSessionStore<K, AGG>  implements SessionStore<K, AGG>, CachedStateS
 
     @Override
     public KeyValueIterator<Windowed<K>, AGG> fetch(final K key) {
-        return findSessionsToMerge(key, 0, Long.MAX_VALUE);
+        return findSessions(key, 0, Long.MAX_VALUE);
     }
 
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/fe82330f/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStore.java
index 73c825c..a8ddc73 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStore.java
@@ -47,7 +47,7 @@ class RocksDBSessionStore<K, AGG> implements SessionStore<K, AGG> {
 
     @SuppressWarnings("unchecked")
     @Override
-    public KeyValueIterator<Windowed<K>, AGG> findSessionsToMerge(final K key, final long earliestSessionEndTime, final long latestSessionStartTime) {
+    public KeyValueIterator<Windowed<K>, AGG> findSessions(final K key, final long earliestSessionEndTime, final long latestSessionStartTime) {
         final KeyValueIterator<Bytes, byte[]> bytesIterator = bytesStore.fetch(Bytes.wrap(serdes.rawKey(key)), earliestSessionEndTime, latestSessionStartTime);
         return new SessionStoreIterator(bytesIterator, serdes);
     }
@@ -100,7 +100,7 @@ class RocksDBSessionStore<K, AGG> implements SessionStore<K, AGG> {
 
     @Override
     public KeyValueIterator<Windowed<K>, AGG> fetch(final K key) {
-        return findSessionsToMerge(key, 0, Long.MAX_VALUE);
+        return findSessions(key, 0, Long.MAX_VALUE);
     }
 
     private static class SessionStoreIterator<K, AGG> implements KeyValueIterator<Windowed<K>, AGG> {

http://git-wip-us.apache.org/repos/asf/kafka/blob/fe82330f/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
index c107c3e..ba95522 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
@@ -125,7 +125,7 @@ public class KStreamSessionWindowAggregateProcessorTest {
         context.setTime(500);
         processor.process("john", "second");
 
-        final KeyValueIterator<Windowed<String>, Long> values = sessionStore.findSessionsToMerge("john", 0, 2000);
+        final KeyValueIterator<Windowed<String>, Long> values = sessionStore.findSessions("john", 0, 2000);
         assertTrue(values.hasNext());
         assertEquals(Long.valueOf(2), values.next().value);
     }
@@ -136,19 +136,19 @@ public class KStreamSessionWindowAggregateProcessorTest {
         context.setTime(0);
         final String sessionId = "mel";
         processor.process(sessionId, "first");
-        assertTrue(sessionStore.findSessionsToMerge(sessionId, 0, 0).hasNext());
+        assertTrue(sessionStore.findSessions(sessionId, 0, 0).hasNext());
 
         // move time beyond gap
         context.setTime(GAP_MS + 1);
         processor.process(sessionId, "second");
-        assertTrue(sessionStore.findSessionsToMerge(sessionId, GAP_MS + 1, GAP_MS + 1).hasNext());
+        assertTrue(sessionStore.findSessions(sessionId, GAP_MS + 1, GAP_MS + 1).hasNext());
         // should still exist as not within gap
-        assertTrue(sessionStore.findSessionsToMerge(sessionId, 0, 0).hasNext());
+        assertTrue(sessionStore.findSessions(sessionId, 0, 0).hasNext());
         // move time back
         context.setTime(GAP_MS / 2);
         processor.process(sessionId, "third");
 
-        final KeyValueIterator<Windowed<String>, Long> iterator = sessionStore.findSessionsToMerge(sessionId, 0, GAP_MS + 1);
+        final KeyValueIterator<Windowed<String>, Long> iterator = sessionStore.findSessions(sessionId, 0, GAP_MS + 1);
         final KeyValue<Windowed<String>, Long> kv = iterator.next();
 
         assertEquals(Long.valueOf(3), kv.value);
@@ -160,7 +160,7 @@ public class KStreamSessionWindowAggregateProcessorTest {
         context.setTime(0);
         processor.process("mel", "first");
         processor.process("mel", "second");
-        final KeyValueIterator<Windowed<String>, Long> iterator = sessionStore.findSessionsToMerge("mel", 0, 0);
+        final KeyValueIterator<Windowed<String>, Long> iterator = sessionStore.findSessions("mel", 0, 0);
         assertEquals(Long.valueOf(2L), iterator.next().value);
         assertFalse(iterator.hasNext());
     }
@@ -196,14 +196,14 @@ public class KStreamSessionWindowAggregateProcessorTest {
         processor.process("a", "1");
 
         // first ensure it is in the store
-        final KeyValueIterator<Windowed<String>, Long> a1 = sessionStore.findSessionsToMerge("a", 0, 0);
+        final KeyValueIterator<Windowed<String>, Long> a1 = sessionStore.findSessions("a", 0, 0);
         assertEquals(KeyValue.pair(new Windowed<>("a", new TimeWindow(0, 0)), 1L), a1.next());
 
         context.setTime(100);
         processor.process("a", "2");
         // a1 from above should have been removed
         // should have merged session in store
-        final KeyValueIterator<Windowed<String>, Long> a2 = sessionStore.findSessionsToMerge("a", 0, 100);
+        final KeyValueIterator<Windowed<String>, Long> a2 = sessionStore.findSessions("a", 0, 100);
         assertEquals(KeyValue.pair(new Windowed<>("a", new TimeWindow(0, 100)), 2L), a2.next());
         assertFalse(a2.hasNext());
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/fe82330f/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java
index cb6f87e..d453316 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java
@@ -72,8 +72,8 @@ public class CachingSessionStoreTest {
         cachingStore.put(new Windowed<>("aa", new TimeWindow(0, 0)), 1L);
         cachingStore.put(new Windowed<>("b", new TimeWindow(0, 0)), 1L);
 
-        final KeyValueIterator<Windowed<String>, Long> a = cachingStore.findSessionsToMerge("a", 0, 0);
-        final KeyValueIterator<Windowed<String>, Long> b = cachingStore.findSessionsToMerge("b", 0, 0);
+        final KeyValueIterator<Windowed<String>, Long> a = cachingStore.findSessions("a", 0, 0);
+        final KeyValueIterator<Windowed<String>, Long> b = cachingStore.findSessions("b", 0, 0);
 
         assertEquals(KeyValue.pair(new Windowed<>("a", new TimeWindow(0, 0)), 1L), a.next());
         assertEquals(KeyValue.pair(new Windowed<>("b", new TimeWindow(0, 0)), 1L), b.next());
@@ -115,7 +115,7 @@ public class CachingSessionStoreTest {
     @Test
     public void shouldQueryItemsInCacheAndStore() throws Exception {
         final List<KeyValue<Windowed<String>, Long>> added = addSessionsUntilOverflow("a");
-        final KeyValueIterator<Windowed<String>, Long> iterator = cachingStore.findSessionsToMerge("a", 0, added.size() * 10);
+        final KeyValueIterator<Windowed<String>, Long> iterator = cachingStore.findSessions("a", 0, added.size() * 10);
         final List<KeyValue<Windowed<String>, Long>> actual = toList(iterator);
         assertEquals(added, actual);
     }
@@ -129,7 +129,7 @@ public class CachingSessionStoreTest {
         cachingStore.flush();
         cachingStore.remove(a);
         cachingStore.flush();
-        final KeyValueIterator<Windowed<String>, Long> rangeIter = cachingStore.findSessionsToMerge("a", 0, 0);
+        final KeyValueIterator<Windowed<String>, Long> rangeIter = cachingStore.findSessions("a", 0, 0);
         assertFalse(rangeIter.hasNext());
     }
 
@@ -142,7 +142,7 @@ public class CachingSessionStoreTest {
         cachingStore.put(a2, 2L);
         cachingStore.put(a3, 3L);
         cachingStore.flush();
-        final KeyValueIterator<Windowed<String>, Long> results = cachingStore.findSessionsToMerge("a", 0, Segments.MIN_SEGMENT_INTERVAL * 2);
+        final KeyValueIterator<Windowed<String>, Long> results = cachingStore.findSessions("a", 0, Segments.MIN_SEGMENT_INTERVAL * 2);
         assertEquals(a1, results.next().key);
         assertEquals(a2, results.next().key);
         assertEquals(a3, results.next().key);
@@ -167,7 +167,7 @@ public class CachingSessionStoreTest {
     @Test(expected = InvalidStateStoreException.class)
     public void shouldThrowIfTryingToFindMergeSessionFromClosedCachingStore() throws Exception {
         cachingStore.close();
-        cachingStore.findSessionsToMerge("a", 0, Long.MAX_VALUE);
+        cachingStore.findSessions("a", 0, Long.MAX_VALUE);
     }
 
     @Test(expected = InvalidStateStoreException.class)

http://git-wip-us.apache.org/repos/asf/kafka/blob/fe82330f/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java
index 11766c7..a664e3b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java
@@ -78,7 +78,7 @@ public class RocksDBSessionStoreTest {
         final List<KeyValue<Windowed<String>, Long>> expected
                 = Arrays.asList(KeyValue.pair(a1, 1L), KeyValue.pair(a2, 2L));
 
-        final KeyValueIterator<Windowed<String>, Long> values = sessionStore.findSessionsToMerge(key, 0, 1000L);
+        final KeyValueIterator<Windowed<String>, Long> values = sessionStore.findSessions(key, 0, 1000L);
         assertEquals(expected, toList(values));
     }
 
@@ -107,7 +107,7 @@ public class RocksDBSessionStoreTest {
         final String key = "a";
         sessionStore.put(new Windowed<>(key, new TimeWindow(0L, 0L)), 1L);
         sessionStore.put(new Windowed<>(key, new TimeWindow(1000L, 1000L)), 2L);
-        final KeyValueIterator<Windowed<String>, Long> results = sessionStore.findSessionsToMerge(key, -1, 1000L);
+        final KeyValueIterator<Windowed<String>, Long> results = sessionStore.findSessions(key, -1, 1000L);
 
         final List<KeyValue<Windowed<String>, Long>> expected = Arrays.asList(
                 KeyValue.pair(new Windowed<>(key, new TimeWindow(0L, 0L)), 1L),
@@ -121,9 +121,9 @@ public class RocksDBSessionStoreTest {
         sessionStore.put(new Windowed<>("a", new TimeWindow(1500, 2500)), 2L);
 
         sessionStore.remove(new Windowed<>("a", new TimeWindow(0, 1000)));
-        assertFalse(sessionStore.findSessionsToMerge("a", 0, 1000L).hasNext());
+        assertFalse(sessionStore.findSessions("a", 0, 1000L).hasNext());
 
-        assertTrue(sessionStore.findSessionsToMerge("a", 1500, 2500).hasNext());
+        assertTrue(sessionStore.findSessions("a", 1500, 2500).hasNext());
     }
 
     @Test
@@ -138,7 +138,7 @@ public class RocksDBSessionStoreTest {
         sessionStore.put(session3, 3L);
         sessionStore.put(session4, 4L);
         sessionStore.put(session5, 5L);
-        final KeyValueIterator<Windowed<String>, Long> results = sessionStore.findSessionsToMerge("a", 150, 300);
+        final KeyValueIterator<Windowed<String>, Long> results = sessionStore.findSessions("a", 150, 300);
         assertEquals(session2, results.next().key);
         assertEquals(session3, results.next().key);
         assertFalse(results.hasNext());


Mime
View raw message