From commits-return-7823-apmail-kafka-commits-archive=kafka.apache.org@kafka.apache.org Thu Sep 14 20:39:54 2017 Return-Path: X-Original-To: apmail-kafka-commits-archive@www.apache.org Delivered-To: apmail-kafka-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 5D1E01A2EC for ; Thu, 14 Sep 2017 20:39:54 +0000 (UTC) Received: (qmail 46672 invoked by uid 500); 14 Sep 2017 20:39:54 -0000 Delivered-To: apmail-kafka-commits-archive@kafka.apache.org Received: (qmail 46633 invoked by uid 500); 14 Sep 2017 20:39:53 -0000 Mailing-List: contact commits-help@kafka.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@kafka.apache.org Delivered-To: mailing list commits@kafka.apache.org Received: (qmail 46624 invoked by uid 99); 14 Sep 2017 20:39:53 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 14 Sep 2017 20:39:53 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 04367F554E; Thu, 14 Sep 2017 20:39:51 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: jgus@apache.org To: commits@kafka.apache.org Message-Id: <2219731f120a41c5abf2ac48f24a6745@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: kafka git commit: MINOR: Remove unused SecurityProtocol.TRACE Date: Thu, 14 Sep 2017 20:39:51 +0000 (UTC) Repository: kafka Updated Branches: refs/heads/trunk 2656659e0 -> dfd625daa MINOR: Remove unused SecurityProtocol.TRACE It adds complexity for no benefit since we don't use it anywhere. Also removed a few unused imports, variables and default parameters. Author: Ismael Juma Reviewers: Manikumar Reddy , Rajini Sivaram , Jason Gustafson Closes #3856 from ijuma/remove-security-protocol-trace Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/dfd625da Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/dfd625da Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/dfd625da Branch: refs/heads/trunk Commit: dfd625daa36de2e34e6c596967775394c55bc605 Parents: 2656659 Author: Ismael Juma Authored: Thu Sep 14 13:14:06 2017 -0700 Committer: Jason Gustafson Committed: Thu Sep 14 13:39:30 2017 -0700 ---------------------------------------------------------------------- .../org/apache/kafka/clients/ClientUtils.java | 2 - .../kafka/clients/CommonClientConfigs.java | 13 +------ .../kafka/common/network/ChannelBuilders.java | 1 - .../kafka/common/protocol/SecurityProtocol.java | 39 ++++---------------- .../scala/kafka/consumer/BaseConsumer.scala | 1 - .../TransactionMarkerChannelManager.scala | 2 +- ...nsactionMarkerRequestCompletionHandler.scala | 2 +- .../consumer/ZookeeperConsumerConnector.scala | 1 - .../main/scala/kafka/server/KafkaConfig.scala | 6 +-- .../unit/kafka/network/SocketServerTest.scala | 23 ++---------- 10 files changed, 17 insertions(+), 73 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/dfd625da/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java b/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java index 7d19ea4..4612322 100644 --- a/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java +++ b/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java @@ -82,8 +82,6 @@ public class ClientUtils { */ public static ChannelBuilder createChannelBuilder(AbstractConfig config) { SecurityProtocol securityProtocol = SecurityProtocol.forName(config.getString(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG)); - if (!SecurityProtocol.nonTestingValues().contains(securityProtocol)) - throw new ConfigException("Invalid SecurityProtocol " + securityProtocol); String clientSaslMechanism = config.getString(SaslConfigs.SASL_MECHANISM); return ChannelBuilders.clientChannelBuilder(securityProtocol, JaasContext.Type.CLIENT, config, null, clientSaslMechanism, true); http://git-wip-us.apache.org/repos/asf/kafka/blob/dfd625da/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java b/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java index f51c36a..380564b 100644 --- a/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java +++ b/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java @@ -22,9 +22,7 @@ import org.apache.kafka.common.utils.Utils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.ArrayList; import java.util.HashMap; -import java.util.List; import java.util.Map; /** @@ -77,7 +75,8 @@ public class CommonClientConfigs { public static final String METRIC_REPORTER_CLASSES_DOC = "A list of classes to use as metrics reporters. Implementing the MetricReporter interface allows plugging in classes that will be notified of new metric creation. The JmxReporter is always included to register JMX statistics."; public static final String SECURITY_PROTOCOL_CONFIG = "security.protocol"; - public static final String SECURITY_PROTOCOL_DOC = "Protocol used to communicate with brokers. Valid values are: " + Utils.join(nonTestingSecurityProtocolNames(), ", ") + "."; + public static final String SECURITY_PROTOCOL_DOC = "Protocol used to communicate with brokers. Valid values are: " + + Utils.join(SecurityProtocol.names(), ", ") + "."; public static final String DEFAULT_SECURITY_PROTOCOL = "PLAINTEXT"; public static final String CONNECTIONS_MAX_IDLE_MS_CONFIG = "connections.max.idle.ms"; @@ -89,14 +88,6 @@ public class CommonClientConfigs { + "elapses the client will resend the request if necessary or fail the request if " + "retries are exhausted."; - private static List nonTestingSecurityProtocolNames() { - List names = new ArrayList<>(); - for (SecurityProtocol protocol : SecurityProtocol.nonTestingValues()) - names.add(protocol.name); - return names; - } - - /** * Postprocess the configuration so that exponential backoff is disabled when reconnect backoff * is explicitly configured but the maximum reconnect backoff is not explicitly configured. http://git-wip-us.apache.org/repos/asf/kafka/blob/dfd625da/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java b/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java index 6dd3ddd..785c671 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java +++ b/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java @@ -107,7 +107,6 @@ public class ChannelBuilders { clientSaslMechanism, saslHandshakeRequestEnable, credentialCache); break; case PLAINTEXT: - case TRACE: channelBuilder = new PlaintextChannelBuilder(); break; default: http://git-wip-us.apache.org/repos/asf/kafka/blob/dfd625da/clients/src/main/java/org/apache/kafka/common/protocol/SecurityProtocol.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/SecurityProtocol.java b/clients/src/main/java/org/apache/kafka/common/protocol/SecurityProtocol.java index 99d3b3d..c155481 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/SecurityProtocol.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/SecurityProtocol.java @@ -18,43 +18,34 @@ package org.apache.kafka.common.protocol; import java.util.ArrayList; import java.util.Collections; -import java.util.EnumSet; import java.util.HashMap; import java.util.List; import java.util.Locale; import java.util.Map; -import java.util.Set; public enum SecurityProtocol { /** Un-authenticated, non-encrypted channel */ - PLAINTEXT(0, "PLAINTEXT", false), + PLAINTEXT(0, "PLAINTEXT"), /** SSL channel */ - SSL(1, "SSL", false), + SSL(1, "SSL"), /** SASL authenticated, non-encrypted channel */ - SASL_PLAINTEXT(2, "SASL_PLAINTEXT", false), + SASL_PLAINTEXT(2, "SASL_PLAINTEXT"), /** SASL authenticated, SSL channel */ - SASL_SSL(3, "SASL_SSL", false), - /** Currently identical to PLAINTEXT and used for testing only. We may implement extra instrumentation when testing channel code. */ - TRACE(Short.MAX_VALUE, "TRACE", true); + SASL_SSL(3, "SASL_SSL"); private static final Map CODE_TO_SECURITY_PROTOCOL; private static final List NAMES; - private static final Set NON_TESTING_VALUES; static { SecurityProtocol[] protocols = SecurityProtocol.values(); List names = new ArrayList<>(protocols.length); Map codeToSecurityProtocol = new HashMap<>(protocols.length); - Set nonTestingValues = EnumSet.noneOf(SecurityProtocol.class); for (SecurityProtocol proto : protocols) { codeToSecurityProtocol.put(proto.id, proto); names.add(proto.name); - if (!proto.isTesting) - nonTestingValues.add(proto); } CODE_TO_SECURITY_PROTOCOL = Collections.unmodifiableMap(codeToSecurityProtocol); NAMES = Collections.unmodifiableList(names); - NON_TESTING_VALUES = Collections.unmodifiableSet(nonTestingValues); } /** The permanent and immutable id of a security protocol -- this can't change, and must match kafka.cluster.SecurityProtocol */ @@ -63,24 +54,16 @@ public enum SecurityProtocol { /** Name of the security protocol. This may be used by client configuration. */ public final String name; - /* Whether this security protocol is for testing/debugging */ - private final boolean isTesting; - - SecurityProtocol(int id, String name, boolean isTesting) { + SecurityProtocol(int id, String name) { this.id = (short) id; this.name = name; - this.isTesting = isTesting; - } - - public static String getName(int id) { - return CODE_TO_SECURITY_PROTOCOL.get((short) id).name; } - public static List getNames() { + public static List names() { return NAMES; } - public static SecurityProtocol forId(Short id) { + public static SecurityProtocol forId(short id) { return CODE_TO_SECURITY_PROTOCOL.get(id); } @@ -89,12 +72,4 @@ public enum SecurityProtocol { return SecurityProtocol.valueOf(name.toUpperCase(Locale.ROOT)); } - /** - * Returns the set of non-testing SecurityProtocol instances, that is, SecurityProtocol instances that are suitable - * for production usage. - */ - public static Set nonTestingValues() { - return NON_TESTING_VALUES; - } - } http://git-wip-us.apache.org/repos/asf/kafka/blob/dfd625da/core/src/main/scala/kafka/consumer/BaseConsumer.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/consumer/BaseConsumer.scala b/core/src/main/scala/kafka/consumer/BaseConsumer.scala index 2c53258..04ac2d9 100644 --- a/core/src/main/scala/kafka/consumer/BaseConsumer.scala +++ b/core/src/main/scala/kafka/consumer/BaseConsumer.scala @@ -23,7 +23,6 @@ import java.util.regex.Pattern import kafka.api.OffsetRequest import kafka.common.StreamEndException import kafka.message.Message -import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener import org.apache.kafka.common.record.TimestampType import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.header.Headers http://git-wip-us.apache.org/repos/asf/kafka/blob/dfd625da/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala index 2e666ed..6c13de4 100644 --- a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala +++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala @@ -35,7 +35,7 @@ import java.util import java.util.concurrent.{BlockingQueue, ConcurrentHashMap, LinkedBlockingQueue} import collection.JavaConverters._ -import scala.collection.{concurrent, immutable, mutable} +import scala.collection.{concurrent, immutable} object TransactionMarkerChannelManager { def apply(config: KafkaConfig, http://git-wip-us.apache.org/repos/asf/kafka/blob/dfd625da/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandler.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandler.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandler.scala index 150b444..bfa25be 100644 --- a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandler.scala +++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandler.scala @@ -20,7 +20,7 @@ package kafka.coordinator.transaction import kafka.utils.Logging import org.apache.kafka.clients.{ClientResponse, RequestCompletionHandler} import org.apache.kafka.common.TopicPartition -import org.apache.kafka.common.protocol.{ApiKeys, Errors} +import org.apache.kafka.common.protocol.Errors import org.apache.kafka.common.requests.WriteTxnMarkersResponse import scala.collection.mutable http://git-wip-us.apache.org/repos/asf/kafka/blob/dfd625da/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala index 467d0a6..d646938 100644 --- a/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala +++ b/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala @@ -19,7 +19,6 @@ package kafka.javaapi.consumer import kafka.serializer._ import kafka.consumer._ import kafka.common.{OffsetAndMetadata, TopicAndPartition, MessageStreamsExistException} -import scala.collection.mutable import java.util.concurrent.atomic.AtomicBoolean import scala.collection.JavaConverters._ http://git-wip-us.apache.org/repos/asf/kafka/blob/dfd625da/core/src/main/scala/kafka/server/KafkaConfig.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index 33eaf48..ea0c124 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -571,8 +571,8 @@ object KafkaConfig { val LeaderImbalanceCheckIntervalSecondsDoc = "The frequency with which the partition rebalance check is triggered by the controller" val UncleanLeaderElectionEnableDoc = "Indicates whether to enable replicas not in the ISR set to be elected as leader as a last resort, even though doing so may result in data loss" val InterBrokerSecurityProtocolDoc = "Security protocol used to communicate between brokers. Valid values are: " + - s"${SecurityProtocol.nonTestingValues.asScala.toSeq.map(_.name).mkString(", ")}. It is an error to set this and " + - s"$InterBrokerListenerNameProp properties at the same time." + s"${SecurityProtocol.names.asScala.mkString(", ")}. It is an error to set this and $InterBrokerListenerNameProp " + + "properties at the same time." val InterBrokerProtocolVersionDoc = "Specify which version of the inter-broker protocol will be used.\n" + " This is typically bumped after all brokers were upgraded to a new version.\n" + " Example of some valid values are: 0.8.0, 0.8.1, 0.8.1.1, 0.8.2, 0.8.2.0, 0.8.2.1, 0.9.0.0, 0.9.0.1 Check ApiVersion for the full list." @@ -1151,7 +1151,7 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean) extends Abstra private def getSecurityProtocol(protocolName: String, configName: String): SecurityProtocol = { try SecurityProtocol.forName(protocolName) catch { - case e: IllegalArgumentException => + case _: IllegalArgumentException => throw new ConfigException(s"Invalid security protocol `$protocolName` defined in $configName") } } http://git-wip-us.apache.org/repos/asf/kafka/blob/dfd625da/core/src/test/scala/unit/kafka/network/SocketServerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala index 2df37b7..d623374 100644 --- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala +++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala @@ -50,7 +50,7 @@ import scala.util.control.ControlThrowable class SocketServerTest extends JUnitSuite { val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 0) - props.put("listeners", "PLAINTEXT://localhost:0,TRACE://localhost:0") + props.put("listeners", "PLAINTEXT://localhost:0") props.put("num.network.threads", "1") props.put("socket.send.buffer.bytes", "300000") props.put("socket.receive.buffer.bytes", "300000") @@ -161,18 +161,12 @@ class SocketServerTest extends JUnitSuite { @Test def simpleRequest() { val plainSocket = connect(protocol = SecurityProtocol.PLAINTEXT) - val traceSocket = connect(protocol = SecurityProtocol.TRACE) val serializedBytes = producerRequestBytes // Test PLAINTEXT socket sendRequest(plainSocket, serializedBytes) processRequest(server.requestChannel) assertEquals(serializedBytes.toSeq, receiveResponse(plainSocket).toSeq) - - // Test TRACE socket - sendRequest(traceSocket, serializedBytes) - processRequest(server.requestChannel) - assertEquals(serializedBytes.toSeq, receiveResponse(traceSocket).toSeq) } @Test @@ -377,12 +371,9 @@ class SocketServerTest extends JUnitSuite { // open a connection val plainSocket = connect(protocol = SecurityProtocol.PLAINTEXT) plainSocket.setTcpNoDelay(true) - val traceSocket = connect(protocol = SecurityProtocol.TRACE) - traceSocket.setTcpNoDelay(true) val bytes = new Array[Byte](40) // send a request first to make sure the connection has been picked up by the socket server sendRequest(plainSocket, bytes, Some(0)) - sendRequest(traceSocket, bytes, Some(0)) processRequest(server.requestChannel) // the following sleep is necessary to reliably detect the connection close when we send data below Thread.sleep(200L) @@ -400,13 +391,6 @@ class SocketServerTest extends JUnitSuite { } catch { case _: IOException => // expected } - - try { - sendRequest(traceSocket, largeChunkOfBytes, Some(0)) - fail("expected exception when writing to closed trace socket") - } catch { - case _: IOException => // expected - } } @Test @@ -841,7 +825,7 @@ class SocketServerTest extends JUnitSuite { @Test def controlThrowable(): Unit = { withTestableServer { testableServer => - val (socket, _) = connectAndProcessRequest(testableServer) + connectAndProcessRequest(testableServer) val testableSelector = testableServer.testableSelector testableSelector.operationCounts.clear() @@ -990,8 +974,7 @@ class SocketServerTest extends JUnitSuite { exception.getOrElse(new IllegalStateException(s"Test exception during $operation")) } - private def onOperation(operation: SelectorOperation, - connectionId: Option[String] = None, onFailure: => Unit = {}): Unit = { + private def onOperation(operation: SelectorOperation, connectionId: Option[String], onFailure: => Unit): Unit = { operationCounts(operation) += 1 failures.remove(operation).foreach { e => connectionId.foreach(allFailedChannels.add)