kafka-commits mailing list archives

From: nehanarkh...@apache.org
Subject: svn commit: r1396726 [2/2] - in /incubator/kafka/branches/0.8: bin/ config/ core/src/main/scala/kafka/admin/ core/src/main/scala/kafka/api/ core/src/main/scala/kafka/cluster/ core/src/main/scala/kafka/common/ core/src/main/scala/kafka/consumer/ core/sr...
Date: Wed, 10 Oct 2012 18:42:59 GMT
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/OffsetIndex.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/OffsetIndex.scala?rev=1396726&r1=1396725&r2=1396726&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/OffsetIndex.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/OffsetIndex.scala Wed Oct 10 18:42:57 2012
@@ -49,39 +49,36 @@ import kafka.utils._
  * All external APIs translate from relative offsets to full offsets, so users of this class do not interact with the internal 
  * storage format.
  */
-class OffsetIndex(val file: File, val baseOffset: Long, var mutable: Boolean, maxIndexSize: Int = -1) extends Logging {
-
+class OffsetIndex(val file: File, val baseOffset: Long, maxIndexSize: Int = -1) extends Logging {
+  
   /* the memory mapping */
   private var mmap: MappedByteBuffer = 
     {
       val newlyCreated = file.createNewFile()
       val raf = new RandomAccessFile(file, "rw")
       try {
-        if(mutable) {
-          /* if mutable create and memory map a new sparse file */
+        /* pre-allocate the file if necessary */
+        if(newlyCreated) {
           if(maxIndexSize < 8)
             throw new IllegalArgumentException("Invalid max index size: " + maxIndexSize)
+          raf.setLength(roundToExactMultiple(maxIndexSize, 8))
+        }
           
-          /* pre-allocate the file if necessary */
-          if(newlyCreated)
-            raf.setLength(roundToExactMultiple(maxIndexSize, 8))
-          val idx = raf.getChannel.map(FileChannel.MapMode.READ_WRITE, 0, raf.length())
+        val len = raf.length()  
+        if(len < 0 || len % 8 != 0)
+          throw new IllegalStateException("Index file " + file.getName + " is corrupt, found " + len + 
+                                          " bytes which is not positive or not a multiple of 8.")
           
-          /* set the position in the index for the next entry */
-          if(newlyCreated)
-            idx.position(0)
-          else
-            // if this is a pre-existing index, assume it is all valid and set position to last entry
-            idx.position(roundToExactMultiple(idx.limit, 8))
-          idx
-        } else {
-          /* if not mutable, just mmap what they gave us */
-          val len = raf.length()
-          if(len < 0 || len % 8 != 0)
-            throw new IllegalStateException("Index file " + file.getName + " is corrupt, found " + len + 
-                                            " bytes which is not positive or not a multiple of 8.")
-          raf.getChannel.map(FileChannel.MapMode.READ_ONLY, 0, len)
-        }
+        /* memory-map the file */
+        val idx = raf.getChannel.map(FileChannel.MapMode.READ_WRITE, 0, len)
+          
+        /* set the position in the index for the next entry */
+        if(newlyCreated)
+          idx.position(0)
+        else
+          // if this is a pre-existing index, assume it is all valid and set position to last entry
+          idx.position(roundToExactMultiple(idx.limit, 8))
+        idx
       } finally {
         Utils.swallow(raf.close())
       }
@@ -91,7 +88,7 @@ class OffsetIndex(val file: File, val ba
   val maxEntries = mmap.limit / 8
   
   /* the number of entries in the index */
-  private var size = if(mutable) new AtomicInteger(mmap.position / 8) else new AtomicInteger(mmap.limit / 8)
+  private var size = new AtomicInteger(mmap.position / 8)
   
   /* the last offset in the index */
   var lastOffset = readLastOffset()
@@ -115,8 +112,6 @@ class OffsetIndex(val file: File, val ba
    * the pair (baseOffset, 0) is returned.
    */
   def lookup(targetOffset: Long): OffsetPosition = {
-    if(entries == 0)
-      return OffsetPosition(baseOffset, 0)
     val idx = mmap.duplicate
     val slot = indexSlotFor(idx, targetOffset)
     if(slot == -1)
@@ -128,16 +123,20 @@ class OffsetIndex(val file: File, val ba
   /**
    * Find the slot in which the largest offset less than or equal to the given
    * target offset is stored.
-   * Return -1 if the least entry in the index is larger than the target offset 
+   * Return -1 if the least entry in the index is larger than the target offset or the index is empty
    */
   private def indexSlotFor(idx: ByteBuffer, targetOffset: Long): Int = {
     // we only store the difference from the baseoffset so calculate that
     val relativeOffset = targetOffset - baseOffset
     
+    // check if the index is empty
+    if(entries == 0)
+      return -1
+    
     // check if the target offset is smaller than the least offset
     if(logical(idx, 0) > relativeOffset)
       return -1
-    
+      
     // binary search for the entry
     var lo = 0
     var hi = entries-1
@@ -175,8 +174,6 @@ class OffsetIndex(val file: File, val ba
    */
   def append(logicalOffset: Long, position: Int) {
     this synchronized {
-      if(!mutable)
-        throw new IllegalStateException("Attempt to append to an immutable offset index " + file.getName)
       if(isFull)
         throw new IllegalStateException("Attempt to append to a full index (size = " + size + ").")
       if(size.get > 0 && logicalOffset <= lastOffset)
@@ -227,17 +224,17 @@ class OffsetIndex(val file: File, val ba
   }
   
   /**
-   * Make this segment read-only, flush any unsaved changes, and truncate any excess bytes
+   * Trim this segment to fit just the valid entries, deleting all trailing unwritten bytes from
+   * the file.
    */
-  def makeReadOnly() {
+  def trimToSize() {
     this synchronized {
-      mutable = false
       flush()
       val raf = new RandomAccessFile(file, "rws")
       try {
         val newLength = entries * 8
         raf.setLength(newLength)
-        this.mmap = raf.getChannel().map(FileChannel.MapMode.READ_ONLY, 0, newLength)
+        this.mmap = raf.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, newLength)
       } finally {
         Utils.swallow(raf.close())
       }
@@ -265,8 +262,7 @@ class OffsetIndex(val file: File, val ba
   
   /** Close the index */
   def close() {
-    if(mutable)
-      makeReadOnly()
+    trimToSize()
   }
   
   /**

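The change above removes the mutable flag from OffsetIndex entirely: every index is now memory-mapped READ_WRITE, a pre-existing file is validated (its length must be a non-negative multiple of the 8-byte entry size) and positioned at its last entry, and close() trims the file down to entries * 8 bytes via the renamed trimToSize() instead of remapping it read-only. A minimal sketch of the resulting lifecycle, using the new constructor (the file path is hypothetical):

    import java.io.File
    import kafka.log.{OffsetIndex, OffsetPosition}

    // create a sparse index for a segment whose first offset is 45
    val file = new File("/tmp/00000000000000000045.index")   // hypothetical path
    val idx = new OffsetIndex(file = file, baseOffset = 45L, maxIndexSize = 30 * 8)
    idx.append(50L, 0)        // (logical offset, byte position in the log segment)
    idx.append(55L, 1024)
    idx.close()               // flushes and trims the file to entries * 8 bytes

    // reopening uses the same constructor; the file length determines the entry count
    val reopened = new OffsetIndex(file = file, baseOffset = 45L)
    assert(reopened.lookup(55L) == OffsetPosition(55L, 1024))

Note that after trimming, a reopened index is exactly full, so further appends fail with IllegalStateException; the renamed testReopen below asserts exactly that.
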
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/metrics/KafkaCSVMetricsReporter.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/metrics/KafkaCSVMetricsReporter.scala?rev=1396726&r1=1396725&r2=1396726&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/metrics/KafkaCSVMetricsReporter.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/metrics/KafkaCSVMetricsReporter.scala Wed Oct 10 18:42:57 2012
@@ -50,9 +50,10 @@ private class KafkaCSVMetricsReporter ex
         if (!csvDir.exists())
           csvDir.mkdirs()
         underlying = new CsvReporter(Metrics.defaultRegistry(), csvDir)
-        if (props.getBoolean("kafka.csv.metrics.reporter.enabled", false))
+        if (props.getBoolean("kafka.csv.metrics.reporter.enabled", default = false)) {
+          initialized = true
           startReporter(metricsConfig.pollingIntervalSecs)
-        initialized = true
+        }
       }
     }
   }

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/RequestChannel.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/RequestChannel.scala?rev=1396726&r1=1396725&r2=1396726&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/RequestChannel.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/RequestChannel.scala Wed Oct 10 18:42:57 2012
@@ -92,7 +92,7 @@ class RequestChannel(val numProcessors: 
   newGauge(
     "RequestQueueSize",
     new Gauge[Int] {
-      def value() = requestQueue.size
+      def getValue = requestQueue.size
     }
   )
 

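This value() -> getValue rename (repeated below in ProducerSendThread, AbstractFetcherThread, ReplicaManager and RequestPurgatory, with the matching Clock.getTick/getTime rename in KafkaTimerTest) tracks the metrics-core 3.0.0-10ccc80c snapshot this commit pins in KafkaProject.scala, where a Gauge's accessor is getValue. A hedged sketch of the pattern, assuming a class that mixes in kafka.metrics.KafkaMetricsGroup:

    import com.yammer.metrics.core.Gauge
    import kafka.metrics.KafkaMetricsGroup

    class QueueStats(queue: java.util.Queue[String]) extends KafkaMetricsGroup {
      newGauge(
        "MyQueueSize",               // hypothetical metric name
        new Gauge[Int] {
          def getValue = queue.size  // 3.0.x snapshot; 2.x used: def value() = queue.size
        }
      )
    }
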
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ProducerPool.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ProducerPool.scala?rev=1396726&r1=1396725&r2=1396726&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ProducerPool.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ProducerPool.scala Wed Oct 10 18:42:57 2012
@@ -21,7 +21,7 @@ import kafka.cluster.Broker
 import java.util.Properties
 import collection.mutable.HashMap
 import java.lang.Object
-import kafka.utils.Logging
+import kafka.utils.{Utils, Logging}
 import kafka.api.TopicMetadata
 import kafka.common.UnavailableProducerException
 

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala?rev=1396726&r1=1396725&r2=1396726&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala Wed Oct 10 18:42:57 2012
@@ -36,7 +36,7 @@ class ProducerSendThread[K,V](val thread
   newGauge(
     "ProducerQueueSize-" + getId,
     new Gauge[Int] {
-      def value() = queue.size
+      def getValue = queue.size
     }
   )
 

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/AbstractFetcherManager.scala?rev=1396726&r1=1396725&r2=1396726&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/AbstractFetcherManager.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/AbstractFetcherManager.scala Wed Oct 10 18:42:57 2012
@@ -23,7 +23,7 @@ import kafka.cluster.Broker
 
 abstract class AbstractFetcherManager(protected val name: String, numFetchers: Int = 1) extends Logging {
     // map of (source brokerid, fetcher Id per source broker) => fetcher
-  private val fetcherThreadMap = new mutable.HashMap[(Int, Int), AbstractFetcherThread]
+  private val fetcherThreadMap = new mutable.HashMap[(Broker, Int), AbstractFetcherThread]
   private val mapLock = new Object
   this.logIdent = "[" + name + "], "
 
@@ -37,7 +37,7 @@ abstract class AbstractFetcherManager(pr
   def addFetcher(topic: String, partitionId: Int, initialOffset: Long, sourceBroker: Broker) {
     mapLock synchronized {
       var fetcherThread: AbstractFetcherThread = null
-      val key = (sourceBroker.id, getFetcherId(topic, partitionId))
+      val key = (sourceBroker, getFetcherId(topic, partitionId))
       fetcherThreadMap.get(key) match {
         case Some(f) => fetcherThread = f
         case None =>
@@ -64,15 +64,6 @@ abstract class AbstractFetcherManager(pr
     }
   }
 
-  def fetcherSourceBroker(topic: String, partitionId: Int): Option[Int] = {
-    mapLock synchronized {
-      for ( ((sourceBrokerId, _), fetcher) <- fetcherThreadMap)
-        if (fetcher.hasPartition(topic, partitionId))
-          return Some(sourceBrokerId)
-    }
-    None
-  }
-
   def closeAllFetchers() {
     mapLock synchronized {
       for ( (_, fetcher) <- fetcherThreadMap) {
@@ -81,4 +72,4 @@ abstract class AbstractFetcherManager(pr
       fetcherThreadMap.clear()
     }
   }
-}
\ No newline at end of file
+}

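Keying fetcherThreadMap by the full Broker value instead of the bare broker id ties each fetcher thread to a concrete endpoint, presumably so that a broker id that reappears under a different host or port gets a fresh fetcher rather than reusing a stale one; fetcherSourceBroker, which iterated the old Int-keyed map, is dropped rather than adapted. Roughly (a sketch; the Broker constructor shape (id, host, port) is assumed from this branch):

    import kafka.cluster.Broker

    val before = new Broker(1, "host-a", 9092)
    val after  = new Broker(1, "host-b", 9092)   // same id, new endpoint

    // old key: (before.id, 0) == (after.id, 0)  -- the stale fetcher would be reused
    // new key: (before, 0)    != (after, 0)     -- a fresh fetcher thread is created
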
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/AbstractFetcherThread.scala?rev=1396726&r1=1396725&r2=1396726&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/AbstractFetcherThread.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/AbstractFetcherThread.scala Wed Oct 10 18:42:57 2012
@@ -119,7 +119,7 @@ abstract class  AbstractFetcherThread(na
                   warn("current offset %d for topic %s partition %d out of range; reset offset to %d"
                     .format(currentOffset.get, topic, partitionId, newOffset))
                 case _ =>
-                  error("error for %s %d to broker %s ".format(topic, partitionId, sourceBroker.host),
+                  error("error for %s %d to broker %d".format(topic, partitionId, sourceBroker.id),
                     ErrorMapping.exceptionFor(partitionData.error))
                   partitionsWithError += topicAndPartition
                   fetchMap.remove(topicAndPartition)
@@ -165,7 +165,7 @@ class FetcherLagMetrics(name: (String, I
   newGauge(
     name._1 + "-" + name._2 + "-ConsumerLag",
     new Gauge[Long] {
-      def value() = lagVal.get
+      def getValue = lagVal.get
     }
   )
 

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala?rev=1396726&r1=1396725&r2=1396726&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala Wed Oct 10 18:42:57 2012
@@ -17,6 +17,7 @@
 
 package kafka.server
 
+import java.io.IOException
 import kafka.admin.{CreateTopicCommand, AdminUtils}
 import kafka.api._
 import kafka.message._
@@ -75,6 +76,7 @@ class KafkaApis(val requestChannel: Requ
     requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(leaderAndISRResponse)))
   }
 
+
   def handleStopReplicaRequest(request: RequestChannel.Request){
     val stopReplicaRequest = request.requestObj.asInstanceOf[StopReplicaRequest]
     if(requestLogger.isTraceEnabled)
@@ -83,7 +85,7 @@ class KafkaApis(val requestChannel: Requ
 
     val responseMap = new HashMap[(String, Int), Short]
 
-    for((topic, partitionId) <- stopReplicaRequest.partitions){
+    for((topic, partitionId) <- stopReplicaRequest.stopReplicaSet){
       val errorCode = replicaManager.stopReplica(topic, partitionId)
       responseMap.put((topic, partitionId), errorCode)
     }

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaConfig.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaConfig.scala?rev=1396726&r1=1396725&r2=1396726&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaConfig.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaConfig.scala Wed Oct 10 18:42:57 2012
@@ -21,7 +21,7 @@ import java.util.Properties
 import kafka.message.Message
 import kafka.consumer.ConsumerConfig
 import java.net.InetAddress
-import kafka.utils.{Utils, VerifiableProperties, ZKConfig}
+import kafka.utils.{Topic, Utils, VerifiableProperties, ZKConfig}
 
 /**
  * Configuration settings for the kafka server

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala?rev=1396726&r1=1396725&r2=1396726&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala Wed Oct 10 18:42:57 2012
@@ -20,8 +20,7 @@ package kafka.server
 import kafka.cluster.Broker
 
 class ReplicaFetcherManager(private val brokerConfig: KafkaConfig, private val replicaMgr: ReplicaManager)
-        extends AbstractFetcherManager("ReplicaFetcherManager on broker " + brokerConfig.brokerId + ", ",
-          brokerConfig.numReplicaFetchers) {
+        extends AbstractFetcherManager("ReplicaFetcherManager on broker " + brokerConfig.brokerId + ", ", brokerConfig.numReplicaFetchers) {
 
   override def createFetcherThread(fetcherId: Int, sourceBroker: Broker): AbstractFetcherThread = {
     new ReplicaFetcherThread("ReplicaFetcherThread-%d-%d-on-broker-%d".format(sourceBroker.id, fetcherId, brokerConfig.brokerId), sourceBroker, brokerConfig, replicaMgr)

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaManager.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaManager.scala?rev=1396726&r1=1396725&r2=1396726&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaManager.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaManager.scala Wed Oct 10 18:42:57 2012
@@ -47,13 +47,13 @@ class ReplicaManager(val config: KafkaCo
   newGauge(
     "LeaderCount",
     new Gauge[Int] {
-      def value() = leaderPartitions.size
+      def getValue = leaderPartitions.size
     }
   )
   newGauge(
     "UnderReplicatedPartitions",
     new Gauge[Int] {
-      def value() = {
+      def getValue = {
         leaderPartitionsLock synchronized {
           leaderPartitions.count(_.isUnderReplicated)
         }
@@ -159,6 +159,19 @@ class ReplicaManager(val config: KafkaCo
       }
       responseMap.put(partitionInfo, errorCode)
     }
+
+    /**
+     *  If IsInit flag is on, this means that the controller wants to treat topics not in the request
+     *  as deleted.
+     *  TODO: Handle this properly as part of KAFKA-330
+     */
+//    if(leaderAndISRRequest.isInit == LeaderAndIsrRequest.IsInit){
+//      startHighWaterMarksCheckPointThread
+//      val partitionsToRemove = allPartitions.filter(p => !leaderAndISRRequest.leaderAndISRInfos.contains(p._1)).map(entry => entry._1)
+//      info("Init flag is set in leaderAndISR request, partitions to remove: %s".format(partitionsToRemove))
+//      partitionsToRemove.foreach(p => stopReplica(p._1, p._2))
+//    }
+
     responseMap
   }
 

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/RequestPurgatory.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/RequestPurgatory.scala?rev=1396726&r1=1396725&r2=1396726&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/RequestPurgatory.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/RequestPurgatory.scala Wed Oct 10 18:42:57 2012
@@ -69,7 +69,7 @@ abstract class RequestPurgatory[T <: Del
   newGauge(
     "NumDelayedRequests",
     new Gauge[Int] {
-      def value() = expiredRequestReaper.unsatisfied.get()
+      def getValue = expiredRequestReaper.unsatisfied.get()
     }
   )
 

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala?rev=1396726&r1=1396725&r2=1396726&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala Wed Oct 10 18:42:57 2012
@@ -22,6 +22,7 @@ import joptsimple._
 import org.I0Itec.zkclient.ZkClient
 import kafka.utils.{ZkUtils, ZKStringSerializer, Logging}
 import kafka.consumer.SimpleConsumer
+import collection.mutable.Map
 import kafka.api.{PartitionOffsetRequestInfo, OffsetRequest}
 import kafka.common.TopicAndPartition
 import scala.collection._
@@ -33,8 +34,9 @@ object ConsumerOffsetChecker extends Log
 
   private val BidPidPattern = """(\d+)-(\d+)""".r
 
-  private val BrokerIpPattern = """.*:(\d+\.\d+\.\d+\.\d+):(\d+$)""".r
+  private val BrokerIpPattern = """.*:([^:]+):(\d+$)""".r
   // e.g., 127.0.0.1-1315436360737:127.0.0.1:9092
+  // e.g., host.domain.com-1315436360737:host.domain.com:9092
 
   private def getConsumer(zkClient: ZkClient, bid: String): Option[SimpleConsumer] = {
     val brokerInfo = ZkUtils.readDataMaybeNull(zkClient, "/brokers/ids/%s".format(bid))._1

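The old BrokerIpPattern only matched dotted-quad IP addresses, so broker registration strings that embed a hostname (the second example in the comments above) failed to parse. The relaxed pattern handles both; a self-contained check:

    val BrokerIpPattern = """.*:([^:]+):(\d+$)""".r

    Seq("127.0.0.1-1315436360737:127.0.0.1:9092",
        "host.domain.com-1315436360737:host.domain.com:9092").foreach {
      case BrokerIpPattern(host, port) => println(host + " -> " + port)
      case other                       => println("no match: " + other)
    }
    // prints: 127.0.0.1 -> 9092
    //         host.domain.com -> 9092
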
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/DumpLogSegments.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/DumpLogSegments.scala?rev=1396726&r1=1396725&r2=1396726&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/DumpLogSegments.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/DumpLogSegments.scala Wed Oct 10 18:42:57 2012
@@ -43,7 +43,7 @@ object DumpLogSegments {
   /* print out the contents of the index */
   def dumpIndex(file: File) {
     val startOffset = file.getName().split("\\.")(0).toLong
-    val index = new OffsetIndex(file = file, baseOffset = startOffset, mutable = false)
+    val index = new OffsetIndex(file = file, baseOffset = startOffset)
     for(i <- 0 until index.entries) {
       val entry = index.entry(i)
       // since it is a sparse file, in the event of a crash there may be many zero entries, stop if we see one
@@ -57,7 +57,7 @@ object DumpLogSegments {
   def dumpLog(file: File, printContents: Boolean) {
     val startOffset = file.getName().split("\\.")(0).toLong
     println("Starting offset: " + startOffset)
-    val messageSet = new FileMessageSet(file, false)
+    val messageSet = new FileMessageSet(file)
     var validBytes = 0L
     for(messageAndOffset <- messageSet) {
       val msg = messageAndOffset.message

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala?rev=1396726&r1=1396725&r2=1396726&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala Wed Oct 10 18:42:57 2012
@@ -19,7 +19,6 @@ package kafka.utils
 
 import java.io._
 import java.nio._
-import charset.Charset
 import java.nio.channels._
 import java.lang.management._
 import java.util.zip.CRC32
@@ -639,7 +638,7 @@ object Utils extends Logging {
     builder.toString
   }
 
-  def mapToJson[T <: Any](map: Map[String, Seq[String]]): String = {
+  def mapToJson[T <: Any](map: Map[String, List[String]]): String = {
     val builder = new StringBuilder
     builder.append("{ ")
     var numElements = 0
@@ -716,18 +715,6 @@ object Utils extends Logging {
       for (forever <- Stream.continually(1); t <- coll) yield t
     stream.iterator
   }
-
-  def readFileIntoString(path: String): String = {
-    val stream = new FileInputStream(new File(path))
-    try {
-      val fc = stream.getChannel()
-      val bb = fc.map(FileChannel.MapMode.READ_ONLY, 0, fc.size())
-      Charset.defaultCharset().decode(bb).toString()
-    }
-    finally {
-      stream.close()
-    }
-  }
 }
 
 /**

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/ZkUtils.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/ZkUtils.scala?rev=1396726&r1=1396725&r2=1396726&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/ZkUtils.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/ZkUtils.scala Wed Oct 10 18:42:57 2012
@@ -27,15 +27,12 @@ import kafka.api.LeaderAndIsr
 import org.apache.zookeeper.data.Stat
 import java.util.concurrent.locks.{ReentrantLock, Condition}
 import kafka.common.{KafkaException, NoEpochForPartitionException}
-import kafka.controller.{PartitionAndReplica, ReassignedPartitionsContext}
-import kafka.admin._
 
 object ZkUtils extends Logging {
   val ConsumersPath = "/consumers"
   val BrokerIdsPath = "/brokers/ids"
   val BrokerTopicsPath = "/brokers/topics"
   val ControllerPath = "/controller"
-  val ReassignPartitionsPath = "/admin/reassign_partitions"
 
   def getTopicPath(topic: String): String ={
     BrokerTopicsPath + "/" + topic
@@ -79,23 +76,20 @@ object ZkUtils extends Logging {
     val leaderAndIsrOpt = leaderAndIsrInfo._1
     val stat = leaderAndIsrInfo._2
     leaderAndIsrOpt match {
-      case Some(leaderAndIsrStr) => parseLeaderAndIsr(leaderAndIsrStr, topic, partition, stat)
-      case None => None
-    }
-  }
-
-  def parseLeaderAndIsr(leaderAndIsrStr: String, topic: String, partition: Int, stat: Stat): Option[LeaderAndIsr] = {
-    SyncJSON.parseFull(leaderAndIsrStr) match {
-      case Some(m) =>
-        val leader = m.asInstanceOf[Map[String, String]].get("leader").get.toInt
-        val epoch = m.asInstanceOf[Map[String, String]].get("leaderEpoch").get.toInt
-        val isrString = m.asInstanceOf[Map[String, String]].get("ISR").get
-        val isr = Utils.getCSVList(isrString).map(r => r.toInt)
-        val zkPathVersion = stat.getVersion
-        debug("Leader %d, Epoch %d, Isr %s, Zk path version %d for topic %s and partition %d".format(leader, epoch,
-          isr.toString(), zkPathVersion, topic, partition))
-        Some(LeaderAndIsr(leader, epoch, isr.toList, zkPathVersion))
-      case None => None
+      case Some(leaderAndIsrStr) =>
+        SyncJSON.parseFull(leaderAndIsrStr) match {
+          case Some(m) =>
+            val leader = m.asInstanceOf[Map[String, String]].get("leader").get.toInt
+            val epoch = m.asInstanceOf[Map[String, String]].get("leaderEpoch").get.toInt
+            val isrString = m.asInstanceOf[Map[String, String]].get("ISR").get
+            val isr = Utils.getCSVList(isrString).map(r => r.toInt)
+            val zkPathVersion = stat.getVersion
+            debug("Leader %d, Epoch %d, Isr %s, Zk path version %d for topic %s and partition %d".format(leader, epoch,
+              isr.toString(), zkPathVersion, topic, partition))
+            Some(LeaderAndIsr(leader, epoch, isr.toList, zkPathVersion))
+          case None => None
+        }
+      case None => None // TODO: Handle if leader and isr info is not available in zookeeper
     }
   }
 
@@ -301,13 +295,11 @@ object ZkUtils extends Logging {
   def conditionalUpdatePersistentPath(client: ZkClient, path: String, data: String, expectVersion: Int): (Boolean, Int) = {
     try {
       val stat = client.writeData(path, data, expectVersion)
-      info("Conditional update of zkPath %s with value %s and expected version %d succeeded, returning the new version: %d"
-        .format(path, data, expectVersion, stat.getVersion))
+      debug("Conditional update to the zookeeper path %s with expected version %d succeeded and returned the new version: %d".format(path, expectVersion, stat.getVersion))
       (true, stat.getVersion)
     } catch {
       case e: Exception =>
-        error("Conditional update of zkPath %s with data %s and expected version %d failed".format(path, data,
-          expectVersion), e)
+        debug("Conditional update to the zookeeper path %s with expected version %d failed".format(path, expectVersion), e)
         (false, -1)
     }
   }
@@ -519,66 +511,6 @@ object ZkUtils extends Logging {
     }.flatten[(String, Int)].toSeq
   }
 
-  def getPartitionsBeingReassigned(zkClient: ZkClient): Map[(String, Int), ReassignedPartitionsContext] = {
-    // read the partitions and their new replica list
-    val jsonPartitionMapOpt = readDataMaybeNull(zkClient, ReassignPartitionsPath)._1
-    jsonPartitionMapOpt match {
-      case Some(jsonPartitionMap) =>
-        val reassignedPartitions = parsePartitionReassignmentData(jsonPartitionMap)
-        reassignedPartitions.map { p =>
-          val newReplicas = p._2
-          (p._1 -> new ReassignedPartitionsContext(newReplicas))
-        }
-      case None => Map.empty[(String, Int), ReassignedPartitionsContext]
-    }
-  }
-
-  def parsePartitionReassignmentData(jsonData: String):Map[(String, Int), Seq[Int]] = {
-    SyncJSON.parseFull(jsonData) match {
-      case Some(m) =>
-        val replicaMap = m.asInstanceOf[Map[String, Seq[String]]]
-        replicaMap.map { reassignedPartitions =>
-          val topic = reassignedPartitions._1.split(",").head
-          val partition = reassignedPartitions._1.split(",").last.toInt
-          val newReplicas = reassignedPartitions._2.map(_.toInt)
-          (topic, partition) -> newReplicas
-        }
-      case None => Map.empty[(String, Int), Seq[Int]]
-    }
-  }
-
-  def updatePartitionReassignmentData(zkClient: ZkClient, partitionsToBeReassigned: Map[(String, Int), Seq[Int]]) {
-    val zkPath = ZkUtils.ReassignPartitionsPath
-    partitionsToBeReassigned.size match {
-      case 0 => // need to delete the /admin/reassign_partitions path
-        deletePath(zkClient, zkPath)
-        info("No more partitions need to be reassigned. Deleting zk path %s".format(zkPath))
-      case _ =>
-        val jsonData = Utils.mapToJson(partitionsToBeReassigned.map(p => ("%s,%s".format(p._1._1, p._1._2)) -> p._2.map(_.toString)))
-        try {
-          updatePersistentPath(zkClient, zkPath, jsonData)
-          info("Updated partition reassignment path with %s".format(jsonData))
-        }catch {
-          case nne: ZkNoNodeException =>
-            ZkUtils.createPersistentPath(zkClient, zkPath, jsonData)
-            debug("Created path %s with %s for partition reassignment".format(zkPath, jsonData))
-          case e2 => throw new AdministrationException(e2.toString)
-        }
-    }
-  }
-
-  def getAllReplicasOnBroker(zkClient: ZkClient, topics: Seq[String], brokerIds: Seq[Int]): Seq[PartitionAndReplica] = {
-    brokerIds.map { brokerId =>
-      // read all the partitions and their assigned replicas into a map organized by
-      // { replica id -> partition 1, partition 2...
-      val partitionsAssignedToThisBroker = getPartitionsAssignedToBroker(zkClient, topics, brokerId)
-      if(partitionsAssignedToThisBroker.size == 0)
-        info("No state transitions triggered since no partitions are assigned to brokers %s".format(brokerIds.mkString(",")))
-      partitionsAssignedToThisBroker.map(p => new PartitionAndReplica(p._1, p._2, brokerId))
-    }.flatten
-  }
-
-
   def deletePartition(zkClient : ZkClient, brokerId: Int, topic: String) {
     val brokerIdPath = BrokerIdsPath + "/" + brokerId
     zkClient.delete(brokerIdPath)

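The inlined parsing above reads the leader-and-ISR znode as a flat map of string fields, with the ISR stored as a comma-separated list of replica ids. An illustrative payload and the corresponding parse (field names come from the code above; the values here are made up):

    import kafka.utils.{SyncJSON, Utils}

    val leaderAndIsrStr = """{ "leader":"2", "leaderEpoch":"3", "ISR":"1,2,3" }"""
    val m = SyncJSON.parseFull(leaderAndIsrStr).get.asInstanceOf[Map[String, String]]
    val leader = m("leader").toInt                        // 2
    val epoch  = m("leaderEpoch").toInt                   // 3
    val isr    = Utils.getCSVList(m("ISR")).map(_.toInt)  // Seq(1, 2, 3)
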
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/admin/AdminTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/admin/AdminTest.scala?rev=1396726&r1=1396725&r2=1396726&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/admin/AdminTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/admin/AdminTest.scala Wed Oct 10 18:42:57 2012
@@ -20,10 +20,8 @@ import junit.framework.Assert._
 import org.junit.Test
 import org.scalatest.junit.JUnit3Suite
 import kafka.zk.ZooKeeperTestHarness
-import kafka.server.{KafkaServer, KafkaConfig}
-import collection.mutable.ListBuffer
 import kafka.common.ErrorMapping
-import kafka.utils.{Utils, ZkUtils, TestUtils}
+import kafka.utils.TestUtils
 
 class AdminTest extends JUnit3Suite with ZooKeeperTestHarness {
 
@@ -207,182 +205,4 @@ class AdminTest extends JUnit3Suite with
         }
     }
   }
-
-  @Test
-  def testPartitionReassignmentWithLeaderInNewReplicas() {
-    val expectedReplicaAssignment = Map(0  -> List("0", "1", "2"))
-    val topic = "test"
-    // create brokers
-    val servers = TestUtils.createBrokerConfigs(4).map(b => TestUtils.createServer(new KafkaConfig(b)))
-    // create the topic
-    AdminUtils.createTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient)
-    // reassign partition 0
-    val newReplicas = Seq(0, 2, 3)
-    val partitionToBeReassigned = 0
-    val reassignPartitionsCommand = new ReassignPartitionsCommand(zkClient, Map((topic, partitionToBeReassigned) -> newReplicas))
-    assertTrue("Partition reassignment attempt failed for [test, 0]", reassignPartitionsCommand.reassignPartitions())
-    // wait until reassignment is completed
-    TestUtils.waitUntilTrue(() => {
-      val partitionsBeingReassigned = ZkUtils.getPartitionsBeingReassigned(zkClient).mapValues(_.newReplicas);
-      CheckReassignmentStatus.checkIfPartitionReassignmentSucceeded(zkClient, topic, partitionToBeReassigned, newReplicas,
-      Map((topic, partitionToBeReassigned) -> newReplicas), partitionsBeingReassigned) == ReassignmentCompleted;
-    }, 1000)
-    val assignedReplicas = ZkUtils.getReplicasForPartition(zkClient, topic, partitionToBeReassigned)
-    assertEquals("Partition should have been reassigned to 0, 2, 3", newReplicas, assignedReplicas)
-    servers.foreach(_.shutdown())
-  }
-
-  @Test
-  def testPartitionReassignmentWithLeaderNotInNewReplicas() {
-    val expectedReplicaAssignment = Map(0  -> List("0", "1", "2"))
-    val topic = "test"
-    // create brokers
-    val servers = TestUtils.createBrokerConfigs(4).map(b => TestUtils.createServer(new KafkaConfig(b)))
-    // create the topic
-    AdminUtils.createTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient)
-    // reassign partition 0
-    val newReplicas = Seq(1, 2, 3)
-    val partitionToBeReassigned = 0
-    val reassignPartitionsCommand = new ReassignPartitionsCommand(zkClient, Map((topic, partitionToBeReassigned) -> newReplicas))
-    assertTrue("Partition reassignment failed for test, 0", reassignPartitionsCommand.reassignPartitions())
-    // wait until reassignment is completed
-    TestUtils.waitUntilTrue(() => {
-      val partitionsBeingReassigned = ZkUtils.getPartitionsBeingReassigned(zkClient).mapValues(_.newReplicas);
-      CheckReassignmentStatus.checkIfPartitionReassignmentSucceeded(zkClient, topic, partitionToBeReassigned, newReplicas,
-      Map((topic, partitionToBeReassigned) -> newReplicas), partitionsBeingReassigned) == ReassignmentCompleted;
-    }, 1000)
-    val assignedReplicas = ZkUtils.getReplicasForPartition(zkClient, topic, partitionToBeReassigned)
-    assertEquals("Partition should have been reassigned to 0, 2, 3", newReplicas, assignedReplicas)
-    // leader should be 2
-    servers.foreach(_.shutdown())
-  }
-
-  @Test
-  def testPartitionReassignmentNonOverlappingReplicas() {
-    val expectedReplicaAssignment = Map(0  -> List("0", "1"))
-    val topic = "test"
-    // create brokers
-    val servers = TestUtils.createBrokerConfigs(4).map(b => TestUtils.createServer(new KafkaConfig(b)))
-    // create the topic
-    AdminUtils.createTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient)
-    // reassign partition 0
-    val newReplicas = Seq(2, 3)
-    val partitionToBeReassigned = 0
-    val reassignPartitionsCommand = new ReassignPartitionsCommand(zkClient, Map((topic, partitionToBeReassigned) -> newReplicas))
-    assertTrue("Partition reassignment failed for test, 0", reassignPartitionsCommand.reassignPartitions())
-    // wait until reassignment is completed
-    TestUtils.waitUntilTrue(() => {
-      val partitionsBeingReassigned = ZkUtils.getPartitionsBeingReassigned(zkClient).mapValues(_.newReplicas);
-      CheckReassignmentStatus.checkIfPartitionReassignmentSucceeded(zkClient, topic, partitionToBeReassigned, newReplicas,
-      Map((topic, partitionToBeReassigned) -> newReplicas), partitionsBeingReassigned) == ReassignmentCompleted;
-    }, 1000)
-    val assignedReplicas = ZkUtils.getReplicasForPartition(zkClient, topic, partitionToBeReassigned)
-    assertEquals("Partition should have been reassigned to 2, 3", newReplicas, assignedReplicas)
-    // leader should be 2
-    servers.foreach(_.shutdown())
-  }
-
-  @Test
-  def testReassigningNonExistingPartition() {
-    val topic = "test"
-    // create brokers
-    val servers = TestUtils.createBrokerConfigs(4).map(b => TestUtils.createServer(new KafkaConfig(b)))
-    // reassign partition 0
-    val newReplicas = Seq(2, 3)
-    val partitionToBeReassigned = 0
-    val reassignPartitionsCommand = new ReassignPartitionsCommand(zkClient, Map((topic, partitionToBeReassigned) -> newReplicas))
-    assertTrue("Partition reassignment failed for test, 0", reassignPartitionsCommand.reassignPartitions())
-    val reassignedPartitions = ZkUtils.getPartitionsBeingReassigned(zkClient)
-    assertFalse("Partition should not be reassigned", reassignedPartitions.contains((topic, partitionToBeReassigned)))
-    // leader should be 2
-    servers.foreach(_.shutdown())
-  }
-
-  @Test
-  def testResumePartitionReassignmentThatWasCompleted() {
-    val expectedReplicaAssignment = Map(0  -> List("0", "1"))
-    val topic = "test"
-    // create the topic
-    AdminUtils.createTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient)
-    // put the partition in the reassigned path as well
-    // reassign partition 0
-    val newReplicas = Seq(0, 1)
-    val partitionToBeReassigned = 0
-    val reassignPartitionsCommand = new ReassignPartitionsCommand(zkClient, Map((topic, partitionToBeReassigned) -> newReplicas))
-    reassignPartitionsCommand.reassignPartitions
-    // create brokers
-    val servers = TestUtils.createBrokerConfigs(2).map(b => TestUtils.createServer(new KafkaConfig(b)))
-    TestUtils.waitUntilTrue(checkIfReassignPartitionPathExists, 1000)
-    val assignedReplicas = ZkUtils.getReplicasForPartition(zkClient, topic, partitionToBeReassigned)
-    assertEquals("Partition should have been reassigned to 0, 1", newReplicas, assignedReplicas)
-    servers.foreach(_.shutdown())
-  }
-
-  @Test
-  def testResumePartitionReassignmentAfterLeaderWasMoved() {
-    var expectedReplicaAssignment = Map(0  -> List(1, 0, 2, 3))
-    val leaderForPartitionMap = Map(0 -> 2)
-    val topic = "test"
-    val serverConfigs = TestUtils.createBrokerConfigs(4).map(b => new KafkaConfig(b))
-    val servers = new ListBuffer[KafkaServer]
-    // create the topic
-    AdminUtils.createTopicPartitionAssignmentPathInZK(topic,
-      expectedReplicaAssignment.map(r => (r._1) -> r._2.map(_.toString)), zkClient)
-    // bring up just brokers 0 and 1
-    servers.append(TestUtils.createServer(serverConfigs(0)))
-    servers.append(TestUtils.createServer(serverConfigs(1)))
-    val newReplicas = Seq(2, 3)
-    val partitionToBeReassigned = 0
-    val reassignPartitionsCommand = new ReassignPartitionsCommand(zkClient, Map((topic, partitionToBeReassigned) -> newReplicas))
-    reassignPartitionsCommand.reassignPartitions
-    // this partition reassignment request should be ignored since replicas 2 and 3 are not online
-    // and the admin path should be deleted as well
-    TestUtils.waitUntilTrue(checkIfReassignmentPathIsDeleted, 1000)
-    var assignedReplicas = ZkUtils.getReplicasForPartition(zkClient, topic, partitionToBeReassigned)
-    assertEquals("Partition should not be reassigned to 2, 3 yet", expectedReplicaAssignment(0), assignedReplicas)
-    // create brokers
-    servers.append(TestUtils.createServer(serverConfigs(2)))
-    servers.append(TestUtils.createServer(serverConfigs(3)))
-    // wait until new replicas catch up with leader
-    TestUtils.waitUntilTrue(checkIfNewReplicasInIsr, 2000)
-    // change the assigned replicas to 0 and 1
-    updateAssignedReplicasForPartition("test", 0, List(0, 1))
-    // reissue the partition reassignment
-    reassignPartitionsCommand.reassignPartitions
-    // create leaders for the partition to be reassigned
-    TestUtils.makeLeaderForPartition(zkClient, topic, leaderForPartitionMap)
-    // bounce controller
-    servers.head.shutdown()
-    servers.head.startup()
-    TestUtils.waitUntilTrue(checkIfReassignPartitionPathExists, 1500)
-    assignedReplicas = ZkUtils.getReplicasForPartition(zkClient, topic, partitionToBeReassigned)
-    assertEquals("Partition should have been reassigned to 2, 3", newReplicas, assignedReplicas)
-    servers.foreach(_.shutdown())
-  }
-
-  private def checkIfReassignPartitionPathExists(): Boolean = {
-    ZkUtils.pathExists(zkClient, ZkUtils.ReassignPartitionsPath)
-  }
-
-  private def checkIfReassignmentPathIsDeleted(): Boolean = {
-    !ZkUtils.pathExists(zkClient, ZkUtils.ReassignPartitionsPath)
-  }
-
-  private def checkIfNewReplicasInIsr(): Boolean = {
-    val leaderAndIsrOpt = ZkUtils.getLeaderAndIsrForPartition(zkClient, "test", 0)
-    leaderAndIsrOpt match {
-      case Some(leaderAndIsr) =>
-        if(leaderAndIsr.isr.contains(2) && leaderAndIsr.isr.contains(3))
-          true
-        else
-          false
-      case None => false
-    }
-  }
-
-  private def updateAssignedReplicasForPartition(topic: String, partition: Int, newAssignedReplicas: Seq[Int]) {
-    val zkPath = ZkUtils.getTopicPath(topic)
-    val jsonPartitionMap = Utils.mapToJson(Map(partition.toString -> newAssignedReplicas.map(_.toString)))
-    ZkUtils.updatePersistentPath(zkClient, zkPath, jsonPartitionMap)
-  }
 }

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala?rev=1396726&r1=1396725&r2=1396726&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala Wed Oct 10 18:42:57 2012
@@ -95,7 +95,7 @@ object SerializationTestUtils{
   }
 
   def createTestStopReplicaRequest() : StopReplicaRequest = {
-    new StopReplicaRequest(collection.immutable.Set((topic1, 0), (topic2, 0)))
+    new StopReplicaRequest(Set((topic1, 0), (topic2, 0)))
   }
 
   def createTestStopReplicaResponse() : StopReplicaResponse = {

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/FileMessageSetTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/FileMessageSetTest.scala?rev=1396726&r1=1396725&r2=1396726&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/FileMessageSetTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/FileMessageSetTest.scala Wed Oct 10 18:42:57 2012
@@ -29,7 +29,7 @@ class FileMessageSetTest extends BaseMes
   val messageSet = createMessageSet(messages)
   
   def createMessageSet(messages: Seq[Message]): FileMessageSet = {
-    val set = new FileMessageSet(tempFile(), true)
+    val set = new FileMessageSet(tempFile())
     set.append(new ByteBufferMessageSet(NoCompressionCodec, messages: _*))
     set.flush()
     set

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala?rev=1396726&r1=1396725&r2=1396726&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala Wed Oct 10 18:42:57 2012
@@ -30,7 +30,7 @@ import org.scalatest.junit.JUnit3Suite
 import kafka.admin.CreateTopicCommand
 import kafka.api.{PartitionOffsetRequestInfo, FetchRequestBuilder, OffsetRequest}
 import kafka.utils.TestUtils._
-import kafka.common.{ErrorMapping, TopicAndPartition}
+import kafka.common.{ErrorMapping, TopicAndPartition, UnknownTopicOrPartitionException}
 
 object LogOffsetTest {
   val random = new Random()  

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala?rev=1396726&r1=1396725&r2=1396726&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala Wed Oct 10 18:42:57 2012
@@ -15,10 +15,10 @@ class LogSegmentTest extends JUnit3Suite
   
   def createSegment(offset: Long): LogSegment = {
     val msFile = TestUtils.tempFile()
-    val ms = new FileMessageSet(msFile, true)
+    val ms = new FileMessageSet(msFile)
     val idxFile = TestUtils.tempFile()
     idxFile.delete()
-    val idx = new OffsetIndex(idxFile, offset, true, 100)
+    val idx = new OffsetIndex(idxFile, offset, 100)
     val seg = new LogSegment(ms, idx, offset, 10, SystemTime)
     segments += seg
     seg

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogTest.scala?rev=1396726&r1=1396725&r2=1396726&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogTest.scala Wed Oct 10 18:42:57 2012
@@ -342,6 +342,33 @@ class LogTest extends JUnitSuite {
     assertEquals("Should change log size", log.size, 0)
   }
 
+  @Test
+  def testReopenThenTruncate() {
+    val set = TestUtils.singleMessageSet("test".getBytes())
+
+    // create a log
+    var log = new Log(logDir, 
+                      maxLogFileSize = set.sizeInBytes * 5, 
+                      maxMessageSize = config.maxMessageSize, 
+                      maxIndexSize = 1000, 
+                      indexIntervalBytes = 10000, 
+                      needsRecovery = true)
+    
+    // add enough messages to roll over several segments then close and re-open and attempt to truncate
+    for(i <- 0 until 100)
+      log.append(set)
+    log.close()
+    log = new Log(logDir, 
+                  maxLogFileSize = set.sizeInBytes * 5, 
+                  maxMessageSize = config.maxMessageSize, 
+                  maxIndexSize = 1000, 
+                  indexIntervalBytes = 10000, 
+                  needsRecovery = true)
+    log.truncateTo(3)
+    assertEquals("All but one segment should be deleted.", 1, log.numberOfSegments)
+    assertEquals("Log end offset should be 3.", 3, log.logEndOffset)
+  }
+  
   def assertContains(ranges: Array[Range], offset: Long) = {
     Log.findRange(ranges, offset) match {
       case Some(range) => 

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/OffsetIndexTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/OffsetIndexTest.scala?rev=1396726&r1=1396725&r2=1396726&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/OffsetIndexTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/OffsetIndexTest.scala Wed Oct 10 18:42:57 2012
@@ -33,7 +33,7 @@ class OffsetIndexTest extends JUnitSuite
   
   @Before
   def setup() {
-    this.idx = new OffsetIndex(file = nonExistantTempFile(), baseOffset = 45L, mutable = true, maxIndexSize = 30 * 8)
+    this.idx = new OffsetIndex(file = nonExistantTempFile(), baseOffset = 45L, maxIndexSize = 30 * 8)
   }
   
   @After
@@ -41,7 +41,7 @@ class OffsetIndexTest extends JUnitSuite
     if(this.idx != null)
       this.idx.file.delete()
   }
-
+  
   @Test
   def randomLookupTest() {
     assertEquals("Not present value should return physical offset 0.", OffsetPosition(idx.baseOffset, 0), idx.lookup(92L))
@@ -88,25 +88,6 @@ class OffsetIndexTest extends JUnitSuite
     }
     assertWriteFails("Append should fail on a full index", idx, idx.maxEntries + 1, classOf[IllegalStateException])
   }
-
-  
-  @Test
-  def testReadOnly() {
-    /* add some random values */
-    val vals = List((49, 1), (52, 2), (55, 3))
-    for((logical, physical) <- vals)
-      idx.append(logical, physical)
-    
-    idx.makeReadOnly()
-    
-    assertEquals("File length should just contain added entries.", vals.size * 8L, idx.file.length())
-    assertEquals("Last offset field should be initialized", vals.last._1, idx.lastOffset)
-    
-    for((logical, physical) <- vals)
-    	assertEquals("Should still be able to find everything.", OffsetPosition(logical, physical), idx.lookup(logical))
-    	
-    assertWriteFails("Append should fail on read-only index", idx, 60, classOf[IllegalStateException])
-  }
   
   @Test(expected = classOf[IllegalArgumentException])
   def appendOutOfOrder() {
@@ -115,13 +96,13 @@ class OffsetIndexTest extends JUnitSuite
   }
   
   @Test
-  def reopenAsReadonly() {
+  def testReopen() {
     val first = OffsetPosition(51, 0)
     val sec = OffsetPosition(52, 1)
     idx.append(first.offset, first.position)
     idx.append(sec.offset, sec.position)
     idx.close()
-    val idxRo = new OffsetIndex(file = idx.file, baseOffset = idx.baseOffset, mutable = false)
+    val idxRo = new OffsetIndex(file = idx.file, baseOffset = idx.baseOffset)
     assertEquals(first, idxRo.lookup(first.offset))
     assertEquals(sec, idxRo.lookup(sec.offset))
     assertWriteFails("Append should fail on read-only index", idxRo, 53, classOf[IllegalStateException])
@@ -129,7 +110,8 @@ class OffsetIndexTest extends JUnitSuite
   
   @Test
   def truncate() {
-	val idx = new OffsetIndex(file = nonExistantTempFile(), baseOffset = 0L, mutable = true, maxIndexSize = 10 * 8)
+	val idx = new OffsetIndex(file = nonExistantTempFile(), baseOffset = 0L, maxIndexSize = 10 * 8)
+	idx.truncate()
     for(i <- 1 until 10)
       idx.append(i, i)
       
@@ -155,13 +137,6 @@ class OffsetIndexTest extends JUnitSuite
       case e: Exception => assertEquals("Got an unexpected exception.", klass, e.getClass)
     }
   }
-  
-  def makeIndex(baseOffset: Long, mutable: Boolean, vals: Seq[(Long, Int)]): OffsetIndex = {
-    val idx = new OffsetIndex(file = nonExistantTempFile(), baseOffset = baseOffset, mutable = mutable, maxIndexSize = 2 * vals.size * 8)
-    for ((logical, physical) <- vals)
-      idx.append(logical, physical)
-    idx
-  }
 
   def monotonicSeq(base: Int, len: Int): Seq[Int] = {
     val rand = new Random(1L)

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/message/BaseMessageSetTestCases.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/message/BaseMessageSetTestCases.scala?rev=1396726&r1=1396725&r2=1396726&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/message/BaseMessageSetTestCases.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/message/BaseMessageSetTestCases.scala Wed Oct 10 18:42:57 2012
@@ -67,7 +67,7 @@ trait BaseMessageSetTestCases extends JU
       val channel = new RandomAccessFile(file, "rw").getChannel()
       val written = set.writeTo(channel, 0, 1024)
       assertEquals("Expect to write the number of bytes in the set.", set.sizeInBytes, written)
-      val newSet = new FileMessageSet(file, channel, false)
+      val newSet = new FileMessageSet(file, channel)
       checkEquals(set.iterator, newSet.iterator)
     }
   }

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/message/MessageCompressionTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/message/MessageCompressionTest.scala?rev=1396726&r1=1396725&r2=1396726&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/message/MessageCompressionTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/message/MessageCompressionTest.scala Wed Oct 10 18:42:57 2012
@@ -60,6 +60,7 @@ class MessageCompressionTest extends JUn
       true
     } catch {
       case e: UnsatisfiedLinkError => false
+      case e: org.xerial.snappy.SnappyError => false
     }
   }
 }
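snappy-java can fail with its own org.xerial.snappy.SnappyError (for example on an unsupported platform) rather than an UnsatisfiedLinkError when the native library cannot be loaded, so the availability probe needs both catches. A self-contained sketch of the probe pattern under that assumption:

    def isSnappyAvailable: Boolean =
      try {
        // constructing a stream forces the native snappy library to load
        new org.xerial.snappy.SnappyOutputStream(new java.io.ByteArrayOutputStream())
        true
      } catch {
        case e: UnsatisfiedLinkError          => false
        case e: org.xerial.snappy.SnappyError => false
      }
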

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/metrics/KafkaTimerTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/metrics/KafkaTimerTest.scala?rev=1396726&r1=1396725&r2=1396726&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/metrics/KafkaTimerTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/metrics/KafkaTimerTest.scala Wed Oct 10 18:42:57 2012
@@ -35,20 +35,20 @@ class KafkaTimerTest extends JUnit3Suite
     timer.time {
       clock.addMillis(1000)
     }
-    assertEquals(1, metric.count())
-    assertTrue((metric.max() - 1000).abs <= Double.Epsilon)
-    assertTrue((metric.min() - 1000).abs <= Double.Epsilon)
+    assertEquals(1, metric.getCount())
+    assertTrue((metric.getMax() - 1000).abs <= Double.Epsilon)
+    assertTrue((metric.getMin() - 1000).abs <= Double.Epsilon)
   }
 
   private class ManualClock extends Clock {
 
     private var ticksInNanos = 0L
 
-    override def tick() = {
+    override def getTick() = {
       ticksInNanos
     }
 
-    override def time() = {
+    override def getTime() = {
       TimeUnit.NANOSECONDS.toMillis(ticksInNanos)
     }
 

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala?rev=1396726&r1=1396725&r2=1396726&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala Wed Oct 10 18:42:57 2012
@@ -70,22 +70,20 @@ class LeaderElectionTest extends JUnit3S
     // kill the server hosting the preferred replica
     servers.last.shutdown()
     // check if leader moves to the other server
-    val leader2 = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId, 1500,
-      if(leader1.get == 0) None else leader1)
+    val leader2 = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId, 1500, if(leader1.get == 0) None else leader1)
     val leaderEpoch2 = ZkUtils.getEpochForPartition(zkClient, topic, partitionId)
     debug("Leader is elected to be: %s".format(leader1.getOrElse(-1)))
     debug("leader Epoc: " + leaderEpoch2)
     assertEquals("Leader must move to broker 0", 0, leader2.getOrElse(-1))
     if(leader1.get == leader2.get)
-      assertEquals("Second epoch value should be " + leaderEpoch1+1, leaderEpoch1+1, leaderEpoch2)
+      assertEquals("Second epoch value should be " + leaderEpoch1, leaderEpoch1, leaderEpoch2)
     else
       assertEquals("Second epoch value should be %d".format(leaderEpoch1+1) , leaderEpoch1+1, leaderEpoch2)
 
     servers.last.startup()
     servers.head.shutdown()
     Thread.sleep(zookeeper.tickTime)
-    val leader3 = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId, 1500,
-      if(leader2.get == 1) None else leader2)
+    val leader3 = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId, 1500, if(leader2.get == 1) None else leader2)
     val leaderEpoch3 = ZkUtils.getEpochForPartition(zkClient, topic, partitionId)
     debug("leader Epoc: " + leaderEpoch3)
     debug("Leader is elected to be: %s".format(leader3.getOrElse(-1)))

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala?rev=1396726&r1=1396725&r2=1396726&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala Wed Oct 10 18:42:57 2012
@@ -13,7 +13,7 @@
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
-*/
+ */
 
 package kafka.server
 
@@ -69,10 +69,9 @@ class RequestPurgatoryTest extends JUnit
     purgatory.watch(r2)
     purgatory.awaitExpiration(r1)
     val elapsed = System.currentTimeMillis - start
-    println("Start = %d, Elapsed = %d".format(start, elapsed))
     assertTrue("r1 expired", purgatory.expired.contains(r1))
     assertTrue("r2 hasn't expired", !purgatory.expired.contains(r2))
-    assertTrue("Time for expiration was about 20ms", (elapsed - expiration).abs < 10L)
+    assertTrue("Time for expiration %d should at least %d".format(elapsed, expiration), elapsed >= expiration)
   }
   
   class MockRequestPurgatory extends RequestPurgatory[DelayedRequest, ProducerRequest] {

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala?rev=1396726&r1=1396725&r2=1396726&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala Wed Oct 10 18:42:57 2012
@@ -389,8 +389,7 @@ object TestUtils extends Logging {
             newLeaderAndISR.leaderEpoch += 1
             newLeaderAndISR.zkVersion += 1
           }
-          ZkUtils.updatePersistentPath(zkClient, ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partition),
-            newLeaderAndISR.toString)
+          ZkUtils.updatePersistentPath(zkClient, ZkUtils.getTopicPartitionLeaderAndIsrPath( topic, partition), newLeaderAndISR.toString)
         } catch {
           case oe => error("Error while electing leader for topic %s partition %d".format(topic, partition), oe)
         }

Modified: incubator/kafka/branches/0.8/project/build/KafkaProject.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/project/build/KafkaProject.scala?rev=1396726&r1=1396725&r2=1396726&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/project/build/KafkaProject.scala (original)
+++ incubator/kafka/branches/0.8/project/build/KafkaProject.scala Wed Oct 10 18:42:57 2012
@@ -66,17 +66,42 @@ class KafkaProject(info: ProjectInfo) ex
         <scope>compile</scope>
       </dependency>
 
+    def metricsDeps =
+      <dependencies>
+        <dependency>
+          <groupId>com.yammer.metrics</groupId>
+          <artifactId>metrics-core</artifactId>
+          <version>3.0.0-10ccc80c</version>
+          <scope>compile</scope>
+        </dependency>
+        <dependency>
+          <groupId>com.yammer.metrics</groupId>
+          <artifactId>metrics-annotations</artifactId>
+          <version>3.0.0-10ccc80c</version>
+          <scope>compile</scope>
+        </dependency>
+      </dependencies>
+
     object ZkClientDepAdder extends RuleTransformer(new RewriteRule() {
       override def transform(node: Node): Seq[Node] = node match {
         case Elem(prefix, "dependencies", attribs, scope, deps @ _*) => {
-          Elem(prefix, "dependencies", attribs, scope, deps ++ zkClientDep :_*)
+          Elem(prefix, "dependencies", attribs, scope, deps ++ zkClientDep:_*)
+        }
+        case other => other
+      }
+    })
+
+    object MetricsDepAdder extends RuleTransformer(new RewriteRule() {
+      override def transform(node: Node): Seq[Node] = node match {
+        case Elem(prefix, "dependencies", attribs, scope, deps @ _*) => {
+          Elem(prefix, "dependencies", attribs, scope, deps ++ metricsDeps:_*)
         }
         case other => other
       }
     })
 
     override def pomPostProcess(pom: Node): Node = {
-      ZkClientDepAdder(pom)
+      MetricsDepAdder(ZkClientDepAdder(pom))
     }
 
     override def artifactID = "kafka"
@@ -251,7 +276,6 @@ class KafkaProject(info: ProjectInfo) ex
   trait CoreDependencies {
     val log4j = "log4j" % "log4j" % "1.2.15"
     val jopt = "net.sf.jopt-simple" % "jopt-simple" % "3.2"
-    val metricsCore = "com.yammer.metrics" % "metrics-core" % "latest.release"
     val slf4jSimple = "org.slf4j" % "slf4j-simple" % "latest.release"
   }
   

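Dropping the floating "latest.release" metricsCore dependency and injecting a pinned 3.0.0-10ccc80c snapshot into the generated pom is what makes the Gauge.getValue and Clock.getTick/getTime renames elsewhere in this commit safe: the API is now fixed to one known snapshot. Expressed as plain sbt dependencies the pin would look like this (a sketch; the commit itself wires it through pomPostProcess as shown above):

    val metricsCore        = "com.yammer.metrics" % "metrics-core"        % "3.0.0-10ccc80c"
    val metricsAnnotations = "com.yammer.metrics" % "metrics-annotations" % "3.0.0-10ccc80c"
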
Modified: incubator/kafka/branches/0.8/system_test/testcase_to_run.json
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/system_test/testcase_to_run.json?rev=1396726&r1=1396725&r2=1396726&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/system_test/testcase_to_run.json (original)
+++ incubator/kafka/branches/0.8/system_test/testcase_to_run.json Wed Oct 10 18:42:57 2012
@@ -1,5 +1,6 @@
 {
-    "ReplicaBasicTest"   : [
+    "ReplicaBasicTest"  : [
+        "testcase_0001",
         "testcase_1"
     ]
 }


