kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [kafka] 01/02: KAFKA-7192: Wipe out if EOS is turned on and checkpoint file does not exist (#5421)
Date Wed, 01 Aug 2018 01:13:42 GMT
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch 2.0
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 2a63c5113902c71a41b9ef48b6fd3cd29b5892da
Author: Guozhang Wang <wangguoz@gmail.com>
AuthorDate: Thu Jul 26 17:58:29 2018 -0700

    KAFKA-7192: Wipe out if EOS is turned on and checkpoint file does not exist (#5421)
    
    1. As titled and as described in comments.
    2. Modified the integration test slightly to insert records for new keys in the uncommitted data to expose this
issue.
    
    Reviewers: Bill Bejeck <bill@confluent.io>, Matthias J. Sax <matthias@confluent.io>
---
 .../streams/processor/internals/AbstractTask.java  |  4 ++
 .../streams/processor/internals/StateRestorer.java |  4 ++
 .../processor/internals/StoreChangelogReader.java  | 24 ++++++++--
 .../streams/integration/EosIntegrationTest.java    | 26 +++++++----
 .../internals/StoreChangelogReaderTest.java        | 51 +++++++++++++++-------
 5 files changed, 81 insertions(+), 28 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
index 188ff47..94e4c71 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
@@ -137,6 +137,10 @@ public abstract class AbstractTask implements Task {
         return toString("");
     }
 
+    public boolean isEosEnabled() {
+        return eosEnabled;
+    }
+
     /**
      * Produces a string representation containing useful information about a Task starting
with the given indent.
      * This is useful in debugging scenarios.
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateRestorer.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateRestorer.java
index 33dce9e..c1a41ce 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateRestorer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateRestorer.java
@@ -55,6 +55,10 @@ public class StateRestorer {
         return partition;
     }
 
+    public String storeName() {
+        return storeName;
+    }
+
     long checkpoint() {
         return checkpoint == null ? NO_CHECKPOINT : checkpoint;
     }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
index 07af801..1927b5a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
@@ -71,7 +71,7 @@ public class StoreChangelogReader implements ChangelogReader {
 
     public Collection<TopicPartition> restore(final RestoringTasks active) {
         if (!needsInitializing.isEmpty()) {
-            initialize();
+            initialize(active);
         }
 
         if (needsRestoring.isEmpty()) {
@@ -111,7 +111,7 @@ public class StoreChangelogReader implements ChangelogReader {
         return completed();
     }
 
-    private void initialize() {
+    private void initialize(final RestoringTasks active) {
         if (!restoreConsumer.subscription().isEmpty()) {
             throw new StreamsException("Restore consumer should not be subscribed to any
topics (" + restoreConsumer.subscription() + ")");
         }
@@ -165,11 +165,12 @@ public class StoreChangelogReader implements ChangelogReader {
 
         // set up restorer for those initializable
         if (!initializable.isEmpty()) {
-            startRestoration(initializable);
+            startRestoration(initializable, active);
         }
     }
 
-    private void startRestoration(final Map<TopicPartition, StateRestorer> initialized)
{
+    private void startRestoration(final Map<TopicPartition, StateRestorer> initialized,
+                                  final RestoringTasks active) {
         log.debug("Start restoring state stores from changelog topics {}", initialized.keySet());
 
         final Set<TopicPartition> assignment = new HashSet<>(restoreConsumer.assignment());
@@ -186,6 +187,18 @@ public class StoreChangelogReader implements ChangelogReader {
                 restorer.setStartingOffset(restoreConsumer.position(restorer.partition()));
                 restorer.restoreStarted();
             } else {
+                final StreamTask task = active.restoringTaskFor(restorer.partition());
+
+                // If the checkpoint does not exist, it means the task was not shut down gracefully
before;
+                // in that case, if EOS is turned on, we should wipe out the state and
re-initialize the task
+                if (task.isEosEnabled()) {
+                    log.info("No checkpoint found for task {} state store {} changelog {}
with EOS turned on. " +
+                            "Reinitializing the task and restore its state from the beginning.",
task.id, restorer.storeName(), restorer.partition());
+                    task.reinitializeStateStoresForPartitions(Collections.singleton(restorer.partition()));
+                } else {
+                    log.info("Restoring task {}'s state store {} from beginning of the changelog
{} ", task.id, restorer.storeName(), restorer.partition());
+                }
+
                 restoreConsumer.seekToBeginning(Collections.singletonList(restorer.partition()));
                 needsPositionUpdate.add(restorer);
             }
@@ -280,6 +293,9 @@ public class StoreChangelogReader implements ChangelogReader {
         if (!restoreRecords.isEmpty()) {
             restorer.restore(restoreRecords);
             restorer.restoreBatchCompleted(lastRestoredOffset, records.size());
+
+            log.trace("Restored from {} to {} with {} records, ending offset is {}, next
starting position is {}",
+                    restorer.partition(), restorer.storeName(), records.size(), lastRestoredOffset,
nextPosition);
         }
 
         return nextPosition;
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
index 30c90c2..770f579 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
@@ -391,8 +391,9 @@ public class EosIntegrationTest {
         // the app is supposed to emit all 40 update records into the output topic
         // the app commits after each 10 records per partition, and thus will have 2*5 uncommitted
writes
         // and store updates (ie, another 5 uncommitted writes to a changelog topic per partition)
+        // the uncommitted batch also sends some data for new keys to validate that, upon
resuming, they do not show up in the store
         //
-        // the failure gets inject after 20 committed and 30 uncommitted records got received
+        // the failure gets injected after 20 committed and 10 uncommitted records got received
         // -> the failure only kills one thread
         // after fail over, we should read 40 committed records and the state stores should
contain the correct sums
         // per key (even if some records got processed twice)
@@ -402,7 +403,7 @@ public class EosIntegrationTest {
             streams.start();
 
             final List<KeyValue<Long, Long>> committedDataBeforeFailure = prepareData(0L,
10L, 0L, 1L);
-            final List<KeyValue<Long, Long>> uncommittedDataBeforeFailure = prepareData(10L,
15L, 0L, 1L);
+            final List<KeyValue<Long, Long>> uncommittedDataBeforeFailure = prepareData(10L,
15L, 0L, 1L, 2L, 3L);
 
             final List<KeyValue<Long, Long>> dataBeforeFailure = new ArrayList<>();
             dataBeforeFailure.addAll(committedDataBeforeFailure);
@@ -610,10 +611,6 @@ public class EosIntegrationTest {
 
                     @Override
                     public KeyValue<Long, Long> transform(final Long key, final Long
value) {
-                        if (errorInjected.compareAndSet(true, false)) {
-                            // only tries to fail once on one of the task
-                            throw new RuntimeException("Injected test exception.");
-                        }
                         if (gcInjected.compareAndSet(true, false)) {
                             while (doGC) {
                                 try {
@@ -631,16 +628,27 @@ public class EosIntegrationTest {
 
                         if (state != null) {
                             Long sum = state.get(key);
+
                             if (sum == null) {
                                 sum = value;
                             } else {
                                 sum += value;
                             }
                             state.put(key, sum);
-                            context.forward(key, sum);
-                            return null;
+                            state.flush();
+                        }
+
+
+                        if (errorInjected.compareAndSet(true, false)) {
+                            // only tries to fail once on one of the task
+                            throw new RuntimeException("Injected test exception.");
+                        }
+
+                        if (state != null) {
+                            return new KeyValue<>(key, state.get(key));
+                        } else {
+                            return new KeyValue<>(key, value);
                         }
-                        return new KeyValue<>(key, value);
                     }
 
                     @Override
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
index 90abf32..1e74d47 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
@@ -119,7 +119,10 @@ public class StoreChangelogReaderTest {
         final int messages = 10;
         setupConsumer(messages, topicPartition);
         changelogReader.register(new StateRestorer(topicPartition, restoreListener, null,
Long.MAX_VALUE, true, "storeName"));
+        expect(active.restoringTaskFor(topicPartition)).andStubReturn(task);
+        replay(active, task);
         changelogReader.restore(active);
+
         assertThat(callback.restored.size(), equalTo(messages));
     }
 
@@ -136,8 +139,8 @@ public class StoreChangelogReaderTest {
         changelogReader.register(new StateRestorer(topicPartition, restoreListener, null,
Long.MAX_VALUE, true,
                                                    "storeName"));
 
-        EasyMock.expect(active.restoringTaskFor(topicPartition)).andReturn(task);
-        EasyMock.replay(active);
+        EasyMock.expect(active.restoringTaskFor(topicPartition)).andStubReturn(task);
+        EasyMock.replay(active, task);
 
         // first restore call "fails" but we should not die with an exception
         assertEquals(0, changelogReader.restore(active).size());
@@ -164,7 +167,8 @@ public class StoreChangelogReaderTest {
         setupConsumer(messages, topicPartition);
         changelogReader.register(new StateRestorer(topicPartition, restoreListener, null,
Long.MAX_VALUE, true,
                                                    "storeName"));
-
+        expect(active.restoringTaskFor(topicPartition)).andStubReturn(task);
+        replay(active, task);
         changelogReader.restore(active);
         assertThat(consumer.assignment(), equalTo(Collections.<TopicPartition>emptySet()));
     }
@@ -175,6 +179,8 @@ public class StoreChangelogReaderTest {
         final StateRestorer restorer = new StateRestorer(topicPartition, restoreListener,
null, 3, true,
                                                          "storeName");
         changelogReader.register(restorer);
+        expect(active.restoringTaskFor(topicPartition)).andStubReturn(task);
+        replay(active, task);
         changelogReader.restore(active);
         assertThat(callback.restored.size(), equalTo(3));
         assertThat(restorer.restoredOffset(), equalTo(3L));
@@ -192,14 +198,14 @@ public class StoreChangelogReaderTest {
         setupConsumer(5, one);
         setupConsumer(3, two);
 
-        changelogReader
-            .register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE,
true, "storeName1"));
+        changelogReader.register(new StateRestorer(topicPartition, restoreListener, null,
Long.MAX_VALUE, true, "storeName1"));
         changelogReader.register(new StateRestorer(one, restoreListener1, null, Long.MAX_VALUE,
true, "storeName2"));
         changelogReader.register(new StateRestorer(two, restoreListener2, null, Long.MAX_VALUE,
true, "storeName3"));
 
-        expect(active.restoringTaskFor(one)).andReturn(null);
-        expect(active.restoringTaskFor(two)).andReturn(null);
-        replay(active);
+        expect(active.restoringTaskFor(one)).andStubReturn(task);
+        expect(active.restoringTaskFor(two)).andStubReturn(task);
+        expect(active.restoringTaskFor(topicPartition)).andStubReturn(task);
+        replay(active, task);
         changelogReader.restore(active);
 
         assertThat(callback.restored.size(), equalTo(10));
@@ -224,9 +230,13 @@ public class StoreChangelogReaderTest {
         changelogReader.register(new StateRestorer(one, restoreListener1, null, Long.MAX_VALUE,
true, "storeName2"));
         changelogReader.register(new StateRestorer(two, restoreListener2, null, Long.MAX_VALUE,
true, "storeName3"));
 
-        expect(active.restoringTaskFor(one)).andReturn(null);
-        expect(active.restoringTaskFor(two)).andReturn(null);
-        replay(active);
+        expect(active.restoringTaskFor(one)).andReturn(task);
+        expect(active.restoringTaskFor(two)).andReturn(task);
+        expect(active.restoringTaskFor(topicPartition)).andReturn(task);
+        expect(active.restoringTaskFor(topicPartition)).andStubReturn(task);
+        replay(active, task);
+        changelogReader.restore(active);
+
         changelogReader.restore(active);
 
         assertThat(callback.restored.size(), equalTo(10));
@@ -248,6 +258,8 @@ public class StoreChangelogReaderTest {
         setupConsumer(10, topicPartition);
         changelogReader
             .register(new StateRestorer(topicPartition, restoreListener, null, 5, true, "storeName1"));
+        expect(active.restoringTaskFor(topicPartition)).andStubReturn(task);
+        replay(active, task);
         changelogReader.restore(active);
 
         assertThat(callback.restored.size(), equalTo(5));
@@ -306,7 +318,10 @@ public class StoreChangelogReaderTest {
     public void shouldReturnRestoredOffsetsForPersistentStores() {
         setupConsumer(10, topicPartition);
         changelogReader.register(new StateRestorer(topicPartition, restoreListener, null,
Long.MAX_VALUE, true, "storeName"));
+        expect(active.restoringTaskFor(topicPartition)).andStubReturn(task);
+        replay(active, task);
         changelogReader.restore(active);
+
         final Map<TopicPartition, Long> restoredOffsets = changelogReader.restoredOffsets();
         assertThat(restoredOffsets, equalTo(Collections.singletonMap(topicPartition, 10L)));
     }
@@ -315,6 +330,8 @@ public class StoreChangelogReaderTest {
     public void shouldNotReturnRestoredOffsetsForNonPersistentStore() {
         setupConsumer(10, topicPartition);
         changelogReader.register(new StateRestorer(topicPartition, restoreListener, null,
Long.MAX_VALUE, false, "storeName"));
+        expect(active.restoringTaskFor(topicPartition)).andStubReturn(task);
+        replay(active, task);
         changelogReader.restore(active);
         final Map<TopicPartition, Long> restoredOffsets = changelogReader.restoredOffsets();
         assertThat(restoredOffsets, equalTo(Collections.<TopicPartition, Long>emptyMap()));
@@ -330,6 +347,8 @@ public class StoreChangelogReaderTest {
         consumer.assign(Collections.singletonList(topicPartition));
         changelogReader.register(new StateRestorer(topicPartition, restoreListener, null,
Long.MAX_VALUE, false,
                                                    "storeName"));
+        expect(active.restoringTaskFor(topicPartition)).andStubReturn(task);
+        replay(active, task);
         changelogReader.restore(active);
 
         assertThat(callback.restored, CoreMatchers.equalTo(Utils.mkList(KeyValue.pair(bytes,
bytes), KeyValue.pair(bytes, bytes))));
@@ -340,6 +359,9 @@ public class StoreChangelogReaderTest {
         final Collection<TopicPartition> expected = Collections.singleton(topicPartition);
         setupConsumer(0, topicPartition);
         changelogReader.register(new StateRestorer(topicPartition, restoreListener, null,
Long.MAX_VALUE, true, "store"));
+        expect(active.restoringTaskFor(topicPartition)).andStubReturn(task);
+        replay(active, task);
+
         final Collection<TopicPartition> restored = changelogReader.restore(active);
         assertThat(restored, equalTo(expected));
     }
@@ -354,10 +376,9 @@ public class StoreChangelogReaderTest {
         changelogReader.register(new StateRestorer(topicPartition, restoreListener, null,
Long.MAX_VALUE, false, "storeName"));
 
         final TopicPartition postInitialization = new TopicPartition("other", 0);
-        expect(active.restoringTaskFor(topicPartition)).andReturn(null);
-        expect(active.restoringTaskFor(topicPartition)).andReturn(null);
-        expect(active.restoringTaskFor(postInitialization)).andReturn(null);
-        replay(active);
+        expect(active.restoringTaskFor(topicPartition)).andStubReturn(task);
+        expect(active.restoringTaskFor(postInitialization)).andStubReturn(task);
+        replay(active, task);
 
         assertTrue(changelogReader.restore(active).isEmpty());
 


Mime
View raw message