kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jun...@apache.org
Subject svn commit: r1204764 - in /incubator/kafka/trunk/core/src: main/scala/kafka/consumer/ main/scala/kafka/log/ main/scala/kafka/server/ test/scala/unit/kafka/integration/ test/scala/unit/kafka/log/
Date Tue, 22 Nov 2011 00:35:21 GMT
Author: junrao
Date: Tue Nov 22 00:35:09 2011
New Revision: 1204764

URL: http://svn.apache.org/viewvc?rev=1204764&view=rev
Log:
Avoid creating a new topic by the consumer; patched by Taylor Gautier; reviewed by Jun Rao;
KAFKA-101

Modified:
    incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/log/Log.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/log/LogManager.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaRequestHandlers.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/server/MessageSetSend.scala
    incubator/kafka/trunk/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
    incubator/kafka/trunk/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
    incubator/kafka/trunk/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala?rev=1204764&r1=1204763&r2=1204764&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala Tue Nov 22 00:35:09 2011
@@ -194,7 +194,6 @@ private[kafka] class ZookeeperConsumerCo
 
       // register on broker partition path changes
       val partitionPath = ZkUtils.BrokerTopicsPath + "/" + topic
-      ZkUtils.makeSurePersistentPathExists(zkClient, partitionPath)
       zkClient.subscribeChildChanges(partitionPath, loadBalancerListener)
     }
 

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/log/Log.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/log/Log.scala?rev=1204764&r1=1204763&r2=1204764&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/log/Log.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/log/Log.scala Tue Nov 22 00:35:09 2011
@@ -17,11 +17,9 @@
 
 package kafka.log
 
-import java.util.concurrent.CopyOnWriteArrayList
 import java.util.concurrent.atomic._
 import java.text.NumberFormat
 import java.io._
-import java.nio.channels.FileChannel
 import org.apache.log4j._
 import kafka.message._
 import kafka.utils._
@@ -80,6 +78,13 @@ private[log] object Log {
     nf.setGroupingUsed(false)
     nf.format(offset) + Log.FileSuffix
   }
+  
+  def getEmptyOffsets(request: OffsetRequest): Array[Long] = {
+    if (request.time == OffsetRequest.LatestTime || request.time == OffsetRequest.EarliestTime)
+      return Array(0L)
+    else
+      return Array()
+  }
 }
 
 /**
@@ -352,7 +357,7 @@ private[log] class Log(val dir: File, va
     }
     ret
   }
-
+ 
   def getTopicName():String = {
     name.substring(0, name.lastIndexOf("-"))
   }

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/log/LogManager.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/log/LogManager.scala?rev=1204764&r1=1204763&r2=1204764&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/log/LogManager.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/log/LogManager.scala Tue Nov 22 00:35:09 2011
@@ -25,6 +25,7 @@ import scala.collection._
 import java.util.concurrent.CountDownLatch
 import kafka.server.{KafkaConfig, KafkaZooKeeper}
 import kafka.common.{InvalidTopicException, InvalidPartitionException}
+import kafka.api.OffsetRequest
 
 /**
  * The guy who creates and hands out logs
@@ -135,7 +136,7 @@ private[kafka] class LogManager(val conf
       startupLatch.await
   }
 
-  def registerNewTopicInZK(topic: String) {
+  private def registerNewTopicInZK(topic: String) {
     if (config.enableZookeeper)
       zkActor ! topic 
   }
@@ -151,15 +152,10 @@ private[kafka] class LogManager(val conf
     }
   }
   
-
-  def chooseRandomPartition(topic: String): Int = {
-    random.nextInt(topicPartitionsMap.getOrElse(topic, numPartitions))
-  }
-
   /**
-   * Create the log if it does not exist, if it exists just return it
+   * Return the Pool (partitions) for a specific log
    */
-  def getOrCreateLog(topic: String, partition: Int): Log = {
+  private def getLogPool(topic: String, partition: Int): Pool[Int, Log] = {
     awaitStartup
     if (topic.length <= 0)
       throw new InvalidTopicException("topic name can't be empty")
@@ -168,8 +164,37 @@ private[kafka] class LogManager(val conf
               (topicPartitionsMap.getOrElse(topic, numPartitions) - 1) + ")")
       throw new InvalidPartitionException("wrong partition " + partition)
     }
+    logs.get(topic)
+  }
+
+  /**
+   * Pick a random partition from the given topic
+   */
+  def chooseRandomPartition(topic: String): Int = {
+    random.nextInt(topicPartitionsMap.getOrElse(topic, numPartitions))
+  }
+
+  def getOffsets(offsetRequest: OffsetRequest): Array[Long] = {
+    val log = getLog(offsetRequest.topic, offsetRequest.partition)
+    if (log != null) return log.getOffsetsBefore(offsetRequest)
+    Log.getEmptyOffsets(offsetRequest)
+  }
+
+  /**
+   * Get the log if exists
+   */
+  def getLog(topic: String, partition: Int): Log = {
+    val parts = getLogPool(topic, partition)
+    if (parts == null) return null
+    parts.get(partition)
+  }
+
+  /**
+   * Create the log if it does not exist, if it exists just return it
+   */
+  def getOrCreateLog(topic: String, partition: Int): Log = {
     var hasNewTopic = false
-    var parts = logs.get(topic)
+    var parts = getLogPool(topic, partition)
     if (parts == null) {
       val found = logs.putIfNotExists(topic, new Pool[Int, Log])
       if (found == null)

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaRequestHandlers.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaRequestHandlers.scala?rev=1204764&r1=1204763&r2=1204764&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaRequestHandlers.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaRequestHandlers.scala Tue Nov 22 00:35:09 2011
@@ -17,17 +17,13 @@
 
 package kafka.server
 
-import java.nio.channels._
 import org.apache.log4j.Logger
-import kafka.producer._
-import kafka.consumer._
 import kafka.log._
 import kafka.network._
 import kafka.message._
-import kafka.server._
 import kafka.api._
 import kafka.common.ErrorMapping
-import kafka.utils.{Utils, SystemTime}
+import kafka.utils.SystemTime
 import java.io.IOException
 
 /**
@@ -112,8 +108,11 @@ private[kafka] class KafkaRequestHandler
     var  response: MessageSetSend = null
     try {
       logger.trace("Fetching log segment for topic = " + fetchRequest.topic + " and partition = " + fetchRequest.partition)
-      val log = logManager.getOrCreateLog(fetchRequest.topic, fetchRequest.partition)
-      response = new MessageSetSend(log.read(fetchRequest.offset, fetchRequest.maxSize))
+      val log = logManager.getLog(fetchRequest.topic, fetchRequest.partition)
+      if (log != null)
+        response = new MessageSetSend(log.read(fetchRequest.offset, fetchRequest.maxSize))
+      else
+        response = new MessageSetSend()
     }
     catch {
       case e =>
@@ -127,8 +126,7 @@ private[kafka] class KafkaRequestHandler
     val offsetRequest = OffsetRequest.readFrom(request.buffer)
     if(requestLogger.isTraceEnabled)
       requestLogger.trace("Offset request " + offsetRequest.toString)
-    val log = logManager.getOrCreateLog(offsetRequest.topic, offsetRequest.partition)
-    val offsets = log.getOffsetsBefore(offsetRequest)
+    val offsets = logManager.getOffsets(offsetRequest)
     val response = new OffsetArraySend(offsets)
     Some(response)
   }

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/server/MessageSetSend.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/server/MessageSetSend.scala?rev=1204764&r1=1204763&r2=1204764&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/server/MessageSetSend.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/server/MessageSetSend.scala Tue Nov 22 00:35:09 2011
@@ -42,6 +42,8 @@ private[server] class MessageSetSend(val
 
   def this(messages: MessageSet) = this(messages, ErrorMapping.NoError)
 
+  def this() = this(MessageSet.Empty)
+
   def writeTo(channel: WritableByteChannel): Int = {
     expectIncomplete()
     var written = 0

Modified: incubator/kafka/trunk/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala?rev=1204764&r1=1204763&r2=1204764&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala (original)
+++ incubator/kafka/trunk/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala Tue Nov 22 00:35:09 2011
@@ -29,6 +29,7 @@ import kafka.producer.{ProducerData, Pro
 import kafka.serializer.StringDecoder
 import kafka.utils.TestUtils
 import kafka.message.{DefaultCompressionCodec, NoCompressionCodec, Message, ByteBufferMessageSet}
+import java.io.File
 
 /**
  * End to end tests of the primitive apis against a local server
@@ -259,4 +260,12 @@ class PrimitiveApiTest extends JUnit3Sui
     for((topic, resp) <- topics.zip(response.toList))
       TestUtils.checkEquals(messages(topic).iterator, resp.iterator)
   }
+
+  def testConsumerNotExistTopic() {
+    val newTopic = "new-topic"
+    val messageSetIter = consumer.fetch(new FetchRequest(newTopic, 0, 0, 10000)).iterator
+    assertTrue(messageSetIter.hasNext == false)
+    val logFile = new File(config.logDir, newTopic + "-0")
+    assertTrue(!logFile.exists)
+  }
 }

Modified: incubator/kafka/trunk/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/unit/kafka/log/LogManagerTest.scala?rev=1204764&r1=1204763&r2=1204764&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/unit/kafka/log/LogManagerTest.scala (original)
+++ incubator/kafka/trunk/core/src/test/scala/unit/kafka/log/LogManagerTest.scala Tue Nov 22 00:35:09 2011
@@ -53,10 +53,20 @@ class LogManagerTest extends JUnitSuite 
   
   @Test
   def testCreateLog() {
-    val log = logManager.getOrCreateLog("kafka", 0)
+    val name = "kafka"
+    val log = logManager.getOrCreateLog(name, 0)
+    val logFile = new File(config.logDir, name + "-0")
+    assertTrue(logFile.exists)
     log.append(TestUtils.singleMessageSet("test".getBytes()))
   }
 
+  @Test
+  def testGetLog() {
+    val name = "kafka"
+    val log = logManager.getLog(name, 0)
+    val logFile = new File(config.logDir, name + "-0")
+    assertTrue(!logFile.exists)
+  }
 
   @Test
   def testCleanupExpiredSegments() {

Modified: incubator/kafka/trunk/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala?rev=1204764&r1=1204763&r2=1204764&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala (original)
+++ incubator/kafka/trunk/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala Tue Nov 22 00:35:09 2011
@@ -17,10 +17,8 @@
 
 package kafka.log
 
-import junit.framework.TestCase
 import java.io.File
-import kafka.utils.TestUtils
-import kafka.utils.Utils
+import kafka.utils._
 import kafka.server.{KafkaConfig, KafkaServer}
 import junit.framework.Assert._
 import java.util.{Random, Properties}
@@ -30,6 +28,7 @@ import kafka.consumer.SimpleConsumer
 import org.scalatest.junit.JUnitSuite
 import org.junit.{After, Before, Test}
 import kafka.message.{NoCompressionCodec, ByteBufferMessageSet, Message}
+import org.apache.log4j._
 
 object LogOffsetTest {
   val random = new Random()  
@@ -43,6 +42,8 @@ class LogOffsetTest extends JUnitSuite {
   val brokerPort: Int = 9099
   var simpleConsumer: SimpleConsumer = null
 
+  private val logger = Logger.getLogger(classOf[LogOffsetTest])
+  
   @Before
   def setUp() {
     val config: Properties = createBrokerConfig(1, brokerPort)
@@ -66,21 +67,26 @@ class LogOffsetTest extends JUnitSuite {
       new FetchRequest("test", 0, 0, 300 * 1024))
     assertFalse(messageSet.iterator.hasNext)
 
+    val name = "test"
+    val logFile = new File(logDir, name + "-0")
+    
     {
-      val offsets = simpleConsumer.getOffsetsBefore("test", 0, OffsetRequest.LatestTime, 10)
+      val offsets = simpleConsumer.getOffsetsBefore(name, 0, OffsetRequest.LatestTime, 10)
       assertTrue( (Array(0L): WrappedArray[Long]) == (offsets: WrappedArray[Long]) )
+      assertTrue(!logFile.exists())
     }
 
     {
-      val offsets = simpleConsumer.getOffsetsBefore("test", 0, OffsetRequest.EarliestTime, 10)
+      val offsets = simpleConsumer.getOffsetsBefore(name, 0, OffsetRequest.EarliestTime, 10)
       assertTrue( (Array(0L): WrappedArray[Long]) == (offsets: WrappedArray[Long]) )
+      assertTrue(!logFile.exists())
     }
 
     {
-      val offsets = simpleConsumer.getOffsetsBefore("test", 0, 1295978400000L, 10)
-      assertTrue( 0 == offsets.length )
+      val offsets = simpleConsumer.getOffsetsBefore(name, 0, SystemTime.milliseconds, 10)
+      assertEquals( 0, offsets.length )
+      assertTrue(!logFile.exists())
     }
-
   }
 
   @Test



Mime
View raw message