kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From nehanarkh...@apache.org
Subject git commit: KAFKA-1315 log.dirs property in KafkaServer intolerant of trailing slash; reviewed by Neha Narkhede and Guozhang Wang
Date Fri, 21 Mar 2014 00:33:06 GMT
Repository: kafka
Updated Branches:
  refs/heads/0.8.1 03762453f -> c66e408b2


KAFKA-1315 log.dirs property in KafkaServer intolerant of trailing slash; reviewed by Neha
Narkhede and Guozhang Wang


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/c66e408b
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/c66e408b
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/c66e408b

Branch: refs/heads/0.8.1
Commit: c66e408b244de52f1c5c5bbd7627aa1f028f9a87
Parents: 0376245
Author: Timothy Chen <tnachen@gmail.com>
Authored: Thu Mar 20 17:32:45 2014 -0700
Committer: Neha Narkhede <neha.narkhede@gmail.com>
Committed: Thu Mar 20 17:32:58 2014 -0700

----------------------------------------------------------------------
 .../scala/kafka/server/ReplicaManager.scala     |  2 +-
 .../server/HighwatermarkPersistenceTest.scala   |  2 +-
 .../unit/kafka/server/ReplicaManagerTest.scala  | 46 ++++++++++++++++++++
 3 files changed, 48 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/c66e408b/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 21bba48..0fe881d 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -53,7 +53,7 @@ class ReplicaManager(val config: KafkaConfig,
   private val replicaStateChangeLock = new Object
   val replicaFetcherManager = new ReplicaFetcherManager(config, this)
   private val highWatermarkCheckPointThreadStarted = new AtomicBoolean(false)
-  val highWatermarkCheckpoints = config.logDirs.map(dir => (dir, new OffsetCheckpoint(new
File(dir, ReplicaManager.HighWatermarkFilename)))).toMap
+  val highWatermarkCheckpoints = config.logDirs.map(dir => (new File(dir).getAbsolutePath,
new OffsetCheckpoint(new File(dir, ReplicaManager.HighWatermarkFilename)))).toMap
   private var hwThreadInitialized = false
   this.logIdent = "[Replica Manager on Broker " + localBrokerId + "]: "
   val stateChangeLogger = Logger.getLogger(KafkaController.stateChangeLogger)

http://git-wip-us.apache.org/repos/asf/kafka/blob/c66e408b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
index 02c188a..a78f7cf 100644
--- a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
+++ b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
@@ -143,7 +143,7 @@ class HighwatermarkPersistenceTest extends JUnit3Suite {
   }
 
   def hwmFor(replicaManager: ReplicaManager, topic: String, partition: Int): Long = {
-    replicaManager.highWatermarkCheckpoints(replicaManager.config.logDirs(0)).read.getOrElse(TopicAndPartition(topic,
partition), 0L)
+    replicaManager.highWatermarkCheckpoints(new File(replicaManager.config.logDirs(0)).getAbsolutePath).read.getOrElse(TopicAndPartition(topic,
partition), 0L)
   }
   
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/c66e408b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
new file mode 100644
index 0000000..b5936d4
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * 
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import org.scalatest.junit.JUnit3Suite
+import org.junit.Test
+import kafka.utils.{MockScheduler, MockTime, TestUtils}
+import java.util.concurrent.atomic.AtomicBoolean
+import java.io.File
+import org.easymock.EasyMock
+import org.I0Itec.zkclient.ZkClient
+import kafka.cluster.Replica
+import kafka.log.{LogManager, LogConfig, Log}
+
+class ReplicaManagerTest extends JUnit3Suite {
+  @Test
+  def testHighwaterMarkDirectoryMapping() {
+    val props = TestUtils.createBrokerConfig(1)
+    val dir = "/tmp/kafka-logs/"
+    new File(dir).mkdir()
+    props.setProperty("log.dirs", dir)
+    val config = new KafkaConfig(props)
+    val zkClient = EasyMock.createMock(classOf[ZkClient])
+    val mockLogMgr = EasyMock.createMock(classOf[LogManager])
+    val time: MockTime = new MockTime()
+    val rm = new ReplicaManager(config, time, zkClient, new MockScheduler(time), mockLogMgr,
new AtomicBoolean(false))
+    val partition = rm.getOrCreatePartition("test-topic", 1, 1)
+    partition.addReplicaIfNotExists(new Replica(1, partition, time, 0L, Option(new Log(new
File("/tmp/kafka-logs/test-topic-1"), new LogConfig(), 0L, null))))
+    rm.checkpointHighWatermarks()
+  }
+}


Mime
View raw message