kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: KAFKA-4648: Improve test coverage StreamTask
Date Wed, 08 Feb 2017 19:26:49 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk effa19c04 -> b62170370


KAFKA-4648: Improve test coverage StreamTask

Provide test coverage for exception paths in: `schedule()`, `closeTopology()`, and `punctuate()`

Author: Damian Guy <damian.guy@gmail.com>

Reviewers: Matthias J. Sax, Guozhang Wang

Closes #2451 from dguy/kafka-4640


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b6217037
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b6217037
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b6217037

Branch: refs/heads/trunk
Commit: b621703706118b383153da60937c1d31d518d5b8
Parents: effa19c
Author: Damian Guy <damian.guy@gmail.com>
Authored: Wed Feb 8 11:26:45 2017 -0800
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Wed Feb 8 11:26:45 2017 -0800

----------------------------------------------------------------------
 .../streams/processor/internals/StreamTask.java |   6 +
 .../processor/internals/StreamTaskTest.java     | 192 ++++++++++++-------
 .../apache/kafka/test/MockProcessorNode.java    |  21 +-
 .../org/apache/kafka/test/MockSourceNode.java   |   7 +
 4 files changed, 157 insertions(+), 69 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/b6217037/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 7375fb5..e0dc2dc 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -27,6 +27,7 @@ import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.StreamsMetrics;
 import org.apache.kafka.streams.errors.StreamsException;
+import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.TimestampExtractor;
 import org.apache.kafka.streams.state.internals.ThreadCache;
@@ -391,6 +392,11 @@ public class StreamTask extends AbstractTask implements Punctuator {
         return new ProcessorRecordContext(currRecord.timestamp, currRecord.offset(), currRecord.partition(),
currRecord.topic());
     }
 
+    // Visible for testing
+    ProcessorContext processorContext() {
+        return processorContext;
+    }
+
     /**
      * Produces a string representation containing useful information about a StreamTask.
      * This is useful in debugging scenarios.

http://git-wip-us.apache.org/repos/asf/kafka/blob/b6217037/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index f2ad3a2..15b1d25 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -43,6 +43,7 @@ import org.apache.kafka.streams.state.internals.ThreadCache;
 import org.apache.kafka.test.MockProcessorNode;
 import org.apache.kafka.test.MockSourceNode;
 import org.apache.kafka.test.MockTimestampExtractor;
+import org.apache.kafka.test.NoOpProcessorContext;
 import org.apache.kafka.test.NoOpRecordCollector;
 import org.apache.kafka.test.TestUtils;
 import org.junit.After;
@@ -59,6 +60,9 @@ import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.nullValue;
+import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
@@ -81,7 +85,7 @@ public class StreamTaskTest {
     private final MockProcessorNode<Integer, Integer>  processor = new MockProcessorNode<>(10L);
 
     private final ProcessorTopology topology = new ProcessorTopology(
-            Arrays.asList((ProcessorNode) source1, (ProcessorNode) source2, (ProcessorNode)
processor),
+            Arrays.<ProcessorNode>asList(source1, source2, processor),
             new HashMap<String, SourceNode>() {
                 {
                     put(topic1[0], source1);
@@ -92,9 +96,22 @@ public class StreamTaskTest {
             Collections.<StateStore>emptyList(),
             Collections.<String, String>emptyMap(),
             Collections.<StateStore>emptyList());
+    private final MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
+    private final MockProducer<byte[], byte[]> producer = new MockProducer<>(false,
bytesSerializer, bytesSerializer);
+    private final MockConsumer<byte[], byte[]> restoreStateConsumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
+    private final byte[] recordValue = intSerializer.serialize(null, 10);
+    private final byte[] recordKey = intSerializer.serialize(null, 1);
+    private final String applicationId = "applicationId";
+    private final Metrics metrics = new Metrics();
+    private final StreamsMetrics streamsMetrics = new MockStreamsMetrics(metrics);
+    private final TaskId taskId00 = new TaskId(0, 0);
+    private final MockTime time = new MockTime();
     private File baseDir;
     private StateDirectory stateDirectory;
-    private RecordCollectorImpl recordCollector;
+    private RecordCollectorImpl recordCollector = new RecordCollectorImpl(producer, "taskId");
+    private ThreadCache testCache =  new ThreadCache("testCache", 0, streamsMetrics);
+    private StreamsConfig config;
+    private StreamTask task;
 
     private StreamsConfig createConfig(final File baseDir) throws Exception {
         return new StreamsConfig(new Properties() {
@@ -108,36 +125,36 @@ public class StreamTaskTest {
         });
     }
 
-    private final MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
-    private final MockProducer<byte[], byte[]> producer = new MockProducer<>(false,
bytesSerializer, bytesSerializer);
-    private final MockConsumer<byte[], byte[]> restoreStateConsumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
 
-    private final byte[] recordValue = intSerializer.serialize(null, 10);
-    private final byte[] recordKey = intSerializer.serialize(null, 1);
 
 
     @Before
-    public void setup() {
+    public void setup() throws Exception {
         consumer.assign(Arrays.asList(partition1, partition2));
         source1.addChild(processor);
         source2.addChild(processor);
         baseDir = TestUtils.tempDirectory();
+        config = createConfig(baseDir);
         stateDirectory = new StateDirectory("applicationId", baseDir.getPath(), new MockTime());
+        task = new StreamTask(taskId00, applicationId, partitions, topology, consumer,
+                              restoreStateConsumer, config, streamsMetrics, stateDirectory,
null, time, recordCollector);
     }
 
     @After
     public void cleanup() {
+        if (task != null) {
+            try {
+                task.close();
+            } catch (Exception e) {
+                // ignore exceptions
+            }
+        }
         Utils.delete(baseDir);
     }
 
     @SuppressWarnings("unchecked")
     @Test
     public void testProcessOrder() throws Exception {
-        StreamsConfig config = createConfig(baseDir);
-        recordCollector = new RecordCollectorImpl(producer, "taskId");
-        StreamTask task = new StreamTask(new TaskId(0, 0), "applicationId", partitions, topology,
consumer,
-            restoreStateConsumer, config, new MockStreamsMetrics(new Metrics()), stateDirectory,
null, new MockTime(), recordCollector);
-
         task.addRecords(partition1, records(
                 new ConsumerRecord<>(partition1.topic(), partition1.partition(), 10,
0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
                 new ConsumerRecord<>(partition1.topic(), partition1.partition(), 20,
0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
@@ -173,18 +190,10 @@ public class StreamTaskTest {
         assertEquals(0, task.process());
         assertEquals(3, source1.numReceived);
         assertEquals(3, source2.numReceived);
-
-        task.close();
-
     }
 
     @Test
     public void testMetrics() throws Exception {
-        StreamsConfig config = createConfig(baseDir);
-        recordCollector = new RecordCollectorImpl(producer, "taskId");
-        Metrics metrics = new Metrics();
-        StreamTask task = new StreamTask(new TaskId(0, 0), "applicationId", partitions, topology,
consumer,
-            restoreStateConsumer, config, new MockStreamsMetrics(metrics), stateDirectory,
null, new MockTime(), recordCollector);
         String name = task.id().toString();
         String[] entities = {"all", name};
         String operation = "commit";
@@ -208,10 +217,6 @@ public class StreamTaskTest {
     @SuppressWarnings("unchecked")
     @Test
     public void testPauseResume() throws Exception {
-        StreamsConfig config = createConfig(baseDir);
-        StreamTask task = new StreamTask(new TaskId(1, 1), "applicationId", partitions, topology,
consumer,
-            restoreStateConsumer, config, new MockStreamsMetrics(new Metrics()), stateDirectory,
null, new MockTime(), recordCollector);
-
         task.addRecords(partition1, records(
                 new ConsumerRecord<>(partition1.topic(), partition1.partition(), 10,
0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
                 new ConsumerRecord<>(partition1.topic(), partition1.partition(), 20,
0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue)
@@ -260,18 +265,11 @@ public class StreamTaskTest {
         assertEquals(1, source2.numReceived);
 
         assertEquals(0, consumer.paused().size());
-
-        task.close();
-
     }
 
     @SuppressWarnings("unchecked")
     @Test
     public void testMaybePunctuate() throws Exception {
-        StreamsConfig config = createConfig(baseDir);
-        StreamTask task = new StreamTask(new TaskId(0, 0), "applicationId", partitions, topology,
consumer,
-            restoreStateConsumer, config, new MockStreamsMetrics(new Metrics()), stateDirectory,
null, new MockTime(), recordCollector);
-
         task.addRecords(partition1, records(
                 new ConsumerRecord<>(partition1.topic(), partition1.partition(), 20,
0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
                 new ConsumerRecord<>(partition1.topic(), partition1.partition(), 30,
0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
@@ -324,14 +322,11 @@ public class StreamTaskTest {
 
         processor.supplier.checkAndClearPunctuateResult(20L, 30L, 40L);
 
-        task.close();
-
     }
 
     @SuppressWarnings("unchecked")
     @Test
     public void shouldWrapKafkaExceptionsWithStreamsExceptionAndAddContext() throws Exception
{
-        final StreamsConfig config = createConfig(baseDir);
         final MockSourceNode processorNode = new MockSourceNode(topic1, intDeserializer,
intDeserializer) {
 
             @Override
@@ -343,7 +338,6 @@ public class StreamTaskTest {
         final List<ProcessorNode> processorNodes = Collections.<ProcessorNode>singletonList(processorNode);
         final Map<String, SourceNode> sourceNodes
                 = Collections.<String, SourceNode>singletonMap(topic1[0], processorNode);
-        final StreamsMetrics streamsMetrics = new MockStreamsMetrics(new Metrics());
         final ProcessorTopology topology = new ProcessorTopology(processorNodes,
                                                                  sourceNodes,
                                                                  Collections.<String,
SinkNode>emptyMap(),
@@ -351,14 +345,16 @@ public class StreamTaskTest {
                                                                  Collections.<String,
String>emptyMap(),
                                                                  Collections.<StateStore>emptyList());
 
-        final StreamTask streamTask = new StreamTask(new TaskId(0, 0), "applicationId", partitions,
-            topology, consumer, restoreStateConsumer, config, streamsMetrics, stateDirectory,
new ThreadCache("testCache", 0, streamsMetrics), new MockTime(), recordCollector);
+        task.close();
+
+        task  = new StreamTask(taskId00, applicationId, partitions,
+                                                     topology, consumer, restoreStateConsumer,
config, streamsMetrics, stateDirectory, testCache, time, recordCollector);
         final int offset = 20;
-        streamTask.addRecords(partition1, Collections.singletonList(
+        task.addRecords(partition1, Collections.singletonList(
                 new ConsumerRecord<>(partition1.topic(), partition1.partition(), offset,
0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue)));
 
         try {
-            streamTask.process();
+            task.process();
             fail("Should've thrown StreamsException");
         } catch (StreamsException e) {
             final String message = e.getMessage();
@@ -373,7 +369,6 @@ public class StreamTaskTest {
     @SuppressWarnings("unchecked")
     @Test
     public void shouldWrapKafkaExceptionsWithStreamsExceptionAndAddContextWhenPunctuating()
throws Exception {
-        final StreamsConfig config = createConfig(baseDir);
         final ProcessorNode punctuator = new ProcessorNode("test", new AbstractProcessor()
{
             @Override
             public void init(final ProcessorContext context) {
@@ -390,39 +385,21 @@ public class StreamTaskTest {
                 throw new KafkaException("KABOOM!");
             }
         }, Collections.<String>emptySet());
-
-        final List<ProcessorNode> processorNodes = Collections.singletonList(punctuator);
-
-
-        final ProcessorTopology topology = new ProcessorTopology(processorNodes,
-                                                                 Collections.<String,
SourceNode>emptyMap(),
-                                                                 Collections.<String,
SinkNode>emptyMap(),
-                                                                 Collections.<StateStore>emptyList(),
-                                                                 Collections.<String,
String>emptyMap(),
-                                                                 Collections.<StateStore>emptyList());
-        final StreamsMetrics streamsMetrics = new MockStreamsMetrics(new Metrics());
-        final StreamTask streamTask = new StreamTask(new TaskId(0, 0), "applicationId", partitions,
topology, consumer,
-                                                     restoreStateConsumer, config, streamsMetrics,
stateDirectory,
-                                                     new ThreadCache("testCache", 0, streamsMetrics),
new MockTime(), recordCollector);
+        punctuator.init(new NoOpProcessorContext());
 
         try {
-            streamTask.punctuate(punctuator, 1);
+            task.punctuate(punctuator, 1);
             fail("Should've thrown StreamsException");
         } catch (StreamsException e) {
             final String message = e.getMessage();
             assertTrue("message=" + message + " should contain processor", message.contains("processor=test"));
+            assertThat(((ProcessorContextImpl) task.processorContext()).currentNode(), nullValue());
         }
 
     }
 
     @Test
     public void shouldFlushRecordCollectorOnFlushState() throws Exception {
-        final ProcessorTopology topology = new ProcessorTopology(Collections.<ProcessorNode>emptyList(),
-                                                                 Collections.<String,
SourceNode>emptyMap(),
-                                                                 Collections.<String,
SinkNode>emptyMap(),
-                                                                 Collections.<StateStore>emptyList(),
-                                                                 Collections.<String,
String>emptyMap(),
-                                                                 Collections.<StateStore>emptyList());
         final AtomicBoolean flushed = new AtomicBoolean(false);
         final NoOpRecordCollector recordCollector = new NoOpRecordCollector() {
             @Override
@@ -431,12 +408,99 @@ public class StreamTaskTest {
             }
         };
         final StreamsMetrics streamsMetrics = new MockStreamsMetrics(new Metrics());
-        final StreamTask streamTask = new StreamTask(new TaskId(0, 0), "appId", partitions,
topology, consumer,
-            restoreStateConsumer, createConfig(baseDir), streamsMetrics, stateDirectory,
new ThreadCache("testCache", 0, streamsMetrics), new MockTime(), recordCollector);
+        final StreamTask streamTask = new StreamTask(taskId00, "appId", partitions, topology,
consumer,
+                                                     restoreStateConsumer, createConfig(baseDir),
streamsMetrics,
+                                                     stateDirectory, testCache, time, recordCollector);
         streamTask.flushState();
         assertTrue(flushed.get());
     }
 
+    @Test
+    public void shouldThrowIllegalStateExceptionIfCurrentNodeIsNotNullWhenPunctuateCalled()
throws Exception {
+        ((ProcessorContextImpl) task.processorContext()).setCurrentNode(processor);
+        try {
+            task.punctuate(processor, 10);
+            fail("Should throw illegal state exception as current node is not null");
+        } catch (final IllegalStateException e) {
+            // pass
+        }
+    }
+
+    @Test
+    public void shouldCallPunctuateOnPassedInProcessorNode() throws Exception {
+        task.punctuate(processor, 5);
+        assertThat(processor.punctuatedAt, equalTo(5L));
+        task.punctuate(processor, 10);
+        assertThat(processor.punctuatedAt, equalTo(10L));
+    }
+
+    @Test
+    public void shouldSetProcessorNodeOnContextBackToNullAfterSuccesfullPunctuate() throws
Exception {
+        task.punctuate(processor, 5);
+        assertThat(((ProcessorContextImpl) task.processorContext()).currentNode(), nullValue());
+    }
+
+    @Test(expected = IllegalStateException.class)
+    public void shouldThrowIllegalStateExceptionOnScheduleIfCurrentNodeIsNull() throws Exception
{
+        task.schedule(1);
+    }
+
+    @Test
+    public void shouldNotThrowIExceptionOnScheduleIfCurrentNodeIsNotNull() throws Exception
{
+        ((ProcessorContextImpl) task.processorContext()).setCurrentNode(processor);
+        task.schedule(1);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void shouldThrowExceptionIfAnyExceptionsRaisedDuringCloseTopology() throws Exception
{
+        task.close();
+        task = createTaskThatThrowsExceptionOnClose();
+        try {
+            task.closeTopology();
+            fail("should have thrown runtime exception");
+        } catch (RuntimeException e) {
+            // ok
+        }
+    }
+
+    @Test
+    public void shouldCloseAllProcessorNodesWhenExceptionsRaised() throws Exception {
+        task.close();
+        task = createTaskThatThrowsExceptionOnClose();
+        try {
+            task.closeTopology();
+        } catch (RuntimeException e) {
+            // expected
+        }
+        assertTrue(processor.closed);
+        assertTrue(source1.closed);
+        assertTrue(source2.closed);
+    }
+
+    @SuppressWarnings("unchecked")
+    private StreamTask createTaskThatThrowsExceptionOnClose() {
+        final MockSourceNode processorNode = new MockSourceNode(topic1, intDeserializer,
intDeserializer) {
+            @Override
+            public void close() {
+                throw new RuntimeException("KABOOM!");
+            }
+        };
+        final List<ProcessorNode> processorNodes = Arrays.asList(processorNode, processor,
source1, source2);
+        final Map<String, SourceNode> sourceNodes
+                = Collections.<String, SourceNode>singletonMap(topic1[0], processorNode);
+        final ProcessorTopology topology = new ProcessorTopology(processorNodes,
+                                                                 sourceNodes,
+                                                                 Collections.<String,
SinkNode>emptyMap(),
+                                                                 Collections.<StateStore>emptyList(),
+                                                                 Collections.<String,
String>emptyMap(),
+                                                                 Collections.<StateStore>emptyList());
+
+
+        return new StreamTask(taskId00, applicationId, partitions,
+                              topology, consumer, restoreStateConsumer, config, streamsMetrics,
stateDirectory, testCache, time, recordCollector);
+    }
+
     private Iterable<ConsumerRecord<byte[], byte[]>> records(ConsumerRecord<byte[],
byte[]>... recs) {
         return Arrays.asList(recs);
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/b6217037/streams/src/test/java/org/apache/kafka/test/MockProcessorNode.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/MockProcessorNode.java b/streams/src/test/java/org/apache/kafka/test/MockProcessorNode.java
index d4c8334..3ba3603 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockProcessorNode.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockProcessorNode.java
@@ -25,12 +25,12 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 public class MockProcessorNode<K, V> extends ProcessorNode<K, V> {
 
-    public static final String NAME = "MOCK-PROCESS-";
-    public static final AtomicInteger INDEX = new AtomicInteger(1);
-
-    public int numReceived = 0;
+    private static final String NAME = "MOCK-PROCESS-";
+    private static final AtomicInteger INDEX = new AtomicInteger(1);
 
     public final MockProcessorSupplier<K, V> supplier;
+    public boolean closed;
+    public long punctuatedAt;
     public boolean initialized;
 
     public MockProcessorNode(long scheduleInterval) {
@@ -51,7 +51,18 @@ public class MockProcessorNode<K, V> extends ProcessorNode<K,
V> {
 
     @Override
     public void process(K key, V value) {
-        this.numReceived++;
         processor().process(key, value);
     }
+
+    @Override
+    public void punctuate(final long timestamp) {
+        super.punctuate(timestamp);
+        this.punctuatedAt = timestamp;
+    }
+
+    @Override
+    public void close() {
+        super.close();
+        this.closed = true;
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/b6217037/streams/src/test/java/org/apache/kafka/test/MockSourceNode.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/MockSourceNode.java b/streams/src/test/java/org/apache/kafka/test/MockSourceNode.java
index 4e0d21a..f4f84fd 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockSourceNode.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockSourceNode.java
@@ -35,6 +35,7 @@ public class MockSourceNode<K, V> extends SourceNode<K, V> {
     public final ArrayList<K> keys = new ArrayList<>();
     public final ArrayList<V> values = new ArrayList<>();
     public boolean initialized;
+    public boolean closed;
 
     public MockSourceNode(String[] topics, Deserializer<K> keyDeserializer, Deserializer<V>
valDeserializer) {
         super(NAME + INDEX.getAndIncrement(), Arrays.asList(topics), keyDeserializer, valDeserializer);
@@ -52,4 +53,10 @@ public class MockSourceNode<K, V> extends SourceNode<K, V>
{
         super.init(context);
         initialized = true;
     }
+
+    @Override
+    public void close() {
+        super.close();
+        this.closed = true;
+    }
 }


Mime
View raw message