kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [1/2] kafka git commit: KAFKA-2388: refactor KafkaConsumer subscribe API
Date Thu, 27 Aug 2015 00:18:47 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 03f850f67 -> 35eaef7bb


http://git-wip-us.apache.org/repos/asf/kafka/blob/35eaef7b/core/src/main/scala/kafka/consumer/BaseConsumer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/BaseConsumer.scala b/core/src/main/scala/kafka/consumer/BaseConsumer.scala
index 5017c95..95325b0 100644
--- a/core/src/main/scala/kafka/consumer/BaseConsumer.scala
+++ b/core/src/main/scala/kafka/consumer/BaseConsumer.scala
@@ -33,9 +33,10 @@ case class BaseConsumerRecord(topic: String, partition: Int, offset: Long,
key:
 
 class NewShinyConsumer(topic: String, consumerProps: Properties) extends BaseConsumer {
   import org.apache.kafka.clients.consumer.KafkaConsumer
+  import scala.collection.JavaConversions._
 
   val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](consumerProps)
-  consumer.subscribe(topic)
+  consumer.subscribe(List(topic))
   var recordIter = consumer.poll(0).iterator
 
   override def receive(): BaseConsumerRecord = {

http://git-wip-us.apache.org/repos/asf/kafka/blob/35eaef7b/core/src/main/scala/kafka/tools/ConsumerPerformance.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ConsumerPerformance.scala b/core/src/main/scala/kafka/tools/ConsumerPerformance.scala
index 7797dee..0078b00 100644
--- a/core/src/main/scala/kafka/tools/ConsumerPerformance.scala
+++ b/core/src/main/scala/kafka/tools/ConsumerPerformance.scala
@@ -56,7 +56,7 @@ object ConsumerPerformance {
     var startMs, endMs = 0L
     if(config.useNewConsumer) {
       val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](config.props)
-      consumer.subscribe(config.topic)
+      consumer.subscribe(List(config.topic))
       startMs = System.currentTimeMillis
       consume(consumer, config.numMessages, 1000, config, totalMessagesRead, totalBytesRead)
       endMs = System.currentTimeMillis

http://git-wip-us.apache.org/repos/asf/kafka/blob/35eaef7b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
index 2c6ee23..666d62f 100644
--- a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
@@ -75,7 +75,7 @@ class ConsumerBounceTest extends IntegrationTestHarness with Logging {
 
     var consumed = 0
     val consumer = this.consumers(0)
-    consumer.subscribe(topic)
+    consumer.subscribe(List(topic))
 
     val scheduler = new BounceBrokerScheduler(numIters)
     scheduler.start()
@@ -106,7 +106,7 @@ class ConsumerBounceTest extends IntegrationTestHarness with Logging {
     this.producers.foreach(_.close)
 
     val consumer = this.consumers(0)
-    consumer.subscribe(tp)
+    consumer.assign(List(tp))
     consumer.seek(tp, 0)
 
     // wait until all the followers have synced the last HW with leader

http://git-wip-us.apache.org/repos/asf/kafka/blob/35eaef7b/core/src/test/scala/integration/kafka/api/ConsumerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/ConsumerTest.scala b/core/src/test/scala/integration/kafka/api/ConsumerTest.scala
index 9e8172a..c1e5d02 100644
--- a/core/src/test/scala/integration/kafka/api/ConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ConsumerTest.scala
@@ -12,7 +12,7 @@
  */
 package kafka.api
 
-import java.{lang, util}
+import java.{util, lang}
 
 import org.apache.kafka.clients.producer.ProducerConfig
 import org.apache.kafka.clients.producer.ProducerRecord
@@ -27,6 +27,7 @@ import java.util.ArrayList
 import org.junit.Assert._
 import org.junit.{Test, Before}
 
+import scala.collection.JavaConversions._
 import scala.collection.JavaConverters._
 import kafka.coordinator.ConsumerCoordinator
 
@@ -70,9 +71,9 @@ class ConsumerTest extends IntegrationTestHarness with Logging {
     val numRecords = 10000
     sendRecords(numRecords)
 
-    assertEquals(0, this.consumers(0).subscriptions.size)
-    this.consumers(0).subscribe(tp)
-    assertEquals(1, this.consumers(0).subscriptions.size)
+    assertEquals(0, this.consumers(0).assignment.size)
+    this.consumers(0).assign(List(tp))
+    assertEquals(1, this.consumers(0).assignment.size)
     
     this.consumers(0).seek(tp, 0)
     consumeRecords(this.consumers(0), numRecords = numRecords, startingOffset = 0)
@@ -92,8 +93,7 @@ class ConsumerTest extends IntegrationTestHarness with Logging {
     sendRecords(5, tp)
     sendRecords(7, tp2)
 
-    this.consumers(0).subscribe(tp)
-    this.consumers(0).subscribe(tp2)
+    this.consumers(0).assign(List(tp, tp2));
 
     // Need to poll to join the group
     this.consumers(0).poll(50)
@@ -121,7 +121,7 @@ class ConsumerTest extends IntegrationTestHarness with Logging {
   @Test
   def testAutoOffsetReset() {
     sendRecords(1)
-    this.consumers(0).subscribe(tp)
+    this.consumers(0).assign(List(tp))
     consumeRecords(this.consumers(0), numRecords = 1, startingOffset = 0)
   }
 
@@ -130,7 +130,7 @@ class ConsumerTest extends IntegrationTestHarness with Logging {
     val consumer = this.consumers(0)
     val totalRecords = 50L
     sendRecords(totalRecords.toInt)
-    consumer.subscribe(tp)
+    consumer.assign(List(tp))
 
     consumer.seekToEnd(tp)
     assertEquals(totalRecords, consumer.position(tp))
@@ -149,7 +149,7 @@ class ConsumerTest extends IntegrationTestHarness with Logging {
   @Test
   def testGroupConsumption() {
     sendRecords(10)
-    this.consumers(0).subscribe(topic)
+    this.consumers(0).subscribe(List(topic))
     consumeRecords(this.consumers(0), numRecords = 1, startingOffset = 0)
   }
 
@@ -167,7 +167,7 @@ class ConsumerTest extends IntegrationTestHarness with Logging {
       this.consumers(0).position(new TopicPartition(topic, 15))
     }
 
-    this.consumers(0).subscribe(tp)
+    this.consumers(0).assign(List(tp))
 
     assertEquals("position() on a partition that we are subscribed to should reset the offset",
0L, this.consumers(0).position(tp))
     this.consumers(0).commit(CommitType.SYNC)
@@ -181,7 +181,7 @@ class ConsumerTest extends IntegrationTestHarness with Logging {
     sendRecords(1)
 
     // another consumer in the same group should get the same position
-    this.consumers(1).subscribe(tp)
+    this.consumers(1).assign(List(tp))
     consumeRecords(this.consumers(1), 1, 5)
   }
 
@@ -216,14 +216,14 @@ class ConsumerTest extends IntegrationTestHarness with Logging {
 
   @Test
   def testPartitionReassignmentCallback() {
-    val callback = new TestConsumerReassignmentCallback()
+    val listener = new TestConsumerReassignmentListener()
     this.consumerConfig.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "100"); //
timeout quickly to avoid slow test
     this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "30");
-    val consumer0 = new KafkaConsumer(this.consumerConfig, callback, new ByteArrayDeserializer(),
new ByteArrayDeserializer())
-    consumer0.subscribe(topic)
+    val consumer0 = new KafkaConsumer(this.consumerConfig, new ByteArrayDeserializer(), new
ByteArrayDeserializer())
+    consumer0.subscribe(List(topic), listener)
         
     // the initial subscription should cause a callback execution
-    while(callback.callsToAssigned == 0)
+    while(listener.callsToAssigned == 0)
       consumer0.poll(50)
     
     // get metadata for the topic
@@ -238,31 +238,32 @@ class ConsumerTest extends IntegrationTestHarness with Logging {
     this.servers(coordinator).shutdown()
     
     // this should cause another callback execution
-    while(callback.callsToAssigned < 2)
+    while(listener.callsToAssigned < 2)
       consumer0.poll(50)
 
-    assertEquals(2, callback.callsToAssigned)
-    assertEquals(2, callback.callsToRevoked)
+    assertEquals(2, listener.callsToAssigned)
+    assertEquals(2, listener.callsToRevoked)
 
     consumer0.close()
   }
 
   @Test
   def testUnsubscribeTopic() {
-    val callback = new TestConsumerReassignmentCallback()
+
     this.consumerConfig.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "100"); //
timeout quickly to avoid slow test
     this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "30");
-    val consumer0 = new KafkaConsumer(this.consumerConfig, callback, new ByteArrayDeserializer(),
new ByteArrayDeserializer())
+    val consumer0 = new KafkaConsumer(this.consumerConfig, new ByteArrayDeserializer(), new
ByteArrayDeserializer())
 
     try {
-      consumer0.subscribe(topic)
+      val listener = new TestConsumerReassignmentListener()
+      consumer0.subscribe(List(topic), listener)
 
       // the initial subscription should cause a callback execution
-      while (callback.callsToAssigned == 0)
+      while (listener.callsToAssigned == 0)
         consumer0.poll(50)
 
-      consumer0.unsubscribe(topic)
-      assertEquals(0, consumer0.subscriptions.size())
+      consumer0.subscribe(List())
+      assertEquals(0, consumer0.assignment.size())
     } finally {
       consumer0.close()
     }
@@ -273,18 +274,18 @@ class ConsumerTest extends IntegrationTestHarness with Logging {
     val otherTopic = "other"
     val subscriptions = Set(new TopicPartition(topic, 0), new TopicPartition(topic, 1))
     val expandedSubscriptions = subscriptions ++ Set(new TopicPartition(otherTopic, 0), new
TopicPartition(otherTopic, 1))
-    this.consumers(0).subscribe(topic)
+    this.consumers(0).subscribe(List(topic))
     TestUtils.waitUntilTrue(() => {
       this.consumers(0).poll(50)
-      this.consumers(0).subscriptions == subscriptions.asJava
-    }, s"Expected partitions ${subscriptions.asJava} but actually got ${this.consumers(0).subscriptions}")
+      this.consumers(0).assignment == subscriptions.asJava
+    }, s"Expected partitions ${subscriptions.asJava} but actually got ${this.consumers(0).assignment}")
 
     TestUtils.createTopic(this.zkClient, otherTopic, 2, serverCount, this.servers)
-    this.consumers(0).subscribe(otherTopic)
+    this.consumers(0).subscribe(List(topic, otherTopic))
     TestUtils.waitUntilTrue(() => {
       this.consumers(0).poll(50)
-      this.consumers(0).subscriptions == expandedSubscriptions.asJava
-    }, s"Expected partitions ${expandedSubscriptions.asJava} but actually got ${this.consumers(0).subscriptions}")
+      this.consumers(0).assignment == expandedSubscriptions.asJava
+    }, s"Expected partitions ${expandedSubscriptions.asJava} but actually got ${this.consumers(0).assignment}")
   }
 
   @Test
@@ -293,23 +294,23 @@ class ConsumerTest extends IntegrationTestHarness with Logging {
     TestUtils.createTopic(this.zkClient, otherTopic, 2, serverCount, this.servers)
     val subscriptions = Set(new TopicPartition(topic, 0), new TopicPartition(topic, 1), new
TopicPartition(otherTopic, 0), new TopicPartition(otherTopic, 1))
     val shrunkenSubscriptions = Set(new TopicPartition(topic, 0), new TopicPartition(topic,
1))
-    this.consumers(0).subscribe(topic, otherTopic)
+    this.consumers(0).subscribe(List(topic, otherTopic))
     TestUtils.waitUntilTrue(() => {
       this.consumers(0).poll(50)
-      this.consumers(0).subscriptions == subscriptions.asJava
-    }, s"Expected partitions ${subscriptions.asJava} but actually got ${this.consumers(0).subscriptions}")
+      this.consumers(0).assignment == subscriptions.asJava
+    }, s"Expected partitions ${subscriptions.asJava} but actually got ${this.consumers(0).assignment}")
 
-    this.consumers(0).unsubscribe(otherTopic)
+    this.consumers(0).subscribe(List(topic))
     TestUtils.waitUntilTrue(() => {
       this.consumers(0).poll(50)
-      this.consumers(0).subscriptions == shrunkenSubscriptions.asJava
-    }, s"Expected partitions ${shrunkenSubscriptions.asJava} but actually got ${this.consumers(0).subscriptions}")
+      this.consumers(0).assignment == shrunkenSubscriptions.asJava
+    }, s"Expected partitions ${shrunkenSubscriptions.asJava} but actually got ${this.consumers(0).assignment}")
   }
 
   @Test
   def testPartitionPauseAndResume() {
     sendRecords(5)
-    this.consumers(0).subscribe(tp)
+    this.consumers(0).assign(List(tp))
     consumeRecords(this.consumers(0), 5, 0)
     this.consumers(0).pause(tp)
     sendRecords(5)
@@ -322,22 +323,22 @@ class ConsumerTest extends IntegrationTestHarness with Logging {
   def testPauseStateNotPreservedByRebalance() {
     this.consumerConfig.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "100"); //
timeout quickly to avoid slow test
     this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "30");
-    val consumer0 = new KafkaConsumer(this.consumerConfig, null, new ByteArrayDeserializer(),
new ByteArrayDeserializer())
+    val consumer0 = new KafkaConsumer(this.consumerConfig, new ByteArrayDeserializer(), new
ByteArrayDeserializer())
 
     sendRecords(5)
-    consumer0.subscribe(topic)
+    consumer0.subscribe(List(topic))
     consumeRecords(consumer0, 5, 0)
     consumer0.pause(tp)
 
     // subscribe to a new topic to trigger a rebalance
-    consumer0.subscribe("topic2")
+    consumer0.subscribe(List("topic2"))
 
     // after rebalance, our position should be reset and our pause state lost,
     // so we should be able to consume from the beginning
     consumeRecords(consumer0, 0, 5)
   }
 
-  private class TestConsumerReassignmentCallback extends ConsumerRebalanceCallback {
+  private class TestConsumerReassignmentListener extends ConsumerRebalanceListener {
     var callsToAssigned = 0
     var callsToRevoked = 0
     def onPartitionsAssigned(consumer: Consumer[_,_], partitions: java.util.Collection[TopicPartition])
{

http://git-wip-us.apache.org/repos/asf/kafka/blob/35eaef7b/core/src/test/scala/integration/kafka/api/QuotasTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/QuotasTest.scala b/core/src/test/scala/integration/kafka/api/QuotasTest.scala
index a11bf90..3343c53 100644
--- a/core/src/test/scala/integration/kafka/api/QuotasTest.scala
+++ b/core/src/test/scala/integration/kafka/api/QuotasTest.scala
@@ -183,7 +183,7 @@ class QuotasTest extends KafkaServerTestHarness {
   }
 
   def consume(consumer: KafkaConsumer[Array[Byte], Array[Byte]], numRecords: Int) {
-    consumer.subscribe(topic1)
+    consumer.subscribe(List(topic1))
     var numConsumed = 0
     while (numConsumed < numRecords) {
       for (cr <- consumer.poll(100)) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/35eaef7b/core/src/test/scala/integration/kafka/api/SSLConsumerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/SSLConsumerTest.scala b/core/src/test/scala/integration/kafka/api/SSLConsumerTest.scala
index 4281036..057c70c 100644
--- a/core/src/test/scala/integration/kafka/api/SSLConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/SSLConsumerTest.scala
@@ -21,7 +21,7 @@ import org.apache.kafka.clients.producer.KafkaProducer
 import org.apache.kafka.clients.consumer.Consumer
 import org.apache.kafka.clients.consumer.ConsumerConfig
 import org.apache.kafka.clients.consumer.KafkaConsumer
-import org.apache.kafka.clients.consumer.ConsumerRebalanceCallback
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener
 import org.apache.kafka.clients.consumer.ConsumerRecord
 import org.apache.kafka.clients.consumer.ConsumerConfig
 import org.apache.kafka.clients.consumer.CommitType
@@ -123,9 +123,9 @@ class SSLConsumerTest extends KafkaServerTestHarness with Logging {
   def testSimpleConsumption() {
     val numRecords = 10000
     sendRecords(numRecords)
-    assertEquals(0, this.consumers(0).subscriptions.size)
-    this.consumers(0).subscribe(tp)
-    assertEquals(1, this.consumers(0).subscriptions.size)
+    assertEquals(0, this.consumers(0).assignment.size)
+    this.consumers(0).assign(List(tp))
+    assertEquals(1, this.consumers(0).assignment.size)
     this.consumers(0).seek(tp, 0)
     consumeRecords(this.consumers(0), numRecords = numRecords, startingOffset = 0)
   }
@@ -133,7 +133,7 @@ class SSLConsumerTest extends KafkaServerTestHarness with Logging {
   @Test
   def testAutoOffsetReset() {
     sendRecords(1)
-    this.consumers(0).subscribe(tp)
+    this.consumers(0).assign(List(tp))
     consumeRecords(this.consumers(0), numRecords = 1, startingOffset = 0)
   }
 
@@ -142,7 +142,7 @@ class SSLConsumerTest extends KafkaServerTestHarness with Logging {
     val consumer = this.consumers(0)
     val totalRecords = 50L
     sendRecords(totalRecords.toInt)
-    consumer.subscribe(tp)
+    consumer.assign(List(tp))
 
     consumer.seekToEnd(tp)
     assertEquals(totalRecords, consumer.position(tp))
@@ -161,7 +161,7 @@ class SSLConsumerTest extends KafkaServerTestHarness with Logging {
   @Test
   def testGroupConsumption() {
     sendRecords(10)
-    this.consumers(0).subscribe(topic)
+    this.consumers(0).subscribe(List(topic))
     consumeRecords(this.consumers(0), numRecords = 1, startingOffset = 0)
   }
 
@@ -179,7 +179,7 @@ class SSLConsumerTest extends KafkaServerTestHarness with Logging {
       this.consumers(0).position(new TopicPartition(topic, 15))
     }
 
-    this.consumers(0).subscribe(tp)
+    this.consumers(0).assign(List(tp))
 
     assertEquals("position() on a partition that we are subscribed to should reset the offset",
0L, this.consumers(0).position(tp))
     this.consumers(0).commit(CommitType.SYNC)
@@ -193,7 +193,7 @@ class SSLConsumerTest extends KafkaServerTestHarness with Logging {
     sendRecords(1)
 
     // another consumer in the same group should get the same position
-    this.consumers(1).subscribe(tp)
+    this.consumers(1).assign(List(tp))
     consumeRecords(this.consumers(1), 1, 5)
   }
 
@@ -207,19 +207,6 @@ class SSLConsumerTest extends KafkaServerTestHarness with Logging {
     assertNull(this.consumers(0).partitionsFor("non-exist-topic"))
   }
 
-  private class TestConsumerReassignmentCallback extends ConsumerRebalanceCallback {
-    var callsToAssigned = 0
-    var callsToRevoked = 0
-    def onPartitionsAssigned(consumer: Consumer[_,_], partitions: java.util.Collection[TopicPartition])
{
-      info("onPartitionsAssigned called.")
-      callsToAssigned += 1
-    }
-    def onPartitionsRevoked(consumer: Consumer[_,_], partitions: java.util.Collection[TopicPartition])
{
-      info("onPartitionsRevoked called.")
-      callsToRevoked += 1
-    }
-  }
-
   private def sendRecords(numRecords: Int) {
     val futures = (0 until numRecords).map { i =>
       this.producers(0).send(new ProducerRecord(topic, part, i.toString.getBytes, i.toString.getBytes))

http://git-wip-us.apache.org/repos/asf/kafka/blob/35eaef7b/core/src/test/scala/unit/kafka/consumer/PartitionAssignorTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/consumer/PartitionAssignorTest.scala b/core/src/test/scala/unit/kafka/consumer/PartitionAssignorTest.scala
index dba1afb..6c22e8b 100644
--- a/core/src/test/scala/unit/kafka/consumer/PartitionAssignorTest.scala
+++ b/core/src/test/scala/unit/kafka/consumer/PartitionAssignorTest.scala
@@ -127,7 +127,7 @@ private object PartitionAssignorTest extends Logging {
       "\n" +
       "Group                  : %s\n".format(group) +
       "Topic partition counts : %s\n".format(topicPartitionCounts) +
-      "Consumer subscriptions : %s\n".format(subscriptions)
+      "Consumer assignment : %s\n".format(subscriptions)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/35eaef7b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 00fbb61..09b8444 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -47,8 +47,7 @@ import kafka.log._
 
 import org.junit.Assert._
 import org.apache.kafka.clients.producer.KafkaProducer
-import org.apache.kafka.clients.consumer.KafkaConsumer
-import org.apache.kafka.clients.consumer.ConsumerRebalanceCallback
+import org.apache.kafka.clients.consumer.{ConsumerRebalanceListener, KafkaConsumer}
 import org.apache.kafka.common.serialization.ByteArrayDeserializer
 import org.apache.kafka.clients.CommonClientConfigs
 import org.apache.kafka.common.security.ssl.SSLFactory
@@ -435,7 +434,6 @@ object TestUtils extends Logging {
                         partitionFetchSize: Long = 4096L,
                         partitionAssignmentStrategy: String = "blah",
                         sessionTimeout: Int = 30000,
-                        callback: Option[ConsumerRebalanceCallback] = None,
                         enableSSL: Boolean = false,
                         trustStoreFile: Option[File] = None) : KafkaConsumer[Array[Byte],Array[Byte]]
= {
     import org.apache.kafka.clients.consumer.ConsumerConfig
@@ -455,11 +453,7 @@ object TestUtils extends Logging {
       consumerProps.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL")
       consumerProps.putAll(addSSLConfigs(SSLFactory.Mode.CLIENT, false, trustStoreFile, "consumer"))
     }
-    if (callback.isDefined) {
-      new KafkaConsumer[Array[Byte],Array[Byte]](consumerProps, callback.get, new ByteArrayDeserializer(),
new ByteArrayDeserializer())
-    } else {
-      new KafkaConsumer[Array[Byte],Array[Byte]](consumerProps)
-    }
+    new KafkaConsumer[Array[Byte],Array[Byte]](consumerProps)
   }
 
   /**


Mime
View raw message