kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: MINOR: set up temp directories properly in StreamTaskTest
Date Thu, 15 Oct 2015 18:45:05 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk a4dbf9010 -> 50a076d1e


MINOR: set up temp directories properly in StreamTaskTest

guozhangwang
StreamTaskTest did not set up a temp directory for each test. This occasionally caused interference
between tests through state directory locking.

Author: Yasuhiro Matsuda <yasuhiro@confluent.io>

Reviewers: Guozhang Wang

Closes #317 from ymatsuda/fix_StreamTaskTest


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/50a076d1
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/50a076d1
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/50a076d1

Branch: refs/heads/trunk
Commit: 50a076d1e9929d82107b3086f664a234a1c7b9f6
Parents: a4dbf90
Author: Yasuhiro Matsuda <yasuhiro@confluent.io>
Authored: Thu Oct 15 11:49:44 2015 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Thu Oct 15 11:49:44 2015 -0700

----------------------------------------------------------------------
 .../processor/internals/StreamTaskTest.java     | 216 ++++++++++---------
 1 file changed, 118 insertions(+), 98 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/50a076d1/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index f93093c..92b8684 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -27,11 +27,14 @@ import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.IntegerDeserializer;
 import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.StreamingConfig;
 import org.apache.kafka.test.MockSourceNode;
 import org.junit.Test;
 import org.junit.Before;
 
+import java.io.File;
+import java.nio.file.Files;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -61,22 +64,25 @@ public class StreamTaskTest {
             }
         });
 
-    private final StreamingConfig config = new StreamingConfig(new Properties() {
-        {
-            setProperty(StreamingConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
-            setProperty(StreamingConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
-            setProperty(StreamingConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
-            setProperty(StreamingConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
-            setProperty(StreamingConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, "org.apache.kafka.test.MockTimestampExtractor");
-            setProperty(StreamingConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:2171");
-            setProperty(StreamingConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, "3");
-        }
-    });
+    private StreamingConfig createConfig(final File baseDir) throws Exception {
+        return new StreamingConfig(new Properties() {
+            {
+                setProperty(StreamingConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
+                setProperty(StreamingConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
+                setProperty(StreamingConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
+                setProperty(StreamingConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
+                setProperty(StreamingConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, "org.apache.kafka.test.MockTimestampExtractor");
+                setProperty(StreamingConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:2171");
+                setProperty(StreamingConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, "3");
+                setProperty(StreamingConfig.STATE_DIR_CONFIG, baseDir.getCanonicalPath());
+            }
+        });
+    }
 
     private final MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
     private final MockProducer<byte[], byte[]> producer = new MockProducer<>(false,
bytesSerializer, bytesSerializer);
     private final MockConsumer<byte[], byte[]> restoreStateConsumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
- 
+
     private final byte[] recordValue = intSerializer.serialize(null, 10);
     private final byte[] recordKey = intSerializer.serialize(null, 1);
 
@@ -88,96 +94,110 @@ public class StreamTaskTest {
 
     @SuppressWarnings("unchecked")
     @Test
-    public void testProcessOrder() {
-        StreamTask task = new StreamTask(0, consumer, producer, restoreStateConsumer, partitions,
topology, config, null);
-
-        task.addRecords(partition1, records(
-            new ConsumerRecord<>(partition1.topic(), partition1.partition(), 10, recordKey,
recordValue),
-            new ConsumerRecord<>(partition1.topic(), partition1.partition(), 20, recordKey,
recordValue),
-            new ConsumerRecord<>(partition1.topic(), partition1.partition(), 30, recordKey,
recordValue)
-        ));
-
-        task.addRecords(partition2, records(
-            new ConsumerRecord<>(partition2.topic(), partition2.partition(), 25, recordKey,
recordValue),
-            new ConsumerRecord<>(partition2.topic(), partition2.partition(), 35, recordKey,
recordValue),
-            new ConsumerRecord<>(partition2.topic(), partition2.partition(), 45, recordKey,
recordValue)
-        ));
-
-        assertEquals(task.process(), 5);
-        assertEquals(source1.numReceived, 1);
-        assertEquals(source2.numReceived, 0);
-
-        assertEquals(task.process(), 4);
-        assertEquals(source1.numReceived, 1);
-        assertEquals(source2.numReceived, 1);
-
-        assertEquals(task.process(), 3);
-        assertEquals(source1.numReceived, 2);
-        assertEquals(source2.numReceived, 1);
-
-        assertEquals(task.process(), 2);
-        assertEquals(source1.numReceived, 3);
-        assertEquals(source2.numReceived, 1);
-
-        assertEquals(task.process(), 1);
-        assertEquals(source1.numReceived, 3);
-        assertEquals(source2.numReceived, 2);
-
-        assertEquals(task.process(), 0);
-        assertEquals(source1.numReceived, 3);
-        assertEquals(source2.numReceived, 3);
-
-        task.close();
+    public void testProcessOrder() throws Exception {
+        File baseDir = Files.createTempDirectory("test").toFile();
+        try {
+            StreamingConfig config = createConfig(baseDir);
+            StreamTask task = new StreamTask(0, consumer, producer, restoreStateConsumer,
partitions, topology, config, null);
+
+            task.addRecords(partition1, records(
+                    new ConsumerRecord<>(partition1.topic(), partition1.partition(),
10, recordKey, recordValue),
+                    new ConsumerRecord<>(partition1.topic(), partition1.partition(),
20, recordKey, recordValue),
+                    new ConsumerRecord<>(partition1.topic(), partition1.partition(),
30, recordKey, recordValue)
+            ));
+
+            task.addRecords(partition2, records(
+                    new ConsumerRecord<>(partition2.topic(), partition2.partition(),
25, recordKey, recordValue),
+                    new ConsumerRecord<>(partition2.topic(), partition2.partition(),
35, recordKey, recordValue),
+                    new ConsumerRecord<>(partition2.topic(), partition2.partition(),
45, recordKey, recordValue)
+            ));
+
+            assertEquals(task.process(), 5);
+            assertEquals(source1.numReceived, 1);
+            assertEquals(source2.numReceived, 0);
+
+            assertEquals(task.process(), 4);
+            assertEquals(source1.numReceived, 1);
+            assertEquals(source2.numReceived, 1);
+
+            assertEquals(task.process(), 3);
+            assertEquals(source1.numReceived, 2);
+            assertEquals(source2.numReceived, 1);
+
+            assertEquals(task.process(), 2);
+            assertEquals(source1.numReceived, 3);
+            assertEquals(source2.numReceived, 1);
+
+            assertEquals(task.process(), 1);
+            assertEquals(source1.numReceived, 3);
+            assertEquals(source2.numReceived, 2);
+
+            assertEquals(task.process(), 0);
+            assertEquals(source1.numReceived, 3);
+            assertEquals(source2.numReceived, 3);
+
+            task.close();
+
+        } finally {
+            Utils.delete(baseDir);
+        }
     }
 
     @SuppressWarnings("unchecked")
     @Test
-    public void testPauseResume() {
-        StreamTask task = new StreamTask(1, consumer, producer, restoreStateConsumer, partitions,
topology, config, null);
-
-        task.addRecords(partition1, records(
-            new ConsumerRecord<>(partition1.topic(), partition1.partition(), 10, recordKey,
recordValue),
-            new ConsumerRecord<>(partition1.topic(), partition1.partition(), 20, recordKey,
recordValue)
-        ));
-
-        task.addRecords(partition2, records(
-            new ConsumerRecord<>(partition2.topic(), partition2.partition(), 35, recordKey,
recordValue),
-            new ConsumerRecord<>(partition2.topic(), partition2.partition(), 45, recordKey,
recordValue),
-            new ConsumerRecord<>(partition2.topic(), partition2.partition(), 55, recordKey,
recordValue),
-            new ConsumerRecord<>(partition2.topic(), partition2.partition(), 65, recordKey,
recordValue)
-        ));
-
-        assertEquals(task.process(), 5);
-        assertEquals(source1.numReceived, 1);
-        assertEquals(source2.numReceived, 0);
-
-        assertEquals(consumer.paused().size(), 1);
-        assertTrue(consumer.paused().contains(partition2));
-
-        task.addRecords(partition1, records(
-            new ConsumerRecord<>(partition1.topic(), partition1.partition(), 30, recordKey,
recordValue),
-            new ConsumerRecord<>(partition1.topic(), partition1.partition(), 40, recordKey,
recordValue),
-            new ConsumerRecord<>(partition1.topic(), partition1.partition(), 50, recordKey,
recordValue)
-        ));
-
-        assertEquals(consumer.paused().size(), 2);
-        assertTrue(consumer.paused().contains(partition1));
-        assertTrue(consumer.paused().contains(partition2));
-
-        assertEquals(task.process(), 7);
-        assertEquals(source1.numReceived, 1);
-        assertEquals(source2.numReceived, 1);
-
-        assertEquals(consumer.paused().size(), 1);
-        assertTrue(consumer.paused().contains(partition1));
-
-        assertEquals(task.process(), 6);
-        assertEquals(source1.numReceived, 2);
-        assertEquals(source2.numReceived, 1);
-
-        assertEquals(consumer.paused().size(), 0);
-
-        task.close();
+    public void testPauseResume() throws Exception {
+        File baseDir = Files.createTempDirectory("test").toFile();
+        try {
+            StreamingConfig config = createConfig(baseDir);
+            StreamTask task = new StreamTask(1, consumer, producer, restoreStateConsumer,
partitions, topology, config, null);
+
+            task.addRecords(partition1, records(
+                    new ConsumerRecord<>(partition1.topic(), partition1.partition(),
10, recordKey, recordValue),
+                    new ConsumerRecord<>(partition1.topic(), partition1.partition(),
20, recordKey, recordValue)
+            ));
+
+            task.addRecords(partition2, records(
+                    new ConsumerRecord<>(partition2.topic(), partition2.partition(),
35, recordKey, recordValue),
+                    new ConsumerRecord<>(partition2.topic(), partition2.partition(),
45, recordKey, recordValue),
+                    new ConsumerRecord<>(partition2.topic(), partition2.partition(),
55, recordKey, recordValue),
+                    new ConsumerRecord<>(partition2.topic(), partition2.partition(),
65, recordKey, recordValue)
+            ));
+
+            assertEquals(task.process(), 5);
+            assertEquals(source1.numReceived, 1);
+            assertEquals(source2.numReceived, 0);
+
+            assertEquals(consumer.paused().size(), 1);
+            assertTrue(consumer.paused().contains(partition2));
+
+            task.addRecords(partition1, records(
+                    new ConsumerRecord<>(partition1.topic(), partition1.partition(),
30, recordKey, recordValue),
+                    new ConsumerRecord<>(partition1.topic(), partition1.partition(),
40, recordKey, recordValue),
+                    new ConsumerRecord<>(partition1.topic(), partition1.partition(),
50, recordKey, recordValue)
+            ));
+
+            assertEquals(consumer.paused().size(), 2);
+            assertTrue(consumer.paused().contains(partition1));
+            assertTrue(consumer.paused().contains(partition2));
+
+            assertEquals(task.process(), 7);
+            assertEquals(source1.numReceived, 1);
+            assertEquals(source2.numReceived, 1);
+
+            assertEquals(consumer.paused().size(), 1);
+            assertTrue(consumer.paused().contains(partition1));
+
+            assertEquals(task.process(), 6);
+            assertEquals(source1.numReceived, 2);
+            assertEquals(source2.numReceived, 1);
+
+            assertEquals(consumer.paused().size(), 0);
+
+            task.close();
+
+        } finally {
+            Utils.delete(baseDir);
+        }
     }
 
     private Iterable<ConsumerRecord<byte[], byte[]>> records(ConsumerRecord<byte[],
byte[]>... recs) {


Mime
View raw message