kafka-commits mailing list archives

From jun...@apache.org
Subject [1/2] kafka git commit: kafka-1797; add the serializer/deserializer api to the new java client; patched by Jun Rao; reviewed by Neha Narkhede
Date Thu, 18 Dec 2014 00:29:18 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk ae0bb84fa -> 92d1d4cd3
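
This patch makes the new Java producer generic: KafkaProducer and ProducerRecord now take
explicit key and value type parameters, and serialization is delegated to pluggable
serializer classes configured on the client rather than being hard-wired to byte arrays.
As a rough sketch of what a caller looks like after this change (the
key.serializer/value.serializer config keys and the bundled ByteArraySerializer are taken
from the API this patch introduces; the broker address is illustrative):

    import java.util.Properties
    import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092")
    // Serializers are now configured explicitly instead of the producer
    // assuming keys and values are already byte arrays.
    props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer")

    // The producer is parameterized by its key and value types.
    val producer = new KafkaProducer[Array[Byte], Array[Byte]](props)
    producer.send(new ProducerRecord[Array[Byte], Array[Byte]]("topic-1", "key".getBytes, "value".getBytes))
    producer.close()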


http://git-wip-us.apache.org/repos/asf/kafka/blob/92d1d4cd/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
index a913fe5..5ec613c 100644
--- a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
@@ -52,10 +52,10 @@ class ProducerFailureHandlingTest extends KafkaServerTestHarness {
   private var consumer1: SimpleConsumer = null
   private var consumer2: SimpleConsumer = null
 
-  private var producer1: KafkaProducer = null
-  private var producer2: KafkaProducer = null
-  private var producer3: KafkaProducer = null
-  private var producer4: KafkaProducer = null
+  private var producer1: KafkaProducer[Array[Byte],Array[Byte]] = null
+  private var producer2: KafkaProducer[Array[Byte],Array[Byte]] = null
+  private var producer3: KafkaProducer[Array[Byte],Array[Byte]] = null
+  private var producer4: KafkaProducer[Array[Byte],Array[Byte]] = null
 
   private val topic1 = "topic-1"
   private val topic2 = "topic-2"
@@ -93,7 +93,7 @@ class ProducerFailureHandlingTest extends KafkaServerTestHarness {
     TestUtils.createTopic(zkClient, topic1, 1, 2, servers)
 
     // send a too-large record
-    val record = new ProducerRecord(topic1, null, "key".getBytes, new Array[Byte](serverMessageMaxBytes + 1))
+    val record = new ProducerRecord[Array[Byte],Array[Byte]](topic1, null, "key".getBytes, new Array[Byte](serverMessageMaxBytes + 1))
     assertEquals("Returned metadata should have offset -1", producer1.send(record).get.offset,
-1L)
   }
 
@@ -106,7 +106,7 @@ class ProducerFailureHandlingTest extends KafkaServerTestHarness {
     TestUtils.createTopic(zkClient, topic1, 1, 2, servers)
 
     // send a too-large record
-    val record = new ProducerRecord(topic1, null, "key".getBytes, new Array[Byte](serverMessageMaxBytes + 1))
+    val record = new ProducerRecord[Array[Byte],Array[Byte]](topic1, null, "key".getBytes, new Array[Byte](serverMessageMaxBytes + 1))
     intercept[ExecutionException] {
       producer2.send(record).get
     }
@@ -118,7 +118,7 @@ class ProducerFailureHandlingTest extends KafkaServerTestHarness {
   @Test
   def testNonExistentTopic() {
     // send a record with non-exist topic
-    val record = new ProducerRecord(topic2, null, "key".getBytes, "value".getBytes)
+    val record = new ProducerRecord[Array[Byte],Array[Byte]](topic2, null, "key".getBytes, "value".getBytes)
     intercept[ExecutionException] {
       producer1.send(record).get
     }
@@ -143,7 +143,7 @@ class ProducerFailureHandlingTest extends KafkaServerTestHarness {
     producer4 = TestUtils.createNewProducer("localhost:8686,localhost:4242", acks = 1, blockOnBufferFull = false, bufferSize = producerBufferSize)
 
     // send a record with incorrect broker list
-    val record = new ProducerRecord(topic1, null, "key".getBytes, "value".getBytes)
+    val record = new ProducerRecord[Array[Byte],Array[Byte]](topic1, null, "key".getBytes, "value".getBytes)
     intercept[ExecutionException] {
       producer4.send(record).get
     }
@@ -160,7 +160,7 @@ class ProducerFailureHandlingTest extends KafkaServerTestHarness {
     TestUtils.createTopic(zkClient, topic1, 1, 2, servers)
 
     // first send a message to make sure the metadata is refreshed
-    val record1 = new ProducerRecord(topic1, null, "key".getBytes, "value".getBytes)
+    val record1 = new ProducerRecord[Array[Byte],Array[Byte]](topic1, null, "key".getBytes, "value".getBytes)
     producer1.send(record1).get
     producer2.send(record1).get
 
@@ -180,7 +180,7 @@ class ProducerFailureHandlingTest extends KafkaServerTestHarness {
     val msgSize = producerBufferSize / tooManyRecords
     val value = new Array[Byte](msgSize)
     new Random().nextBytes(value)
-    val record2 = new ProducerRecord(topic1, null, "key".getBytes, value)
+    val record2 = new ProducerRecord[Array[Byte],Array[Byte]](topic1, null, "key".getBytes, value)
 
     intercept[KafkaException] {
       for (i <- 1 to tooManyRecords)
@@ -201,7 +201,7 @@ class ProducerFailureHandlingTest extends KafkaServerTestHarness {
     TestUtils.createTopic(zkClient, topic1, 1, 2, servers)
 
     // create a record with incorrect partition id, send should fail
-    val record = new ProducerRecord(topic1, new Integer(1), "key".getBytes, "value".getBytes)
+    val record = new ProducerRecord[Array[Byte],Array[Byte]](topic1, new Integer(1), "key".getBytes, "value".getBytes)
     intercept[IllegalArgumentException] {
       producer1.send(record)
     }
@@ -221,7 +221,7 @@ class ProducerFailureHandlingTest extends KafkaServerTestHarness {
     // create topic
     TestUtils.createTopic(zkClient, topic1, 1, 2, servers)
 
-    val record = new ProducerRecord(topic1, null, "key".getBytes, "value".getBytes)
+    val record = new ProducerRecord[Array[Byte],Array[Byte]](topic1, null, "key".getBytes, "value".getBytes)
 
     // first send a message to make sure the metadata is refreshed
     producer1.send(record).get
@@ -300,7 +300,7 @@ class ProducerFailureHandlingTest extends KafkaServerTestHarness {
   @Test
   def testCannotSendToInternalTopic() {
     val thrown = intercept[ExecutionException] {
-      producer2.send(new ProducerRecord(Topic.InternalTopics.head, "test".getBytes, "test".getBytes)).get
+      producer2.send(new ProducerRecord[Array[Byte],Array[Byte]](Topic.InternalTopics.head, "test".getBytes, "test".getBytes)).get
     }
     assertTrue(thrown.getCause.isInstanceOf[InvalidTopicException])
   }
@@ -313,7 +313,7 @@ class ProducerFailureHandlingTest extends KafkaServerTestHarness {
 
     TestUtils.createTopic(zkClient, topicName, 1, 2, servers,topicProps)
 
-    val record = new ProducerRecord(topicName, null, "key".getBytes, "value".getBytes)
+    val record = new ProducerRecord[Array[Byte],Array[Byte]](topicName, null, "key".getBytes, "value".getBytes)
     try {
       producer3.send(record).get
       fail("Expected exception when producing to topic with fewer brokers than min.insync.replicas")
@@ -333,7 +333,7 @@ class ProducerFailureHandlingTest extends KafkaServerTestHarness {
 
     TestUtils.createTopic(zkClient, topicName, 1, 2, servers,topicProps)
 
-    val record = new ProducerRecord(topicName, null, "key".getBytes, "value".getBytes)
+    val record = new ProducerRecord[Array[Byte],Array[Byte]](topicName, null, "key".getBytes, "value".getBytes)
     // this should work with all brokers up and running
     producer3.send(record).get
 
@@ -365,7 +365,7 @@ class ProducerFailureHandlingTest extends KafkaServerTestHarness {
     override def doWork(): Unit = {
       val responses =
         for (i <- sent+1 to sent+numRecords)
-        yield producer.send(new ProducerRecord(topic1, null, null, i.toString.getBytes))
+        yield producer.send(new ProducerRecord[Array[Byte],Array[Byte]](topic1, null, null, i.toString.getBytes))
       val futures = responses.toList
 
       try {
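
The changes in this file are mechanical: with the producers now typed as
KafkaProducer[Array[Byte],Array[Byte]], every record they send must carry matching type
parameters, and Scala cannot always infer those from the constructor arguments alone (a
null key, value, or partition gives the compiler little to work with), hence the explicit
[Array[Byte],Array[Byte]] annotations at each construction site. A minimal illustration,
hypothetical and not part of the patch:

    // Without explicit type arguments, Scala may infer the value type as Null
    // here, which does not conform to the ProducerRecord[Array[Byte],Array[Byte]]
    // that a typed producer's send() expects:
    // val r = new ProducerRecord(topic1, null, "key".getBytes, null)
    // Spelling out the type arguments resolves the mismatch:
    val r = new ProducerRecord[Array[Byte], Array[Byte]](topic1, null, "key".getBytes, null)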

http://git-wip-us.apache.org/repos/asf/kafka/blob/92d1d4cd/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala
index d407af9..6196060 100644
--- a/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala
@@ -86,24 +86,24 @@ class ProducerSendTest extends JUnit3Suite with KafkaServerTestHarness {
       TestUtils.createTopic(zkClient, topic, 1, 2, servers)
 
       // send a normal record
-      val record0 = new ProducerRecord(topic, new Integer(0), "key".getBytes, "value".getBytes)
+      val record0 = new ProducerRecord[Array[Byte],Array[Byte]](topic, new Integer(0), "key".getBytes, "value".getBytes)
       assertEquals("Should have offset 0", 0L, producer.send(record0, callback).get.offset)
 
       // send a record with null value should be ok
-      val record1 = new ProducerRecord(topic, new Integer(0), "key".getBytes, null)
+      val record1 = new ProducerRecord[Array[Byte],Array[Byte]](topic, new Integer(0), "key".getBytes, null)
       assertEquals("Should have offset 1", 1L, producer.send(record1, callback).get.offset)
 
       // send a record with null key should be ok
-      val record2 = new ProducerRecord(topic, new Integer(0), null, "value".getBytes)
+      val record2 = new ProducerRecord[Array[Byte],Array[Byte]](topic, new Integer(0), null, "value".getBytes)
       assertEquals("Should have offset 2", 2L, producer.send(record2, callback).get.offset)
 
       // send a record with null part id should be ok
-      val record3 = new ProducerRecord(topic, null, "key".getBytes, "value".getBytes)
+      val record3 = new ProducerRecord[Array[Byte],Array[Byte]](topic, null, "key".getBytes, "value".getBytes)
       assertEquals("Should have offset 3", 3L, producer.send(record3, callback).get.offset)
 
       // send a record with null topic should fail
       try {
-        val record4 = new ProducerRecord(null, new Integer(0), "key".getBytes, "value".getBytes)
+        val record4 = new ProducerRecord[Array[Byte],Array[Byte]](null, new Integer(0), "key".getBytes, "value".getBytes)
         producer.send(record4, callback)
         fail("Should not allow sending a record without topic")
       } catch {
@@ -140,7 +140,7 @@ class ProducerSendTest extends JUnit3Suite with KafkaServerTestHarness {
       TestUtils.createTopic(zkClient, topic, 1, 2, servers)
 
       // non-blocking send a list of records
-      val record0 = new ProducerRecord(topic, null, "key".getBytes, "value".getBytes)
+      val record0 = new ProducerRecord[Array[Byte],Array[Byte]](topic, null, "key".getBytes, "value".getBytes)
       for (i <- 1 to numRecords)
         producer.send(record0)
       val response0 = producer.send(record0)
@@ -182,7 +182,7 @@ class ProducerSendTest extends JUnit3Suite with KafkaServerTestHarness {
 
       val responses =
         for (i <- 1 to numRecords)
-        yield producer.send(new ProducerRecord(topic, partition, null, ("value" + i).getBytes))
+        yield producer.send(new ProducerRecord[Array[Byte],Array[Byte]](topic, partition, null, ("value" + i).getBytes))
       val futures = responses.toList
       futures.map(_.get)
       for (future <- futures)
@@ -228,7 +228,7 @@ class ProducerSendTest extends JUnit3Suite with KafkaServerTestHarness {
 
     try {
       // Send a message to auto-create the topic
-      val record = new ProducerRecord(topic, null, "key".getBytes, "value".getBytes)
+      val record = new ProducerRecord[Array[Byte],Array[Byte]](topic, null, "key".getBytes, "value".getBytes)
       assertEquals("Should have offset 0", 0L, producer.send(record).get.offset)
 
       // double check that the topic is created with leader elected

http://git-wip-us.apache.org/repos/asf/kafka/blob/92d1d4cd/core/src/test/scala/unit/kafka/utils/TestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 0da774d..94d0028 100644
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -383,7 +383,7 @@ object TestUtils extends Logging {
                         metadataFetchTimeout: Long = 3000L,
                         blockOnBufferFull: Boolean = true,
                         bufferSize: Long = 1024L * 1024L,
-                        retries: Int = 0) : KafkaProducer = {
+                        retries: Int = 0) : KafkaProducer[Array[Byte],Array[Byte]] = {
     import org.apache.kafka.clients.producer.ProducerConfig
 
     val producerProps = new Properties()
@@ -395,7 +395,7 @@ object TestUtils extends Logging {
     producerProps.put(ProducerConfig.RETRIES_CONFIG, retries.toString)
     producerProps.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, "100")
     producerProps.put(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG, "200")
-    return new KafkaProducer(producerProps)
+    return new KafkaProducer[Array[Byte],Array[Byte]](producerProps)
   }
 
   /**
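
With createNewProducer's return type parameterized, test call sites get a correctly typed
producer without casting. A hypothetical call site (brokerList as supplied by the test
harness; the named defaults above cover the remaining parameters):

    val producer: KafkaProducer[Array[Byte], Array[Byte]] =
      TestUtils.createNewProducer(brokerList, acks = -1, retries = 3)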

