kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ij...@apache.org
Subject kafka git commit: KAFKA-4796; Fix some findbugs warnings in Kafka Java client
Date Sat, 04 Mar 2017 00:52:38 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk d2792e356 -> d9b784e14


KAFKA-4796; Fix some findbugs warnings in Kafka Java client

Author: Colin P. Mccabe <cmccabe@confluent.io>

Reviewers: Ismael Juma <ismael@juma.me.uk>

Closes #2593 from cmccabe/KAFKA-4796


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/d9b784e1
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/d9b784e1
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/d9b784e1

Branch: refs/heads/trunk
Commit: d9b784e1470714c8b04e7c3d74f626a96ca1591e
Parents: d2792e3
Author: Colin P. Mccabe <cmccabe@confluent.io>
Authored: Sat Mar 4 00:52:26 2017 +0000
Committer: Ismael Juma <ismael@juma.me.uk>
Committed: Sat Mar 4 00:52:26 2017 +0000

----------------------------------------------------------------------
 checkstyle/suppressions.xml                     |  2 +-
 .../producer/internals/DefaultPartitioner.java  |  4 +-
 .../apache/kafka/common/config/ConfigDef.java   |  7 +-
 .../apache/kafka/common/protocol/Protocol.java  |  5 --
 .../kafka/common/protocol/types/Struct.java     |  2 +-
 .../org/apache/kafka/common/utils/Bytes.java    |  3 +-
 .../org/apache/kafka/common/utils/Utils.java    | 67 +++++---------------
 .../org/apache/kafka/clients/MockClient.java    |  2 +-
 .../internals/AbstractCoordinatorTest.java      |  2 +-
 .../internals/RecordAccumulatorTest.java        |  2 +-
 .../clients/producer/internals/SenderTest.java  |  4 +-
 .../types/ProtocolSerializationTest.java        | 12 ++++
 .../kafka/common/security/JaasContextTest.java  |  4 +-
 .../security/scram/ScramMessagesTest.java       |  2 +-
 .../common/utils/AbstractIteratorTest.java      |  2 +-
 .../apache/kafka/common/utils/UtilsTest.java    | 64 +++++++++++++++++++
 .../org/apache/kafka/test/TestSslUtils.java     |  4 +-
 17 files changed, 113 insertions(+), 75 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/d9b784e1/checkstyle/suppressions.xml
----------------------------------------------------------------------
diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 9f13307..a39695f 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -32,7 +32,7 @@
               files=".*/protocol/Errors.java"/>
 
     <suppress checks="BooleanExpressionComplexity"
-              files="KafkaLZ4BlockOutputStream.java"/>
+              files="(Utils|KafkaLZ4BlockOutputStream).java"/>
 
     <suppress checks="CyclomaticComplexity"
               files="ConsumerCoordinator.java"/>

http://git-wip-us.apache.org/repos/asf/kafka/blob/d9b784e1/clients/src/main/java/org/apache/kafka/clients/producer/internals/DefaultPartitioner.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/DefaultPartitioner.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/DefaultPartitioner.java
index 086534a..9d4ecbf 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/DefaultPartitioner.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/DefaultPartitioner.java
@@ -18,9 +18,9 @@ package org.apache.kafka.clients.producer.internals;
 
 import java.util.List;
 import java.util.Map;
-import java.util.Random;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.kafka.clients.producer.Partitioner;
@@ -73,7 +73,7 @@ public class DefaultPartitioner implements Partitioner {
     private int nextValue(String topic) {
         AtomicInteger counter = topicCounterMap.get(topic);
         if (null == counter) {
-            counter = new AtomicInteger(new Random().nextInt());
+            counter = new AtomicInteger(ThreadLocalRandom.current().nextInt());
             AtomicInteger currentCounter = topicCounterMap.putIfAbsent(topic, counter);
             if (currentCounter != null) {
                 counter = currentCounter;

http://git-wip-us.apache.org/repos/asf/kafka/blob/d9b784e1/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
index f7cb8a9..3396f63 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
@@ -72,7 +72,10 @@ import java.util.Set;
  * functionality for accessing configs.
  */
 public class ConfigDef {
-
+    /**
+     * A unique Java object which represents the lack of a default value.<p>
+     * The 'new' here is intentional.
+     */
     public static final Object NO_DEFAULT_VALUE = new String("");
 
     private final Map<String, ConfigKey> configKeys;
@@ -816,7 +819,7 @@ public class ConfigDef {
 
         public void ensureValid(String name, Object o) {
             if (o == null)
-                throw new ConfigException(name, o, "Value must be non-null");
+                throw new ConfigException(name, null, "Value must be non-null");
             Number n = (Number) o;
             if (min != null && n.doubleValue() < min.doubleValue())
                 throw new ConfigException(name, o, "Value must be at least " + min);

http://git-wip-us.apache.org/repos/asf/kafka/blob/d9b784e1/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
index 25d380b..3343133 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
@@ -1090,11 +1090,6 @@ public class Protocol {
                 Type innerType = ((ArrayOf) field.type).type();
                 if (!subTypes.containsKey(field.name))
                     subTypes.put(field.name, innerType);
-            } else if (field.type instanceof Schema) {
-                b.append(field.name);
-                b.append(" ");
-                if (!subTypes.containsKey(field.name))
-                    subTypes.put(field.name, field.type);
             } else {
                 b.append(field.name);
                 b.append(" ");

http://git-wip-us.apache.org/repos/asf/kafka/blob/d9b784e1/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java b/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java
index c32aea7..325690d 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java
@@ -349,7 +349,7 @@ public class Struct {
             } else {
                 Object thisField = this.get(f);
                 Object otherField = other.get(f);
-                result = (thisField == null && otherField == null) || thisField.equals(otherField);
+                return (thisField == null) ? (otherField == null) : thisField.equals(otherField);
             }
             if (!result)
                 return false;

http://git-wip-us.apache.org/repos/asf/kafka/blob/d9b784e1/clients/src/main/java/org/apache/kafka/common/utils/Bytes.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Bytes.java b/clients/src/main/java/org/apache/kafka/common/utils/Bytes.java
index e28d925..4099155 100644
--- a/clients/src/main/java/org/apache/kafka/common/utils/Bytes.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/Bytes.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.common.utils;
 
+import java.io.Serializable;
 import java.util.Arrays;
 import java.util.Comparator;
 
@@ -138,7 +139,7 @@ public class Bytes implements Comparable<Bytes> {
      */
     public final static Comparator<byte[]> BYTES_LEXICO_COMPARATOR = new LexicographicByteArrayComparator();
 
-    private interface ByteArrayComparator extends Comparator<byte[]> {
+    private interface ByteArrayComparator extends Comparator<byte[]>, Serializable {
 
         int compare(final byte[] buffer1, int offset1, int length1,
                     final byte[] buffer2, int offset2, int length2);

http://git-wip-us.apache.org/repos/asf/kafka/blob/d9b784e1/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
index 20ab814..ed5eddb 100755
--- a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
@@ -103,16 +103,6 @@ public class Utils {
     }
 
     /**
-     * Read an unsigned integer from the current position in the buffer, incrementing the position by 4 bytes
-     *
-     * @param buffer The buffer to read from
-     * @return The integer read, as a long to avoid signedness
-     */
-    public static long readUnsignedInt(ByteBuffer buffer) {
-        return buffer.getInt() & 0xffffffffL;
-    }
-
-    /**
      * Read an unsigned integer from the given position without modifying the buffers position
      *
      * @param buffer the buffer to read from
@@ -130,28 +120,13 @@ public class Utils {
      * @return The integer read (MUST BE TREATED WITH SPECIAL CARE TO AVOID SIGNEDNESS)
      */
     public static int readUnsignedIntLE(InputStream in) throws IOException {
-        return (in.read() << 8 * 0)
-             | (in.read() << 8 * 1)
-             | (in.read() << 8 * 2)
-             | (in.read() << 8 * 3);
+        return in.read()
+                | (in.read() << 8)
+                | (in.read() << 16)
+                | (in.read() << 24);
     }
 
     /**
-     * Get the little-endian value of an integer as a byte array.
-     * @param val The value to convert to a little-endian array
-     * @return The little-endian encoded array of bytes for the value
-     */
-    public static byte[] toArrayLE(int val) {
-        return new byte[] {
-            (byte) (val >> 8 * 0),
-            (byte) (val >> 8 * 1),
-            (byte) (val >> 8 * 2),
-            (byte) (val >> 8 * 3)
-        };
-    }
-
-
-    /**
      * Read an unsigned integer stored in little-endian format from a byte array
      * at a given offset.
      *
@@ -160,20 +135,10 @@ public class Utils {
      * @return The integer read (MUST BE TREATED WITH SPECIAL CARE TO AVOID SIGNEDNESS)
      */
     public static int readUnsignedIntLE(byte[] buffer, int offset) {
-        return (buffer[offset++] << 8 * 0)
-             | (buffer[offset++] << 8 * 1)
-             | (buffer[offset++] << 8 * 2)
-             | (buffer[offset]   << 8 * 3);
-    }
-
-    /**
-     * Write the given long value as a 4 byte unsigned integer. Overflow is ignored.
-     *
-     * @param buffer The buffer to write to
-     * @param value The value to write
-     */
-    public static void writeUnsignedInt(ByteBuffer buffer, long value) {
-        buffer.putInt((int) (value & 0xffffffffL));
+        return (buffer[offset] << 0 & 0xff)
+                | ((buffer[offset + 1] & 0xff) << 8)
+                | ((buffer[offset + 2] & 0xff) << 16)
+                | ((buffer[offset + 3] & 0xff) << 24);
     }
 
     /**
@@ -194,10 +159,10 @@ public class Utils {
      * @param value The value to write
      */
     public static void writeUnsignedIntLE(OutputStream out, int value) throws IOException {
-        out.write(value >>> 8 * 0);
-        out.write(value >>> 8 * 1);
-        out.write(value >>> 8 * 2);
-        out.write(value >>> 8 * 3);
+        out.write(value);
+        out.write(value >>> 8);
+        out.write(value >>> 16);
+        out.write(value >>> 24);
     }
 
     /**
@@ -209,10 +174,10 @@ public class Utils {
      * @param value The value to write
      */
     public static void writeUnsignedIntLE(byte[] buffer, int offset, int value) {
-        buffer[offset++] = (byte) (value >>> 8 * 0);
-        buffer[offset++] = (byte) (value >>> 8 * 1);
-        buffer[offset++] = (byte) (value >>> 8 * 2);
-        buffer[offset]   = (byte) (value >>> 8 * 3);
+        buffer[offset] = (byte) value;
+        buffer[offset + 1] = (byte) (value >>> 8);
+        buffer[offset + 2] = (byte) (value >>> 16);
+        buffer[offset + 3]   = (byte) (value >>> 24);
     }
 
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/d9b784e1/clients/src/test/java/org/apache/kafka/clients/MockClient.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/MockClient.java b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
index f97e407..7e05881 100644
--- a/clients/src/test/java/org/apache/kafka/clients/MockClient.java
+++ b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
@@ -47,7 +47,7 @@ public class MockClient implements KafkaClient {
         }
     };
 
-    private class FutureResponse {
+    private static class FutureResponse {
         public final AbstractResponse responseBody;
         public final boolean disconnected;
         public final RequestMatcher requestMatcher;

http://git-wip-us.apache.org/repos/asf/kafka/blob/d9b784e1/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
index 3eb6561..45ee29a 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
@@ -483,7 +483,7 @@ public class AbstractCoordinatorTest {
         return new SyncGroupResponse(error, ByteBuffer.allocate(0));
     }
 
-    public class DummyCoordinator extends AbstractCoordinator {
+    public static class DummyCoordinator extends AbstractCoordinator {
 
         private int onJoinPrepareInvokes = 0;
         private int onJoinCompleteInvokes = 0;

http://git-wip-us.apache.org/repos/asf/kafka/blob/d9b784e1/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
index b96594f..1cb510e 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
@@ -84,7 +84,7 @@ public class RecordAccumulatorTest {
     public void testFull() throws Exception {
         long now = time.milliseconds();
         int batchSize = 1024;
-        RecordAccumulator accum = new RecordAccumulator(batchSize, 10 * batchSize, CompressionType.NONE, 10L, 100L, metrics, time);
+        RecordAccumulator accum = new RecordAccumulator(batchSize, 10L * batchSize, CompressionType.NONE, 10L, 100L, metrics, time);
         int appends = batchSize / msgSize;
         for (int i = 0; i < appends; i++) {
             // append to the first batch

http://git-wip-us.apache.org/repos/asf/kafka/blob/d9b784e1/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
index e9a7188..50ea219 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
@@ -151,7 +151,7 @@ public class SenderTest {
             sender.run(time.milliseconds()); // connect
             sender.run(time.milliseconds()); // send produce request
             String id = client.requests().peek().destination();
-            Node node = new Node(Integer.valueOf(id), "localhost", 0);
+            Node node = new Node(Integer.parseInt(id), "localhost", 0);
             assertEquals(1, client.inFlightRequestCount());
             assertTrue("Client ready status should be true", client.isReady(node, 0L));
             client.disconnect(id);
@@ -210,7 +210,7 @@ public class SenderTest {
             sender.run(time.milliseconds()); // send produce request
             String id = client.requests().peek().destination();
             assertEquals(ApiKeys.PRODUCE, client.requests().peek().requestBuilder().apiKey());
-            Node node = new Node(Integer.valueOf(id), "localhost", 0);
+            Node node = new Node(Integer.parseInt(id), "localhost", 0);
             assertEquals(1, client.inFlightRequestCount());
             assertTrue("Client ready status should be true", client.isReady(node, 0L));
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/d9b784e1/clients/src/test/java/org/apache/kafka/common/protocol/types/ProtocolSerializationTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/protocol/types/ProtocolSerializationTest.java b/clients/src/test/java/org/apache/kafka/common/protocol/types/ProtocolSerializationTest.java
index 74c9302..1c14e82 100644
--- a/clients/src/test/java/org/apache/kafka/common/protocol/types/ProtocolSerializationTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/protocol/types/ProtocolSerializationTest.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.common.protocol.types;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.fail;
@@ -258,4 +259,15 @@ public class ProtocolSerializationTest {
         assertEquals("The object read back should be the same as what was written.", obj, result);
     }
 
+    @Test
+    public void testStructEquals() {
+        Schema schema = new Schema(new Field("field1", Type.NULLABLE_STRING), new Field("field2", Type.NULLABLE_STRING));
+        Struct emptyStruct1 = new Struct(schema);
+        Struct emptyStruct2 = new Struct(schema);
+        assertEquals(emptyStruct1, emptyStruct2);
+
+        Struct mostlyEmptyStruct = new Struct(schema).set("field1", "foo");
+        assertNotEquals(emptyStruct1, mostlyEmptyStruct);
+        assertNotEquals(mostlyEmptyStruct, emptyStruct1);
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/d9b784e1/clients/src/test/java/org/apache/kafka/common/security/JaasContextTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/security/JaasContextTest.java b/clients/src/test/java/org/apache/kafka/common/security/JaasContextTest.java
index 8d98a11..30799c5 100644
--- a/clients/src/test/java/org/apache/kafka/common/security/JaasContextTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/security/JaasContextTest.java
@@ -58,8 +58,8 @@ public class JaasContextTest {
     }
 
     @After
-    public void tearDown() {
-        jaasConfigFile.delete();
+    public void tearDown() throws Exception {
+        Files.delete(jaasConfigFile.toPath());
     }
 
     @Test

http://git-wip-us.apache.org/repos/asf/kafka/blob/d9b784e1/clients/src/test/java/org/apache/kafka/common/security/scram/ScramMessagesTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/security/scram/ScramMessagesTest.java b/clients/src/test/java/org/apache/kafka/common/security/scram/ScramMessagesTest.java
index 53939ef..de97ce2 100644
--- a/clients/src/test/java/org/apache/kafka/common/security/scram/ScramMessagesTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/security/scram/ScramMessagesTest.java
@@ -252,7 +252,7 @@ public class ScramMessagesTest {
         checkServerFinalMessage(m, null, serverSignature);
 
         // Default format used by Kafka clients for final message with error
-        str = String.format("e=other-error", serverSignature);
+        str = "e=other-error";
         m = createScramMessage(ServerFinalMessage.class, str);
         checkServerFinalMessage(m, "other-error", null);
         m = new ServerFinalMessage(m.toBytes());

http://git-wip-us.apache.org/repos/asf/kafka/blob/d9b784e1/clients/src/test/java/org/apache/kafka/common/utils/AbstractIteratorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/utils/AbstractIteratorTest.java b/clients/src/test/java/org/apache/kafka/common/utils/AbstractIteratorTest.java
index 96adbbe..5ddab74 100644
--- a/clients/src/test/java/org/apache/kafka/common/utils/AbstractIteratorTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/utils/AbstractIteratorTest.java
@@ -52,7 +52,7 @@ public class AbstractIteratorTest {
         iter.next();
     }
 
-    class ListIterator<T> extends AbstractIterator<T> {
+    static class ListIterator<T> extends AbstractIterator<T> {
         private List<T> list;
         private int position = 0;
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/d9b784e1/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java b/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java
index c3f69fa..5f36c1c 100755
--- a/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java
@@ -16,6 +16,8 @@
  */
 package org.apache.kafka.common.utils;
 
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
 import java.io.EOFException;
 import java.nio.channels.FileChannel;
 import java.nio.file.StandardOpenOption;
@@ -35,6 +37,7 @@ import org.junit.Test;
 import static org.apache.kafka.common.utils.Utils.formatAddress;
 import static org.apache.kafka.common.utils.Utils.getHost;
 import static org.apache.kafka.common.utils.Utils.getPort;
+import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
@@ -333,4 +336,65 @@ public class UtilsTest {
                 assertEquals(closeablesWithException[i].closeException, suppressed[i - 1]);
         }
     }
+
+    @Test
+    public void testReadUnsignedIntLEFromArray() {
+        byte[] array1 = {0x01, 0x02, 0x03, 0x04, 0x05};
+        assertEquals(0x04030201, Utils.readUnsignedIntLE(array1, 0));
+        assertEquals(0x05040302, Utils.readUnsignedIntLE(array1, 1));
+
+        byte[] array2 = {(byte) 0xf1, (byte) 0xf2, (byte) 0xf3, (byte) 0xf4, (byte) 0xf5, (byte) 0xf6};
+        assertEquals(0xf4f3f2f1, Utils.readUnsignedIntLE(array2, 0));
+        assertEquals(0xf6f5f4f3, Utils.readUnsignedIntLE(array2, 2));
+    }
+
+    @Test
+    public void testReadUnsignedIntLEFromInputStream() throws IOException {
+        byte[] array1 = {0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09};
+        ByteArrayInputStream is1 = new ByteArrayInputStream(array1);
+        assertEquals(0x04030201, Utils.readUnsignedIntLE(is1));
+        assertEquals(0x08070605, Utils.readUnsignedIntLE(is1));
+
+        byte[] array2 = {(byte) 0xf1, (byte) 0xf2, (byte) 0xf3, (byte) 0xf4, (byte) 0xf5, (byte) 0xf6, (byte) 0xf7, (byte) 0xf8};
+        ByteArrayInputStream is2 = new ByteArrayInputStream(array2);
+        assertEquals(0xf4f3f2f1, Utils.readUnsignedIntLE(is2));
+        assertEquals(0xf8f7f6f5, Utils.readUnsignedIntLE(is2));
+    }
+
+    @Test
+    public void testWriteUnsignedIntLEToArray() {
+        int value1 = 0x04030201;
+
+        byte[] array1 = new byte[4];
+        Utils.writeUnsignedIntLE(array1, 0, value1);
+        assertArrayEquals(new byte[] {0x01, 0x02, 0x03, 0x04}, array1);
+
+        array1 = new byte[8];
+        Utils.writeUnsignedIntLE(array1, 2, value1);
+        assertArrayEquals(new byte[] {0, 0, 0x01, 0x02, 0x03, 0x04, 0, 0}, array1);
+
+        int value2 = 0xf4f3f2f1;
+
+        byte[] array2 = new byte[4];
+        Utils.writeUnsignedIntLE(array2, 0, value2);
+        assertArrayEquals(new byte[] {(byte) 0xf1, (byte) 0xf2, (byte) 0xf3, (byte) 0xf4}, array2);
+
+        array2 = new byte[8];
+        Utils.writeUnsignedIntLE(array2, 2, value2);
+        assertArrayEquals(new byte[] {0, 0, (byte) 0xf1, (byte) 0xf2, (byte) 0xf3, (byte) 0xf4, 0, 0}, array2);
+    }
+
+    @Test
+    public void testWriteUnsignedIntLEToOutputStream() throws IOException {
+        int value1 = 0x04030201;
+        ByteArrayOutputStream os1 = new ByteArrayOutputStream();
+        Utils.writeUnsignedIntLE(os1, value1);
+        Utils.writeUnsignedIntLE(os1, value1);
+        assertArrayEquals(new byte[] {0x01, 0x02, 0x03, 0x04, 0x01, 0x02, 0x03, 0x04}, os1.toByteArray());
+
+        int value2 = 0xf4f3f2f1;
+        ByteArrayOutputStream os2 = new ByteArrayOutputStream();
+        Utils.writeUnsignedIntLE(os2, value2);
+        assertArrayEquals(new byte[] {(byte) 0xf1, (byte) 0xf2, (byte) 0xf3, (byte) 0xf4}, os2.toByteArray());
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/d9b784e1/clients/src/test/java/org/apache/kafka/test/TestSslUtils.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/test/TestSslUtils.java b/clients/src/test/java/org/apache/kafka/test/TestSslUtils.java
index 7b78d3e..f4f8818 100644
--- a/clients/src/test/java/org/apache/kafka/test/TestSslUtils.java
+++ b/clients/src/test/java/org/apache/kafka/test/TestSslUtils.java
@@ -152,10 +152,8 @@ public class TestSslUtils {
     public static <T extends Certificate> void createTrustStore(
             String filename, Password password, Map<String, T> certs) throws GeneralSecurityException, IOException {
         KeyStore ks = KeyStore.getInstance("JKS");
-        try {
-            FileInputStream in = new FileInputStream(filename);
+        try (FileInputStream in = new FileInputStream(filename)) {
             ks.load(in, password.value().toCharArray());
-            in.close();
         } catch (EOFException e) {
             ks = createEmptyKeyStore();
         }


Mime
View raw message