kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From nehanarkh...@apache.org
Subject git commit: KAFKA-858 High watermark values can be overwritten during controlled shutdown; reviewed by Jun Rao
Date Tue, 09 Apr 2013 21:11:17 GMT
Updated Branches:
  refs/heads/0.8 d4a70eb9b -> ef123c20b


KAFKA-858 High watermark values can be overwritten during controlled shutdown; reviewed by
Jun Rao


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/ef123c20
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/ef123c20
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/ef123c20

Branch: refs/heads/0.8
Commit: ef123c20b82dab2f9bfa125bbb81521b0ff806ae
Parents: d4a70eb
Author: Neha Narkhede <neha.narkhede@gmail.com>
Authored: Tue Apr 9 14:11:07 2013 -0700
Committer: Neha Narkhede <neha.narkhede@gmail.com>
Committed: Tue Apr 9 14:11:07 2013 -0700

----------------------------------------------------------------------
 .../kafka/server/AbstractFetcherManager.scala      |    4 ++--
 .../main/scala/kafka/server/ReplicaManager.scala   |   15 +++++++--------
 2 files changed, 9 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/ef123c20/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherManager.scala b/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
index be872dc..4269219 100644
--- a/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
@@ -46,13 +46,13 @@ abstract class AbstractFetcherManager(protected val name: String, numFetchers:
I
           fetcherThread.start
       }
       fetcherThread.addPartition(topic, partitionId, initialOffset)
-      info("adding fetcher on topic %s, partition %d, initOffset %d to broker %d with fetcherId
%d"
+      info("Adding fetcher for partition [%s,%d], initOffset %d to broker %d with fetcherId
%d"
           .format(topic, partitionId, initialOffset, sourceBroker.id, key.fetcherId))
     }
   }
 
   def removeFetcher(topic: String, partitionId: Int) {
-    info("removing fetcher on topic %s, partition %d".format(topic, partitionId))
+    info("Removing fetcher for partition [%s,%d]".format(topic, partitionId))
     mapLock synchronized {
       for ((key, fetcher) <- fetcherThreadMap) {
         fetcher.removePartition(topic, partitionId)

http://git-wip-us.apache.org/repos/asf/kafka/blob/ef123c20/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 6d849ac..4a41bde 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -120,11 +120,11 @@ class ReplicaManager(val config: KafkaConfig,
         leaderPartitionsLock synchronized {
           leaderPartitions -= replica.partition
         }
-        allPartitions.remove((topic, partitionId))
-        info("After removing partition [%s,%d], the rest of allReplicas is: [%s]".format(topic,
partitionId, allPartitions))
+        if(deletePartition)
+          allPartitions.remove((topic, partitionId))
       case None => //do nothing if replica no longer exists
     }
-    stateChangeLogger.trace("Broker %d finished handling stop replica [%s,%d]".format(localBrokerId,
topic, partitionId))
+    stateChangeLogger.trace("Broker %d finished handling stop replica for partition [%s,%d]".format(localBrokerId,
topic, partitionId))
     errorCode
   }
 
@@ -168,7 +168,7 @@ class ReplicaManager(val config: KafkaConfig,
     if(replicaOpt.isDefined)
       return replicaOpt.get
     else
-      throw new ReplicaNotAvailableException("Replica %d is not available for partiton [%s,%d]
yet".format(config.brokerId, topic, partition))
+      throw new ReplicaNotAvailableException("Replica %d is not available for partition [%s,%d]".format(config.brokerId,
topic, partition))
   }
 
   def getLeaderReplicaIfLocal(topic: String, partitionId: Int): Replica =  {
@@ -230,10 +230,9 @@ class ReplicaManager(val config: KafkaConfig,
             errorCode = ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]])
         }
         responseMap.put(topicAndPartition, errorCode)
-        leaderAndISRRequest.partitionStateInfos.foreach(p =>
-          stateChangeLogger.trace("Broker %d handled LeaderAndIsr request correlationId %d
received from controller %d epoch %d for partition [%s,%d]"
-                                    .format(localBrokerId, leaderAndISRRequest.correlationId,
leaderAndISRRequest.controllerId,
-                                            leaderAndISRRequest.controllerEpoch, p._1._1,
p._1._2)))
+        stateChangeLogger.trace("Broker %d handled LeaderAndIsr request correlationId %d
received from controller %d epoch %d for partition [%s,%d]"
+          .format(localBrokerId, leaderAndISRRequest.correlationId, leaderAndISRRequest.controllerId,
leaderAndISRRequest.controllerEpoch,
+          topicAndPartition._1, topicAndPartition._2))
       }
       info("Handled leader and isr request %s".format(leaderAndISRRequest))
       // we initialize highwatermark thread after the first leaderisrrequest. This ensures
that all the partitions


Mime
View raw message