This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new da332f2 MINOR:Start processor inside verify message (#6029)
da332f2 is described below
commit da332f2241f1781aca64d1007c0c209b50ec5ff0
Author: Bill Bejeck <bbejeck@gmail.com>
AuthorDate: Thu Dec 13 23:30:04 2018 -0500
MINOR:Start processor inside verify message (#6029)
This PR fixes a flaky system test.
I ran six runs of branch builder, and each run was parameterized to repeat the test 25
times for a total of 150 runs. All test runs passed.
https://jenkins.confluent.io/job/system-test-kafka-branch-builder/2122/
https://jenkins.confluent.io/job/system-test-kafka-branch-builder/2123/
https://jenkins.confluent.io/job/system-test-kafka-branch-builder/2124/
https://jenkins.confluent.io/job/system-test-kafka-branch-builder/2128/
https://jenkins.confluent.io/job/system-test-kafka-branch-builder/2129/
https://jenkins.confluent.io/job/system-test-kafka-branch-builder/2130/
Reviewers: Matthias J. Sax <matthias@confluent.io>, Guozhang Wang <guozhang@confluent.io>,
John Roesler <vvcephei@users.noreply.github.com>
---
.../apache/kafka/streams/tests/StreamsNamedRepartitionTest.java | 1 +
.../tests/streams/streams_named_repartition_topic_test.py | 8 ++++----
2 files changed, 5 insertions(+), 4 deletions(-)
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsNamedRepartitionTest.java
b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsNamedRepartitionTest.java
index 660de39..f836baa 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsNamedRepartitionTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsNamedRepartitionTest.java
@@ -63,6 +63,7 @@ public class StreamsNamedRepartitionTest {
final StreamsBuilder builder = new StreamsBuilder();
final KStream<String, String> sourceStream = builder.stream(inputTopic, Consumed.with(Serdes.String(),
Serdes.String()));
+ sourceStream.peek((k, v) -> System.out.println(String.format("input data key=%s,
value=%s", k, v)));
final KStream<String, String> mappedStream = sourceStream.selectKey((k, v)
-> keyFunction.apply(v));
diff --git a/tests/kafkatest/tests/streams/streams_named_repartition_topic_test.py b/tests/kafkatest/tests/streams/streams_named_repartition_topic_test.py
index 5baf612..b9894ee 100644
--- a/tests/kafkatest/tests/streams/streams_named_repartition_topic_test.py
+++ b/tests/kafkatest/tests/streams/streams_named_repartition_topic_test.py
@@ -41,7 +41,7 @@ class StreamsNamedRepartitionTopicTest(Test):
}
self.zookeeper = ZookeeperService(self.test_context, num_nodes=1)
- self.kafka = KafkaService(self.test_context, num_nodes=1,
+ self.kafka = KafkaService(self.test_context, num_nodes=3,
zk=self.zookeeper, topics=self.topics)
self.producer = VerifiableProducer(self.test_context,
@@ -66,7 +66,6 @@ class StreamsNamedRepartitionTopicTest(Test):
for processor in processors:
processor.CLEAN_NODE_ENABLED = False
self.set_topics(processor)
- processor.start()
self.verify_running(processor, 'REBALANCING -> RUNNING')
self.verify_processing(processors)
@@ -76,7 +75,6 @@ class StreamsNamedRepartitionTopicTest(Test):
self.verify_stopped(processor)
# will tell app to add operations before repartition topic
processor.ADD_ADDITIONAL_OPS = 'true'
- processor.start()
self.verify_running(processor, 'UPDATED Topology')
self.verify_processing(processors)
@@ -89,7 +87,9 @@ class StreamsNamedRepartitionTopicTest(Test):
@staticmethod
def verify_running(processor, message):
- with processor.node.account.monitor_log(processor.STDOUT_FILE) as monitor:
+ node = processor.node
+ with node.account.monitor_log(processor.STDOUT_FILE) as monitor:
+ processor.start()
monitor.wait_until(message,
timeout_sec=60,
err_msg="Never saw '%s' message " % message + str(processor.node.account))
|