kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [kafka] branch trunk updated: MINOR:Start processor inside verify message (#6029)
Date Fri, 14 Dec 2018 04:30:13 GMT
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new da332f2  MINOR:Start processor inside verify message (#6029)
da332f2 is described below

commit da332f2241f1781aca64d1007c0c209b50ec5ff0
Author: Bill Bejeck <bbejeck@gmail.com>
AuthorDate: Thu Dec 13 23:30:04 2018 -0500

    MINOR:Start processor inside verify message (#6029)
    
    This PR fixes a flaky system test.
    
    I ran six runs of the branch builder, each parameterized to repeat the test 25 times, for a total of 150 runs. All test runs passed.
    
    https://jenkins.confluent.io/job/system-test-kafka-branch-builder/2122/
    https://jenkins.confluent.io/job/system-test-kafka-branch-builder/2123/
    https://jenkins.confluent.io/job/system-test-kafka-branch-builder/2124/
    https://jenkins.confluent.io/job/system-test-kafka-branch-builder/2128/
    https://jenkins.confluent.io/job/system-test-kafka-branch-builder/2129/
    https://jenkins.confluent.io/job/system-test-kafka-branch-builder/2130/
    
    Reviewers: Matthias J. Sax <matthias@confluent.io>, Guozhang Wang <guozhang@confluent.io>, John Roesler <vvcephei@users.noreply.github.com>
---
 .../apache/kafka/streams/tests/StreamsNamedRepartitionTest.java   | 1 +
 .../tests/streams/streams_named_repartition_topic_test.py         | 8 ++++----
 2 files changed, 5 insertions(+), 4 deletions(-)

diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsNamedRepartitionTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsNamedRepartitionTest.java
index 660de39..f836baa 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsNamedRepartitionTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsNamedRepartitionTest.java
@@ -63,6 +63,7 @@ public class StreamsNamedRepartitionTest {
         final StreamsBuilder builder = new StreamsBuilder();
 
         final KStream<String, String> sourceStream = builder.stream(inputTopic, Consumed.with(Serdes.String(),
Serdes.String()));
+        sourceStream.peek((k, v) -> System.out.println(String.format("input data key=%s,
value=%s", k, v)));
 
         final KStream<String, String> mappedStream = sourceStream.selectKey((k, v)
-> keyFunction.apply(v));
 
diff --git a/tests/kafkatest/tests/streams/streams_named_repartition_topic_test.py b/tests/kafkatest/tests/streams/streams_named_repartition_topic_test.py
index 5baf612..b9894ee 100644
--- a/tests/kafkatest/tests/streams/streams_named_repartition_topic_test.py
+++ b/tests/kafkatest/tests/streams/streams_named_repartition_topic_test.py
@@ -41,7 +41,7 @@ class StreamsNamedRepartitionTopicTest(Test):
         }
 
         self.zookeeper = ZookeeperService(self.test_context, num_nodes=1)
-        self.kafka = KafkaService(self.test_context, num_nodes=1,
+        self.kafka = KafkaService(self.test_context, num_nodes=3,
                                   zk=self.zookeeper, topics=self.topics)
 
         self.producer = VerifiableProducer(self.test_context,
@@ -66,7 +66,6 @@ class StreamsNamedRepartitionTopicTest(Test):
         for processor in processors:
             processor.CLEAN_NODE_ENABLED = False
             self.set_topics(processor)
-            processor.start()
             self.verify_running(processor, 'REBALANCING -> RUNNING')
 
         self.verify_processing(processors)
@@ -76,7 +75,6 @@ class StreamsNamedRepartitionTopicTest(Test):
             self.verify_stopped(processor)
             #  will tell app to add operations before repartition topic
             processor.ADD_ADDITIONAL_OPS = 'true'
-            processor.start()
             self.verify_running(processor, 'UPDATED Topology')
 
         self.verify_processing(processors)
@@ -89,7 +87,9 @@ class StreamsNamedRepartitionTopicTest(Test):
 
     @staticmethod
     def verify_running(processor, message):
-        with processor.node.account.monitor_log(processor.STDOUT_FILE) as monitor:
+        node = processor.node
+        with node.account.monitor_log(processor.STDOUT_FILE) as monitor:
+            processor.start()
             monitor.wait_until(message,
                                timeout_sec=60,
                                err_msg="Never saw '%s' message " % message + str(processor.node.account))


Mime
View raw message