kafka-commits mailing list archives

From guozh...@apache.org
Subject kafka git commit: KAFKA-5104: DumpLogSegments should not open index files with `rw`
Date Thu, 04 May 2017 19:32:26 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk a6728f6ee -> 41fead3ac


KAFKA-5104: DumpLogSegments should not open index files with `rw`

Add a `writable` parameter to AbstractIndex and default it to true in its subclasses.

Author: amethystic <huxi_2b@hotmail.com>

Reviewers: Guozhang Wang <wangguoz@gmail.com>

Closes #2905 from amethystic/kafka-5104_DumpLogSegments_should_not_open_index_files_with_rw
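
For readers skimming the diff below: the change picks both the RandomAccessFile mode and the FileChannel map mode from the new flag. A minimal standalone sketch of that pattern (illustrative object/method names, not the Kafka classes):

    import java.io.{File, RandomAccessFile}
    import java.nio.MappedByteBuffer
    import java.nio.channels.FileChannel

    object MmapSketch {
      // Map an index file into memory; when `writable` is false the file is
      // opened with mode "r" and mapped READ_ONLY, so callers cannot mutate it.
      def mmapIndex(file: File, writable: Boolean): MappedByteBuffer = {
        val raf = new RandomAccessFile(file, if (writable) "rw" else "r")
        try {
          val mode = if (writable) FileChannel.MapMode.READ_WRITE
                     else FileChannel.MapMode.READ_ONLY
          raf.getChannel.map(mode, 0, raf.length())
        } finally {
          raf.close() // the mapping stays valid after the file is closed
        }
      }
    }

With a READ_ONLY mapping, an accidental write through the returned buffer fails with a ReadOnlyBufferException instead of silently touching the index on disk.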


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/41fead3a
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/41fead3a
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/41fead3a

Branch: refs/heads/trunk
Commit: 41fead3acf6b70f829eb61688d31b0914adedadc
Parents: a6728f6
Author: Xi Hu <huxi_2b@hotmail.com>
Authored: Thu May 4 12:32:22 2017 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Thu May 4 12:32:22 2017 -0700

----------------------------------------------------------------------
 core/src/main/scala/kafka/log/AbstractIndex.scala     | 12 ++++++++----
 core/src/main/scala/kafka/log/OffsetIndex.scala       |  4 ++--
 core/src/main/scala/kafka/log/TimeIndex.scala         |  5 +++--
 core/src/main/scala/kafka/tools/DumpLogSegments.scala |  6 +++---
 4 files changed, 16 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/41fead3a/core/src/main/scala/kafka/log/AbstractIndex.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/AbstractIndex.scala b/core/src/main/scala/kafka/log/AbstractIndex.scala
index bddd1b3..f7478ad 100644
--- a/core/src/main/scala/kafka/log/AbstractIndex.scala
+++ b/core/src/main/scala/kafka/log/AbstractIndex.scala
@@ -37,7 +37,7 @@ import scala.math.ceil
  * @param baseOffset the base offset of the segment that this index is corresponding to.
  * @param maxIndexSize The maximum index size in bytes.
  */
-abstract class AbstractIndex[K, V](@volatile var file: File, val baseOffset: Long, val maxIndexSize: Int = -1)
+abstract class AbstractIndex[K, V](@volatile var file: File, val baseOffset: Long, val maxIndexSize: Int = -1, val writable: Boolean)
     extends Logging {
 
   protected def entrySize: Int
@@ -47,7 +47,7 @@ abstract class AbstractIndex[K, V](@volatile var file: File, val baseOffset: Lon
   @volatile
   protected var mmap: MappedByteBuffer = {
     val newlyCreated = file.createNewFile()
-    val raf = new RandomAccessFile(file, "rw")
+    val raf = if (writable) new RandomAccessFile(file, "rw") else new RandomAccessFile(file, "r")
     try {
       /* pre-allocate the file if necessary */
       if(newlyCreated) {
@@ -58,8 +58,12 @@ abstract class AbstractIndex[K, V](@volatile var file: File, val baseOffset: Lon
 
       /* memory-map the file */
       val len = raf.length()
-      val idx = raf.getChannel.map(FileChannel.MapMode.READ_WRITE, 0, len)
-
+      val idx = {
+        if (writable)
+          raf.getChannel.map(FileChannel.MapMode.READ_WRITE, 0, len)
+        else
+          raf.getChannel.map(FileChannel.MapMode.READ_ONLY, 0, len)
+      }
       /* set the position in the index for the next entry */
       if(newlyCreated)
         idx.position(0)

http://git-wip-us.apache.org/repos/asf/kafka/blob/41fead3a/core/src/main/scala/kafka/log/OffsetIndex.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/OffsetIndex.scala b/core/src/main/scala/kafka/log/OffsetIndex.scala
index a59c02c..a54579f 100755
--- a/core/src/main/scala/kafka/log/OffsetIndex.scala
+++ b/core/src/main/scala/kafka/log/OffsetIndex.scala
@@ -48,8 +48,8 @@ import kafka.common.InvalidOffsetException
  * All external APIs translate from relative offsets to full offsets, so users of this class do not interact with the internal
  * storage format.
  */
-class OffsetIndex(file: File, baseOffset: Long, maxIndexSize: Int = -1)
-    extends AbstractIndex[Long, Int](file, baseOffset, maxIndexSize) {
+class OffsetIndex(file: File, baseOffset: Long, maxIndexSize: Int = -1, writable: Boolean = true)
+    extends AbstractIndex[Long, Int](file, baseOffset, maxIndexSize, writable) {
 
   override def entrySize = 8
   

http://git-wip-us.apache.org/repos/asf/kafka/blob/41fead3a/core/src/main/scala/kafka/log/TimeIndex.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/TimeIndex.scala b/core/src/main/scala/kafka/log/TimeIndex.scala
index 0cedbca..731b173 100644
--- a/core/src/main/scala/kafka/log/TimeIndex.scala
+++ b/core/src/main/scala/kafka/log/TimeIndex.scala
@@ -51,8 +51,9 @@ import org.apache.kafka.common.record.RecordBatch
  */
 class TimeIndex(file: File,
                 baseOffset: Long,
-                maxIndexSize: Int = -1)
-    extends AbstractIndex[Long, Long](file, baseOffset, maxIndexSize) with Logging {
+                maxIndexSize: Int = -1,
+                writable: Boolean = true)
+    extends AbstractIndex[Long, Long](file, baseOffset, maxIndexSize, writable) with Logging {
 
   override def entrySize = 12
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/41fead3a/core/src/main/scala/kafka/tools/DumpLogSegments.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/DumpLogSegments.scala b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
index feb81c6..7a5f671 100755
--- a/core/src/main/scala/kafka/tools/DumpLogSegments.scala
+++ b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
@@ -152,7 +152,7 @@ object DumpLogSegments {
     val startOffset = file.getName.split("\\.")(0).toLong
     val logFile = new File(file.getAbsoluteFile.getParent, file.getName.split("\\.")(0) + Log.LogFileSuffix)
     val fileRecords = FileRecords.open(logFile, false)
-    val index = new OffsetIndex(file, baseOffset = startOffset)
+    val index = new OffsetIndex(file, baseOffset = startOffset, writable = false)
 
     //Check that index passes sanityCheck, this is the check that determines if indexes will be rebuilt on startup or not.
     if (indexSanityOnly) {
@@ -187,8 +187,8 @@ object DumpLogSegments {
     val logFile = new File(file.getAbsoluteFile.getParent, file.getName.split("\\.")(0) + Log.LogFileSuffix)
     val fileRecords = FileRecords.open(logFile, false)
     val indexFile = new File(file.getAbsoluteFile.getParent, file.getName.split("\\.")(0) + Log.IndexFileSuffix)
-    val index = new OffsetIndex(indexFile, baseOffset = startOffset)
-    val timeIndex = new TimeIndex(file, baseOffset = startOffset)
+    val index = new OffsetIndex(indexFile, baseOffset = startOffset, writable = false)
+    val timeIndex = new TimeIndex(file, baseOffset = startOffset, writable = false)
 
     //Check that index passes sanityCheck, this is the check that determines if indexes will be rebuilt on startup or not.
     if (indexSanityOnly) {
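
As a usage note, the dump tool is typically pointed at on-disk index files, which this patch now maps read-only (the flag names below are assumed from this version of the tool; check its --help output):

    bin/kafka-run-class.sh kafka.tools.DumpLogSegments \
      --files /tmp/kafka-logs/test-0/00000000000000000000.index \
      --index-sanity-check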

