kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: KAFKA-4509: Task reusage on rebalance fails for threads on same host
Date Tue, 13 Dec 2016 20:11:13 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 67f1e5b91 -> 859113786


KAFKA-4509: Task reusage on rebalance fails for threads on same host

Author: Matthias J. Sax <matthias@confluent.io>

Reviewers: Damian Guy, Guozhang Wang

Closes #2233 from mjsax/kafka-4509-task-reusage-fix


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/85911378
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/85911378
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/85911378

Branch: refs/heads/trunk
Commit: 859113786957a36381222f21287a940767e92f1c
Parents: 67f1e5b
Author: Matthias J. Sax <matthias@confluent.io>
Authored: Tue Dec 13 12:11:09 2016 -0800
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Tue Dec 13 12:11:09 2016 -0800

----------------------------------------------------------------------
 .../kafka/streams/errors/LockException.java     |  36 ++++++
 .../internals/ProcessorStateManager.java        |  11 +-
 .../processor/internals/StreamThread.java       | 123 ++++++++++++++++---
 .../processor/internals/StreamThreadTest.java   |  88 ++++++++++++-
 4 files changed, 233 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/85911378/streams/src/main/java/org/apache/kafka/streams/errors/LockException.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/errors/LockException.java b/streams/src/main/java/org/apache/kafka/streams/errors/LockException.java
new file mode 100644
index 0000000..00c75ec
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/errors/LockException.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements.  See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.  You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.kafka.streams.errors;
+
+/**
+ * Indicates that the state store directory lock could not be acquired because another thread holds the lock.
+ */
+public class LockException extends StreamsException {
+
+    private static final long serialVersionUID = 1L;
+
+    public LockException(final String s) {
+        super(s);
+    }
+
+    public LockException(String s, Throwable throwable) {
+        super(s, throwable);
+    }
+
+    public LockException(Throwable throwable) {
+        super(throwable);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/85911378/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
index 795949f..30b84f1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
@@ -22,6 +22,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.streams.errors.LockException;
 import org.apache.kafka.streams.errors.ProcessorStateException;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.processor.StateRestoreCallback;
@@ -37,8 +38,8 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.LinkedHashMap;
 import java.util.HashSet;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -71,11 +72,13 @@ public class ProcessorStateManager {
     private final Map<StateStore, ProcessorNode> stateStoreProcessorNodeMap;
 
     /**
-     * @throws IOException if any error happens while creating or locking the state directory
+     * @throws LockException if the state directory cannot be locked because another thread holds the lock
+     *                       (this might be recoverable by retrying)
+     * @throws IOException if any severe error happens while creating or locking the state directory
      */
     public ProcessorStateManager(String applicationId, TaskId taskId, Collection<TopicPartition> sources, Consumer<byte[], byte[]> restoreConsumer, boolean isStandby,
                                  StateDirectory stateDirectory, final Map<String, String> sourceStoreToSourceTopic,
-                                 final Map<StateStore, ProcessorNode> stateStoreProcessorNodeMap) throws IOException {
+                                 final Map<StateStore, ProcessorNode> stateStoreProcessorNodeMap) throws LockException, IOException {
         this.applicationId = applicationId;
         this.defaultPartition = taskId.partition;
         this.taskId = taskId;
@@ -98,7 +101,7 @@ public class ProcessorStateManager {
         this.logPrefix = String.format("task [%s]", taskId);
 
         if (!stateDirectory.lock(taskId, 5)) {
-            throw new IOException(String.format("%s Failed to lock the state directory: %s", logPrefix, baseDir.getCanonicalPath()));
+            throw new LockException(String.format("%s Failed to lock the state directory: %s", logPrefix, baseDir.getCanonicalPath()));
         }
 
         // load the checkpoint information

http://git-wip-us.apache.org/repos/asf/kafka/blob/85911378/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 151bfd5..a7793f8 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -38,6 +38,8 @@ import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.KafkaClientSupplier;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.StreamsMetrics;
+import org.apache.kafka.streams.errors.LockException;
+import org.apache.kafka.streams.errors.ProcessorStateException;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.errors.TaskIdFormatException;
 import org.apache.kafka.streams.processor.PartitionGrouper;
@@ -56,6 +58,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -216,6 +219,9 @@ public class StreamThread extends Thread {
 
     private ThreadCache cache;
 
+    private final TaskCreator taskCreator = new TaskCreator();
+    private final StandbyTaskCreator standbyTaskCreator = new StandbyTaskCreator();
+
     final ConsumerRebalanceListener rebalanceListener = new ConsumerRebalanceListener() {
         @Override
         public void onPartitionsAssigned(Collection<TopicPartition> assignment) {
@@ -811,10 +817,12 @@ public class StreamThread extends Thread {
         if (partitionAssignor == null)
             throw new IllegalStateException(logPrefix + " Partition assignor has not been initialized while adding stream tasks: this should not happen.");
 
-        // create the active tasks
+        final Map<TaskId, Set<TopicPartition>> newTasks = new HashMap<>();
+
+        // collect newly assigned tasks and reopen re-assigned tasks
         for (Map.Entry<TaskId, Set<TopicPartition>> entry : partitionAssignor.activeTasks().entrySet()) {
-            TaskId taskId = entry.getKey();
-            Set<TopicPartition> partitions = entry.getValue();
+            final TaskId taskId = entry.getKey();
+            final Set<TopicPartition> partitions = entry.getValue();
 
             if (assignment.containsAll(partitions)) {
                 try {
@@ -823,14 +831,15 @@ public class StreamThread extends Thread {
                         log.debug("{} recycling old task {}", logPrefix, taskId);
                         suspendedTasks.remove(taskId);
                         task.initTopology();
+
+                        activeTasks.put(taskId, task);
+
+                        for (TopicPartition partition : partitions) {
+                            activeTasksByPartition.put(partition, task);
+                        }
                     } else {
-                        log.debug("{} creating new task {}", logPrefix, taskId);
-                        task = createStreamTask(taskId, partitions);
+                        newTasks.put(taskId, partitions);
                     }
-                    activeTasks.put(taskId, task);
-
-                    for (TopicPartition partition : partitions)
-                        activeTasksByPartition.put(partition, task);
                 } catch (StreamsException e) {
                     log.error("{} Failed to create an active task {}: ", logPrefix, taskId, e);
                     throw e;
@@ -840,8 +849,12 @@ public class StreamThread extends Thread {
             }
         }
 
-        // finally destroy any remaining suspended tasks
+        // destroy any remaining suspended tasks
         removeSuspendedTasks();
+
+        // create all newly assigned tasks (guard against race condition with other thread via backoff and retry)
+        // -> other thread will call removeSuspendedTasks(); eventually
+        taskCreator.retryWithBackoff(newTasks);
     }
 
     private StandbyTask createStandbyTask(TaskId id, Collection<TopicPartition> partitions) {
@@ -864,10 +877,12 @@ public class StreamThread extends Thread {
 
         Map<TopicPartition, Long> checkpointedOffsets = new HashMap<>();
 
-        // create the standby tasks
+        final Map<TaskId, Set<TopicPartition>> newStandbyTasks = new HashMap<>();
+
+        // collect newly assigned standby tasks and reopen re-assigned standby tasks
         for (Map.Entry<TaskId, Set<TopicPartition>> entry : partitionAssignor.standbyTasks().entrySet()) {
-            TaskId taskId = entry.getKey();
-            Set<TopicPartition> partitions = entry.getValue();
+            final TaskId taskId = entry.getKey();
+            final Set<TopicPartition> partitions = entry.getValue();
             StandbyTask task = findMatchingSuspendedStandbyTask(taskId, partitions);
 
             if (task != null) {
@@ -875,9 +890,9 @@ public class StreamThread extends Thread {
                 suspendedStandbyTasks.remove(taskId);
                 task.initTopology();
             } else {
-                log.debug("{} creating new standby task {}", logPrefix, taskId);
-                task = createStandbyTask(taskId, partitions);
+                newStandbyTasks.put(taskId, partitions);
             }
+
             if (task != null) {
                 standbyTasks.put(taskId, task);
                 for (TopicPartition partition : partitions) {
@@ -891,9 +906,14 @@ public class StreamThread extends Thread {
                 checkpointedOffsets.putAll(task.checkpointedOffsets());
             }
         }
-        // finally destroy any remaining suspended tasks
+
+        // destroy any remaining suspended tasks
         removeSuspendedStandbyTasks();
 
+        // create all newly assigned standby tasks (guard against race condition with other thread via backoff and retry)
+        // -> other thread will call removeSuspendedStandbyTasks(); eventually
+        standbyTaskCreator.retryWithBackoff(newStandbyTasks);
+
         restoreConsumer.assign(new ArrayList<>(checkpointedOffsets.keySet()));
 
         for (Map.Entry<TopicPartition, Long> entry : checkpointedOffsets.entrySet()) {
@@ -1153,4 +1173,75 @@ public class StreamThread extends Thread {
                 sensor.add(name, stat);
         }
     }
+
+    abstract class AbstractTaskCreator {
+        void retryWithBackoff(final Map<TaskId, Set<TopicPartition>> tasksToBeCreated) {
+            long backoffTimeMs = 50L;
+            while (true) {
+                final Iterator<Map.Entry<TaskId, Set<TopicPartition>>> it = tasksToBeCreated.entrySet().iterator();
+                while (it.hasNext()) {
+                    final Map.Entry<TaskId, Set<TopicPartition>> newTaskAndPartitions = it.next();
+                    final TaskId taskId = newTaskAndPartitions.getKey();
+                    final Set<TopicPartition> partitions = newTaskAndPartitions.getValue();
+
+                    try {
+                        createTask(taskId, partitions);
+                        it.remove();
+                    } catch (final ProcessorStateException e) {
+                        if (e.getCause() instanceof LockException) {
+                            // ignore and retry
+                            log.warn("Could not create task {}. Will retry.", taskId, e);
+                        } else {
+                            throw e;
+                        }
+                    }
+                }
+
+                if (tasksToBeCreated.isEmpty()) {
+                    break;
+                }
+
+                try {
+                    Thread.sleep(backoffTimeMs);
+                    backoffTimeMs <<= 1;
+                } catch (InterruptedException e) {
+                    // ignore
+                }
+            }
+        }
+
+        abstract void createTask(final TaskId id, final Collection<TopicPartition> partitions);
+    }
+
+    class TaskCreator extends AbstractTaskCreator {
+        void createTask(final TaskId taskId, final Collection<TopicPartition> partitions) {
+            log.debug("{} creating new task {}", logPrefix, taskId);
+            final StreamTask task = createStreamTask(taskId, partitions);
+
+            activeTasks.put(taskId, task);
+
+            for (TopicPartition partition : partitions) {
+                activeTasksByPartition.put(partition, task);
+            }
+        }
+    }
+
+    class StandbyTaskCreator extends AbstractTaskCreator {
+        void createTask(final TaskId taskId, final Collection<TopicPartition> partitions) {
+            log.debug("{} creating new standby task {}", logPrefix, taskId);
+            final StandbyTask task = createStandbyTask(taskId, partitions);
+
+            standbyTasks.put(taskId, task);
+
+            for (TopicPartition partition : partitions) {
+                standbyTasksByPartition.put(partition, task);
+            }
+            // collect checked pointed offsets to position the restore consumer
+            // this include all partitions from which we restore states
+            for (TopicPartition partition : task.checkpointedOffsets().keySet()) {
+                standbyTasksByPartition.put(partition, task);
+            }
+        }
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/85911378/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index c491657..0c2ace9 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -17,11 +17,6 @@
 
 package org.apache.kafka.streams.processor.internals;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertSame;
-import static org.junit.Assert.assertTrue;
-
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.internals.PartitionAssignor;
@@ -37,9 +32,11 @@ import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.TopologyBuilder;
+import org.apache.kafka.streams.state.Stores;
 import org.apache.kafka.test.MockClientSupplier;
 import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.MockTimestampExtractor;
+import org.apache.kafka.test.TestUtils;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -57,6 +54,13 @@ import java.util.Properties;
 import java.util.Set;
 import java.util.UUID;
 
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
 public class StreamThreadTest {
 
     private final String clientId = "clientId";
@@ -118,6 +122,7 @@ public class StreamThreadTest {
                 setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:2171");
                 setProperty(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, "3");
                 setProperty(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, MockTimestampExtractor.class.getName());
+                setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
             }
         };
     }
@@ -290,6 +295,79 @@ public class StreamThreadTest {
             (thread.state() == StreamThread.State.NOT_RUNNING));
     }
 
+    final static String TOPIC = "topic";
+    final Set<TopicPartition> assignmentThread1 = Collections.singleton(new TopicPartition(TOPIC, 0));
+    final Set<TopicPartition> assignmentThread2 = Collections.singleton(new TopicPartition(TOPIC, 1));
+
+    @Test
+    public void testHandingOverTaskFromOneToAnotherThread() throws Exception {
+        final TopologyBuilder builder = new TopologyBuilder();
+        builder.addStateStore(
+            Stores
+                .create("store")
+                .withByteArrayKeys()
+                .withByteArrayValues()
+                .persistent()
+                .build()
+        );
+        final StreamsConfig config = new StreamsConfig(configProps());
+        final MockClientSupplier mockClientSupplier = new MockClientSupplier();
+        mockClientSupplier.consumer.assign(Arrays.asList(new TopicPartition(TOPIC, 0), new TopicPartition(TOPIC, 1)));
+
+        final StreamThread thread1 = new StreamThread(builder, config, mockClientSupplier, applicationId, clientId + 1, processId, new Metrics(), Time.SYSTEM, new StreamsMetadataState(builder));
+        final StreamThread thread2 = new StreamThread(builder, config, mockClientSupplier, applicationId, clientId + 2, processId, new Metrics(), Time.SYSTEM, new StreamsMetadataState(builder));
+        thread1.partitionAssignor(new MockStreamsPartitionAssignor());
+        thread2.partitionAssignor(new MockStreamsPartitionAssignor());
+
+        // revoke (to get threads in correct state)
+        thread1.rebalanceListener.onPartitionsRevoked(Collections.EMPTY_SET);
+        thread2.rebalanceListener.onPartitionsRevoked(Collections.EMPTY_SET);
+
+        // assign
+        thread1.rebalanceListener.onPartitionsAssigned(assignmentThread1);
+        thread2.rebalanceListener.onPartitionsAssigned(assignmentThread2);
+
+        final Set<TaskId> originalTaskAssignmentThread1 = new HashSet<>();
+        for (TaskId tid : thread1.tasks().keySet()) {
+            originalTaskAssignmentThread1.add(tid);
+        }
+        final Set<TaskId> originalTaskAssignmentThread2 = new HashSet<>();
+        for (TaskId tid : thread2.tasks().keySet()) {
+            originalTaskAssignmentThread2.add(tid);
+        }
+
+        // revoke (task will be suspended)
+        thread1.rebalanceListener.onPartitionsRevoked(assignmentThread1);
+        thread2.rebalanceListener.onPartitionsRevoked(assignmentThread2);
+
+        // assign reverted
+        Thread runIt = new Thread(new Runnable() {
+            @Override
+            public void run() {
+                thread1.rebalanceListener.onPartitionsAssigned(assignmentThread2);
+            }
+        });
+        runIt.start();
+
+        thread2.rebalanceListener.onPartitionsAssigned(assignmentThread1);
+
+        runIt.join();
+
+        assertThat(thread1.tasks().keySet(), equalTo(originalTaskAssignmentThread2));
+        assertThat(thread2.tasks().keySet(), equalTo(originalTaskAssignmentThread1));
+        assertThat(thread1.prevTasks(), equalTo(originalTaskAssignmentThread1));
+        assertThat(thread2.prevTasks(), equalTo(originalTaskAssignmentThread2));
+    }
+
+    private class MockStreamsPartitionAssignor extends StreamPartitionAssignor {
+        @Override
+        Map<TaskId, Set<TopicPartition>> activeTasks() {
+            Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>();
+            activeTasks.put(new TaskId(0, 0), assignmentThread1);
+            activeTasks.put(new TaskId(0, 1), assignmentThread2);
+            return activeTasks;
+        }
+    }
 
     @Test
     public void testMaybeClean() throws Exception {


Mime
View raw message