kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: KAFKA-4649: Improve test coverage GlobalStateManagerImpl
Date Fri, 03 Feb 2017 17:11:01 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk da082900b -> 47517da40


KAFKA-4649: Improve test coverage GlobalStateManagerImpl

Add coverage for exception paths in `initialize()`

Author: Damian Guy <damian.guy@gmail.com>

Reviewers: Eno Thereska, Matthias J. Sax, Guozhang Wang

Closes #2452 from dguy/kafka-4649


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/47517da4
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/47517da4
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/47517da4

Branch: refs/heads/trunk
Commit: 47517da40dcb37d91925c1888ce01d06c28ce4cc
Parents: da08290
Author: Damian Guy <damian.guy@gmail.com>
Authored: Fri Feb 3 09:10:58 2017 -0800
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Fri Feb 3 09:10:58 2017 -0800

----------------------------------------------------------------------
 .../internals/GlobalStateManagerImplTest.java   | 45 +++++++++++++-------
 1 file changed, 30 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/47517da4/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
index db51cef..8d89690 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
@@ -68,6 +68,7 @@ public class GlobalStateManagerImplTest {
     private MockConsumer<byte[], byte[]> consumer;
     private File checkpointFile;
     private final TheStateRestoreCallback stateRestoreCallback = new TheStateRestoreCallback();
+    private ProcessorTopology topology;
 
     @Before
     public void before() throws IOException {
@@ -80,12 +81,12 @@ public class GlobalStateManagerImplTest {
         storeToProcessorNode.put(store1, new MockProcessorNode(-1));
         store2 = new NoOpReadOnlyStore("t2-store");
         storeToProcessorNode.put(store2, new MockProcessorNode(-1));
-        final ProcessorTopology topology = new ProcessorTopology(Collections.<ProcessorNode>emptyList(),
-                                                                 Collections.<String,
SourceNode>emptyMap(),
-                                                                 Collections.<String,
SinkNode>emptyMap(),
-                                                                 Collections.<StateStore>emptyList(),
-                                                                 storeToTopic,
-                                                                 Arrays.<StateStore>asList(store1,
store2));
+        topology = new ProcessorTopology(Collections.<ProcessorNode>emptyList(),
+                                         Collections.<String, SourceNode>emptyMap(),
+                                         Collections.<String, SinkNode>emptyMap(),
+                                         Collections.<StateStore>emptyList(),
+                                         storeToTopic,
+                                         Arrays.<StateStore>asList(store1, store2));
 
         context = new NoOpProcessorContext();
         stateDirPath = TestUtils.tempDirectory().getPath();
@@ -107,7 +108,7 @@ public class GlobalStateManagerImplTest {
     }
 
     @Test(expected = LockException.class)
-    public void shouldThrowStreamsExceptionIfCantGetLock() throws Exception {
+    public void shouldThrowLockExceptionIfCantGetLock() throws Exception {
         final StateDirectory stateDir = new StateDirectory("appId", stateDirPath);
         try {
             stateDir.lockGlobalState(1);
@@ -135,10 +136,7 @@ public class GlobalStateManagerImplTest {
 
     @Test(expected = StreamsException.class)
     public void shouldThrowStreamsExceptionIfFailedToReadCheckpointedOffsets() throws Exception
{
-        final File checkpointFile = new File(stateManager.baseDir(), ProcessorStateManager.CHECKPOINT_FILE_NAME);
-        try (final FileOutputStream stream = new FileOutputStream(checkpointFile)) {
-            stream.write("0\n1\nblah".getBytes());
-        }
+        writeCorruptCheckpoint();
         stateManager.initialize(context);
     }
 
@@ -361,10 +359,7 @@ public class GlobalStateManagerImplTest {
 
     @Test
     public void shouldReleaseLockIfExceptionWhenLoadingCheckpoints() throws Exception {
-        final File checkpointFile = new File(stateManager.baseDir(), ProcessorStateManager.CHECKPOINT_FILE_NAME);
-        try (final FileOutputStream stream = new FileOutputStream(checkpointFile)) {
-            stream.write("0\n1\nblah".getBytes());
-        }
+        writeCorruptCheckpoint();
         try {
             stateManager.initialize(context);
         } catch (StreamsException e) {
@@ -379,6 +374,26 @@ public class GlobalStateManagerImplTest {
         }
     }
 
+
+    @Test(expected = LockException.class)
+    public void shouldThrowLockExceptionIfIOExceptionCaughtWhenTryingToLockStateDir() throws
Exception {
+        stateManager = new GlobalStateManagerImpl(topology, consumer, new StateDirectory("appId",
stateDirPath) {
+            @Override
+            public boolean lockGlobalState(final int retry) throws IOException {
+                throw new IOException("KABOOM!");
+            }
+        });
+
+        stateManager.initialize(context);
+    }
+
+    private void writeCorruptCheckpoint() throws IOException {
+        final File checkpointFile = new File(stateManager.baseDir(), ProcessorStateManager.CHECKPOINT_FILE_NAME);
+        try (final FileOutputStream stream = new FileOutputStream(checkpointFile)) {
+            stream.write("0\n1\nfoo".getBytes());
+        }
+    }
+
     private void initializeConsumer(final long numRecords, final long startOffset, final
TopicPartition topicPartition) {
         final HashMap<TopicPartition, Long> startOffsets = new HashMap<>();
         startOffsets.put(topicPartition, 1L);


Mime
View raw message