kafka-commits mailing list archives

From ij...@apache.org
Subject [kafka] branch trunk updated: MINOR: Remove usages of JavaConversions and fix some typos (#5115)
Date Sun, 03 Jun 2018 06:46:15 GMT
This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 0cacbcf  MINOR: Remove usages of JavaConversions and fix some typos (#5115)
0cacbcf is described below

commit 0cacbcf30e0a90ab9fad7bc310e5477cf959f1fd
Author: Vahid Hashemian <vahidhashemian@us.ibm.com>
AuthorDate: Sat Jun 2 23:45:44 2018 -0700

    MINOR: Remove usages of JavaConversions and fix some typos (#5115)
    
    Reviewers: Ismael Juma <ismael@juma.me.uk>
---
 .../src/main/scala/kafka/controller/KafkaController.scala |  2 +-
 .../kafka/coordinator/group/GroupMetadataManager.scala    |  2 +-
 core/src/main/scala/kafka/server/KafkaApis.scala          |  2 +-
 core/src/main/scala/kafka/tools/ConsoleConsumer.scala     | 15 +++++++--------
 core/src/test/scala/unit/kafka/log/LogTest.scala          |  2 +-
 .../unit/kafka/server/AddPartitionsToTxnRequestTest.scala |  4 ++--
 docs/streams/upgrade-guide.html                           |  2 +-
 docs/upgrade.html                                         |  2 +-
 .../main/java/org/apache/kafka/streams/KafkaStreams.java  |  2 +-
 .../streams/integration/InternalTopicIntegrationTest.java |  4 +++-
 .../streams/integration/utils/EmbeddedKafkaCluster.java   |  9 +++++----
 .../kafka/streams/integration/utils/KafkaEmbedded.java    | 13 +++++++------
 12 files changed, 31 insertions(+), 28 deletions(-)

diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index a1d14e6..9c33874 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -181,7 +181,7 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
   }
 
   /**
-   * On controlled shutdown shutdown, the controller first determines the partitions that the
+   * On controlled shutdown, the controller first determines the partitions that the
    * shutting down broker leads, and moves leadership of those partitions to another broker
    * that is in that partition's ISR.
    *
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
index 2787251..35a0574 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
@@ -1212,7 +1212,7 @@ object GroupMetadataManager {
   }
 
   /**
-   * Decodes the group metadata messages' payload and retrieves its member metadatafrom it
+   * Decodes the group metadata messages' payload and retrieves its member metadata from it
    *
    * @param buffer input byte-buffer
    * @return a group metadata object from the message
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index dcdfae0..9f1ab62 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -1275,7 +1275,7 @@ class KafkaApis(val requestChannel: RequestChannel,
           Collections.emptyMap())
       )
     } else {
-      // let the coordinator to handle join-group
+      // let the coordinator handle join-group
       val protocols = joinGroupRequest.groupProtocols().asScala.map(protocol =>
         (protocol.name, Utils.toArray(protocol.metadata))).toList
       groupCoordinator.handleJoinGroup(
diff --git a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
index befc16c..c1f8b81 100755
--- a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
+++ b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
@@ -37,7 +37,6 @@ import org.apache.kafka.common.record.TimestampType
 import org.apache.kafka.common.serialization.{ByteArrayDeserializer, Deserializer}
 import org.apache.kafka.common.utils.Utils
 
-import scala.collection.JavaConversions
 import scala.collection.JavaConverters._
 
 /**
@@ -170,7 +169,7 @@ object ConsoleConsumer extends Logging {
   def checkErr(output: PrintStream, formatter: MessageFormatter): Boolean = {
     val gotError = output.checkError()
     if (gotError) {
-      // This means no one is listening to our output stream any more, time to shutdown
+      // This means no one is listening to our output stream anymore, time to shutdown
       System.err.println("Unable to write to standard out, closing consumer.")
     }
     gotError
@@ -535,21 +534,21 @@ class DefaultMessageFormatter extends MessageFormatter {
     // Note that `toString` will be called on the instance returned by `Deserializer.deserialize`
     if (props.containsKey("key.deserializer")) {
       keyDeserializer = Some(Class.forName(props.getProperty("key.deserializer")).newInstance().asInstanceOf[Deserializer[_]])
-      keyDeserializer.get.configure(JavaConversions.propertiesAsScalaMap(propertiesWithKeyPrefixStripped("key.deserializer.", props)).asJava, true)
+      keyDeserializer.get.configure(propertiesWithKeyPrefixStripped("key.deserializer.", props).asScala.asJava, true)
     }
     // Note that `toString` will be called on the instance returned by `Deserializer.deserialize`
     if (props.containsKey("value.deserializer")) {
       valueDeserializer = Some(Class.forName(props.getProperty("value.deserializer")).newInstance().asInstanceOf[Deserializer[_]])
-      valueDeserializer.get.configure(JavaConversions.propertiesAsScalaMap(propertiesWithKeyPrefixStripped("value.deserializer.", props)).asJava, false)
+      valueDeserializer.get.configure(propertiesWithKeyPrefixStripped("value.deserializer.", props).asScala.asJava, false)
     }
   }
 
   private def propertiesWithKeyPrefixStripped(prefix: String, props: Properties): Properties = {
     val newProps = new Properties()
-    import scala.collection.JavaConversions._
-    for ((key, value) <- props if key.startsWith(prefix) && key.length > prefix.length)
-      newProps.put(key.substring(prefix.length), value)
-
+    props.asScala.foreach { case (key, value) =>
+      if (key.startsWith(prefix) && key.length > prefix.length)
+        newProps.put(key.substring(prefix.length), value)
+    }
     newProps
   }
 
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index 28fcdfc..bf74a3e 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -2515,7 +2515,7 @@ class LogTest {
     log.onHighWatermarkIncremented(log.logEndOffset)
     log.deleteOldSegments()
 
-    //The the first entry should have gone from (0,0) => (0,5)
+    //The first entry should have gone from (0,0) => (0,5)
     assertEquals(ListBuffer(EpochEntry(0, 5), EpochEntry(1, 7), EpochEntry(2, 10)), cache.epochEntries)
   }
 
diff --git a/core/src/test/scala/unit/kafka/server/AddPartitionsToTxnRequestTest.scala b/core/src/test/scala/unit/kafka/server/AddPartitionsToTxnRequestTest.scala
index 4a47400..9071f95 100644
--- a/core/src/test/scala/unit/kafka/server/AddPartitionsToTxnRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AddPartitionsToTxnRequestTest.scala
@@ -25,7 +25,7 @@ import org.apache.kafka.common.requests.{AddPartitionsToTxnRequest, AddPartition
 import org.junit.{Before, Test}
 import org.junit.Assert._
 
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
 
 class AddPartitionsToTxnRequestTest extends BaseRequestTest {
   private val topic1 = "foobartopic"
@@ -69,7 +69,7 @@ class AddPartitionsToTxnRequestTest extends BaseRequestTest {
     val transactionalId = "foobar"
     val producerId = 1000L
     val producerEpoch: Short = 0
-    val builder = new AddPartitionsToTxnRequest.Builder(transactionalId, producerId, producerEpoch, partitions)
+    val builder = new AddPartitionsToTxnRequest.Builder(transactionalId, producerId, producerEpoch, partitions.asJava)
     builder.build()
   }
 }
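
For readers unfamiliar with the migration this patch performs, here is a minimal, self-contained sketch (not part of the commit; object and property names are illustrative) of the explicit scala.collection.JavaConverters style that replaces the deprecated implicit scala.collection.JavaConversions in the ConsoleConsumer and test hunks above, assuming a Scala 2.11/2.12 build:

    import java.util.Properties
    import scala.collection.JavaConverters._

    object JavaConvertersSketch {
      // Same shape as the ConsoleConsumer change: copy entries whose keys start
      // with `prefix` into a new Properties, stripping the prefix. The conversion
      // of the Java Properties to a Scala map is explicit (props.asScala), whereas
      // the removed `import scala.collection.JavaConversions._` converted silently
      // at every call site.
      def withPrefixStripped(prefix: String, props: Properties): Properties = {
        val newProps = new Properties()
        props.asScala.foreach { case (key, value) =>
          if (key.startsWith(prefix) && key.length > prefix.length)
            newProps.put(key.substring(prefix.length), value)
        }
        newProps
      }

      def main(args: Array[String]): Unit = {
        val props = new Properties()
        props.put("key.deserializer.encoding", "UTF-8")
        props.put("other.setting", "ignored")
        println(withPrefixStripped("key.deserializer.", props)) // prints {encoding=UTF-8}
      }
    }
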
diff --git a/docs/streams/upgrade-guide.html b/docs/streams/upgrade-guide.html
index 45d1f7d..07f8544 100644
--- a/docs/streams/upgrade-guide.html
+++ b/docs/streams/upgrade-guide.html
@@ -139,7 +139,7 @@
     <p>
         We have added support to allow routing records dynamically to Kafka topics. More specifically, in both the lower-level <code>Topology#addSink</code> and higher-level <code>KStream#to</code> APIs, we have added variants that
         take a <code>TopicNameExtractor</code> instance instead of a specific <code>String</code> typed topic name, such that for each received record from the upstream processor, the library will dynamically determine which Kafka topic to write to
-        based on the record's key and value, as well as record context. Note that all the Kafka topics that that may possibly be used are still considered as user topics and hence required to be pre-created. In addition to that, we have modified the
+        based on the record's key and value, as well as record context. Note that all the Kafka topics that may possibly be used are still considered as user topics and hence required to be pre-created. In addition to that, we have modified the
         <code>StreamPartitioner</code> interface to add the topic name parameter since the topic name now may not be known beforehand; users who have customized implementations of this interface would need to update their code while upgrading their application
         to use Kafka Streams 2.0.0.
     </p>
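
As an aside on the paragraph above (not part of the commit): a minimal Scala sketch of the dynamic-routing API it describes, assuming Kafka Streams 2.0.0 on the classpath; the topic names and the object are made up for illustration.

    import org.apache.kafka.streams.StreamsBuilder
    import org.apache.kafka.streams.processor.{RecordContext, TopicNameExtractor}

    object DynamicRoutingSketch {
      // Derive the destination topic from each record's key; the candidate
      // topics ("orders-eu", "orders-us", ...) must already exist, since
      // dynamically chosen topics are still treated as user topics.
      val byRegion = new TopicNameExtractor[String, String] {
        override def extract(key: String, value: String, ctx: RecordContext): String =
          s"orders-$key"
      }

      def main(args: Array[String]): Unit = {
        val builder = new StreamsBuilder
        builder.stream[String, String]("orders-input").to(byRegion)
        // builder.build() would then be passed to new KafkaStreams(topology, config)
      }
    }
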
diff --git a/docs/upgrade.html b/docs/upgrade.html
index 03d1feb..f3e5d9d 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -59,7 +59,7 @@
         with the new protocol by default.</li>
     <li>Bumping the protocol version and restarting can be done any time after the brokers are upgraded. It does not have to be immediately after.
         Similarly for the message format version.</li>
-    <li>If you are using Java8 method references in your Kafka Streams code you might need to update your code to resolve method ambiguties.
+    <li>If you are using Java8 method references in your Kafka Streams code you might need to update your code to resolve method ambiguities.
         Hot-swapping the jar-file only might not work.</li>
 </ol>
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index 612a453..e109345 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -1028,7 +1028,7 @@ public class KafkaStreams {
      * @param <T>                 return type
      * @return A facade wrapping the local {@link StateStore} instances
      * @throws InvalidStateStoreException if Kafka Streams is (re-)initializing or a store with {@code storeName} and
-     * {@code queryableStoreType} doesnt' exist
+     * {@code queryableStoreType} doesn't exist
      */
     public <T> T store(final String storeName, final QueryableStoreType<T> queryableStoreType) {
         validateIsRunning();
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
index e8ee507..d379e0d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
@@ -46,6 +46,7 @@ import org.junit.BeforeClass;
 import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import scala.collection.JavaConverters;
 
 import java.io.IOException;
 import java.util.Arrays;
@@ -116,7 +117,8 @@ public class InternalTopicIntegrationTest {
                 DEFAULT_ZK_SESSION_TIMEOUT_MS, DEFAULT_ZK_CONNECTION_TIMEOUT_MS, Integer.MAX_VALUE,
                 Time.SYSTEM, "testMetricGroup", "testMetricType")) {
             final AdminZkClient adminZkClient = new AdminZkClient(kafkaZkClient);
-            final Map<String, Properties> topicConfigs = scala.collection.JavaConversions.mapAsJavaMap(adminZkClient.getAllTopicConfigs());
+            final Map<String, Properties> topicConfigs =
+                JavaConverters.mapAsJavaMapConverter(adminZkClient.getAllTopicConfigs()).asJava();
 
             for (Map.Entry<String, Properties> topicConfig : topicConfigs.entrySet()) {
                 if (topicConfig.getKey().equals(changelog)) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java
index c33a720..3ea365c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java
@@ -29,6 +29,7 @@ import org.apache.kafka.test.TestUtils;
 import org.junit.rules.ExternalResource;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import scala.collection.JavaConverters;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -296,8 +297,8 @@ public class EmbeddedKafkaCluster extends ExternalResource {
 
         @Override
         public boolean conditionMet() {
-            final Set<String> allTopics = new HashSet<>();
-            allTopics.addAll(scala.collection.JavaConversions.seqAsJavaList(zkUtils.getAllTopics()));
+            final Set<String> allTopics = new HashSet<>(
+                    JavaConverters.seqAsJavaListConverter(zkUtils.getAllTopics()).asJava());
             return !allTopics.removeAll(deletedTopics);
         }
     }
@@ -311,8 +312,8 @@ public class EmbeddedKafkaCluster extends ExternalResource {
 
         @Override
         public boolean conditionMet() {
-            final Set<String> allTopics = new HashSet<>();
-            allTopics.addAll(scala.collection.JavaConversions.seqAsJavaList(zkUtils.getAllTopics()));
+            final Set<String> allTopics = new HashSet<>(
+                    JavaConverters.seqAsJavaListConverter(zkUtils.getAllTopics()).asJava());
             return allTopics.equals(remainingTopics);
         }
     }
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/KafkaEmbedded.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/KafkaEmbedded.java
index 6aafac0..55986bb 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/KafkaEmbedded.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/KafkaEmbedded.java
@@ -20,21 +20,19 @@ import kafka.admin.RackAwareMode;
 import kafka.server.KafkaConfig;
 import kafka.server.KafkaConfig$;
 import kafka.server.KafkaServer;
-import kafka.utils.CoreUtils;
 import kafka.utils.MockTime;
 import kafka.utils.TestUtils;
 import kafka.zk.AdminZkClient;
 import kafka.zk.KafkaZkClient;
 import org.apache.kafka.common.network.ListenerName;
 import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
 import org.junit.rules.TemporaryFolder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
-import java.util.Collections;
-import java.util.List;
 import java.util.Properties;
 
 /**
@@ -128,9 +126,12 @@ public class KafkaEmbedded {
             brokerList(), zookeeperConnect());
         kafka.shutdown();
         kafka.awaitShutdown();
-        log.debug("Removing logs.dir at {} ...", logDir);
-        final List<String> logDirs = Collections.singletonList(logDir.getAbsolutePath());
-        CoreUtils.delete(scala.collection.JavaConversions.asScalaBuffer(logDirs).seq());
+        log.debug("Removing log dir at {} ...", logDir);
+        try {
+            Utils.delete(logDir);
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
         tmpFolder.delete();
         log.debug("Shutdown of embedded Kafka broker at {} completed (with ZK ensemble at
{}) ...",
             brokerList(), zookeeperConnect());

-- 
To stop receiving notification emails like this one, please contact
ijuma@apache.org.
