kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From nehanarkh...@apache.org
Subject kafka git commit: KAFKA-1054; Eliminate Scala Compilation Warnings
Date Tue, 28 Apr 2015 03:38:01 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 22b3db47e -> 2e90f5e51


KAFKA-1054; Eliminate Scala Compilation Warnings

Changes:
- Suppressed compiler warnings about type erasure in matching via unboxing
by Jon Riehl.
- Suppressed warning caused by slight difference in input function type
by Jon Riehl.
- Fix compiler warnings: ServerShutdownTest, DelayedJoinGroup function
signature by Blake Smith.
- Fix Scala 2.11 warnings. `Pair` has been deprecated, `try` without
`catch` and `finally` is useless and initialisation order fix by Ismael
Juma.


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/2e90f5e5
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/2e90f5e5
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/2e90f5e5

Branch: refs/heads/trunk
Commit: 2e90f5e51f688ca2e347668ab83d2d0c2730d071
Parents: 22b3db4
Author: Jon Riehl <jriehl@spaceship.com>
Authored: Fri Sep 5 18:25:32 2014 -0500
Committer: Neha Narkhede <nehanarkhede@apache.org>
Committed: Mon Apr 27 20:37:51 2015 -0700

----------------------------------------------------------------------
 core/src/main/scala/kafka/admin/AdminUtils.scala       | 13 +++++++++----
 .../src/main/scala/kafka/consumer/ConsumerConfig.scala |  4 ++--
 .../main/scala/kafka/consumer/ConsumerIterator.scala   |  6 ++----
 .../scala/kafka/coordinator/DelayedJoinGroup.scala     |  4 ++--
 core/src/main/scala/kafka/utils/CoreUtils.scala        |  4 ++--
 5 files changed, 17 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/2e90f5e5/core/src/main/scala/kafka/admin/AdminUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/AdminUtils.scala b/core/src/main/scala/kafka/admin/AdminUtils.scala
index eee80f9..f06edf4 100644
--- a/core/src/main/scala/kafka/admin/AdminUtils.scala
+++ b/core/src/main/scala/kafka/admin/AdminUtils.scala
@@ -317,12 +317,17 @@ object AdminUtils extends Logging {
     if(str != null) {
       Json.parseFull(str) match {
         case None => // there are no config overrides
-        case Some(map: Map[String, _]) => 
+        case Some(mapAnon: Map[_, _]) =>
+          val map = mapAnon collect { case (k: String, v: Any) => k -> v }
           require(map("version") == 1)
           map.get("config") match {
-            case Some(config: Map[String, String]) =>
-              for((k,v) <- config)
-                props.setProperty(k, v)
+            case Some(config: Map[_, _]) =>
+              for(configTup <- config)
+                configTup match {
+                  case (k: String, v: String) =>
+                    props.setProperty(k, v)
+                  case _ => throw new IllegalArgumentException("Invalid topic config: " + str)
+                }
             case _ => throw new IllegalArgumentException("Invalid topic config: " + str)
           }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/2e90f5e5/core/src/main/scala/kafka/consumer/ConsumerConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ConsumerConfig.scala b/core/src/main/scala/kafka/consumer/ConsumerConfig.scala
index 9ebbee6..0199317 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerConfig.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerConfig.scala
@@ -104,8 +104,6 @@ class ConsumerConfig private (val props: VerifiableProperties) extends ZKConfig(
 
   /** the socket timeout for network requests. Its value should be at least fetch.wait.max.ms. */
   val socketTimeoutMs = props.getInt("socket.timeout.ms", SocketTimeout)
-  require(fetchWaitMaxMs <= socketTimeoutMs, "socket.timeout.ms should always be at least fetch.wait.max.ms" +
-    " to prevent unnecessary socket timeouts")
   
   /** the socket receive buffer for network requests */
   val socketReceiveBufferBytes = props.getInt("socket.receive.buffer.bytes", SocketBufferSize)
@@ -133,6 +131,8 @@ class ConsumerConfig private (val props: VerifiableProperties) extends ZKConfig(
   
   /** the maximum amount of time the server will block before answering the fetch request if there isn't sufficient data to immediately satisfy fetch.min.bytes */
   val fetchWaitMaxMs = props.getInt("fetch.wait.max.ms", MaxFetchWaitMs)
+  require(fetchWaitMaxMs <= socketTimeoutMs, "socket.timeout.ms should always be at least fetch.wait.max.ms" +
+    " to prevent unnecessary socket timeouts")
   
   /** backoff time between retries during rebalance */
   val rebalanceBackoffMs = props.getInt("rebalance.backoff.ms", zkSyncTimeMs)

http://git-wip-us.apache.org/repos/asf/kafka/blob/2e90f5e5/core/src/main/scala/kafka/consumer/ConsumerIterator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ConsumerIterator.scala b/core/src/main/scala/kafka/consumer/ConsumerIterator.scala
index 0c5c451..0c6c810 100755
--- a/core/src/main/scala/kafka/consumer/ConsumerIterator.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerIterator.scala
@@ -104,10 +104,8 @@ class ConsumerIterator[K, V](private val channel: BlockingQueue[FetchedDataChunk
   }
 
   def clearCurrentChunk() {
-    try {
-      debug("Clearing the current data chunk for this consumer iterator")
-      current.set(null)
-    }
+    debug("Clearing the current data chunk for this consumer iterator")
+    current.set(null)
   }
 }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/2e90f5e5/core/src/main/scala/kafka/coordinator/DelayedJoinGroup.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/DelayedJoinGroup.scala b/core/src/main/scala/kafka/coordinator/DelayedJoinGroup.scala
index df60cbc..f5bd5dc 100644
--- a/core/src/main/scala/kafka/coordinator/DelayedJoinGroup.scala
+++ b/core/src/main/scala/kafka/coordinator/DelayedJoinGroup.scala
@@ -28,7 +28,7 @@ import kafka.server.DelayedOperation
  */
 class DelayedJoinGroup(sessionTimeout: Long,
                        consumerRegistry: ConsumerRegistry,
-                       responseCallback: () => Unit) extends DelayedOperation(sessionTimeout) {
+                       responseCallback: => Unit) extends DelayedOperation(sessionTimeout) {
 
   /* always successfully complete the operation once called */
   override def tryComplete(): Boolean = {
@@ -45,4 +45,4 @@ class DelayedJoinGroup(sessionTimeout: Long,
     // TODO
     responseCallback
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/2e90f5e5/core/src/main/scala/kafka/utils/CoreUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/CoreUtils.scala b/core/src/main/scala/kafka/utils/CoreUtils.scala
index 98abc45..d0a8fa7 100755
--- a/core/src/main/scala/kafka/utils/CoreUtils.scala
+++ b/core/src/main/scala/kafka/utils/CoreUtils.scala
@@ -66,7 +66,7 @@ object CoreUtils extends Logging {
    * @param fun The runction to execute in the thread
    * @return The unstarted thread
    */
-  def daemonThread(name: String, fun: () => Unit): Thread =
+  def daemonThread(name: String, fun: => Unit): Thread =
     Utils.daemonThread(name, runnable(fun))
 
   /**
@@ -207,7 +207,7 @@ object CoreUtils extends Logging {
       return map
     val keyVals = str.split("\\s*,\\s*").map(s => {
       val lio = s.lastIndexOf(":")
-      Pair(s.substring(0,lio).trim, s.substring(lio + 1).trim)
+      (s.substring(0,lio).trim, s.substring(lio + 1).trim)
     })
     keyVals.toMap
   }


Mime
View raw message