kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From maniku...@apache.org
Subject [kafka] branch trunk updated: KAFKA-7312: Change broker port used in testMinimumRequestTimeouts and testForceClose
Date Mon, 04 Mar 2019 19:08:53 GMT
This is an automated email from the ASF dual-hosted git repository.

manikumar pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 9ee5f92  KAFKA-7312: Change broker port used in testMinimumRequestTimeouts and testForceClose
9ee5f92 is described below

commit 9ee5f920d5e4b837c3240dff948e120aaef7cd23
Author: Manikumar Reddy <manikumar.reddy@gmail.com>
AuthorDate: Tue Mar 5 00:38:15 2019 +0530

    KAFKA-7312: Change broker port used in testMinimumRequestTimeouts and testForceClose
    
    Port 22 is used by ssh, which causes the AdminClient to throw an OOM:
    
    > java.lang.OutOfMemoryError: Java heap space
    > 	at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
    > 	at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
    > 	at org.apache.kafka.common.memory.MemoryPool$1.tryAllocate(MemoryPool.java:30)
    > 	at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:112)
    > 	at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:424)
    > 	at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:385)
    > 	at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:640)
    > 	at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:561)
    > 	at org.apache.kafka.common.network.Selector.poll(Selector.java:472)
    > 	at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:535)
    > 	at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1140)
    > 	at java.lang.Thread.run(Thread.java:748)
    >
    >
    
    Author: Manikumar Reddy <manikumar.reddy@gmail.com>
    Author: Ismael Juma <ismael@juma.me.uk>
    
    Reviewers: Ismael Juma <ismael@juma.me.uk>
    
    Closes #6360 from omkreddy/KAFKA-7312
---
 .../scala/integration/kafka/api/AdminClientIntegrationTest.scala    | 6 +++---
 core/src/test/scala/unit/kafka/utils/TestUtils.scala                | 4 ++++
 2 files changed, 7 insertions(+), 3 deletions(-)

diff --git a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
index 6975328..1ee2234 100644
--- a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
@@ -1045,7 +1045,7 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
   @Test
   def testForceClose(): Unit = {
     val config = createConfig()
-    config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:22")
+    config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, s"localhost:${TestUtils.IncorrectBrokerPort}")
     client = AdminClient.create(config)
     // Because the bootstrap servers are set up incorrectly, this call will not complete, but must be
     // cancelled by the close operation.
@@ -1062,7 +1062,7 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
   @Test
   def testMinimumRequestTimeouts(): Unit = {
     val config = createConfig()
-    config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:22")
+    config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, s"localhost:${TestUtils.IncorrectBrokerPort}")
     config.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "0")
     client = AdminClient.create(config)
     val startTimeMs = Time.SYSTEM.milliseconds()
@@ -1070,7 +1070,7 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
       new CreateTopicsOptions().timeoutMs(2)).all()
     assertFutureExceptionTypeEquals(future, classOf[TimeoutException])
     val endTimeMs = Time.SYSTEM.milliseconds()
-    assertTrue("Expected the timeout to take at least one millisecond.", endTimeMs > startTimeMs);
+    assertTrue("Expected the timeout to take at least one millisecond.", endTimeMs > startTimeMs)
   }
 
   /**
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index ce0714d..d61771b 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -71,6 +71,10 @@ object TestUtils extends Logging {
   /* 0 gives a random port; you can then retrieve the assigned port from the Socket object. */
   val RandomPort = 0
 
+  /* Incorrect broker port which can be used by kafka clients in tests. This port should not be used
+   by any other service and hence we use a reserved port. */
+  val IncorrectBrokerPort = 225
+
   /** Port to use for unit tests that mock/don't require a real ZK server. */
   val MockZkPort = 1
   /** ZooKeeper connection string to use for unit tests that mock/don't require a real ZK server. */


Mime
View raw message