kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [1/2] kafka git commit: KAFKA-5651; Follow-up: add with method to Materialized
Date Fri, 06 Oct 2017 22:38:26 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 105ab47ed -> 23a014052


http://git-wip-us.apache.org/repos/asf/kafka/blob/23a01405/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImplTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImplTest.java
index e75ef5b..619ee96 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImplTest.java
@@ -143,6 +143,20 @@ public class SessionWindowedKStreamImplTest {
 
     @SuppressWarnings("unchecked")
     @Test
+    public void shouldMaterializeWithoutSpecifyingSerdes() {
+        stream.count(Materialized.<String, Long, SessionStore<Bytes, byte[]>>as("count-store"));
+
+        processData();
+        final SessionStore<String, Long> store = (SessionStore<String, Long>) driver.allStateStores().get("count-store");
+        final List<KeyValue<Windowed<String>, Long>> data = StreamsTestUtils.toList(store.fetch("1", "2"));
+        assertThat(data, equalTo(Arrays.asList(
+                KeyValue.pair(new Windowed<>("1", new SessionWindow(10, 15)), 2L),
+                KeyValue.pair(new Windowed<>("1", new SessionWindow(600, 600)), 1L),
+                KeyValue.pair(new Windowed<>("2", new SessionWindow(600, 600)), 1L))));
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
     public void shouldMaterializeReduced() {
         stream.reduce(MockReducer.STRING_ADDER,
                       Materialized.<String, String, SessionStore<Bytes, byte[]>>as("reduced")
@@ -249,7 +263,7 @@ public class SessionWindowedKStreamImplTest {
     }
 
     private void processData() {
-        driver.setUp(builder, TestUtils.tempDirectory(), Serdes.String(), Serdes.String(), 0);
+        driver.setUp(builder, TestUtils.tempDirectory(), 0);
         driver.setTime(10);
         driver.process(TOPIC, "1", "1");
         driver.setTime(15);

http://git-wip-us.apache.org/repos/asf/kafka/blob/23a01405/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImplTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImplTest.java
index 93bcf33..286a823 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImplTest.java
@@ -227,7 +227,7 @@ public class TimeWindowedKStreamImplTest {
     }
 
     private void processData() {
-        driver.setUp(builder, TestUtils.tempDirectory(), Serdes.String(), Serdes.String(), 0);
+        driver.setUp(builder, TestUtils.tempDirectory(), 0);
         driver.setTime(10);
         driver.process(TOPIC, "1", "1");
         driver.setTime(15);

http://git-wip-us.apache.org/repos/asf/kafka/blob/23a01405/streams/src/test/java/org/apache/kafka/streams/processor/internals/KeyValueStoreMaterializerTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/KeyValueStoreMaterializerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/KeyValueStoreMaterializerTest.java
index 21a5d57..9ba86ac 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/KeyValueStoreMaterializerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/KeyValueStoreMaterializerTest.java
@@ -20,6 +20,7 @@ package org.apache.kafka.streams.processor.internals;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.kstream.internals.InternalNameProvider;
 import org.apache.kafka.streams.kstream.internals.KeyValueStoreMaterializer;
 import org.apache.kafka.streams.kstream.internals.MaterializedInternal;
 import org.apache.kafka.streams.processor.StateStore;
@@ -32,20 +33,30 @@ import org.apache.kafka.streams.state.internals.InMemoryKeyValueStore;
 import org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore;
 import org.apache.kafka.streams.state.internals.WrappedStateStore;
 import org.easymock.EasyMock;
+import org.easymock.EasyMockRunner;
+import org.easymock.Mock;
+import org.easymock.MockType;
 import org.hamcrest.CoreMatchers;
 import org.junit.Test;
+import org.junit.runner.RunWith;
 
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.core.IsInstanceOf.instanceOf;
 import static org.hamcrest.core.IsNot.not;
 
-
+@RunWith(EasyMockRunner.class)
 public class KeyValueStoreMaterializerTest {
 
+    private final String storePrefix = "prefix";
+    @Mock(type = MockType.NICE)
+    private InternalNameProvider nameProvider;
+
     @Test
     public void shouldCreateBuilderThatBuildsMeteredStoreWithCachingAndLoggingEnabled() {
         final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> materialized
-                = new MaterializedInternal<>(Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("store"));
+                = new MaterializedInternal<>(Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("store"),
+                                             nameProvider,
+                                             storePrefix);
         final KeyValueStoreMaterializer<String, String> materializer = new KeyValueStoreMaterializer<>(materialized);
         final StoreBuilder<KeyValueStore<String, String>> builder = materializer.materialize();
         final KeyValueStore<String, String> store = builder.build();
@@ -60,7 +71,7 @@ public class KeyValueStoreMaterializerTest {
     public void shouldCreateBuilderThatBuildsStoreWithCachingDisabled() {
         final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> materialized
                 = new MaterializedInternal<>(Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("store")
-                                                     .withCachingDisabled());
+                                                     .withCachingDisabled(), nameProvider, storePrefix);
         final KeyValueStoreMaterializer<String, String> materializer = new KeyValueStoreMaterializer<>(materialized);
         final StoreBuilder<KeyValueStore<String, String>> builder = materializer.materialize();
         final KeyValueStore<String, String> store = builder.build();
@@ -72,7 +83,7 @@ public class KeyValueStoreMaterializerTest {
     public void shouldCreateBuilderThatBuildsStoreWithLoggingDisabled() {
         final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> materialized
                 = new MaterializedInternal<>(Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("store")
-                                                     .withLoggingDisabled());
+                                                     .withLoggingDisabled(), nameProvider, storePrefix);
         final KeyValueStoreMaterializer<String, String> materializer = new KeyValueStoreMaterializer<>(materialized);
         final StoreBuilder<KeyValueStore<String, String>> builder = materializer.materialize();
         final KeyValueStore<String, String> store = builder.build();
@@ -86,7 +97,7 @@ public class KeyValueStoreMaterializerTest {
         final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> materialized
                 = new MaterializedInternal<>(Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("store")
                                                      .withCachingDisabled()
-                                                     .withLoggingDisabled());
+                                                     .withLoggingDisabled(), nameProvider, storePrefix);
         final KeyValueStoreMaterializer<String, String> materializer = new KeyValueStoreMaterializer<>(materialized);
         final StoreBuilder<KeyValueStore<String, String>> builder = materializer.materialize();
         final KeyValueStore<String, String> store = builder.build();
@@ -104,7 +115,7 @@ public class KeyValueStoreMaterializerTest {
         EasyMock.replay(supplier);
 
         final MaterializedInternal<String, Integer, KeyValueStore<Bytes, byte[]>> materialized
-                = new MaterializedInternal<>(Materialized.<String, Integer>as(supplier));
+                = new MaterializedInternal<>(Materialized.<String, Integer>as(supplier), nameProvider, storePrefix);
         final KeyValueStoreMaterializer<String, Integer> materializer = new KeyValueStoreMaterializer<>(materialized);
         final StoreBuilder<KeyValueStore<String, Integer>> builder = materializer.materialize();
         final KeyValueStore<String, Integer> built = builder.build();


Mime
View raw message