kafka-commits mailing list archives

From jkr...@apache.org
Subject [1/7] kafka git commit: KAFKA-1760: New consumer.
Date Fri, 30 Jan 2015 02:39:31 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 11ec9bf5a -> 0699ff2ce


http://git-wip-us.apache.org/repos/asf/kafka/blob/0699ff2c/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
index cd16ced..a1f72f8 100644
--- a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
+++ b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
@@ -192,7 +192,7 @@ object SerializationTestUtils {
   }
 
   def createConsumerMetadataResponse: ConsumerMetadataResponse = {
-    ConsumerMetadataResponse(Some(brokers.head), ErrorMapping.NoError)
+    ConsumerMetadataResponse(Some(brokers.head), ErrorMapping.NoError, 0)
   }
 
   def createHeartbeatRequestAndHeader: HeartbeatRequestAndHeader = {
@@ -237,7 +237,7 @@ class RequestResponseSerializationTest extends JUnitSuite {
   private val offsetFetchResponse = SerializationTestUtils.createTestOffsetFetchResponse
   private val consumerMetadataRequest = SerializationTestUtils.createConsumerMetadataRequest
   private val consumerMetadataResponse = SerializationTestUtils.createConsumerMetadataResponse
-  private val consumerMetadataResponseNoCoordinator = ConsumerMetadataResponse(None, ErrorMapping.ConsumerCoordinatorNotAvailableCode)
+  private val consumerMetadataResponseNoCoordinator = ConsumerMetadataResponse(None, ErrorMapping.ConsumerCoordinatorNotAvailableCode, 0)
   private val heartbeatRequest = SerializationTestUtils.createHeartbeatRequestAndHeader
   private val heartbeatResponse = SerializationTestUtils.createHeartbeatResponseAndHeader
   private val joinGroupRequest = SerializationTestUtils.createJoinGroupRequestAndHeader
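
Note: the new third argument threaded through these ConsumerMetadataResponse call sites is presumably a freshly added correlationId field, given how the other request/response classes in this file carry one. A minimal sketch of the assumed case-class shape (the real definition lives under core/src/main and is not part of this patch):

    // Assumed shape only; the field name is inferred, not shown in this diff.
    case class ConsumerMetadataResponse(coordinatorOpt: Option[Broker],
                                        errorCode: Short,
                                        correlationId: Int = 0)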

http://git-wip-us.apache.org/repos/asf/kafka/blob/0699ff2c/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
index 3cf7c9b..ef4c9ae 100644
--- a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
+++ b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
@@ -17,11 +17,14 @@
 
 package kafka.integration
 
+import java.util.Arrays
+import scala.collection.mutable.Buffer
 import kafka.server._
 import kafka.utils.{Utils, TestUtils}
 import org.scalatest.junit.JUnit3Suite
 import kafka.zk.ZooKeeperTestHarness
 import kafka.common.KafkaException
+import kafka.utils.TestUtils
 
 /**
  * A test harness that brings up some number of broker nodes
@@ -29,15 +32,22 @@ import kafka.common.KafkaException
 trait KafkaServerTestHarness extends JUnit3Suite with ZooKeeperTestHarness {
 
   val configs: List[KafkaConfig]
-  var servers: List[KafkaServer] = null
+  var servers: Buffer[KafkaServer] = null
   var brokerList: String = null
-
+  var alive: Array[Boolean] = null
+  
+  def serverForId(id: Int) = servers.find(s => s.config.brokerId == id)
+  
+  def bootstrapUrl = configs.map(c => c.hostName + ":" + c.port).mkString(",")
+  
   override def setUp() {
     super.setUp
     if(configs.size <= 0)
      throw new KafkaException("Must supply at least one server config.")
     brokerList = TestUtils.getBrokerListStrFromConfigs(configs)
-    servers = configs.map(TestUtils.createServer(_))
+    servers = configs.map(TestUtils.createServer(_)).toBuffer
+    alive = new Array[Boolean](servers.length)
+    Arrays.fill(alive, true)
   }
 
   override def tearDown() {
@@ -45,4 +55,27 @@ trait KafkaServerTestHarness extends JUnit3Suite with ZooKeeperTestHarness {
     servers.map(server => server.config.logDirs.map(Utils.rm(_)))
     super.tearDown
   }
+  
+  /**
+   * Pick a broker at random and kill it if it isn't already dead
+   * Return the id of the broker killed
+   */
+  def killRandomBroker(): Int = {
+    val index = TestUtils.random.nextInt(servers.length)
+    if(alive(index)) {
+      servers(index).shutdown()
+      alive(index) = false
+    }
+    index
+  }
+  
+  /**
+   * Restart any dead brokers
+   */
+  def restartDeadBrokers() {
+    for(i <- 0 until servers.length if !alive(i)) {
+      servers(i) = TestUtils.createServer(configs(i))
+      alive(i) = true
+    }
+  }
 }
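
The killRandomBroker/restartDeadBrokers pair added above turns the harness into a simple failure-injection tool. A minimal usage sketch, assuming broker ids line up with buffer indices as TestUtils.createBrokerConfigs arranges (the test class and assertions here are illustrative, not part of this commit):

    import junit.framework.Assert._
    import kafka.integration.KafkaServerTestHarness
    import kafka.server.KafkaConfig
    import kafka.utils.TestUtils
    import org.scalatest.junit.JUnit3Suite

    // Hypothetical test: bounce a random broker, then recover the cluster.
    class BrokerBounceTest extends JUnit3Suite with KafkaServerTestHarness {
      // Three broker configs; ids 0..2 match positions in the servers buffer.
      val configs = TestUtils.createBrokerConfigs(3).map(new KafkaConfig(_))

      def testBounce() {
        val killed = killRandomBroker()   // shuts one broker down, marks it !alive
        assertTrue(serverForId(killed).isDefined)
        restartDeadBrokers()              // recreates every broker marked !alive
      }
    }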

http://git-wip-us.apache.org/repos/asf/kafka/blob/0699ff2c/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala b/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
index a5386a0..aeb7a19 100644
--- a/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
@@ -32,6 +32,7 @@ import kafka.common.{TopicAndPartition, ErrorMapping, UnknownTopicOrPartitionExc
 import kafka.utils.{StaticPartitioner, TestUtils, Utils}
 import kafka.serializer.StringEncoder
 import java.util.Properties
+import TestUtils._
 
 /**
  * End to end tests of the primitive apis against a local server
@@ -113,7 +114,8 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with
   }
 
   private def produceAndMultiFetch(producer: Producer[String, String]) {
-    createSimpleTopicsAndAwaitLeader(zkClient, List("test1", "test2", "test3", "test4"))
+    for(topic <- List("test1", "test2", "test3", "test4"))
+      TestUtils.createTopic(zkClient, topic, servers = servers)
 
     // send some messages
     val topics = List(("test4", 0), ("test1", 0), ("test2", 0), ("test3", 0));
@@ -181,7 +183,7 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with
 
   private def multiProduce(producer: Producer[String, String]) {
     val topics = Map("test4" -> 0, "test1" -> 0, "test2" -> 0, "test3" -> 0)
-    createSimpleTopicsAndAwaitLeader(zkClient, topics.keys)
+    topics.keys.map(topic => TestUtils.createTopic(zkClient, topic, servers = servers))
 
     val messages = new mutable.HashMap[String, Seq[String]]
     val builder = new FetchRequestBuilder()
@@ -215,7 +217,7 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with
 
   def testPipelinedProduceRequests() {
     val topics = Map("test4" -> 0, "test1" -> 0, "test2" -> 0, "test3" -> 0)
-    createSimpleTopicsAndAwaitLeader(zkClient, topics.keys)
+    topics.keys.map(topic => TestUtils.createTopic(zkClient, topic, servers = servers))
     val props = new Properties()
     props.put("request.required.acks", "0")
     val pipelinedProducer: Producer[String, String] =
@@ -265,15 +267,4 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with
       assertEquals(messages(topic), fetched.map(messageAndOffset => Utils.readString(messageAndOffset.message.payload)))
     }
   }
-
-  /**
-   * For testing purposes, just create these topics each with one partition and one replica for
-   * which the provided broker should be the leader for.  Create and wait for broker to lead.  Simple.
-   */
-  private def createSimpleTopicsAndAwaitLeader(zkClient: ZkClient, topics: Iterable[String]) {
-    for( topic <- topics ) {
-      AdminUtils.createTopic(zkClient, topic, partitions = 1, replicationFactor = 1)
-      TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, partition = 0)
-    }
-  }
 }
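
The deleted createSimpleTopicsAndAwaitLeader helper is subsumed by TestUtils.createTopic. Judging from these call sites, each createTopic call is roughly equivalent to the old two-step pattern below; this sketch is the removed helper's body, not necessarily the new method's exact implementation (it assumes zkClient and topic are in scope as above):

    // Roughly what TestUtils.createTopic(zkClient, topic, servers = servers) replaces:
    // create the topic with one partition / one replica, then block until a
    // leader is elected for partition 0.
    AdminUtils.createTopic(zkClient, topic, partitions = 1, replicationFactor = 1)
    TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, partition = 0)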

http://git-wip-us.apache.org/repos/asf/kafka/blob/0699ff2c/core/src/test/scala/unit/kafka/utils/MockScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/MockScheduler.scala b/core/src/test/scala/unit/kafka/utils/MockScheduler.scala
index d5896ed..c674078 100644
--- a/core/src/test/scala/unit/kafka/utils/MockScheduler.scala
+++ b/core/src/test/scala/unit/kafka/utils/MockScheduler.scala
@@ -36,6 +36,8 @@ class MockScheduler(val time: Time) extends Scheduler {
   
   /* a priority queue of tasks ordered by next execution time */
   var tasks = new PriorityQueue[MockTask]()
+  
+  def isStarted = true
 
   def startup() {}
   

http://git-wip-us.apache.org/repos/asf/kafka/blob/0699ff2c/core/src/test/scala/unit/kafka/utils/SchedulerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/SchedulerTest.scala b/core/src/test/scala/unit/kafka/utils/SchedulerTest.scala
index b364ac2..cfea63b 100644
--- a/core/src/test/scala/unit/kafka/utils/SchedulerTest.scala
+++ b/core/src/test/scala/unit/kafka/utils/SchedulerTest.scala
@@ -90,4 +90,21 @@ class SchedulerTest {
       assertTrue("Should count to 20", counter1.get >= 20)
     }
   }
+  
+  @Test
+  def testRestart() {
+    // schedule a task to increment a counter
+    mockTime.scheduler.schedule("test1", counter1.getAndIncrement, delay=1)
+    mockTime.sleep(1)
+    assertEquals(1, counter1.get())
+    
+    // restart the scheduler
+    mockTime.scheduler.shutdown()
+    mockTime.scheduler.startup()
+    
+    // schedule another task to increment the counter
+    mockTime.scheduler.schedule("test1", counter1.getAndIncrement, delay=1)
+    mockTime.sleep(1)
+    assertEquals(2, counter1.get())
+  }
 }
\ No newline at end of file
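
Taken together, MockScheduler's new isStarted and this restart test imply that the Scheduler contract now exposes started-state and tolerates a shutdown()/startup() cycle. A sketch of the assumed trait surface (the real trait lives in kafka.utils and may differ):

    import java.util.concurrent.TimeUnit

    // Assumed Scheduler surface implied by these tests, not the actual source.
    trait Scheduler {
      def isStarted: Boolean
      def startup(): Unit
      def shutdown(): Unit
      def schedule(name: String, fun: () => Unit,
                   delay: Long = 0, period: Long = -1,
                   unit: TimeUnit = TimeUnit.MILLISECONDS): Unit
    }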

http://git-wip-us.apache.org/repos/asf/kafka/blob/0699ff2c/core/src/test/scala/unit/kafka/utils/TestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index ac15d34..54755e8 100644
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -45,6 +45,7 @@ import kafka.log._
 import junit.framework.AssertionFailedError
 import junit.framework.Assert._
 import org.apache.kafka.clients.producer.KafkaProducer
+import collection.Iterable
 
 import scala.collection.Map
 
@@ -709,24 +710,21 @@ object TestUtils extends Logging {
   /**
    * Create new LogManager instance with default configuration for testing
    */
-  def createLogManager(
-    logDirs: Array[File] = Array.empty[File],
-    defaultConfig: LogConfig = LogConfig(),
-    cleanerConfig: CleanerConfig = CleanerConfig(enableCleaner = false),
-    time: MockTime = new MockTime()) =
-  {
-    new LogManager(
-      logDirs = logDirs,
-      topicConfigs = Map(),
-      defaultConfig = defaultConfig,
-      cleanerConfig = cleanerConfig,
-      ioThreads = 4,
-      flushCheckMs = 1000L,
-      flushCheckpointMs = 10000L,
-      retentionCheckMs = 1000L,
-      scheduler = time.scheduler,
-      time = time,
-      brokerState = new BrokerState())
+  def createLogManager(logDirs: Array[File] = Array.empty[File],
+                       defaultConfig: LogConfig = LogConfig(),
+                       cleanerConfig: CleanerConfig = CleanerConfig(enableCleaner = false),
+                       time: MockTime = new MockTime()): LogManager = {
+    new LogManager(logDirs = logDirs,
+                   topicConfigs = Map(),
+                   defaultConfig = defaultConfig,
+                   cleanerConfig = cleanerConfig,
+                   ioThreads = 4,
+                   flushCheckMs = 1000L,
+                   flushCheckpointMs = 10000L,
+                   retentionCheckMs = 1000L,
+                   scheduler = time.scheduler,
+                   time = time,
+                   brokerState = new BrokerState())
   }
 
   def sendMessagesToPartition(configs: Seq[KafkaConfig],

