From commits-return-14742-apmail-kafka-commits-archive=kafka.apache.org@kafka.apache.org Sat Jun 6 03:43:01 2020 Return-Path: X-Original-To: apmail-kafka-commits-archive@www.apache.org Delivered-To: apmail-kafka-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [207.244.88.153]) by minotaur.apache.org (Postfix) with SMTP id 351DD19301 for ; Sat, 6 Jun 2020 03:42:58 +0000 (UTC) Received: (qmail 25905 invoked by uid 500); 6 Jun 2020 03:42:58 -0000 Delivered-To: apmail-kafka-commits-archive@kafka.apache.org Received: (qmail 25860 invoked by uid 500); 6 Jun 2020 03:42:57 -0000 Mailing-List: contact commits-help@kafka.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@kafka.apache.org Delivered-To: mailing list commits@kafka.apache.org Received: (qmail 25847 invoked by uid 99); 6 Jun 2020 03:42:57 -0000 Received: from ec2-52-202-80-70.compute-1.amazonaws.com (HELO gitbox.apache.org) (52.202.80.70) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 06 Jun 2020 03:42:57 +0000 Received: by gitbox.apache.org (ASF Mail Server at gitbox.apache.org, from userid 33) id 7A83F814A0; Sat, 6 Jun 2020 03:42:57 +0000 (UTC) Date: Sat, 06 Jun 2020 03:42:54 +0000 To: "commits@kafka.apache.org" Subject: [kafka] branch 2.6 updated: MINOR: improve code encapsulation between StreamThread and TaskManager (#8819) MIME-Version: 1.0 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 8bit Message-ID: <159141497059.27065.9150950954392937810@gitbox.apache.org> From: mjsax@apache.org X-Git-Host: gitbox.apache.org X-Git-Repo: kafka X-Git-Refname: refs/heads/2.6 X-Git-Reftype: branch X-Git-Oldrev: bc82c7d2284ad03969a7c7f8bd1d934b8cb65437 X-Git-Newrev: 775d1b3bee1efcd56e3d118d4bb124dd9532ef7c X-Git-Rev: 775d1b3bee1efcd56e3d118d4bb124dd9532ef7c X-Git-NotificationType: ref_changed_plus_diff X-Git-Multimail-Version: 1.5.dev Auto-Submitted: auto-generated This is an automated email from the ASF dual-hosted git 
repository. mjsax pushed a commit to branch 2.6 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/2.6 by this push: new 775d1b3 MINOR: improve code encapsulation between StreamThread and TaskManager (#8819) 775d1b3 is described below commit 775d1b3bee1efcd56e3d118d4bb124dd9532ef7c Author: Matthias J. Sax AuthorDate: Fri Jun 5 20:29:50 2020 -0700 MINOR: improve code encapsulation between StreamThread and TaskManager (#8819) Reviewers: Boyang Chen , John Roesler --- .../streams/processor/internals/StreamThread.java | 21 +---------------- .../streams/processor/internals/TaskManager.java | 26 +++++++++++++++++----- .../processor/internals/TaskManagerTest.java | 1 - 3 files changed, 22 insertions(+), 26 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index 6e7a3aa..3a57cc6 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -644,7 +644,7 @@ public class StreamThread extends Thread { if (records != null && !records.isEmpty()) { pollSensor.record(pollLatency, now); pollRecordsSensor.record(records.count(), now); - addRecordsToTasks(records); + taskManager.addRecordsToTasks(records); } // Shutdown hook could potentially be triggered and transit the thread state to PENDING_SHUTDOWN during #pollRequests(). 
@@ -820,25 +820,6 @@ public class StreamThread extends Thread { } /** - * Take records and add them to each respective task - * - * @param records Records, can be null - */ - private void addRecordsToTasks(final ConsumerRecords<byte[], byte[]> records) { - for (final TopicPartition partition : records.partitions()) { - final Task task = taskManager.taskForInputPartition(partition); - - if (task == null) { - log.error("Unable to locate active task for received-record partition {}. Current tasks: {}", - partition, taskManager.toString(">")); - throw new NullPointerException("Task was unexpectedly missing for partition " + partition); - } - - task.addRecords(partition, records.records(partition)); - } - } - - /** * Try to commit all active tasks owned by this thread. * * Visible for testing. diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java index d1be8a3..5c55093 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java @@ -21,6 +21,7 @@ import org.apache.kafka.clients.admin.DeleteRecordsResult; import org.apache.kafka.clients.admin.RecordsToDelete; import org.apache.kafka.clients.consumer.CommitFailedException; import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.Metric; @@ -86,7 +87,7 @@ public class TaskManager { private boolean rebalanceInProgress = false; // if we are in the middle of a rebalance, it is not safe to commit // includes assigned & initialized tasks and unassigned tasks we locked temporarily during rebalance - private Set<TaskId> lockedTaskDirectories = new HashSet<>(); + private final Set<TaskId> lockedTaskDirectories = new 
HashSet<>(); TaskManager(final ChangelogReader changelogReader, final UUID processId, @@ -743,10 +744,6 @@ public class TaskManager { .collect(Collectors.toSet()); } - Task taskForInputPartition(final TopicPartition partition) { - return partitionToTask.get(partition); - } - Map<TaskId, Task> tasks() { // not bothering with an unmodifiable map, since the tasks themselves are mutable, but // if any outside code modifies the map or the tasks, it would be a severe transgression. @@ -779,6 +776,25 @@ public class TaskManager { } /** + * Take records and add them to each respective task + * + * @param records Records, can be null + */ + void addRecordsToTasks(final ConsumerRecords<byte[], byte[]> records) { + for (final TopicPartition partition : records.partitions()) { + final Task task = partitionToTask.get(partition); + + if (task == null) { + log.error("Unable to locate active task for received-record partition {}. Current tasks: {}", + partition, toString(">")); + throw new NullPointerException("Task was unexpectedly missing for partition " + partition); + } + + task.addRecords(partition, records.records(partition)); + } + } + + /** * @throws TaskMigratedException if committing offsets failed (non-EOS) * or if the task producer got fenced (EOS) * @return number of committed offsets, or -1 if we are in the middle of a rebalance and cannot commit diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java index 4d38b2f..7f31d7e 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java @@ -694,7 +694,6 @@ public class TaskManagerTest { assertThat(taskManager.tryToCompleteRestoration(), is(true)); assertThat(task00.state(), is(Task.State.RUNNING)); assertEquals(newPartitionsSet, task00.inputPartitions()); - assertEquals(task00, 
taskManager.taskForInputPartition(t1p1)); verify(activeTaskCreator, consumer, changeLogReader); }