kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jun...@apache.org
Subject kafka git commit: KAFKA-2576; ConsumerPerformance hangs when SSL enabled for Multi-Partition Topic
Date Thu, 24 Sep 2015 16:25:18 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk b9ceac3ac -> 54dd6e3ad


KAFKA-2576; ConsumerPerformance hangs when SSL enabled for Multi-Partition Topic

We now write to the channel with an empty buffer when
there are pending bytes remaining and all data has been
sent.

Author: Ismael Juma <ismael@juma.me.uk>

Reviewers: Sriharsha Chintalapani <schintalapani@hortonworks.com>, Ben Stopford <benstopford@gmail.com>,
Jun Rao <junrao@gmail.com>

Closes #236 from ijuma/kafka-2576-ssl-multi-partition-topic-hang


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/54dd6e3a
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/54dd6e3a
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/54dd6e3a

Branch: refs/heads/trunk
Commit: 54dd6e3ad7038ce4a32108b44153ec81caff2c68
Parents: b9ceac3
Author: Ismael Juma <ismael@juma.me.uk>
Authored: Thu Sep 24 09:25:11 2015 -0700
Committer: Jun Rao <junrao@gmail.com>
Committed: Thu Sep 24 09:25:11 2015 -0700

----------------------------------------------------------------------
 core/src/main/scala/kafka/api/ApiUtils.scala    |  9 +++-
 .../main/scala/kafka/api/FetchResponse.scala    | 56 ++++++++++++--------
 2 files changed, 43 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/54dd6e3a/core/src/main/scala/kafka/api/ApiUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/ApiUtils.scala b/core/src/main/scala/kafka/api/ApiUtils.scala
index 1f80de1..ca0a63f 100644
--- a/core/src/main/scala/kafka/api/ApiUtils.scala
+++ b/core/src/main/scala/kafka/api/ApiUtils.scala
@@ -14,10 +14,12 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
- package kafka.api
+package kafka.api
 
 import java.nio._
+import java.nio.channels.GatheringByteChannel
 import kafka.common._
+import org.apache.kafka.common.network.TransportLayer
 
 /**
  * Helper functions specific to parsing or serializing requests and responses
@@ -107,5 +109,10 @@ object ApiUtils {
       throw new KafkaException(name + " has value " + value + " which is not in the range " + range + ".")
     else value
   }
+
+  private[api] def hasPendingWrites(channel: GatheringByteChannel): Boolean = channel match {
+    case t: TransportLayer => t.hasPendingWrites
+    case _ => false
+  }
   
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/54dd6e3a/core/src/main/scala/kafka/api/FetchResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/FetchResponse.scala b/core/src/main/scala/kafka/api/FetchResponse.scala
index 2c07033..aa15612 100644
--- a/core/src/main/scala/kafka/api/FetchResponse.scala
+++ b/core/src/main/scala/kafka/api/FetchResponse.scala
@@ -24,9 +24,7 @@ import kafka.common.{TopicAndPartition, ErrorMapping}
 import kafka.message.{MessageSet, ByteBufferMessageSet}
 import kafka.api.ApiUtils._
 import org.apache.kafka.common.KafkaException
-import org.apache.kafka.common.network.TransportLayer
-import org.apache.kafka.common.network.Send
-import org.apache.kafka.common.network.MultiSend
+import org.apache.kafka.common.network.{SSLTransportLayer, TransportLayer, Send, MultiSend}
 
 import scala.collection._
 
@@ -55,6 +53,7 @@ case class FetchResponsePartitionData(error: Short = ErrorMapping.NoError, hw: L
 
 class PartitionDataSend(val partitionId: Int,
                         val partitionData: FetchResponsePartitionData) extends Send {
+  private val emptyBuffer = ByteBuffer.allocate(0)
   private val messageSize = partitionData.messages.sizeInBytes
   private var messagesSentSize = 0
   private var pending = false
@@ -71,15 +70,20 @@ class PartitionDataSend(val partitionId: Int,
 
   override def writeTo(channel: GatheringByteChannel): Long = {
     var written = 0L
-    if(buffer.hasRemaining)
+    if (buffer.hasRemaining)
       written += channel.write(buffer)
-    if (!buffer.hasRemaining && messagesSentSize < messageSize) {
-      val bytesSent = partitionData.messages.writeTo(channel, messagesSentSize, messageSize - messagesSentSize)
-      messagesSentSize += bytesSent
-      written += bytesSent
+    if (!buffer.hasRemaining) {
+      if (messagesSentSize < messageSize) {
+        val bytesSent = partitionData.messages.writeTo(channel, messagesSentSize, messageSize - messagesSentSize)
+        messagesSentSize += bytesSent
+        written += bytesSent
+      }
+      if (messagesSentSize >= messageSize && hasPendingWrites(channel))
+        channel.write(emptyBuffer)
     }
-    if (channel.isInstanceOf[TransportLayer])
-      pending = channel.asInstanceOf[TransportLayer].hasPendingWrites
+
+    pending = hasPendingWrites(channel)
+
     written
   }
 
@@ -112,6 +116,8 @@ case class TopicData(topic: String, partitionData: Map[Int, FetchResponsePartiti
 
 class TopicDataSend(val dest: String, val topicData: TopicData) extends Send {
 
+  private val emptyBuffer = ByteBuffer.allocate(0)
+
   private var sent = 0L
 
   private var pending = false
@@ -135,14 +141,16 @@ class TopicDataSend(val dest: String, val topicData: TopicData) extends Send {
       throw new KafkaException("This operation cannot be completed on a complete request.")
 
     var written = 0L
-    if(buffer.hasRemaining)
+    if (buffer.hasRemaining)
       written += channel.write(buffer)
-    if(!buffer.hasRemaining && !sends.completed) {
-      written += sends.writeTo(channel)
+    if (!buffer.hasRemaining) {
+      if (!sends.completed)
+        written += sends.writeTo(channel)
+      if (sends.completed && hasPendingWrites(channel))
+        written += channel.write(emptyBuffer)
     }
 
-    if (channel.isInstanceOf[TransportLayer])
-      pending = channel.asInstanceOf[TransportLayer].hasPendingWrites
+    pending = hasPendingWrites(channel)
 
     sent += written
     written
@@ -245,6 +253,9 @@ case class FetchResponse(correlationId: Int,
 
 
 class FetchResponseSend(val dest: String, val fetchResponse: FetchResponse) extends Send {
+
+  private val emptyBuffer = ByteBuffer.allocate(0)
+
   private val payloadSize = fetchResponse.sizeInBytes
 
   private var sent = 0L
@@ -272,15 +283,18 @@ class FetchResponseSend(val dest: String, val fetchResponse: FetchResponse) exte
       throw new KafkaException("This operation cannot be completed on a complete request.")
 
     var written = 0L
-    if(buffer.hasRemaining)
+
+    if (buffer.hasRemaining)
       written += channel.write(buffer)
-    if(!buffer.hasRemaining && !sends.completed) {
-      written += sends.writeTo(channel)
+    if (!buffer.hasRemaining) {
+      if (!sends.completed)
+        written += sends.writeTo(channel)
+      if (sends.completed && hasPendingWrites(channel))
+        written += channel.write(emptyBuffer)
     }
-    sent += written
 
-    if (channel.isInstanceOf[TransportLayer])
-      pending = channel.asInstanceOf[TransportLayer].hasPendingWrites
+    sent += written
+    pending = hasPendingWrites(channel)
 
     written
   }


Mime
View raw message