kafka-commits mailing list archives

From maniku...@apache.org
Subject [kafka] branch 2.4 updated: KAFKA-9079: Fix reset logic in transactional message copier
Date Wed, 06 Nov 2019 10:31:09 GMT
This is an automated email from the ASF dual-hosted git repository.

manikumar pushed a commit to branch 2.4
in repository https://gitbox.apache.org/repos/asf/kafka.git

The following commit(s) were added to refs/heads/2.4 by this push:
     new fba3e7e  KAFKA-9079: Fix reset logic in transactional message copier
fba3e7e is described below

commit fba3e7e1f9c88bf5026239c876a5278510e5e445
Author: Jason Gustafson <jason@confluent.io>
AuthorDate: Wed Nov 6 15:59:51 2019 +0530

    KAFKA-9079: Fix reset logic in transactional message copier

    The consumer's `committed` API does not return an entry in the response map for a requested
    partition if there is no committed offset. The transactional message copier, which is used
    in the transaction system test, did not account for this. If the first transaction attempted
    by the copier was randomly aborted, then we would not seek to the beginning as expected, which
    means we would fail to copy some of the records.

    This patch fixes the problem by iterating over the assignment rather than the result of
    `committed` when resetting offsets. It also enables additional logging in the transaction
    message copier service to make finding problems easier in the future.

    Author: Jason Gustafson <jason@confluent.io>
    Reviewers: Manikumar Reddy <manikumar.reddy@gmail.com>
    Closes #7653 from hachikuji/fix-transaction-system-test
    (cherry picked from commit 903d66e2f9220fc8a016f68b798ef11930372557)

    Signed-off-by: Manikumar Reddy <manikumar@confluent.io>
 tests/kafkatest/services/transactional_message_copier.py               | 3 ++-
 .../main/java/org/apache/kafka/tools/TransactionalMessageCopier.java   | 3 ++-
 2 files changed, 4 insertions(+), 2 deletions(-)

diff --git a/tests/kafkatest/services/transactional_message_copier.py b/tests/kafkatest/services/transactional_message_copier.py
index 53fffa4..1a6a34c 100644
--- a/tests/kafkatest/services/transactional_message_copier.py
+++ b/tests/kafkatest/services/transactional_message_copier.py
@@ -63,7 +63,8 @@ class TransactionalMessageCopier(KafkaPathResolverMixin, BackgroundThreadService
         self.stop_timeout_sec = 60
         self.enable_random_aborts = enable_random_aborts
         self.loggers = {
-            "org.apache.kafka.clients.producer.internals": "TRACE"
+            "org.apache.kafka.clients.producer": "TRACE",
+            "org.apache.kafka.clients.consumer": "TRACE"
         }
 
     def _worker(self, idx, node):
diff --git a/tools/src/main/java/org/apache/kafka/tools/TransactionalMessageCopier.java b/tools/src/main/java/org/apache/kafka/tools/TransactionalMessageCopier.java
index cfbac1a..746b8fe 100644
--- a/tools/src/main/java/org/apache/kafka/tools/TransactionalMessageCopier.java
+++ b/tools/src/main/java/org/apache/kafka/tools/TransactionalMessageCopier.java
@@ -196,7 +196,8 @@ public class TransactionalMessageCopier {
     private static void resetToLastCommittedPositions(KafkaConsumer<String, String> consumer) {
         final Map<TopicPartition, OffsetAndMetadata> committed = consumer.committed(consumer.assignment());
-        committed.forEach((tp, offsetAndMetadata) -> {
+        consumer.assignment().forEach(tp -> {
+            OffsetAndMetadata offsetAndMetadata = committed.get(tp);
             if (offsetAndMetadata != null)
                 consumer.seek(tp, offsetAndMetadata.offset());
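
The behavior described in the commit message can be illustrated without a broker. The following is a minimal sketch, not the tool's actual code: partitions are modeled as plain strings, consumer positions as a map, and "seek to the beginning" as offset 0. The class name, method names, partition names, and offsets are all hypothetical. The fallback to the beginning is taken from the commit message's description rather than from the lines shown in the diff.

```java
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

class ResetSketch {
    // Assumption: offset 0 stands in for "seek to the beginning".
    static final long BEGINNING = 0L;

    // Old logic: iterate over the committed map. A partition with no
    // committed offset has no entry, so it is never visited and its
    // position is left where the aborted transaction stopped reading.
    static Map<String, Long> resetByCommitted(Map<String, Long> positions,
                                              Map<String, Long> committed) {
        Map<String, Long> result = new HashMap<>(positions);
        committed.forEach((tp, offset) ->
            result.put(tp, offset != null ? offset : BEGINNING));
        return result;
    }

    // New logic: iterate over the assignment and look each partition up,
    // so a missing entry is seen as null and falls back to the beginning.
    static Map<String, Long> resetByAssignment(Set<String> assignment,
                                               Map<String, Long> positions,
                                               Map<String, Long> committed) {
        Map<String, Long> result = new HashMap<>(positions);
        for (String tp : assignment) {
            Long offset = committed.get(tp);
            result.put(tp, offset != null ? offset : BEGINNING);
        }
        return result;
    }

    public static void main(String[] args) {
        Set<String> assignment = new LinkedHashSet<>();
        assignment.add("input-0");
        assignment.add("input-1");

        // Positions reached while reading for the aborted transaction.
        Map<String, Long> positions = new HashMap<>();
        positions.put("input-0", 50L);
        positions.put("input-1", 30L);

        // Only input-0 has a committed offset; input-1 has no entry at all.
        Map<String, Long> committed = new HashMap<>();
        committed.put("input-0", 20L);

        // prints 30: input-1 is skipped, records 0..29 would never be copied
        System.out.println(resetByCommitted(positions, committed).get("input-1"));
        // prints 0: input-1 is rewound to the beginning
        System.out.println(resetByAssignment(assignment, positions, committed).get("input-1"));
    }
}
```

In both variants the partition that does have a committed offset (`input-0`) is rewound to 20. The two differ only in how they treat the partition that is absent from the map.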
