kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: MINOR: Log encountered exception during rebalance
Date Thu, 07 Sep 2017 01:38:11 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 212bce6e3 -> 3f155eaa2


MINOR: Log encountered exception during rebalance

Some other minor changes:

1. Do not throw the exception from the callback, as it would only be swallowed by the consumer coordinator;
remembering it and re-throwing it in the next loop is good enough.
2. Change Creating to Defining in Stores to avoid confusion that the stores have already been successfully created at that time.
3. Do not need unAssignChangeLogPartitions as the restore consumer will be unassigned already
inside changelog reader.

Author: Guozhang Wang <wangguoz@gmail.com>

Reviewers: Matthias J. Sax <matthias@confluent.io>, Damian Guy <damian.guy@gmail.com>

Closes #3769 from guozhangwang/KMinor-logging-before-throwing


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/3f155eaa
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/3f155eaa
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/3f155eaa

Branch: refs/heads/trunk
Commit: 3f155eaa23c6081e4afa3b49f0f8a65a16b8e05c
Parents: 212bce6
Author: Guozhang Wang <wangguoz@gmail.com>
Authored: Wed Sep 6 18:38:08 2017 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Wed Sep 6 18:38:08 2017 -0700

----------------------------------------------------------------------
 .../processor/internals/StreamThread.java       | 21 ++--------
 .../org/apache/kafka/streams/state/Stores.java  |  4 +-
 .../processor/internals/StreamThreadTest.java   | 41 ++++++--------------
 3 files changed, 17 insertions(+), 49 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/3f155eaa/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 818992e..6a45ef7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -256,14 +256,11 @@ public class StreamThread extends Thread implements ThreadDataProvider {
                     return;
                 }
                 taskManager.createTasks(assignment);
-                final RuntimeException exception = streamThread.unAssignChangeLogPartitions();
-                if (exception != null) {
-                    throw exception;
-                }
                 streamThread.refreshMetadataState();
             } catch (final Throwable t) {
+                log.error("{} Error caught during partition assignment, " +
+                        "will abort the current process and re-throw at the end of rebalance: {}", logPrefix, t.getMessage());
                 streamThread.setRebalanceException(t);
-                throw t;
             } finally {
                 log.info("{} partition assignment took {} ms.\n" +
                                  "\tcurrent active tasks: {}\n" +
@@ -294,8 +291,9 @@ public class StreamThread extends Thread implements ThreadDataProvider {
                     // suspend active tasks
                     taskManager.suspendTasksAndState();
                 } catch (final Throwable t) {
+                    log.error("{} Error caught during partition revocation, " +
+                            "will abort the current process and re-throw at the end of rebalance: {}", logPrefix, t.getMessage());
                     streamThread.setRebalanceException(t);
-                    throw t;
                 } finally {
                     streamThread.refreshMetadataState();
                     streamThread.clearStandbyRecords();
@@ -1163,17 +1161,6 @@ public class StreamThread extends Thread implements ThreadDataProvider {
         log.info("{} Shutdown complete", logPrefix);
     }
 
-    private RuntimeException unAssignChangeLogPartitions() {
-        try {
-            // un-assign the change log partitions
-            restoreConsumer.assign(Collections.<TopicPartition>emptyList());
-        } catch (final RuntimeException e) {
-            log.error("{} Failed to un-assign change log partitions due to the following error:", logPrefix, e);
-            return e;
-        }
-        return null;
-    }
-
     private void clearStandbyRecords() {
         standbyRecords.clear();
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/3f155eaa/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
index fef4ade..3cf22c1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
@@ -90,7 +90,7 @@ public class Stores {
 
                                     @Override
                                     public StateStoreSupplier build() {
-                                        log.trace("Creating InMemory Store name={} capacity={} logged={}", name, capacity, logged);
+                                        log.trace("Defining InMemory Store name={} capacity={} logged={}", name, capacity, logged);
                                         if (capacity < Integer.MAX_VALUE) {
                                             return new InMemoryLRUCacheStoreSupplier<>(name, capacity, keySerde, valueSerde, logged, logConfig);
                                         }
@@ -154,7 +154,7 @@ public class Stores {
 
                                     @Override
                                     public StateStoreSupplier build() {
-                                        log.trace("Creating RocksDb Store name={} numSegments={} logged={}", name, numSegments, logged);
+                                        log.trace("Defining RocksDb Store name={} numSegments={} logged={}", name, numSegments, logged);
                                         if (sessionWindows) {
                                             return new RocksDBSessionStoreSupplier<>(name, retentionPeriod, keySerde, valueSerde, logged, logConfig, cachingEnabled);
                                         } else if (numSegments > 0) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/3f155eaa/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index 51e6568..b82c1ac 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -26,7 +26,6 @@ import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.errors.ProducerFencedException;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Utils;
@@ -72,7 +71,6 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
 
 public class StreamThreadTest {
 
@@ -105,25 +103,11 @@ public class StreamThreadTest {
     private final TopicPartition t3p1 = new TopicPartition("topic3", 1);
     private final TopicPartition t3p2 = new TopicPartition("topic3", 2);
 
-    private final List<PartitionInfo> infos = Arrays.asList(
-        new PartitionInfo("topic1", 0, Node.noNode(), new Node[0], new Node[0]),
-        new PartitionInfo("topic1", 1, Node.noNode(), new Node[0], new Node[0]),
-        new PartitionInfo("topic1", 2, Node.noNode(), new Node[0], new Node[0]),
-        new PartitionInfo("topic2", 0, Node.noNode(), new Node[0], new Node[0]),
-        new PartitionInfo("topic2", 1, Node.noNode(), new Node[0], new Node[0]),
-        new PartitionInfo("topic2", 2, Node.noNode(), new Node[0], new Node[0]),
-        new PartitionInfo("topic3", 0, Node.noNode(), new Node[0], new Node[0]),
-        new PartitionInfo("topic3", 1, Node.noNode(), new Node[0], new Node[0]),
-        new PartitionInfo("topic3", 2, Node.noNode(), new Node[0], new Node[0])
-    );
-
-
     // task0 is unused
     private final TaskId task1 = new TaskId(0, 1);
     private final TaskId task2 = new TaskId(0, 2);
-    private final TaskId task3 = new TaskId(0, 3);
-    private final TaskId task4 = new TaskId(1, 1);
-    private final TaskId task5 = new TaskId(1, 2);
+    private final TaskId task3 = new TaskId(1, 1);
+    private final TaskId task4 = new TaskId(1, 2);
 
     private Properties configProps(final boolean enableEos) {
         return new Properties() {
@@ -281,10 +265,10 @@ public class StreamThreadTest {
         rebalanceListener.onPartitionsAssigned(assignedPartitions);
         thread.runOnce(-1);
 
+        assertTrue(thread.tasks().containsKey(task3));
         assertTrue(thread.tasks().containsKey(task4));
-        assertTrue(thread.tasks().containsKey(task5));
-        assertEquals(expectedGroup1, thread.tasks().get(task4).partitions());
-        assertEquals(expectedGroup2, thread.tasks().get(task5).partitions());
+        assertEquals(expectedGroup1, thread.tasks().get(task3).partitions());
+        assertEquals(expectedGroup2, thread.tasks().get(task4).partitions());
         assertEquals(2, thread.tasks().size());
 
         // revoke four partitions and assign three partitions of both subtopologies
@@ -300,9 +284,9 @@ public class StreamThreadTest {
         thread.runOnce(-1);
 
         assertTrue(thread.tasks().containsKey(task1));
-        assertTrue(thread.tasks().containsKey(task4));
+        assertTrue(thread.tasks().containsKey(task3));
         assertEquals(expectedGroup1, thread.tasks().get(task1).partitions());
-        assertEquals(expectedGroup2, thread.tasks().get(task4).partitions());
+        assertEquals(expectedGroup2, thread.tasks().get(task3).partitions());
         assertEquals(2, thread.tasks().size());
 
         // revoke all three partitons and reassign the same three partitions (from different subtopologies)
@@ -315,9 +299,9 @@ public class StreamThreadTest {
         thread.runOnce(-1);
 
         assertTrue(thread.tasks().containsKey(task1));
-        assertTrue(thread.tasks().containsKey(task4));
+        assertTrue(thread.tasks().containsKey(task3));
         assertEquals(expectedGroup1, thread.tasks().get(task1).partitions());
-        assertEquals(expectedGroup2, thread.tasks().get(task4).partitions());
+        assertEquals(expectedGroup2, thread.tasks().get(task3).partitions());
         assertEquals(2, thread.tasks().size());
 
         // revoke all partitions and assign nothing
@@ -905,11 +889,8 @@ public class StreamThreadTest {
 
         thread.rebalanceListener.onPartitionsRevoked(null);
         clientSupplier.producers.get(0).fenceProducer();
-        try {
-            thread.rebalanceListener.onPartitionsAssigned(task0Assignment);
-            thread.runOnce(-1);
-            fail("should have thrown " + ProducerFencedException.class.getSimpleName());
-        } catch (final ProducerFencedException e) { }
+        thread.rebalanceListener.onPartitionsAssigned(task0Assignment);
+        thread.runOnce(-1);
 
         assertTrue(thread.tasks().isEmpty());
     }


Mime
View raw message