kafka-commits mailing list archives

From: j...@apache.org
Subject: [1/3] kafka git commit: MINOR: Replace TopicAndPartition with TopicPartition in `Log` and `ReplicaManager`
Date: Wed, 21 Dec 2016 00:41:31 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 0f86dbe89 -> 68f204e01
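
The change applied throughout the test diffs below is mechanical: call sites that built the Scala case class kafka.common.TopicAndPartition now construct org.apache.kafka.common.TopicPartition, the Java class shared with the clients module. A minimal before/after sketch of the call-site shape (illustrative only, not part of the commit):

import org.apache.kafka.common.TopicPartition

// Old: Scala case class, no `new`, fields `topic` and `partition`
//   val tp = TopicAndPartition("test", 0)
// New: Java class from the clients module, constructed with `new`
val tp = new TopicPartition("test", 0)
// Accessors are methods rather than case-class fields, but Scala call sites read the same
println(s"${tp.topic}-${tp.partition}")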


http://git-wip-us.apache.org/repos/asf/kafka/blob/68f204e0/core/src/test/scala/unit/kafka/server/BaseReplicaFetchTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/BaseReplicaFetchTest.scala b/core/src/test/scala/unit/kafka/server/BaseReplicaFetchTest.scala
index 591fcf7..a8f9fc6 100644
--- a/core/src/test/scala/unit/kafka/server/BaseReplicaFetchTest.scala
+++ b/core/src/test/scala/unit/kafka/server/BaseReplicaFetchTest.scala
@@ -26,6 +26,7 @@ import kafka.utils.TestUtils
 import TestUtils._
 import kafka.common._
 import org.apache.kafka.clients.producer.ProducerRecord
+import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.serialization.StringSerializer
 
 abstract class BaseReplicaFetchTest extends ZooKeeperTestHarness  {
@@ -75,10 +76,10 @@ abstract class BaseReplicaFetchTest extends ZooKeeperTestHarness  {
     def logsMatch(): Boolean = {
       var result = true
       for (topic <- List(topic1, topic2)) {
-        val topicAndPart = TopicAndPartition(topic, partition)
-        val expectedOffset = brokers.head.getLogManager().getLog(topicAndPart).get.logEndOffset
+        val tp = new TopicPartition(topic, partition)
+        val expectedOffset = brokers.head.getLogManager().getLog(tp).get.logEndOffset
         result = result && expectedOffset > 0 && brokers.forall { item =>
-          expectedOffset == item.getLogManager().getLog(topicAndPart).get.logEndOffset
+          expectedOffset == item.getLogManager().getLog(tp).get.logEndOffset
         }
       }
       result

http://git-wip-us.apache.org/repos/asf/kafka/blob/68f204e0/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
index 2e50d30..cf0dc6f 100644
--- a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
@@ -28,6 +28,7 @@ import kafka.integration.KafkaServerTestHarness
 import kafka.utils._
 import kafka.common._
 import kafka.admin.{AdminOperationException, AdminUtils}
+import org.apache.kafka.common.TopicPartition
 
 import scala.collection.Map
 
@@ -40,7 +41,7 @@ class DynamicConfigChangeTest extends KafkaServerTestHarness {
       this.servers.head.dynamicConfigHandlers.contains(ConfigType.Topic))
     val oldVal: java.lang.Long = 100000L
     val newVal: java.lang.Long = 200000L
-    val tp = TopicAndPartition("test", 0)
+    val tp = new TopicPartition("test", 0)
     val logProps = new Properties()
     logProps.put(FlushMessagesProp, oldVal.toString)
     AdminUtils.createTopic(zkUtils, tp.topic, 1, 1, logProps)

http://git-wip-us.apache.org/repos/asf/kafka/blob/68f204e0/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
index 358b2a4..948b5ec 100755
--- a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
+++ b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
@@ -18,6 +18,7 @@ package kafka.server
 
 import kafka.log._
 import java.io.File
+
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.utils.Utils
 import org.easymock.EasyMock
@@ -28,6 +29,8 @@ import kafka.cluster.Replica
 import kafka.utils.{KafkaScheduler, MockTime, TestUtils, ZkUtils}
 import java.util.concurrent.atomic.AtomicBoolean
 
+import org.apache.kafka.common.TopicPartition
+
 class HighwatermarkPersistenceTest {
 
   val configs = TestUtils.createBrokerConfigs(2, TestUtils.MockZkConnect).map(KafkaConfig.fromProps)
@@ -63,9 +66,9 @@ class HighwatermarkPersistenceTest {
       replicaManager.checkpointHighWatermarks()
       var fooPartition0Hw = hwmFor(replicaManager, topic, 0)
       assertEquals(0L, fooPartition0Hw)
-      val partition0 = replicaManager.getOrCreatePartition(topic, 0)
+      val partition0 = replicaManager.getOrCreatePartition(new TopicPartition(topic, 0))
       // create leader and follower replicas
-      val log0 = logManagers.head.createLog(TopicAndPartition(topic, 0), LogConfig())
+      val log0 = logManagers.head.createLog(new TopicPartition(topic, 0), LogConfig())
       val leaderReplicaPartition0 = new Replica(configs.head.brokerId, partition0, time, 0, Some(log0))
       partition0.addReplicaIfNotExists(leaderReplicaPartition0)
       val followerReplicaPartition0 = new Replica(configs.last.brokerId, partition0, time)
@@ -107,9 +110,9 @@ class HighwatermarkPersistenceTest {
       replicaManager.checkpointHighWatermarks()
       var topic1Partition0Hw = hwmFor(replicaManager, topic1, 0)
       assertEquals(0L, topic1Partition0Hw)
-      val topic1Partition0 = replicaManager.getOrCreatePartition(topic1, 0)
+      val topic1Partition0 = replicaManager.getOrCreatePartition(new TopicPartition(topic1, 0))
       // create leader log
-      val topic1Log0 = logManagers.head.createLog(TopicAndPartition(topic1, 0), LogConfig())
+      val topic1Log0 = logManagers.head.createLog(new TopicPartition(topic1, 0), LogConfig())
       // create a local replica for topic1
       val leaderReplicaTopic1Partition0 = new Replica(configs.head.brokerId, topic1Partition0, time, 0, Some(topic1Log0))
       topic1Partition0.addReplicaIfNotExists(leaderReplicaTopic1Partition0)
@@ -123,9 +126,9 @@ class HighwatermarkPersistenceTest {
       assertEquals(5L, leaderReplicaTopic1Partition0.highWatermark.messageOffset)
       assertEquals(5L, topic1Partition0Hw)
       // add another partition and set highwatermark
-      val topic2Partition0 = replicaManager.getOrCreatePartition(topic2, 0)
+      val topic2Partition0 = replicaManager.getOrCreatePartition(new TopicPartition(topic2, 0))
       // create leader log
-      val topic2Log0 = logManagers.head.createLog(TopicAndPartition(topic2, 0), LogConfig())
+      val topic2Log0 = logManagers.head.createLog(new TopicPartition(topic2, 0), LogConfig())
       // create a local replica for topic2
       val leaderReplicaTopic2Partition0 =  new Replica(configs.head.brokerId, topic2Partition0, time, 0, Some(topic2Log0))
       topic2Partition0.addReplicaIfNotExists(leaderReplicaTopic2Partition0)
@@ -155,7 +158,8 @@ class HighwatermarkPersistenceTest {
   }
 
   def hwmFor(replicaManager: ReplicaManager, topic: String, partition: Int): Long = {
-    replicaManager.highWatermarkCheckpoints(new File(replicaManager.config.logDirs.head).getAbsolutePath).read.getOrElse(TopicAndPartition(topic, partition), 0L)
+    replicaManager.highWatermarkCheckpoints(new File(replicaManager.config.logDirs.head).getAbsolutePath).read.getOrElse(
+      new TopicPartition(topic, partition), 0L)
   }
   
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/68f204e0/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala b/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
index aad37d1..03203ad 100644
--- a/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
@@ -22,6 +22,7 @@ import java.util.concurrent.atomic.AtomicBoolean
 import kafka.cluster.{Partition, Replica}
 import kafka.log.Log
 import kafka.utils._
+import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.record.MemoryRecords
 import org.apache.kafka.common.utils.Time
@@ -158,7 +159,7 @@ class IsrExpirationTest {
   private def getPartitionWithAllReplicasInIsr(topic: String, partitionId: Int, time: Time, config: KafkaConfig,
                                                localLog: Log): Partition = {
     val leaderId = config.brokerId
-    val partition = replicaManager.getOrCreatePartition(topic, partitionId)
+    val partition = replicaManager.getOrCreatePartition(new TopicPartition(topic, partitionId))
     val leaderReplica = new Replica(leaderId, partition, time, 0, Some(localLog))
 
     val allReplicas = getFollowerReplicas(partition, leaderId, time) :+ leaderReplica

http://git-wip-us.apache.org/repos/asf/kafka/blob/68f204e0/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
index b577e7d..135e04b 100755
--- a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
@@ -85,9 +85,9 @@ class LogOffsetTest extends ZooKeeperTestHarness {
     AdminUtils.createTopic(zkUtils, topic, 1, 1)
 
     val logManager = server.getLogManager
-    waitUntilTrue(() => logManager.getLog(TopicAndPartition(topic, part)).isDefined,
+    waitUntilTrue(() => logManager.getLog(new TopicPartition(topic, part)).isDefined,
                   "Log for partition [topic,0] should be created")
-    val log = logManager.getLog(TopicAndPartition(topic, part)).get
+    val log = logManager.getLog(new TopicPartition(topic, part)).get
 
     val record = Record.create(Integer.toString(42).getBytes())
     for (_ <- 0 until 20)
@@ -115,9 +115,9 @@ class LogOffsetTest extends ZooKeeperTestHarness {
   @Test
   def testEmptyLogsGetOffsets() {
     val topicPartition = "kafka-" + random.nextInt(10)
-    val topicPartitionPath = getLogDir.getAbsolutePath + "/" + topicPartition
+    val topicPartitionPath = TestUtils.tempDir().getAbsolutePath + "/" + topicPartition
     topicLogDir = new File(topicPartitionPath)
-    topicLogDir.mkdir
+    topicLogDir.mkdir()
 
     val topic = topicPartition.split("-").head
 
@@ -149,7 +149,7 @@ class LogOffsetTest extends ZooKeeperTestHarness {
     AdminUtils.createTopic(zkUtils, topic, 3, 1)
 
     val logManager = server.getLogManager
-    val log = logManager.createLog(TopicAndPartition(topic, part), logManager.defaultConfig)
+    val log = logManager.createLog(new TopicPartition(topic, part), logManager.defaultConfig)
     val record = Record.create(Integer.toString(42).getBytes())
     for (_ <- 0 until 20)
       log.append(MemoryRecords.withRecords(record))
@@ -178,7 +178,7 @@ class LogOffsetTest extends ZooKeeperTestHarness {
     AdminUtils.createTopic(zkUtils, topic, 3, 1)
 
     val logManager = server.getLogManager
-    val log = logManager.createLog(TopicAndPartition(topic, part), logManager.defaultConfig)
+    val log = logManager.createLog(new TopicPartition(topic, part), logManager.defaultConfig)
     val record = Record.create(Integer.toString(42).getBytes())
     for (_ <- 0 until 20)
       log.append(MemoryRecords.withRecords(record))
@@ -237,7 +237,7 @@ class LogOffsetTest extends ZooKeeperTestHarness {
     val props = new Properties
     props.put("broker.id", nodeId.toString)
     props.put("port", TestUtils.RandomPort.toString())
-    props.put("log.dir", getLogDir.getAbsolutePath)
+    props.put("log.dir", TestUtils.tempDir().getAbsolutePath)
     props.put("log.flush.interval.messages", "1")
     props.put("enable.zookeeper", "false")
     props.put("num.partitions", "20")
@@ -248,9 +248,4 @@ class LogOffsetTest extends ZooKeeperTestHarness {
     props
   }
 
-  private def getLogDir(): File = {
-    val dir = TestUtils.tempDir()
-    dir
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/68f204e0/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
index 40ad0f3..40ac7ec 100755
--- a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
@@ -18,13 +18,13 @@ package kafka.server
 
 import java.util.Properties
 
-import kafka.utils.TestUtils._
-import kafka.utils.{CoreUtils, TestUtils}
+import kafka.utils.TestUtils
+import TestUtils._
 import kafka.zk.ZooKeeperTestHarness
-import kafka.common._
 import java.io.File
 
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
+import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.serialization.{IntegerSerializer, StringSerializer}
 import org.apache.kafka.common.utils.Utils
 import org.junit.{After, Before, Test}
@@ -45,6 +45,7 @@ class LogRecoveryTest extends ZooKeeperTestHarness {
   var configs: Seq[KafkaConfig] = null
   val topic = "new-topic"
   val partitionId = 0
+  val topicPartition = new TopicPartition(topic, partitionId)
 
   var server1: KafkaServer = null
   var server2: KafkaServer = null
@@ -107,13 +108,13 @@ class LogRecoveryTest extends ZooKeeperTestHarness {
 
     // give some time for the follower 1 to record leader HW
     TestUtils.waitUntilTrue(() =>
-      server2.replicaManager.getReplica(topic, 0).get.highWatermark.messageOffset == numMessages,
+      server2.replicaManager.getReplica(topicPartition).get.highWatermark.messageOffset == numMessages,
       "Failed to update high watermark for follower after timeout")
 
     servers.foreach(_.replicaManager.checkpointHighWatermarks())
-    val leaderHW = hwFile1.read.getOrElse(TopicAndPartition(topic, 0), 0L)
+    val leaderHW = hwFile1.read.getOrElse(topicPartition, 0L)
     assertEquals(numMessages, leaderHW)
-    val followerHW = hwFile2.read.getOrElse(TopicAndPartition(topic, 0), 0L)
+    val followerHW = hwFile2.read.getOrElse(topicPartition, 0L)
     assertEquals(numMessages, followerHW)
   }
 
@@ -121,7 +122,7 @@ class LogRecoveryTest extends ZooKeeperTestHarness {
   def testHWCheckpointWithFailuresSingleLogSegment {
     var leader = waitUntilLeaderIsElectedOrChanged(zkUtils, topic, partitionId)
 
-    assertEquals(0L, hwFile1.read.getOrElse(TopicAndPartition(topic, 0), 0L))
+    assertEquals(0L, hwFile1.read.getOrElse(topicPartition, 0L))
 
     sendMessages(1)
     Thread.sleep(1000)
@@ -129,7 +130,7 @@ class LogRecoveryTest extends ZooKeeperTestHarness {
 
     // kill the server hosting the preferred replica
     server1.shutdown()
-    assertEquals(hw, hwFile1.read.getOrElse(TopicAndPartition(topic, 0), 0L))
+    assertEquals(hw, hwFile1.read.getOrElse(topicPartition, 0L))
 
     // check if leader moves to the other server
     leader = waitUntilLeaderIsElectedOrChanged(zkUtils, topic, partitionId, oldLeaderOpt = leader)
@@ -144,10 +145,10 @@ class LogRecoveryTest extends ZooKeeperTestHarness {
     assertTrue("Leader must remain on broker 1, in case of zookeeper session expiration it
can move to broker 0",
       leader.isDefined && (leader.get == 0 || leader.get == 1))
 
-    assertEquals(hw, hwFile1.read.getOrElse(TopicAndPartition(topic, 0), 0L))
+    assertEquals(hw, hwFile1.read.getOrElse(topicPartition, 0L))
     // since server 2 was never shut down, the hw value of 30 is probably not checkpointed to disk yet
     server2.shutdown()
-    assertEquals(hw, hwFile2.read.getOrElse(TopicAndPartition(topic, 0), 0L))
+    assertEquals(hw, hwFile2.read.getOrElse(topicPartition, 0L))
 
     server2.startup()
     updateProducer()
@@ -160,12 +161,12 @@ class LogRecoveryTest extends ZooKeeperTestHarness {
 
     // give some time for follower 1 to record leader HW of 60
     TestUtils.waitUntilTrue(() =>
-      server2.replicaManager.getReplica(topic, 0).get.highWatermark.messageOffset == hw,
+      server2.replicaManager.getReplica(topicPartition).get.highWatermark.messageOffset == hw,
       "Failed to update high watermark for follower after timeout")
     // shutdown the servers to allow the hw to be checkpointed
     servers.foreach(_.shutdown())
-    assertEquals(hw, hwFile1.read.getOrElse(TopicAndPartition(topic, 0), 0L))
-    assertEquals(hw, hwFile2.read.getOrElse(TopicAndPartition(topic, 0), 0L))
+    assertEquals(hw, hwFile1.read.getOrElse(topicPartition, 0L))
+    assertEquals(hw, hwFile2.read.getOrElse(topicPartition, 0L))
   }
 
   @Test
@@ -174,13 +175,13 @@ class LogRecoveryTest extends ZooKeeperTestHarness {
     val hw = 20L
     // give some time for follower 1 to record leader HW of 600
     TestUtils.waitUntilTrue(() =>
-      server2.replicaManager.getReplica(topic, 0).get.highWatermark.messageOffset == hw,
+      server2.replicaManager.getReplica(topicPartition).get.highWatermark.messageOffset == hw,
       "Failed to update high watermark for follower after timeout")
     // shutdown the servers to allow the hw to be checkpointed
     servers.foreach(_.shutdown())
-    val leaderHW = hwFile1.read.getOrElse(TopicAndPartition(topic, 0), 0L)
+    val leaderHW = hwFile1.read.getOrElse(topicPartition, 0L)
     assertEquals(hw, leaderHW)
-    val followerHW = hwFile2.read.getOrElse(TopicAndPartition(topic, 0), 0L)
+    val followerHW = hwFile2.read.getOrElse(topicPartition, 0L)
     assertEquals(hw, followerHW)
   }
 
@@ -193,13 +194,13 @@ class LogRecoveryTest extends ZooKeeperTestHarness {
 
     // allow some time for the follower to get the leader HW
     TestUtils.waitUntilTrue(() =>
-      server2.replicaManager.getReplica(topic, 0).get.highWatermark.messageOffset == hw,
+      server2.replicaManager.getReplica(topicPartition).get.highWatermark.messageOffset == hw,
       "Failed to update high watermark for follower after timeout")
     // kill the server hosting the preferred replica
     server1.shutdown()
     server2.shutdown()
-    assertEquals(hw, hwFile1.read.getOrElse(TopicAndPartition(topic, 0), 0L))
-    assertEquals(hw, hwFile2.read.getOrElse(TopicAndPartition(topic, 0), 0L))
+    assertEquals(hw, hwFile1.read.getOrElse(topicPartition, 0L))
+    assertEquals(hw, hwFile2.read.getOrElse(topicPartition, 0L))
 
     server2.startup()
     updateProducer()
@@ -207,29 +208,29 @@ class LogRecoveryTest extends ZooKeeperTestHarness {
     leader = waitUntilLeaderIsElectedOrChanged(zkUtils, topic, partitionId, oldLeaderOpt = leader)
     assertEquals("Leader must move to broker 1", 1, leader.getOrElse(-1))
 
-    assertEquals(hw, hwFile1.read.getOrElse(TopicAndPartition(topic, 0), 0L))
+    assertEquals(hw, hwFile1.read.getOrElse(topicPartition, 0L))
 
     // bring the preferred replica back
     server1.startup()
     updateProducer()
 
-    assertEquals(hw, hwFile1.read.getOrElse(TopicAndPartition(topic, 0), 0L))
-    assertEquals(hw, hwFile2.read.getOrElse(TopicAndPartition(topic, 0), 0L))
+    assertEquals(hw, hwFile1.read.getOrElse(topicPartition, 0L))
+    assertEquals(hw, hwFile2.read.getOrElse(topicPartition, 0L))
 
     sendMessages(2)
     hw += 2
 
     // allow some time for the follower to create replica
-    TestUtils.waitUntilTrue(() => server1.replicaManager.getReplica(topic, 0).nonEmpty,
+    TestUtils.waitUntilTrue(() => server1.replicaManager.getReplica(topicPartition).nonEmpty,
       "Failed to create replica in follower after timeout")
     // allow some time for the follower to get the leader HW
     TestUtils.waitUntilTrue(() =>
-      server1.replicaManager.getReplica(topic, 0).get.highWatermark.messageOffset == hw,
+      server1.replicaManager.getReplica(topicPartition).get.highWatermark.messageOffset == hw,
       "Failed to update high watermark for follower after timeout")
     // shutdown the servers to allow the hw to be checkpointed
     servers.foreach(_.shutdown())
-    assertEquals(hw, hwFile1.read.getOrElse(TopicAndPartition(topic, 0), 0L))
-    assertEquals(hw, hwFile2.read.getOrElse(TopicAndPartition(topic, 0), 0L))
+    assertEquals(hw, hwFile1.read.getOrElse(topicPartition, 0L))
+    assertEquals(hw, hwFile2.read.getOrElse(topicPartition, 0L))
   }
 
   private def sendMessages(n: Int = 1) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/68f204e0/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
index 50c55b8..5b89bac 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
@@ -20,7 +20,6 @@ import java.util.Properties
 import java.util.concurrent.atomic.AtomicBoolean
 
 import kafka.cluster.Replica
-import kafka.common.TopicAndPartition
 import kafka.log.Log
 import kafka.utils._
 import org.apache.kafka.common.TopicPartition
@@ -39,10 +38,9 @@ class ReplicaManagerQuotasTest {
   val time = new MockTime
   val metrics = new Metrics
   val record = Record.create("some-data-in-a-message".getBytes())
-  val topicAndPartition1 = TopicAndPartition("test-topic", 1)
-  val topicAndPartition2 = TopicAndPartition("test-topic", 2)
-  val fetchInfo = Seq(new TopicPartition(topicAndPartition1.topic, topicAndPartition1.partition) -> new PartitionData(0, 100),
-    new TopicPartition(topicAndPartition2.topic, topicAndPartition2.partition) -> new PartitionData(0, 100))
+  val topicPartition1 = new TopicPartition("test-topic", 1)
+  val topicPartition2 = new TopicPartition("test-topic", 2)
+  val fetchInfo = Seq(topicPartition1 -> new PartitionData(0, 100), topicPartition2 -> new PartitionData(0, 100))
   var replicaManager: ReplicaManager = null
 
   @Test
@@ -64,10 +62,10 @@ class ReplicaManagerQuotasTest {
       readPartitionInfo = fetchInfo,
       quota = quota)
     assertEquals("Given two partitions, with only one throttled, we should get the first",
1,
-      fetch.find(_._1 == topicAndPartition1).get._2.info.records.shallowEntries.asScala.size)
+      fetch.find(_._1 == topicPartition1).get._2.info.records.shallowEntries.asScala.size)
 
     assertEquals("But we shouldn't get the second", 0,
-      fetch.find(_._1 == topicAndPartition2).get._2.info.records.shallowEntries.asScala.size)
+      fetch.find(_._1 == topicPartition2).get._2.info.records.shallowEntries.asScala.size)
   }
 
   @Test
@@ -89,9 +87,9 @@ class ReplicaManagerQuotasTest {
       readPartitionInfo = fetchInfo,
       quota = quota)
     assertEquals("Given two partitions, with both throttled, we should get no messages",
0,
-      fetch.find(_._1 == topicAndPartition1).get._2.info.records.shallowEntries.asScala.size)
+      fetch.find(_._1 == topicPartition1).get._2.info.records.shallowEntries.asScala.size)
     assertEquals("Given two partitions, with both throttled, we should get no messages",
0,
-      fetch.find(_._1 == topicAndPartition2).get._2.info.records.shallowEntries.asScala.size)
+      fetch.find(_._1 == topicPartition2).get._2.info.records.shallowEntries.asScala.size)
   }
 
   @Test
@@ -113,9 +111,9 @@ class ReplicaManagerQuotasTest {
       readPartitionInfo = fetchInfo,
       quota = quota)
     assertEquals("Given two partitions, with both non-throttled, we should get both messages",
1,
-      fetch.find(_._1 == topicAndPartition1).get._2.info.records.shallowEntries.asScala.size)
+      fetch.find(_._1 == topicPartition1).get._2.info.records.shallowEntries.asScala.size)
     assertEquals("Given two partitions, with both non-throttled, we should get both messages",
1,
-      fetch.find(_._1 == topicAndPartition2).get._2.info.records.shallowEntries.asScala.size)
+      fetch.find(_._1 == topicPartition2).get._2.info.records.shallowEntries.asScala.size)
   }
 
   @Test
@@ -137,10 +135,10 @@ class ReplicaManagerQuotasTest {
       readPartitionInfo = fetchInfo,
       quota = quota)
     assertEquals("Given two partitions, with only one throttled, we should get the first",
1,
-      fetch.find(_._1 == topicAndPartition1).get._2.info.records.shallowEntries.asScala.size)
+      fetch.find(_._1 == topicPartition1).get._2.info.records.shallowEntries.asScala.size)
 
     assertEquals("But we should get the second too since it's throttled but in sync", 1,
-      fetch.find(_._1 == topicAndPartition2).get._2.info.records.shallowEntries.asScala.size)
+      fetch.find(_._1 == topicPartition2).get._2.info.records.shallowEntries.asScala.size)
   }
 
   def setUpMocks(fetchInfo: Seq[(TopicPartition, PartitionData)], record: Record = this.record, bothReplicasInSync: Boolean = false) {
@@ -179,7 +177,7 @@ class ReplicaManagerQuotasTest {
 
     //create the two replicas
     for ((p, _) <- fetchInfo) {
-      val partition = replicaManager.getOrCreatePartition(p.topic, p.partition)
+      val partition = replicaManager.getOrCreatePartition(p)
       val leaderReplica = new Replica(configs.head.brokerId, partition, time, 0, Some(log))
       leaderReplica.highWatermark = new LogOffsetMetadata(5)
       partition.leaderReplicaIdOpt = Some(leaderReplica.brokerId)

http://git-wip-us.apache.org/repos/asf/kafka/blob/68f204e0/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 50a4cd6..b710a4f 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -65,7 +65,7 @@ class ReplicaManagerTest {
     val rm = new ReplicaManager(config, metrics, time, zkUtils, new MockScheduler(time), mockLogMgr,
       new AtomicBoolean(false), QuotaFactory.instantiate(config, metrics, time).follower)
     try {
-      val partition = rm.getOrCreatePartition(topic, 1)
+      val partition = rm.getOrCreatePartition(new TopicPartition(topic, 1))
       partition.getOrCreateReplica(1)
       rm.checkpointHighWatermarks()
     } finally {
@@ -83,7 +83,7 @@ class ReplicaManagerTest {
     val rm = new ReplicaManager(config, metrics, time, zkUtils, new MockScheduler(time), mockLogMgr,
       new AtomicBoolean(false), QuotaFactory.instantiate(config, metrics, time).follower)
     try {
-      val partition = rm.getOrCreatePartition(topic, 1)
+      val partition = rm.getOrCreatePartition(new TopicPartition(topic, 1))
       partition.getOrCreateReplica(1)
       rm.checkpointHighWatermarks()
     } finally {
@@ -107,7 +107,7 @@ class ReplicaManagerTest {
         timeout = 0,
         requiredAcks = 3,
         internalTopicsAllowed = false,
-        entriesPerPartition = Map(new TopicPartition("test1", 0) -> MemoryRecords.withRecords(Record.create("first message".getBytes()))),
+        entriesPerPartition = Map(new TopicPartition("test1", 0) -> MemoryRecords.withRecords(Record.create("first message".getBytes))),
         responseCallback = callback)
     } finally {
       rm.shutdown(checkpointHW = false)
@@ -133,7 +133,7 @@ class ReplicaManagerTest {
       }
 
       var fetchCallbackFired = false
-      def fetchCallback(responseStatus: Seq[(TopicAndPartition, FetchPartitionData)]) = {
+      def fetchCallback(responseStatus: Seq[(TopicPartition, FetchPartitionData)]) = {
         assertEquals("Should give NotLeaderForPartitionException", Errors.NOT_LEADER_FOR_PARTITION.code,
responseStatus.map(_._2).head.error)
         fetchCallbackFired = true
       }
@@ -146,14 +146,14 @@ class ReplicaManagerTest {
       val brokerList: java.util.List[Integer] = Seq[Integer](0, 1).asJava
       val brokerSet: java.util.Set[Integer] = Set[Integer](0, 1).asJava
 
-      val partition = rm.getOrCreatePartition(topic, 0)
+      val partition = rm.getOrCreatePartition(new TopicPartition(topic, 0))
       partition.getOrCreateReplica(0)
       // Make this replica the leader.
       val leaderAndIsrRequest1 = new LeaderAndIsrRequest(0, 0,
         collection.immutable.Map(new TopicPartition(topic, 0) -> new PartitionState(0, 0, 0, brokerList, 0, brokerSet)).asJava,
         Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava)
       rm.becomeLeaderOrFollower(0, leaderAndIsrRequest1, metadataCache, (_, _) => {})
-      rm.getLeaderReplicaIfLocal(topic, 0)
+      rm.getLeaderReplicaIfLocal(new TopicPartition(topic, 0))
 
       // Append a message.
       rm.appendRecords(
@@ -204,7 +204,7 @@ class ReplicaManagerTest {
       val brokerList: java.util.List[Integer] = Seq[Integer](0, 1, 2).asJava
       val brokerSet: java.util.Set[Integer] = Set[Integer](0, 1, 2).asJava
       
-      val partition = rm.getOrCreatePartition(topic, 0)
+      val partition = rm.getOrCreatePartition(new TopicPartition(topic, 0))
       partition.getOrCreateReplica(0)
       
       // Make this replica the leader.
@@ -212,7 +212,7 @@ class ReplicaManagerTest {
         collection.immutable.Map(new TopicPartition(topic, 0) -> new PartitionState(0, 0, 0, brokerList, 0, brokerSet)).asJava,
         Set(new Node(0, "host1", 0), new Node(1, "host2", 1), new Node(2, "host2", 2)).asJava)
       rm.becomeLeaderOrFollower(0, leaderAndIsrRequest1, metadataCache, (_, _) => {})
-      rm.getLeaderReplicaIfLocal(topic, 0)
+      rm.getLeaderReplicaIfLocal(new TopicPartition(topic, 0))
 
       def produceCallback(responseStatus: Map[TopicPartition, PartitionResponse]) = {}
       
@@ -228,7 +228,7 @@ class ReplicaManagerTest {
       var fetchCallbackFired = false
       var fetchError = 0
       var fetchedRecords: Records = null
-      def fetchCallback(responseStatus: Seq[(TopicAndPartition, FetchPartitionData)]) = {
+      def fetchCallback(responseStatus: Seq[(TopicPartition, FetchPartitionData)]) = {
         fetchError = responseStatus.map(_._2).head.error
         fetchedRecords = responseStatus.map(_._2).head.records
         fetchCallbackFired = true

http://git-wip-us.apache.org/repos/asf/kafka/blob/68f204e0/core/src/test/scala/unit/kafka/server/ReplicationQuotaManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ReplicationQuotaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicationQuotaManagerTest.scala
index 3616b7b..54b506d 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicationQuotaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicationQuotaManagerTest.scala
@@ -18,12 +18,13 @@ package kafka.server
 
 import java.util.Collections
 
-import kafka.common.TopicAndPartition
 import kafka.server.QuotaType._
-import org.apache.kafka.common.metrics.{Quota, MetricConfig, Metrics}
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.metrics.{MetricConfig, Metrics, Quota}
 import org.apache.kafka.common.utils.MockTime
-import org.junit.Assert.{assertFalse, assertTrue, assertEquals}
+import org.junit.Assert.{assertEquals, assertFalse, assertTrue}
 import org.junit.Test
+
 import scala.collection.JavaConverters._
 
 class ReplicationQuotaManagerTest {
@@ -110,11 +111,11 @@ class ReplicationQuotaManagerTest {
     quota.markThrottled("MyTopic")
 
     //Then
-    assertTrue(quota.isThrottled(TopicAndPartition("MyTopic", 0)))
-    assertFalse(quota.isThrottled(TopicAndPartition("MyOtherTopic", 0)))
+    assertTrue(quota.isThrottled(new TopicPartition("MyTopic", 0)))
+    assertFalse(quota.isThrottled(new TopicPartition("MyOtherTopic", 0)))
   }
 
-  private def tp1(id: Int): TopicAndPartition = new TopicAndPartition("topic1", id)
+  private def tp1(id: Int): TopicPartition = new TopicPartition("topic1", id)
 
   private def newMetrics(): Metrics = {
     new Metrics(new MetricConfig(), Collections.emptyList(), time)

http://git-wip-us.apache.org/repos/asf/kafka/blob/68f204e0/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala b/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala
index 3a09737..984d340 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala
@@ -21,7 +21,6 @@ import java.util.Properties
 
 import kafka.admin.AdminUtils
 import kafka.admin.AdminUtils._
-import kafka.common._
 import kafka.log.LogConfig._
 import kafka.server.KafkaConfig.fromProps
 import kafka.server.QuotaType._
@@ -29,8 +28,10 @@ import kafka.utils.TestUtils._
 import kafka.utils.CoreUtils._
 import kafka.zk.ZooKeeperTestHarness
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
+import org.apache.kafka.common.TopicPartition
 import org.junit.Assert._
 import org.junit.{After, Before, Test}
+
 import scala.collection.JavaConverters._
 
 /**
@@ -42,7 +43,6 @@ import scala.collection.JavaConverters._
   *
   * Anything over 100MB/s tends to fail as this is the non-throttled replication rate
   */
-
 class ReplicationQuotasTest extends ZooKeeperTestHarness {
   def percentError(percent: Int, value: Long): Long = Math.round(value * percent / 100)
 
@@ -172,7 +172,7 @@ class ReplicationQuotasTest extends ZooKeeperTestHarness {
     assertTrue(s"Expected ${rate} > $rateLowerBound", rate > rateLowerBound)
   }
 
-  def tp(partition: Int): TopicAndPartition = new TopicAndPartition(topic, partition)
+  def tp(partition: Int): TopicPartition = new TopicPartition(topic, partition)
 
   @Test
   def shouldThrottleOldSegments(): Unit = {
@@ -221,7 +221,7 @@ class ReplicationQuotasTest extends ZooKeeperTestHarness {
 
   private def waitForOffsetsToMatch(offset: Int, partitionId: Int, brokerId: Int): Boolean = {
     waitUntilTrue(() => {
-      offset == brokerFor(brokerId).getLogManager.getLog(TopicAndPartition(topic, partitionId))
+      offset == brokerFor(brokerId).getLogManager.getLog(new TopicPartition(topic, partitionId))
         .map(_.logEndOffset).getOrElse(0)
     }, s"Offsets did not match for partition $partitionId on broker $brokerId", 60000)
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/68f204e0/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
index ff72657..676fd3f 100644
--- a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
+++ b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
@@ -59,9 +59,9 @@ class SimpleFetchTest {
 
   val topic = "test-topic"
   val partitionId = 0
-  val topicAndPartition = TopicAndPartition(topic, partitionId)
+  val topicPartition = new TopicPartition(topic, partitionId)
 
-  val fetchInfo = Seq(new TopicPartition(topicAndPartition.topic, topicAndPartition.partition) -> new PartitionData(0, fetchSize))
+  val fetchInfo = Seq(topicPartition -> new PartitionData(0, fetchSize))
 
   var replicaManager: ReplicaManager = null
 
@@ -93,7 +93,7 @@ class SimpleFetchTest {
 
     // create the log manager that is aware of this mock log
     val logManager = EasyMock.createMock(classOf[kafka.log.LogManager])
-    EasyMock.expect(logManager.getLog(topicAndPartition)).andReturn(Some(log)).anyTimes()
+    EasyMock.expect(logManager.getLog(topicPartition)).andReturn(Some(log)).anyTimes()
     EasyMock.replay(logManager)
 
     // create the replica manager
@@ -101,7 +101,7 @@ class SimpleFetchTest {
       new AtomicBoolean(false), QuotaFactory.instantiate(configs.head, metrics, time).follower)
 
     // add the partition with two replicas, both in ISR
-    val partition = replicaManager.getOrCreatePartition(topic, partitionId)
+    val partition = replicaManager.getOrCreatePartition(new TopicPartition(topic, partitionId))
 
     // create the leader replica with the local log
     val leaderReplica = new Replica(configs.head.brokerId, partition, time, 0, Some(log))
@@ -154,7 +154,7 @@ class SimpleFetchTest {
         fetchMaxBytes = Int.MaxValue,
         hardMaxBytesLimit = false,
         readPartitionInfo = fetchInfo,
-        quota = UnboundedQuota).find(_._1 == topicAndPartition).get._2.info.records.shallowEntries.iterator.next().record)
+        quota = UnboundedQuota).find(_._1 == topicPartition).get._2.info.records.shallowEntries.iterator.next().record)
     assertEquals("Reading any data can return messages up to the end of the log", messagesToLEO,
       replicaManager.readFromLocalLog(
         replicaId = Request.OrdinaryConsumerId,
@@ -163,7 +163,7 @@ class SimpleFetchTest {
         fetchMaxBytes = Int.MaxValue,
         hardMaxBytesLimit = false,
         readPartitionInfo = fetchInfo,
-        quota = UnboundedQuota).find(_._1 == topicAndPartition).get._2.info.records.shallowEntries().iterator.next().record)
+        quota = UnboundedQuota).find(_._1 == topicPartition).get._2.info.records.shallowEntries.iterator.next().record)
 
     assertEquals("Counts should increment after fetch", initialTopicCount+2, BrokerTopicStats.getBrokerTopicStats(topic).totalFetchRequestRate.count())
     assertEquals("Counts should increment after fetch", initialAllTopicsCount+2, BrokerTopicStats.getBrokerAllTopicsStats().totalFetchRequestRate.count())

http://git-wip-us.apache.org/repos/asf/kafka/blob/68f204e0/core/src/test/scala/unit/kafka/utils/ReplicationUtilsTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/ReplicationUtilsTest.scala b/core/src/test/scala/unit/kafka/utils/ReplicationUtilsTest.scala
index d4ddb2f..dbad66f 100644
--- a/core/src/test/scala/unit/kafka/utils/ReplicationUtilsTest.scala
+++ b/core/src/test/scala/unit/kafka/utils/ReplicationUtilsTest.scala
@@ -18,10 +18,10 @@
 package kafka.utils
 
 import kafka.controller.LeaderIsrAndControllerEpoch
-import kafka.server.{ReplicaFetcherManager, KafkaConfig}
+import kafka.server.{KafkaConfig, ReplicaFetcherManager}
 import kafka.api.LeaderAndIsr
 import kafka.zk.ZooKeeperTestHarness
-import kafka.common.TopicAndPartition
+import org.apache.kafka.common.TopicPartition
 import org.junit.Assert._
 import org.junit.{Before, Test}
 import org.easymock.EasyMock
@@ -59,7 +59,7 @@ class ReplicationUtilsTest extends ZooKeeperTestHarness {
     EasyMock.replay(log)
 
     val logManager = EasyMock.createMock(classOf[kafka.log.LogManager])
-    EasyMock.expect(logManager.getLog(TopicAndPartition(topic, partitionId))).andReturn(Some(log)).anyTimes()
+    EasyMock.expect(logManager.getLog(new TopicPartition(topic, partitionId))).andReturn(Some(log)).anyTimes()
     EasyMock.replay(logManager)
 
     val replicaManager = EasyMock.createMock(classOf[kafka.server.ReplicaManager])

http://git-wip-us.apache.org/repos/asf/kafka/blob/68f204e0/core/src/test/scala/unit/kafka/utils/TestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index ede145a..67c79ce 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -47,6 +47,7 @@ import org.junit.Assert._
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
 import org.apache.kafka.clients.consumer.{KafkaConsumer, RangeAssignor}
 import org.apache.kafka.clients.CommonClientConfigs
+import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.network.Mode
 import org.apache.kafka.common.record._
 import org.apache.kafka.common.serialization.{ByteArraySerializer, Serializer}
@@ -760,16 +761,7 @@ object TestUtils extends Logging {
   }
 
   def isLeaderLocalOnBroker(topic: String, partitionId: Int, server: KafkaServer): Boolean = {
-    val partitionOpt = server.replicaManager.getPartition(topic, partitionId)
-    partitionOpt match {
-      case None => false
-      case Some(partition) =>
-        val replicaOpt = partition.leaderReplicaIfLocal
-        replicaOpt match {
-          case None => false
-          case Some(_) => true
-        }
-    }
+    server.replicaManager.getPartition(new TopicPartition(topic, partitionId)).exists(_.leaderReplicaIfLocal.isDefined)
   }
 
   def createRequestByteBuffer(request: RequestOrResponse): ByteBuffer = {
@@ -813,12 +805,11 @@ object TestUtils extends Logging {
 
   def waitUntilLeaderIsKnown(servers: Seq[KafkaServer], topic: String, partition: Int,
                              timeout: Long = JTestUtils.DEFAULT_MAX_WAIT_MS): Unit = {
+    val tp = new TopicPartition(topic, partition)
     TestUtils.waitUntilTrue(() =>
       servers.exists { server =>
-        server.replicaManager.getPartition(topic, partition).exists(_.leaderReplicaIfLocal().isDefined)
-      },
-      "Partition [%s,%d] leaders not made yet after %d ms".format(topic, partition, timeout),
-      waitTime = timeout
+        server.replicaManager.getPartition(tp).exists(_.leaderReplicaIfLocal.isDefined)
+      }, s"Partition $tp leaders not made yet after $timeout ms", waitTime = timeout
     )
   }
 
@@ -1015,7 +1006,7 @@ object TestUtils extends Logging {
   }
 
   def verifyTopicDeletion(zkUtils: ZkUtils, topic: String, numPartitions: Int, servers: Seq[KafkaServer]) {
-    val topicAndPartitions = (0 until numPartitions).map(TopicAndPartition(topic, _))
+    val topicPartitions = (0 until numPartitions).map(new TopicPartition(topic, _))
     // wait until admin path for delete topic is deleted, signaling completion of topic deletion
     TestUtils.waitUntilTrue(() => !zkUtils.pathExists(getDeleteTopicPath(topic)),
       "Admin path /admin/delete_topic/%s path not deleted even after a replica is restarted".format(topic))
@@ -1023,13 +1014,13 @@ object TestUtils extends Logging {
       "Topic path /brokers/topics/%s not deleted after /admin/delete_topic/%s path is deleted".format(topic,
topic))
     // ensure that the topic-partition has been deleted from all brokers' replica managers
     TestUtils.waitUntilTrue(() =>
-      servers.forall(server => topicAndPartitions.forall(tp => server.replicaManager.getPartition(tp.topic, tp.partition).isEmpty)),
+      servers.forall(server => topicPartitions.forall(tp => server.replicaManager.getPartition(tp).isEmpty)),
       "Replica manager's should have deleted all of this topic's partitions")
     // ensure that logs from all replicas are deleted if delete topic is marked successful in zookeeper
     assertTrue("Replica logs not deleted after delete topic is complete",
-      servers.forall(server => topicAndPartitions.forall(tp => server.getLogManager().getLog(tp).isEmpty)))
+      servers.forall(server => topicPartitions.forall(tp => server.getLogManager().getLog(tp).isEmpty)))
     // ensure that topic is removed from all cleaner offsets
-    TestUtils.waitUntilTrue(() => servers.forall(server => topicAndPartitions.forall { tp =>
+    TestUtils.waitUntilTrue(() => servers.forall(server => topicPartitions.forall { tp =>
       val checkpoints = server.getLogManager().logDirs.map { logDir =>
         new OffsetCheckpoint(new File(logDir, "cleaner-offset-checkpoint")).read()
       }
@@ -1050,12 +1041,10 @@ object TestUtils extends Logging {
   }
 
   def sslConfigs(mode: Mode, clientCert: Boolean, trustStoreFile: Option[File], certAlias: String): Properties = {
-
     val trustStore = trustStoreFile.getOrElse {
       throw new Exception("SSL enabled but no trustStoreFile provided")
     }
 
-
     val sslConfigs = TestSslUtils.createSslConfig(clientCert, true, mode, trustStore, certAlias)
 
     val sslProps = new Properties()

