kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mj...@apache.org
Subject [kafka] branch trunk updated: MINOR: Update code to not use deprecated methods (#6434)
Date Sat, 16 Mar 2019 02:48:01 GMT
This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new e1961b8  MINOR: Update code to not use deprecated methods (#6434)
e1961b8 is described below

commit e1961b8298c6d7ec028cd95721f4b498689c506e
Author: Matthias J. Sax <mjsax@apache.org>
AuthorDate: Fri Mar 15 19:47:40 2019 -0700

    MINOR: Update code to not use deprecated methods (#6434)
    
    Reviewers: Bill Bejeck <bill@confluent.io>, John Roesler <john@confluent.io>, Colin P. McCabe <cmccabe@confluent.io>
---
 core/src/main/scala/kafka/tools/MirrorMaker.scala           |  4 ++--
 .../integration/kafka/api/AdminClientIntegrationTest.scala  | 10 +++++-----
 .../scala/integration/kafka/api/BaseProducerSendTest.scala  | 13 +++++++------
 .../test/scala/integration/kafka/api/BaseQuotaTest.scala    |  7 ++++---
 .../scala/integration/kafka/api/ConsumerBounceTest.scala    | 11 ++++++-----
 .../integration/kafka/api/IntegrationTestHarness.scala      |  2 +-
 .../test/scala/integration/kafka/api/TransactionsTest.scala |  3 ++-
 .../kafka/server/DynamicBrokerReconfigurationTest.scala     |  4 ++--
 8 files changed, 29 insertions(+), 25 deletions(-)

diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala
index 823f0fd..fda1812 100755
--- a/core/src/main/scala/kafka/tools/MirrorMaker.scala
+++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala
@@ -324,7 +324,7 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
         // uncommitted record since last poll. Using one second as poll's timeout ensures that
         // offsetCommitIntervalMs, of value greater than 1 second, does not see delays in offset
         // commit.
-        recordIter = consumer.poll(Duration.ofSeconds(1)).iterator
+        recordIter = consumer.poll(Duration.ofSeconds(1L)).iterator
         if (!recordIter.hasNext)
           throw new NoRecordsException
       }
@@ -387,7 +387,7 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
     }
 
     def close(timeout: Long) {
-      this.producer.close(timeout, TimeUnit.MILLISECONDS)
+      this.producer.close(Duration.ofMillis(timeout))
     }
   }
 
diff --git a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
index cf019a8..027266d 100644
--- a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
@@ -16,7 +16,7 @@
  */
 package kafka.api
 
-import java.util
+import java.{time, util}
 import java.util.{Collections, Properties}
 import java.util.Arrays.asList
 import java.util.concurrent.{ExecutionException, TimeUnit}
@@ -1066,11 +1066,11 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
     val topics = Seq("mytopic", "mytopic2")
     val newTopics = topics.map(new NewTopic(_, 1, 1))
     val future = client.createTopics(newTopics.asJava, new CreateTopicsOptions().validateOnly(true)).all()
-    client.close(2, TimeUnit.HOURS)
+    client.close(time.Duration.ofHours(2))
     val future2 = client.createTopics(newTopics.asJava, new CreateTopicsOptions().validateOnly(true)).all()
     assertFutureExceptionTypeEquals(future2, classOf[TimeoutException])
     future.get
-    client.close(30, TimeUnit.MINUTES) // multiple close-with-timeout should have no effect
+    client.close(time.Duration.ofMinutes(30)) // multiple close-with-timeout should have no effect
   }
 
   /**
@@ -1086,7 +1086,7 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
     // cancelled by the close operation.
     val future = client.createTopics(Seq("mytopic", "mytopic2").map(new NewTopic(_, 1, 1)).asJava,
       new CreateTopicsOptions().timeoutMs(900000)).all()
-    client.close(0, TimeUnit.MILLISECONDS)
+    client.close(time.Duration.ZERO)
     assertFutureExceptionTypeEquals(future, classOf[TimeoutException])
   }
 
@@ -1164,7 +1164,7 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
           override def run {
             consumer.subscribe(Collections.singleton(testTopicName))
             while (true) {
-              consumer.poll(5000)
+              consumer.poll(time.Duration.ofSeconds(5L))
               consumer.commitSync()
             }
           }
diff --git a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
index 9d454e9..2ce16d2 100644
--- a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
@@ -17,6 +17,7 @@
 
 package kafka.api
 
+import java.time.Duration
 import java.nio.charset.StandardCharsets
 import java.util.Properties
 import java.util.concurrent.TimeUnit
@@ -193,7 +194,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
           s"value$i".getBytes(StandardCharsets.UTF_8))
         producer.send(record)
       }
-      producer.close(timeoutMs, TimeUnit.MILLISECONDS)
+      producer.close(Duration.ofMillis(timeoutMs))
       val lastOffset = futures.foldLeft(0) { (offset, future) =>
         val recordMetadata = future.get
         assertEquals(topic, recordMetadata.topic)
@@ -248,7 +249,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
           s"value$i".getBytes(StandardCharsets.UTF_8))
         (record, producer.send(record, callback))
       }
-      producer.close(20000L, TimeUnit.MILLISECONDS)
+      producer.close(Duration.ofSeconds(20L))
       recordAndFutures.foreach { case (record, future) =>
         val recordMetadata = future.get
         if (timestampType == TimestampType.LOG_APPEND_TIME)
@@ -445,7 +446,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
       val producer = createProducer(brokerList, lingerMs = Int.MaxValue)
       val responses = (0 until numRecords) map (_ => producer.send(record0))
       assertTrue("No request is complete.", responses.forall(!_.isDone()))
-      producer.close(0, TimeUnit.MILLISECONDS)
+      producer.close(Duration.ZERO)
       responses.foreach { future =>
         try {
           future.get()
@@ -454,7 +455,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
           case e: ExecutionException => assertEquals(classOf[KafkaException], e.getCause.getClass)
         }
       }
-      assertEquals("Fetch response should have no message returned.", 0, consumer.poll(50).count)
+      assertEquals("Fetch response should have no message returned.", 0, consumer.poll(Duration.ofMillis(50L)).count)
     }
   }
 
@@ -476,9 +477,9 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
         if (sendRecords)
           (0 until numRecords) foreach (_ => producer.send(record))
         // The close call will be called by all the message callbacks. This tests idempotence of the close call.
-        producer.close(0, TimeUnit.MILLISECONDS)
+        producer.close(Duration.ZERO)
         // Test close with non zero timeout. Should not block at all.
-        producer.close(Long.MaxValue, TimeUnit.MICROSECONDS)
+        producer.close()
       }
     }
     for (i <- 0 until 50) {
diff --git a/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala b/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala
index b28a40f..4b278f0 100644
--- a/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala
@@ -14,6 +14,7 @@
 
 package kafka.api
 
+import java.time.Duration
 import java.util.{Collections, HashMap, Properties}
 
 import kafka.api.QuotaTestClients._
@@ -140,7 +141,7 @@ abstract class BaseQuotaTest extends IntegrationTestHarness {
     val endTimeMs = System.currentTimeMillis + 10000
     var throttled = false
     while ((!throttled || quotaTestClients.exemptRequestMetric == null) && System.currentTimeMillis < endTimeMs) {
-      consumer.poll(100)
+      consumer.poll(Duration.ofMillis(100L))
       val throttleMetric = quotaTestClients.throttleMetric(QuotaType.Request, consumerClientId)
       throttled = throttleMetric != null && metricValue(throttleMetric) > 0
     }
@@ -197,7 +198,7 @@ abstract class QuotaTestClients(topic: String,
     var numConsumed = 0
     var throttled = false
     do {
-      numConsumed += consumer.poll(100).count
+      numConsumed += consumer.poll(Duration.ofMillis(100L)).count
       val metric = throttleMetric(QuotaType.Fetch, consumerClientId)
       throttled = metric != null && metricValue(metric) > 0
     }  while (numConsumed < maxRecords && !throttled)
@@ -206,7 +207,7 @@ abstract class QuotaTestClients(topic: String,
     if (throttled && numConsumed < maxRecords && waitForRequestCompletion) {
       val minRecords = numConsumed + 1
       while (numConsumed < minRecords)
-        numConsumed += consumer.poll(100).count
+        numConsumed += consumer.poll(Duration.ofMillis(100L)).count
     }
     numConsumed
   }
diff --git a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
index e535104..1a1f37e 100644
--- a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
@@ -178,7 +178,7 @@ class ConsumerBounceTest extends BaseRequestTest with Logging {
     executor.schedule(new Runnable {
         def run() = createTopic(newtopic, numPartitions = numBrokers, replicationFactor = numBrokers)
       }, 2, TimeUnit.SECONDS)
-    consumer.poll(0)
+    consumer.poll(time.Duration.ZERO)
 
     val producer = createProducer()
 
@@ -481,14 +481,15 @@ class ConsumerBounceTest extends BaseRequestTest with Logging {
               revokeSemaphore.foreach(s => s.release())
             }
           })
-          consumer.poll(0)
+        // requires use of the deprecated `poll(long)` to trigger metadata update
+          consumer.poll(0L)
         }, 0)
     }
 
     def waitForRebalance(timeoutMs: Long, future: Future[Any], otherConsumers: KafkaConsumer[Array[Byte], Array[Byte]]*) {
       val startMs = System.currentTimeMillis
       while (System.currentTimeMillis < startMs + timeoutMs && !future.isDone)
-          otherConsumers.foreach(consumer => consumer.poll(100))
+          otherConsumers.foreach(consumer => consumer.poll(time.Duration.ofMillis(100L)))
       assertTrue("Rebalance did not complete in time", future.isDone)
     }
 
@@ -569,7 +570,7 @@ class ConsumerBounceTest extends BaseRequestTest with Logging {
       val closeGraceTimeMs = 2000
       val startNanos = System.nanoTime
       info("Closing consumer with timeout " + closeTimeoutMs + " ms.")
-      consumer.close(closeTimeoutMs, TimeUnit.MILLISECONDS)
+      consumer.close(time.Duration.ofMillis(closeTimeoutMs))
       val timeTakenMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime - startNanos)
       maxCloseTimeMs.foreach { ms =>
         assertTrue("Close took too long " + timeTakenMs, timeTakenMs < ms + closeGraceTimeMs)
@@ -592,7 +593,7 @@ class ConsumerBounceTest extends BaseRequestTest with Logging {
       }
       def onPartitionsRevoked(partitions: Collection[TopicPartition]) {
       }})
-    consumer.poll(3000)
+    consumer.poll(time.Duration.ofSeconds(3L))
     assertTrue("Assignment did not complete on time", assignSemaphore.tryAcquire(1, TimeUnit.SECONDS))
     if (committedRecords > 0)
       assertEquals(committedRecords, consumer.committed(tp).offset)
diff --git a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
index 5a20005..640244d 100644
--- a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
+++ b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
@@ -126,7 +126,7 @@ abstract class IntegrationTestHarness extends KafkaServerTestHarness {
 
   @After
   override def tearDown() {
-    producers.foreach(_.close(0, TimeUnit.MILLISECONDS))
+    producers.foreach(_.close(Duration.ZERO))
     consumers.foreach(_.wakeup())
     consumers.foreach(_.close(Duration.ZERO))
     producers.clear()
diff --git a/core/src/test/scala/integration/kafka/api/TransactionsTest.scala b/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
index 34dea70..e3bfdc0 100644
--- a/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
+++ b/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
@@ -18,6 +18,7 @@
 package kafka.api
 
 import java.lang.{Long => JLong}
+import java.time.Duration
 import java.util.{Optional, Properties}
 import java.util.concurrent.TimeUnit
 
@@ -578,7 +579,7 @@ class TransactionsTest extends KafkaServerTestHarness {
     try {
       producer.commitTransaction()
     } finally {
-      producer.close(0, TimeUnit.MILLISECONDS)
+      producer.close(Duration.ZERO)
     }
   }
 
diff --git a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
index c14f41e..e8aa081 100644
--- a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
+++ b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
@@ -146,7 +146,7 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
     clientThreads.foreach(_.initiateShutdown())
     clientThreads.foreach(_.join(5 * 1000))
     executors.foreach(_.shutdownNow())
-    producers.foreach(_.close(0, TimeUnit.MILLISECONDS))
+    producers.foreach(_.close(Duration.ZERO))
     consumers.foreach(_.close(Duration.ofMillis(0)))
     adminClients.foreach(_.close())
     TestUtils.shutdownServers(servers)
@@ -1470,7 +1470,7 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
     override def doWork(): Unit = {
       try {
         while (isRunning || (lastReceived != producerThread.lastSent && System.currentTimeMillis < endTimeMs)) {
-          val records = consumer.poll(50)
+          val records = consumer.poll(Duration.ofMillis(50L))
           received += records.count
           if (!records.isEmpty) {
             lastBatch = records


Mime
View raw message