kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jkr...@apache.org
Subject svn commit: r1397134 - in /incubator/kafka/branches/0.8/core/src/main/scala/kafka: log/FileMessageSet.scala log/Log.scala log/LogManager.scala log/LogSegment.scala log/OffsetIndex.scala message/ByteBufferMessageSet.scala server/KafkaApis.scala
Date Thu, 11 Oct 2012 16:06:51 GMT
Author: jkreps
Date: Thu Oct 11 16:06:50 2012
New Revision: 1397134

URL: http://svn.apache.org/viewvc?rev=1397134&view=rev
Log:
KAFKA-506 Misc. follow-up cleanups from Neha's review.


Modified:
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/FileMessageSet.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogManager.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogSegment.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/OffsetIndex.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/FileMessageSet.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/FileMessageSet.scala?rev=1397134&r1=1397133&r2=1397134&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/FileMessageSet.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/FileMessageSet.scala Thu Oct 11 16:06:50 2012
@@ -68,9 +68,9 @@ class FileMessageSet private[kafka](val 
    */
   private[log] def searchFor(targetOffset: Long, startingPosition: Int): OffsetPosition = {
     var position = startingPosition
-    val buffer = ByteBuffer.allocate(12)
+    val buffer = ByteBuffer.allocate(MessageSet.LogOverhead)
     val size = _size.get()
-    while(position + 12 < size) {
+    while(position + MessageSet.LogOverhead < size) {
       buffer.rewind()
       channel.read(buffer, position)
       if(buffer.hasRemaining)

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala?rev=1397134&r1=1397133&r2=1397134&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala Thu Oct 11 16:06:50 2012
@@ -483,8 +483,10 @@ private[kafka] class Log(val dir: File, 
     var total = 0
     for(segment <- segments) {
       info("Deleting log segment " + segment.start + " from " + name)
-      if(!segment.messageSet.delete() || !segment.index.delete()) {
-        warn("Delete of log segment " + segment.start + " failed.")
+      val deletedLog = segment.messageSet.delete()
+      val deletedIndex = segment.index.delete()
+      if(!deletedIndex || !deletedLog) {
+        throw new KafkaStorageException("Deleting log segment " + segment.start + " failed.")
       } else {
         total += 1
       }

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogManager.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogManager.scala?rev=1397134&r1=1397133&r2=1397134&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogManager.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogManager.scala Thu Oct 11 16:06:50 2012
@@ -38,7 +38,6 @@ private[kafka] class LogManager(val conf
                                 needRecovery: Boolean) extends Logging {
 
   val logDir: File = new File(config.logDir)
-  private val numPartitions = config.numPartitions
   private val logFileSizeMap = config.logFileSizeMap
   private val flushInterval = config.flushInterval
   private val logCreationLock = new Object

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogSegment.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogSegment.scala?rev=1397134&r1=1397133&r2=1397134&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogSegment.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogSegment.scala Thu Oct 11 16:06:50 2012
@@ -67,7 +67,7 @@ class LogSegment(val messageSet: FileMes
    * Find the physical file position for the least offset >= the given offset. If no offset is found
    * that meets this criteria before the end of the log, return null.
    */
-  def translateOffset(offset: Long): OffsetPosition = {
+  private def translateOffset(offset: Long): OffsetPosition = {
     val mapping = index.lookup(offset)
     messageSet.searchFor(offset, mapping.position)
   }

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/OffsetIndex.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/OffsetIndex.scala?rev=1397134&r1=1397133&r2=1397134&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/OffsetIndex.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/OffsetIndex.scala Thu Oct 11 16:06:50 2012
@@ -141,7 +141,7 @@ class OffsetIndex(val file: File, val ba
     var lo = 0
     var hi = entries-1
     while(lo < hi) {
-      val mid = ceil((hi + lo) / 2.0).toInt
+      val mid = ceil(hi/2.0 + lo/2.0).toInt
       val found = logical(idx, mid)
       if(found == relativeOffset)
         return mid
@@ -150,7 +150,7 @@ class OffsetIndex(val file: File, val ba
       else
         hi = mid - 1
     }
-    return lo
+    lo
   }
   
   /* return the nth logical offset relative to the base offset */

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala?rev=1397134&r1=1397133&r2=1397134&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala Thu Oct 11 16:06:50 2012
@@ -31,7 +31,7 @@ object ByteBufferMessageSet {
   
   private def create(offsetCounter: AtomicLong, compressionCodec: CompressionCodec, messages: Message*): ByteBuffer = {
     if(messages.size == 0) {
-      return MessageSet.Empty.buffer
+      MessageSet.Empty.buffer
     } else if(compressionCodec == NoCompressionCodec) {
       val buffer = ByteBuffer.allocate(MessageSet.messageSetSize(messages))
       for(message <- messages)

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala?rev=1397134&r1=1397133&r2=1397134&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala Thu Oct 11 16:06:50 2012
@@ -70,10 +70,15 @@ class KafkaApis(val requestChannel: Requ
     if(requestLogger.isTraceEnabled)
       requestLogger.trace("Handling leader and isr request " + leaderAndISRRequest)
     trace("Handling leader and isr request " + leaderAndISRRequest)
-
-    val responseMap = replicaManager.becomeLeaderOrFollower(leaderAndISRRequest)
-    val leaderAndISRResponse = new LeaderAndISRResponse(leaderAndISRRequest.versionId, responseMap)
-    requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(leaderAndISRResponse)))
+    try {
+      val responseMap = replicaManager.becomeLeaderOrFollower(leaderAndISRRequest)
+      val leaderAndISRResponse = new LeaderAndISRResponse(leaderAndISRRequest.versionId, responseMap)
+      requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(leaderAndISRResponse)))
+    } catch {
+      case e: KafkaStorageException =>
+        fatal("Disk error during leadership change.", e)
+        Runtime.getRuntime.halt(1)
+    }
   }
 
 



Mime
View raw message