kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [kafka] branch trunk updated: KAFKA-6499: Do not write offset checkpoint file with empty offset map (#4492)
Date Thu, 01 Feb 2018 18:11:33 GMT
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 4a2aa4b  KAFKA-6499: Do not write offset checkpoint file with empty offset map (#4492)
4a2aa4b is described below

commit 4a2aa4bb6700c02b573c3ae71a40bdb8df5ab3a7
Author: Guozhang Wang <wangguoz@gmail.com>
AuthorDate: Thu Feb 1 10:11:29 2018 -0800

    KAFKA-6499: Do not write offset checkpoint file with empty offset map (#4492)
    
    * In OffsetCheckpoint.write(), if the offset map passed in is empty, skip writing the
      file, which would contain only the version number and a size of 0. From the reader's
      point of view, this is the same as the file not existing.
    * Add related unit tests.
    * Minor fixes to log4j messages.
    
    Reviewers: Bill Bejeck <bill@confluent.io>, Damian Guy <damian@confluent.io>, Matthias J. Sax <matthias@confluent.io>
---
 .../processor/internals/AbstractStateManager.java   |  2 +-
 .../processor/internals/GlobalStateManagerImpl.java |  2 +-
 .../processor/internals/ProcessorStateManager.java  | 13 +++++++------
 .../streams/state/internals/OffsetCheckpoint.java   |  5 +++++
 .../internals/ProcessorStateManagerTest.java        |  6 +++---
 .../state/internals/OffsetCheckpointTest.java       | 21 +++++++++++++++++++--
 6 files changed, 36 insertions(+), 13 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractStateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractStateManager.java
index d387762..b270e03 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractStateManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractStateManager.java
@@ -65,7 +65,7 @@ abstract class AbstractStateManager implements StateManager {
         try {
             checkpoint.write(checkpointableOffsets);
         } catch (final IOException fatalException) {
-            log.error("Failed to update checkpoint file for global stores.", fatalException);
+            log.error("Failed to write offset checkpoint file to {} while re-initializing
{}: {}", checkpoint, stateStores, fatalException);
             throw new StreamsException("Failed to reinitialize global store.", fatalException);
         }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
index 2d4ee8f..56e6bed 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
@@ -339,7 +339,7 @@ public class GlobalStateManagerImpl extends AbstractStateManager implements Glob
             try {
                 checkpoint.write(checkpointableOffsets);
             } catch (IOException e) {
-                log.warn("Failed to write offsets checkpoint for global globalStores", e);
+                log.warn("Failed to write offset checkpoint file to {} for global stores:
{}", checkpoint, e);
             }
         }
     }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
index 1ee0e14..e7a23bd 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
@@ -294,7 +294,6 @@ public class ProcessorStateManager extends AbstractStateManager {
     // write the checkpoint
     @Override
     public void checkpoint(final Map<TopicPartition, Long> ackedOffsets) {
-        log.trace("Writing checkpoint: {}", ackedOffsets);
         checkpointableOffsets.putAll(changelogReader.restoredOffsets());
         for (final StateStore store : stores.values()) {
             final String storeName = store.name();
@@ -311,14 +310,16 @@ public class ProcessorStateManager extends AbstractStateManager {
                 }
             }
         }
-        // write the checkpoint file before closing, to indicate clean shutdown
+        // write the checkpoint file before closing
+        if (checkpoint == null) {
+            checkpoint = new OffsetCheckpoint(new File(baseDir, CHECKPOINT_FILE_NAME));
+        }
+
+        log.trace("Writing checkpoint: {}", checkpointableOffsets);
         try {
-            if (checkpoint == null) {
-                checkpoint = new OffsetCheckpoint(new File(baseDir, CHECKPOINT_FILE_NAME));
-            }
             checkpoint.write(checkpointableOffsets);
         } catch (final IOException e) {
-            log.warn("Failed to write checkpoint file to {}:", new File(baseDir, CHECKPOINT_FILE_NAME),
e);
+            log.warn("Failed to write offset checkpoint file to {}: {}", checkpoint, e);
         }
     }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/OffsetCheckpoint.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/OffsetCheckpoint.java
index 8c14737..9f0e1f8 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/OffsetCheckpoint.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/OffsetCheckpoint.java
@@ -66,6 +66,11 @@ public class OffsetCheckpoint {
      * @throws IOException if any file operation fails with an IO exception
      */
     public void write(final Map<TopicPartition, Long> offsets) throws IOException {
+        // if there is no offsets, skip writing the file to save disk IOs
+        if (offsets.isEmpty()) {
+            return;
+        }
+
         synchronized (lock) {
             // write to temp file and then swap with the existing file
             final File temp = new File(file.getAbsolutePath() + ".tmp");
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
index ab9abc3..31f07cc 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
@@ -309,8 +309,8 @@ public class ProcessorStateManagerTest {
             false,
             logContext);
         try {
-            // make sure the checkpoint file isn't deleted
-            assertTrue(checkpointFile.exists());
+            // make sure the checkpoint file is not written yet
+            assertFalse(checkpointFile.exists());
 
             stateMgr.register(persistentStore, persistentStore.stateRestoreCallback);
             stateMgr.register(nonPersistentStore, nonPersistentStore.stateRestoreCallback);
@@ -630,7 +630,7 @@ public class ProcessorStateManagerTest {
 
     @Test
     public void shouldDeleteCheckpointFileOnCreationIfEosEnabled() throws IOException {
-        checkpoint.write(Collections.<TopicPartition, Long>emptyMap());
+        checkpoint.write(Collections.singletonMap(new TopicPartition(persistentStoreTopicName,
1), 123L));
         assertTrue(checkpointFile.exists());
 
         ProcessorStateManager stateManager = null;
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/OffsetCheckpointTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/OffsetCheckpointTest.java
index 3b78d05..54cd3df 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/OffsetCheckpointTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/OffsetCheckpointTest.java
@@ -18,6 +18,7 @@ package org.apache.kafka.streams.state.internals;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -34,8 +35,8 @@ public class OffsetCheckpointTest {
 
     @Test
     public void testReadWrite() throws IOException {
-        File f = TestUtils.tempFile();
-        OffsetCheckpoint checkpoint = new OffsetCheckpoint(f);
+        final File f = TestUtils.tempFile();
+        final OffsetCheckpoint checkpoint = new OffsetCheckpoint(f);
 
         try {
             Map<TopicPartition, Long> offsets = new HashMap<>();
@@ -56,4 +57,20 @@ public class OffsetCheckpointTest {
             checkpoint.delete();
         }
     }
+
+    @Test
+    public void shouldNotWriteCheckpointWhenNoOffsets() throws IOException {
+        // we do not need to worry about file name uniqueness since this file should not be created
+        final File f = new File(TestUtils.tempDirectory().getAbsolutePath(), "kafka.tmp");
+        final OffsetCheckpoint checkpoint = new OffsetCheckpoint(f);
+
+        checkpoint.write(Collections.<TopicPartition, Long>emptyMap());
+
+        assertFalse(f.exists());
+
+        assertEquals(Collections.<TopicPartition, Long>emptyMap(), checkpoint.read());
+
+        // deleting a nonexistent checkpoint file should be fine
+        checkpoint.delete();
+    }
 }

-- 
To stop receiving notification emails like this one, please contact guozhang@apache.org.

Mime
View raw message