kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [kafka] branch trunk updated: MINOR: Retry setting aligned time until set (#4893)
Date Thu, 19 Apr 2018 18:51:29 GMT
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 93fd570  MINOR: Retry setting aligned time until set (#4893)
93fd570 is described below

commit 93fd5707fab218abb2c9d13b034608f0a606bfc7
Author: Bill Bejeck <bbejeck@gmail.com>
AuthorDate: Thu Apr 19 14:51:24 2018 -0400

    MINOR: Retry setting aligned time until set (#4893)
    
    In the AbstractResetIntegrationTest we can have a transient error when setting the time
for the test where the new time is less than the original time, for those cases we should
catch the exception and re-try setting the time once versus letting the test fail.
    
    For testing, ran the entire streams test suite.
    
    Reviewers: Matthias J. Sax <mjsax@apache.org>, Guozhang Wang <wangguoz@gmail.com>
---
 .../integration/AbstractResetIntegrationTest.java  | 31 ++++++++++++++++------
 1 file changed, 23 insertions(+), 8 deletions(-)

diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java
index 8a82bf9..249e2c3 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java
@@ -16,15 +16,13 @@
  */
 package org.apache.kafka.streams.integration;
 
-import kafka.admin.AdminClient;
-import kafka.tools.StreamsResetter;
 import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.admin.KafkaAdminClient;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.config.SslConfigs;
 import org.apache.kafka.common.config.types.Password;
+import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.serialization.LongDeserializer;
 import org.apache.kafka.common.serialization.LongSerializer;
 import org.apache.kafka.common.serialization.Serdes;
@@ -65,6 +63,9 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.TimeUnit;
 
+import kafka.admin.AdminClient;
+import kafka.tools.StreamsResetter;
+
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.MatcherAssert.assertThat;
 
@@ -106,11 +107,25 @@ public abstract class AbstractResetIntegrationTest {
             kafkaAdminClient = (KafkaAdminClient) org.apache.kafka.clients.admin.AdminClient.create(commonClientConfig);
         }
 
-        // we align time to seconds to get clean window boundaries and thus ensure the same
result for each run
-        // otherwise, input records could fall into different windows for different runs
depending on the initial mock time
-        final long alignedTime = (System.currentTimeMillis() / 1000 + 1) * 1000;
-        mockTime = cluster.time;
-        mockTime.setCurrentTimeMs(alignedTime);
+        boolean timeSet = false;
+        while (!timeSet) {
+            timeSet = setCurrentTime();
+        }
+    }
+
+    private boolean setCurrentTime() {
+        boolean currentTimeSet = false;
+        try {
+            mockTime = cluster.time;
+            // we align time to seconds to get clean window boundaries and thus ensure the
same result for each run
+            // otherwise, input records could fall into different windows for different runs
depending on the initial mock time
+            final long alignedTime = (System.currentTimeMillis() / 1000 + 1) * 1000;
+            mockTime.setCurrentTimeMs(alignedTime);
+            currentTimeSet = true;
+        } catch (final IllegalArgumentException e) {
+            // don't care will retry until set
+        }
+        return currentTimeSet;
     }
 
     private void prepareConfigs() {

-- 
To stop receiving notification emails like this one, please contact
guozhang@apache.org.

Mime
View raw message