kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From damian...@apache.org
Subject [1/2] kafka git commit: KAFKA-5949; User Callback Exceptions need to be handled properly
Date Thu, 28 Sep 2017 10:00:34 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 2703fda52 -> e5f2471c5


http://git-wip-us.apache.org/repos/asf/kafka/blob/e5f2471c/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
index f3135d5..ede6dd4 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
@@ -105,7 +105,7 @@ public class ProcessorStateManagerTest {
         final ProcessorStateManager stateMgr = getStandByStateManager(taskId);
 
         try {
-            stateMgr.register(persistentStore, true, batchingRestoreCallback);
+            stateMgr.register(persistentStore, batchingRestoreCallback);
             stateMgr.updateStandbyStates(persistentStorePartition, Collections.singletonList(consumerRecord));
             assertThat(batchingRestoreCallback.getRestoredRecords().size(), is(1));
             assertTrue(batchingRestoreCallback.getRestoredRecords().contains(expectedKeyValue));
@@ -123,7 +123,7 @@ public class ProcessorStateManagerTest {
         final ProcessorStateManager stateMgr = getStandByStateManager(taskId);
 
         try {
-            stateMgr.register(persistentStore, true, persistentStore.stateRestoreCallback);
+            stateMgr.register(persistentStore, persistentStore.stateRestoreCallback);
             stateMgr.updateStandbyStates(persistentStorePartition, Collections.singletonList(consumerRecord));
             assertThat(persistentStore.keys.size(), is(1));
             assertTrue(persistentStore.keys.contains(intKey));
@@ -153,7 +153,7 @@ public class ProcessorStateManagerTest {
                 logContext);
 
         try {
-            stateMgr.register(persistentStore, true, persistentStore.stateRestoreCallback);
+            stateMgr.register(persistentStore, persistentStore.stateRestoreCallback);
             assertTrue(changelogReader.wasRegistered(new TopicPartition(persistentStoreTopicName,
2)));
         } finally {
             stateMgr.close(Collections.<TopicPartition, Long>emptyMap());
@@ -180,7 +180,7 @@ public class ProcessorStateManagerTest {
                 logContext);
 
         try {
-            stateMgr.register(nonPersistentStore, true, nonPersistentStore.stateRestoreCallback);
+            stateMgr.register(nonPersistentStore, nonPersistentStore.stateRestoreCallback);
             assertTrue(changelogReader.wasRegistered(new TopicPartition(nonPersistentStoreTopicName,
2)));
         } finally {
             stateMgr.close(Collections.<TopicPartition, Long>emptyMap());
@@ -229,9 +229,9 @@ public class ProcessorStateManagerTest {
                 logContext);
 
         try {
-            stateMgr.register(store1, true, store1.stateRestoreCallback);
-            stateMgr.register(store2, true, store2.stateRestoreCallback);
-            stateMgr.register(store3, true, store3.stateRestoreCallback);
+            stateMgr.register(store1, store1.stateRestoreCallback);
+            stateMgr.register(store2, store2.stateRestoreCallback);
+            stateMgr.register(store3, store3.stateRestoreCallback);
 
             final Map<TopicPartition, Long> changeLogOffsets = stateMgr.checkpointed();
 
@@ -261,7 +261,7 @@ public class ProcessorStateManagerTest {
             false,
                 logContext);
         try {
-            stateMgr.register(mockStateStore, true, mockStateStore.stateRestoreCallback);
+            stateMgr.register(mockStateStore, mockStateStore.stateRestoreCallback);
 
             assertNull(stateMgr.getStore("noSuchStore"));
             assertEquals(mockStateStore, stateMgr.getStore(nonPersistentStoreName));
@@ -299,8 +299,8 @@ public class ProcessorStateManagerTest {
             // make sure the checkpoint file isn't deleted
             assertTrue(checkpointFile.exists());
 
-            stateMgr.register(persistentStore, true, persistentStore.stateRestoreCallback);
-            stateMgr.register(nonPersistentStore, true, nonPersistentStore.stateRestoreCallback);
+            stateMgr.register(persistentStore, persistentStore.stateRestoreCallback);
+            stateMgr.register(nonPersistentStore, nonPersistentStore.stateRestoreCallback);
         } finally {
             // close the state manager with the ack'ed offsets
             stateMgr.flush();
@@ -330,7 +330,7 @@ public class ProcessorStateManagerTest {
             changelogReader,
             false,
                 logContext);
-        stateMgr.register(nonPersistentStore, false, nonPersistentStore.stateRestoreCallback);
+        stateMgr.register(nonPersistentStore, nonPersistentStore.stateRestoreCallback);
         assertNotNull(stateMgr.getStore(nonPersistentStoreName));
     }
 
@@ -349,7 +349,7 @@ public class ProcessorStateManagerTest {
             changelogReader,
             false,
                 logContext);
-        stateMgr.register(persistentStore, true, persistentStore.stateRestoreCallback);
+        stateMgr.register(persistentStore, persistentStore.stateRestoreCallback);
         stateMgr.close(null);
         final Map<TopicPartition, Long> read = checkpoint.read();
         assertThat(read, equalTo(offsets));
@@ -366,7 +366,7 @@ public class ProcessorStateManagerTest {
             changelogReader,
             false,
                 logContext);
-        stateMgr.register(persistentStore, true, persistentStore.stateRestoreCallback);
+        stateMgr.register(persistentStore, persistentStore.stateRestoreCallback);
 
         stateMgr.checkpoint(Collections.singletonMap(persistentStorePartition, 10L));
         final Map<TopicPartition, Long> read = checkpoint.read();
@@ -385,7 +385,7 @@ public class ProcessorStateManagerTest {
             false,
                 logContext);
 
-        stateMgr.register(persistentStore, true, persistentStore.stateRestoreCallback);
+        stateMgr.register(persistentStore, persistentStore.stateRestoreCallback);
         final byte[] bytes = Serdes.Integer().serializer().serialize("", 10);
         stateMgr.updateStandbyStates(persistentStorePartition,
                                      Collections.singletonList(
@@ -416,7 +416,7 @@ public class ProcessorStateManagerTest {
             false,
                 logContext);
 
-        stateMgr.register(nonPersistentStore, true, nonPersistentStore.stateRestoreCallback);
+        stateMgr.register(nonPersistentStore, nonPersistentStore.stateRestoreCallback);
         stateMgr.checkpoint(Collections.singletonMap(topicPartition, 876L));
 
         final Map<TopicPartition, Long> read = checkpoint.read();
@@ -435,7 +435,7 @@ public class ProcessorStateManagerTest {
             false,
                 logContext);
 
-        stateMgr.register(persistentStore, true, persistentStore.stateRestoreCallback);
+        stateMgr.register(persistentStore, persistentStore.stateRestoreCallback);
 
         stateMgr.checkpoint(Collections.singletonMap(persistentStorePartition, 987L));
 
@@ -457,7 +457,7 @@ public class ProcessorStateManagerTest {
                 logContext);
 
         try {
-            stateManager.register(new MockStateStoreSupplier.MockStateStore(ProcessorStateManager.CHECKPOINT_FILE_NAME,
true), true, null);
+            stateManager.register(new MockStateStoreSupplier.MockStateStore(ProcessorStateManager.CHECKPOINT_FILE_NAME,
true), null);
             fail("should have thrown illegal argument exception when store name same as checkpoint
file");
         } catch (final IllegalArgumentException e) {
             //pass
@@ -476,10 +476,10 @@ public class ProcessorStateManagerTest {
             false,
                 logContext);
 
-        stateManager.register(mockStateStore, false, null);
+        stateManager.register(mockStateStore, null);
 
         try {
-            stateManager.register(mockStateStore, false, null);
+            stateManager.register(mockStateStore, null);
             fail("should have thrown illegal argument exception when store with same name
already registered");
         } catch (final IllegalArgumentException e) {
             // pass
@@ -506,7 +506,7 @@ public class ProcessorStateManagerTest {
                 throw new RuntimeException("KABOOM!");
             }
         };
-        stateManager.register(stateStore, false, stateStore.stateRestoreCallback);
+        stateManager.register(stateStore, stateStore.stateRestoreCallback);
 
         try {
             stateManager.flush();
@@ -535,7 +535,7 @@ public class ProcessorStateManagerTest {
                 throw new RuntimeException("KABOOM!");
             }
         };
-        stateManager.register(stateStore, false, stateStore.stateRestoreCallback);
+        stateManager.register(stateStore, stateStore.stateRestoreCallback);
 
         try {
             stateManager.close(Collections.<TopicPartition, Long>emptyMap());
@@ -571,8 +571,8 @@ public class ProcessorStateManagerTest {
                 flushedStore.set(true);
             }
         };
-        stateManager.register(stateStore1, false, stateStore1.stateRestoreCallback);
-        stateManager.register(stateStore2, false, stateStore2.stateRestoreCallback);
+        stateManager.register(stateStore1, stateStore1.stateRestoreCallback);
+        stateManager.register(stateStore2, stateStore2.stateRestoreCallback);
 
         try {
             stateManager.flush();
@@ -606,8 +606,8 @@ public class ProcessorStateManagerTest {
                 closedStore.set(true);
             }
         };
-        stateManager.register(stateStore1, false, stateStore1.stateRestoreCallback);
-        stateManager.register(stateStore2, false, stateStore2.stateRestoreCallback);
+        stateManager.register(stateStore1, stateStore1.stateRestoreCallback);
+        stateManager.register(stateStore2, stateStore2.stateRestoreCallback);
 
         try {
             stateManager.close(Collections.<TopicPartition, Long>emptyMap());

http://git-wip-us.apache.org/repos/asf/kafka/blob/e5f2471c/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordDeserializerTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordDeserializerTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordDeserializerTest.java
new file mode 100644
index 0000000..3a4efba
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordDeserializerTest.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.common.utils.LogContext;
+import org.junit.Test;
+
+import java.util.Collections;
+
+import static org.junit.Assert.assertEquals;
+
+public class RecordDeserializerTest {
+
+    private final ConsumerRecord<byte[], byte[]> rawRecord = new ConsumerRecord<>("topic",
+        1,
+        1,
+        10,
+        TimestampType.LOG_APPEND_TIME,
+        5,
+        3,
+        5,
+        new byte[0],
+        new byte[0]);
+
+
+    @SuppressWarnings("deprecation")
+    @Test
+    public void shouldReturnNewConsumerRecordWithDeserializedValueWhenNoExceptions() {
+        final RecordDeserializer recordDeserializer = new RecordDeserializer(
+            new TheSourceNode(false, false, "key", "value"), null, new LogContext());
+        final ConsumerRecord<Object, Object> record = recordDeserializer.deserialize(null,
rawRecord);
+        assertEquals(rawRecord.topic(), record.topic());
+        assertEquals(rawRecord.partition(), record.partition());
+        assertEquals(rawRecord.offset(), record.offset());
+        assertEquals(rawRecord.checksum(), record.checksum());
+        assertEquals("key", record.key());
+        assertEquals("value", record.value());
+        assertEquals(rawRecord.timestamp(), record.timestamp());
+        assertEquals(TimestampType.CREATE_TIME, record.timestampType());
+    }
+
+    static class TheSourceNode extends SourceNode {
+        private final boolean keyThrowsException;
+        private final boolean valueThrowsException;
+        private final Object key;
+        private final Object value;
+
+        TheSourceNode(final boolean keyThrowsException, final boolean valueThrowsException)
{
+            this(keyThrowsException, valueThrowsException, null, null);
+        }
+
+        @SuppressWarnings("unchecked")
+        TheSourceNode(final boolean keyThrowsException,
+                      final boolean valueThrowsException,
+                      final Object key,
+                      final Object value) {
+            super("", Collections.EMPTY_LIST, null, null);
+            this.keyThrowsException = keyThrowsException;
+            this.valueThrowsException = valueThrowsException;
+            this.key = key;
+            this.value = value;
+        }
+
+        @Override
+        public Object deserializeKey(final String topic, final Headers headers, final byte[]
data) {
+            if (keyThrowsException) {
+                throw new RuntimeException();
+            }
+            return key;
+        }
+
+        @Override
+        public Object deserializeValue(final String topic, final Headers headers, final byte[]
data) {
+            if (valueThrowsException) {
+                throw new RuntimeException();
+            }
+            return value;
+        }
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/e5f2471c/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java
index c33a9c4..c7af928 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java
@@ -16,9 +16,6 @@
  */
 package org.apache.kafka.streams.processor.internals;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.record.TimestampType;
@@ -47,6 +44,9 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
 public class RecordQueueTest {
     private final Serializer<Integer> intSerializer = new IntegerSerializer();
     private final Deserializer<Integer> intDeserializer = new IntegerDeserializer();
@@ -56,12 +56,20 @@ public class RecordQueueTest {
     final MockProcessorContext context = new MockProcessorContext(StateSerdes.withBuiltinTypes("anyName",
Bytes.class, Bytes.class),
             new RecordCollectorImpl(null, null,  new LogContext("record-queue-test ")));
     private final MockSourceNode mockSourceNodeWithMetrics = new MockSourceNode<>(topics,
intDeserializer, intDeserializer);
-    private final RecordQueue queue = new RecordQueue(new TopicPartition(topics[0], 1),
-            mockSourceNodeWithMetrics,
-            timestampExtractor, new LogAndFailExceptionHandler(), context);
-    private final RecordQueue queueThatSkipsDeserializeErrors = new RecordQueue(new TopicPartition(topics[0],
1),
-            mockSourceNodeWithMetrics,
-            timestampExtractor, new LogAndContinueExceptionHandler(), context);
+    private final RecordQueue queue = new RecordQueue(
+        new TopicPartition(topics[0], 1),
+        mockSourceNodeWithMetrics,
+        timestampExtractor,
+        new LogAndFailExceptionHandler(),
+        context,
+        new LogContext());
+    private final RecordQueue queueThatSkipsDeserializeErrors = new RecordQueue(
+        new TopicPartition(topics[0], 1),
+        mockSourceNodeWithMetrics,
+        timestampExtractor,
+        new LogAndContinueExceptionHandler(),
+        context,
+        new LogContext());
 
     private final byte[] recordValue = intSerializer.serialize(null, 10);
     private final byte[] recordKey = intSerializer.serialize(null, 1);
@@ -196,7 +204,8 @@ public class RecordQueueTest {
                                                   new MockSourceNode<>(topics, intDeserializer,
intDeserializer),
                                                   new FailOnInvalidTimestamp(),
                                                   new LogAndContinueExceptionHandler(),
-                                  null);
+                                                  null,
+                                                  new LogContext());
         queue.addRawRecords(records);
     }
 
@@ -209,7 +218,8 @@ public class RecordQueueTest {
                                                   new MockSourceNode<>(topics, intDeserializer,
intDeserializer),
                                                   new LogAndSkipOnInvalidTimestamp(),
                                                   new LogAndContinueExceptionHandler(),
-                                  null);
+                                                  null,
+                                                  new LogContext());
         queue.addRawRecords(records);
 
         assertEquals(0, queue.size());

http://git-wip-us.apache.org/repos/asf/kafka/blob/e5f2471c/streams/src/test/java/org/apache/kafka/streams/processor/internals/SourceNodeRecordDeserializerTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/SourceNodeRecordDeserializerTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/SourceNodeRecordDeserializerTest.java
deleted file mode 100644
index 821dbe9..0000000
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/SourceNodeRecordDeserializerTest.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.processor.internals;
-
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.common.header.Headers;
-import org.apache.kafka.common.record.TimestampType;
-import org.apache.kafka.streams.errors.StreamsException;
-import org.junit.Test;
-
-import java.util.Collections;
-
-import static org.junit.Assert.assertEquals;
-
-public class SourceNodeRecordDeserializerTest {
-
-    private final ConsumerRecord<byte[], byte[]> rawRecord = new ConsumerRecord<>("topic",
-                                                                                  1,
-                                                                                  1,
-                                                                                  10,
-                                                                                  TimestampType.LOG_APPEND_TIME,
-                                                                                  5,
-                                                                                  3,
-                                                                                  5,
-                                                                                  new byte[0],
-                                                                                  new byte[0]);
-
-
-    @Test(expected = StreamsException.class)
-    public void shouldThrowStreamsExceptionIfKeyFailsToDeserialize() {
-        final SourceNodeRecordDeserializer recordDeserializer = new SourceNodeRecordDeserializer(
-                new TheSourceNode(true, false), null);
-        recordDeserializer.deserialize(rawRecord);
-    }
-
-    @Test(expected = StreamsException.class)
-    public void shouldThrowStreamsExceptionIfKeyValueFailsToDeserialize() {
-        final SourceNodeRecordDeserializer recordDeserializer = new SourceNodeRecordDeserializer(
-                new TheSourceNode(false, true), null);
-        recordDeserializer.deserialize(rawRecord);
-    }
-
-    @Test
-    public void shouldReturnNewConsumerRecordWithDeserializedValueWhenNoExceptions() {
-        final SourceNodeRecordDeserializer recordDeserializer = new SourceNodeRecordDeserializer(
-                new TheSourceNode(false, false, "key", "value"), null);
-        final ConsumerRecord<Object, Object> record = recordDeserializer.deserialize(rawRecord);
-        assertEquals(rawRecord.topic(), record.topic());
-        assertEquals(rawRecord.partition(), record.partition());
-        assertEquals(rawRecord.offset(), record.offset());
-        assertEquals(rawRecord.checksum(), record.checksum());
-        assertEquals("key", record.key());
-        assertEquals("value", record.value());
-        assertEquals(rawRecord.timestamp(), record.timestamp());
-        assertEquals(TimestampType.CREATE_TIME, record.timestampType());
-    }
-
-    static class TheSourceNode extends SourceNode {
-        private final boolean keyThrowsException;
-        private final boolean valueThrowsException;
-        private final Object key;
-        private final Object value;
-
-        TheSourceNode(final boolean keyThrowsException, final boolean valueThrowsException)
{
-            this(keyThrowsException, valueThrowsException, null, null);
-        }
-
-        @SuppressWarnings("unchecked")
-        TheSourceNode(final boolean keyThrowsException,
-                      final boolean valueThrowsException,
-                      final Object key,
-                      final Object value) {
-            super("", Collections.EMPTY_LIST, null, null);
-            this.keyThrowsException = keyThrowsException;
-            this.valueThrowsException = valueThrowsException;
-            this.key = key;
-            this.value = value;
-        }
-
-        @Override
-        public Object deserializeKey(final String topic, final Headers headers, final byte[]
data) {
-            if (keyThrowsException) {
-                throw new RuntimeException();
-            }
-            return key;
-        }
-
-        @Override
-        public Object deserializeValue(final String topic, final Headers headers, final byte[]
data) {
-            if (valueThrowsException) {
-                throw new RuntimeException();
-            }
-            return value;
-        }
-    }
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/e5f2471c/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
index 7a7b119..86a1af1 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
@@ -335,7 +335,7 @@ public class StandbyTaskTest {
         restoreStateConsumer.updatePartitions(changelogName, Utils.mkList(
                 new PartitionInfo(changelogName, 0, Node.noNode(), new Node[0], new Node[0])));
         final InternalStreamsBuilder builder = new InternalStreamsBuilder(new InternalTopologyBuilder());
-        builder.stream(Collections.singleton("topic"), new ConsumedInternal<>()).groupByKey().count("my-store");
+        builder.stream(Collections.singleton("topic"), new ConsumedInternal<>()).groupByKey().count();
 
         final StreamsConfig config = createConfig(baseDir);
         final InternalTopologyBuilder internalTopologyBuilder = InternalStreamsBuilderTest.internalTopologyBuilder(builder);

http://git-wip-us.apache.org/repos/asf/kafka/blob/e5f2471c/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerStub.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerStub.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerStub.java
index b9e6fee..47a0015 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerStub.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerStub.java
@@ -33,7 +33,7 @@ public class StateManagerStub implements StateManager {
     }
 
     @Override
-    public void register(final StateStore store, final boolean loggingEnabled, final StateRestoreCallback
stateRestoreCallback) {}
+    public void register(final StateStore store, final StateRestoreCallback stateRestoreCallback)
{}
 
     @Override
     public void flush() {}

http://git-wip-us.apache.org/repos/asf/kafka/blob/e5f2471c/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateRestorerTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateRestorerTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateRestorerTest.java
index c782790..62da23b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateRestorerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateRestorerTest.java
@@ -40,7 +40,7 @@ public class StateRestorerTest {
 
     @Before
     public void setUp() {
-        compositeRestoreListener.setGlobalRestoreListener(reportingListener);
+        compositeRestoreListener.setUserRestoreListener(reportingListener);
     }
 
     @Test

http://git-wip-us.apache.org/repos/asf/kafka/blob/e5f2471c/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
index 5a3fa69..5da0a64 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
@@ -58,7 +58,7 @@ public class StoreChangelogReaderTest {
 
     @Before
     public void setUp() {
-        restoreListener.setGlobalRestoreListener(stateRestoreListener);
+        restoreListener.setUserRestoreListener(stateRestoreListener);
     }
 
     @Test

http://git-wip-us.apache.org/repos/asf/kafka/blob/e5f2471c/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
index d9a215c..cd37fab 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
@@ -23,6 +23,7 @@ import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
@@ -31,6 +32,7 @@ import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.kstream.JoinWindows;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
+import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.kstream.ValueJoiner;
 import org.apache.kafka.streams.processor.DefaultPartitionGrouper;
 import org.apache.kafka.streams.processor.PartitionGrouper;
@@ -38,6 +40,7 @@ import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.internals.assignment.AssignmentInfo;
 import org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo;
 import org.apache.kafka.streams.state.HostInfo;
+import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.test.MockClientSupplier;
 import org.apache.kafka.test.MockInternalTopicManager;
 import org.apache.kafka.test.MockProcessorSupplier;
@@ -781,7 +784,7 @@ public class StreamPartitionAssignorTest {
             // force repartitioning for aggregation
             .selectKey(new KeyValueMapper<Object, Object, Object>() {
                 @Override
-                public Object apply(Object key, Object value) {
+                public Object apply(final Object key, final Object value) {
                     return null;
                 }
             })
@@ -789,14 +792,14 @@ public class StreamPartitionAssignorTest {
 
             // Task 2 (should get created):
             // create repartioning and changelog topic as task 1 exists
-            .count("count")
+            .count(Materialized.<Object, Long, KeyValueStore<Bytes, byte[]>>as("count"))
 
             // force repartitioning for join, but second join input topic unknown
             // -> internal repartitioning topic should not get created
             .toStream()
             .map(new KeyValueMapper<Object, Long, KeyValue<Object, Object>>()
{
                 @Override
-                public KeyValue<Object, Object> apply(Object key, Long value) {
+                public KeyValue<Object, Object> apply(final Object key, final Long
value) {
                     return null;
                 }
             });
@@ -809,7 +812,7 @@ public class StreamPartitionAssignorTest {
             // -> thus should not create internal repartitioning topic
             .selectKey(new KeyValueMapper<Object, Object, Object>() {
                 @Override
-                public Object apply(Object key, Object value) {
+                public Object apply(final Object key, final Object value) {
                     return null;
                 }
             })
@@ -820,7 +823,7 @@ public class StreamPartitionAssignorTest {
                 stream1,
                 new ValueJoiner() {
                     @Override
-                    public Object apply(Object value1, Object value2) {
+                    public Object apply(final Object value1, final Object value2) {
                         return null;
                     }
                 },
@@ -923,7 +926,7 @@ public class StreamPartitionAssignorTest {
         final InternalTopologyBuilder internalTopologyBuilder = StreamsBuilderTest.internalTopologyBuilder(builder);
         internalTopologyBuilder.setApplicationId(applicationId);
 
-        builder.stream("topic1").groupByKey().count("count");
+        builder.stream("topic1").groupByKey().count();
 
         final UUID uuid = UUID.randomUUID();
         mockThreadDataProvider(Collections.<TaskId>emptySet(),

http://git-wip-us.apache.org/repos/asf/kafka/blob/e5f2471c/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index 91dd422..bbed615 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -573,7 +573,7 @@ public class StreamTaskTest {
         final InMemoryKeyValueStore inMemoryStore = new InMemoryKeyValueStore(storeName, null, null) {
             @Override
             public void init(final ProcessorContext context, final StateStore root) {
-                context.register(root, true, null);
+                context.register(root, false, null);
             }
 
             @Override
@@ -639,7 +639,7 @@ public class StreamTaskTest {
         final InMemoryKeyValueStore inMemoryStore = new InMemoryKeyValueStore(storeName, null, null) {
             @Override
             public void init(final ProcessorContext context, final StateStore root) {
-                context.register(root, true, null);
+                context.register(root, false, null);
             }
 
             @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/e5f2471c/streams/src/test/java/org/apache/kafka/test/GlobalStateManagerStub.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/GlobalStateManagerStub.java b/streams/src/test/java/org/apache/kafka/test/GlobalStateManagerStub.java
index b4b67f7..6677084 100644
--- a/streams/src/test/java/org/apache/kafka/test/GlobalStateManagerStub.java
+++ b/streams/src/test/java/org/apache/kafka/test/GlobalStateManagerStub.java
@@ -51,7 +51,7 @@ public class GlobalStateManagerStub implements GlobalStateManager {
     }
 
     @Override
-    public void register(final StateStore store, final boolean loggingEnabled, final StateRestoreCallback stateRestoreCallback) {
+    public void register(final StateStore store, final StateRestoreCallback stateRestoreCallback) {
 
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/e5f2471c/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
index edbff13..37a8ab3 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
@@ -147,7 +147,9 @@ public class MockProcessorContext extends AbstractProcessorContext implements
Re
     }
 
     @Override
-    public void register(final StateStore store, final boolean loggingEnabled, final StateRestoreCallback func) {
+    public void register(final StateStore store,
+                         final boolean deprecatedAndIgnoredLoggingEnabled,
+                         final StateRestoreCallback func) {
         storeMap.put(store.name(), store);
         restoreFuncs.put(store.name(), func);
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/e5f2471c/streams/src/test/java/org/apache/kafka/test/MockStateStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/MockStateStoreSupplier.java b/streams/src/test/java/org/apache/kafka/test/MockStateStoreSupplier.java
index 5d6a4fa..a4b3118 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockStateStoreSupplier.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockStateStoreSupplier.java
@@ -27,26 +27,25 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Map;
 
+@SuppressWarnings("deprecation")
 public class MockStateStoreSupplier implements StateStoreSupplier {
     private String name;
     private boolean persistent;
     private boolean loggingEnabled;
-    private MockStateStore stateStore;
 
-    public MockStateStoreSupplier(String name, boolean persistent) {
+    public MockStateStoreSupplier(final String name,
+                                  final boolean persistent) {
         this(name, persistent, true);
     }
 
-    public MockStateStoreSupplier(String name, boolean persistent, boolean loggingEnabled) {
+    public MockStateStoreSupplier(final String name,
+                                  final boolean persistent,
+                                  final boolean loggingEnabled) {
         this.name = name;
         this.persistent = persistent;
         this.loggingEnabled = loggingEnabled;
     }
 
-    public MockStateStoreSupplier(final MockStateStore stateStore) {
-        this.stateStore = stateStore;
-    }
-
     @Override
     public String name() {
         return name;
@@ -54,14 +53,7 @@ public class MockStateStoreSupplier implements StateStoreSupplier {
 
     @Override
     public StateStore get() {
-        if (stateStore != null) {
-            return stateStore;
-        }
-        if (loggingEnabled) {
-            return new MockStateStore(name, persistent).enableLogging();
-        } else {
-            return new MockStateStore(name, persistent);
-        }
+        return new MockStateStore(name, persistent);
     }
 
     @Override
@@ -78,30 +70,26 @@ public class MockStateStoreSupplier implements StateStoreSupplier {
         private final String name;
         private final boolean persistent;
 
-        public boolean loggingEnabled = false;
         public boolean initialized = false;
         public boolean flushed = false;
         public boolean closed = true;
         public final ArrayList<Integer> keys = new ArrayList<>();
 
-        public MockStateStore(String name, boolean persistent) {
+        public MockStateStore(final String name,
+                              final boolean persistent) {
             this.name = name;
             this.persistent = persistent;
         }
 
-        public MockStateStore enableLogging() {
-            loggingEnabled = true;
-            return this;
-        }
-
         @Override
         public String name() {
             return name;
         }
 
         @Override
-        public void init(ProcessorContext context, StateStore root) {
-            context.register(root, loggingEnabled, stateRestoreCallback);
+        public void init(final ProcessorContext context,
+                         final StateStore root) {
+            context.register(root, false, stateRestoreCallback);
             initialized = true;
             closed = false;
         }
@@ -130,7 +118,8 @@ public class MockStateStoreSupplier implements StateStoreSupplier {
             private final Deserializer<Integer> deserializer = new IntegerDeserializer();
 
             @Override
-            public void restore(byte[] key, byte[] value) {
+            public void restore(final byte[] key,
+                                final byte[] value) {
                 keys.add(deserializer.deserialize("", key));
             }
         };

http://git-wip-us.apache.org/repos/asf/kafka/blob/e5f2471c/streams/src/test/java/org/apache/kafka/test/NoOpProcessorContext.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/NoOpProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/NoOpProcessorContext.java
index 1b9cfed..bc56866 100644
--- a/streams/src/test/java/org/apache/kafka/test/NoOpProcessorContext.java
+++ b/streams/src/test/java/org/apache/kafka/test/NoOpProcessorContext.java
@@ -84,7 +84,9 @@ public class NoOpProcessorContext extends AbstractProcessorContext {
     }
 
     @Override
-    public void register(final StateStore store, final boolean loggingEnabled, final StateRestoreCallback stateRestoreCallback) {
+    public void register(final StateStore store,
+                         final boolean deprecatedAndIgnoredLoggingEnabled,
+                         final StateRestoreCallback stateRestoreCallback) {
         // no-op
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/e5f2471c/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
index babf704..1d91b52 100644
--- a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
@@ -227,8 +227,9 @@ public class ProcessorTopologyTestDriver {
                                                                                    stateRestoreListener);
             globalStateTask = new GlobalStateUpdateTask(globalTopology,
                                                         new GlobalProcessorContextImpl(config, stateManager, streamsMetrics, cache),
-                                                        stateManager, new LogAndContinueExceptionHandler()
-            );
+                                                        stateManager,
+                                                        new LogAndContinueExceptionHandler(),
+                                                        new LogContext());
             globalStateTask.initialize();
         }
 


Mime
View raw message