kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From nehanarkh...@apache.org
Subject git commit: KAFKA-905 Logs can have same offsets causing recovery failure; reviewed by Jun, Neha and Jay
Date Tue, 04 Jun 2013 00:05:33 GMT
Updated Branches:
  refs/heads/0.8 43c43b1c6 -> fb37ea8c7


KAFKA-905 Logs can have same offsets causing recovery failure; reviewed by Jun, Neha and Jay


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/fb37ea8c
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/fb37ea8c
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/fb37ea8c

Branch: refs/heads/0.8
Commit: fb37ea8c749c41b1e34158debc7e9221b4db8f31
Parents: 43c43b1
Author: Neha Narkhede <nehanarkhede@apache.org>
Authored: Mon Jun 3 17:04:46 2013 -0700
Committer: Neha Narkhede <nehanarkhede@apache.org>
Committed: Mon Jun 3 17:05:17 2013 -0700

----------------------------------------------------------------------
 .../kafka/common/InvalidOffsetException.scala      |   22 +++++++++++++++
 core/src/main/scala/kafka/log/Log.scala            |   16 +++++++++-
 core/src/main/scala/kafka/log/OffsetIndex.scala    |   20 +++++++++-----
 .../scala/unit/kafka/log/OffsetIndexTest.scala     |    3 +-
 4 files changed, 51 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/fb37ea8c/core/src/main/scala/kafka/common/InvalidOffsetException.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/common/InvalidOffsetException.scala b/core/src/main/scala/kafka/common/InvalidOffsetException.scala
new file mode 100644
index 0000000..c6811d7
--- /dev/null
+++ b/core/src/main/scala/kafka/common/InvalidOffsetException.scala
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.common
+
+class InvalidOffsetException(message: String) extends RuntimeException(message) {
+  def this() = this(null)
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/fb37ea8c/core/src/main/scala/kafka/log/Log.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index ef708e2..f634896 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -143,6 +143,7 @@ private[kafka] class Log(val dir: File,
     // open all the segments read-only
     val logSegments = new ArrayList[LogSegment]
     val ls = dir.listFiles()
+
     if(ls != null) {
       for(file <- ls if file.isFile) {
         val filename = file.getName()
@@ -189,8 +190,19 @@ private[kafka] class Log(val dir: File,
       logSegments.get(logSegments.size() - 1).index.resize(maxIndexSize)
 
       // run recovery on the last segment if necessary
-      if(needsRecovery)
-        recoverSegment(logSegments.get(logSegments.size - 1))
+      if(needsRecovery) {
+        var activeSegment = logSegments.get(logSegments.size - 1)
+        try {
+          recoverSegment(activeSegment)
+        } catch {
+          case e: InvalidOffsetException =>
+            val startOffset = activeSegment.start
+            warn("Found invalid offset during recovery of the active segment for topic partition " + dir.getName +". Deleting the segment and " +
+                 "creating an empty one with starting offset " + startOffset)
+            // truncate the active segment to its starting offset
+            activeSegment.truncateTo(startOffset)
+        }
+      }
     }
 
     val segmentList = logSegments.toArray(new Array[LogSegment](logSegments.size))

http://git-wip-us.apache.org/repos/asf/kafka/blob/fb37ea8c/core/src/main/scala/kafka/log/OffsetIndex.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/OffsetIndex.scala b/core/src/main/scala/kafka/log/OffsetIndex.scala
index 60ebc52..9de3d31 100644
--- a/core/src/main/scala/kafka/log/OffsetIndex.scala
+++ b/core/src/main/scala/kafka/log/OffsetIndex.scala
@@ -23,6 +23,7 @@ import java.nio._
 import java.nio.channels._
 import java.util.concurrent.atomic._
 import kafka.utils._
+import kafka.common.InvalidOffsetException
 
 /**
  * An index that maps offsets to physical file locations for a particular log segment. This index may be sparse:
@@ -178,13 +179,18 @@ class OffsetIndex(val file: File, val baseOffset: Long, val maxIndexSize: Int =
   def append(offset: Long, position: Int) {
     this synchronized {
       require(!isFull, "Attempt to append to a full index (size = " + size + ").")
-      require(size.get == 0 || offset > lastOffset, "Attempt to append an offset (%d) to position %d no larger than the last offset appended (%d).".format(offset, entries, lastOffset))
-      debug("Adding index entry %d => %d to %s.".format(offset, position, file.getName))
-      this.mmap.putInt((offset - baseOffset).toInt)
-      this.mmap.putInt(position)
-      this.size.incrementAndGet()
-      this.lastOffset = offset
-      require(entries * 8 == mmap.position, entries + " entries but file position in index is " + mmap.position + ".")
+      if (size.get == 0 || offset > lastOffset) {
+        debug("Adding index entry %d => %d to %s.".format(offset, position, file.getName))
+        this.mmap.putInt((offset - baseOffset).toInt)
+        this.mmap.putInt(position)
+        this.size.incrementAndGet()
+        this.lastOffset = offset
+        require(entries * 8 == mmap.position, entries + " entries but file position in index is " + mmap.position + ".")
+      }
+      else {
+        throw new InvalidOffsetException("Attempt to append an offset (%d) to position %d no larger than the last offset appended (%d) to %s."
+          .format(offset, entries, lastOffset, file.getName))
+      }
     }
   }
   

http://git-wip-us.apache.org/repos/asf/kafka/blob/fb37ea8c/core/src/test/scala/unit/kafka/log/OffsetIndexTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/OffsetIndexTest.scala b/core/src/test/scala/unit/kafka/log/OffsetIndexTest.scala
index 3b2c069..9213a5d 100644
--- a/core/src/test/scala/unit/kafka/log/OffsetIndexTest.scala
+++ b/core/src/test/scala/unit/kafka/log/OffsetIndexTest.scala
@@ -25,6 +25,7 @@ import org.scalatest.junit.JUnitSuite
 import scala.collection._
 import scala.util.Random
 import kafka.utils.TestUtils
+import kafka.common.InvalidOffsetException
 
 class OffsetIndexTest extends JUnitSuite {
   
@@ -89,7 +90,7 @@ class OffsetIndexTest extends JUnitSuite {
     assertWriteFails("Append should fail on a full index", idx, idx.maxEntries + 1, classOf[IllegalArgumentException])
   }
   
-  @Test(expected = classOf[IllegalArgumentException])
+  @Test(expected = classOf[InvalidOffsetException])
   def appendOutOfOrder() {
     idx.append(51, 0)
     idx.append(50, 1)


Mime
View raw message