kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: HOTFIX: fix compilation error due to merge of KAFKA-3812
Date Tue, 19 Jul 2016 23:54:55 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 14934157d -> 72c6b7d84


HOTFIX: fix compilation error due to merge of KAFKA-3812

Merge of KAFKA-3812 caused a compilation error in StreamThreadStateStoreProviderTest

Author: Damian Guy <damian.guy@gmail.com>

Reviewers: Guozhang Wang <wangguoz@gmail.com>

Closes #1641 from dguy/fix-compile-error


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/72c6b7d8
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/72c6b7d8
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/72c6b7d8

Branch: refs/heads/trunk
Commit: 72c6b7d84a6f1bdece804d7e42642b7505c4a4eb
Parents: 1493415
Author: Damian Guy <damian.guy@gmail.com>
Authored: Tue Jul 19 16:54:51 2016 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Tue Jul 19 16:54:51 2016 -0700

----------------------------------------------------------------------
 .../internals/StreamThreadStateStoreProviderTest.java     | 10 +++++++---
 1 file changed, 7 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/72c6b7d8/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
index 5ac57f8..c105790 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
@@ -25,6 +25,7 @@ import org.apache.kafka.streams.StreamsMetrics;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.TopologyBuilder;
 import org.apache.kafka.streams.processor.internals.ProcessorTopology;
+import org.apache.kafka.streams.processor.internals.StateDirectory;
 import org.apache.kafka.streams.processor.internals.StreamTask;
 import org.apache.kafka.streams.processor.internals.StreamThread;
 import org.apache.kafka.streams.state.QueryableStoreTypes;
@@ -37,7 +38,6 @@ import org.apache.kafka.test.TestUtils;
 import org.junit.Before;
 import org.junit.Test;
 
-import java.io.File;
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.Collections;
@@ -56,6 +56,7 @@ public class StreamThreadStateStoreProviderTest {
     private StreamTask taskOne;
     private StreamTask taskTwo;
     private StreamThreadStateStoreProvider provider;
+    private StateDirectory stateDirectory;
 
     @Before
     public void before() throws IOException {
@@ -76,8 +77,9 @@ public class StreamThreadStateStoreProviderTest {
         final String applicationId = "applicationId";
         properties.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
         properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+        final String stateConfigDir = TestUtils.tempDirectory().getPath();
         properties.put(StreamsConfig.STATE_DIR_CONFIG,
-                       TestUtils.tempDirectory(new File("/tmp").toPath(), "my-state").getPath());
+                stateConfigDir);
 
         final StreamsConfig streamsConfig = new StreamsConfig(properties);
         final MockClientSupplier clientSupplier = new MockClientSupplier();
@@ -86,6 +88,7 @@ public class StreamThreadStateStoreProviderTest {
 
         final ProcessorTopology topology = builder.build("X", null);
         final Map<TaskId, StreamTask> tasks = new HashMap<>();
+        stateDirectory = new StateDirectory(applicationId, stateConfigDir);
         taskOne = createStreamsTask(applicationId, streamsConfig, clientSupplier, topology,
                                     new TaskId(0, 0));
         tasks.put(new TaskId(0, 0),
@@ -105,6 +108,7 @@ public class StreamThreadStateStoreProviderTest {
             }
         };
         provider = new StreamThreadStateStoreProvider(thread);
+
     }
 
     @Test
@@ -157,7 +161,7 @@ public class StreamThreadStateStoreProviderTest {
                               clientSupplier.consumer,
                               clientSupplier.producer,
                               clientSupplier.restoreConsumer,
-                              streamsConfig, new TheStreamMetrics()) {
+                              streamsConfig, new TheStreamMetrics(), stateDirectory) {
             @Override
             protected void initializeOffsetLimits() {
 


Mime
View raw message