kafka-commits mailing list archives

From guozh...@apache.org
Subject kafka git commit: KAFKA-5989: resume consumption of tasks that have state stores but no changelogging
Date Thu, 05 Oct 2017 19:55:59 GMT
Repository: kafka
Updated Branches:
  refs/heads/1.0 1d026269e -> b2bb2c6e8


KAFKA-5989: resume consumption of tasks that have state stores but no changelogging

Stores where logging is disabled were never consumed, as the partitions were paused but never resumed.
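
For context, a condensed sketch of the lifecycle involved (illustrative only, not the Streams
source verbatim; variable names are placeholders and `consumer` stands for the stream thread's
KafkaConsumer):

    // 1. Partitions of newly created stateful tasks are paused, so no records
    //    are fetched while their state stores initialize:
    consumer.pause(newTaskPartitions);

    // 2. Before this fix, the only path back to resume() ran through changelog
    //    restoration: updateRestored(...) reported the partitions of tasks whose
    //    changelogs had caught up. A store built withLoggingDisabled() has no
    //    changelog, so its task's partitions never appeared there and stayed
    //    paused forever.

    // 3. After this fix, initializeNewTasks() additionally returns the partitions
    //    of tasks that have state stores but nothing to restore, and TaskManager
    //    resumes those too:
    consumer.resume(readyPartitions);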

Author: Damian Guy <damian.guy@gmail.com>

Reviewers: Guozhang Wang <wangguoz@gmail.com>

Closes #4025 from dguy/1.0


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b2bb2c6e
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b2bb2c6e
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b2bb2c6e

Branch: refs/heads/1.0
Commit: b2bb2c6e8f6d430bd84690f25c2c1a29bc1f3864
Parents: 1d02626
Author: Damian Guy <damian.guy@gmail.com>
Authored: Thu Oct 5 12:55:55 2017 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Thu Oct 5 12:55:55 2017 -0700

----------------------------------------------------------------------
 .../processor/internals/AssignedTasks.java      |  19 ++--
 .../processor/internals/TaskManager.java        |   4 +-
 .../integration/RestoreIntegrationTest.java     | 109 ++++++++++++++++++-
 .../processor/internals/AssignedTasksTest.java  |  14 ++-
 .../processor/internals/TaskManagerTest.java    |  25 ++++-
 5 files changed, 154 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/b2bb2c6e/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
index 4448a78..12c3f79 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
@@ -109,10 +109,12 @@ class AssignedTasks implements RestoringTasks {
     }
 
     /**
+     * @return partitions that are ready to be resumed
      * @throws IllegalStateException If store gets registered after initialized is already finished
      * @throws StreamsException if the store's change log does not contain the partition
      */
-    void initializeNewTasks() {
+    Set<TopicPartition> initializeNewTasks() {
+        final Set<TopicPartition> readyPartitions = new HashSet<>();
         if (!created.isEmpty()) {
             log.debug("Initializing {}s {}", taskTypeName, created.keySet());
         }
@@ -123,7 +125,7 @@ class AssignedTasks implements RestoringTasks {
                     log.debug("transitioning {} {} to restoring", taskTypeName, entry.getKey());
                     addToRestoring(entry.getValue());
                 } else {
-                    transitionToRunning(entry.getValue());
+                    transitionToRunning(entry.getValue(), readyPartitions);
                 }
                 it.remove();
             } catch (final LockException e) {
@@ -131,6 +133,7 @@ class AssignedTasks implements RestoringTasks {
                 log.trace("Could not create {} {} due to {}; will retry", taskTypeName, entry.getKey(),
e.getMessage());
             }
         }
+        return readyPartitions;
     }
 
     Set<TopicPartition> updateRestored(final Collection<TopicPartition> restored) {
@@ -144,8 +147,7 @@ class AssignedTasks implements RestoringTasks {
             final Map.Entry<TaskId, Task> entry = it.next();
             final Task task = entry.getValue();
             if (restoredPartitions.containsAll(task.changelogPartitions())) {
-                transitionToRunning(task);
-                resume.addAll(task.partitions());
+                transitionToRunning(task, resume);
                 it.remove();
             } else {
                 if (log.isTraceEnabled()) {
@@ -262,11 +264,11 @@ class AssignedTasks implements RestoringTasks {
                     suspended.remove(taskId);
                     throw e;
                 }
-                transitionToRunning(task);
+                transitionToRunning(task, new HashSet<TopicPartition>());
                 log.trace("resuming suspended {} {}", taskTypeName, task.id());
                 return true;
             } else {
-                log.trace("couldn't resume task {} assigned partitions {}, task partitions
{}", taskId, partitions, task.partitions());
+                log.warn("couldn't resume task {} assigned partitions {}, task partitions
{}", taskId, partitions, task.partitions());
             }
         }
         return false;
@@ -282,11 +284,14 @@ class AssignedTasks implements RestoringTasks {
         }
     }
 
-    private void transitionToRunning(final Task task) {
+    private void transitionToRunning(final Task task, final Set<TopicPartition> readyPartitions) {
         log.debug("transitioning {} {} to running", taskTypeName, task.id());
         running.put(task.id(), task);
         for (TopicPartition topicPartition : task.partitions()) {
             runningByPartition.put(topicPartition, task);
+            if (task.hasStateStores()) {
+                readyPartitions.add(topicPartition);
+            }
         }
         for (TopicPartition topicPartition : task.changelogPartitions()) {
             runningByPartition.put(topicPartition, task);

http://git-wip-us.apache.org/repos/asf/kafka/blob/b2bb2c6e/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index 652f4e4..5387425 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -257,11 +257,11 @@ class TaskManager {
      * @throws TaskMigratedException if another thread wrote to the changelog topic that is currently restored
      */
     boolean updateNewAndRestoringTasks() {
-        active.initializeNewTasks();
+        final Set<TopicPartition> resumed = active.initializeNewTasks();
         standby.initializeNewTasks();
 
         final Collection<TopicPartition> restored = changelogReader.restore(active);
-        final Set<TopicPartition> resumed = active.updateRestored(restored);
+        resumed.addAll(active.updateRestored(restored));
 
         if (!resumed.isEmpty()) {
             log.trace("resuming partitions {}", resumed);

http://git-wip-us.apache.org/repos/asf/kafka/blob/b2bb2c6e/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
index ae36ad8..19ddedf 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
@@ -30,15 +30,25 @@ import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.Consumed;
 import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.Topology;
 import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
 import org.apache.kafka.streams.kstream.ForeachAction;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.kstream.Reducer;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.ProcessorSupplier;
 import org.apache.kafka.streams.processor.StateRestoreListener;
+import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
 import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.StoreBuilder;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.streams.state.internals.KeyValueStoreBuilder;
 import org.apache.kafka.test.IntegrationTest;
 import org.apache.kafka.test.TestUtils;
 import org.junit.After;
@@ -71,6 +81,7 @@ public class RestoreIntegrationTest {
     public static final EmbeddedKafkaCluster CLUSTER =
             new EmbeddedKafkaCluster(NUM_BROKERS);
     private static final String INPUT_STREAM = "input-stream";
+    private static final String INPUT_STREAM_2 = "input-stream-2";
     private final int numberOfKeys = 10000;
     private KafkaStreams kafkaStreams;
     private String applicationId = "restore-test";
@@ -79,9 +90,10 @@ public class RestoreIntegrationTest {
     @BeforeClass
     public static void createTopics() throws InterruptedException {
         CLUSTER.createTopic(INPUT_STREAM, 2, 1);
+        CLUSTER.createTopic(INPUT_STREAM_2, 2, 1);
     }
 
-    private Properties props() {
+    private Properties props(final String applicationId) {
         Properties streamsConfiguration = new Properties();
         streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
         streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
@@ -119,7 +131,7 @@ public class RestoreIntegrationTest {
 
 
         final CountDownLatch startupLatch = new CountDownLatch(1);
-        kafkaStreams = new KafkaStreams(builder.build(), props());
+        kafkaStreams = new KafkaStreams(builder.build(), props(applicationId));
         kafkaStreams.setStateListener(new KafkaStreams.StateListener() {
             @Override
            public void onChange(final KafkaStreams.State newState, final KafkaStreams.State oldState) {
@@ -168,7 +180,7 @@ public class RestoreIntegrationTest {
                 }, Materialized.<Integer, Integer, KeyValueStore<Bytes, byte[]>>as("reduce-store").withLoggingDisabled());
 
         final CountDownLatch startupLatch = new CountDownLatch(1);
-        kafkaStreams = new KafkaStreams(builder.build(), props());
+        kafkaStreams = new KafkaStreams(builder.build(), props(applicationId));
         kafkaStreams.setStateListener(new KafkaStreams.StateListener() {
             @Override
            public void onChange(final KafkaStreams.State newState, final KafkaStreams.State oldState) {
@@ -182,6 +194,97 @@ public class RestoreIntegrationTest {
 
         assertTrue(startupLatch.await(30, TimeUnit.SECONDS));
     }
+
+    @Test
+    public void shouldProcessDataFromStoresWithLoggingDisabled() throws InterruptedException, ExecutionException {
+
+        IntegrationTestUtils.produceKeyValuesSynchronously(INPUT_STREAM_2,
+                                                           Arrays.asList(KeyValue.pair(1, 1),
+                                                                         KeyValue.pair(2, 2),
+                                                                         KeyValue.pair(3, 3)),
+                                                           TestUtils.producerConfig(CLUSTER.bootstrapServers(),
+                                                                                    IntegerSerializer.class,
+                                                                                    IntegerSerializer.class),
+                                                           CLUSTER.time);
+
+        final KeyValueBytesStoreSupplier lruMapSupplier = Stores.lruMap(INPUT_STREAM_2, 10);
+
+        final StoreBuilder<KeyValueStore<Integer, Integer>> storeBuilder = new KeyValueStoreBuilder<>(lruMapSupplier,
+                                                                                                      Serdes.Integer(),
+                                                                                                      Serdes.Integer(),
+                                                                                                      CLUSTER.time)
+                .withLoggingDisabled();
+
+        final StreamsBuilder streamsBuilder = new StreamsBuilder();
+
+        streamsBuilder.addStateStore(storeBuilder);
+
+        final KStream<Integer, Integer> stream = streamsBuilder.stream(INPUT_STREAM_2);
+        final CountDownLatch processorLatch = new CountDownLatch(3);
+        stream.process(new ProcessorSupplier<Integer, Integer>() {
+            @Override
+            public Processor<Integer, Integer> get() {
+                return new KeyValueStoreProcessor(INPUT_STREAM_2, processorLatch);
+            }
+        }, INPUT_STREAM_2);
+
+        final Topology topology = streamsBuilder.build();
+
+        kafkaStreams = new KafkaStreams(topology, props(applicationId + "-logging-disabled"));
+
+        final CountDownLatch latch = new CountDownLatch(1);
+        kafkaStreams.setStateListener(new KafkaStreams.StateListener() {
+            @Override
+            public void onChange(final KafkaStreams.State newState, final KafkaStreams.State oldState) {
+                if (newState == KafkaStreams.State.RUNNING && oldState == KafkaStreams.State.REBALANCING) {
+                    latch.countDown();
+                }
+            }
+        });
+        kafkaStreams.start();
+
+        latch.await(30, TimeUnit.SECONDS);
+
+        assertTrue(processorLatch.await(30, TimeUnit.SECONDS));
+
+    }
+
+
+    public static class KeyValueStoreProcessor implements Processor<Integer, Integer> {
+
+        private String topic;
+        private final CountDownLatch processorLatch;
+
+        private KeyValueStore<Integer, Integer> store;
+
+        public KeyValueStoreProcessor(final String topic, final CountDownLatch processorLatch) {
+            this.topic = topic;
+            this.processorLatch = processorLatch;
+        }
+
+        @Override
+        public void init(final ProcessorContext context) {
+            this.store = (KeyValueStore<Integer, Integer>) context.getStateStore(topic);
+        }
+
+        @Override
+        public void process(final Integer key, final Integer value) {
+            if (key != null) {
+                store.put(key, value);
+                processorLatch.countDown();
+            }
+        }
+
+        @Override
+        public void punctuate(final long timestamp) {
+
+        }
+
+        @Override
+        public void close() {
+
+        }
+    }
     
     private void createStateForRestoration()
             throws ExecutionException, InterruptedException {
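
The test above reaches for the internal KeyValueStoreBuilder class; for reference, an application
on the public 1.0 API would build an equivalent logging-disabled store through the Stores factory
(a sketch; the store name is arbitrary):

    final StoreBuilder<KeyValueStore<Integer, Integer>> storeBuilder =
            Stores.keyValueStoreBuilder(Stores.lruMap("my-lru-store", 10),
                                        Serdes.Integer(),
                                        Serdes.Integer())
                  .withLoggingDisabled();  // no changelog topic: the case this fix covers

    final StreamsBuilder builder = new StreamsBuilder();
    builder.addStateStore(storeBuilder);
    // A processor wired with builder.stream(...).process(..., "my-lru-store") can then
    // retrieve the store via context.getStateStore("my-lru-store").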

http://git-wip-us.apache.org/repos/asf/kafka/blob/b2bb2c6e/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedTasksTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedTasksTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedTasksTest.java
index 9d6aea1..a721936 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedTasksTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedTasksTest.java
@@ -105,19 +105,22 @@ public class AssignedTasksTest {
         EasyMock.expect(t1.partitions()).andReturn(Collections.singleton(tp1));
         EasyMock.expect(t1.changelogPartitions()).andReturn(Collections.<TopicPartition>emptySet());
         EasyMock.expect(t2.initialize()).andReturn(true);
-        EasyMock.expect(t2.partitions()).andReturn(Collections.singleton(tp2));
+        final Set<TopicPartition> t2partitions = Collections.singleton(tp2);
+        EasyMock.expect(t2.partitions()).andReturn(t2partitions);
         EasyMock.expect(t2.changelogPartitions()).andReturn(Collections.<TopicPartition>emptyList());
+        EasyMock.expect(t2.hasStateStores()).andReturn(true);
 
         EasyMock.replay(t1, t2);
 
         assignedTasks.addNewTask(t1);
         assignedTasks.addNewTask(t2);
 
-        assignedTasks.initializeNewTasks();
+        final Set<TopicPartition> readyPartitions = assignedTasks.initializeNewTasks();
 
         Collection<Task> restoring = assignedTasks.restoringTasks();
         assertThat(restoring.size(), equalTo(1));
         assertSame(restoring.iterator().next(), t1);
+        assertThat(readyPartitions, equalTo(t2partitions));
     }
 
     @Test
@@ -125,13 +128,15 @@ public class AssignedTasksTest {
         EasyMock.expect(t2.initialize()).andReturn(true);
         EasyMock.expect(t2.partitions()).andReturn(Collections.singleton(tp2));
         EasyMock.expect(t2.changelogPartitions()).andReturn(Collections.<TopicPartition>emptyList());
+        EasyMock.expect(t2.hasStateStores()).andReturn(false);
 
         EasyMock.replay(t2);
 
         assignedTasks.addNewTask(t2);
-        assignedTasks.initializeNewTasks();
+        final Set<TopicPartition> toResume = assignedTasks.initializeNewTasks();
 
         assertThat(assignedTasks.runningTaskIds(), equalTo(Collections.singleton(taskId2)));
+        assertThat(toResume, equalTo(Collections.<TopicPartition>emptySet()));
     }
 
     @Test
@@ -140,6 +145,7 @@ public class AssignedTasksTest {
         EasyMock.expect(t1.initialize()).andReturn(false);
         EasyMock.expect(t1.partitions()).andReturn(task1Partitions).anyTimes();
         EasyMock.expect(t1.changelogPartitions()).andReturn(Utils.mkSet(changeLog1, changeLog2)).anyTimes();
+        EasyMock.expect(t1.hasStateStores()).andReturn(true).anyTimes();
         EasyMock.replay(t1);
 
         addAndInitTask();
@@ -263,6 +269,7 @@ public class AssignedTasksTest {
         EasyMock.expect(t1.initialize()).andReturn(true);
         EasyMock.expect(t1.partitions()).andReturn(Collections.singleton(tp1));
         EasyMock.expect(t1.changelogPartitions()).andReturn(Collections.<TopicPartition>emptyList());
+        EasyMock.expect(t1.hasStateStores()).andReturn(false);
     }
 
     @Test
@@ -443,6 +450,7 @@ public class AssignedTasksTest {
 
     private void mockRunningTaskSuspension() {
         EasyMock.expect(t1.initialize()).andReturn(true);
+        EasyMock.expect(t1.hasStateStores()).andReturn(false).anyTimes();
         EasyMock.expect(t1.partitions()).andReturn(Collections.singleton(tp1)).anyTimes();
         EasyMock.expect(t1.changelogPartitions()).andReturn(Collections.<TopicPartition>emptyList()).anyTimes();
         t1.suspend();

http://git-wip-us.apache.org/repos/asf/kafka/blob/b2bb2c6e/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
index 7ee8fae..67dd6c0 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
@@ -32,6 +32,7 @@ import org.junit.runner.RunWith;
 
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 
@@ -293,7 +294,7 @@ public class TaskManagerTest {
 
     @Test
     public void shouldInitializeNewActiveTasks() {
-        active.initializeNewTasks();
+        EasyMock.expect(active.initializeNewTasks()).andReturn(new HashSet<TopicPartition>());
         EasyMock.expect(active.updateRestored(EasyMock.<Collection<TopicPartition>>anyObject())).
                 andReturn(Collections.<TopicPartition>emptySet());
         EasyMock.expectLastCall();
@@ -304,7 +305,8 @@ public class TaskManagerTest {
 
     @Test
     public void shouldInitializeNewStandbyTasks() {
-        standby.initializeNewTasks();
+        EasyMock.expect(standby.initializeNewTasks()).andReturn(new HashSet<TopicPartition>());
+        EasyMock.expect(active.initializeNewTasks()).andReturn(new HashSet<TopicPartition>());
         EasyMock.expect(active.updateRestored(EasyMock.<Collection<TopicPartition>>anyObject())).
                 andReturn(Collections.<TopicPartition>emptySet());
         EasyMock.expectLastCall();
@@ -316,6 +318,7 @@ public class TaskManagerTest {
 
     @Test
     public void shouldRestoreStateFromChangeLogReader() {
+        EasyMock.expect(active.initializeNewTasks()).andReturn(new HashSet<TopicPartition>());
         EasyMock.expect(changeLogReader.restore(active)).andReturn(taskId0Partitions);
         EasyMock.expect(active.updateRestored(taskId0Partitions)).
                 andReturn(Collections.<TopicPartition>emptySet());
@@ -327,6 +330,7 @@ public class TaskManagerTest {
 
     @Test
     public void shouldResumeRestoredPartitions() {
+        EasyMock.expect(active.initializeNewTasks()).andReturn(new HashSet<TopicPartition>());
         EasyMock.expect(changeLogReader.restore(active)).andReturn(taskId0Partitions);
         EasyMock.expect(active.updateRestored(taskId0Partitions)).
                 andReturn(taskId0Partitions);
@@ -350,6 +354,7 @@ public class TaskManagerTest {
 
     @Test
     public void shouldReturnFalseWhenThereAreStillNonRunningTasks() {
+        EasyMock.expect(active.initializeNewTasks()).andReturn(new HashSet<TopicPartition>());
         EasyMock.expect(active.allTasksRunning()).andReturn(false);
         EasyMock.expect(active.updateRestored(EasyMock.<Collection<TopicPartition>>anyObject())).
                 andReturn(Collections.<TopicPartition>emptySet());
@@ -449,8 +454,24 @@ public class TaskManagerTest {
         verify(active);
     }
 
+    @Test
+    public void shouldResumeConsumptionOfInitializedPartitions() {
+        final Set<TopicPartition> resumed = Collections.singleton(new TopicPartition("topic", 0));
+        EasyMock.expect(active.initializeNewTasks()).andReturn(resumed);
+        EasyMock.expect(active.updateRestored(EasyMock.<Collection<TopicPartition>>anyObject())).
+                andReturn(Collections.<TopicPartition>emptySet());
+        consumer.resume(resumed);
+        EasyMock.expectLastCall();
+
+        EasyMock.replay(active, consumer);
+
+        taskManager.updateNewAndRestoringTasks();
+        EasyMock.verify(consumer);
+    }
+
     private void mockAssignStandbyPartitions(final long offset) {
         final Task task = EasyMock.createNiceMock(Task.class);
+        EasyMock.expect(active.initializeNewTasks()).andReturn(new HashSet<TopicPartition>());
         EasyMock.expect(active.allTasksRunning()).andReturn(true);
         EasyMock.expect(active.updateRestored(EasyMock.<Collection<TopicPartition>>anyObject())).
                 andReturn(Collections.<TopicPartition>emptySet());

