kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [1/2] kafka git commit: KAFKA-3812: fix state store directory locking in Kafka Streams
Date Tue, 19 Jul 2016 23:00:51 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk cfebfdfa5 -> 14934157d


http://git-wip-us.apache.org/repos/asf/kafka/blob/14934157/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index 6014c36..e3f00ff 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -35,11 +35,12 @@ import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.test.MockProcessorNode;
 import org.apache.kafka.test.MockSourceNode;
 import org.apache.kafka.test.MockTimestampExtractor;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
 import org.junit.Test;
 import org.junit.Before;
 
 import java.io.File;
-import java.nio.file.Files;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
@@ -74,6 +75,8 @@ public class StreamTaskTest {
             },
             Collections.<StateStoreSupplier>emptyList()
     );
+    private File baseDir;
+    private StateDirectory stateDirectory;
 
     private StreamsConfig createConfig(final File baseDir) throws Exception {
         return new StreamsConfig(new Properties() {
@@ -100,188 +103,181 @@ public class StreamTaskTest {
         consumer.assign(Arrays.asList(partition1, partition2));
         source1.addChild(processor);
         source2.addChild(processor);
+        baseDir = TestUtils.tempDirectory();
+        stateDirectory = new StateDirectory("applicationId", baseDir.getPath());
+    }
+
+    @After
+    public void cleanup() {
+        Utils.delete(baseDir);
     }
 
     @SuppressWarnings("unchecked")
     @Test
     public void testProcessOrder() throws Exception {
-        File baseDir = Files.createTempDirectory("test").toFile();
-        try {
-            StreamsConfig config = createConfig(baseDir);
-            StreamTask task = new StreamTask(new TaskId(0, 0), "applicationId", partitions, topology, consumer, producer, restoreStateConsumer, config, null);
-
-            task.addRecords(partition1, records(
-                    new ConsumerRecord<>(partition1.topic(), partition1.partition(), 10, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
-                    new ConsumerRecord<>(partition1.topic(), partition1.partition(), 20, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
-                    new ConsumerRecord<>(partition1.topic(), partition1.partition(), 30, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue)
-            ));
-
-            task.addRecords(partition2, records(
-                    new ConsumerRecord<>(partition2.topic(), partition2.partition(), 25, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
-                    new ConsumerRecord<>(partition2.topic(), partition2.partition(), 35, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
-                    new ConsumerRecord<>(partition2.topic(), partition2.partition(), 45, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue)
-            ));
-
-            assertEquals(5, task.process());
-            assertEquals(1, source1.numReceived);
-            assertEquals(0, source2.numReceived);
-
-            assertEquals(4, task.process());
-            assertEquals(2, source1.numReceived);
-            assertEquals(0, source2.numReceived);
-
-            assertEquals(3, task.process());
-            assertEquals(2, source1.numReceived);
-            assertEquals(1, source2.numReceived);
-
-            assertEquals(2, task.process());
-            assertEquals(3, source1.numReceived);
-            assertEquals(1, source2.numReceived);
-
-            assertEquals(1, task.process());
-            assertEquals(3, source1.numReceived);
-            assertEquals(2, source2.numReceived);
-
-            assertEquals(0, task.process());
-            assertEquals(3, source1.numReceived);
-            assertEquals(3, source2.numReceived);
-
-            task.close();
-
-        } finally {
-            Utils.delete(baseDir);
-        }
+        StreamsConfig config = createConfig(baseDir);
+        StreamTask task = new StreamTask(new TaskId(0, 0), "applicationId", partitions, topology, consumer, producer, restoreStateConsumer, config, null, stateDirectory);
+
+        task.addRecords(partition1, records(
+                new ConsumerRecord<>(partition1.topic(), partition1.partition(), 10, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
+                new ConsumerRecord<>(partition1.topic(), partition1.partition(), 20, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
+                new ConsumerRecord<>(partition1.topic(), partition1.partition(), 30, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue)
+        ));
+
+        task.addRecords(partition2, records(
+                new ConsumerRecord<>(partition2.topic(), partition2.partition(), 25, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
+                new ConsumerRecord<>(partition2.topic(), partition2.partition(), 35, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
+                new ConsumerRecord<>(partition2.topic(), partition2.partition(), 45, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue)
+        ));
+
+        assertEquals(5, task.process());
+        assertEquals(1, source1.numReceived);
+        assertEquals(0, source2.numReceived);
+
+        assertEquals(4, task.process());
+        assertEquals(2, source1.numReceived);
+        assertEquals(0, source2.numReceived);
+
+        assertEquals(3, task.process());
+        assertEquals(2, source1.numReceived);
+        assertEquals(1, source2.numReceived);
+
+        assertEquals(2, task.process());
+        assertEquals(3, source1.numReceived);
+        assertEquals(1, source2.numReceived);
+
+        assertEquals(1, task.process());
+        assertEquals(3, source1.numReceived);
+        assertEquals(2, source2.numReceived);
+
+        assertEquals(0, task.process());
+        assertEquals(3, source1.numReceived);
+        assertEquals(3, source2.numReceived);
+
+        task.close();
+
+
     }
 
     @SuppressWarnings("unchecked")
     @Test
     public void testPauseResume() throws Exception {
-        File baseDir = Files.createTempDirectory("test").toFile();
-        try {
-            StreamsConfig config = createConfig(baseDir);
-            StreamTask task = new StreamTask(new TaskId(1, 1), "applicationId", partitions, topology, consumer, producer, restoreStateConsumer, config, null);
+        StreamsConfig config = createConfig(baseDir);
+        StreamTask task = new StreamTask(new TaskId(1, 1), "applicationId", partitions, topology, consumer, producer, restoreStateConsumer, config, null, stateDirectory);
 
-            task.addRecords(partition1, records(
-                    new ConsumerRecord<>(partition1.topic(), partition1.partition(), 10, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
-                    new ConsumerRecord<>(partition1.topic(), partition1.partition(), 20, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue)
-            ));
+        task.addRecords(partition1, records(
+                new ConsumerRecord<>(partition1.topic(), partition1.partition(), 10, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
+                new ConsumerRecord<>(partition1.topic(), partition1.partition(), 20, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue)
+        ));
 
-            task.addRecords(partition2, records(
-                    new ConsumerRecord<>(partition2.topic(), partition2.partition(), 35, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
-                    new ConsumerRecord<>(partition2.topic(), partition2.partition(), 45, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
-                    new ConsumerRecord<>(partition2.topic(), partition2.partition(), 55, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
-                    new ConsumerRecord<>(partition2.topic(), partition2.partition(), 65, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue)
-            ));
+        task.addRecords(partition2, records(
+                new ConsumerRecord<>(partition2.topic(), partition2.partition(), 35, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
+                new ConsumerRecord<>(partition2.topic(), partition2.partition(), 45, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
+                new ConsumerRecord<>(partition2.topic(), partition2.partition(), 55, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
+                new ConsumerRecord<>(partition2.topic(), partition2.partition(), 65, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue)
+        ));
 
-            assertEquals(5, task.process());
-            assertEquals(1, source1.numReceived);
-            assertEquals(0, source2.numReceived);
+        assertEquals(5, task.process());
+        assertEquals(1, source1.numReceived);
+        assertEquals(0, source2.numReceived);
 
-            assertEquals(1, consumer.paused().size());
-            assertTrue(consumer.paused().contains(partition2));
+        assertEquals(1, consumer.paused().size());
+        assertTrue(consumer.paused().contains(partition2));
 
-            task.addRecords(partition1, records(
-                    new ConsumerRecord<>(partition1.topic(), partition1.partition(), 30, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
-                    new ConsumerRecord<>(partition1.topic(), partition1.partition(), 40, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
-                    new ConsumerRecord<>(partition1.topic(), partition1.partition(), 50, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue)
-            ));
+        task.addRecords(partition1, records(
+                new ConsumerRecord<>(partition1.topic(), partition1.partition(), 30, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
+                new ConsumerRecord<>(partition1.topic(), partition1.partition(), 40, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
+                new ConsumerRecord<>(partition1.topic(), partition1.partition(), 50, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue)
+        ));
 
-            assertEquals(2, consumer.paused().size());
-            assertTrue(consumer.paused().contains(partition1));
-            assertTrue(consumer.paused().contains(partition2));
+        assertEquals(2, consumer.paused().size());
+        assertTrue(consumer.paused().contains(partition1));
+        assertTrue(consumer.paused().contains(partition2));
 
-            assertEquals(7, task.process());
-            assertEquals(2, source1.numReceived);
-            assertEquals(0, source2.numReceived);
+        assertEquals(7, task.process());
+        assertEquals(2, source1.numReceived);
+        assertEquals(0, source2.numReceived);
 
-            assertEquals(1, consumer.paused().size());
-            assertTrue(consumer.paused().contains(partition2));
+        assertEquals(1, consumer.paused().size());
+        assertTrue(consumer.paused().contains(partition2));
 
-            assertEquals(6, task.process());
-            assertEquals(3, source1.numReceived);
-            assertEquals(0, source2.numReceived);
+        assertEquals(6, task.process());
+        assertEquals(3, source1.numReceived);
+        assertEquals(0, source2.numReceived);
 
-            assertEquals(1, consumer.paused().size());
-            assertTrue(consumer.paused().contains(partition2));
+        assertEquals(1, consumer.paused().size());
+        assertTrue(consumer.paused().contains(partition2));
 
-            assertEquals(5, task.process());
-            assertEquals(3, source1.numReceived);
-            assertEquals(1, source2.numReceived);
+        assertEquals(5, task.process());
+        assertEquals(3, source1.numReceived);
+        assertEquals(1, source2.numReceived);
 
-            assertEquals(0, consumer.paused().size());
+        assertEquals(0, consumer.paused().size());
 
-            task.close();
+        task.close();
 
-        } finally {
-            Utils.delete(baseDir);
-        }
     }
 
     @SuppressWarnings("unchecked")
     @Test
     public void testMaybePunctuate() throws Exception {
-        File baseDir = Files.createTempDirectory("test").toFile();
-        try {
-            StreamsConfig config = createConfig(baseDir);
-            StreamTask task = new StreamTask(new TaskId(0, 0), "applicationId", partitions, topology, consumer, producer, restoreStateConsumer, config, null);
+        StreamsConfig config = createConfig(baseDir);
+        StreamTask task = new StreamTask(new TaskId(0, 0), "applicationId", partitions, topology, consumer, producer, restoreStateConsumer, config, null, stateDirectory);
 
-            task.addRecords(partition1, records(
-                    new ConsumerRecord<>(partition1.topic(), partition1.partition(), 20, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
-                    new ConsumerRecord<>(partition1.topic(), partition1.partition(), 30, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
-                    new ConsumerRecord<>(partition1.topic(), partition1.partition(), 40, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue)
-            ));
+        task.addRecords(partition1, records(
+                new ConsumerRecord<>(partition1.topic(), partition1.partition(), 20, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
+                new ConsumerRecord<>(partition1.topic(), partition1.partition(), 30, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
+                new ConsumerRecord<>(partition1.topic(), partition1.partition(), 40, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue)
+        ));
 
-            task.addRecords(partition2, records(
-                    new ConsumerRecord<>(partition2.topic(), partition2.partition(), 25, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
-                    new ConsumerRecord<>(partition2.topic(), partition2.partition(), 35, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
-                    new ConsumerRecord<>(partition2.topic(), partition2.partition(), 45, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue)
-            ));
+        task.addRecords(partition2, records(
+                new ConsumerRecord<>(partition2.topic(), partition2.partition(), 25, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
+                new ConsumerRecord<>(partition2.topic(), partition2.partition(), 35, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
+                new ConsumerRecord<>(partition2.topic(), partition2.partition(), 45, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue)
+        ));
 
-            assertTrue(task.maybePunctuate());
+        assertTrue(task.maybePunctuate());
 
-            assertEquals(5, task.process());
-            assertEquals(1, source1.numReceived);
-            assertEquals(0, source2.numReceived);
+        assertEquals(5, task.process());
+        assertEquals(1, source1.numReceived);
+        assertEquals(0, source2.numReceived);
 
-            assertFalse(task.maybePunctuate());
+        assertFalse(task.maybePunctuate());
 
-            assertEquals(4, task.process());
-            assertEquals(1, source1.numReceived);
-            assertEquals(1, source2.numReceived);
+        assertEquals(4, task.process());
+        assertEquals(1, source1.numReceived);
+        assertEquals(1, source2.numReceived);
 
-            assertTrue(task.maybePunctuate());
+        assertTrue(task.maybePunctuate());
 
-            assertEquals(3, task.process());
-            assertEquals(2, source1.numReceived);
-            assertEquals(1, source2.numReceived);
+        assertEquals(3, task.process());
+        assertEquals(2, source1.numReceived);
+        assertEquals(1, source2.numReceived);
 
-            assertFalse(task.maybePunctuate());
+        assertFalse(task.maybePunctuate());
 
-            assertEquals(2, task.process());
-            assertEquals(2, source1.numReceived);
-            assertEquals(2, source2.numReceived);
+        assertEquals(2, task.process());
+        assertEquals(2, source1.numReceived);
+        assertEquals(2, source2.numReceived);
 
-            assertTrue(task.maybePunctuate());
+        assertTrue(task.maybePunctuate());
 
-            assertEquals(1, task.process());
-            assertEquals(3, source1.numReceived);
-            assertEquals(2, source2.numReceived);
+        assertEquals(1, task.process());
+        assertEquals(3, source1.numReceived);
+        assertEquals(2, source2.numReceived);
 
-            assertFalse(task.maybePunctuate());
+        assertFalse(task.maybePunctuate());
 
-            assertEquals(0, task.process());
-            assertEquals(3, source1.numReceived);
-            assertEquals(3, source2.numReceived);
+        assertEquals(0, task.process());
+        assertEquals(3, source1.numReceived);
+        assertEquals(3, source2.numReceived);
 
-            assertFalse(task.maybePunctuate());
+        assertFalse(task.maybePunctuate());
 
-            processor.supplier.checkAndClearPunctuateResult(20L, 30L, 40L);
+        processor.supplier.checkAndClearPunctuateResult(20L, 30L, 40L);
 
-            task.close();
+        task.close();
 
-        } finally {
-            Utils.delete(baseDir);
-        }
     }
 
     private Iterable<ConsumerRecord<byte[], byte[]>> records(ConsumerRecord<byte[], byte[]>... recs) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/14934157/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index 4ae31e4..cb3dee0 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -130,8 +130,9 @@ public class StreamThreadTest {
                               Consumer<byte[], byte[]> consumer,
                               Producer<byte[], byte[]> producer,
                               Consumer<byte[], byte[]> restoreConsumer,
-                              StreamsConfig config) {
-            super(id, applicationId, partitions, topology, consumer, producer, restoreConsumer, config, null);
+                              StreamsConfig config,
+                              StateDirectory stateDirectory) {
+            super(id, applicationId, partitions, topology, consumer, producer, restoreConsumer, config, null, stateDirectory);
         }
 
         @Override
@@ -161,7 +162,7 @@ public class StreamThreadTest {
             @Override
             protected StreamTask createStreamTask(TaskId id, Collection<TopicPartition> partitionsForTask) {
                 ProcessorTopology topology = builder.build("X", id.topicGroupId);
-                return new TestStreamTask(id, applicationId, partitionsForTask, topology, consumer, producer, restoreConsumer, config);
+                return new TestStreamTask(id, applicationId, partitionsForTask, topology, consumer, producer, restoreConsumer, config, stateDirectory);
             }
         };
 
@@ -285,7 +286,7 @@ public class StreamThreadTest {
                 @Override
                 protected StreamTask createStreamTask(TaskId id, Collection<TopicPartition> partitionsForTask) {
                     ProcessorTopology topology = builder.build("X", id.topicGroupId);
-                    return new TestStreamTask(id, applicationId, partitionsForTask, topology, consumer, producer, restoreConsumer, config);
+                    return new TestStreamTask(id, applicationId, partitionsForTask, topology, consumer, producer, restoreConsumer, config, stateDirectory);
                 }
             };
 
@@ -404,7 +405,7 @@ public class StreamThreadTest {
                 @Override
                 protected StreamTask createStreamTask(TaskId id, Collection<TopicPartition> partitionsForTask) {
                     ProcessorTopology topology = builder.build("X", id.topicGroupId);
-                    return new TestStreamTask(id, applicationId, partitionsForTask, topology, consumer, producer, restoreConsumer, config);
+                    return new TestStreamTask(id, applicationId, partitionsForTask, topology, consumer, producer, restoreConsumer, config, stateDirectory);
                 }
             };
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/14934157/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
index 4ddbc2a..d2d9668 100644
--- a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
@@ -36,6 +36,7 @@ import org.apache.kafka.streams.processor.TopologyBuilder;
 import org.apache.kafka.streams.processor.internals.ProcessorContextImpl;
 import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
 import org.apache.kafka.streams.processor.internals.ProcessorTopology;
+import org.apache.kafka.streams.processor.internals.StateDirectory;
 import org.apache.kafka.streams.processor.internals.StreamTask;
 import org.apache.kafka.streams.state.KeyValueStore;
 
@@ -185,7 +186,7 @@ public class ProcessorTopologyTestDriver {
                 public void recordLatency(Sensor sensor, long startNs, long endNs) {
                     // do nothing
                 }
-            });
+            }, new StateDirectory(applicationId, TestUtils.tempDirectory().getPath()));
     }
 
     /**


Mime
View raw message