kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jkr...@apache.org
Subject svn commit: r1163911 - /incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
Date Thu, 01 Sep 2011 05:47:22 GMT
Author: jkreps
Date: Thu Sep  1 05:47:21 2011
New Revision: 1163911

URL: http://svn.apache.org/viewvc?rev=1163911&view=rev
Log:
KAFKA-124 Console consumer does not stop consuming if the program reading from standard out
dies. Check for errors on the output stream and exit if no one is listening.


Modified:
    incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala?rev=1163911&r1=1163910&r2=1163911&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala Thu Sep  1 05:47:21 2011
@@ -136,20 +136,27 @@ object ConsoleConsumer {
         } catch {
           case e =>
             if (skipMessageOnError)
-              logger.error("error processing message, skipping and resume consumption: " + e)
+              logger.error("Error processing message, skipping this message: ", e)
             else
               throw e
         }
+        if(System.out.checkError()) { 
+          // This means no one is listening to our output stream any more, time to shutdown
+          System.err.println("Unable to write to standard out, closing consumer.")
+          formatter.close()
+          connector.shutdown()
+          System.exit(1)
+        }
       }
     } catch {
-      case e => logger.error("error processing message, stop consuming: " + e)
+      case e => logger.error("Error processing message, stopping consumer: ", e)
     }
       
     System.out.flush()
     formatter.close()
     connector.shutdown()
   }
-  
+
   def tryParse(parser: OptionParser, args: Array[String]) = {
     try {
       parser.parse(args : _*)



Mime
View raw message