Repository: kafka
Updated Branches:
refs/heads/trunk 4af50bb86 -> ac99a3c86
KAFKA-4145; Avoid redundant integration testing in ProducerSendTests
Author: Vahid Hashemian <vahidhashemian@us.ibm.com>
Reviewers: Ismael Juma <ismael@juma.me.uk>
Closes #1842 from vahidhashemian/KAFKA-4145
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/ac99a3c8
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/ac99a3c8
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/ac99a3c8
Branch: refs/heads/trunk
Commit: ac99a3c86d5d8a56b61b1e0d1b2594e627016718
Parents: 4af50bb
Author: Vahid Hashemian <vahidhashemian@us.ibm.com>
Authored: Sat Sep 10 08:16:23 2016 +0100
Committer: Ismael Juma <ismael@juma.me.uk>
Committed: Sat Sep 10 08:16:23 2016 +0100
----------------------------------------------------------------------
.../kafka/api/BaseProducerSendTest.scala | 73 +------------------
.../kafka/api/PlaintextProducerSendTest.scala | 74 +++++++++++++++++++-
2 files changed, 76 insertions(+), 71 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/ac99a3c8/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
index b5a1284..5bb0438 100644
--- a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
@@ -18,7 +18,7 @@
package kafka.api
import java.util.Properties
-import java.util.concurrent.{ExecutionException, TimeUnit}
+import java.util.concurrent.TimeUnit
import kafka.consumer.SimpleConsumer
import kafka.integration.KafkaServerTestHarness
@@ -27,7 +27,6 @@ import kafka.message.Message
import kafka.server.KafkaConfig
import kafka.utils.TestUtils
import org.apache.kafka.clients.producer._
-import org.apache.kafka.common.errors.{InvalidTimestampException, SerializationException}
import org.apache.kafka.common.record.TimestampType
import org.junit.Assert._
import org.junit.{After, Before, Test}
@@ -70,7 +69,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
super.tearDown()
}
- private def createProducer(brokerList: String, retries: Int = 0, lingerMs: Long = 0, props: Option[Properties] = None): KafkaProducer[Array[Byte],Array[Byte]] = {
+ protected def createProducer(brokerList: String, retries: Int = 0, lingerMs: Long = 0, props: Option[Properties] = None): KafkaProducer[Array[Byte],Array[Byte]] = {
val producer = TestUtils.createNewProducer(brokerList, securityProtocol = securityProtocol,
trustStoreFile = trustStoreFile,
saslProperties = saslProperties, retries = retries, lingerMs = lingerMs, props = props)
registerProducer(producer)
@@ -170,21 +169,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
sendAndVerifyTimestamp(producer, TimestampType.CREATE_TIME)
}
- @Test
- def testSendCompressedMessageWithLogAppendTime() {
- val producerProps = new Properties()
- producerProps.setProperty(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip")
- val producer = createProducer(brokerList = brokerList, lingerMs = Long.MaxValue, props = Some(producerProps))
- sendAndVerifyTimestamp(producer, TimestampType.LOG_APPEND_TIME)
- }
-
- @Test
- def testSendNonCompressedMessageWithLogApendTime() {
- val producer = createProducer(brokerList = brokerList, lingerMs = Long.MaxValue)
- sendAndVerifyTimestamp(producer, TimestampType.LOG_APPEND_TIME)
- }
-
- private def sendAndVerifyTimestamp(producer: KafkaProducer[Array[Byte], Array[Byte]], timestampType: TimestampType) {
+ protected def sendAndVerifyTimestamp(producer: KafkaProducer[Array[Byte], Array[Byte]], timestampType: TimestampType) {
val partition = new Integer(0)
val baseTimestamp = 123456L
@@ -317,28 +302,6 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
}
/**
- * testAutoCreateTopic
- *
- * The topic should be created upon sending the first message
- */
- @Test
- def testAutoCreateTopic() {
- val producer = createProducer(brokerList, retries = 5)
-
- try {
- // Send a message to auto-create the topic
- val record = new ProducerRecord[Array[Byte], Array[Byte]](topic, null, "key".getBytes, "value".getBytes)
- assertEquals("Should have offset 0", 0L, producer.send(record).get.offset)
-
- // double check that the topic is created with leader elected
- TestUtils.waitUntilLeaderIsElectedOrChanged(zkUtils, topic, 0)
-
- } finally {
- producer.close()
- }
- }
-
- /**
* Test that flush immediately sends all accumulated requests.
*/
@Test
@@ -446,34 +409,4 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
}
}
- @Test
- def testSendWithInvalidCreateTime() {
- val topicProps = new Properties()
- topicProps.setProperty(LogConfig.MessageTimestampDifferenceMaxMsProp, "1000")
- TestUtils.createTopic(zkUtils, topic, 1, 2, servers, topicProps)
-
- val producer = createProducer(brokerList = brokerList)
- try {
- producer.send(new ProducerRecord(topic, 0, System.currentTimeMillis() - 1001, "key".getBytes, "value".getBytes)).get()
- fail("Should throw CorruptedRecordException")
- } catch {
- case e: ExecutionException => assertTrue(e.getCause.isInstanceOf[InvalidTimestampException])
- } finally {
- producer.close()
- }
-
- // Test compressed messages.
- val producerProps = new Properties()
- producerProps.setProperty(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip")
- val compressedProducer = createProducer(brokerList = brokerList, props = Some(producerProps))
- try {
- compressedProducer.send(new ProducerRecord(topic, 0, System.currentTimeMillis() - 1001, "key".getBytes, "value".getBytes)).get()
- fail("Should throw CorruptedRecordException")
- } catch {
- case e: ExecutionException => assertTrue(e.getCause.isInstanceOf[InvalidTimestampException])
- } finally {
- compressedProducer.close()
- }
- }
-
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/ac99a3c8/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala
index 55fdbe3..734eb66 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala
@@ -18,11 +18,17 @@
package kafka.api
import java.util.Properties
+import java.util.concurrent.ExecutionException
+
+import kafka.log.LogConfig
+import kafka.utils.TestUtils
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.config.ConfigException
-import org.apache.kafka.common.errors.SerializationException
+import org.apache.kafka.common.errors.{InvalidTimestampException, SerializationException}
+import org.apache.kafka.common.record.TimestampType
import org.apache.kafka.common.serialization.ByteArraySerializer
+import org.junit.Assert._
import org.junit.Test
class PlaintextProducerSendTest extends BaseProducerSendTest {
@@ -65,6 +71,72 @@ class PlaintextProducerSendTest extends BaseProducerSendTest {
}
}
+ @Test
+ def testSendCompressedMessageWithLogAppendTime() {
+ val producerProps = new Properties()
+ producerProps.setProperty(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip")
+ val producer = createProducer(brokerList = brokerList, lingerMs = Long.MaxValue, props = Some(producerProps))
+ sendAndVerifyTimestamp(producer, TimestampType.LOG_APPEND_TIME)
+ }
+
+ @Test
+ def testSendNonCompressedMessageWithLogAppendTime() {
+ val producer = createProducer(brokerList = brokerList, lingerMs = Long.MaxValue)
+ sendAndVerifyTimestamp(producer, TimestampType.LOG_APPEND_TIME)
+ }
+
+ /**
+ * testAutoCreateTopic
+ *
+ * The topic should be created upon sending the first message
+ */
+ @Test
+ def testAutoCreateTopic() {
+ val producer = createProducer(brokerList, retries = 5)
+
+ try {
+ // Send a message to auto-create the topic
+ val record = new ProducerRecord[Array[Byte], Array[Byte]](topic, null, "key".getBytes, "value".getBytes)
+ assertEquals("Should have offset 0", 0L, producer.send(record).get.offset)
+
+ // double check that the topic is created with leader elected
+ TestUtils.waitUntilLeaderIsElectedOrChanged(zkUtils, topic, 0)
+
+ } finally {
+ producer.close()
+ }
+ }
+
+ @Test
+ def testSendWithInvalidCreateTime() {
+ val topicProps = new Properties()
+ topicProps.setProperty(LogConfig.MessageTimestampDifferenceMaxMsProp, "1000")
+ TestUtils.createTopic(zkUtils, topic, 1, 2, servers, topicProps)
+
+ val producer = createProducer(brokerList = brokerList)
+ try {
+ producer.send(new ProducerRecord(topic, 0, System.currentTimeMillis() - 1001, "key".getBytes, "value".getBytes)).get()
+ fail("Should throw CorruptedRecordException")
+ } catch {
+ case e: ExecutionException => assertTrue(e.getCause.isInstanceOf[InvalidTimestampException])
+ } finally {
+ producer.close()
+ }
+
+ // Test compressed messages.
+ val producerProps = new Properties()
+ producerProps.setProperty(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip")
+ val compressedProducer = createProducer(brokerList = brokerList, props = Some(producerProps))
+ try {
+ compressedProducer.send(new ProducerRecord(topic, 0, System.currentTimeMillis() - 1001, "key".getBytes, "value".getBytes)).get()
+ fail("Should throw CorruptedRecordException")
+ } catch {
+ case e: ExecutionException => assertTrue(e.getCause.isInstanceOf[InvalidTimestampException])
+ } finally {
+ compressedProducer.close()
+ }
+ }
+
private def createProducerWithWrongSerializer(brokerList: String): KafkaProducer[Array[Byte], Array[Byte]] = {
val producerProps = new Properties()
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
|