kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From damian...@apache.org
Subject [2/5] kafka git commit: KAFKA-5702; extract refactor StreamThread
Date Fri, 11 Aug 2017 11:14:08 GMT
http://git-wip-us.apache.org/repos/asf/kafka/blob/3e69ce80/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
index 5fc3807..e638a48 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
@@ -23,9 +23,6 @@ import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.config.ConfigException;
-import org.apache.kafka.common.metrics.Metrics;
-import org.apache.kafka.common.utils.MockTime;
-import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
@@ -35,6 +32,8 @@ import org.apache.kafka.streams.kstream.JoinWindows;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
 import org.apache.kafka.streams.kstream.ValueJoiner;
+import org.apache.kafka.streams.processor.DefaultPartitionGrouper;
+import org.apache.kafka.streams.processor.PartitionGrouper;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.internals.assignment.AssignmentInfo;
 import org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo;
@@ -44,9 +43,8 @@ import org.apache.kafka.test.MockInternalTopicManager;
 import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.MockStateStoreSupplier;
 import org.apache.kafka.test.MockTimestampExtractor;
-import org.apache.kafka.test.TestUtils;
+import org.easymock.EasyMock;
 import org.junit.Assert;
-import org.junit.Before;
 import org.junit.Test;
 
 import java.util.ArrayList;
@@ -105,18 +103,15 @@ public class StreamPartitionAssignorTest {
     private final TaskId task1 = new TaskId(0, 1);
     private final TaskId task2 = new TaskId(0, 2);
     private final TaskId task3 = new TaskId(0, 3);
-    private final String userEndPoint = "localhost:2171";
     private final StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor();
     private final MockClientSupplier mockClientSupplier = new MockClientSupplier();
     private final InternalTopologyBuilder builder = new InternalTopologyBuilder();
     private final StreamsConfig config = new StreamsConfig(configProps());
-    private final StateDirectory stateDirectory = new StateDirectory("appId", TestUtils.tempDirectory().getPath(),
new MockTime());
-    private final StreamThread mockStreamThread = new StreamThread(builder, config,
-                                                                   mockClientSupplier, "appID",
-                                                                   "clientId", UUID.randomUUID(),
-                                                                   new Metrics(), new MockTime(),
-                                                                   null, 1L, stateDirectory);
+    private final ThreadDataProvider threadDataProvider = EasyMock.createNiceMock(ThreadDataProvider.class);
     private final Map<String, Object> configurationMap = new HashMap<>();
+    private final DefaultPartitionGrouper defaultPartitionGrouper = new DefaultPartitionGrouper();
+    private final SingleGroupPartitionGrouperStub stubPartitionGrouper = new SingleGroupPartitionGrouperStub();
+    private final String userEndPoint = "localhost:8080";
 
     private Properties configProps() {
         return new Properties() {
@@ -129,13 +124,29 @@ public class StreamPartitionAssignorTest {
         };
     }
 
-    @Before
-    public void setUp() {
-        configurationMap.put(StreamsConfig.InternalConfig.STREAM_THREAD_INSTANCE, mockStreamThread);
-        configurationMap.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 0);
+    private void configurePartitionAssignor(final int standbyReplicas, final String endPoint)
{
+        configurationMap.put(StreamsConfig.InternalConfig.STREAM_THREAD_INSTANCE, threadDataProvider);
+        configurationMap.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, standbyReplicas);
+        configurationMap.put(StreamsConfig.APPLICATION_SERVER_CONFIG, endPoint);
         partitionAssignor.configure(configurationMap);
     }
 
+    private void mockThreadDataProvider(final Set<TaskId> prevTasks,
+                                        final Set<TaskId> cachedTasks,
+                                        final UUID processId,
+                                        final PartitionGrouper partitionGrouper,
+                                        final InternalTopologyBuilder builder) throws NoSuchFieldException,
IllegalAccessException {
+        EasyMock.expect(threadDataProvider.name()).andReturn("name").anyTimes();
+        EasyMock.expect(threadDataProvider.prevActiveTasks()).andReturn(prevTasks).anyTimes();
+        EasyMock.expect(threadDataProvider.cachedTasks()).andReturn(cachedTasks).anyTimes();
+        EasyMock.expect(threadDataProvider.config()).andReturn(config).anyTimes();
+        EasyMock.expect(threadDataProvider.builder()).andReturn(builder).anyTimes();
+        EasyMock.expect(threadDataProvider.processId()).andReturn(processId).anyTimes();
+        EasyMock.expect(threadDataProvider.partitionGrouper()).andReturn(partitionGrouper).anyTimes();
+        EasyMock.replay(threadDataProvider);
+    }
+
+
     @Test
     public void testSubscription() throws Exception {
         builder.addSource(null, "source1", null, null, null, "topic1");
@@ -148,34 +159,10 @@ public class StreamPartitionAssignorTest {
                 new TaskId(0, 1), new TaskId(1, 1), new TaskId(2, 1),
                 new TaskId(0, 2), new TaskId(1, 2), new TaskId(2, 2));
 
-        String clientId = "client-id";
-        UUID processId = UUID.randomUUID();
-        StreamThread thread = new StreamThread(
-            builder,
-            config,
-            new MockClientSupplier(),
-            "test",
-            clientId,
-            processId,
-            new Metrics(),
-            Time.SYSTEM,
-            new StreamsMetadataState(builder,
-                StreamsMetadataState.UNKNOWN_HOST),
-            0,
-            stateDirectory) {
-
-            @Override
-            public Set<TaskId> prevActiveTasks() {
-                return prevTasks;
-            }
-            @Override
-            public Set<TaskId> cachedTasks() {
-                return cachedTasks;
-            }
-        };
-
-        partitionAssignor.configure(config.getConsumerConfigs(thread, "test", clientId));
+        final UUID processId = UUID.randomUUID();
+        mockThreadDataProvider(prevTasks, cachedTasks, processId, stubPartitionGrouper, builder);
 
+        configurePartitionAssignor(0, null);
         PartitionAssignor.Subscription subscription = partitionAssignor.subscription(Utils.mkSet("topic1",
"topic2"));
 
         Collections.sort(subscription.topics());
@@ -188,6 +175,7 @@ public class StreamPartitionAssignorTest {
         assertEquals(info.encode(), subscription.userData());
     }
 
+
     @Test
     public void testAssignBasic() throws Exception {
         builder.addSource(null, "source1", null, null, null, "topic1");
@@ -205,24 +193,11 @@ public class StreamPartitionAssignorTest {
 
         UUID uuid1 = UUID.randomUUID();
         UUID uuid2 = UUID.randomUUID();
-        String client1 = "client1";
 
+        mockThreadDataProvider(prevTasks10, standbyTasks10, uuid1, stubPartitionGrouper,
builder);
+        configurePartitionAssignor(0, null);
 
-        StreamThread thread10 = new StreamThread(
-            builder,
-            config,
-            mockClientSupplier,
-            "test",
-            client1,
-            uuid1,
-            new Metrics(),
-            Time.SYSTEM,
-            new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST),
-            0,
-            stateDirectory);
-
-        partitionAssignor.configure(config.getConsumerConfigs(thread10, "test", client1));
-        partitionAssignor.setInternalTopicManager(new MockInternalTopicManager(thread10.config,
mockClientSupplier.restoreConsumer));
+        partitionAssignor.setInternalTopicManager(new MockInternalTopicManager(config, mockClientSupplier.restoreConsumer));
 
         Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>();
         subscriptions.put("consumer10",
@@ -281,23 +256,11 @@ public class StreamPartitionAssignorTest {
         Set<TaskId> allTasks = Utils.mkSet(task0, task1, task2);
 
         UUID uuid1 = UUID.randomUUID();
-        String client1 = "client1";
 
-        StreamThread thread10 = new StreamThread(
-            builder,
-            config,
-            mockClientSupplier,
-            "test",
-            client1,
-            uuid1,
-            new Metrics(),
-            Time.SYSTEM,
-            new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST),
-            0,
-            stateDirectory);
-
-        partitionAssignor.configure(config.getConsumerConfigs(thread10, "test", client1));
-        partitionAssignor.setInternalTopicManager(new MockInternalTopicManager(thread10.config,
mockClientSupplier.restoreConsumer));
+        mockThreadDataProvider(Collections.<TaskId>emptySet(), Collections.<TaskId>emptySet(),
uuid1, stubPartitionGrouper, builder);
+        configurePartitionAssignor(0, null);
+
+        partitionAssignor.setInternalTopicManager(new MockInternalTopicManager(config, mockClientSupplier.restoreConsumer));
         Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>();
         subscriptions.put("consumer10",
             new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, Collections.<TaskId>emptySet(),
Collections.<TaskId>emptySet(), userEndPoint).encode()));
@@ -331,22 +294,9 @@ public class StreamPartitionAssignorTest {
             Collections.<String>emptySet(),
             Collections.<String>emptySet());
         UUID uuid1 = UUID.randomUUID();
-        String client1 = "client1";
 
-        StreamThread thread10 = new StreamThread(
-            builder,
-            config,
-            new MockClientSupplier(),
-            "test",
-            client1,
-            uuid1,
-            new Metrics(),
-            Time.SYSTEM,
-            new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST),
-            0,
-            stateDirectory);
-
-        partitionAssignor.configure(config.getConsumerConfigs(thread10, "test", client1));
+        mockThreadDataProvider(prevTasks10, standbyTasks10, uuid1, stubPartitionGrouper,
builder);
+        configurePartitionAssignor(0, null);
 
         Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>();
         subscriptions.put("consumer10",
@@ -400,23 +350,9 @@ public class StreamPartitionAssignorTest {
 
         UUID uuid1 = UUID.randomUUID();
         UUID uuid2 = UUID.randomUUID();
-        String client1 = "client1";
-
-        StreamThread thread10 = new StreamThread(
-            builder,
-            config,
-            mockClientSupplier,
-            "test",
-            client1,
-            uuid1,
-            new Metrics(),
-            Time.SYSTEM,
-            new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST),
-            0,
-            stateDirectory);
-
-        partitionAssignor.configure(config.getConsumerConfigs(thread10, "test", client1));
-        partitionAssignor.setInternalTopicManager(new MockInternalTopicManager(thread10.config,
mockClientSupplier.restoreConsumer));
+        mockThreadDataProvider(prevTasks10, Collections.<TaskId>emptySet(), uuid1,
stubPartitionGrouper, builder);
+        configurePartitionAssignor(0, null);
+        partitionAssignor.setInternalTopicManager(new MockInternalTopicManager(config, mockClientSupplier.restoreConsumer));
 
         Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>();
         subscriptions.put("consumer10",
@@ -477,24 +413,14 @@ public class StreamPartitionAssignorTest {
 
         UUID uuid1 = UUID.randomUUID();
         UUID uuid2 = UUID.randomUUID();
-        String client1 = "client1";
 
+        mockThreadDataProvider(Collections.<TaskId>emptySet(),
+                               Collections.<TaskId>emptySet(),
+                               uuid1,
+                               defaultPartitionGrouper, builder);
+        configurePartitionAssignor(0, null);
 
-        StreamThread thread10 = new StreamThread(
-            builder,
-            config,
-            mockClientSupplier,
-            applicationId,
-            client1,
-            uuid1,
-            new Metrics(),
-            Time.SYSTEM,
-            new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST),
-            0,
-            stateDirectory);
-
-        partitionAssignor.configure(config.getConsumerConfigs(thread10, applicationId, client1));
-        partitionAssignor.setInternalTopicManager(new MockInternalTopicManager(thread10.config,
mockClientSupplier.restoreConsumer));
+        partitionAssignor.setInternalTopicManager(new MockInternalTopicManager(config, mockClientSupplier.restoreConsumer));
 
         Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>();
         subscriptions.put("consumer10",
@@ -526,7 +452,7 @@ public class StreamPartitionAssignorTest {
         assertEquals(new HashSet<>(tasks), allTasks);
 
         // check tasks for state topics
-        Map<Integer, InternalTopologyBuilder.TopicsInfo> topicGroups = thread10.builder.topicGroups();
+        Map<Integer, InternalTopologyBuilder.TopicsInfo> topicGroups = builder.topicGroups();
 
         assertEquals(Utils.mkSet(task00, task01, task02), tasksForState(applicationId, "store1",
tasks, topicGroups));
         assertEquals(Utils.mkSet(task10, task11, task12), tasksForState(applicationId, "store2",
tasks, topicGroups));
@@ -572,23 +498,11 @@ public class StreamPartitionAssignorTest {
 
         UUID uuid1 = UUID.randomUUID();
         UUID uuid2 = UUID.randomUUID();
-        String client1 = "client1";
 
-        StreamThread thread10 = new StreamThread(
-            builder,
-            config,
-            mockClientSupplier,
-            "test",
-            client1,
-            uuid1,
-            new Metrics(),
-            Time.SYSTEM,
-            new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST),
-            0,
-            stateDirectory);
-
-        partitionAssignor.configure(config.getConsumerConfigs(thread10, "test", client1));
-        partitionAssignor.setInternalTopicManager(new MockInternalTopicManager(thread10.config,
mockClientSupplier.restoreConsumer));
+        mockThreadDataProvider(prevTasks00, standbyTasks01, uuid1, defaultPartitionGrouper,
builder);
+
+        configurePartitionAssignor(1, null);
+        partitionAssignor.setInternalTopicManager(new MockInternalTopicManager(config, mockClientSupplier.restoreConsumer));
 
         Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>();
         subscriptions.put("consumer10",
@@ -642,22 +556,8 @@ public class StreamPartitionAssignorTest {
         builder.addProcessor("processor", new MockProcessorSupplier(), "source1", "source2");
 
         UUID uuid = UUID.randomUUID();
-        String client1 = "client1";
-
-        StreamThread thread = new StreamThread(
-            builder,
-            config,
-            mockClientSupplier,
-            "test",
-            client1,
-            uuid,
-            new Metrics(),
-            Time.SYSTEM,
-            new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST),
-            0,
-            stateDirectory);
-
-        partitionAssignor.configure(config.getConsumerConfigs(thread, "test", client1));
+        mockThreadDataProvider(Collections.<TaskId>emptySet(), Collections.<TaskId>emptySet(),
uuid, defaultPartitionGrouper, builder);
+        configurePartitionAssignor(0, null);
 
         List<TaskId> activeTaskList = Utils.mkList(task0, task3);
         Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>();
@@ -689,23 +589,9 @@ public class StreamPartitionAssignorTest {
         Set<TaskId> allTasks = Utils.mkSet(task0, task1, task2);
 
         UUID uuid1 = UUID.randomUUID();
-        String client1 = "client1";
-
-        StreamThread thread10 = new StreamThread(
-            builder,
-            config,
-            mockClientSupplier,
-            applicationId,
-            client1,
-            uuid1,
-            new Metrics(),
-            Time.SYSTEM,
-            new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST),
-            0,
-            stateDirectory);
-
-        partitionAssignor.configure(config.getConsumerConfigs(thread10, applicationId, client1));
-        MockInternalTopicManager internalTopicManager = new MockInternalTopicManager(thread10.config,
mockClientSupplier.restoreConsumer);
+        mockThreadDataProvider(Collections.<TaskId>emptySet(), Collections.<TaskId>emptySet(),
uuid1, defaultPartitionGrouper, builder);
+        configurePartitionAssignor(0, null);
+        MockInternalTopicManager internalTopicManager = new MockInternalTopicManager(config,
mockClientSupplier.restoreConsumer);
         partitionAssignor.setInternalTopicManager(internalTopicManager);
 
         Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>();
@@ -737,13 +623,10 @@ public class StreamPartitionAssignorTest {
         Set<TaskId> allTasks = Utils.mkSet(task0, task1, task2);
 
         UUID uuid1 = UUID.randomUUID();
-        String client1 = "client1";
+        mockThreadDataProvider(Collections.<TaskId>emptySet(), Collections.<TaskId>emptySet(),
uuid1, defaultPartitionGrouper, builder);
 
-        StreamThread thread10 = new StreamThread(builder, config, mockClientSupplier, applicationId,
client1, uuid1, new Metrics(), Time.SYSTEM, new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST),
-                                                 0, stateDirectory);
-
-        partitionAssignor.configure(config.getConsumerConfigs(thread10, applicationId, client1));
-        MockInternalTopicManager internalTopicManager = new MockInternalTopicManager(thread10.config,
mockClientSupplier.restoreConsumer);
+        configurePartitionAssignor(0, null);
+        MockInternalTopicManager internalTopicManager = new MockInternalTopicManager(config,
mockClientSupplier.restoreConsumer);
         partitionAssignor.setInternalTopicManager(internalTopicManager);
 
         Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>();
@@ -760,9 +643,6 @@ public class StreamPartitionAssignorTest {
 
     @Test
     public void shouldAddUserDefinedEndPointToSubscription() throws Exception {
-        final Properties properties = configProps();
-        properties.put(StreamsConfig.APPLICATION_SERVER_CONFIG, "localhost:8080");
-        final StreamsConfig config = new StreamsConfig(properties);
         final String applicationId = "application-id";
         builder.setApplicationId(applicationId);
         builder.addSource(null, "source", null, null, null, "input");
@@ -770,12 +650,11 @@ public class StreamPartitionAssignorTest {
         builder.addSink("sink", "output", null, null, null, "processor");
 
         final UUID uuid1 = UUID.randomUUID();
-        final String client1 = "client1";
-
-        final StreamThread streamThread = new StreamThread(builder, config, mockClientSupplier,
applicationId, client1, uuid1, new Metrics(), Time.SYSTEM, new StreamsMetadataState(builder,
StreamsMetadataState.UNKNOWN_HOST),
-                                                           0, stateDirectory);
-
-        partitionAssignor.configure(config.getConsumerConfigs(streamThread, applicationId,
client1));
+        mockThreadDataProvider(Collections.<TaskId>emptySet(),
+                               Collections.<TaskId>emptySet(),
+                               uuid1,
+                               defaultPartitionGrouper, builder);
+        configurePartitionAssignor(0, userEndPoint);
         final PartitionAssignor.Subscription subscription = partitionAssignor.subscription(Utils.mkSet("input"));
         final SubscriptionInfo subscriptionInfo = SubscriptionInfo.decode(subscription.userData());
         assertEquals("localhost:8080", subscriptionInfo.userEndPoint);
@@ -783,10 +662,6 @@ public class StreamPartitionAssignorTest {
 
     @Test
     public void shouldMapUserEndPointToTopicPartitions() throws Exception {
-        final Properties properties = configProps();
-        final String myEndPoint = "localhost:8080";
-        properties.put(StreamsConfig.APPLICATION_SERVER_CONFIG, myEndPoint);
-        final StreamsConfig config = new StreamsConfig(properties);
         final String applicationId = "application-id";
         builder.setApplicationId(applicationId);
         builder.addSource(null, "source", null, null, null, "topic1");
@@ -796,29 +671,15 @@ public class StreamPartitionAssignorTest {
         final List<String> topics = Utils.mkList("topic1");
 
         final UUID uuid1 = UUID.randomUUID();
-        final String client1 = "client1";
 
-        final StreamThread streamThread = new StreamThread(
-            builder,
-            config,
-            mockClientSupplier,
-            applicationId,
-            client1,
-            uuid1,
-            new Metrics(),
-            Time.SYSTEM,
-            new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST),
-            0,
-            stateDirectory);
-
-        final StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor();
-        partitionAssignor.configure(config.getConsumerConfigs(streamThread, applicationId,
client1));
-        partitionAssignor.setInternalTopicManager(new MockInternalTopicManager(streamThread.config,
mockClientSupplier.restoreConsumer));
+        mockThreadDataProvider(Collections.<TaskId>emptySet(), Collections.<TaskId>emptySet(),
uuid1, defaultPartitionGrouper, builder);
+        configurePartitionAssignor(0, userEndPoint);
+        partitionAssignor.setInternalTopicManager(new MockInternalTopicManager(config, mockClientSupplier.restoreConsumer));
 
         final Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>();
         final Set<TaskId> emptyTasks = Collections.emptySet();
         subscriptions.put("consumer1",
-                new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, emptyTasks,
emptyTasks, myEndPoint).encode()));
+                new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, emptyTasks,
emptyTasks, userEndPoint).encode()));
 
         final Map<String, PartitionAssignor.Assignment> assignments = partitionAssignor.assign(metadata,
subscriptions);
         final PartitionAssignor.Assignment consumerAssignment = assignments.get("consumer1");
@@ -831,32 +692,15 @@ public class StreamPartitionAssignorTest {
 
     @Test
     public void shouldThrowExceptionIfApplicationServerConfigIsNotHostPortPair() throws Exception
{
-        final Properties properties = configProps();
         final String myEndPoint = "localhost";
-        properties.put(StreamsConfig.APPLICATION_SERVER_CONFIG, myEndPoint);
-        final StreamsConfig config = new StreamsConfig(properties);
-        final UUID uuid1 = UUID.randomUUID();
-        final String client1 = "client1";
         final String applicationId = "application-id";
         builder.setApplicationId(applicationId);
 
-        final StreamThread streamThread = new StreamThread(
-            builder,
-            config,
-            mockClientSupplier,
-            applicationId,
-            client1,
-            uuid1,
-            new Metrics(),
-            Time.SYSTEM,
-            new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST),
-            0,
-            stateDirectory);
-
-        partitionAssignor.setInternalTopicManager(new MockInternalTopicManager(streamThread.config,
mockClientSupplier.restoreConsumer));
+        mockThreadDataProvider(Collections.<TaskId>emptySet(), Collections.<TaskId>emptySet(),
UUID.randomUUID(), defaultPartitionGrouper, builder);
+        partitionAssignor.setInternalTopicManager(new MockInternalTopicManager(config, mockClientSupplier.restoreConsumer));
 
         try {
-            partitionAssignor.configure(config.getConsumerConfigs(streamThread, applicationId,
client1));
+            configurePartitionAssignor(0, myEndPoint);
             Assert.fail("expected to an exception due to invalid config");
         } catch (ConfigException e) {
             // pass
@@ -865,30 +709,12 @@ public class StreamPartitionAssignorTest {
 
     @Test
     public void shouldThrowExceptionIfApplicationServerConfigPortIsNotAnInteger() throws
Exception {
-        final Properties properties = configProps();
         final String myEndPoint = "localhost:j87yhk";
-        properties.put(StreamsConfig.APPLICATION_SERVER_CONFIG, myEndPoint);
-        final StreamsConfig config = new StreamsConfig(properties);
-        final UUID uuid1 = UUID.randomUUID();
-        final String client1 = "client1";
         final String applicationId = "application-id";
         builder.setApplicationId(applicationId);
 
-        final StreamThread streamThread = new StreamThread(
-            builder,
-            config,
-            mockClientSupplier,
-            applicationId,
-            client1,
-            uuid1,
-            new Metrics(),
-            Time.SYSTEM,
-            new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST),
-            0,
-            stateDirectory);
-
         try {
-            partitionAssignor.configure(config.getConsumerConfigs(streamThread, applicationId,
client1));
+            configurePartitionAssignor(0, myEndPoint);
             Assert.fail("expected to an exception due to invalid config");
         } catch (ConfigException e) {
             // pass
@@ -904,6 +730,9 @@ public class StreamPartitionAssignorTest {
         AssignmentInfo assignmentInfo = new AssignmentInfo(Collections.singletonList(new
TaskId(0, 0)),
                 Collections.<TaskId, Set<TopicPartition>>emptyMap(),
                 hostState);
+        mockThreadDataProvider(Collections.<TaskId>emptySet(), Collections.<TaskId>emptySet(),
UUID.randomUUID(), defaultPartitionGrouper, builder);
+        configurePartitionAssignor(0, null);
+
         partitionAssignor.onAssignment(new PartitionAssignor.Assignment(topic, assignmentInfo.encode()));
         assertEquals(hostState, partitionAssignor.getPartitionsByHostState());
     }
@@ -919,6 +748,8 @@ public class StreamPartitionAssignorTest {
                 hostState);
 
 
+        mockThreadDataProvider(Collections.<TaskId>emptySet(), Collections.<TaskId>emptySet(),
UUID.randomUUID(), defaultPartitionGrouper, builder);
+        configurePartitionAssignor(0, null);
         partitionAssignor.onAssignment(new PartitionAssignor.Assignment(topic, assignmentInfo.encode()));
         final Cluster cluster = partitionAssignor.clusterMetadata();
         final List<PartitionInfo> partitionInfos = cluster.partitionsForTopic("topic");
@@ -999,22 +830,15 @@ public class StreamPartitionAssignorTest {
         final UUID uuid = UUID.randomUUID();
         final String client = "client1";
 
-        final StreamThread streamThread = new StreamThread(
-            internalTopologyBuilder,
-            config,
-            mockClientSupplier,
-            applicationId,
-            client,
-            uuid,
-            new Metrics(),
-            Time.SYSTEM,
-            new StreamsMetadataState(internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST),
-            0,
-            stateDirectory);
-
-        partitionAssignor.configure(config.getConsumerConfigs(streamThread, applicationId,
client));
+        mockThreadDataProvider(Collections.<TaskId>emptySet(),
+                               Collections.<TaskId>emptySet(),
+                               UUID.randomUUID(),
+                               defaultPartitionGrouper,
+                               internalTopologyBuilder);
+        configurePartitionAssignor(0, null);
+
         final MockInternalTopicManager mockInternalTopicManager = new MockInternalTopicManager(
-            streamThread.config,
+            config,
             mockClientSupplier.restoreConsumer);
         partitionAssignor.setInternalTopicManager(mockInternalTopicManager);
 
@@ -1027,6 +851,7 @@ public class StreamPartitionAssignorTest {
                 new SubscriptionInfo(uuid, emptyTasks, emptyTasks, userEndPoint).encode()
             )
         );
+
         final Map<String, PartitionAssignor.Assignment> assignment = partitionAssignor.assign(metadata,
subscriptions);
 
         final Map<String, Integer> expectedCreatedInternalTopics = new HashMap<>();
@@ -1042,7 +867,7 @@ public class StreamPartitionAssignorTest {
             new TopicPartition(applicationId + "-count-repartition", 1),
             new TopicPartition(applicationId + "-count-repartition", 2)
         );
-        assertThat(new HashSet(assignment.get(client).partitions()), equalTo(new HashSet(expectedAssignment)));
+        assertThat(new HashSet<>(assignment.get(client).partitions()), equalTo(new
HashSet<>(expectedAssignment)));
     }
 
     @Test
@@ -1056,6 +881,12 @@ public class StreamPartitionAssignorTest {
         secondHostState.put(new HostInfo("localhost", 9090), Utils.mkSet(partitionOne));
         secondHostState.put(new HostInfo("other", 9090), Utils.mkSet(partitionTwo));
 
+        mockThreadDataProvider(Collections.<TaskId>emptySet(),
+                               Collections.<TaskId>emptySet(),
+                               UUID.randomUUID(),
+                               defaultPartitionGrouper,
+                               builder);
+        configurePartitionAssignor(0, null);
         partitionAssignor.onAssignment(createAssignment(firstHostState));
         assertEquals(firstHostState, partitionAssignor.getPartitionsByHostState());
         partitionAssignor.onAssignment(createAssignment(secondHostState));
@@ -1072,6 +903,12 @@ public class StreamPartitionAssignorTest {
         final Map<HostInfo, Set<TopicPartition>> secondHostState = Collections.singletonMap(
                 new HostInfo("localhost", 9090), Utils.mkSet(topicOne, topicTwo));
 
+        mockThreadDataProvider(Collections.<TaskId>emptySet(),
+                               Collections.<TaskId>emptySet(),
+                               UUID.randomUUID(),
+                               defaultPartitionGrouper,
+                               builder);
+        configurePartitionAssignor(0, null);
         partitionAssignor.onAssignment(createAssignment(firstHostState));
         assertEquals(Utils.mkSet("topic"), partitionAssignor.clusterMetadata().topics());
         partitionAssignor.onAssignment(createAssignment(secondHostState));
@@ -1081,9 +918,6 @@ public class StreamPartitionAssignorTest {
     @Test
     public void shouldNotAddStandbyTaskPartitionsToPartitionsForHost() throws Exception {
         final String applicationId = "appId";
-        final Properties props = configProps();
-        props.setProperty(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, "1");
-        final StreamsConfig config = new StreamsConfig(props);
         final StreamsBuilder builder = new StreamsBuilder();
 
         final InternalTopologyBuilder internalTopologyBuilder = StreamsBuilderTest.internalTopologyBuilder(builder);
@@ -1092,24 +926,15 @@ public class StreamPartitionAssignorTest {
         builder.stream("topic1").groupByKey().count("count");
 
         final UUID uuid = UUID.randomUUID();
-        final String client = "client1";
+        mockThreadDataProvider(Collections.<TaskId>emptySet(),
+                               Collections.<TaskId>emptySet(),
+                               uuid,
+                               defaultPartitionGrouper,
+                               internalTopologyBuilder);
 
-        final StreamThread streamThread = new StreamThread(
-            internalTopologyBuilder,
-            config,
-            mockClientSupplier,
-            applicationId,
-            client,
-            uuid,
-            new Metrics(),
-            Time.SYSTEM,
-            new StreamsMetadataState(internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST),
-            0,
-            stateDirectory);
-
-        partitionAssignor.configure(config.getConsumerConfigs(streamThread, applicationId,
client));
+        configurePartitionAssignor(1, userEndPoint);
         partitionAssignor.setInternalTopicManager(new MockInternalTopicManager(
-            streamThread.config,
+            config,
             mockClientSupplier.restoreConsumer));
 
         final Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>();
@@ -1133,7 +958,7 @@ public class StreamPartitionAssignorTest {
         final Map<String, PartitionAssignor.Assignment> assign = partitionAssignor.assign(metadata,
subscriptions);
         final PartitionAssignor.Assignment consumer1Assignment = assign.get("consumer1");
         final AssignmentInfo assignmentInfo = AssignmentInfo.decode(consumer1Assignment.userData());
-        final Set<TopicPartition> consumer1partitions = assignmentInfo.partitionsByHost.get(new
HostInfo("localhost", 2171));
+        final Set<TopicPartition> consumer1partitions = assignmentInfo.partitionsByHost.get(new
HostInfo("localhost", 8080));
         final Set<TopicPartition> consumer2Partitions = assignmentInfo.partitionsByHost.get(new
HostInfo("other", 9090));
         final HashSet<TopicPartition> allAssignedPartitions = new HashSet<>(consumer1partitions);
         allAssignedPartitions.addAll(consumer2Partitions);
@@ -1148,7 +973,7 @@ public class StreamPartitionAssignorTest {
     }
 
     @Test(expected = KafkaException.class)
-    public void shouldThrowKafkaExceptionIfStreamThreadConfigIsNotStreamThreadInstance()
throws Exception {
+    public void shouldThrowKafkaExceptionIfStreamThreadConfigIsNotThreadDataProviderInstance()
throws Exception {
         final Map<String, Object> config = new HashMap<>();
         config.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
         config.put(StreamsConfig.InternalConfig.STREAM_THREAD_INSTANCE, "i am not a stream
thread");

http://git-wip-us.apache.org/repos/asf/kafka/blob/3e69ce80/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index f3222d0..648a15d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -16,8 +16,10 @@
  */
 package org.apache.kafka.streams.processor.internals;
 
+import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.MockConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.consumer.OffsetResetStrategy;
 import org.apache.kafka.clients.producer.MockProducer;
 import org.apache.kafka.common.KafkaException;
@@ -36,6 +38,7 @@ import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.StreamsMetrics;
+import org.apache.kafka.streams.errors.ProcessorStateException;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.Processor;
@@ -54,6 +57,7 @@ import org.apache.kafka.test.MockTimestampExtractor;
 import org.apache.kafka.test.NoOpProcessorContext;
 import org.apache.kafka.test.NoOpRecordCollector;
 import org.apache.kafka.test.TestUtils;
+import org.easymock.EasyMock;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -121,7 +125,6 @@ public class StreamTaskTest {
     private final MockTime time = new MockTime();
     private File baseDir = TestUtils.tempDirectory();
     private StateDirectory stateDirectory;
-    private final RecordCollectorImpl recordCollector = new RecordCollectorImpl(producer,
"taskId");
     private StreamsConfig config;
     private StreamsConfig eosConfig;
     private StreamTask task;
@@ -903,6 +906,93 @@ public class StreamTaskTest {
     }
 
     @SuppressWarnings("unchecked")
+    @Test
+    public void shouldNotViolateAtLeastOnceWhenExceptionOccursDuringFlushStateWhenCommitting()
{
+        final MockProducer producer = new MockProducer();
+        final Consumer<byte[], byte[]> consumer = EasyMock.createStrictMock(Consumer.class);
+        EasyMock.expect(consumer.committed(EasyMock.anyObject(TopicPartition.class)))
+                .andStubReturn(new OffsetAndMetadata(1L));
+        EasyMock.replay(consumer);
+        final StreamTask task = new StreamTask(taskId00, applicationId, partitions, topology,
consumer,
+                              changelogReader, eosConfig, streamsMetrics, stateDirectory,
null, time, producer) {
+
+            @Override
+            protected void flushState() {
+                throw new RuntimeException("KABOOM!");
+            }
+        };
+
+        try {
+            task.commit();
+            fail("should have thrown an exception");
+        } catch (Exception e) {
+            // all good
+        }
+        EasyMock.verify(consumer);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void shouldNotViolateAtLeastOnceWhenExceptionOccursDuringTaskSuspension() {
+        final MockProducer producer = new MockProducer();
+        final Consumer<byte[], byte[]> consumer = EasyMock.createStrictMock(Consumer.class);
+        EasyMock.expect(consumer.committed(EasyMock.anyObject(TopicPartition.class)))
+                .andStubReturn(new OffsetAndMetadata(1L));
+        EasyMock.replay(consumer);
+        MockSourceNode sourceNode = new MockSourceNode(topic1, intDeserializer, intDeserializer)
{
+            @Override
+            public void close() {
+                throw new RuntimeException("KABOOM!");
+            }
+        };
+
+        final ProcessorTopology topology = new ProcessorTopology(Collections.<ProcessorNode>singletonList(sourceNode),
+                                                                 Collections.<String,
SourceNode>singletonMap(topic1[0], sourceNode),
+                                                                 Collections.<String,
SinkNode>emptyMap(),
+                                                                 Collections.<StateStore>emptyList(),
+                                                                 Collections.<String,
String>emptyMap(),
+                                                                 Collections.<StateStore>emptyList());
+        final StreamTask task = new StreamTask(taskId00, applicationId, Utils.mkSet(partition1),
topology, consumer,
+                                               changelogReader, eosConfig, streamsMetrics,
stateDirectory, null, time, producer);
+
+
+        try {
+            task.suspend();
+            fail("should have thrown an exception");
+        } catch (Exception e) {
+            // all good
+        }
+        EasyMock.verify(consumer);
+    }
+
+    @Test
+    public void shouldCloseStateManagerIfFailureOnTaskClose() {
+        final AtomicBoolean stateManagerCloseCalled = new AtomicBoolean(false);
+        final StreamTask streamTask = new StreamTask(taskId00, applicationId, partitions,
topology, consumer,
+                                               changelogReader, eosConfig, streamsMetrics,
stateDirectory, null,
+                                                     time, new MockProducer<byte[], byte[]>())
{
+
+            @Override
+            void suspend(boolean val) {
+                throw new RuntimeException("KABOOM!");
+            }
+
+            @Override
+            void closeStateManager(final boolean writeCheckpoint) throws ProcessorStateException
{
+                stateManagerCloseCalled.set(true);
+            }
+        };
+
+        try {
+            streamTask.close(true);
+            fail("should have thrown an exception");
+        } catch (Exception e) {
+            // all good
+        }
+        assertTrue(stateManagerCloseCalled.get());
+    }
+
+    @SuppressWarnings("unchecked")
     private StreamTask createTaskThatThrowsExceptionOnClose() {
         final MockSourceNode processorNode = new MockSourceNode(topic1, intDeserializer,
intDeserializer) {
             @Override


Mime
View raw message