kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jun...@apache.org
Subject svn commit: r1387796 - in /incubator/kafka/branches/0.8/core/src: main/scala/kafka/admin/ main/scala/kafka/api/ main/scala/kafka/consumer/ main/scala/kafka/controller/ main/scala/kafka/javaapi/ main/scala/kafka/javaapi/consumer/ main/scala/kafka/javaap...
Date Wed, 19 Sep 2012 22:15:53 GMT
Author: junrao
Date: Wed Sep 19 22:15:52 2012
New Revision: 1387796

URL: http://svn.apache.org/viewvc?rev=1387796&view=rev
Log:
javaapi support for getTopoicMetaData; patched by Swapnil Ghike; reviewed by Jun Rao; KAFKA-500

Added:
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/TopicMetadataResponse.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/TopicMetadata.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/TopicMetadataRequest.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/TopicMetadataResponse.scala
Removed:
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/TopicMetaDataResponse.scala
Modified:
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/AdminUtils.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/TopicMetadata.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/TopicMetadataRequest.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/FetchResponse.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/Implicits.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/consumer/SimpleConsumer.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducer.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/javaapi/message/ByteBufferMessageSetTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/network/RpcDataSerializationTest.scala

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/AdminUtils.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/AdminUtils.scala?rev=1387796&r1=1387795&r2=1387796&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/AdminUtils.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/AdminUtils.scala Wed Sep 19 22:15:52 2012
@@ -121,15 +121,14 @@ object AdminUtils extends Logging {
               case e => throw new ReplicaNotAvailableException(e)
             }
 
-            new PartitionMetadata(partition, leaderInfo, replicaInfo, isrInfo, ErrorMapping.NoError,
-              None /* Return log segment metadata when getOffsetsBefore will be replaced with this API */)
+            new PartitionMetadata(partition, leaderInfo, replicaInfo, isrInfo, ErrorMapping.NoError)
           }catch {
-            case e: ReplicaNotAvailableException => new PartitionMetadata(partition, leaderInfo, replicaInfo, isrInfo,
-              ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]),
-              None /* Return log segment metadata when getOffsetsBefore will be replaced with this API */)
-            case le: LeaderNotAvailableException => new PartitionMetadata(partition, None, replicaInfo, isrInfo,
-              ErrorMapping.codeFor(le.getClass.asInstanceOf[Class[Throwable]]),
-              None /* Return log segment metadata when getOffsetsBefore will be replaced with this API */)
+            case e: ReplicaNotAvailableException =>
+              new PartitionMetadata(partition, leaderInfo, replicaInfo, isrInfo,
+                                    ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]))
+            case le: LeaderNotAvailableException =>
+              new PartitionMetadata(partition, None, replicaInfo, isrInfo,
+                                    ErrorMapping.codeFor(le.getClass.asInstanceOf[Class[Throwable]]))
           }
         }
         new TopicMetadata(topic, partitionMetadata)

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/TopicMetadata.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/TopicMetadata.scala?rev=1387796&r1=1387795&r2=1387796&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/TopicMetadata.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/TopicMetadata.scala Wed Sep 19 22:15:52 2012
@@ -51,10 +51,6 @@ sealed trait LeaderRequest { def request
 case object LeaderExists extends LeaderRequest { val requestId: Byte = 1 }
 case object LeaderDoesNotExist extends LeaderRequest { val requestId: Byte = 0 }
 
-sealed trait LogSegmentMetadataRequest { def requestId: Byte }
-case object LogSegmentMetadataExists extends LogSegmentMetadataRequest { val requestId: Byte = 1 }
-case object LogSegmentMetadataDoesNotExist extends LogSegmentMetadataRequest { val requestId: Byte = 0 }
-
 object TopicMetadata {
 
   def readFrom(buffer: ByteBuffer): TopicMetadata = {
@@ -114,28 +110,7 @@ object PartitionMetadata {
       isr(i) = Broker.readFrom(buffer)
     }
 
-    val doesLogMetadataExist = getLogSegmentMetadataRequest(buffer.get)
-    val logMetadata = doesLogMetadataExist match {
-      case LogSegmentMetadataExists =>
-        val numLogSegments = getIntInRange(buffer, "total number of log segments", (0, Int.MaxValue))
-        val totalDataSize = getLongInRange(buffer, "total data size", (0, Long.MaxValue))
-        val numSegmentMetadata = getIntInRange(buffer, "number of log segment metadata", (0, Int.MaxValue))
-        val segmentMetadata = numSegmentMetadata match {
-          case 0 => None
-          case _ =>
-            val metadata = new ListBuffer[LogSegmentMetadata]()
-            for(i <- 0 until numSegmentMetadata) {
-              val beginningOffset = getLongInRange(buffer, "beginning offset", (0, Long.MaxValue))
-              val lastModified = getLongInRange(buffer, "last modified time", (0, Long.MaxValue))
-              val size = getLongInRange(buffer, "size of log segment", (0, Long.MaxValue))
-              metadata += new LogSegmentMetadata(beginningOffset, lastModified, size)
-            }
-            Some(metadata)
-        }
-        Some(new LogMetadata(numLogSegments, totalDataSize, segmentMetadata))
-      case LogSegmentMetadataDoesNotExist => None
-    }
-    new PartitionMetadata(partitionId, leader, replicas, isr, errorCode, logMetadata)
+    new PartitionMetadata(partitionId, leader, replicas, isr, errorCode)
   }
 
   private def getLeaderRequest(requestId: Byte): LeaderRequest = {
@@ -145,17 +120,10 @@ object PartitionMetadata {
       case _ => throw new KafkaException("Unknown leader request id " + requestId)
     }
   }
-
-  private def getLogSegmentMetadataRequest(requestId: Byte): LogSegmentMetadataRequest = {
-    requestId match {
-      case LogSegmentMetadataExists.requestId => LogSegmentMetadataExists
-      case LogSegmentMetadataDoesNotExist.requestId => LogSegmentMetadataDoesNotExist
-    }
-  }
 }
 
 case class PartitionMetadata(partitionId: Int, val leader: Option[Broker], replicas: Seq[Broker], isr: Seq[Broker] = Seq.empty,
-                             errorCode: Short = ErrorMapping.NoError, logMetadata: Option[LogMetadata] = None) {
+                             errorCode: Short = ErrorMapping.NoError) {
   def sizeInBytes: Int = {
     var size: Int = 2 /* error code */ + 4 /* partition id */ + 1 /* if leader exists*/
 
@@ -169,11 +137,6 @@ case class PartitionMetadata(partitionId
     size += 2 /* number of in sync replicas */
     size += isr.foldLeft(0)(_ + _.sizeInBytes)
 
-    size += 1 /* if log segment metadata exists */
-    logMetadata match {
-      case Some(metadata) => size += metadata.sizeInBytes
-      case None =>
-    }
     debug("Size of partition metadata = " + size)
     size
   }
@@ -198,53 +161,6 @@ case class PartitionMetadata(partitionId
     /* number of in-sync replicas */
     buffer.putShort(isr.size.toShort)
     isr.foreach(r => r.writeTo(buffer))
-
-    /* if log segment metadata exists */
-    logMetadata match {
-      case Some(metadata) =>
-        buffer.put(LogSegmentMetadataExists.requestId)
-        metadata.writeTo(buffer)
-      case None => buffer.put(LogSegmentMetadataDoesNotExist.requestId)
-    }
-
-  }
-}
-
-case class LogMetadata(numLogSegments: Int, totalSize: Long, logSegmentMetadata: Option[Seq[LogSegmentMetadata]]) {
-  def sizeInBytes: Int = {
-    var size: Int = 4 /* num log segments */ + 8 /* total data size */ + 4 /* number of log segment metadata */
-    logSegmentMetadata match {
-      case Some(segmentMetadata) => size += segmentMetadata.foldLeft(0)(_ + _.sizeInBytes)
-      case None =>
-    }
-    debug("Size of log metadata = " + size)
-    size
-  }
-
-  def writeTo(buffer: ByteBuffer) {
-    buffer.putInt(numLogSegments)
-    buffer.putLong(totalSize)
-    /* if segment metadata exists */
-    logSegmentMetadata match {
-      case Some(segmentMetadata) =>
-        /* number of log segments */
-        buffer.putInt(segmentMetadata.size)
-        segmentMetadata.foreach(m => m.writeTo(buffer))
-      case None =>
-        buffer.putInt(0)
-    }
-  }
-}
-
-case class LogSegmentMetadata(beginningOffset: Long, lastModified: Long, size: Long) {
-  def sizeInBytes: Int = {
-    8 /* beginning offset */ + 8 /* last modified timestamp */ + 8 /* log segment size in bytes */
-  }
-
-  def writeTo(buffer: ByteBuffer) {
-    buffer.putLong(beginningOffset)
-    buffer.putLong(lastModified)
-    buffer.putLong(size)
   }
 }
 

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/TopicMetadataRequest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/TopicMetadataRequest.scala?rev=1387796&r1=1387795&r2=1387796&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/TopicMetadataRequest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/TopicMetadataRequest.scala Wed Sep 19 22:15:52 2012
@@ -21,11 +21,6 @@ import java.nio.ByteBuffer
 import kafka.utils.Utils._
 import collection.mutable.ListBuffer
 import kafka.utils._
-import kafka.common.KafkaException
-
-sealed trait DetailedMetadataRequest { def requestId: Short }
-case object SegmentMetadata extends DetailedMetadataRequest { val requestId = 1.asInstanceOf[Short] }
-case object NoSegmentMetadata extends DetailedMetadataRequest { val requestId = 0.asInstanceOf[Short] }
 
 object TopicMetadataRequest {
   val CurrentVersion = 1.shortValue()
@@ -33,20 +28,8 @@ object TopicMetadataRequest {
 
   /**
    * TopicMetadataRequest has the following format -
-   *
    * number of topics (4 bytes) list of topics (2 bytes + topic.length per topic) detailedMetadata (2 bytes) timestamp (8 bytes) count (4 bytes)
-   *
-   * The detailedMetadata field is a placeholder for requesting various details about partition and log metadata
-   * By default, the value for this field is 0, which means it will just return leader, replica and ISR metadata for
-   * all partitions of the list of topics mentioned in the request.
    */
-  def getDetailedMetadataRequest(requestId: Short): DetailedMetadataRequest = {
-    requestId match {
-      case SegmentMetadata.requestId => SegmentMetadata
-      case NoSegmentMetadata.requestId => NoSegmentMetadata
-      case _ => throw new KafkaException("Unknown detailed metadata request id " + requestId)
-    }
-  }
 
   def readFrom(buffer: ByteBuffer): TopicMetadataRequest = {
     val versionId = buffer.getShort
@@ -56,60 +39,27 @@ object TopicMetadataRequest {
     for(i <- 0 until numTopics)
       topics += readShortString(buffer, "UTF-8")
     val topicsList = topics.toList
-    val returnDetailedMetadata = getDetailedMetadataRequest(buffer.getShort)
-    var timestamp: Option[Long] = None
-    var count: Option[Int] = None
-    returnDetailedMetadata match {
-      case NoSegmentMetadata =>
-      case SegmentMetadata =>
-        timestamp = Some(buffer.getLong)
-        count = Some(buffer.getInt)
-      case _ => throw new KafkaException("Invalid value for the detailed metadata request "
-                                                    + returnDetailedMetadata.requestId)
-    }
-    debug("topic = %s, detailed metadata request = %d"
-          .format(topicsList.head, returnDetailedMetadata.requestId))
-    new TopicMetadataRequest(versionId, clientId, topics.toList, returnDetailedMetadata, timestamp, count)
+    debug("topic = %s".format(topicsList.head))
+    new TopicMetadataRequest(versionId, clientId, topics.toList)
   }
 }
 
 case class TopicMetadataRequest(val versionId: Short,
                                 val clientId: String,
-                                val topics: Seq[String],
-                                val detailedMetadata: DetailedMetadataRequest = NoSegmentMetadata,
-                                val timestamp: Option[Long] = None, val count: Option[Int] = None)
+                                val topics: Seq[String])
  extends RequestOrResponse(Some(RequestKeys.MetadataKey)){
 
 def this(topics: Seq[String]) =
-  this(TopicMetadataRequest.CurrentVersion, TopicMetadataRequest.DefaultClientId, topics, NoSegmentMetadata, None, None)
-
-
-
+  this(TopicMetadataRequest.CurrentVersion, TopicMetadataRequest.DefaultClientId, topics)
 
   def writeTo(buffer: ByteBuffer) {
     buffer.putShort(versionId)
     Utils.writeShortString(buffer, clientId)
     buffer.putInt(topics.size)
     topics.foreach(topic => writeShortString(buffer, topic))
-    buffer.putShort(detailedMetadata.requestId)
-    detailedMetadata match {
-      case SegmentMetadata =>
-        buffer.putLong(timestamp.get)
-        buffer.putInt(count.get)
-      case NoSegmentMetadata =>
-      case _ => throw new KafkaException("Invalid value for the detailed metadata request " + detailedMetadata.requestId)
-    }
   }
 
   def sizeInBytes(): Int = {
-    var size: Int = 2 + (2 + clientId.length) + 4 /* number of topics */ + topics.foldLeft(0)(_ + shortStringLength(_)) /* topics */ +
-                    2 /* detailed metadata */
-    detailedMetadata match {
-      case SegmentMetadata =>
-        size += 8 /* timestamp */ + 4 /* count */
-      case NoSegmentMetadata =>
-      case _ => throw new KafkaException("Invalid value for the detailed metadata request " + detailedMetadata.requestId)
-    }
-    size
+    2 + (2 + clientId.length) + 4 /* number of topics */ + topics.foldLeft(0)(_ + shortStringLength(_)) /* topics */
   }
 }

Added: incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/TopicMetadataResponse.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/TopicMetadataResponse.scala?rev=1387796&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/TopicMetadataResponse.scala (added)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/TopicMetadataResponse.scala Wed Sep 19 22:15:52 2012
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.api
+
+import java.nio.ByteBuffer
+
+object TopicMetadataResponse {
+
+  def readFrom(buffer: ByteBuffer): TopicMetadataResponse = {
+    val versionId = buffer.getShort
+    val topicCount = buffer.getInt
+    val topicsMetadata = new Array[TopicMetadata](topicCount)
+    for( i <- 0 until topicCount) {
+      topicsMetadata(i) = TopicMetadata.readFrom(buffer)
+    }
+    new TopicMetadataResponse(versionId, topicsMetadata.toSeq)
+  }
+}
+
+case class TopicMetadataResponse(versionId: Short,
+                                 topicsMetadata: Seq[TopicMetadata]) extends RequestOrResponse
+{
+  val sizeInBytes = 2 + topicsMetadata.foldLeft(4)(_ + _.sizeInBytes)
+
+  def writeTo(buffer: ByteBuffer) {
+    buffer.putShort(versionId)
+    /* topic metadata */
+    buffer.putInt(topicsMetadata.length)
+    topicsMetadata.foreach(_.writeTo(buffer))
+  }
+}

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/SimpleConsumer.scala?rev=1387796&r1=1387795&r2=1387796&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/SimpleConsumer.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/SimpleConsumer.scala Wed Sep 19 22:15:52 2012
@@ -86,6 +86,11 @@ class SimpleConsumer( val host: String,
     }
   }
 
+  def send(request: TopicMetadataRequest): TopicMetadataResponse = {
+    val response = sendRequest(request)
+    TopicMetadataResponse.readFrom(response.buffer)
+  }
+
   /**
    *  Fetch a set of messages from a topic.
    *
@@ -128,4 +133,4 @@ class SimpleConsumer( val host: String,
 object FetchRequestAndResponseStat extends KafkaMetricsGroup {
   val requestTimer = new KafkaTimer(newTimer("FetchRequestRateAndTimeMs", TimeUnit.MILLISECONDS, TimeUnit.SECONDS))
   val respondSizeHist = newHistogram("FetchResponseSize")
-}
\ No newline at end of file
+}

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala?rev=1387796&r1=1387795&r2=1387796&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala Wed Sep 19 22:15:52 2012
@@ -22,7 +22,7 @@ import collection.JavaConversions._
 import kafka.api.LeaderAndIsr
 import kafka.common.StateChangeFailedException
 import java.util.concurrent.atomic.AtomicBoolean
-import org.I0Itec.zkclient.{IZkChildListener}
+import org.I0Itec.zkclient.IZkChildListener
 
 /**
  * This class represents the state machine for replicas. It defines the states that a replica can be in, and

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/FetchResponse.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/FetchResponse.scala?rev=1387796&r1=1387795&r2=1387796&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/FetchResponse.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/FetchResponse.scala Wed Sep 19 22:15:52 2012
@@ -17,15 +17,7 @@
 
 package kafka.javaapi
 
-import kafka.api.PartitionData
-import kafka.common.TopicAndPartition
-
-
-class FetchResponse( val versionId: Short,
-                     val correlationId: Int,
-                     private val data: Map[TopicAndPartition, PartitionData] ) {
-
-  private val underlying = kafka.api.FetchResponse(versionId, correlationId, data)
+class FetchResponse(private val underlying: kafka.api.FetchResponse) {
 
   def messageSet(topic: String, partition: Int): kafka.javaapi.message.ByteBufferMessageSet = {
     import Implicits._

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/Implicits.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/Implicits.scala?rev=1387796&r1=1387795&r2=1387796&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/Implicits.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/Implicits.scala Wed Sep 19 22:15:52 2012
@@ -19,14 +19,21 @@ package kafka.javaapi
 import kafka.utils.Logging
 
 private[javaapi] object Implicits extends Logging {
-  implicit def javaMessageSetToScalaMessageSet(messageSet: kafka.javaapi.message.ByteBufferMessageSet):
-     kafka.message.ByteBufferMessageSet = messageSet.underlying
 
   implicit def scalaMessageSetToJavaMessageSet(messageSet: kafka.message.ByteBufferMessageSet):
-     kafka.javaapi.message.ByteBufferMessageSet = {
-    new kafka.javaapi.message.ByteBufferMessageSet(messageSet.buffer, messageSet.initialOffset)
-  }
+      kafka.javaapi.message.ByteBufferMessageSet = 
+    new kafka.javaapi.message.ByteBufferMessageSet(messageSet)
 
   implicit def toJavaFetchResponse(response: kafka.api.FetchResponse): kafka.javaapi.FetchResponse =
-    new kafka.javaapi.FetchResponse(response.versionId, response.correlationId, response.data)
+    new kafka.javaapi.FetchResponse(response)
+
+  implicit def toJavaTopicMetadataResponse(response: kafka.api.TopicMetadataResponse): kafka.javaapi.TopicMetadataResponse =
+    new kafka.javaapi.TopicMetadataResponse(response)
+
+  implicit def optionToJavaRef[T](opt: Option[T]): T = {
+    opt match {
+      case Some(obj) => obj
+      case None => null
+    }
+  }
 }

Added: incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/TopicMetadata.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/TopicMetadata.scala?rev=1387796&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/TopicMetadata.scala (added)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/TopicMetadata.scala Wed Sep 19 22:15:52 2012
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.javaapi
+
+import kafka.cluster.Broker
+import scala.collection.JavaConversions.asList
+
+private[javaapi] object MetadataListImplicits {
+  implicit def toJavaTopicMetadataList(topicMetadataSeq: Seq[kafka.api.TopicMetadata]):
+  java.util.List[kafka.javaapi.TopicMetadata] =
+    topicMetadataSeq.map(new kafka.javaapi.TopicMetadata(_))
+
+  implicit def toPartitionMetadataList(partitionMetadataSeq: Seq[kafka.api.PartitionMetadata]):
+  java.util.List[kafka.javaapi.PartitionMetadata] =
+    partitionMetadataSeq.map(new kafka.javaapi.PartitionMetadata(_))
+}
+
+class TopicMetadata(private val underlying: kafka.api.TopicMetadata) {
+  def topic: String = underlying.topic
+
+  def partitionsMetadata: java.util.List[PartitionMetadata] = {
+    import kafka.javaapi.MetadataListImplicits._
+    underlying.partitionsMetadata
+  }
+
+  def errorCode: Short = underlying.errorCode
+
+  def sizeInBytes: Int = underlying.sizeInBytes
+}
+
+
+class PartitionMetadata(private val underlying: kafka.api.PartitionMetadata) {
+  def partitionId: Int = underlying.partitionId
+
+  def leader: Broker = {
+    import kafka.javaapi.Implicits._
+    underlying.leader
+  }
+
+  def replicas: java.util.List[Broker] = underlying.replicas
+
+  def isr: java.util.List[Broker] = underlying.isr
+
+  def errorCode: Short = underlying.errorCode
+
+  def sizeInBytes: Int = underlying.sizeInBytes
+}
+

Added: incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/TopicMetadataRequest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/TopicMetadataRequest.scala?rev=1387796&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/TopicMetadataRequest.scala (added)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/TopicMetadataRequest.scala Wed Sep 19 22:15:52 2012
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.javaapi
+
+import kafka.api._
+import java.nio.ByteBuffer
+import scala.collection.JavaConversions.asBuffer
+
+class TopicMetadataRequest(val versionId: Short,
+                           val clientId: String,
+                           val topics: java.util.List[String]) extends RequestOrResponse(Some(kafka.api.RequestKeys.MetadataKey)) {
+  val underlying: kafka.api.TopicMetadataRequest =
+    new kafka.api.TopicMetadataRequest(versionId, clientId, topics)
+
+  def this(topics: java.util.List[String]) =
+    this(kafka.api.TopicMetadataRequest.CurrentVersion, kafka.api.TopicMetadataRequest.DefaultClientId, topics)
+
+  def writeTo(buffer: ByteBuffer) = underlying.writeTo(buffer)
+
+  def sizeInBytes: Int = underlying.sizeInBytes()
+}

Added: incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/TopicMetadataResponse.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/TopicMetadataResponse.scala?rev=1387796&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/TopicMetadataResponse.scala (added)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/TopicMetadataResponse.scala Wed Sep 19 22:15:52 2012
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.javaapi
+
+class TopicMetadataResponse(private val underlying: kafka.api.TopicMetadataResponse) {
+  def sizeInBytes: Int = underlying.sizeInBytes
+
+  def topicsMetadata: java.util.List[kafka.javaapi.TopicMetadata] = {
+    import kafka.javaapi.MetadataListImplicits._
+    underlying.topicsMetadata
+  }
+}

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/consumer/SimpleConsumer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/consumer/SimpleConsumer.scala?rev=1387796&r1=1387795&r2=1387796&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/consumer/SimpleConsumer.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/consumer/SimpleConsumer.scala Wed Sep 19 22:15:52 2012
@@ -28,7 +28,7 @@ class SimpleConsumer(val host: String,
                      val port: Int,
                      val soTimeout: Int,
                      val bufferSize: Int) {
-  val underlying = new kafka.consumer.SimpleConsumer(host, port, soTimeout, bufferSize)
+  private val underlying = new kafka.consumer.SimpleConsumer(host, port, soTimeout, bufferSize)
 
   /**
    *  Fetch a set of messages from a topic. This version of the fetch method
@@ -55,6 +55,17 @@ class SimpleConsumer(val host: String,
   }
 
   /**
+   *  Fetch metadata for a sequence of topics.
+   *  
+   *  @param request specifies the versionId, clientId, sequence of topics.
+   *  @return metadata for each topic in the request.
+   */
+  def send(request: kafka.javaapi.TopicMetadataRequest): kafka.javaapi.TopicMetadataResponse = {
+    import kafka.javaapi.Implicits._
+    underlying.send(request.underlying)
+  }
+
+  /**
    *  Get a list of valid offsets (up to maxSize) before the given time.
    *  The result is a list of offsets, in descending order.
    *

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala?rev=1387796&r1=1387795&r2=1387796&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala Wed Sep 19 22:15:52 2012
@@ -62,7 +62,7 @@ private[kafka] class ZookeeperConsumerCo
                                  val enableFetcher: Boolean) // for testing only
     extends ConsumerConnector {
 
-  val underlying = new kafka.consumer.ZookeeperConsumerConnector(config, enableFetcher)
+  private val underlying = new kafka.consumer.ZookeeperConsumerConnector(config, enableFetcher)
 
   def this(config: ConsumerConfig) = this(config, true)
 

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala?rev=1387796&r1=1387795&r2=1387796&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala Wed Sep 19 22:15:52 2012
@@ -16,20 +16,9 @@
 */
 package kafka.javaapi.message
 
-import java.nio.ByteBuffer
 import kafka.message._
 
-class ByteBufferMessageSet(private val buffer: ByteBuffer, val initialOffset: Long = 0L) extends MessageSet {
-  val underlying: kafka.message.ByteBufferMessageSet = new kafka.message.ByteBufferMessageSet(buffer, initialOffset)
-  def this(buffer: ByteBuffer) = this(buffer, 0L)
-
-  def this(compressionCodec: CompressionCodec, messages: java.util.List[Message]) {
-    this(MessageSet.createByteBuffer(compressionCodec, scala.collection.JavaConversions.asBuffer(messages): _*), 0L)
-  }
-
-  def this(messages: java.util.List[Message]) {
-    this(NoCompressionCodec, messages)
-  }
+class ByteBufferMessageSet(private val underlying: kafka.message.ByteBufferMessageSet) extends MessageSet {
 
   def validBytes: Long = underlying.validBytes
 
@@ -53,13 +42,11 @@ class ByteBufferMessageSet(private val b
   override def equals(other: Any): Boolean = {
     other match {
       case that: ByteBufferMessageSet =>
-        (that canEqual this) && buffer.equals(that.buffer) && initialOffset == that.initialOffset
+        underlying.equals(that.underlying)
       case _ => false
     }
   }
 
-  def canEqual(other: Any): Boolean = other.isInstanceOf[ByteBufferMessageSet]
-
   override def hashCode: Int = underlying.hashCode
 
 }

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala?rev=1387796&r1=1387795&r2=1387796&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala Wed Sep 19 22:15:52 2012
@@ -181,13 +181,11 @@ class ByteBufferMessageSet(val buffer: B
   override def equals(other: Any): Boolean = {
     other match {
       case that: ByteBufferMessageSet =>
-        (that canEqual this) && buffer.equals(that.buffer) && initialOffset == that.initialOffset
+        buffer.equals(that.buffer) && initialOffset == that.initialOffset
       case _ => false
     }
   }
 
-  override def canEqual(other: Any): Boolean = other.isInstanceOf[ByteBufferMessageSet]
-
   override def hashCode: Int = {
     var hash = 17
     hash = hash * 31 + buffer.hashCode

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala?rev=1387796&r1=1387795&r2=1387796&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala Wed Sep 19 22:15:52 2012
@@ -73,16 +73,16 @@ class BrokerPartitionInfo(producerConfig
     var fetchMetaDataSucceeded: Boolean = false
     var i: Int = 0
     val topicMetadataRequest = new TopicMetadataRequest(topics)
-    var topicMetaDataResponse: Seq[TopicMetadata] = Nil
+    var topicsMetadata: Seq[TopicMetadata] = Nil
     var t: Throwable = null
     while(i < brokers.size && !fetchMetaDataSucceeded) {
       val producer: SyncProducer = ProducerPool.createSyncProducer(producerConfig, brokers(i))
       info("Fetching metadata for topic %s".format(topics))
       try {
-        topicMetaDataResponse = producer.send(topicMetadataRequest)
+        topicsMetadata = producer.send(topicMetadataRequest).topicsMetadata
         fetchMetaDataSucceeded = true
         // throw partition specific exception
-        topicMetaDataResponse.foreach(tmd =>{
+        topicsMetadata.foreach(tmd =>{
           trace("Metadata for topic %s is %s".format(tmd.topic, tmd))
           if(tmd.errorCode == ErrorMapping.NoError){
             topicPartitionInfo.put(tmd.topic, tmd)
@@ -94,7 +94,7 @@ class BrokerPartitionInfo(producerConfig
             }
           })
         })
-        producerPool.updateProducer(topicMetaDataResponse)
+        producerPool.updateProducer(topicsMetadata)
       } catch {
         case e =>
           warn("fetching broker partition metadata for topics [%s] from broker [%s] failed".format(topics, brokers(i).toString), e)

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducer.scala?rev=1387796&r1=1387795&r2=1387796&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducer.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducer.scala Wed Sep 19 22:15:52 2012
@@ -21,7 +21,6 @@ import kafka.api._
 import kafka.network.{BlockingChannel, BoundedByteBufferSend, Receive}
 import kafka.utils._
 import java.util.Random
-import kafka.common.ErrorMapping
 import java.util.concurrent.TimeUnit
 import kafka.metrics.{KafkaTimer, KafkaMetricsGroup}
 
@@ -109,12 +108,9 @@ class SyncProducer(val config: SyncProdu
     ProducerResponse.readFrom(response.buffer)
   }
 
-  def send(request: TopicMetadataRequest): Seq[TopicMetadata] = {
+  def send(request: TopicMetadataRequest): TopicMetadataResponse = {
     val response = doSend(request)
-    val topicMetaDataResponse = TopicMetaDataResponse.readFrom(response.buffer)
-    // try to throw exception based on global error codes
-    ErrorMapping.maybeThrowException(topicMetaDataResponse.errorCode)
-    topicMetaDataResponse.topicsMetadata
+    TopicMetadataResponse.readFrom(response.buffer)
   }
 
   def close() = {

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala?rev=1387796&r1=1387795&r2=1387796&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala Wed Sep 19 22:15:52 2012
@@ -378,40 +378,39 @@ class KafkaApis(val requestChannel: Requ
     trace("Handling topic metadata request " + metadataRequest.toString())
 
     val topicsMetadata = new mutable.ArrayBuffer[TopicMetadata]()
-    var errorCode = ErrorMapping.NoError
     val config = replicaManager.config
-    try {
-      val topicMetadataList = AdminUtils.getTopicMetaDataFromZK(metadataRequest.topics, zkClient)
-      metadataRequest.topics.zip(topicMetadataList).foreach(
-        topicAndMetadata =>{
-          val topic = topicAndMetadata._1
-          topicAndMetadata._2.errorCode match {
-            case ErrorMapping.NoError => topicsMetadata += topicAndMetadata._2
-            case ErrorMapping.UnknownTopicOrPartitionCode =>
+    val topicMetadataList = AdminUtils.getTopicMetaDataFromZK(metadataRequest.topics, zkClient)
+    metadataRequest.topics.zip(topicMetadataList).foreach(
+      topicAndMetadata => {
+        val topic = topicAndMetadata._1
+        topicAndMetadata._2.errorCode match {
+          case ErrorMapping.NoError => topicsMetadata += topicAndMetadata._2
+          case ErrorMapping.UnknownTopicOrPartitionCode =>
+            try {
               /* check if auto creation of topics is turned on */
-              if(config.autoCreateTopics) {
+              if (config.autoCreateTopics) {
                 CreateTopicCommand.createTopic(zkClient, topic, config.numPartitions, config.defaultReplicationFactor,
                                                topicNameValidator = topicNameValidator)
                 info("Auto creation of topic %s with %d partitions and replication factor %d is successful!"
                              .format(topic, config.numPartitions, config.defaultReplicationFactor))
                 val newTopicMetadata = AdminUtils.getTopicMetaDataFromZK(List(topic), zkClient).head
+                topicsMetadata += newTopicMetadata
                 newTopicMetadata.errorCode match {
-                  case ErrorMapping.NoError => topicsMetadata += newTopicMetadata
-                  case _ =>
-                    throw new KafkaException("Topic metadata for automatically created topic %s does not exist".format(topic))
+                  case ErrorMapping.NoError =>
+                  case _ => throw new KafkaException("Topic metadata for automatically created topic %s does not exist".format(topic))
                 }
               }
-            case _ => error("Error while fetching topic metadata for topic " + topic,
-                            ErrorMapping.exceptionFor(topicAndMetadata._2.errorCode).getCause)
-          }
-        })
-    } catch {
-      case e => error("Error while retrieving topic metadata", e)
-      // convert exception type to error code
-      errorCode = ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]])
-    }
+            } catch {
+              case e => error("Error while retrieving topic metadata", e)
+            }
+          case _ => 
+            error("Error while fetching topic metadata for topic " + topic,
+                  ErrorMapping.exceptionFor(topicAndMetadata._2.errorCode).getCause)
+            topicsMetadata += topicAndMetadata._2
+        }
+      })
     topicsMetadata.foreach(metadata => trace("Sending topic metadata " + metadata.toString))
-    val response = new TopicMetaDataResponse(metadataRequest.versionId, topicsMetadata.toSeq, errorCode)
+    val response = new TopicMetadataResponse(metadataRequest.versionId, topicsMetadata.toSeq)
     requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response)))
   }
 

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala?rev=1387796&r1=1387795&r2=1387796&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala Wed Sep 19 22:15:52 2012
@@ -19,7 +19,7 @@ package kafka.server
 import kafka.utils.ZkUtils._
 import kafka.utils.Logging
 import org.I0Itec.zkclient.exception.ZkNodeExistsException
-import org.I0Itec.zkclient.{IZkDataListener}
+import org.I0Itec.zkclient.IZkDataListener
 import kafka.controller.ControllerContext
 
 /**

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala?rev=1387796&r1=1387795&r2=1387796&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala Wed Sep 19 22:15:52 2012
@@ -29,7 +29,7 @@ import kafka.utils.TestUtils
 import kafka.utils.TestUtils._
 import kafka.server.{ReplicaManager, KafkaApis, KafkaConfig}
 import kafka.common.ErrorMapping
-import kafka.api.{RequestKeys, TopicMetadata, TopicMetaDataResponse, TopicMetadataRequest}
+import kafka.api.{RequestKeys, TopicMetadata, TopicMetadataResponse, TopicMetadataRequest}
 
 class TopicMetadataTest extends JUnit3Suite with ZooKeeperTestHarness {
   val props = createBrokerConfigs(1)
@@ -76,7 +76,6 @@ class TopicMetadataTest extends JUnit3Su
     val partitionMetadata = topicMetadata.head.partitionsMetadata
     assertEquals("Expecting metadata for 1 partition", 1, partitionMetadata.size)
     assertEquals("Expecting partition id to be 0", 0, partitionMetadata.head.partitionId)
-    assertNull("Not expecting log metadata", partitionMetadata.head.logMetadata.getOrElse(null))
     assertEquals(1, partitionMetadata.head.replicas.size)
   }
 
@@ -90,7 +89,6 @@ class TopicMetadataTest extends JUnit3Su
     val partitionMetadata = topicMetadata.head.partitionsMetadata
     assertEquals("Expecting metadata for 1 partition", 1, partitionMetadata.size)
     assertEquals("Expecting partition id to be 0", 0, partitionMetadata.head.partitionId)
-    assertNull("Not expecting log metadata", partitionMetadata.head.logMetadata.getOrElse(null))
     assertEquals(0, partitionMetadata.head.replicas.size)
     assertEquals(None, partitionMetadata.head.leader)
     assertEquals(ErrorMapping.LeaderNotAvailableCode, partitionMetadata.head.errorCode)
@@ -117,7 +115,7 @@ class TopicMetadataTest extends JUnit3Su
     val metadataResponse = requestChannel.receiveResponse(0).responseSend.asInstanceOf[BoundedByteBufferSend].buffer
     
     // check assertions
-    val topicMetadata = TopicMetaDataResponse.readFrom(metadataResponse).topicsMetadata
+    val topicMetadata = TopicMetadataResponse.readFrom(metadataResponse).topicsMetadata
 
     topicMetadata
   }

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/javaapi/message/ByteBufferMessageSetTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/javaapi/message/ByteBufferMessageSetTest.scala?rev=1387796&r1=1387795&r2=1387796&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/javaapi/message/ByteBufferMessageSetTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/javaapi/message/ByteBufferMessageSetTest.scala Wed Sep 19 22:15:52 2012
@@ -22,40 +22,24 @@ import org.junit.Test
 import kafka.message.{DefaultCompressionCodec, CompressionCodec, NoCompressionCodec, Message}
 
 class ByteBufferMessageSetTest extends kafka.javaapi.message.BaseMessageSetTestCases {
+  override def createMessageSet(messages: Seq[Message], compressed: CompressionCodec = NoCompressionCodec): ByteBufferMessageSet =
+    new ByteBufferMessageSet(new kafka.message.ByteBufferMessageSet(compressed, messages: _*))
+
+  val msgSeq: Seq[Message] = Seq(new Message("hello".getBytes()), new Message("there".getBytes()))
 
-  override def createMessageSet(messages: Seq[Message],
-                                compressed: CompressionCodec = NoCompressionCodec): ByteBufferMessageSet =
-    new ByteBufferMessageSet(compressed, getMessageList(messages: _*))
-  
   @Test
   def testEquals() {
-    val messageList = new ByteBufferMessageSet(compressionCodec = NoCompressionCodec,
-                                               messages = getMessageList(new Message("hello".getBytes()),
-                                                                         new Message("there".getBytes())))
-    val moreMessages = new ByteBufferMessageSet(compressionCodec = NoCompressionCodec,
-                                                messages = getMessageList(new Message("hello".getBytes()),
-                                                                          new Message("there".getBytes())))
-
+    val messageList = createMessageSet(msgSeq, NoCompressionCodec)
+    val moreMessages = createMessageSet(msgSeq, NoCompressionCodec)
     assertEquals(messageList, moreMessages)
     assertTrue(messageList.equals(moreMessages))
   }
 
   @Test
   def testEqualsWithCompression () {
-    val messageList = new ByteBufferMessageSet(compressionCodec = DefaultCompressionCodec,
-                                            messages = getMessageList(new Message("hello".getBytes()),
-                                                                      new Message("there".getBytes())))
-    val moreMessages = new ByteBufferMessageSet(compressionCodec = DefaultCompressionCodec,
-                                                messages = getMessageList(new Message("hello".getBytes()),
-                                                                          new Message("there".getBytes())))
-
+    val messageList = createMessageSet(msgSeq, DefaultCompressionCodec)
+    val moreMessages = createMessageSet(msgSeq, DefaultCompressionCodec)
     assertEquals(messageList, moreMessages)
     assertTrue(messageList.equals(moreMessages))
   }
-
-  private def getMessageList(messages: Message*): java.util.List[Message] = {
-    val messageList = new java.util.ArrayList[Message]()
-    messages.foreach(m => messageList.add(m))
-    messageList
-  }
 }

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/network/RpcDataSerializationTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/network/RpcDataSerializationTest.scala?rev=1387796&r1=1387795&r2=1387796&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/network/RpcDataSerializationTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/network/RpcDataSerializationTest.scala Wed Sep 19 22:15:52 2012
@@ -121,8 +121,8 @@ object RpcDataSerializationTestUtils{
     new TopicMetadataRequest(1, "client 1", Seq(topic1, topic2))
   }
 
-  def createTestTopicMetadataResponse: TopicMetaDataResponse = {
-    new TopicMetaDataResponse(1, Seq(topicmetaData1, topicmetaData2))
+  def createTestTopicMetadataResponse: TopicMetadataResponse = {
+    new TopicMetadataResponse(1, Seq(topicmetaData1, topicmetaData2))
   }
 }
 
@@ -215,7 +215,7 @@ class RpcDataSerializationTest extends J
     buffer = ByteBuffer.allocate(topicMetadataResponse.sizeInBytes)
     topicMetadataResponse.writeTo(buffer)
     buffer.rewind()
-    val deserializedTopicMetadataResponse = TopicMetaDataResponse.readFrom(buffer)
+    val deserializedTopicMetadataResponse = TopicMetadataResponse.readFrom(buffer)
     assertEquals("The original and deserialzed topicMetadataResponse should be the same", topicMetadataResponse,
                  deserializedTopicMetadataResponse)
   }



Mime
View raw message