kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gwens...@apache.org
Subject kafka git commit: KAFKA-2453: Enable new consumer in EndToEndLatency
Date Tue, 08 Sep 2015 22:15:57 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk f25731265 -> 926297566


KAFKA-2453: Enable new consumer in EndToEndLatency

Author: Ben Stopford <benstopford@gmail.com>

Reviewers: Gwen Shapira, Jason Gustafson

Closes #158 from benstopford/KAFKA-2453b


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/92629756
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/92629756
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/92629756

Branch: refs/heads/trunk
Commit: 92629756616849d62dde3d09c7ddd9bcf5f7326c
Parents: f257312
Author: Ben Stopford <benstopford@gmail.com>
Authored: Tue Sep 8 15:15:51 2015 -0700
Committer: Gwen Shapira <cshapi@gmail.com>
Committed: Tue Sep 8 15:15:51 2015 -0700

----------------------------------------------------------------------
 .../scala/kafka/tools/EndToEndLatency.scala     | 137 +++++++++++++------
 tests/kafkatest/services/performance.py         |   4 +-
 2 files changed, 98 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/92629756/core/src/main/scala/kafka/tools/EndToEndLatency.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/EndToEndLatency.scala b/core/src/main/scala/kafka/tools/EndToEndLatency.scala
index 7bb69b7..cbaed0a 100755
--- a/core/src/main/scala/kafka/tools/EndToEndLatency.scala
+++ b/core/src/main/scala/kafka/tools/EndToEndLatency.scala
@@ -6,7 +6,7 @@
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  *
- *    http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -19,74 +19,129 @@ package kafka.tools
 
 import java.util.{Arrays, Properties}
 
-import kafka.consumer._
-import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
+import org.apache.kafka.clients.consumer.{CommitType, ConsumerConfig, KafkaConsumer}
+import org.apache.kafka.clients.producer._
+import org.apache.kafka.common.utils.Utils
 
-import scala.Option.option2Iterable
+import scala.collection.JavaConversions._
+
+
+/**
+ * This class records the average end to end latency for a single message to travel through Kafka
+ *
+ * broker_list = location of the bootstrap broker for both the producer and the consumer
+ * num_messages = # messages to send
+ * producer_acks = See ProducerConfig.ACKS_DOC
+ * message_size_bytes = size of each message in bytes
+ *
+ * e.g. [localhost:9092 test 10000 1 20]
+ */
 
 object EndToEndLatency {
+  private val timeout: Long = 60000
+
   def main(args: Array[String]) {
-    if (args.length != 6) {
-      System.err.println("USAGE: java " + getClass().getName + " broker_list zookeeper_connect topic num_messages consumer_fetch_max_wait producer_acks")
+    println(args.length)
+    if (args.length != 5 && args.length != 6) {
+      System.err.println("USAGE: java " + getClass.getName + " broker_list topic num_messages producer_acks message_size_bytes [optional] ssl_properties_file")
       System.exit(1)
     }
 
     val brokerList = args(0)
-    val zkConnect = args(1)
-    val topic = args(2)
-    val numMessages = args(3).toInt
-    val consumerFetchMaxWait = args(4).toInt
-    val producerAcks = args(5).toInt
-
-    val consumerProps = new Properties()
-    consumerProps.put("group.id", topic)
-    consumerProps.put("auto.commit.enable", "false")
-    consumerProps.put("auto.offset.reset", "largest")
-    consumerProps.put("zookeeper.connect", zkConnect)
-    consumerProps.put("fetch.wait.max.ms", consumerFetchMaxWait.toString)
-    consumerProps.put("socket.timeout.ms", 1201000.toString)
-
-    val config = new ConsumerConfig(consumerProps)
-    val connector = Consumer.create(config)
-    val stream = connector.createMessageStreams(Map(topic -> 1)).get(topic).head.head
-    val iter = stream.iterator
-
-    val producerProps = new Properties()
+    val topic = args(1)
+    val numMessages = args(2).toInt
+    val producerAcks = args(3)
+    val messageLen = args(4).toInt
+    val sslPropsFile = if (args.length == 6) args(5) else ""
+
+    if (!List("1", "all").contains(producerAcks))
+      throw new IllegalArgumentException("Latency testing requires synchronous acknowledgement. Please use 1 or all")
+
+    val consumerProps = if (sslPropsFile.equals("")) new Properties() else Utils.loadProps(sslPropsFile)
+    consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
+    consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group-" + System.currentTimeMillis())
+    consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
+    consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")
+    consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer")
+    consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer")
+    consumerProps.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "0") //ensure we have no temporal batching
+
+    val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](consumerProps)
+    consumer.subscribe(List(topic))
+
+    val producerProps = if (sslPropsFile.equals("")) new Properties() else Utils.loadProps(sslPropsFile)
     producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
-    producerProps.put(ProducerConfig.LINGER_MS_CONFIG, "0")
+    producerProps.put(ProducerConfig.LINGER_MS_CONFIG, "0") //ensure writes are synchronous
     producerProps.put(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG, "true")
     producerProps.put(ProducerConfig.ACKS_CONFIG, producerAcks.toString)
     producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")
     producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")
-    val producer = new KafkaProducer[Array[Byte],Array[Byte]](producerProps)
+    val producer = new KafkaProducer[Array[Byte], Array[Byte]](producerProps)
+
+    def finalise() {
+      consumer.commit(CommitType.SYNC)
+      producer.close()
+      consumer.close()
+    }
 
-    // make sure the consumer fetcher has started before sending data since otherwise
-    // the consumption from the tail will skip the first message and hence be blocked
-    Thread.sleep(5000)
+    //Ensure we are at latest offset. seekToEnd evaluates lazily, that is to say actually performs the seek only when
+    //a poll() or position() request is issued. Hence we need to poll after we seek to ensure we see our first write.
+    consumer.seekToEnd()
+    consumer.poll(0)
 
-    val message = "hello there beautiful".getBytes
     var totalTime = 0.0
     val latencies = new Array[Long](numMessages)
+
     for (i <- 0 until numMessages) {
+      val message = randomBytesOfLen(messageLen)
       val begin = System.nanoTime
-      producer.send(new ProducerRecord[Array[Byte],Array[Byte]](topic, message))
-      val received = iter.next
+
+      //Send message (of random bytes) synchronously then immediately poll for it
+      producer.send(new ProducerRecord[Array[Byte], Array[Byte]](topic, message)).get()
+      val recordIter = consumer.poll(timeout).iterator
+
       val elapsed = System.nanoTime - begin
-      // poor man's progress bar
+
+      //Check we got results
+      if (!recordIter.hasNext) {
+        finalise()
+        throw new RuntimeException(s"poll() timed out before finding a result (timeout:[$timeout])")
+      }
+
+      //Check result matches the original record
+      val sent = new String(message)
+      val read = new String(recordIter.next().value())
+      if (!read.equals(sent)) {
+        finalise()
+        throw new RuntimeException(s"The message read [$read] did not match the message sent [$sent]")
+      }
+
+      //Check we only got the one message
+      if (recordIter.hasNext) {
+        var count = 1
+        for (elem <- recordIter) count += 1
+        throw new RuntimeException(s"Only one result was expected during this test. We found [$count]")
+      }
+
+      //Report progress
       if (i % 1000 == 0)
         println(i + "\t" + elapsed / 1000.0 / 1000.0)
       totalTime += elapsed
-      latencies(i) = (elapsed / 1000 / 1000)
+      latencies(i) = elapsed / 1000 / 1000
     }
+
+    //Results
     println("Avg latency: %.4f ms\n".format(totalTime / numMessages / 1000.0 / 1000.0))
     Arrays.sort(latencies)
     val p50 = latencies((latencies.length * 0.5).toInt)
-    val p99 = latencies((latencies.length * 0.99).toInt) 
+    val p99 = latencies((latencies.length * 0.99).toInt)
     val p999 = latencies((latencies.length * 0.999).toInt)
     println("Percentiles: 50th = %d, 99th = %d, 99.9th = %d".format(p50, p99, p999))
-    producer.close()
-    connector.commitOffsets(true)
-    connector.shutdown()
-    System.exit(0)
+
+    finalise()
+  }
+
+  def randomBytesOfLen(len: Int): Array[Byte] = {
+    Array.fill(len)((scala.util.Random.nextInt(26) + 65).toByte)
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/92629756/tests/kafkatest/services/performance.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/performance.py b/tests/kafkatest/services/performance.py
index 65c1a4d..34892e0 100644
--- a/tests/kafkatest/services/performance.py
+++ b/tests/kafkatest/services/performance.py
@@ -127,8 +127,8 @@ class EndToEndLatencyService(PerformanceService):
             'bootstrap_servers': self.kafka.bootstrap_servers(),
         })
         cmd = "/opt/kafka/bin/kafka-run-class.sh kafka.tools.EndToEndLatency "\
-              "%(bootstrap_servers)s %(zk_connect)s %(topic)s %(num_records)d "\
-              "%(consumer_fetch_max_wait)d %(acks)d" % args
+              "%(bootstrap_servers)s %(topic)s %(num_records)d "\
+              "%(acks)d 20" % args
         self.logger.debug("End-to-end latency %d command: %s", idx, cmd)
         results = {}
         for line in node.account.ssh_capture(cmd):


Mime
View raw message