kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: KAFKA-2977: Transient Failure in kafka.log.LogCleanerIntegrationTest.cleanerTest
Date Mon, 21 Dec 2015 06:07:15 GMT
Repository: kafka
Updated Branches:
  refs/heads/0.9.0 d5698fa31 -> 9765354c8


KAFKA-2977: Transient Failure in kafka.log.LogCleanerIntegrationTest.cleanerTest

Make MinCleanableDirtyRatioProp configurable (default 0.0F) in makeCleaner, so that log cleaning
is always ongoing.
Also removed minDirtyMessages.

Author: jinxing <jinxing@fenbi.com>
Author: ZoneMayor <jinxing6042@126.com>

Reviewers: Ismael Juma, Guozhang Wang

Closes #671 from ZoneMayor/trunk-KAFKA-2977

(cherry picked from commit d2632d011f7bda3cbd0cbcc8f2a3bd1e985d5b1b)
Signed-off-by: Guozhang Wang <wangguoz@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/9765354c
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/9765354c
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/9765354c

Branch: refs/heads/0.9.0
Commit: 9765354c857bec8ab7625854f2fb68e0fad6c339
Parents: d5698fa
Author: Jin Xing <jinxing@fenbi.com>
Authored: Sun Dec 20 22:06:56 2015 -0800
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Sun Dec 20 22:07:11 2015 -0800

----------------------------------------------------------------------
 .../unit/kafka/log/LogCleanerIntegrationTest.scala     | 13 ++++++++-----
 1 file changed, 8 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/9765354c/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
index d7f3156..de3d7a3 100755
--- a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
@@ -57,11 +57,12 @@ class LogCleanerIntegrationTest(compressionCodec: String) {
     cleaner.startup()
 
     val firstDirty = log.activeSegment.baseOffset
-    // wait until we clean up to base_offset of active segment - minDirtyMessages
+    // wait until cleaning up to base_offset, note that cleaning happens only when "log dirty
ratio" is higher than LogConfig.MinCleanableDirtyRatioProp
     cleaner.awaitCleaned("log", 0, firstDirty)
-
+    val compactedSize = log.logSegments.map(_.size).sum
     val lastCleaned = cleaner.cleanerManager.allCleanerCheckpoints.get(TopicAndPartition("log",
0)).get
-    assertTrue("log cleaner should have processed up to offset " + firstDirty, lastCleaned
>= firstDirty);
+    assertTrue(s"log cleaner should have processed up to offset $firstDirty, but lastCleaned=$lastCleaned",
lastCleaned >= firstDirty)
+    assertTrue(s"log should have been compacted:  startSize=$startSize compactedSize=$compactedSize",
startSize > compactedSize)
     
     val read = readFromLog(log)
     assertEquals("Contents of the map shouldn't change.", appends.toMap, read.toMap)
@@ -73,7 +74,7 @@ class LogCleanerIntegrationTest(compressionCodec: String) {
     cleaner.awaitCleaned("log", 0, firstDirty2)
 
     val lastCleaned2 = cleaner.cleanerManager.allCleanerCheckpoints.get(TopicAndPartition("log",
0)).get
-    assertTrue("log cleaner should have processed up to offset " + firstDirty2, lastCleaned2
>= firstDirty2);
+    assertTrue(s"log cleaner should have processed up to offset $firstDirty2", lastCleaned2
>= firstDirty2);
 
     val read2 = readFromLog(log)
     assertEquals("Contents of the map shouldn't change.", appends2.toMap, read2.toMap)
@@ -123,7 +124,7 @@ class LogCleanerIntegrationTest(compressionCodec: String) {
   
   /* create a cleaner instance and logs with the given parameters */
   def makeCleaner(parts: Int, 
-                  minDirtyMessages: Int = 0, 
+                  minCleanableDirtyRatio: Float = 0.0F,
                   numThreads: Int = 1,
                   defaultPolicy: String = "compact",
                   policyOverrides: Map[String, String] = Map()): LogCleaner = {
@@ -138,6 +139,8 @@ class LogCleanerIntegrationTest(compressionCodec: String) {
       logProps.put(LogConfig.SegmentIndexBytesProp, 100*1024: java.lang.Integer)
       logProps.put(LogConfig.FileDeleteDelayMsProp, deleteDelay: java.lang.Integer)
       logProps.put(LogConfig.CleanupPolicyProp, LogConfig.Compact)
+      logProps.put(LogConfig.MinCleanableDirtyRatioProp, minCleanableDirtyRatio: java.lang.Float)
+
       val log = new Log(dir = dir,
                         LogConfig(logProps),
                         recoveryPoint = 0L,


Mime
View raw message