kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: KAFKA-5167: fix unreleased state lock due to uncaught exception when closing a task
Date Sat, 13 May 2017 05:50:08 GMT
Repository: kafka
Updated Branches:
  refs/heads/0.10.2 8bf70610b -> 6ca71c064


KAFKA-5167: fix unreleased state lock due to uncaught exception when closing a task

Catch any exception thrown from user code during `task.close()` and close the state manager to release
the state lock even in case of an exception.

Author: Matthias J. Sax <matthias@confluent.io>

Reviewers: Eno Thereska, Damian Guy, Guozhang Wang

Closes #3001 from mjsax/kafka-5167-rebalance-lock-exception-0102


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/6ca71c06
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/6ca71c06
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/6ca71c06

Branch: refs/heads/0.10.2
Commit: 6ca71c064940605873e93ebf82ca8c404c8b43e2
Parents: 8bf7061
Author: Matthias J. Sax <matthias@confluent.io>
Authored: Fri May 12 22:50:05 2017 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Fri May 12 22:50:05 2017 -0700

----------------------------------------------------------------------
 .../processor/internals/StreamThread.java       |   8 +-
 .../processor/internals/StreamThreadTest.java   | 166 +++++++++++++++++--
 2 files changed, 155 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/6ca71c06/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index a0ee9a8..abaacce 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -899,6 +899,7 @@ public class StreamThread extends Thread {
                 } catch (Exception e) {
                     log.error("{} Failed to remove suspended task {}", logPrefix, next.getKey(),
e);
                 } finally {
+                    task.closeStateManager(false);
                     suspendedTaskIterator.remove();
                 }
             }
@@ -907,11 +908,11 @@ public class StreamThread extends Thread {
     }
 
     private void closeNonAssignedSuspendedStandbyTasks() {
-        final Set<TaskId> currentSuspendedTaskIds = partitionAssignor.standbyTasks().keySet();
+        final Set<TaskId> currentStandbyTaskIds = partitionAssignor.standbyTasks().keySet();
         final Iterator<Map.Entry<TaskId, StandbyTask>> standByTaskIterator =
suspendedStandbyTasks.entrySet().iterator();
         while (standByTaskIterator.hasNext()) {
             final Map.Entry<TaskId, StandbyTask> suspendedTask = standByTaskIterator.next();
-            if (!currentSuspendedTaskIds.contains(suspendedTask.getKey())) {
+            if (!currentStandbyTaskIds.contains(suspendedTask.getKey())) {
                 log.debug("{} Closing suspended non-assigned standby task {}", logPrefix,
suspendedTask.getKey());
                 final StandbyTask task = suspendedTask.getValue();
                 try {
@@ -920,6 +921,7 @@ public class StreamThread extends Thread {
                 } catch (Exception e) {
                     log.error("{} Failed to remove suspended task standby {}", logPrefix,
suspendedTask.getKey(), e);
                 } finally {
+                    task.closeStateManager(false);
                     standByTaskIterator.remove();
                 }
             }
@@ -1211,7 +1213,7 @@ public class StreamThread extends Thread {
                         it.remove();
                     } catch (final LockException e) {
                         // ignore and retry
-                        log.warn("Could not create task {}. Will retry.", taskId, e);
+                        log.warn("Could not create task {}. Will retry: {}", taskId, e.getMessage());
                     }
                 }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/6ca71c06/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index 0e98f56..c2f62d6 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -17,19 +17,6 @@
 
 package org.apache.kafka.streams.processor.internals;
 
-import java.io.File;
-import java.nio.ByteBuffer;
-import java.nio.file.Files;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Set;
-import java.util.UUID;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.MockConsumer;
@@ -54,11 +41,24 @@ import org.apache.kafka.test.MockInternalTopicManager;
 import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.MockStateStoreSupplier;
 import org.apache.kafka.test.MockTimestampExtractor;
-import org.junit.Before;
 import org.apache.kafka.test.TestUtils;
 import org.junit.Assert;
+import org.junit.Before;
 import org.junit.Test;
 
+import java.io.File;
+import java.nio.ByteBuffer;
+import java.nio.file.Files;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.UUID;
 import java.util.regex.Pattern;
 
 import static org.hamcrest.CoreMatchers.equalTo;
@@ -67,11 +67,10 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
-import static org.junit.Assert.assertThat;
-
 public class StreamThreadTest {
 
     private final String clientId = "clientId";
@@ -414,15 +413,27 @@ public class StreamThreadTest {
     private class MockStreamsPartitionAssignor extends StreamPartitionAssignor {
 
         private final Map<TaskId, Set<TopicPartition>> activeTaskAssignment;
+        private final Map<TaskId, Set<TopicPartition>> standbyTaskAssignment;
 
         public MockStreamsPartitionAssignor(final Map<TaskId, Set<TopicPartition>>
activeTaskAssignment) {
+            this(activeTaskAssignment, Collections.<TaskId, Set<TopicPartition>>emptyMap());
+        }
+
+        public MockStreamsPartitionAssignor(final Map<TaskId, Set<TopicPartition>>
activeTaskAssignment,
+                                            final Map<TaskId, Set<TopicPartition>>
standbyTaskAssignment) {
             this.activeTaskAssignment = activeTaskAssignment;
+            this.standbyTaskAssignment = standbyTaskAssignment;
         }
 
         @Override
         Map<TaskId, Set<TopicPartition>> activeTasks() {
             return activeTaskAssignment;
         }
+
+        @Override
+        Map<TaskId, Set<TopicPartition>> standbyTasks() {
+            return standbyTaskAssignment;
+        }
     }
 
     @Test
@@ -1104,6 +1115,129 @@ public class StreamThreadTest {
 
     }
 
+    @Test
+    public void shouldReleaseStateDirLockIfFailureOnTaskCloseForUnassignedSuspendedTask()
throws Exception {
+        final TopologyBuilder builder = new TopologyBuilder();
+        builder.setApplicationId(applicationId);
+
+        final TaskId taskId = new TaskId(0, 0);
+        final MockClientSupplier clientSupplier = new MockClientSupplier();
+        final StreamsConfig config = new StreamsConfig(configProps());
+        final StateDirectory stateDirectory = new StateDirectory(applicationId, config.getString(StreamsConfig.STATE_DIR_CONFIG));
+
+        final TestStreamTask testStreamTask = new TestStreamTask(taskId,
+            applicationId,
+            Utils.mkSet(new TopicPartition("topic", 0)),
+            builder.build(0),
+            clientSupplier.consumer,
+            clientSupplier.producer,
+            clientSupplier.restoreConsumer,
+            config,
+            new MockStreamsMetrics(new Metrics()),
+            stateDirectory) {
+
+            @Override
+            public void close() {
+                throw new RuntimeException("KABOOM!!!");
+            }
+        };
+
+        final StreamThread thread = new StreamThread(builder, config, clientSupplier, applicationId,
+            clientId, processId, new Metrics(), new MockTime(),
+            new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), 0) {
+            @Override
+            protected StreamTask createStreamTask(final TaskId id, final Collection<TopicPartition>
partitions) {
+                return testStreamTask;
+            }
+        };
+
+        final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>();
+        activeTasks.put(testStreamTask.id, testStreamTask.partitions);
+        thread.partitionAssignor(new MockStreamsPartitionAssignor(activeTasks));
+        thread.rebalanceListener.onPartitionsAssigned(testStreamTask.partitions);
+        thread.rebalanceListener.onPartitionsRevoked(Collections.<TopicPartition>emptyList());
+
+        thread.partitionAssignor(new MockStreamsPartitionAssignor(Collections.<TaskId,
Set<TopicPartition>>emptyMap()));
+
+        final StateDirectory testStateDir = new StateDirectory(applicationId, config.getString(StreamsConfig.STATE_DIR_CONFIG));
+        try {
+            assertFalse(testStateDir.lock(taskId, 0));
+            thread.rebalanceListener.onPartitionsAssigned(Collections.<TopicPartition>emptyList());
+            assertTrue(testStateDir.lock(taskId, 0));
+        } finally {
+            testStateDir.unlock(taskId);
+        }
+    }
+
+    @Test
+    public void shouldReleaseStateDirLockIfFailureOnStandbyTaskCloseForUnassignedSuspendedStandbyTask()
throws Exception {
+        final String storeName = "store";
+        final String changelogTopic = applicationId + "-" + storeName + "-changelog";
+
+        final KStreamBuilder builder = new KStreamBuilder();
+        builder.setApplicationId(applicationId);
+        builder.stream("topic1").groupByKey().count(storeName);
+
+        final TaskId taskId = new TaskId(0, 0);
+        final MockClientSupplier clientSupplier = new MockClientSupplier();
+        final StreamsConfig config = new StreamsConfig(configProps());
+        final StateDirectory stateDirectory = new StateDirectory(applicationId, config.getString(StreamsConfig.STATE_DIR_CONFIG));
+
+        clientSupplier.restoreConsumer.updatePartitions(changelogTopic,
+            Collections.singletonList(new PartitionInfo(changelogTopic, 0, null, null, null)));
+        clientSupplier.restoreConsumer.updateBeginningOffsets(new HashMap<TopicPartition,
Long>() {
+            {
+                put(new TopicPartition(changelogTopic, 0), 0L);
+            }
+        });
+        clientSupplier.restoreConsumer.updateEndOffsets(new HashMap<TopicPartition, Long>()
{
+            {
+                put(new TopicPartition(changelogTopic, 0), 0L);
+            }
+        });
+
+        final StreamThread thread = new StreamThread(builder, config, clientSupplier, applicationId,
+            clientId, processId, new Metrics(), new MockTime(),
+            new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), 0)
+        {
+            @Override
+            protected StandbyTask createStandbyTask(final TaskId id, final Collection<TopicPartition>
partitions) {
+                return new StandbyTask(
+                    taskId,
+                    applicationId,
+                    partitions,
+                    builder.build(0),
+                    clientSupplier.consumer,
+                    clientSupplier.restoreConsumer,
+                    config,
+                    new StreamsMetricsImpl(new Metrics(), "groupName", Collections.<String,
String>emptyMap()),
+                    stateDirectory) {
+
+                    @Override
+                    public void close() {
+                        throw new RuntimeException("KABOOM!!!");
+                    }
+                };
+            }
+        };
+
+        final Map<TaskId, Set<TopicPartition>> standbyTasks = new HashMap<>();
+        standbyTasks.put(taskId, Collections.singleton(new TopicPartition("topic", 0)));
+        thread.partitionAssignor(new MockStreamsPartitionAssignor(Collections.<TaskId,
Set<TopicPartition>>emptyMap(), standbyTasks));
+        thread.rebalanceListener.onPartitionsAssigned(Collections.<TopicPartition>emptySet());
+        thread.rebalanceListener.onPartitionsRevoked(Collections.<TopicPartition>emptyList());
+
+        thread.partitionAssignor(new MockStreamsPartitionAssignor(Collections.<TaskId,
Set<TopicPartition>>emptyMap()));
+
+        final StateDirectory testStateDir = new StateDirectory(applicationId, config.getString(StreamsConfig.STATE_DIR_CONFIG));
+        try {
+            assertFalse(testStateDir.lock(taskId, 0));
+            thread.rebalanceListener.onPartitionsAssigned(Collections.<TopicPartition>emptyList());
+            assertTrue(testStateDir.lock(taskId, 0));
+        } finally {
+            testStateDir.unlock(taskId);
+        }
+    }
 
     private void initPartitionGrouper(StreamsConfig config, StreamThread thread, MockClientSupplier
clientSupplier) {
 


Mime
View raw message