kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ij...@apache.org
Subject [1/5] kafka git commit: KAFKA-4507; Clients should support older brokers (KIP-97)
Date Wed, 11 Jan 2017 19:31:01 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk d17339537 -> 3d60f1e64


http://git-wip-us.apache.org/repos/asf/kafka/blob/3d60f1e6/core/src/test/scala/unit/kafka/server/DeleteTopicsRequestTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/DeleteTopicsRequestTest.scala b/core/src/test/scala/unit/kafka/server/DeleteTopicsRequestTest.scala
index e04e1b7..92546a6 100644
--- a/core/src/test/scala/unit/kafka/server/DeleteTopicsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DeleteTopicsRequestTest.scala
@@ -33,15 +33,15 @@ class DeleteTopicsRequestTest extends BaseRequestTest {
     val timeout = 10000
     // Single topic
     TestUtils.createTopic(zkUtils, "topic-1", 1, 1, servers)
-    validateValidDeleteTopicRequests(new DeleteTopicsRequest(Set("topic-1").asJava, timeout))
+    validateValidDeleteTopicRequests(new DeleteTopicsRequest.Builder(Set("topic-1").asJava,
timeout).build())
     // Multi topic
     TestUtils.createTopic(zkUtils, "topic-3", 5, 2, servers)
     TestUtils.createTopic(zkUtils, "topic-4", 1, 2, servers)
-    validateValidDeleteTopicRequests(new DeleteTopicsRequest(Set("topic-3", "topic-4").asJava,
timeout))
+    validateValidDeleteTopicRequests(new DeleteTopicsRequest.Builder(Set("topic-3", "topic-4").asJava,
timeout).build())
   }
 
   private def validateValidDeleteTopicRequests(request: DeleteTopicsRequest): Unit = {
-    val response = sendDeleteTopicsRequest(request, 0)
+    val response = sendDeleteTopicsRequest(request)
 
     val error = response.errors.values.asScala.find(_ != Errors.NONE)
     assertTrue(s"There should be no errors, found ${response.errors.asScala}", error.isEmpty)
@@ -57,14 +57,14 @@ class DeleteTopicsRequestTest extends BaseRequestTest {
     val timeoutTopic = "invalid-timeout"
 
     // Basic
-    validateErrorDeleteTopicRequests(new DeleteTopicsRequest(Set("invalid-topic").asJava,
timeout),
+    validateErrorDeleteTopicRequests(new DeleteTopicsRequest.Builder(Set("invalid-topic").asJava,
timeout).build(),
       Map("invalid-topic" -> Errors.UNKNOWN_TOPIC_OR_PARTITION))
 
     // Partial
     TestUtils.createTopic(zkUtils, "partial-topic-1", 1, 1, servers)
-    validateErrorDeleteTopicRequests(new DeleteTopicsRequest(Set(
+    validateErrorDeleteTopicRequests(new DeleteTopicsRequest.Builder(Set(
       "partial-topic-1",
-      "partial-invalid-topic").asJava, timeout),
+      "partial-invalid-topic").asJava, timeout).build(),
       Map(
         "partial-topic-1" -> Errors.NONE,
         "partial-invalid-topic" -> Errors.UNKNOWN_TOPIC_OR_PARTITION
@@ -74,7 +74,7 @@ class DeleteTopicsRequestTest extends BaseRequestTest {
     // Timeout
     TestUtils.createTopic(zkUtils, timeoutTopic, 5, 2, servers)
     // Must be a 0ms timeout to avoid transient test failures. Even a timeout of 1ms has
succeeded in the past.
-    validateErrorDeleteTopicRequests(new DeleteTopicsRequest(Set(timeoutTopic).asJava, 0),
+    validateErrorDeleteTopicRequests(new DeleteTopicsRequest.Builder(Set(timeoutTopic).asJava,
0).build(),
       Map(timeoutTopic -> Errors.REQUEST_TIMED_OUT))
     // The topic should still get deleted eventually
     TestUtils.waitUntilTrue(() => !servers.head.metadataCache.contains(timeoutTopic),
s"Topic $timeoutTopic is never deleted")
@@ -82,7 +82,7 @@ class DeleteTopicsRequestTest extends BaseRequestTest {
   }
 
   private def validateErrorDeleteTopicRequests(request: DeleteTopicsRequest, expectedResponse:
Map[String, Errors]): Unit = {
-    val response = sendDeleteTopicsRequest(request, 0)
+    val response = sendDeleteTopicsRequest(request)
     val errors = response.errors.asScala
     assertEquals("The response size should match", expectedResponse.size, response.errors.size)
 
@@ -97,22 +97,23 @@ class DeleteTopicsRequestTest extends BaseRequestTest {
 
   @Test
   def testNotController() {
-    val request = new DeleteTopicsRequest(Set("not-controller").asJava, 1000)
-    val response = sendDeleteTopicsRequest(request, 0, notControllerSocketServer)
+    val request = new DeleteTopicsRequest.Builder(Set("not-controller").asJava, 1000).build()
+    val response = sendDeleteTopicsRequest(request, notControllerSocketServer)
 
     val error = response.errors.asScala.head._2
     assertEquals("Expected controller error when routed incorrectly",  Errors.NOT_CONTROLLER,
error)
   }
 
   private def validateTopicIsDeleted(topic: String): Unit = {
-    val metadata = sendMetadataRequest(new MetadataRequest(List(topic).asJava)).topicMetadata.asScala
+    val metadata = sendMetadataRequest(new MetadataRequest.
+        Builder(List(topic).asJava).build).topicMetadata.asScala
     TestUtils.waitUntilTrue (() => !metadata.exists(p => p.topic.equals(topic) &&
p.error() == Errors.NONE),
       s"The topic $topic should not exist")
   }
 
-  private def sendDeleteTopicsRequest(request: DeleteTopicsRequest, version: Short, socketServer:
SocketServer = controllerSocketServer): DeleteTopicsResponse = {
-    val response = send(request, ApiKeys.DELETE_TOPICS, Some(version), socketServer)
-    DeleteTopicsResponse.parse(response, version)
+  private def sendDeleteTopicsRequest(request: DeleteTopicsRequest, socketServer: SocketServer
= controllerSocketServer): DeleteTopicsResponse = {
+    val response = send(request, ApiKeys.DELETE_TOPICS, socketServer)
+    DeleteTopicsResponse.parse(response, request.version)
   }
 
   private def sendMetadataRequest(request: MetadataRequest): MetadataResponse = {

http://git-wip-us.apache.org/repos/asf/kafka/blob/3d60f1e6/core/src/test/scala/unit/kafka/server/EdgeCaseRequestTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/EdgeCaseRequestTest.scala b/core/src/test/scala/unit/kafka/server/EdgeCaseRequestTest.scala
index a886609..2857fc7 100755
--- a/core/src/test/scala/unit/kafka/server/EdgeCaseRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/EdgeCaseRequestTest.scala
@@ -117,7 +117,8 @@ class EdgeCaseRequestTest extends KafkaServerTestHarness {
       val headerBytes = requestHeaderBytes(ApiKeys.PRODUCE.id, 2, null, correlationId)
       val messageBytes = "message".getBytes
       val records = MemoryRecords.readableRecords(ByteBuffer.wrap(messageBytes))
-      val request = new ProduceRequest(1, 10000, Map(topicPartition -> records).asJava)
+      val request = new ProduceRequest.Builder(
+          1, 10000, Map(topicPartition -> records).asJava).build()
       val byteBuffer = ByteBuffer.allocate(headerBytes.length + request.sizeOf)
       byteBuffer.put(headerBytes)
       request.writeTo(byteBuffer)

http://git-wip-us.apache.org/repos/asf/kafka/blob/3d60f1e6/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala b/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala
index e6260a9..c18d949 100644
--- a/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala
@@ -56,7 +56,8 @@ class FetchRequestTest extends BaseRequestTest {
 
   private def createFetchRequest(maxResponseBytes: Int, maxPartitionBytes: Int, topicPartitions:
Seq[TopicPartition],
                                  offsetMap: Map[TopicPartition, Long] = Map.empty): FetchRequest
=
-    new FetchRequest(Int.MaxValue, 0, maxResponseBytes, createPartitionMap(maxPartitionBytes,
topicPartitions, offsetMap))
+    new FetchRequest.Builder(Int.MaxValue, 0, createPartitionMap(maxPartitionBytes, topicPartitions,
offsetMap))
+      .setMaxBytes(maxResponseBytes).build()
 
   private def createPartitionMap(maxPartitionBytes: Int, topicPartitions: Seq[TopicPartition],
                                  offsetMap: Map[TopicPartition, Long] = Map.empty): util.LinkedHashMap[TopicPartition,
FetchRequest.PartitionData] = {
@@ -67,8 +68,8 @@ class FetchRequestTest extends BaseRequestTest {
     partitionMap
   }
 
-  private def sendFetchRequest(leaderId: Int, request: FetchRequest, version: Option[Short]
= None): FetchResponse = {
-    val response = send(request, ApiKeys.FETCH, version, destination = brokerSocketServer(leaderId))
+  private def sendFetchRequest(leaderId: Int, request: FetchRequest): FetchResponse = {
+    val response = send(request, ApiKeys.FETCH, destination = brokerSocketServer(leaderId))
     FetchResponse.parse(response)
   }
 
@@ -155,8 +156,10 @@ class FetchRequestTest extends BaseRequestTest {
     val (topicPartition, leaderId) = createTopics(numTopics = 1, numPartitions = 1).head
     producer.send(new ProducerRecord(topicPartition.topic, topicPartition.partition,
       "key", new String(new Array[Byte](maxPartitionBytes + 1)))).get
-    val fetchRequest = new FetchRequest(Int.MaxValue, 0, createPartitionMap(maxPartitionBytes,
Seq(topicPartition)))
-    val fetchResponse = sendFetchRequest(leaderId, fetchRequest, Some(2))
+    val fetchRequestBuilder = new FetchRequest.Builder(
+      Int.MaxValue, 0, createPartitionMap(maxPartitionBytes, Seq(topicPartition))).
+      setVersion(2)
+    val fetchResponse = sendFetchRequest(leaderId, fetchRequestBuilder.build())
     val partitionData = fetchResponse.responseData.get(topicPartition)
     assertEquals(Errors.NONE.code, partitionData.errorCode)
     assertTrue(partitionData.highWatermark > 0)

http://git-wip-us.apache.org/repos/asf/kafka/blob/3d60f1e6/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
index 9cf6318..7e85c19 100755
--- a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
@@ -143,10 +143,10 @@ class LeaderElectionTest extends ZooKeeperTestHarness {
           Seq(brokerId1, brokerId2).map(Integer.valueOf).asJava, LeaderAndIsr.initialZKVersion,
           Set(0, 1).map(Integer.valueOf).asJava)
       )
-      val leaderAndIsrRequest = new LeaderAndIsrRequest(controllerId, staleControllerEpoch,
partitionStates.asJava,
-        nodes.toSet.asJava)
+      val requestBuilder = new LeaderAndIsrRequest.Builder(
+          controllerId, staleControllerEpoch, partitionStates.asJava, nodes.toSet.asJava)
 
-      controllerChannelManager.sendRequest(brokerId2, ApiKeys.LEADER_AND_ISR, None, leaderAndIsrRequest,
+      controllerChannelManager.sendRequest(brokerId2, ApiKeys.LEADER_AND_ISR, requestBuilder,
         staleControllerEpochCallback)
       TestUtils.waitUntilTrue(() => staleControllerEpochDetected, "Controller epoch should
be stale")
       assertTrue("Stale controller epoch not detected by the broker", staleControllerEpochDetected)

http://git-wip-us.apache.org/repos/asf/kafka/blob/3d60f1e6/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala b/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
index 138c36d..43ff785 100644
--- a/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
+++ b/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
@@ -68,7 +68,8 @@ class MetadataCacheTest {
       new TopicPartition(topic, 1) -> new PartitionState(controllerEpoch, 1, 1, asList(1),
zkVersion, asSet(1)),
       new TopicPartition(topic, 2) -> new PartitionState(controllerEpoch, 2, 2, asList(2),
zkVersion, asSet(2)))
 
-    val updateMetadataRequest = new UpdateMetadataRequest(controllerId, controllerEpoch,
partitionStates.asJava, brokers.asJava)
+    val updateMetadataRequest = new UpdateMetadataRequest.Builder(
+      controllerId, controllerEpoch, partitionStates.asJava, brokers.asJava).build()
     cache.updateCache(15, updateMetadataRequest)
 
     for (securityProtocol <- Seq(SecurityProtocol.PLAINTEXT, SecurityProtocol.SSL)) {
@@ -115,7 +116,8 @@ class MetadataCacheTest {
     val partitionStates = Map(
       new TopicPartition(topic, 0) -> new PartitionState(controllerEpoch, leader, leaderEpoch,
asList(0), zkVersion, asSet(0)))
 
-    val updateMetadataRequest = new UpdateMetadataRequest(controllerId, controllerEpoch,
partitionStates.asJava, brokers.asJava)
+    val updateMetadataRequest = new UpdateMetadataRequest.Builder(
+      controllerId, controllerEpoch, partitionStates.asJava, brokers.asJava).build()
     cache.updateCache(15, updateMetadataRequest)
 
     val topicMetadatas = cache.getTopicMetadata(Set(topic), SecurityProtocol.PLAINTEXT)
@@ -155,7 +157,8 @@ class MetadataCacheTest {
     val partitionStates = Map(
       new TopicPartition(topic, 0) -> new PartitionState(controllerEpoch, leader, leaderEpoch,
isr, zkVersion, replicas))
 
-    val updateMetadataRequest = new UpdateMetadataRequest(controllerId, controllerEpoch,
partitionStates.asJava, brokers.asJava)
+    val updateMetadataRequest = new UpdateMetadataRequest.Builder(
+      controllerId, controllerEpoch, partitionStates.asJava, brokers.asJava).build()
     cache.updateCache(15, updateMetadataRequest)
 
     // Validate errorUnavailableEndpoints = false
@@ -211,7 +214,8 @@ class MetadataCacheTest {
     val partitionStates = Map(
       new TopicPartition(topic, 0) -> new PartitionState(controllerEpoch, leader, leaderEpoch,
isr, zkVersion, replicas))
 
-    val updateMetadataRequest = new UpdateMetadataRequest(controllerId, controllerEpoch,
partitionStates.asJava, brokers.asJava)
+    val updateMetadataRequest = new UpdateMetadataRequest.Builder(
+      controllerId, controllerEpoch, partitionStates.asJava, brokers.asJava).build()
     cache.updateCache(15, updateMetadataRequest)
 
     // Validate errorUnavailableEndpoints = false
@@ -259,7 +263,8 @@ class MetadataCacheTest {
     val isr = asList[Integer](0, 1)
     val partitionStates = Map(
       new TopicPartition(topic, 0) -> new PartitionState(controllerEpoch, leader, leaderEpoch,
isr, 3, replicas))
-    val updateMetadataRequest = new UpdateMetadataRequest(2, controllerEpoch, partitionStates.asJava,
brokers.asJava)
+    val updateMetadataRequest = new UpdateMetadataRequest.Builder(
+      2, controllerEpoch, partitionStates.asJava, brokers.asJava).build()
     cache.updateCache(15, updateMetadataRequest)
 
     try {
@@ -288,7 +293,8 @@ class MetadataCacheTest {
       val isr = asList[Integer](0, 1)
       val partitionStates = Map(
         new TopicPartition(topic, 0) -> new PartitionState(controllerEpoch, leader, leaderEpoch,
isr, 3, replicas))
-      val updateMetadataRequest = new UpdateMetadataRequest(2, controllerEpoch, partitionStates.asJava,
brokers.asJava)
+      val updateMetadataRequest = new UpdateMetadataRequest.Builder(
+        2, controllerEpoch, partitionStates.asJava, brokers.asJava).build()
       cache.updateCache(15, updateMetadataRequest)
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/3d60f1e6/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala b/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala
index 9bcf4fd..f3bb912 100644
--- a/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala
@@ -36,14 +36,14 @@ class MetadataRequestTest extends BaseRequestTest {
 
   @Test
   def testClusterIdWithRequestVersion1() {
-    val v1MetadataResponse = sendMetadataRequest(MetadataRequest.allTopics, 1)
+    val v1MetadataResponse = sendMetadataRequest(MetadataRequest.allTopics(1.toShort))
     val v1ClusterId = v1MetadataResponse.clusterId
     assertNull(s"v1 clusterId should be null", v1ClusterId)
   }
 
   @Test
   def testClusterIdIsValid() {
-    val metadataResponse = sendMetadataRequest(MetadataRequest.allTopics, 2)
+    val metadataResponse = sendMetadataRequest(MetadataRequest.allTopics(2.toShort))
     isValidClusterId(metadataResponse.clusterId)
   }
 
@@ -51,7 +51,7 @@ class MetadataRequestTest extends BaseRequestTest {
   def testControllerId() {
     val controllerServer = servers.find(_.kafkaController.isActive).get
     val controllerId = controllerServer.config.brokerId
-    val metadataResponse = sendMetadataRequest(MetadataRequest.allTopics(), 1)
+    val metadataResponse = sendMetadataRequest(MetadataRequest.allTopics(1.toShort))
 
     assertEquals("Controller id should match the active controller",
       controllerId, metadataResponse.controller.id)
@@ -64,14 +64,14 @@ class MetadataRequestTest extends BaseRequestTest {
     val controllerId2 = controllerServer2.config.brokerId
     assertNotEquals("Controller id should switch to a new broker", controllerId, controllerId2)
     TestUtils.waitUntilTrue(() => {
-      val metadataResponse2 = sendMetadataRequest(MetadataRequest.allTopics(), 1)
+      val metadataResponse2 = sendMetadataRequest(MetadataRequest.allTopics(1.toShort))
       metadataResponse2.controller != null && controllerServer2.apis.brokerId ==
metadataResponse2.controller.id
     }, "Controller id should match the active controller after failover", 5000)
   }
 
   @Test
   def testRack() {
-    val metadataResponse = sendMetadataRequest(MetadataRequest.allTopics(), 1)
+    val metadataResponse = sendMetadataRequest(MetadataRequest.allTopics(1.toShort))
     // Validate rack matches what's set in generateConfigs() above
     metadataResponse.brokers.asScala.foreach { broker =>
       assertEquals("Rack information should match config", s"rack/${broker.id}", broker.rack)
@@ -86,7 +86,7 @@ class MetadataRequestTest extends BaseRequestTest {
     TestUtils.createTopic(zkUtils, internalTopic, 3, 2, servers)
     TestUtils.createTopic(zkUtils, notInternalTopic, 3, 2, servers)
 
-    val metadataResponse = sendMetadataRequest(MetadataRequest.allTopics(), 1)
+    val metadataResponse = sendMetadataRequest(MetadataRequest.allTopics(1.toShort))
     assertTrue("Response should have no errors", metadataResponse.errors.isEmpty)
 
     val topicMetadata = metadataResponse.topicMetadata.asScala
@@ -107,7 +107,7 @@ class MetadataRequestTest extends BaseRequestTest {
 
     // v0, Doesn't support a "no topics" request
     // v1, Empty list represents "no topics"
-    val metadataResponse = sendMetadataRequest(new MetadataRequest(List[String]().asJava),
1)
+    val metadataResponse = sendMetadataRequest(new MetadataRequest(List[String]().asJava,
1.toShort))
     assertTrue("Response should have no errors", metadataResponse.errors.isEmpty)
     assertTrue("Response should have no topics", metadataResponse.topicMetadata.isEmpty)
   }
@@ -119,12 +119,12 @@ class MetadataRequestTest extends BaseRequestTest {
     TestUtils.createTopic(zkUtils, "t2", 3, 2, servers)
 
     // v0, Empty list represents all topics
-    val metadataResponseV0 = sendMetadataRequest(new MetadataRequest(List[String]().asJava),
0)
+    val metadataResponseV0 = sendMetadataRequest(new MetadataRequest(List[String]().asJava,
0.toShort))
     assertTrue("V0 Response should have no errors", metadataResponseV0.errors.isEmpty)
     assertEquals("V0 Response should have 2 (all) topics", 2, metadataResponseV0.topicMetadata.size())
 
     // v1, Null represents all topics
-    val metadataResponseV1 = sendMetadataRequest(MetadataRequest.allTopics(), 1)
+    val metadataResponseV1 = sendMetadataRequest(MetadataRequest.allTopics(1.toShort))
     assertTrue("V1 Response should have no errors", metadataResponseV1.errors.isEmpty)
     assertEquals("V1 Response should have 2 (all) topics", 2, metadataResponseV1.topicMetadata.size())
   }
@@ -138,7 +138,7 @@ class MetadataRequestTest extends BaseRequestTest {
     TestUtils.createTopic(zkUtils, replicaDownTopic, 1, replicaCount, servers)
 
     // Kill a replica node that is not the leader
-    val metadataResponse = sendMetadataRequest(new MetadataRequest(List(replicaDownTopic).asJava),
1)
+    val metadataResponse = sendMetadataRequest(new MetadataRequest(List(replicaDownTopic).asJava,
1.toShort))
     val partitionMetadata = metadataResponse.topicMetadata.asScala.head.partitionMetadata.asScala.head
     val downNode = servers.find { server =>
       val serverId = server.apis.brokerId
@@ -149,14 +149,14 @@ class MetadataRequestTest extends BaseRequestTest {
     downNode.shutdown()
 
     TestUtils.waitUntilTrue(() => {
-      val response = sendMetadataRequest(new MetadataRequest(List(replicaDownTopic).asJava),
1)
+      val response = sendMetadataRequest(new MetadataRequest(List(replicaDownTopic).asJava,
1.toShort))
       val metadata = response.topicMetadata.asScala.head.partitionMetadata.asScala.head
       val replica = metadata.replicas.asScala.find(_.id == downNode.apis.brokerId).get
       replica.host == "" & replica.port == -1
     }, "Replica was not found down", 5000)
 
     // Validate version 0 still filters unavailable replicas and contains error
-    val v0MetadataResponse = sendMetadataRequest(new MetadataRequest(List(replicaDownTopic).asJava),
0)
+    val v0MetadataResponse = sendMetadataRequest(new MetadataRequest(List(replicaDownTopic).asJava,
0.toShort))
     val v0BrokerIds = v0MetadataResponse.brokers().asScala.map(_.id).toSeq
     assertTrue("Response should have no errors", v0MetadataResponse.errors.isEmpty)
     assertFalse(s"The downed broker should not be in the brokers list", v0BrokerIds.contains(downNode))
@@ -166,7 +166,7 @@ class MetadataRequestTest extends BaseRequestTest {
     assertTrue(s"Response should have ${replicaCount - 1} replicas", v0PartitionMetadata.replicas.size
== replicaCount - 1)
 
     // Validate version 1 returns unavailable replicas with no error
-    val v1MetadataResponse = sendMetadataRequest(new MetadataRequest(List(replicaDownTopic).asJava),
1)
+    val v1MetadataResponse = sendMetadataRequest(new MetadataRequest(List(replicaDownTopic).asJava,
1.toShort))
     val v1BrokerIds = v1MetadataResponse.brokers().asScala.map(_.id).toSeq
     assertTrue("Response should have no errors", v1MetadataResponse.errors.isEmpty)
     assertFalse(s"The downed broker should not be in the brokers list", v1BrokerIds.contains(downNode))
@@ -176,8 +176,8 @@ class MetadataRequestTest extends BaseRequestTest {
     assertEquals(s"Response should have $replicaCount replicas", replicaCount, v1PartitionMetadata.replicas.size)
   }
 
-  private def sendMetadataRequest(request: MetadataRequest, version: Short): MetadataResponse
= {
-    val response = send(request, ApiKeys.METADATA, Some(version))
-    MetadataResponse.parse(response, version)
+  private def sendMetadataRequest(request: MetadataRequest): MetadataResponse = {
+    val response = send(request, ApiKeys.METADATA)
+    MetadataResponse.parse(response, request.version)
   }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/3d60f1e6/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala b/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala
index 4dfe4b5..8ed93d9 100644
--- a/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala
@@ -40,7 +40,8 @@ class ProduceRequestTest extends BaseRequestTest {
     def sendAndCheck(memoryRecords: MemoryRecords, expectedOffset: Long): ProduceResponse.PartitionResponse
= {
       val topicPartition = new TopicPartition("topic", partition)
       val partitionRecords = Map(topicPartition -> memoryRecords)
-      val produceResponse = sendProduceRequest(leader, new ProduceRequest(-1, 3000, partitionRecords.asJava))
+      val produceResponse = sendProduceRequest(leader,
+          new ProduceRequest.Builder(-1, 3000, partitionRecords.asJava).build())
       assertEquals(1, produceResponse.responses.size)
       val (tp, partitionResponse) = produceResponse.responses.asScala.head
       assertEquals(topicPartition, tp)
@@ -76,7 +77,8 @@ class ProduceRequestTest extends BaseRequestTest {
     memoryRecords.buffer.array.update(40, 0)
     val topicPartition = new TopicPartition("topic", partition)
     val partitionRecords = Map(topicPartition -> memoryRecords)
-    val produceResponse = sendProduceRequest(leader, new ProduceRequest(-1, 3000, partitionRecords.asJava))
+    val produceResponse = sendProduceRequest(leader, 
+      new ProduceRequest.Builder(-1, 3000, partitionRecords.asJava).build())
     assertEquals(1, produceResponse.responses.size)
     val (tp, partitionResponse) = produceResponse.responses.asScala.head
     assertEquals(topicPartition, tp)

http://git-wip-us.apache.org/repos/asf/kafka/blob/3d60f1e6/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index b710a4f..bcf0a9c 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -149,9 +149,9 @@ class ReplicaManagerTest {
       val partition = rm.getOrCreatePartition(new TopicPartition(topic, 0))
       partition.getOrCreateReplica(0)
       // Make this replica the leader.
-      val leaderAndIsrRequest1 = new LeaderAndIsrRequest(0, 0,
+      val leaderAndIsrRequest1 = new LeaderAndIsrRequest.Builder(0, 0,
         collection.immutable.Map(new TopicPartition(topic, 0) -> new PartitionState(0,
0, 0, brokerList, 0, brokerSet)).asJava,
-        Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava)
+        Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
       rm.becomeLeaderOrFollower(0, leaderAndIsrRequest1, metadataCache, (_, _) => {})
       rm.getLeaderReplicaIfLocal(new TopicPartition(topic, 0))
 
@@ -174,9 +174,9 @@ class ReplicaManagerTest {
         responseCallback = fetchCallback)
 
       // Make this replica the follower
-      val leaderAndIsrRequest2 = new LeaderAndIsrRequest(0, 0,
+      val leaderAndIsrRequest2 = new LeaderAndIsrRequest.Builder(0, 0,
         collection.immutable.Map(new TopicPartition(topic, 0) -> new PartitionState(0,
1, 1, brokerList, 0, brokerSet)).asJava,
-        Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava)
+        Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
       rm.becomeLeaderOrFollower(1, leaderAndIsrRequest2, metadataCache, (_, _) => {})
 
       assertTrue(produceCallbackFired)
@@ -208,9 +208,9 @@ class ReplicaManagerTest {
       partition.getOrCreateReplica(0)
       
       // Make this replica the leader.
-      val leaderAndIsrRequest1 = new LeaderAndIsrRequest(0, 0,
+      val leaderAndIsrRequest1 = new LeaderAndIsrRequest.Builder(0, 0,
         collection.immutable.Map(new TopicPartition(topic, 0) -> new PartitionState(0,
0, 0, brokerList, 0, brokerSet)).asJava,
-        Set(new Node(0, "host1", 0), new Node(1, "host2", 1), new Node(2, "host2", 2)).asJava)
+        Set(new Node(0, "host1", 0), new Node(1, "host2", 1), new Node(2, "host2", 2)).asJava).build()
       rm.becomeLeaderOrFollower(0, leaderAndIsrRequest1, metadataCache, (_, _) => {})
       rm.getLeaderReplicaIfLocal(new TopicPartition(topic, 0))
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/3d60f1e6/core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala b/core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala
index 9e19e39..34d7d14 100644
--- a/core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala
@@ -19,13 +19,15 @@ package kafka.server
 import java.io.IOException
 import java.net.Socket
 import java.util.Collections
-import org.apache.kafka.common.protocol.{ApiKeys, Errors, SecurityProtocol}
+
+import org.apache.kafka.common.protocol.{ApiKeys, Errors, ProtoUtils, SecurityProtocol}
 import org.apache.kafka.common.requests.{ApiVersionsRequest, ApiVersionsResponse}
 import org.apache.kafka.common.requests.SaslHandshakeRequest
 import org.apache.kafka.common.requests.SaslHandshakeResponse
 import org.junit.Test
 import org.junit.Assert._
 import kafka.api.SaslTestHarness
+import org.apache.kafka.common.protocol.types.Struct
 
 class SaslApiVersionsRequestTest extends BaseRequestTest with SaslTestHarness {
   override protected def securityProtocol = SecurityProtocol.SASL_PLAINTEXT
@@ -39,7 +41,8 @@ class SaslApiVersionsRequestTest extends BaseRequestTest with SaslTestHarness
{
   def testApiVersionsRequestBeforeSaslHandshakeRequest() {
     val plaintextSocket = connect(protocol = securityProtocol)
     try {
-      val apiVersionsResponse = sendApiVersionsRequest(plaintextSocket, new ApiVersionsRequest,
0)
+      val apiVersionsResponse = sendApiVersionsRequest(plaintextSocket,
+          new ApiVersionsRequest.Builder().setVersion(0).build())
       ApiVersionsRequestTest.validateApiVersionsResponse(apiVersionsResponse)
       sendSaslHandshakeRequestValidateResponse(plaintextSocket)
     } finally {
@@ -53,7 +56,8 @@ class SaslApiVersionsRequestTest extends BaseRequestTest with SaslTestHarness
{
     try {
       sendSaslHandshakeRequestValidateResponse(plaintextSocket)
       try {
-        sendApiVersionsRequest(plaintextSocket, new ApiVersionsRequest, 0)
+        sendApiVersionsRequest(plaintextSocket,
+            new ApiVersionsRequest.Builder().setVersion(0).build())
         fail("Versions Request during Sasl handshake did not fail")
       } catch {
         case _: IOException => // expected exception
@@ -67,9 +71,12 @@ class SaslApiVersionsRequestTest extends BaseRequestTest with SaslTestHarness
{
   def testApiVersionsRequestWithUnsupportedVersion() {
     val plaintextSocket = connect(protocol = securityProtocol)
     try {
-      val apiVersionsResponse = sendApiVersionsRequest(plaintextSocket, new ApiVersionsRequest,
Short.MaxValue)
+      val apiVersionsRequest = new ApiVersionsRequest(
+        new Struct(ProtoUtils.requestSchema(ApiKeys.API_VERSIONS.id, 0)), Short.MaxValue);
+      val apiVersionsResponse = sendApiVersionsRequest(plaintextSocket, apiVersionsRequest)
       assertEquals(Errors.UNSUPPORTED_VERSION.code(), apiVersionsResponse.errorCode)
-      val apiVersionsResponse2 = sendApiVersionsRequest(plaintextSocket, new ApiVersionsRequest,
0)
+      val apiVersionsResponse2 = sendApiVersionsRequest(plaintextSocket,
+          new ApiVersionsRequest.Builder().setVersion(0).build())
       ApiVersionsRequestTest.validateApiVersionsResponse(apiVersionsResponse2)
       sendSaslHandshakeRequestValidateResponse(plaintextSocket)
     } finally {
@@ -77,13 +84,13 @@ class SaslApiVersionsRequestTest extends BaseRequestTest with SaslTestHarness {
     }
   }
 
-  private def sendApiVersionsRequest(socket: Socket, request: ApiVersionsRequest, version: Short): ApiVersionsResponse = {
-    val response = send(request, ApiKeys.API_VERSIONS, version, socket)
+  private def sendApiVersionsRequest(socket: Socket, request: ApiVersionsRequest): ApiVersionsResponse = {
+    val response = send(request, ApiKeys.API_VERSIONS, socket)
     ApiVersionsResponse.parse(response)
   }
 
   private def sendSaslHandshakeRequestValidateResponse(socket: Socket) {
-    val response = send(new SaslHandshakeRequest("PLAIN"), ApiKeys.SASL_HANDSHAKE, 0.toShort, socket)
+    val response = send(new SaslHandshakeRequest("PLAIN"), ApiKeys.SASL_HANDSHAKE, socket)
     val handshakeResponse = SaslHandshakeResponse.parse(response)
     assertEquals(Errors.NONE.code, handshakeResponse.errorCode())
     assertEquals(Collections.singletonList("PLAIN"), handshakeResponse.enabledMechanisms())

http://git-wip-us.apache.org/repos/asf/kafka/blob/3d60f1e6/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClient.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClient.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClient.java
index 132c1af..8bee6c6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClient.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClient.java
@@ -30,7 +30,6 @@ import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.metrics.MetricsReporter;
 import org.apache.kafka.common.network.ChannelBuilder;
 import org.apache.kafka.common.network.Selector;
-import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.requests.CreateTopicsRequest;
 import org.apache.kafka.common.requests.CreateTopicsResponse;
@@ -39,7 +38,6 @@ import org.apache.kafka.common.requests.DeleteTopicsResponse;
 import org.apache.kafka.common.requests.MetadataRequest;
 import org.apache.kafka.common.requests.MetadataResponse;
 import org.apache.kafka.common.requests.AbstractRequest;
-import org.apache.kafka.common.requests.RequestHeader;
 import org.apache.kafka.common.utils.SystemTime;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.StreamsConfig;
@@ -98,7 +96,7 @@ public class StreamsKafkaClient {
                 streamsConfig.getLong(StreamsConfig.RECONNECT_BACKOFF_MS_CONFIG),
                 streamsConfig.getInt(StreamsConfig.SEND_BUFFER_CONFIG),
                 streamsConfig.getInt(StreamsConfig.RECEIVE_BUFFER_CONFIG),
-                streamsConfig.getInt(StreamsConfig.REQUEST_TIMEOUT_MS_CONFIG), time);
+                streamsConfig.getInt(StreamsConfig.REQUEST_TIMEOUT_MS_CONFIG), time, true);
     }
 
     public void close() throws IOException {
@@ -127,8 +125,9 @@ public class StreamsKafkaClient {
 
             topicRequestDetails.put(internalTopicConfig.name(), topicDetails);
         }
-        final CreateTopicsRequest createTopicsRequest = new CreateTopicsRequest(topicRequestDetails, streamsConfig.getInt(StreamsConfig.REQUEST_TIMEOUT_MS_CONFIG));
-        final ClientResponse clientResponse = sendRequest(createTopicsRequest, ApiKeys.CREATE_TOPICS);
+        final CreateTopicsRequest.Builder createTopicsRequest =
+                new CreateTopicsRequest.Builder(topicRequestDetails, streamsConfig.getInt(StreamsConfig.REQUEST_TIMEOUT_MS_CONFIG));
+        final ClientResponse clientResponse = sendRequest(createTopicsRequest);
         if (!(clientResponse.responseBody() instanceof CreateTopicsResponse)) {
             throw new StreamsException("Inconsistent response type for internal topic creation request. Expected CreateTopicsResponse but received " + clientResponse.responseBody().getClass().getName());
         }
@@ -167,8 +166,9 @@ public class StreamsKafkaClient {
      */
     private void deleteTopics(final Set<String> topics) {
 
-        final DeleteTopicsRequest deleteTopicsRequest = new DeleteTopicsRequest(topics, streamsConfig.getInt(StreamsConfig.REQUEST_TIMEOUT_MS_CONFIG));
-        final ClientResponse clientResponse = sendRequest(deleteTopicsRequest, ApiKeys.DELETE_TOPICS);
+        final DeleteTopicsRequest.Builder deleteTopicsRequest =
+                new DeleteTopicsRequest.Builder(topics, streamsConfig.getInt(StreamsConfig.REQUEST_TIMEOUT_MS_CONFIG));
+        final ClientResponse clientResponse = sendRequest(deleteTopicsRequest);
         if (!(clientResponse.responseBody() instanceof DeleteTopicsResponse)) {
             throw new StreamsException("Inconsistent response type for internal topic deletion request. Expected DeleteTopicsResponse but received " + clientResponse.responseBody().getClass().getName());
         }
@@ -185,9 +185,8 @@ public class StreamsKafkaClient {
      * Send a request to kafka broker of this client. Keep polling until the corresponding response is received.
      *
      * @param request
-     * @param apiKeys
      */
-    private ClientResponse sendRequest(final AbstractRequest request, final ApiKeys apiKeys) {
+    private ClientResponse sendRequest(final AbstractRequest.Builder<?> request) {
 
         String brokerId = null;
 
@@ -212,9 +211,8 @@ public class StreamsKafkaClient {
             throw new StreamsException("Could not find any available broker.");
         }
 
-        final RequestHeader requestHeader = kafkaClient.nextRequestHeader(apiKeys);
-
-        final ClientRequest clientRequest = new ClientRequest(brokerId, Time.SYSTEM.milliseconds(), true, requestHeader, request, null);
+        final ClientRequest clientRequest = kafkaClient.newClientRequest(
+                brokerId, request, Time.SYSTEM.milliseconds(), true, null);
 
         kafkaClient.send(clientRequest, Time.SYSTEM.milliseconds());
 
@@ -226,7 +224,7 @@ public class StreamsKafkaClient {
                 if (responseList.size() > 1) {
                     throw new StreamsException("Sent one request but received multiple or no responses.");
                 }
-                if (responseList.get(0).requestHeader().equals(requestHeader)) {
+                if (responseList.get(0).requestHeader().correlationId() ==  clientRequest.correlationId()) {
                     return responseList.get(0);
                 } else {
                     throw new StreamsException("Inconsistent response received.");
@@ -244,7 +242,7 @@ public class StreamsKafkaClient {
      */
     public MetadataResponse.TopicMetadata getTopicMetadata(final String topic) {
 
-        final ClientResponse clientResponse = sendRequest(MetadataRequest.allTopics(), ApiKeys.METADATA);
+        final ClientResponse clientResponse = sendRequest(MetadataRequest.Builder.allTopics());
         if (!(clientResponse.responseBody() instanceof MetadataResponse)) {
             throw new StreamsException("Inconsistent response type for internal topic metadata request. Expected MetadataResponse but received " + clientResponse.responseBody().getClass().getName());
         }
@@ -259,7 +257,7 @@ public class StreamsKafkaClient {
 
 
     public Collection<MetadataResponse.TopicMetadata> fetchTopicMetadata() {
-        final ClientResponse clientResponse = sendRequest(MetadataRequest.allTopics(), ApiKeys.METADATA);
+        final ClientResponse clientResponse = sendRequest(MetadataRequest.Builder.allTopics());
         if (!(clientResponse.responseBody() instanceof MetadataResponse)) {
             throw new StreamsException("Inconsistent response type for internal topic metadata request. Expected MetadataResponse but received " + clientResponse.responseBody().getClass().getName());
         }


Mime
View raw message