kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From nehanarkh...@apache.org
Subject git commit: KAFKA-1179 createMessageStreams() in javaapi.ZookeeperConsumerConnector does not throw; reviewed by Neha Narkhede
Date Fri, 16 May 2014 00:36:21 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 99f10739b -> 8d454f374


KAFKA-1179 createMessageStreams() in javaapi.ZookeeperConsumerConnector does not throw; reviewed
by Neha Narkhede


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/8d454f37
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/8d454f37
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/8d454f37

Branch: refs/heads/trunk
Commit: 8d454f374b4f879b50825b192617167070749b4f
Parents: 99f1073
Author: Sriharsha Chintalapani <schintalapani@hortonworks.com>
Authored: Thu May 15 17:36:00 2014 -0700
Committer: Neha Narkhede <neha.narkhede@gmail.com>
Committed: Thu May 15 17:36:14 2014 -0700

----------------------------------------------------------------------
 .../common/MessageStreamsExistException.scala   | 23 ++++++++++++++++
 .../consumer/ZookeeperConsumerConnector.scala   | 29 ++++++++++----------
 .../consumer/ZookeeperConsumerConnector.scala   | 17 ++++++++----
 .../ZookeeperConsumerConnectorTest.scala        | 23 ++++++++++------
 .../ZookeeperConsumerConnectorTest.scala        | 26 ++++++++++++------
 5 files changed, 80 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/8d454f37/core/src/main/scala/kafka/common/MessageStreamsExistException.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/common/MessageStreamsExistException.scala b/core/src/main/scala/kafka/common/MessageStreamsExistException.scala
new file mode 100644
index 0000000..68a2e07
--- /dev/null
+++ b/core/src/main/scala/kafka/common/MessageStreamsExistException.scala
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package kafka.common
+
+/**
+ * Indicates a createMessageStreams can't be called more thane once
+*/
+class MessageStreamsExistException(message: String, t: Throwable) extends RuntimeException(message,
t) {
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/8d454f37/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
index 1dde4fc..c032d26 100644
--- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
+++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
@@ -129,31 +129,31 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
   if (config.autoCommitEnable) {
     scheduler.startup
     info("starting auto committer every " + config.autoCommitIntervalMs + " ms")
-    scheduler.schedule("kafka-consumer-autocommit", 
-                       autoCommit, 
+    scheduler.schedule("kafka-consumer-autocommit",
+                       autoCommit,
                        delay = config.autoCommitIntervalMs,
-                       period = config.autoCommitIntervalMs, 
+                       period = config.autoCommitIntervalMs,
                        unit = TimeUnit.MILLISECONDS)
   }
 
   KafkaMetricsReporter.startReporters(config.props)
 
   def this(config: ConsumerConfig) = this(config, true)
-  
-  def createMessageStreams(topicCountMap: Map[String,Int]): Map[String, List[KafkaStream[Array[Byte],Array[Byte]]]]
= 
+
+  def createMessageStreams(topicCountMap: Map[String,Int]): Map[String, List[KafkaStream[Array[Byte],Array[Byte]]]]
=
     createMessageStreams(topicCountMap, new DefaultDecoder(), new DefaultDecoder())
 
   def createMessageStreams[K,V](topicCountMap: Map[String,Int], keyDecoder: Decoder[K], valueDecoder:
Decoder[V])
       : Map[String, List[KafkaStream[K,V]]] = {
     if (messageStreamCreated.getAndSet(true))
-      throw new RuntimeException(this.getClass.getSimpleName +
-                                   " can create message streams at most once")
+      throw new MessageStreamsExistException(this.getClass.getSimpleName +
+                                   " can create message streams at most once",null)
     consume(topicCountMap, keyDecoder, valueDecoder)
   }
 
-  def createMessageStreamsByFilter[K,V](topicFilter: TopicFilter, 
-                                        numStreams: Int, 
-                                        keyDecoder: Decoder[K] = new DefaultDecoder(), 
+  def createMessageStreamsByFilter[K,V](topicFilter: TopicFilter,
+                                        numStreams: Int,
+                                        keyDecoder: Decoder[K] = new DefaultDecoder(),
                                         valueDecoder: Decoder[V] = new DefaultDecoder())
= {
     val wildcardStreamsHandler = new WildcardStreamsHandler[K,V](topicFilter, numStreams,
keyDecoder, valueDecoder)
     wildcardStreamsHandler.streams
@@ -921,10 +921,10 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
     private val wildcardQueuesAndStreams = (1 to numStreams)
       .map(e => {
         val queue = new LinkedBlockingQueue[FetchedDataChunk](config.queuedMaxMessages)
-        val stream = new KafkaStream[K,V](queue, 
-                                          config.consumerTimeoutMs, 
-                                          keyDecoder, 
-                                          valueDecoder, 
+        val stream = new KafkaStream[K,V](queue,
+                                          config.consumerTimeoutMs,
+                                          keyDecoder,
+                                          valueDecoder,
                                           config.clientId)
         (queue, stream)
     }).toList
@@ -978,4 +978,3 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
       wildcardQueuesAndStreams.map(_._2)
   }
 }
-

http://git-wip-us.apache.org/repos/asf/kafka/blob/8d454f37/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala
index 1f95d9b..1f98db5 100644
--- a/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala
+++ b/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala
@@ -18,9 +18,10 @@ package kafka.javaapi.consumer
 
 import kafka.serializer._
 import kafka.consumer._
+import kafka.common.MessageStreamsExistException
 import scala.collection.mutable
 import scala.collection.JavaConversions
-
+import java.util.concurrent.atomic.AtomicBoolean
 
 /**
  * This class handles the consumers interaction with zookeeper
@@ -63,6 +64,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
     extends ConsumerConnector {
 
   private val underlying = new kafka.consumer.ZookeeperConsumerConnector(config, enableFetcher)
+  private val messageStreamCreated = new AtomicBoolean(false)
 
   def this(config: ConsumerConfig) = this(config, true)
 
@@ -73,6 +75,9 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
         valueDecoder: Decoder[V])
       : java.util.Map[String,java.util.List[KafkaStream[K,V]]] = {
 
+    if (messageStreamCreated.getAndSet(true))
+      throw new MessageStreamsExistException(this.getClass.getSimpleName +
+                                   " can create message streams at most once",null)
     val scalaTopicCountMap: Map[String, Int] = {
       import JavaConversions._
       Map.empty[String, Int] ++ (topicCountMap.asInstanceOf[java.util.Map[String, Int]]:
mutable.Map[String, Int])
@@ -87,19 +92,19 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
     }
     ret
   }
-  
+
   def createMessageStreams(topicCountMap: java.util.Map[String,java.lang.Integer]): java.util.Map[String,java.util.List[KafkaStream[Array[Byte],Array[Byte]]]]
=
     createMessageStreams(topicCountMap, new DefaultDecoder(), new DefaultDecoder())
-    
+
   def createMessageStreamsByFilter[K,V](topicFilter: TopicFilter, numStreams: Int, keyDecoder:
Decoder[K], valueDecoder: Decoder[V]) = {
     import JavaConversions._
     underlying.createMessageStreamsByFilter(topicFilter, numStreams, keyDecoder, valueDecoder)
   }
 
-  def createMessageStreamsByFilter(topicFilter: TopicFilter, numStreams: Int) = 
+  def createMessageStreamsByFilter(topicFilter: TopicFilter, numStreams: Int) =
     createMessageStreamsByFilter(topicFilter, numStreams, new DefaultDecoder(), new DefaultDecoder())
-    
-  def createMessageStreamsByFilter(topicFilter: TopicFilter) = 
+
+  def createMessageStreamsByFilter(topicFilter: TopicFilter) =
     createMessageStreamsByFilter(topicFilter, 1, new DefaultDecoder(), new DefaultDecoder())
 
   def commitOffsets() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/8d454f37/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
b/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
index 96fa0bd..e1d8711 100644
--- a/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
+++ b/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
@@ -1,4 +1,3 @@
-
 /**
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
@@ -6,7 +5,7 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
- * 
+ *
  *    http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
@@ -31,6 +30,7 @@ import kafka.producer.{ProducerConfig, KeyedMessage, Producer}
 import java.util.{Collections, Properties}
 import org.apache.log4j.{Logger, Level}
 import kafka.utils.TestUtils._
+import kafka.common.MessageStreamsExistException
 
 class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHarness with
Logging {
 
@@ -157,6 +157,14 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
     val actual_3 = getZKChildrenValues(dirs.consumerOwnerDir)
     assertEquals(expected_2, actual_3)
 
+    // call createMesssageStreams twice should throw MessageStreamsExistException
+    try {
+      val topicMessageStreams4 = zkConsumerConnector3.createMessageStreams(new mutable.HashMap[String,
Int]())
+      fail("Should fail with MessageStreamsExistException")
+    } catch {
+      case e: MessageStreamsExistException => // expected
+    }
+
     zkConsumerConnector1.shutdown
     zkConsumerConnector2.shutdown
     zkConsumerConnector3.shutdown
@@ -164,6 +172,7 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
     requestHandlerLogger.setLevel(Level.ERROR)
   }
 
+
   def testCompression() {
     val requestHandlerLogger = Logger.getLogger(classOf[kafka.server.KafkaRequestHandler])
     requestHandlerLogger.setLevel(Level.FATAL)
@@ -358,10 +367,10 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
     ms.toList
   }
 
-  def sendMessages(config: KafkaConfig, 
-                   messagesPerNode: Int, 
-                   header: String, 
-                   compression: CompressionCodec, 
+  def sendMessages(config: KafkaConfig,
+                   messagesPerNode: Int,
+                   header: String,
+                   compression: CompressionCodec,
                    numParts: Int): List[String]= {
     var messages: List[String] = Nil
     val props = new Properties()
@@ -412,5 +421,3 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
   }
 
 }
-
-

http://git-wip-us.apache.org/repos/asf/kafka/blob/8d454f37/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala
b/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala
index 20e8efe..d6248b0 100644
--- a/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala
+++ b/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala
@@ -5,7 +5,7 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
- * 
+ *
  *    http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
@@ -27,6 +27,7 @@ import kafka.utils.IntEncoder
 import kafka.utils.{Logging, TestUtils}
 import kafka.consumer.{KafkaStream, ConsumerConfig}
 import kafka.zk.ZooKeeperTestHarness
+import kafka.common.MessageStreamsExistException
 
 import scala.collection.JavaConversions
 
@@ -69,17 +70,24 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
     val receivedMessages1 = getMessages(nMessages*2, topicMessageStreams1)
     assertEquals(sentMessages1.sorted, receivedMessages1.sorted)
 
+    // call createMesssageStreams twice should throw MessageStreamsExistException
+    try {
+      val topicMessageStreams2 = zkConsumerConnector1.createMessageStreams(toJavaMap(Map(topic
-> numNodes*numParts/2)), new StringDecoder(), new StringDecoder())
+      fail("Should fail with MessageStreamsExistException")
+    } catch {
+      case e: MessageStreamsExistException => // expected
+    }
     zkConsumerConnector1.shutdown
     info("all consumer connectors stopped")
     requestHandlerLogger.setLevel(Level.ERROR)
   }
 
-  def sendMessages(conf: KafkaConfig, 
-                   messagesPerNode: Int, 
-                   header: String, 
+  def sendMessages(conf: KafkaConfig,
+                   messagesPerNode: Int,
+                   header: String,
                    compressed: CompressionCodec): List[String] = {
     var messages: List[String] = Nil
-    val producer: kafka.producer.Producer[Int, String] = 
+    val producer: kafka.producer.Producer[Int, String] =
       TestUtils.createProducer(TestUtils.getBrokerListStrFromConfigs(configs),
         encoder = classOf[StringEncoder].getName,
         keyEncoder = classOf[IntEncoder].getName)
@@ -94,8 +102,8 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
     messages
   }
 
-  def sendMessages(messagesPerNode: Int, 
-                   header: String, 
+  def sendMessages(messagesPerNode: Int,
+                   header: String,
                    compressed: CompressionCodec = NoCompressionCodec): List[String] = {
     var messages: List[String] = Nil
     for(conf <- configs)
@@ -103,7 +111,7 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
     messages
   }
 
-  def getMessages(nMessagesPerThread: Int, 
+  def getMessages(nMessagesPerThread: Int,
                   jTopicMessageStreams: java.util.Map[String, java.util.List[KafkaStream[String,
String]]]): List[String] = {
     var messages: List[String] = Nil
     import scala.collection.JavaConversions._
@@ -126,5 +134,5 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
     val javaMap = new java.util.HashMap[String, java.lang.Integer]()
     scalaMap.foreach(m => javaMap.put(m._1, m._2.asInstanceOf[java.lang.Integer]))
     javaMap
-  }  
+  }
 }


Mime
View raw message