kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ij...@apache.org
Subject kafka git commit: KAFKA-3942; Change IntegrationTestUtils.purgeLocalStreamsState to use java.io.tmpdir
Date Mon, 11 Jul 2016 08:50:34 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk e0eaa7f12 -> 383cec9cf


KAFKA-3942; Change IntegrationTestUtils.purgeLocalStreamsState to use java.io.tmpdir

It was previously only deleting files/folders where the path started with /tmp. Changed it
to delete from the value of the System Property `java.io.tmpdir`. Also changed the tests that
were creating State dirs under /tmp to just use `TestUtils.tempDirectory(..)`

Author: Damian Guy <damian.guy@gmail.com>

Reviewers: Guozhang Wang <wangguoz@gmail.com>, Ismael Juma <ismael@juma.me.uk>

Closes #1600 from dguy/kafka-3942


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/383cec9c
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/383cec9c
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/383cec9c

Branch: refs/heads/trunk
Commit: 383cec9cf38607cdfbda0256d3447d253dcdde7d
Parents: e0eaa7f
Author: Damian Guy <damian.guy@gmail.com>
Authored: Mon Jul 11 09:19:33 2016 +0100
Committer: Ismael Juma <ismael@juma.me.uk>
Committed: Mon Jul 11 09:19:33 2016 +0100

----------------------------------------------------------------------
 .../kafka/streams/integration/JoinIntegrationTest.java       | 5 -----
 .../kafka/streams/integration/WordCountIntegrationTest.java  | 8 ++------
 .../streams/integration/utils/IntegrationTestUtils.java      | 5 +++--
 .../test/java/org/apache/kafka/test/StreamsTestUtils.java    | 2 +-
 4 files changed, 6 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/383cec9c/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java
index 249e1ea..f99a142 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java
@@ -143,11 +143,6 @@ public class JoinIntegrationTest {
         streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
         streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
         streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-        // Explicitly place the state directory under /tmp so that we can remove it via
-        // `purgeLocalStreamsState` below.  Once Streams is updated to expose the effective
-        // StreamsConfig configuration (so we can retrieve whatever state directory Streams
came up
-        // with automatically) we don't need to set this anymore and can update `purgeLocalStreamsState`
-        // accordingly.
         streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG,
                                  TestUtils.tempDirectory().getPath());
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/383cec9c/streams/src/test/java/org/apache/kafka/streams/integration/WordCountIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/WordCountIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/WordCountIntegrationTest.java
index 9692cda..af51dca 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/WordCountIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/WordCountIntegrationTest.java
@@ -29,6 +29,7 @@ import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
 import org.apache.kafka.streams.kstream.ValueMapper;
+import org.apache.kafka.test.TestUtils;
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
 import org.junit.Test;
@@ -86,12 +87,7 @@ public class WordCountIntegrationTest {
         streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
         streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
         streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-        // Explicitly place the state directory under /tmp so that we can remove it via
-        // `purgeLocalStreamsState` below.  Once Streams is updated to expose the effective
-        // StreamsConfig configuration (so we can retrieve whatever state directory Streams
came up
-        // with automatically) we don't need to set this anymore and can update `purgeLocalStreamsState`
-        // accordingly.
-        streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, "/tmp/kstreams-word-count");
+        streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
 
         KStreamBuilder builder = new KStreamBuilder();
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/383cec9c/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
index 8ae55cc..1a1a561 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
@@ -115,12 +115,13 @@ public class IntegrationTestUtils {
      * @param streamsConfiguration Streams configuration settings
      */
     public static void purgeLocalStreamsState(Properties streamsConfiguration) throws IOException
{
+        final String tmpDir = TestUtils.IO_TMP_DIR.getPath();
         String path = streamsConfiguration.getProperty(StreamsConfig.STATE_DIR_CONFIG);
         if (path != null) {
             File node = Paths.get(path).normalize().toFile();
-            // Only purge state when it's under /tmp.  This is a safety net to prevent accidentally
+            // Only purge state when it's under java.io.tmpdir.  This is a safety net to
prevent accidentally
             // deleting important local directory trees.
-            if (node.getAbsolutePath().startsWith("/tmp")) {
+            if (node.getAbsolutePath().startsWith(tmpDir)) {
                 Utils.delete(new File(node.getAbsolutePath()));
             }
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/383cec9c/streams/src/test/java/org/apache/kafka/test/StreamsTestUtils.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/StreamsTestUtils.java b/streams/src/test/java/org/apache/kafka/test/StreamsTestUtils.java
index 5a7bfa7..2c8a55e 100644
--- a/streams/src/test/java/org/apache/kafka/test/StreamsTestUtils.java
+++ b/streams/src/test/java/org/apache/kafka/test/StreamsTestUtils.java
@@ -36,7 +36,7 @@ public class StreamsTestUtils {
         streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, keySerdeClassName);
         streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, valueSerdeClassName);
         streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-        streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, "/tmp/" + applicationId);
+        streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
         streamsConfiguration.putAll(additional);
         return streamsConfiguration;
 


Mime
View raw message