kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jun...@apache.org
Subject git commit: kafka-1123; Broker IPv6 addresses parsed incorrectly; patched by Krzysztof Szafrański; reviewed by Jun Rao
Date Thu, 18 Sep 2014 22:54:06 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk c892c08df -> be5edd2f8


kafka-1123; Broker IPv6 addresses parsed incorrectly; patched by Krzysztof Szafrański; reviewed by Jun Rao


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/be5edd2f
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/be5edd2f
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/be5edd2f

Branch: refs/heads/trunk
Commit: be5edd2f8d0c0355cb33feb2ac7482b7df7dccbc
Parents: c892c08
Author: Krzysztof Szafrański <k.p.szafranski@gmail.com>
Authored: Thu Sep 18 15:53:48 2014 -0700
Committer: Jun Rao <junrao@gmail.com>
Committed: Thu Sep 18 15:53:48 2014 -0700

----------------------------------------------------------------------
 .../apache/kafka/common/utils/ClientUtils.java  | 17 ++++---
 .../org/apache/kafka/common/utils/Utils.java    | 36 ++++++++++++++
 .../kafka/common/utils/ClientUtilsTest.java     | 42 ++++++++++++++++
 .../apache/kafka/common/utils/UtilsTest.java    | 51 ++++++++++++++++++++
 .../main/java/kafka/etl/impl/DataGenerator.java |  7 +--
 .../main/scala/kafka/admin/TopicCommand.scala   |  4 +-
 .../main/scala/kafka/api/TopicMetadata.scala    |  4 +-
 .../main/scala/kafka/client/ClientUtils.scala   | 14 ++----
 core/src/main/scala/kafka/cluster/Broker.scala  |  7 +--
 .../scala/kafka/consumer/SimpleConsumer.scala   |  3 +-
 .../scala/kafka/producer/SyncProducer.scala     |  8 +--
 core/src/main/scala/kafka/utils/Utils.scala     | 16 ++----
 .../test/scala/unit/kafka/utils/TestUtils.scala |  4 +-
 .../test/scala/unit/kafka/utils/UtilsTest.scala |  9 ++--
 .../scala/unit/kafka/zk/EmbeddedZookeeper.scala |  5 +-
 15 files changed, 176 insertions(+), 51 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/be5edd2f/clients/src/main/java/org/apache/kafka/common/utils/ClientUtils.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/ClientUtils.java b/clients/src/main/java/org/apache/kafka/common/utils/ClientUtils.java
index cb33e34..b987e7f 100644
--- a/clients/src/main/java/org/apache/kafka/common/utils/ClientUtils.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/ClientUtils.java
@@ -19,26 +19,31 @@ import java.util.List;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.config.ConfigException;
 
+import static org.apache.kafka.common.utils.Utils.getHost;
+import static org.apache.kafka.common.utils.Utils.getPort;
+
 public class ClientUtils {
+
     public static List<InetSocketAddress> parseAndValidateAddresses(List<String> urls) {
         List<InetSocketAddress> addresses = new ArrayList<InetSocketAddress>();
         for (String url : urls) {
             if (url != null && url.length() > 0) {
-                String[] pieces = url.split(":");
-                if (pieces.length != 2)
+                String host = getHost(url);
+                Integer port = getPort(url);
+                if (host == null || port == null)
                     throw new ConfigException("Invalid url in " + ProducerConfig.BOOTSTRAP_SERVERS_CONFIG + ": " + url);
                 try {
-                    InetSocketAddress address = new InetSocketAddress(pieces[0], Integer.parseInt(pieces[1]));
+                    InetSocketAddress address = new InetSocketAddress(host, port);
                     if (address.isUnresolved())
-                        throw new ConfigException("DNS resolution failed for metadata bootstrap url: " + url);
+                        throw new ConfigException("DNS resolution failed for url in " + ProducerConfig.BOOTSTRAP_SERVERS_CONFIG + ": " + url);
                     addresses.add(address);
                 } catch (NumberFormatException e) {
-                    throw new ConfigException("Invalid port in metadata.broker.list: " + url);
+                    throw new ConfigException("Invalid port in " + ProducerConfig.BOOTSTRAP_SERVERS_CONFIG + ": " + url);
                 }
             }
         }
         if (addresses.size() < 1)
-            throw new ConfigException("No bootstrap urls given in metadata.broker.list.");
+            throw new ConfigException("No bootstrap urls given in " + ProducerConfig.BOOTSTRAP_SERVERS_CONFIG);
         return addresses;
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/be5edd2f/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
index 50af601..a0827f5 100644
--- a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
@@ -14,11 +14,15 @@ package org.apache.kafka.common.utils;
 
 import java.io.UnsupportedEncodingException;
 import java.nio.ByteBuffer;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
 import org.apache.kafka.common.KafkaException;
 
 public class Utils {
 
+    private static final Pattern HOST_PORT_PATTERN = Pattern.compile("\\[?(.+?)\\]?:(\\d+)");
+
     public static String NL = System.getProperty("line.separator");
 
     /**
@@ -217,4 +221,36 @@ public class Utils {
         return h;
     }
 
+    /**
+     * Extracts the hostname from a "host:port" address string.
+     * @param address address string to parse
+     * @return hostname or null if the given address is incorrect
+     */
+    public static String getHost(String address) {
+        Matcher matcher = HOST_PORT_PATTERN.matcher(address);
+        return matcher.matches() ? matcher.group(1) : null;
+    }
+
+    /**
+     * Extracts the port number from a "host:port" address string.
+     * @param address address string to parse
+     * @return port number or null if the given address is incorrect
+     */
+    public static Integer getPort(String address) {
+        Matcher matcher = HOST_PORT_PATTERN.matcher(address);
+        return matcher.matches() ? Integer.parseInt(matcher.group(2)) : null;
+    }
+
+    /**
+     * Formats hostname and port number as a "host:port" address string,
+     * surrounding IPv6 addresses with braces '[', ']'
+     * @param host hostname
+     * @param port port number
+     * @return address string
+     */
+    public static String formatAddress(String host, Integer port) {
+        return host.contains(":")
+                ? "[" + host + "]:" + port // IPv6
+                : host + ":" + port;
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/be5edd2f/clients/src/test/java/org/apache/kafka/common/utils/ClientUtilsTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/utils/ClientUtilsTest.java b/clients/src/test/java/org/apache/kafka/common/utils/ClientUtilsTest.java
new file mode 100644
index 0000000..6e37ea5
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/utils/ClientUtilsTest.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.utils;
+
+import org.apache.kafka.common.config.ConfigException;
+import org.junit.Test;
+
+import java.util.Arrays;
+
+public class ClientUtilsTest {
+
+    @Test
+    public void testParseAndValidateAddresses() {
+        check("127.0.0.1:8000");
+        check("mydomain.com:8080");
+        check("[::1]:8000");
+        check("[2001:db8:85a3:8d3:1319:8a2e:370:7348]:1234", "mydomain.com:10000");
+    }
+
+    @Test(expected = ConfigException.class)
+    public void testNoPort() {
+        check("127.0.0.1");
+    }
+
+    private void check(String... url) {
+        ClientUtils.parseAndValidateAddresses(Arrays.asList(url));
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/be5edd2f/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java b/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java
new file mode 100644
index 0000000..a39fab5
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.utils;
+
+import org.junit.Test;
+
+import static org.apache.kafka.common.utils.Utils.getHost;
+import static org.apache.kafka.common.utils.Utils.getPort;
+import static org.apache.kafka.common.utils.Utils.formatAddress;
+import static org.junit.Assert.*;
+
+public class UtilsTest {
+
+    @Test
+    public void testGetHost() {
+        assertEquals("127.0.0.1", getHost("127.0.0.1:8000"));
+        assertEquals("mydomain.com", getHost("mydomain.com:8080"));
+        assertEquals("::1", getHost("[::1]:1234"));
+        assertEquals("2001:db8:85a3:8d3:1319:8a2e:370:7348", getHost("[2001:db8:85a3:8d3:1319:8a2e:370:7348]:5678"));
+    }
+
+    @Test
+    public void testGetPort() {
+        assertEquals(8000, getPort("127.0.0.1:8000").intValue());
+        assertEquals(8080, getPort("mydomain.com:8080").intValue());
+        assertEquals(1234, getPort("[::1]:1234").intValue());
+        assertEquals(5678, getPort("[2001:db8:85a3:8d3:1319:8a2e:370:7348]:5678").intValue());
+    }
+
+    @Test
+    public void testFormatAddress() {
+        assertEquals("127.0.0.1:8000", formatAddress("127.0.0.1", 8000));
+        assertEquals("mydomain.com:8080", formatAddress("mydomain.com", 8080));
+        assertEquals("[::1]:1234", formatAddress("::1", 1234));
+        assertEquals("[2001:db8:85a3:8d3:1319:8a2e:370:7348]:5678", formatAddress("2001:db8:85a3:8d3:1319:8a2e:370:7348", 5678));
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/be5edd2f/contrib/hadoop-consumer/src/main/java/kafka/etl/impl/DataGenerator.java
----------------------------------------------------------------------
diff --git a/contrib/hadoop-consumer/src/main/java/kafka/etl/impl/DataGenerator.java b/contrib/hadoop-consumer/src/main/java/kafka/etl/impl/DataGenerator.java
index f3fb3fd..d27a511 100644
--- a/contrib/hadoop-consumer/src/main/java/kafka/etl/impl/DataGenerator.java
+++ b/contrib/hadoop-consumer/src/main/java/kafka/etl/impl/DataGenerator.java
@@ -27,7 +27,6 @@ import kafka.etl.KafkaETLKey;
 import kafka.etl.KafkaETLRequest;
 import kafka.etl.Props;
 import kafka.javaapi.producer.Producer;
-import kafka.message.Message;
 import kafka.producer.ProducerConfig;
 import kafka.producer.KeyedMessage;
 import org.apache.hadoop.fs.FileSystem;
@@ -36,6 +35,8 @@ import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.mapred.JobConf;
 
+import static org.apache.kafka.common.utils.Utils.formatAddress;
+
 /**
  * Use this class to produce test events to Kafka server. Each event contains a
  * random timestamp in text format.
@@ -70,7 +71,7 @@ public class DataGenerator {
 		
 		System.out.println("server uri:" + _uri.toString());
         Properties producerProps = new Properties();
-        producerProps.put("metadata.broker.list", String.format("%s:%d", _uri.getHost(), _uri.getPort()));
+        producerProps.put("metadata.broker.list", formatAddress(_uri.getHost(), _uri.getPort()));
         producerProps.put("send.buffer.bytes", String.valueOf(TCP_BUFFER_SIZE));
         producerProps.put("connect.timeout.ms", String.valueOf(CONNECT_TIMEOUT));
         producerProps.put("reconnect.interval", String.valueOf(RECONNECT_INTERVAL));
@@ -108,7 +109,7 @@ public class DataGenerator {
         if (fs.exists(outPath)) fs.delete(outPath);
         
         KafkaETLRequest request =
-            new KafkaETLRequest(_topic, "tcp://" + _uri.getHost() + ":" + _uri.getPort(), 0);
+            new KafkaETLRequest(_topic, "tcp://" + formatAddress(_uri.getHost(), _uri.getPort()), 0);
 
         System.out.println("Dump " + request.toString() + " to " + outPath.toUri().toString());
         byte[] bytes = request.toString().getBytes("UTF-8");

http://git-wip-us.apache.org/repos/asf/kafka/blob/be5edd2f/core/src/main/scala/kafka/admin/TopicCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/TopicCommand.scala b/core/src/main/scala/kafka/admin/TopicCommand.scala
index b3f2e82..3b2166a 100644
--- a/core/src/main/scala/kafka/admin/TopicCommand.scala
+++ b/core/src/main/scala/kafka/admin/TopicCommand.scala
@@ -19,7 +19,6 @@ package kafka.admin
 
 import joptsimple._
 import java.util.Properties
-import kafka.admin.AdminOperationException
 import kafka.utils._
 import org.I0Itec.zkclient.ZkClient
 import org.I0Itec.zkclient.exception.ZkNodeExistsException
@@ -29,6 +28,7 @@ import kafka.cluster.Broker
 import kafka.log.LogConfig
 import kafka.consumer.Whitelist
 import kafka.server.OffsetManager
+import org.apache.kafka.common.utils.Utils.formatAddress
 
 
 object TopicCommand {
@@ -193,7 +193,7 @@ object TopicCommand {
     }
   }
   
-  def formatBroker(broker: Broker) = broker.id + " (" + broker.host + ":" + broker.port + ")"
+  def formatBroker(broker: Broker) = broker.id + " (" + formatAddress(broker.host, broker.port) + ")"
   
   def parseTopicConfigsToBeAdded(opts: TopicCommandOptions): Properties = {
     val configsToBeAdded = opts.options.valuesOf(opts.configOpt).map(_.split("""\s*=\s*"""))

http://git-wip-us.apache.org/repos/asf/kafka/blob/be5edd2f/core/src/main/scala/kafka/api/TopicMetadata.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/TopicMetadata.scala b/core/src/main/scala/kafka/api/TopicMetadata.scala
index 51380a6..0190076 100644
--- a/core/src/main/scala/kafka/api/TopicMetadata.scala
+++ b/core/src/main/scala/kafka/api/TopicMetadata.scala
@@ -21,8 +21,8 @@ import kafka.cluster.Broker
 import java.nio.ByteBuffer
 import kafka.api.ApiUtils._
 import kafka.utils.Logging
-import collection.mutable.ArrayBuffer
 import kafka.common._
+import org.apache.kafka.common.utils.Utils._
 
 object TopicMetadata {
   
@@ -149,7 +149,7 @@ case class PartitionMetadata(partitionId: Int,
     partitionMetadataString.toString()
   }
 
-  private def formatBroker(broker: Broker) = broker.id + " (" + broker.host + ":" + broker.port + ")"
+  private def formatBroker(broker: Broker) = broker.id + " (" + formatAddress(broker.host, broker.port) + ")"
 }
 
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/be5edd2f/core/src/main/scala/kafka/client/ClientUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/client/ClientUtils.scala b/core/src/main/scala/kafka/client/ClientUtils.scala
index ce7ede3..ebba87f 100644
--- a/core/src/main/scala/kafka/client/ClientUtils.scala
+++ b/core/src/main/scala/kafka/client/ClientUtils.scala
@@ -28,6 +28,7 @@ import util.Random
  import kafka.utils.ZkUtils._
  import org.I0Itec.zkclient.ZkClient
  import java.io.IOException
+import org.apache.kafka.common.utils.Utils.{getHost, getPort}
 
  /**
  * Helper functions common to clients (producer, consumer, or admin)
@@ -85,7 +86,7 @@ object ClientUtils extends Logging{
   def fetchTopicMetadata(topics: Set[String], brokers: Seq[Broker], clientId: String, timeoutMs: Int,
                          correlationId: Int = 0): TopicMetadataResponse = {
     val props = new Properties()
-    props.put("metadata.broker.list", brokers.map(_.getConnectionString()).mkString(","))
+    props.put("metadata.broker.list", brokers.map(_.connectionString).mkString(","))
     props.put("client.id", clientId)
     props.put("request.timeout.ms", timeoutMs.toString)
     val producerConfig = new ProducerConfig(props)
@@ -98,14 +99,9 @@ object ClientUtils extends Logging{
   def parseBrokerList(brokerListStr: String): Seq[Broker] = {
     val brokersStr = Utils.parseCsvList(brokerListStr)
 
-    brokersStr.zipWithIndex.map(b =>{
-      val brokerStr = b._1
-      val brokerId = b._2
-      val brokerInfos = brokerStr.split(":")
-      val hostName = brokerInfos(0)
-      val port = brokerInfos(1).toInt
-      new Broker(brokerId, hostName, port)
-    })
+    brokersStr.zipWithIndex.map { case (address, brokerId) =>
+      new Broker(brokerId, getHost(address), getPort(address))
+    }
   }
 
    /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/be5edd2f/core/src/main/scala/kafka/cluster/Broker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/Broker.scala b/core/src/main/scala/kafka/cluster/Broker.scala
index ccc3fc1..0060add 100644
--- a/core/src/main/scala/kafka/cluster/Broker.scala
+++ b/core/src/main/scala/kafka/cluster/Broker.scala
@@ -22,6 +22,7 @@ import kafka.utils.Json
 import kafka.api.ApiUtils._
 import java.nio.ByteBuffer
 import kafka.common.{KafkaException, BrokerNotAvailableException}
+import org.apache.kafka.common.utils.Utils._
 
 /**
  * A Kafka broker
@@ -54,11 +55,11 @@ object Broker {
   }
 }
 
-case class Broker(val id: Int, val host: String, val port: Int) {
+case class Broker(id: Int, host: String, port: Int) {
   
-  override def toString(): String = new String("id:" + id + ",host:" + host + ",port:" + port)
+  override def toString: String = "id:" + id + ",host:" + host + ",port:" + port
 
-  def getConnectionString(): String = host + ":" + port
+  def connectionString: String = formatAddress(host, port)
 
   def writeTo(buffer: ByteBuffer) {
     buffer.putInt(id)

http://git-wip-us.apache.org/repos/asf/kafka/blob/be5edd2f/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
index 8db9203..d349a30 100644
--- a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
+++ b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
@@ -21,6 +21,7 @@ import kafka.api._
 import kafka.network._
 import kafka.utils._
 import kafka.common.{ErrorMapping, TopicAndPartition}
+import org.apache.kafka.common.utils.Utils._
 
 /**
  * A consumer of kafka messages
@@ -46,7 +47,7 @@ class SimpleConsumer(val host: String,
   }
 
   private def disconnect() = {
-    debug("Disconnecting from " + host + ":" + port)
+    debug("Disconnecting from " + formatAddress(host, port))
     blockingChannel.disconnect()
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/be5edd2f/core/src/main/scala/kafka/producer/SyncProducer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/SyncProducer.scala b/core/src/main/scala/kafka/producer/SyncProducer.scala
index 489f007..42c9503 100644
--- a/core/src/main/scala/kafka/producer/SyncProducer.scala
+++ b/core/src/main/scala/kafka/producer/SyncProducer.scala
@@ -22,6 +22,8 @@ import kafka.network.{BlockingChannel, BoundedByteBufferSend, Receive}
 import kafka.utils._
 import java.util.Random
 
+import org.apache.kafka.common.utils.Utils._
+
 object SyncProducer {
   val RequestKey: Short = 0
   val randomGenerator = new Random
@@ -126,7 +128,7 @@ class SyncProducer(val config: SyncProducerConfig) extends Logging {
    */
   private def disconnect() {
     try {
-      info("Disconnecting from " + config.host + ":" + config.port)
+      info("Disconnecting from " + formatAddress(config.host, config.port))
       blockingChannel.disconnect()
     } catch {
       case e: Exception => error("Error on disconnect: ", e)
@@ -137,11 +139,11 @@ class SyncProducer(val config: SyncProducerConfig) extends Logging {
     if (!blockingChannel.isConnected && !shutdown) {
       try {
         blockingChannel.connect()
-        info("Connected to " + config.host + ":" + config.port + " for producing")
+        info("Connected to " + formatAddress(config.host, config.port) + " for producing")
       } catch {
         case e: Exception => {
           disconnect()
-          error("Producer connection to " +  config.host + ":" + config.port + " unsuccessful", e)
+          error("Producer connection to " + formatAddress(config.host, config.port) + " unsuccessful", e)
           throw e
         }
       }

http://git-wip-us.apache.org/repos/asf/kafka/blob/be5edd2f/core/src/main/scala/kafka/utils/Utils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/Utils.scala b/core/src/main/scala/kafka/utils/Utils.scala
index da52b42..29d5a17 100644
--- a/core/src/main/scala/kafka/utils/Utils.scala
+++ b/core/src/main/scala/kafka/utils/Utils.scala
@@ -395,21 +395,13 @@ object Utils extends Logging {
   }
 
   /**
-   * Parse a host and port out of a string
-   */
-  def parseHostPort(hostport: String) : (String, Int) = {
-    val splits = hostport.split(":")
-    (splits(0), splits(1).toInt)
-  }
-
-  /**
    * Get the stack trace from an exception as a string
    */
   def stackTrace(e: Throwable): String = {
-    val sw = new StringWriter;
-    val pw = new PrintWriter(sw);
-    e.printStackTrace(pw);
-    sw.toString();
+    val sw = new StringWriter
+    val pw = new PrintWriter(sw)
+    e.printStackTrace(pw)
+    sw.toString()
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/be5edd2f/core/src/test/scala/unit/kafka/utils/TestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index c4e13c5..2dbdd3c 100644
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -24,6 +24,8 @@ import java.nio.channels._
 import java.util.Random
 import java.util.Properties
 
+import org.apache.kafka.common.utils.Utils._
+
 import collection.mutable.Map
 import collection.mutable.ListBuffer
 
@@ -142,7 +144,7 @@ object TestUtils extends Logging {
   }
 
   def getBrokerListStrFromConfigs(configs: Seq[KafkaConfig]): String = {
-    configs.map(c => c.hostName + ":" + c.port).mkString(",")
+    configs.map(c => formatAddress(c.hostName, c.port)).mkString(",")
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/be5edd2f/core/src/test/scala/unit/kafka/utils/UtilsTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/UtilsTest.scala b/core/src/test/scala/unit/kafka/utils/UtilsTest.scala
index a502349..0d0f0e2 100644
--- a/core/src/test/scala/unit/kafka/utils/UtilsTest.scala
+++ b/core/src/test/scala/unit/kafka/utils/UtilsTest.scala
@@ -20,7 +20,6 @@ package kafka.utils
 import java.util.Arrays
 import java.util.concurrent.locks.ReentrantLock
 import java.nio.ByteBuffer
-import java.io._
 import org.apache.log4j.Logger
 import org.scalatest.junit.JUnitSuite
 import org.junit.Assert._
@@ -73,7 +72,7 @@ class UtilsTest extends JUnitSuite {
     assertEquals(1, Utils.abs(1))
     assertEquals(Integer.MAX_VALUE, Utils.abs(Integer.MAX_VALUE))
   }
-  
+
   @Test
   def testReplaceSuffix() {
     assertEquals("blah.foo.text", Utils.replaceSuffix("blah.foo.txt", ".txt", ".text"))
@@ -81,7 +80,7 @@ class UtilsTest extends JUnitSuite {
     assertEquals("txt.txt", Utils.replaceSuffix("txt.txt.txt", ".txt", ""))
     assertEquals("foo.txt", Utils.replaceSuffix("foo", "", ".txt"))
   }
-  
+
   @Test
   def testReadInt() {
     val values = Array(0, 1, -1, Byte.MaxValue, Short.MaxValue, 2 * Short.MaxValue, Int.MaxValue/2, Int.MinValue/2, Int.MaxValue, Int.MinValue, Int.MaxValue)
@@ -90,7 +89,6 @@ class UtilsTest extends JUnitSuite {
       buffer.putInt(i*4, values(i))
       assertEquals("Written value should match read value.", values(i), Utils.readInt(buffer.array, i*4))
     }
-
   }
 
   @Test
@@ -105,7 +103,7 @@ class UtilsTest extends JUnitSuite {
     assertTrue(emptyStringList.equals(emptyListFromNullString))
     assertTrue(emptyStringList.equals(emptyList))
   }
-  
+
   @Test
   def testInLock() {
     val lock = new ReentrantLock()
@@ -115,6 +113,5 @@ class UtilsTest extends JUnitSuite {
     }
     assertEquals(2, result)
     assertFalse("Should be unlocked", lock.isLocked)
-    
   }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/be5edd2f/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala b/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala
index 3021a8c..3151561 100644
--- a/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala
+++ b/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala
@@ -18,20 +18,19 @@
 package kafka.zk
 
 import org.apache.zookeeper.server.ZooKeeperServer
-import org.apache.zookeeper.server.NIOServerCnxn
 import org.apache.zookeeper.server.NIOServerCnxnFactory
 import kafka.utils.TestUtils
 import java.net.InetSocketAddress
 import kafka.utils.Utils
+import org.apache.kafka.common.utils.Utils.getPort
 
 class EmbeddedZookeeper(val connectString: String) {
   val snapshotDir = TestUtils.tempDir()
   val logDir = TestUtils.tempDir()
   val tickTime = 500
   val zookeeper = new ZooKeeperServer(snapshotDir, logDir, tickTime)
-  val port = connectString.split(":")(1).toInt
   val factory = new NIOServerCnxnFactory()
-  factory.configure(new InetSocketAddress("127.0.0.1", port),0)
+  factory.configure(new InetSocketAddress("127.0.0.1", getPort(connectString)), 0)
   factory.startup(zookeeper)
 
   def shutdown() {


Mime
View raw message