kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: MINOR: log state store restore offsets
Date Wed, 22 Mar 2017 19:24:36 GMT
Repository: kafka
Updated Branches:
  refs/heads/0.10.2 cf310ec48 -> 75df53f4b


MINOR: log state store restore offsets

Debug logging of the start and end offsets used during state store restoration

Author: Damian Guy <damian.guy@gmail.com>

Reviewers: Guozhang Wang <wangguoz@gmail.com>

Closes #2724 from dguy/log-restore-offsets-0.10.2


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/75df53f4
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/75df53f4
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/75df53f4

Branch: refs/heads/0.10.2
Commit: 75df53f4b3923fee79a7457f1e32abca7e5e98ee
Parents: cf310ec
Author: Damian Guy <damian.guy@gmail.com>
Authored: Wed Mar 22 12:24:33 2017 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Wed Mar 22 12:24:33 2017 -0700

----------------------------------------------------------------------
 .../kafka/streams/processor/internals/ProcessorStateManager.java | 4 ++++
 .../apache/kafka/streams/integration/ResetIntegrationTest.java   | 1 -
 2 files changed, 4 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/75df53f4/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
index ad16c77..65831a2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
@@ -219,6 +219,10 @@ public class ProcessorStateManager implements StateManager {
             } else {
                 restoreConsumer.seekToBeginning(singleton(storePartition));
             }
+            log.debug("restoring partition {} from offset {} to endOffset {}",
+                      storePartition,
+                      restoreConsumer.position(storePartition),
+                      endOffset);
 
             // restore its state from changelog records
             long limit = offsetLimit(storePartition);

http://git-wip-us.apache.org/repos/asf/kafka/blob/75df53f4/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
index 35a58f0..39889b1 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
@@ -23,7 +23,6 @@ import kafka.utils.MockTime;
 import kafka.utils.ZkUtils;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.common.errors.GroupCoordinatorNotAvailableException;
-import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
 import org.apache.kafka.common.security.JaasUtils;
 import org.apache.kafka.common.serialization.LongDeserializer;


Mime
View raw message