kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jun...@apache.org
Subject svn commit: r1391854 - in /incubator/kafka/branches/0.8/core/src/main/scala/kafka: cluster/ controller/ message/ network/ producer/ server/ tools/
Date Sat, 29 Sep 2012 18:17:39 GMT
Author: junrao
Date: Sat Sep 29 18:17:39 2012
New Revision: 1391854

URL: http://svn.apache.org/viewvc?rev=1391854&view=rev
Log:
IndexOutOfBoundsException thrown by kafka.consumer.ConsumerFetcherThread; patched by Jun Rao;
reviewed by Jay Kreps and Neha Narkhede; kafka-528

Modified:
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Partition.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Replica.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/controller/KafkaController.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/FileMessageSet.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/RequestChannel.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducer.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/DumpLogSegments.scala

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Partition.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Partition.scala?rev=1391854&r1=1391853&r2=1391854&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Partition.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Partition.scala Sat Sep
29 18:17:39 2012
@@ -192,7 +192,7 @@ class Partition(val topic: String,
         case Some(leaderReplica) =>
           val replica = getReplica(replicaId).get
           val leaderHW = leaderReplica.highWatermark
-          if (replica.logEndOffset >= leaderHW) {
+          if (!inSyncReplicas.contains(replica) && replica.logEndOffset >= leaderHW)
{
             // expand ISR
             val newInSyncReplicas = inSyncReplicas + replica
             info("Expanding ISR for topic %s partition %d to %s".format(topic, partitionId,
newInSyncReplicas.map(_.brokerId).mkString(",")))
@@ -237,8 +237,10 @@ class Partition(val topic: String,
     val allLogEndOffsets = inSyncReplicas.map(_.logEndOffset)
     val newHighWatermark = allLogEndOffsets.min
     val oldHighWatermark = leaderReplica.highWatermark
-    if(newHighWatermark > oldHighWatermark)
+    if(newHighWatermark > oldHighWatermark) {
       leaderReplica.highWatermark = newHighWatermark
+      debug("Highwatermark for topic %s partition %d updated to %d".format(topic, partitionId,
newHighWatermark))
+    }
     else
       debug("Old hw for topic %s partition %d is %d. New hw is %d. All leo's are %s"
         .format(topic, partitionId, oldHighWatermark, newHighWatermark, allLogEndOffsets.mkString(",")))

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Replica.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Replica.scala?rev=1391854&r1=1391853&r2=1391854&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Replica.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Replica.scala Sat Sep 29
18:17:39 2012
@@ -100,7 +100,7 @@ class Replica(val brokerId: Int,
     val replicaString = new StringBuilder
     replicaString.append("ReplicaId: " + brokerId)
     replicaString.append("; Topic: " + topic)
-    replicaString.append("; Partition: " + partition.toString)
+    replicaString.append("; Partition: " + partition.partitionId)
     replicaString.append("; isLocal: " + isLocal)
     if(isLocal) replicaString.append("; Highwatermark: " + highWatermark)
     replicaString.toString()

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/controller/ControllerChannelManager.scala?rev=1391854&r1=1391853&r2=1391854&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
(original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
Sat Sep 29 18:17:39 2012
@@ -163,7 +163,7 @@ class ControllerBrokerRequestBatch(sendR
       val broker = m._1
       val leaderAndIsr = m._2
       val leaderAndIsrRequest = new LeaderAndIsrRequest(leaderAndIsr)
-      debug(("The leaderAndIsr request sent to broker %d is %s").format(broker, leaderAndIsrRequest))
+      info("Sending to broker %d leaderAndIsr request of %s".format(broker, leaderAndIsrRequest))
       sendRequest(broker, leaderAndIsrRequest, null)
     }
     brokerRequestMap.clear()

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/controller/KafkaController.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/controller/KafkaController.scala?rev=1391854&r1=1391853&r2=1391854&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/controller/KafkaController.scala
(original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/controller/KafkaController.scala
Sat Sep 29 18:17:39 2012
@@ -116,7 +116,7 @@ class KafkaController(val config : Kafka
     updateLeaderAndIsrCache()
     // trigger OfflinePartition state for all partitions whose current leader is one amongst
the dead brokers
     val partitionsWithoutLeader = controllerContext.allLeaders.filter(partitionAndLeader
=>
-      deadBrokers.contains(partitionAndLeader._2)).map(_._1).toSeq
+      deadBrokers.contains(partitionAndLeader._2)).keySet.toSeq
     partitionStateMachine.handleStateChanges(partitionsWithoutLeader, OfflinePartition)
     // trigger OnlinePartition state changes for offline or new partitions
     partitionStateMachine.triggerOnlinePartitionStateChange()

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/FileMessageSet.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/FileMessageSet.scala?rev=1391854&r1=1391853&r2=1391854&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/FileMessageSet.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/FileMessageSet.scala Sat
Sep 29 18:17:39 2012
@@ -187,10 +187,7 @@ class FileMessageSet private[kafka](priv
       if(next >= 0)
         validUpTo = next
     } while(next >= 0)
-    channel.truncate(validUpTo)
-    setSize.set(validUpTo)
-    /* This should not be necessary, but fixes bug 6191269 on some OSs. */
-    channel.position(validUpTo)
+    truncateTo(validUpTo)
     needRecover.set(false)    
     len - validUpTo
   }
@@ -201,6 +198,8 @@ class FileMessageSet private[kafka](priv
         " size of this log segment is only %d bytes".format(sizeInBytes()))
     channel.truncate(targetSize)
     setSize.set(targetSize)
+    /* This should not be necessary, but fixes bug 6191269 on some OSs. */
+    channel.position(targetSize)
   }
 
   /**

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/RequestChannel.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/RequestChannel.scala?rev=1391854&r1=1391853&r2=1391854&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/RequestChannel.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/RequestChannel.scala Sat
Sep 29 18:17:39 2012
@@ -18,15 +18,15 @@
 package kafka.network
 
 import java.util.concurrent._
-import kafka.utils.SystemTime
 import kafka.metrics.KafkaMetricsGroup
 import com.yammer.metrics.core.Gauge
 import java.nio.ByteBuffer
 import kafka.api._
 import kafka.common.TopicAndPartition
+import kafka.utils.{Logging, SystemTime}
 
 
-object RequestChannel {
+object RequestChannel extends Logging {
   val AllDone = new Request(1, 2, getShutdownReceive(), 0)
 
   def getShutdownReceive() = {
@@ -45,6 +45,7 @@ object RequestChannel {
     val requestId = buffer.getShort()
     val requestObj: RequestOrResponse = RequestKeys.deserializerForKey(requestId)(buffer)
     buffer.rewind()
+    trace("Received request: %s".format(requestObj))
 
     def updateRequestMetrics() {
       val endTimeNs = SystemTime.nanoseconds
@@ -70,6 +71,7 @@ object RequestChannel {
              m.totalTimeHist.update(totalTime)
       }
     }
+    trace("Completed request: %s".format(requestObj))
   }
   
   case class Response(processor: Int, request: Request, responseSend: Send) {

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducer.scala?rev=1391854&r1=1391853&r2=1391854&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducer.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducer.scala Sat
Sep 29 18:17:39 2012
@@ -37,8 +37,7 @@ class SyncProducer(val config: SyncProdu
   
   private val MaxConnectBackoffMs = 60000
   private var sentOnConnection = 0
-  /** make time-based reconnect starting at a random time **/
-  private var lastConnectionTime = System.currentTimeMillis - SyncProducer.randomGenerator.nextDouble()
* config.reconnectInterval
+  private var lastConnectionTime = -1L
 
   private val lock = new Object()
   @volatile private var shutdown: Boolean = false
@@ -90,7 +89,6 @@ class SyncProducer(val config: SyncProdu
       if(sentOnConnection >= config.reconnectInterval || (config.reconnectTimeInterval
>= 0 && System.currentTimeMillis - lastConnectionTime >= config.reconnectTimeInterval))
{
         reconnect()
         sentOnConnection = 0
-        lastConnectionTime = System.currentTimeMillis
       }
       response
     }
@@ -146,6 +144,7 @@ class SyncProducer(val config: SyncProdu
     while(!blockingChannel.isConnected && !shutdown) {
       try {
         blockingChannel.connect()
+        lastConnectionTime = System.currentTimeMillis
         info("Connected to " + config.host + ":" + config.port + " for producing")
       } catch {
         case e: Exception => {

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/AbstractFetcherThread.scala?rev=1391854&r1=1391853&r2=1391854&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
(original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
Sat Sep 29 18:17:39 2012
@@ -112,7 +112,7 @@ abstract class  AbstractFetcherThread(na
                   warn("current offset %d for topic %s partition %d out of range; reset offset
to %d"
                     .format(currentOffset.get, topic, partitionId, newOffset))
                 case _ =>
-                  error("error for %s %d to broker %d".format(topic, partitionId, sourceBroker.host),
+                  error("error for %s %d to broker %d".format(topic, partitionId, sourceBroker.id),
                     ErrorMapping.exceptionFor(partitionData.error))
                   partitionsWithError += topicAndPartition
                   fetchMap.remove(topicAndPartition)

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala?rev=1391854&r1=1391853&r2=1391854&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala Sat Sep
29 18:17:39 2012
@@ -265,7 +265,7 @@ class KafkaApis(val requestChannel: Requ
                          .format(partition, fetchRequest.clientId))
             0
           case e =>
-            error("Error determining available fetch bytes for topic %s partition %s on broker
%s for client %s"
+            warn("Error determining available fetch bytes for topic %s partition %s on broker
%s for client %s"
                           .format(topic, partition, brokerId, fetchRequest.clientId), e)
             0
         }
@@ -562,6 +562,7 @@ class KafkaApis(val requestChannel: Requ
 
       // unblocked if there are no partitions with pending acks
       val satisfied = ! partitionStatus.exists(p => p._2.acksPending)
+      trace("Producer request satisfaction for %s-%d = %b".format(topic, partitionId, satisfied))
       satisfied
     }
 

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala?rev=1391854&r1=1391853&r2=1391854&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala
(original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala
Sat Sep 29 18:17:39 2012
@@ -23,7 +23,7 @@ class ReplicaFetcherManager(private val 
         extends AbstractFetcherManager("ReplicaFetcherManager on broker " + brokerConfig.brokerId
+ ", ", brokerConfig.numReplicaFetchers) {
 
   override def createFetcherThread(fetcherId: Int, sourceBroker: Broker): AbstractFetcherThread
= {
-    new ReplicaFetcherThread("ReplicaFetcherThread-%d-%d-on-broker-%d-".format(sourceBroker.id,
fetcherId, brokerConfig.brokerId), sourceBroker, brokerConfig, replicaMgr)
+    new ReplicaFetcherThread("ReplicaFetcherThread-%d-%d-on-broker-%d".format(sourceBroker.id,
fetcherId, brokerConfig.brokerId), sourceBroker, brokerConfig, replicaMgr)
   }
 
   def shutdown() {

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/DumpLogSegments.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/DumpLogSegments.scala?rev=1391854&r1=1391853&r2=1391854&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/DumpLogSegments.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/DumpLogSegments.scala Sat
Sep 29 18:17:39 2012
@@ -45,7 +45,10 @@ object DumpLogSegments {
             println("payload:\t" + Utils.toString(messageAndOffset.message.payload, "UTF-8"))
           offset = messageAndOffset.offset
         }
-        println("tail of the log is at offset: " + (startOffset + offset)) 
+        val endOffset = startOffset + offset
+        println("Tail of the log is at offset: " + endOffset)
+        if (messageSet.sizeInBytes != endOffset)
+          println("Log corrupted from " + endOffset + " to " + messageSet.sizeInBytes + "!!!")
       }
     }
   }



Mime
View raw message