kafka-commits mailing list archives

From jkr...@apache.org
Subject [03/13] Rename client package from kafka.* to org.apache.kafka.*
Date Fri, 07 Feb 2014 00:26:34 GMT
http://git-wip-us.apache.org/repos/asf/kafka/blob/fa6339c1/clients/src/main/java/org/apache/kafka/common/utils/KafkaThread.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/KafkaThread.java b/clients/src/main/java/org/apache/kafka/common/utils/KafkaThread.java
new file mode 100644
index 0000000..96428a9
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/utils/KafkaThread.java
@@ -0,0 +1,18 @@
+package org.apache.kafka.common.utils;
+
+/**
+ * A wrapper around Thread that pre-configures the thread name, daemon flag, and a default uncaught-exception handler
+ */
+public class KafkaThread extends Thread {
+
+    public KafkaThread(String name, Runnable runnable, boolean daemon) {
+        super(runnable, name);
+        setDaemon(daemon);
+        setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
+            public void uncaughtException(Thread t, Throwable e) {
+                e.printStackTrace();
+            }
+        });
+    }
+
+}
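
For context, a minimal sketch of how the new KafkaThread might be used by calling code (the class and task here are hypothetical, not part of this patch):

    import org.apache.kafka.common.utils.KafkaThread;

    public class KafkaThreadExample {
        public static void main(String[] args) throws InterruptedException {
            // Run a task on a named daemon thread; an uncaught exception would be
            // printed by the handler installed in the KafkaThread constructor.
            KafkaThread thread = new KafkaThread("example-worker", new Runnable() {
                public void run() {
                    System.out.println("running on " + Thread.currentThread().getName());
                }
            }, true);
            thread.start();
            thread.join(); // daemon threads do not keep the JVM alive, so wait explicitly
        }
    }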

http://git-wip-us.apache.org/repos/asf/kafka/blob/fa6339c1/clients/src/main/java/org/apache/kafka/common/utils/SystemTime.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/SystemTime.java b/clients/src/main/java/org/apache/kafka/common/utils/SystemTime.java
new file mode 100644
index 0000000..a3a91a9
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/utils/SystemTime.java
@@ -0,0 +1,26 @@
+package org.apache.kafka.common.utils;
+
+/**
+ * A time implementation that uses the system clock and sleep call
+ */
+public class SystemTime implements Time {
+
+    @Override
+    public long milliseconds() {
+        return System.currentTimeMillis();
+    }
+
+    public long nanoseconds() {
+        return System.nanoTime();
+    }
+
+    @Override
+    public void sleep(long ms) {
+        try {
+            Thread.sleep(ms);
+        } catch (InterruptedException e) {
+            // swallow the interruption; callers treat sleep as best-effort
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/fa6339c1/clients/src/main/java/org/apache/kafka/common/utils/Time.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Time.java b/clients/src/main/java/org/apache/kafka/common/utils/Time.java
new file mode 100644
index 0000000..5d68915
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/utils/Time.java
@@ -0,0 +1,23 @@
+package org.apache.kafka.common.utils;
+
+/**
+ * An interface abstracting the clock, so that classes which depend on time can be unit tested with a fake clock
+ */
+public interface Time {
+
+    /**
+     * The current time in milliseconds
+     */
+    public long milliseconds();
+
+    /**
+     * The current time in nanoseconds
+     */
+    public long nanoseconds();
+
+    /**
+     * Sleep for the given number of milliseconds
+     */
+    public void sleep(long ms);
+
+}
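
To show why the clock is abstracted behind this interface, here is a minimal fake implementation of the kind a unit test might inject (a sketch only; the MockTime referenced by the tests below is the project's own implementation and may differ):

    import org.apache.kafka.common.utils.Time;

    // A deterministic clock for tests: "sleeping" just advances a counter, so
    // time-dependent logic can be driven forward without real delays.
    public class FakeTime implements Time {
        private long nanos = 0;

        @Override
        public long milliseconds() {
            return nanos / 1000000;
        }

        @Override
        public long nanoseconds() {
            return nanos;
        }

        @Override
        public void sleep(long ms) {
            nanos += ms * 1000000;
        }
    }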

http://git-wip-us.apache.org/repos/asf/kafka/blob/fa6339c1/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
new file mode 100644
index 0000000..146eb76
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
@@ -0,0 +1,231 @@
+package org.apache.kafka.common.utils;
+
+import java.io.UnsupportedEncodingException;
+import java.nio.ByteBuffer;
+
+import org.apache.kafka.common.KafkaException;
+
+
+public class Utils {
+
+    /**
+     * Turn the given UTF8 byte array into a string
+     * 
+     * @param bytes The byte array
+     * @return The string
+     */
+    public static String utf8(byte[] bytes) {
+        try {
+            return new String(bytes, "UTF8");
+        } catch (UnsupportedEncodingException e) {
+            throw new RuntimeException("This shouldn't happen.", e);
+        }
+    }
+
+    /**
+     * Turn a string into a utf8 byte[]
+     * 
+     * @param string The string
+     * @return The byte[]
+     */
+    public static byte[] utf8(String string) {
+        try {
+            return string.getBytes("UTF8");
+        } catch (UnsupportedEncodingException e) {
+            throw new RuntimeException("This shouldn't happen.", e);
+        }
+    }
+
+    /**
+     * Read an unsigned integer from the current position in the buffer, incrementing the position by 4 bytes
+     * 
+     * @param buffer The buffer to read from
+     * @return The integer read, as a long to avoid signedness
+     */
+    public static long readUnsignedInt(ByteBuffer buffer) {
+        return buffer.getInt() & 0xffffffffL;
+    }
+
+    /**
+     * Read an unsigned integer from the given position without modifying the buffer's position
+     * 
+     * @param buffer the buffer to read from
+     * @param index the index from which to read the integer
+     * @return The integer read, as a long to avoid signedness
+     */
+    public static long readUnsignedInt(ByteBuffer buffer, int index) {
+        return buffer.getInt(index) & 0xffffffffL;
+    }
+
+    /**
+     * Write the given long value as a 4 byte unsigned integer. Overflow is ignored.
+     * 
+     * @param buffer The buffer to write to
+     * @param value The value to write
+     */
+    public static void writeUnsignedInt(ByteBuffer buffer, long value) {
+        buffer.putInt((int) (value & 0xffffffffL));
+    }
+
+    /**
+     * Write the given long value as a 4 byte unsigned integer. Overflow is ignored.
+     * 
+     * @param buffer The buffer to write to
+     * @param index The position in the buffer at which to begin writing
+     * @param value The value to write
+     */
+    public static void writeUnsignedInt(ByteBuffer buffer, int index, long value) {
+        buffer.putInt(index, (int) (value & 0xffffffffL));
+    }
+
+    /**
+     * Compute the CRC32 of the byte array
+     * 
+     * @param bytes The array to compute the checksum for
+     * @return The CRC32
+     */
+    public static long crc32(byte[] bytes) {
+        return crc32(bytes, 0, bytes.length);
+    }
+
+    /**
+     * Compute the CRC32 of the segment of the byte array given by the specified size and offset
+     * 
+     * @param bytes The bytes to checksum
+     * @param offset the offset at which to begin checksumming
+     * @param size the number of bytes to checksum
+     * @return The CRC32
+     */
+    public static long crc32(byte[] bytes, int offset, int size) {
+        Crc32 crc = new Crc32();
+        crc.update(bytes, offset, size);
+        return crc.getValue();
+    }
+
+    /**
+     * Get a non-negative int from the given int by clearing its sign bit: Integer.MIN_VALUE maps to 0, unlike
+     * java.lang.Math.abs or scala.math.abs, which return Integer.MIN_VALUE (!). For other negative inputs the
+     * result is merely guaranteed to be non-negative, not the mathematical absolute value.
+     */
+    public static int abs(int n) {
+        return n & 0x7fffffff;
+    }
+
+    /**
+     * Get the length for UTF8-encoding a string without encoding it first
+     * 
+     * @param s The string to calculate the length for
+     * @return The length when serialized
+     */
+    public static int utf8Length(CharSequence s) {
+        int count = 0;
+        for (int i = 0, len = s.length(); i < len; i++) {
+            char ch = s.charAt(i);
+            if (ch <= 0x7F) {
+                count++;
+            } else if (ch <= 0x7FF) {
+                count += 2;
+            } else if (Character.isHighSurrogate(ch)) {
+                count += 4;
+                ++i;
+            } else {
+                count += 3;
+            }
+        }
+        return count;
+    }
+
+    /**
+     * Read the given byte buffer into a byte array
+     */
+    public static byte[] toArray(ByteBuffer buffer) {
+        return toArray(buffer, 0, buffer.limit());
+    }
+
+    /**
+     * Read a byte array from the given offset and size in the buffer
+     */
+    public static byte[] toArray(ByteBuffer buffer, int offset, int size) {
+        byte[] dest = new byte[size];
+        if (buffer.hasArray()) {
+            System.arraycopy(buffer.array(), buffer.arrayOffset() + offset, dest, 0, size);
+        } else {
+            int pos = buffer.position();
+            buffer.get(dest);
+            buffer.position(pos);
+        }
+        return dest;
+    }
+
+    /**
+     * Check that the parameter t is not null
+     * 
+     * @param t The object to check
+     * @return t if it isn't null
+     * @throws NullPointerException if t is null.
+     */
+    public static <T> T notNull(T t) {
+        if (t == null)
+            throw new NullPointerException();
+        else
+            return t;
+    }
+
+    /**
+     * Instantiate the class
+     */
+    public static Object newInstance(Class<?> c) {
+        try {
+            return c.newInstance();
+        } catch (IllegalAccessException e) {
+            throw new KafkaException("Could not instantiate class " + c.getName(), e);
+        } catch (InstantiationException e) {
+            throw new KafkaException("Could not instantiate class " + c.getName() + ". Does it have a public no-argument constructor?", e);
+        }
+    }
+
+    /**
+     * Generates 32 bit murmur2 hash from byte array
+     * @param data byte array to hash
+     * @return 32 bit hash of the given array
+     */
+    public static int murmur2(final byte[] data) {
+        int length = data.length;
+        int seed = 0x9747b28c;
+        // 'm' and 'r' are mixing constants generated offline.
+        // They're not really 'magic', they just happen to work well.
+        final int m = 0x5bd1e995;
+        final int r = 24;
+
+        // Initialize the hash to a random value
+        int h = seed ^ length;
+        int length4 = length / 4;
+
+        for (int i = 0; i < length4; i++) {
+            final int i4 = i * 4;
+            int k = (data[i4 + 0] & 0xff) + ((data[i4 + 1] & 0xff) << 8) + ((data[i4 + 2] & 0xff) << 16) + ((data[i4 + 3] & 0xff) << 24);
+            k *= m;
+            k ^= k >>> r;
+            k *= m;
+            h *= m;
+            h ^= k;
+        }
+
+        // Handle the last few bytes of the input array
+        switch (length % 4) {
+            case 3:
+                h ^= (data[(length & ~3) + 2] & 0xff) << 16;
+            case 2:
+                h ^= (data[(length & ~3) + 1] & 0xff) << 8;
+            case 1:
+                h ^= (data[length & ~3] & 0xff);
+                h *= m;
+        }
+
+        h ^= h >>> 13;
+        h *= m;
+        h ^= h >>> 15;
+
+        return h;
+    }
+
+}
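
A short worked example of the utilities above (an illustrative harness, not part of the patch):

    import java.nio.ByteBuffer;

    import org.apache.kafka.common.utils.Utils;

    public class UtilsExample {
        public static void main(String[] args) {
            // UTF-8 round trip, plus computing the encoded length without encoding
            byte[] bytes = Utils.utf8("héllo");
            System.out.println(Utils.utf8(bytes));          // héllo
            System.out.println(Utils.utf8Length("héllo"));  // 6: 'é' encodes to 2 bytes

            // An unsigned int round trip: all ones reads back as 4294967295, not -1
            ByteBuffer buffer = ByteBuffer.allocate(4);
            Utils.writeUnsignedInt(buffer, 0, 0xffffffffL);
            System.out.println(Utils.readUnsignedInt(buffer, 0)); // 4294967295

            // abs clears the sign bit rather than negating
            System.out.println(Utils.abs(Integer.MIN_VALUE)); // 0
            System.out.println(Utils.abs(-1));                // 2147483647, not 1

            // 32-bit murmur2 hash of a byte array (value depends on the seed above)
            System.out.println(Utils.murmur2(Utils.utf8("key")));
        }
    }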

http://git-wip-us.apache.org/repos/asf/kafka/blob/fa6339c1/clients/src/test/java/kafka/clients/common/network/SelectorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/kafka/clients/common/network/SelectorTest.java b/clients/src/test/java/kafka/clients/common/network/SelectorTest.java
deleted file mode 100644
index 68bc9ee..0000000
--- a/clients/src/test/java/kafka/clients/common/network/SelectorTest.java
+++ /dev/null
@@ -1,292 +0,0 @@
-package kafka.clients.common.network;
-
-import static java.util.Arrays.asList;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.net.ServerSocket;
-import java.net.Socket;
-import java.nio.ByteBuffer;
-import java.nio.channels.UnresolvedAddressException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-import kafka.common.network.NetworkReceive;
-import kafka.common.network.NetworkSend;
-import kafka.common.network.Selectable;
-import kafka.common.network.Selector;
-import kafka.common.utils.Utils;
-import kafka.test.TestUtils;
-
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-/**
- * A set of tests for the selector. These use a test harness that runs a simple socket server that echoes back responses.
- */
-public class SelectorTest {
-
-    private static final List<NetworkSend> EMPTY = new ArrayList<NetworkSend>();
-    private static final int BUFFER_SIZE = 4 * 1024;
-
-    private EchoServer server;
-    private Selectable selector;
-
-    @Before
-    public void setup() throws Exception {
-        this.server = new EchoServer();
-        this.server.start();
-        this.selector = new Selector();
-    }
-
-    @After
-    public void teardown() throws Exception {
-        this.selector.close();
-        this.server.close();
-    }
-
-    /**
-     * Validate that when the server disconnects, a client send ends up with that node in the disconnected list.
-     */
-    @Test
-    public void testServerDisconnect() throws Exception {
-        int node = 0;
-
-        // connect and do a simple request
-        blockingConnect(node);
-        assertEquals("hello", blockingRequest(node, "hello"));
-
-        // disconnect
-        this.server.closeConnections();
-        while (!selector.disconnected().contains(node))
-            selector.poll(1000L, EMPTY);
-
-        // reconnect and do another request
-        blockingConnect(node);
-        assertEquals("hello", blockingRequest(node, "hello"));
-    }
-
-    /**
-     * Validate that the client can intentionally disconnect and reconnect
-     */
-    @Test
-    public void testClientDisconnect() throws Exception {
-        int node = 0;
-        blockingConnect(node);
-        selector.disconnect(node);
-        selector.poll(10, asList(createSend(node, "hello1")));
-        assertEquals("Request should not have succeeded", 0, selector.completedSends().size());
-        assertEquals("There should be a disconnect", 1, selector.disconnected().size());
-        assertTrue("The disconnect should be from our node", selector.disconnected().contains(node));
-        blockingConnect(node);
-        assertEquals("hello2", blockingRequest(node, "hello2"));
-    }
-
-    /**
-     * Sending a request with one already in flight should result in an exception
-     */
-    @Test(expected = IllegalStateException.class)
-    public void testCantSendWithInProgress() throws Exception {
-        int node = 0;
-        blockingConnect(node);
-        selector.poll(1000L, asList(createSend(node, "test1"), createSend(node, "test2")));
-    }
-
-    /**
-     * Sending a request to a node without an existing connection should result in an exception
-     */
-    @Test(expected = IllegalStateException.class)
-    public void testCantSendWithoutConnecting() throws Exception {
-        selector.poll(1000L, asList(createSend(0, "test")));
-    }
-
-    /**
-     * Sending a request to a node with a bad hostname should result in an exception during connect
-     */
-    @Test(expected = UnresolvedAddressException.class)
-    public void testNoRouteToHost() throws Exception {
-        selector.connect(0, new InetSocketAddress("asdf.asdf.dsc", server.port), BUFFER_SIZE, BUFFER_SIZE);
-    }
-
-    /**
-     * Sending a request to a node not listening on that port should result in disconnection
-     */
-    @Test
-    public void testConnectionRefused() throws Exception {
-        int node = 0;
-        selector.connect(node, new InetSocketAddress("localhost", TestUtils.choosePort()), BUFFER_SIZE, BUFFER_SIZE);
-        while (!selector.disconnected().contains(node))
-            selector.poll(1000L, EMPTY);
-    }
-
-    /**
-     * Send multiple requests to several connections in parallel. Validate that responses are received in the order that
-     * requests were sent.
-     */
-    @Test
-    public void testNormalOperation() throws Exception {
-        int conns = 5;
-        int reqs = 500;
-
-        // create connections
-        InetSocketAddress addr = new InetSocketAddress("localhost", server.port);
-        for (int i = 0; i < conns; i++)
-            selector.connect(i, addr, BUFFER_SIZE, BUFFER_SIZE);
-
-        // send echo requests and receive responses
-        int[] requests = new int[conns];
-        int[] responses = new int[conns];
-        int responseCount = 0;
-        List<NetworkSend> sends = new ArrayList<NetworkSend>();
-        for (int i = 0; i < conns; i++)
-            sends.add(createSend(i, i + "-" + 0));
-
-        // loop until we complete all requests
-        while (responseCount < conns * reqs) {
-            // do the i/o
-            selector.poll(0L, sends);
-
-            assertEquals("No disconnects should have occurred.", 0, selector.disconnected().size());
-
-            // handle any responses we may have gotten
-            for (NetworkReceive receive : selector.completedReceives()) {
-                String[] pieces = asString(receive).split("-");
-                assertEquals("Should be in the form 'conn-counter'", 2, pieces.length);
-                assertEquals("Check the source", receive.source(), Integer.parseInt(pieces[0]));
-                assertEquals("Check that the receive has kindly been rewound", 0, receive.payload().position());
-                assertEquals("Check the request counter", responses[receive.source()], Integer.parseInt(pieces[1]));
-                responses[receive.source()]++; // increment the expected counter
-                responseCount++;
-            }
-
-            // prepare new sends for the next round
-            sends.clear();
-            for (NetworkSend send : selector.completedSends()) {
-                int dest = send.destination();
-                requests[dest]++;
-                if (requests[dest] < reqs)
-                    sends.add(createSend(dest, dest + "-" + requests[dest]));
-            }
-        }
-    }
-
-    /**
-     * Validate that we can send and receive a message larger than the receive and send buffer size
-     */
-    @Test
-    public void testSendLargeRequest() throws Exception {
-        int node = 0;
-        blockingConnect(node);
-        String big = TestUtils.randomString(10 * BUFFER_SIZE);
-        assertEquals(big, blockingRequest(node, big));
-    }
-
-    /**
-     * Test sending an empty string
-     */
-    @Test
-    public void testEmptyRequest() throws Exception {
-        int node = 0;
-        blockingConnect(node);
-        assertEquals("", blockingRequest(node, ""));
-    }
-
-    private String blockingRequest(int node, String s) throws IOException {
-        selector.poll(1000L, asList(createSend(node, s)));
-        while (true) {
-            selector.poll(1000L, EMPTY);
-            for (NetworkReceive receive : selector.completedReceives())
-                if (receive.source() == node)
-                    return asString(receive);
-        }
-    }
-
-    /* connect and wait for the connection to complete */
-    private void blockingConnect(int node) throws IOException {
-        selector.connect(node, new InetSocketAddress("localhost", server.port), BUFFER_SIZE, BUFFER_SIZE);
-        while (!selector.connected().contains(node))
-            selector.poll(10000L, EMPTY);
-    }
-
-    private NetworkSend createSend(int node, String s) {
-        return new NetworkSend(node, ByteBuffer.wrap(s.getBytes()));
-    }
-
-    private String asString(NetworkReceive receive) {
-        return new String(Utils.toArray(receive.payload()));
-    }
-
-    /**
-     * A simple server that takes size-delimited byte arrays and just echoes them back to the sender.
-     */
-    static class EchoServer extends Thread {
-        public final int port;
-        private final ServerSocket serverSocket;
-        private final List<Thread> threads;
-        private final List<Socket> sockets;
-
-        public EchoServer() throws Exception {
-            this.port = TestUtils.choosePort();
-            this.serverSocket = new ServerSocket(port);
-            this.threads = Collections.synchronizedList(new ArrayList<Thread>());
-            this.sockets = Collections.synchronizedList(new ArrayList<Socket>());
-        }
-
-        public void run() {
-            try {
-                while (true) {
-                    final Socket socket = serverSocket.accept();
-                    sockets.add(socket);
-                    Thread thread = new Thread() {
-                        public void run() {
-                            try {
-                                DataInputStream input = new DataInputStream(socket.getInputStream());
-                                DataOutputStream output = new DataOutputStream(socket.getOutputStream());
-                                while (socket.isConnected() && !socket.isClosed()) {
-                                    int size = input.readInt();
-                                    byte[] bytes = new byte[size];
-                                    input.readFully(bytes);
-                                    output.writeInt(size);
-                                    output.write(bytes);
-                                    output.flush();
-                                }
-                            } catch (IOException e) {
-                                // ignore
-                            } finally {
-                                try {
-                                    socket.close();
-                                } catch (IOException e) {
-                                    // ignore
-                                }
-                            }
-                        }
-                    };
-                    thread.start();
-                    threads.add(thread);
-                }
-            } catch (IOException e) {
-                // ignore
-            }
-        }
-
-        public void closeConnections() throws IOException {
-            for (Socket socket : sockets)
-                socket.close();
-        }
-
-        public void close() throws IOException, InterruptedException {
-            this.serverSocket.close();
-            closeConnections();
-            for (Thread t : threads)
-                t.join();
-            join();
-        }
-    }
-
-}
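
For reference, the EchoServer above speaks a simple length-prefixed protocol: a 4-byte size followed by that many payload bytes, echoed back verbatim. A standalone client round trip looks roughly like this (a sketch, assuming a reachable host and port):

    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;
    import java.net.Socket;

    public class EchoClientSketch {
        // Send one size-delimited frame and read the echoed frame back.
        public static byte[] roundTrip(String host, int port, byte[] payload) throws IOException {
            Socket socket = new Socket(host, port);
            try {
                DataOutputStream out = new DataOutputStream(socket.getOutputStream());
                out.writeInt(payload.length); // 4-byte size prefix
                out.write(payload);
                out.flush();

                DataInputStream in = new DataInputStream(socket.getInputStream());
                byte[] echoed = new byte[in.readInt()];
                in.readFully(echoed);
                return echoed;
            } finally {
                socket.close();
            }
        }
    }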

http://git-wip-us.apache.org/repos/asf/kafka/blob/fa6339c1/clients/src/test/java/kafka/clients/producer/BufferPoolTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/kafka/clients/producer/BufferPoolTest.java b/clients/src/test/java/kafka/clients/producer/BufferPoolTest.java
deleted file mode 100644
index 70603c4..0000000
--- a/clients/src/test/java/kafka/clients/producer/BufferPoolTest.java
+++ /dev/null
@@ -1,170 +0,0 @@
-package kafka.clients.producer;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import kafka.clients.producer.internals.BufferPool;
-import kafka.test.TestUtils;
-
-import org.junit.Test;
-
-public class BufferPoolTest {
-
-    /**
-     * Test the simple non-blocking allocation paths
-     */
-    @Test
-    public void testSimple() throws Exception {
-        int totalMemory = 64 * 1024;
-        int size = 1024;
-        BufferPool pool = new BufferPool(totalMemory, size, false);
-        ByteBuffer buffer = pool.allocate(size);
-        assertEquals("Buffer size should equal requested size.", size, buffer.limit());
-        assertEquals("Unallocated memory should have shrunk", totalMemory - size, pool.unallocatedMemory());
-        assertEquals("Available memory should have shrunk", totalMemory - size, pool.availableMemory());
-        buffer.putInt(1);
-        buffer.flip();
-        pool.deallocate(buffer);
-        assertEquals("All memory should be available", totalMemory, pool.availableMemory());
-        assertEquals("But now some is on the free list", totalMemory - size, pool.unallocatedMemory());
-        buffer = pool.allocate(size);
-        assertEquals("Recycled buffer should be cleared.", 0, buffer.position());
-        assertEquals("Recycled buffer should be cleared.", buffer.capacity(), buffer.limit());
-        pool.deallocate(buffer);
-        assertEquals("All memory should be available", totalMemory, pool.availableMemory());
-        assertEquals("Still a single buffer on the free list", totalMemory - size, pool.unallocatedMemory());
-        buffer = pool.allocate(2 * size);
-        pool.deallocate(buffer);
-        assertEquals("All memory should be available", totalMemory, pool.availableMemory());
-        assertEquals("Non-standard size didn't go to the free list.", totalMemory - size, pool.unallocatedMemory());
-    }
-
-    /**
-     * Test that we cannot allocate more memory than we have in the whole pool
-     */
-    @Test(expected = IllegalArgumentException.class)
-    public void testCantAllocateMoreMemoryThanWeHave() throws Exception {
-        BufferPool pool = new BufferPool(1024, 512, true);
-        ByteBuffer buffer = pool.allocate(1024);
-        assertEquals(1024, buffer.limit());
-        pool.deallocate(buffer);
-        buffer = pool.allocate(1025);
-    }
-
-    @Test
-    public void testNonblockingMode() throws Exception {
-        BufferPool pool = new BufferPool(2, 1, false);
-        pool.allocate(1);
-        try {
-            pool.allocate(2);
-            fail("The pool allocated more memory than it has!");
-        } catch (BufferExhaustedException e) {
-            // this is good
-        }
-    }
-
-    /**
-     * Test that delayed allocation blocks
-     */
-    @Test
-    public void testDelayedAllocation() throws Exception {
-        BufferPool pool = new BufferPool(5 * 1024, 1024, true);
-        ByteBuffer buffer = pool.allocate(1024);
-        CountDownLatch doDealloc = asyncDeallocate(pool, buffer);
-        CountDownLatch allocation = asyncAllocate(pool, 5 * 1024);
-        assertEquals("Allocation shouldn't have happened yet, waiting on memory.", 1, allocation.getCount());
-        doDealloc.countDown(); // return the memory
-        allocation.await();
-    }
-
-    private CountDownLatch asyncDeallocate(final BufferPool pool, final ByteBuffer buffer) {
-        final CountDownLatch latch = new CountDownLatch(1);
-        new Thread() {
-            public void run() {
-                try {
-                    latch.await();
-                } catch (InterruptedException e) {
-                    e.printStackTrace();
-                }
-                pool.deallocate(buffer);
-            }
-        }.start();
-        return latch;
-    }
-
-    private CountDownLatch asyncAllocate(final BufferPool pool, final int size) {
-        final CountDownLatch completed = new CountDownLatch(1);
-        new Thread() {
-            public void run() {
-                try {
-                    pool.allocate(size);
-                } catch (InterruptedException e) {
-                    e.printStackTrace();
-                } finally {
-                    completed.countDown();
-                }
-            }
-        }.start();
-        return completed;
-    }
-
-    /**
-     * This test creates lots of threads that hammer on the pool
-     */
-    @Test
-    public void testStressfulSituation() throws Exception {
-        int numThreads = 10;
-        final int iterations = 50000;
-        final int poolableSize = 1024;
-        final int totalMemory = numThreads / 2 * poolableSize;
-        final BufferPool pool = new BufferPool(totalMemory, poolableSize, true);
-        List<StressTestThread> threads = new ArrayList<StressTestThread>();
-        for (int i = 0; i < numThreads; i++)
-            threads.add(new StressTestThread(pool, iterations));
-        for (StressTestThread thread : threads)
-            thread.start();
-        for (StressTestThread thread : threads)
-            thread.join();
-        for (StressTestThread thread : threads)
-            assertTrue("Thread should have completed all iterations successfully.", thread.success.get());
-        assertEquals(totalMemory, pool.availableMemory());
-    }
-
-    public static class StressTestThread extends Thread {
-        private final int iterations;
-        private final BufferPool pool;
-        public final AtomicBoolean success = new AtomicBoolean(false);
-
-        public StressTestThread(BufferPool pool, int iterations) {
-            this.iterations = iterations;
-            this.pool = pool;
-        }
-
-        public void run() {
-            try {
-                for (int i = 0; i < iterations; i++) {
-                    int size;
-                    if (TestUtils.random.nextBoolean())
-                        // allocate poolable size
-                        size = pool.poolableSize();
-                    else
-                        // allocate a random size
-                        size = TestUtils.random.nextInt((int) pool.totalMemory());
-                    ByteBuffer buffer = pool.allocate(size);
-                    pool.deallocate(buffer);
-                }
-                success.set(true);
-            } catch (Exception e) {
-                e.printStackTrace();
-            }
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/fa6339c1/clients/src/test/java/kafka/clients/producer/MetadataTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/kafka/clients/producer/MetadataTest.java b/clients/src/test/java/kafka/clients/producer/MetadataTest.java
deleted file mode 100644
index dd45209..0000000
--- a/clients/src/test/java/kafka/clients/producer/MetadataTest.java
+++ /dev/null
@@ -1,49 +0,0 @@
-package kafka.clients.producer;
-
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import kafka.clients.producer.internals.Metadata;
-import kafka.common.Cluster;
-import kafka.test.TestUtils;
-
-import org.junit.Test;
-
-public class MetadataTest {
-
-    private long refreshBackoffMs = 100;
-    private long metadataExpireMs = 1000;
-    private Metadata metadata = new Metadata(refreshBackoffMs, metadataExpireMs);
-
-    @Test
-    public void testMetadata() throws Exception {
-        long time = 0;
-        metadata.update(Cluster.empty(), time);
-        assertFalse("No update needed.", metadata.needsUpdate(time));
-        metadata.forceUpdate();
-        assertFalse("Still no update needed due to backoff", metadata.needsUpdate(time));
-        time += refreshBackoffMs;
-        assertTrue("Update needed now that backoff time expired", metadata.needsUpdate(time));
-        String topic = "my-topic";
-        Thread t1 = asyncFetch(topic);
-        Thread t2 = asyncFetch(topic);
-        assertTrue("Awaiting update", t1.isAlive());
-        assertTrue("Awaiting update", t2.isAlive());
-        metadata.update(TestUtils.singletonCluster(topic, 1), time);
-        t1.join();
-        t2.join();
-        assertFalse("No update needed.", metadata.needsUpdate(time));
-        time += metadataExpireMs;
-        assertTrue("Update needed due to stale metadata.", metadata.needsUpdate(time));
-    }
-
-    private Thread asyncFetch(final String topic) {
-        Thread thread = new Thread() {
-            public void run() {
-                metadata.fetch(topic, Integer.MAX_VALUE);
-            }
-        };
-        thread.start();
-        return thread;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/fa6339c1/clients/src/test/java/kafka/clients/producer/MockProducerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/kafka/clients/producer/MockProducerTest.java b/clients/src/test/java/kafka/clients/producer/MockProducerTest.java
deleted file mode 100644
index 24b132f..0000000
--- a/clients/src/test/java/kafka/clients/producer/MockProducerTest.java
+++ /dev/null
@@ -1,63 +0,0 @@
-package kafka.clients.producer;
-
-import static java.util.Arrays.asList;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-
-import org.junit.Test;
-
-public class MockProducerTest {
-
-    private String topic = "topic";
-
-    @Test
-    public void testAutoCompleteMock() throws Exception {
-        MockProducer producer = new MockProducer(true);
-        ProducerRecord record = new ProducerRecord(topic, "key".getBytes(), "value".getBytes());
-        Future<RecordMetadata> metadata = producer.send(record);
-        assertTrue("Send should be immediately complete", metadata.isDone());
-        assertFalse("Send should be successful", isError(metadata));
-        assertEquals("Offset should be 0", 0, metadata.get().offset());
-        assertEquals(topic, metadata.get().topic());
-        assertEquals("We should have the record in our history", asList(record), producer.history());
-        producer.clear();
-        assertEquals("Clear should erase our history", 0, producer.history().size());
-    }
-
-    @Test
-    public void testManualCompletion() throws Exception {
-        MockProducer producer = new MockProducer(false);
-        ProducerRecord record1 = new ProducerRecord("topic", "key1".getBytes(), "value1".getBytes());
-        ProducerRecord record2 = new ProducerRecord("topic", "key2".getBytes(), "value2".getBytes());
-        Future<RecordMetadata> md1 = producer.send(record1);
-        assertFalse("Send shouldn't have completed", md1.isDone());
-        Future<RecordMetadata> md2 = producer.send(record2);
-        assertFalse("Send shouldn't have completed", md2.isDone());
-        assertTrue("Complete the first request", producer.completeNext());
-        assertFalse("Request should be successful", isError(md1));
-        assertFalse("Second request still incomplete", md2.isDone());
-        IllegalArgumentException e = new IllegalArgumentException("blah");
-        assertTrue("Complete the second request with an error", producer.errorNext(e));
-        try {
-            md2.get();
-            fail("Expected error to be thrown");
-        } catch (ExecutionException err) {
-            assertEquals(e, err.getCause());
-        }
-        assertFalse("No more requests to complete", producer.completeNext());
-    }
-
-    private boolean isError(Future<?> future) {
-        try {
-            future.get();
-            return false;
-        } catch (Exception e) {
-            return true;
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/fa6339c1/clients/src/test/java/kafka/clients/producer/PartitionerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/kafka/clients/producer/PartitionerTest.java b/clients/src/test/java/kafka/clients/producer/PartitionerTest.java
deleted file mode 100644
index c18da76..0000000
--- a/clients/src/test/java/kafka/clients/producer/PartitionerTest.java
+++ /dev/null
@@ -1,54 +0,0 @@
-package kafka.clients.producer;
-
-import static java.util.Arrays.asList;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import java.util.List;
-
-import kafka.clients.producer.internals.Partitioner;
-import kafka.common.Cluster;
-import kafka.common.Node;
-import kafka.common.PartitionInfo;
-
-import org.junit.Test;
-
-public class PartitionerTest {
-
-    private byte[] key = "key".getBytes();
-    private byte[] value = "value".getBytes();
-    private Partitioner partitioner = new Partitioner();
-    private Node node0 = new Node(0, "localhost", 99);
-    private Node node1 = new Node(1, "localhost", 100);
-    private Node node2 = new Node(2, "localhost", 101);
-    private Node[] nodes = new Node[] { node0, node1, node2 };
-    private String topic = "test";
-    private List<PartitionInfo> partitions = asList(new PartitionInfo(topic, 0, node0, nodes, nodes),
-                                                    new PartitionInfo(topic, 1, node1, nodes, nodes),
-                                                    new PartitionInfo(topic, 2, null, nodes, nodes));
-    private Cluster cluster = new Cluster(asList(node0, node1, node2), partitions);
-
-    @Test
-    public void testUserSuppliedPartitioning() {
-        assertEquals("If the user supplies a partition we should use it.",
-                     0,
-                     partitioner.partition(new ProducerRecord("test", 0, key, value), cluster));
-    }
-
-    @Test
-    public void testKeyPartitionIsStable() {
-        int partition = partitioner.partition(new ProducerRecord("test", key, value), cluster);
-        assertEquals("Same key should yield same partition",
-                     partition,
-                     partitioner.partition(new ProducerRecord("test", key, "value2".getBytes()), cluster));
-    }
-
-    @Test
-    public void testRoundRobinWithDownNode() {
-        for (int i = 0; i < partitions.size(); i++) {
-            int part = partitioner.partition(new ProducerRecord("test", value), cluster);
-            assertTrue("We should never choose a leaderless partition in round robin", part >= 0 && part < 2);
-
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/fa6339c1/clients/src/test/java/kafka/clients/producer/RecordAccumulatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/kafka/clients/producer/RecordAccumulatorTest.java b/clients/src/test/java/kafka/clients/producer/RecordAccumulatorTest.java
deleted file mode 100644
index b1ab361..0000000
--- a/clients/src/test/java/kafka/clients/producer/RecordAccumulatorTest.java
+++ /dev/null
@@ -1,135 +0,0 @@
-package kafka.clients.producer;
-
-import static java.util.Arrays.asList;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-
-import kafka.clients.producer.internals.RecordAccumulator;
-import kafka.clients.producer.internals.RecordBatch;
-import kafka.common.TopicPartition;
-import kafka.common.metrics.Metrics;
-import kafka.common.record.CompressionType;
-import kafka.common.record.LogEntry;
-import kafka.common.record.Record;
-import kafka.common.record.Records;
-import kafka.common.utils.MockTime;
-
-import org.junit.Test;
-
-public class RecordAccumulatorTest {
-
-    private TopicPartition tp = new TopicPartition("test", 0);
-    private MockTime time = new MockTime();
-    private byte[] key = "key".getBytes();
-    private byte[] value = "value".getBytes();
-    private int msgSize = Records.LOG_OVERHEAD + Record.recordSize(key, value);
-    private Metrics metrics = new Metrics(time);
-
-    @Test
-    public void testFull() throws Exception {
-        long now = time.milliseconds();
-        RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, 10L, false, metrics, time);
-        int appends = 1024 / msgSize;
-        for (int i = 0; i < appends; i++) {
-            accum.append(tp, key, value, CompressionType.NONE, null);
-            assertEquals("No partitions should be ready.", 0, accum.ready(now).size());
-        }
-        accum.append(tp, key, value, CompressionType.NONE, null);
-        assertEquals("Our partition should be ready", asList(tp), accum.ready(time.milliseconds()));
-        List<RecordBatch> batches = accum.drain(asList(tp), Integer.MAX_VALUE);
-        assertEquals(1, batches.size());
-        RecordBatch batch = batches.get(0);
-        Iterator<LogEntry> iter = batch.records.iterator();
-        for (int i = 0; i < appends; i++) {
-            LogEntry entry = iter.next();
-            assertEquals("Keys should match", ByteBuffer.wrap(key), entry.record().key());
-            assertEquals("Values should match", ByteBuffer.wrap(value), entry.record().value());
-        }
-        assertFalse("No more records", iter.hasNext());
-    }
-
-    @Test
-    public void testAppendLarge() throws Exception {
-        int batchSize = 512;
-        RecordAccumulator accum = new RecordAccumulator(batchSize, 10 * 1024, 0L, false, metrics, time);
-        accum.append(tp, key, new byte[2 * batchSize], CompressionType.NONE, null);
-        assertEquals("Our partition should be ready", asList(tp), accum.ready(time.milliseconds()));
-    }
-
-    @Test
-    public void testLinger() throws Exception {
-        long lingerMs = 10L;
-        RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, lingerMs, false, metrics, time);
-        accum.append(tp, key, value, CompressionType.NONE, null);
-        assertEquals("No partitions should be ready", 0, accum.ready(time.milliseconds()).size());
-        time.sleep(10);
-        assertEquals("Our partition should be ready", asList(tp), accum.ready(time.milliseconds()));
-        List<RecordBatch> batches = accum.drain(asList(tp), Integer.MAX_VALUE);
-        assertEquals(1, batches.size());
-        RecordBatch batch = batches.get(0);
-        Iterator<LogEntry> iter = batch.records.iterator();
-        LogEntry entry = iter.next();
-        assertEquals("Keys should match", ByteBuffer.wrap(key), entry.record().key());
-        assertEquals("Values should match", ByteBuffer.wrap(value), entry.record().value());
-        assertFalse("No more records", iter.hasNext());
-    }
-
-    @Test
-    public void testPartialDrain() throws Exception {
-        RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, 10L, false, metrics, time);
-        int appends = 1024 / msgSize + 1;
-        List<TopicPartition> partitions = asList(new TopicPartition("test", 0), new TopicPartition("test", 1));
-        for (TopicPartition tp : partitions) {
-            for (int i = 0; i < appends; i++)
-                accum.append(tp, key, value, CompressionType.NONE, null);
-        }
-        assertEquals("Both partitions should be ready", 2, accum.ready(time.milliseconds()).size());
-
-        List<RecordBatch> batches = accum.drain(partitions, 1024);
-        assertEquals("But due to size bound only one partition should have been retrieved", 1, batches.size());
-    }
-
-    @Test
-    public void testStressfulSituation() throws Exception {
-        final int numThreads = 5;
-        final int msgs = 10000;
-        final int numParts = 10;
-        final RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, 0L, true, metrics, time);
-        List<Thread> threads = new ArrayList<Thread>();
-        for (int i = 0; i < numThreads; i++) {
-            threads.add(new Thread() {
-                public void run() {
-                    for (int i = 0; i < msgs; i++) {
-                        try {
-                            accum.append(new TopicPartition("test", i % numParts), key, value, CompressionType.NONE, null);
-                        } catch (Exception e) {
-                            e.printStackTrace();
-                        }
-                    }
-                }
-            });
-        }
-        for (Thread t : threads)
-            t.start();
-        int read = 0;
-        long now = time.milliseconds();
-        while (read < numThreads * msgs) {
-            List<TopicPartition> tps = accum.ready(now);
-            List<RecordBatch> batches = accum.drain(tps, 5 * 1024);
-            for (RecordBatch batch : batches) {
-                for (LogEntry entry : batch.records)
-                    read++;
-            }
-            accum.deallocate(batches);
-        }
-
-        for (Thread t : threads)
-            t.join();
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/fa6339c1/clients/src/test/java/kafka/clients/producer/RecordSendTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/kafka/clients/producer/RecordSendTest.java b/clients/src/test/java/kafka/clients/producer/RecordSendTest.java
deleted file mode 100644
index 804c57b..0000000
--- a/clients/src/test/java/kafka/clients/producer/RecordSendTest.java
+++ /dev/null
@@ -1,78 +0,0 @@
-package kafka.clients.producer;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-import kafka.clients.producer.internals.FutureRecordMetadata;
-import kafka.clients.producer.internals.ProduceRequestResult;
-import kafka.common.TopicPartition;
-import kafka.common.errors.CorruptRecordException;
-
-import org.junit.Test;
-
-public class RecordSendTest {
-
-    private TopicPartition topicPartition = new TopicPartition("test", 0);
-    private long baseOffset = 45;
-    private long relOffset = 5;
-
-    /**
-     * Test that waiting on a request that never completes times out
-     */
-    @Test
-    public void testTimeout() throws Exception {
-        ProduceRequestResult request = new ProduceRequestResult();
-        FutureRecordMetadata future = new FutureRecordMetadata(request, relOffset);
-        assertFalse("Request is not completed", future.isDone());
-        try {
-            future.get(5, TimeUnit.MILLISECONDS);
-            fail("Should have thrown exception.");
-        } catch (TimeoutException e) { /* this is good */
-        }
-
-        request.done(topicPartition, baseOffset, null);
-        assertTrue(future.isDone());
-        assertEquals(baseOffset + relOffset, future.get().offset());
-    }
-
-    /**
-     * Test that an asynchronous request will eventually throw the right exception
-     */
-    @Test(expected = ExecutionException.class)
-    public void testError() throws Exception {
-        FutureRecordMetadata future = new FutureRecordMetadata(asyncRequest(baseOffset, new CorruptRecordException(), 50L), relOffset);
-        future.get();
-    }
-
-    /**
-     * Test that an asynchronous request will eventually return the right offset
-     */
-    @Test
-    public void testBlocking() throws Exception {
-        FutureRecordMetadata future = new FutureRecordMetadata(asyncRequest(baseOffset, null, 50L), relOffset);
-        assertEquals(baseOffset + relOffset, future.get().offset());
-    }
-
-    /* create a new request result that will be completed after the given timeout */
-    public ProduceRequestResult asyncRequest(final long baseOffset, final RuntimeException error, final long timeout) {
-        final ProduceRequestResult request = new ProduceRequestResult();
-        new Thread() {
-            public void run() {
-                try {
-                    sleep(timeout);
-                    request.done(topicPartition, baseOffset, error);
-                } catch (InterruptedException e) {
-                    e.printStackTrace();
-                }
-            }
-        }.start();
-        return request;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/fa6339c1/clients/src/test/java/kafka/clients/producer/SenderTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/kafka/clients/producer/SenderTest.java b/clients/src/test/java/kafka/clients/producer/SenderTest.java
deleted file mode 100644
index 8788095..0000000
--- a/clients/src/test/java/kafka/clients/producer/SenderTest.java
+++ /dev/null
@@ -1,87 +0,0 @@
-package kafka.clients.producer;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import java.nio.ByteBuffer;
-import java.util.concurrent.Future;
-
-import kafka.clients.producer.internals.Metadata;
-import kafka.clients.producer.internals.RecordAccumulator;
-import kafka.clients.producer.internals.Sender;
-import kafka.common.Cluster;
-import kafka.common.TopicPartition;
-import kafka.common.metrics.Metrics;
-import kafka.common.network.NetworkReceive;
-import kafka.common.protocol.ApiKeys;
-import kafka.common.protocol.Errors;
-import kafka.common.protocol.ProtoUtils;
-import kafka.common.protocol.types.Struct;
-import kafka.common.record.CompressionType;
-import kafka.common.requests.RequestSend;
-import kafka.common.requests.ResponseHeader;
-import kafka.common.utils.MockTime;
-import kafka.test.MockSelector;
-import kafka.test.TestUtils;
-
-import org.junit.Before;
-import org.junit.Test;
-
-public class SenderTest {
-
-    private MockTime time = new MockTime();
-    private MockSelector selector = new MockSelector(time);
-    private int batchSize = 16 * 1024;
-    private Metadata metadata = new Metadata(0, Long.MAX_VALUE);
-    private Cluster cluster = TestUtils.singletonCluster("test", 1);
-    private Metrics metrics = new Metrics(time);
-    private RecordAccumulator accumulator = new RecordAccumulator(batchSize, 1024 * 1024, 0L, false, metrics, time);
-    private Sender sender = new Sender(selector, metadata, this.accumulator, "", 1024 * 1024, 0L, (short) -1, 10000, time);
-
-    @Before
-    public void setup() {
-        metadata.update(cluster, time.milliseconds());
-    }
-
-    @Test
-    public void testSimple() throws Exception {
-        TopicPartition tp = new TopicPartition("test", 0);
-        Future<RecordMetadata> future = accumulator.append(tp, "key".getBytes(), "value".getBytes(), CompressionType.NONE, null);
-        sender.run(time.milliseconds());
-        assertEquals("We should have connected", 1, selector.connected().size());
-        selector.clear();
-        sender.run(time.milliseconds());
-        assertEquals("Single request should be sent", 1, selector.completedSends().size());
-        RequestSend request = (RequestSend) selector.completedSends().get(0);
-        selector.clear();
-        long offset = 42;
-        selector.completeReceive(produceResponse(request.header().correlationId(),
-                                                 cluster.leaderFor(tp).id(),
-                                                 tp.topic(),
-                                                 tp.partition(),
-                                                 offset,
-                                                 Errors.NONE.code()));
-        sender.run(time.milliseconds());
-        assertTrue("Request should be completed", future.isDone());
-        assertEquals(offset, future.get().offset());
-    }
-
-    private NetworkReceive produceResponse(int correlation, int source, String topic, int part, long offset, int error) {
-        Struct struct = new Struct(ProtoUtils.currentResponseSchema(ApiKeys.PRODUCE.id));
-        Struct response = struct.instance("responses");
-        response.set("topic", topic);
-        Struct partResp = response.instance("partition_responses");
-        partResp.set("partition", part);
-        partResp.set("error_code", (short) error);
-        partResp.set("base_offset", offset);
-        response.set("partition_responses", new Object[] { partResp });
-        struct.set("responses", new Object[] { response });
-        ResponseHeader header = new ResponseHeader(correlation);
-        ByteBuffer buffer = ByteBuffer.allocate(header.sizeOf() + struct.sizeOf());
-        header.writeTo(buffer);
-        struct.writeTo(buffer);
-        buffer.rewind();
-        return new NetworkReceive(source, buffer);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/fa6339c1/clients/src/test/java/kafka/common/config/ConfigDefTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/kafka/common/config/ConfigDefTest.java b/clients/src/test/java/kafka/common/config/ConfigDefTest.java
deleted file mode 100644
index a6a91ac..0000000
--- a/clients/src/test/java/kafka/common/config/ConfigDefTest.java
+++ /dev/null
@@ -1,88 +0,0 @@
-package kafka.common.config;
-
-import static java.util.Arrays.asList;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Properties;
-
-import kafka.common.config.ConfigDef.Range;
-import kafka.common.config.ConfigDef.Type;
-
-import org.junit.Test;
-
-public class ConfigDefTest {
-
-    @Test
-    public void testBasicTypes() {
-        ConfigDef def = new ConfigDef().define("a", Type.INT, 5, Range.between(0, 14), "docs")
-                                       .define("b", Type.LONG, "docs")
-                                       .define("c", Type.STRING, "hello", "docs")
-                                       .define("d", Type.LIST, "docs")
-                                       .define("e", Type.DOUBLE, "docs")
-                                       .define("f", Type.CLASS, "docs")
-                                       .define("g", Type.BOOLEAN, "docs");
-
-        Properties props = new Properties();
-        props.put("a", "1   ");
-        props.put("b", 2);
-        props.put("d", " a , b, c");
-        props.put("e", 42.5d);
-        props.put("f", String.class.getName());
-        props.put("g", "true");
-
-        Map<String, Object> vals = def.parse(props);
-        assertEquals(1, vals.get("a"));
-        assertEquals(2L, vals.get("b"));
-        assertEquals("hello", vals.get("c"));
-        assertEquals(asList("a", "b", "c"), vals.get("d"));
-        assertEquals(42.5d, vals.get("e"));
-        assertEquals(String.class, vals.get("f"));
-        assertEquals(true, vals.get("g"));
-    }
-
-    @Test(expected = ConfigException.class)
-    public void testInvalidDefault() {
-        new ConfigDef().define("a", Type.INT, "hello", "docs");
-    }
-
-    @Test(expected = ConfigException.class)
-    public void testNullDefault() {
-        new ConfigDef().define("a", Type.INT, null, null, "docs");
-    }
-
-    @Test(expected = ConfigException.class)
-    public void testMissingRequired() {
-        new ConfigDef().define("a", Type.INT, "docs").parse(new HashMap<String, Object>());
-    }
-
-    @Test(expected = ConfigException.class)
-    public void testDefinedTwice() {
-        new ConfigDef().define("a", Type.STRING, "docs").define("a", Type.INT, "docs");
-    }
-
-    @Test
-    public void testBadInputs() {
-        testBadInputs(Type.INT, "hello", null, "42.5", 42.5, Long.MAX_VALUE, Long.toString(Long.MAX_VALUE), new Object());
-        testBadInputs(Type.LONG, "hello", null, "42.5", Long.toString(Long.MAX_VALUE) + "00", new Object());
-        testBadInputs(Type.DOUBLE, "hello", null, new Object());
-        testBadInputs(Type.STRING, new Object());
-        testBadInputs(Type.LIST, 53, new Object());
-    }
-
-    private void testBadInputs(Type type, Object... values) {
-        for (Object value : values) {
-            Map<String, Object> m = new HashMap<String, Object>();
-            m.put("name", value);
-            ConfigDef def = new ConfigDef().define("name", type, "docs");
-            try {
-                def.parse(m);
-                fail("Expected a config exception on bad input for value " + value);
-            } catch (ConfigException e) {
-                // this is good
-            }
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/fa6339c1/clients/src/test/java/kafka/common/metrics/JmxReporterTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/kafka/common/metrics/JmxReporterTest.java b/clients/src/test/java/kafka/common/metrics/JmxReporterTest.java
deleted file mode 100644
index e286261..0000000
--- a/clients/src/test/java/kafka/common/metrics/JmxReporterTest.java
+++ /dev/null
@@ -1,21 +0,0 @@
-package kafka.common.metrics;
-
-import kafka.common.metrics.stats.Avg;
-import kafka.common.metrics.stats.Total;
-
-import org.junit.Test;
-
-public class JmxReporterTest {
-
-    @Test
-    public void testJmxRegistration() throws Exception {
-        Metrics metrics = new Metrics();
-        metrics.addReporter(new JmxReporter());
-        Sensor sensor = metrics.sensor("kafka.requests");
-        sensor.add("pack.bean1.avg", new Avg());
-        sensor.add("pack.bean2.total", new Total());
-        Sensor sensor2 = metrics.sensor("kafka.blah");
-        sensor2.add("pack.bean1.some", new Total());
-        sensor2.add("pack.bean2.some", new Total());
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/fa6339c1/clients/src/test/java/kafka/common/metrics/MetricsTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/kafka/common/metrics/MetricsTest.java b/clients/src/test/java/kafka/common/metrics/MetricsTest.java
deleted file mode 100644
index f66cc7f..0000000
--- a/clients/src/test/java/kafka/common/metrics/MetricsTest.java
+++ /dev/null
@@ -1,210 +0,0 @@
-package kafka.common.metrics;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-
-import java.util.Arrays;
-import java.util.concurrent.TimeUnit;
-
-import kafka.common.Metric;
-import kafka.common.metrics.stats.Avg;
-import kafka.common.metrics.stats.Count;
-import kafka.common.metrics.stats.Max;
-import kafka.common.metrics.stats.Min;
-import kafka.common.metrics.stats.Percentile;
-import kafka.common.metrics.stats.Percentiles;
-import kafka.common.metrics.stats.Percentiles.BucketSizing;
-import kafka.common.metrics.stats.Rate;
-import kafka.common.metrics.stats.Total;
-import kafka.common.utils.MockTime;
-
-import org.junit.Test;
-
-public class MetricsTest {
-
-    private static final double EPS = 0.000001;
-
-    MockTime time = new MockTime();
-    Metrics metrics = new Metrics(new MetricConfig(), Arrays.asList((MetricsReporter) new JmxReporter()), time);
-
-    @Test
-    public void testSimpleStats() throws Exception {
-        ConstantMeasurable measurable = new ConstantMeasurable();
-        metrics.addMetric("direct.measurable", measurable);
-        Sensor s = metrics.sensor("test.sensor");
-        s.add("test.avg", new Avg());
-        s.add("test.max", new Max());
-        s.add("test.min", new Min());
-        s.add("test.rate", new Rate(TimeUnit.SECONDS));
-        s.add("test.occurences", new Rate(TimeUnit.SECONDS, new Count()));
-        s.add("test.count", new Count());
-        s.add(new Percentiles(100, -100, 100, BucketSizing.CONSTANT, new Percentile("test.median", 50.0), new Percentile("test.perc99_9",
-                                                                                                                         99.9)));
-
-        Sensor s2 = metrics.sensor("test.sensor2");
-        s2.add("s2.total", new Total());
-        s2.record(5.0);
-
-        for (int i = 0; i < 10; i++)
-            s.record(i);
-
-        // pretend 2 seconds passed...
-        time.sleep(2000);
-
-        assertEquals("s2 reflects the constant value", 5.0, metrics.metrics().get("s2.total").value(), EPS);
-        assertEquals("Avg(0...9) = 4.5", 4.5, metrics.metrics().get("test.avg").value(), EPS);
-        assertEquals("Max(0...9) = 9", 9.0, metrics.metrics().get("test.max").value(), EPS);
-        assertEquals("Min(0...9) = 0", 0.0, metrics.metrics().get("test.min").value(), EPS);
-        assertEquals("Rate(0...9) = 22.5", 22.5, metrics.metrics().get("test.rate").value(), EPS);
-        assertEquals("Occurences(0...9) = 5", 5.0, metrics.metrics().get("test.occurences").value(), EPS);
-        assertEquals("Count(0...9) = 10", 10.0, metrics.metrics().get("test.count").value(), EPS);
-    }
-
-    @Test
-    public void testHierarchicalSensors() {
-        Sensor parent1 = metrics.sensor("test.parent1");
-        parent1.add("test.parent1.count", new Count());
-        Sensor parent2 = metrics.sensor("test.parent2");
-        parent2.add("test.parent2.count", new Count());
-        Sensor child1 = metrics.sensor("test.child1", parent1, parent2);
-        child1.add("test.child1.count", new Count());
-        Sensor child2 = metrics.sensor("test.child2", parent1);
-        child2.add("test.child2.count", new Count());
-        Sensor grandchild = metrics.sensor("test.grandchild", child1);
-        grandchild.add("test.grandchild.count", new Count());
-
-        /* increment each sensor one time */
-        parent1.record();
-        parent2.record();
-        child1.record();
-        child2.record();
-        grandchild.record();
-
-        double p1 = parent1.metrics().get(0).value();
-        double p2 = parent2.metrics().get(0).value();
-        double c1 = child1.metrics().get(0).value();
-        double c2 = child2.metrics().get(0).value();
-        double gc = grandchild.metrics().get(0).value();
-
-        /* each metric should have a count equal to one + its children's count */
-        assertEquals(1.0, gc, EPS);
-        assertEquals(1.0 + gc, child1.metrics().get(0).value(), EPS);
-        assertEquals(1.0, c2, EPS);
-        assertEquals(1.0 + c1, p2, EPS);
-        assertEquals(1.0 + c1 + c2, p1, EPS);
-    }
-
-    @Test(expected = IllegalArgumentException.class)
-    public void testBadSensorHierarchy() {
-        Sensor p = metrics.sensor("parent");
-        Sensor c1 = metrics.sensor("child1", p);
-        Sensor c2 = metrics.sensor("child2", p);
-        metrics.sensor("gc", c1, c2); // should fail
-    }
-
-    @Test
-    public void testEventWindowing() {
-        Count count = new Count();
-        MetricConfig config = new MetricConfig().eventWindow(1).samples(2);
-        count.record(config, 1.0, time.nanoseconds());
-        count.record(config, 1.0, time.nanoseconds());
-        assertEquals(2.0, count.measure(config, time.nanoseconds()), EPS);
-        count.record(config, 1.0, time.nanoseconds()); // first event times out
-        assertEquals(2.0, count.measure(config, time.nanoseconds()), EPS);
-    }
-
-    @Test
-    public void testTimeWindowing() {
-        Count count = new Count();
-        MetricConfig config = new MetricConfig().timeWindow(1, TimeUnit.MILLISECONDS).samples(2);
-        count.record(config, 1.0, time.nanoseconds());
-        time.sleep(1);
-        count.record(config, 1.0, time.nanoseconds());
-        assertEquals(2.0, count.measure(config, time.nanoseconds()), EPS);
-        time.sleep(1);
-        count.record(config, 1.0, time.nanoseconds()); // oldest event times out
-        assertEquals(2.0, count.measure(config, time.nanoseconds()), EPS);
-    }
-
-    @Test
-    public void testOldDataHasNoEffect() {
-        Max max = new Max();
-        long windowMs = 100;
-        MetricConfig config = new MetricConfig().timeWindow(windowMs, TimeUnit.MILLISECONDS);
-        max.record(config, 50, time.nanoseconds());
-        time.sleep(windowMs);
-        assertEquals(Double.NEGATIVE_INFINITY, max.measure(config, time.nanoseconds()), EPS);
-    }
-
-    @Test(expected = IllegalArgumentException.class)
-    public void testDuplicateMetricName() {
-        metrics.sensor("test").add("test", new Avg());
-        metrics.sensor("test2").add("test", new Total());
-    }
-
-    @Test
-    public void testQuotas() {
-        Sensor sensor = metrics.sensor("test");
-        sensor.add("test1.total", new Total(), new MetricConfig().quota(Quota.lessThan(5.0)));
-        sensor.add("test2.total", new Total(), new MetricConfig().quota(Quota.moreThan(0.0)));
-        sensor.record(5.0);
-        try {
-            sensor.record(1.0);
-            fail("Should have gotten a quota violation.");
-        } catch (QuotaViolationException e) {
-            // this is good
-        }
-        assertEquals(6.0, metrics.metrics().get("test1.total").value(), EPS);
-        sensor.record(-6.0);
-        try {
-            sensor.record(-1.0);
-            fail("Should have gotten a quota violation.");
-        } catch (QuotaViolationException e) {
-            // this is good
-        }
-    }
-
-    @Test
-    public void testPercentiles() {
-        int buckets = 100;
-        Percentiles percs = new Percentiles(4 * buckets,
-                                            0.0,
-                                            100.0,
-                                            BucketSizing.CONSTANT,
-                                            new Percentile("test.p25", 25),
-                                            new Percentile("test.p50", 50),
-                                            new Percentile("test.p75", 75));
-        MetricConfig config = new MetricConfig().eventWindow(50).samples(2);
-        Sensor sensor = metrics.sensor("test", config);
-        sensor.add(percs);
-        Metric p25 = this.metrics.metrics().get("test.p25");
-        Metric p50 = this.metrics.metrics().get("test.p50");
-        Metric p75 = this.metrics.metrics().get("test.p75");
-
-        // record two windows worth of sequential values
-        for (int i = 0; i < buckets; i++)
-            sensor.record(i);
-
-        assertEquals(25, p25.value(), 1.0);
-        assertEquals(50, p50.value(), 1.0);
-        assertEquals(75, p75.value(), 1.0);
-
-        for (int i = 0; i < buckets; i++)
-            sensor.record(0.0);
-
-        assertEquals(0.0, p25.value(), 1.0);
-        assertEquals(0.0, p50.value(), 1.0);
-        assertEquals(0.0, p75.value(), 1.0);
-    }
-
-    public static class ConstantMeasurable implements Measurable {
-        public double value = 0.0;
-
-        @Override
-        public double measure(MetricConfig config, long now) {
-            return value;
-        }
-
-    }
-
-}
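
Note: the MetricsTest above doubles as the reference for the sensor API: stats are added under a global name and read back through the registry. A condensed sketch, with expected values taken from the test's own assertions (post-rename packages assumed):

    import org.apache.kafka.common.metrics.Metrics;
    import org.apache.kafka.common.metrics.Sensor;
    import org.apache.kafka.common.metrics.stats.Avg;
    import org.apache.kafka.common.metrics.stats.Max;

    public class StatsSketch {
        public static void main(String[] args) {
            Metrics metrics = new Metrics();
            Sensor sensor = metrics.sensor("request.latency");
            sensor.add("latency.avg", new Avg());
            sensor.add("latency.max", new Max());
            for (int i = 0; i < 10; i++)
                sensor.record(i);
            // each stat is looked up by the name it was added under
            System.out.println(metrics.metrics().get("latency.avg").value()); // 4.5
            System.out.println(metrics.metrics().get("latency.max").value()); // 9.0
        }
    }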

http://git-wip-us.apache.org/repos/asf/kafka/blob/fa6339c1/clients/src/test/java/kafka/common/metrics/stats/HistogramTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/kafka/common/metrics/stats/HistogramTest.java b/clients/src/test/java/kafka/common/metrics/stats/HistogramTest.java
deleted file mode 100644
index 9c6a4ab..0000000
--- a/clients/src/test/java/kafka/common/metrics/stats/HistogramTest.java
+++ /dev/null
@@ -1,86 +0,0 @@
-package kafka.common.metrics.stats;
-
-import static org.junit.Assert.assertEquals;
-
-import java.util.Arrays;
-import java.util.Random;
-
-import kafka.common.metrics.stats.Histogram.BinScheme;
-import kafka.common.metrics.stats.Histogram.ConstantBinScheme;
-import kafka.common.metrics.stats.Histogram.LinearBinScheme;
-
-import org.junit.Test;
-
-public class HistogramTest {
-
-    private static final double EPS = 0.0000001d;
-
-    @Test
-    public void testHistogram() {
-        BinScheme scheme = new ConstantBinScheme(12, -5, 5);
-        Histogram hist = new Histogram(scheme);
-        for (int i = -5; i < 5; i++)
-            hist.record(i);
-        for (int i = 0; i < 10; i++)
-            assertEquals(scheme.fromBin(i + 1), hist.value(i / 10.0 + EPS), EPS);
-    }
-
-    @Test
-    public void testConstantBinScheme() {
-        ConstantBinScheme scheme = new ConstantBinScheme(5, -5, 5);
-        assertEquals("A value below the lower bound should map to the first bin", 0, scheme.toBin(-5.01));
-        assertEquals("A value above the upper bound should map to the last bin", 4, scheme.toBin(5.01));
-        assertEquals("Check boundary of bucket 1", 1, scheme.toBin(-5));
-        assertEquals("Check boundary of bucket 4", 4, scheme.toBin(5));
-        assertEquals("Check boundary of bucket 3", 3, scheme.toBin(4.9999));
-        checkBinningConsistency(new ConstantBinScheme(4, 0, 5));
-        checkBinningConsistency(scheme);
-    }
-
-    @Test
-    public void testLinearBinScheme() {
-        LinearBinScheme scheme = new LinearBinScheme(10, 10);
-        checkBinningConsistency(scheme);
-    }
-
-    private void checkBinningConsistency(BinScheme scheme) {
-        for (int bin = 0; bin < scheme.bins(); bin++) {
-            double fromBin = scheme.fromBin(bin);
-            int binAgain = scheme.toBin(fromBin + EPS);
-            assertEquals("unbinning and rebinning the bin " + bin
-                         + " gave a different result ("
-                         + fromBin
-                         + " was placed in bin "
-                         + binAgain
-                         + " )", bin, binAgain);
-        }
-    }
-
-    public static void main(String[] args) {
-        Random random = new Random();
-        System.out.println("[-100, 100]:");
-        for (BinScheme scheme : Arrays.asList(new ConstantBinScheme(1000, -100, 100),
-                                              new ConstantBinScheme(100, -100, 100),
-                                              new ConstantBinScheme(10, -100, 100))) {
-            Histogram h = new Histogram(scheme);
-            for (int i = 0; i < 10000; i++)
-                h.record(200.0 * random.nextDouble() - 100.0);
-            for (double quantile = 0.0; quantile < 1.0; quantile += 0.05)
-                System.out.printf("%5.2f: %.1f, ", quantile, h.value(quantile));
-            System.out.println();
-        }
-
-        System.out.println("[0, 1000]");
-        for (BinScheme scheme : Arrays.asList(new LinearBinScheme(1000, 1000),
-                                              new LinearBinScheme(100, 1000),
-                                              new LinearBinScheme(10, 1000))) {
-            Histogram h = new Histogram(scheme);
-            for (int i = 0; i < 10000; i++)
-                h.record(1000.0 * random.nextDouble());
-            for (double quantile = 0.0; quantile < 1.0; quantile += 0.05)
-                System.out.printf("%5.2f: %.1f, ", quantile, h.value(quantile));
-            System.out.println();
-        }
-    }
-
-}
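
Note: the Histogram keeps fixed bins and answers approximate quantile queries, as the deleted test's main() demonstrates. A smaller sketch (post-rename package assumed):

    import org.apache.kafka.common.metrics.stats.Histogram;
    import org.apache.kafka.common.metrics.stats.Histogram.ConstantBinScheme;

    public class HistogramSketch {
        public static void main(String[] args) {
            // 100 fixed-width bins covering the range [0, 1000]
            Histogram hist = new Histogram(new ConstantBinScheme(100, 0, 1000));
            for (int i = 0; i < 1000; i++)
                hist.record(i);
            // value(q) reads the approximate q-quantile back out
            System.out.println(hist.value(0.50)); // roughly 500
            System.out.println(hist.value(0.99)); // roughly 990
        }
    }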

http://git-wip-us.apache.org/repos/asf/kafka/blob/fa6339c1/clients/src/test/java/kafka/common/protocol/types/ProtocolSerializationTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/kafka/common/protocol/types/ProtocolSerializationTest.java b/clients/src/test/java/kafka/common/protocol/types/ProtocolSerializationTest.java
deleted file mode 100644
index 5204f3a..0000000
--- a/clients/src/test/java/kafka/common/protocol/types/ProtocolSerializationTest.java
+++ /dev/null
@@ -1,96 +0,0 @@
-package kafka.common.protocol.types;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.fail;
-
-import java.nio.ByteBuffer;
-import java.util.Arrays;
-
-import org.junit.Before;
-import org.junit.Test;
-
-public class ProtocolSerializationTest {
-
-    private Schema schema;
-    private Struct struct;
-
-    @Before
-    public void setup() {
-        this.schema = new Schema(new Field("int8", Type.INT8),
-                                 new Field("int16", Type.INT16),
-                                 new Field("int32", Type.INT32),
-                                 new Field("int64", Type.INT64),
-                                 new Field("string", Type.STRING),
-                                 new Field("bytes", Type.BYTES),
-                                 new Field("array", new ArrayOf(Type.INT32)),
-                                 new Field("struct", new Schema(new Field("field", Type.INT32))));
-        this.struct = new Struct(this.schema).set("int8", (byte) 1)
-                                             .set("int16", (short) 1)
-                                             .set("int32", (int) 1)
-                                             .set("int64", (long) 1)
-                                             .set("string", "1")
-                                             .set("bytes", "1".getBytes())
-                                             .set("array", new Object[] { 1 });
-        this.struct.set("struct", this.struct.instance("struct").set("field", new Object[] { 1, 2, 3 }));
-    }
-
-    @Test
-    public void testSimple() {
-        check(Type.INT8, (byte) -111);
-        check(Type.INT16, (short) -11111);
-        check(Type.INT32, -11111111);
-        check(Type.INT64, -11111111111L);
-        check(Type.STRING, "");
-        check(Type.STRING, "hello");
-        check(Type.STRING, "A\u00ea\u00f1\u00fcC");
-        check(Type.BYTES, ByteBuffer.allocate(0));
-        check(Type.BYTES, ByteBuffer.wrap("abcd".getBytes()));
-        check(new ArrayOf(Type.INT32), new Object[] { 1, 2, 3, 4 });
-        check(new ArrayOf(Type.STRING), new Object[] {});
-        check(new ArrayOf(Type.STRING), new Object[] { "hello", "there", "beautiful" });
-    }
-
-    @Test
-    public void testNulls() {
-        for (Field f : this.schema.fields()) {
-            Object o = this.struct.get(f);
-            try {
-                this.struct.set(f, null);
-                this.struct.validate();
-                fail("Should not allow serialization of null value.");
-            } catch (SchemaException e) {
-                // this is good
-                this.struct.set(f, o);
-            }
-        }
-    }
-
-    @Test
-    public void testDefault() {
-        Schema schema = new Schema(new Field("field", Type.INT32, "doc", 42));
-        Struct struct = new Struct(schema);
-        assertEquals("Should get the default value", 42, struct.get("field"));
-        struct.validate(); // should be valid even with missing value
-    }
-
-    private Object roundtrip(Type type, Object obj) {
-        ByteBuffer buffer = ByteBuffer.allocate(type.sizeOf(obj));
-        type.write(buffer, obj);
-        assertFalse("The buffer should now be full.", buffer.hasRemaining());
-        buffer.rewind();
-        Object read = type.read(buffer);
-        assertFalse("All bytes should have been read.", buffer.hasRemaining());
-        return read;
-    }
-
-    private void check(Type type, Object obj) {
-        Object result = roundtrip(type, obj);
-        if (obj instanceof Object[]) {
-            obj = Arrays.asList((Object[]) obj);
-            result = Arrays.asList((Object[]) result);
-        }
-        assertEquals("The object read back should be the same as what was written.", obj, result);
-    }
-
-}
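
Note: at the Type level, serialization is a sizeOf/write/read round trip, which is all the roundtrip() helper above does. A standalone sketch (post-rename package assumed):

    import java.nio.ByteBuffer;

    import org.apache.kafka.common.protocol.types.Type;

    public class TypeRoundTripSketch {
        public static void main(String[] args) {
            String value = "hello";
            ByteBuffer buffer = ByteBuffer.allocate(Type.STRING.sizeOf(value));
            Type.STRING.write(buffer, value); // serialize into the buffer
            buffer.rewind();
            System.out.println(Type.STRING.read(buffer)); // hello
        }
    }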

http://git-wip-us.apache.org/repos/asf/kafka/blob/fa6339c1/clients/src/test/java/kafka/common/record/MemoryRecordsTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/kafka/common/record/MemoryRecordsTest.java b/clients/src/test/java/kafka/common/record/MemoryRecordsTest.java
deleted file mode 100644
index 6906309..0000000
--- a/clients/src/test/java/kafka/common/record/MemoryRecordsTest.java
+++ /dev/null
@@ -1,44 +0,0 @@
-package kafka.common.record;
-
-import static kafka.common.utils.Utils.toArray;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import java.nio.ByteBuffer;
-import java.util.Arrays;
-import java.util.Iterator;
-import java.util.List;
-
-import org.junit.Test;
-
-public class MemoryRecordsTest {
-
-    @Test
-    public void testIterator() {
-        MemoryRecords recs1 = new MemoryRecords(ByteBuffer.allocate(1024));
-        MemoryRecords recs2 = new MemoryRecords(ByteBuffer.allocate(1024));
-        List<Record> list = Arrays.asList(new Record("a".getBytes(), "1".getBytes()),
-                                          new Record("b".getBytes(), "2".getBytes()),
-                                          new Record("c".getBytes(), "3".getBytes()));
-        for (int i = 0; i < list.size(); i++) {
-            Record r = list.get(i);
-            recs1.append(i, r);
-            recs2.append(i, toArray(r.key()), toArray(r.value()), r.compressionType());
-        }
-
-        for (int iteration = 0; iteration < 2; iteration++) {
-            for (MemoryRecords recs : Arrays.asList(recs1, recs2)) {
-                Iterator<LogEntry> iter = recs.iterator();
-                for (int i = 0; i < list.size(); i++) {
-                    assertTrue(iter.hasNext());
-                    LogEntry entry = iter.next();
-                    assertEquals((long) i, entry.offset());
-                    assertEquals(list.get(i), entry.record());
-                }
-                assertFalse(iter.hasNext());
-            }
-        }
-    }
-
-}
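
Note: MemoryRecords pairs each appended Record with an offset and iterates them back as LogEntry objects. A sketch of that cycle (post-rename package assumed):

    import java.nio.ByteBuffer;
    import java.util.Iterator;

    import org.apache.kafka.common.record.LogEntry;
    import org.apache.kafka.common.record.MemoryRecords;
    import org.apache.kafka.common.record.Record;

    public class MemoryRecordsSketch {
        public static void main(String[] args) {
            MemoryRecords records = new MemoryRecords(ByteBuffer.allocate(1024));
            records.append(0, new Record("k1".getBytes(), "v1".getBytes()));
            records.append(1, new Record("k2".getBytes(), "v2".getBytes()));
            Iterator<LogEntry> iter = records.iterator();
            while (iter.hasNext()) {
                LogEntry entry = iter.next();
                System.out.println(entry.offset() + " -> " + entry.record());
            }
        }
    }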

http://git-wip-us.apache.org/repos/asf/kafka/blob/fa6339c1/clients/src/test/java/kafka/common/record/RecordTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/kafka/common/record/RecordTest.java b/clients/src/test/java/kafka/common/record/RecordTest.java
deleted file mode 100644
index 9c59c9b..0000000
--- a/clients/src/test/java/kafka/common/record/RecordTest.java
+++ /dev/null
@@ -1,87 +0,0 @@
-package kafka.common.record;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.List;
-
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameters;
-
-@RunWith(value = Parameterized.class)
-public class RecordTest {
-
-    private ByteBuffer key;
-    private ByteBuffer value;
-    private CompressionType compression;
-    private Record record;
-
-    public RecordTest(byte[] key, byte[] value, CompressionType compression) {
-        this.key = key == null ? null : ByteBuffer.wrap(key);
-        this.value = value == null ? null : ByteBuffer.wrap(value);
-        this.compression = compression;
-        this.record = new Record(key, value, compression);
-    }
-
-    @Test
-    public void testFields() {
-        assertEquals(compression, record.compressionType());
-        assertEquals(key != null, record.hasKey());
-        assertEquals(key, record.key());
-        if (key != null)
-            assertEquals(key.limit(), record.keySize());
-        assertEquals(Record.CURRENT_MAGIC_VALUE, record.magic());
-        assertEquals(value, record.value());
-        if (value != null)
-            assertEquals(value.limit(), record.valueSize());
-    }
-
-    @Test
-    public void testChecksum() {
-        assertEquals(record.checksum(), record.computeChecksum());
-        assertTrue(record.isValid());
-        for (int i = Record.CRC_OFFSET + Record.CRC_LENGTH; i < record.size(); i++) {
-            Record copy = copyOf(record);
-            copy.buffer().put(i, (byte) 69);
-            assertFalse(copy.isValid());
-            try {
-                copy.ensureValid();
-                fail("Should fail the above test.");
-            } catch (InvalidRecordException e) {
-                // this is good
-            }
-        }
-    }
-
-    private Record copyOf(Record record) {
-        ByteBuffer buffer = ByteBuffer.allocate(record.size());
-        buffer.put(record.buffer()); // copy the record's bytes into the fresh buffer
-        buffer.rewind();
-        record.buffer().rewind();
-        return new Record(buffer);
-    }
-
-    @Test
-    public void testEquality() {
-        assertEquals(record, copyOf(record));
-    }
-
-    @Parameters
-    public static Collection<Object[]> data() {
-        List<Object[]> values = new ArrayList<Object[]>();
-        for (byte[] key : Arrays.asList(null, "".getBytes(), "key".getBytes()))
-            for (byte[] value : Arrays.asList(null, "".getBytes(), "value".getBytes()))
-                for (CompressionType compression : CompressionType.values())
-                    values.add(new Object[] { key, value, compression });
-        return values;
-    }
-
-}
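
Note: a Record carries its own CRC, so validity can be checked without any outside state; the checksum test above flips bytes to prove corruption is caught. The happy path, sketched (post-rename package assumed):

    import org.apache.kafka.common.record.Record;

    public class RecordSketch {
        public static void main(String[] args) {
            Record record = new Record("key".getBytes(), "value".getBytes());
            // the CRC stored in the record should match one recomputed over its contents
            System.out.println(record.checksum() == record.computeChecksum()); // true
            System.out.println(record.isValid());                             // true
            record.ensureValid(); // would throw InvalidRecordException if corrupted
        }
    }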

http://git-wip-us.apache.org/repos/asf/kafka/blob/fa6339c1/clients/src/test/java/kafka/common/utils/AbstractIteratorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/kafka/common/utils/AbstractIteratorTest.java b/clients/src/test/java/kafka/common/utils/AbstractIteratorTest.java
deleted file mode 100644
index 7662d38..0000000
--- a/clients/src/test/java/kafka/common/utils/AbstractIteratorTest.java
+++ /dev/null
@@ -1,54 +0,0 @@
-package kafka.common.utils;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Iterator;
-import java.util.List;
-import java.util.NoSuchElementException;
-
-import org.junit.Test;
-
-public class AbstractIteratorTest {
-
-    @Test
-    public void testIterator() {
-        int max = 10;
-        List<Integer> l = new ArrayList<Integer>();
-        for (int i = 0; i < max; i++)
-            l.add(i);
-        ListIterator<Integer> iter = new ListIterator<Integer>(l);
-        for (int i = 0; i < max; i++) {
-            Integer value = i;
-            assertEquals(value, iter.peek());
-            assertTrue(iter.hasNext());
-            assertEquals(value, iter.next());
-        }
-        assertFalse(iter.hasNext());
-    }
-
-    @Test(expected = NoSuchElementException.class)
-    public void testEmptyIterator() {
-        Iterator<Object> iter = new ListIterator<Object>(Arrays.asList());
-        iter.next();
-    }
-
-    class ListIterator<T> extends AbstractIterator<T> {
-        private List<T> list;
-        private int position = 0;
-
-        public ListIterator(List<T> l) {
-            this.list = l;
-        }
-
-        public T makeNext() {
-            if (position < list.size())
-                return list.get(position++);
-            else
-                return allDone();
-        }
-    }
-}
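
Note: AbstractIterator leaves subclasses a single method to implement: makeNext() either returns the next element or calls allDone() to end iteration, exactly as the ListIterator helper above does. Another minimal sketch (post-rename package assumed):

    import org.apache.kafka.common.utils.AbstractIterator;

    public class CountdownIterator extends AbstractIterator<Integer> {
        private int remaining;

        public CountdownIterator(int start) {
            this.remaining = start;
        }

        public Integer makeNext() {
            if (remaining > 0)
                return remaining--;
            return allDone(); // signals that iteration is complete
        }

        public static void main(String[] args) {
            CountdownIterator iter = new CountdownIterator(3);
            while (iter.hasNext())
                System.out.println(iter.next()); // 3, 2, 1
        }
    }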

http://git-wip-us.apache.org/repos/asf/kafka/blob/fa6339c1/clients/src/test/java/kafka/common/utils/MockTime.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/kafka/common/utils/MockTime.java b/clients/src/test/java/kafka/common/utils/MockTime.java
deleted file mode 100644
index 095d4f6..0000000
--- a/clients/src/test/java/kafka/common/utils/MockTime.java
+++ /dev/null
@@ -1,28 +0,0 @@
-package kafka.common.utils;
-
-import java.util.concurrent.TimeUnit;
-
-public class MockTime implements Time {
-
-    private long nanos = 0;
-
-    public MockTime() {
-        this.nanos = System.nanoTime();
-    }
-
-    @Override
-    public long milliseconds() {
-        return TimeUnit.MILLISECONDS.convert(this.nanos, TimeUnit.NANOSECONDS);
-    }
-
-    @Override
-    public long nanoseconds() {
-        return nanos;
-    }
-
-    @Override
-    public void sleep(long ms) {
-        this.nanos += TimeUnit.NANOSECONDS.convert(ms, TimeUnit.MILLISECONDS);
-    }
-
-}
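
Note: MockTime makes time-dependent code testable: sleep() advances a virtual clock and returns immediately, which is how MetricsTest above fakes the passage of two seconds. A sketch (post-rename package assumed):

    import org.apache.kafka.common.utils.MockTime;

    public class MockTimeSketch {
        public static void main(String[] args) {
            MockTime time = new MockTime();
            long before = time.milliseconds();
            time.sleep(2000); // no real blocking; only the virtual clock advances
            System.out.println(time.milliseconds() - before); // 2000
        }
    }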

http://git-wip-us.apache.org/repos/asf/kafka/blob/fa6339c1/clients/src/test/java/kafka/test/MetricsBench.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/kafka/test/MetricsBench.java b/clients/src/test/java/kafka/test/MetricsBench.java
deleted file mode 100644
index 2b164bd..0000000
--- a/clients/src/test/java/kafka/test/MetricsBench.java
+++ /dev/null
@@ -1,38 +0,0 @@
-package kafka.test;
-
-import java.util.Arrays;
-
-import kafka.common.metrics.Metrics;
-import kafka.common.metrics.Sensor;
-import kafka.common.metrics.stats.Avg;
-import kafka.common.metrics.stats.Count;
-import kafka.common.metrics.stats.Max;
-import kafka.common.metrics.stats.Percentile;
-import kafka.common.metrics.stats.Percentiles;
-import kafka.common.metrics.stats.Percentiles.BucketSizing;
-
-public class MetricsBench {
-
-    public static void main(String[] args) {
-        long iters = Long.parseLong(args[0]);
-        Metrics metrics = new Metrics();
-        Sensor parent = metrics.sensor("parent");
-        Sensor child = metrics.sensor("child", parent);
-        for (Sensor sensor : Arrays.asList(parent, child)) {
-            sensor.add(sensor.name() + ".avg", new Avg());
-            sensor.add(sensor.name() + ".count", new Count());
-            sensor.add(sensor.name() + ".max", new Max());
-            sensor.add(new Percentiles(1024,
-                                       0.0,
-                                       iters,
-                                       BucketSizing.CONSTANT,
-                                       new Percentile(sensor.name() + ".median", 50.0),
-                                       new Percentile(sensor.name() + ".p_99", 99.0)));
-        }
-        long start = System.nanoTime();
-        for (int i = 0; i < iters; i++)
-            child.record(i);
-        double elapsed = (System.nanoTime() - start) / (double) iters;
-        System.out.println(String.format("%.2f ns per metric recording.", elapsed));
-    }
-}

