kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jun...@apache.org
Subject kafka git commit: kafka-2241; AbstractFetcherThread.shutdown() should not block on ReadableByteChannel.read(buffer); patched by Dong Lin; reviewed by Jun Rao
Date Thu, 16 Jul 2015 23:38:46 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 458ebeb04 -> 8fa24a617


kafka-2241; AbstractFetcherThread.shutdown() should not block on ReadableByteChannel.read(buffer);
patched by Dong Lin; reviewed by Jun Rao


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/8fa24a61
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/8fa24a61
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/8fa24a61

Branch: refs/heads/trunk
Commit: 8fa24a6171781bbdb2b814dcdc37520a92511f69
Parents: 458ebeb
Author: Dong Lin <lindong28@gmail.com>
Authored: Thu Jul 16 16:38:41 2015 -0700
Committer: Jun Rao <junrao@gmail.com>
Committed: Thu Jul 16 16:38:41 2015 -0700

----------------------------------------------------------------------
 .../main/scala/kafka/consumer/SimpleConsumer.scala   | 15 ++++++++++++++-
 .../scala/kafka/server/AbstractFetcherThread.scala   |  4 +++-
 2 files changed, 17 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/8fa24a61/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
index c16f7ed..7ebc040 100644
--- a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
+++ b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
@@ -18,7 +18,7 @@
 package kafka.consumer
 
 
-import java.nio.channels.ClosedByInterruptException
+import java.nio.channels.{AsynchronousCloseException, ClosedByInterruptException}
 
 import kafka.api._
 import kafka.network._
@@ -59,6 +59,16 @@ class SimpleConsumer(val host: String,
     connect()
   }
 
+  /**
+   * Unblock thread by closing channel and triggering AsynchronousCloseException if a read
operation is in progress.
+   *
+   * This handles a bug found in Java 1.7 and below, where interrupting a thread can not
correctly unblock
+   * the thread from waiting on ReadableByteChannel.read().
+   */
+  def disconnectToHandleJavaIOBug() = {
+    disconnect()
+  }
+
   def close() {
     lock synchronized {
       disconnect()
@@ -76,6 +86,9 @@ class SimpleConsumer(val host: String,
       } catch {
         case e : ClosedByInterruptException =>
           throw e
+        // Should not observe this exception when running Kafka with Java 1.8
+        case e: AsynchronousCloseException =>
+          throw e
         case e : Throwable =>
           info("Reconnect due to socket error: %s".format(e.toString))
           // retry once

http://git-wip-us.apache.org/repos/asf/kafka/blob/8fa24a61/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index 83fc474..f843061 100755
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -67,7 +67,9 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke
   def handlePartitionsWithErrors(partitions: Iterable[TopicAndPartition])
 
   override def shutdown(){
-    initiateShutdown()
+    val justShutdown = initiateShutdown()
+    if (justShutdown && isInterruptible)
+      simpleConsumer.disconnectToHandleJavaIOBug()
     inLock(partitionMapLock) {
       partitionMapCond.signalAll()
     }


Mime
View raw message