kafka-commits mailing list archives

From joest...@apache.org
Subject kafka git commit: KAFKA-1812 Allow IpV6 in configuration with parseCsvMap patch by Jeff Holoman reviewed by Gwen Shapira and Joe Stein
Date Fri, 12 Dec 2014 08:45:51 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 280162996 -> e8ffbd0fe


KAFKA-1812 Allow IpV6 in configuration with parseCsvMap patch by Jeff Holoman reviewed by Gwen Shapira and Joe Stein


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/e8ffbd0f
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/e8ffbd0f
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/e8ffbd0f

Branch: refs/heads/trunk
Commit: e8ffbd0fee0bc715ad0fe6c9afe85715f84d8e51
Parents: 2801629
Author: Joe Stein <joe.stein@stealth.ly>
Authored: Fri Dec 12 03:46:29 2014 -0500
Committer: Joe Stein <joe.stein@stealth.ly>
Committed: Fri Dec 12 03:46:29 2014 -0500

----------------------------------------------------------------------
 core/src/main/scala/kafka/utils/Utils.scala     | 105 ++++++++++---------
 .../test/scala/unit/kafka/utils/UtilsTest.scala |  36 +++++++
 2 files changed, 91 insertions(+), 50 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/e8ffbd0f/core/src/main/scala/kafka/utils/Utils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/Utils.scala b/core/src/main/scala/kafka/utils/Utils.scala
index 58685cc..738c1af 100644
--- a/core/src/main/scala/kafka/utils/Utils.scala
+++ b/core/src/main/scala/kafka/utils/Utils.scala
@@ -5,7 +5,7 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
- * 
+ *
  *    http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
@@ -33,10 +33,10 @@ import kafka.common.KafkaStorageException
 
 /**
  * General helper functions!
- * 
+ *
  * This is for general helper functions that aren't specific to Kafka logic. Things that should have been included in
- * the standard library etc. 
- * 
+ * the standard library etc.
+ *
  * If you are making a new helper function and want to add it to this class please ensure the following:
  * 1. It has documentation
  * 2. It is the most general possible utility, not just the thing you needed in one particular place
@@ -68,18 +68,18 @@ object Utils extends Logging {
    * @param runnable The runnable to execute in the background
    * @return The unstarted thread
    */
-  def daemonThread(name: String, runnable: Runnable): Thread = 
+  def daemonThread(name: String, runnable: Runnable): Thread =
     newThread(name, runnable, true)
-  
+
   /**
    * Create a daemon thread
    * @param name The name of the thread
    * @param fun The runction to execute in the thread
    * @return The unstarted thread
    */
-  def daemonThread(name: String, fun: () => Unit): Thread = 
+  def daemonThread(name: String, fun: () => Unit): Thread =
     daemonThread(name, runnable(fun))
-  
+
   /**
    * Create a new thread
    * @param name The name of the thread
@@ -88,16 +88,16 @@ object Utils extends Logging {
    * @return The unstarted thread
    */
   def newThread(name: String, runnable: Runnable, daemon: Boolean): Thread = {
-    val thread = new Thread(runnable, name) 
+    val thread = new Thread(runnable, name)
     thread.setDaemon(daemon)
     thread.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
       def uncaughtException(t: Thread, e: Throwable) {
         error("Uncaught exception in thread '" + t.getName + "':", e)
-      } 
+      }
     })
     thread
   }
-   
+
   /**
    * Create a new thread
    * @param runnable The work for the thread to do
@@ -114,7 +114,7 @@ object Utils extends Logging {
     })
     thread
   }
-  
+
   /**
    * Read the given byte buffer into a byte array
    */
@@ -161,7 +161,7 @@ object Utils extends Logging {
     else
       new FileInputStream(file).getChannel()
   }
-  
+
   /**
    * Do the given action and log any exceptions thrown without rethrowing them
    * @param log The log method to use for logging. E.g. logger.warn
@@ -174,7 +174,7 @@ object Utils extends Logging {
       case e: Throwable => log(e.getMessage(), e)
     }
   }
-  
+
   /**
    * Test if two byte buffers are equal. In this case equality means having
    * the same bytes from the current position to the limit
@@ -191,7 +191,7 @@ object Utils extends Logging {
         return false
     return true
   }
-  
+
   /**
    * Translate the given buffer into a string
    * @param buffer The buffer to translate
@@ -202,7 +202,7 @@ object Utils extends Logging {
     buffer.get(bytes)
     new String(bytes, encoding)
   }
-  
+
   /**
    * Print an error message and shutdown the JVM
    * @param message The error message
@@ -211,19 +211,19 @@ object Utils extends Logging {
     System.err.println(message)
     System.exit(1)
   }
-  
+
   /**
    * Recursively delete the given file/directory and any subfiles (if any exist)
    * @param file The root file at which to begin deleting
    */
   def rm(file: String): Unit = rm(new File(file))
-  
+
   /**
    * Recursively delete the list of files/directories and any subfiles (if any exist)
    * @param a sequence of files to be deleted
    */
   def rm(files: Seq[String]): Unit = files.map(f => rm(new File(f)))
-  
+
   /**
    * Recursively delete the given file/directory and any subfiles (if any exist)
    * @param file The root file at which to begin deleting
@@ -242,7 +242,7 @@ object Utils extends Logging {
 	    file.delete()
 	  }
   }
-  
+
   /**
    * Register the given mbean with the platform mbean server,
    * unregistering any mbean that was there before. Note,
@@ -270,7 +270,7 @@ object Utils extends Logging {
       }
     }
   }
-  
+
   /**
    * Unregister the mbean with the given name, if there is one registered
    * @param name The mbean name to unregister
@@ -283,16 +283,16 @@ object Utils extends Logging {
         mbs.unregisterMBean(objName)
     }
   }
-  
+
   /**
-   * Read an unsigned integer from the current position in the buffer, 
+   * Read an unsigned integer from the current position in the buffer,
    * incrementing the position by 4 bytes
    * @param buffer The buffer to read from
    * @return The integer read, as a long to avoid signedness
    */
-  def readUnsignedInt(buffer: ByteBuffer): Long = 
+  def readUnsignedInt(buffer: ByteBuffer): Long =
     buffer.getInt() & 0xffffffffL
-  
+
   /**
    * Read an unsigned integer from the given position without modifying the buffers
    * position
@@ -300,33 +300,33 @@ object Utils extends Logging {
    * @param index the index from which to read the integer
    * @return The integer read, as a long to avoid signedness
    */
-  def readUnsignedInt(buffer: ByteBuffer, index: Int): Long = 
+  def readUnsignedInt(buffer: ByteBuffer, index: Int): Long =
     buffer.getInt(index) & 0xffffffffL
-  
+
   /**
    * Write the given long value as a 4 byte unsigned integer. Overflow is ignored.
    * @param buffer The buffer to write to
    * @param value The value to write
    */
-  def writetUnsignedInt(buffer: ByteBuffer, value: Long): Unit = 
+  def writetUnsignedInt(buffer: ByteBuffer, value: Long): Unit =
     buffer.putInt((value & 0xffffffffL).asInstanceOf[Int])
-  
+
   /**
    * Write the given long value as a 4 byte unsigned integer. Overflow is ignored.
    * @param buffer The buffer to write to
    * @param index The position in the buffer at which to begin writing
    * @param value The value to write
    */
-  def writeUnsignedInt(buffer: ByteBuffer, index: Int, value: Long): Unit = 
+  def writeUnsignedInt(buffer: ByteBuffer, index: Int, value: Long): Unit =
     buffer.putInt(index, (value & 0xffffffffL).asInstanceOf[Int])
-  
+
   /**
    * Compute the CRC32 of the byte array
    * @param bytes The array to compute the checksum for
    * @return The CRC32
    */
   def crc32(bytes: Array[Byte]): Long = crc32(bytes, 0, bytes.length)
-  
+
   /**
   * Compute the CRC32 of the segment of the byte array given by the specificed size and offset
    * @param bytes The bytes to checksum
@@ -339,7 +339,7 @@ object Utils extends Logging {
     crc.update(bytes, offset, size)
     crc.getValue()
   }
-  
+
   /**
    * Compute the hash code for the given items
    */
@@ -356,7 +356,7 @@ object Utils extends Logging {
     }
     return h
   }
-  
+
   /**
    * Group the given values by keys extracted with the given function
    */
@@ -368,12 +368,12 @@ object Utils extends Logging {
         case Some(l: List[V]) => m.put(k, v :: l)
         case None => m.put(k, List(v))
       }
-    } 
+    }
     m
   }
-  
+
   /**
-   * Read some bytes into the provided buffer, and return the number of bytes read. If the 
+   * Read some bytes into the provided buffer, and return the number of bytes read. If the
    * channel has been closed or we get -1 on the read for any reason, throw an EOFException
    */
   def read(channel: ReadableByteChannel, buffer: ByteBuffer): Int = {
@@ -381,8 +381,8 @@ object Utils extends Logging {
      case -1 => throw new EOFException("Received -1 when reading from channel, socket has likely been closed.")
       case n: Int => n
     }
-  } 
-  
+  }
+
   /**
    * Throw an exception if the given value is null, else return it. You can use this like:
    * val myValue = Utils.notNull(expressionThatShouldntBeNull)
@@ -407,15 +407,20 @@ object Utils extends Logging {
   /**
   * This method gets comma separated values which contains key,value pairs and returns a map of
    * key value pairs. the format of allCSVal is key1:val1, key2:val2 ....
+   * Also supports strings with multiple ":" such as IpV6 addresses, taking the last occurrence
+   * of the ":" in the pair as the split, eg a:b:c:val1, d:e:f:val2 => a:b:c -> val1, d:e:f -> val2
    */
   def parseCsvMap(str: String): Map[String, String] = {
     val map = new mutable.HashMap[String, String]
-    if("".equals(str))
-      return map    
-    val keyVals = str.split("\\s*,\\s*").map(s => s.split("\\s*:\\s*"))
-    keyVals.map(pair => (pair(0), pair(1))).toMap
+    if ("".equals(str))
+      return map
+    val keyVals = str.split("\\s*,\\s*").map(s => {
+      val lio = s.lastIndexOf(":")
+      Pair(s.substring(0,lio).trim, s.substring(lio + 1).trim)
+    })
+    keyVals.toMap
   }
-  
+
   /**
    * Parse a comma separated string into a sequence of strings.
    * Whitespace surrounding the comma will be removed.
@@ -467,7 +472,7 @@ object Utils extends Logging {
       stream.close()
     }
   }
-  
+
   /**
    * Get the absolute value of the given number. If the number is Int.MinValue return 0.
   * This is different from java.lang.Math.abs or scala.math.abs in that they return Int.MinValue (!).
@@ -496,7 +501,7 @@ object Utils extends Logging {
       throw new KafkaStorageException("Failed to create file %s.".format(path))
     f
   }
-  
+
   /**
    * Turn a properties map into a string
    */
@@ -531,7 +536,7 @@ object Utils extends Logging {
     }
     evaluated
   }
-  
+
   /**
    * Read some properties with the given default values
    */
@@ -541,7 +546,7 @@ object Utils extends Logging {
     props.load(reader)
     props
   }
-  
+
   /**
    * Read a big-endian integer from a byte array
    */
@@ -551,7 +556,7 @@ object Utils extends Logging {
     ((bytes(offset + 2) & 0xFF) << 8) |
     (bytes(offset + 3) & 0xFF)
   }
-  
+
   /**
    * Execute the given function inside the lock
    */
@@ -590,7 +595,7 @@ object Utils extends Logging {
        */
      case c if ((c >= '\u0000' && c <= '\u001f') || (c >= '\u007f' && c <= '\u009f')) => "\\u%04x".format(c: Int)
       case c => c
-    }.mkString 
+    }.mkString
   }
 
   /**
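
The substantive change in this file is to parseCsvMap: each comma-separated entry is now split on its last ":" rather than its first, so keys that themselves contain ":" (such as IPv6 addresses) keep all of their colons. Below is a minimal standalone Scala sketch of that split, using a hypothetical object name and sample input for illustration; it mirrors the logic in the patch but is not the committed code.

    object ParseCsvMapSketch {
      // Split each comma-separated entry on its LAST ':' so colon-bearing keys stay intact.
      def parseCsvMap(str: String): Map[String, String] = {
        if (str.isEmpty) Map.empty[String, String]
        else str.split("\\s*,\\s*").map { s =>
          val lio = s.lastIndexOf(":")
          (s.substring(0, lio).trim, s.substring(lio + 1).trim)
        }.toMap
      }

      def main(args: Array[String]): Unit = {
        // The IPv6-style key keeps its internal colons; only the final ':' separates key from value.
        println(parseCsvMap("fe80::1:allow, 192.168.2.1/30:deny"))
        // prints: Map(fe80::1 -> allow, 192.168.2.1/30 -> deny)
      }
    }
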

http://git-wip-us.apache.org/repos/asf/kafka/blob/e8ffbd0f/core/src/test/scala/unit/kafka/utils/UtilsTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/UtilsTest.scala b/core/src/test/scala/unit/kafka/utils/UtilsTest.scala
index 0d0f0e2..066553c 100644
--- a/core/src/test/scala/unit/kafka/utils/UtilsTest.scala
+++ b/core/src/test/scala/unit/kafka/utils/UtilsTest.scala
@@ -105,6 +105,42 @@ class UtilsTest extends JUnitSuite {
   }
 
   @Test
+  def testCsvMap() {
+    val emptyString: String = ""
+    val emptyMap = Utils.parseCsvMap(emptyString)
+    val emptyStringMap = Map.empty[String, String]
+    assertTrue(emptyMap != null)
+    assertTrue(emptyStringMap.equals(emptyStringMap))
+
+    val kvPairsIpV6: String = "a:b:c:v,a:b:c:v"
+    val ipv6Map = Utils.parseCsvMap(kvPairsIpV6)
+    for (m <- ipv6Map) {
+      assertTrue(m._1.equals("a:b:c"))
+      assertTrue(m._2.equals("v"))
+    }
+
+    val singleEntry:String = "key:value"
+    val singleMap = Utils.parseCsvMap(singleEntry)
+    val value = singleMap.getOrElse("key", 0)
+    assertTrue(value.equals("value"))
+
+    val kvPairsIpV4: String = "192.168.2.1/30:allow, 192.168.2.1/30:allow"
+    val ipv4Map = Utils.parseCsvMap(kvPairsIpV4)
+    for (m <- ipv4Map) {
+      assertTrue(m._1.equals("192.168.2.1/30"))
+      assertTrue(m._2.equals("allow"))
+    }
+
+    val kvPairsSpaces: String = "key:value      , key:   value"
+    val spaceMap = Utils.parseCsvMap(kvPairsSpaces)
+    for (m <- spaceMap) {
+      assertTrue(m._1.equals("key"))
+      assertTrue(m._2.equals("value"))
+    }
+  }
+
+
+  @Test
   def testInLock() {
     val lock = new ReentrantLock()
     val result = inLock(lock) {

