kafka-commits mailing list archives

From: jkr...@apache.org
Subject: [1/2] Implement a few of the API suggestions from the mailing list.
Date: Tue, 04 Feb 2014 16:40:19 GMT
Updated Branches:
  refs/heads/trunk c9052c5ff -> 253f86e31


http://git-wip-us.apache.org/repos/asf/kafka/blob/253f86e3/clients/src/main/java/kafka/common/network/ByteBufferReceive.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/network/ByteBufferReceive.java b/clients/src/main/java/kafka/common/network/ByteBufferReceive.java
index cb1aaae..65a7c64 100644
--- a/clients/src/main/java/kafka/common/network/ByteBufferReceive.java
+++ b/clients/src/main/java/kafka/common/network/ByteBufferReceive.java
@@ -33,7 +33,9 @@ public class ByteBufferReceive implements Receive {
 
     @Override
     public long readFrom(ScatteringByteChannel channel) throws IOException {
-        return channel.read(buffers);
+        long read = channel.read(buffers);
+        remaining += read;
+        return read;
     }
 
     public ByteBuffer[] reify() {

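The readFrom change above keeps a running count of the bytes pulled off the channel. A minimal stand-alone sketch of the same pattern in plain JDK terms (class and field names here are illustrative, not from the patch); note that a robust version also has to allow for the -1 the channel returns at end-of-stream:

    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.nio.channels.ScatteringByteChannel;

    // Sketch: a receive object that remembers how many bytes it has pulled
    // off the channel, so callers can check progress without re-scanning
    // the buffers.
    public class CountingReceive {
        private final ByteBuffer[] buffers;
        private long bytesRead = 0;

        public CountingReceive(ByteBuffer... buffers) {
            this.buffers = buffers;
        }

        public long readFrom(ScatteringByteChannel channel) throws IOException {
            long read = channel.read(buffers);
            if (read > 0)           // read(...) returns -1 at end-of-stream;
                bytesRead += read;  // only count bytes that actually arrived
            return read;
        }

        public long bytesRead() {
            return bytesRead;
        }
    }
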
http://git-wip-us.apache.org/repos/asf/kafka/blob/253f86e3/clients/src/main/java/kafka/common/protocol/Errors.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/protocol/Errors.java b/clients/src/main/java/kafka/common/protocol/Errors.java
index fb1a3e5..402a6c0 100644
--- a/clients/src/main/java/kafka/common/protocol/Errors.java
+++ b/clients/src/main/java/kafka/common/protocol/Errors.java
@@ -4,9 +4,9 @@ import java.util.HashMap;
 import java.util.Map;
 
 import kafka.common.errors.ApiException;
-import kafka.common.errors.CorruptMessageException;
+import kafka.common.errors.CorruptRecordException;
 import kafka.common.errors.LeaderNotAvailableException;
-import kafka.common.errors.MessageTooLargeException;
+import kafka.common.errors.RecordTooLargeException;
 import kafka.common.errors.NetworkException;
 import kafka.common.errors.NotLeaderForPartitionException;
 import kafka.common.errors.OffsetMetadataTooLarge;
@@ -27,14 +27,14 @@ public enum Errors {
     OFFSET_OUT_OF_RANGE(1,
                         new OffsetOutOfRangeException("The requested offset is not within the range of offsets maintained by the server.")),
     CORRUPT_MESSAGE(2,
-                    new CorruptMessageException("The message contents does not match the message CRC or the message is otherwise corrupt.")),
+                    new CorruptRecordException("The message contents does not match the message CRC or the message is otherwise corrupt.")),
     UNKNOWN_TOPIC_OR_PARTITION(3, new UnknownTopicOrPartitionException("This server does not host this topic-partition.")),
     LEADER_NOT_AVAILABLE(5,
                          new LeaderNotAvailableException("There is no leader for this topic-partition as we are in the middle of a leadership election.")),
     NOT_LEADER_FOR_PARTITION(6, new NotLeaderForPartitionException("This server is not the leader for that topic-partition.")),
     REQUEST_TIMED_OUT(7, new TimeoutException("The request timed out.")),
     MESSAGE_TOO_LARGE(10,
-                      new MessageTooLargeException("The request included a message larger than the max message size the server will accept.")),
+                      new RecordTooLargeException("The request included a message larger than the max message size the server will accept.")),
     OFFSET_METADATA_TOO_LARGE(12, new OffsetMetadataTooLarge("The metadata field of the offset request was too large.")),
     NETWORK_EXCEPTION(13, new NetworkException("The server disconnected before a response was received."));
 

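The enum keeps one canonical exception instance per wire error code, so responses can be turned into exceptions without allocating on every error. A self-contained sketch of that pattern (the HashMap/Map imports added above suggest the same code-to-enum index; WireError and its messages are illustrative stand-ins, not the in-tree class):

    import java.util.HashMap;
    import java.util.Map;

    // Sketch of a code-to-exception mapping enum. Each code owns one
    // canonical exception instance: zero allocation per error, at the
    // price of a reused stack trace.
    enum WireError {
        NONE(0, null),
        CORRUPT_MESSAGE(2, new RuntimeException("corrupt record")),
        MESSAGE_TOO_LARGE(10, new RuntimeException("record too large"));

        private static final Map<Integer, WireError> BY_CODE = new HashMap<Integer, WireError>();
        static {
            for (WireError error : WireError.values())
                BY_CODE.put(error.code, error);
        }

        private final int code;
        private final RuntimeException exception;

        WireError(int code, RuntimeException exception) {
            this.code = code;
            this.exception = exception;
        }

        public int code() {
            return code;
        }

        /** Resolve a code read off the wire; unknown codes yield null here. */
        public static WireError forCode(int code) {
            return BY_CODE.get(code);
        }

        /** Throw the canonical exception for this code, if it is an error. */
        public void maybeThrow() {
            if (exception != null)
                throw exception;
        }
    }
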
http://git-wip-us.apache.org/repos/asf/kafka/blob/253f86e3/clients/src/main/java/kafka/common/protocol/ProtoUtils.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/protocol/ProtoUtils.java b/clients/src/main/java/kafka/common/protocol/ProtoUtils.java
index 83dad53..576c24d 100644
--- a/clients/src/main/java/kafka/common/protocol/ProtoUtils.java
+++ b/clients/src/main/java/kafka/common/protocol/ProtoUtils.java
@@ -2,7 +2,9 @@ package kafka.common.protocol;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 import kafka.common.Cluster;
 import kafka.common.Node;
@@ -52,14 +54,14 @@ public class ProtoUtils {
     }
 
     public static Cluster parseMetadataResponse(Struct response) {
-        List<Node> brokers = new ArrayList<Node>();
+        Map<Integer, Node> brokers = new HashMap<Integer, Node>();
         Object[] brokerStructs = (Object[]) response.get("brokers");
         for (int i = 0; i < brokerStructs.length; i++) {
             Struct broker = (Struct) brokerStructs[i];
             int nodeId = (Integer) broker.get("node_id");
             String host = (String) broker.get("host");
             int port = (Integer) broker.get("port");
-            brokers.add(new Node(nodeId, host, port));
+            brokers.put(nodeId, new Node(nodeId, host, port));
         }
         List<PartitionInfo> partitions = new ArrayList<PartitionInfo>();
         Object[] topicInfos = (Object[]) response.get("topic_metadata");
@@ -75,21 +77,21 @@ public class ProtoUtils {
                     if (partError == Errors.NONE.code()) {
                         int partition = partitionInfo.getInt("partition_id");
                         int leader = partitionInfo.getInt("leader");
-                        int[] replicas = intArray((Object[]) partitionInfo.get("replicas"));
-                        int[] isr = intArray((Object[]) partitionInfo.get("isr"));
-                        partitions.add(new PartitionInfo(topic, partition, leader, replicas, isr));
+                        Node leaderNode = leader == -1 ? null : brokers.get(leader);
+                        Object[] replicas = (Object[]) partitionInfo.get("replicas");
+                        Node[] replicaNodes = new Node[replicas.length];
+                        for (int k = 0; k < replicas.length; k++)
+                            replicaNodes[k] = brokers.get(replicas[k]);
+                        Object[] isr = (Object[]) partitionInfo.get("isr");
+                        Node[] isrNodes = new Node[isr.length];
+                        for (int k = 0; k < isr.length; k++)
+                            isrNodes[k] = brokers.get(isr[k]);
+                        partitions.add(new PartitionInfo(topic, partition, leaderNode, replicaNodes, isrNodes));
                     }
                 }
             }
         }
-        return new Cluster(brokers, partitions);
-    }
-
-    private static int[] intArray(Object[] ints) {
-        int[] copy = new int[ints.length];
-        for (int i = 0; i < ints.length; i++)
-            copy[i] = (Integer) ints[i];
-        return copy;
+        return new Cluster(brokers.values(), partitions);
     }
 
 }

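The reworked parser first indexes the brokers by id and then resolves every leader, replica, and isr id to a full Node, so PartitionInfo now carries host/port details instead of bare integers and callers no longer need the broker list to locate a node. The resolution step in isolation (a sketch; this Node is a stand-in for kafka.common.Node, and the ids are made-up sample data):

    import java.util.HashMap;
    import java.util.Map;

    // Sketch of the id-to-node resolution the new parsing code performs.
    public class NodeResolution {
        static class Node {
            final int id;
            final String host;
            final int port;

            Node(int id, String host, int port) {
                this.id = id;
                this.host = host;
                this.port = port;
            }
        }

        public static void main(String[] args) {
            // index the brokers once...
            Map<Integer, Node> brokers = new HashMap<Integer, Node>();
            brokers.put(0, new Node(0, "broker0.example", 9092));
            brokers.put(1, new Node(1, "broker1.example", 9092));

            // ...then turn id arrays into full node descriptions
            int leaderId = 1;                 // -1 would mean "no leader"
            int[] replicaIds = { 0, 1 };
            Node leader = leaderId == -1 ? null : brokers.get(leaderId);
            Node[] replicas = new Node[replicaIds.length];
            for (int i = 0; i < replicaIds.length; i++)
                replicas[i] = brokers.get(replicaIds[i]);

            System.out.println(leader.host + " leads " + replicas.length + " replicas");
        }
    }
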
http://git-wip-us.apache.org/repos/asf/kafka/blob/253f86e3/clients/src/main/java/kafka/common/protocol/Protocol.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/protocol/Protocol.java b/clients/src/main/java/kafka/common/protocol/Protocol.java
index e191d6a..49b60aa 100644
--- a/clients/src/main/java/kafka/common/protocol/Protocol.java
+++ b/clients/src/main/java/kafka/common/protocol/Protocol.java
@@ -66,7 +66,7 @@ public class Protocol {
 
     public static Schema TOPIC_PRODUCE_DATA_V0 = new Schema(new Field("topic", STRING),
                                                             new Field("data", new ArrayOf(new Schema(new Field("partition", INT32),
-                                                                                                     new Field("message_set", BYTES)))));
+                                                                                                     new Field("record_set", BYTES)))));
 
     public static Schema PRODUCE_REQUEST_V0 = new Schema(new Field("acks",
                                                                    INT16,

http://git-wip-us.apache.org/repos/asf/kafka/blob/253f86e3/clients/src/main/java/kafka/common/record/MemoryRecords.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/record/MemoryRecords.java b/clients/src/main/java/kafka/common/record/MemoryRecords.java
index ec98226..d3f8426 100644
--- a/clients/src/main/java/kafka/common/record/MemoryRecords.java
+++ b/clients/src/main/java/kafka/common/record/MemoryRecords.java
@@ -48,7 +48,7 @@ public class MemoryRecords implements Records {
         return this.buffer.remaining() >= Records.LOG_OVERHEAD + Record.recordSize(key, value);
     }
 
-    /** Write the messages in this set to the given channel */
+    /** Write the records in this set to the given channel */
     public int writeTo(GatheringByteChannel channel) throws IOException {
         return channel.write(buffer);
     }
@@ -89,7 +89,7 @@ public class MemoryRecords implements Records {
             long offset = buffer.getLong();
             int size = buffer.getInt();
             if (size < 0)
-                throw new IllegalStateException("Message with size " + size);
+                throw new IllegalStateException("Record with invalid size " + size);
             if (buffer.remaining() < size)
                 return allDone();
             ByteBuffer rec = buffer.slice();

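The iterator above walks a buffer of length-prefixed entries: an 8-byte offset, a 4-byte size, then the record body. A sketch of that framing as a stand-alone scan, under the assumption (based on the reads shown in the hunk) that Records.LOG_OVERHEAD is exactly those 12 bytes:

    import java.nio.ByteBuffer;

    // Sketch of scanning offset/size-framed records out of a buffer.
    public class LogScan {
        public static void scan(ByteBuffer buffer) {
            while (buffer.remaining() >= 12) {
                long offset = buffer.getLong();
                int size = buffer.getInt();
                if (size < 0)
                    throw new IllegalStateException("Record with invalid size " + size);
                if (buffer.remaining() < size)
                    return;                          // trailing partial record: stop early
                ByteBuffer record = buffer.slice();  // view over just this record
                record.limit(size);
                buffer.position(buffer.position() + size);
                System.out.println("offset " + offset + ": " + size + " byte record");
            }
        }
    }
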
http://git-wip-us.apache.org/repos/asf/kafka/blob/253f86e3/clients/src/main/java/kafka/common/record/Record.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/record/Record.java b/clients/src/main/java/kafka/common/record/Record.java
index 835a0a4..b89accf 100644
--- a/clients/src/main/java/kafka/common/record/Record.java
+++ b/clients/src/main/java/kafka/common/record/Record.java
@@ -162,7 +162,7 @@ public final class Record {
     }
 
     /**
-     * Throw an InvalidMessageException if isValid is false for this record
+     * Throw an InvalidRecordException if isValid is false for this record
      */
     public void ensureValid() {
         if (!isValid())
@@ -260,7 +260,7 @@ public final class Record {
     }
 
     public String toString() {
-        return String.format("Message(magic = %d, attributes = %d, crc = %d, key = %d bytes, value = %d bytes)",
+        return String.format("Record(magic = %d, attributes = %d, crc = %d, key = %d bytes, value = %d bytes)",
                              magic(),
                              attributes(),
                              checksum(),

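ensureValid() rests on a stored-versus-recomputed CRC comparison. A sketch of the shape of that check with java.util.zip.CRC32, assuming a layout of a 4-byte CRC followed by the bytes it covers (the real record has more header fields; this is not the in-tree code):

    import java.nio.ByteBuffer;
    import java.util.zip.CRC32;

    // Sketch of a stored-versus-recomputed CRC validity check.
    public class CrcCheck {
        public static boolean isValid(ByteBuffer record) {
            ByteBuffer copy = record.duplicate();      // don't disturb the caller's position
            long stored = copy.getInt() & 0xffffffffL; // CRC is an unsigned 32-bit value
            byte[] body = new byte[copy.remaining()];
            copy.get(body);
            CRC32 crc = new CRC32();
            crc.update(body);
            return crc.getValue() == stored;
        }
    }
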
http://git-wip-us.apache.org/repos/asf/kafka/blob/253f86e3/clients/src/test/java/kafka/clients/producer/MetadataTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/kafka/clients/producer/MetadataTest.java b/clients/src/test/java/kafka/clients/producer/MetadataTest.java
index 68e4bd7..dd45209 100644
--- a/clients/src/test/java/kafka/clients/producer/MetadataTest.java
+++ b/clients/src/test/java/kafka/clients/producer/MetadataTest.java
@@ -1,12 +1,10 @@
 package kafka.clients.producer;
 
-import static java.util.Arrays.asList;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import kafka.clients.producer.internals.Metadata;
 import kafka.common.Cluster;
-import kafka.common.Node;
-import kafka.common.PartitionInfo;
+import kafka.test.TestUtils;
 
 import org.junit.Test;
 
@@ -30,7 +28,7 @@ public class MetadataTest {
         Thread t2 = asyncFetch(topic);
         assertTrue("Awaiting update", t1.isAlive());
         assertTrue("Awaiting update", t2.isAlive());
-        metadata.update(clusterWith(topic), time);
+        metadata.update(TestUtils.singletonCluster(topic, 1), time);
         t1.join();
         t2.join();
         assertFalse("No update needed.", metadata.needsUpdate(time));
@@ -38,10 +36,6 @@ public class MetadataTest {
         assertTrue("Update needed due to stale metadata.", metadata.needsUpdate(time));
     }
 
-    private Cluster clusterWith(String topic) {
-        return new Cluster(asList(new Node(0, "localhost", 1969)), asList(new PartitionInfo(topic, 0, 0, new int[0], new int[0])));
-    }
-
     private Thread asyncFetch(final String topic) {
         Thread thread = new Thread() {
             public void run() {

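The test starts two fetching threads, asserts they stay parked, and releases them with update(). That implies a block-until-updated primitive inside Metadata; a plain wait/notifyAll sketch of such a primitive (an assumption about the mechanism, not the Metadata implementation):

    // Sketch: fetching threads park until update() publishes a value.
    public class BlockingHolder<T> {
        private T value;

        public synchronized T await() throws InterruptedException {
            while (value == null)
                wait();              // woken by update(...)
            return value;
        }

        public synchronized void update(T newValue) {
            this.value = newValue;
            notifyAll();             // release every thread parked in await()
        }
    }
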
http://git-wip-us.apache.org/repos/asf/kafka/blob/253f86e3/clients/src/test/java/kafka/clients/producer/MockProducerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/kafka/clients/producer/MockProducerTest.java b/clients/src/test/java/kafka/clients/producer/MockProducerTest.java
index 61929a4..24b132f 100644
--- a/clients/src/test/java/kafka/clients/producer/MockProducerTest.java
+++ b/clients/src/test/java/kafka/clients/producer/MockProducerTest.java
@@ -5,62 +5,59 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
-import kafka.common.Cluster;
-import kafka.common.Node;
-import kafka.common.PartitionInfo;
-import kafka.common.Serializer;
-import kafka.common.StringSerialization;
+
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
 
 import org.junit.Test;
 
 public class MockProducerTest {
 
+    private String topic = "topic";
+
     @Test
-    public void testAutoCompleteMock() {
+    public void testAutoCompleteMock() throws Exception {
         MockProducer producer = new MockProducer(true);
-        ProducerRecord record = new ProducerRecord("topic", "key", "value");
-        RecordSend send = producer.send(record);
-        assertTrue("Send should be immediately complete", send.completed());
-        assertFalse("Send should be successful", send.hasError());
-        assertEquals("Offset should be 0", 0, send.offset());
+        ProducerRecord record = new ProducerRecord(topic, "key".getBytes(), "value".getBytes());
+        Future<RecordMetadata> metadata = producer.send(record);
+        assertTrue("Send should be immediately complete", metadata.isDone());
+        assertFalse("Send should be successful", isError(metadata));
+        assertEquals("Offset should be 0", 0, metadata.get().offset());
+        assertEquals(topic, metadata.get().topic());
         assertEquals("We should have the record in our history", asList(record), producer.history());
         producer.clear();
         assertEquals("Clear should erase our history", 0, producer.history().size());
     }
 
-    public void testManualCompletion() {
+    @Test
+    public void testManualCompletion() throws Exception {
         MockProducer producer = new MockProducer(false);
-        ProducerRecord record1 = new ProducerRecord("topic", "key1", "value1");
-        ProducerRecord record2 = new ProducerRecord("topic", "key2", "value2");
-        RecordSend send1 = producer.send(record1);
-        assertFalse("Send shouldn't have completed", send1.completed());
-        RecordSend send2 = producer.send(record2);
-        assertFalse("Send shouldn't have completed", send2.completed());
+        ProducerRecord record1 = new ProducerRecord("topic", "key1".getBytes(), "value1".getBytes());
+        ProducerRecord record2 = new ProducerRecord("topic", "key2".getBytes(), "value2".getBytes());
+        Future<RecordMetadata> md1 = producer.send(record1);
+        assertFalse("Send shouldn't have completed", md1.isDone());
+        Future<RecordMetadata> md2 = producer.send(record2);
+        assertFalse("Send shouldn't have completed", md2.isDone());
         assertTrue("Complete the first request", producer.completeNext());
-        assertFalse("Requst should be successful", send1.hasError());
-        assertFalse("Second request still incomplete", send2.completed());
+        assertFalse("Request should be successful", isError(md1));
+        assertFalse("Second request still incomplete", md2.isDone());
         IllegalArgumentException e = new IllegalArgumentException("blah");
         assertTrue("Complete the second request with an error", producer.errorNext(e));
         try {
-            send2.await();
+            md2.get();
             fail("Expected error to be thrown");
-        } catch (IllegalArgumentException err) {
-            // this is good
+        } catch (ExecutionException err) {
+            assertEquals(e, err.getCause());
         }
         assertFalse("No more requests to complete", producer.completeNext());
     }
 
-    public void testSerializationAndPartitioning() {
-        Cluster cluster = new Cluster(asList(new Node(0, "host", -1)), asList(new PartitionInfo("topic", 0, 0, new int[] { 0 }, new int[] { 0 })));
-        Serializer serializer = new StringSerialization();
-        Partitioner partitioner = new DefaultPartitioner();
-        MockProducer producer = new MockProducer(serializer, serializer, partitioner, cluster, true);
-        ProducerRecord record = new ProducerRecord("topic", "key", "value");
-        RecordSend send = producer.send(record);
-        assertTrue("Send should be immediately complete", send.completed());
+    private boolean isError(Future<?> future) {
+        try {
+            future.get();
+            return false;
+        } catch (Exception e) {
+            return true;
+        }
     }
 }

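The RecordSend type is gone: send() now returns a standard java.util.concurrent.Future<RecordMetadata>, which is why completion is checked with isDone() and failures surface from get(). A usage sketch built only from the calls the test exercises:

    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.Future;

    import kafka.clients.producer.MockProducer;
    import kafka.clients.producer.ProducerRecord;
    import kafka.clients.producer.RecordMetadata;

    // Usage sketch: MockProducer(true) completes each send immediately.
    public class MockProducerUsage {
        public static void main(String[] args) throws Exception {
            MockProducer producer = new MockProducer(true);
            Future<RecordMetadata> f = producer.send(
                    new ProducerRecord("topic", "key".getBytes(), "value".getBytes()));
            try {
                RecordMetadata md = f.get();              // already complete here
                System.out.println(md.topic() + " @ " + md.offset());
            } catch (ExecutionException e) {
                throw new RuntimeException(e.getCause()); // the actual send failure
            }
        }
    }
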
http://git-wip-us.apache.org/repos/asf/kafka/blob/253f86e3/clients/src/test/java/kafka/clients/producer/PartitionerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/kafka/clients/producer/PartitionerTest.java b/clients/src/test/java/kafka/clients/producer/PartitionerTest.java
new file mode 100644
index 0000000..c18da76
--- /dev/null
+++ b/clients/src/test/java/kafka/clients/producer/PartitionerTest.java
@@ -0,0 +1,54 @@
+package kafka.clients.producer;
+
+import static java.util.Arrays.asList;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.List;
+
+import kafka.clients.producer.internals.Partitioner;
+import kafka.common.Cluster;
+import kafka.common.Node;
+import kafka.common.PartitionInfo;
+
+import org.junit.Test;
+
+public class PartitionerTest {
+
+    private byte[] key = "key".getBytes();
+    private byte[] value = "value".getBytes();
+    private Partitioner partitioner = new Partitioner();
+    private Node node0 = new Node(0, "localhost", 99);
+    private Node node1 = new Node(1, "localhost", 100);
+    private Node node2 = new Node(2, "localhost", 101);
+    private Node[] nodes = new Node[] { node0, node1, node2 };
+    private String topic = "test";
+    private List<PartitionInfo> partitions = asList(new PartitionInfo(topic, 0, node0, nodes, nodes),
+                                                    new PartitionInfo(topic, 1, node1, nodes, nodes),
+                                                    new PartitionInfo(topic, 2, null, nodes, nodes));
+    private Cluster cluster = new Cluster(asList(node0, node1, node2), partitions);
+
+    @Test
+    public void testUserSuppliedPartitioning() {
+        assertEquals("If the user supplies a partition we should use it.",
+                     0,
+                     partitioner.partition(new ProducerRecord("test", 0, key, value), cluster));
+    }
+
+    @Test
+    public void testKeyPartitionIsStable() {
+        int partition = partitioner.partition(new ProducerRecord("test", key, value), cluster);
+        assertEquals("Same key should yield same partition",
+                     partition,
+                     partitioner.partition(new ProducerRecord("test", key, "value2".getBytes()), cluster));
+    }
+
+    @Test
+    public void testRoundRobinWithDownNode() {
+        for (int i = 0; i < partitions.size(); i++) {
+            int part = partitioner.partition(new ProducerRecord("test", value), cluster);
+            assertTrue("We should never choose a leader-less node in round robin", part >= 0 && part < 2);
+
+        }
+    }
+}

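Taken together, the three tests pin down the contract: an explicit partition wins, equal keys map to the same partition, and keyless records round-robin while skipping partitions whose leader is currently null. One way to meet that contract (a sketch, not the in-tree Partitioner; PartitionChoice stands in for PartitionInfo):

    import java.util.Arrays;
    import java.util.List;
    import java.util.concurrent.atomic.AtomicInteger;

    // Sketch of a partitioner satisfying the tested behaviors.
    public class SketchPartitioner {
        static class PartitionChoice {
            final int partition;
            final boolean hasLeader;

            PartitionChoice(int partition, boolean hasLeader) {
                this.partition = partition;
                this.hasLeader = hasLeader;
            }
        }

        private final AtomicInteger counter = new AtomicInteger(0);

        public int partition(Integer requested, byte[] key, List<PartitionChoice> partitions) {
            if (requested != null)
                return requested;                 // user-supplied partition wins
            if (key != null)                      // stable: same key, same partition
                return positive(Arrays.hashCode(key)) % partitions.size();
            for (int attempt = 0; attempt < partitions.size(); attempt++) {
                // round robin, skipping partitions with no current leader
                PartitionChoice p = partitions.get(positive(counter.getAndIncrement()) % partitions.size());
                if (p.hasLeader)
                    return p.partition;
            }
            throw new IllegalStateException("no partition has a leader");
        }

        private static int positive(int n) {
            return n & 0x7fffffff;
        }
    }
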
http://git-wip-us.apache.org/repos/asf/kafka/blob/253f86e3/clients/src/test/java/kafka/clients/producer/RecordSendTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/kafka/clients/producer/RecordSendTest.java b/clients/src/test/java/kafka/clients/producer/RecordSendTest.java
index f8fd14b..804c57b 100644
--- a/clients/src/test/java/kafka/clients/producer/RecordSendTest.java
+++ b/clients/src/test/java/kafka/clients/producer/RecordSendTest.java
@@ -5,12 +5,14 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
+import kafka.clients.producer.internals.FutureRecordMetadata;
 import kafka.clients.producer.internals.ProduceRequestResult;
 import kafka.common.TopicPartition;
-import kafka.common.errors.CorruptMessageException;
-import kafka.common.errors.TimeoutException;
+import kafka.common.errors.CorruptRecordException;
 
 import org.junit.Test;
 
@@ -24,37 +26,37 @@ public class RecordSendTest {
      * Test that waiting on a request that never completes times out
      */
     @Test
-    public void testTimeout() {
+    public void testTimeout() throws Exception {
         ProduceRequestResult request = new ProduceRequestResult();
-        RecordSend send = new RecordSend(relOffset, request);
-        assertFalse("Request is not completed", send.completed());
+        FutureRecordMetadata future = new FutureRecordMetadata(request, relOffset);
+        assertFalse("Request is not completed", future.isDone());
         try {
-            send.await(5, TimeUnit.MILLISECONDS);
+            future.get(5, TimeUnit.MILLISECONDS);
             fail("Should have thrown exception.");
         } catch (TimeoutException e) { /* this is good */
         }
 
         request.done(topicPartition, baseOffset, null);
-        assertTrue(send.completed());
-        assertEquals(baseOffset + relOffset, send.offset());
+        assertTrue(future.isDone());
+        assertEquals(baseOffset + relOffset, future.get().offset());
     }
 
     /**
      * Test that an asynchronous request will eventually throw the right exception
      */
-    @Test(expected = CorruptMessageException.class)
-    public void testError() {
-        RecordSend send = new RecordSend(relOffset, asyncRequest(baseOffset, new CorruptMessageException(), 50L));
-        send.await();
+    @Test(expected = ExecutionException.class)
+    public void testError() throws Exception {
+        FutureRecordMetadata future = new FutureRecordMetadata(asyncRequest(baseOffset, new CorruptRecordException(), 50L), relOffset);
+        future.get();
     }
 
     /**
      * Test that an asynchronous request will eventually return the right offset
      */
     @Test
-    public void testBlocking() {
-        RecordSend send = new RecordSend(relOffset, asyncRequest(baseOffset, null, 50L));
-        assertEquals(baseOffset + relOffset, send.offset());
+    public void testBlocking() throws Exception {
+        FutureRecordMetadata future = new FutureRecordMetadata(asyncRequest(baseOffset, null, 50L), relOffset);
+        assertEquals(baseOffset + relOffset, future.get().offset());
     }
 
     /* create a new request result that will be completed after the given timeout */

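Note the changed failure mode above: Future.get() always wraps the underlying error in java.util.concurrent.ExecutionException, so testError now expects that rather than CorruptRecordException itself. Callers wanting the original exception back have to unwrap, e.g. with a helper along these lines (a sketch; getUnwrapped is not part of the patch):

    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.Future;

    // Helper sketch: re-throw the cause hidden inside ExecutionException.
    public class Futures {
        public static <T> T getUnwrapped(Future<T> future) throws InterruptedException {
            try {
                return future.get();
            } catch (ExecutionException e) {
                Throwable cause = e.getCause();
                if (cause instanceof RuntimeException)
                    throw (RuntimeException) cause;   // e.g. a CorruptRecordException
                throw new RuntimeException(cause);
            }
        }
    }
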
http://git-wip-us.apache.org/repos/asf/kafka/blob/253f86e3/clients/src/test/java/kafka/clients/producer/SenderTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/kafka/clients/producer/SenderTest.java b/clients/src/test/java/kafka/clients/producer/SenderTest.java
index 73f1aba..8788095 100644
--- a/clients/src/test/java/kafka/clients/producer/SenderTest.java
+++ b/clients/src/test/java/kafka/clients/producer/SenderTest.java
@@ -1,17 +1,15 @@
 package kafka.clients.producer;
 
-import static java.util.Arrays.asList;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
 import java.nio.ByteBuffer;
+import java.util.concurrent.Future;
 
 import kafka.clients.producer.internals.Metadata;
 import kafka.clients.producer.internals.RecordAccumulator;
 import kafka.clients.producer.internals.Sender;
 import kafka.common.Cluster;
-import kafka.common.Node;
-import kafka.common.PartitionInfo;
 import kafka.common.TopicPartition;
 import kafka.common.metrics.Metrics;
 import kafka.common.network.NetworkReceive;
@@ -24,6 +22,7 @@ import kafka.common.requests.RequestSend;
 import kafka.common.requests.ResponseHeader;
 import kafka.common.utils.MockTime;
 import kafka.test.MockSelector;
+import kafka.test.TestUtils;
 
 import org.junit.Before;
 import org.junit.Test;
@@ -34,11 +33,7 @@ public class SenderTest {
     private MockSelector selector = new MockSelector(time);
     private int batchSize = 16 * 1024;
     private Metadata metadata = new Metadata(0, Long.MAX_VALUE);
-    private Cluster cluster = new Cluster(asList(new Node(0, "localhost", 1969)), asList(new PartitionInfo("test", 0, 0, new int[0], new int[0])));
+    private Cluster cluster = TestUtils.singletonCluster("test", 1);
     private Metrics metrics = new Metrics(time);
     private RecordAccumulator accumulator = new RecordAccumulator(batchSize, 1024 * 1024, 0L, false, metrics, time);
     private Sender sender = new Sender(selector, metadata, this.accumulator, "", 1024 * 1024, 0L, (short) -1, 10000, time);
@@ -51,7 +46,7 @@ public class SenderTest {
     @Test
     public void testSimple() throws Exception {
         TopicPartition tp = new TopicPartition("test", 0);
-        RecordSend send = accumulator.append(tp, "key".getBytes(), "value".getBytes(), CompressionType.NONE, null);
+        Future<RecordMetadata> future = accumulator.append(tp, "key".getBytes(), "value".getBytes(), CompressionType.NONE, null);
         sender.run(time.milliseconds());
         assertEquals("We should have connected", 1, selector.connected().size());
         selector.clear();
@@ -67,8 +62,8 @@ public class SenderTest {
                                                  offset,
                                                  Errors.NONE.code()));
         sender.run(time.milliseconds());
-        assertTrue("Request should be completed", send.completed());
-        assertEquals(offset, send.offset());
+        assertTrue("Request should be completed", future.isDone());
+        assertEquals(offset, future.get().offset());
     }
 
     private NetworkReceive produceResponse(int correlation, int source, String topic, int part, long offset, int error) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/253f86e3/clients/src/test/java/kafka/test/TestUtils.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/kafka/test/TestUtils.java b/clients/src/test/java/kafka/test/TestUtils.java
index a2ef3a2..90c6850 100644
--- a/clients/src/test/java/kafka/test/TestUtils.java
+++ b/clients/src/test/java/kafka/test/TestUtils.java
@@ -1,10 +1,18 @@
 package kafka.test;
 
+import static java.util.Arrays.asList;
+
 import java.io.File;
 import java.io.IOException;
 import java.net.ServerSocket;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Random;
 
+import kafka.common.Cluster;
+import kafka.common.Node;
+import kafka.common.PartitionInfo;
+
 /**
  * Helper functions for writing unit tests
  */
@@ -20,6 +28,20 @@ public class TestUtils {
     public static final Random seededRandom = new Random(192348092834L);
     public static final Random random = new Random();
 
+    public static Cluster singletonCluster(String topic, int partitions) {
+        return clusterWith(1, topic, partitions);
+    }
+
+    public static Cluster clusterWith(int nodes, String topic, int partitions) {
+        Node[] ns = new Node[nodes];
+        for (int i = 0; i < nodes; i++)
+            ns[i] = new Node(i, "localhost", 1969);
+        List<PartitionInfo> parts = new ArrayList<PartitionInfo>();
+        for (int i = 0; i < partitions; i++)
+            parts.add(new PartitionInfo(topic, i, ns[i % ns.length], ns, ns));
+        return new Cluster(asList(ns), parts);
+    }
+
     /**
      * Choose a number of random available ports
      */

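The new fixtures replace the hand-built Cluster objects deleted from MetadataTest, MockProducerTest and SenderTest. A usage sketch (the topic name and counts are arbitrary):

    import kafka.common.Cluster;
    import kafka.test.TestUtils;

    // Usage sketch for the new cluster fixtures.
    public class ClusterFixtures {
        public static void main(String[] args) {
            // one broker, one partition: what MetadataTest and SenderTest now use
            Cluster small = TestUtils.singletonCluster("test", 1);
            // three brokers, six partitions, leaders assigned round-robin
            Cluster larger = TestUtils.clusterWith(3, "test", 6);
            System.out.println(small + ", " + larger);
        }
    }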
