kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject [kafka] branch trunk updated: MINOR: Control plane listener tests should not use static port (#7033)
Date Thu, 04 Jul 2019 15:22:57 GMT
This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 2b529d1  MINOR: Control plane listener tests should not use static port (#7033)
2b529d1 is described below

commit 2b529d15022c022a1f6705555442f089394d6ba3
Author: Jason Gustafson <jason@confluent.io>
AuthorDate: Thu Jul 4 08:22:45 2019 -0700

    MINOR: Control plane listener tests should not use static port (#7033)
    
    We recently saw a few failing tests recently due to the static reliance on port 5000.
For example:
    ```
    org.apache.kafka.common.KafkaException: Socket server failed to bind to localhost:5000:
Address already in use.
    	at kafka.network.Acceptor.openServerSocket(SocketServer.scala:605)
    	at kafka.network.Acceptor.<init>(SocketServer.scala:481)
    	at kafka.network.SocketServer.createAcceptor(SocketServer.scala:253)
    	at kafka.network.SocketServer.$anonfun$createControlPlaneAcceptorAndProcessor$1(SocketServer.scala:234)
    	at kafka.network.SocketServer.$anonfun$createControlPlaneAcceptorAndProcessor$1$adapted(SocketServer.scala:232)
    	at scala.Option.foreach(Option.scala:438)
    	at kafka.network.SocketServer.createControlPlaneAcceptorAndProcessor(SocketServer.scala:232)
    	at kafka.network.SocketServer.startup(SocketServer.scala:119)
    	at kafka.network.SocketServerTest.withTestableServer(SocketServerTest.scala:1139)
    	at kafka.network.SocketServerTest.testControlPlaneRequest(SocketServerTest.scala:198)
    ```
    This patch fixes the failing tests to dynamically select the port.
    
    Reviewers: Ismael Juma <ismael@juma.me.uk>
---
 .../kafka/controller/ControllerIntegrationTest.scala | 20 +++++++++++---------
 .../scala/unit/kafka/network/SocketServerTest.scala  | 10 +++++++---
 2 files changed, 18 insertions(+), 12 deletions(-)

diff --git a/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala b/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
index a69a4a2..473fb03 100644
--- a/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
@@ -89,7 +89,9 @@ class ControllerIntegrationTest extends ZooKeeperTestHarness {
 
   @Test
   def testMetadataPropagationOnControlPlane(): Unit = {
-    servers = makeServers(1, listeners = Some("PLAINTEXT://localhost:0,CONTROLLER://localhost:5000"),
listenerSecurityProtocolMap = Some("PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT"),
+    servers = makeServers(1,
+      listeners = Some("PLAINTEXT://localhost:0,CONTROLLER://localhost:0"),
+      listenerSecurityProtocolMap = Some("PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT"),
       controlPlaneListenerName = Some("CONTROLLER"))
     TestUtils.waitUntilBrokerMetadataIsPropagated(servers)
     val controlPlaneMetricMap = mutable.Map[String, KafkaMetric]()
@@ -102,14 +104,14 @@ class ControllerIntegrationTest extends ZooKeeperTestHarness {
         dataPlaneMetricMap.put(kafkaMetric.metricName().name(), kafkaMetric)
       }
     }
-    assertEquals(1e-0, controlPlaneMetricMap.get("response-total").get.metricValue().asInstanceOf[Double],
0)
-    assertEquals(0e-0, dataPlaneMetricMap.get("response-total").get.metricValue().asInstanceOf[Double],
0)
-    assertEquals(1e-0, controlPlaneMetricMap.get("request-total").get.metricValue().asInstanceOf[Double],
0)
-    assertEquals(0e-0, dataPlaneMetricMap.get("request-total").get.metricValue().asInstanceOf[Double],
0)
-    assertTrue(controlPlaneMetricMap.get("incoming-byte-total").get.metricValue().asInstanceOf[Double]
> 1.0)
-    assertTrue(dataPlaneMetricMap.get("incoming-byte-total").get.metricValue().asInstanceOf[Double]
== 0.0)
-    assertTrue(controlPlaneMetricMap.get("network-io-total").get.metricValue().asInstanceOf[Double]
== 2.0)
-    assertTrue(dataPlaneMetricMap.get("network-io-total").get.metricValue().asInstanceOf[Double]
== 0.0)
+    assertEquals(1e-0, controlPlaneMetricMap("response-total").metricValue().asInstanceOf[Double],
0)
+    assertEquals(0e-0, dataPlaneMetricMap("response-total").metricValue().asInstanceOf[Double],
0)
+    assertEquals(1e-0, controlPlaneMetricMap("request-total").metricValue().asInstanceOf[Double],
0)
+    assertEquals(0e-0, dataPlaneMetricMap("request-total").metricValue().asInstanceOf[Double],
0)
+    assertTrue(controlPlaneMetricMap("incoming-byte-total").metricValue().asInstanceOf[Double]
> 1.0)
+    assertTrue(dataPlaneMetricMap("incoming-byte-total").metricValue().asInstanceOf[Double]
== 0.0)
+    assertTrue(controlPlaneMetricMap("network-io-total").metricValue().asInstanceOf[Double]
== 2.0)
+    assertTrue(dataPlaneMetricMap("network-io-total").metricValue().asInstanceOf[Double]
== 0.0)
   }
 
   // This test case is used to ensure that there will be no correctness issue after we avoid
sending out full
diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
index 7bed0d5..82b8bf9 100644
--- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
+++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
@@ -135,7 +135,10 @@ class SocketServerTest {
     channel.sendResponse(new RequestChannel.SendResponse(request, send, Some(request.header.toString),
None))
   }
 
-  def connect(s: SocketServer = server, listenerName: ListenerName = ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT),
localAddr: InetAddress = null, port: Int = 0) = {
+  def connect(s: SocketServer = server,
+              listenerName: ListenerName = ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT),
+              localAddr: InetAddress = null,
+              port: Int = 0) = {
     val socket = new Socket("localhost", s.boundPort(listenerName), localAddr, port)
     sockets += socket
     socket
@@ -191,12 +194,13 @@ class SocketServerTest {
   def testControlPlaneRequest(): Unit = {
     val testProps = new Properties
     testProps ++= props
-    testProps.put("listeners", "PLAINTEXT://localhost:0,CONTROLLER://localhost:5000")
+    testProps.put("listeners", "PLAINTEXT://localhost:0,CONTROLLER://localhost:0")
     testProps.put("listener.security.protocol.map", "PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT")
     testProps.put("control.plane.listener.name", "CONTROLLER")
     val config = KafkaConfig.fromProps(testProps)
     withTestableServer(config, { testableServer =>
-      val socket = connect(testableServer, config.controlPlaneListenerName.get, localAddr
= InetAddress.getLocalHost, port = 5000)
+      val socket = connect(testableServer, config.controlPlaneListenerName.get,
+        localAddr = InetAddress.getLocalHost)
       sendAndReceiveControllerRequest(socket, testableServer)
     })
   }


Mime
View raw message