kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From vvcep...@apache.org
Subject [kafka] branch 2.6 updated: KAFKA-10084: Fix EosTestDriver end offset (#8785)
Date Wed, 03 Jun 2020 16:33:54 GMT
This is an automated email from the ASF dual-hosted git repository.

vvcephei pushed a commit to branch 2.6
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.6 by this push:
     new 432c540  KAFKA-10084: Fix EosTestDriver end offset (#8785)
432c540 is described below

commit 432c540a223bd885446aabb488a53f04775a3492
Author: John Roesler <vvcephei@users.noreply.github.com>
AuthorDate: Wed Jun 3 11:12:47 2020 -0500

    KAFKA-10084: Fix EosTestDriver end offset (#8785)
    
    Check the uncommitted end offset after the committed end offset,
    so we can be sure never to miss a pending end-transaction marker.
    
    Reviewers: Matthias J. Sax <matthias@confluent.io>
---
 .../apache/kafka/streams/tests/EosTestDriver.java  | 43 ++++++++++------------
 1 file changed, 20 insertions(+), 23 deletions(-)

diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/EosTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/tests/EosTestDriver.java
index e95c354..45843aa 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/EosTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/EosTestDriver.java
@@ -581,35 +581,32 @@ public class EosTestDriver extends SmokeTestUtil {
         consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
         consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
 
-        final Map<TopicPartition, Long> topicEndOffsets;
-
-        try (final KafkaConsumer<byte[], byte[]> consumerUncommitted = new KafkaConsumer<>(consumerProps)) {
-            topicEndOffsets = consumerUncommitted.endOffsets(partitions);
-        }
 
         final long maxWaitTime = System.currentTimeMillis() + MAX_IDLE_TIME_MS;
-        while (!topicEndOffsets.isEmpty() && System.currentTimeMillis() < maxWaitTime) {
-            consumer.seekToEnd(partitions);
-
-            final Iterator<TopicPartition> iterator = partitions.iterator();
-            while (iterator.hasNext()) {
-                final TopicPartition topicPartition = iterator.next();
-                final long position = consumer.position(topicPartition);
-
-                if (position == topicEndOffsets.get(topicPartition)) {
-                    iterator.remove();
-                    topicEndOffsets.remove(topicPartition);
-                    System.out.println("Removing " + topicPartition + " at position " + position);
-                } else if (consumer.position(topicPartition) > topicEndOffsets.get(topicPartition)) {
-                    throw new IllegalStateException("Offset for partition " + topicPartition + " is larger than topic endOffset: " + position + " > " + topicEndOffsets.get(topicPartition));
-                } else {
-                    System.out.println("Retry " + topicPartition + " at position " + position);
+        try (final KafkaConsumer<byte[], byte[]> consumerUncommitted = new KafkaConsumer<>(consumerProps)) {
+            while (System.currentTimeMillis() < maxWaitTime) {
+                consumer.seekToEnd(partitions);
+                final Map<TopicPartition, Long> topicEndOffsets = consumerUncommitted.endOffsets(partitions);
+
+                final Iterator<TopicPartition> iterator = partitions.iterator();
+                while (iterator.hasNext()) {
+                    final TopicPartition topicPartition = iterator.next();
+                    final long position = consumer.position(topicPartition);
+
+                    if (position == topicEndOffsets.get(topicPartition)) {
+                        iterator.remove();
+                        System.out.println("Removing " + topicPartition + " at position " + position);
+                    } else if (consumer.position(topicPartition) > topicEndOffsets.get(topicPartition)) {
+                        throw new IllegalStateException("Offset for partition " + topicPartition + " is larger than topic endOffset: " + position + " > " + topicEndOffsets.get(topicPartition));
+                    } else {
+                        System.out.println("Retry " + topicPartition + " at position " + position);
+                    }
                 }
+                sleep(1000L);
             }
-            sleep(1000L);
         }
 
-        if (!topicEndOffsets.isEmpty()) {
+        if (!partitions.isEmpty()) {
             throw new RuntimeException("Could not read all verification records. Did not receive any new record within the last " + (MAX_IDLE_TIME_MS / 1000L) + " sec.");
         }
     }


Mime
View raw message