kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject [1/9] kafka git commit: KAFKA-4390; Replace MessageSet usage with client-side alternatives
Date Tue, 13 Dec 2016 18:41:28 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 6626b058c -> 67f1e5b91


http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala b/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala
index 3d1b485..4467394 100644
--- a/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala
@@ -24,12 +24,12 @@ import kafka.utils.TestUtils
 import kafka.utils.TestUtils._
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
 import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.protocol.{ApiKeys, Errors, ProtoUtils}
-import org.apache.kafka.common.record.{LogEntry, MemoryRecords}
+import org.apache.kafka.common.protocol.{ApiKeys, Errors}
+import org.apache.kafka.common.record.LogEntry
 import org.apache.kafka.common.requests.{FetchRequest, FetchResponse}
 import org.apache.kafka.common.serialization.StringSerializer
-import org.junit.Test
 import org.junit.Assert._
+import org.junit.Test
 
 import scala.collection.JavaConverters._
 import scala.util.Random
@@ -120,13 +120,13 @@ class FetchRequestTest extends BaseRequestTest {
     val fetchResponse3 = sendFetchRequest(leaderId, fetchRequest3)
     assertEquals(shuffledTopicPartitions3, fetchResponse3.responseData.keySet.asScala.toSeq)
     val responseSize3 = fetchResponse3.responseData.asScala.values.map { partitionData =>
-      logEntries(partitionData).map(_.size).sum
+      logEntries(partitionData).map(_.sizeInBytes).sum
     }.sum
     assertTrue(responseSize3 <= maxResponseBytes)
     val partitionData3 = fetchResponse3.responseData.get(partitionWithLargeMessage1)
     assertEquals(Errors.NONE.code, partitionData3.errorCode)
     assertTrue(partitionData3.highWatermark > 0)
-    val size3 = logEntries(partitionData3).map(_.size).sum
+    val size3 = logEntries(partitionData3).map(_.sizeInBytes).sum
     assertTrue(s"Expected $size3 to be smaller than $maxResponseBytes", size3 <= maxResponseBytes)
     assertTrue(s"Expected $size3 to be larger than $maxPartitionBytes", size3 > maxPartitionBytes)
     assertTrue(maxPartitionBytes < partitionData3.records.sizeInBytes)
@@ -138,13 +138,13 @@ class FetchRequestTest extends BaseRequestTest {
     val fetchResponse4 = sendFetchRequest(leaderId, fetchRequest4)
     assertEquals(shuffledTopicPartitions4, fetchResponse4.responseData.keySet.asScala.toSeq)
     val nonEmptyPartitions4 = fetchResponse4.responseData.asScala.toSeq.collect {
-      case (tp, partitionData) if logEntries(partitionData).map(_.size).sum > 0 => tp
+      case (tp, partitionData) if logEntries(partitionData).map(_.sizeInBytes).sum > 0 => tp
     }
     assertEquals(Seq(partitionWithLargeMessage2), nonEmptyPartitions4)
     val partitionData4 = fetchResponse4.responseData.get(partitionWithLargeMessage2)
     assertEquals(Errors.NONE.code, partitionData4.errorCode)
     assertTrue(partitionData4.highWatermark > 0)
-    val size4 = logEntries(partitionData4).map(_.size).sum
+    val size4 = logEntries(partitionData4).map(_.sizeInBytes).sum
     assertTrue(s"Expected $size4 to be larger than $maxResponseBytes", size4 > maxResponseBytes)
     assertTrue(maxResponseBytes < partitionData4.records.sizeInBytes)
   }
@@ -161,12 +161,11 @@ class FetchRequestTest extends BaseRequestTest {
     assertEquals(Errors.NONE.code, partitionData.errorCode)
     assertTrue(partitionData.highWatermark > 0)
     assertEquals(maxPartitionBytes, partitionData.records.sizeInBytes)
-    assertEquals(0, logEntries(partitionData).map(_.size).sum)
+    assertEquals(0, logEntries(partitionData).map(_.sizeInBytes).sum)
   }
 
   private def logEntries(partitionData: FetchResponse.PartitionData): Seq[LogEntry] = {
-    val memoryRecords = partitionData.records
-    memoryRecords.iterator.asScala.toIndexedSeq
+    partitionData.records.deepIterator.asScala.toIndexedSeq
   }
 
   private def checkFetchResponse(expectedPartitions: Seq[TopicPartition], fetchResponse: FetchResponse,
@@ -181,25 +180,25 @@ class FetchRequestTest extends BaseRequestTest {
       assertEquals(Errors.NONE.code, partitionData.errorCode)
       assertTrue(partitionData.highWatermark > 0)
 
-      val memoryRecords = partitionData.records
-      responseBufferSize += memoryRecords.sizeInBytes
+      val records = partitionData.records
+      responseBufferSize += records.sizeInBytes
 
-      val messages = memoryRecords.iterator.asScala.toIndexedSeq
-      assertTrue(messages.size < numMessagesPerPartition)
-      val messagesSize = messages.map(_.size).sum
-      responseSize += messagesSize
-      if (messagesSize == 0 && !emptyResponseSeen) {
-        assertEquals(0, memoryRecords.sizeInBytes)
+      val entries = records.shallowIterator.asScala.toIndexedSeq
+      assertTrue(entries.size < numMessagesPerPartition)
+      val entriesSize = entries.map(_.sizeInBytes).sum
+      responseSize += entriesSize
+      if (entriesSize == 0 && !emptyResponseSeen) {
+        assertEquals(0, records.sizeInBytes)
         emptyResponseSeen = true
       }
-      else if (messagesSize != 0 && !emptyResponseSeen) {
-        assertTrue(messagesSize <= maxPartitionBytes)
-        assertEquals(maxPartitionBytes, memoryRecords.sizeInBytes)
+      else if (entriesSize != 0 && !emptyResponseSeen) {
+        assertTrue(entriesSize <= maxPartitionBytes)
+        assertEquals(maxPartitionBytes, records.sizeInBytes)
       }
-      else if (messagesSize != 0 && emptyResponseSeen)
-        fail(s"Expected partition with size 0, but found $tp with size $messagesSize")
-      else if (memoryRecords.sizeInBytes != 0 && emptyResponseSeen)
-        fail(s"Expected partition buffer with size 0, but found $tp with size ${memoryRecords.sizeInBytes}")
+      else if (entriesSize != 0 && emptyResponseSeen)
+        fail(s"Expected partition with size 0, but found $tp with size $entriesSize")
+      else if (records.sizeInBytes != 0 && emptyResponseSeen)
+        fail(s"Expected partition buffer with size 0, but found $tp with size ${records.sizeInBytes}")
 
     }
 
@@ -208,7 +207,7 @@ class FetchRequestTest extends BaseRequestTest {
   }
 
   private def createTopics(numTopics: Int, numPartitions: Int): Map[TopicPartition, Int] = {
-    val topics = (0 until numPartitions).map(t => s"topic${t}")
+    val topics = (0 until numPartitions).map(t => s"topic$t")
     val topicConfig = new Properties
     topicConfig.setProperty(LogConfig.MinInSyncReplicasProp, 2.toString)
     topics.flatMap { topic =>
@@ -223,7 +222,7 @@ class FetchRequestTest extends BaseRequestTest {
       tp <- topicPartitions.toSeq
       messageIndex <- 0 until numMessagesPerPartition
     } yield {
-      val suffix = s"${tp}-${messageIndex}"
+      val suffix = s"$tp-$messageIndex"
       new ProducerRecord(tp.topic, tp.partition, s"key $suffix", s"value $suffix")
     }
     records.map(producer.send).foreach(_.get)

http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala b/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
index 2d51be9..aad37d1 100644
--- a/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
@@ -17,21 +17,20 @@
 package kafka.server
 
 import java.util.Properties
+import java.util.concurrent.atomic.AtomicBoolean
 
-import org.apache.kafka.common.metrics.Metrics
-import org.junit.{After, Before, Test}
-
-import collection.mutable.HashMap
-import collection.mutable.Map
 import kafka.cluster.{Partition, Replica}
-import org.easymock.EasyMock
 import kafka.log.Log
-import org.junit.Assert._
 import kafka.utils._
-import java.util.concurrent.atomic.AtomicBoolean
-
-import kafka.message.MessageSet
+import org.apache.kafka.common.metrics.Metrics
+import org.apache.kafka.common.record.MemoryRecords
 import org.apache.kafka.common.utils.Time
+import org.easymock.EasyMock
+import org.junit.Assert._
+import org.junit.{After, Before, Test}
+
+import scala.collection.mutable.{HashMap, Map}
+
 
 class IsrExpirationTest {
 
@@ -76,7 +75,7 @@ class IsrExpirationTest {
 
     // let the follower catch up to the Leader logEndOffset (15)
     (partition0.assignedReplicas() - leaderReplica).foreach(
-      r => r.updateLogReadResult(new LogReadResult(FetchDataInfo(new LogOffsetMetadata(15L),
MessageSet.Empty),
+      r => r.updateLogReadResult(new LogReadResult(FetchDataInfo(new LogOffsetMetadata(15L),
MemoryRecords.EMPTY),
                                                    -1L,
                                                    -1,
                                                    true)))
@@ -127,7 +126,7 @@ class IsrExpirationTest {
 
     // Make the remote replica not read to the end of log. It should be not be out of sync
for at least 100 ms
     for(replica <- partition0.assignedReplicas() - leaderReplica)
-      replica.updateLogReadResult(new LogReadResult(FetchDataInfo(new LogOffsetMetadata(10L),
MessageSet.Empty), -1L, -1, false))
+      replica.updateLogReadResult(new LogReadResult(FetchDataInfo(new LogOffsetMetadata(10L),
MemoryRecords.EMPTY), -1L, -1, false))
 
     // Simulate 2 fetch requests spanning more than 100 ms which do not read to the end of
the log.
     // The replicas will no longer be in ISR. We do 2 fetches because we want to simulate
the case where the replica is lagging but is not stuck
@@ -137,7 +136,7 @@ class IsrExpirationTest {
     time.sleep(75)
 
     (partition0.assignedReplicas() - leaderReplica).foreach(
-      r => r.updateLogReadResult(new LogReadResult(FetchDataInfo(new LogOffsetMetadata(11L),
MessageSet.Empty), -1L, -1, false)))
+      r => r.updateLogReadResult(new LogReadResult(FetchDataInfo(new LogOffsetMetadata(11L),
MemoryRecords.EMPTY), -1L, -1, false)))
     partition0OSR = partition0.getOutOfSyncReplicas(leaderReplica, configs.head.replicaLagTimeMaxMs)
     assertEquals("No replica should be out of sync", Set.empty[Int], partition0OSR.map(_.brokerId))
 
@@ -149,7 +148,7 @@ class IsrExpirationTest {
 
     // Now actually make a fetch to the end of the log. The replicas should be back in ISR
     (partition0.assignedReplicas() - leaderReplica).foreach(
-      r => r.updateLogReadResult(new LogReadResult(FetchDataInfo(new LogOffsetMetadata(15L),
MessageSet.Empty), -1L, -1, true)))
+      r => r.updateLogReadResult(new LogReadResult(FetchDataInfo(new LogOffsetMetadata(15L),
MemoryRecords.EMPTY), -1L, -1, true)))
     partition0OSR = partition0.getOutOfSyncReplicas(leaderReplica, configs.head.replicaLagTimeMaxMs)
     assertEquals("No replica should be out of sync", Set.empty[Int], partition0OSR.map(_.brokerId))
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
index 70445d7..b577e7d 100755
--- a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
@@ -26,12 +26,12 @@ import kafka.api.{FetchRequestBuilder, OffsetRequest, PartitionOffsetRequestInfo
 import kafka.common.TopicAndPartition
 import kafka.consumer.SimpleConsumer
 import kafka.log.{Log, LogSegment}
-import kafka.message.{ByteBufferMessageSet, Message, NoCompressionCodec}
 import kafka.utils.TestUtils._
 import kafka.utils._
 import kafka.zk.ZooKeeperTestHarness
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.record.{MemoryRecords, Record}
 import org.apache.kafka.common.utils.{Time, Utils}
 import org.easymock.{EasyMock, IAnswer}
 import org.junit.Assert._
@@ -89,9 +89,9 @@ class LogOffsetTest extends ZooKeeperTestHarness {
                   "Log for partition [topic,0] should be created")
     val log = logManager.getLog(TopicAndPartition(topic, part)).get
 
-    val message = new Message(Integer.toString(42).getBytes())
+    val record = Record.create(Integer.toString(42).getBytes())
     for (_ <- 0 until 20)
-      log.append(new ByteBufferMessageSet(NoCompressionCodec, message))
+      log.append(MemoryRecords.withRecords(record))
     log.flush()
 
     val offsets = server.apis.fetchOffsets(logManager, new TopicPartition(topic, part), OffsetRequest.LatestTime,
15)
@@ -150,9 +150,9 @@ class LogOffsetTest extends ZooKeeperTestHarness {
 
     val logManager = server.getLogManager
     val log = logManager.createLog(TopicAndPartition(topic, part), logManager.defaultConfig)
-    val message = new Message(Integer.toString(42).getBytes())
+    val record = Record.create(Integer.toString(42).getBytes())
     for (_ <- 0 until 20)
-      log.append(new ByteBufferMessageSet(NoCompressionCodec, message))
+      log.append(MemoryRecords.withRecords(record))
     log.flush()
 
     val now = time.milliseconds + 30000 // pretend it is the future to avoid race conditions
with the fs
@@ -179,9 +179,9 @@ class LogOffsetTest extends ZooKeeperTestHarness {
 
     val logManager = server.getLogManager
     val log = logManager.createLog(TopicAndPartition(topic, part), logManager.defaultConfig)
-    val message = new Message(Integer.toString(42).getBytes())
+    val record = Record.create(Integer.toString(42).getBytes())
     for (_ <- 0 until 20)
-      log.append(new ByteBufferMessageSet(NoCompressionCodec, message))
+      log.append(MemoryRecords.withRecords(record))
     log.flush()
 
     val offsets = server.apis.fetchOffsets(logManager, new TopicPartition(topic, part), OffsetRequest.EarliestTime,
10)

http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala b/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala
index bd74dee..51be54c 100644
--- a/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala
@@ -54,11 +54,11 @@ class ProduceRequestTest extends BaseRequestTest {
     }
 
     sendAndCheck(JTestUtils.partitionRecordsBuffer(0, CompressionType.NONE,
-      new Record(System.currentTimeMillis(), "key".getBytes, "value".getBytes)), 0)
+      Record.create(System.currentTimeMillis(), "key".getBytes, "value".getBytes)), 0)
 
     sendAndCheck(JTestUtils.partitionRecordsBuffer(0, CompressionType.GZIP,
-      new Record(System.currentTimeMillis(), "key1".getBytes, "value1".getBytes),
-      new Record(System.currentTimeMillis(), "key2".getBytes, "value2".getBytes)), 1)
+      Record.create(System.currentTimeMillis(), "key1".getBytes, "value1".getBytes),
+      Record.create(System.currentTimeMillis(), "key2".getBytes, "value2".getBytes)), 1)
   }
 
   /* returns a pair of partition id and leader id */
@@ -74,7 +74,7 @@ class ProduceRequestTest extends BaseRequestTest {
     val (partition, leader) = createTopicAndFindPartitionWithLeader("topic")
     val timestamp = 1000000
     val recordBuffer = JTestUtils.partitionRecordsBuffer(0, CompressionType.LZ4,
-      new Record(timestamp, "key".getBytes, "value".getBytes))
+      Record.create(timestamp, "key".getBytes, "value".getBytes))
     // Change the lz4 checksum value so that it doesn't match the contents
     recordBuffer.array.update(40, 0)
     val topicPartition = new TopicPartition("topic", partition)

http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
index 378d382..a643f63 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
@@ -16,28 +16,29 @@
   */
 package kafka.server
 
-
 import java.util.Properties
 import java.util.concurrent.atomic.AtomicBoolean
 
 import kafka.cluster.Replica
 import kafka.common.TopicAndPartition
 import kafka.log.Log
-import kafka.message.{ByteBufferMessageSet, Message}
 import kafka.utils._
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.metrics.Metrics
+import org.apache.kafka.common.record.{MemoryRecords, Record}
 import org.apache.kafka.common.requests.FetchRequest.PartitionData
 import org.easymock.EasyMock
 import EasyMock._
 import org.junit.Assert._
 import org.junit.{After, Test}
 
+import scala.collection.JavaConverters._
+
 class ReplicaManagerQuotasTest {
   val configs = TestUtils.createBrokerConfigs(2, TestUtils.MockZkConnect).map(KafkaConfig.fromProps(_,
new Properties()))
   val time = new MockTime
   val metrics = new Metrics
-  val message = new Message("some-data-in-a-message".getBytes())
+  val record = Record.create("some-data-in-a-message".getBytes())
   val topicAndPartition1 = TopicAndPartition("test-topic", 1)
   val topicAndPartition2 = TopicAndPartition("test-topic", 2)
   val fetchInfo = Seq(new TopicPartition(topicAndPartition1.topic, topicAndPartition1.partition)
-> new PartitionData(0, 100),
@@ -63,10 +64,10 @@ class ReplicaManagerQuotasTest {
       readPartitionInfo = fetchInfo,
       quota = quota)
     assertEquals("Given two partitions, with only one throttled, we should get the first",
1,
-      fetch.find(_._1 == topicAndPartition1).get._2.info.messageSet.size)
+      fetch.find(_._1 == topicAndPartition1).get._2.info.records.shallowIterator.asScala.size)
 
     assertEquals("But we shouldn't get the second", 0,
-      fetch.find(_._1 == topicAndPartition2).get._2.info.messageSet.size)
+      fetch.find(_._1 == topicAndPartition2).get._2.info.records.shallowIterator.asScala.size)
   }
 
   @Test
@@ -88,9 +89,9 @@ class ReplicaManagerQuotasTest {
       readPartitionInfo = fetchInfo,
       quota = quota)
     assertEquals("Given two partitions, with both throttled, we should get no messages",
0,
-      fetch.find(_._1 == topicAndPartition1).get._2.info.messageSet.size)
+      fetch.find(_._1 == topicAndPartition1).get._2.info.records.shallowIterator.asScala.size)
     assertEquals("Given two partitions, with both throttled, we should get no messages",
0,
-      fetch.find(_._1 == topicAndPartition2).get._2.info.messageSet.size)
+      fetch.find(_._1 == topicAndPartition2).get._2.info.records.shallowIterator.asScala.size)
   }
 
   @Test
@@ -112,9 +113,9 @@ class ReplicaManagerQuotasTest {
       readPartitionInfo = fetchInfo,
       quota = quota)
     assertEquals("Given two partitions, with both non-throttled, we should get both messages",
1,
-      fetch.find(_._1 == topicAndPartition1).get._2.info.messageSet.size)
+      fetch.find(_._1 == topicAndPartition1).get._2.info.records.shallowIterator.asScala.size)
     assertEquals("Given two partitions, with both non-throttled, we should get both messages",
1,
-      fetch.find(_._1 == topicAndPartition2).get._2.info.messageSet.size)
+      fetch.find(_._1 == topicAndPartition2).get._2.info.records.shallowIterator.asScala.size)
   }
 
   @Test
@@ -136,13 +137,13 @@ class ReplicaManagerQuotasTest {
       readPartitionInfo = fetchInfo,
       quota = quota)
     assertEquals("Given two partitions, with only one throttled, we should get the first",
1,
-      fetch.find(_._1 == topicAndPartition1).get._2.info.messageSet.size)
+      fetch.find(_._1 == topicAndPartition1).get._2.info.records.shallowIterator.asScala.size)
 
     assertEquals("But we should get the second too since it's throttled but in sync", 1,
-      fetch.find(_._1 == topicAndPartition2).get._2.info.messageSet.size)
+      fetch.find(_._1 == topicAndPartition2).get._2.info.records.shallowIterator.asScala.size)
   }
 
-  def setUpMocks(fetchInfo: Seq[(TopicPartition, PartitionData)], message: Message = this.message,
bothReplicasInSync: Boolean = false) {
+  def setUpMocks(fetchInfo: Seq[(TopicPartition, PartitionData)], record: Record = this.record,
bothReplicasInSync: Boolean = false) {
     val zkUtils = createNiceMock(classOf[ZkUtils])
     val scheduler = createNiceMock(classOf[KafkaScheduler])
 
@@ -153,16 +154,16 @@ class ReplicaManagerQuotasTest {
 
     //if we ask for len 1 return a message
     expect(log.read(anyObject(), geq(1), anyObject(), anyObject())).andReturn(
-      new FetchDataInfo(
+      FetchDataInfo(
         new LogOffsetMetadata(0L, 0L, 0),
-        new ByteBufferMessageSet(message)
+        MemoryRecords.withRecords(record)
       )).anyTimes()
 
     //if we ask for len = 0, return 0 messages
     expect(log.read(anyObject(), EasyMock.eq(0), anyObject(), anyObject())).andReturn(
-      new FetchDataInfo(
+      FetchDataInfo(
         new LogOffsetMetadata(0L, 0L, 0),
-        new ByteBufferMessageSet()
+        MemoryRecords.EMPTY
       )).anyTimes()
     replay(log)
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index c6d66ba..421de32 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -17,18 +17,16 @@
 
 package kafka.server
 
-
 import java.io.File
 import java.util.concurrent.atomic.AtomicBoolean
 
-import kafka.api.FetchResponsePartitionData
 import kafka.cluster.Broker
 import kafka.common.TopicAndPartition
-import kafka.message.{ByteBufferMessageSet, Message, MessageSet}
 import kafka.utils.{MockScheduler, MockTime, TestUtils, ZkUtils}
 import org.I0Itec.zkclient.ZkClient
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.record.{MemoryRecords, Record, Records}
 import org.apache.kafka.common.requests.{LeaderAndIsrRequest, PartitionState}
 import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
 import org.apache.kafka.common.requests.FetchRequest.PartitionData
@@ -105,11 +103,11 @@ class ReplicaManagerTest {
       def callback(responseStatus: Map[TopicPartition, PartitionResponse]) = {
         assert(responseStatus.values.head.errorCode == Errors.INVALID_REQUIRED_ACKS.code)
       }
-      rm.appendMessages(
+      rm.appendRecords(
         timeout = 0,
         requiredAcks = 3,
         internalTopicsAllowed = false,
-        messagesPerPartition = Map(new TopicPartition("test1", 0) -> new ByteBufferMessageSet(new
Message("first message".getBytes))),
+        entriesPerPartition = Map(new TopicPartition("test1", 0) -> MemoryRecords.withRecords(Record.create("first
message".getBytes()))),
         responseCallback = callback)
     } finally {
       rm.shutdown(checkpointHW = false)
@@ -135,7 +133,7 @@ class ReplicaManagerTest {
       }
 
       var fetchCallbackFired = false
-      def fetchCallback(responseStatus: Seq[(TopicAndPartition, FetchResponsePartitionData)])
= {
+      def fetchCallback(responseStatus: Seq[(TopicAndPartition, FetchPartitionData)]) = {
         assertEquals("Should give NotLeaderForPartitionException", Errors.NOT_LEADER_FOR_PARTITION.code,
responseStatus.map(_._2).head.error)
         fetchCallbackFired = true
       }
@@ -158,11 +156,11 @@ class ReplicaManagerTest {
       rm.getLeaderReplicaIfLocal(topic, 0)
 
       // Append a message.
-      rm.appendMessages(
+      rm.appendRecords(
         timeout = 1000,
         requiredAcks = -1,
         internalTopicsAllowed = false,
-        messagesPerPartition = Map(new TopicPartition(topic, 0) -> new ByteBufferMessageSet(new
Message("first message".getBytes))),
+        entriesPerPartition = Map(new TopicPartition(topic, 0) -> MemoryRecords.withRecords(Record.create("first
message".getBytes()))),
         responseCallback = produceCallback)
 
       // Fetch some messages
@@ -220,19 +218,19 @@ class ReplicaManagerTest {
       
       // Append a couple of messages.
       for(i <- 1 to 2)
-        rm.appendMessages(
+        rm.appendRecords(
           timeout = 1000,
           requiredAcks = -1,
           internalTopicsAllowed = false,
-          messagesPerPartition = Map(new TopicPartition(topic, 0) -> new ByteBufferMessageSet(new
Message("message %d".format(i).getBytes))),
+          entriesPerPartition = Map(new TopicPartition(topic, 0) -> MemoryRecords.withRecords(Record.create("message
%d".format(i).getBytes))),
           responseCallback = produceCallback)
       
       var fetchCallbackFired = false
       var fetchError = 0
-      var fetchedMessages: MessageSet = null
-      def fetchCallback(responseStatus: Seq[(TopicAndPartition, FetchResponsePartitionData)])
= {
+      var fetchedRecords: Records = null
+      def fetchCallback(responseStatus: Seq[(TopicAndPartition, FetchPartitionData)]) = {
         fetchError = responseStatus.map(_._2).head.error
-        fetchedMessages = responseStatus.map(_._2).head.messages
+        fetchedRecords = responseStatus.map(_._2).head.records
         fetchCallbackFired = true
       }
       
@@ -249,7 +247,7 @@ class ReplicaManagerTest {
       
       assertTrue(fetchCallbackFired)
       assertEquals("Should not give an exception", Errors.NONE.code, fetchError)
-      assertTrue("Should return some data", fetchedMessages.iterator.hasNext)
+      assertTrue("Should return some data", fetchedRecords.shallowIterator.hasNext)
       fetchCallbackFired = false
       
       // Fetch a message above the high watermark as a consumer
@@ -264,7 +262,7 @@ class ReplicaManagerTest {
           
         assertTrue(fetchCallbackFired)
         assertEquals("Should not give an exception", Errors.NONE.code, fetchError)
-        assertEquals("Should return empty response", MessageSet.Empty, fetchedMessages)
+        assertEquals("Should return empty response", MemoryRecords.EMPTY, fetchedRecords)
     } finally {
       rm.shutdown(checkpointHW = false)
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
index b1ebeee..2f73a94 100644
--- a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
+++ b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
@@ -21,7 +21,6 @@ import kafka.utils._
 import kafka.cluster.Replica
 import kafka.common.TopicAndPartition
 import kafka.log.Log
-import kafka.message.{ByteBufferMessageSet, Message, MessageSet}
 import kafka.server.QuotaFactory.UnboundedQuota
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.requests.FetchRequest.PartitionData
@@ -30,8 +29,10 @@ import java.util.Properties
 import java.util.concurrent.atomic.AtomicBoolean
 
 import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.record.{MemoryRecords, Record}
 import org.easymock.EasyMock
 import org.junit.Assert._
+import scala.collection.JavaConverters._
 
 class SimpleFetchTest {
 
@@ -53,8 +54,8 @@ class SimpleFetchTest {
   val partitionHW = 5
 
   val fetchSize = 100
-  val messagesToHW = new Message("messageToHW".getBytes())
-  val messagesToLEO = new Message("messageToLEO".getBytes())
+  val messagesToHW = Record.create("messageToHW".getBytes())
+  val messagesToLEO = Record.create("messageToLEO".getBytes())
 
   val topic = "test-topic"
   val partitionId = 0
@@ -79,14 +80,14 @@ class SimpleFetchTest {
     EasyMock.expect(log.logEndOffset).andReturn(leaderLEO).anyTimes()
     EasyMock.expect(log.logEndOffsetMetadata).andReturn(new LogOffsetMetadata(leaderLEO)).anyTimes()
     EasyMock.expect(log.read(0, fetchSize, Some(partitionHW), true)).andReturn(
-      new FetchDataInfo(
+      FetchDataInfo(
         new LogOffsetMetadata(0L, 0L, 0),
-        new ByteBufferMessageSet(messagesToHW)
+        MemoryRecords.withRecords(messagesToHW)
       )).anyTimes()
     EasyMock.expect(log.read(0, fetchSize, None, true)).andReturn(
-      new FetchDataInfo(
+      FetchDataInfo(
         new LogOffsetMetadata(0L, 0L, 0),
-        new ByteBufferMessageSet(messagesToLEO)
+        MemoryRecords.withRecords(messagesToLEO)
       )).anyTimes()
     EasyMock.replay(log)
 
@@ -110,7 +111,7 @@ class SimpleFetchTest {
     // create the follower replica with defined log end offset
     val followerReplica= new Replica(configs(1).brokerId, partition, time)
     val leo = new LogOffsetMetadata(followerLEO, 0L, followerLEO.toInt)
-    followerReplica.updateLogReadResult(new LogReadResult(FetchDataInfo(leo, MessageSet.Empty),
-1L, -1, true))
+    followerReplica.updateLogReadResult(new LogReadResult(FetchDataInfo(leo, MemoryRecords.EMPTY),
-1L, -1, true))
 
     // add both of them to ISR
     val allReplicas = List(leaderReplica, followerReplica)
@@ -153,7 +154,7 @@ class SimpleFetchTest {
         fetchMaxBytes = Int.MaxValue,
         hardMaxBytesLimit = false,
         readPartitionInfo = fetchInfo,
-        quota = UnboundedQuota).find(_._1 == topicAndPartition).get._2.info.messageSet.head.message)
+        quota = UnboundedQuota).find(_._1 == topicAndPartition).get._2.info.records.shallowIterator.next().record)
     assertEquals("Reading any data can return messages up to the end of the log", messagesToLEO,
       replicaManager.readFromLocalLog(
         replicaId = Request.OrdinaryConsumerId,
@@ -162,7 +163,7 @@ class SimpleFetchTest {
         fetchMaxBytes = Int.MaxValue,
         hardMaxBytesLimit = false,
         readPartitionInfo = fetchInfo,
-        quota = UnboundedQuota).find(_._1 == topicAndPartition).get._2.info.messageSet.head.message)
+        quota = UnboundedQuota).find(_._1 == topicAndPartition).get._2.info.records.shallowIterator().next().record)
 
     assertEquals("Counts should increment after fetch", initialTopicCount+2, BrokerTopicStats.getBrokerTopicStats(topic).totalFetchRequestRate.count())
     assertEquals("Counts should increment after fetch", initialAllTopicsCount+2, BrokerTopicStats.getBrokerAllTopicsStats().totalFetchRequestRate.count())

http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/core/src/test/scala/unit/kafka/utils/TestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 33ab58c..ede145a 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -21,7 +21,7 @@ import java.io._
 import java.nio._
 import java.nio.channels._
 import java.util.concurrent.{Callable, Executors, TimeUnit}
-import java.util.{Properties, Random}
+import java.util.Properties
 import java.security.cert.X509Certificate
 import javax.net.ssl.X509TrustManager
 import charset.Charset
@@ -48,6 +48,7 @@ import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig,
Produce
 import org.apache.kafka.clients.consumer.{KafkaConsumer, RangeAssignor}
 import org.apache.kafka.clients.CommonClientConfigs
 import org.apache.kafka.common.network.Mode
+import org.apache.kafka.common.record._
 import org.apache.kafka.common.serialization.{ByteArraySerializer, Serializer}
 import org.apache.kafka.common.utils.Time
 import org.apache.kafka.test.{TestUtils => JTestUtils}
@@ -269,16 +270,16 @@ object TestUtils extends Logging {
   }
 
   /**
-   * Wrap the message in a message set
-   *
-   * @param payload The bytes of the message
+   * Wrap a single record log buffer.
    */
-  def singleMessageSet(payload: Array[Byte],
-                       codec: CompressionCodec = NoCompressionCodec,
+  def singletonRecords(value: Array[Byte],
                        key: Array[Byte] = null,
-                       timestamp: Long = Message.NoTimestamp,
-                       magicValue: Byte = Message.CurrentMagicValue) =
-    new ByteBufferMessageSet(compressionCodec = codec, messages = new Message(payload, key,
timestamp, magicValue))
+                       codec: CompressionType = CompressionType.NONE,
+                       timestamp: Long = Record.NO_TIMESTAMP,
+                       magicValue: Byte = Record.CURRENT_MAGIC_VALUE) = {
+    val record = Record.create(magicValue, timestamp, key, value)
+    MemoryRecords.withRecords(codec, record)
+  }
 
   /**
    * Generate an array of random bytes


Mime
View raw message