kafka-commits mailing list archives

From: guozh...@apache.org
Subject: [kafka] branch trunk updated: KAFKA-7021: Reuse source based on config (#5163)
Date: Mon, 11 Jun 2018 23:08:52 GMT
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new d98ec33  KAFKA-7021: Reuse source based on config (#5163)
d98ec33 is described below

commit d98ec33364d4ddb6ee53e0cd6001857207ba2089
Author: Guozhang Wang <wangguoz@gmail.com>
AuthorDate: Mon Jun 11 16:08:24 2018 -0700

    KAFKA-7021: Reuse source based on config (#5163)
    
    This PR contains the following changes:

    1. Leverage the TOPOLOGY_OPTIMIZATION config to "adjust" the topology internally so that
    the source topic is reused as the changelog topic (see the usage sketch after this list).

    2. Fixed a long-dangling bug: whenever a source topic is reused as a changelog topic, the
    checkpoint file must still be written for the consumed offsets. This is done by taking the
    union of the acked offsets from the producer and the consumed offsets from the consumer,
    giving priority to the acked offsets since the same topic may show up in both (think of a
    repartition topic). This way the consumed offsets of source topics are treated as
    checkpointed offsets when reuse happens.

    3. Added a few unit and integration tests with and without the reuse, making sure that the
    restoration, standby task, and internal topic creation behaviors are all correct.
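
    For illustration, a minimal usage sketch of change 1 against the 2.0 Streams API; the
    application id, bootstrap servers, topic name, and store name are placeholders, not taken
    from this commit:

        import java.util.Properties;
        import org.apache.kafka.common.serialization.Serdes;
        import org.apache.kafka.common.utils.Bytes;
        import org.apache.kafka.streams.KafkaStreams;
        import org.apache.kafka.streams.StreamsBuilder;
        import org.apache.kafka.streams.StreamsConfig;
        import org.apache.kafka.streams.kstream.Materialized;
        import org.apache.kafka.streams.state.KeyValueStore;

        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Long().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        // opt in to the optimization; the default value leaves existing topologies unchanged
        props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE);

        final StreamsBuilder builder = new StreamsBuilder();
        // with OPTIMIZE set, the source topic itself backs the "store" changelog, so no
        // "my-app-store-changelog" topic is created and restoration reads the source topic
        builder.table("topic", Materialized.<Long, String, KeyValueStore<Bytes, byte[]>>as("store"));

        final KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();

    The new StreamsBuilderTest and RestoreIntegrationTest cases below exercise both this
    optimized path and the default (non-reusing) one.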
    
    Reviewers: John Roesler <john@confluent.io>, Bill Bejeck <bill@confluent.io>, Matthias J. Sax <matthias@confluent.io>
---
 .../org/apache/kafka/streams/KafkaStreams.java     |   6 +-
 .../org/apache/kafka/streams/StreamsBuilder.java   |   7 +-
 .../java/org/apache/kafka/streams/Topology.java    |   1 -
 .../kstream/internals/InternalStreamsBuilder.java  |   8 +-
 .../streams/processor/internals/AbstractTask.java  |   4 +-
 .../internals/InternalTopologyBuilder.java         |  44 ++++++-
 .../processor/internals/ProcessorStateManager.java |  22 ++--
 .../processor/internals/StateDirectory.java        |   2 +-
 .../streams/processor/internals/StreamTask.java    |  11 +-
 .../apache/kafka/streams/StreamsBuilderTest.java   |  27 +++-
 .../integration/RestoreIntegrationTest.java        | 137 +++++++++++++++++----
 .../integration/TableTableJoinIntegrationTest.java |   4 +-
 .../internals/InternalStreamsBuilderTest.java      |   2 +-
 .../processor/internals/StreamThreadTest.java      |  32 ++---
 .../internals/StreamsPartitionAssignorTest.java    |   1 +
 15 files changed, 230 insertions(+), 78 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index d6002ff..6a707ff 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -599,7 +599,7 @@ public class KafkaStreams {
     @Deprecated
     public KafkaStreams(final Topology topology,
                         final StreamsConfig config) {
-        this(topology.internalTopologyBuilder, config, new DefaultKafkaClientSupplier());
+        this(topology, config, new DefaultKafkaClientSupplier());
     }
 
     /**
@@ -635,6 +635,10 @@ public class KafkaStreams {
         this.config = config;
         this.time = time;
 
+        // adjust the topology if optimization is turned on.
+        // TODO: to be removed post 2.0
+        internalTopologyBuilder.adjust(config);
+
         // The application ID is a required config and hence should always have value
         processId = UUID.randomUUID();
         final String userClientId = config.getString(StreamsConfig.CLIENT_ID_CONFIG);
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java b/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
index 517104d..ae6d44c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
@@ -302,11 +302,10 @@ public class StreamsBuilder {
         Objects.requireNonNull(materialized, "materialized can't be null");
         final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materializedInternal = new MaterializedInternal<>(materialized);
         materializedInternal.generateStoreNameIfNeeded(internalStreamsBuilder, topic + "-");
+        final ConsumedInternal<K, V> consumedInternal =
+                new ConsumedInternal<>(Consumed.with(materializedInternal.keySerde(), materializedInternal.valueSerde()));
 
-        return internalStreamsBuilder.table(topic,
-                                            new ConsumedInternal<>(Consumed.with(materializedInternal.keySerde(), materializedInternal.valueSerde())),
-                                            materializedInternal);
+        return internalStreamsBuilder.table(topic, consumedInternal, materializedInternal);
     }
 
     /**
diff --git a/streams/src/main/java/org/apache/kafka/streams/Topology.java b/streams/src/main/java/org/apache/kafka/streams/Topology.java
index 22f6ea8..753185c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/Topology.java
+++ b/streams/src/main/java/org/apache/kafka/streams/Topology.java
@@ -776,5 +776,4 @@ public class Topology {
     public synchronized TopologyDescription describe() {
         return internalTopologyBuilder.describe();
     }
-
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
index 0a19b4e..c7bf2fa 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
@@ -72,11 +72,7 @@ public class InternalStreamsBuilder implements InternalNameProvider {
     public <K, V> KTable<K, V> table(final String topic,
                                      final ConsumedInternal<K, V> consumed,
                                      final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materialized) {
-        // explicitly disable logging for source table materialized stores
-        materialized.withLoggingDisabled();
-
-        final StoreBuilder<KeyValueStore<K, V>> storeBuilder = new KeyValueStoreMaterializer<>(materialized)
-                .materialize();
+        final StoreBuilder<KeyValueStore<K, V>> storeBuilder = new KeyValueStoreMaterializer<>(materialized).materialize();
 
         final String source = newProcessorName(KStreamImpl.SOURCE_NAME);
         final String name = newProcessorName(KTableImpl.SOURCE_NAME);
@@ -88,7 +84,7 @@ public class InternalStreamsBuilder implements InternalNameProvider {
                                                  name);
 
         internalTopologyBuilder.addStateStore(storeBuilder, name);
-        internalTopologyBuilder.connectSourceStoreAndTopic(storeBuilder.name(), topic);
+        internalTopologyBuilder.markSourceStoreAndTopic(storeBuilder, topic);
 
         return kTable;
     }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
index 02a1a06..188ff47 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
@@ -167,7 +167,7 @@ public abstract class AbstractTask implements Task {
         return sb.toString();
     }
 
-    protected Map<TopicPartition, Long> recordCollectorOffsets() {
+    protected Map<TopicPartition, Long> activeTaskCheckpointableOffsets() {
         return Collections.emptyMap();
     }
 
@@ -242,7 +242,7 @@ public abstract class AbstractTask implements Task {
         ProcessorStateException exception = null;
         log.trace("Closing state manager");
         try {
-            stateMgr.close(writeCheckpoint ? recordCollectorOffsets() : null);
+            stateMgr.close(writeCheckpoint ? activeTaskCheckpointableOffsets() : null);
         } catch (final ProcessorStateException e) {
             exception = e;
         } finally {
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
index 7d09031..36a2edc 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams.processor.internals;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.Topology;
 import org.apache.kafka.streams.errors.TopologyException;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
@@ -121,6 +122,9 @@ public class InternalTopologyBuilder {
 
     private Map<Integer, Set<String>> nodeGroups = null;
 
+    // TODO: this is only temporary for 2.0 and should be removed
+    public final Map<StoreBuilder, String> storeToSourceChangelogTopic = new HashMap<>();
+
     public interface StateStoreFactory {
         Set<String> users();
         boolean loggingEnabled();
@@ -498,8 +502,14 @@ public class InternalTopologyBuilder {
 
     public final void addStateStore(final StoreBuilder storeBuilder,
                                     final String... processorNames) {
+        addStateStore(storeBuilder, false, processorNames);
+    }
+
+    public final void addStateStore(final StoreBuilder storeBuilder,
+                                    final boolean allowOverride,
+                                    final String... processorNames) {
         Objects.requireNonNull(storeBuilder, "storeBuilder can't be null");
-        if (stateFactories.containsKey(storeBuilder.name())) {
+        if (!allowOverride && stateFactories.containsKey(storeBuilder.name())) {
             throw new TopologyException("StateStore " + storeBuilder.name() + " is already
added.");
         }
 
@@ -566,16 +576,22 @@ public class InternalTopologyBuilder {
         }
     }
 
-    // TODO: this method is only used by DSL and we might want to refactor this part
     public final void connectSourceStoreAndTopic(final String sourceStoreName,
-                                                  final String topic) {
+                                                 final String topic) {
         if (storeToChangelogTopic.containsKey(sourceStoreName)) {
             throw new TopologyException("Source store " + sourceStoreName + " is already
added.");
         }
         storeToChangelogTopic.put(sourceStoreName, topic);
     }
 
-    // TODO: this method is only used by DSL and we might want to refactor this part
+    public final void markSourceStoreAndTopic(final StoreBuilder storeBuilder,
+                                              final String topic) {
+        if (storeToSourceChangelogTopic.containsKey(storeBuilder)) {
+            throw new TopologyException("Source store " + storeBuilder.name() + " is already used.");
+        }
+        storeToSourceChangelogTopic.put(storeBuilder, topic);
+    }
+
     public final void connectProcessors(final String... processorNames) {
         if (processorNames.length < 2) {
             throw new TopologyException("At least two processors need to participate in the
connection.");
@@ -591,13 +607,11 @@ public class InternalTopologyBuilder {
         nodeGrouper.unite(processorNames[0], Arrays.copyOfRange(processorNames, 1, processorNames.length));
     }
 
-    // TODO: this method is only used by DSL and we might want to refactor this part
     public final void addInternalTopic(final String topicName) {
         Objects.requireNonNull(topicName, "topicName can't be null");
         internalTopicNames.add(topicName);
     }
 
-    // TODO: this method is only used by DSL and we might want to refactor this part
     public final void copartitionSources(final Collection<String> sourceNodes) {
         copartitionSourceGroups.add(Collections.unmodifiableSet(new HashSet<>(sourceNodes)));
     }
@@ -1059,6 +1073,24 @@ public class InternalTopologyBuilder {
         return Collections.unmodifiableMap(topicGroups);
     }
 
+    // Adjust the generated topology based on the configs.
+    // Not exposed as public API and should be removed post 2.0
+    public void adjust(final StreamsConfig config) {
+        final boolean enableOptimization20 = config.getString(StreamsConfig.TOPOLOGY_OPTIMIZATION).equals(StreamsConfig.OPTIMIZE);
+
+        if (enableOptimization20) {
+            for (final Map.Entry<StoreBuilder, String> entry : storeToSourceChangelogTopic.entrySet()) {
+                final StoreBuilder storeBuilder = entry.getKey();
+                final String topicName = entry.getValue();
+
+                // update store map to disable logging for this store
+                storeBuilder.withLoggingDisabled();
+                addStateStore(storeBuilder, true);
+                connectSourceStoreAndTopic(storeBuilder.name(), topicName);
+            }
+        }
+    }
+
     private void setRegexMatchedTopicsToSourceNodes() {
         if (subscriptionUpdates.hasUpdates()) {
             for (final Map.Entry<String, Pattern> stringPatternEntry : nodeToSourcePatterns.entrySet()) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
index e7a23bd..054333b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
@@ -46,7 +46,7 @@ public class ProcessorStateManager extends AbstractStateManager {
     private final boolean isStandby;
     private final ChangelogReader changelogReader;
     private final Map<TopicPartition, Long> offsetLimits;
-    private final Map<TopicPartition, Long> restoredOffsets;
+    private final Map<TopicPartition, Long> standbyRestoredOffsets;
     private final Map<String, StateRestoreCallback> restoreCallbacks; // used for standby tasks, keyed by state topic name
     private final Map<String, String> storeToChangelogTopic;
     private final List<TopicPartition> changelogPartitions = new ArrayList<>();
@@ -79,7 +79,7 @@ public class ProcessorStateManager extends AbstractStateManager {
             partitionForTopic.put(source.topic(), source);
         }
         offsetLimits = new HashMap<>();
-        restoredOffsets = new HashMap<>();
+        standbyRestoredOffsets = new HashMap<>();
         this.isStandby = isStandby;
         restoreCallbacks = isStandby ? new HashMap<String, StateRestoreCallback>() : null;
         this.storeToChangelogTopic = storeToChangelogTopic;
@@ -212,7 +212,7 @@ public class ProcessorStateManager extends AbstractStateManager {
         }
 
         // record the restored offset for its change log partition
-        restoredOffsets.put(storePartition, lastOffset + 1);
+        standbyRestoredOffsets.put(storePartition, lastOffset + 1);
 
         return remainingRecords;
     }
@@ -293,8 +293,8 @@ public class ProcessorStateManager extends AbstractStateManager {
 
     // write the checkpoint
     @Override
-    public void checkpoint(final Map<TopicPartition, Long> ackedOffsets) {
-        checkpointableOffsets.putAll(changelogReader.restoredOffsets());
+    public void checkpoint(final Map<TopicPartition, Long> checkpointableOffsets) {
+        this.checkpointableOffsets.putAll(changelogReader.restoredOffsets());
         for (final StateStore store : stores.values()) {
             final String storeName = store.name();
             // only checkpoint the offset to the offsets file if
@@ -302,11 +302,11 @@ public class ProcessorStateManager extends AbstractStateManager {
             if (store.persistent() && storeToChangelogTopic.containsKey(storeName)) {
                 final String changelogTopic = storeToChangelogTopic.get(storeName);
                 final TopicPartition topicPartition = new TopicPartition(changelogTopic, getPartition(storeName));
-                if (ackedOffsets.containsKey(topicPartition)) {
+                if (checkpointableOffsets.containsKey(topicPartition)) {
                     // store the last offset + 1 (the log position after restoration)
-                    checkpointableOffsets.put(topicPartition, ackedOffsets.get(topicPartition) + 1);
-                } else if (restoredOffsets.containsKey(topicPartition)) {
-                    checkpointableOffsets.put(topicPartition, restoredOffsets.get(topicPartition));
+                    this.checkpointableOffsets.put(topicPartition, checkpointableOffsets.get(topicPartition) + 1);
+                } else if (standbyRestoredOffsets.containsKey(topicPartition)) {
+                    this.checkpointableOffsets.put(topicPartition, standbyRestoredOffsets.get(topicPartition));
                 }
             }
         }
@@ -315,9 +315,9 @@ public class ProcessorStateManager extends AbstractStateManager {
             checkpoint = new OffsetCheckpoint(new File(baseDir, CHECKPOINT_FILE_NAME));
         }
 
-        log.trace("Writing checkpoint: {}", checkpointableOffsets);
+        log.trace("Writing checkpoint: {}", this.checkpointableOffsets);
         try {
-            checkpoint.write(checkpointableOffsets);
+            checkpoint.write(this.checkpointableOffsets);
         } catch (final IOException e) {
             log.warn("Failed to write offset checkpoint file to {}: {}", checkpoint, e);
         }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
index c33ade6..7623c66 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
@@ -91,7 +91,7 @@ public class StateDirectory {
      * @return directory for the {@link TaskId}
      * @throws ProcessorStateException if the task directory does not exists and could not be created
      */
-    File directoryForTask(final TaskId taskId) {
+    public File directoryForTask(final TaskId taskId) {
         final File taskDir = new File(stateDir, taskId.toString());
         if (!taskDir.exists() && !taskDir.mkdir()) {
             throw new ProcessorStateException(
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 4cea528..805b4c0 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -380,7 +380,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
         flushState();
 
         if (!eosEnabled) {
-            stateMgr.checkpoint(recordCollectorOffsets());
+            stateMgr.checkpoint(activeTaskCheckpointableOffsets());
         }
 
         commitOffsets(startNewTransaction);
@@ -391,8 +391,13 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
     }
 
     @Override
-    protected Map<TopicPartition, Long> recordCollectorOffsets() {
-        return recordCollector.offsets();
+    protected Map<TopicPartition, Long> activeTaskCheckpointableOffsets() {
+        final Map<TopicPartition, Long> checkpointableOffsets = recordCollector.offsets();
+        for (final Map.Entry<TopicPartition, Long> entry : consumedOffsets.entrySet()) {
+            checkpointableOffsets.putIfAbsent(entry.getKey(), entry.getValue());
+        }
+
+        return checkpointableOffsets;
     }
 
     @Override
diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
index 37101de..3b8c9bd 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
@@ -273,11 +273,17 @@ public class StreamsBuilderTest {
     }
 
     @Test
-    public void shouldReuseSourceTopicAsChangelogs() {
+    public void shouldReuseSourceTopicAsChangelogsWithOptimization20() {
         final String topic = "topic";
         builder.table(topic, Materialized.<Long, String, KeyValueStore<Bytes, byte[]>>as("store"));
+        final Topology topology = builder.build();
+        final Properties props = StreamsTestUtils.minimalStreamsConfig();
+        props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE);
 
-        final InternalTopologyBuilder internalTopologyBuilder = TopologyWrapper.getInternalTopologyBuilder(builder.build());
+        final InternalTopologyBuilder internalTopologyBuilder = TopologyWrapper.getInternalTopologyBuilder(topology);
+        internalTopologyBuilder.adjust(new StreamsConfig(props));
+
+        assertThat(internalTopologyBuilder.build().storeToChangelogTopic(), equalTo(Collections.singletonMap("store", "topic")));
 
         assertThat(internalTopologyBuilder.getStateStores().keySet(), equalTo(Collections.singleton("store")));
 
@@ -285,6 +291,23 @@ public class StreamsBuilderTest {
 
         assertThat(internalTopologyBuilder.topicGroups().get(0).stateChangelogTopics.isEmpty(), equalTo(true));
     }
+
+    @Test
+    public void shouldNotReuseSourceTopicAsChangelogsByDefault() {
+        final String topic = "topic";
+        builder.table(topic, Materialized.<Long, String, KeyValueStore<Bytes, byte[]>>as("store"));
+
+        final InternalTopologyBuilder internalTopologyBuilder = TopologyWrapper.getInternalTopologyBuilder(builder.build());
+        internalTopologyBuilder.setApplicationId("appId");
+
+        assertThat(internalTopologyBuilder.build().storeToChangelogTopic(), equalTo(Collections.singletonMap("store", "appId-store-changelog")));
+
+        assertThat(internalTopologyBuilder.getStateStores().keySet(), equalTo(Collections.singleton("store")));
+
+        assertThat(internalTopologyBuilder.getStateStores().get("store").loggingEnabled(), equalTo(true));
+
+        assertThat(internalTopologyBuilder.topicGroups().get(0).stateChangelogTopics.keySet(), equalTo(Collections.singleton("appId-store-changelog")));
+    }
     
     @Test(expected = TopologyException.class)
     public void shouldThrowExceptionWhenNoTopicPresent() {
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
index f6d36f7..dbf85fa 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
@@ -19,7 +19,6 @@ package org.apache.kafka.streams.integration;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
@@ -28,6 +27,7 @@ import org.apache.kafka.common.serialization.IntegerDeserializer;
 import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KeyValue;
@@ -44,11 +44,14 @@ import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
 import org.apache.kafka.streams.processor.StateRestoreListener;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.internals.StateDirectory;
 import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.StoreBuilder;
 import org.apache.kafka.streams.state.Stores;
 import org.apache.kafka.streams.state.internals.KeyValueStoreBuilder;
+import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
 import org.apache.kafka.test.IntegrationTest;
 import org.apache.kafka.test.TestUtils;
 import org.junit.After;
@@ -57,10 +60,10 @@ import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
+import java.io.File;
 import java.util.Arrays;
-import java.util.HashMap;
+import java.util.Collections;
 import java.util.List;
-import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
@@ -76,6 +79,8 @@ import static org.junit.Assert.assertTrue;
 public class RestoreIntegrationTest {
     private static final int NUM_BROKERS = 1;
 
+    private static final String APPID = "restore-test";
+
     @ClassRule
     public static final EmbeddedKafkaCluster CLUSTER =
             new EmbeddedKafkaCluster(NUM_BROKERS);
@@ -83,24 +88,24 @@ public class RestoreIntegrationTest {
     private static final String INPUT_STREAM_2 = "input-stream-2";
     private final int numberOfKeys = 10000;
     private KafkaStreams kafkaStreams;
-    private String applicationId = "restore-test";
-
 
     @BeforeClass
     public static void createTopics() throws InterruptedException {
         CLUSTER.createTopic(INPUT_STREAM, 2, 1);
         CLUSTER.createTopic(INPUT_STREAM_2, 2, 1);
+        CLUSTER.createTopic(APPID + "-store-changelog", 2, 1);
     }
 
     private Properties props(final String applicationId) {
         Properties streamsConfiguration = new Properties();
         streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
         streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
-        streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
         streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory(applicationId).getPath());
         streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
         streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
         streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
+        streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
         return streamsConfiguration;
     }
 
@@ -112,24 +117,106 @@ public class RestoreIntegrationTest {
     }
 
     @Test
-    public void shouldRestoreState() throws ExecutionException, InterruptedException {
+    public void shouldRestoreStateFromSourceTopic() throws Exception {
         final AtomicInteger numReceived = new AtomicInteger(0);
         final StreamsBuilder builder = new StreamsBuilder();
 
-        createStateForRestoration();
+        final Properties props = props(APPID);
+        props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE);
+
+        // restoring from 1000 to 4000 (committed), and then process from 4000 to 5000 on each of the two partitions
+        final int offsetLimitDelta = 1000;
+        final int offsetCheckpointed = 1000;
+        createStateForRestoration(INPUT_STREAM);
+        setCommittedOffset(INPUT_STREAM, offsetLimitDelta);
+
+        final StateDirectory stateDirectory = new StateDirectory(new StreamsConfig(props), new MockTime());
+        new OffsetCheckpoint(new File(stateDirectory.directoryForTask(new TaskId(0, 0)), ".checkpoint"))
+                .write(Collections.singletonMap(new TopicPartition(INPUT_STREAM, 0), (long) offsetCheckpointed));
+        new OffsetCheckpoint(new File(stateDirectory.directoryForTask(new TaskId(0, 1)), ".checkpoint"))
+                .write(Collections.singletonMap(new TopicPartition(INPUT_STREAM, 1), (long) offsetCheckpointed));
+
+        final CountDownLatch startupLatch = new CountDownLatch(1);
+        final CountDownLatch shutdownLatch = new CountDownLatch(1);
 
         builder.table(INPUT_STREAM, Consumed.with(Serdes.Integer(), Serdes.Integer()))
                 .toStream()
                 .foreach(new ForeachAction<Integer, Integer>() {
                     @Override
                     public void apply(final Integer key, final Integer value) {
-                        numReceived.incrementAndGet();
+                        if (numReceived.incrementAndGet() == 2 * offsetLimitDelta)
+                            shutdownLatch.countDown();
                     }
                 });
 
+        kafkaStreams = new KafkaStreams(builder.build(), props);
+        kafkaStreams.setStateListener(new KafkaStreams.StateListener() {
+            @Override
+            public void onChange(final KafkaStreams.State newState, final KafkaStreams.State oldState) {
+                if (newState == KafkaStreams.State.RUNNING && oldState == KafkaStreams.State.REBALANCING) {
+                    startupLatch.countDown();
+                }
+            }
+        });
+
+        final AtomicLong restored = new AtomicLong(0);
+        kafkaStreams.setGlobalStateRestoreListener(new StateRestoreListener() {
+            @Override
+            public void onRestoreStart(final TopicPartition topicPartition, final String storeName, final long startingOffset, final long endingOffset) {
+
+            }
+
+            @Override
+            public void onBatchRestored(final TopicPartition topicPartition, final String storeName, final long batchEndOffset, final long numRestored) {
+
+            }
+
+            @Override
+            public void onRestoreEnd(final TopicPartition topicPartition, final String storeName, final long totalRestored) {
+                restored.addAndGet(totalRestored);
+            }
+        });
+        kafkaStreams.start();
+
+        assertTrue(startupLatch.await(30, TimeUnit.SECONDS));
+        assertThat(restored.get(), equalTo((long) numberOfKeys - offsetLimitDelta * 2 - offsetCheckpointed * 2));
+
+        assertTrue(shutdownLatch.await(30, TimeUnit.SECONDS));
+        assertThat(numReceived.get(), equalTo(offsetLimitDelta * 2));
+    }
+
+    @Test
+    public void shouldRestoreStateFromChangelogTopic() throws Exception {
+        final AtomicInteger numReceived = new AtomicInteger(0);
+        final StreamsBuilder builder = new StreamsBuilder();
+
+        final Properties props = props(APPID);
+
+        // restoring from 1000 to 5000, and then process from 5000 to 10000 on each of the two partitions
+        final int offsetCheckpointed = 1000;
+        createStateForRestoration(APPID + "-store-changelog");
+        createStateForRestoration(INPUT_STREAM);
+
+        final StateDirectory stateDirectory = new StateDirectory(new StreamsConfig(props), new MockTime());
+        new OffsetCheckpoint(new File(stateDirectory.directoryForTask(new TaskId(0, 0)), ".checkpoint"))
+                .write(Collections.singletonMap(new TopicPartition(APPID + "-store-changelog", 0), (long) offsetCheckpointed));
+        new OffsetCheckpoint(new File(stateDirectory.directoryForTask(new TaskId(0, 1)), ".checkpoint"))
+                .write(Collections.singletonMap(new TopicPartition(APPID + "-store-changelog", 1), (long) offsetCheckpointed));
 
         final CountDownLatch startupLatch = new CountDownLatch(1);
-        kafkaStreams = new KafkaStreams(builder.build(), props(applicationId));
+        final CountDownLatch shutdownLatch = new CountDownLatch(1);
+
+        builder.table(INPUT_STREAM, Consumed.with(Serdes.Integer(), Serdes.Integer()), Materialized.as("store"))
+                .toStream()
+                .foreach(new ForeachAction<Integer, Integer>() {
+                    @Override
+                    public void apply(final Integer key, final Integer value) {
+                        if (numReceived.incrementAndGet() == numberOfKeys)
+                            shutdownLatch.countDown();
+                    }
+                });
+
+        kafkaStreams = new KafkaStreams(builder.build(), props);
         kafkaStreams.setStateListener(new KafkaStreams.StateListener() {
             @Override
             public void onChange(final KafkaStreams.State newState, final KafkaStreams.State oldState) {
@@ -159,8 +246,10 @@ public class RestoreIntegrationTest {
         kafkaStreams.start();
 
         assertTrue(startupLatch.await(30, TimeUnit.SECONDS));
-        assertThat(restored.get(), equalTo((long) numberOfKeys));
-        assertThat(numReceived.get(), equalTo(0));
+        assertThat(restored.get(), equalTo((long) numberOfKeys - 2 * offsetCheckpointed));
+
+        assertTrue(shutdownLatch.await(30, TimeUnit.SECONDS));
+        assertThat(numReceived.get(), equalTo(numberOfKeys));
     }
 
 
@@ -178,7 +267,7 @@ public class RestoreIntegrationTest {
                 }, Materialized.<Integer, Integer, KeyValueStore<Bytes, byte[]>>as("reduce-store").withLoggingDisabled());
 
         final CountDownLatch startupLatch = new CountDownLatch(1);
-        kafkaStreams = new KafkaStreams(builder.build(), props(applicationId));
+        kafkaStreams = new KafkaStreams(builder.build(), props(APPID));
         kafkaStreams.setStateListener(new KafkaStreams.StateListener() {
             @Override
             public void onChange(final KafkaStreams.State newState, final KafkaStreams.State oldState) {
@@ -228,7 +317,7 @@ public class RestoreIntegrationTest {
 
         final Topology topology = streamsBuilder.build();
 
-        kafkaStreams = new KafkaStreams(topology, props(applicationId + "-logging-disabled"));
+        kafkaStreams = new KafkaStreams(topology, props(APPID + "-logging-disabled"));
 
         final CountDownLatch latch = new CountDownLatch(1);
         kafkaStreams.setStateListener(new KafkaStreams.StateListener() {
@@ -279,8 +368,7 @@ public class RestoreIntegrationTest {
         }
     }
     
-    private void createStateForRestoration()
-            throws ExecutionException, InterruptedException {
+    private void createStateForRestoration(final String changelogTopic) {
         final Properties producerConfig = new Properties();
         producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
 
@@ -288,30 +376,33 @@ public class RestoreIntegrationTest {
                      new KafkaProducer<>(producerConfig, new IntegerSerializer(), new IntegerSerializer())) {
 
             for (int i = 0; i < numberOfKeys; i++) {
-                producer.send(new ProducerRecord<>(INPUT_STREAM, i, i));
+                producer.send(new ProducerRecord<>(changelogTopic, i, i));
             }
         }
+    }
 
+    private void setCommittedOffset(final String topic, final int limitDelta) {
         final Properties consumerConfig = new Properties();
         consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
-        consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, applicationId);
+        consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, APPID);
+        consumerConfig.put(ConsumerConfig.CLIENT_ID_CONFIG, "commit-consumer");
         consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
         consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
 
         final Consumer consumer = new KafkaConsumer(consumerConfig);
-        final List<TopicPartition> partitions = Arrays.asList(new TopicPartition(INPUT_STREAM, 0),
-                                                              new TopicPartition(INPUT_STREAM, 1));
+        final List<TopicPartition> partitions = Arrays.asList(
+            new TopicPartition(topic, 0),
+            new TopicPartition(topic, 1));
 
         consumer.assign(partitions);
         consumer.seekToEnd(partitions);
 
-        final Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
         for (TopicPartition partition : partitions) {
             final long position = consumer.position(partition);
-            offsets.put(partition, new OffsetAndMetadata(position + 1));
+            consumer.seek(partition, position - limitDelta);
         }
 
-        consumer.commitSync(offsets);
+        consumer.commitSync();
         consumer.close();
     }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/TableTableJoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/TableTableJoinIntegrationTest.java
index b5e6fcb..5fab666 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/TableTableJoinIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/TableTableJoinIntegrationTest.java
@@ -55,8 +55,8 @@ public class TableTableJoinIntegrationTest extends AbstractJoinIntegrationTest {
         appID = "table-table-join-integration-test";
 
         builder = new StreamsBuilder();
-        leftTable = builder.table(INPUT_TOPIC_LEFT);
-        rightTable = builder.table(INPUT_TOPIC_RIGHT);
+        leftTable = builder.table(INPUT_TOPIC_LEFT, Materialized.<Long, String, KeyValueStore<Bytes, byte[]>>as("left").withLoggingDisabled());
+        rightTable = builder.table(INPUT_TOPIC_RIGHT, Materialized.<Long, String, KeyValueStore<Bytes, byte[]>>as("right").withLoggingDisabled());
     }
 
     final private String expectedFinalJoinResult = "D-d";
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java
index 63432ff..ef3fcd6 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java
@@ -137,7 +137,7 @@ public class InternalStreamsBuilderTest {
         assertEquals(storeName, topology.stateStores().get(0).name());
 
         assertEquals(1, topology.storeToChangelogTopic().size());
-        assertEquals("topic2", topology.storeToChangelogTopic().get(storeName));
+        assertEquals("app-id-prefix-STATE-STORE-0000000000-changelog", topology.storeToChangelogTopic().get(storeName));
         assertNull(table1.queryableStoreName());
     }
     
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index 1cc9c06..3412c62 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -21,7 +21,6 @@ import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.InvalidOffsetException;
 import org.apache.kafka.clients.consumer.MockConsumer;
-import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.producer.MockProducer;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.common.Cluster;
@@ -59,6 +58,7 @@ import org.apache.kafka.streams.processor.TaskMetadata;
 import org.apache.kafka.streams.processor.ThreadMetadata;
 import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
 import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
 import org.apache.kafka.test.MockClientSupplier;
 import org.apache.kafka.test.MockStateRestoreListener;
 import org.apache.kafka.test.MockTimestampExtractor;
@@ -69,6 +69,8 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
+import java.io.File;
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -84,6 +86,7 @@ import static java.util.Collections.singletonList;
 import static org.apache.kafka.common.utils.Utils.mkEntry;
 import static org.apache.kafka.common.utils.Utils.mkMap;
 import static org.apache.kafka.common.utils.Utils.mkProperties;
+import static org.apache.kafka.streams.processor.internals.AbstractStateManager.CHECKPOINT_FILE_NAME;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -821,12 +824,13 @@ public class StreamThreadTest {
 
     @SuppressWarnings("unchecked")
     @Test
-    public void shouldUpdateStandbyTask() {
+    public void shouldUpdateStandbyTask() throws IOException {
         final String storeName1 = "count-one";
         final String storeName2 = "table-two";
-        final String changelogName = applicationId + "-" + storeName1 + "-changelog";
-        final TopicPartition partition1 = new TopicPartition(changelogName, 1);
-        final TopicPartition partition2 = t2p1;
+        final String changelogName1 = applicationId + "-" + storeName1 + "-changelog";
+        final String changelogName2 = applicationId + "-" + storeName2 + "-changelog";
+        final TopicPartition partition1 = new TopicPartition(changelogName1, 1);
+        final TopicPartition partition2 = new TopicPartition(changelogName2, 1);
         internalStreamsBuilder.stream(Collections.singleton(topic1), consumed)
             .groupByKey().count(Materialized.<Object, Long, KeyValueStore<Bytes, byte[]>>as(storeName1));
         final MaterializedInternal materialized = new MaterializedInternal(Materialized.as(storeName2));
@@ -835,10 +839,10 @@ public class StreamThreadTest {
 
         final StreamThread thread = createStreamThread(clientId, config, false);
         final MockConsumer<byte[], byte[]> restoreConsumer = clientSupplier.restoreConsumer;
-        restoreConsumer.updatePartitions(changelogName,
+        restoreConsumer.updatePartitions(changelogName1,
             singletonList(
                 new PartitionInfo(
-                    changelogName,
+                    changelogName1,
                     1,
                     null,
                     new Node[0],
@@ -852,13 +856,13 @@ public class StreamThreadTest {
         restoreConsumer.updateBeginningOffsets(Collections.singletonMap(partition1, 0L));
         restoreConsumer.updateEndOffsets(Collections.singletonMap(partition2, 10L));
         restoreConsumer.updateBeginningOffsets(Collections.singletonMap(partition2, 0L));
-        // let the store1 be restored from 0 to 10; store2 be restored from 0 to (committed offset) 5
-        clientSupplier.consumer.assign(Utils.mkSet(partition2));
-        clientSupplier.consumer.commitSync(Collections.singletonMap(partition2, new OffsetAndMetadata(5L, "")));
+        // let the store1 be restored from 0 to 10; store2 be restored from 5 (checkpointed) to 10
+        OffsetCheckpoint checkpoint = new OffsetCheckpoint(new File(stateDirectory.directoryForTask(task3), CHECKPOINT_FILE_NAME));
+        checkpoint.write(Collections.singletonMap(partition2, 5L));
 
         for (long i = 0L; i < 10L; i++) {
-            restoreConsumer.addRecord(new ConsumerRecord<>(changelogName, 1, i, ("K" + i).getBytes(), ("V" + i).getBytes()));
-            restoreConsumer.addRecord(new ConsumerRecord<>(topic2, 1, i, ("K" + i).getBytes(), ("V" + i).getBytes()));
+            restoreConsumer.addRecord(new ConsumerRecord<>(changelogName1, 1, i, ("K" + i).getBytes(), ("V" + i).getBytes()));
+            restoreConsumer.addRecord(new ConsumerRecord<>(changelogName2, 1, i, ("K" + i).getBytes(), ("V" + i).getBytes()));
         }
 
         thread.setState(StreamThread.State.RUNNING);
@@ -884,9 +888,7 @@ public class StreamThreadTest {
 
         assertEquals(10L, store1.approximateNumEntries());
         assertEquals(5L, store2.approximateNumEntries());
-        assertEquals(Collections.singleton(partition2), restoreConsumer.paused());
-        assertEquals(1, thread.standbyRecords().size());
-        assertEquals(5, thread.standbyRecords().get(partition2).size());
+        assertEquals(0, thread.standbyRecords().size());
     }
 
     @Test
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
index a32d193..4327e8f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
@@ -799,6 +799,7 @@ public class StreamsPartitionAssignorTest {
         final Map<String, Integer> expectedCreatedInternalTopics = new HashMap<>();
         expectedCreatedInternalTopics.put(applicationId + "-KTABLE-AGGREGATE-STATE-STORE-0000000006-repartition", 4);
         expectedCreatedInternalTopics.put(applicationId + "-KTABLE-AGGREGATE-STATE-STORE-0000000006-changelog", 4);
+        expectedCreatedInternalTopics.put(applicationId + "-topic3-STATE-STORE-0000000002-changelog", 4);
         expectedCreatedInternalTopics.put(applicationId + "-KSTREAM-MAP-0000000001-repartition", 4);
 
         // check if all internal topics were created as expected

-- 
To stop receiving notification emails like this one, please contact
guozhang@apache.org.
