kafka-commits mailing list archives

From cburrou...@apache.org
Subject svn commit: r1178670 - in /incubator/kafka/trunk/core/src: main/scala/kafka/consumer/ main/scala/kafka/network/ main/scala/kafka/utils/ test/scala/unit/kafka/integration/ test/scala/unit/kafka/javaapi/integration/ test/scala/unit/kafka/log/ test/scala/...
Date Tue, 04 Oct 2011 01:41:25 GMT
Author: cburroughs
Date: Tue Oct  4 01:41:25 2011
New Revision: 1178670

URL: http://svn.apache.org/viewvc?rev=1178670&view=rev
Log:
obsessive compulsive tag team: Replace tabs with spaces
patch by jkreps; reviewed by cburroughs for KAFKA-114

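A quick way to double-check that a cleanup like this leaves no tab-indented lines behind is to scan the tree for tabs. Below is a minimal sketch, not part of this patch; the object name and output format are illustrative only.

import java.io.File
import scala.io.Source

object FindTabs extends App {
  // Recursively collect all .scala files under a directory.
  def scalaFiles(dir: File): Seq[File] = {
    val entries = Option(dir.listFiles).map(_.toSeq).getOrElse(Seq.empty)
    entries.filter(f => f.isFile && f.getName.endsWith(".scala")) ++
      entries.filter(_.isDirectory).flatMap(scalaFiles)
  }

  // Print every line that still contains a tab character.
  for {
    file <- scalaFiles(new File(args.headOption.getOrElse(".")))
    (line, idx) <- Source.fromFile(file).getLines().zipWithIndex
    if line.contains('\t')
  } println(file.getPath + ":" + (idx + 1) + ": " + line)
}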
Modified:
    incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsumerIterator.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/network/SocketServer.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/network/Transmission.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/utils/IteratorTemplate.scala
    incubator/kafka/trunk/core/src/test/scala/unit/kafka/integration/LazyInitProducerTest.scala
    incubator/kafka/trunk/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
    incubator/kafka/trunk/core/src/test/scala/unit/kafka/javaapi/integration/PrimitiveApiTest.scala
    incubator/kafka/trunk/core/src/test/scala/unit/kafka/log/LogTest.scala
    incubator/kafka/trunk/core/src/test/scala/unit/kafka/message/FileMessageSetTest.scala

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala?rev=1178670&r1=1178669&r2=1178670&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala Tue Oct  4 01:41:25 2011
@@ -75,7 +75,7 @@ object ConsoleConsumer {
                            .describedAs("prop")
                            .ofType(classOf[String])
     val resetBeginningOpt = parser.accepts("from-beginning", "If the consumer does not already have an established offset to consume from, " +
-    		"start with the earliest message present in the log rather than the latest message.")
+        "start with the earliest message present in the log rather than the latest message.")
     val autoCommitIntervalOpt = parser.accepts("autocommit.interval.ms", "The time interval at which to save the current offset in ms")
                            .withRequiredArg
                            .describedAs("ms")
@@ -86,7 +86,7 @@ object ConsoleConsumer {
                            .describedAs("num_messages")
                            .ofType(classOf[java.lang.Integer])
     val skipMessageOnErrorOpt = parser.accepts("skip-message-on-error", "If there is an error when processing a message, " +
-    		"skip it instead of halt.")
+        "skip it instead of halt.")
 
     val options: OptionSet = tryParse(parser, args)
     checkRequiredArgs(parser, options, topicIdOpt, zkConnectOpt)
@@ -170,9 +170,9 @@ object ConsoleConsumer {
   
   def checkRequiredArgs(parser: OptionParser, options: OptionSet, required: OptionSpec[_]*) {
     for(arg <- required) {
-    	if(!options.has(arg)) {
+      if(!options.has(arg)) {
         logger.error("Missing required argument \"" + arg + "\"")
-    		parser.printHelpOn(System.err)
+        parser.printHelpOn(System.err)
         System.exit(1)
       }
     }

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsumerIterator.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsumerIterator.scala?rev=1178670&r1=1178669&r2=1178670&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsumerIterator.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsumerIterator.scala Tue Oct  4 01:41:25 2011
@@ -66,8 +66,8 @@ class ConsumerIterator[T](private val to
       if(currentDataChunk eq ZookeeperConsumerConnector.shutdownCommand) {
         if(logger.isDebugEnabled)
           logger.debug("Received the shutdown command")
-    	  channel.offer(currentDataChunk)
-        return allDone()
+        channel.offer(currentDataChunk)
+        return allDone
       } else {
         currentTopicInfo = currentDataChunk.topicInfo
         if (currentTopicInfo.getConsumeOffset != currentDataChunk.fetchOffset) {

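The hunk above shows the shutdown path of ConsumerIterator: when the data chunk pulled off the channel is the shutdown sentinel, it is offered back onto the channel (so any sibling iterator blocked on the same queue also sees it) before the iterator signals exhaustion via allDone. A minimal sketch of that "poison pill" idiom, with illustrative names rather than Kafka's actual classes:

import java.util.concurrent.LinkedBlockingQueue

object Shutdown  // the sentinel ("poison pill") pushed through the data queue

class QueueIterator(queue: LinkedBlockingQueue[AnyRef]) {
  // Returns None once the sentinel is seen; otherwise the next payload.
  def nextOrNone(): Option[String] = {
    val item = queue.take()
    if (item eq Shutdown) {
      queue.offer(Shutdown)  // re-offer so other consumers of the queue stop too
      None                   // like returning allDone above
    } else
      Some(item.asInstanceOf[String])
  }
}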
Modified: incubator/kafka/trunk/core/src/main/scala/kafka/network/SocketServer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/network/SocketServer.scala?rev=1178670&r1=1178669&r2=1178670&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/network/SocketServer.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/network/SocketServer.scala Tue Oct  4 01:41:25 2011
@@ -5,7 +5,7 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
- * 
+ *
  *    http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
@@ -35,17 +35,17 @@ import kafka.api.RequestKeys
  *   N Processor threads that each have their own selectors and handle all requests from their connections synchronously
  */
 class SocketServer(val port: Int,
-                   val numProcessorThreads: Int, 
+                   val numProcessorThreads: Int,
                    monitoringPeriodSecs: Int,
                    private val handlerFactory: Handler.HandlerMapping,
                    val maxRequestSize: Int = Int.MaxValue) {
- 
+
   private val logger = Logger.getLogger(classOf[SocketServer])
   private val time = SystemTime
   private val processors = new Array[Processor](numProcessorThreads)
   private var acceptor: Acceptor = new Acceptor(port, processors)
   val stats: SocketServerStats = new SocketServerStats(1000L * 1000L * 1000L * monitoringPeriodSecs)
-  
+
   /**
    * Start the socket server
    */
@@ -57,7 +57,7 @@ class SocketServer(val port: Int,
     Utils.newThread("kafka-acceptor", acceptor, false).start()
     acceptor.awaitStartup
   }
-  
+
   /**
    * Shutdown the socket server
    */
@@ -66,20 +66,20 @@ class SocketServer(val port: Int,
     for(processor <- processors)
       processor.shutdown
   }
-    
+
 }
 
 /**
  * A base class with some helper variables and methods
  */
 private[kafka] abstract class AbstractServerThread extends Runnable {
-  
+
   protected val selector = Selector.open();
   protected val logger = Logger.getLogger(getClass())
   private val startupLatch = new CountDownLatch(1)
   private val shutdownLatch = new CountDownLatch(1)
-  private val alive = new AtomicBoolean(false) 
-  
+  private val alive = new AtomicBoolean(false)
+
   /**
    * Initiates a graceful shutdown by signeling to stop and waiting for the shutdown to complete
    */
@@ -88,17 +88,17 @@ private[kafka] abstract class AbstractSe
     selector.wakeup
     shutdownLatch.await
   }
-  
+
   /**
    * Wait for the thread to completely start up
    */
   def awaitStartup(): Unit = startupLatch.await
-  
+
   /**
    * Record that the thread startup is complete
    */
   protected def startupComplete() = {
-    alive.set(true)  
+    alive.set(true)
     startupLatch.countDown
   }
 
@@ -106,52 +106,52 @@ private[kafka] abstract class AbstractSe
    * Record that the thread shutdown is complete
    */
   protected def shutdownComplete() = shutdownLatch.countDown
-  
+
   /**
    * Is the server still running?
    */
   protected def isRunning = alive.get
-  
+
 }
 
 /**
  * Thread that accepts and configures new connections. There is only need for one of these
  */
 private[kafka] class Acceptor(val port: Int, private val processors: Array[Processor]) extends AbstractServerThread {
-  
+
   /**
    * Accept loop that checks for new connection attempts
    */
-  def run() {	 
+  def run() {
     val serverChannel = ServerSocketChannel.open()
-	  serverChannel.configureBlocking(false)
-	  serverChannel.socket.bind(new InetSocketAddress(port))
-	  serverChannel.register(selector, SelectionKey.OP_ACCEPT);
- 	  logger.info("Awaiting connections on port " + port)
+    serverChannel.configureBlocking(false)
+    serverChannel.socket.bind(new InetSocketAddress(port))
+    serverChannel.register(selector, SelectionKey.OP_ACCEPT);
+    logger.info("Awaiting connections on port " + port)
     startupComplete()
-	
-	  var currentProcessor = 0
+
+    var currentProcessor = 0
     while(isRunning) {
       val ready = selector.select(500)
       if(ready > 0) {
-  	    val keys = selector.selectedKeys()
-  	    val iter = keys.iterator()
-  	    while(iter.hasNext && isRunning) {
-  	      var key: SelectionKey = null
-  	      try {
-  	        key = iter.next
-  	        iter.remove()
-  	      
-  	        if(key.isAcceptable)
+        val keys = selector.selectedKeys()
+        val iter = keys.iterator()
+        while(iter.hasNext && isRunning) {
+          var key: SelectionKey = null
+          try {
+            key = iter.next
+            iter.remove()
+
+            if(key.isAcceptable)
                 accept(key, processors(currentProcessor))
               else
                 throw new IllegalStateException("Unrecognized key state for acceptor thread.")
-         
+
               // round robin to the next processor thread
               currentProcessor = (currentProcessor + 1) % processors.length
-  	      } catch {
-  	        case e: Throwable => logger.error("Error in acceptor", e)
-  	      }
+          } catch {
+            case e: Throwable => logger.error("Error in acceptor", e)
+          }
         }
       }
     }
@@ -160,7 +160,7 @@ private[kafka] class Acceptor(val port: 
     Utils.swallow(logger.error, selector.close())
     shutdownComplete()
   }
-  
+
   /*
    * Accept a new connection
    */
@@ -169,10 +169,10 @@ private[kafka] class Acceptor(val port: 
     if(logger.isDebugEnabled)
       logger.info("Accepted connection from " + socketChannel.socket.getInetAddress() + "
on " + socketChannel.socket.getLocalSocketAddress)
     socketChannel.configureBlocking(false)
-	  socketChannel.socket().setTcpNoDelay(true)
+    socketChannel.socket().setTcpNoDelay(true)
     processor.accept(socketChannel)
   }
-  
+
 }
 
 /**
@@ -180,10 +180,10 @@ private[kafka] class Acceptor(val port: 
  * each of which has its own selectors
  */
 private[kafka] class Processor(val handlerMapping: Handler.HandlerMapping,
-                               val time: Time, 
+                               val time: Time,
                                val stats: SocketServerStats,
                                val maxRequestSize: Int) extends AbstractServerThread {
-  
+
   private val newConnections = new ConcurrentLinkedQueue[SocketChannel]();
   private val requestLogger = Logger.getLogger("kafka.request.logger")
 
@@ -192,34 +192,34 @@ private[kafka] class Processor(val handl
     while(isRunning) {
       // setup any new connections that have been queued up
       configureNewConnections()
-      
+
       val ready = selector.select(500)
       if(ready > 0) {
-		    val keys = selector.selectedKeys()
-		    val iter = keys.iterator()
-		    while(iter.hasNext && isRunning) {
-		      var key: SelectionKey = null
-		      try {
-		        key = iter.next
-		        iter.remove()
-		      
-		        if(key.isReadable)
-		          read(key)
+        val keys = selector.selectedKeys()
+        val iter = keys.iterator()
+        while(iter.hasNext && isRunning) {
+          var key: SelectionKey = null
+          try {
+            key = iter.next
+            iter.remove()
+
+            if(key.isReadable)
+              read(key)
             else if(key.isWritable)
               write(key)
             else if(!key.isValid)
               close(key)
             else
               throw new IllegalStateException("Unrecognized key state for processor thread.")
-		      } catch {
-		      	case e: EOFException => {
-		      		logger.info("Closing socket connection to %s.".format(channelFor(key).socket.getInetAddress))
-		      		close(key)
-				}
-				case e: InvalidRequestException => {
-					logger.info("Closing socket connection to %s due to invalid request: %s".format(channelFor(key).socket.getInetAddress,
e.getMessage))
-					close(key)
-		      	} case e: Throwable => {
+          } catch {
+            case e: EOFException => {
+              logger.info("Closing socket connection to %s.".format(channelFor(key).socket.getInetAddress))
+              close(key)
+        }
+        case e: InvalidRequestException => {
+          logger.info("Closing socket connection to %s due to invalid request: %s".format(channelFor(key).socket.getInetAddress,
e.getMessage))
+          close(key)
+            } case e: Throwable => {
               logger.error("Closing socket for " + channelFor(key).socket.getInetAddress
+ " because of error", e)
               close(key)
             }
@@ -231,7 +231,7 @@ private[kafka] class Processor(val handl
     Utils.swallow(logger.info, selector.close())
     shutdownComplete()
   }
-  
+
   private def close(key: SelectionKey) {
     val channel = key.channel.asInstanceOf[SocketChannel]
     if(logger.isDebugEnabled)
@@ -241,7 +241,7 @@ private[kafka] class Processor(val handl
     key.attach(null)
     Utils.swallow(logger.info, key.cancel())
   }
-  
+
   /**
    * Queue up a new connection for reading
    */
@@ -249,7 +249,7 @@ private[kafka] class Processor(val handl
     newConnections.add(socketChannel)
     selector.wakeup()
   }
-  
+
   /**
    * Register any new connections that have been queued up
    */
@@ -261,7 +261,7 @@ private[kafka] class Processor(val handl
       channel.register(selector, SelectionKey.OP_READ)
     }
   }
-  
+
   /**
    * Handle a completed request producing an optional response
    */
@@ -290,7 +290,7 @@ private[kafka] class Processor(val handl
     stats.recordRequest(requestTypeId, time.nanoseconds - start)
     maybeSend
   }
-  
+
   /*
    * Process reads from ready sockets
    */
@@ -322,7 +322,7 @@ private[kafka] class Processor(val handl
       selector.wakeup()
     }
   }
-  
+
   /*
    * Process writes to ready sockets
    */
@@ -341,7 +341,7 @@ private[kafka] class Processor(val handl
       selector.wakeup()
     }
   }
-  
+
   private def channelFor(key: SelectionKey) = key.channel().asInstanceOf[SocketChannel]
 
 }

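The class comment in this file (visible in the hunks above) describes the threading model: one Acceptor thread accepts connections and hands them out round-robin to N Processor threads, each of which owns a private selector and handles all requests from its connections synchronously. A minimal, self-contained sketch of that pattern, assuming illustrative names (MiniProcessor, MiniAcceptor) rather than Kafka's actual classes:

import java.net.InetSocketAddress
import java.nio.ByteBuffer
import java.nio.channels.{SelectionKey, Selector, ServerSocketChannel, SocketChannel}
import java.util.concurrent.ConcurrentLinkedQueue

class MiniProcessor extends Runnable {
  private val selector = Selector.open()
  private val newConnections = new ConcurrentLinkedQueue[SocketChannel]()

  // Called from the acceptor thread: queue the channel and wake the selector
  // so registration happens on this thread, never from a foreign one.
  def accept(channel: SocketChannel) {
    newConnections.add(channel)
    selector.wakeup()
  }

  def run() {
    while (!Thread.currentThread.isInterrupted) {
      // register any connections the acceptor queued up
      while (!newConnections.isEmpty)
        newConnections.poll().register(selector, SelectionKey.OP_READ)
      if (selector.select(500) > 0) {
        val iter = selector.selectedKeys().iterator()
        while (iter.hasNext) {
          val key = iter.next()
          iter.remove()
          if (key.isReadable) {
            val channel = key.channel().asInstanceOf[SocketChannel]
            val buffer = ByteBuffer.allocate(1024)
            if (channel.read(buffer) < 0)  // peer closed the connection
              channel.close()
          }
        }
      }
    }
  }
}

object MiniAcceptor extends App {
  val processors = Array.fill(4)(new MiniProcessor)
  processors.foreach(p => new Thread(p).start())
  val server = ServerSocketChannel.open()
  server.socket.bind(new InetSocketAddress(9999))
  var current = 0
  while (true) {                        // blocking accept loop, for brevity
    val channel = server.accept()
    channel.configureBlocking(false)
    processors(current).accept(channel)  // round robin, as in Acceptor above
    current = (current + 1) % processors.length
  }
}

Queuing the connection and calling selector.wakeup(), as the real Processor does in configureNewConnections, avoids registering a channel with a selector from another thread while that selector is blocked in select().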
Modified: incubator/kafka/trunk/core/src/main/scala/kafka/network/Transmission.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/network/Transmission.scala?rev=1178670&r1=1178669&r2=1178670&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/network/Transmission.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/network/Transmission.scala Tue Oct  4 01:41:25 2011
@@ -91,7 +91,7 @@ abstract class MultiSend[S <: Send](val 
   var totalWritten = 0
 
   def writeTo(channel: WritableByteChannel): Int = {
-	  expectIncomplete
+    expectIncomplete
     val written = current.head.writeTo(channel)
     totalWritten += written
     if(current.head.complete)

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/utils/IteratorTemplate.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/utils/IteratorTemplate.scala?rev=1178670&r1=1178669&r2=1178670&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/utils/IteratorTemplate.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/utils/IteratorTemplate.scala Tue Oct  4 01:41:25 2011
@@ -25,7 +25,7 @@ object FAILED extends State
 
 /**
  * Transliteration of the iterator template in google collections. To implement an iterator
- * override makeNext and call allDone() when there is no more items	
+ * override makeNext and call allDone() when there is no more items
  */
 abstract class IteratorTemplate[T] extends Iterator[T] with java.util.Iterator[T] {
   

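The comment in this hunk states the contract of IteratorTemplate: subclasses override makeNext and call allDone when there are no more items. A simplified, self-contained sketch of the idiom (not Kafka's actual IteratorTemplate, which also implements java.util.Iterator and tracks its state with explicit State objects like the FAILED one visible above):

abstract class MiniIteratorTemplate[T] extends Iterator[T] {
  private var nextItem: Option[T] = None
  private var done = false

  // Subclasses produce the next item here, or return allDone() when finished.
  protected def makeNext(): T

  // Marks the iterator exhausted; the dummy return value is never observed.
  protected def allDone(): T = { done = true; null.asInstanceOf[T] }

  def hasNext: Boolean = {
    if (!done && nextItem.isEmpty) {
      val item = makeNext()
      if (!done) nextItem = Some(item)
    }
    nextItem.isDefined
  }

  def next(): T = {
    if (!hasNext) throw new NoSuchElementException
    val item = nextItem.get
    nextItem = None
    item
  }
}

// Usage: an iterator that counts down from n to 1.
class CountDown(n: Int) extends MiniIteratorTemplate[Int] {
  private var i = n
  protected def makeNext(): Int = if (i > 0) { i -= 1; i + 1 } else allDone()
}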
Modified: incubator/kafka/trunk/core/src/test/scala/unit/kafka/integration/LazyInitProducerTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/unit/kafka/integration/LazyInitProducerTest.scala?rev=1178670&r1=1178669&r2=1178670&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/unit/kafka/integration/LazyInitProducerTest.scala (original)
+++ incubator/kafka/trunk/core/src/test/scala/unit/kafka/integration/LazyInitProducerTest.scala Tue Oct  4 01:41:25 2011
@@ -104,7 +104,7 @@ class LazyInitProducerTest extends JUnit
       Thread.sleep(200)
       val response = consumer.multifetch(fetches: _*)
       for((topic, resp) <- topics.zip(response.toList))
-    	  TestUtils.checkEquals(messages(topic).iterator, resp.iterator)
+        TestUtils.checkEquals(messages(topic).iterator, resp.iterator)
     }
 
     {
@@ -117,7 +117,7 @@ class LazyInitProducerTest extends JUnit
       try {
         val responses = consumer.multifetch(fetches: _*)
         for(resp <- responses)
-    	    resp.iterator
+          resp.iterator
       }
       catch {
         case e: OffsetOutOfRangeException => exceptionThrown = true
@@ -149,7 +149,7 @@ class LazyInitProducerTest extends JUnit
     Thread.sleep(200)
     val response = consumer.multifetch(fetches: _*)
     for((topic, resp) <- topics.zip(response.toList))
-  	  TestUtils.checkEquals(messages(topic).iterator, resp.iterator)
+      TestUtils.checkEquals(messages(topic).iterator, resp.iterator)
   }
 
   def testMultiProduceResend() {
@@ -180,6 +180,6 @@ class LazyInitProducerTest extends JUnit
       TestUtils.checkEquals(TestUtils.stackedIterator(messages(topic).map(m => m.message).iterator,
                                                       messages(topic).map(m => m.message).iterator),
                             resp.map(m => m.message).iterator)
-//  	  TestUtils.checkEquals(TestUtils.stackedIterator(messages(topic).iterator, messages(topic).iterator), resp.iterator)
+//      TestUtils.checkEquals(TestUtils.stackedIterator(messages(topic).iterator, messages(topic).iterator), resp.iterator)
   }
 }

Modified: incubator/kafka/trunk/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala?rev=1178670&r1=1178669&r2=1178670&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala (original)
+++ incubator/kafka/trunk/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala Tue Oct  4 01:41:25 2011
@@ -103,7 +103,7 @@ class PrimitiveApiTest extends JUnit3Sui
       Thread.sleep(700)
       val response = consumer.multifetch(fetches: _*)
       for((topic, resp) <- topics.zip(response.toList))
-    	  TestUtils.checkEquals(messages(topic).iterator, resp.iterator)
+        TestUtils.checkEquals(messages(topic).iterator, resp.iterator)
     }
 
     // temporarily set request handler logger to a higher level
@@ -118,7 +118,7 @@ class PrimitiveApiTest extends JUnit3Sui
       try {
         val responses = consumer.multifetch(fetches: _*)
         for(resp <- responses)
-    	    resp.iterator
+          resp.iterator
         fail("expect exception")
       }
       catch {
@@ -135,7 +135,7 @@ class PrimitiveApiTest extends JUnit3Sui
       try {
         val responses = consumer.multifetch(fetches: _*)
         for(resp <- responses)
-    	    resp.iterator
+          resp.iterator
         fail("expect exception")
       }
       catch {
@@ -166,7 +166,7 @@ class PrimitiveApiTest extends JUnit3Sui
       Thread.sleep(200)
       val response = consumer.multifetch(fetches: _*)
       for((topic, resp) <- topics.zip(response.toList))
-    	  TestUtils.checkEquals(messages(topic).iterator, resp.iterator)
+        TestUtils.checkEquals(messages(topic).iterator, resp.iterator)
     }
 
     // temporarily set request handler logger to a higher level
@@ -181,7 +181,7 @@ class PrimitiveApiTest extends JUnit3Sui
       try {
         val responses = consumer.multifetch(fetches: _*)
         for(resp <- responses)
-    	    resp.iterator
+          resp.iterator
         fail("expect exception")
       }
       catch {
@@ -198,7 +198,7 @@ class PrimitiveApiTest extends JUnit3Sui
       try {
         val responses = consumer.multifetch(fetches: _*)
         for(resp <- responses)
-    	    resp.iterator
+          resp.iterator
         fail("expect exception")
       }
       catch {
@@ -232,7 +232,7 @@ class PrimitiveApiTest extends JUnit3Sui
     Thread.sleep(200)
     val response = consumer.multifetch(fetches: _*)
     for((topic, resp) <- topics.zip(response.toList))
-  	  TestUtils.checkEquals(messages(topic).iterator, resp.iterator)
+      TestUtils.checkEquals(messages(topic).iterator, resp.iterator)
   }
 
   def testMultiProduceWithCompression() {
@@ -257,6 +257,6 @@ class PrimitiveApiTest extends JUnit3Sui
     Thread.sleep(200)
     val response = consumer.multifetch(fetches: _*)
     for((topic, resp) <- topics.zip(response.toList))
-  	  TestUtils.checkEquals(messages(topic).iterator, resp.iterator)
+      TestUtils.checkEquals(messages(topic).iterator, resp.iterator)
   }
 }

Modified: incubator/kafka/trunk/core/src/test/scala/unit/kafka/javaapi/integration/PrimitiveApiTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/unit/kafka/javaapi/integration/PrimitiveApiTest.scala?rev=1178670&r1=1178669&r2=1178670&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/unit/kafka/javaapi/integration/PrimitiveApiTest.scala (original)
+++ incubator/kafka/trunk/core/src/test/scala/unit/kafka/javaapi/integration/PrimitiveApiTest.scala Tue Oct  4 01:41:25 2011
@@ -152,7 +152,7 @@ class PrimitiveApiTest extends JUnit3Sui
       for(topic <- topics) {
         if (iter.hasNext) {
           val resp = iter.next
-      	  TestUtils.checkEquals(messages(topic).iterator, resp.iterator)
+          TestUtils.checkEquals(messages(topic).iterator, resp.iterator)
         }
         else
           fail("fewer responses than expected")
@@ -172,7 +172,7 @@ class PrimitiveApiTest extends JUnit3Sui
         val responses = consumer.multifetch(getFetchRequestList(fetches: _*))
         val iter = responses.iterator
         while (iter.hasNext)
-    	    iter.next.iterator
+          iter.next.iterator
         fail("expect exception")
       }
       catch {
@@ -190,7 +190,7 @@ class PrimitiveApiTest extends JUnit3Sui
         val responses = consumer.multifetch(getFetchRequestList(fetches: _*))
         val iter = responses.iterator
         while (iter.hasNext)
-    	    iter.next.iterator
+          iter.next.iterator
         fail("expect exception")
       }
       catch {
@@ -225,7 +225,7 @@ class PrimitiveApiTest extends JUnit3Sui
       for(topic <- topics) {
         if (iter.hasNext) {
           val resp = iter.next
-      	  TestUtils.checkEquals(messages(topic).iterator, resp.iterator)
+          TestUtils.checkEquals(messages(topic).iterator, resp.iterator)
         }
         else
           fail("fewer responses than expected")
@@ -245,7 +245,7 @@ class PrimitiveApiTest extends JUnit3Sui
         val responses = consumer.multifetch(getFetchRequestList(fetches: _*))
         val iter = responses.iterator
         while (iter.hasNext)
-    	    iter.next.iterator
+          iter.next.iterator
         fail("expect exception")
       }
       catch {
@@ -263,7 +263,7 @@ class PrimitiveApiTest extends JUnit3Sui
         val responses = consumer.multifetch(getFetchRequestList(fetches: _*))
         val iter = responses.iterator
         while (iter.hasNext)
-    	    iter.next.iterator
+          iter.next.iterator
         fail("expect exception")
       }
       catch {
@@ -298,7 +298,7 @@ class PrimitiveApiTest extends JUnit3Sui
       for(topic <- topics) {
         if (iter.hasNext) {
           val resp = iter.next
-      	  TestUtils.checkEquals(messages(topic).iterator, resp.iterator)
+          TestUtils.checkEquals(messages(topic).iterator, resp.iterator)
         }
         else
           fail("fewer responses than expected")
@@ -329,7 +329,7 @@ class PrimitiveApiTest extends JUnit3Sui
       for(topic <- topics) {
         if (iter.hasNext) {
           val resp = iter.next
-      	  TestUtils.checkEquals(messages(topic).iterator, resp.iterator)
+          TestUtils.checkEquals(messages(topic).iterator, resp.iterator)
         }
         else
           fail("fewer responses than expected")

Modified: incubator/kafka/trunk/core/src/test/scala/unit/kafka/log/LogTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/unit/kafka/log/LogTest.scala?rev=1178670&r1=1178669&r2=1178670&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/unit/kafka/log/LogTest.scala (original)
+++ incubator/kafka/trunk/core/src/test/scala/unit/kafka/log/LogTest.scala Tue Oct  4 01:41:25 2011
@@ -43,7 +43,7 @@ class LogTest extends JUnitSuite {
   
   def createEmptyLogs(dir: File, offsets: Int*) = {
     for(offset <- offsets)
-    	new File(dir, Integer.toString(offset) + Log.FileSuffix).createNewFile()
+      new File(dir, Integer.toString(offset) + Log.FileSuffix).createNewFile()
   }
   
   @Test

Modified: incubator/kafka/trunk/core/src/test/scala/unit/kafka/message/FileMessageSetTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/unit/kafka/message/FileMessageSetTest.scala?rev=1178670&r1=1178669&r2=1178670&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/unit/kafka/message/FileMessageSetTest.scala (original)
+++ incubator/kafka/trunk/core/src/test/scala/unit/kafka/message/FileMessageSetTest.scala Tue Oct  4 01:41:25 2011
@@ -46,7 +46,7 @@ class FileMessageSetTest extends BaseMes
   @Test
   def testIterationOverPartialAndTruncation() {
     testPartialWrite(0, messageSet)
-	  testPartialWrite(2, messageSet)
+    testPartialWrite(2, messageSet)
     testPartialWrite(4, messageSet)
     testPartialWrite(5, messageSet)
     testPartialWrite(6, messageSet)
@@ -56,7 +56,7 @@ class FileMessageSetTest extends BaseMes
     val buffer = ByteBuffer.allocate(size)
     val originalPosition = messageSet.channel.position
     for(i <- 0 until size)
-    	buffer.put(0.asInstanceOf[Byte])
+      buffer.put(0.asInstanceOf[Byte])
     buffer.rewind()
     messageSet.channel.write(buffer)
     // appending those bytes should not change the contents


