kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jkr...@apache.org
Subject [1/6] kafka git commit: KAFKA-1915: Add checkstyle for java code.
Date Wed, 04 Feb 2015 04:58:40 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk f1ba4ff87 -> 1c6d5bbac


http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
index 8a305b0..69530c1 100644
--- a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
@@ -29,7 +29,7 @@ public class Utils {
 
     private static final Pattern HOST_PORT_PATTERN = Pattern.compile("\\[?(.+?)\\]?:(\\d+)");
 
-    public static String NL = System.getProperty("line.separator");
+    public static final String NL = System.getProperty("line.separator");
 
     /**
      * Turn the given UTF8 byte array into a string
@@ -87,10 +87,10 @@ public class Utils {
      * @return The integer read (MUST BE TREATED WITH SPECIAL CARE TO AVOID SIGNEDNESS)
      */
     public static int readUnsignedIntLE(InputStream in) throws IOException {
-        return (in.read() << 8*0) 
-             | (in.read() << 8*1)
-             | (in.read() << 8*2)
-             | (in.read() << 8*3);
+        return (in.read() << 8 * 0) 
+             | (in.read() << 8 * 1)
+             | (in.read() << 8 * 2)
+             | (in.read() << 8 * 3);
     }
 
     /**
@@ -102,10 +102,10 @@ public class Utils {
      * @return The integer read (MUST BE TREATED WITH SPECIAL CARE TO AVOID SIGNEDNESS)
      */
     public static int readUnsignedIntLE(byte[] buffer, int offset) {
-        return (buffer[offset++] << 8*0)
-             | (buffer[offset++] << 8*1)
-             | (buffer[offset++] << 8*2)
-             | (buffer[offset]   << 8*3);
+        return (buffer[offset++] << 8 * 0)
+             | (buffer[offset++] << 8 * 1)
+             | (buffer[offset++] << 8 * 2)
+             | (buffer[offset]   << 8 * 3);
     }
 
     /**
@@ -136,10 +136,10 @@ public class Utils {
      * @param value The value to write
      */
     public static void writeUnsignedIntLE(OutputStream out, int value) throws IOException {
-        out.write(value >>> 8*0);
-        out.write(value >>> 8*1);
-        out.write(value >>> 8*2);
-        out.write(value >>> 8*3);
+        out.write(value >>> 8 * 0);
+        out.write(value >>> 8 * 1);
+        out.write(value >>> 8 * 2);
+        out.write(value >>> 8 * 3);
     }
 
     /**
@@ -151,10 +151,10 @@ public class Utils {
      * @param value The value to write
      */
     public static void writeUnsignedIntLE(byte[] buffer, int offset, int value) {
-        buffer[offset++] = (byte) (value >>> 8*0);
-        buffer[offset++] = (byte) (value >>> 8*1);
-        buffer[offset++] = (byte) (value >>> 8*2);
-        buffer[offset]   = (byte) (value >>> 8*3);
+        buffer[offset++] = (byte) (value >>> 8 * 0);
+        buffer[offset++] = (byte) (value >>> 8 * 1);
+        buffer[offset++] = (byte) (value >>> 8 * 2);
+        buffer[offset]   = (byte) (value >>> 8 * 3);
     }
 
 
@@ -285,7 +285,7 @@ public class Utils {
             case 2:
                 h ^= (data[(length & ~3) + 1] & 0xff) << 8;
             case 1:
-                h ^= (data[length & ~3] & 0xff);
+                h ^= data[length & ~3] & 0xff;
                 h *= m;
         }
 
@@ -348,11 +348,11 @@ public class Utils {
     public static <T> String join(Collection<T> list, String seperator) {
         StringBuilder sb = new StringBuilder();
         Iterator<T> iter = list.iterator();
-        while(iter.hasNext()) {
+        while (iter.hasNext()) {
             sb.append(iter.next());
-            if(iter.hasNext())
-            sb.append(seperator);  
+            if (iter.hasNext())
+                sb.append(seperator);  
         }
-      return sb.toString();
+        return sb.toString();
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/test/java/org/apache/kafka/clients/ClientUtilsTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/ClientUtilsTest.java b/clients/src/test/java/org/apache/kafka/clients/ClientUtilsTest.java
new file mode 100644
index 0000000..13ce519
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/clients/ClientUtilsTest.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients;
+
+import org.apache.kafka.common.config.ConfigException;
+import org.junit.Test;
+
+import java.util.Arrays;
+
+public class ClientUtilsTest {
+
+    @Test
+    public void testParseAndValidateAddresses() {
+        check("127.0.0.1:8000");
+        check("mydomain.com:8080");
+        check("[::1]:8000");
+        check("[2001:db8:85a3:8d3:1319:8a2e:370:7348]:1234", "mydomain.com:10000");
+    }
+
+    @Test(expected = ConfigException.class)
+    public void testNoPort() {
+        check("127.0.0.1");
+    }
+
+    private void check(String... url) {
+        ClientUtils.parseAndValidateAddresses(Arrays.asList(url));
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/test/java/org/apache/kafka/clients/MockClient.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/MockClient.java b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
index 67bee40..8f1a7a6 100644
--- a/clients/src/test/java/org/apache/kafka/clients/MockClient.java
+++ b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
@@ -1,3 +1,19 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.kafka.clients;
 
 import java.util.ArrayDeque;
@@ -65,7 +81,7 @@ public class MockClient implements KafkaClient {
 
     @Override
     public List<ClientResponse> poll(long timeoutMs, long now) {
-        for(ClientResponse response: this.responses)
+        for (ClientResponse response: this.responses)
             if (response.request().hasCallback()) 
                 response.request().callback().onComplete(response);
         List<ClientResponse> copy = new ArrayList<ClientResponse>();

http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
index 5debcd6..8b27889 100644
--- a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
@@ -1,3 +1,19 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.kafka.clients;
 
 import static org.junit.Assert.assertEquals;
@@ -9,7 +25,6 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 
-import org.apache.kafka.clients.producer.internals.Metadata;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;

http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java
index e51d2df..677edd3 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java
@@ -1,3 +1,19 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.kafka.clients.consumer;
 
 import static org.junit.Assert.*;

http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
index 864f1c7..090087a 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
@@ -1,3 +1,19 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.kafka.clients.consumer.internals;
 
 import static org.junit.Assert.*;

http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/test/java/org/apache/kafka/clients/producer/BufferPoolTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/BufferPoolTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/BufferPoolTest.java
index 77b23e7..4ae43ed 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/BufferPoolTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/BufferPoolTest.java
@@ -107,7 +107,7 @@ public class BufferPoolTest {
 
     private CountDownLatch asyncDeallocate(final BufferPool pool, final ByteBuffer buffer) {
         final CountDownLatch latch = new CountDownLatch(1);
-        new Thread() {
+        Thread thread = new Thread() {
             public void run() {
                 try {
                     latch.await();
@@ -116,13 +116,14 @@ public class BufferPoolTest {
                 }
                 pool.deallocate(buffer);
             }
-        }.start();
+        };
+        thread.start();
         return latch;
     }
 
     private CountDownLatch asyncAllocate(final BufferPool pool, final int size) {
         final CountDownLatch completed = new CountDownLatch(1);
-        new Thread() {
+        Thread thread = new Thread() {
             public void run() {
                 try {
                     pool.allocate(size);
@@ -132,7 +133,8 @@ public class BufferPoolTest {
                     completed.countDown();
                 }
             }
-        }.start();
+        };
+        thread.start();
         return completed;
     }
 
@@ -172,12 +174,12 @@ public class BufferPoolTest {
             try {
                 for (int i = 0; i < iterations; i++) {
                     int size;
-                    if (TestUtils.random.nextBoolean())
+                    if (TestUtils.RANDOM.nextBoolean())
                         // allocate poolable size
                         size = pool.poolableSize();
                     else
                         // allocate a random size
-                        size = TestUtils.random.nextInt((int) pool.totalMemory());
+                        size = TestUtils.RANDOM.nextInt((int) pool.totalMemory());
                     ByteBuffer buffer = pool.allocate(size);
                     pool.deallocate(buffer);
                 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/test/java/org/apache/kafka/clients/producer/MetadataTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/MetadataTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/MetadataTest.java
index 74605c3..743aa7e 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/MetadataTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/MetadataTest.java
@@ -12,7 +12,7 @@
  */
 package org.apache.kafka.clients.producer;
 
-import org.apache.kafka.clients.producer.internals.Metadata;
+import org.apache.kafka.clients.Metadata;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.test.TestUtils;
@@ -49,7 +49,7 @@ public class MetadataTest {
     }
 
     /**
-     * Tests that {@link org.apache.kafka.clients.producer.internals.Metadata#awaitUpdate(int, long)} doesn't
+     * Tests that {@link org.apache.kafka.clients.Metadata#awaitUpdate(int, long)} doesn't
      * wait forever with a max timeout value of 0
      *
      * @throws Exception
@@ -68,9 +68,9 @@ public class MetadataTest {
             // expected
         }
         // now try with a higher timeout value once
-        final long TWO_SECOND_WAIT = 2000;
+        final long twoSecondWait = 2000;
         try {
-            metadata.awaitUpdate(metadata.requestUpdate(), TWO_SECOND_WAIT);
+            metadata.awaitUpdate(metadata.requestUpdate(), twoSecondWait);
             fail("Wait on metadata update was expected to timeout, but it didn't");
         } catch (TimeoutException te) {
             // expected

http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java
index d3377ef..75513b0 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java
@@ -32,6 +32,7 @@ public class MockProducerTest {
     private String topic = "topic";
 
     @Test
+    @SuppressWarnings("unchecked")
     public void testAutoCompleteMock() throws Exception {
         MockProducer producer = new MockProducer(true);
         ProducerRecord<byte[], byte[]> record = new ProducerRecord<byte[], byte[]>(topic, "key".getBytes(), "value".getBytes());

http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/test/java/org/apache/kafka/clients/producer/PartitionerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/PartitionerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/PartitionerTest.java
index 82d8083..29c8417 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/PartitionerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/PartitionerTest.java
@@ -31,7 +31,7 @@ public class PartitionerTest {
     private Node node0 = new Node(0, "localhost", 99);
     private Node node1 = new Node(1, "localhost", 100);
     private Node node2 = new Node(2, "localhost", 101);
-    private Node[] nodes = new Node[] { node0, node1, node2 };
+    private Node[] nodes = new Node[] {node0, node1, node2};
     private String topic = "test";
     private List<PartitionInfo> partitions = asList(new PartitionInfo(topic, 0, node0, nodes, nodes),
                                                     new PartitionInfo(topic, 1, node1, nodes, nodes),

http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java
index e2bb8da..8333863 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java
@@ -128,6 +128,7 @@ public class RecordAccumulatorTest {
         assertEquals("But due to size bound only one partition should have been retrieved", 1, batches.size());
     }
 
+    @SuppressWarnings("unused")
     @Test
     public void testStressfulSituation() throws Exception {
         final int numThreads = 5;
@@ -194,7 +195,7 @@ public class RecordAccumulatorTest {
         assertEquals("Next check time should be defined by node1, half remaining linger time", lingerMs / 2, result.nextReadyCheckDelayMs);
 
         // Add data for another partition on node1, enough to make data sendable immediately
-        for (int i = 0; i < appends+1; i++)
+        for (int i = 0; i < appends + 1; i++)
             accum.append(tp2, key, value, CompressionType.NONE, null);
         result = accum.ready(cluster, time.milliseconds());
         assertEquals("Node1 should be ready", Collections.singleton(node1), result.readyNodes);

http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/test/java/org/apache/kafka/clients/producer/RecordSendTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/RecordSendTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/RecordSendTest.java
index a3700a6..1e5d1c2 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/RecordSendTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/RecordSendTest.java
@@ -78,7 +78,7 @@ public class RecordSendTest {
     /* create a new request result that will be completed after the given timeout */
     public ProduceRequestResult asyncRequest(final long baseOffset, final RuntimeException error, final long timeout) {
         final ProduceRequestResult request = new ProduceRequestResult();
-        new Thread() {
+        Thread thread = new Thread() {
             public void run() {
                 try {
                     sleep(timeout);
@@ -87,7 +87,8 @@ public class RecordSendTest {
                     e.printStackTrace();
                 }
             }
-        }.start();
+        };
+        thread.start();
         return request;
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java
index 888b929..558942a 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java
@@ -21,8 +21,8 @@ import java.util.Map;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 
+import org.apache.kafka.clients.Metadata;
 import org.apache.kafka.clients.MockClient;
-import org.apache.kafka.clients.producer.internals.Metadata;
 import org.apache.kafka.clients.producer.internals.RecordAccumulator;
 import org.apache.kafka.clients.producer.internals.Sender;
 import org.apache.kafka.common.Cluster;
@@ -147,8 +147,8 @@ public class SenderTest {
         partResp.set("partition", part);
         partResp.set("error_code", (short) error);
         partResp.set("base_offset", offset);
-        response.set("partition_responses", new Object[] { partResp });
-        struct.set("responses", new Object[] { response });
+        response.set("partition_responses", new Object[] {partResp});
+        struct.set("responses", new Object[] {response});
         return struct;
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java b/clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java
index 3cfd36d..66442ed 100644
--- a/clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java
@@ -14,87 +14,67 @@ package org.apache.kafka.common.config;
 
 import static org.junit.Assert.fail;
 
-import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 
 import org.apache.kafka.common.config.ConfigDef.Importance;
 import org.apache.kafka.common.config.ConfigDef.Type;
-import org.apache.kafka.common.metrics.KafkaMetric;
 import org.apache.kafka.common.metrics.MetricsReporter;
 import org.junit.Test;
 
 public class AbstractConfigTest {
 
-  @Test
-  public void testConfiguredInstances() {
-    testValidInputs("");
-    testValidInputs("org.apache.kafka.common.config.AbstractConfigTest$TestMetricsReporter");
-    testValidInputs("org.apache.kafka.common.config.AbstractConfigTest$TestMetricsReporter,org.apache.kafka.common.config.AbstractConfigTest$TestMetricsReporter");
-    testInvalidInputs(",");
-    testInvalidInputs("org.apache.kafka.clients.producer.unknown-metrics-reporter");
-    testInvalidInputs("test1,test2");
-    testInvalidInputs("org.apache.kafka.common.config.AbstractConfigTest$TestMetricsReporter,");
-  }
-
-  private void testValidInputs(String configValue) {
-    Properties props = new Properties();
-    props.put(TestConfig.METRIC_REPORTER_CLASSES_CONFIG, configValue);
-    TestConfig config = new TestConfig(props);
-    try {
-      config.getConfiguredInstances(TestConfig.METRIC_REPORTER_CLASSES_CONFIG,
-          MetricsReporter.class);
-    } catch (ConfigException e) {
-       fail("No exceptions are expected here, valid props are :" + props);
-    }
-  }
-  
-  private void testInvalidInputs(String configValue) {
-    Properties props = new Properties();
-    props.put(TestConfig.METRIC_REPORTER_CLASSES_CONFIG, configValue);
-    TestConfig config = new TestConfig(props);
-    try {
-      config.getConfiguredInstances(TestConfig.METRIC_REPORTER_CLASSES_CONFIG,
-          MetricsReporter.class);
-      fail("Expected a config exception due to invalid props :" + props);
-    } catch (ConfigException e) {
-      // this is good
+    @Test
+    public void testConfiguredInstances() {
+        testValidInputs("");
+        testValidInputs("org.apache.kafka.common.metrics.FakeMetricsReporter");
+        testValidInputs("org.apache.kafka.common.metrics.FakeMetricsReporter, org.apache.kafka.common.metrics.FakeMetricsReporter");
+        testInvalidInputs(",");
+        testInvalidInputs("org.apache.kafka.clients.producer.unknown-metrics-reporter");
+        testInvalidInputs("test1,test2");
+        testInvalidInputs("org.apache.kafka.common.metrics.FakeMetricsReporter,");
     }
-  }
-
-  private static class TestConfig extends AbstractConfig {
-
-    private static final ConfigDef config;
-
-    public static final String METRIC_REPORTER_CLASSES_CONFIG = "metric.reporters";
-    private static final String METRIC_REPORTER_CLASSES_DOC = "A list of classes to use as metrics reporters.";
 
-    static {
-      config = new ConfigDef().define(METRIC_REPORTER_CLASSES_CONFIG,
-          Type.LIST, "", Importance.LOW, METRIC_REPORTER_CLASSES_DOC);
+    private void testValidInputs(String configValue) {
+        Properties props = new Properties();
+        props.put(TestConfig.METRIC_REPORTER_CLASSES_CONFIG, configValue);
+        TestConfig config = new TestConfig(props);
+        try {
+            config.getConfiguredInstances(TestConfig.METRIC_REPORTER_CLASSES_CONFIG, MetricsReporter.class);
+        } catch (ConfigException e) {
+            fail("No exceptions are expected here, valid props are :" + props);
+        }
     }
 
-    public TestConfig(Map<? extends Object, ? extends Object> props) {
-      super(config, props);
+    private void testInvalidInputs(String configValue) {
+        Properties props = new Properties();
+        props.put(TestConfig.METRIC_REPORTER_CLASSES_CONFIG, configValue);
+        TestConfig config = new TestConfig(props);
+        try {
+            config.getConfiguredInstances(TestConfig.METRIC_REPORTER_CLASSES_CONFIG, MetricsReporter.class);
+            fail("Expected a config exception due to invalid props :" + props);
+        } catch (ConfigException e) {
+            // this is good
+        }
     }
-  }
-  
-  public static class TestMetricsReporter implements MetricsReporter {
 
-    @Override
-    public void configure(Map<String, ?> configs) {
-    }
+    private static class TestConfig extends AbstractConfig {
 
-    @Override
-    public void init(List<KafkaMetric> metrics) {
-}
+        private static final ConfigDef CONFIG;
 
-    @Override
-    public void metricChange(KafkaMetric metric) {
-    }
+        public static final String METRIC_REPORTER_CLASSES_CONFIG = "metric.reporters";
+        private static final String METRIC_REPORTER_CLASSES_DOC = "A list of classes to use as metrics reporters.";
+
+        static {
+            CONFIG = new ConfigDef().define(METRIC_REPORTER_CLASSES_CONFIG,
+                                            Type.LIST,
+                                            "",
+                                            Importance.LOW,
+                                            METRIC_REPORTER_CLASSES_DOC);
+        }
 
-    @Override
-    public void close() {
+        public TestConfig(Map<? extends Object, ? extends Object> props) {
+            super(CONFIG, props);
+        }
     }
-  }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java b/clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java
index 16d3fed..44c2ef0 100644
--- a/clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java
@@ -16,7 +16,6 @@ import static java.util.Arrays.asList;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
 
-import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Properties;
@@ -110,7 +109,7 @@ public class ConfigDefTest {
 
     @Test(expected = ConfigException.class)
     public void testInvalidDefaultRange() {
-        new ConfigDef().define("name", Type.INT, -1, Range.between(0,10), Importance.HIGH, "docs");
+        new ConfigDef().define("name", Type.INT, -1, Range.between(0, 10), Importance.HIGH, "docs");
     }
 
     @Test(expected = ConfigException.class)
@@ -120,7 +119,7 @@ public class ConfigDefTest {
 
     @Test
     public void testValidators() {
-        testValidators(Type.INT, Range.between(0,10), 5, new Object[]{1, 5, 9}, new Object[]{-1, 11});
+        testValidators(Type.INT, Range.between(0, 10), 5, new Object[]{1, 5, 9}, new Object[]{-1, 11});
         testValidators(Type.STRING, ValidString.in("good", "values", "default"), "default",
                 new Object[]{"good", "values", "default"}, new Object[]{"bad", "inputs"});
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/test/java/org/apache/kafka/common/metrics/FakeMetricsReporter.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/metrics/FakeMetricsReporter.java b/clients/src/test/java/org/apache/kafka/common/metrics/FakeMetricsReporter.java
new file mode 100644
index 0000000..7c7ead1
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/metrics/FakeMetricsReporter.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.common.metrics;
+
+import java.util.List;
+import java.util.Map;
+
+public class FakeMetricsReporter implements MetricsReporter {
+
+    @Override
+    public void configure(Map<String, ?> configs) {}
+
+    @Override
+    public void init(List<KafkaMetric> metrics) {}
+
+    @Override
+    public void metricChange(KafkaMetric metric) {}
+
+    @Override
+    public void close() {}
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java b/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java
index 998a57c..544e120 100644
--- a/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java
@@ -36,7 +36,7 @@ import org.junit.Test;
 
 public class MetricsTest {
 
-    private static double EPS = 0.000001;
+    private static final double EPS = 0.000001;
 
     MockTime time = new MockTime();
     Metrics metrics = new Metrics(new MetricConfig(), Arrays.asList((MetricsReporter) new JmxReporter()), time);
@@ -71,7 +71,7 @@ public class MetricsTest {
         s.add(new MetricName("test.count", "grp1"), new Count());
         s.add(new Percentiles(100, -100, 100, BucketSizing.CONSTANT,
                              new Percentile(new MetricName("test.median", "grp1"), 50.0),
-                             new Percentile(new MetricName("test.perc99_9", "grp1"),99.9)));
+                             new Percentile(new MetricName("test.perc99_9", "grp1"), 99.9)));
 
         Sensor s2 = metrics.sensor("test.sensor2");
         s2.add(new MetricName("s2.total", "grp1"), new Total());

http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/test/java/org/apache/kafka/common/metrics/stats/HistogramTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/metrics/stats/HistogramTest.java b/clients/src/test/java/org/apache/kafka/common/metrics/stats/HistogramTest.java
index 3be6b2d..a55cc32 100644
--- a/clients/src/test/java/org/apache/kafka/common/metrics/stats/HistogramTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/metrics/stats/HistogramTest.java
@@ -22,7 +22,6 @@ import java.util.Arrays;
 import java.util.Random;
 
 
-import org.apache.kafka.common.metrics.stats.Histogram;
 import org.apache.kafka.common.metrics.stats.Histogram.BinScheme;
 import org.apache.kafka.common.metrics.stats.Histogram.ConstantBinScheme;
 import org.apache.kafka.common.metrics.stats.Histogram.LinearBinScheme;

http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
index a14659a..0d030bc 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
@@ -40,7 +40,6 @@ import org.junit.Test;
  */
 public class SelectorTest {
 
-    private static final List<NetworkSend> EMPTY = new ArrayList<NetworkSend>();
     private static final int BUFFER_SIZE = 4 * 1024;
 
     private EchoServer server;

http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/test/java/org/apache/kafka/common/protocol/types/ProtocolSerializationTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/protocol/types/ProtocolSerializationTest.java b/clients/src/test/java/org/apache/kafka/common/protocol/types/ProtocolSerializationTest.java
index 4480e9b..8b92634 100644
--- a/clients/src/test/java/org/apache/kafka/common/protocol/types/ProtocolSerializationTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/protocol/types/ProtocolSerializationTest.java
@@ -23,12 +23,6 @@ import static org.junit.Assert.fail;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
 
-import org.apache.kafka.common.protocol.types.ArrayOf;
-import org.apache.kafka.common.protocol.types.Field;
-import org.apache.kafka.common.protocol.types.Schema;
-import org.apache.kafka.common.protocol.types.SchemaException;
-import org.apache.kafka.common.protocol.types.Struct;
-import org.apache.kafka.common.protocol.types.Type;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -53,8 +47,8 @@ public class ProtocolSerializationTest {
                                              .set("int64", (long) 1)
                                              .set("string", "1")
                                              .set("bytes", "1".getBytes())
-                                             .set("array", new Object[] { 1 });
-        this.struct.set("struct", this.struct.instance("struct").set("field", new Object[] { 1, 2, 3 }));
+                                             .set("array", new Object[] {1});
+        this.struct.set("struct", this.struct.instance("struct").set("field", new Object[] {1, 2, 3}));
     }
 
     @Test
@@ -68,9 +62,9 @@ public class ProtocolSerializationTest {
         check(Type.STRING, "A\u00ea\u00f1\u00fcC");
         check(Type.BYTES, ByteBuffer.allocate(0));
         check(Type.BYTES, ByteBuffer.wrap("abcd".getBytes()));
-        check(new ArrayOf(Type.INT32), new Object[] { 1, 2, 3, 4 });
+        check(new ArrayOf(Type.INT32), new Object[] {1, 2, 3, 4});
         check(new ArrayOf(Type.STRING), new Object[] {});
-        check(new ArrayOf(Type.STRING), new Object[] { "hello", "there", "beautiful" });
+        check(new ArrayOf(Type.STRING), new Object[] {"hello", "there", "beautiful"});
     }
 
     @Test

http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
index 94a1112..e343327 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
@@ -71,7 +71,7 @@ public class MemoryRecordsTest {
     public static Collection<Object[]> data() {
         List<Object[]> values = new ArrayList<Object[]>();
         for (CompressionType type: CompressionType.values())
-            values.add(new Object[] { type });
+            values.add(new Object[] {type});
         return values;
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/test/java/org/apache/kafka/common/record/RecordTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/record/RecordTest.java b/clients/src/test/java/org/apache/kafka/common/record/RecordTest.java
index 2765913..957fc8f 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/RecordTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/RecordTest.java
@@ -63,7 +63,7 @@ public class RecordTest {
     @Test
     public void testChecksum() {
         assertEquals(record.checksum(), record.computeChecksum());
-        assertEquals(record.checksum(), record.computeChecksum(
+        assertEquals(record.checksum(), Record.computeChecksum(
             this.key == null ? null : this.key.array(),
             this.value == null ? null : this.value.array(),
             this.compression, 0, -1));
@@ -102,7 +102,7 @@ public class RecordTest {
         for (byte[] key : Arrays.asList(null, "".getBytes(), "key".getBytes(), payload))
             for (byte[] value : Arrays.asList(null, "".getBytes(), "value".getBytes(), payload))
                 for (CompressionType compression : CompressionType.values())
-                    values.add(new Object[] { key, value, compression });
+                    values.add(new Object[] {key, value, compression});
         return values;
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index df37fc6..13237fd 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -17,6 +17,7 @@ import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.protocol.Errors;
 import org.junit.Test;
 
 import java.lang.reflect.Method;
@@ -31,7 +32,7 @@ import static org.junit.Assert.assertEquals;
 public class RequestResponseTest {
 
     @Test
-    public void testSerialization() throws Exception{
+    public void testSerialization() throws Exception {
         List<AbstractRequestResponse> requestList = Arrays.asList(
                 createRequestHeader(),
                 createResponseHeader(),
@@ -67,7 +68,7 @@ public class RequestResponseTest {
     }
 
     private AbstractRequestResponse createRequestHeader() {
-        return new RequestHeader((short)10, (short)1, "", 10);
+        return new RequestHeader((short) 10, (short) 1, "", 10);
     }
 
     private AbstractRequestResponse createResponseHeader() {
@@ -79,7 +80,7 @@ public class RequestResponseTest {
     }
 
     private AbstractRequestResponse createConsumerMetadataResponse() {
-        return new ConsumerMetadataResponse((short)1, new Node(10, "host1", 2014));
+        return new ConsumerMetadataResponse((short) 1, new Node(10, "host1", 2014));
     }
 
     private AbstractRequestResponse createFetchRequest() {
@@ -91,7 +92,7 @@ public class RequestResponseTest {
 
     private AbstractRequestResponse createFetchResponse() {
         Map<TopicPartition, FetchResponse.PartitionData> responseData = new HashMap<TopicPartition, FetchResponse.PartitionData>();
-        responseData.put(new TopicPartition("test", 0), new FetchResponse.PartitionData((short)0, 1000000, ByteBuffer.allocate(10)));
+        responseData.put(new TopicPartition("test", 0), new FetchResponse.PartitionData(Errors.NONE.code(), 1000000, ByteBuffer.allocate(10)));
         return new FetchResponse(responseData);
     }
 
@@ -100,7 +101,7 @@ public class RequestResponseTest {
     }
 
     private AbstractRequestResponse createHeartBeatResponse() {
-        return new HeartbeatResponse((short)0);
+        return new HeartbeatResponse(Errors.NONE.code());
     }
 
     private AbstractRequestResponse createJoinGroupRequest() {
@@ -108,7 +109,7 @@ public class RequestResponseTest {
     }
 
     private AbstractRequestResponse createJoinGroupResponse() {
-        return new JoinGroupResponse((short)0, 1, "consumer1", Arrays.asList(new TopicPartition("test11", 1), new TopicPartition("test2", 1)));
+        return new JoinGroupResponse(Errors.NONE.code(), 1, "consumer1", Arrays.asList(new TopicPartition("test11", 1), new TopicPartition("test2", 1)));
     }
 
     private AbstractRequestResponse createListOffsetRequest() {
@@ -119,7 +120,7 @@ public class RequestResponseTest {
 
     private AbstractRequestResponse createListOffsetResponse() {
         Map<TopicPartition, ListOffsetResponse.PartitionData> responseData = new HashMap<TopicPartition, ListOffsetResponse.PartitionData>();
-        responseData.put(new TopicPartition("test", 0), new ListOffsetResponse.PartitionData((short)0, Arrays.asList(100L)));
+        responseData.put(new TopicPartition("test", 0), new ListOffsetResponse.PartitionData(Errors.NONE.code(), Arrays.asList(100L)));
         return new ListOffsetResponse(responseData);
     }
 
@@ -145,7 +146,7 @@ public class RequestResponseTest {
 
     private AbstractRequestResponse createOffsetCommitResponse() {
         Map<TopicPartition, Short> responseData = new HashMap<TopicPartition, Short>();
-        responseData.put(new TopicPartition("test", 0), (short)0);
+        responseData.put(new TopicPartition("test", 0), Errors.NONE.code());
         return new OffsetCommitResponse(responseData);
     }
 
@@ -155,19 +156,19 @@ public class RequestResponseTest {
 
     private AbstractRequestResponse createOffsetFetchResponse() {
         Map<TopicPartition, OffsetFetchResponse.PartitionData> responseData = new HashMap<TopicPartition, OffsetFetchResponse.PartitionData>();
-        responseData.put(new TopicPartition("test", 0), new OffsetFetchResponse.PartitionData(100L, "", (short)0));
+        responseData.put(new TopicPartition("test", 0), new OffsetFetchResponse.PartitionData(100L, "", Errors.NONE.code()));
         return new OffsetFetchResponse(responseData);
     }
 
     private AbstractRequestResponse createProduceRequest() {
         Map<TopicPartition, ByteBuffer> produceData = new HashMap<TopicPartition, ByteBuffer>();
         produceData.put(new TopicPartition("test", 0), ByteBuffer.allocate(10));
-        return new ProduceRequest((short)0, 5000, produceData);
+        return new ProduceRequest((short) 0, 5000, produceData);
     }
 
     private AbstractRequestResponse createProduceResponse() {
         Map<TopicPartition, ProduceResponse.PartitionResponse> responseData = new HashMap<TopicPartition, ProduceResponse.PartitionResponse>();
-        responseData.put(new TopicPartition("test", 0), new ProduceResponse.PartitionResponse((short) 0, 10000));
+        responseData.put(new TopicPartition("test", 0), new ProduceResponse.PartitionResponse(Errors.NONE.code(), 10000));
         return new ProduceResponse(responseData);
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/test/java/org/apache/kafka/common/serialization/SerializationTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/serialization/SerializationTest.java b/clients/src/test/java/org/apache/kafka/common/serialization/SerializationTest.java
index b6e1497..f5cd61c 100644
--- a/clients/src/test/java/org/apache/kafka/common/serialization/SerializationTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/serialization/SerializationTest.java
@@ -35,13 +35,13 @@ public class SerializationTest {
 
     @Test
     public void testStringSerializer() {
-       String str = "my string";
+        String str = "my string";
         String mytopic = "testTopic";
         List<String> encodings = new ArrayList<String>();
         encodings.add("UTF8");
         encodings.add("UTF-16");
 
-        for ( String encoding : encodings) {
+        for (String encoding : encodings) {
             SerDeser<String> serDeser = getStringSerDeser(encoding);
             Serializer<String> serializer = serDeser.serializer;
             Deserializer<String> deserializer = serDeser.deserializer;

http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/test/java/org/apache/kafka/common/utils/ClientUtilsTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/utils/ClientUtilsTest.java b/clients/src/test/java/org/apache/kafka/common/utils/ClientUtilsTest.java
deleted file mode 100644
index 6e37ea5..0000000
--- a/clients/src/test/java/org/apache/kafka/common/utils/ClientUtilsTest.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.common.utils;
-
-import org.apache.kafka.common.config.ConfigException;
-import org.junit.Test;
-
-import java.util.Arrays;
-
-public class ClientUtilsTest {
-
-    @Test
-    public void testParseAndValidateAddresses() {
-        check("127.0.0.1:8000");
-        check("mydomain.com:8080");
-        check("[::1]:8000");
-        check("[2001:db8:85a3:8d3:1319:8a2e:370:7348]:1234", "mydomain.com:10000");
-    }
-
-    @Test(expected = ConfigException.class)
-    public void testNoPort() {
-        check("127.0.0.1");
-    }
-
-    private void check(String... url) {
-        ClientUtils.parseAndValidateAddresses(Arrays.asList(url));
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/test/java/org/apache/kafka/common/utils/CrcTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/utils/CrcTest.java b/clients/src/test/java/org/apache/kafka/common/utils/CrcTest.java
index 6b32381..c39c3cf 100644
--- a/clients/src/test/java/org/apache/kafka/common/utils/CrcTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/utils/CrcTest.java
@@ -25,7 +25,7 @@ public class CrcTest {
 
     @Test
     public void testUpdate() {
-        final byte bytes[] = "Any String you want".getBytes();
+        final byte[] bytes = "Any String you want".getBytes();
         final int len = bytes.length;
 
         Crc32 crc1 = new Crc32();
@@ -33,10 +33,10 @@ public class CrcTest {
         Crc32 crc3 = new Crc32();
 
         crc1.update(bytes, 0, len);
-        for(int i = 0; i < len; i++)
+        for (int i = 0; i < len; i++)
             crc2.update(bytes[i]);
-        crc3.update(bytes, 0, len/2);
-        crc3.update(bytes, len/2, len-len/2);
+        crc3.update(bytes, 0, len / 2);
+        crc3.update(bytes, len / 2, len - len / 2);
 
         assertEquals("Crc values should be the same", crc1.getValue(), crc2.getValue());
         assertEquals("Crc values should be the same", crc1.getValue(), crc3.getValue());

http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/test/java/org/apache/kafka/test/Microbenchmarks.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/test/Microbenchmarks.java b/clients/src/test/java/org/apache/kafka/test/Microbenchmarks.java
index b24d4de..8cd19b2 100644
--- a/clients/src/test/java/org/apache/kafka/test/Microbenchmarks.java
+++ b/clients/src/test/java/org/apache/kafka/test/Microbenchmarks.java
@@ -162,7 +162,6 @@ public class Microbenchmarks {
         for (int i = 0; i < numThreads; i++) {
             threads.add(new Thread() {
                 public void run() {
-                    int sum = 0;
                     long start = System.nanoTime();
                     for (int j = 0; j < iters; j++)
                         map.get(keys.get(j % threads.size()));

http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/test/java/org/apache/kafka/test/TestUtils.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/test/TestUtils.java b/clients/src/test/java/org/apache/kafka/test/TestUtils.java
index 76a17e8..20dba7b 100644
--- a/clients/src/test/java/org/apache/kafka/test/TestUtils.java
+++ b/clients/src/test/java/org/apache/kafka/test/TestUtils.java
@@ -35,15 +35,15 @@ import org.apache.kafka.common.PartitionInfo;
  */
 public class TestUtils {
 
-    public static File IO_TMP_DIR = new File(System.getProperty("java.io.tmpdir"));
+    public static final File IO_TMP_DIR = new File(System.getProperty("java.io.tmpdir"));
 
-    public static String LETTERS = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz";
-    public static String DIGITS = "0123456789";
-    public static String LETTERS_AND_DIGITS = LETTERS + DIGITS;
+    public static final String LETTERS = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz";
+    public static final String DIGITS = "0123456789";
+    public static final String LETTERS_AND_DIGITS = LETTERS + DIGITS;
 
     /* A consistent random number generator to make tests repeatable */
-    public static final Random seededRandom = new Random(192348092834L);
-    public static final Random random = new Random();
+    public static final Random SEEDED_RANDOM = new Random(192348092834L);
+    public static final Random RANDOM = new Random();
 
     public static Cluster singletonCluster(String topic, int partitions) {
         return clusterWith(1, topic, partitions);
@@ -92,7 +92,7 @@ public class TestUtils {
      */
     public static byte[] randomBytes(int size) {
         byte[] bytes = new byte[size];
-        seededRandom.nextBytes(bytes);
+        SEEDED_RANDOM.nextBytes(bytes);
         return bytes;
     }
 
@@ -105,7 +105,7 @@ public class TestUtils {
     public static String randomString(int len) {
         StringBuilder b = new StringBuilder();
         for (int i = 0; i < len; i++)
-            b.append(LETTERS_AND_DIGITS.charAt(seededRandom.nextInt(LETTERS_AND_DIGITS.length())));
+            b.append(LETTERS_AND_DIGITS.charAt(SEEDED_RANDOM.nextInt(LETTERS_AND_DIGITS.length())));
         return b.toString();
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/core/src/main/scala/kafka/javaapi/consumer/ConsumerRebalanceListener.java
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/javaapi/consumer/ConsumerRebalanceListener.java b/core/src/main/scala/kafka/javaapi/consumer/ConsumerRebalanceListener.java
index facf509..7f45a90 100644
--- a/core/src/main/scala/kafka/javaapi/consumer/ConsumerRebalanceListener.java
+++ b/core/src/main/scala/kafka/javaapi/consumer/ConsumerRebalanceListener.java
@@ -17,9 +17,6 @@
 
 package kafka.javaapi.consumer;
 
-import kafka.common.TopicAndPartition;
-import kafka.consumer.ConsumerThreadId;
-
 import java.util.Map;
 import java.util.Set;
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/core/src/main/scala/kafka/message/CompressionFactory.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/message/CompressionFactory.scala b/core/src/main/scala/kafka/message/CompressionFactory.scala
index c721040..b047f68 100644
--- a/core/src/main/scala/kafka/message/CompressionFactory.scala
+++ b/core/src/main/scala/kafka/message/CompressionFactory.scala
@@ -22,7 +22,7 @@ import java.util.zip.GZIPOutputStream
 import java.util.zip.GZIPInputStream
 import java.io.InputStream
 
-import org.apache.kafka.common.message.{KafkaLZ4BlockInputStream, KafkaLZ4BlockOutputStream}
+import org.apache.kafka.common.record.{KafkaLZ4BlockInputStream, KafkaLZ4BlockOutputStream}
 
 object CompressionFactory {
   

http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/core/src/main/scala/kafka/tools/KafkaMigrationTool.java
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/KafkaMigrationTool.java b/core/src/main/scala/kafka/tools/KafkaMigrationTool.java
index 7909d25..026d819 100644
--- a/core/src/main/scala/kafka/tools/KafkaMigrationTool.java
+++ b/core/src/main/scala/kafka/tools/KafkaMigrationTool.java
@@ -60,7 +60,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 @SuppressWarnings({"unchecked", "rawtypes"})
 public class KafkaMigrationTool
 {
-  private static final org.apache.log4j.Logger logger = org.apache.log4j.Logger.getLogger(KafkaMigrationTool.class.getName());
+  private static final org.apache.log4j.Logger log = org.apache.log4j.Logger.getLogger(KafkaMigrationTool.class.getName());
   private static final String KAFKA_07_STATIC_CONSUMER_CLASS_NAME = "kafka.consumer.Consumer";
   private static final String KAFKA_07_CONSUMER_CONFIG_CLASS_NAME = "kafka.consumer.ConsumerConfig";
   private static final String KAFKA_07_CONSUMER_STREAM_CLASS_NAME = "kafka.consumer.KafkaStream";
@@ -194,7 +194,7 @@ public class KafkaMigrationTool
       kafkaConsumerProperties_07.load(new FileInputStream(consumerConfigFile_07));
       /** Disable shallow iteration because the message format is different between 07 and 08, we have to get each individual message **/
       if(kafkaConsumerProperties_07.getProperty("shallow.iterator.enable", "").equals("true")) {
-        logger.warn("Shallow iterator should not be used in the migration tool");
+        log.warn("Shallow iterator should not be used in the migration tool");
         kafkaConsumerProperties_07.setProperty("shallow.iterator.enable", "false");
       }
       Object consumerConfig_07 = ConsumerConfigConstructor_07.newInstance(kafkaConsumerProperties_07);
@@ -230,7 +230,7 @@ public class KafkaMigrationTool
           try {
             ConsumerConnectorShutdownMethod_07.invoke(consumerConnector_07);
           } catch(Exception e) {
-            logger.error("Error while shutting down Kafka consumer", e);
+            log.error("Error while shutting down Kafka consumer", e);
           }
           for(MigrationThread migrationThread : migrationThreads) {
             migrationThread.shutdown();
@@ -241,7 +241,7 @@ public class KafkaMigrationTool
           for(ProducerThread producerThread : producerThreads) {
             producerThread.awaitShutdown();
           }
-          logger.info("Kafka migration tool shutdown successfully");
+          log.info("Kafka migration tool shutdown successfully");
         }
       });
 
@@ -266,7 +266,7 @@ public class KafkaMigrationTool
     }
     catch (Throwable e){
       System.out.println("Kafka migration tool failed due to: " + Utils.stackTrace(e));
-      logger.error("Kafka migration tool failed: ", e);
+      log.error("Kafka migration tool failed: ", e);
     }
   }
 
@@ -388,7 +388,7 @@ public class KafkaMigrationTool
           KeyedMessage<byte[], byte[]> data = producerDataChannel.receiveRequest();
           if(!data.equals(shutdownMessage)) {
             producer.send(data);
-            if(logger.isDebugEnabled()) logger.debug("Sending message %s".format(new String(data.message())));
+            if(logger.isDebugEnabled()) logger.debug(String.format("Sending message %s", new String(data.message())));
           }
           else
             break;

http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/core/src/main/scala/kafka/utils/Crc32.java
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/Crc32.java b/core/src/main/scala/kafka/utils/Crc32.java
index af9fe0d..0e0e7bc 100644
--- a/core/src/main/scala/kafka/utils/Crc32.java
+++ b/core/src/main/scala/kafka/utils/Crc32.java
@@ -62,16 +62,16 @@ public class Crc32 implements Checksum {
       final int c1 =(b[off+1] ^ (localCrc >>>= 8)) & 0xff;
       final int c2 =(b[off+2] ^ (localCrc >>>= 8)) & 0xff;
       final int c3 =(b[off+3] ^ (localCrc >>>= 8)) & 0xff;
-      localCrc = (T[T8_7_start + c0] ^ T[T8_6_start + c1])
-          ^ (T[T8_5_start + c2] ^ T[T8_4_start + c3]);
+      localCrc = (T[T8_7_START + c0] ^ T[T8_6_START + c1])
+          ^ (T[T8_5_START + c2] ^ T[T8_4_START + c3]);
 
       final int c4 = b[off+4] & 0xff;
       final int c5 = b[off+5] & 0xff;
       final int c6 = b[off+6] & 0xff;
       final int c7 = b[off+7] & 0xff;
 
-      localCrc ^= (T[T8_3_start + c4] ^ T[T8_2_start + c5])
-           ^ (T[T8_1_start + c6] ^ T[T8_0_start + c7]);
+      localCrc ^= (T[T8_3_START + c4] ^ T[T8_2_START + c5])
+           ^ (T[T8_1_START + c6] ^ T[T8_0_START + c7]);
 
       off += 8;
       len -= 8;
@@ -79,13 +79,13 @@ public class Crc32 implements Checksum {
 
     /* loop unroll - duff's device style */
     switch(len) {
-      case 7: localCrc = (localCrc >>> 8) ^ T[T8_0_start + ((localCrc ^ b[off++]) & 0xff)];
-      case 6: localCrc = (localCrc >>> 8) ^ T[T8_0_start + ((localCrc ^ b[off++]) & 0xff)];
-      case 5: localCrc = (localCrc >>> 8) ^ T[T8_0_start + ((localCrc ^ b[off++]) & 0xff)];
-      case 4: localCrc = (localCrc >>> 8) ^ T[T8_0_start + ((localCrc ^ b[off++]) & 0xff)];
-      case 3: localCrc = (localCrc >>> 8) ^ T[T8_0_start + ((localCrc ^ b[off++]) & 0xff)];
-      case 2: localCrc = (localCrc >>> 8) ^ T[T8_0_start + ((localCrc ^ b[off++]) & 0xff)];
-      case 1: localCrc = (localCrc >>> 8) ^ T[T8_0_start + ((localCrc ^ b[off++]) & 0xff)];
+      case 7: localCrc = (localCrc >>> 8) ^ T[T8_0_START + ((localCrc ^ b[off++]) & 0xff)];
+      case 6: localCrc = (localCrc >>> 8) ^ T[T8_0_START + ((localCrc ^ b[off++]) & 0xff)];
+      case 5: localCrc = (localCrc >>> 8) ^ T[T8_0_START + ((localCrc ^ b[off++]) & 0xff)];
+      case 4: localCrc = (localCrc >>> 8) ^ T[T8_0_START + ((localCrc ^ b[off++]) & 0xff)];
+      case 3: localCrc = (localCrc >>> 8) ^ T[T8_0_START + ((localCrc ^ b[off++]) & 0xff)];
+      case 2: localCrc = (localCrc >>> 8) ^ T[T8_0_START + ((localCrc ^ b[off++]) & 0xff)];
+      case 1: localCrc = (localCrc >>> 8) ^ T[T8_0_START + ((localCrc ^ b[off++]) & 0xff)];
       default:
         /* nothing */
     }
@@ -96,21 +96,21 @@ public class Crc32 implements Checksum {
 
   @Override
   final public void update(int b) {
-    crc = (crc >>> 8) ^ T[T8_0_start + ((crc ^ b) & 0xff)];
+    crc = (crc >>> 8) ^ T[T8_0_START + ((crc ^ b) & 0xff)];
   }
 
   /*
    * CRC-32 lookup tables generated by the polynomial 0xEDB88320.
    * See also TestPureJavaCrc32.Table.
    */
-  private static final int T8_0_start = 0*256;
-  private static final int T8_1_start = 1*256;
-  private static final int T8_2_start = 2*256;
-  private static final int T8_3_start = 3*256;
-  private static final int T8_4_start = 4*256;
-  private static final int T8_5_start = 5*256;
-  private static final int T8_6_start = 6*256;
-  private static final int T8_7_start = 7*256;
+  private static final int T8_0_START = 0*256;
+  private static final int T8_1_START = 1*256;
+  private static final int T8_2_START = 2*256;
+  private static final int T8_3_START = 3*256;
+  private static final int T8_4_START = 4*256;
+  private static final int T8_5_START = 5*256;
+  private static final int T8_6_START = 6*256;
+  private static final int T8_7_START = 7*256;
 
   private static final int[] T = new int[] {
   	/* T8_0 */

http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/examples/src/main/java/kafka/examples/SimpleConsumerDemo.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/kafka/examples/SimpleConsumerDemo.java b/examples/src/main/java/kafka/examples/SimpleConsumerDemo.java
index c79192c..0d66fe5 100644
--- a/examples/src/main/java/kafka/examples/SimpleConsumerDemo.java
+++ b/examples/src/main/java/kafka/examples/SimpleConsumerDemo.java
@@ -22,7 +22,7 @@ import kafka.javaapi.FetchResponse;
 
 import java.io.UnsupportedEncodingException;
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import kafka.javaapi.consumer.SimpleConsumer;
 import kafka.javaapi.message.ByteBufferMessageSet;
@@ -71,10 +71,9 @@ public class SimpleConsumerDemo {
       printMessages((ByteBufferMessageSet) fetchResponse.messageSet(KafkaProperties.topic2, 0));
 
     System.out.println("Testing single multi-fetch");
-    Map<String, List<Integer>> topicMap = new HashMap<String, List<Integer>>() {{
-        put(KafkaProperties.topic2, new ArrayList<Integer>(){{ add(0); }});
-        put(KafkaProperties.topic3, new ArrayList<Integer>(){{ add(0); }});
-    }};
+    Map<String, List<Integer>> topicMap = new HashMap<String, List<Integer>>();
+    topicMap.put(KafkaProperties.topic2, Collections.singletonList(0));
+    topicMap.put(KafkaProperties.topic3, Collections.singletonList(0));
     req = new FetchRequestBuilder()
             .clientId(KafkaProperties.clientId)
             .addFetch(KafkaProperties.topic2, 0, 0L, 100)


Mime
View raw message