kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: KAFKA-4646: Improve test coverage AbstractProcessorContext
Date Fri, 03 Feb 2017 17:16:00 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 47517da40 -> d95f22c12


KAFKA-4646: Improve test coverage AbstractProcessorContext

Exception paths in `register()`, `topic()`, `partition()`, `offset()`, and `timestamp()`
were not covered by any existing tests.

Author: Damian Guy <damian.guy@gmail.com>

Reviewers: Eno Thereska, Matthias J. Sax

Closes #2447 from dguy/KAFKA-4646


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/d95f22c1
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/d95f22c1
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/d95f22c1

Branch: refs/heads/trunk
Commit: d95f22c12c36a92f9c71a78cabf897d65ddef65f
Parents: 47517da
Author: Damian Guy <damian.guy@gmail.com>
Authored: Fri Feb 3 09:15:56 2017 -0800
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Fri Feb 3 09:15:56 2017 -0800

----------------------------------------------------------------------
 .../internals/AbstractProcessorContext.java     |   5 +-
 .../internals/AbstractProcessorContextTest.java | 173 +++++++++++++++++++
 .../processor/internals/StateManagerStub.java   |  63 +++++++
 .../org/apache/kafka/test/StreamsTestUtils.java |   7 +
 4 files changed, 246 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/d95f22c1/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
index df9cacb..163c936 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
@@ -26,6 +26,7 @@ import org.apache.kafka.streams.state.internals.ThreadCache;
 
 import java.io.File;
 import java.util.Map;
+import java.util.Objects;
 
 
 public abstract class AbstractProcessorContext implements InternalProcessorContext {
@@ -95,7 +96,7 @@ public abstract class AbstractProcessorContext implements InternalProcessorConte
         if (initialized) {
             throw new IllegalStateException("Can only create state stores during initialization.");
         }
-
+        Objects.requireNonNull(store, "store must not be null");
         stateManager.register(store, loggingEnabled, stateRestoreCallback);
     }
 
@@ -108,7 +109,7 @@ public abstract class AbstractProcessorContext implements InternalProcessorConte
             throw new IllegalStateException("This should not happen as topic() should only
be called while a record is processed");
         }
 
-        String topic = recordContext.topic();
+        final String topic = recordContext.topic();
 
         if (topic.equals(NONEXIST_TOPIC)) {
             return null;

http://git-wip-us.apache.org/repos/asf/kafka/blob/d95f22c1/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java
new file mode 100644
index 0000000..dee3e96
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java
@@ -0,0 +1,173 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.state.internals.ThreadCache;
+import org.apache.kafka.test.MockStateStoreSupplier;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.apache.kafka.test.StreamsTestUtils.minimalStreamsConfig;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.nullValue;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.fail;
+
+public class AbstractProcessorContextTest {
+
+    private final MockStreamsMetrics metrics = new MockStreamsMetrics(new Metrics());
+    private final AbstractProcessorContext context = new TestProcessorContext(metrics);
+    private final MockStateStoreSupplier.MockStateStore stateStore = new MockStateStoreSupplier.MockStateStore("store",
false);
+    private final RecordContext recordContext = new RecordContextStub(10, System.currentTimeMillis(),
1, "foo");
+
+    @Before
+    public void before() {
+        context.setRecordContext(recordContext);
+    }
+
+    @Test
+    public void shouldThrowIllegalStateExceptionOnRegisterWhenContextIsInitialized() throws
Exception {
+        context.initialized();
+        try {
+            context.register(stateStore, false, null);
+            fail("should throw illegal state exception when context already initialized");
+        } catch (IllegalStateException e) {
+            // pass
+        }
+    }
+
+    @Test
+    public void shouldNotThrowIllegalStateExceptionOnRegisterWhenContextIsNotInitialized()
throws Exception {
+        context.register(stateStore, false, null);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldThrowNullPointerOnRegisterIfStateStoreIsNull() {
+        context.register(null, false, null);
+    }
+
+    @Test
+    public void shouldThrowIllegalStateExceptionOnTopicIfNoRecordContext() throws Exception
{
+        context.setRecordContext(null);
+        try {
+            context.topic();
+            fail("should throw illegal state exception when record context is null");
+        } catch (final IllegalStateException e) {
+            // pass
+        }
+    }
+
+    @Test
+    public void shouldReturnTopicFromRecordContext() throws Exception {
+        assertThat(context.topic(), equalTo(recordContext.topic()));
+    }
+
+    @Test
+    public void shouldReturnNullIfTopicEqualsNonExistTopic() throws Exception {
+        context.setRecordContext(new RecordContextStub(0, 0, 0, AbstractProcessorContext.NONEXIST_TOPIC));
+        assertThat(context.topic(), nullValue());
+    }
+
+    @Test
+    public void shouldThrowIllegalStateExceptionOnPartitionIfNoRecordContext() throws Exception
{
+        context.setRecordContext(null);
+        try {
+            context.partition();
+            fail("should throw illegal state exception when record context is null");
+        } catch (final IllegalStateException e) {
+            // pass
+        }
+    }
+
+    @Test
+    public void shouldReturnPartitionFromRecordContext() throws Exception {
+        assertThat(context.partition(), equalTo(recordContext.partition()));
+    }
+
+    @Test
+    public void shouldThrowIllegalStateExceptionOnOffsetIfNoRecordContext() throws Exception
{
+        context.setRecordContext(null);
+        try {
+            context.offset();
+        } catch (final IllegalStateException e) {
+            // pass
+        }
+    }
+
+    @Test
+    public void shouldReturnOffsetFromRecordContext() throws Exception {
+        assertThat(context.offset(), equalTo(recordContext.offset()));
+    }
+
+    @Test
+    public void shouldThrowIllegalStateExceptionOnTimestampIfNoRecordContext() throws Exception
{
+        context.setRecordContext(null);
+        try {
+            context.timestamp();
+            fail("should throw illegal state exception when record context is null");
+        } catch (final IllegalStateException e) {
+            // pass
+        }
+    }
+
+    @Test
+    public void shouldReturnTimestampFromRecordContext() throws Exception {
+        assertThat(context.timestamp(), equalTo(recordContext.timestamp()));
+    }
+
+
+    private static class TestProcessorContext extends AbstractProcessorContext {
+        public TestProcessorContext(final MockStreamsMetrics metrics) {
+            super(new TaskId(0, 0), "appId", new StreamsConfig(minimalStreamsConfig()), metrics,
new StateManagerStub(), new ThreadCache("name", 0, metrics));
+        }
+
+        @Override
+        public StateStore getStateStore(final String name) {
+            return null;
+        }
+
+        @Override
+        public void schedule(final long interval) {
+
+        }
+
+        @Override
+        public <K, V> void forward(final K key, final V value) {
+
+        }
+
+        @Override
+        public <K, V> void forward(final K key, final V value, final int childIndex)
{
+
+        }
+
+        @Override
+        public <K, V> void forward(final K key, final V value, final String childName)
{
+
+        }
+
+        @Override
+        public void commit() {
+
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/d95f22c1/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerStub.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerStub.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerStub.java
new file mode 100644
index 0000000..3f48059
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerStub.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.streams.processor.StateRestoreCallback;
+import org.apache.kafka.streams.processor.StateStore;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Map;
+
+public class StateManagerStub implements StateManager {
+    @Override
+    public File baseDir() {
+        return null;
+    }
+
+    @Override
+    public void register(final StateStore store, final boolean loggingEnabled, final StateRestoreCallback
stateRestoreCallback) {
+
+    }
+
+    @Override
+    public void flush(final InternalProcessorContext context) {
+
+    }
+
+    @Override
+    public void close(final Map<TopicPartition, Long> offsets) throws IOException {
+
+    }
+
+    @Override
+    public StateStore getGlobalStore(final String name) {
+        return null;
+    }
+
+    @Override
+    public StateStore getStore(final String name) {
+        return null;
+    }
+
+    @Override
+    public Map<TopicPartition, Long> checkpointedOffsets() {
+        return null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/d95f22c1/streams/src/test/java/org/apache/kafka/test/StreamsTestUtils.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/StreamsTestUtils.java b/streams/src/test/java/org/apache/kafka/test/StreamsTestUtils.java
index 73c1b63..a2f3efe 100644
--- a/streams/src/test/java/org/apache/kafka/test/StreamsTestUtils.java
+++ b/streams/src/test/java/org/apache/kafka/test/StreamsTestUtils.java
@@ -57,6 +57,13 @@ public class StreamsTestUtils {
                 new Properties());
     }
 
+    public static Properties minimalStreamsConfig() {
+        final Properties properties = new Properties();
+        properties.put(StreamsConfig.APPLICATION_ID_CONFIG, UUID.randomUUID().toString());
+        properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "anyserver:9092");
+        return properties;
+    }
+
 
     public static <K, V> List<KeyValue<K, V>> toList(final Iterator<KeyValue<K,
V>> iterator) {
         final List<KeyValue<K, V>> results = new ArrayList<>();


Mime
View raw message