kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [kafka] 02/03: HOTFIX: Fix spotsbug failure in Kafka examples (#8051)
Date Wed, 12 Feb 2020 20:46:12 GMT
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch 2.5
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit c853929c3a7c57f2292e676efe4286e34ddeea10
Author: Boyang Chen <boyang@confluent.io>
AuthorDate: Thu Feb 6 10:58:05 2020 -0800

    HOTFIX: Fix spotsbug failure in Kafka examples (#8051)
    
    Reviewers: Jason Gustafson <jason@confluent.io>
---
 .../kafka/examples/KafkaConsumerProducerDemo.java    |  8 +++++++-
 .../java/kafka/examples/KafkaExactlyOnceDemo.java    | 20 ++++++++++++++------
 2 files changed, 21 insertions(+), 7 deletions(-)

diff --git a/examples/src/main/java/kafka/examples/KafkaConsumerProducerDemo.java b/examples/src/main/java/kafka/examples/KafkaConsumerProducerDemo.java
index 561732b..21d85c3 100644
--- a/examples/src/main/java/kafka/examples/KafkaConsumerProducerDemo.java
+++ b/examples/src/main/java/kafka/examples/KafkaConsumerProducerDemo.java
@@ -16,6 +16,8 @@
  */
 package kafka.examples;
 
+import org.apache.kafka.common.errors.TimeoutException;
+
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
@@ -28,7 +30,11 @@ public class KafkaConsumerProducerDemo {
 
         Consumer consumerThread = new Consumer(KafkaProperties.TOPIC, "DemoConsumer", false, 10000, latch);
         consumerThread.start();
-        latch.await(5, TimeUnit.MINUTES);
+
+        if (!latch.await(5, TimeUnit.MINUTES)) {
+            throw new TimeoutException("Timeout after 5 minutes waiting for demo producer and consumer to finish");
+        }
+
         consumerThread.shutdown();
         System.out.println("All finished!");
     }
diff --git a/examples/src/main/java/kafka/examples/KafkaExactlyOnceDemo.java b/examples/src/main/java/kafka/examples/KafkaExactlyOnceDemo.java
index d418eba..288b786 100644
--- a/examples/src/main/java/kafka/examples/KafkaExactlyOnceDemo.java
+++ b/examples/src/main/java/kafka/examples/KafkaExactlyOnceDemo.java
@@ -19,6 +19,7 @@ package kafka.examples;
 import org.apache.kafka.clients.admin.Admin;
 import org.apache.kafka.clients.admin.NewTopic;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.errors.TopicExistsException;
 import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
 
@@ -77,9 +78,9 @@ public class KafkaExactlyOnceDemo {
         }
 
         String mode = args[0];
-        int numPartitions = Integer.valueOf(args[1]);
-        int numInstances = Integer.valueOf(args[2]);
-        int numRecords = Integer.valueOf(args[3]);
+        int numPartitions = Integer.parseInt(args[1]);
+        int numInstances = Integer.parseInt(args[2]);
+        int numRecords = Integer.parseInt(args[3]);
 
         /* Stage 1: topic cleanup and recreation */
         recreateTopics(numPartitions);
@@ -90,7 +91,9 @@ public class KafkaExactlyOnceDemo {
         Producer producerThread = new Producer(INPUT_TOPIC, false, null, true, numRecords, prePopulateLatch);
         producerThread.start();
 
-        prePopulateLatch.await(5, TimeUnit.MINUTES);
+        if (!prePopulateLatch.await(5, TimeUnit.MINUTES)) {
+            throw new TimeoutException("Timeout after 5 minutes waiting for data pre-population");
+        }
 
         CountDownLatch transactionalCopyLatch = new CountDownLatch(numInstances);
 
@@ -102,7 +105,9 @@ public class KafkaExactlyOnceDemo {
             messageProcessor.start();
         }
 
-        transactionalCopyLatch.await(5, TimeUnit.MINUTES);
+        if (!transactionalCopyLatch.await(5, TimeUnit.MINUTES)) {
+            throw new TimeoutException("Timeout after 5 minutes waiting for transactionally message copy");
+        }
 
         CountDownLatch consumeLatch = new CountDownLatch(1);
 
@@ -110,7 +115,10 @@ public class KafkaExactlyOnceDemo {
         Consumer consumerThread = new Consumer(OUTPUT_TOPIC, "Verify-consumer", true, numRecords, consumeLatch);
         consumerThread.start();
 
-        consumeLatch.await(5, TimeUnit.MINUTES);
+        if (!consumeLatch.await(5, TimeUnit.MINUTES)) {
+            throw new TimeoutException("Timeout after 5 minutes waiting for output data consumption");
+        }
+
         consumerThread.shutdown();
         System.out.println("All finished!");
     }


Mime
View raw message