kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: KAFKA-4642: Improve test coverage of ProcessorStateManager
Date Thu, 02 Feb 2017 04:17:09 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 063d534c5 -> 3e85f131e


KAFKA-4642: Improve test coverage of ProcessorStateManager

Most of the exception paths weren't covered. Now they are.

Author: Damian Guy <damian.guy@gmail.com>

Reviewers: Eno Thereska, Guozhang Wang

Closes #2442 from dguy/KAFKA-4642


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/3e85f131
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/3e85f131
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/3e85f131

Branch: refs/heads/trunk
Commit: 3e85f131e6af5d2ef34499d0b2aca7afca85d79c
Parents: 063d534
Author: Damian Guy <damian.guy@gmail.com>
Authored: Wed Feb 1 20:17:06 2017 -0800
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Wed Feb 1 20:17:06 2017 -0800

----------------------------------------------------------------------
 .../internals/ProcessorStateManagerTest.java    | 237 ++++++++++++++++++-
 1 file changed, 231 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/3e85f131/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
index 602601a..ba27c53 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
@@ -24,10 +24,13 @@ import org.apache.kafka.clients.consumer.OffsetResetStrategy;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.errors.LockException;
+import org.apache.kafka.streams.errors.ProcessorStateException;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.state.StateSerdes;
@@ -42,24 +45,27 @@ import org.junit.Test;
 
 import java.io.File;
 import java.io.IOException;
+import java.nio.channels.FileChannel;
+import java.nio.channels.FileLock;
+import java.nio.file.StandardOpenOption;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.fail;
 
 public class ProcessorStateManagerTest {
 
-    private File baseDir;
-    private StateDirectory stateDirectory;
-
     public static class MockRestoreConsumer extends MockConsumer<byte[], byte[]> {
         private final Serializer<Integer> serializer = new IntegerSerializer();
 
@@ -196,6 +202,15 @@ public class ProcessorStateManagerTest {
     private final String nonPersistentStoreName = "nonPersistentStore";
     private final String persistentStoreTopicName = ProcessorStateManager.storeChangelogTopic(applicationId,
persistentStoreName);
     private final String nonPersistentStoreTopicName = ProcessorStateManager.storeChangelogTopic(applicationId,
nonPersistentStoreName);
+    private final String storeName = "mockStateStore";
+    private final String changelogTopic = ProcessorStateManager.storeChangelogTopic(applicationId,
storeName);
+    private final TopicPartition changelogTopicPartition = new TopicPartition(changelogTopic,
0);
+    private final TaskId taskId = new TaskId(0, 1);
+    private final MockRestoreConsumer restoreConsumer = new MockRestoreConsumer();
+    private final MockStateStoreSupplier.MockStateStore mockStateStore = new MockStateStoreSupplier.MockStateStore(storeName,
true);
+    private File baseDir;
+    private StateDirectory stateDirectory;
+
 
     @Before
     public void setup() {
@@ -498,14 +513,11 @@ public class ProcessorStateManagerTest {
 
     @Test
     public void shouldNotWriteCheckpointsIfAckeOffsetsIsNull() throws Exception {
-        final TaskId taskId = new TaskId(0, 1);
         final File checkpointFile = new File(stateDirectory.directoryForTask(taskId), ProcessorStateManager.CHECKPOINT_FILE_NAME);
         // write an empty checkpoint file
         final OffsetCheckpoint oldCheckpoint = new OffsetCheckpoint(checkpointFile);
         oldCheckpoint.write(Collections.<TopicPartition, Long>emptyMap());
 
-        final MockRestoreConsumer restoreConsumer = new MockRestoreConsumer();
-
         restoreConsumer.updatePartitions(persistentStoreTopicName, Utils.mkList(
                 new PartitionInfo(persistentStoreTopicName, 1, Node.noNode(), new Node[0],
new Node[0])
         ));
@@ -520,4 +532,217 @@ public class ProcessorStateManagerTest {
         assertFalse(checkpointFile.exists());
     }
 
+    @Test
+    public void shouldThrowLockExceptionIfFailedToLockStateDirectory() throws Exception {
+        final File taskDirectory = stateDirectory.directoryForTask(taskId);
+        final FileChannel channel = FileChannel.open(new File(taskDirectory,
+                                                              StateDirectory.LOCK_FILE_NAME).toPath(),
+                                                     StandardOpenOption.CREATE,
+                                                     StandardOpenOption.WRITE);
+        // take the lock on the task directory's lock file before the state manager is constructed
+        final FileLock lock = channel.lock();
+
+        try {
+            new ProcessorStateManager(taskId, noPartitions, restoreConsumer, false, stateDirectory,
Collections.<String, String>emptyMap());
+            fail("Should have thrown LockException");
+        } catch (final LockException e) {
+            // expected: the constructor must fail while this test still holds the lock file
+        } finally {
+            lock.release();
+            channel.close();
+        }
+    }
+
+    @Test
+    public void shouldThrowIllegalArgumentExceptionIfStoreNameIsSameAsCheckpointFileName()
throws Exception {
+        final ProcessorStateManager stateManager = new ProcessorStateManager(taskId,
+                                                                             noPartitions,
+                                                                             restoreConsumer,
+                                                                             false,
+                                                                             stateDirectory,
+                                                                             Collections.<String,
String>emptyMap());
+
+        try {
+            stateManager.register(new MockStateStoreSupplier.MockStateStore(ProcessorStateManager.CHECKPOINT_FILE_NAME,
true), true, null);
+            fail("should have thrown illegal argument exception when store name same as checkpoint
file");
+        } catch (final IllegalArgumentException e) {
+            // expected: register() must reject a store named after the checkpoint file
+        }
+    }
+
+    @Test
+    public void shouldThrowIllegalArgumentExceptionOnRegisterWhenStoreHasAlreadyBeenRegistered()
throws Exception {
+        final ProcessorStateManager stateManager = new ProcessorStateManager(taskId,
+                                                                             noPartitions,
+                                                                             restoreConsumer,
+                                                                             false,
+                                                                             stateDirectory,
+                                                                             Collections.<String,
String>emptyMap());
+        stateManager.register(mockStateStore, false, null);
+
+        try {
+            stateManager.register(mockStateStore, false, null);
+            fail("should have thrown illegal argument exception when store with same name
already registered");
+        } catch (final IllegalArgumentException e) {
+            // expected: the second register() call for the same store must be rejected
+        }
+        
+    }
+
+    @Test
+    public void shouldThrowStreamsExceptionWhenRestoreConsumerThrowsTimeoutException() throws
Exception {
+        final MockRestoreConsumer mockRestoreConsumer = new MockRestoreConsumer() {
+            @Override
+            public List<PartitionInfo> partitionsFor(final String topic) {
+                throw new TimeoutException("KABOOM!");
+            }
+        };
+        final ProcessorStateManager stateManager = new ProcessorStateManager(taskId,
+                                                                             noPartitions,
+                                                                             mockRestoreConsumer,
+                                                                             false,
+                                                                             stateDirectory,
+                                                                             Collections.singletonMap(storeName,
changelogTopic));
+        try {
+            stateManager.register(mockStateStore, false, null);
+            fail("should have thrown StreamsException due to timeout exception");
+        } catch (final StreamsException e) {
+            // expected: with partitionsFor() timing out, register() must fail with a StreamsException
+        }
+    }
+
+    @Test
+    public void shouldThrowStreamsExceptionWhenRestoreConsumerReturnsNullPartitions() throws
Exception {
+        final MockRestoreConsumer mockRestoreConsumer = new MockRestoreConsumer() {
+            @Override
+            public List<PartitionInfo> partitionsFor(final String topic) {
+                return null;
+            }
+        };
+        final ProcessorStateManager stateManager = new ProcessorStateManager(taskId,
+                                                                             noPartitions,
+                                                                             mockRestoreConsumer,
+                                                                             false,
+                                                                             stateDirectory,
+                                                                             Collections.singletonMap(storeName,
changelogTopic));
+        try {
+            stateManager.register(mockStateStore, false, null);
+            fail("should have thrown StreamsException due to null partitions");
+        } catch (final StreamsException e) {
+            // expected: with partitionsFor() returning null, register() must fail with a StreamsException
+        }
+    }
+
+    @Test
+    public void shouldThrowStreamsExceptionWhenPartitionForTopicNotFound() throws Exception
{
+        final MockRestoreConsumer mockRestoreConsumer = new MockRestoreConsumer() {
+            @Override
+            public List<PartitionInfo> partitionsFor(final String topic) {
+                return Collections.singletonList(new PartitionInfo(changelogTopic, 0, null,
null, null));
+            }
+        };
+        final ProcessorStateManager stateManager = new ProcessorStateManager(taskId,
+                                                                             Collections.singleton(new
TopicPartition(changelogTopic, 1)),
+                                                                             mockRestoreConsumer,
+                                                                             false,
+                                                                             stateDirectory,
+                                                                             Collections.singletonMap(storeName,
changelogTopic));
+
+        try {
+            stateManager.register(mockStateStore, false, null);
+            fail("should have thrown StreamsException due to partition for topic not found");
+        } catch (final StreamsException e) {
+            // expected: the consumer reports only partition 0 of the changelog topic, so register() must fail
+        }
+    }
+
+    @Test
+    public void shouldThrowIllegalStateExceptionWhenRestoringStateAndSubscriptionsNonEmpty()
throws Exception {
+        final MockRestoreConsumer mockRestoreConsumer = new MockRestoreConsumer() {
+            @Override
+            public List<PartitionInfo> partitionsFor(final String topic) {
+                return Collections.singletonList(new PartitionInfo(changelogTopic, 0, null,
null, null));
+            }
+        };
+        final ProcessorStateManager stateManager = new ProcessorStateManager(taskId,
+                                                                             Collections.singleton(changelogTopicPartition),
+                                                                             mockRestoreConsumer,
+                                                                             false,
+                                                                             stateDirectory,
+                                                                             Collections.singletonMap(storeName,
changelogTopic));
+
+        mockRestoreConsumer.subscribe(Collections.singleton("sometopic"));
+
+        try {
+            stateManager.register(mockStateStore, false, null);
+            fail("should throw IllegalStateException when restore consumer has non-empty
subscriptions");
+        } catch (final IllegalStateException e) {
+            // expected: register() must fail while the restore consumer is still subscribed to "sometopic"
+        }
+    }
+
+    @Test
+    public void shouldThrowIllegalStateExceptionWhenRestoreConsumerPositionGreaterThanEndOffset()
throws Exception {
+        final AtomicInteger position = new AtomicInteger(10);
+        final MockRestoreConsumer mockRestoreConsumer = new MockRestoreConsumer() {
+            @Override
+            public synchronized long position(final TopicPartition partition) {
+                // every call reports a larger position (10, 11, ...) so the end position keeps changing
+                return position.getAndIncrement();
+            }
+        };
+
+        mockRestoreConsumer.updatePartitions(changelogTopic, Collections.singletonList(new
PartitionInfo(changelogTopic, 0, null, null, null)));
+
+        final ProcessorStateManager stateManager = new ProcessorStateManager(taskId,
+                                                                             Collections.singleton(changelogTopicPartition),
+                                                                             mockRestoreConsumer,
+                                                                             false,
+                                                                             stateDirectory,
+                                                                             Collections.singletonMap(storeName,
changelogTopic));
+
+        stateManager.putOffsetLimit(changelogTopicPartition, 1);
+        // buffer a single record at offset 0, i.e. below the offset limit of 1 set above
+        mockRestoreConsumer.bufferRecord(new ConsumerRecord<>(changelogTopic, 0, 0,
1, 1));
+
+
+        try {
+            stateManager.register(mockStateStore, false, mockStateStore.stateRestoreCallback);
+            fail("should have thrown IllegalStateException as end offset has changed");
+        } catch (final IllegalStateException e) {
+            // expected: restoring against a consumer whose reported position keeps moving must fail
+        }
+
+    }
+
+    @Test
+    public void shouldThrowProcessorStateExceptionOnCloseIfStoreThrowsAnException() throws
Exception {
+        restoreConsumer.updatePartitions(changelogTopic, Collections.singletonList(new PartitionInfo(changelogTopic,
0, null, null, null)));
+
+        final ProcessorStateManager stateManager = new ProcessorStateManager(taskId,
+                                                                             Collections.singleton(changelogTopicPartition),
+                                                                             restoreConsumer,
+                                                                             false,
+                                                                             stateDirectory,
+                                                                             Collections.singletonMap(storeName,
changelogTopic));
+
+        final MockStateStoreSupplier.MockStateStore stateStore = new MockStateStoreSupplier.MockStateStore(storeName,
true) {
+            @Override
+            public void close() {
+                throw new RuntimeException("KABOOM!");
+            }
+        };
+        stateManager.putOffsetLimit(changelogTopicPartition, 1);
+        restoreConsumer.bufferRecord(new ConsumerRecord<>(changelogTopic, 0, 1, 1,
1));
+        stateManager.register(stateStore, false, stateStore.stateRestoreCallback);
+
+        try {
+            stateManager.close(Collections.<TopicPartition, Long>emptyMap());
+            fail("Should throw ProcessorStateException if store close throws exception");
+        } catch (final ProcessorStateException e) {
+            // expected: the store's close() throws, and the test requires a ProcessorStateException from close()
+        }
+    }
+
+
 }


Mime
View raw message