kafka-commits mailing list archives

From: j...@apache.org
Subject: [kafka] branch trunk updated: KAFKA-7140; Remove deprecated poll usages (#5319)
Date: Sat, 11 Aug 2018 05:51:28 GMT
This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 8a78d76  KAFKA-7140; Remove deprecated poll usages (#5319)
8a78d76 is described below

commit 8a78d76466bacd8a2a3487cc84890d29c9bc4a3d
Author: Viktor Somogyi <viktorsomogyi@gmail.com>
AuthorDate: Sat Aug 11 07:51:17 2018 +0200

    KAFKA-7140; Remove deprecated poll usages (#5319)
    
    Reviewers: Matthias J. Sax <mjsax@apache.org>, Jason Gustafson <jason@confluent.io>
---
 .../kafka/connect/runtime/WorkerSinkTask.java       |  3 ++-
 .../apache/kafka/connect/util/KafkaBasedLog.java    |  3 ++-
 .../connect/runtime/ErrorHandlingTaskTest.java      |  5 +++--
 .../kafka/connect/runtime/WorkerSinkTaskTest.java   | 15 ++++++++-------
 .../connect/runtime/WorkerSinkTaskThreadedTest.java |  9 +++++----
 .../main/scala/kafka/tools/ConsoleConsumer.scala    |  5 +++--
 .../scala/kafka/tools/ConsumerPerformance.scala     |  4 ++--
 .../main/scala/kafka/tools/EndToEndLatency.scala    | 21 +++++++++++++--------
 core/src/main/scala/kafka/tools/MirrorMaker.scala   |  3 ++-
 .../src/main/scala/kafka/tools/StreamsResetter.java | 10 +++++++---
 examples/src/main/java/kafka/examples/Consumer.java |  3 ++-
 .../kafka/tools/TransactionalMessageCopier.java     |  3 ++-
 .../org/apache/kafka/tools/VerifiableConsumer.java  |  3 ++-
 .../kafka/trogdor/workload/ConsumeBenchWorker.java  |  3 ++-
 .../kafka/trogdor/workload/RoundTripWorker.java     |  3 ++-
 15 files changed, 57 insertions(+), 36 deletions(-)
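
The pattern applied throughout is the same: each call to the KafkaConsumer.poll(long)
overload, deprecated since 2.0.0 in favor of poll(java.time.Duration) (KIP-266), is
replaced at its call site. Unlike the long overload, the Duration overload also bounds
the time spent blocking on metadata updates. A minimal before/after sketch of the
migration (broker address, group id, and topic are illustrative, not taken from this
commit):

    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.ByteArrayDeserializer;

    public class PollMigrationExample {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // illustrative
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "poll-migration-example");  // illustrative
            try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(
                    props, new ByteArrayDeserializer(), new ByteArrayDeserializer())) {
                consumer.subscribe(Collections.singletonList("my-topic")); // illustrative topic
                // Before (deprecated): consumer.poll(1000L) could block well past the
                // timeout while fetching metadata. After: the timeout bounds the whole call.
                ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(1000));
                System.out.println("Fetched " + records.count() + " records");
            }
        }
    }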

diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
index 47f8529..692331e 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
@@ -53,6 +53,7 @@ import org.apache.kafka.connect.util.SinkUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -441,7 +442,7 @@ class WorkerSinkTask extends WorkerTask {
     }
 
     private ConsumerRecords<byte[], byte[]> pollConsumer(long timeoutMs) {
-        ConsumerRecords<byte[], byte[]> msgs = consumer.poll(timeoutMs);
+        ConsumerRecords<byte[], byte[]> msgs = consumer.poll(Duration.ofMillis(timeoutMs));
 
         // Exceptions raised from the task during a rebalance should be rethrown to stop the worker
         if (rebalanceException != null) {
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
index de1ceb3..ea9b4c6 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
@@ -35,6 +35,7 @@ import org.apache.kafka.connect.errors.ConnectException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.time.Duration;
 import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.Iterator;
@@ -253,7 +254,7 @@ public class KafkaBasedLog<K, V> {
 
     private void poll(long timeoutMs) {
         try {
-            ConsumerRecords<K, V> records = consumer.poll(timeoutMs);
+            ConsumerRecords<K, V> records = consumer.poll(Duration.ofMillis(timeoutMs));
             for (ConsumerRecord<K, V> record : records)
                 consumedCallback.onCompletion(null, record);
         } catch (WakeupException e) {
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java
index 1bf9c71..6d92c34 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java
@@ -65,6 +65,7 @@ import org.powermock.modules.junit4.PowerMockRunner;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.time.Duration;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
@@ -180,8 +181,8 @@ public class ErrorHandlingTaskTest {
         // bad json
         ConsumerRecord<byte[], byte[]> record2 = new ConsumerRecord<>(TOPIC, PARTITION2, FIRST_OFFSET, null, "{\"a\" 10}".getBytes());
 
-        EasyMock.expect(consumer.poll(EasyMock.anyLong())).andReturn(records(record1));
-        EasyMock.expect(consumer.poll(EasyMock.anyLong())).andReturn(records(record2));
+        EasyMock.expect(consumer.poll(Duration.ofMillis(EasyMock.anyLong()))).andReturn(records(record1));
+        EasyMock.expect(consumer.poll(Duration.ofMillis(EasyMock.anyLong()))).andReturn(records(record2));
 
         sinkTask.put(EasyMock.anyObject());
         EasyMock.expectLastCall().times(2);
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
index 4a7c760..33ab2ef 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
@@ -58,6 +58,7 @@ import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
 import org.powermock.reflect.Whitebox;
 
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -458,7 +459,7 @@ public class WorkerSinkTaskTest {
         sinkTask.open(partitions);
         EasyMock.expectLastCall();
 
-        EasyMock.expect(consumer.poll(EasyMock.anyLong())).andAnswer(
+        EasyMock.expect(consumer.poll(Duration.ofMillis(EasyMock.anyLong()))).andAnswer(
                 new IAnswer<ConsumerRecords<byte[], byte[]>>() {
                     @Override
                     public ConsumerRecords<byte[], byte[]> answer() throws Throwable {
@@ -893,7 +894,7 @@ public class WorkerSinkTaskTest {
         // Expect the next poll to discover and perform the rebalance, THEN complete the previous callback handler,
         // and then return one record for TP1 and one for TP3.
         final AtomicBoolean rebalanced = new AtomicBoolean();
-        EasyMock.expect(consumer.poll(EasyMock.anyLong())).andAnswer(
+        EasyMock.expect(consumer.poll(Duration.ofMillis(EasyMock.anyLong()))).andAnswer(
                 new IAnswer<ConsumerRecords<byte[], byte[]>>() {
                     @Override
                     public ConsumerRecords<byte[], byte[]> answer() throws Throwable {
@@ -1273,7 +1274,7 @@ public class WorkerSinkTaskTest {
         sinkTask.preCommit(EasyMock.<Map<TopicPartition, OffsetAndMetadata>>anyObject());
         EasyMock.expectLastCall().andReturn(Collections.emptyMap());
 
-        EasyMock.expect(consumer.poll(EasyMock.anyLong())).andAnswer(
+        EasyMock.expect(consumer.poll(Duration.ofMillis(EasyMock.anyLong()))).andAnswer(
                 new IAnswer<ConsumerRecords<byte[], byte[]>>() {
                     @Override
                     public ConsumerRecords<byte[], byte[]> answer() throws Throwable {
@@ -1298,7 +1299,7 @@ public class WorkerSinkTaskTest {
         sinkTask.open(partitions);
         EasyMock.expectLastCall().andThrow(e);
 
-        EasyMock.expect(consumer.poll(EasyMock.anyLong())).andAnswer(
+        EasyMock.expect(consumer.poll(Duration.ofMillis(EasyMock.anyLong()))).andAnswer(
                 new IAnswer<ConsumerRecords<byte[], byte[]>>() {
                     @Override
                     public ConsumerRecords<byte[], byte[]> answer() throws Throwable {
@@ -1315,7 +1316,7 @@ public class WorkerSinkTaskTest {
         sinkTask.open(partitions);
         EasyMock.expectLastCall();
 
-        EasyMock.expect(consumer.poll(EasyMock.anyLong())).andAnswer(new IAnswer<ConsumerRecords<byte[], byte[]>>() {
+        EasyMock.expect(consumer.poll(Duration.ofMillis(EasyMock.anyLong()))).andAnswer(new IAnswer<ConsumerRecords<byte[], byte[]>>() {
             @Override
             public ConsumerRecords<byte[], byte[]> answer() throws Throwable {
                 rebalanceListener.getValue().onPartitionsAssigned(partitions);
@@ -1332,7 +1333,7 @@ public class WorkerSinkTaskTest {
     private void expectConsumerWakeup() {
         consumer.wakeup();
         EasyMock.expectLastCall();
-        EasyMock.expect(consumer.poll(EasyMock.anyLong())).andThrow(new WakeupException());
+        EasyMock.expect(consumer.poll(Duration.ofMillis(EasyMock.anyLong()))).andThrow(new WakeupException());
     }
 
     private void expectConsumerPoll(final int numMessages) {
@@ -1340,7 +1341,7 @@ public class WorkerSinkTaskTest {
     }
 
     private void expectConsumerPoll(final int numMessages, final long timestamp, final TimestampType timestampType) {
-        EasyMock.expect(consumer.poll(EasyMock.anyLong())).andAnswer(
+        EasyMock.expect(consumer.poll(Duration.ofMillis(EasyMock.anyLong()))).andAnswer(
                 new IAnswer<ConsumerRecords<byte[], byte[]>>() {
                     @Override
                     public ConsumerRecords<byte[], byte[]> answer() throws Throwable {
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
index 73689d3..d0089e9 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
@@ -55,6 +55,7 @@ import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
 import org.powermock.reflect.Whitebox;
 
+import java.time.Duration;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
@@ -525,7 +526,7 @@ public class WorkerSinkTaskThreadedTest extends ThreadedTest {
         sinkTask.open(partitions);
         EasyMock.expectLastCall();
 
-        EasyMock.expect(consumer.poll(EasyMock.anyLong())).andAnswer(new IAnswer<ConsumerRecords<byte[], byte[]>>() {
+        EasyMock.expect(consumer.poll(Duration.ofMillis(EasyMock.anyLong()))).andAnswer(new IAnswer<ConsumerRecords<byte[], byte[]>>() {
             @Override
             public ConsumerRecords<byte[], byte[]> answer() throws Throwable {
                 rebalanceListener.getValue().onPartitionsAssigned(partitions);
@@ -557,7 +558,7 @@ public class WorkerSinkTaskThreadedTest extends ThreadedTest {
     private Capture<Collection<SinkRecord>> expectPolls(final long pollDelayMs) throws Exception {
         // Stub out all the consumer stream/iterator responses, which we just want to verify occur,
         // but don't care about the exact details here.
-        EasyMock.expect(consumer.poll(EasyMock.anyLong())).andStubAnswer(
+        EasyMock.expect(consumer.poll(Duration.ofMillis(EasyMock.anyLong()))).andStubAnswer(
                 new IAnswer<ConsumerRecords<byte[], byte[]>>() {
                     @Override
                     public ConsumerRecords<byte[], byte[]> answer() throws Throwable {
@@ -595,7 +596,7 @@ public class WorkerSinkTaskThreadedTest extends ThreadedTest {
         // Currently the SinkTask's put() method will not be invoked unless we provide some data, so instead of
         // returning empty data, we return one record. The expectation is that the data will be ignored by the
         // response behavior specified using the return value of this method.
-        EasyMock.expect(consumer.poll(EasyMock.anyLong())).andAnswer(
+        EasyMock.expect(consumer.poll(Duration.ofMillis(EasyMock.anyLong()))).andAnswer(
                 new IAnswer<ConsumerRecords<byte[], byte[]>>() {
                     @Override
                     public ConsumerRecords<byte[], byte[]> answer() throws Throwable {
@@ -625,7 +626,7 @@ public class WorkerSinkTaskThreadedTest extends ThreadedTest {
         final Map<TopicPartition, Long> offsets = new HashMap<>();
         offsets.put(TOPIC_PARTITION, startOffset);
 
-        EasyMock.expect(consumer.poll(EasyMock.anyLong())).andAnswer(
+        EasyMock.expect(consumer.poll(Duration.ofMillis(EasyMock.anyLong()))).andAnswer(
                 new IAnswer<ConsumerRecords<byte[], byte[]>>() {
                     @Override
                     public ConsumerRecords<byte[], byte[]> answer() throws Throwable {
diff --git a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
index 7e2c564..365652a 100755
--- a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
+++ b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
@@ -19,6 +19,7 @@ package kafka.tools
 
 import java.io.PrintStream
 import java.nio.charset.StandardCharsets
+import java.time.Duration
 import java.util.concurrent.CountDownLatch
 import java.util.regex.Pattern
 import java.util.{Collections, Locale, Properties, Random}
@@ -388,7 +389,7 @@ object ConsoleConsumer extends Logging {
   private[tools] class ConsumerWrapper(topic: Option[String], partitionId: Option[Int], offset: Option[Long], whitelist: Option[String],
                                        consumer: Consumer[Array[Byte], Array[Byte]], val timeoutMs: Long = Long.MaxValue) {
     consumerInit()
-    var recordIter = consumer.poll(0).iterator
+    var recordIter = Collections.emptyList[ConsumerRecord[Array[Byte], Array[Byte]]]().iterator()
 
     def consumerInit() {
       (topic, partitionId, offset, whitelist) match {
@@ -432,7 +433,7 @@ object ConsoleConsumer extends Logging {
 
     def receive(): ConsumerRecord[Array[Byte], Array[Byte]] = {
       if (!recordIter.hasNext) {
-        recordIter = consumer.poll(timeoutMs).iterator
+        recordIter = consumer.poll(Duration.ofMillis(timeoutMs)).iterator
         if (!recordIter.hasNext)
           throw new TimeoutException()
       }
diff --git a/core/src/main/scala/kafka/tools/ConsumerPerformance.scala b/core/src/main/scala/kafka/tools/ConsumerPerformance.scala
index 5af55a8..2e7b8dd 100644
--- a/core/src/main/scala/kafka/tools/ConsumerPerformance.scala
+++ b/core/src/main/scala/kafka/tools/ConsumerPerformance.scala
@@ -28,8 +28,8 @@ import org.apache.kafka.common.utils.Utils
 import org.apache.kafka.common.{Metric, MetricName, TopicPartition}
 import kafka.utils.{CommandLineUtils, ToolsUtils}
 import java.util.{Collections, Properties, Random}
-
 import java.text.SimpleDateFormat
+import java.time.Duration
 
 import com.typesafe.scalalogging.LazyLogging
 
@@ -127,7 +127,7 @@ object ConsumerPerformance extends LazyLogging {
     var currentTimeMillis = lastConsumedTime
 
     while (messagesRead < count && currentTimeMillis - lastConsumedTime <= timeout) {
-      val records = consumer.poll(100).asScala
+      val records = consumer.poll(Duration.ofMillis(100)).asScala
       currentTimeMillis = System.currentTimeMillis
       if (records.nonEmpty)
         lastConsumedTime = currentTimeMillis
diff --git a/core/src/main/scala/kafka/tools/EndToEndLatency.scala b/core/src/main/scala/kafka/tools/EndToEndLatency.scala
index 3beaf82..4849b1e 100755
--- a/core/src/main/scala/kafka/tools/EndToEndLatency.scala
+++ b/core/src/main/scala/kafka/tools/EndToEndLatency.scala
@@ -18,11 +18,13 @@
 package kafka.tools
 
 import java.nio.charset.StandardCharsets
-import java.util.{Arrays, Collections, Properties}
+import java.time.Duration
+import java.util.{Arrays, Properties}
 
 import kafka.utils.Exit
 import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
 import org.apache.kafka.clients.producer._
+import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.utils.Utils
 
 import scala.collection.JavaConverters._
@@ -69,9 +71,7 @@ object EndToEndLatency {
     consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer")
     consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer")
     consumerProps.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "0") //ensure we have no temporal batching
-
     val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](consumerProps)
-    consumer.subscribe(Collections.singletonList(topic))
 
     val producerProps = loadProps
     producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
@@ -82,16 +82,21 @@ object EndToEndLatency {
     producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")
     val producer = new KafkaProducer[Array[Byte], Array[Byte]](producerProps)
 
+    // sends a dummy message to create the topic if it doesn't exist
+    producer.send(new ProducerRecord[Array[Byte], Array[Byte]](topic, Array[Byte]())).get()
+
     def finalise() {
       consumer.commitSync()
       producer.close()
       consumer.close()
     }
 
-    //Ensure we are at latest offset. seekToEnd evaluates lazily, that is to say actually performs the seek only when
-    //a poll() or position() request is issued. Hence we need to poll after we seek to ensure we see our first write.
-    consumer.seekToEnd(Collections.emptyList())
-    consumer.poll(0)
+
+    val topicPartitions = consumer.partitionsFor(topic).asScala
+      .map(p => new TopicPartition(p.topic(), p.partition())).asJava
+    consumer.assign(topicPartitions)
+    consumer.seekToEnd(topicPartitions)
+    consumer.assignment().asScala.foreach(consumer.position)
 
     var totalTime = 0.0
     val latencies = new Array[Long](numMessages)
@@ -103,7 +108,7 @@ object EndToEndLatency {
 
       //Send message (of random bytes) synchronously then immediately poll for it
       producer.send(new ProducerRecord[Array[Byte], Array[Byte]](topic, message)).get()
-      val recordIter = consumer.poll(timeout).iterator
+      val recordIter = consumer.poll(Duration.ofMillis(timeout)).iterator
 
       val elapsed = System.nanoTime - begin
 
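The comment removed above documents the subtlety this file's rewrite works around:
seekToEnd evaluates lazily, and the poll(0) that used to force the seek has no
fetch-free equivalent in the Duration API. The new code instead assigns partitions
explicitly and calls position(), which also materializes the pending seek without
consuming records. A standalone sketch of that pattern, assuming an already-configured
consumer and an existing topic (class and method names are illustrative):

    import java.util.List;
    import java.util.stream.Collectors;
    import org.apache.kafka.clients.consumer.Consumer;
    import org.apache.kafka.common.TopicPartition;

    final class SeekToLatest {
        // Assign every partition of the topic, seek to the log end, and force the
        // lazy seek to take effect without consuming any records.
        static void seekToLatest(Consumer<byte[], byte[]> consumer, String topic) {
            List<TopicPartition> partitions = consumer.partitionsFor(topic).stream()
                    .map(info -> new TopicPartition(info.topic(), info.partition()))
                    .collect(Collectors.toList());
            consumer.assign(partitions);
            consumer.seekToEnd(partitions);
            // position() evaluates the pending seek, so the next poll() starts at the end.
            for (TopicPartition tp : partitions)
                consumer.position(tp);
        }
    }
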
diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala
index d7e09e4..d55d96b 100755
--- a/core/src/main/scala/kafka/tools/MirrorMaker.scala
+++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala
@@ -17,6 +17,7 @@
 
 package kafka.tools
 
+import java.time.Duration
 import java.util
 import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
 import java.util.concurrent.{CountDownLatch, TimeUnit}
@@ -452,7 +453,7 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
         // uncommitted record since last poll. Using one second as poll's timeout ensures that
         // offsetCommitIntervalMs, of value greater than 1 second, does not see delays in offset
         // commit.
-        recordIter = consumer.poll(1000).iterator
+        recordIter = consumer.poll(Duration.ofSeconds(1)).iterator
         if (!recordIter.hasNext)
           throw new NoRecordsException
       }
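
The comment above carries the reasoning behind Duration.ofSeconds(1): the poll timeout
is an upper bound on how long the loop can run without checking whether a periodic
offset commit is due. A minimal sketch of that loop shape, assuming a subscribed
consumer and an illustrative commit interval (this is not MirrorMaker's actual control
flow):

    import java.time.Duration;
    import org.apache.kafka.clients.consumer.Consumer;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;

    final class BoundedPollLoop {
        // Bounding each poll at one second guarantees the commit check runs at least
        // once per second, even when no records arrive on the subscribed topics.
        static void run(Consumer<byte[], byte[]> consumer, long commitIntervalMs) {
            long lastCommitMs = System.currentTimeMillis();
            while (true) {
                ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<byte[], byte[]> record : records) {
                    // per-record work (mirroring the message) would go here
                }
                if (System.currentTimeMillis() - lastCommitMs >= commitIntervalMs) {
                    consumer.commitSync();
                    lastCommitMs = System.currentTimeMillis();
                }
            }
        }
    }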
diff --git a/core/src/main/scala/kafka/tools/StreamsResetter.java b/core/src/main/scala/kafka/tools/StreamsResetter.java
index 3c045c6..09d3b9e 100644
--- a/core/src/main/scala/kafka/tools/StreamsResetter.java
+++ b/core/src/main/scala/kafka/tools/StreamsResetter.java
@@ -47,6 +47,7 @@ import java.text.ParseException;
 import java.text.SimpleDateFormat;
 import java.util.Arrays;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -57,6 +58,7 @@ import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 
 /**
  * {@link StreamsResetter} resets the processing state of a Kafka Streams application so that, for example, you can reprocess its input from scratch.
@@ -313,10 +315,12 @@ public class StreamsResetter {
         config.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
 
         try (final KafkaConsumer<byte[], byte[]> client = new KafkaConsumer<>(config, new ByteArrayDeserializer(), new ByteArrayDeserializer())) {
-            client.subscribe(topicsToSubscribe);
-            client.poll(1);
+            Collection<TopicPartition> partitions = topicsToSubscribe.stream().map(client::partitionsFor)
+                    .flatMap(Collection::stream)
+                    .map(info -> new TopicPartition(info.topic(), info.partition()))
+                    .collect(Collectors.toList());
+            client.assign(partitions);
 
-            final Set<TopicPartition> partitions = client.assignment();
             final Set<TopicPartition> inputTopicPartitions = new HashSet<>();
             final Set<TopicPartition> intermediateTopicPartitions = new HashSet<>();
 
diff --git a/examples/src/main/java/kafka/examples/Consumer.java b/examples/src/main/java/kafka/examples/Consumer.java
index be062b3..26d6e23 100644
--- a/examples/src/main/java/kafka/examples/Consumer.java
+++ b/examples/src/main/java/kafka/examples/Consumer.java
@@ -22,6 +22,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 
+import java.time.Duration;
 import java.util.Collections;
 import java.util.Properties;
 
@@ -47,7 +48,7 @@ public class Consumer extends ShutdownableThread {
     @Override
     public void doWork() {
         consumer.subscribe(Collections.singletonList(this.topic));
-        ConsumerRecords<Integer, String> records = consumer.poll(1000);
+        ConsumerRecords<Integer, String> records = consumer.poll(Duration.ofSeconds(1));
         for (ConsumerRecord<Integer, String> record : records) {
             System.out.println("Received message: (" + record.key() + ", " + record.value()
+ ") at offset " + record.offset());
         }
diff --git a/tools/src/main/java/org/apache/kafka/tools/TransactionalMessageCopier.java b/tools/src/main/java/org/apache/kafka/tools/TransactionalMessageCopier.java
index 0d74645..27e7c7f 100644
--- a/tools/src/main/java/org/apache/kafka/tools/TransactionalMessageCopier.java
+++ b/tools/src/main/java/org/apache/kafka/tools/TransactionalMessageCopier.java
@@ -35,6 +35,7 @@ import org.apache.kafka.common.errors.OutOfOrderSequenceException;
 import org.apache.kafka.common.errors.ProducerFencedException;
 
 import java.io.IOException;
+import java.time.Duration;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Properties;
@@ -287,7 +288,7 @@ public class TransactionalMessageCopier {
                 try {
                     producer.beginTransaction();
                     while (messagesInCurrentTransaction < numMessagesForNextTransaction) {
-                        ConsumerRecords<String, String> records = consumer.poll(200L);
+                        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(200));
                         for (ConsumerRecord<String, String> record : records) {
                             producer.send(producerRecordFromConsumerRecord(outputTopic, record));
                             messagesInCurrentTransaction++;
diff --git a/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java b/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java
index cc09b23..58f3471 100644
--- a/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java
+++ b/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java
@@ -47,6 +47,7 @@ import org.apache.kafka.common.utils.Utils;
 import java.io.Closeable;
 import java.io.IOException;
 import java.io.PrintStream;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -220,7 +221,7 @@ public class VerifiableConsumer implements Closeable, OffsetCommitCallback, Cons
             consumer.subscribe(Collections.singletonList(topic), this);
 
             while (!isFinished()) {
-                ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
+                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(Long.MAX_VALUE));
                 Map<TopicPartition, OffsetAndMetadata> offsets = onRecordsReceived(records);
 
                 if (!useAutoCommit) {
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/workload/ConsumeBenchWorker.java b/tools/src/main/java/org/apache/kafka/trogdor/workload/ConsumeBenchWorker.java
index 1a85296..c3a90e4 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/workload/ConsumeBenchWorker.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/workload/ConsumeBenchWorker.java
@@ -38,6 +38,7 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.kafka.trogdor.task.TaskWorker;
 
+import java.time.Duration;
 import java.util.Collection;
 import java.util.HashSet;
 import java.util.Map;
@@ -135,7 +136,7 @@ public class ConsumeBenchWorker implements TaskWorker {
             long startBatchMs = startTimeMs;
             try {
                 while (messagesConsumed < spec.maxMessages()) {
-                    ConsumerRecords<byte[], byte[]> records = consumer.poll(50);
+                    ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(50));
                     if (records.isEmpty()) {
                         continue;
                     }
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorker.java b/tools/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorker.java
index 570f6a1..669fafc 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorker.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorker.java
@@ -50,6 +50,7 @@ import org.slf4j.LoggerFactory;
 
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -337,7 +338,7 @@ public class RoundTripWorker implements TaskWorker {
                 while (true) {
                     try {
                         pollInvoked++;
-                        ConsumerRecords<byte[], byte[]> records = consumer.poll(50);
+                        ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(50));
                         for (Iterator<ConsumerRecord<byte[], byte[]>> iter = records.iterator(); iter.hasNext(); ) {
                             ConsumerRecord<byte[], byte[]> record = iter.next();
                             int messageIndex = ByteBuffer.wrap(record.key()).order(ByteOrder.LITTLE_ENDIAN).getInt();

