kafka-commits mailing list archives

From ij...@apache.org
Subject kafka git commit: KAFKA-4318; Migrate ProducerSendTest to the new consumer
Date Tue, 03 Jan 2017 16:08:02 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 252e7e958 -> 7411ed89b


KAFKA-4318; Migrate ProducerSendTest to the new consumer

Author: Balint Molnar <balintmolnar91@gmail.com>

Reviewers: Ismael Juma <ismael@juma.me.uk>

Closes #2083 from baluchicken/KAFKA-4318
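
The substance of the change is the consumer swap: instead of issuing fetch requests against individual brokers through SimpleConsumer, the test assigns the target partition to a single new-style KafkaConsumer and polls in short increments until the expected number of records arrives (the pollUntilNumRecords helper added below). A minimal standalone sketch of that pattern follows; the broker address, group id, topic name, and object name are placeholders, not values from this commit (the test itself derives its connection settings from KafkaServerTestHarness and uses TestUtils.waitUntilTrue rather than a manual deadline):

----------------------------------------------------------------------
import java.util.Properties

import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord, KafkaConsumer}
import org.apache.kafka.common.TopicPartition

import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer

object PollUntilNumRecordsSketch {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    // Placeholder connection settings; the migrated test builds these from its harness.
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "sketch-group")
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.ByteArrayDeserializer")
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.ByteArrayDeserializer")

    val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](props)
    try {
      // Assign the partition explicitly, as the migrated test does, instead of
      // addressing a specific broker the way SimpleConsumer required.
      consumer.assign(List(new TopicPartition("topic", 0)).asJava)

      // A single poll is not guaranteed to return everything, so accumulate
      // records across short polls until the expected count or a deadline.
      val expected = 100
      val records = new ArrayBuffer[ConsumerRecord[Array[Byte], Array[Byte]]]()
      val deadline = System.currentTimeMillis() + 15000
      while (records.size < expected && System.currentTimeMillis() < deadline)
        records ++= consumer.poll(50).asScala
      println(s"Consumed ${records.size} of $expected expected records")
    } finally {
      consumer.close()
    }
  }
}
----------------------------------------------------------------------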


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/7411ed89
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/7411ed89
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/7411ed89

Branch: refs/heads/trunk
Commit: 7411ed89be151be4b3df6f0d2adab956b79e2885
Parents: 252e7e9
Author: Balint Molnar <balintmolnar91@gmail.com>
Authored: Tue Jan 3 14:13:50 2017 +0000
Committer: Ismael Juma <ismael@juma.me.uk>
Committed: Tue Jan 3 15:47:44 2017 +0000

----------------------------------------------------------------------
 .../kafka/api/BaseProducerSendTest.scala        | 94 +++++++++-----------
 1 file changed, 40 insertions(+), 54 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/7411ed89/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
index b5aaaf4..82409bb 100644
--- a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
@@ -20,21 +20,21 @@ package kafka.api
 import java.util.Properties
 import java.util.concurrent.TimeUnit
 
+import collection.JavaConverters._
 import kafka.admin.AdminUtils
-import kafka.consumer.SimpleConsumer
 import kafka.integration.KafkaServerTestHarness
 import kafka.log.LogConfig
-import kafka.message.Message
 import kafka.server.KafkaConfig
 import kafka.utils.TestUtils
-import kafka.utils.TestUtils._
+import org.apache.kafka.clients.consumer.{ConsumerRecord, KafkaConsumer}
 import org.apache.kafka.clients.producer._
-import org.apache.kafka.common.KafkaException
+import org.apache.kafka.common.{KafkaException, TopicPartition}
+import org.apache.kafka.common.protocol.SecurityProtocol
 import org.apache.kafka.common.record.TimestampType
 import org.junit.Assert._
 import org.junit.{After, Before, Test}
 
-import scala.collection.mutable.Buffer
+import scala.collection.mutable.{ArrayBuffer, Buffer}
 
 abstract class BaseProducerSendTest extends KafkaServerTestHarness {
 
@@ -46,8 +46,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
       trustStoreFile = trustStoreFile, saslProperties = saslProperties).map(KafkaConfig.fromProps(_, overridingProps))
   }
 
-  private var consumer1: SimpleConsumer = null
-  private var consumer2: SimpleConsumer = null
+  private var consumer: KafkaConsumer[Array[Byte], Array[Byte]] = _
   private val producers = Buffer[KafkaProducer[Array[Byte], Array[Byte]]]()
 
   protected val topic = "topic"
@@ -56,16 +55,12 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
   @Before
   override def setUp() {
     super.setUp()
-
-    // TODO: we need to migrate to new consumers when 0.9 is final
-    consumer1 = new SimpleConsumer("localhost", servers.head.boundPort(), 100, 1024 * 1024, "")
-    consumer2 = new SimpleConsumer("localhost", servers(1).boundPort(), 100, 1024 * 1024, "")
+    consumer = TestUtils.createNewConsumer(TestUtils.getBrokerListStrFromServers(servers), securityProtocol = SecurityProtocol.PLAINTEXT)
   }
 
   @After
   override def tearDown() {
-    consumer1.close()
-    consumer2.close()
+    consumer.close()
     // Ensure that all producers are closed since unclosed producers impact other tests when Kafka server ports are reused
     producers.foreach(_.close())
 
@@ -83,6 +78,15 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
     producer
   }
 
+  private def pollUntilNumRecords(numRecords: Int) : Seq[ConsumerRecord[Array[Byte], Array[Byte]]] = {
+    val records = new ArrayBuffer[ConsumerRecord[Array[Byte], Array[Byte]]]()
+    TestUtils.waitUntilTrue(() => {
+      records ++= consumer.poll(50).asScala
+      records.size == numRecords
+    }, s"Consumed ${records.size} records until timeout, but expected $numRecords records.")
+    records
+  }
+
   /**
    * testSendOffset checks the basic send API behavior
    *
@@ -260,8 +264,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
     val producer = createProducer(brokerList)
 
     try {
-      // create topic
-      val leaders = TestUtils.createTopic(zkUtils, topic, 2, 2, servers)
+      TestUtils.createTopic(zkUtils, topic, 2, 2, servers)
       val partition = 1
 
       val now = System.currentTimeMillis()
@@ -276,21 +279,20 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
         assertEquals(partition, recordMetadata.partition)
       }
 
-      val leader1 = leaders(partition)
+      consumer.assign(List(new TopicPartition(topic, partition)).asJava)
+
       // make sure the fetched messages also respect the partitioning and ordering
-      val fetchResponse1 = if (leader1.get == configs.head.brokerId) {
-        consumer1.fetch(new FetchRequestBuilder().addFetch(topic, partition, 0, Int.MaxValue).build())
-      } else {
-        consumer2.fetch(new FetchRequestBuilder().addFetch(topic, partition, 0, Int.MaxValue).build())
+      val records = pollUntilNumRecords(numRecords)
+
+      records.zipWithIndex.foreach { case (record, i) =>
+        assertEquals(topic, record.topic)
+        assertEquals(partition, record.partition)
+        assertEquals(i.toLong, record.offset)
+        assertNull(record.key)
+        assertEquals(s"value${i + 1}", new String(record.value))
+        assertEquals(now, record.timestamp)
       }
-      val messageSet1 = fetchResponse1.messageSet(topic, partition).iterator.toBuffer
-      assertEquals("Should have fetched " + numRecords + " messages", numRecords, messageSet1.size)
 
-      // TODO: also check topic and partition after they are added in the return messageSet
-      for (i <- 0 until numRecords) {
-        assertEquals(new Message(bytes = ("value" + (i + 1)).getBytes, now, Message.MagicValue_V1), messageSet1(i).message)
-        assertEquals(i.toLong, messageSet1(i).offset)
-      }
     } finally {
       producer.close()
     }
@@ -384,12 +386,10 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
    */
   @Test
   def testCloseWithZeroTimeoutFromCallerThread() {
-    // create topic
-    val leaders = TestUtils.createTopic(zkUtils, topic, 2, 2, servers)
-    val leader0 = leaders(0)
-
-    // create record
-    val record0 = new ProducerRecord[Array[Byte], Array[Byte]](topic, 0, null, "value".getBytes)
+    TestUtils.createTopic(zkUtils, topic, 2, 2, servers)
+    val partition = 0
+    consumer.assign(List(new TopicPartition(topic, partition)).asJava)
+    val record0 = new ProducerRecord[Array[Byte], Array[Byte]](topic, partition, null, "value".getBytes)
 
     // Test closing from caller thread.
     for (_ <- 0 until 50) {
@@ -406,12 +406,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
             assertEquals("java.lang.IllegalStateException: Producer is closed forcefully.",
e.getMessage)
         }
       }
-      val fetchResponse = if (leader0.get == configs.head.brokerId) {
-        consumer1.fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, Int.MaxValue).build())
-      } else {
-        consumer2.fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, Int.MaxValue).build())
-      }
-      assertEquals("Fetch response should have no message returned.", 0, fetchResponse.messageSet(topic,
0).size)
+      assertEquals("Fetch response should have no message returned.", 0, consumer.poll(50).count)
     }
   }
 
@@ -420,12 +415,10 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
    */
   @Test
   def testCloseWithZeroTimeoutFromSenderThread() {
-    // create topic
-    val leaders = TestUtils.createTopic(zkUtils, topic, 1, 2, servers)
-    val leader = leaders(0)
-
-    // create record
-    val record = new ProducerRecord[Array[Byte], Array[Byte]](topic, 0, null, "value".getBytes)
+    TestUtils.createTopic(zkUtils, topic, 1, 2, servers)
+    val partition = 0
+    consumer.assign(List(new TopicPartition(topic, partition)).asJava)
+    val record = new ProducerRecord[Array[Byte], Array[Byte]](topic, partition, null, "value".getBytes)
 
     // Test closing from sender thread.
     class CloseCallback(producer: KafkaProducer[Array[Byte], Array[Byte]], sendRecords: Boolean) extends Callback {
@@ -450,16 +443,9 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
         assertTrue("No request is complete.", responses.forall(!_.isDone()))
         // flush the messages.
         producer.flush()
-        assertTrue("All request are complete.", responses.forall(_.isDone()))
+        assertTrue("All requests are complete.", responses.forall(_.isDone()))
         // Check the messages received by broker.
-        val fetchResponse = if (leader.get == configs.head.brokerId) {
-          consumer1.fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, Int.MaxValue).build())
-        } else {
-          consumer2.fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, Int.MaxValue).build())
-        }
-        val expectedNumRecords = (i + 1) * numRecords
-        assertEquals("Fetch response to partition 0 should have %d messages.".format(expectedNumRecords),
-          expectedNumRecords, fetchResponse.messageSet(topic, 0).size)
+        pollUntilNumRecords(numRecords)
       } finally {
         producer.close()
       }

