kafka-commits mailing list archives

From: ij...@apache.org
Subject: [2/2] kafka git commit: KAFKA-4039; Fix deadlock during shutdown due to log truncation not allowed
Date: Thu, 02 Feb 2017 22:24:19 GMT
KAFKA-4039; Fix deadlock during shutdown due to log truncation not allowed

Author: Maysam Yabandeh <myabandeh@dropbox.com>
Author: Ismael Juma <ismael@juma.me.uk>

Reviewers: Jun Rao <junrao@gmail.com>

Closes #2474 from ijuma/kafka-4039-deadlock-during-shutdown


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/cb674e54
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/cb674e54
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/cb674e54

Branch: refs/heads/trunk
Commit: cb674e5487f3f56647546b323dfe4fd45ccf0186
Parents: 76550dd
Author: Maysam Yabandeh <myabandeh@dropbox.com>
Authored: Thu Feb 2 22:22:31 2017 +0000
Committer: Ismael Juma <ismael@juma.me.uk>
Committed: Thu Feb 2 22:23:49 2017 +0000

----------------------------------------------------------------------
 checkstyle/import-control.xml                   |   1 +
 .../kafka/common/internals/FatalExitError.java  |  47 ++++++
 .../org/apache/kafka/common/utils/Exit.java     |  79 ++++++++++
 .../org/apache/kafka/common/utils/Utils.java    |   2 +-
 .../kafka/connect/cli/ConnectDistributed.java   |   3 +-
 .../kafka/connect/cli/ConnectStandalone.java    |   3 +-
 .../runtime/distributed/DistributedHerder.java  |   3 +-
 core/src/main/scala/kafka/Kafka.scala           |  16 +-
 .../src/main/scala/kafka/admin/AclCommand.scala |   2 +-
 .../main/scala/kafka/admin/TopicCommand.scala   |   4 +-
 .../main/scala/kafka/cluster/Partition.scala    |   2 +-
 .../kafka/consumer/ConsumerFetcherManager.scala |   4 +-
 .../kafka/server/AbstractFetcherThread.scala    |   3 +-
 .../src/main/scala/kafka/server/KafkaApis.scala |   7 +-
 .../scala/kafka/server/KafkaHealthcheck.scala   |   1 -
 .../kafka/server/KafkaRequestHandler.scala      |  32 ++--
 .../main/scala/kafka/server/KafkaServer.scala   |  10 +-
 .../kafka/server/KafkaServerStartable.scala     |  27 ++--
 .../scala/kafka/server/OffsetCheckpoint.scala   |   4 +-
 .../kafka/server/ReplicaFetcherManager.scala    |  14 +-
 .../kafka/server/ReplicaFetcherThread.scala     |  20 +--
 .../scala/kafka/server/ReplicaManager.scala     |  25 ++--
 .../scala/kafka/tools/ConsoleConsumer.scala     |  13 +-
 .../scala/kafka/tools/ConsoleProducer.scala     |   8 +-
 .../kafka/tools/ConsumerOffsetChecker.scala     |   2 +-
 .../scala/kafka/tools/EndToEndLatency.scala     |   3 +-
 .../scala/kafka/tools/ExportZkOffsets.scala     |   6 +-
 .../main/scala/kafka/tools/GetOffsetShell.scala |   6 +-
 .../scala/kafka/tools/ImportZkOffsets.scala     |   5 +-
 core/src/main/scala/kafka/tools/JmxTool.scala   |   6 +-
 .../scala/kafka/tools/ProducerPerformance.scala |   7 +-
 .../kafka/tools/ReplicaVerificationTool.scala   |   4 +-
 .../kafka/tools/SimpleConsumerPerformance.scala |   2 +-
 .../scala/kafka/tools/SimpleConsumerShell.scala |  14 +-
 .../kafka/tools/StateChangeLogMerger.scala      |  12 +-
 .../main/scala/kafka/tools/StreamsResetter.java |   3 +-
 .../scala/kafka/tools/UpdateOffsetsInZK.scala   |   4 +-
 .../kafka/tools/VerifyConsumerRebalance.scala   |   5 +-
 .../kafka/tools/ZooKeeperMainWrapper.scala      |   3 +-
 .../scala/kafka/utils/CommandLineUtils.scala    |  20 +--
 core/src/main/scala/kafka/utils/Exit.scala      |  53 +++++++
 .../scala/kafka/utils/ShutdownableThread.scala  |  27 ++--
 .../api/GroupCoordinatorIntegrationTest.scala   |   2 +-
 .../ReplicaFetcherThreadFatalErrorTest.scala    | 145 +++++++++++++++++++
 .../scala/kafka/security/minikdc/MiniKdc.scala  |   4 +-
 .../scala/kafka/tools/TestLogCleaning.scala     |   2 +-
 .../other/kafka/ReplicationQuotasTestRig.scala  |   4 +-
 .../scala/other/kafka/TestKafkaAppender.scala   |   6 +-
 .../other/kafka/TestLinearWriteSpeed.scala      |   2 +-
 .../scala/other/kafka/TestOffsetManager.scala   |   9 +-
 .../test/scala/unit/kafka/KafkaConfigTest.scala |  36 +----
 .../ReassignPartitionsCommandArgsTest.scala     |   9 +-
 .../scala/unit/kafka/log/OffsetMapTest.scala    |   4 +-
 .../unit/kafka/server/ApiVersionsTest.scala     |   2 +-
 .../kafka/server/BaseReplicaFetchTest.scala     |   1 -
 .../kafka/utils/ShutdownableThreadTest.scala    |  54 +++++++
 .../test/scala/unit/kafka/utils/TestUtils.scala |   4 +-
 .../streams/tests/ShutdownDeadlockTest.java     |   3 +-
 .../kafka/streams/tests/SmokeTestDriver.java    |   3 +-
 .../kafka/tools/ClientCompatibilityTest.java    |  11 +-
 .../apache/kafka/tools/ProducerPerformance.java |   5 +-
 .../apache/kafka/tools/VerifiableConsumer.java  |   5 +-
 .../kafka/tools/VerifiableLog4jAppender.java    |   5 +-
 .../apache/kafka/tools/VerifiableProducer.java  |   5 +-
 64 files changed, 602 insertions(+), 231 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/cb674e54/checkstyle/import-control.xml
----------------------------------------------------------------------
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index 04f364c..6c72e63 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -42,6 +42,7 @@
   <allow pkg="org.apache.kafka.common" exact-match="true" />
   <allow pkg="org.apache.kafka.common.security" />
   <allow pkg="org.apache.kafka.common.utils" />
+  <allow pkg="org.apache.kafka.common.errors" exact-match="true" />
 
   <subpackage name="common">
     <disallow pkg="org.apache.kafka.clients" />

http://git-wip-us.apache.org/repos/asf/kafka/blob/cb674e54/clients/src/main/java/org/apache/kafka/common/internals/FatalExitError.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/internals/FatalExitError.java b/clients/src/main/java/org/apache/kafka/common/internals/FatalExitError.java
new file mode 100644
index 0000000..4f23c4e
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/internals/FatalExitError.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.common.internals;
+
+import org.apache.kafka.common.utils.Exit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An error that indicates the need to exit the JVM process. This should only be used by the server or command-line
+ * tools. Clients should never shutdown the JVM process.
+ *
+ * This exception is expected to be caught at the highest level of the thread so that no shared lock is held by
+ * the thread when it calls {@link Exit#exit(int)}.
+ */
+public class FatalExitError extends Error {
+
+    private static final Logger log = LoggerFactory.getLogger(FatalExitError.class);
+
+    private final static long serialVersionUID = 1L;
+
+    private final int statusCode;
+
+    public FatalExitError(int statusCode) {
+        if (statusCode == 0)
+            throw new IllegalArgumentException("statusCode must not be 0");
+        this.statusCode = statusCode;
+    }
+
+    public FatalExitError() {
+        this(1);
+    }
+
+    public int statusCode() {
+        return statusCode;
+    }
+}
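
The intended usage, as the javadoc above notes, is to let the error propagate to the top of the
thread before the process exits. A minimal sketch of the pattern, using a hypothetical worker
rather than Kafka code: the run loop releases its completion latch before calling Exit.exit, so
shutdown paths that wait on the latch are never blocked by the exiting thread.

    import java.util.concurrent.CountDownLatch;

    import org.apache.kafka.common.internals.FatalExitError;
    import org.apache.kafka.common.utils.Exit;

    public class FatalExitWorker implements Runnable {

        private final CountDownLatch shutdownLatch = new CountDownLatch(1);

        @Override
        public void run() {
            try {
                doWork(); // may throw FatalExitError from deep in the call stack
            } catch (FatalExitError e) {
                // No shared lock is held here; signal completion first, then exit.
                shutdownLatch.countDown();
                Exit.exit(e.statusCode());
            }
            shutdownLatch.countDown();
        }

        // Shutdown code awaits the latch instead of Thread.join, so it does not
        // block on a thread that is itself blocked inside System.exit.
        public void awaitShutdown() throws InterruptedException {
            shutdownLatch.await();
        }

        private void doWork() {
            throw new FatalExitError(1); // hypothetical fatal condition
        }
    }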

http://git-wip-us.apache.org/repos/asf/kafka/blob/cb674e54/clients/src/main/java/org/apache/kafka/common/utils/Exit.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Exit.java b/clients/src/main/java/org/apache/kafka/common/utils/Exit.java
new file mode 100644
index 0000000..a1357fd
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/utils/Exit.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.utils;
+
+/**
+ * Internal class that should be used instead of `Exit.exit()` and `Runtime.getRuntime().halt()` so that tests can
+ * easily change the behaviour.
+ */
+public class Exit {
+
+    public interface Procedure {
+        void execute(int statusCode, String message);
+    }
+
+    private static final Procedure DEFAULT_HALT_PROCEDURE = new Procedure() {
+        @Override
+        public void execute(int statusCode, String message) {
+            Runtime.getRuntime().halt(statusCode);
+        }
+    };
+
+    private static final Procedure DEFAULT_EXIT_PROCEDURE = new Procedure() {
+        @Override
+        public void execute(int statusCode, String message) {
+            System.exit(statusCode);
+        }
+    };
+
+    private volatile static Procedure exitProcedure = DEFAULT_EXIT_PROCEDURE;
+    private volatile static Procedure haltProcedure = DEFAULT_HALT_PROCEDURE;
+
+    public static void exit(int statusCode) {
+        exit(statusCode, null);
+    }
+
+    public static void exit(int statusCode, String message) {
+        exitProcedure.execute(statusCode, message);
+    }
+
+    public static void halt(int statusCode) {
+        halt(statusCode, null);
+    }
+
+    public static void halt(int statusCode, String message) {
+        haltProcedure.execute(statusCode, message);
+    }
+
+    public static void setExitProcedure(Procedure procedure) {
+        exitProcedure = procedure;
+    }
+
+    public static void setHaltProcedure(Procedure procedure) {
+        haltProcedure = procedure;
+    }
+
+    public static void resetExitProcedure() {
+        exitProcedure = DEFAULT_EXIT_PROCEDURE;
+    }
+
+    public static void resetHaltProcedure() {
+        haltProcedure = DEFAULT_HALT_PROCEDURE;
+    }
+
+}
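
Because the exit and halt behaviours are swappable Procedure instances, a test can observe an
exit without killing the JVM. A minimal sketch of that use, with an illustrative class name:

    import java.util.concurrent.atomic.AtomicInteger;

    import org.apache.kafka.common.utils.Exit;

    public class ExitProcedureExample {
        public static void main(String[] args) {
            final AtomicInteger captured = new AtomicInteger(Integer.MIN_VALUE);
            Exit.setExitProcedure(new Exit.Procedure() {
                @Override
                public void execute(int statusCode, String message) {
                    captured.set(statusCode); // record the status instead of terminating
                }
            });
            try {
                Exit.exit(1); // code under test calls this on a fatal error
                System.out.println("captured exit status: " + captured.get()); // prints 1
            } finally {
                Exit.resetExitProcedure(); // always restore the real System.exit behaviour
            }
        }
    }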

http://git-wip-us.apache.org/repos/asf/kafka/blob/cb674e54/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
index afa85bd..ab89f6b 100755
--- a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
@@ -546,7 +546,7 @@ public class Utils {
      */
     public static void croak(String message) {
         System.err.println(message);
-        System.exit(1);
+        Exit.exit(1);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/cb674e54/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
index 778673b..f9cf207 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
@@ -17,6 +17,7 @@
 
 package org.apache.kafka.connect.cli;
 
+import org.apache.kafka.common.utils.Exit;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.connect.runtime.Connect;
@@ -52,7 +53,7 @@ public class ConnectDistributed {
     public static void main(String[] args) throws Exception {
         if (args.length < 1) {
             log.info("Usage: ConnectDistributed worker.properties");
-            System.exit(1);
+            Exit.exit(1);
         }
 
         String workerPropsFile = args[0];

http://git-wip-us.apache.org/repos/asf/kafka/blob/cb674e54/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java
index e3be1c5..6cf04c2 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java
@@ -17,6 +17,7 @@
 
 package org.apache.kafka.connect.cli;
 
+import org.apache.kafka.common.utils.Exit;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.connect.runtime.Connect;
@@ -57,7 +58,7 @@ public class ConnectStandalone {
 
         if (args.length < 2) {
             log.info("Usage: ConnectStandalone worker.properties connector1.properties [connector2.properties ...]");
-            System.exit(1);
+            Exit.exit(1);
         }
 
         String workerPropsFile = args[0];

http://git-wip-us.apache.org/repos/asf/kafka/blob/cb674e54/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
index 25dfc6b..1cc9789 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
@@ -20,6 +20,7 @@ package org.apache.kafka.connect.runtime.distributed;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.ConfigValue;
 import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.utils.Exit;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.connect.connector.Connector;
@@ -205,7 +206,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
             log.info("Herder stopped");
         } catch (Throwable t) {
             log.error("Uncaught exception in herder work thread, exiting: ", t);
-            System.exit(1);
+            Exit.exit(1);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/cb674e54/core/src/main/scala/kafka/Kafka.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/Kafka.scala b/core/src/main/scala/kafka/Kafka.scala
index 88508b5..b43a2b7 100755
--- a/core/src/main/scala/kafka/Kafka.scala
+++ b/core/src/main/scala/kafka/Kafka.scala
@@ -21,7 +21,7 @@ import java.util.Properties
 
 import joptsimple.OptionParser
 import kafka.server.{KafkaServer, KafkaServerStartable}
-import kafka.utils.{CommandLineUtils, Logging}
+import kafka.utils.{CommandLineUtils, Exit, Logging}
 import org.apache.kafka.common.utils.Utils
 
 import scala.collection.JavaConverters._
@@ -58,20 +58,18 @@ object Kafka extends Logging {
       val kafkaServerStartable = KafkaServerStartable.fromProps(serverProps)
 
       // attach shutdown handler to catch control-c
-      Runtime.getRuntime().addShutdownHook(new Thread() {
-        override def run() = {
-          kafkaServerStartable.shutdown
-        }
+      Runtime.getRuntime().addShutdownHook(new Thread("kafka-shutdown-hook") {
+        override def run(): Unit = kafkaServerStartable.shutdown()
       })
 
-      kafkaServerStartable.startup
-      kafkaServerStartable.awaitShutdown
+      kafkaServerStartable.startup()
+      kafkaServerStartable.awaitShutdown()
     }
     catch {
       case e: Throwable =>
         fatal(e)
-        System.exit(1)
+        Exit.exit(1)
     }
-    System.exit(0)
+    Exit.exit(0)
   }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/cb674e54/core/src/main/scala/kafka/admin/AclCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/AclCommand.scala b/core/src/main/scala/kafka/admin/AclCommand.scala
index 58c966d..8cb06e8 100644
--- a/core/src/main/scala/kafka/admin/AclCommand.scala
+++ b/core/src/main/scala/kafka/admin/AclCommand.scala
@@ -54,7 +54,7 @@ object AclCommand {
       case e: Throwable =>
         println(s"Error while executing ACL command: ${e.getMessage}")
         println(Utils.stackTrace(e))
-        System.exit(-1)
+        Exit.exit(1)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/cb674e54/core/src/main/scala/kafka/admin/TopicCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/TopicCommand.scala b/core/src/main/scala/kafka/admin/TopicCommand.scala
index 1078ba2..57dfc5a 100755
--- a/core/src/main/scala/kafka/admin/TopicCommand.scala
+++ b/core/src/main/scala/kafka/admin/TopicCommand.scala
@@ -73,7 +73,7 @@ object TopicCommand extends Logging {
         exitCode = 1
     } finally {
       zkUtils.close()
-      System.exit(exitCode)
+      Exit.exit(exitCode)
     }
 
   }
@@ -360,7 +360,7 @@ object TopicCommand extends Logging {
     println("Are you sure you want to continue? [y/n]")
     if (!Console.readLine().equalsIgnoreCase("y")) {
       println("Ending your session")
-      System.exit(0)
+      Exit.exit(0)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/cb674e54/core/src/main/scala/kafka/cluster/Partition.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 46815e1..c2d34d9 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -144,7 +144,7 @@ class Partition(val topic: String,
       } catch {
         case e: IOException =>
           fatal(s"Error deleting the log for partition $topicPartition", e)
-          Runtime.getRuntime.halt(1)
+          Exit.halt(1)
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/cb674e54/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
index 3b054e4..57a97ef 100755
--- a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
@@ -19,7 +19,6 @@ package kafka.consumer
 
 import kafka.server.{AbstractFetcherManager, AbstractFetcherThread, BrokerAndInitialOffset}
 import kafka.cluster.{BrokerEndPoint, Cluster}
-import org.apache.kafka.common.protocol.SecurityProtocol
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.utils.Time
 
@@ -98,7 +97,7 @@ class ConsumerFetcherManager(private val consumerIdString: String,
           topicPartition -> BrokerAndInitialOffset(broker, partitionMap(topicPartition).getFetchOffset())}
         )
       } catch {
-        case t: Throwable => {
+        case t: Throwable =>
           if (!isRunning.get())
             throw t /* If this thread is stopped, propagate this exception to kill the thread. */
           else {
@@ -108,7 +107,6 @@ class ConsumerFetcherManager(private val consumerIdString: String,
             lock.unlock()
           }
         }
-      }
 
       shutdownIdleFetcherThreads()
       Thread.sleep(config.refreshLeaderBackoffMs)

http://git-wip-us.apache.org/repos/asf/kafka/blob/cb674e54/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index ec25700..febe9da 100755
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -36,7 +36,7 @@ import java.util.concurrent.atomic.AtomicLong
 
 import com.yammer.metrics.core.Gauge
 import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.internals.PartitionStates
+import org.apache.kafka.common.internals.{FatalExitError, PartitionStates}
 import org.apache.kafka.common.record.MemoryRecords
 
 /**
@@ -177,6 +177,7 @@ abstract class AbstractFetcherThread(name: String,
                     error("Current offset %d for partition [%s,%d] out of range; reset offset to %d"
                       .format(currentPartitionFetchState.offset, topic, partitionId, newOffset))
                   } catch {
+                    case e: FatalExitError => throw e
                     case e: Throwable =>
                       error("Error getting offset for partition [%s,%d] to broker %d".format(topic, partitionId, sourceBroker.id), e)
                       updatePartitionsWithError(topicPartition)

http://git-wip-us.apache.org/repos/asf/kafka/blob/cb674e54/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 785c9ae..8a2672f 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -34,8 +34,9 @@ import kafka.network._
 import kafka.network.RequestChannel.{Response, Session}
 import kafka.security.auth
 import kafka.security.auth.{Authorizer, ClusterAction, Create, Delete, Describe, Group, Operation, Read, Resource, Write}
-import kafka.utils.{Logging, ZKGroupTopicDirs, ZkUtils}
+import kafka.utils.{Exit, Logging, ZKGroupTopicDirs, ZkUtils}
 import org.apache.kafka.common.errors.{ClusterAuthorizationException, NotLeaderForPartitionException, TopicExistsException, UnknownTopicOrPartitionException, UnsupportedForMessageFormatException}
+import org.apache.kafka.common.internals.FatalExitError
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.protocol.{ApiKeys, Errors, Protocol}
@@ -101,6 +102,7 @@ class KafkaApis(val requestChannel: RequestChannel,
         case requestId => throw new KafkaException("Unknown api code " + requestId)
       }
     } catch {
+      case e: FatalExitError => throw e
       case e: Throwable =>
         if (request.requestObj != null) {
           request.requestObj.handleError(e, requestChannel, request)
@@ -154,9 +156,10 @@ class KafkaApis(val requestChannel: RequestChannel,
 
       requestChannel.sendResponse(new Response(request, leaderAndIsrResponse))
     } catch {
+      case e: FatalExitError => throw e
       case e: KafkaStorageException =>
         fatal("Disk error during leadership change.", e)
-        Runtime.getRuntime.halt(1)
+        Exit.halt(1)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/cb674e54/core/src/main/scala/kafka/server/KafkaHealthcheck.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaHealthcheck.scala b/core/src/main/scala/kafka/server/KafkaHealthcheck.scala
index c7b398f..0b62fca 100644
--- a/core/src/main/scala/kafka/server/KafkaHealthcheck.scala
+++ b/core/src/main/scala/kafka/server/KafkaHealthcheck.scala
@@ -26,7 +26,6 @@ import kafka.cluster.EndPoint
 import kafka.metrics.KafkaMetricsGroup
 import kafka.utils._
 import org.I0Itec.zkclient.IZkStateListener
-import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.protocol.SecurityProtocol
 import org.apache.zookeeper.Watcher.Event.KeeperState
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/cb674e54/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
index 97145b4..c9c31ad 100755
--- a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
+++ b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
@@ -20,9 +20,10 @@ package kafka.server
 import kafka.network._
 import kafka.utils._
 import kafka.metrics.KafkaMetricsGroup
-import java.util.concurrent.TimeUnit
+import java.util.concurrent.{CountDownLatch, TimeUnit}
 
 import com.yammer.metrics.core.Meter
+import org.apache.kafka.common.internals.FatalExitError
 import org.apache.kafka.common.utils.{Time, Utils}
 
 /**
@@ -36,9 +37,10 @@ class KafkaRequestHandler(id: Int,
                           apis: KafkaApis,
                           time: Time) extends Runnable with Logging {
   this.logIdent = "[Kafka Request Handler " + id + " on Broker " + brokerId + "], "
+  private val latch = new CountDownLatch(1)
 
   def run() {
-    while(true) {
+    while (true) {
       try {
         var req : RequestChannel.Request = null
         while (req == null) {
@@ -52,21 +54,27 @@ class KafkaRequestHandler(id: Int,
           aggregateIdleMeter.mark(idleTime / totalHandlerThreads)
         }
 
-        if(req eq RequestChannel.AllDone) {
-          debug("Kafka request handler %d on broker %d received shut down command".format(
-            id, brokerId))
+        if (req eq RequestChannel.AllDone) {
+          debug("Kafka request handler %d on broker %d received shut down command".format(id, brokerId))
+          latch.countDown()
           return
         }
         req.requestDequeueTimeMs = time.milliseconds
         trace("Kafka request handler %d on broker %d handling request %s".format(id, brokerId, req))
         apis.handle(req)
       } catch {
+        case e: FatalExitError =>
+          latch.countDown()
+          Exit.exit(e.statusCode)
         case e: Throwable => error("Exception when handling request", e)
       }
     }
   }
 
-  def shutdown(): Unit = requestChannel.sendRequest(RequestChannel.AllDone)
+  def initiateShutdown(): Unit = requestChannel.sendRequest(RequestChannel.AllDone)
+
+  def awaitShutdown(): Unit = latch.await()
+
 }
 
 class KafkaRequestHandlerPool(val brokerId: Int,
@@ -79,20 +87,18 @@ class KafkaRequestHandlerPool(val brokerId: Int,
   private val aggregateIdleMeter = newMeter("RequestHandlerAvgIdlePercent", "percent", TimeUnit.NANOSECONDS)
 
   this.logIdent = "[Kafka Request Handler on Broker " + brokerId + "], "
-  val threads = new Array[Thread](numThreads)
   val runnables = new Array[KafkaRequestHandler](numThreads)
   for(i <- 0 until numThreads) {
     runnables(i) = new KafkaRequestHandler(i, brokerId, aggregateIdleMeter, numThreads, requestChannel, apis, time)
-    threads(i) = Utils.daemonThread("kafka-request-handler-" + i, runnables(i))
-    threads(i).start()
+    Utils.daemonThread("kafka-request-handler-" + i, runnables(i)).start()
   }
 
   def shutdown() {
     info("shutting down")
-    for(handler <- runnables)
-      handler.shutdown
-    for(thread <- threads)
-      thread.join
+    for (handler <- runnables)
+      handler.initiateShutdown()
+    for (handler <- runnables)
+      handler.awaitShutdown()
     info("shut down completely")
   }
 }
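
The pool's shutdown is now two-phase: send AllDone to every handler, then await every handler's
latch. Awaiting a CountDownLatch rather than Thread.join matters because a handler that hits
FatalExitError counts the latch down before calling Exit.exit; the shutdown sequence triggered by
that exit can then complete instead of joining a thread stuck inside System.exit. A condensed
sketch of the idea, with hypothetical names:

    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.LinkedBlockingQueue;

    class RequestHandlerSketch implements Runnable {
        static final Object ALL_DONE = new Object();

        private final CountDownLatch latch = new CountDownLatch(1);
        private final BlockingQueue<Object> queue = new LinkedBlockingQueue<>();

        @Override
        public void run() {
            try {
                while (true) {
                    Object req = queue.take();
                    if (req == ALL_DONE) {
                        latch.countDown(); // normal shutdown: release waiters, then return
                        return;
                    }
                    // handle(req) would go here; a fatal error would likewise count
                    // the latch down before exiting the process.
                }
            } catch (InterruptedException e) {
                latch.countDown();
            }
        }

        void initiateShutdown() { queue.add(ALL_DONE); }

        void awaitShutdown() throws InterruptedException { latch.await(); }
    }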

http://git-wip-us.apache.org/repos/asf/kafka/blob/cb674e54/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index dcbd3b4..779f489 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -214,8 +214,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
         socketServer.startup()
 
         /* start replica manager */
-        replicaManager = new ReplicaManager(config, metrics, time, zkUtils, kafkaScheduler, logManager,
-          isShuttingDown, quotaManagers.follower)
+        replicaManager = createReplicaManager(isShuttingDown)
         replicaManager.startup()
 
         /* start kafka controller */
@@ -290,12 +289,15 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
     }
   }
 
-  def notifyClusterListeners(clusterListeners: Seq[AnyRef]): Unit = {
+  private def notifyClusterListeners(clusterListeners: Seq[AnyRef]): Unit = {
     val clusterResourceListeners = new ClusterResourceListeners
     clusterResourceListeners.maybeAddAll(clusterListeners.asJava)
     clusterResourceListeners.onUpdate(new ClusterResource(clusterId))
   }
 
+  protected def createReplicaManager(isShuttingDown: AtomicBoolean): ReplicaManager =
+    new ReplicaManager(config, metrics, time, zkUtils, kafkaScheduler, logManager, isShuttingDown, quotaManagers.follower)
+
   private def initZk(): ZkUtils = {
     info(s"Connecting to zookeeper on ${config.zkConnect}")
 
@@ -571,7 +573,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
     try {
       info("shutting down")
 
-      if(isStartingUp.get)
+      if (isStartingUp.get)
         throw new IllegalStateException("Kafka server is still starting up, cannot shut down!")
 
       val canShutdown = isShuttingDown.compareAndSet(false, true)

http://git-wip-us.apache.org/repos/asf/kafka/blob/cb674e54/core/src/main/scala/kafka/server/KafkaServerStartable.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServerStartable.scala b/core/src/main/scala/kafka/server/KafkaServerStartable.scala
index 15406ac..eb290d2 100644
--- a/core/src/main/scala/kafka/server/KafkaServerStartable.scala
+++ b/core/src/main/scala/kafka/server/KafkaServerStartable.scala
@@ -20,7 +20,7 @@ package kafka.server
 import java.util.Properties
 
 import kafka.metrics.KafkaMetricsReporter
-import kafka.utils.{VerifiableProperties, Logging}
+import kafka.utils.{Exit, Logging, VerifiableProperties}
 
 object KafkaServerStartable {
   def fromProps(serverProps: Properties) = {
@@ -35,26 +35,22 @@ class KafkaServerStartable(val serverConfig: KafkaConfig, reporters: Seq[KafkaMe
   def this(serverConfig: KafkaConfig) = this(serverConfig, Seq.empty)
 
   def startup() {
-    try {
-      server.startup()
-    }
+    try server.startup()
     catch {
-      case e: Throwable =>
-        fatal("Fatal error during KafkaServerStartable startup. Prepare to shutdown", e)
-        // KafkaServer already calls shutdown() internally, so this is purely for logging & the exit code
-        System.exit(1)
+      case _: Throwable =>
+        // KafkaServer.startup() calls shutdown() in case of exceptions, so we invoke `exit` to set the status code
+        fatal("Exiting Kafka.")
+        Exit.exit(1)
     }
   }
 
   def shutdown() {
-    try {
-      server.shutdown()
-    }
+    try server.shutdown()
     catch {
-      case e: Throwable =>
-        fatal("Fatal error during KafkaServerStable shutdown. Prepare to halt", e)
+      case _: Throwable =>
+        fatal("Halting Kafka.")
         // Calling exit() can lead to deadlock as exit() can be called multiple times. Force exit.
-        Runtime.getRuntime.halt(1)
+        Exit.halt(1)
     }
   }
 
@@ -66,8 +62,7 @@ class KafkaServerStartable(val serverConfig: KafkaConfig, reporters: Seq[KafkaMe
     server.brokerState.newState(newState)
   }
 
-  def awaitShutdown() = 
-    server.awaitShutdown
+  def awaitShutdown(): Unit = server.awaitShutdown()
 
 }
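
The asymmetry between the two paths is deliberate: a startup failure uses Exit.exit, so shutdown
hooks still run and clean up, while a shutdown failure uses Exit.halt, since shutdown() may itself
be reached from a shutdown hook (see the hook registered in Kafka.scala above) and, as the comment
notes, a nested exit() can deadlock. Runtime.halt terminates the JVM immediately without running
hooks. A small sketch of the observable difference, assuming the new Exit class is on the
classpath:

    import org.apache.kafka.common.utils.Exit;

    public class ExitVsHalt {
        public static void main(String[] args) {
            Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
                @Override
                public void run() {
                    System.out.println("shutdown hook ran");
                }
            }));

            // Exit.exit(1) would run the hook above before terminating.
            Exit.halt(1); // terminates immediately; "shutdown hook ran" is never printed
        }
    }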
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/cb674e54/core/src/main/scala/kafka/server/OffsetCheckpoint.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/OffsetCheckpoint.scala b/core/src/main/scala/kafka/server/OffsetCheckpoint.scala
index c838e09..084ed60 100644
--- a/core/src/main/scala/kafka/server/OffsetCheckpoint.scala
+++ b/core/src/main/scala/kafka/server/OffsetCheckpoint.scala
@@ -22,7 +22,7 @@ import java.util.regex.Pattern
 import org.apache.kafka.common.utils.Utils
 
 import scala.collection._
-import kafka.utils.Logging
+import kafka.utils.{Exit, Logging}
 import kafka.common._
 import java.io._
 
@@ -66,7 +66,7 @@ class OffsetCheckpoint(val file: File) extends Logging {
         case e: FileNotFoundException =>
           if (FileSystems.getDefault.isReadOnly) {
             fatal("Halting writes to offset checkpoint file because the underlying file system is inaccessible : ", e)
-            Runtime.getRuntime.halt(1)
+            Exit.halt(1)
           }
           throw e
       } finally {

http://git-wip-us.apache.org/repos/asf/kafka/blob/cb674e54/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala b/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala
index 3894d9b..a50b0bb 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala
@@ -21,19 +21,15 @@ import kafka.cluster.BrokerEndPoint
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.utils.Time
 
-class ReplicaFetcherManager(brokerConfig: KafkaConfig, replicaMgr: ReplicaManager, metrics: Metrics, time: Time, threadNamePrefix: Option[String] = None, quotaManager: ReplicationQuotaManager)
+class ReplicaFetcherManager(brokerConfig: KafkaConfig, protected val replicaManager: ReplicaManager, metrics: Metrics,
+                            time: Time, threadNamePrefix: Option[String] = None, quotaManager: ReplicationQuotaManager)
       extends AbstractFetcherManager("ReplicaFetcherManager on broker " + brokerConfig.brokerId,
         "Replica", brokerConfig.numReplicaFetchers) {
 
   override def createFetcherThread(fetcherId: Int, sourceBroker: BrokerEndPoint): AbstractFetcherThread = {
-    val threadName = threadNamePrefix match {
-      case None =>
-        "ReplicaFetcherThread-%d-%d".format(fetcherId, sourceBroker.id)
-      case Some(p) =>
-        "%s:ReplicaFetcherThread-%d-%d".format(p, fetcherId, sourceBroker.id)
-    }
-    new ReplicaFetcherThread(threadName, fetcherId, sourceBroker, brokerConfig,
-      replicaMgr, metrics, time, quotaManager)
+    val prefix = threadNamePrefix.map(tp => s"${tp}:").getOrElse("")
+    val threadName = s"${prefix}ReplicaFetcherThread-$fetcherId-${sourceBroker.id}"
+    new ReplicaFetcherThread(threadName, fetcherId, sourceBroker, brokerConfig, replicaManager, metrics, time, quotaManager)
   }
 
   def shutdown() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/cb674e54/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index df640eb..6e6cffa 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -26,13 +26,15 @@ import kafka.log.LogConfig
 import kafka.api.{KAFKA_0_10_0_IV0, KAFKA_0_10_1_IV1, KAFKA_0_10_1_IV2, KAFKA_0_9_0}
 import kafka.common.KafkaStorageException
 import ReplicaFetcherThread._
-import org.apache.kafka.clients.{ClientRequest, ClientResponse, ManualMetadataUpdater, NetworkClient}
-import org.apache.kafka.common.network.{ChannelBuilders, Mode, NetworkReceive, Selectable, Selector}
+import kafka.utils.Exit
+import org.apache.kafka.clients.{ClientResponse, ManualMetadataUpdater, NetworkClient}
+import org.apache.kafka.common.internals.FatalExitError
+import org.apache.kafka.common.network.{ChannelBuilders, NetworkReceive, Selectable, Selector}
 import org.apache.kafka.common.requests.{AbstractRequest, FetchResponse, ListOffsetRequest, ListOffsetResponse}
 import org.apache.kafka.common.requests.{FetchRequest => JFetchRequest}
 import org.apache.kafka.common.{Node, TopicPartition}
 import org.apache.kafka.common.metrics.Metrics
-import org.apache.kafka.common.protocol.{ApiKeys, Errors, ProtoUtils}
+import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.record.MemoryRecords
 import org.apache.kafka.common.security.JaasContext
 import org.apache.kafka.common.utils.Time
@@ -143,7 +145,7 @@ class ReplicaFetcherThread(name: String,
     } catch {
       case e: KafkaStorageException =>
         fatal(s"Disk error while replicating data for $topicPartition", e)
-        Runtime.getRuntime.halt(1)
+        Exit.halt(1)
     }
   }
 
@@ -181,11 +183,11 @@ class ReplicaFetcherThread(name: String,
       // we should never encounter this situation since a non-ISR leader cannot be elected if disallowed by the broker configuration.
       if (!LogConfig.fromProps(brokerConfig.originals, AdminUtils.fetchEntityConfig(replicaMgr.zkUtils,
         ConfigType.Topic, topicPartition.topic)).uncleanLeaderElectionEnable) {
-        // Log a fatal error and shutdown the broker to ensure that data loss does not unexpectedly occur.
-        fatal("Exiting because log truncation is not allowed for partition %s,".format(topicPartition) +
-          " Current leader %d's latest offset %d is less than replica %d's latest offset %d"
-          .format(sourceBroker.id, leaderEndOffset, brokerConfig.brokerId, replica.logEndOffset.messageOffset))
-        System.exit(1)
+        // Log a fatal error and shutdown the broker to ensure that data loss does not occur unexpectedly.
+        fatal(s"Exiting because log truncation is not allowed for partition $topicPartition, current leader " +
+          s"${sourceBroker.id}'s latest offset $leaderEndOffset is less than replica ${brokerConfig.brokerId}'s latest " +
+          s"offset ${replica.logEndOffset.messageOffset}")
+        throw new FatalExitError
       }
 
       warn("Replica %d for partition %s reset its fetch offset from %d to current leader %d's latest offset %d"

http://git-wip-us.apache.org/repos/asf/kafka/blob/cb674e54/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 1aa88a2..475b2ed 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -116,7 +116,7 @@ class ReplicaManager(val config: KafkaConfig,
   private val allPartitions = new Pool[TopicPartition, Partition](valueFactory = Some(tp =>
     new Partition(tp.topic, tp.partition, time, this)))
   private val replicaStateChangeLock = new Object
-  val replicaFetcherManager = new ReplicaFetcherManager(config, this, metrics, time, threadNamePrefix, quotaManager)
+  val replicaFetcherManager = createReplicaFetcherManager(metrics, time, threadNamePrefix, quotaManager)
   private val highWatermarkCheckPointThreadStarted = new AtomicBoolean(false)
   val highWatermarkCheckpoints = config.logDirs.map(dir => (new File(dir).getAbsolutePath, new OffsetCheckpoint(new File(dir, ReplicaManager.HighWatermarkFilename)))).toMap
   private var hwThreadInitialized = false
@@ -134,9 +134,7 @@ class ReplicaManager(val config: KafkaConfig,
   val leaderCount = newGauge(
     "LeaderCount",
     new Gauge[Int] {
-      def value = {
-          getLeaderPartitions().size
-      }
+      def value = getLeaderPartitions.size
     }
   )
   val partitionCount = newGauge(
@@ -148,15 +146,14 @@ class ReplicaManager(val config: KafkaConfig,
   val underReplicatedPartitions = newGauge(
     "UnderReplicatedPartitions",
     new Gauge[Int] {
-      def value = underReplicatedPartitionCount()
+      def value = underReplicatedPartitionCount
     }
   )
   val isrExpandRate = newMeter("IsrExpandsPerSec",  "expands", TimeUnit.SECONDS)
   val isrShrinkRate = newMeter("IsrShrinksPerSec",  "shrinks", TimeUnit.SECONDS)
 
-  def underReplicatedPartitionCount(): Int = {
-      getLeaderPartitions().count(_.isUnderReplicated)
-  }
+  def underReplicatedPartitionCount: Int =
+    getLeaderPartitions.count(_.isUnderReplicated)
 
   def startHighWaterMarksCheckPointThread() = {
     if(highWatermarkCheckPointThreadStarted.compareAndSet(false, true))
@@ -411,7 +408,7 @@ class ReplicaManager(val config: KafkaConfig,
           // it is supposed to indicate un-expected failures of a broker in handling a produce request
           case e: KafkaStorageException =>
             fatal("Halting due to unrecoverable I/O error while handling produce request: ", e)
-            Runtime.getRuntime.halt(1)
+            Exit.halt(1)
             (topicPartition, null)
           case e@ (_: UnknownTopicOrPartitionException |
                    _: NotLeaderForPartitionException |
@@ -928,9 +925,8 @@ class ReplicaManager(val config: KafkaConfig,
     }
   }
 
-  private def getLeaderPartitions(): List[Partition] = {
+  private def getLeaderPartitions: List[Partition] =
     allPartitions.values.filter(_.leaderReplicaIfLocal.isDefined).toList
-  }
 
   def getHighWatermark(topicPartition: TopicPartition): Option[Long] = {
     getPartition(topicPartition).flatMap { partition =>
@@ -949,7 +945,7 @@ class ReplicaManager(val config: KafkaConfig,
       } catch {
         case e: IOException =>
           fatal("Error writing to highwatermark file: ", e)
-          Runtime.getRuntime.halt(1)
+          Exit.halt(1)
       }
     }
   }
@@ -964,4 +960,9 @@ class ReplicaManager(val config: KafkaConfig,
       checkpointHighWatermarks()
     info("Shut down completely")
   }
+
+  protected def createReplicaFetcherManager(metrics: Metrics, time: Time, threadNamePrefix: Option[String], quotaManager: ReplicationQuotaManager) = {
+    new ReplicaFetcherManager(config, this, metrics, time, threadNamePrefix, quotaManager)
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/cb674e54/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
index 71a83b3..2edb88d 100755
--- a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
+++ b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
@@ -51,7 +51,7 @@ object ConsoleConsumer extends Logging {
     } catch {
       case e: Throwable =>
         error("Unknown error when running consumer: ", e)
-        System.exit(1)
+        Exit.exit(1)
     }
   }
 
@@ -89,14 +89,14 @@ object ConsoleConsumer extends Logging {
   def checkZk(config: ConsumerConfig) {
     if (!checkZkPathExists(config.options.valueOf(config.zkConnectOpt), "/brokers/ids")) {
       System.err.println("No brokers found in ZK.")
-      System.exit(1)
+      Exit.exit(1)
     }
 
     if (!config.options.has(config.deleteConsumerOffsetsOpt) && config.options.has(config.resetBeginningOpt) &&
       checkZkPathExists(config.options.valueOf(config.zkConnectOpt), "/consumers/" + config.consumerProps.getProperty("group.id") + "/offsets")) {
       System.err.println("Found previous offset information for this group " + config.consumerProps.getProperty("group.id")
         + ". Please use --delete-consumer-offsets to delete previous offsets metadata")
-      System.exit(1)
+      Exit.exit(1)
     }
   }
 
@@ -177,7 +177,7 @@ object ConsoleConsumer extends Logging {
       checkZkPathExists(config.options.valueOf(config.zkConnectOpt), "/consumers/" + props.getProperty("group.id") + "/offsets")) {
       System.err.println("Found previous offset information for this group " + props.getProperty("group.id")
         + ". Please use --delete-consumer-offsets to delete previous offsets metadata")
-      System.exit(1)
+      Exit.exit(1)
     }
 
     if (config.options.has(config.deleteConsumerOffsetsOpt))
@@ -389,13 +389,12 @@ object ConsoleConsumer extends Logging {
       groupIdPassed = false
     }
 
-    def tryParse(parser: OptionParser, args: Array[String]) = {
+    def tryParse(parser: OptionParser, args: Array[String]): OptionSet = {
       try
         parser.parse(args: _*)
       catch {
         case e: OptionException =>
-          Utils.croak(e.getMessage)
-          null
+          CommandLineUtils.printUsageAndDie(parser, e.getMessage)
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/cb674e54/core/src/main/scala/kafka/tools/ConsoleProducer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ConsoleProducer.scala b/core/src/main/scala/kafka/tools/ConsoleProducer.scala
index 93454d6..6d99d99 100644
--- a/core/src/main/scala/kafka/tools/ConsoleProducer.scala
+++ b/core/src/main/scala/kafka/tools/ConsoleProducer.scala
@@ -20,7 +20,7 @@ package kafka.tools
 import kafka.common._
 import kafka.message._
 import kafka.serializer._
-import kafka.utils.{CommandLineUtils, ToolsUtils}
+import kafka.utils.{CommandLineUtils, Exit, ToolsUtils}
 import kafka.producer.{NewShinyProducer, OldProducer}
 import java.util.Properties
 import java.io._
@@ -62,12 +62,12 @@ object ConsoleProducer {
     } catch {
       case e: joptsimple.OptionException =>
         System.err.println(e.getMessage)
-        System.exit(1)
+        Exit.exit(1)
       case e: Exception =>
         e.printStackTrace
-        System.exit(1)
+        Exit.exit(1)
     }
-    System.exit(0)
+    Exit.exit(0)
   }
 
   def getReaderProps(config: ProducerConfig): Properties = {

http://git-wip-us.apache.org/repos/asf/kafka/blob/cb674e54/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala b/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala
index fa8febc..7cc2ea7 100644
--- a/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala
+++ b/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala
@@ -126,7 +126,7 @@ object ConsumerOffsetChecker extends Logging {
 
     if (options.has("help")) {
        parser.printHelpOn(System.out)
-       System.exit(0)
+       Exit.exit(0)
     }
 
     CommandLineUtils.checkRequiredArgs(parser, options, groupOpt, zkConnectOpt)

http://git-wip-us.apache.org/repos/asf/kafka/blob/cb674e54/core/src/main/scala/kafka/tools/EndToEndLatency.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/EndToEndLatency.scala b/core/src/main/scala/kafka/tools/EndToEndLatency.scala
index b9c9e3e..06b72f8 100755
--- a/core/src/main/scala/kafka/tools/EndToEndLatency.scala
+++ b/core/src/main/scala/kafka/tools/EndToEndLatency.scala
@@ -19,6 +19,7 @@ package kafka.tools
 
 import java.util.{Arrays, Collections, Properties}
 
+import kafka.utils.Exit
 import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
 import org.apache.kafka.clients.producer._
 import org.apache.kafka.common.utils.Utils
@@ -44,7 +45,7 @@ object EndToEndLatency {
   def main(args: Array[String]) {
     if (args.length != 5 && args.length != 6) {
       System.err.println("USAGE: java " + getClass.getName + " broker_list topic num_messages producer_acks message_size_bytes [optional] properties_file")
-      System.exit(1)
+      Exit.exit(1)
     }
 
     val brokerList = args(0)

http://git-wip-us.apache.org/repos/asf/kafka/blob/cb674e54/core/src/main/scala/kafka/tools/ExportZkOffsets.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ExportZkOffsets.scala b/core/src/main/scala/kafka/tools/ExportZkOffsets.scala
index 0b11ec8..9e605ab 100644
--- a/core/src/main/scala/kafka/tools/ExportZkOffsets.scala
+++ b/core/src/main/scala/kafka/tools/ExportZkOffsets.scala
@@ -18,9 +18,11 @@
 package kafka.tools
 
 import java.io.FileWriter
+
 import joptsimple._
-import kafka.utils.{Logging, ZkUtils, ZKGroupTopicDirs, CommandLineUtils}
+import kafka.utils.{CommandLineUtils, Exit, Logging, ZKGroupTopicDirs, ZkUtils}
 import org.apache.kafka.common.security.JaasUtils
+
 import scala.collection.JavaConverters._
 
 
@@ -64,7 +66,7 @@ object ExportZkOffsets extends Logging {
     
     if (options.has("help")) {
        parser.printHelpOn(System.out)
-       System.exit(0)
+       Exit.exit(0)
     }
     
     CommandLineUtils.checkRequiredArgs(parser, options, zkConnectOpt, outFileOpt)

http://git-wip-us.apache.org/repos/asf/kafka/blob/cb674e54/core/src/main/scala/kafka/tools/GetOffsetShell.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/GetOffsetShell.scala b/core/src/main/scala/kafka/tools/GetOffsetShell.scala
index 979354b..9a19b1f 100644
--- a/core/src/main/scala/kafka/tools/GetOffsetShell.scala
+++ b/core/src/main/scala/kafka/tools/GetOffsetShell.scala
@@ -20,10 +20,10 @@ package kafka.tools
 
 import kafka.consumer._
 import joptsimple._
-import kafka.api.{PartitionOffsetRequestInfo, OffsetRequest}
+import kafka.api.{OffsetRequest, PartitionOffsetRequestInfo}
 import kafka.common.TopicAndPartition
 import kafka.client.ClientUtils
-import kafka.utils.{ToolsUtils, CommandLineUtils}
+import kafka.utils.{CommandLineUtils, Exit, ToolsUtils}
 
 
 object GetOffsetShell {
@@ -80,7 +80,7 @@ object GetOffsetShell {
     if(topicsMetadata.size != 1 || !topicsMetadata.head.topic.equals(topic)) {
       System.err.println(("Error: no valid topic metadata for topic: %s, " + " probably the topic does not exist, run ").format(topic) +
         "kafka-list-topic.sh to verify")
-      System.exit(1)
+      Exit.exit(1)
     }
     val partitions =
       if(partitionList == "") {

http://git-wip-us.apache.org/repos/asf/kafka/blob/cb674e54/core/src/main/scala/kafka/tools/ImportZkOffsets.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ImportZkOffsets.scala b/core/src/main/scala/kafka/tools/ImportZkOffsets.scala
index 60d48fa..ded23a4 100644
--- a/core/src/main/scala/kafka/tools/ImportZkOffsets.scala
+++ b/core/src/main/scala/kafka/tools/ImportZkOffsets.scala
@@ -19,8 +19,9 @@ package kafka.tools
 
 import java.io.BufferedReader
 import java.io.FileReader
+
 import joptsimple._
-import kafka.utils.{Logging, ZkUtils, CommandLineUtils}
+import kafka.utils.{CommandLineUtils, Exit, Logging, ZkUtils}
 import org.I0Itec.zkclient.ZkClient
 import org.apache.kafka.common.security.JaasUtils
 
@@ -61,7 +62,7 @@ object ImportZkOffsets extends Logging {
     
     if (options.has("help")) {
        parser.printHelpOn(System.out)
-       System.exit(0)
+       Exit.exit(0)
     }
     
     CommandLineUtils.checkRequiredArgs(parser, options, inFileOpt)

http://git-wip-us.apache.org/repos/asf/kafka/blob/cb674e54/core/src/main/scala/kafka/tools/JmxTool.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/JmxTool.scala b/core/src/main/scala/kafka/tools/JmxTool.scala
index 3538874..a1ceb03 100644
--- a/core/src/main/scala/kafka/tools/JmxTool.scala
+++ b/core/src/main/scala/kafka/tools/JmxTool.scala
@@ -22,11 +22,13 @@ import java.util.Date
 import java.text.SimpleDateFormat
 import javax.management._
 import javax.management.remote._
+
 import joptsimple.OptionParser
+
 import scala.collection.JavaConverters._
 import scala.collection.mutable
 import scala.math._
-import kafka.utils.{CommandLineUtils, Logging}
+import kafka.utils.{CommandLineUtils, Exit, Logging}
 
 object JmxTool extends Logging {
 
@@ -71,7 +73,7 @@ object JmxTool extends Logging {
 
     if(options.has(helpOpt)) {
       parser.printHelpOn(System.out)
-      System.exit(0)
+      Exit.exit(0)
     }
 
     val url = new JMXServiceURL(options.valueOf(jmxServiceUrlOpt))

http://git-wip-us.apache.org/repos/asf/kafka/blob/cb674e54/core/src/main/scala/kafka/tools/ProducerPerformance.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ProducerPerformance.scala b/core/src/main/scala/kafka/tools/ProducerPerformance.scala
index d4c0f34..3808627 100644
--- a/core/src/main/scala/kafka/tools/ProducerPerformance.scala
+++ b/core/src/main/scala/kafka/tools/ProducerPerformance.scala
@@ -18,11 +18,10 @@
 package kafka.tools
 
 import kafka.metrics.KafkaMetricsReporter
-import kafka.producer.{OldProducer, NewShinyProducer}
-import kafka.utils.{ToolsUtils, VerifiableProperties, Logging, CommandLineUtils}
+import kafka.producer.{NewShinyProducer, OldProducer}
+import kafka.utils.{CommandLineUtils, Exit, Logging, ToolsUtils, VerifiableProperties}
 import kafka.message.CompressionCodec
 import kafka.serializer._
-
 import java.util.concurrent.{CountDownLatch, Executors}
 import java.util.concurrent.atomic.AtomicLong
 import java.util._
@@ -67,7 +66,7 @@ object ProducerPerformance extends Logging {
       config.dateFormat.format(startMs), config.dateFormat.format(endMs),
       config.compressionCodec.codec, config.messageSize, config.batchSize, totalMBSent,
       totalMBSent / elapsedSecs, totalMessagesSent.get, totalMessagesSent.get / elapsedSecs))
-    System.exit(0)
+    Exit.exit(0)
   }
 
   class ProducerPerfConfig(args: Array[String]) extends PerfConfig(args) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/cb674e54/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala b/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
index 75d6e8a..7e31ac7 100644
--- a/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
+++ b/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
@@ -136,7 +136,7 @@ object ReplicaVerificationTool extends Logging {
 
     if (filteredTopicMetadata.isEmpty) {
       error("No topics found. " + topicWhiteListOpt + ", if specified, is either filtering out all topics or there is no topic.")
-      System.exit(1)
+      Exit.exit(1)
     }
 
     val topicPartitionReplicaList: Seq[TopicPartitionReplica] = filteredTopicMetadata.flatMap(
@@ -302,7 +302,7 @@ private class ReplicaBuffer(expectedReplicasPerTopicAndPartition: Map[TopicAndPa
                         + ": replica " + messageInfoFromFirstReplica.replicaId + "'s offset "
                         + messageInfoFromFirstReplica.offset + " doesn't match replica "
                         + replicaId + "'s offset " + logEntry.offset)
-                      System.exit(1)
+                      Exit.exit(1)
                     }
                     if (messageInfoFromFirstReplica.checksum != logEntry.record.checksum)
                       println(ReplicaVerificationTool.getCurrentTimeString + ": partition "

http://git-wip-us.apache.org/repos/asf/kafka/blob/cb674e54/core/src/main/scala/kafka/tools/SimpleConsumerPerformance.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/SimpleConsumerPerformance.scala b/core/src/main/scala/kafka/tools/SimpleConsumerPerformance.scala
index 69b6ee8..a925ea8 100644
--- a/core/src/main/scala/kafka/tools/SimpleConsumerPerformance.scala
+++ b/core/src/main/scala/kafka/tools/SimpleConsumerPerformance.scala
@@ -113,7 +113,7 @@ object SimpleConsumerPerformance {
         config.dateFormat.format(reportTime), config.fetchSize, totalMBRead, totalMBRead/elapsed,
         totalMessagesRead, totalMessagesRead/elapsed))
     }
-    System.exit(0)
+    Exit.exit(0)
   }
 
   class ConsumerPerfConfig(args: Array[String]) extends PerfConfig(args) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/cb674e54/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala b/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
index 1ce0289..586fdf2 100755
--- a/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
+++ b/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
@@ -133,7 +133,7 @@ object SimpleConsumerShell extends Logging {
     val topicsMetadata = ClientUtils.fetchTopicMetadata(Set(topic), metadataTargetBrokers, clientId, maxWaitMs).topicsMetadata
     if(topicsMetadata.size != 1 || !topicsMetadata.head.topic.equals(topic)) {
       System.err.println(("Error: no valid topic metadata for topic: %s, " + "what we get from server is only: %s").format(topic, topicsMetadata))
-      System.exit(1)
+      Exit.exit(1)
     }
 
     // validating partition id
@@ -141,7 +141,7 @@ object SimpleConsumerShell extends Logging {
     val partitionMetadataOpt = partitionsMetadata.find(p => p.partitionId == partitionId)
     if (partitionMetadataOpt.isEmpty) {
       System.err.println("Error: partition %d does not exist for topic %s".format(partitionId, topic))
-      System.exit(1)
+      Exit.exit(1)
     }
 
     // validating replica id and initializing target broker
@@ -151,7 +151,7 @@ object SimpleConsumerShell extends Logging {
       replicaOpt = partitionMetadataOpt.get.leader
       if (replicaOpt.isEmpty) {
         System.err.println("Error: user specifies to fetch from leader for partition (%s, %d) which has not been elected yet".format(topic, partitionId))
-        System.exit(1)
+        Exit.exit(1)
       }
     }
     else {
@@ -159,7 +159,7 @@ object SimpleConsumerShell extends Logging {
       replicaOpt = replicasForPartition.find(r => r.id == replicaId)
       if(replicaOpt.isEmpty) {
         System.err.println("Error: replica %d does not exist for partition (%s, %d)".format(replicaId, topic, partitionId))
-        System.exit(1)
+        Exit.exit(1)
       }
     }
     fetchTargetBroker = replicaOpt.get
@@ -167,7 +167,7 @@ object SimpleConsumerShell extends Logging {
     // initializing starting offset
     if(startingOffset < OffsetRequest.EarliestTime) {
       System.err.println("Invalid starting offset: %d".format(startingOffset))
-      System.exit(1)
+      Exit.exit(1)
     }
     if (startingOffset < 0) {
       val simpleConsumer = new SimpleConsumer(fetchTargetBroker.host,
@@ -180,7 +180,7 @@ object SimpleConsumerShell extends Logging {
       } catch {
         case t: Throwable =>
           System.err.println("Error in getting earliest or latest offset due to: " + Utils.stackTrace(t))
-          System.exit(1)
+          Exit.exit(1)
       } finally {
         if (simpleConsumer != null)
           simpleConsumer.close()
@@ -240,7 +240,7 @@ object SimpleConsumerShell extends Logging {
                 System.err.println("Unable to write to standard out, closing consumer.")
                 formatter.close()
                 simpleConsumer.close()
-                System.exit(1)
+                Exit.exit(1)
               }
             }
           }

http://git-wip-us.apache.org/repos/asf/kafka/blob/cb674e54/core/src/main/scala/kafka/tools/StateChangeLogMerger.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/StateChangeLogMerger.scala b/core/src/main/scala/kafka/tools/StateChangeLogMerger.scala
index 1f148de..c8d6710 100755
--- a/core/src/main/scala/kafka/tools/StateChangeLogMerger.scala
+++ b/core/src/main/scala/kafka/tools/StateChangeLogMerger.scala
@@ -18,11 +18,13 @@
 package kafka.tools
 
 import joptsimple._
+
 import scala.util.matching.Regex
 import collection.mutable
 import java.util.Date
 import java.text.SimpleDateFormat
-import kafka.utils.{CoreUtils, Logging, CommandLineUtils}
+
+import kafka.utils.{CommandLineUtils, CoreUtils, Exit, Logging}
 import kafka.common.Topic
 import java.io.{BufferedOutputStream, OutputStream}
 
@@ -92,12 +94,12 @@ object StateChangeLogMerger extends Logging {
     if ((!options.has(filesOpt) && !options.has(regexOpt)) || (options.has(filesOpt) && options.has(regexOpt))) {
       System.err.println("Provide arguments to exactly one of the two options \"" + filesOpt + "\" or \"" + regexOpt + "\"")
       parser.printHelpOn(System.err)
-      System.exit(1)
+      Exit.exit(1)
     }
     if (options.has(partitionsOpt) && !options.has(topicOpt)) {
       System.err.println("The option \"" + topicOpt + "\" needs to be provided an argument when specifying partition ids")
       parser.printHelpOn(System.err)
-      System.exit(1)
+      Exit.exit(1)
     }
 
     // Populate data structures.
@@ -118,7 +120,7 @@ object StateChangeLogMerger extends Logging {
       val duplicatePartitions = CoreUtils.duplicates(partitions)
       if (duplicatePartitions.nonEmpty) {
         System.err.println("The list of partitions contains repeated entries: %s".format(duplicatePartitions.mkString(",")))
-        System.exit(1)
+        Exit.exit(1)
       }
     }
     startDate = dateFormat.parse(options.valueOf(startTimeOpt).replace('\"', ' ').trim)
@@ -193,4 +195,4 @@ object StateChangeLogMerger extends Logging {
     }
   }
 
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/cb674e54/core/src/main/scala/kafka/tools/StreamsResetter.java
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/StreamsResetter.java b/core/src/main/scala/kafka/tools/StreamsResetter.java
index 163ff12..0c9f26e 100644
--- a/core/src/main/scala/kafka/tools/StreamsResetter.java
+++ b/core/src/main/scala/kafka/tools/StreamsResetter.java
@@ -29,6 +29,7 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.annotation.InterfaceStability;
 import org.apache.kafka.common.security.JaasUtils;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.utils.Exit;
 
 import java.io.IOException;
 import java.util.HashSet;
@@ -271,7 +272,7 @@ public class StreamsResetter {
     }
 
     public static void main(final String[] args) {
-        System.exit(new StreamsResetter().run(args));
+        Exit.exit(new StreamsResetter().run(args));
     }
 
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/cb674e54/core/src/main/scala/kafka/tools/UpdateOffsetsInZK.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/UpdateOffsetsInZK.scala b/core/src/main/scala/kafka/tools/UpdateOffsetsInZK.scala
index 8c6a8ba..c599d30 100755
--- a/core/src/main/scala/kafka/tools/UpdateOffsetsInZK.scala
+++ b/core/src/main/scala/kafka/tools/UpdateOffsetsInZK.scala
@@ -21,7 +21,7 @@ import org.I0Itec.zkclient.ZkClient
 import kafka.consumer.{ConsumerConfig, SimpleConsumer}
 import kafka.api.{OffsetRequest, PartitionOffsetRequestInfo}
 import kafka.common.{KafkaException, TopicAndPartition}
-import kafka.utils.{CoreUtils, ZKGroupTopicDirs, ZkUtils}
+import kafka.utils.{CoreUtils, Exit, ZKGroupTopicDirs, ZkUtils}
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.protocol.SecurityProtocol
 import org.apache.kafka.common.security.JaasUtils
@@ -86,6 +86,6 @@ object UpdateOffsetsInZK {
 
   private def usage() = {
     println("USAGE: " + UpdateOffsetsInZK.getClass.getName + " [earliest | latest] consumer.properties topic")
-    System.exit(1)
+    Exit.exit(1)
   }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/cb674e54/core/src/main/scala/kafka/tools/VerifyConsumerRebalance.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/VerifyConsumerRebalance.scala b/core/src/main/scala/kafka/tools/VerifyConsumerRebalance.scala
index a2b4705..0e5d518 100644
--- a/core/src/main/scala/kafka/tools/VerifyConsumerRebalance.scala
+++ b/core/src/main/scala/kafka/tools/VerifyConsumerRebalance.scala
@@ -19,8 +19,7 @@ package kafka.tools
 
 import joptsimple.OptionParser
 import org.apache.kafka.common.security._
-
-import kafka.utils.{Logging, ZKGroupTopicDirs, ZkUtils, CommandLineUtils}
+import kafka.utils.{CommandLineUtils, Exit, Logging, ZKGroupTopicDirs, ZkUtils}
 
 object VerifyConsumerRebalance extends Logging {
   def main(args: Array[String]) {
@@ -39,7 +38,7 @@ object VerifyConsumerRebalance extends Logging {
 
     if (options.has("help")) {
       parser.printHelpOn(System.out)
-      System.exit(0)
+      Exit.exit(0)
     }
 
     CommandLineUtils.checkRequiredArgs(parser, options, groupOpt)

http://git-wip-us.apache.org/repos/asf/kafka/blob/cb674e54/core/src/main/scala/kafka/tools/ZooKeeperMainWrapper.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ZooKeeperMainWrapper.scala b/core/src/main/scala/kafka/tools/ZooKeeperMainWrapper.scala
index 0cbe62c..14d2ceb 100644
--- a/core/src/main/scala/kafka/tools/ZooKeeperMainWrapper.scala
+++ b/core/src/main/scala/kafka/tools/ZooKeeperMainWrapper.scala
@@ -17,12 +17,13 @@
 
 package kafka.tools
 
+import kafka.utils.Exit
 import org.apache.zookeeper.ZooKeeperMain
 
 class ZooKeeperMainWrapper(args: Array[String]) extends ZooKeeperMain(args) {
   def runCmd(): Unit = {
     processCmd(this.cl)
-    System.exit(0)
+    Exit.exit(0)
   }
 }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/cb674e54/core/src/main/scala/kafka/utils/CommandLineUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/CommandLineUtils.scala b/core/src/main/scala/kafka/utils/CommandLineUtils.scala
index edc3621..edf473e 100644
--- a/core/src/main/scala/kafka/utils/CommandLineUtils.scala
+++ b/core/src/main/scala/kafka/utils/CommandLineUtils.scala
@@ -25,21 +25,11 @@ import java.util.Properties
  */
 object CommandLineUtils extends Logging {
 
-   trait ExitPolicy {
-     def exit(msg: String): Nothing
-   }
-
-   val DEFAULT_EXIT_POLICY = new ExitPolicy {
-     override def exit(msg: String): Nothing = sys.exit(1)
-   }
-
-   private var exitPolicy = DEFAULT_EXIT_POLICY
-
   /**
    * Check that all the listed options are present
    */
   def checkRequiredArgs(parser: OptionParser, options: OptionSet, required: OptionSpec[_]*) {
-    for(arg <- required) {
+    for (arg <- required) {
       if(!options.has(arg))
         printUsageAndDie(parser, "Missing required argument \"" + arg + "\"")
     }
@@ -63,11 +53,9 @@ object CommandLineUtils extends Logging {
   def printUsageAndDie(parser: OptionParser, message: String): Nothing = {
     System.err.println(message)
     parser.printHelpOn(System.err)
-    exitPolicy.exit(message)
+    Exit.exit(1, Some(message))
   }
 
-  def exitPolicy(policy: ExitPolicy): Unit = this.exitPolicy = policy
-
   /**
    * Parse key-value pairs in the form key=value
    */
@@ -75,7 +63,7 @@ object CommandLineUtils extends Logging {
     val splits = args.map(_ split "=").filterNot(_.length == 0)
 
     val props = new Properties
-    for(a <- splits) {
+    for (a <- splits) {
       if (a.length == 1) {
         if (acceptMissingValue) props.put(a(0), "")
         else throw new IllegalArgumentException(s"Missing value for key ${a(0)}")
@@ -83,7 +71,7 @@ object CommandLineUtils extends Logging {
       else if (a.length == 2) props.put(a(0), a(1))
       else {
         System.err.println("Invalid command line properties: " + args.mkString(" "))
-        System.exit(1)
+        Exit.exit(1)
       }
     }
     props

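The `ExitPolicy` hook removed above is subsumed by the new `Exit` indirection: `printUsageAndDie` now forwards its message through `Exit.exit(1, Some(message))`, so a test can intercept usage failures for any tool globally instead of configuring each object. A minimal sketch of capturing that message — the `UsageError` exception here is a hypothetical test-only class, not part of this patch:

    import kafka.utils.Exit

    // Hypothetical test-only exception carrying the captured status code and usage message.
    class UsageError(val statusCode: Int, val message: Option[String]) extends RuntimeException

    Exit.setExitProcedure((status, msg) => throw new UsageError(status, msg))
    try {
      // ... run the tool code that ends in CommandLineUtils.printUsageAndDie ...
    } finally {
      Exit.resetExitProcedure()
    }
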
http://git-wip-us.apache.org/repos/asf/kafka/blob/cb674e54/core/src/main/scala/kafka/utils/Exit.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/Exit.scala b/core/src/main/scala/kafka/utils/Exit.scala
new file mode 100644
index 0000000..a4120b7
--- /dev/null
+++ b/core/src/main/scala/kafka/utils/Exit.scala
@@ -0,0 +1,53 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  * http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+package kafka.utils
+
+import org.apache.kafka.common.utils.{Exit => JExit}
+
+/**
+  * Internal class that should be used instead of `System.exit()` and `Runtime.getRuntime().halt()` so that tests can
+  * easily change the behaviour.
+  */
+object Exit {
+
+  def exit(statusCode: Int, message: Option[String] = None): Nothing = {
+    JExit.exit(statusCode, message.orNull)
+    throw new AssertionError("exit should not return, but it did.")
+  }
+
+  def halt(statusCode: Int, message: Option[String] = None): Nothing = {
+    JExit.halt(statusCode, message.orNull)
+    throw new AssertionError("halt should not return, but it did.")
+  }
+
+  def setExitProcedure(exitProcedure: (Int, Option[String]) => Nothing): Unit =
+    JExit.setExitProcedure(functionToProcedure(exitProcedure))
+
+  def setHaltProcedure(haltProcedure: (Int, Option[String]) => Nothing): Unit =
+    JExit.setHaltProcedure(functionToProcedure(haltProcedure))
+
+  def resetExitProcedure(): Unit =
+    JExit.resetExitProcedure()
+
+  def resetHaltProcedure(): Unit =
+    JExit.resetHaltProcedure()
+
+  private def functionToProcedure(procedure: (Int, Option[String]) => Nothing) = new JExit.Procedure {
+    def execute(statusCode: Int, message: String): Unit = procedure(statusCode, Option(message))
+  }
+
+}

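Note the contract encoded by the `Nothing` return type above: a procedure installed via `setExitProcedure` must itself never return normally (in tests, typically by throwing), otherwise the trailing `AssertionError` fires. An illustrative install/reset cycle, using only the API shown in this patch:

    import kafka.utils.Exit

    // Replace JVM termination with an exception so the call site can be observed.
    Exit.setExitProcedure((statusCode, message) =>
      throw new IllegalStateException(s"exit($statusCode): ${message.getOrElse("")}"))
    try {
      Exit.exit(2, Some("boom"))  // throws IllegalStateException instead of exiting
    } finally {
      Exit.resetExitProcedure()   // restore the real System.exit behaviour
    }
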
http://git-wip-us.apache.org/repos/asf/kafka/blob/cb674e54/core/src/main/scala/kafka/utils/ShutdownableThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/ShutdownableThread.scala b/core/src/main/scala/kafka/utils/ShutdownableThread.scala
index dc46797..29e3f51 100644
--- a/core/src/main/scala/kafka/utils/ShutdownableThread.scala
+++ b/core/src/main/scala/kafka/utils/ShutdownableThread.scala
@@ -20,6 +20,8 @@ package kafka.utils
 import java.util.concurrent.atomic.AtomicBoolean
 import java.util.concurrent.CountDownLatch
 
+import org.apache.kafka.common.internals.FatalExitError
+
 abstract class ShutdownableThread(val name: String, val isInterruptible: Boolean = true)
         extends Thread(name) with Logging {
   this.setDaemon(false)
@@ -33,9 +35,8 @@ abstract class ShutdownableThread(val name: String, val isInterruptible: Boolean
   }
 
   def initiateShutdown(): Boolean = {
-    if(isRunning.compareAndSet(true, false)) {
+    if (isRunning.compareAndSet(true, false)) {
       info("Shutting down")
-      isRunning.set(false)
       if (isInterruptible)
         interrupt()
       true
@@ -57,17 +58,21 @@ abstract class ShutdownableThread(val name: String, val isInterruptible: Boolean
   def doWork(): Unit
 
   override def run(): Unit = {
-    info("Starting ")
-    try{
-      while(isRunning.get()){
+    info("Starting")
+    try {
+      while (isRunning.get)
         doWork()
-      }
-    } catch{
+    } catch {
+      case e: FatalExitError =>
+        isRunning.set(false)
+        shutdownLatch.countDown()
+        info("Stopped")
+        Exit.exit(e.statusCode())
       case e: Throwable =>
-        if(isRunning.get())
-          error("Error due to ", e)
+        if (isRunning.get())
+          error("Error due to", e)
     }
     shutdownLatch.countDown()
-    info("Stopped ")
+    info("Stopped")
   }
-}
\ No newline at end of file
+}

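The `FatalExitError` branch above is the core of the deadlock fix: the shutdown latch is counted down before `Exit.exit` runs, so a shutdown hook blocked waiting on the thread's latch is released rather than deadlocking against the exiting thread. A sketch of a worker escalating a fatal condition this way — the class and health-check predicate below are illustrative, not from this patch:

    import kafka.utils.ShutdownableThread
    import org.apache.kafka.common.internals.FatalExitError

    // Illustrative worker: an unrecoverable condition is escalated as FatalExitError,
    // which run() turns into a latch countdown followed by Exit.exit(statusCode).
    class FetcherLikeThread extends ShutdownableThread("fetcher-like") {
      override def doWork(): Unit = {
        if (!stillHealthy())        // hypothetical health check
          throw new FatalExitError  // fatal: shut the whole process down cleanly
        // ... normal per-iteration work ...
      }
      private def stillHealthy(): Boolean = true
    }
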
http://git-wip-us.apache.org/repos/asf/kafka/blob/cb674e54/core/src/test/scala/integration/kafka/api/GroupCoordinatorIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/GroupCoordinatorIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/GroupCoordinatorIntegrationTest.scala
index 6381447..083dbf0 100644
--- a/core/src/test/scala/integration/kafka/api/GroupCoordinatorIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/GroupCoordinatorIntegrationTest.scala
@@ -10,7 +10,7 @@
  * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
  * specific language governing permissions and limitations under the License.
  */
-package integration.kafka.api
+package kafka.api
 
 import kafka.common.Topic
 import kafka.integration.KafkaServerTestHarness

http://git-wip-us.apache.org/repos/asf/kafka/blob/cb674e54/core/src/test/scala/integration/kafka/server/ReplicaFetcherThreadFatalErrorTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/server/ReplicaFetcherThreadFatalErrorTest.scala b/core/src/test/scala/integration/kafka/server/ReplicaFetcherThreadFatalErrorTest.scala
new file mode 100644
index 0000000..3cab534
--- /dev/null
+++ b/core/src/test/scala/integration/kafka/server/ReplicaFetcherThreadFatalErrorTest.scala
@@ -0,0 +1,145 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *    http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package kafka.server
+
+import java.util.concurrent.atomic.AtomicBoolean
+
+import kafka.admin.AdminUtils
+import kafka.cluster.BrokerEndPoint
+import kafka.server.ReplicaFetcherThread.{FetchRequest, PartitionData}
+import kafka.utils.{Exit, TestUtils, ZkUtils}
+import kafka.utils.TestUtils.createBrokerConfigs
+import kafka.zk.ZooKeeperTestHarness
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.internals.FatalExitError
+import org.apache.kafka.common.metrics.Metrics
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.requests.FetchResponse
+import org.apache.kafka.common.utils.Time
+import org.junit.{After, Test}
+
+import scala.collection.Map
+import scala.collection.JavaConverters._
+import scala.concurrent.Future
+
+class ReplicaFetcherThreadFatalErrorTest extends ZooKeeperTestHarness {
+
+  private var brokers: Seq[KafkaServer] = null
+  @volatile private var shutdownCompleted = false
+
+  @After
+  override def tearDown() {
+    Exit.resetExitProcedure()
+    brokers.foreach(_.shutdown())
+    super.tearDown()
+  }
+
+  /**
+    * Verifies that a follower shuts down if the offset for an `added partition` is out of range and if a fatal
+    * exception is thrown from `handleOffsetOutOfRange`. It's a bit tricky to ensure that there are no deadlocks
+    * when the shutdown hook is invoked and hence this test.
+    */
+  @Test
+  def testFatalErrorInAddPartitions(): Unit = {
+
+    // Unlike `TestUtils.createTopic`, this doesn't wait for metadata propagation as the broker shuts down before
+    // the metadata is propagated.
+    def createTopic(zkUtils: ZkUtils, topic: String): Unit = {
+      AdminUtils.createTopic(zkUtils, topic, partitions = 1, replicationFactor = 2)
+      TestUtils.waitUntilLeaderIsElectedOrChanged(zkUtils, topic, 0)
+    }
+
+    val props = createBrokerConfigs(2, zkConnect)
+    brokers = props.map(KafkaConfig.fromProps).map(config => createServer(config, { params =>
+      import params._
+      new ReplicaFetcherThread(threadName, fetcherId, sourceBroker, config, replicaManager, metrics, time, quotaManager) {
+        override def handleOffsetOutOfRange(topicPartition: TopicPartition): Long = throw new FatalExitError
+        override def addPartitions(partitionAndOffsets: Map[TopicPartition, Long]): Unit =
+          super.addPartitions(partitionAndOffsets.mapValues(_ => -1))
+      }
+    }))
+    createTopic(zkUtils, "topic")
+    TestUtils.waitUntilTrue(() => shutdownCompleted, "Shutdown of follower did not complete")
+  }
+
+  /**
+    * Verifies that a follower shuts down if the offset of a partition in the fetch response is out of range and a
+    * fatal exception is thrown from `handleOffsetOutOfRange`. Ensuring that there are no deadlocks when the
+    * shutdown hook is invoked is tricky, hence this test.
+    */
+  @Test
+  def testFatalErrorInProcessFetchRequest(): Unit = {
+    val props = createBrokerConfigs(2, zkConnect)
+    brokers = props.map(KafkaConfig.fromProps).map(config => createServer(config, { params =>
+      import params._
+      new ReplicaFetcherThread(threadName, fetcherId, sourceBroker, config, replicaManager, metrics, time, quotaManager) {
+        override def handleOffsetOutOfRange(topicPartition: TopicPartition): Long = throw new FatalExitError
+        override protected def fetch(fetchRequest: FetchRequest): Seq[(TopicPartition, PartitionData)] = {
+          fetchRequest.underlying.fetchData.asScala.keys.toSeq.map { tp =>
+            (tp, new PartitionData(new FetchResponse.PartitionData(Errors.OFFSET_OUT_OF_RANGE.code, -1, null)))
+          }
+        }
+      }
+    }))
+    TestUtils.createTopic(zkUtils, "topic", numPartitions = 1, replicationFactor = 2, servers = brokers)
+    TestUtils.waitUntilTrue(() => shutdownCompleted, "Shutdown of follower did not complete")
+  }
+
+  private case class FetcherThreadParams(threadName: String, fetcherId: Int, sourceBroker: BrokerEndPoint,
+                                         replicaManager: ReplicaManager, metrics: Metrics, time: Time,
+                                         quotaManager: ReplicationQuotaManager)
+
+  private def createServer(config: KafkaConfig, fetcherThread: FetcherThreadParams => ReplicaFetcherThread): KafkaServer = {
+    val time = Time.SYSTEM
+    val server = new KafkaServer(config, time) {
+
+      override def createReplicaManager(isShuttingDown: AtomicBoolean): ReplicaManager = {
+        new ReplicaManager(config, metrics, time, zkUtils, kafkaScheduler, logManager, isShuttingDown,
+          quotaManagers.follower) {
+
+          override protected def createReplicaFetcherManager(metrics: Metrics, time: Time, threadNamePrefix: Option[String],
+                                                             quotaManager: ReplicationQuotaManager) =
+            new ReplicaFetcherManager(config, this, metrics, time, threadNamePrefix, quotaManager) {
+              override def createFetcherThread(fetcherId: Int, sourceBroker: BrokerEndPoint): AbstractFetcherThread = {
+                val prefix = threadNamePrefix.map(tp => s"${tp}:").getOrElse("")
+                val threadName = s"${prefix}ReplicaFetcherThread-$fetcherId-${sourceBroker.id}"
+                fetcherThread(new FetcherThreadParams(threadName, fetcherId, sourceBroker, replicaManager, metrics,
+                  time, quotaManager))
+              }
+            }
+        }
+      }
+
+    }
+
+    Exit.setExitProcedure { (_, _) =>
+      import scala.concurrent.ExecutionContext.Implicits._
+      // Run in a separate thread like shutdown hooks
+      Future {
+        server.shutdown()
+        shutdownCompleted = true
+      }
+      // Sleep until interrupted to emulate the fact that `System.exit()` never returns
+      Thread.sleep(Long.MaxValue)
+      throw new AssertionError
+    }
+    server.startup()
+    server
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/cb674e54/core/src/test/scala/kafka/security/minikdc/MiniKdc.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/kafka/security/minikdc/MiniKdc.scala b/core/src/test/scala/kafka/security/minikdc/MiniKdc.scala
index cebfb04..0894b34 100644
--- a/core/src/test/scala/kafka/security/minikdc/MiniKdc.scala
+++ b/core/src/test/scala/kafka/security/minikdc/MiniKdc.scala
@@ -25,7 +25,7 @@ import java.nio.file.Files
 import java.text.MessageFormat
 import java.util.{Locale, Properties, UUID}
 
-import kafka.utils.{CoreUtils, Logging}
+import kafka.utils.{CoreUtils, Exit, Logging}
 
 import scala.collection.JavaConverters._
 import org.apache.commons.io.IOUtils
@@ -355,7 +355,7 @@ object MiniKdc {
         start(workDir, config, keytabFile, principals)
       case _ =>
         println("Arguments: <WORKDIR> <MINIKDCPROPERTIES> <KEYTABFILE> [<PRINCIPALS>]+")
-        sys.exit(1)
+        Exit.exit(1)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/cb674e54/core/src/test/scala/kafka/tools/TestLogCleaning.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/kafka/tools/TestLogCleaning.scala b/core/src/test/scala/kafka/tools/TestLogCleaning.scala
index d837885..5d889d5 100755
--- a/core/src/test/scala/kafka/tools/TestLogCleaning.scala
+++ b/core/src/test/scala/kafka/tools/TestLogCleaning.scala
@@ -100,7 +100,7 @@ object TestLogCleaning {
 
     if(options.has(dumpOpt)) {
       dumpLog(new File(options.valueOf(dumpOpt)))
-      System.exit(0)
+      Exit.exit(0)
     }
 
     CommandLineUtils.checkRequiredArgs(parser, options, brokerOpt, zkConnectOpt, numMessagesOpt)

http://git-wip-us.apache.org/repos/asf/kafka/blob/cb674e54/core/src/test/scala/other/kafka/ReplicationQuotasTestRig.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/other/kafka/ReplicationQuotasTestRig.scala b/core/src/test/scala/other/kafka/ReplicationQuotasTestRig.scala
index cb49162..71a5091 100644
--- a/core/src/test/scala/other/kafka/ReplicationQuotasTestRig.scala
+++ b/core/src/test/scala/other/kafka/ReplicationQuotasTestRig.scala
@@ -26,7 +26,7 @@ import org.apache.kafka.common.TopicPartition
 import kafka.server.{KafkaConfig, KafkaServer, QuotaType}
 import kafka.utils.TestUtils._
 import kafka.utils.ZkUtils._
-import kafka.utils.{CoreUtils, Logging, TestUtils, ZkUtils}
+import kafka.utils.{CoreUtils, Exit, Logging, TestUtils, ZkUtils}
 import kafka.zk.ZooKeeperTestHarness
 import org.apache.kafka.clients.producer.ProducerRecord
 import org.jfree.chart.plot.PlotOrientation
@@ -70,7 +70,7 @@ object ReplicationQuotasTestRig {
     experiments.foreach(run(_, journal, displayChartsOnScreen))
 
     if (!displayChartsOnScreen)
-      System.exit(0)
+      Exit.exit(0)
   }
 
   def run(config: ExperimentDef, journal: Journal, displayChartsOnScreen: Boolean) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/cb674e54/core/src/test/scala/other/kafka/TestKafkaAppender.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/other/kafka/TestKafkaAppender.scala b/core/src/test/scala/other/kafka/TestKafkaAppender.scala
index 72c7f28..ecdcac0 100644
--- a/core/src/test/scala/other/kafka/TestKafkaAppender.scala
+++ b/core/src/test/scala/other/kafka/TestKafkaAppender.scala
@@ -18,7 +18,7 @@
 package kafka
 
 import org.apache.log4j.PropertyConfigurator
-import kafka.utils.Logging
+import kafka.utils.{Exit, Logging}
 import serializer.Encoder
 
 object TestKafkaAppender extends Logging {
@@ -27,7 +27,7 @@ object TestKafkaAppender extends Logging {
     
     if(args.length < 1) {
       println("USAGE: " + TestKafkaAppender.getClass.getName + " log4j_config")
-      System.exit(1)
+      Exit.exit(1)
     }
 
     try {
@@ -36,7 +36,7 @@ object TestKafkaAppender extends Logging {
       case e: Exception =>
         System.err.println("KafkaAppender could not be initialized ! Exiting..")
         e.printStackTrace()
-        System.exit(1)
+        Exit.exit(1)
     }
 
     for (_ <- 1 to 10)

http://git-wip-us.apache.org/repos/asf/kafka/blob/cb674e54/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala b/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala
index f0883ad..90236bb 100755
--- a/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala
+++ b/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala
@@ -121,7 +121,7 @@ object TestLinearWriteSpeed {
         writables(i) = new LogWritable(new File(dir, "kafka-test-" + i), new LogConfig(logProperties), scheduler, messageSet)
       } else {
         System.err.println("Must specify what to write to with one of --log, --channel, or --mmap") 
-        System.exit(1)
+        Exit.exit(1)
       }
     }
     bytesToWrite = (bytesToWrite / numFiles) * numFiles

http://git-wip-us.apache.org/repos/asf/kafka/blob/cb674e54/core/src/test/scala/other/kafka/TestOffsetManager.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/other/kafka/TestOffsetManager.scala b/core/src/test/scala/other/kafka/TestOffsetManager.scala
index 9db2ffd..324b440 100644
--- a/core/src/test/scala/other/kafka/TestOffsetManager.scala
+++ b/core/src/test/scala/other/kafka/TestOffsetManager.scala
@@ -18,16 +18,19 @@
 package other.kafka
 
 import kafka.api._
-import kafka.utils.{ZkUtils, ShutdownableThread}
+import kafka.utils.{Exit, ShutdownableThread, ZkUtils}
 import org.apache.kafka.common.protocol.Errors
+
 import scala.collection._
 import kafka.client.ClientUtils
 import joptsimple.OptionParser
 import kafka.common.{OffsetAndMetadata, TopicAndPartition}
 import kafka.network.BlockingChannel
+
 import scala.util.Random
 import java.io.IOException
-import kafka.metrics.{KafkaTimer, KafkaMetricsGroup}
+
+import kafka.metrics.{KafkaMetricsGroup, KafkaTimer}
 import java.util.concurrent.TimeUnit
 import java.util.concurrent.atomic.AtomicInteger
 import java.nio.channels.ClosedByInterruptException
@@ -237,7 +240,7 @@ object TestOffsetManager {
 
     if (options.has(helpOpt)) {
       parser.printHelpOn(System.out)
-      System.exit(0)
+      Exit.exit(0)
     }
 
     val commitIntervalMs = options.valueOf(commitIntervalOpt).intValue()

