kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject kafka git commit: MINOR: Remove unused SecurityProtocol.TRACE
Date Thu, 14 Sep 2017 20:39:51 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 2656659e0 -> dfd625daa


MINOR: Remove unused SecurityProtocol.TRACE

It adds complexity for no benefit since we don't use
it anywhere.

Also removed a few unused imports, variables and
default parameters.

Author: Ismael Juma <ismael@juma.me.uk>

Reviewers: Manikumar Reddy <manikumar.reddy@gmail.com>, Rajini Sivaram <rajinisivaram@googlemail.com>,
Jason Gustafson <jason@confluent.io>

Closes #3856 from ijuma/remove-security-protocol-trace


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/dfd625da
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/dfd625da
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/dfd625da

Branch: refs/heads/trunk
Commit: dfd625daa36de2e34e6c596967775394c55bc605
Parents: 2656659
Author: Ismael Juma <ismael@juma.me.uk>
Authored: Thu Sep 14 13:14:06 2017 -0700
Committer: Jason Gustafson <jason@confluent.io>
Committed: Thu Sep 14 13:39:30 2017 -0700

----------------------------------------------------------------------
 .../org/apache/kafka/clients/ClientUtils.java   |  2 -
 .../kafka/clients/CommonClientConfigs.java      | 13 +------
 .../kafka/common/network/ChannelBuilders.java   |  1 -
 .../kafka/common/protocol/SecurityProtocol.java | 39 ++++----------------
 .../scala/kafka/consumer/BaseConsumer.scala     |  1 -
 .../TransactionMarkerChannelManager.scala       |  2 +-
 ...nsactionMarkerRequestCompletionHandler.scala |  2 +-
 .../consumer/ZookeeperConsumerConnector.scala   |  1 -
 .../main/scala/kafka/server/KafkaConfig.scala   |  6 +--
 .../unit/kafka/network/SocketServerTest.scala   | 23 ++----------
 10 files changed, 17 insertions(+), 73 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/dfd625da/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java b/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java
index 7d19ea4..4612322 100644
--- a/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java
+++ b/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java
@@ -82,8 +82,6 @@ public class ClientUtils {
      */
     public static ChannelBuilder createChannelBuilder(AbstractConfig config) {
         SecurityProtocol securityProtocol = SecurityProtocol.forName(config.getString(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG));
-        if (!SecurityProtocol.nonTestingValues().contains(securityProtocol))
-            throw new ConfigException("Invalid SecurityProtocol " + securityProtocol);
         String clientSaslMechanism = config.getString(SaslConfigs.SASL_MECHANISM);
         return ChannelBuilders.clientChannelBuilder(securityProtocol, JaasContext.Type.CLIENT,
config, null,
                 clientSaslMechanism, true);

http://git-wip-us.apache.org/repos/asf/kafka/blob/dfd625da/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java b/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
index f51c36a..380564b 100644
--- a/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
+++ b/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
@@ -22,9 +22,7 @@ import org.apache.kafka.common.utils.Utils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 
 /**
@@ -77,7 +75,8 @@ public class CommonClientConfigs {
     public static final String METRIC_REPORTER_CLASSES_DOC = "A list of classes to use as
metrics reporters. Implementing the <code>MetricReporter</code> interface allows
plugging in classes that will be notified of new metric creation. The JmxReporter is always
included to register JMX statistics.";
 
     public static final String SECURITY_PROTOCOL_CONFIG = "security.protocol";
-    public static final String SECURITY_PROTOCOL_DOC = "Protocol used to communicate with
brokers. Valid values are: " + Utils.join(nonTestingSecurityProtocolNames(), ", ") + ".";
+    public static final String SECURITY_PROTOCOL_DOC = "Protocol used to communicate with
brokers. Valid values are: " +
+        Utils.join(SecurityProtocol.names(), ", ") + ".";
     public static final String DEFAULT_SECURITY_PROTOCOL = "PLAINTEXT";
 
     public static final String CONNECTIONS_MAX_IDLE_MS_CONFIG = "connections.max.idle.ms";
@@ -89,14 +88,6 @@ public class CommonClientConfigs {
                                                          + "elapses the client will resend
the request if necessary or fail the request if "
                                                          + "retries are exhausted.";
 
-    private static List<String> nonTestingSecurityProtocolNames() {
-        List<String> names = new ArrayList<>();
-        for (SecurityProtocol protocol : SecurityProtocol.nonTestingValues())
-            names.add(protocol.name);
-        return names;
-    }
-
-
     /**
      * Postprocess the configuration so that exponential backoff is disabled when reconnect
backoff
      * is explicitly configured but the maximum reconnect backoff is not explicitly configured.

http://git-wip-us.apache.org/repos/asf/kafka/blob/dfd625da/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java b/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java
index 6dd3ddd..785c671 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java
@@ -107,7 +107,6 @@ public class ChannelBuilders {
                         clientSaslMechanism, saslHandshakeRequestEnable, credentialCache);
                 break;
             case PLAINTEXT:
-            case TRACE:
                 channelBuilder = new PlaintextChannelBuilder();
                 break;
             default:

http://git-wip-us.apache.org/repos/asf/kafka/blob/dfd625da/clients/src/main/java/org/apache/kafka/common/protocol/SecurityProtocol.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/SecurityProtocol.java
b/clients/src/main/java/org/apache/kafka/common/protocol/SecurityProtocol.java
index 99d3b3d..c155481 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/SecurityProtocol.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/SecurityProtocol.java
@@ -18,43 +18,34 @@ package org.apache.kafka.common.protocol;
 
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
-import java.util.Set;
 
 public enum SecurityProtocol {
     /** Un-authenticated, non-encrypted channel */
-    PLAINTEXT(0, "PLAINTEXT", false),
+    PLAINTEXT(0, "PLAINTEXT"),
     /** SSL channel */
-    SSL(1, "SSL", false),
+    SSL(1, "SSL"),
     /** SASL authenticated, non-encrypted channel */
-    SASL_PLAINTEXT(2, "SASL_PLAINTEXT", false),
+    SASL_PLAINTEXT(2, "SASL_PLAINTEXT"),
     /** SASL authenticated, SSL channel */
-    SASL_SSL(3, "SASL_SSL", false),
-    /** Currently identical to PLAINTEXT and used for testing only. We may implement extra
instrumentation when testing channel code. */
-    TRACE(Short.MAX_VALUE, "TRACE", true);
+    SASL_SSL(3, "SASL_SSL");
 
     private static final Map<Short, SecurityProtocol> CODE_TO_SECURITY_PROTOCOL;
     private static final List<String> NAMES;
-    private static final Set<SecurityProtocol> NON_TESTING_VALUES;
 
     static {
         SecurityProtocol[] protocols = SecurityProtocol.values();
         List<String> names = new ArrayList<>(protocols.length);
         Map<Short, SecurityProtocol> codeToSecurityProtocol = new HashMap<>(protocols.length);
-        Set<SecurityProtocol> nonTestingValues = EnumSet.noneOf(SecurityProtocol.class);
         for (SecurityProtocol proto : protocols) {
             codeToSecurityProtocol.put(proto.id, proto);
             names.add(proto.name);
-            if (!proto.isTesting)
-                nonTestingValues.add(proto);
         }
         CODE_TO_SECURITY_PROTOCOL = Collections.unmodifiableMap(codeToSecurityProtocol);
         NAMES = Collections.unmodifiableList(names);
-        NON_TESTING_VALUES = Collections.unmodifiableSet(nonTestingValues);
     }
 
     /** The permanent and immutable id of a security protocol -- this can't change, and must
match kafka.cluster.SecurityProtocol  */
@@ -63,24 +54,16 @@ public enum SecurityProtocol {
     /** Name of the security protocol. This may be used by client configuration. */
     public final String name;
 
-    /* Whether this security protocol is for testing/debugging */
-    private final boolean isTesting;
-
-    SecurityProtocol(int id, String name, boolean isTesting) {
+    SecurityProtocol(int id, String name) {
         this.id = (short) id;
         this.name = name;
-        this.isTesting = isTesting;
-    }
-
-    public static String getName(int id) {
-        return CODE_TO_SECURITY_PROTOCOL.get((short) id).name;
     }
 
-    public static List<String> getNames() {
+    public static List<String> names() {
         return NAMES;
     }
 
-    public static SecurityProtocol forId(Short id) {
+    public static SecurityProtocol forId(short id) {
         return CODE_TO_SECURITY_PROTOCOL.get(id);
     }
 
@@ -89,12 +72,4 @@ public enum SecurityProtocol {
         return SecurityProtocol.valueOf(name.toUpperCase(Locale.ROOT));
     }
 
-    /**
-     * Returns the set of non-testing SecurityProtocol instances, that is, SecurityProtocol
instances that are suitable
-     * for production usage.
-     */
-    public static Set<SecurityProtocol> nonTestingValues() {
-        return NON_TESTING_VALUES;
-    }
-
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/dfd625da/core/src/main/scala/kafka/consumer/BaseConsumer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/BaseConsumer.scala b/core/src/main/scala/kafka/consumer/BaseConsumer.scala
index 2c53258..04ac2d9 100644
--- a/core/src/main/scala/kafka/consumer/BaseConsumer.scala
+++ b/core/src/main/scala/kafka/consumer/BaseConsumer.scala
@@ -23,7 +23,6 @@ import java.util.regex.Pattern
 import kafka.api.OffsetRequest
 import kafka.common.StreamEndException
 import kafka.message.Message
-import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener
 import org.apache.kafka.common.record.TimestampType
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.header.Headers

http://git-wip-us.apache.org/repos/asf/kafka/blob/dfd625da/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
index 2e666ed..6c13de4 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
@@ -35,7 +35,7 @@ import java.util
 import java.util.concurrent.{BlockingQueue, ConcurrentHashMap, LinkedBlockingQueue}
 
 import collection.JavaConverters._
-import scala.collection.{concurrent, immutable, mutable}
+import scala.collection.{concurrent, immutable}
 
 object TransactionMarkerChannelManager {
   def apply(config: KafkaConfig,

http://git-wip-us.apache.org/repos/asf/kafka/blob/dfd625da/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandler.scala
b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandler.scala
index 150b444..bfa25be 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandler.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandler.scala
@@ -20,7 +20,7 @@ package kafka.coordinator.transaction
 import kafka.utils.Logging
 import org.apache.kafka.clients.{ClientResponse, RequestCompletionHandler}
 import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.protocol.{ApiKeys, Errors}
+import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.requests.WriteTxnMarkersResponse
 
 import scala.collection.mutable

http://git-wip-us.apache.org/repos/asf/kafka/blob/dfd625da/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala
index 467d0a6..d646938 100644
--- a/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala
+++ b/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala
@@ -19,7 +19,6 @@ package kafka.javaapi.consumer
 import kafka.serializer._
 import kafka.consumer._
 import kafka.common.{OffsetAndMetadata, TopicAndPartition, MessageStreamsExistException}
-import scala.collection.mutable
 import java.util.concurrent.atomic.AtomicBoolean
 import scala.collection.JavaConverters._
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/dfd625da/core/src/main/scala/kafka/server/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 33eaf48..ea0c124 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -571,8 +571,8 @@ object KafkaConfig {
   val LeaderImbalanceCheckIntervalSecondsDoc = "The frequency with which the partition rebalance
check is triggered by the controller"
   val UncleanLeaderElectionEnableDoc = "Indicates whether to enable replicas not in the ISR
set to be elected as leader as a last resort, even though doing so may result in data loss"
   val InterBrokerSecurityProtocolDoc = "Security protocol used to communicate between brokers.
Valid values are: " +
-    s"${SecurityProtocol.nonTestingValues.asScala.toSeq.map(_.name).mkString(", ")}. It is
an error to set this and " +
-    s"$InterBrokerListenerNameProp properties at the same time."
+    s"${SecurityProtocol.names.asScala.mkString(", ")}. It is an error to set this and $InterBrokerListenerNameProp
" +
+    "properties at the same time."
   val InterBrokerProtocolVersionDoc = "Specify which version of the inter-broker protocol
will be used.\n" +
   " This is typically bumped after all brokers were upgraded to a new version.\n" +
   " Example of some valid values are: 0.8.0, 0.8.1, 0.8.1.1, 0.8.2, 0.8.2.0, 0.8.2.1, 0.9.0.0,
0.9.0.1 Check ApiVersion for the full list."
@@ -1151,7 +1151,7 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean) extends
Abstra
   private def getSecurityProtocol(protocolName: String, configName: String): SecurityProtocol
= {
     try SecurityProtocol.forName(protocolName)
     catch {
-      case e: IllegalArgumentException =>
+      case _: IllegalArgumentException =>
         throw new ConfigException(s"Invalid security protocol `$protocolName` defined in
$configName")
     }
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/dfd625da/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
index 2df37b7..d623374 100644
--- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
+++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
@@ -50,7 +50,7 @@ import scala.util.control.ControlThrowable
 
 class SocketServerTest extends JUnitSuite {
   val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 0)
-  props.put("listeners", "PLAINTEXT://localhost:0,TRACE://localhost:0")
+  props.put("listeners", "PLAINTEXT://localhost:0")
   props.put("num.network.threads", "1")
   props.put("socket.send.buffer.bytes", "300000")
   props.put("socket.receive.buffer.bytes", "300000")
@@ -161,18 +161,12 @@ class SocketServerTest extends JUnitSuite {
   @Test
   def simpleRequest() {
     val plainSocket = connect(protocol = SecurityProtocol.PLAINTEXT)
-    val traceSocket = connect(protocol = SecurityProtocol.TRACE)
     val serializedBytes = producerRequestBytes
 
     // Test PLAINTEXT socket
     sendRequest(plainSocket, serializedBytes)
     processRequest(server.requestChannel)
     assertEquals(serializedBytes.toSeq, receiveResponse(plainSocket).toSeq)
-
-    // Test TRACE socket
-    sendRequest(traceSocket, serializedBytes)
-    processRequest(server.requestChannel)
-    assertEquals(serializedBytes.toSeq, receiveResponse(traceSocket).toSeq)
   }
 
   @Test
@@ -377,12 +371,9 @@ class SocketServerTest extends JUnitSuite {
     // open a connection
     val plainSocket = connect(protocol = SecurityProtocol.PLAINTEXT)
     plainSocket.setTcpNoDelay(true)
-    val traceSocket = connect(protocol = SecurityProtocol.TRACE)
-    traceSocket.setTcpNoDelay(true)
     val bytes = new Array[Byte](40)
     // send a request first to make sure the connection has been picked up by the socket
server
     sendRequest(plainSocket, bytes, Some(0))
-    sendRequest(traceSocket, bytes, Some(0))
     processRequest(server.requestChannel)
     // the following sleep is necessary to reliably detect the connection close when we send
data below
     Thread.sleep(200L)
@@ -400,13 +391,6 @@ class SocketServerTest extends JUnitSuite {
     } catch {
       case _: IOException => // expected
     }
-
-    try {
-      sendRequest(traceSocket, largeChunkOfBytes, Some(0))
-      fail("expected exception when writing to closed trace socket")
-    } catch {
-      case _: IOException => // expected
-    }
   }
 
   @Test
@@ -841,7 +825,7 @@ class SocketServerTest extends JUnitSuite {
   @Test
   def controlThrowable(): Unit = {
     withTestableServer { testableServer =>
-      val (socket, _) = connectAndProcessRequest(testableServer)
+      connectAndProcessRequest(testableServer)
       val testableSelector = testableServer.testableSelector
 
       testableSelector.operationCounts.clear()
@@ -990,8 +974,7 @@ class SocketServerTest extends JUnitSuite {
         exception.getOrElse(new IllegalStateException(s"Test exception during $operation"))
     }
 
-    private def onOperation(operation: SelectorOperation,
-        connectionId: Option[String] = None, onFailure: => Unit = {}): Unit = {
+    private def onOperation(operation: SelectorOperation, connectionId: Option[String], onFailure:
=> Unit): Unit = {
       operationCounts(operation) += 1
       failures.remove(operation).foreach { e =>
         connectionId.foreach(allFailedChannels.add)


Mime
View raw message