kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [kafka] branch trunk updated: KAFKA-7161: check invariant: oldValue is in the state (#5366)
Date Wed, 01 Aug 2018 18:02:32 GMT
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new aa48791  KAFKA-7161: check invariant: oldValue is in the state (#5366)
aa48791 is described below

commit aa4879129770798f0e2cee385ab848b64dbe9498
Author: John Roesler <vvcephei@users.noreply.github.com>
AuthorDate: Wed Aug 1 13:02:27 2018 -0500

    KAFKA-7161: check invariant: oldValue is in the state (#5366)
    
    Reviewers: Vasily Sulatskov <redvasily@gmail.com>, Guozhang Wang <wangguoz@gmail.com>
---
 .../kstream/internals/KTableReduceTest.java        | 81 ++++++++++++++++++++++
 1 file changed, 81 insertions(+)

diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableReduceTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableReduceTest.java
new file mode 100644
index 0000000..05b74dc
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableReduceTest.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.internals.AbstractProcessorContext;
+import org.apache.kafka.streams.processor.internals.ProcessorNode;
+import org.apache.kafka.streams.state.internals.InMemoryKeyValueStore;
+import org.apache.kafka.test.InternalMockProcessorContext;
+import org.junit.Test;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import static java.util.Collections.emptySet;
+import static java.util.Collections.singleton;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+public class KTableReduceTest {
+
+    @Test
+    public void shouldAddAndSubtract() {
+        final AbstractProcessorContext context = new InternalMockProcessorContext();
+
+        final Processor<String, Change<Set<String>>> reduceProcessor =
+            new KTableReduce<String, Set<String>>(
+                "myStore",
+                this::unionNotNullArgs,
+                this::differenceNotNullArgs
+            ).get();
+
+
+        final InMemoryKeyValueStore<String, Set<String>> myStore =
+            new InMemoryKeyValueStore<>("myStore", null, null);
+
+        context.register(myStore, null);
+        reduceProcessor.init(context);
+        context.setCurrentNode(new ProcessorNode<>("reduce", reduceProcessor, singleton("myStore")));
+
+        reduceProcessor.process("A", new Change<>(singleton("a"), null));
+        assertEquals(singleton("a"), myStore.get("A"));
+        reduceProcessor.process("A", new Change<>(singleton("b"), singleton("a")));
+        assertEquals(singleton("b"), myStore.get("A"));
+        reduceProcessor.process("A", new Change<>(null, singleton("b")));
+        assertEquals(emptySet(), myStore.get("A"));
+    }
+
+    private Set<String> differenceNotNullArgs(final Set<String> left, final Set<String> right) {
+        assertNotNull(left);
+        assertNotNull(right);
+
+        final HashSet<String> strings = new HashSet<>(left);
+        strings.removeAll(right);
+        return strings;
+    }
+
+    private Set<String> unionNotNullArgs(final Set<String> left, final Set<String> right) {
+        assertNotNull(left);
+        assertNotNull(right);
+
+        final HashSet<String> strings = new HashSet<>();
+        strings.addAll(left);
+        strings.addAll(right);
+        return strings;
+    }
+}


Mime
View raw message