kafka-commits mailing list archives

From guozh...@apache.org
Subject [kafka] branch 1.0 updated: KAFKA-7021: checkpoint offsets from committed (#5207)
Date Thu, 14 Jun 2018 18:08:34 GMT
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch 1.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/1.0 by this push:
     new 313960f  KAFKA-7021: checkpoint offsets from committed (#5207)
313960f is described below

commit 313960f2cdd92343517c0db56622fa2b4daf1a0e
Author: Guozhang Wang <wangguoz@gmail.com>
AuthorDate: Thu Jun 14 10:42:32 2018 -0700

    KAFKA-7021: checkpoint offsets from committed (#5207)
    
    This is a cherry-pick PR of KAFKA-7021 for 1.1 and older. In this PR:
    
    1. With the old StateStoreSupplier APIs the source topics are reused as changelogs, and
       this PR makes sure the offsets are written for those source topics.
    
    2. With the new Materialized API we do not enforce reusing the source topic, and will
       use the changelog topic if it is specified.
    
    Reviewers: Matthias J. Sax <matthias@confluent.io>, Bill Bejeck <bill@confluent.io>
---
 .../kstream/internals/InternalStreamsBuilder.java  |  1 -
 .../streams/processor/internals/AbstractTask.java  |  4 +-
 .../internals/InternalTopologyBuilder.java         |  8 +-
 .../processor/internals/ProcessorStateManager.java | 27 ++++---
 .../processor/internals/StateDirectory.java        |  2 +-
 .../streams/processor/internals/StreamTask.java    | 13 ++-
 .../streams/processor/internals/StreamThread.java  |  9 +++
 .../apache/kafka/streams/StreamsBuilderTest.java   | 21 ++++-
 .../integration/RestoreIntegrationTest.java        | 92 ++++++++++------------
 .../internals/InternalStreamsBuilderTest.java      |  2 +-
 10 files changed, 105 insertions(+), 74 deletions(-)
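
For context, a minimal sketch of case 2 from the commit message above, mirroring the
builder.table(...) call added to RestoreIntegrationTest in this patch; the topic name
"input-topic" and the class name SourceTableChangelogSketch are illustrative placeholders,
not part of this change:

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.common.utils.Bytes;
    import org.apache.kafka.streams.Consumed;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.kstream.Materialized;
    import org.apache.kafka.streams.state.KeyValueStore;

    public class SourceTableChangelogSketch {
        public static void main(final String[] args) {
            final StreamsBuilder builder = new StreamsBuilder();
            // With the new Materialized API the store "store" is not backed by the source
            // topic; it gets its own changelog topic, "<application.id>-store-changelog",
            // and that changelog is what the task checkpoint file records offsets for.
            builder.table("input-topic",
                          Consumed.with(Serdes.Integer(), Serdes.Integer()),
                          Materialized.<Integer, Integer, KeyValueStore<Bytes, byte[]>>as("store"));
            // Printing the topology shows the store and its changelog wiring.
            System.out.println(builder.build().describe());
        }
    }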

diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
index 4308e5d..c5c71d1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
@@ -107,7 +107,6 @@ public class InternalStreamsBuilder implements InternalNameProvider {
                                                  name);
 
         internalTopologyBuilder.addStateStore(storeBuilder, name);
-        internalTopologyBuilder.connectSourceStoreAndTopic(storeBuilder.name(), topic);
 
         return kTable;
     }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
index 47d1e58..baea4af 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
@@ -165,7 +165,7 @@ public abstract class AbstractTask implements Task {
         return sb.toString();
     }
 
-    protected Map<TopicPartition, Long> recordCollectorOffsets() {
+    protected Map<TopicPartition, Long> activeTaskCheckpointableOffsets() {
         return Collections.emptyMap();
     }
 
@@ -236,7 +236,7 @@ public abstract class AbstractTask implements Task {
         ProcessorStateException exception = null;
         log.trace("Closing state manager");
         try {
-            stateMgr.close(writeCheckpoint ? recordCollectorOffsets() : null);
+            stateMgr.close(writeCheckpoint ? activeTaskCheckpointableOffsets() : null);
         } catch (final ProcessorStateException e) {
             exception = e;
         } finally {
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
index faaf96a..5ab3c94 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
@@ -123,7 +123,7 @@ public class InternalTopologyBuilder {
 
     private Map<Integer, Set<String>> nodeGroups = null;
 
-    interface StateStoreFactory {
+    public interface StateStoreFactory {
         Set<String> users();
         boolean loggingEnabled();
         StateStore build();
@@ -1821,4 +1821,10 @@ public class InternalTopologyBuilder {
         return sb.toString();
     }
 
+    // following functions are for test only
+
+    public synchronized Map<String, StateStoreFactory> getStateStores() {
+        return stateFactories;
+    }
+
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
index 3a2803e..5536ac1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
@@ -52,8 +52,8 @@ public class ProcessorStateManager implements StateManager {
     private final Map<String, StateStore> stores;
     private final Map<String, StateStore> globalStores;
     private final Map<TopicPartition, Long> offsetLimits;
-    private final Map<TopicPartition, Long> restoredOffsets;
     private final Map<TopicPartition, Long> checkpointedOffsets;
+    private final Map<TopicPartition, Long> standbyRestoredOffsets;
     private final Map<String, StateRestoreCallback> restoreCallbacks; // used for standby tasks, keyed by state topic name
     private final Map<String, String> storeToChangelogTopic;
     private final List<TopicPartition> changelogPartitions = new ArrayList<>();
@@ -87,7 +87,7 @@ public class ProcessorStateManager implements StateManager {
         stores = new LinkedHashMap<>();
         globalStores = new HashMap<>();
         offsetLimits = new HashMap<>();
-        restoredOffsets = new HashMap<>();
+        standbyRestoredOffsets = new HashMap<>();
         this.isStandby = isStandby;
         restoreCallbacks = isStandby ? new HashMap<String, StateRestoreCallback>() : null;
         this.storeToChangelogTopic = storeToChangelogTopic;
@@ -210,7 +210,7 @@ public class ProcessorStateManager implements StateManager {
         }
 
         // record the restored offset for its change log partition
-        restoredOffsets.put(storePartition, lastOffset + 1);
+        standbyRestoredOffsets.put(storePartition, lastOffset + 1);
 
         return remainingRecords;
     }
@@ -291,9 +291,8 @@ public class ProcessorStateManager implements StateManager {
 
     // write the checkpoint
     @Override
-    public void checkpoint(final Map<TopicPartition, Long> ackedOffsets) {
-        log.trace("Writing checkpoint: {}", ackedOffsets);
-        checkpointedOffsets.putAll(changelogReader.restoredOffsets());
+    public void checkpoint(final Map<TopicPartition, Long> checkpointableOffsets) {
+        this.checkpointedOffsets.putAll(changelogReader.restoredOffsets());
         for (final StateStore store : stores.values()) {
             final String storeName = store.name();
             // only checkpoint the offset to the offsets file if
@@ -301,19 +300,21 @@ public class ProcessorStateManager implements StateManager {
             if (store.persistent() && storeToChangelogTopic.containsKey(storeName)) {
                 final String changelogTopic = storeToChangelogTopic.get(storeName);
                 final TopicPartition topicPartition = new TopicPartition(changelogTopic, getPartition(storeName));
-                if (ackedOffsets.containsKey(topicPartition)) {
+                if (checkpointableOffsets.containsKey(topicPartition)) {
                     // store the last offset + 1 (the log position after restoration)
-                    checkpointedOffsets.put(topicPartition, ackedOffsets.get(topicPartition) + 1);
-                } else if (restoredOffsets.containsKey(topicPartition)) {
-                    checkpointedOffsets.put(topicPartition, restoredOffsets.get(topicPartition));
+                    checkpointedOffsets.put(topicPartition, checkpointableOffsets.get(topicPartition) + 1);
+                } else if (standbyRestoredOffsets.containsKey(topicPartition)) {
+                    checkpointedOffsets.put(topicPartition, standbyRestoredOffsets.get(topicPartition));
                 }
             }
         }
+
+        if (checkpoint == null) {
+            checkpoint = new OffsetCheckpoint(new File(baseDir, CHECKPOINT_FILE_NAME));
+        }
+
         // write the checkpoint file before closing, to indicate clean shutdown
         try {
-            if (checkpoint == null) {
-                checkpoint = new OffsetCheckpoint(new File(baseDir, CHECKPOINT_FILE_NAME));
-            }
             checkpoint.write(checkpointedOffsets);
         } catch (final IOException e) {
             log.warn("Failed to write checkpoint file to {}:", new File(baseDir, CHECKPOINT_FILE_NAME), e);
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
index b7bc45c..dda5660 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
@@ -87,7 +87,7 @@ public class StateDirectory {
      * @return directory for the {@link TaskId}
      * @throws ProcessorStateException if the task directory does not exists and could not be created
      */
-    File directoryForTask(final TaskId taskId) {
+    public File directoryForTask(final TaskId taskId) {
         final File taskDir = new File(stateDir, taskId.toString());
         if (!taskDir.exists() && !taskDir.mkdir()) {
             throw new ProcessorStateException(
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index cc6d7a7..4969b27 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -309,7 +309,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
                 public void run() {
                     flushState();
                     if (!eosEnabled) {
-                        stateMgr.checkpoint(recordCollectorOffsets());
+                        stateMgr.checkpoint(activeTaskCheckpointableOffsets());
                     }
                     commitOffsets(startNewTransaction);
                 }
@@ -320,8 +320,15 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
     }
 
     @Override
-    protected Map<TopicPartition, Long> recordCollectorOffsets() {
-        return recordCollector.offsets();
+    protected Map<TopicPartition, Long> activeTaskCheckpointableOffsets() {
+        final Map<TopicPartition, Long> checkpointableOffsets = recordCollector.offsets();
+        for (final Map.Entry<TopicPartition, Long> entry : consumedOffsets.entrySet()) {
+            if (!checkpointableOffsets.containsKey(entry.getKey())) {
+                checkpointableOffsets.put(entry.getKey(), entry.getValue());
+            }
+        }
+
+        return checkpointableOffsets;
     }
 
     @Override
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index c3f9312..08dfdd1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -1257,4 +1257,13 @@ public class StreamThread extends Thread implements ThreadDataProvider {
         }
         threadMetadata = new ThreadMetadata(this.getName(), this.state().name(), activeTasksMetadata, standbyTasksMetadata);
     }
+
+    // this is for testing only
+    TaskManager taskManager() {
+        return taskManager;
+    }
+
+    Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> standbyRecords() {
+        return standbyRecords;
+    }
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
index 4a496b8..e7ce819 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
@@ -293,7 +293,26 @@ public class StreamsBuilderTest {
         assertFalse(stores.hasNext());
         assertFalse(subtopologies.hasNext());
     }
-    
+
+    @Test
+    public void shouldNotReuseSourceTopicAsChangelogsByDefault() {
+        final String topic = "topic";
+        builder.table(topic, Materialized.<Long, String, KeyValueStore<Bytes, byte[]>>as("store"));
+
+        final InternalTopologyBuilder internalTopologyBuilder = InternalTopologyAccessor.getInternalTopologyBuilder(builder.build());
+        internalTopologyBuilder.setApplicationId("appId");
+
+        assertThat(internalTopologyBuilder.build().storeToChangelogTopic(), equalTo(Collections.singletonMap("store", "appId-store-changelog")));
+
+        assertThat(internalTopologyBuilder.getStateStores().keySet(), equalTo(Collections.singleton("store")));
+
+
+        assertThat(internalTopologyBuilder.getStateStores().get("store").loggingEnabled(), equalTo(true));
+
+        assertThat(internalTopologyBuilder.topicGroups().get(0).stateChangelogTopics.keySet(), equalTo(Collections.singleton("appId-store-changelog")));
+    }
+
+
     @Test(expected = TopologyException.class)
     public void shouldThrowExceptionWhenNoTopicPresent() throws Exception {
         builder.stream(Collections.<String>emptyList());
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
index 19ddedf..e2cd16e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
@@ -16,18 +16,15 @@
  */
 package org.apache.kafka.streams.integration;
 
-import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.serialization.IntegerDeserializer;
 import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.streams.Consumed;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KeyValue;
@@ -44,11 +41,14 @@ import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
 import org.apache.kafka.streams.processor.StateRestoreListener;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.internals.StateDirectory;
 import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.StoreBuilder;
 import org.apache.kafka.streams.state.Stores;
 import org.apache.kafka.streams.state.internals.KeyValueStoreBuilder;
+import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
 import org.apache.kafka.test.IntegrationTest;
 import org.apache.kafka.test.TestUtils;
 import org.junit.After;
@@ -57,11 +57,10 @@ import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
+import java.io.File;
 import java.io.IOException;
 import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.Collections;
 import java.util.Properties;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
@@ -76,32 +75,33 @@ import static org.junit.Assert.assertTrue;
 @Category({IntegrationTest.class})
 public class RestoreIntegrationTest {
     private static final int NUM_BROKERS = 1;
+    private static final String APPID = "restore-test";
 
     @ClassRule
-    public static final EmbeddedKafkaCluster CLUSTER =
-            new EmbeddedKafkaCluster(NUM_BROKERS);
+    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
     private static final String INPUT_STREAM = "input-stream";
     private static final String INPUT_STREAM_2 = "input-stream-2";
     private final int numberOfKeys = 10000;
     private KafkaStreams kafkaStreams;
-    private String applicationId = "restore-test";
 
 
     @BeforeClass
     public static void createTopics() throws InterruptedException {
         CLUSTER.createTopic(INPUT_STREAM, 2, 1);
         CLUSTER.createTopic(INPUT_STREAM_2, 2, 1);
+        CLUSTER.createTopic(APPID + "-store-changelog", 2, 1);
     }
 
     private Properties props(final String applicationId) {
         Properties streamsConfiguration = new Properties();
         streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
         streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
-        streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
         streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory(applicationId).getPath());
         streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
         streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
         streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
+        streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
         return streamsConfiguration;
     }
 
@@ -112,26 +112,38 @@ public class RestoreIntegrationTest {
         }
     }
 
-
     @Test
-    public void shouldRestoreState() throws ExecutionException, InterruptedException {
+    public void shouldRestoreStateFromChangelogTopic() throws Exception {
         final AtomicInteger numReceived = new AtomicInteger(0);
         final StreamsBuilder builder = new StreamsBuilder();
 
-        createStateForRestoration();
+        final Properties props = props(APPID);
+
+        // restoring from 1000 to 5000, and then process from 5000 to 10000 on each of the two partitions
+        final int offsetCheckpointed = 1000;
+        createStateForRestoration(APPID + "-store-changelog");
+        createStateForRestoration(INPUT_STREAM);
+
+        final StateDirectory stateDirectory = new StateDirectory(APPID, props.getProperty(StreamsConfig.STATE_DIR_CONFIG), new MockTime());
+        new OffsetCheckpoint(new File(stateDirectory.directoryForTask(new TaskId(0, 0)), ".checkpoint"))
+                .write(Collections.singletonMap(new TopicPartition(APPID + "-store-changelog", 0), (long) offsetCheckpointed));
+        new OffsetCheckpoint(new File(stateDirectory.directoryForTask(new TaskId(0, 1)), ".checkpoint"))
+                .write(Collections.singletonMap(new TopicPartition(APPID + "-store-changelog", 1), (long) offsetCheckpointed));
 
-        builder.table(INPUT_STREAM, Consumed.with(Serdes.Integer(), Serdes.Integer()))
+        final CountDownLatch startupLatch = new CountDownLatch(1);
+        final CountDownLatch shutdownLatch = new CountDownLatch(1);
+
+        builder.table(INPUT_STREAM, Consumed.with(Serdes.Integer(), Serdes.Integer()), Materialized.<Integer, Integer, KeyValueStore<Bytes, byte[]>>as("store"))
                 .toStream()
                 .foreach(new ForeachAction<Integer, Integer>() {
                     @Override
                     public void apply(final Integer key, final Integer value) {
-                        numReceived.incrementAndGet();
+                        if (numReceived.incrementAndGet() == numberOfKeys)
+                            shutdownLatch.countDown();
                     }
                 });
 
-
-        final CountDownLatch startupLatch = new CountDownLatch(1);
-        kafkaStreams = new KafkaStreams(builder.build(), props(applicationId));
+        kafkaStreams = new KafkaStreams(builder.build(), props);
         kafkaStreams.setStateListener(new KafkaStreams.StateListener() {
             @Override
             public void onChange(final KafkaStreams.State newState, final KafkaStreams.State oldState) {
@@ -161,10 +173,11 @@ public class RestoreIntegrationTest {
         kafkaStreams.start();
 
         assertTrue(startupLatch.await(30, TimeUnit.SECONDS));
-        assertThat(restored.get(), equalTo((long) numberOfKeys));
-        assertThat(numReceived.get(), equalTo(0));
-    }
+        assertThat(restored.get(), equalTo((long) numberOfKeys - 2 * offsetCheckpointed));
 
+        assertTrue(shutdownLatch.await(30, TimeUnit.SECONDS));
+        assertThat(numReceived.get(), equalTo(numberOfKeys));
+    }
 
     @Test
     public void shouldSuccessfullyStartWhenLoggingDisabled() throws InterruptedException {
@@ -180,7 +193,7 @@ public class RestoreIntegrationTest {
                 }, Materialized.<Integer, Integer, KeyValueStore<Bytes, byte[]>>as("reduce-store").withLoggingDisabled());
 
         final CountDownLatch startupLatch = new CountDownLatch(1);
-        kafkaStreams = new KafkaStreams(builder.build(), props(applicationId));
+        kafkaStreams = new KafkaStreams(builder.build(), props(APPID));
         kafkaStreams.setStateListener(new KafkaStreams.StateListener() {
             @Override
             public void onChange(final KafkaStreams.State newState, final KafkaStreams.State oldState) {
@@ -230,7 +243,7 @@ public class RestoreIntegrationTest {
 
         final Topology topology = streamsBuilder.build();
 
-        kafkaStreams = new KafkaStreams(topology, props(applicationId + "-logging-disabled"));
+        kafkaStreams = new KafkaStreams(topology, props(APPID + "-logging-disabled"));
 
         final CountDownLatch latch = new CountDownLatch(1);
         kafkaStreams.setStateListener(new KafkaStreams.StateListener() {
@@ -250,6 +263,7 @@ public class RestoreIntegrationTest {
     }
 
 
+
     public static class KeyValueStoreProcessor implements Processor<Integer, Integer> {
 
         private String topic;
@@ -285,9 +299,8 @@ public class RestoreIntegrationTest {
 
         }
     }
-    
-    private void createStateForRestoration()
-            throws ExecutionException, InterruptedException {
+
+    private void createStateForRestoration(final String changelogTopic) {
         final Properties producerConfig = new Properties();
         producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
 
@@ -295,31 +308,8 @@ public class RestoreIntegrationTest {
                      new KafkaProducer<>(producerConfig, new IntegerSerializer(), new IntegerSerializer())) {
 
             for (int i = 0; i < numberOfKeys; i++) {
-                producer.send(new ProducerRecord<>(INPUT_STREAM, i, i));
+                producer.send(new ProducerRecord<>(changelogTopic, i, i));
             }
         }
-
-        final Properties consumerConfig = new Properties();
-        consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
-        consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, applicationId);
-        consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
-        consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
-
-        final Consumer consumer = new KafkaConsumer(consumerConfig);
-        final List<TopicPartition> partitions = Arrays.asList(new TopicPartition(INPUT_STREAM, 0),
-                                                              new TopicPartition(INPUT_STREAM, 1));
-
-        consumer.assign(partitions);
-        consumer.seekToEnd(partitions);
-
-        final Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
-        for (TopicPartition partition : partitions) {
-            final long position = consumer.position(partition);
-            offsets.put(partition, new OffsetAndMetadata(position + 1));
-        }
-
-        consumer.commitSync(offsets);
-        consumer.close();
     }
-
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java
index 156acad..851c36f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java
@@ -153,7 +153,7 @@ public class InternalStreamsBuilderTest {
         assertEquals(storeName, topology.stateStores().get(0).name());
 
         assertEquals(1, topology.storeToChangelogTopic().size());
-        assertEquals("topic2", topology.storeToChangelogTopic().get(storeName));
+        assertEquals("app-id-prefix-STATE-STORE-0000000000-changelog", topology.storeToChangelogTopic().get(storeName));
         assertNull(table1.queryableStoreName());
     }
 

-- 
To stop receiving notification emails like this one, please contact
guozhang@apache.org.
