kafka-commits mailing list archives

From nehanarkh...@apache.org
Subject kafka git commit: KAFKA-1819 Cleaner gets confused about deleted and re-created topics; reviewed by Neha Narkhede
Date Tue, 13 Jan 2015 05:12:42 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk ad4883a0c -> 14779dddb


KAFKA-1819 Cleaner gets confused about deleted and re-created topics; reviewed by Neha Narkhede


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/14779ddd
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/14779ddd
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/14779ddd

Branch: refs/heads/trunk
Commit: 14779dddb6a9bb3aefbaa825a62874f89bb47d2c
Parents: ad4883a
Author: Gwen Shapira <cshapi@gmail.com>
Authored: Mon Jan 12 21:12:26 2015 -0800
Committer: Neha Narkhede <neha.narkhede@gmail.com>
Committed: Mon Jan 12 21:12:34 2015 -0800

----------------------------------------------------------------------
 core/src/main/scala/kafka/log/LogCleaner.scala  | 11 +++-
 .../scala/kafka/log/LogCleanerManager.scala     | 17 ++++--
 core/src/main/scala/kafka/log/LogManager.scala  |  9 ++-
 .../unit/kafka/admin/DeleteTopicTest.scala      | 64 +++++++++++++++++++-
 .../kafka/log/LogCleanerIntegrationTest.scala   | 14 +++++
 5 files changed, 103 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
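
The gist of the change, ahead of the diffs: deleting a compacted topic used to leave
its last-cleaned offset behind in each data directory's cleaner-offset-checkpoint file,
so a topic re-created under the same name could make the cleaner resume from a stale
offset. The fix rewrites the checkpoint file whenever a log is removed, keeping only
entries for partitions that still have a live log. A minimal standalone sketch of that
pruning step (the object and names below are illustrative stand-ins, not the actual
kafka.log classes; the commit itself expresses the same filter with Map.filterKeys):

object CheckpointPruneSketch {
  type TopicAndPartition = (String, Int) // stand-in for kafka.common.TopicAndPartition

  // keep only entries whose partition still has a live log, optionally
  // folding in one fresh (partition -> end offset) entry
  def prune(checkpoint: Map[TopicAndPartition, Long],
            liveLogs: Set[TopicAndPartition],
            update: Option[(TopicAndPartition, Long)]): Map[TopicAndPartition, Long] =
    checkpoint.filter { case (tp, _) => liveLogs(tp) } ++ update

  def main(args: Array[String]): Unit = {
    val before = Map(("test", 0) -> 4200L, ("other", 0) -> 17L)
    // "test" was deleted, so its entry drops out and "other" survives
    println(prune(before, liveLogs = Set(("other", 0)), update = None))
    // prints: Map((other,0) -> 17)
  }
}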


http://git-wip-us.apache.org/repos/asf/kafka/blob/14779ddd/core/src/main/scala/kafka/log/LogCleaner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index f8fcb84..f8e7cd5 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -71,8 +71,8 @@ class LogCleaner(val config: CleanerConfig,
                  val logs: Pool[TopicAndPartition, Log], 
                  time: Time = SystemTime) extends Logging with KafkaMetricsGroup {
   
-  /* for managing the state of partitions being cleaned. */
-  private val cleanerManager = new LogCleanerManager(logDirs, logs);
+  /* for managing the state of partitions being cleaned. package-private to allow access in tests */
+  private[log] val cleanerManager = new LogCleanerManager(logDirs, logs);
 
  /* a throttle used to limit the I/O of all the cleaner threads to a user-specified maximum rate */
   private val throttler = new Throttler(desiredRatePerSec = config.maxIoBytesPerSecond, 
@@ -130,6 +130,13 @@ class LogCleaner(val config: CleanerConfig,
   }
 
   /**
+   * Update checkpoint file, removing topics and partitions that no longer exist
+   */
+  def updateCheckpoints(dataDir: File) {
+    cleanerManager.updateCheckpoints(dataDir, update=None);
+  }
+
+  /**
   *  Abort the cleaning of a particular partition if it's in progress, and pause any future cleaning of this partition.
    *  This call blocks until the cleaning of the partition is aborted and paused.
    */

http://git-wip-us.apache.org/repos/asf/kafka/blob/14779ddd/core/src/main/scala/kafka/log/LogCleanerManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogCleanerManager.scala b/core/src/main/scala/kafka/log/LogCleanerManager.scala
index bcfef77..fd87d90 100644
--- a/core/src/main/scala/kafka/log/LogCleanerManager.scala
+++ b/core/src/main/scala/kafka/log/LogCleanerManager.scala
@@ -44,9 +44,12 @@ private[log] case object LogCleaningPaused extends LogCleaningState
 private[log] class LogCleanerManager(val logDirs: Array[File], val logs: Pool[TopicAndPartition, Log]) extends Logging with KafkaMetricsGroup {
   
   override val loggerName = classOf[LogCleaner].getName
+
+  // package-private for testing
+  private[log] val offsetCheckpointFile = "cleaner-offset-checkpoint"
   
   /* the offset checkpoints holding the last cleaned point for each log */
-  private val checkpoints = logDirs.map(dir => (dir, new OffsetCheckpoint(new File(dir, "cleaner-offset-checkpoint")))).toMap
+  private val checkpoints = logDirs.map(dir => (dir, new OffsetCheckpoint(new File(dir, offsetCheckpointFile)))).toMap
 
   /* the set of logs currently being cleaned */
   private val inProgress = mutable.HashMap[TopicAndPartition, LogCleaningState]()
@@ -199,6 +202,14 @@ private[log] class LogCleanerManager(val logDirs: Array[File], val logs: Pool[To
     }
   }
 
+  def updateCheckpoints(dataDir: File, update: Option[(TopicAndPartition,Long)]) {
+    inLock(lock) {
+      val checkpoint = checkpoints(dataDir)
+      val existing = checkpoint.read().filterKeys(logs.keys) ++ update
+      checkpoint.write(existing)
+    }
+  }
+
   /**
    * Save out the endOffset and remove the given log from the in-progress set, if not aborted.
    */
@@ -206,9 +217,7 @@ private[log] class LogCleanerManager(val logDirs: Array[File], val logs: Pool[To
     inLock(lock) {
       inProgress(topicAndPartition) match {
         case LogCleaningInProgress =>
-          val checkpoint = checkpoints(dataDir)
-          val offsets = checkpoint.read() + ((topicAndPartition, endOffset))
-          checkpoint.write(offsets)
+          updateCheckpoints(dataDir,Option(topicAndPartition, endOffset))
           inProgress.remove(topicAndPartition)
         case LogCleaningAborted =>
           inProgress.put(topicAndPartition, LogCleaningPaused)

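A note on the "++ update" idiom in updateCheckpoints above: a Scala Option converts
implicitly to a collection of zero or one elements, so one method serves both call
sites. doneCleaning passes Option((topicAndPartition, endOffset)) to record a fresh
end offset, while the prune-only path from LogCleaner passes update=None and the
rewrite simply drops dead entries. A tiny illustration (hypothetical object name and
values):

object OptionUpdateDemo extends App {
  val offsets = Map("test-0" -> 100L)
  // Some folds one entry in, overwriting any previous value for the key ...
  println(offsets ++ Some("test-0" -> 250L)) // Map(test-0 -> 250)
  // ... while None leaves the map untouched, so the write is a pure prune
  println(offsets ++ None)                   // Map(test-0 -> 100)
}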
http://git-wip-us.apache.org/repos/asf/kafka/blob/14779ddd/core/src/main/scala/kafka/log/LogManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala
index 4ebaae0..47d250a 100644
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -57,8 +57,9 @@ class LogManager(val logDirs: Array[File],
   private val dirLocks = lockLogDirs(logDirs)
  private val recoveryPointCheckpoints = logDirs.map(dir => (dir, new OffsetCheckpoint(new File(dir, RecoveryPointCheckpointFile)))).toMap
   loadLogs()
-  
-  private val cleaner: LogCleaner = 
+
+  // public, so we can access this from kafka.admin.DeleteTopicTest
+  val cleaner: LogCleaner =
     if(cleanerConfig.enableCleaner)
       new LogCleaner(cleanerConfig, logDirs, logs, time = time)
     else
@@ -370,8 +371,10 @@ class LogManager(val logDirs: Array[File],
     }
     if (removedLog != null) {
      //We need to wait until there is no more cleaning task on the log to be deleted before actually deleting it.
-      if (cleaner != null)
+      if (cleaner != null) {
         cleaner.abortCleaning(topicAndPartition)
+        cleaner.updateCheckpoints(removedLog.dir.getParentFile)
+      }
       removedLog.delete()
       info("Deleted log for partition [%s,%d] in %s."
            .format(topicAndPartition.topic,

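The ordering in this hunk matters: abortCleaning blocks until any in-flight cleaning
of the partition has stopped (per the doc comment in LogCleaner above), so a cleaner
thread cannot write the stale offset back after updateCheckpoints prunes it, and only
then are the segments deleted. A sketch of that sequence with simplified, illustrative
signatures (the real code lives in LogManager's delete path):

import java.io.File

object DeleteOrderSketch {
  type TopicAndPartition = (String, Int) // stand-in for kafka.common.TopicAndPartition

  def onLogDeleted(tp: TopicAndPartition, dataDir: File,
                   abortCleaning: TopicAndPartition => Unit,
                   updateCheckpoints: File => Unit,
                   deleteSegments: () => Unit): Unit = {
    abortCleaning(tp)          // 1. block until no cleaner thread is touching tp
    updateCheckpoints(dataDir) // 2. rewrite the checkpoint file without tp's stale offset
    deleteSegments()           // 3. only then remove the log from disk
  }
}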
http://git-wip-us.apache.org/repos/asf/kafka/blob/14779ddd/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
index 29cc01b..33c2767 100644
--- a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
@@ -16,11 +16,14 @@
  */
 package kafka.admin
 
+import java.io.File
+
+import kafka.log.Log
 import org.scalatest.junit.JUnit3Suite
 import kafka.zk.ZooKeeperTestHarness
 import junit.framework.Assert._
 import kafka.utils.{ZkUtils, TestUtils}
-import kafka.server.{KafkaServer, KafkaConfig}
+import kafka.server.{OffsetCheckpoint, KafkaServer, KafkaConfig}
 import org.junit.Test
 import kafka.common._
 import kafka.producer.{ProducerConfig, Producer}
@@ -221,14 +224,50 @@ class DeleteTopicTest extends JUnit3Suite with ZooKeeperTestHarness {
     val leaderIdOpt = TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 1000)
     assertTrue("Leader should exist for topic test", leaderIdOpt.isDefined)
     servers.foreach(_.shutdown())
+  }
+
+  @Test
+  def testDeleteTopicWithCleaner() {
+    val topicName = "test"
+    val topicAndPartition = TopicAndPartition(topicName, 0)
+    val topic = topicAndPartition.topic
+
+    val brokerConfigs = TestUtils.createBrokerConfigs(3, false)
+    brokerConfigs(0).setProperty("delete.topic.enable", "true")
+    brokerConfigs(0).setProperty("log.cleaner.enable","true")
+    brokerConfigs(0).setProperty("log.cleanup.policy","compact")
+    brokerConfigs(0).setProperty("log.segment.bytes","100")
+    brokerConfigs(0).setProperty("log.segment.delete.delay.ms","1000")
+    val servers = createTestTopicAndCluster(topic,brokerConfigs)
+
+    // for simplicity, we are validating cleaner offsets on a single broker
+    val server = servers(0)
+    val log = server.logManager.getLog(topicAndPartition).get
+
+    // write to the topic to activate cleaner
+    writeDups(numKeys = 100, numDups = 3,log)
+
+    // wait for cleaner to clean
+    server.logManager.cleaner.awaitCleaned(topicName,0,0)
 
+    // delete topic
+    AdminUtils.deleteTopic(zkClient, "test")
+    verifyTopicDeletion("test", servers)
+
+    servers.foreach(_.shutdown())
   }
 
   private def createTestTopicAndCluster(topic: String): Seq[KafkaServer] = {
+
+    val brokerConfigs = TestUtils.createBrokerConfigs(3, false)
+    brokerConfigs.foreach(p => p.setProperty("delete.topic.enable", "true")
+    )
+    createTestTopicAndCluster(topic,brokerConfigs)
+  }
+
+  private def createTestTopicAndCluster(topic: String, brokerConfigs: Seq[Properties]): Seq[KafkaServer] = {
     val expectedReplicaAssignment = Map(0 -> List(0, 1, 2))
     val topicAndPartition = TopicAndPartition(topic, 0)
-    val brokerConfigs = TestUtils.createBrokerConfigs(3, false)
-    brokerConfigs.foreach(p => p.setProperty("delete.topic.enable", "true"))
     // create brokers
     val servers = brokerConfigs.map(b => TestUtils.createServer(new KafkaConfig(b)))
     // create the topic
@@ -253,5 +292,24 @@ class DeleteTopicTest extends JUnit3Suite with ZooKeeperTestHarness {
    // ensure that logs from all replicas are deleted if delete topic is marked successful in zookeeper
     assertTrue("Replica logs not deleted after delete topic is complete",
       servers.foldLeft(true)((res, server) => res && server.getLogManager().getLog(topicAndPartition).isEmpty))
+    // ensure that topic is removed from all cleaner offsets
+    TestUtils.waitUntilTrue(() => servers.foldLeft(true)((res,server) => res &&
+    {
+      val topicAndPartition = TopicAndPartition(topic,0)
+      val logdir = server.getLogManager().logDirs(0)
+      val checkpoints =  new OffsetCheckpoint(new File(logdir,"cleaner-offset-checkpoint")).read()
+      !checkpoints.contains(topicAndPartition)
+    }),
+      "Cleaner offset for deleted partition should have been removed")
+  }
+
+  private def writeDups(numKeys: Int, numDups: Int, log: Log): Seq[(Int, Int)] = {
+    var counter = 0
+    for(dup <- 0 until numDups; key <- 0 until numKeys) yield {
+      val count = counter
+    log.append(TestUtils.singleMessageSet(payload = counter.toString.getBytes, key = key.toString.getBytes), assignOffsets = true)
+      counter += 1
+      (key, count)
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/14779ddd/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
index 5bfa764..07acd46 100644
--- a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
@@ -18,6 +18,8 @@
 package kafka.log
 
 import java.io.File
+import kafka.server.OffsetCheckpoint
+
 import scala.collection._
 import org.junit._
 import kafka.common.TopicAndPartition
@@ -62,6 +64,18 @@ class LogCleanerIntegrationTest extends JUnitSuite {
     cleaner.awaitCleaned("log", 0, lastCleaned2)
     val read2 = readFromLog(log)
     assertEquals("Contents of the map shouldn't change.", appends2.toMap, read2.toMap)
+
+    // simulate deleting a partition, by removing it from logs
+    // force a checkpoint
+    // and make sure its gone from checkpoint file
+
+    cleaner.logs.remove(topics(0))
+
+    cleaner.updateCheckpoints(logDir)
+    val checkpoints = new OffsetCheckpoint(new File(logDir,cleaner.cleanerManager.offsetCheckpointFile)).read()
+
+    // we expect partition 0 to be gone
+    assert(!checkpoints.contains(topics(0)))
     
     cleaner.shutdown()
   }

