kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: KAFKA-2669; Fix LogCleaner.awaitCleaned for LogCleanerIntegrationTest
Date Mon, 19 Oct 2015 21:45:27 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 674543525 -> 343db8a7f


KAFKA-2669; Fix LogCleaner.awaitCleaned for LogCleanerIntegrationTest

LogCleanerIntegrationTest calls LogCleaner.awaitCleaned() to wait until cleaner has processed
up to given offset. However, existing awaitCleaned() implementation doesn't wait for this.
This patch fixes the problem.

Author: Dong Lin <lindong@cis.upenn.edu>
Author: Dong Lin <lindong28@gmail.com>

Reviewers: Ismael Juma, Guozhang Wang

Closes #327 from lindong28/KAFKA-2669


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/343db8a7
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/343db8a7
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/343db8a7

Branch: refs/heads/trunk
Commit: 343db8a7f4d22a593f0aecc79d55869350803889
Parents: 6745435
Author: Dong Lin <lindong@cis.upenn.edu>
Authored: Mon Oct 19 14:50:18 2015 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Mon Oct 19 14:50:18 2015 -0700

----------------------------------------------------------------------
 core/src/main/scala/kafka/log/LogCleaner.scala  | 21 ++++++++++++++++----
 .../unit/kafka/admin/DeleteTopicTest.scala      |  2 +-
 .../kafka/log/LogCleanerIntegrationTest.scala   | 17 +++++++++++-----
 .../scala/unit/kafka/utils/MockScheduler.scala  |  1 +
 4 files changed, 31 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/343db8a7/core/src/main/scala/kafka/log/LogCleaner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index b36ea0d..16dd945 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -149,12 +149,25 @@ class LogCleaner(val config: CleanerConfig,
   }
 
   /**
-   * For testing, a way to know when work has completed. This method blocks until the
+   * For testing, a way to know when work has completed. This method waits until the
    * cleaner has processed up to the given offset on the specified topic/partition
+   *
+   * @param topic The Topic to be cleaned
+   * @param part The partition of the topic to be cleaned
+   * @param offset The first dirty offset that the cleaner doesn't have to clean
+   * @param maxWaitMs The maximum time in ms to wait for cleaner
+   *
+   * @return A boolean indicating whether the work has completed before timeout
    */
-  def awaitCleaned(topic: String, part: Int, offset: Long, timeout: Long = 30000L): Unit = {
-    while(!cleanerManager.allCleanerCheckpoints.contains(TopicAndPartition(topic, part)))
-      Thread.sleep(10)
+  def awaitCleaned(topic: String, part: Int, offset: Long, maxWaitMs: Long = 60000L): Boolean = {
+    def isCleaned = cleanerManager.allCleanerCheckpoints.get(TopicAndPartition(topic, part)).fold(false)(_ >= offset)
+    var remainingWaitMs = maxWaitMs
+    while (!isCleaned && remainingWaitMs > 0) {
+      val sleepTime = math.min(100, remainingWaitMs)
+      Thread.sleep(sleepTime)
+      remainingWaitMs -= sleepTime
+    }
+    isCleaned
   }
   
   /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/343db8a7/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
index 383cb44..d28ca69 100644
--- a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
@@ -240,7 +240,7 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
     writeDups(numKeys = 100, numDups = 3,log)
 
     // wait for cleaner to clean
-   server.logManager.cleaner.awaitCleaned(topicName,0,0)
+   server.logManager.cleaner.awaitCleaned(topicName, 0, 0)
 
     // delete topic
     AdminUtils.deleteTopic(zkUtils, "test")

http://git-wip-us.apache.org/repos/asf/kafka/blob/343db8a7/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
index 70beb5f..d7f3156 100755
--- a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
@@ -56,9 +56,12 @@ class LogCleanerIntegrationTest(compressionCodec: String) {
     val startSize = log.size
     cleaner.startup()
 
-    val lastCleaned = log.activeSegment.baseOffset
+    val firstDirty = log.activeSegment.baseOffset
     // wait until we clean up to base_offset of active segment - minDirtyMessages
-    cleaner.awaitCleaned("log", 0, lastCleaned)
+    cleaner.awaitCleaned("log", 0, firstDirty)
+
+    val lastCleaned = cleaner.cleanerManager.allCleanerCheckpoints.get(TopicAndPartition("log", 0)).get
+    assertTrue("log cleaner should have processed up to offset " + firstDirty, lastCleaned >= firstDirty);
     
     val read = readFromLog(log)
     assertEquals("Contents of the map shouldn't change.", appends.toMap, read.toMap)
@@ -66,8 +69,12 @@ class LogCleanerIntegrationTest(compressionCodec: String) {
 
     // write some more stuff and validate again
     val appends2 = appends ++ writeDups(numKeys = 100, numDups = 3, log, CompressionCodec.getCompressionCodec(compressionCodec))
-    val lastCleaned2 = log.activeSegment.baseOffset
-    cleaner.awaitCleaned("log", 0, lastCleaned2)
+    val firstDirty2 = log.activeSegment.baseOffset
+    cleaner.awaitCleaned("log", 0, firstDirty2)
+
+    val lastCleaned2 = cleaner.cleanerManager.allCleanerCheckpoints.get(TopicAndPartition("log", 0)).get
+    assertTrue("log cleaner should have processed up to offset " + firstDirty2, lastCleaned2 >= firstDirty2);
+
     val read2 = readFromLog(log)
     assertEquals("Contents of the map shouldn't change.", appends2.toMap, read2.toMap)
 
@@ -82,7 +89,6 @@ class LogCleanerIntegrationTest(compressionCodec: String) {
 
     // we expect partition 0 to be gone
     assert(!checkpoints.contains(topics(0)))
-    
     cleaner.shutdown()
   }
 
@@ -111,6 +117,7 @@ class LogCleanerIntegrationTest(compressionCodec: String) {
     
   @After
   def teardown() {
+    time.scheduler.shutdown()
     CoreUtils.rm(logDir)
   }
   

http://git-wip-us.apache.org/repos/asf/kafka/blob/343db8a7/core/src/test/scala/unit/kafka/utils/MockScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/MockScheduler.scala b/core/src/test/scala/unit/kafka/utils/MockScheduler.scala
index eeafeda..434c22a 100644
--- a/core/src/test/scala/unit/kafka/utils/MockScheduler.scala
+++ b/core/src/test/scala/unit/kafka/utils/MockScheduler.scala
@@ -43,6 +43,7 @@ class MockScheduler(val time: Time) extends Scheduler {
   
   def shutdown() {
     this synchronized {
+      tasks.foreach(_.fun())
       tasks.clear()
     }
   }


Mime
View raw message