kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From damian...@apache.org
Subject kafka git commit: KAFKA-5958; Global stores access state restore listener
Date Thu, 28 Sep 2017 09:54:39 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 1444b7b59 -> e1543a5a8


KAFKA-5958; Global stores access state restore listener

Author: Bill Bejeck <bill@confluent.io>

Reviewers: Damian Guy <damian.guy@gmail.com>

Closes #3973 from bbejeck/KAFKA-5958_global_stores_access_state_restore_listener


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/e1543a5a
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/e1543a5a
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/e1543a5a

Branch: refs/heads/trunk
Commit: e1543a5a8ecbd9da6e39fb0952b1193450b3c931
Parents: 1444b7b
Author: Bill Bejeck <bill@confluent.io>
Authored: Thu Sep 28 10:54:38 2017 +0100
Committer: Damian Guy <damian.guy@gmail.com>
Committed: Thu Sep 28 10:54:38 2017 +0100

----------------------------------------------------------------------
 .../org/apache/kafka/streams/KafkaStreams.java  |  3 ++-
 .../internals/GlobalStateManagerImpl.java       | 17 +++++++++---
 .../processor/internals/GlobalStreamThread.java | 17 ++++++++----
 .../apache/kafka/streams/KafkaStreamsTest.java  |  4 +--
 .../internals/GlobalStateManagerImplTest.java   | 27 ++++++++++++++++++--
 .../internals/GlobalStreamThreadTest.java       |  8 ++++--
 .../kafka/test/ProcessorTopologyTestDriver.java |  6 ++++-
 7 files changed, 66 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/e1543a5a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index 5aec3c5..2f5ce4b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -613,7 +613,8 @@ public class KafkaStreams {
                                                         stateDirectory,
                                                         metrics,
                                                         Time.SYSTEM,
-                                                        globalThreadId);
+                                                        globalThreadId,
+                                                        delegatingStateRestoreListener);
             globalThreadState = globalStreamThread.state();
         }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/e1543a5a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
index d9205a0..d03425b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
@@ -27,6 +27,7 @@ import org.apache.kafka.streams.errors.ProcessorStateException;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.processor.BatchingStateRestoreCallback;
 import org.apache.kafka.streams.processor.StateRestoreCallback;
+import org.apache.kafka.streams.processor.StateRestoreListener;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
 import org.slf4j.Logger;
@@ -61,15 +62,18 @@ public class GlobalStateManagerImpl implements GlobalStateManager {
     private final OffsetCheckpoint checkpoint;
     private final Set<String> globalStoreNames = new HashSet<>();
     private final Map<TopicPartition, Long> checkpointableOffsets = new HashMap<>();
+    private final StateRestoreListener stateRestoreListener;
 
     public GlobalStateManagerImpl(final ProcessorTopology topology,
                                   final Consumer<byte[], byte[]> consumer,
-                                  final StateDirectory stateDirectory) {
+                                  final StateDirectory stateDirectory,
+                                  final StateRestoreListener stateRestoreListener) {
         this.topology = topology;
         this.consumer = consumer;
         this.stateDirectory = stateDirectory;
         this.baseDir = stateDirectory.globalStateDir();
         this.checkpoint = new OffsetCheckpoint(new File(this.baseDir, CHECKPOINT_FILE_NAME));
+        this.stateRestoreListener = stateRestoreListener;
     }
 
     @Override
@@ -135,7 +139,7 @@ public class GlobalStateManagerImpl implements GlobalStateManager {
         final List<TopicPartition> topicPartitions = topicPartitionsForStore(store);
         final Map<TopicPartition, Long> highWatermarks = consumer.endOffsets(topicPartitions);
         try {
-            restoreState(stateRestoreCallback, topicPartitions, highWatermarks);
+            restoreState(stateRestoreCallback, topicPartitions, highWatermarks, store.name());
             stores.put(store.name(), store);
         } finally {
             consumer.assign(Collections.<TopicPartition>emptyList());
@@ -159,7 +163,8 @@ public class GlobalStateManagerImpl implements GlobalStateManager {
 
     private void restoreState(final StateRestoreCallback stateRestoreCallback,
                               final List<TopicPartition> topicPartitions,
-                              final Map<TopicPartition, Long> highWatermarks) {
+                              final Map<TopicPartition, Long> highWatermarks,
+                              final String storeName) {
         for (final TopicPartition topicPartition : topicPartitions) {
             consumer.assign(Collections.singletonList(topicPartition));
             final Long checkpoint = checkpointableOffsets.get(topicPartition);
@@ -178,6 +183,9 @@ public class GlobalStateManagerImpl implements GlobalStateManager {
                                                 ? stateRestoreCallback
                                                 : new WrappedBatchingStateRestoreCallback(stateRestoreCallback));
 
+            stateRestoreListener.onRestoreStart(topicPartition, storeName, offset, highWatermark);
+            long restoreCount = 0L;
+
             while (offset < highWatermark) {
                 final ConsumerRecords<byte[], byte[]> records = consumer.poll(100);
                 final List<KeyValue<byte[], byte[]>> restoreRecords = new ArrayList<>();
@@ -188,7 +196,10 @@ public class GlobalStateManagerImpl implements GlobalStateManager {
                     }
                 }
                 stateRestoreAdapter.restoreAll(restoreRecords);
+                stateRestoreListener.onBatchRestored(topicPartition, storeName, offset, restoreRecords.size());
+                restoreCount += restoreRecords.size();
             }
+            stateRestoreListener.onRestoreEnd(topicPartition, storeName, restoreCount);
             checkpointableOffsets.put(topicPartition, offset);
         }
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/e1543a5a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java
index 41ebcca..a365add 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java
@@ -27,15 +27,16 @@ import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.StreamsMetrics;
 import org.apache.kafka.streams.errors.StreamsException;
+import org.apache.kafka.streams.processor.StateRestoreListener;
 import org.apache.kafka.streams.state.internals.ThreadCache;
 import org.slf4j.Logger;
 
 import java.io.IOException;
-import java.util.Map;
-import java.util.Set;
-import java.util.HashSet;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
 
 import static org.apache.kafka.streams.processor.internals.GlobalStreamThread.State.DEAD;
 import static org.apache.kafka.streams.processor.internals.GlobalStreamThread.State.PENDING_SHUTDOWN;
@@ -113,6 +114,7 @@ public class GlobalStreamThread extends Thread {
     private final Object stateLock = new Object();
     private StreamThread.StateListener stateListener = null;
     private final String logPrefix;
+    private final StateRestoreListener stateRestoreListener;
 
     /**
     * Set the {@link StreamThread.StateListener} to be notified when state changes. Note this API is internal to
@@ -175,7 +177,8 @@ public class GlobalStreamThread extends Thread {
                               final StateDirectory stateDirectory,
                               final Metrics metrics,
                               final Time time,
-                              final String threadClientId) {
+                              final String threadClientId,
+                              final StateRestoreListener stateRestoreListener) {
         super(threadClientId);
         this.time = time;
         this.config = config;
@@ -189,6 +192,7 @@ public class GlobalStreamThread extends Thread {
         this.logContext = new LogContext(logPrefix);
         this.log = logContext.logger(getClass());
         this.cache = new ThreadCache(logContext, cacheSizeBytes, streamsMetrics);
+        this.stateRestoreListener = stateRestoreListener;
 
     }
 
@@ -294,7 +298,10 @@ public class GlobalStreamThread extends Thread {
 
     private StateConsumer initialize() {
         try {
-            final GlobalStateManager stateMgr = new GlobalStateManagerImpl(topology, consumer, stateDirectory);
+            final GlobalStateManager stateMgr = new GlobalStateManagerImpl(topology,
+                                                                           consumer,
+                                                                           stateDirectory,
+                                                                           stateRestoreListener);
             final StateConsumer stateConsumer
                     = new StateConsumer(this.logContext,
                                         consumer,

http://git-wip-us.apache.org/repos/asf/kafka/blob/e1543a5a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
index f1ae6da..4bd2890 100644
--- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
@@ -28,9 +28,9 @@ import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
 import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
 import org.apache.kafka.streams.kstream.ForeachAction;
 import org.apache.kafka.streams.processor.StreamPartitioner;
+import org.apache.kafka.streams.processor.ThreadMetadata;
 import org.apache.kafka.streams.processor.internals.GlobalStreamThread;
 import org.apache.kafka.streams.processor.internals.StreamThread;
-import org.apache.kafka.streams.processor.ThreadMetadata;
 import org.apache.kafka.test.IntegrationTest;
 import org.apache.kafka.test.MockMetricsReporter;
 import org.apache.kafka.test.MockStateRestoreListener;
@@ -54,9 +54,9 @@ import java.util.concurrent.atomic.AtomicBoolean;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
-import static org.junit.Assert.assertNotNull;
 
 @Category({IntegrationTest.class})
 public class KafkaStreamsTest {

http://git-wip-us.apache.org/repos/asf/kafka/blob/e1543a5a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
index e530d60..b438347 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
@@ -31,6 +31,7 @@ import org.apache.kafka.streams.processor.StateRestoreCallback;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
 import org.apache.kafka.test.MockProcessorNode;
+import org.apache.kafka.test.MockStateRestoreListener;
 import org.apache.kafka.test.NoOpProcessorContext;
 import org.apache.kafka.test.NoOpReadOnlyStore;
 import org.apache.kafka.test.TestUtils;
@@ -49,6 +50,9 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import static org.apache.kafka.test.MockStateRestoreListener.RESTORE_BATCH;
+import static org.apache.kafka.test.MockStateRestoreListener.RESTORE_END;
+import static org.apache.kafka.test.MockStateRestoreListener.RESTORE_START;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertEquals;
@@ -61,6 +65,7 @@ public class GlobalStateManagerImplTest {
 
     private final MockTime time = new MockTime();
     private final TheStateRestoreCallback stateRestoreCallback = new TheStateRestoreCallback();
+    private final MockStateRestoreListener stateRestoreListener = new MockStateRestoreListener();
     private final TopicPartition t1 = new TopicPartition("t1", 1);
     private final TopicPartition t2 = new TopicPartition("t2", 1);
     private GlobalStateManagerImpl stateManager;
@@ -95,7 +100,7 @@ public class GlobalStateManagerImplTest {
         stateDirPath = TestUtils.tempDirectory().getPath();
         stateDirectory = new StateDirectory("appId", stateDirPath, time);
         consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
-        stateManager = new GlobalStateManagerImpl(topology, consumer, stateDirectory);
+        stateManager = new GlobalStateManagerImpl(topology, consumer, stateDirectory, stateRestoreListener);
         checkpointFile = new File(stateManager.baseDir(), ProcessorStateManager.CHECKPOINT_FILE_NAME);
     }
 
@@ -204,6 +209,24 @@ public class GlobalStateManagerImplTest {
     }
 
     @Test
+    public void shouldListenForRestoreEvents() {
+        initializeConsumer(5, 1, t1);
+        stateManager.initialize(context);
+
+        final TheStateRestoreCallback stateRestoreCallback = new TheStateRestoreCallback();
+        stateManager.register(store1, false, stateRestoreCallback);
+
+        assertThat(stateRestoreListener.restoreStartOffset, equalTo(1L));
+        assertThat(stateRestoreListener.restoreEndOffset, equalTo(5L));
+        assertThat(stateRestoreListener.totalNumRestored, equalTo(5L));
+
+
+        assertThat(stateRestoreListener.storeNameCalledStates.get(RESTORE_START), equalTo(store1.name()));
+        assertThat(stateRestoreListener.storeNameCalledStates.get(RESTORE_BATCH), equalTo(store1.name()));
+        assertThat(stateRestoreListener.storeNameCalledStates.get(RESTORE_END), equalTo(store1.name()));
+    }
+
+    @Test
     public void shouldRestoreRecordsFromCheckpointToHighwatermark() throws IOException {
         initializeConsumer(5, 6, t1);
 
@@ -452,7 +475,7 @@ public class GlobalStateManagerImplTest {
             public boolean lockGlobalState(final int retry) throws IOException {
                 throw new IOException("KABOOM!");
             }
-        });
+        }, stateRestoreListener);
 
         try {
             stateManager.initialize(context);

http://git-wip-us.apache.org/repos/asf/kafka/blob/e1543a5a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java
index 9d0b637..29f1ac0 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java
@@ -27,6 +27,7 @@ import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.test.MockStateRestoreListener;
 import org.apache.kafka.test.TestCondition;
 import org.apache.kafka.test.TestUtils;
 import org.junit.Before;
@@ -49,6 +50,7 @@ public class GlobalStreamThreadTest {
     private final KStreamBuilder builder = new KStreamBuilder();
     private final MockConsumer<byte[], byte[]> mockConsumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
     private final MockTime time = new MockTime();
+    private final MockStateRestoreListener stateRestoreListener = new MockStateRestoreListener();
     private GlobalStreamThread globalStreamThread;
     private StreamsConfig config;
 
@@ -65,7 +67,8 @@ public class GlobalStreamThreadTest {
                                                     new StateDirectory("appId", TestUtils.tempDirectory().getPath(), time),
                                                     new Metrics(),
                                                     new MockTime(),
-                                                    "clientId");
+                                                    "clientId",
+                                                     stateRestoreListener);
     }
 
     @Test
@@ -96,7 +99,8 @@ public class GlobalStreamThreadTest {
                                                     new StateDirectory("appId", TestUtils.tempDirectory().getPath(), time),
                                                     new Metrics(),
                                                     new MockTime(),
-                                                    "clientId");
+                                                    "clientId",
+                                                    stateRestoreListener);
 
         try {
             globalStreamThread.start();

http://git-wip-us.apache.org/repos/asf/kafka/blob/e1543a5a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
index 27e9309..babf704 100644
--- a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
@@ -211,6 +211,7 @@ public class ProcessorTopologyTestDriver {
 
         if (globalTopology != null) {
             final MockConsumer<byte[], byte[]> globalConsumer = createGlobalConsumer();
+            final MockStateRestoreListener stateRestoreListener = new MockStateRestoreListener();
             for (final String topicName : globalTopology.sourceTopics()) {
                 final List<PartitionInfo> partitionInfos = new ArrayList<>();
                 partitionInfos.add(new PartitionInfo(topicName, 1, null, null, null));
@@ -220,7 +221,10 @@ public class ProcessorTopologyTestDriver {
                 globalPartitionsByTopic.put(topicName, partition);
                 offsetsByTopicPartition.put(partition, new AtomicLong());
             }
-            final GlobalStateManagerImpl stateManager = new GlobalStateManagerImpl(globalTopology, globalConsumer, stateDirectory);
+            final GlobalStateManagerImpl stateManager = new GlobalStateManagerImpl(globalTopology,
+                                                                                   globalConsumer,
+                                                                                   stateDirectory,
+                                                                                   stateRestoreListener);
             globalStateTask = new GlobalStateUpdateTask(globalTopology,
                                                         new GlobalProcessorContextImpl(config, stateManager, streamsMetrics, cache),
                                                         stateManager, new LogAndContinueExceptionHandler()


Mime
View raw message