kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mj...@apache.org
Subject [kafka] branch 1.1 updated: MINOR: follow up on Streams EOS system tests (#4593)
Date Wed, 21 Feb 2018 18:23:52 GMT
This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch 1.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/1.1 by this push:
     new 6859529  MINOR: follow up on Streams EOS system tests (#4593)
6859529 is described below

commit 68595292f6b26e8005015bae878576066843b0c7
Author: Matthias J. Sax <mjsax@apache.org>
AuthorDate: Tue Feb 20 13:07:50 2018 -0800

    MINOR: follow up on Streams EOS system tests (#4593)
---
 .../org/apache/kafka/streams/tests/EosTestDriver.java     | 15 ++++++++++++---
 tests/kafkatest/tests/streams/streams_eos_test.py         |  1 -
 2 files changed, 12 insertions(+), 4 deletions(-)

diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/EosTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/tests/EosTestDriver.java
index 7c7485d..752cdd6 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/EosTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/EosTestDriver.java
@@ -511,7 +511,6 @@ public class EosTestDriver extends SmokeTestUtil {
     private static void verifyCnt(final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> inputPerTopicPerPartition,
                                   final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> cntPerTopicPerPartition) {
         final StringDeserializer stringDeserializer = new StringDeserializer();
-        final IntegerDeserializer integerDeserializer = new IntegerDeserializer();
         final LongDeserializer longDeserializer = new LongDeserializer();
 
         final HashMap<String, Long> currentSumPerKey = new HashMap<>();
@@ -552,7 +551,7 @@ public class EosTestDriver extends SmokeTestUtil {
                                                      final boolean withRepartitioning) {
         final String[] topics;
         if (withRepartitioning) {
-            topics = new String[] {"echo", "min", "sum", "repartition", "max", "min"};
+            topics = new String[] {"echo", "min", "sum", "repartition", "max", "cnt"};
         } else {
             topics = new String[] {"echo", "min", "sum"};
         }
@@ -560,7 +559,9 @@ public class EosTestDriver extends SmokeTestUtil {
         final List<TopicPartition> partitions = getAllPartitions(consumer, topics);
         consumer.assign(partitions);
         consumer.seekToEnd(partitions);
-        consumer.poll(0);
+        for (final TopicPartition tp : partitions) {
+            System.out.println(tp + " at position " + consumer.position(tp));
+        }
 
         final Properties producerProps = new Properties();
         producerProps.put(ProducerConfig.CLIENT_ID_CONFIG, "VerifyProducer");
@@ -591,6 +592,12 @@ public class EosTestDriver extends SmokeTestUtil {
         long maxWaitTime = System.currentTimeMillis() + MAX_IDLE_TIME_MS;
         while (!partitions.isEmpty() && System.currentTimeMillis() < maxWaitTime) {
             final ConsumerRecords<byte[], byte[]> records = consumer.poll(100);
+            if (records.isEmpty()) {
+                System.out.println("No data received.");
+                for (final TopicPartition tp : partitions) {
+                    System.out.println(tp + " at position " + consumer.position(tp));
+                }
+            }
             for (final ConsumerRecord<byte[], byte[]> record : records) {
                 maxWaitTime = System.currentTimeMillis() + MAX_IDLE_TIME_MS;
                 final String topic = record.topic();
@@ -604,6 +611,8 @@ public class EosTestDriver extends SmokeTestUtil {
                         throw new RuntimeException("Post transactions verification failed. Received unexpected verification record: " +
                             "Expected record <'key','value'> from one of " + partitions + " but got"
                             + " <" + key + "," + value + "> [" + record.topic() + ", " + record.partition() + "]");
+                    } else {
+                        System.out.println("Verifying " + tp + " successful.");
                     }
                 } catch (final SerializationException e) {
                     throw new RuntimeException("Post transactions verification failed. Received unexpected verification record: " +
diff --git a/tests/kafkatest/tests/streams/streams_eos_test.py b/tests/kafkatest/tests/streams/streams_eos_test.py
index 986702c..d6ac600 100644
--- a/tests/kafkatest/tests/streams/streams_eos_test.py
+++ b/tests/kafkatest/tests/streams/streams_eos_test.py
@@ -38,7 +38,6 @@ class StreamsEosTest(KafkaTest):
         self.driver = StreamsEosTestDriverService(test_context, self.kafka)
         self.test_context = test_context
 
-    @ignored
     @cluster(num_nodes=9)
     def test_rebalance_simple(self):
         self.run_rebalance(StreamsEosTestJobRunnerService(self.test_context, self.kafka),

-- 
To stop receiving notification emails like this one, please contact
mjsax@apache.org.

Mime
View raw message