kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ij...@apache.org
Subject [kafka] branch trunk updated: KAFKA-7210: Add system test to verify log compaction (#5226)
Date Mon, 20 Aug 2018 10:47:07 GMT
This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 914ffa9  KAFKA-7210: Add system test to verify log compaction (#5226)
914ffa9 is described below

commit 914ffa9dbef3c8ad6851b380276a1cb7c5aa4a0d
Author: Manikumar Reddy O <manikumar.reddy@gmail.com>
AuthorDate: Mon Aug 20 16:16:57 2018 +0530

    KAFKA-7210: Add system test to verify log compaction (#5226)
    
    * Updated the TestLogCleaning tool to use the Java consumer and renamed it to LogCompactionTester.
    * Enabled the log cleaner in every system test.
    * Removed configs from "kafka.properties" that were set to their default values, and also removed
    `socket.receive.buffer.bytes`, as the override did not seem necessary.
    * Updated `kafka.py` logic to handle duplicates between `kafka.properties` and `server_prop_overrides`.
    * Updated the Gradle build so that classes from the `kafka-clients` test jar can be used in
    system tests.
    
    Reviewers: Colin Patrick McCabe <colin@cmccabe.xyz>, Ismael Juma <ismael@juma.me.uk>
---
 build.gradle                                       |   3 +
 .../scala/kafka/tools/LogCompactionTester.scala    | 348 +++++++++++++++++++++
 tests/kafkatest/services/kafka/kafka.py            |  36 ++-
 .../services/kafka/templates/kafka.properties      |  12 -
 tests/kafkatest/services/log_compaction_tester.py  |  88 ++++++
 tests/kafkatest/tests/tools/log_compaction_test.py |  66 ++++
 6 files changed, 532 insertions(+), 21 deletions(-)

diff --git a/build.gradle b/build.gradle
index 0892ed1..c1387d4 100644
--- a/build.gradle
+++ b/build.gradle
@@ -775,6 +775,9 @@ project(':core') {
       include('*.jar')
     }
     into "$buildDir/dependant-testlibs"
+    //By default gradle does not handle test dependencies between the sub-projects
+    //This line is to include clients project test jar to dependant-testlibs
+    from (project(':clients').testJar ) { "$buildDir/dependant-testlibs" }
     duplicatesStrategy 'exclude'
   }
 
diff --git a/core/src/test/scala/kafka/tools/LogCompactionTester.scala b/core/src/test/scala/kafka/tools/LogCompactionTester.scala
new file mode 100755
index 0000000..9f53f66
--- /dev/null
+++ b/core/src/test/scala/kafka/tools/LogCompactionTester.scala
@@ -0,0 +1,348 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.tools
+
+import java.io._
+import java.nio.ByteBuffer
+import java.nio.charset.StandardCharsets.UTF_8
+import java.nio.file.{Files, Path}
+import java.time.Duration
+import java.util.{Properties, Random}
+
+import joptsimple.OptionParser
+import kafka.utils._
+import org.apache.kafka.clients.admin.NewTopic
+import org.apache.kafka.clients.{CommonClientConfigs, admin}
+import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
+import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
+import org.apache.kafka.common.config.TopicConfig
+import org.apache.kafka.common.serialization.{ByteArraySerializer, StringDeserializer}
+import org.apache.kafka.common.utils.Utils
+
+import scala.collection.JavaConverters._
+
+/**
+ * This is a torture test that runs against an existing broker
+ *
+ * Here is how it works:
+ *
+ * It produces a series of specially formatted messages to one or more partitions. Each message it produces
+ * it logs out to a text file. The messages have a limited set of keys, so there is duplication in the key space.
+ *
+ * The broker will clean its log as the test runs.
+ *
+ * When the specified number of messages have been produced we create a consumer and consume all the messages in the topic
+ * and write that out to another text file.
+ *
+ * Using a stable unix sort we sort both the producer log of what was sent and the consumer log of what was retrieved by the message key.
+ * Then we compare the final message in both logs for each key. If this final message is not the same for all keys we
+ * print an error and exit with exit code 1, otherwise we print the size reduction and exit with exit code 0.
+ */
+object LogCompactionTester {
+
+  //maximum line size while reading produced/consumed record text file
+  private val ReadAheadLimit = 4906
+
+  def main(args: Array[String]) {
+    val parser = new OptionParser(false)
+    val numMessagesOpt = parser.accepts("messages", "The number of messages to send or consume.")
+      .withRequiredArg
+      .describedAs("count")
+      .ofType(classOf[java.lang.Long])
+      .defaultsTo(Long.MaxValue)
+    val messageCompressionOpt = parser.accepts("compression-type", "message compression type")
+      .withOptionalArg
+      .describedAs("compressionType")
+      .ofType(classOf[java.lang.String])
+      .defaultsTo("none")
+    val numDupsOpt = parser.accepts("duplicates", "The number of duplicates for each key.")
+      .withRequiredArg
+      .describedAs("count")
+      .ofType(classOf[java.lang.Integer])
+      .defaultsTo(5)
+    val brokerOpt = parser.accepts("bootstrap-server", "The server(s) to connect to.")
+      .withRequiredArg
+      .describedAs("url")
+      .ofType(classOf[String])
+    val topicsOpt = parser.accepts("topics", "The number of topics to test.")
+      .withRequiredArg
+      .describedAs("count")
+      .ofType(classOf[java.lang.Integer])
+      .defaultsTo(1)
+    val percentDeletesOpt = parser.accepts("percent-deletes", "The percentage of updates
that are deletes.")
+      .withRequiredArg
+      .describedAs("percent")
+      .ofType(classOf[java.lang.Integer])
+      .defaultsTo(0)
+    val sleepSecsOpt = parser.accepts("sleep", "Time in milliseconds to sleep between production
and consumption.")
+      .withRequiredArg
+      .describedAs("ms")
+      .ofType(classOf[java.lang.Integer])
+      .defaultsTo(0)
+
+    val options = parser.parse(args: _*)
+
+    if (args.length == 0)
+      CommandLineUtils.printUsageAndDie(parser, "A tool to test log compaction. Valid options
are: ")
+
+    CommandLineUtils.checkRequiredArgs(parser, options, brokerOpt, numMessagesOpt)
+
+    // parse options
+    val messages = options.valueOf(numMessagesOpt).longValue
+    val compressionType = options.valueOf(messageCompressionOpt)
+    val percentDeletes = options.valueOf(percentDeletesOpt).intValue
+    val dups = options.valueOf(numDupsOpt).intValue
+    val brokerUrl = options.valueOf(brokerOpt)
+    val topicCount = options.valueOf(topicsOpt).intValue
+    val sleepSecs = options.valueOf(sleepSecsOpt).intValue
+
+    val testId = new Random().nextLong
+    val topics = (0 until topicCount).map("log-cleaner-test-" + testId + "-" + _).toArray
+    createTopics(brokerUrl, topics.toSeq)
+
+    println(s"Producing $messages messages..to topics ${topics.mkString(",")}")
+    val producedDataFilePath = produceMessages(brokerUrl, topics, messages, compressionType,
dups, percentDeletes)
+    println(s"Sleeping for $sleepSecs seconds...")
+    Thread.sleep(sleepSecs * 1000)
+    println("Consuming messages...")
+    val consumedDataFilePath = consumeMessages(brokerUrl, topics)
+
+    val producedLines = lineCount(producedDataFilePath)
+    val consumedLines = lineCount(consumedDataFilePath)
+    val reduction = 100 * (1.0 - consumedLines.toDouble / producedLines.toDouble)
+    println(f"$producedLines%d rows of data produced, $consumedLines%d rows of data consumed
($reduction%.1f%% reduction).")
+
+    println("De-duplicating and validating output files...")
+    validateOutput(producedDataFilePath.toFile, consumedDataFilePath.toFile)
+    Utils.delete(producedDataFilePath.toFile)
+    Utils.delete(consumedDataFilePath.toFile)
+    //if you change this line, we need to update test_log_compaction_tool.py system test
+    println("Data verification is completed")
+  }
+
+  def createTopics(brokerUrl: String, topics: Seq[String]): Unit = {
+    val adminConfig = new Properties
+    adminConfig.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, brokerUrl)
+    val adminClient = admin.AdminClient.create(adminConfig)
+
+    try {
+      val topicConfigs = Map(TopicConfig.CLEANUP_POLICY_CONFIG -> TopicConfig.CLEANUP_POLICY_COMPACT)
+      val newTopics = topics.map(name => new NewTopic(name, 1, 1).configs(topicConfigs.asJava)).asJava
+      adminClient.createTopics(newTopics).all.get
+
+      var pendingTopics: Seq[String] = Seq()
+      TestUtils.waitUntilTrue(() => {
+        val allTopics = adminClient.listTopics.names.get.asScala.toSeq
+        pendingTopics = topics.filter(topicName => !allTopics.contains(topicName))
+        pendingTopics.isEmpty
+      }, s"timed out waiting for topics : $pendingTopics")
+
+    } finally adminClient.close()
+  }
+
+  def lineCount(filPath: Path): Int = Files.readAllLines(filPath).size
+
+  def validateOutput(producedDataFile: File, consumedDataFile: File) {
+    val producedReader = externalSort(producedDataFile)
+    val consumedReader = externalSort(consumedDataFile)
+    val produced = valuesIterator(producedReader)
+    val consumed = valuesIterator(consumedReader)
+
+    val producedDedupedFile = new File(producedDataFile.getAbsolutePath + ".deduped")
+    val producedDeduped : BufferedWriter = Files.newBufferedWriter(producedDedupedFile.toPath,
UTF_8)
+
+    val consumedDedupedFile = new File(consumedDataFile.getAbsolutePath + ".deduped")
+    val consumedDeduped : BufferedWriter = Files.newBufferedWriter(consumedDedupedFile.toPath,
UTF_8)
+    var total = 0
+    var mismatched = 0
+    while (produced.hasNext && consumed.hasNext) {
+      val p = produced.next()
+      producedDeduped.write(p.toString)
+      producedDeduped.newLine()
+      val c = consumed.next()
+      consumedDeduped.write(c.toString)
+      consumedDeduped.newLine()
+      if (p != c)
+        mismatched += 1
+      total += 1
+    }
+    producedDeduped.close()
+    consumedDeduped.close()
+    println(s"Validated $total values, $mismatched mismatches.")
+    require(!produced.hasNext, "Additional values produced not found in consumer log.")
+    require(!consumed.hasNext, "Additional values consumed not found in producer log.")
+    require(mismatched == 0, "Non-zero number of row mismatches.")
+    // if all the checks worked out we can delete the deduped files
+    Utils.delete(producedDedupedFile)
+    Utils.delete(consumedDedupedFile)
+  }
+
+  def require(requirement: Boolean, message: => Any) {
+    if (!requirement) {
+      System.err.println(s"Data validation failed : $message")
+      Exit.exit(1)
+    }
+  }
+
+  def valuesIterator(reader: BufferedReader) = {
+    new IteratorTemplate[TestRecord] {
+      def makeNext(): TestRecord = {
+        var next = readNext(reader)
+        while (next != null && next.delete)
+          next = readNext(reader)
+        if (next == null)
+          allDone()
+        else
+          next
+      }
+    }
+  }
+
+  def readNext(reader: BufferedReader): TestRecord = {
+    var line = reader.readLine()
+    if (line == null)
+      return null
+    var curr = TestRecord.parse(line)
+    while (true) {
+      line = peekLine(reader)
+      if (line == null)
+        return curr
+      val next = TestRecord.parse(line)
+      if (next == null || next.topicAndKey != curr.topicAndKey)
+        return curr
+      curr = next
+      reader.readLine()
+    }
+    null
+  }
+
+  def peekLine(reader: BufferedReader) = {
+    reader.mark(ReadAheadLimit)
+    val line = reader.readLine
+    reader.reset()
+    line
+  }
+
+  def externalSort(file: File): BufferedReader = {
+    val builder = new ProcessBuilder("sort", "--key=1,2", "--stable", "--buffer-size=20%",
"--temporary-directory=" + Files.createTempDirectory("log_compaction_test"), file.getAbsolutePath)
+    val process = builder.start
+    new Thread() {
+      override def run() {
+        val exitCode = process.waitFor()
+        if (exitCode != 0) {
+          System.err.println("Process exited abnormally.")
+          while (process.getErrorStream.available > 0) {
+            System.err.write(process.getErrorStream().read())
+          }
+        }
+      }
+    }.start()
+    new BufferedReader(new InputStreamReader(process.getInputStream(), UTF_8), 10 * 1024
* 1024)
+  }
+
+  def produceMessages(brokerUrl: String,
+                      topics: Array[String],
+                      messages: Long,
+                      compressionType: String,
+                      dups: Int,
+                      percentDeletes: Int): Path = {
+    val producerProps = new Properties
+    producerProps.setProperty(ProducerConfig.MAX_BLOCK_MS_CONFIG, Long.MaxValue.toString)
+    producerProps.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerUrl)
+    producerProps.setProperty(ProducerConfig.COMPRESSION_TYPE_CONFIG, compressionType)
+    val producer = new KafkaProducer(producerProps, new ByteArraySerializer, new ByteArraySerializer)
+    try {
+      val rand = new Random(1)
+      val keyCount = (messages / dups).toInt
+      val producedFilePath = Files.createTempFile("kafka-log-cleaner-produced-", ".txt")
+      println(s"Logging produce requests to $producedFilePath")
+      val producedWriter: BufferedWriter = Files.newBufferedWriter(producedFilePath, UTF_8)
+      for (i <- 0L until (messages * topics.length)) {
+        val topic = topics((i % topics.length).toInt)
+        val key = rand.nextInt(keyCount)
+        val delete = (i % 100) < percentDeletes
+        val msg =
+          if (delete)
+            new ProducerRecord[Array[Byte], Array[Byte]](topic, key.toString.getBytes(UTF_8),
null)
+          else
+            new ProducerRecord(topic, key.toString.getBytes(UTF_8), i.toString.getBytes(UTF_8))
+        producer.send(msg)
+        producedWriter.write(TestRecord(topic, key, i, delete).toString)
+        producedWriter.newLine()
+      }
+      producedWriter.close()
+      producedFilePath
+    } finally {
+      producer.close()
+    }
+  }
+
+  def createConsumer(brokerUrl: String): KafkaConsumer[String, String] = {
+    val consumerProps = new Properties
+    consumerProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "log-cleaner-test-" + new Random().nextInt(Int.MaxValue))
+    consumerProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerUrl)
+    consumerProps.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
+    new KafkaConsumer(consumerProps, new StringDeserializer, new StringDeserializer)
+  }
+
+  def consumeMessages(brokerUrl: String, topics: Array[String]): Path = {
+    val consumer = createConsumer(brokerUrl)
+    consumer.subscribe(topics.seq.asJava)
+    val consumedFilePath = Files.createTempFile("kafka-log-cleaner-consumed-", ".txt")
+    println(s"Logging consumed messages to $consumedFilePath")
+    val consumedWriter: BufferedWriter = Files.newBufferedWriter(consumedFilePath, UTF_8)
+
+    try {
+      var done = false
+      while (!done) {
+        val consumerRecords = consumer.poll(Duration.ofSeconds(20))
+        if (!consumerRecords.isEmpty) {
+          for (record <- consumerRecords.asScala) {
+            val delete = record.value == null
+            val value = if (delete) -1L else record.value.toLong
+            consumedWriter.write(TestRecord(record.topic, record.key.toInt, value, delete).toString)
+            consumedWriter.newLine
+          }
+        } else {
+          done = true
+        }
+      }
+      consumedFilePath
+    } finally {
+      consumedWriter.close()
+      consumer.close()
+    }
+  }
+
+  def readString(buffer: ByteBuffer): String = {
+    Utils.utf8(buffer)
+  }
+
+}
+
+case class TestRecord(topic: String, key: Int, value: Long, delete: Boolean) {
+  override def toString = topic + "\t" + key + "\t" + value + "\t" + (if (delete) "d" else
"u")
+  def topicAndKey = topic + key
+}
+
+object TestRecord {
+  def parse(line: String): TestRecord = {
+    val components = line.split("\t")
+    new TestRecord(components(0), components(1).toInt, components(2).toLong, components(3)
== "d")
+  }
+}
\ No newline at end of file
diff --git a/tests/kafkatest/services/kafka/kafka.py b/tests/kafkatest/services/kafka/kafka.py
index 8eee575..b0a9faa 100644
--- a/tests/kafkatest/services/kafka/kafka.py
+++ b/tests/kafkatest/services/kafka/kafka.py
@@ -211,21 +211,39 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
         self.advertised_listeners = ','.join(advertised_listeners)
 
     def prop_file(self, node):
-        cfg = KafkaConfig(**node.config)
-        cfg[config_property.ADVERTISED_HOSTNAME] = node.account.hostname
-        cfg[config_property.ZOOKEEPER_CONNECT] = self.zk_connect_setting()
+        self.set_protocol_and_port(node)
+
+        #load template configs as dictionary
+        config_template = self.render('kafka.properties', node=node, broker_id=self.idx(node),
+                                 security_config=self.security_config, num_nodes=self.num_nodes)
+
+        configs = dict( l.rstrip().split('=') for l in config_template.split('\n')
+                        if not l.startswith("#") and "=" in l )
+
+        #load specific test override configs
+        override_configs = KafkaConfig(**node.config)
+        override_configs[config_property.ADVERTISED_HOSTNAME] = node.account.hostname
+        override_configs[config_property.ZOOKEEPER_CONNECT] = self.zk_connect_setting()
 
         for prop in self.server_prop_overides:
-            cfg[prop[0]] = prop[1]
+            override_configs[prop[0]] = prop[1]
 
-        self.set_protocol_and_port(node)
+        #update template configs with test override configs
+        configs.update(override_configs)
 
-        # TODO - clean up duplicate configuration logic
-        prop_file = cfg.render()
-        prop_file += self.render('kafka.properties', node=node, broker_id=self.idx(node),
-                                 security_config=self.security_config, num_nodes=self.num_nodes)
+        prop_file = self.render_configs(configs)
         return prop_file
 
+    def render_configs(self, configs):
+        """Render self as a series of lines key=val\n, and do so in a consistent order. """
+        keys = [k for k in configs.keys()]
+        keys.sort()
+
+        s = ""
+        for k in keys:
+            s += "%s=%s\n" % (k, str(configs[k]))
+        return s
+
     def start_cmd(self, node):
         cmd = "export JMX_PORT=%d; " % self.jmx_port
         cmd += "export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%s\"; " % self.LOG4J_CONFIG
diff --git a/tests/kafkatest/services/kafka/templates/kafka.properties b/tests/kafkatest/services/kafka/templates/kafka.properties
index 8cca14f..dd777f9 100644
--- a/tests/kafkatest/services/kafka/templates/kafka.properties
+++ b/tests/kafkatest/services/kafka/templates/kafka.properties
@@ -20,18 +20,6 @@ advertised.host.name={{ node.account.hostname }}
 listeners={{ listeners }}
 advertised.listeners={{ advertised_listeners }}
 
-num.network.threads=3
-num.io.threads=8
-socket.send.buffer.bytes=102400
-socket.receive.buffer.bytes=65536
-socket.request.max.bytes=104857600
-
-num.partitions=1
-num.recovery.threads.per.data.dir=1
-log.retention.hours=168
-log.segment.bytes=1073741824
-log.cleaner.enable=false
-
 security.inter.broker.protocol={{ security_config.interbroker_security_protocol }}
 
 ssl.keystore.location=/mnt/security/test.keystore.jks
diff --git a/tests/kafkatest/services/log_compaction_tester.py b/tests/kafkatest/services/log_compaction_tester.py
new file mode 100644
index 0000000..4a19650
--- /dev/null
+++ b/tests/kafkatest/services/log_compaction_tester.py
@@ -0,0 +1,88 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import os
+
+from ducktape.services.background_thread import BackgroundThreadService
+
+from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin, CORE_LIBS_JAR_NAME,
CORE_DEPENDANT_TEST_LIBS_JAR_NAME
+from kafkatest.services.security.security_config import SecurityConfig
+from kafkatest.version import DEV_BRANCH
+
+class LogCompactionTester(KafkaPathResolverMixin, BackgroundThreadService):
+
+    OUTPUT_DIR = "/mnt/logcompaction_tester"
+    LOG_PATH = os.path.join(OUTPUT_DIR, "logcompaction_tester_stdout.log")
+    VERIFICATION_STRING = "Data verification is completed"
+
+    logs = {
+        "tool_logs": {
+            "path": LOG_PATH,
+            "collect_default": True}
+    }
+
+    def __init__(self, context, kafka, security_protocol="PLAINTEXT", stop_timeout_sec=30):
+        super(LogCompactionTester, self).__init__(context, 1)
+
+        self.kafka = kafka
+        self.security_protocol = security_protocol
+        self.security_config = SecurityConfig(self.context, security_protocol)
+        self.stop_timeout_sec = stop_timeout_sec
+        self.log_compaction_completed = False
+
+    def _worker(self, idx, node):
+        node.account.ssh("mkdir -p %s" % LogCompactionTester.OUTPUT_DIR)
+        cmd = self.start_cmd(node)
+        self.logger.info("LogCompactionTester %d command: %s" % (idx, cmd))
+        self.security_config.setup_node(node)
+        for line in node.account.ssh_capture(cmd):
+            self.logger.debug("Checking line:{}".format(line))
+
+            if line.startswith(LogCompactionTester.VERIFICATION_STRING):
+                self.log_compaction_completed = True
+
+    def start_cmd(self, node):
+        core_libs_jar = self.path.jar(CORE_LIBS_JAR_NAME, DEV_BRANCH)
+        core_dependant_test_libs_jar = self.path.jar(CORE_DEPENDANT_TEST_LIBS_JAR_NAME, DEV_BRANCH)
+
+        cmd = "for file in %s; do CLASSPATH=$CLASSPATH:$file; done;" % core_libs_jar
+        cmd += " for file in %s; do CLASSPATH=$CLASSPATH:$file; done;" % core_dependant_test_libs_jar
+        cmd += " export CLASSPATH;"
+        cmd += self.path.script("kafka-run-class.sh", node)
+        cmd += " %s" % self.java_class_name()
+        cmd += " --bootstrap-server %s --messages 1000000 --sleep 20 --duplicates 10 --percent-deletes
10" % (self.kafka.bootstrap_servers(self.security_protocol))
+
+        cmd += " 2>> %s | tee -a %s &" % (self.logs["tool_logs"]["path"], self.logs["tool_logs"]["path"])
+        return cmd
+
+    def stop_node(self, node):
+        node.account.kill_java_processes(self.java_class_name(), clean_shutdown=True,
+                                         allow_fail=True)
+
+        stopped = self.wait_node(node, timeout_sec=self.stop_timeout_sec)
+        assert stopped, "Node %s: did not stop within the specified timeout of %s seconds"
% \
+                        (str(node.account), str(self.stop_timeout_sec))
+
+    def clean_node(self, node):
+        node.account.kill_java_processes(self.java_class_name(), clean_shutdown=False,
+                                         allow_fail=True)
+        node.account.ssh("rm -rf %s" % LogCompactionTester.OUTPUT_DIR, allow_fail=False)
+
+    def java_class_name(self):
+        return "kafka.tools.LogCompactionTester"
+
+    @property
+    def is_done(self):
+        return self.log_compaction_completed
diff --git a/tests/kafkatest/tests/tools/log_compaction_test.py b/tests/kafkatest/tests/tools/log_compaction_test.py
new file mode 100644
index 0000000..338060f
--- /dev/null
+++ b/tests/kafkatest/tests/tools/log_compaction_test.py
@@ -0,0 +1,66 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+from ducktape.utils.util import wait_until
+from ducktape.tests.test import Test
+from ducktape.mark.resource import cluster
+
+from kafkatest.services.kafka import config_property
+from kafkatest.services.zookeeper import ZookeeperService
+from kafkatest.services.kafka import KafkaService
+from kafkatest.services.log_compaction_tester import LogCompactionTester
+
+class LogCompactionTest(Test):
+
+    # Configure smaller segment size to create more segments for compaction
+    LOG_SEGMENT_BYTES = "1024000"
+
+    def __init__(self, test_context):
+        super(LogCompactionTest, self).__init__(test_context)
+        self.num_zk = 1
+        self.num_brokers = 1
+
+        self.zk = ZookeeperService(test_context, self.num_zk)
+        self.kafka = None
+        self.compaction_verifier = None
+
+    def setUp(self):
+        self.zk.start()
+
+    def start_kafka(self, security_protocol, interbroker_security_protocol):
+        self.kafka = KafkaService(
+            self.test_context,
+            num_nodes = self.num_brokers,
+            zk = self.zk,
+            security_protocol=security_protocol,
+            interbroker_security_protocol=interbroker_security_protocol,
+            server_prop_overides=[
+                [config_property.LOG_SEGMENT_BYTES, LogCompactionTest.LOG_SEGMENT_BYTES],
+            ])
+        self.kafka.start()
+
+    def start_test_log_compaction_tool(self, security_protocol):
+        self.compaction_verifier = LogCompactionTester(self.test_context, self.kafka, security_protocol=security_protocol)
+        self.compaction_verifier.start()
+
+    @cluster(num_nodes=4)
+    def test_log_compaction(self, security_protocol='PLAINTEXT'):
+
+        self.start_kafka(security_protocol, security_protocol)
+        self.start_test_log_compaction_tool(security_protocol)
+
+        # Verify that compacted data verification completed in LogCompactionTester
+        wait_until(lambda: self.compaction_verifier.is_done, timeout_sec=180, err_msg="Timed
out waiting to complete compaction")


Mime
View raw message