kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ij...@apache.org
Subject kafka git commit: KAFKA-4145; Avoid redundant integration testing in ProducerSendTests
Date Sat, 10 Sep 2016 07:16:39 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 4af50bb86 -> ac99a3c86


KAFKA-4145; Avoid redundant integration testing in ProducerSendTests

Author: Vahid Hashemian <vahidhashemian@us.ibm.com>

Reviewers: Ismael Juma <ismael@juma.me.uk>

Closes #1842 from vahidhashemian/KAFKA-4145


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/ac99a3c8
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/ac99a3c8
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/ac99a3c8

Branch: refs/heads/trunk
Commit: ac99a3c86d5d8a56b61b1e0d1b2594e627016718
Parents: 4af50bb
Author: Vahid Hashemian <vahidhashemian@us.ibm.com>
Authored: Sat Sep 10 08:16:23 2016 +0100
Committer: Ismael Juma <ismael@juma.me.uk>
Committed: Sat Sep 10 08:16:23 2016 +0100

----------------------------------------------------------------------
 .../kafka/api/BaseProducerSendTest.scala        | 73 +------------------
 .../kafka/api/PlaintextProducerSendTest.scala   | 74 +++++++++++++++++++-
 2 files changed, 76 insertions(+), 71 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/ac99a3c8/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
index b5a1284..5bb0438 100644
--- a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
@@ -18,7 +18,7 @@
 package kafka.api
 
 import java.util.Properties
-import java.util.concurrent.{ExecutionException, TimeUnit}
+import java.util.concurrent.TimeUnit
 
 import kafka.consumer.SimpleConsumer
 import kafka.integration.KafkaServerTestHarness
@@ -27,7 +27,6 @@ import kafka.message.Message
 import kafka.server.KafkaConfig
 import kafka.utils.TestUtils
 import org.apache.kafka.clients.producer._
-import org.apache.kafka.common.errors.{InvalidTimestampException, SerializationException}
 import org.apache.kafka.common.record.TimestampType
 import org.junit.Assert._
 import org.junit.{After, Before, Test}
@@ -70,7 +69,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
     super.tearDown()
   }
 
-  private def createProducer(brokerList: String, retries: Int = 0, lingerMs: Long = 0, props: Option[Properties] = None): KafkaProducer[Array[Byte],Array[Byte]] = {
+  protected def createProducer(brokerList: String, retries: Int = 0, lingerMs: Long = 0, props: Option[Properties] = None): KafkaProducer[Array[Byte],Array[Byte]] = {
     val producer = TestUtils.createNewProducer(brokerList, securityProtocol = securityProtocol, trustStoreFile = trustStoreFile,
       saslProperties = saslProperties, retries = retries, lingerMs = lingerMs, props = props)
     registerProducer(producer)
@@ -170,21 +169,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
     sendAndVerifyTimestamp(producer, TimestampType.CREATE_TIME)
   }
 
-  @Test
-  def testSendCompressedMessageWithLogAppendTime() {
-    val producerProps = new Properties()
-    producerProps.setProperty(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip")
-    val producer = createProducer(brokerList = brokerList, lingerMs = Long.MaxValue, props = Some(producerProps))
-    sendAndVerifyTimestamp(producer, TimestampType.LOG_APPEND_TIME)
-  }
-
-  @Test
-  def testSendNonCompressedMessageWithLogApendTime() {
-    val producer = createProducer(brokerList = brokerList, lingerMs = Long.MaxValue)
-    sendAndVerifyTimestamp(producer, TimestampType.LOG_APPEND_TIME)
-  }
-
-  private def sendAndVerifyTimestamp(producer: KafkaProducer[Array[Byte], Array[Byte]], timestampType: TimestampType) {
+  protected def sendAndVerifyTimestamp(producer: KafkaProducer[Array[Byte], Array[Byte]], timestampType: TimestampType) {
     val partition = new Integer(0)
 
     val baseTimestamp = 123456L
@@ -317,28 +302,6 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
   }
 
   /**
-   * testAutoCreateTopic
-   *
-   * The topic should be created upon sending the first message
-   */
-  @Test
-  def testAutoCreateTopic() {
-    val producer = createProducer(brokerList, retries = 5)
-
-    try {
-      // Send a message to auto-create the topic
-      val record = new ProducerRecord[Array[Byte], Array[Byte]](topic, null, "key".getBytes, "value".getBytes)
-      assertEquals("Should have offset 0", 0L, producer.send(record).get.offset)
-
-      // double check that the topic is created with leader elected
-      TestUtils.waitUntilLeaderIsElectedOrChanged(zkUtils, topic, 0)
-
-    } finally {
-      producer.close()
-    }
-  }
-
-  /**
    * Test that flush immediately sends all accumulated requests.
    */
   @Test
@@ -446,34 +409,4 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
     }
   }
 
-  @Test
-  def testSendWithInvalidCreateTime() {
-    val topicProps = new Properties()
-    topicProps.setProperty(LogConfig.MessageTimestampDifferenceMaxMsProp, "1000")
-    TestUtils.createTopic(zkUtils, topic, 1, 2, servers, topicProps)
-
-    val producer = createProducer(brokerList = brokerList)
-    try {
-      producer.send(new ProducerRecord(topic, 0, System.currentTimeMillis() - 1001, "key".getBytes, "value".getBytes)).get()
-      fail("Should throw CorruptedRecordException")
-    } catch {
-      case e: ExecutionException => assertTrue(e.getCause.isInstanceOf[InvalidTimestampException])
-    } finally {
-      producer.close()
-    }
-
-    // Test compressed messages.
-    val producerProps = new Properties()
-    producerProps.setProperty(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip")
-    val compressedProducer = createProducer(brokerList = brokerList, props = Some(producerProps))
-    try {
-      compressedProducer.send(new ProducerRecord(topic, 0, System.currentTimeMillis() - 1001, "key".getBytes, "value".getBytes)).get()
-      fail("Should throw CorruptedRecordException")
-    } catch {
-      case e: ExecutionException => assertTrue(e.getCause.isInstanceOf[InvalidTimestampException])
-    } finally {
-      compressedProducer.close()
-    }
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/ac99a3c8/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala
index 55fdbe3..734eb66 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala
@@ -18,11 +18,17 @@
 package kafka.api
 
 import java.util.Properties
+import java.util.concurrent.ExecutionException
+
+import kafka.log.LogConfig
+import kafka.utils.TestUtils
 
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
 import org.apache.kafka.common.config.ConfigException
-import org.apache.kafka.common.errors.SerializationException
+import org.apache.kafka.common.errors.{InvalidTimestampException, SerializationException}
+import org.apache.kafka.common.record.TimestampType
 import org.apache.kafka.common.serialization.ByteArraySerializer
+import org.junit.Assert._
 import org.junit.Test
 
 class PlaintextProducerSendTest extends BaseProducerSendTest {
@@ -65,6 +71,72 @@ class PlaintextProducerSendTest extends BaseProducerSendTest {
     }
   }
 
+  @Test
+  def testSendCompressedMessageWithLogAppendTime() {
+    val producerProps = new Properties()
+    producerProps.setProperty(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip")
+    val producer = createProducer(brokerList = brokerList, lingerMs = Long.MaxValue, props = Some(producerProps))
+    sendAndVerifyTimestamp(producer, TimestampType.LOG_APPEND_TIME)
+  }
+
+  @Test
+  def testSendNonCompressedMessageWithLogAppendTime() {
+    val producer = createProducer(brokerList = brokerList, lingerMs = Long.MaxValue)
+    sendAndVerifyTimestamp(producer, TimestampType.LOG_APPEND_TIME)
+  }
+
+  /**
+   * testAutoCreateTopic
+   *
+   * The topic should be created upon sending the first message
+   */
+  @Test
+  def testAutoCreateTopic() {
+    val producer = createProducer(brokerList, retries = 5)
+
+    try {
+      // Send a message to auto-create the topic
+      val record = new ProducerRecord[Array[Byte], Array[Byte]](topic, null, "key".getBytes, "value".getBytes)
+      assertEquals("Should have offset 0", 0L, producer.send(record).get.offset)
+
+      // double check that the topic is created with leader elected
+      TestUtils.waitUntilLeaderIsElectedOrChanged(zkUtils, topic, 0)
+
+    } finally {
+      producer.close()
+    }
+  }
+
+  @Test
+  def testSendWithInvalidCreateTime() {
+    val topicProps = new Properties()
+    topicProps.setProperty(LogConfig.MessageTimestampDifferenceMaxMsProp, "1000")
+    TestUtils.createTopic(zkUtils, topic, 1, 2, servers, topicProps)
+
+    val producer = createProducer(brokerList = brokerList)
+    try {
+      producer.send(new ProducerRecord(topic, 0, System.currentTimeMillis() - 1001, "key".getBytes, "value".getBytes)).get()
+      fail("Should throw CorruptedRecordException")
+    } catch {
+      case e: ExecutionException => assertTrue(e.getCause.isInstanceOf[InvalidTimestampException])
+    } finally {
+      producer.close()
+    }
+
+    // Test compressed messages.
+    val producerProps = new Properties()
+    producerProps.setProperty(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip")
+    val compressedProducer = createProducer(brokerList = brokerList, props = Some(producerProps))
+    try {
+      compressedProducer.send(new ProducerRecord(topic, 0, System.currentTimeMillis() - 1001, "key".getBytes, "value".getBytes)).get()
+      fail("Should throw CorruptedRecordException")
+    } catch {
+      case e: ExecutionException => assertTrue(e.getCause.isInstanceOf[InvalidTimestampException])
+    } finally {
+      compressedProducer.close()
+    }
+  }
+
   private def createProducerWithWrongSerializer(brokerList: String): KafkaProducer[Array[Byte], Array[Byte]] = {
     val producerProps = new Properties()
     producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)


Mime
View raw message