This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch 0.11.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/0.11.0 by this push:
     new 8852900  KAFKA-7021: checkpoint offsets from committed (#5232)
8852900 is described below

commit 88529006b425bbc683da9fb8132fe6af823c953f
Author:     Guozhang Wang
AuthorDate: Thu Jun 14 22:21:49 2018 -0700

    KAFKA-7021: checkpoint offsets from committed (#5232)

    This is a cherry-pick PR from #5207

    1. add the committed offsets to checkpointable offset map.
    2. add the restoration integration test for the source KTable case.
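For readers skimming the patch, the functional change is the merge in StreamTask#activeTaskCheckpointableOffsets() shown in the diff below: producer-acked offsets from the record collector are taken as-is, and the task's consumed (committed) offsets only fill in partitions the collector never wrote to, which is the source-KTable case this ticket is about. The standalone sketch below replays that merge with plain String keys instead of TopicPartition so it runs outside the Streams internals; the class name and sample topic names are illustrative only and not part of the commit.

    import java.util.HashMap;
    import java.util.Map;

    public class CheckpointableOffsetsSketch {

        // Merge producer-acked offsets with consumer-consumed offsets; the producer side wins.
        // In Kafka Streams the keys are TopicPartition; plain Strings keep this sketch standalone.
        static Map<String, Long> checkpointableOffsets(final Map<String, Long> producerAckedOffsets,
                                                       final Map<String, Long> consumedOffsets) {
            final Map<String, Long> checkpointable = new HashMap<>(producerAckedOffsets);
            for (final Map.Entry<String, Long> entry : consumedOffsets.entrySet()) {
                // only fill in partitions the record collector never wrote to,
                // e.g. the input topic of a source KTable
                if (!checkpointable.containsKey(entry.getKey())) {
                    checkpointable.put(entry.getKey(), entry.getValue());
                }
            }
            return checkpointable;
        }

        public static void main(final String[] args) {
            final Map<String, Long> acked = new HashMap<>();
            acked.put("restore-test-store-changelog-0", 42L);

            final Map<String, Long> consumed = new HashMap<>();
            consumed.put("input-stream-0", 4000L);                 // source-topic partition, no producer writes
            consumed.put("restore-test-store-changelog-0", 40L);   // ignored: producer-acked offset wins

            // result contains {input-stream-0=4000, restore-test-store-changelog-0=42}
            System.out.println(checkpointableOffsets(acked, consumed));
        }
    }

The RestoreIntegrationTest added in this commit exercises the same path end to end: it commits consumer offsets, writes a checkpoint file for each task directory, and asserts how many records are re-processed after the application starts.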
---
 .../streams/processor/internals/AbstractTask.java |   4 +-
 .../processor/internals/StateDirectory.java       |   2 +-
 .../streams/processor/internals/StreamTask.java   |  15 +-
 .../integration/RestoreIntegrationTest.java       | 193 +++++++++++++++++++++
 4 files changed, 208 insertions(+), 6 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
index 364fbe8..7f6ac7c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
@@ -163,7 +163,7 @@ public abstract class AbstractTask {
         return sb.toString();
     }
 
-    protected Map<TopicPartition, Long> recordCollectorOffsets() {
+    protected Map<TopicPartition, Long> activeTaskCheckpointableOffsets() {
         return Collections.emptyMap();
     }
 
@@ -234,7 +234,7 @@ public abstract class AbstractTask {
         ProcessorStateException exception = null;
         log.trace("{} Closing state manager", logPrefix);
         try {
-            stateMgr.close(writeCheckpoint ? recordCollectorOffsets() : null);
+            stateMgr.close(writeCheckpoint ? activeTaskCheckpointableOffsets() : null);
         } catch (final ProcessorStateException e) {
             exception = e;
         } finally {
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
index 8d46da1..a18175a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
@@ -81,7 +81,7 @@ public class StateDirectory {
      * @param taskId
      * @return directory for the {@link TaskId}
      */
-    File directoryForTask(final TaskId taskId) {
+    public File directoryForTask(final TaskId taskId) {
         final File taskDir = new File(stateDir, taskId.toString());
         if (!taskDir.exists() && !taskDir.mkdir()) {
             throw new ProcessorStateException(String.format("task directory [%s] doesn't exist and couldn't be created",
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 4b24aab..86855f3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -286,7 +286,7 @@ public class StreamTask extends AbstractTask implements Punctuator {
             public void run() {
                 flushState();
                 if (!eosEnabled) {
-                    stateMgr.checkpoint(recordCollectorOffsets());
+                    stateMgr.checkpoint(activeTaskCheckpointableOffsets());
                 }
                 commitOffsets(startNewTransaction);
             }
@@ -297,8 +297,17 @@ public class StreamTask extends AbstractTask implements Punctuator {
     }
 
     @Override
-    protected Map<TopicPartition, Long> recordCollectorOffsets() {
-        return recordCollector.offsets();
+
+    protected Map<TopicPartition, Long> activeTaskCheckpointableOffsets() {
+        // put both producer acked offsets and consumer committed offsets as checkpointable offsets
+        final Map<TopicPartition, Long> checkpointableOffsets = recordCollector.offsets();
+        for (final Map.Entry<TopicPartition, Long> entry : consumedOffsets.entrySet()) {
+            if (!checkpointableOffsets.containsKey(entry.getKey())) {
+                checkpointableOffsets.put(entry.getKey(), entry.getValue());
+            }
+        }
+
+        return checkpointableOffsets;
     }
 
     @Override
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
new file mode 100644
index 0000000..54c2bd7
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.IntegerDeserializer;
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.kstream.ForeachAction;
+import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.internals.StateDirectory;
+import org.apache.kafka.streams.state.QueryableStoreTypes;
+import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
+import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.IsEqual.equalTo;
+import static org.junit.Assert.assertTrue;
+
+@Category({IntegrationTest.class})
+public class RestoreIntegrationTest {
+    private static final int NUM_BROKERS = 1;
+
+    private static final String APPID = "restore-test";
+
+    @ClassRule
+    public static final EmbeddedKafkaCluster CLUSTER =
+            new EmbeddedKafkaCluster(NUM_BROKERS);
+    private static final String INPUT_STREAM = "input-stream";
+    private static final String INPUT_STREAM_2 = "input-stream-2";
+    private final int numberOfKeys = 10000;
+    private KafkaStreams kafkaStreams;
+
+    @BeforeClass
+    public static void createTopics() throws InterruptedException {
+        CLUSTER.createTopic(INPUT_STREAM, 2, 1);
+        CLUSTER.createTopic(INPUT_STREAM_2, 2, 1);
+        CLUSTER.createTopic(APPID + "-store-changelog", 2, 1);
+    }
+
+    private Properties props(final String applicationId) {
+        Properties streamsConfiguration = new Properties();
+        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
+        streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
+        streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory(applicationId).getPath());
+        streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
+        streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
+        streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
+        streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        return streamsConfiguration;
+    }
+
+    @After
+    public void shutdown() {
+        if (kafkaStreams != null) {
+            kafkaStreams.close(30, TimeUnit.SECONDS);
+        }
+    }
+
+    @Test
+    public void shouldRestoreStateFromSourceTopic() throws Exception {
+        final AtomicInteger numReceived = new AtomicInteger(0);
+        final KStreamBuilder builder = new KStreamBuilder();
+
+        final Properties props = props(APPID);
+
+        // restoring from 1000 to 4000 (committed), and then process from 4000 to 5000 on each of the two partitions
+        final int offsetLimitDelta = 1000;
+        final int offsetCheckpointed = 1000;
+        createStateForRestoration(INPUT_STREAM);
+        setCommittedOffset(INPUT_STREAM, offsetLimitDelta);
+
+        final StateDirectory stateDirectory = new StateDirectory(APPID, props.getProperty(StreamsConfig.STATE_DIR_CONFIG), new MockTime());
+        new OffsetCheckpoint(new File(stateDirectory.directoryForTask(new TaskId(0, 0)), ".checkpoint"))
+                .write(Collections.singletonMap(new TopicPartition(INPUT_STREAM, 0), (long) offsetCheckpointed));
+        new OffsetCheckpoint(new File(stateDirectory.directoryForTask(new TaskId(0, 1)), ".checkpoint"))
+                .write(Collections.singletonMap(new TopicPartition(INPUT_STREAM, 1), (long) offsetCheckpointed));
+
+        final CountDownLatch startupLatch = new CountDownLatch(1);
+        final CountDownLatch shutdownLatch = new CountDownLatch(1);
+
+        builder.table(Serdes.Integer(), Serdes.Integer(), INPUT_STREAM, "store")
+                .toStream()
+                .foreach(new ForeachAction<Integer, Integer>() {
+                    @Override
+                    public void apply(final Integer key, final Integer value) {
+                        if (numReceived.incrementAndGet() == 2 * offsetLimitDelta)
+                            shutdownLatch.countDown();
+                    }
+                });
+
+        kafkaStreams = new KafkaStreams(builder, props);
+        kafkaStreams.setStateListener(new KafkaStreams.StateListener() {
+            @Override
+            public void onChange(final KafkaStreams.State newState, final KafkaStreams.State oldState) {
+                if (newState == KafkaStreams.State.RUNNING && oldState == KafkaStreams.State.REBALANCING) {
+                    startupLatch.countDown();
+                }
+            }
+        });
+
+        kafkaStreams.start();
+
+        assertTrue(startupLatch.await(30, TimeUnit.SECONDS));
+        ReadOnlyKeyValueStore<Integer, Integer> store = kafkaStreams.store("store", QueryableStoreTypes.<Integer, Integer>keyValueStore());
+        assertThat(store.approximateNumEntries(), equalTo((long) numberOfKeys - offsetLimitDelta * 2 - offsetCheckpointed * 2));
+
+        assertTrue(shutdownLatch.await(30, TimeUnit.SECONDS));
+        assertThat(numReceived.get(), equalTo(offsetLimitDelta * 2));
+        assertThat(store.approximateNumEntries(), equalTo((long) numberOfKeys - offsetCheckpointed * 2));
+    }
+
+    private void createStateForRestoration(final String changelogTopic) {
+        final Properties producerConfig = new Properties();
+        producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+
+        try (final KafkaProducer<Integer, Integer> producer =
+                     new KafkaProducer<>(producerConfig, new IntegerSerializer(), new IntegerSerializer())) {
+
+            for (int i = 0; i < numberOfKeys; i++) {
+                producer.send(new ProducerRecord<>(changelogTopic, i, i));
+            }
+        }
+    }
+
+    private void setCommittedOffset(final String topic, final int limitDelta) {
+        final Properties consumerConfig = new Properties();
+        consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, APPID);
+        consumerConfig.put(ConsumerConfig.CLIENT_ID_CONFIG, "commit-consumer");
+        consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
+        consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
+
+        final Consumer<Integer, Integer> consumer = new KafkaConsumer<>(consumerConfig);
+        final List<TopicPartition> partitions = Arrays.asList(
+                new TopicPartition(topic, 0),
+                new TopicPartition(topic, 1));
+
+        consumer.assign(partitions);
+        consumer.seekToEnd(partitions);
+
+        for (TopicPartition partition : partitions) {
+            final long position = consumer.position(partition);
+            consumer.seek(partition, position - limitDelta);
+        }
+
+        consumer.commitSync();
+        consumer.close();
+    }
+
+}

-- 
To stop receiving notification emails like this one, please contact
guozhang@apache.org.