kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From prashanthme...@apache.org
Subject svn commit: r1343255 - in /incubator/kafka/branches/0.8/core/src: main/scala/kafka/api/ main/scala/kafka/common/ main/scala/kafka/log/ main/scala/kafka/producer/async/ main/scala/kafka/server/ test/scala/unit/kafka/consumer/ test/scala/unit/kafka/integ...
Date Mon, 28 May 2012 13:45:52 GMT
Author: prashanthmenon
Date: Mon May 28 13:45:51 2012
New Revision: 1343255

URL: http://svn.apache.org/viewvc?rev=1343255&view=rev
Log:
Fix retry logic for producers; patched by Prashanth Menon; reviewed by Jun Rao, Neha Narkhede;
KAFKA-49

Added:
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/NotLeaderForPartitionException.scala
Modified:
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchResponse.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/ProducerRequest.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/ErrorMapping.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogManager.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaZooKeeper.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaManager.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchResponse.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchResponse.scala?rev=1343255&r1=1343254&r2=1343255&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchResponse.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchResponse.scala Mon May
28 13:45:51 2012
@@ -42,12 +42,6 @@ case class PartitionData(partition: Int,
 
   def this(partition: Int, messages: MessageSet) = this(partition, ErrorMapping.NoError,
0L, messages)
 
-  def translatePartition(topic: String, randomSelector: String => Int): Int = {
-    if (partition == ProducerRequest.RandomPartition)
-      return randomSelector(topic)
-    else 
-      return partition 
-  }
 }
 
 object TopicData {

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/ProducerRequest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/ProducerRequest.scala?rev=1343255&r1=1343254&r2=1343255&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/ProducerRequest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/ProducerRequest.scala Mon May
28 13:45:51 2012
@@ -23,7 +23,6 @@ import kafka.network._
 import kafka.utils._
 
 object ProducerRequest {
-  val RandomPartition = -1
   val CurrentVersion: Short = 0
 
   def readFrom(buffer: ByteBuffer): ProducerRequest = {
@@ -84,7 +83,7 @@ case class ProducerRequest( versionId: S
     }
   }
 
-  def sizeInBytes(): Int = {
+  def sizeInBytes: Int = {
     var size = 0 
     //size, request_type_id, version_id, correlation_id, client_id, required_acks, ack_timeout,
data.size
     size = 2 + 4 + 2 + clientId.length + 2 + 4 + 4;
@@ -105,12 +104,11 @@ case class ProducerRequest( versionId: S
           clientId == that.clientId &&
           requiredAcks == that.requiredAcks &&
           ackTimeout == that.ackTimeout &&
-          data.toSeq == that.data.toSeq)
+          data.toSeq == that.data.toSeq )
       case _ => false
     }
   }
 
   def topicPartitionCount = data.foldLeft(0)(_ + _.partitionData.length)
 
-  def expectResponse = requiredAcks > 0
 }
\ No newline at end of file

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/ErrorMapping.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/ErrorMapping.scala?rev=1343255&r1=1343254&r2=1343255&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/ErrorMapping.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/ErrorMapping.scala Mon May
28 13:45:51 2012
@@ -34,6 +34,8 @@ object ErrorMapping {
   val WrongPartitionCode = 3
   val InvalidFetchSizeCode = 4
   val InvalidFetchRequestFormatCode = 5
+  val NoLeaderForPartitionCode = 6
+  val NotLeaderForPartitionCode = 7
 
   private val exceptionToCode = 
     Map[Class[Throwable], Int](
@@ -41,7 +43,9 @@ object ErrorMapping {
       classOf[InvalidMessageException].asInstanceOf[Class[Throwable]] -> InvalidMessageCode,
       classOf[InvalidPartitionException].asInstanceOf[Class[Throwable]] -> WrongPartitionCode,
       classOf[InvalidMessageSizeException].asInstanceOf[Class[Throwable]] -> InvalidFetchSizeCode,
-      classOf[FetchRequestFormatException].asInstanceOf[Class[Throwable]] -> InvalidFetchRequestFormatCode
+      classOf[FetchRequestFormatException].asInstanceOf[Class[Throwable]] -> InvalidFetchRequestFormatCode,
+      classOf[NoLeaderForPartitionException].asInstanceOf[Class[Throwable]] -> NoLeaderForPartitionCode,
+      classOf[NotLeaderForPartitionException].asInstanceOf[Class[Throwable]] -> NotLeaderForPartitionCode
     ).withDefaultValue(UnknownCode)
   
   /* invert the mapping */

Added: incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/NotLeaderForPartitionException.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/NotLeaderForPartitionException.scala?rev=1343255&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/NotLeaderForPartitionException.scala
(added)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/NotLeaderForPartitionException.scala
Mon May 28 13:45:51 2012
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.common
+
+/**
+ * Exception raised when broker receives a produce message for partition it does not lead
+ * @param message - A more detailed and descriptive error message
+ */
+class NotLeaderForPartitionException(message: String) extends Exception(message) {
+  def this() = this(null)
+}

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogManager.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogManager.scala?rev=1343255&r1=1343254&r2=1343255&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogManager.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogManager.scala Mon May 28
13:45:51 2012
@@ -125,9 +125,10 @@ private[kafka] class LogManager(val conf
     if (topic.length <= 0)
       throw new InvalidTopicException("topic name can't be empty")
     if (partition < 0 || partition >= topicPartitionsMap.getOrElse(topic, numPartitions))
{
-      warn("Wrong partition " + partition + " valid partitions (0," +
-              (topicPartitionsMap.getOrElse(topic, numPartitions) - 1) + ")")
-      throw new InvalidPartitionException("wrong partition " + partition)
+      val error = "Wrong partition %d, valid partitions (0, %d)."
+        .format(partition, (topicPartitionsMap.getOrElse(topic, numPartitions) - 1))
+      warn(error)
+      throw new InvalidPartitionException(error)
     }
     logs.get(topic)
   }

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala?rev=1343255&r1=1343254&r2=1343255&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
(original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
Mon May 28 13:45:51 2012
@@ -19,13 +19,13 @@ package kafka.producer.async
 
 import kafka.api.{ProducerRequest, TopicData, PartitionData}
 import kafka.cluster.Partition
+import kafka.common._
 import kafka.message.{Message, NoCompressionCodec, ByteBufferMessageSet}
 import kafka.producer._
 import kafka.serializer.Encoder
+import kafka.utils.{Utils, Logging}
 import scala.collection.Map
 import scala.collection.mutable.{ListBuffer, HashMap}
-import kafka.utils.{Utils, Logging}
-import kafka.common.{FailedToSendMessageException, NoLeaderForPartitionException, InvalidPartitionException,
NoBrokersForPartitionException}
 
 class DefaultEventHandler[K,V](config: ProducerConfig,                               // this
api is for testing
                                private val partitioner: Partitioner[K],              // use
the other constructor
@@ -71,8 +71,13 @@ class DefaultEventHandler[K,V](config: P
             .format(partitionAndEvent._1, brokerid, partitionAndEvent._2)))
         val messageSetPerBroker = groupMessagesToSet(eventsPerBrokerMap)
 
-        if((brokerid < 0) || (!send(brokerid, messageSetPerBroker)))
-          failedProduceRequests.appendAll(eventsPerBrokerMap.map(r => r._2).flatten)
+        val failedTopicPartitions = send(brokerid, messageSetPerBroker)
+        for( (topic, partition) <- failedTopicPartitions ) {
+          eventsPerBrokerMap.get((topic, partition)) match {
+            case Some(data) => failedProduceRequests.appendAll(data)
+            case None => // nothing
+          }
+        }
       }
     } catch {
       case t: Throwable => error("Failed to send messages")
@@ -156,31 +161,37 @@ class DefaultEventHandler[K,V](config: P
    *
    * @param brokerId the broker that will receive the request
    * @param messagesPerTopic the messages as a map from (topic, partition) -> messages
+   * @return the set (topic, partitions) messages which incurred an error sending or processing
    */
-  private def send(brokerId: Int, messagesPerTopic: Map[(String, Int), ByteBufferMessageSet]):
Boolean = {
-    try {
-      if(brokerId < 0)
-        throw new NoLeaderForPartitionException("No leader for some partition(s) on broker
%d".format(brokerId))
-      if(messagesPerTopic.size > 0) {
-        val topics = new HashMap[String, ListBuffer[PartitionData]]()
-        for(((topicName, partitionId), messagesSet) <- messagesPerTopic) {
-          topics.get(topicName) match {
-            case Some(x) => trace("found " + topicName)
-            case None => topics += topicName -> new ListBuffer[PartitionData]() //create
a new listbuffer for this topic
-          }
-          topics(topicName).append(new PartitionData(partitionId, messagesSet))
-        }
-        val topicData = topics.map(kv => new TopicData(kv._1, kv._2.toArray))
-        val producerRequest = new ProducerRequest(config.correlationId, config.clientId,
config.requiredAcks, config.ackTimeout, topicData.toArray)
+  private def send(brokerId: Int, messagesPerTopic: Map[(String, Int), ByteBufferMessageSet]):
Seq[(String, Int)] = {
+    if(brokerId < 0) {
+      messagesPerTopic.keys.toSeq
+    } else if(messagesPerTopic.size > 0) {
+      val topics = new HashMap[String, ListBuffer[PartitionData]]()
+      for( ((topicName, partitionId), messagesSet) <- messagesPerTopic ) {
+        val partitionData = topics.getOrElseUpdate(topicName, new ListBuffer[PartitionData]())
+        partitionData.append(new PartitionData(partitionId, messagesSet))
+      }
+      val topicData = topics.map(kv => new TopicData(kv._1, kv._2.toArray)).toArray
+      val producerRequest = new ProducerRequest(config.correlationId, config.clientId, config.requiredAcks,
config.ackTimeout, topicData)
+      try {
         val syncProducer = producerPool.getProducer(brokerId)
         val response = syncProducer.send(producerRequest)
-        // TODO: possibly send response to response callback handler
-        trace("kafka producer sent messages for topics %s to broker %d on %s:%d"
+        trace("producer sent messages for topics %s to broker %d on %s:%d"
           .format(messagesPerTopic, brokerId, syncProducer.config.host, syncProducer.config.port))
+        var msgIdx = -1
+        val errors = new ListBuffer[(String, Int)]
+        for( topic <- topicData; partition <- topic.partitionData ) {
+          msgIdx += 1
+          if(msgIdx > response.errors.size || response.errors(msgIdx) != ErrorMapping.NoError)
+            errors.append((topic.topic, partition.partition))
+        }
+        errors
+      } catch {
+        case e => messagesPerTopic.keys.toSeq
       }
-      true
-    }catch {
-      case t: Throwable => false
+    } else {
+      List.empty
     }
   }
 

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala?rev=1343255&r1=1343254&r2=1343255&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala Mon May
28 13:45:51 2012
@@ -22,13 +22,13 @@ import java.lang.IllegalStateException
 import java.util.concurrent.atomic._
 import kafka.admin.{CreateTopicCommand, AdminUtils}
 import kafka.api._
+import kafka.common._
 import kafka.log._
 import kafka.message._
 import kafka.network._
+import kafka.utils.{SystemTime, Logging}
 import org.apache.log4j.Logger
 import scala.collection._
-import kafka.utils.{SystemTime, Logging}
-import kafka.common._
 import scala.math._
 
 /**
@@ -92,18 +92,17 @@ class KafkaApis(val requestChannel: Requ
     for(topicData <- request.data) {
       for(partitionData <- topicData.partitionData) {
         msgIndex += 1
-        val partition = partitionData.translatePartition(topicData.topic, logManager.chooseRandomPartition)
         try {
           // TODO: need to handle ack's here!  Will probably move to another method.
-          kafkaZookeeper.ensurePartitionOnThisBroker(topicData.topic, partition)
-          val log = logManager.getOrCreateLog(topicData.topic, partition)
+          kafkaZookeeper.ensurePartitionLeaderOnThisBroker(topicData.topic, partitionData.partition)
+          val log = logManager.getOrCreateLog(topicData.topic, partitionData.partition)
           log.append(partitionData.messages)
           offsets(msgIndex) = log.nextAppendOffset
           errors(msgIndex) = ErrorMapping.NoError.toShort
           trace(partitionData.messages.sizeInBytes + " bytes written to logs.")
         } catch {
           case e =>
-            error("Error processing ProducerRequest on " + topicData.topic + ":" + partition,
e)
+            error("Error processing ProducerRequest on " + topicData.topic + ":" + partitionData.partition,
e)
             e match {
               case _: IOException =>
                 fatal("Halting due to unrecoverable I/O error while handling producer request:
" + e.getMessage, e)

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaZooKeeper.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaZooKeeper.scala?rev=1343255&r1=1343254&r2=1343255&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaZooKeeper.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaZooKeeper.scala Mon
May 28 13:45:51 2012
@@ -17,14 +17,14 @@
 
 package kafka.server
 
-import kafka.utils._
-import org.apache.zookeeper.Watcher.Event.KeeperState
+import java.lang.{Thread, IllegalStateException}
 import java.net.InetAddress
-import kafka.common.{InvalidPartitionException, KafkaZookeeperClient}
+import kafka.admin.AdminUtils
 import kafka.cluster.Replica
+import kafka.common.{NoLeaderForPartitionException, NotLeaderForPartitionException, KafkaZookeeperClient}
+import kafka.utils._
+import org.apache.zookeeper.Watcher.Event.KeeperState
 import org.I0Itec.zkclient.{IZkDataListener, IZkChildListener, IZkStateListener, ZkClient}
-import kafka.admin.AdminUtils
-import java.lang.{Thread, IllegalStateException}
 
 /**
  * Handles the server's interaction with zookeeper. The server needs to register the following
paths:
@@ -108,10 +108,15 @@ class KafkaZooKeeper(config: KafkaConfig
     }
   }
 
-  def ensurePartitionOnThisBroker(topic: String, partition: Int) {
-    if(!ZkUtils.isPartitionOnBroker(zkClient, topic, partition, config.brokerId))
-      throw new InvalidPartitionException("Broker %d does not host partition %d for topic
%s".
-        format(config.brokerId, partition, topic))
+  def ensurePartitionLeaderOnThisBroker(topic: String, partition: Int) {
+    ZkUtils.getLeaderForPartition(zkClient, topic, partition) match {
+      case Some(leader) =>
+        if(leader != config.brokerId)
+          throw new NotLeaderForPartitionException("Broker %d is not leader for partition
%d for topic %s"
+            .format(config.brokerId, partition, topic))
+      case None =>
+        throw new NoLeaderForPartitionException("There is no leader for topic %s partition
%d".format(topic, partition))
+    }
   }
 
   def getZookeeperClient = zkClient

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaManager.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaManager.scala?rev=1343255&r1=1343254&r2=1343255&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaManager.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaManager.scala Mon
May 28 13:45:51 2012
@@ -23,7 +23,7 @@ import collection.mutable
 
 class ReplicaManager(config: KafkaConfig) extends Logging {
 
-  private var replicas: mutable.Map[(String, Int), Replica] = new mutable.HashMap[(String,
Int), Replica]()
+  private val replicas = new mutable.HashMap[(String, Int), Replica]()
 
   def addLocalReplica(topic: String, partitionId: Int, log: Log): Replica = {
     val replica = replicas.get((topic, partitionId))
@@ -37,7 +37,7 @@ class ReplicaManager(config: KafkaConfig
       case None =>
         val partition = new Partition(topic, partitionId)
         val replica = new Replica(config.brokerId, partition, topic, Some(log), log.getHighwaterMark,
log.maxSize, true)
-        replicas += (topic, partitionId) -> replica
+        replicas.put((topic, partitionId), replica)
         info("Added local replica for topic %s partition %s on broker %d"
           .format(replica.topic, replica.partition.partId, replica.brokerId))
     }
@@ -51,7 +51,7 @@ class ReplicaManager(config: KafkaConfig
       case None =>
         val partition = new Partition(topic, partitionId)
         val replica = new Replica(config.brokerId, partition, topic, None, -1, -1, false)
-        replicas += (topic, partitionId) -> replica
+        replicas.put((topic, partitionId), replica)
         info("Added remote replica for topic %s partition %s on broker %d"
           .format(replica.topic, replica.partition.partId, replica.brokerId))
     }

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala?rev=1343255&r1=1343254&r2=1343255&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
(original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
Mon May 28 13:45:51 2012
@@ -90,6 +90,10 @@ class ZookeeperConsumerConnectorTest ext
 
     zkConsumerConnector0.shutdown
 
+    // wait to make sure the topic and partition have a leader for the successful case
+    waitUntilLeaderIsElected(zkClient, topic, 0, 500)
+    waitUntilLeaderIsElected(zkClient, topic, 1, 500)
+
     // send some messages to each broker
     val sentMessages1_1 = sendMessagesToBrokerPartition(configs.head, topic, 0, nMessages)
     val sentMessages1_2 = sendMessagesToBrokerPartition(configs.last, topic, 1, nMessages)
@@ -101,9 +105,6 @@ class ZookeeperConsumerConnectorTest ext
     val zkConsumerConnector1 = new ZookeeperConsumerConnector(consumerConfig1, true)
     val topicMessageStreams1 = zkConsumerConnector1.createMessageStreams(Predef.Map(topic
-> 1))
 
-    waitUntilLeaderIsElected(zkClient, topic, 0, 500)
-    waitUntilLeaderIsElected(zkClient, topic, 1, 500)
-
     val receivedMessages1 = getMessages(nMessages*2, topicMessageStreams1)
     assertEquals(sentMessages1.size, receivedMessages1.size)
     assertEquals(sentMessages1, receivedMessages1)

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala?rev=1343255&r1=1343254&r2=1343255&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
(original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
Mon May 28 13:45:51 2012
@@ -17,25 +17,28 @@
 
 package kafka.integration
 
+import java.io.File
 import java.nio.ByteBuffer
+import java.util.Properties
 import junit.framework.Assert._
+import kafka.admin.CreateTopicCommand
 import kafka.api.{OffsetDetail, FetchRequest, FetchRequestBuilder}
 import kafka.common.{FetchRequestFormatException, OffsetOutOfRangeException, InvalidPartitionException}
-import kafka.server.{KafkaRequestHandler, KafkaConfig}
-import org.apache.log4j.{Level, Logger}
-import org.scalatest.junit.JUnit3Suite
-import java.util.Properties
+import kafka.message.Message
 import kafka.producer.{ProducerData, Producer, ProducerConfig}
 import kafka.serializer.StringDecoder
-import kafka.message.Message
-import java.io.File
+import kafka.server.{KafkaRequestHandler, KafkaConfig}
 import kafka.utils.{TestZKUtils, TestUtils}
+import org.apache.log4j.{Level, Logger}
+import org.I0Itec.zkclient.ZkClient
+import kafka.zk.ZooKeeperTestHarness
+import org.scalatest.junit.JUnit3Suite
 import scala.collection._
 
 /**
  * End to end tests of the primitive apis against a local server
  */
-class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness {
+class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with ZooKeeperTestHarness
{
   
   val port = TestUtils.choosePort
   val props = TestUtils.createBrokerConfig(0, port)
@@ -142,6 +145,8 @@ class PrimitiveApiTest extends JUnit3Sui
   }
 
   def testProduceAndMultiFetch() {
+    createSimpleTopicsAndAwaitLeader(zkClient, List("test1", "test2", "test3", "test4"),
config.brokerId)
+
     // send some messages
     val topics = List(("test4", 0), ("test1", 0), ("test2", 0), ("test3", 0));
     {
@@ -207,6 +212,8 @@ class PrimitiveApiTest extends JUnit3Sui
   }
 
   def testProduceAndMultiFetchWithCompression() {
+    createSimpleTopicsAndAwaitLeader(zkClient, List("test1", "test2", "test3", "test4"),
config.brokerId)
+
     // send some messages
     val topics = List(("test4", 0), ("test1", 0), ("test2", 0), ("test3", 0));
     {
@@ -272,6 +279,8 @@ class PrimitiveApiTest extends JUnit3Sui
   }
 
   def testMultiProduce() {
+    createSimpleTopicsAndAwaitLeader(zkClient, List("test1", "test2", "test3", "test4"),
config.brokerId)
+
     // send some messages
     val topics = List(("test4", 0), ("test1", 0), ("test2", 0), ("test3", 0));
     val messages = new mutable.HashMap[String, Seq[Message]]
@@ -328,4 +337,15 @@ class PrimitiveApiTest extends JUnit3Sui
     val logFile = new File(config.logDir, newTopic + "-0")
     assertTrue(!logFile.exists)
   }
+
+  /**
+   * For testing purposes, just create these topics each with one partition and one replica
for
+   * which the provided broker should the leader for.  Create and wait for broker to lead.
 Simple.
+   */
+  def createSimpleTopicsAndAwaitLeader(zkClient: ZkClient, topics: Seq[String], brokerId:
Int) {
+    for( topic <- topics ) {
+      CreateTopicCommand.createTopic(zkClient, topic, 1, 1, brokerId.toString)
+      TestUtils.waitUntilLeaderIsElected(zkClient, topic, 0, 500)
+    }
+  }
 }

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala?rev=1343255&r1=1343254&r2=1343255&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
(original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
Mon May 28 13:45:51 2012
@@ -24,17 +24,17 @@ import org.easymock.EasyMock
 import org.junit.Test
 import kafka.api._
 import kafka.cluster.Broker
+import kafka.common._
 import kafka.message.{NoCompressionCodec, ByteBufferMessageSet, Message}
 import kafka.producer.async._
 import kafka.serializer.{StringEncoder, StringDecoder, Encoder}
 import kafka.server.KafkaConfig
+import kafka.utils.{FixedValuePartitioner, NegativePartitioner, TestZKUtils, TestUtils}
 import kafka.utils.TestUtils._
 import kafka.zk.ZooKeeperTestHarness
-import collection.Map
-import collection.mutable.ListBuffer
 import org.scalatest.junit.JUnit3Suite
-import kafka.utils.{NegativePartitioner, TestZKUtils, TestUtils}
-import kafka.common.{NoBrokersForPartitionException, InvalidPartitionException, InvalidConfigException,
QueueFullException}
+import scala.collection.Map
+import scala.collection.mutable.ListBuffer
 
 class AsyncProducerTest extends JUnit3Suite with ZooKeeperTestHarness {
   val props = createBrokerConfigs(1)
@@ -200,7 +200,7 @@ class AsyncProducerTest extends JUnit3Su
     val handler = new DefaultEventHandler[Int,String](config,
                                                       partitioner = intPartitioner,
                                                       encoder = null.asInstanceOf[Encoder[String]],
-                                                      producerPool)
+                                                      producerPool = producerPool)
 
 
     val topic1Broker1Data = new ListBuffer[ProducerData[Int,Message]]
@@ -234,14 +234,14 @@ class AsyncProducerTest extends JUnit3Su
     props.put("zk.connect", zkConnect)
     val config = new ProducerConfig(props)
     // form expected partitions metadata
-    val topic1Metadata = getTopicMetadata("topic1", 0, "localhost", 9092)
+    val topic1Metadata = getTopicMetadata("topic1", 0, 0, "localhost", 9092)
 
     val syncProducer = getSyncProducer(List("topic1"), List(topic1Metadata))
     val producerPool = getMockProducerPool(config, syncProducer)
     val handler = new DefaultEventHandler[String,String](config,
                                                          partitioner = null.asInstanceOf[Partitioner[String]],
                                                          encoder = new StringEncoder,
-                                                         producerPool)
+                                                         producerPool = producerPool)
 
     val serializedData = handler.serialize(produceData)
     val decoder = new StringDecoder
@@ -258,7 +258,7 @@ class AsyncProducerTest extends JUnit3Su
     val config = new ProducerConfig(props)
 
     // form expected partitions metadata
-    val topic1Metadata = getTopicMetadata("topic1", 0, "localhost", 9092)
+    val topic1Metadata = getTopicMetadata("topic1", 0, 0, "localhost", 9092)
 
     val syncProducer = getSyncProducer(List("topic1"), List(topic1Metadata))
 
@@ -267,7 +267,7 @@ class AsyncProducerTest extends JUnit3Su
     val handler = new DefaultEventHandler[String,String](config,
                                                          partitioner = new NegativePartitioner,
                                                          encoder = null.asInstanceOf[Encoder[String]],
-                                                         producerPool)
+                                                         producerPool = producerPool)
     try {
       handler.partitionAndCollate(producerDataList)
       fail("Should fail with InvalidPartitionException")
@@ -297,7 +297,7 @@ class AsyncProducerTest extends JUnit3Su
     val handler = new DefaultEventHandler[String,String](config,
                                                          partitioner = null.asInstanceOf[Partitioner[String]],
                                                          encoder = new StringEncoder,
-                                                         producerPool)
+                                                         producerPool = producerPool)
     try {
       handler.handle(producerDataList)
       fail("Should fail with NoBrokersForPartitionException")
@@ -333,8 +333,8 @@ class AsyncProducerTest extends JUnit3Su
     val config = new ProducerConfig(props)
 
     // create topic metadata with 0 partitions
-    val topic1Metadata = getTopicMetadata("topic1", 0, "localhost", 9092)
-    val topic2Metadata = getTopicMetadata("topic2", 0, "localhost", 9092)
+    val topic1Metadata = getTopicMetadata("topic1", 0, 0, "localhost", 9092)
+    val topic2Metadata = getTopicMetadata("topic2", 0, 0, "localhost", 9092)
 
     val syncProducer = getSyncProducer(List("topic1", "topic2"), List(topic1Metadata, topic2Metadata))
 
@@ -350,7 +350,7 @@ class AsyncProducerTest extends JUnit3Su
     val handler = new DefaultEventHandler[String,String](config,
                                                          partitioner = null.asInstanceOf[Partitioner[String]],
                                                          encoder = null.asInstanceOf[Encoder[String]],
-                                                         producerPool)
+                                                         producerPool = producerPool)
     val producerDataList = new ListBuffer[ProducerData[String,Message]]
     producerDataList.append(new ProducerData[String,Message]("topic1", new Message("msg1".getBytes)))
     producerDataList.append(new ProducerData[String,Message]("topic2", new Message("msg2".getBytes)))
@@ -375,16 +375,16 @@ class AsyncProducerTest extends JUnit3Su
     val config = new ProducerConfig(props)
 
     val topic = "topic1"
-    val topic1Metadata = getTopicMetadata(topic, 0, "localhost", 9092)
+    val topic1Metadata = getTopicMetadata(topic, 0, 0, "localhost", 9092)
 
     val msgs = TestUtils.getMsgStrings(10)
     val mockSyncProducer = EasyMock.createMock(classOf[SyncProducer])
     mockSyncProducer.send(new TopicMetadataRequest(List(topic)))
     EasyMock.expectLastCall().andReturn(List(topic1Metadata))
     mockSyncProducer.send(TestUtils.produceRequest(topic, 0, messagesToSet(msgs.take(5))))
-    EasyMock.expectLastCall().andReturn(null)
+    EasyMock.expectLastCall().andReturn(new ProducerResponse(ProducerResponse.CurrentVersion,
0, Array(0.toShort), Array(0L)))
     mockSyncProducer.send(TestUtils.produceRequest(topic, 0, messagesToSet(msgs.takeRight(5))))
-    EasyMock.expectLastCall().andReturn(null)
+    EasyMock.expectLastCall().andReturn(new ProducerResponse(ProducerResponse.CurrentVersion,
0, Array(0.toShort), Array(0L)))
 	  EasyMock.replay(mockSyncProducer)
 
     val producerPool = EasyMock.createMock(classOf[ProducerPool])
@@ -403,7 +403,7 @@ class AsyncProducerTest extends JUnit3Su
     val handler = new DefaultEventHandler[String,String]( config,
                                                           partitioner = null.asInstanceOf[Partitioner[String]],
                                                           encoder = new StringEncoder,
-                                                          producerPool = producerPool)
+                                                          producerPool = producerPool )
 
     val producer = new Producer[String, String](config, handler)
     try {
@@ -420,6 +420,70 @@ class AsyncProducerTest extends JUnit3Su
   }
 
   @Test
+  def testFailedSendRetryLogic() {
+    val props = new Properties()
+    props.put("serializer.class", "kafka.serializer.StringEncoder")
+    props.put("zk.connect", TestZKUtils.zookeeperConnect)
+
+    val config = new ProducerConfig(props)
+
+    val topic1 = "topic1"
+    val topic1Metadata = getTopicMetadata(topic1, Array(0, 1), 0, "localhost", 9092)
+    val msgs = TestUtils.getMsgStrings(2)
+
+    // producer used to return topic metadata
+    val metadataSyncProducer = EasyMock.createMock(classOf[SyncProducer])
+    metadataSyncProducer.send(new TopicMetadataRequest(List(topic1)))
+    EasyMock.expectLastCall().andReturn(List(topic1Metadata)).times(3)
+    EasyMock.replay(metadataSyncProducer)
+
+    // produce request for topic1 and partitions 0 and 1.  Let the first request fail
+    // entirely.  The second request will succeed for partition 1 but fail for partition
0.
+    // On the third try for partition 0, let it succeed.
+    val request1 = TestUtils.produceRequestWithAcks(List(topic1), List(0, 1), messagesToSet(msgs),
0)
+    val response1 = 
+      new ProducerResponse(ProducerResponse.CurrentVersion, 0, Array(ErrorMapping.NotLeaderForPartitionCode.toShort,
0.toShort), Array(0L, 0L)) 
+    val request2 = TestUtils.produceRequest(topic1, 0, messagesToSet(msgs))
+    val response2 = new ProducerResponse(ProducerResponse.CurrentVersion, 0, Array(0.toShort),
Array(0L))
+    val mockSyncProducer = EasyMock.createMock(classOf[SyncProducer])
+    EasyMock.expect(mockSyncProducer.send(request1)).andThrow(new RuntimeException) // simulate
SocketTimeoutException
+    EasyMock.expect(mockSyncProducer.send(request1)).andReturn(response1)
+    EasyMock.expect(mockSyncProducer.send(request2)).andReturn(response2)
+    EasyMock.replay(mockSyncProducer)
+
+    val producerPool = EasyMock.createMock(classOf[ProducerPool])
+    EasyMock.expect(producerPool.getZkClient).andReturn(zkClient)
+    EasyMock.expect(producerPool.addProducers(config))
+    EasyMock.expect(producerPool.getAnyProducer).andReturn(metadataSyncProducer)
+    EasyMock.expect(producerPool.getProducer(0)).andReturn(mockSyncProducer)
+    EasyMock.expect(producerPool.getAnyProducer).andReturn(metadataSyncProducer)
+    EasyMock.expect(producerPool.getProducer(0)).andReturn(mockSyncProducer)
+    EasyMock.expect(producerPool.getAnyProducer).andReturn(metadataSyncProducer)
+    EasyMock.expect(producerPool.getProducer(0)).andReturn(mockSyncProducer)
+    EasyMock.expect(producerPool.close())
+    EasyMock.replay(producerPool)
+
+    val handler = new DefaultEventHandler[Int,String](config,
+                                                      partitioner = new FixedValuePartitioner(),
+                                                      encoder = new StringEncoder,
+                                                      producerPool = producerPool)
+    try {
+      val data = List(
+        new ProducerData[Int,String](topic1, 0, msgs),
+        new ProducerData[Int,String](topic1, 1, msgs)
+      )
+      handler.handle(data)
+      handler.close()
+    } catch {
+      case e: Exception => fail("Not expected", e)
+    }
+
+    EasyMock.verify(metadataSyncProducer)
+    EasyMock.verify(mockSyncProducer)
+    EasyMock.verify(producerPool)
+  }
+
+  @Test
   def testJavaProducer() {
     val topic = "topic1"
     val msgs = TestUtils.getMsgStrings(5)
@@ -488,10 +552,13 @@ class AsyncProducerTest extends JUnit3Su
     producerPool
   }
 
-  private def getTopicMetadata(topic: String, brokerId: Int, brokerHost: String, brokerPort:
Int): TopicMetadata = {
+  private def getTopicMetadata(topic: String, partition: Int, brokerId: Int, brokerHost:
String, brokerPort: Int): TopicMetadata = {
+    getTopicMetadata(topic, List(partition), brokerId, brokerHost, brokerPort)
+  }
+
+  private def getTopicMetadata(topic: String, partition: Seq[Int], brokerId: Int, brokerHost:
String, brokerPort: Int): TopicMetadata = {
     val broker1 = new Broker(brokerId, brokerHost, brokerHost, brokerPort)
-    val partition1Metadata = new PartitionMetadata(brokerId, Some(broker1), List(broker1))
-    new TopicMetadata(topic, List(partition1Metadata))
+    new TopicMetadata(topic, partition.map(new PartitionMetadata(_, Some(broker1), List(broker1))))
   }
 
   class MockProducer(override val config: SyncProducerConfig) extends SyncProducer(config)
{

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala?rev=1343255&r1=1343254&r2=1343255&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
(original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
Mon May 28 13:45:51 2012
@@ -17,6 +17,7 @@
 
 package kafka.producer
 
+import java.net.SocketTimeoutException
 import java.util.Properties
 import junit.framework.Assert
 import kafka.admin.CreateTopicCommand
@@ -27,7 +28,6 @@ import kafka.server.KafkaConfig
 import kafka.utils.{TestZKUtils, SystemTime, TestUtils}
 import org.junit.Test
 import org.scalatest.junit.JUnit3Suite
-import java.net.SocketTimeoutException
 
 class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness {
   private var messageBytes =  new Array[Byte](2);
@@ -92,7 +92,6 @@ class SyncProducerTest extends JUnit3Sui
 
   @Test
   def testProduceCorrectlyReceivesResponse() {
-    // TODO: this will need to change with kafka-44
     val server = servers.head
     val props = new Properties()
     props.put("host", "localhost")
@@ -106,21 +105,25 @@ class SyncProducerTest extends JUnit3Sui
     val messages = new ByteBufferMessageSet(NoCompressionCodec, new Message(messageBytes))
 
     // #1 - test that we get an error when partition does not belong to broker in response
-    val request = TestUtils.produceRequestWithAcks(Array("topic1", "topic2", "topic3"), Array(0),
messages)
+    val request = TestUtils.produceRequestWithAcks(Array("topic1", "topic2", "topic3"), Array(0),
messages, 1)
     val response = producer.send(request)
 
+    Assert.assertNotNull(response)
     Assert.assertEquals(request.correlationId, response.correlationId)
     Assert.assertEquals(response.errors.length, response.offsets.length)
     Assert.assertEquals(3, response.errors.length)
-    response.errors.foreach(Assert.assertEquals(ErrorMapping.WrongPartitionCode.toShort,
_))
+    response.errors.foreach(Assert.assertEquals(ErrorMapping.NoLeaderForPartitionCode.toShort,
_))
     response.offsets.foreach(Assert.assertEquals(-1L, _))
 
-    // #2 - test that we get correct offsets when partition is owner by broker
+    // #2 - test that we get correct offsets when partition is owned by broker
     CreateTopicCommand.createTopic(zkClient, "topic1", 1, 1)
+    TestUtils.waitUntilLeaderIsElected(zkClient, "topic1", 0, 500)
     CreateTopicCommand.createTopic(zkClient, "topic3", 1, 1)
+    TestUtils.waitUntilLeaderIsElected(zkClient, "topic3", 0, 500)
 
     Thread.sleep(500)
     val response2 = producer.send(request)
+    Assert.assertNotNull(response2)
     Assert.assertEquals(request.correlationId, response2.correlationId)
     Assert.assertEquals(response2.errors.length, response2.offsets.length)
     Assert.assertEquals(3, response2.errors.length)
@@ -132,7 +135,7 @@ class SyncProducerTest extends JUnit3Sui
     Assert.assertEquals(messages.sizeInBytes, response2.offsets(2))
 
     // the middle message should have been rejected because broker doesn't lead partition
-    Assert.assertEquals(ErrorMapping.WrongPartitionCode.toShort, response2.errors(1))
+    Assert.assertEquals(ErrorMapping.NoLeaderForPartitionCode.toShort, response2.errors(1))
     Assert.assertEquals(-1, response2.offsets(1))
   }
 

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala?rev=1343255&r1=1343254&r2=1343255&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala
(original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala
Mon May 28 13:45:51 2012
@@ -27,8 +27,8 @@ import kafka.utils.TestUtils
 
 class RequestPurgatoryTest {
 
-  val producerRequest1 = TestUtils.produceRequest("test", new ByteBufferMessageSet(new Message("hello1".getBytes)))
-  val producerRequest2 = TestUtils.produceRequest("test", new ByteBufferMessageSet(new Message("hello2".getBytes)))
+  val producerRequest1 = TestUtils.produceRequest("test", 0, new ByteBufferMessageSet(new
Message("hello1".getBytes)))
+  val producerRequest2 = TestUtils.produceRequest("test", 0, new ByteBufferMessageSet(new
Message("hello2".getBytes)))
   var purgatory: MockRequestPurgatory = null
   
   @Before

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala?rev=1343255&r1=1343254&r2=1343255&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala Mon
May 28 13:45:51 2012
@@ -343,29 +343,20 @@ object TestUtils extends Logging {
   /**
    * Create a wired format request based on simple basic information
    */
-  def produceRequest(topic: String, message: ByteBufferMessageSet): kafka.api.ProducerRequest
= {
-    produceRequest(SyncProducerConfig.DefaultCorrelationId,topic,ProducerRequest.RandomPartition,message)
-  }
   def produceRequest(topic: String, partition: Int, message: ByteBufferMessageSet): kafka.api.ProducerRequest
= {
     produceRequest(SyncProducerConfig.DefaultCorrelationId,topic,partition,message)
   }
 
-  def produceRequestWithAcks(topics: Seq[String], partitions: Seq[Int], message: ByteBufferMessageSet):
kafka.api.ProducerRequest = {
-    val correlationId = SyncProducerConfig.DefaultCorrelationId
-    val clientId = SyncProducerConfig.DefaultClientId
-    val requiredAcks: Short = 1.toShort
-    val ackTimeout = SyncProducerConfig.DefaultAckTimeoutMs
-    val data = topics.map(new TopicData(_, partitions.map(new PartitionData(_, message)).toArray))
-    new kafka.api.ProducerRequest(correlationId, clientId, requiredAcks, ackTimeout, data.toArray)
+  def produceRequest(correlationId: Int, topic: String, partition: Int, message: ByteBufferMessageSet):
kafka.api.ProducerRequest = {
+    produceRequestWithAcks(List(topic), List(partition), message, SyncProducerConfig.DefaultRequiredAcks)
   }
 
-  def produceRequest(correlationId: Int, topic: String, partition: Int, message: ByteBufferMessageSet):
kafka.api.ProducerRequest = {
+  def produceRequestWithAcks(topics: Seq[String], partitions: Seq[Int], message: ByteBufferMessageSet,
acks: Int): kafka.api.ProducerRequest = {
+    val correlationId = SyncProducerConfig.DefaultCorrelationId
     val clientId = SyncProducerConfig.DefaultClientId
-    val requiredAcks: Short = SyncProducerConfig.DefaultRequiredAcks
     val ackTimeout = SyncProducerConfig.DefaultAckTimeoutMs
-    var partitionData = Array[PartitionData]( new PartitionData(partition, message) )
-    var data = Array[TopicData]( new TopicData(topic, partitionData) )
-    new kafka.api.ProducerRequest(correlationId, clientId, requiredAcks, ackTimeout, data)
+    val data = topics.map(new TopicData(_, partitions.map(new PartitionData(_, message)).toArray))
+    new kafka.api.ProducerRequest(correlationId, clientId, acks.toShort, ackTimeout, data.toArray)
   }
 
   def produceJavaRequest(topic: String, message: kafka.javaapi.message.ByteBufferMessageSet):
kafka.javaapi.ProducerRequest = {



Mime
View raw message