kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jun...@apache.org
Subject [1/3] kafka git commit: kafka-1926; Replace kafka.utils.Utils with o.a.k.common.utils.Utils; patched by Tong Li; reviewed by Jun Rao
Date Mon, 06 Apr 2015 04:46:26 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 53f31432a -> 9c23d9355


http://git-wip-us.apache.org/repos/asf/kafka/blob/9c23d935/core/src/test/scala/kafka/tools/TestLogCleaning.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/kafka/tools/TestLogCleaning.scala b/core/src/test/scala/kafka/tools/TestLogCleaning.scala
new file mode 100755
index 0000000..8445894
--- /dev/null
+++ b/core/src/test/scala/kafka/tools/TestLogCleaning.scala
@@ -0,0 +1,311 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.tools
+
+import joptsimple.OptionParser
+import java.util.Properties
+import java.util.Random
+import java.io._
+import kafka.consumer._
+import kafka.serializer._
+import kafka.utils._
+import kafka.log.FileMessageSet
+import kafka.log.Log
+import org.apache.kafka.clients.producer.{ProducerRecord, KafkaProducer, ProducerConfig}
+
+/**
+ * This is a torture test that runs against an existing broker. Here is how it works:
+ * 
+ * It produces a series of specially formatted messages to one or more partitions. Each message it produces
+ * it logs out to a text file. The messages have a limited set of keys, so there is duplication in the key space.
+ * 
+ * The broker will clean its log as the test runs.
+ * 
+ * When the specified number of messages have been produced we create a consumer and consume all the messages in the topic
+ * and write that out to another text file.
+ * 
+ * Using a stable unix sort we sort both the producer log of what was sent and the consumer log of what was retrieved by the message key. 
+ * Then we compare the final message in both logs for each key. If this final message is not the same for all keys we
+ * print an error and exit with exit code 1, otherwise we print the size reduction and exit with exit code 0.
+ */
+object TestLogCleaning {
+
+  /**
+   * Command line entry point.
+   *
+   * With --dump the tool prints the contents of an existing log directory and exits. Otherwise it
+   * produces messages, sleeps for the configured number of seconds, consumes the topics back and
+   * validates the consumed data against the produced data. Both data files are deleted once
+   * validation has passed (validateOutput throws on failure, leaving them behind for inspection).
+   *
+   * @param args command line arguments; with no arguments CommandLineUtils.printUsageAndDie is invoked
+   */
+  def main(args: Array[String]) {
+    val parser = new OptionParser
+    val numMessagesOpt = parser.accepts("messages", "The number of messages to send or consume.")
+                               .withRequiredArg
+                               .describedAs("count")
+                               .ofType(classOf[java.lang.Long])
+                               .defaultsTo(Long.MaxValue)
+    val numDupsOpt = parser.accepts("duplicates", "The number of duplicates for each key.")
+                           .withRequiredArg
+                           .describedAs("count")
+                           .ofType(classOf[java.lang.Integer])
+                           .defaultsTo(5)
+    val brokerOpt = parser.accepts("broker", "Url to connect to.")
+                          .withRequiredArg
+                          .describedAs("url")
+                          .ofType(classOf[String])
+    val topicsOpt = parser.accepts("topics", "The number of topics to test.")
+                          .withRequiredArg
+                          .describedAs("count")
+                          .ofType(classOf[java.lang.Integer])
+                          .defaultsTo(1)
+    val percentDeletesOpt = parser.accepts("percent-deletes", "The percentage of updates that are deletes.")
+                                  .withRequiredArg
+                                  .describedAs("percent")
+                                  .ofType(classOf[java.lang.Integer])
+                                  .defaultsTo(0)
+    val zkConnectOpt = parser.accepts("zk", "Zk url.")
+                             .withRequiredArg
+                             .describedAs("url")
+                             .ofType(classOf[String])
+    // the value is interpreted as seconds below, so the usage text must say so (it used to claim "ms")
+    val sleepSecsOpt = parser.accepts("sleep", "Time to sleep between production and consumption.")
+                             .withRequiredArg
+                             .describedAs("seconds")
+                             .ofType(classOf[java.lang.Integer])
+                             .defaultsTo(0)
+    val dumpOpt = parser.accepts("dump", "Dump the message contents of a topic partition that contains test data from this test to standard out.")
+                        .withRequiredArg
+                        .describedAs("directory")
+                        .ofType(classOf[String])
+
+    val options = parser.parse(args:_*)
+
+    if(args.length == 0)
+      CommandLineUtils.printUsageAndDie(parser, "An integration test for log cleaning.")
+
+    // dump mode is self-contained: print the log and stop without touching a broker
+    if(options.has(dumpOpt)) {
+      dumpLog(new File(options.valueOf(dumpOpt)))
+      System.exit(0)
+    }
+
+    CommandLineUtils.checkRequiredArgs(parser, options, brokerOpt, zkConnectOpt, numMessagesOpt)
+
+    // parse options
+    val messages = options.valueOf(numMessagesOpt).longValue
+    val percentDeletes = options.valueOf(percentDeletesOpt).intValue
+    val dups = options.valueOf(numDupsOpt).intValue
+    val brokerUrl = options.valueOf(brokerOpt)
+    val topicCount = options.valueOf(topicsOpt).intValue
+    val zkUrl = options.valueOf(zkConnectOpt)
+    val sleepSecs = options.valueOf(sleepSecsOpt).intValue
+
+    // a random test id keeps the topic names of separate runs apart
+    val testId = new Random().nextInt(Int.MaxValue)
+    val topics = (0 until topicCount).map("log-cleaner-test-" + testId + "-" + _).toArray
+
+    println("Producing %d messages...".format(messages))
+    val producedDataFile = produceMessages(brokerUrl, topics, messages, dups, percentDeletes)
+    println("Sleeping for %d seconds...".format(sleepSecs))
+    // multiply as Long so that a large number of seconds cannot overflow Int
+    Thread.sleep(sleepSecs * 1000L)
+    println("Consuming messages...")
+    val consumedDataFile = consumeMessages(zkUrl, topics)
+
+    val producedLines = lineCount(producedDataFile)
+    val consumedLines = lineCount(consumedDataFile)
+    val reduction = 1.0 - consumedLines.toDouble/producedLines.toDouble
+    println("%d rows of data produced, %d rows of data consumed (%.1f%% reduction).".format(producedLines, consumedLines, 100 * reduction))
+
+    println("De-duplicating and validating output files...")
+    validateOutput(producedDataFile, consumedDataFile)
+    producedDataFile.delete()
+    consumedDataFile.delete()
+  }
+  
+  /**
+   * Print every message stored in the log segment files of a directory to standard out.
+   * Files whose name ends with Log.LogFileSuffix are visited in sorted file-name order and each
+   * entry is printed as one line holding its offset, key and content.
+   *
+   * @param dir the directory holding the segment files; must exist
+   */
+  def dumpLog(dir: File) {
+    require(dir.exists, "Non-existent directory: " + dir.getAbsolutePath)
+    val segmentNames = dir.list.sorted.filter(_.endsWith(Log.LogFileSuffix))
+    segmentNames.foreach { name =>
+      val segment = new FileMessageSet(new File(dir, name))
+      segment.foreach { entry =>
+        val key = TestUtils.readString(entry.message.key)
+        // messages flagged isNull are printed with a null content instead of being decoded
+        val content = if(entry.message.isNull) null else TestUtils.readString(entry.message.payload)
+        println("offset = %s, key = %s, content = %s".format(entry.offset, key, content))
+      }
+    }
+  }
+  
+  /**
+   * Count the lines of a text file.
+   *
+   * @param file the file to read
+   * @return the number of lines in the file
+   */
+  def lineCount(file: File): Int = {
+    val source = io.Source.fromFile(file)
+    // close the source even when reading fails so the file handle is not leaked
+    try source.getLines.size
+    finally source.close()
+  }
+  
+  /**
+   * Compare the final, non-deleted value of every key in the produced log against the consumed log.
+   *
+   * Both files are sorted externally and reduced to one surviving record per topic and key; the
+   * reduced records are written to "<file>.deduped" side files and compared pairwise. The side
+   * files are deleted only when every check passes, so they remain available after a failure.
+   *
+   * @param producedDataFile the log of records that were sent
+   * @param consumedDataFile the log of records that were read back
+   * @throws IllegalArgumentException (via require) when one log holds more values than the other
+   *                                  or when any pair of records differs
+   */
+  def validateOutput(producedDataFile: File, consumedDataFile: File) {
+    val producedReader = externalSort(producedDataFile)
+    val consumedReader = externalSort(consumedDataFile)
+    val producedDedupedFile = new File(producedDataFile.getAbsolutePath + ".deduped")
+    val consumedDedupedFile = new File(consumedDataFile.getAbsolutePath + ".deduped")
+    var total = 0
+    var mismatched = 0
+    try {
+      val produced = valuesIterator(producedReader)
+      val consumed = valuesIterator(consumedReader)
+      val producedDeduped = new BufferedWriter(new FileWriter(producedDedupedFile), 1024*1024)
+      try {
+        val consumedDeduped = new BufferedWriter(new FileWriter(consumedDedupedFile), 1024*1024)
+        try {
+          while(produced.hasNext && consumed.hasNext) {
+            val p = produced.next()
+            producedDeduped.write(p.toString)
+            producedDeduped.newLine()
+            val c = consumed.next()
+            consumedDeduped.write(c.toString)
+            consumedDeduped.newLine()
+            if(p != c)
+              mismatched += 1
+            total += 1
+          }
+        } finally {
+          consumedDeduped.close()
+        }
+      } finally {
+        producedDeduped.close()
+      }
+      // whichever side still has records left holds values the other side never saw
+      require(!produced.hasNext, "Additional values produced not found in consumer log.")
+      require(!consumed.hasNext, "Additional values consumed not found in producer log.")
+    } finally {
+      // release the streams of the external sort processes on success and on failure alike
+      try producedReader.close()
+      finally consumedReader.close()
+    }
+    println("Validated " + total + " values, " + mismatched + " mismatches.")
+    require(mismatched == 0, "Non-zero number of row mismatches.")
+    // if all the checks worked out we can delete the deduped files
+    producedDedupedFile.delete()
+    consumedDedupedFile.delete()
+  }
+  
+  /**
+   * Wrap a reader over a sorted record log in an iterator that yields the surviving record of each
+   * topic and key (as returned by readNext), skipping records that are marked as deletes.
+   *
+   * @param reader reader positioned on the sorted log
+   * @return an iterator that finishes when readNext reports the end of the input
+   */
+  def valuesIterator(reader: BufferedReader) = {
+    new IteratorTemplate[TestRecord] {
+      def makeNext(): TestRecord = {
+        // keep pulling records until one is not a delete or the input is exhausted (null)
+        val candidate = Iterator.continually(readNext(reader))
+                                .dropWhile(record => record != null && record.delete)
+                                .next()
+        candidate match {
+          case null => allDone()
+          case live => live
+        }
+      }
+    }
+  }
+  
+  /**
+   * Read the run of consecutive lines that share one topic and key and return its last record.
+   * Lines belonging to the following run are only peeked at, never consumed.
+   *
+   * @param reader reader positioned at the start of a run
+   * @return the last record of the run, or null when the reader is already at the end of input
+   */
+  def readNext(reader: BufferedReader): TestRecord = {
+    val firstLine = reader.readLine()
+    if(firstLine == null) {
+      null
+    } else {
+      var latest = new TestRecord(firstLine)
+      var runFinished = false
+      while(!runFinished) {
+        val upcoming = peekLine(reader)
+        val candidate = if(upcoming == null) null else new TestRecord(upcoming)
+        if(candidate == null || candidate.topicAndKey != latest.topicAndKey) {
+          runFinished = true
+        } else {
+          // same topic and key: the later record wins, so consume the line that was peeked
+          latest = candidate
+          reader.readLine()
+        }
+      }
+      latest
+    }
+  }
+  
+  /**
+   * Return the next line of the reader without consuming it.
+   * The position is marked before the read and restored afterwards.
+   *
+   * @param reader the reader to look ahead in
+   * @return the upcoming line, or null at the end of input
+   */
+  def peekLine(reader: BufferedReader) = {
+    val readAheadLimit = 4096
+    reader.mark(readAheadLimit)
+    val upcoming = reader.readLine
+    reader.reset()
+    upcoming
+  }
+  
+  /**
+   * Sort a record log with the external "sort" command and expose the sorted output as a reader.
+   * A helper thread waits for the process and, when it exits with a non-zero code, copies whatever
+   * is available on its error stream to System.err.
+   *
+   * @param file the file to sort
+   * @return a reader over the standard output of the sort process
+   */
+  def externalSort(file: File): BufferedReader = {
+    val tmpDir = System.getProperty("java.io.tmpdir")
+    val command = Seq("sort", "--key=1,2", "--stable", "--buffer-size=20%", "--temporary-directory=" + tmpDir, file.getAbsolutePath)
+    val sortProcess = new ProcessBuilder(command: _*).start()
+    val exitWatcher = new Thread() {
+      override def run() {
+        if(sortProcess.waitFor() != 0) {
+          System.err.println("Process exited abnormally.")
+          val errors = sortProcess.getErrorStream
+          while(errors.available > 0)
+            System.err.write(errors.read())
+        }
+      }
+    }
+    exitWatcher.start()
+    val sortedOutput = new InputStreamReader(sortProcess.getInputStream())
+    new BufferedReader(sortedOutput, 10*1024*1024)
+  }
+  
+  /**
+   * Send test messages round-robin to the given topics and log every sent record to a temp file.
+   *
+   * Keys are drawn from a fixed-seed random generator over a key space of roughly messages / dups
+   * keys, so each key is written about dups times. Out of every 100 consecutive messages the first
+   * percentDeletes are sent with a null value (a delete).
+   *
+   * @param brokerUrl bootstrap server list handed to the producer
+   * @param topics the topics to write to
+   * @param messages the number of messages to send per topic
+   * @param dups the intended number of writes per key; must be positive
+   * @param percentDeletes the percentage of messages sent as deletes
+   * @return the file holding one line per message that was sent
+   */
+  def produceMessages(brokerUrl: String,
+                      topics: Array[String],
+                      messages: Long,
+                      dups: Int,
+                      percentDeletes: Int): File = {
+    require(dups > 0, "The number of duplicates per key must be positive.")
+    val producerProps = new Properties
+    producerProps.setProperty(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG, "true")
+    producerProps.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerUrl)
+    producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")
+    producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")
+    val producer = new KafkaProducer[Array[Byte],Array[Byte]](producerProps)
+    try {
+      val rand = new Random(1)
+      // Random.nextInt needs a positive bound: clamp the key space to [1, Int.MaxValue] so that
+      // fewer messages than dups, or a very large message count, cannot produce 0 or a wrapped value
+      val keyCount = math.max(1L, math.min(messages / dups, Int.MaxValue.toLong)).toInt
+      val producedFile = File.createTempFile("kafka-log-cleaner-produced-", ".txt")
+      println("Logging produce requests to " + producedFile.getAbsolutePath)
+      val producedWriter = new BufferedWriter(new FileWriter(producedFile), 1024*1024)
+      try {
+        for(i <- 0L until (messages * topics.length)) {
+          val topic = topics((i % topics.length).toInt)
+          val key = rand.nextInt(keyCount)
+          val delete = i % 100 < percentDeletes
+          val msg =
+            if(delete)
+              new ProducerRecord[Array[Byte],Array[Byte]](topic, key.toString.getBytes(), null)
+            else
+              new ProducerRecord[Array[Byte],Array[Byte]](topic, key.toString.getBytes(), i.toString.getBytes())
+          producer.send(msg)
+          producedWriter.write(TestRecord(topic, key, i, delete).toString)
+          producedWriter.newLine()
+        }
+      } finally {
+        producedWriter.close()
+      }
+      producedFile
+    } finally {
+      // always release the producer, also when sending or logging fails half way
+      producer.close()
+    }
+  }
+  
+  /**
+   * Create a consumer connector that belongs to a fresh, randomly named consumer group.
+   *
+   * @param zkUrl the value used for the zookeeper.connect setting
+   * @param topics not read by this method
+   * @return a new connector built from the assembled consumer configuration
+   */
+  def makeConsumer(zkUrl: String, topics: Array[String]): ZookeeperConsumerConnector = {
+    val groupId = "log-cleaner-test-" + new Random().nextInt(Int.MaxValue)
+    val settings = Seq(
+      "group.id" -> groupId,
+      "zookeeper.connect" -> zkUrl,
+      "consumer.timeout.ms" -> (20*1000).toString,
+      "auto.offset.reset" -> "smallest")
+    val consumerProps = new Properties
+    for((name, value) <- settings)
+      consumerProps.setProperty(name, value)
+    new ZookeeperConsumerConnector(new ConsumerConfig(consumerProps))
+  }
+  
+  /**
+   * Consume the given topics one after another and log every received record to a temp file.
+   * A message without a value is logged as a delete with the value -1.
+   *
+   * @param zkUrl zookeeper connection string handed to makeConsumer
+   * @param topics the topics to read; one stream is opened per topic
+   * @return the file holding one line per consumed message
+   */
+  def consumeMessages(zkUrl: String, topics: Array[String]): File = {
+    val connector = makeConsumer(zkUrl, topics)
+    try {
+      val streams = connector.createMessageStreams(topics.map(topic => (topic, 1)).toMap, new StringDecoder, new StringDecoder)
+      val consumedFile = File.createTempFile("kafka-log-cleaner-consumed-", ".txt")
+      println("Logging consumed messages to " + consumedFile.getAbsolutePath)
+      val consumedWriter = new BufferedWriter(new FileWriter(consumedFile))
+      try {
+        for(topic <- topics) {
+          val stream = streams(topic).head
+          try {
+            for(item <- stream) {
+              val delete = item.message == null
+              val value = if(delete) -1L else item.message.toLong
+              consumedWriter.write(TestRecord(topic, item.key.toInt, value, delete).toString)
+              consumedWriter.newLine()
+            }
+          } catch {
+            // deliberate: the consumer timeout is what ends the iteration over a topic,
+            // so it is treated as "no more data" and the next topic is read
+            case _: ConsumerTimeoutException =>
+          }
+        }
+      } finally {
+        consumedWriter.close()
+      }
+      consumedFile
+    } finally {
+      // shut the connector down on every path, e.g. also when a record cannot be parsed
+      connector.shutdown()
+    }
+  }
+  
+}
+
+/**
+ * One line of the produced or consumed text log: four tab separated fields holding the topic,
+ * the key, the value and a flag that is "d" for a delete and "u" otherwise.
+ *
+ * @param topic the topic the message belongs to
+ * @param key the numeric message key
+ * @param value the numeric message value
+ * @param delete whether the message is a delete
+ */
+case class TestRecord(topic: String, key: Int, value: Long, delete: Boolean) {
+  /** Build a record from the four fields of a split log line. */
+  def this(pieces: Array[String]) = this(pieces(0), pieces(1).toInt, pieces(2).toLong, pieces(3) == "d")
+  /** Parse a tab separated log line as written by toString. */
+  def this(line: String) = this(line.split("\t"))
+  override def toString() = topic + "\t" +  key + "\t" + value + "\t" + (if(delete) "d" else "u")
+  /**
+   * Identity of the record for de-duplication. The tab keeps topic and key apart: without it
+   * topic "a1" with key 2 and topic "a" with key 12 would map to the same string.
+   */
+  def topicAndKey = topic + "\t" + key
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/9c23d935/core/src/test/scala/other/kafka/DeleteZKPath.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/other/kafka/DeleteZKPath.scala b/core/src/test/scala/other/kafka/DeleteZKPath.scala
old mode 100644
new mode 100755
index 2554503..33c3ef8
--- a/core/src/test/scala/other/kafka/DeleteZKPath.scala
+++ b/core/src/test/scala/other/kafka/DeleteZKPath.scala
@@ -18,8 +18,9 @@
 package kafka
 
 import consumer.ConsumerConfig
-import utils.{ZKStringSerializer, ZkUtils, Utils}
+import utils.{ZKStringSerializer, ZkUtils}
 import org.I0Itec.zkclient.ZkClient
+import org.apache.kafka.common.utils.Utils
 
 object DeleteZKPath {
   def main(args: Array[String]) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/9c23d935/core/src/test/scala/other/kafka/StressTestLog.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/other/kafka/StressTestLog.scala b/core/src/test/scala/other/kafka/StressTestLog.scala
old mode 100644
new mode 100755
index e19b8b2..c0e248d
--- a/core/src/test/scala/other/kafka/StressTestLog.scala
+++ b/core/src/test/scala/other/kafka/StressTestLog.scala
@@ -50,7 +50,7 @@ object StressTestLog {
         running.set(false)
         writer.join()
         reader.join()
-        Utils.rm(dir)
+        CoreUtils.rm(dir)
       }
     })
     

http://git-wip-us.apache.org/repos/asf/kafka/blob/9c23d935/core/src/test/scala/other/kafka/TestCrcPerformance.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/other/kafka/TestCrcPerformance.scala b/core/src/test/scala/other/kafka/TestCrcPerformance.scala
old mode 100644
new mode 100755
index 42e3c62..0c1e1ad
--- a/core/src/test/scala/other/kafka/TestCrcPerformance.scala
+++ b/core/src/test/scala/other/kafka/TestCrcPerformance.scala
@@ -18,7 +18,8 @@ package kafka.log
 
 import java.util.Random
 import kafka.message._
-import kafka.utils.{TestUtils, Utils}
+import kafka.utils.TestUtils
+import org.apache.kafka.common.utils.Utils
 
 object TestCrcPerformance {
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/9c23d935/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala b/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala
old mode 100644
new mode 100755
index 7211c25..3034c4f
--- a/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala
+++ b/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala
@@ -193,7 +193,7 @@ object TestLinearWriteSpeed {
   }
   
   class LogWritable(val dir: File, config: LogConfig, scheduler: Scheduler, val messages: ByteBufferMessageSet) extends Writable {
-    Utils.rm(dir)
+    CoreUtils.rm(dir)
     val log = new Log(dir, config, 0L, scheduler, SystemTime)
     def write(): Int = {
       log.append(messages, true)
@@ -201,7 +201,7 @@ object TestLinearWriteSpeed {
     }
     def close() {
       log.close()
-      Utils.rm(log.dir)
+      CoreUtils.rm(log.dir)
     }
   }
   

http://git-wip-us.apache.org/repos/asf/kafka/blob/9c23d935/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
old mode 100644
new mode 100755
index 76b8b24..ab5d16c
--- a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
@@ -22,7 +22,7 @@ import org.scalatest.junit.JUnit3Suite
 import kafka.zk.ZooKeeperTestHarness
 import kafka.utils.TestUtils._
 import junit.framework.Assert._
-import kafka.utils.{ZkUtils, Utils, TestUtils}
+import kafka.utils.{ZkUtils, CoreUtils, TestUtils}
 import kafka.cluster.Broker
 import kafka.client.ClientUtils
 import kafka.server.{KafkaConfig, KafkaServer}
@@ -56,7 +56,7 @@ class AddPartitionsTest extends JUnit3Suite with ZooKeeperTestHarness {
 
   override def tearDown() {
     servers.map(server => server.shutdown())
-    servers.map(server => Utils.rm(server.config.logDirs))
+    servers.map(server => CoreUtils.rm(server.config.logDirs))
     super.tearDown()
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/9c23d935/core/src/test/scala/unit/kafka/admin/AdminTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/AdminTest.scala b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
old mode 100644
new mode 100755
index 0305f70..cfe38df
--- a/core/src/test/scala/unit/kafka/admin/AdminTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
@@ -397,7 +397,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
       checkConfig(2*maxMessageSize, 2 * retentionMs)
     } finally {
       server.shutdown()
-      server.config.logDirs.map(Utils.rm(_))
+      server.config.logDirs.map(CoreUtils.rm(_))
     }
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/9c23d935/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala b/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala
old mode 100644
new mode 100755
index bb25467..db5302f
--- a/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala
+++ b/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala
@@ -81,7 +81,7 @@ class ConsumerIteratorTest extends JUnit3Suite with KafkaServerTestHarness {
     assertFalse(iter.hasNext)
     assertEquals(0, queue.size) // Shutdown command has been consumed.
     assertEquals(5, receivedMessages.size)
-    val unconsumed = messageSet.filter(_.offset >= consumedOffset).map(m => Utils.readString(m.message.payload))
+    val unconsumed = messageSet.filter(_.offset >= consumedOffset).map(m => TestUtils.readString(m.message.payload))
     assertEquals(unconsumed, receivedMessages)
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/9c23d935/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
old mode 100644
new mode 100755
index f95fb62..447e421
--- a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
+++ b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
@@ -21,7 +21,7 @@ import java.util.Arrays
 
 import scala.collection.mutable.Buffer
 import kafka.server._
-import kafka.utils.{TestUtils, Utils}
+import kafka.utils.{CoreUtils, TestUtils}
 import org.scalatest.junit.JUnit3Suite
 import kafka.zk.ZooKeeperTestHarness
 import kafka.common.KafkaException
@@ -64,7 +64,7 @@ trait KafkaServerTestHarness extends JUnit3Suite with ZooKeeperTestHarness {
 
   override def tearDown() {
     servers.map(server => server.shutdown())
-    servers.map(server => server.config.logDirs.map(Utils.rm(_)))
+    servers.map(server => server.config.logDirs.map(CoreUtils.rm(_)))
     super.tearDown
   }
   

http://git-wip-us.apache.org/repos/asf/kafka/blob/9c23d935/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala b/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
old mode 100644
new mode 100755
index f601d31..6a758a7
--- a/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
@@ -27,7 +27,7 @@ import kafka.zk.ZooKeeperTestHarness
 import org.scalatest.junit.JUnit3Suite
 import scala.collection._
 import kafka.common.{TopicAndPartition, ErrorMapping, UnknownTopicOrPartitionException, OffsetOutOfRangeException}
-import kafka.utils.{StaticPartitioner, TestUtils, Utils}
+import kafka.utils.{StaticPartitioner, TestUtils, CoreUtils}
 import kafka.serializer.StringEncoder
 import java.util.Properties
 
@@ -82,7 +82,7 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with
     assertTrue(messageSet.iterator.hasNext)
 
     val fetchedMessageAndOffset = messageSet.head
-    assertEquals("test-message", Utils.readString(fetchedMessageAndOffset.message.payload, "UTF-8"))
+    assertEquals("test-message", TestUtils.readString(fetchedMessageAndOffset.message.payload, "UTF-8"))
   }
 
   def testDefaultEncoderProducerAndFetchWithCompression() {
@@ -104,7 +104,7 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with
     assertTrue(messageSet.iterator.hasNext)
 
     val fetchedMessageAndOffset = messageSet.head
-    assertEquals("test-message", Utils.readString(fetchedMessageAndOffset.message.payload, "UTF-8"))
+    assertEquals("test-message", TestUtils.readString(fetchedMessageAndOffset.message.payload, "UTF-8"))
   }
 
   private def produceAndMultiFetch(producer: Producer[String, String]) {
@@ -128,7 +128,7 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with
       val response = consumer.fetch(request)
       for((topic, partition) <- topics) {
         val fetched = response.messageSet(topic, partition)
-        assertEquals(messages(topic), fetched.map(messageAndOffset => Utils.readString(messageAndOffset.message.payload)))
+        assertEquals(messages(topic), fetched.map(messageAndOffset => TestUtils.readString(messageAndOffset.message.payload)))
       }
     }
 
@@ -193,7 +193,7 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with
     val response = consumer.fetch(request)
     for((topic, partition) <- topics) {
       val fetched = response.messageSet(topic, partition)
-      assertEquals(messages(topic), fetched.map(messageAndOffset => Utils.readString(messageAndOffset.message.payload)))
+      assertEquals(messages(topic), fetched.map(messageAndOffset => TestUtils.readString(messageAndOffset.message.payload)))
     }
   }
 
@@ -258,7 +258,7 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with
     val response = consumer.fetch(request)
     for( (topic, partition) <- topics) {
       val fetched = response.messageSet(topic, partition)
-      assertEquals(messages(topic), fetched.map(messageAndOffset => Utils.readString(messageAndOffset.message.payload)))
+      assertEquals(messages(topic), fetched.map(messageAndOffset => TestUtils.readString(messageAndOffset.message.payload)))
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/9c23d935/core/src/test/scala/unit/kafka/integration/RollingBounceTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/RollingBounceTest.scala b/core/src/test/scala/unit/kafka/integration/RollingBounceTest.scala
old mode 100644
new mode 100755
index 130b205..1113619
--- a/core/src/test/scala/unit/kafka/integration/RollingBounceTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/RollingBounceTest.scala
@@ -21,7 +21,7 @@ import org.scalatest.junit.JUnit3Suite
 import kafka.zk.ZooKeeperTestHarness
 import kafka.utils.TestUtils._
 import junit.framework.Assert._
-import kafka.utils.{Utils, TestUtils}
+import kafka.utils.{CoreUtils, TestUtils}
 import kafka.server.{KafkaConfig, KafkaServer}
 
 class RollingBounceTest extends JUnit3Suite with ZooKeeperTestHarness {
@@ -41,7 +41,7 @@ class RollingBounceTest extends JUnit3Suite with ZooKeeperTestHarness {
 
   override def tearDown() {
     servers.map(server => server.shutdown())
-    servers.map(server => Utils.rm(server.config.logDirs))
+    servers.map(server => CoreUtils.rm(server.config.logDirs))
     super.tearDown()
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/9c23d935/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala b/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
old mode 100644
new mode 100755
index 7c87b81..a130089
--- a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
@@ -31,7 +31,7 @@ import kafka.consumer.{Consumer, ConsumerConfig, ConsumerTimeoutException}
 import kafka.producer.{KeyedMessage, Producer}
 import kafka.serializer.StringEncoder
 import kafka.server.{KafkaConfig, KafkaServer}
-import kafka.utils.Utils
+import kafka.utils.CoreUtils
 import kafka.utils.TestUtils._
 import kafka.zk.ZooKeeperTestHarness
 
@@ -79,7 +79,7 @@ class UncleanLeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness {
 
   override def tearDown() {
     servers.map(server => shutdownServer(server))
-    servers.map(server => Utils.rm(server.config.logDirs))
+    servers.map(server => CoreUtils.rm(server.config.logDirs))
 
     // restore log levels
     kafkaApisLogger.setLevel(Level.ERROR)

http://git-wip-us.apache.org/repos/asf/kafka/blob/9c23d935/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala b/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala
old mode 100644
new mode 100755
index fa4a8ad..375555f
--- a/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala
+++ b/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala
@@ -45,7 +45,7 @@ class BrokerCompressionTest(messageCompression: String, brokerCompression: Strin
 
   @After
   def tearDown() {
-    Utils.rm(logDir)
+    CoreUtils.rm(logDir)
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/9c23d935/core/src/test/scala/unit/kafka/log/CleanerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/CleanerTest.scala b/core/src/test/scala/unit/kafka/log/CleanerTest.scala
old mode 100644
new mode 100755
index c20e423..9792ed6
--- a/core/src/test/scala/unit/kafka/log/CleanerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/CleanerTest.scala
@@ -27,6 +27,7 @@ import kafka.common._
 import kafka.utils._
 import kafka.message._
 import java.util.concurrent.atomic.AtomicLong
+import org.apache.kafka.common.utils.Utils
 
 /**
  * Unit tests for the log cleaning logic
@@ -40,7 +41,7 @@ class CleanerTest extends JUnitSuite {
   
   @After
   def teardown() {
-    Utils.rm(dir)
+    CoreUtils.rm(dir)
   }
   
   /**
@@ -123,7 +124,7 @@ class CleanerTest extends JUnitSuite {
   
   /* extract all the keys from a log */
   def keysInLog(log: Log): Iterable[Int] =
-    log.logSegments.flatMap(s => s.log.filter(!_.message.isNull).filter(_.message.hasKey).map(m => Utils.readString(m.message.key).toInt))
+    log.logSegments.flatMap(s => s.log.filter(!_.message.isNull).filter(_.message.hasKey).map(m => TestUtils.readString(m.message.key).toInt))
 
   def unkeyedMessageCountInLog(log: Log) =
     log.logSegments.map(s => s.log.filter(!_.message.isNull).count(m => !m.message.hasKey)).sum

http://git-wip-us.apache.org/repos/asf/kafka/blob/9c23d935/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
old mode 100644
new mode 100755
index 07acd46..3b5aa9d
--- a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
@@ -82,8 +82,8 @@ class LogCleanerIntegrationTest extends JUnitSuite {
   
   def readFromLog(log: Log): Iterable[(Int, Int)] = {
     for(segment <- log.logSegments; message <- segment.log) yield {
-      val key = Utils.readString(message.message.key).toInt
-      val value = Utils.readString(message.message.payload).toInt
+      val key = TestUtils.readString(message.message.key).toInt
+      val value = TestUtils.readString(message.message.payload).toInt
       key -> value
     }
   }
@@ -99,7 +99,7 @@ class LogCleanerIntegrationTest extends JUnitSuite {
     
   @After
   def teardown() {
-    Utils.rm(logDir)
+    CoreUtils.rm(logDir)
   }
   
   /* create a cleaner instance and logs with the given parameters */

http://git-wip-us.apache.org/repos/asf/kafka/blob/9c23d935/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
old mode 100644
new mode 100755
index 90cd530..0a26f5f
--- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
@@ -47,8 +47,8 @@ class LogManagerTest extends JUnit3Suite {
   override def tearDown() {
     if(logManager != null)
       logManager.shutdown()
-    Utils.rm(logDir)
-    logManager.logDirs.map(Utils.rm(_))
+    CoreUtils.rm(logDir)
+    logManager.logDirs.map(CoreUtils.rm(_))
     super.tearDown()
   }
   

http://git-wip-us.apache.org/repos/asf/kafka/blob/9c23d935/core/src/test/scala/unit/kafka/log/LogTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
old mode 100644
new mode 100755
index bd9a409..069aa02
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -43,7 +43,7 @@ class LogTest extends JUnitSuite {
 
   @After
   def tearDown() {
-    Utils.rm(logDir)
+    CoreUtils.rm(logDir)
   }
   
   def createEmptyLogs(dir: File, offsets: Int*) {
@@ -714,7 +714,7 @@ class LogTest extends JUnitSuite {
       log = new Log(logDir, config, recoveryPoint, time.scheduler, time)
       assertEquals(numMessages, log.logEndOffset)
       assertEquals("Messages in the log after recovery should be the same.", messages, log.logSegments.flatMap(_.log.iterator.toList))
-      Utils.rm(logDir)
+      CoreUtils.rm(logDir)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/9c23d935/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala b/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala
old mode 100644
new mode 100755
index 18361c1..41366a1
--- a/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala
+++ b/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala
@@ -19,7 +19,7 @@ package kafka.log4j
 
 import kafka.consumer.SimpleConsumer
 import kafka.server.{KafkaConfig, KafkaServer}
-import kafka.utils.{TestUtils, Utils, Logging}
+import kafka.utils.{TestUtils, CoreUtils, Logging}
 import kafka.api.FetchRequestBuilder
 import kafka.producer.async.MissingConfigException
 import kafka.serializer.Encoder
@@ -63,7 +63,7 @@ class KafkaLog4jAppenderTest extends JUnit3Suite with ZooKeeperTestHarness with
   override def tearDown() {
     simpleConsumerZk.close
     server.shutdown
-    Utils.rm(logDirZk)
+    CoreUtils.rm(logDirZk)
     super.tearDown()
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/9c23d935/core/src/test/scala/unit/kafka/message/MessageTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/message/MessageTest.scala b/core/src/test/scala/unit/kafka/message/MessageTest.scala
old mode 100644
new mode 100755
index 7b74a0d..11c0f81
--- a/core/src/test/scala/unit/kafka/message/MessageTest.scala
+++ b/core/src/test/scala/unit/kafka/message/MessageTest.scala
@@ -24,7 +24,8 @@ import junit.framework.Assert._
 import org.scalatest.junit.JUnitSuite
 import org.junit.{Before, Test}
 import kafka.utils.TestUtils
-import kafka.utils.Utils
+import kafka.utils.CoreUtils
+import org.apache.kafka.common.utils.Utils
 
 case class MessageTestVal(val key: Array[Byte], 
                           val payload: Array[Byte], 

http://git-wip-us.apache.org/repos/asf/kafka/blob/9c23d935/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala b/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
old mode 100644
new mode 100755
index 3b82fb3..2169a5c
--- a/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
+++ b/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
@@ -231,11 +231,11 @@ class AsyncProducerTest extends JUnit3Suite {
                                                          topicPartitionInfos = topicPartitionInfos)
 
     val serializedData = handler.serialize(produceData)
-    val deserializedData = serializedData.map(d => new KeyedMessage[String,String](d.topic, Utils.readString(d.message.payload)))
+    val deserializedData = serializedData.map(d => new KeyedMessage[String,String](d.topic, TestUtils.readString(d.message.payload)))
 
     // Test that the serialize handles seq from a Stream
     val streamedSerializedData = handler.serialize(Stream(produceData:_*))
-    val deserializedStreamData = streamedSerializedData.map(d => new KeyedMessage[String,String](d.topic, Utils.readString(d.message.payload)))
+    val deserializedStreamData = streamedSerializedData.map(d => new KeyedMessage[String,String](d.topic, TestUtils.readString(d.message.payload)))
 
     TestUtils.checkEquals(produceData.iterator, deserializedData.iterator)
     TestUtils.checkEquals(produceData.iterator, deserializedStreamData.iterator)

http://git-wip-us.apache.org/repos/asf/kafka/blob/9c23d935/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
old mode 100644
new mode 100755
index a7ca142..4d2536b
--- a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
+++ b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
@@ -92,8 +92,8 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{
 
     server1.shutdown
     server2.shutdown
-    Utils.rm(server1.config.logDirs)
-    Utils.rm(server2.config.logDirs)
+    CoreUtils.rm(server1.config.logDirs)
+    CoreUtils.rm(server2.config.logDirs)
     super.tearDown()
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/9c23d935/core/src/test/scala/unit/kafka/server/AdvertiseBrokerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/AdvertiseBrokerTest.scala b/core/src/test/scala/unit/kafka/server/AdvertiseBrokerTest.scala
old mode 100644
new mode 100755
index 48d3143..e899b02
--- a/core/src/test/scala/unit/kafka/server/AdvertiseBrokerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AdvertiseBrokerTest.scala
@@ -18,7 +18,7 @@
 package kafka.server
 
 import junit.framework.Assert._
-import kafka.utils.{TestUtils, Utils, ZkUtils}
+import kafka.utils.{TestUtils, CoreUtils, ZkUtils}
 import kafka.zk.ZooKeeperTestHarness
 import org.apache.kafka.common.protocol.SecurityProtocol
 import org.scalatest.junit.JUnit3Suite
@@ -41,7 +41,7 @@ class AdvertiseBrokerTest extends JUnit3Suite with ZooKeeperTestHarness {
 
   override def tearDown() {
     server.shutdown()
-    Utils.rm(server.config.logDirs)
+    CoreUtils.rm(server.config.logDirs)
     super.tearDown()
   }
   

http://git-wip-us.apache.org/repos/asf/kafka/blob/9c23d935/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
old mode 100644
new mode 100755
index 142e28e..60cd824
--- a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
+++ b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
@@ -25,7 +25,7 @@ import org.junit._
 import org.junit.Assert._
 import kafka.common._
 import kafka.cluster.Replica
-import kafka.utils.{SystemTime, KafkaScheduler, TestUtils, MockTime, Utils}
+import kafka.utils.{SystemTime, KafkaScheduler, TestUtils, MockTime, CoreUtils}
 import java.util.concurrent.atomic.AtomicBoolean
 
 class HighwatermarkPersistenceTest extends JUnit3Suite {
@@ -41,7 +41,7 @@ class HighwatermarkPersistenceTest extends JUnit3Suite {
   @After
   def teardown() {
     for(manager <- logManagers; dir <- manager.logDirs)
-      Utils.rm(dir)
+      CoreUtils.rm(dir)
   }
 
   def testHighWatermarkPersistenceSinglePartition() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/9c23d935/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
old mode 100644
new mode 100755
index ca46ba9..b394719
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -21,7 +21,7 @@ import java.util.Properties
 
 import junit.framework.Assert._
 import kafka.api.{ApiVersion, KAFKA_082}
-import kafka.utils.{TestUtils, Utils}
+import kafka.utils.{TestUtils, CoreUtils}
 import org.apache.kafka.common.config.ConfigException
 import org.apache.kafka.common.protocol.SecurityProtocol
 import org.junit.Test
@@ -159,14 +159,14 @@ class KafkaConfigTest extends JUnit3Suite {
     props.put("port", "1111")
 
     val conf = KafkaConfig.fromProps(props)
-    assertEquals(Utils.listenerListToEndPoints("PLAINTEXT://myhost:1111"), conf.listeners)
+    assertEquals(CoreUtils.listenerListToEndPoints("PLAINTEXT://myhost:1111"), conf.listeners)
 
     // configuration with null host
     props.remove("host.name")
 
     val conf2 = KafkaConfig.fromProps(props)
-    assertEquals(Utils.listenerListToEndPoints("PLAINTEXT://:1111"), conf2.listeners)
-    assertEquals(Utils.listenerListToEndPoints("PLAINTEXT://:1111"), conf2.advertisedListeners)
+    assertEquals(CoreUtils.listenerListToEndPoints("PLAINTEXT://:1111"), conf2.listeners)
+    assertEquals(CoreUtils.listenerListToEndPoints("PLAINTEXT://:1111"), conf2.advertisedListeners)
     assertEquals(null, conf2.listeners(SecurityProtocol.PLAINTEXT).host)
 
     // configuration with advertised host and port, and no advertised listeners
@@ -174,7 +174,7 @@ class KafkaConfigTest extends JUnit3Suite {
     props.put("advertised.port", "2222")
 
     val conf3 = KafkaConfig.fromProps(props)
-    assertEquals(conf3.advertisedListeners, Utils.listenerListToEndPoints("PLAINTEXT://otherhost:2222"))
+    assertEquals(conf3.advertisedListeners, CoreUtils.listenerListToEndPoints("PLAINTEXT://otherhost:2222"))
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/kafka/blob/9c23d935/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
old mode 100644
new mode 100755
index d9bdcef..26572f7
--- a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
@@ -19,11 +19,11 @@ package kafka.server
 
 import junit.framework.Assert._
 import kafka.api._
+import kafka.utils.{TestUtils, ZkUtils, CoreUtils}
 import kafka.cluster.Broker
 import kafka.common.ErrorMapping
 import kafka.controller.{ControllerChannelManager, ControllerContext, LeaderIsrAndControllerEpoch}
 import kafka.utils.TestUtils._
-import kafka.utils.{TestUtils, Utils, ZkUtils}
 import kafka.zk.ZooKeeperTestHarness
 import org.apache.kafka.common.protocol.SecurityProtocol
 import org.scalatest.junit.JUnit3Suite
@@ -50,7 +50,7 @@ class LeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness {
 
   override def tearDown() {
     servers.map(server => server.shutdown())
-    servers.map(server => Utils.rm(server.config.logDirs))
+    servers.map(server => CoreUtils.rm(server.config.logDirs))
     super.tearDown()
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/9c23d935/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
old mode 100644
new mode 100755
index 496bf0d..e57c1de
--- a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
@@ -57,7 +57,7 @@ class LogOffsetTest extends JUnit3Suite with ZooKeeperTestHarness {
   override def tearDown() {
     simpleConsumer.close
     server.shutdown
-    Utils.rm(logDir)
+    CoreUtils.rm(logDir)
     super.tearDown()
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/9c23d935/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
old mode 100644
new mode 100755
index 92e49df..7688f26
--- a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
@@ -19,7 +19,7 @@ package kafka.server
 import java.util.Properties
 
 import kafka.utils.TestUtils._
-import kafka.utils.{IntEncoder, Utils, TestUtils}
+import kafka.utils.{IntEncoder, CoreUtils, TestUtils}
 import kafka.zk.ZooKeeperTestHarness
 import kafka.common._
 import kafka.producer.{KeyedMessage, Producer}
@@ -90,7 +90,7 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness {
     producer.close()
     for(server <- servers) {
       server.shutdown()
-      Utils.rm(server.config.logDirs(0))
+      CoreUtils.rm(server.config.logDirs(0))
     }
     super.tearDown()
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/9c23d935/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala b/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
old mode 100644
new mode 100755
index a6bb690..652208a
--- a/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
+++ b/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
@@ -71,7 +71,7 @@ class OffsetCommitTest extends JUnit3Suite with ZooKeeperTestHarness {
   override def tearDown() {
     simpleConsumer.close
     server.shutdown
-    Utils.rm(logDir)
+    CoreUtils.rm(logDir)
     super.tearDown()
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/9c23d935/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala b/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala
old mode 100644
new mode 100755
index 2bfaeb3..12269cd
--- a/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala
@@ -19,7 +19,7 @@ package kafka.server
 import java.util.Properties
 
 import kafka.zk.ZooKeeperTestHarness
-import kafka.utils.{TestUtils, Utils}
+import kafka.utils.{TestUtils, CoreUtils}
 import org.junit.Test
 import org.scalatest.junit.JUnit3Suite
 import junit.framework.Assert._
@@ -51,7 +51,7 @@ class ServerGenerateBrokerIdTest extends JUnit3Suite with ZooKeeperTestHarness {
     server1.startup()
     assertEquals(server1.config.brokerId, 1001)
     server1.shutdown()
-    Utils.rm(server1.config.logDirs)
+    CoreUtils.rm(server1.config.logDirs)
     TestUtils.verifyNonDaemonThreadsStatus
   }
 
@@ -75,9 +75,9 @@ class ServerGenerateBrokerIdTest extends JUnit3Suite with ZooKeeperTestHarness {
     assertTrue(verifyBrokerMetadata(server1.config.logDirs,1001))
     assertTrue(verifyBrokerMetadata(server2.config.logDirs,0))
     assertTrue(verifyBrokerMetadata(server3.config.logDirs,1002))
-    Utils.rm(server1.config.logDirs)
-    Utils.rm(server2.config.logDirs)
-    Utils.rm(server3.config.logDirs)
+    CoreUtils.rm(server1.config.logDirs)
+    CoreUtils.rm(server2.config.logDirs)
+    CoreUtils.rm(server3.config.logDirs)
     TestUtils.verifyNonDaemonThreadsStatus
   }
 
@@ -100,7 +100,7 @@ class ServerGenerateBrokerIdTest extends JUnit3Suite with ZooKeeperTestHarness {
     server1.startup()
     server1.shutdown()
     assertTrue(verifyBrokerMetadata(config1.logDirs, 1001))
-    Utils.rm(server1.config.logDirs)
+    CoreUtils.rm(server1.config.logDirs)
     TestUtils.verifyNonDaemonThreadsStatus
   }
 
@@ -117,7 +117,7 @@ class ServerGenerateBrokerIdTest extends JUnit3Suite with ZooKeeperTestHarness {
       case e: kafka.common.InconsistentBrokerIdException => //success
     }
     server1.shutdown()
-    Utils.rm(server1.config.logDirs)
+    CoreUtils.rm(server1.config.logDirs)
     TestUtils.verifyNonDaemonThreadsStatus
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/9c23d935/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
old mode 100644
new mode 100755
index a20321f..95534e3
--- a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
@@ -19,7 +19,7 @@ package kafka.server
 import kafka.zk.ZooKeeperTestHarness
 import kafka.consumer.SimpleConsumer
 import kafka.producer._
-import kafka.utils.{IntEncoder, TestUtils, Utils}
+import kafka.utils.{IntEncoder, TestUtils, CoreUtils}
 import kafka.utils.TestUtils._
 import kafka.api.FetchRequestBuilder
 import kafka.message.ByteBufferMessageSet
@@ -84,7 +84,7 @@ class ServerShutdownTest extends JUnit3Suite with ZooKeeperTestHarness {
       val fetched = consumer.fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, 10000).maxWait(0).build())
       fetchedMessage = fetched.messageSet(topic, 0)
     }
-    assertEquals(sent1, fetchedMessage.map(m => Utils.readString(m.message.payload)))
+    assertEquals(sent1, fetchedMessage.map(m => TestUtils.readString(m.message.payload)))
     val newOffset = fetchedMessage.last.nextOffset
 
     // send some more messages
@@ -95,12 +95,12 @@ class ServerShutdownTest extends JUnit3Suite with ZooKeeperTestHarness {
       val fetched = consumer.fetch(new FetchRequestBuilder().addFetch(topic, 0, newOffset, 10000).build())
       fetchedMessage = fetched.messageSet(topic, 0)
     }
-    assertEquals(sent2, fetchedMessage.map(m => Utils.readString(m.message.payload)))
+    assertEquals(sent2, fetchedMessage.map(m => TestUtils.readString(m.message.payload)))
 
     consumer.close()
     producer.close()
     server.shutdown()
-    Utils.rm(server.config.logDirs)
+    CoreUtils.rm(server.config.logDirs)
     verifyNonDaemonThreadsStatus
   }
 
@@ -113,7 +113,7 @@ class ServerShutdownTest extends JUnit3Suite with ZooKeeperTestHarness {
     server.startup()
     server.shutdown()
     server.awaitShutdown()
-    Utils.rm(server.config.logDirs)
+    CoreUtils.rm(server.config.logDirs)
     verifyNonDaemonThreadsStatus
   }
 
@@ -141,7 +141,7 @@ class ServerShutdownTest extends JUnit3Suite with ZooKeeperTestHarness {
         server.shutdown()
       server.awaitShutdown()
     }
-    Utils.rm(server.config.logDirs)
+    CoreUtils.rm(server.config.logDirs)
     verifyNonDaemonThreadsStatus
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/9c23d935/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala b/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala
old mode 100644
new mode 100755
index 661ddd5..60e10b3
--- a/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala
@@ -19,7 +19,7 @@ package kafka.server
 
 import org.scalatest.junit.JUnit3Suite
 import kafka.utils.ZkUtils
-import kafka.utils.Utils
+import kafka.utils.CoreUtils
 import kafka.utils.TestUtils
 
 import kafka.zk.ZooKeeperTestHarness
@@ -39,7 +39,7 @@ class ServerStartupTest extends JUnit3Suite with ZooKeeperTestHarness {
     assertTrue(pathExists)
 
     server.shutdown()
-    Utils.rm(server.config.logDirs)
+    CoreUtils.rm(server.config.logDirs)
   }
 
   def testConflictBrokerRegistration {
@@ -64,6 +64,6 @@ class ServerStartupTest extends JUnit3Suite with ZooKeeperTestHarness {
     assertEquals(brokerRegistration, ZkUtils.readData(zkClient, ZkUtils.BrokerIdsPath + "/" + brokerId)._1)
 
     server1.shutdown()
-    Utils.rm(server1.config.logDirs)
+    CoreUtils.rm(server1.config.logDirs)
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/9c23d935/core/src/test/scala/unit/kafka/utils/TestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
old mode 100644
new mode 100755
index a8ed142..5a9e84d
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -22,6 +22,7 @@ import java.nio._
 import java.nio.channels._
 import java.util.Random
 import java.util.Properties
+import charset.Charset
 
 import org.apache.kafka.common.protocol.SecurityProtocol
 import org.apache.kafka.common.utils.Utils._
@@ -82,7 +83,7 @@ object TestUtils extends Logging {
 
     Runtime.getRuntime().addShutdownHook(new Thread() {
       override def run() = {
-        Utils.rm(f)
+        CoreUtils.rm(f)
       }
     })
 
@@ -835,6 +836,17 @@ object TestUtils extends Logging {
     }), "Cleaner offset for deleted partition should have been removed")
   }
 
+  /**
+   * Translate the given buffer into a string
+   * @param buffer The buffer to translate
+   * @param encoding The encoding to use in translating bytes to characters
+   */
+  def readString(buffer: ByteBuffer, encoding: String = Charset.defaultCharset.toString): String = {
+    val bytes = new Array[Byte](buffer.remaining)
+    buffer.get(bytes)
+    new String(bytes, encoding)
+  }
+
 }
 
 class IntEncoder(props: VerifiableProperties = null) extends Encoder[Int] {

http://git-wip-us.apache.org/repos/asf/kafka/blob/9c23d935/core/src/test/scala/unit/kafka/utils/UtilsTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/UtilsTest.scala b/core/src/test/scala/unit/kafka/utils/UtilsTest.scala
old mode 100644
new mode 100755
index 8c3797a..9e8869c
--- a/core/src/test/scala/unit/kafka/utils/UtilsTest.scala
+++ b/core/src/test/scala/unit/kafka/utils/UtilsTest.scala
@@ -24,8 +24,9 @@ import org.apache.log4j.Logger
 import org.scalatest.junit.JUnitSuite
 import org.junit.Assert._
 import kafka.common.KafkaException
-import kafka.utils.Utils.inLock
+import kafka.utils.CoreUtils.inLock
 import org.junit.Test
+import org.apache.kafka.common.utils.Utils
 
 
 class UtilsTest extends JUnitSuite {
@@ -34,13 +35,13 @@ class UtilsTest extends JUnitSuite {
 
   @Test
   def testSwallow() {
-    Utils.swallow(logger.info, throw new KafkaException("test"))
+    CoreUtils.swallow(logger.info, throw new KafkaException("test"))
   }
 
   @Test
   def testCircularIterator() {
     val l = List(1, 2)
-    val itl = Utils.circularIterator(l)
+    val itl = CoreUtils.circularIterator(l)
     assertEquals(1, itl.next())
     assertEquals(2, itl.next())
     assertEquals(1, itl.next())
@@ -48,7 +49,7 @@ class UtilsTest extends JUnitSuite {
     assertFalse(itl.hasDefiniteSize)
 
     val s = Set(1, 2)
-    val its = Utils.circularIterator(s)
+    val its = CoreUtils.circularIterator(s)
     assertEquals(1, its.next())
     assertEquals(2, its.next())
     assertEquals(1, its.next())
@@ -75,10 +76,10 @@ class UtilsTest extends JUnitSuite {
 
   @Test
   def testReplaceSuffix() {
-    assertEquals("blah.foo.text", Utils.replaceSuffix("blah.foo.txt", ".txt", ".text"))
-    assertEquals("blah.foo", Utils.replaceSuffix("blah.foo.txt", ".txt", ""))
-    assertEquals("txt.txt", Utils.replaceSuffix("txt.txt.txt", ".txt", ""))
-    assertEquals("foo.txt", Utils.replaceSuffix("foo", "", ".txt"))
+    assertEquals("blah.foo.text", CoreUtils.replaceSuffix("blah.foo.txt", ".txt", ".text"))
+    assertEquals("blah.foo", CoreUtils.replaceSuffix("blah.foo.txt", ".txt", ""))
+    assertEquals("txt.txt", CoreUtils.replaceSuffix("txt.txt.txt", ".txt", ""))
+    assertEquals("foo.txt", CoreUtils.replaceSuffix("foo", "", ".txt"))
   }
 
   @Test
@@ -87,7 +88,7 @@ class UtilsTest extends JUnitSuite {
     val buffer = ByteBuffer.allocate(4 * values.size)
     for(i <- 0 until values.length) {
       buffer.putInt(i*4, values(i))
-      assertEquals("Written value should match read value.", values(i), Utils.readInt(buffer.array, i*4))
+      assertEquals("Written value should match read value.", values(i), CoreUtils.readInt(buffer.array, i*4))
     }
   }
 
@@ -95,8 +96,8 @@ class UtilsTest extends JUnitSuite {
   def testCsvList() {
     val emptyString:String = ""
     val nullString:String = null
-    val emptyList = Utils.parseCsvList(emptyString)
-    val emptyListFromNullString = Utils.parseCsvList(nullString)
+    val emptyList = CoreUtils.parseCsvList(emptyString)
+    val emptyListFromNullString = CoreUtils.parseCsvList(nullString)
     val emptyStringList = Seq.empty[String]
     assertTrue(emptyList!=null)
     assertTrue(emptyListFromNullString!=null)
@@ -107,32 +108,32 @@ class UtilsTest extends JUnitSuite {
   @Test
   def testCsvMap() {
     val emptyString: String = ""
-    val emptyMap = Utils.parseCsvMap(emptyString)
+    val emptyMap = CoreUtils.parseCsvMap(emptyString)
     val emptyStringMap = Map.empty[String, String]
     assertTrue(emptyMap != null)
     assertTrue(emptyStringMap.equals(emptyStringMap))
 
     val kvPairsIpV6: String = "a:b:c:v,a:b:c:v"
-    val ipv6Map = Utils.parseCsvMap(kvPairsIpV6)
+    val ipv6Map = CoreUtils.parseCsvMap(kvPairsIpV6)
     for (m <- ipv6Map) {
       assertTrue(m._1.equals("a:b:c"))
       assertTrue(m._2.equals("v"))
     }
 
     val singleEntry:String = "key:value"
-    val singleMap = Utils.parseCsvMap(singleEntry)
+    val singleMap = CoreUtils.parseCsvMap(singleEntry)
     val value = singleMap.getOrElse("key", 0)
     assertTrue(value.equals("value"))
 
     val kvPairsIpV4: String = "192.168.2.1/30:allow, 192.168.2.1/30:allow"
-    val ipv4Map = Utils.parseCsvMap(kvPairsIpV4)
+    val ipv4Map = CoreUtils.parseCsvMap(kvPairsIpV4)
     for (m <- ipv4Map) {
       assertTrue(m._1.equals("192.168.2.1/30"))
       assertTrue(m._2.equals("allow"))
     }
 
     val kvPairsSpaces: String = "key:value      , key:   value"
-    val spaceMap = Utils.parseCsvMap(kvPairsSpaces)
+    val spaceMap = CoreUtils.parseCsvMap(kvPairsSpaces)
     for (m <- spaceMap) {
       assertTrue(m._1.equals("key"))
       assertTrue(m._2.equals("value"))

http://git-wip-us.apache.org/repos/asf/kafka/blob/9c23d935/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala b/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala
old mode 100644
new mode 100755
index 1d87506..2bca2cf
--- a/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala
+++ b/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala
@@ -21,7 +21,7 @@ import org.apache.zookeeper.server.ZooKeeperServer
 import org.apache.zookeeper.server.NIOServerCnxnFactory
 import kafka.utils.TestUtils
 import java.net.InetSocketAddress
-import kafka.utils.Utils
+import kafka.utils.CoreUtils
 import org.apache.kafka.common.utils.Utils.getPort
 
 class EmbeddedZookeeper() {
@@ -36,10 +36,10 @@ class EmbeddedZookeeper() {
   val port = zookeeper.getClientPort()
 
   def shutdown() {
-    Utils.swallow(zookeeper.shutdown())
-    Utils.swallow(factory.shutdown())
-    Utils.rm(logDir)
-    Utils.rm(snapshotDir)
+    CoreUtils.swallow(zookeeper.shutdown())
+    CoreUtils.swallow(factory.shutdown())
+    CoreUtils.rm(logDir)
+    CoreUtils.rm(snapshotDir)
   }
   
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/9c23d935/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala b/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
old mode 100644
new mode 100755
index fedefb5..86bddea
--- a/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
+++ b/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
@@ -19,7 +19,7 @@ package kafka.zk
 
 import org.scalatest.junit.JUnit3Suite
 import org.I0Itec.zkclient.ZkClient
-import kafka.utils.{ZKStringSerializer, Utils}
+import kafka.utils.{ZKStringSerializer, CoreUtils}
 
 trait ZooKeeperTestHarness extends JUnit3Suite {
   var zkPort: Int = -1
@@ -38,8 +38,8 @@ trait ZooKeeperTestHarness extends JUnit3Suite {
   }
 
   override def tearDown() {
-    Utils.swallow(zkClient.close())
-    Utils.swallow(zookeeper.shutdown())
+    CoreUtils.swallow(zkClient.close())
+    CoreUtils.swallow(zookeeper.shutdown())
     super.tearDown
   }
 


Mime
View raw message