kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ij...@apache.org
Subject [1/2] kafka git commit: KAFKA-4039; Fix deadlock during shutdown due to log truncation not allowed
Date Thu, 02 Feb 2017 22:24:18 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 76550dd89 -> cb674e548


http://git-wip-us.apache.org/repos/asf/kafka/blob/cb674e54/core/src/test/scala/unit/kafka/KafkaConfigTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/KafkaConfigTest.scala
index 21c13b6..c934c4a 100644
--- a/core/src/test/scala/unit/kafka/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/KafkaConfigTest.scala
@@ -17,42 +17,22 @@
 package kafka
 
 import java.io.{File, FileOutputStream}
-import java.security.Permission
 import java.util
 
 import kafka.server.KafkaConfig
+import kafka.utils.Exit
 import org.apache.kafka.common.config.types.Password
+import org.apache.kafka.common.internals.FatalExitError
 import org.junit.{After, Before, Test}
 import org.junit.Assert._
 
 class KafkaTest {
 
-  val originalSecurityManager: SecurityManager = System.getSecurityManager
-
-  class ExitCalled extends SecurityException {
-  }
-
-  private class NoExitSecurityManager extends SecurityManager {
-    override def checkExit(status: Int): Unit = {
-      throw new ExitCalled
-    }
-
-    override def checkPermission(perm : Permission): Unit = {
-    }
-
-    override def checkPermission(perm : Permission, context: Object): Unit = {
-    }
-  }
-
   @Before
-  def setSecurityManager() : Unit = {
-    System.setSecurityManager(new NoExitSecurityManager)
-  }
+  def setUp(): Unit = Exit.setExitProcedure((status, _) => throw new FatalExitError(status))
 
   @After
-  def setOriginalSecurityManager() : Unit = {
-    System.setSecurityManager(originalSecurityManager)
-  }
+  def tearDown(): Unit = Exit.resetExitProcedure()
 
   @Test
   def testGetKafkaConfigFromArgs(): Unit = {
@@ -77,25 +57,25 @@ class KafkaTest {
     assertEquals(util.Arrays.asList("compact","delete"), config4.logCleanupPolicy)
   }
 
-  @Test(expected = classOf[ExitCalled])
+  @Test(expected = classOf[FatalExitError])
   def testGetKafkaConfigFromArgsWrongSetValue(): Unit = {
     val propertiesFile = prepareDefaultConfig()
     KafkaConfig.fromProps(Kafka.getPropsFromArgs(Array(propertiesFile, "--override", "a=b=c")))
   }
 
-  @Test(expected = classOf[ExitCalled])
+  @Test(expected = classOf[FatalExitError])
   def testGetKafkaConfigFromArgsNonArgsAtTheEnd(): Unit = {
     val propertiesFile = prepareDefaultConfig()
     KafkaConfig.fromProps(Kafka.getPropsFromArgs(Array(propertiesFile, "--override", "broker.id=1",
"broker.id=2")))
   }
 
-  @Test(expected = classOf[ExitCalled])
+  @Test(expected = classOf[FatalExitError])
   def testGetKafkaConfigFromArgsNonArgsOnly(): Unit = {
     val propertiesFile = prepareDefaultConfig()
     KafkaConfig.fromProps(Kafka.getPropsFromArgs(Array(propertiesFile, "broker.id=1", "broker.id=2")))
   }
 
-  @Test(expected = classOf[ExitCalled])
+  @Test(expected = classOf[FatalExitError])
   def testGetKafkaConfigFromArgsNonArgsAtTheBegging(): Unit = {
     val propertiesFile = prepareDefaultConfig()
     KafkaConfig.fromProps(Kafka.getPropsFromArgs(Array(propertiesFile, "broker.id=1", "--override",
"broker.id=2")))

http://git-wip-us.apache.org/repos/asf/kafka/blob/cb674e54/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandArgsTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandArgsTest.scala
b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandArgsTest.scala
index 1685130..3a40cf8 100644
--- a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandArgsTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandArgsTest.scala
@@ -16,8 +16,7 @@
   */
 package kafka.admin
 
-import kafka.utils.CommandLineUtils
-import kafka.utils.CommandLineUtils.ExitPolicy
+import kafka.utils.Exit
 import org.junit.Assert.assertTrue
 import org.junit.{After, Before, Test}
 import org.scalatest.junit.JUnitSuite
@@ -26,14 +25,12 @@ class ReassignPartitionsCommandArgsTest extends JUnitSuite {
 
   @Before
   def setUp() {
-    CommandLineUtils.exitPolicy(new ExitPolicy {
-      override def exit(msg: String): Nothing = throw new IllegalArgumentException(msg)
-    })
+    Exit.setExitProcedure((_, message) => throw new IllegalArgumentException(message.orNull))
   }
 
   @After
   def tearDown() {
-    CommandLineUtils.exitPolicy(CommandLineUtils.DEFAULT_EXIT_POLICY)
+    Exit.resetExitProcedure()
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/cb674e54/core/src/test/scala/unit/kafka/log/OffsetMapTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/OffsetMapTest.scala b/core/src/test/scala/unit/kafka/log/OffsetMapTest.scala
index b079f25..4057562 100644
--- a/core/src/test/scala/unit/kafka/log/OffsetMapTest.scala
+++ b/core/src/test/scala/unit/kafka/log/OffsetMapTest.scala
@@ -18,6 +18,8 @@
 package kafka.log
 
 import java.nio._
+
+import kafka.utils.Exit
 import org.junit._
 import org.scalatest.junit.JUnitSuite
 import org.junit.Assert._
@@ -73,7 +75,7 @@ object OffsetMapTest {
   def main(args: Array[String]) {
     if(args.length != 2) {
       System.err.println("USAGE: java OffsetMapTest size load")
-      System.exit(1)
+      Exit.exit(1)
     }
     val test = new OffsetMapTest()
     val size = args(0).toInt

http://git-wip-us.apache.org/repos/asf/kafka/blob/cb674e54/core/src/test/scala/unit/kafka/server/ApiVersionsTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ApiVersionsTest.scala b/core/src/test/scala/unit/kafka/server/ApiVersionsTest.scala
index e9adca8..981ed6a 100644
--- a/core/src/test/scala/unit/kafka/server/ApiVersionsTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ApiVersionsTest.scala
@@ -15,7 +15,7 @@
   * limitations under the License.
   */
 
-package unit.kafka.server
+package kafka.server
 
 import org.apache.kafka.common.requests.ApiVersionsResponse
 import org.apache.kafka.common.protocol.{ApiKeys, ProtoUtils, Protocol}

http://git-wip-us.apache.org/repos/asf/kafka/blob/cb674e54/core/src/test/scala/unit/kafka/server/BaseReplicaFetchTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/BaseReplicaFetchTest.scala b/core/src/test/scala/unit/kafka/server/BaseReplicaFetchTest.scala
index a8f9fc6..e436b8d 100644
--- a/core/src/test/scala/unit/kafka/server/BaseReplicaFetchTest.scala
+++ b/core/src/test/scala/unit/kafka/server/BaseReplicaFetchTest.scala
@@ -24,7 +24,6 @@ import org.junit.{After, Before, Test}
 import kafka.zk.ZooKeeperTestHarness
 import kafka.utils.TestUtils
 import TestUtils._
-import kafka.common._
 import org.apache.kafka.clients.producer.ProducerRecord
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.serialization.StringSerializer

http://git-wip-us.apache.org/repos/asf/kafka/blob/cb674e54/core/src/test/scala/unit/kafka/utils/ShutdownableThreadTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/ShutdownableThreadTest.scala b/core/src/test/scala/unit/kafka/utils/ShutdownableThreadTest.scala
new file mode 100644
index 0000000..31f7efb
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/utils/ShutdownableThreadTest.scala
@@ -0,0 +1,54 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  * http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+package kafka.utils
+
+import java.util.concurrent.{CountDownLatch, TimeUnit}
+
+import org.apache.kafka.common.internals.FatalExitError
+import org.junit.{After, Test}
+import org.junit.Assert.{assertEquals, assertTrue}
+
+class ShutdownableThreadTest {
+
+  @After
+  def tearDown(): Unit = Exit.resetExitProcedure()
+
+  @Test
+  def testShutdownWhenCalledAfterThreadStart(): Unit = {
+    @volatile var statusCodeOption: Option[Int] = None
+    Exit.setExitProcedure { (statusCode, _) =>
+      statusCodeOption = Some(statusCode)
+      // Sleep until interrupted to emulate the fact that `System.exit()` never returns
+      Thread.sleep(Long.MaxValue)
+      throw new AssertionError
+    }
+    val latch = new CountDownLatch(1)
+    val thread = new ShutdownableThread("shutdownable-thread-test") {
+      override def doWork: Unit = {
+        latch.countDown()
+        throw new FatalExitError
+      }
+    }
+    thread.start()
+    assertTrue("doWork was not invoked", latch.await(10, TimeUnit.SECONDS))
+
+    thread.shutdown()
+    TestUtils.waitUntilTrue(() => statusCodeOption.isDefined, "Status code was not set
by exit procedure")
+    assertEquals(1, statusCodeOption.get)
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/cb674e54/core/src/test/scala/unit/kafka/utils/TestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 98daec1..b0f8a43 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -247,7 +247,7 @@ object TestUtils extends Logging {
     // create topic
     AdminUtils.createTopic(zkUtils, topic, numPartitions, replicationFactor, topicConfig)
     // wait until the update metadata request for new topic reaches all servers
-    (0 until numPartitions).map { case i =>
+    (0 until numPartitions).map { i =>
       TestUtils.waitUntilMetadataIsPropagated(servers, topic, i)
       i -> TestUtils.waitUntilLeaderIsElectedOrChanged(zkUtils, topic, i)
     }.toMap
@@ -876,7 +876,7 @@ object TestUtils extends Logging {
       "Reassigned partition [%s,%d] is unavailable".format(topic, partitionToBeReassigned))
     TestUtils.waitUntilTrue(() => {
         val leaderBroker = servers.filter(s => s.config.brokerId == leader.get).head
-        leaderBroker.replicaManager.underReplicatedPartitionCount() == 0
+        leaderBroker.replicaManager.underReplicatedPartitionCount == 0
       },
       "Reassigned partition [%s,%d] is under-replicated as reported by the leader %d".format(topic,
partitionToBeReassigned, leader.get))
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/cb674e54/streams/src/test/java/org/apache/kafka/streams/tests/ShutdownDeadlockTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/ShutdownDeadlockTest.java
b/streams/src/test/java/org/apache/kafka/streams/tests/ShutdownDeadlockTest.java
index e06e034..5017cde 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/ShutdownDeadlockTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/ShutdownDeadlockTest.java
@@ -21,6 +21,7 @@ import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.Exit;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.kstream.ForeachAction;
@@ -56,7 +57,7 @@ public class ShutdownDeadlockTest {
         streams.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
             @Override
             public void uncaughtException(final Thread t, final Throwable e) {
-                System.exit(-1);
+                Exit.exit(1);
             }
         });
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/cb674e54/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
index 6f29e00..0d905fb 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
@@ -30,6 +30,7 @@ import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.common.utils.Exit;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.test.TestUtils;
 
@@ -169,7 +170,7 @@ public class SmokeTestDriver extends SmokeTestUtil {
                     public void onCompletion(final RecordMetadata metadata, final Exception
exception) {
                         if (exception != null) {
                             exception.printStackTrace();
-                            System.exit(-1);
+                            Exit.exit(1);
                         }
                     }
                 });

http://git-wip-us.apache.org/repos/asf/kafka/blob/cb674e54/tools/src/main/java/org/apache/kafka/tools/ClientCompatibilityTest.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/tools/ClientCompatibilityTest.java b/tools/src/main/java/org/apache/kafka/tools/ClientCompatibilityTest.java
index 6ed97bb..0430c2e 100644
--- a/tools/src/main/java/org/apache/kafka/tools/ClientCompatibilityTest.java
+++ b/tools/src/main/java/org/apache/kafka/tools/ClientCompatibilityTest.java
@@ -39,6 +39,7 @@ import org.apache.kafka.common.errors.RecordTooLargeException;
 import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.utils.Exit;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
 import org.slf4j.Logger;
@@ -127,10 +128,10 @@ public class ClientCompatibilityTest {
         } catch (ArgumentParserException e) {
             if (args.length == 0) {
                 parser.printHelp();
-                System.exit(0);
+                Exit.exit(0);
             } else {
                 parser.handleError(e);
-                System.exit(1);
+                Exit.exit(1);
             }
         }
         TestConfig testConfig = new TestConfig(res);
@@ -140,10 +141,10 @@ public class ClientCompatibilityTest {
         } catch (Throwable t) {
             System.out.printf("FAILED: Caught exception %s\n\n", t.getMessage());
             t.printStackTrace();
-            System.exit(1);
+            Exit.exit(1);
         }
         System.out.println("SUCCESS.");
-        System.exit(0);
+        Exit.exit(0);
     }
 
     private static String toHexString(byte[] buf) {
@@ -345,7 +346,7 @@ public class ClientCompatibilityTest {
             } catch (RuntimeException e) {
                 System.out.println("The second message in this topic was not ours. Please
use a new " +
                     "topic when running this program.");
-                System.exit(1);
+                Exit.exit(1);
             }
         } catch (RecordTooLargeException e) {
             log.debug("Got RecordTooLargeException", e);

http://git-wip-us.apache.org/repos/asf/kafka/blob/cb674e54/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java b/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java
index c277b83..4117090 100644
--- a/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java
+++ b/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java
@@ -36,6 +36,7 @@ import net.sourceforge.argparse4j.ArgumentParsers;
 import net.sourceforge.argparse4j.inf.ArgumentParser;
 import net.sourceforge.argparse4j.inf.ArgumentParserException;
 import net.sourceforge.argparse4j.inf.Namespace;
+import org.apache.kafka.common.utils.Exit;
 import org.apache.kafka.common.utils.Utils;
 
 public class ProducerPerformance {
@@ -129,10 +130,10 @@ public class ProducerPerformance {
         } catch (ArgumentParserException e) {
             if (args.length == 0) {
                 parser.printHelp();
-                System.exit(0);
+                Exit.exit(0);
             } else {
                 parser.handleError(e);
-                System.exit(1);
+                Exit.exit(1);
             }
         }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/cb674e54/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java b/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java
index cd17217..0ffe26f 100644
--- a/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java
+++ b/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java
@@ -40,6 +40,7 @@ import org.apache.kafka.clients.consumer.RoundRobinAssignor;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.utils.Exit;
 import org.apache.kafka.common.utils.Utils;
 
 import java.io.Closeable;
@@ -602,7 +603,7 @@ public class VerifiableConsumer implements Closeable, OffsetCommitCallback,
Cons
         ArgumentParser parser = argParser();
         if (args.length == 0) {
             parser.printHelp();
-            System.exit(0);
+            Exit.exit(0);
         }
 
         try {
@@ -617,7 +618,7 @@ public class VerifiableConsumer implements Closeable, OffsetCommitCallback,
Cons
             consumer.run();
         } catch (ArgumentParserException e) {
             parser.handleError(e);
-            System.exit(1);
+            Exit.exit(1);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/cb674e54/tools/src/main/java/org/apache/kafka/tools/VerifiableLog4jAppender.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/tools/VerifiableLog4jAppender.java b/tools/src/main/java/org/apache/kafka/tools/VerifiableLog4jAppender.java
index faa9698..14d1642 100644
--- a/tools/src/main/java/org/apache/kafka/tools/VerifiableLog4jAppender.java
+++ b/tools/src/main/java/org/apache/kafka/tools/VerifiableLog4jAppender.java
@@ -22,6 +22,7 @@ import net.sourceforge.argparse4j.inf.ArgumentParser;
 import net.sourceforge.argparse4j.inf.ArgumentParserException;
 import net.sourceforge.argparse4j.inf.Namespace;
 import org.apache.kafka.common.protocol.SecurityProtocol;
+import org.apache.kafka.common.utils.Exit;
 import org.apache.log4j.Logger;
 import org.apache.log4j.PropertyConfigurator;
 
@@ -217,10 +218,10 @@ public class VerifiableLog4jAppender {
         } catch (ArgumentParserException e) {
             if (args.length == 0) {
                 parser.printHelp();
-                System.exit(0);
+                Exit.exit(0);
             } else {
                 parser.handleError(e);
-                System.exit(1);
+                Exit.exit(1);
             }
         }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/cb674e54/tools/src/main/java/org/apache/kafka/tools/VerifiableProducer.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/tools/VerifiableProducer.java b/tools/src/main/java/org/apache/kafka/tools/VerifiableProducer.java
index e81eb8f..7617689 100644
--- a/tools/src/main/java/org/apache/kafka/tools/VerifiableProducer.java
+++ b/tools/src/main/java/org/apache/kafka/tools/VerifiableProducer.java
@@ -41,6 +41,7 @@ import net.sourceforge.argparse4j.ArgumentParsers;
 import net.sourceforge.argparse4j.inf.ArgumentParser;
 import net.sourceforge.argparse4j.inf.ArgumentParserException;
 import net.sourceforge.argparse4j.inf.Namespace;
+import org.apache.kafka.common.utils.Exit;
 
 /**
  * Primarily intended for use with system testing, this producer prints metadata
@@ -207,10 +208,10 @@ public class VerifiableProducer {
         } catch (ArgumentParserException e) {
             if (args.length == 0) {
                 parser.printHelp();
-                System.exit(0);
+                Exit.exit(0);
             } else {
                 parser.handleError(e);
-                System.exit(1);
+                Exit.exit(1);
             }
         }
 


Mime
View raw message