kafka-commits mailing list archives

From jkr...@apache.org
Subject [1/3] KAFKA-1326 Refactor Sender to support consumer.
Date Wed, 11 Jun 2014 15:47:58 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk dcc88408c -> 548d1ba09


http://git-wip-us.apache.org/repos/asf/kafka/blob/548d1ba0/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
index 6fa4a58..6cf4fb7 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
@@ -3,65 +3,50 @@
  * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
  * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
  * License. You may obtain a copy of the License at
- *
+ * 
  * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * 
  * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
  * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
  * specific language governing permissions and limitations under the License.
  */
 package org.apache.kafka.common.requests;
 
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.protocol.types.Struct;
-
 import java.util.HashMap;
 import java.util.Map;
 
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.protocol.types.Struct;
+
 public class ProduceResponse {
-    public class PartitionResponse {
-        public int partitionId;
-        public short errorCode;
-        public long baseOffset;
 
-        public PartitionResponse(int partitionId, short errorCode, long baseOffset) {
-            this.partitionId = partitionId;
-            this.errorCode = errorCode;
-            this.baseOffset = baseOffset;
-        }
-        @Override
-        public String toString() {
-            StringBuilder b = new StringBuilder();
-            b.append('{');
-            b.append("pid: " + partitionId);
-            b.append(",error: " + errorCode);
-            b.append(",offset: " + baseOffset);
-            b.append('}');
-            return b.toString();
-        }
-    }
+    private final Map<TopicPartition, PartitionResponse> responses;
 
-    private final Map<String, Map<TopicPartition, PartitionResponse>> responses;
+    public ProduceResponse() {
+        this.responses = new HashMap<TopicPartition, PartitionResponse>();
+    }
 
     public ProduceResponse(Struct struct) {
-        responses = new HashMap<String, Map<TopicPartition, PartitionResponse>>();
+        responses = new HashMap<TopicPartition, PartitionResponse>();
         for (Object topicResponse : (Object[]) struct.get("responses")) {
             Struct topicRespStruct = (Struct) topicResponse;
             String topic = (String) topicRespStruct.get("topic");
-            Map<TopicPartition, PartitionResponse> topicResponses = new HashMap<TopicPartition, PartitionResponse>();
             for (Object partResponse : (Object[]) topicRespStruct.get("partition_responses")) {
                 Struct partRespStruct = (Struct) partResponse;
                 int partition = (Integer) partRespStruct.get("partition");
                 short errorCode = (Short) partRespStruct.get("error_code");
                 long offset = (Long) partRespStruct.get("base_offset");
                 TopicPartition tp = new TopicPartition(topic, partition);
-                topicResponses.put(tp, new PartitionResponse(partition, errorCode, offset));
+                responses.put(tp, new PartitionResponse(partition, errorCode, offset));
             }
-            responses.put(topic, topicResponses);
         }
     }
 
-    public Map<String, Map<TopicPartition, PartitionResponse>> responses() {
+    public void addResponse(TopicPartition tp, int partition, short error, long baseOffset) {
+        this.responses.put(tp, new PartitionResponse(partition, error, baseOffset));
+    }
+
+    public Map<TopicPartition, PartitionResponse> responses() {
         return this.responses;
     }
 
@@ -70,16 +55,40 @@ public class ProduceResponse {
         StringBuilder b = new StringBuilder();
         b.append('{');
         boolean isFirst = true;
-        for (Map<TopicPartition, PartitionResponse> response : responses.values()) {
-            for (Map.Entry<TopicPartition, PartitionResponse> entry : response.entrySet()) {
-                if (isFirst)
-                    isFirst = false;
-                else
-                    b.append(',');
-                b.append(entry.getKey() + " : " + entry.getValue());
-            }
+        for (Map.Entry<TopicPartition, PartitionResponse> entry : responses.entrySet()) {
+            if (isFirst)
+                isFirst = false;
+            else
+                b.append(',');
+            b.append(entry.getKey() + " : " + entry.getValue());
         }
         b.append('}');
         return b.toString();
     }
+
+    public static class PartitionResponse {
+        public int partitionId;
+        public short errorCode;
+        public long baseOffset;
+
+        public PartitionResponse(int partitionId, short errorCode, long baseOffset) {
+            this.partitionId = partitionId;
+            this.errorCode = errorCode;
+            this.baseOffset = baseOffset;
+        }
+
+        @Override
+        public String toString() {
+            StringBuilder b = new StringBuilder();
+            b.append('{');
+            b.append("pid: ");
+            b.append(partitionId);
+            b.append(",error: ");
+            b.append(errorCode);
+            b.append(",offset: ");
+            b.append(baseOffset);
+            b.append('}');
+            return b.toString();
+        }
+    }
 }
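
For illustration only (not part of this commit): with the nested Map<String, Map<TopicPartition, PartitionResponse>> flattened into a single Map<TopicPartition, PartitionResponse>, a caller walks one map instead of two. A minimal sketch, assuming only the ProduceResponse and PartitionResponse shapes in the diff above; the class name is hypothetical:

    import java.util.Map;

    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.requests.ProduceResponse;

    public class ProduceResponseUsageSketch { // hypothetical example class
        public static void handle(ProduceResponse response) {
            // One flat iteration; the topic now lives inside each TopicPartition key
            for (Map.Entry<TopicPartition, ProduceResponse.PartitionResponse> entry : response.responses().entrySet()) {
                ProduceResponse.PartitionResponse part = entry.getValue();
                if (part.errorCode != 0)
                    System.out.println("Error " + part.errorCode + " for " + entry.getKey());
                else
                    System.out.println(entry.getKey() + " appended at offset " + part.baseOffset);
            }
        }
    }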

http://git-wip-us.apache.org/repos/asf/kafka/blob/548d1ba0/clients/src/test/java/org/apache/kafka/clients/MockClient.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/MockClient.java b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
new file mode 100644
index 0000000..aae8d4a
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
@@ -0,0 +1,96 @@
+package org.apache.kafka.clients;
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Queue;
+import java.util.Set;
+
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.requests.RequestHeader;
+import org.apache.kafka.common.utils.Time;
+
+/**
+ * A mock network client for use in testing code
+ */
+public class MockClient implements KafkaClient {
+
+    private final Time time;
+    private int correlation = 0;
+    private final Set<Integer> ready = new HashSet<Integer>();
+    private final Queue<ClientRequest> requests = new ArrayDeque<ClientRequest>();
+    private final Queue<ClientResponse> responses = new ArrayDeque<ClientResponse>();
+
+    public MockClient(Time time) {
+        this.time = time;
+    }
+
+    @Override
+    public boolean isReady(Node node, long now) {
+        return ready.contains(node.id());
+    }
+
+    @Override
+    public boolean ready(Node node, long now) {
+        boolean found = isReady(node, now);
+        ready.add(node.id());
+        return found;
+    }
+
+    public void disconnect(Integer node) {
+        Iterator<ClientRequest> iter = requests.iterator();
+        while (iter.hasNext()) {
+            ClientRequest request = iter.next();
+            if (request.request().destination() == node) {
+                responses.add(new ClientResponse(request, time.milliseconds(), true, null));
+                iter.remove();
+            }
+        }
+        ready.remove(node);
+    }
+
+    @Override
+    public List<ClientResponse> poll(List<ClientRequest> requests, long timeoutMs, long now) {
+        this.requests.addAll(requests);
+        List<ClientResponse> copy = new ArrayList<ClientResponse>(this.responses);
+        this.responses.clear();
+        return copy;
+    }
+
+    public Queue<ClientRequest> requests() {
+        return this.requests;
+    }
+
+    public void respond(Struct body) {
+        ClientRequest request = requests.remove();
+        responses.add(new ClientResponse(request, time.milliseconds(), false, body));
+    }
+
+    @Override
+    public int inFlightRequestCount() {
+        return requests.size();
+    }
+
+    @Override
+    public RequestHeader nextRequestHeader(ApiKeys key) {
+        return new RequestHeader(key.id, "mock", correlation++);
+    }
+
+    @Override
+    public void wakeup() {
+    }
+
+    @Override
+    public void close() {
+    }
+
+    @Override
+    public Node leastLoadedNode(long now) {
+        return null;
+    }
+
+}
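
A hedged sketch (not in this commit) of how a test drives MockClient end to end; the class name is hypothetical, and 'pendingRequest' and 'responseBody' stand in for a real ClientRequest and a canned response Struct built by the test:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    import org.apache.kafka.clients.ClientRequest;
    import org.apache.kafka.clients.ClientResponse;
    import org.apache.kafka.clients.MockClient;
    import org.apache.kafka.common.protocol.types.Struct;
    import org.apache.kafka.common.utils.MockTime;

    public class MockClientUsageSketch { // hypothetical example class
        public static List<ClientResponse> roundTrip(ClientRequest pendingRequest, Struct responseBody) {
            MockTime time = new MockTime();
            MockClient client = new MockClient(time);
            // poll() only queues outgoing requests; nothing goes over the wire
            client.poll(Arrays.asList(pendingRequest), 1, time.milliseconds());
            // respond() pairs the oldest queued request with the canned body...
            client.respond(responseBody);
            // ...and the next poll() hands it back as a completed ClientResponse
            return client.poll(new ArrayList<ClientRequest>(), 1, time.milliseconds());
        }
    }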

http://git-wip-us.apache.org/repos/asf/kafka/blob/548d1ba0/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
new file mode 100644
index 0000000..6a3cdcc
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
@@ -0,0 +1,99 @@
+package org.apache.kafka.clients;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.kafka.clients.producer.internals.Metadata;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.network.NetworkReceive;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.ProtoUtils;
+import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.requests.MetadataRequest;
+import org.apache.kafka.common.requests.ProduceRequest;
+import org.apache.kafka.common.requests.RequestHeader;
+import org.apache.kafka.common.requests.RequestSend;
+import org.apache.kafka.common.requests.ResponseHeader;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.test.MockSelector;
+import org.apache.kafka.test.TestUtils;
+import org.junit.Before;
+import org.junit.Test;
+
+public class NetworkClientTest {
+
+    private MockTime time = new MockTime();
+    private MockSelector selector = new MockSelector(time);
+    private Metadata metadata = new Metadata(0, Long.MAX_VALUE);
+    private int nodeId = 1;
+    private Cluster cluster = TestUtils.singletonCluster("test", nodeId);
+    private Node node = cluster.nodes().get(0);
+    private NetworkClient client = new NetworkClient(selector, metadata, "mock", Integer.MAX_VALUE, 0, 64 * 1024, 64 * 1024);
+
+    @Before
+    public void setup() {
+        metadata.update(cluster, time.milliseconds());
+    }
+
+    @Test
+    public void testReadyAndDisconnect() {
+        List<ClientRequest> reqs = new ArrayList<ClientRequest>();
+        assertFalse("Client begins unready as it has no connection.", client.ready(node,
time.milliseconds()));
+        assertEquals("The connection is established as a side-effect of the readiness check",
1, selector.connected().size());
+        client.poll(reqs, 1, time.milliseconds());
+        selector.clear();
+        assertTrue("Now the client is ready", client.ready(node, time.milliseconds()));
+        selector.disconnect(node.id());
+        client.poll(reqs, 1, time.milliseconds());
+        selector.clear();
+        assertFalse("After we forced the disconnection the client is no longer ready.", client.ready(node,
time.milliseconds()));
+        assertTrue("Metadata should get updated.", metadata.needsUpdate(time.milliseconds()));
+    }
+
+    @Test(expected = IllegalStateException.class)
+    public void testSendToUnreadyNode() {
+        RequestSend send = new RequestSend(5,
+                                           client.nextRequestHeader(ApiKeys.METADATA),
+                                           new MetadataRequest(Arrays.asList("test")).toStruct());
+        ClientRequest request = new ClientRequest(time.milliseconds(), false, send, null);
+        client.poll(Arrays.asList(request), 1, time.milliseconds());
+    }
+
+    @Test
+    public void testSimpleRequestResponse() {
+        ProduceRequest produceRequest = new ProduceRequest((short) 1, 1000);
+        RequestHeader reqHeader = client.nextRequestHeader(ApiKeys.PRODUCE);
+        RequestSend send = new RequestSend(node.id(), reqHeader, produceRequest.toStruct());
+        ClientRequest request = new ClientRequest(time.milliseconds(), true, send, null);
+        awaitReady(client, node);
+        client.poll(Arrays.asList(request), 1, time.milliseconds());
+        assertEquals(1, client.inFlightRequestCount());
+        ResponseHeader respHeader = new ResponseHeader(reqHeader.correlationId());
+        Struct resp = new Struct(ProtoUtils.currentResponseSchema(ApiKeys.PRODUCE.id));
+        resp.set("responses", new Object[0]);
+        int size = respHeader.sizeOf() + resp.sizeOf();
+        ByteBuffer buffer = ByteBuffer.allocate(size);
+        respHeader.writeTo(buffer);
+        resp.writeTo(buffer);
+        buffer.flip();
+        selector.completeReceive(new NetworkReceive(node.id(), buffer));
+        List<ClientResponse> responses = client.poll(new ArrayList<ClientRequest>(), 1, time.milliseconds());
+        assertEquals(1, responses.size());
+        ClientResponse response = responses.get(0);
+        assertTrue("Should have a response body.", response.hasResponse());
+        assertEquals("Should be correlated to the original request", request, response.request());
+    }
+
+    private void awaitReady(NetworkClient client, Node node) {
+        while (!client.ready(node, time.milliseconds()))
+            client.poll(new ArrayList<ClientRequest>(), 1, time.milliseconds());
+    }
+
+}
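
The response framing in testSimpleRequestResponse above is worth isolating: on the wire a response is just a ResponseHeader (carrying the correlation id) followed by the body Struct. A small helper sketch (hypothetical, not in this commit), assuming nothing beyond the classes already imported by the test:

    static ByteBuffer serializeResponse(ResponseHeader header, Struct body) { // hypothetical helper
        ByteBuffer buffer = ByteBuffer.allocate(header.sizeOf() + body.sizeOf());
        header.writeTo(buffer); // correlation id first, so the client can match request to response
        body.writeTo(buffer);   // then the response body
        buffer.flip();          // make the buffer readable before handing it to the selector
        return buffer;
    }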

http://git-wip-us.apache.org/repos/asf/kafka/blob/548d1ba0/clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java
index c4072ae..93b58d0 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java
@@ -17,7 +17,12 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 
 import java.nio.ByteBuffer;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
 
 import org.apache.kafka.clients.producer.internals.RecordAccumulator;
 import org.apache.kafka.clients.producer.internals.RecordBatch;

http://git-wip-us.apache.org/repos/asf/kafka/blob/548d1ba0/clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java
index 3ef692c..5489aca 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java
@@ -16,62 +16,48 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
-import java.nio.ByteBuffer;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 
+import org.apache.kafka.clients.MockClient;
 import org.apache.kafka.clients.producer.internals.Metadata;
 import org.apache.kafka.clients.producer.internals.RecordAccumulator;
 import org.apache.kafka.clients.producer.internals.Sender;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.metrics.Metrics;
-import org.apache.kafka.common.network.NetworkReceive;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.ProtoUtils;
 import org.apache.kafka.common.protocol.types.Struct;
 import org.apache.kafka.common.record.CompressionType;
-import org.apache.kafka.common.requests.RequestSend;
-import org.apache.kafka.common.requests.ResponseHeader;
 import org.apache.kafka.common.utils.MockTime;
-import org.apache.kafka.test.MockSelector;
 import org.apache.kafka.test.TestUtils;
 import org.junit.Before;
 import org.junit.Test;
 
 public class SenderTest {
 
-    private static final String CLIENT_ID = "";
     private static final int MAX_REQUEST_SIZE = 1024 * 1024;
-    private static final long RECONNECT_BACKOFF_MS = 0L;
     private static final short ACKS_ALL = -1;
     private static final int MAX_RETRIES = 0;
     private static final int REQUEST_TIMEOUT_MS = 10000;
-    private static final int SEND_BUFFER_SIZE = 64 * 1024;
-    private static final int RECEIVE_BUFFER_SIZE = 64 * 1024;
-    private static final int MAX_IN_FLIGHT_REQS = Integer.MAX_VALUE;
 
     private TopicPartition tp = new TopicPartition("test", 0);
     private MockTime time = new MockTime();
-    private MockSelector selector = new MockSelector(time);
+    private MockClient client = new MockClient(time);
     private int batchSize = 16 * 1024;
     private Metadata metadata = new Metadata(0, Long.MAX_VALUE);
     private Cluster cluster = TestUtils.singletonCluster("test", 1);
     private Metrics metrics = new Metrics(time);
     private RecordAccumulator accumulator = new RecordAccumulator(batchSize, 1024 * 1024, 0L, 0L, false, metrics, time);
-    private Sender sender = new Sender(selector,
+    private Sender sender = new Sender(client,
                                        metadata,
                                        this.accumulator,
-                                       CLIENT_ID,
                                        MAX_REQUEST_SIZE,
-                                       RECONNECT_BACKOFF_MS,
                                        ACKS_ALL,
                                        MAX_RETRIES,
                                        REQUEST_TIMEOUT_MS,
-                                       SEND_BUFFER_SIZE,
-                                       RECEIVE_BUFFER_SIZE,
-                                       MAX_IN_FLIGHT_REQS,
                                        metrics,
                                        time);
 
@@ -82,21 +68,14 @@ public class SenderTest {
 
     @Test
     public void testSimple() throws Exception {
+        int offset = 0;
         Future<RecordMetadata> future = accumulator.append(tp, "key".getBytes(), "value".getBytes(), CompressionType.NONE, null);
+        sender.run(time.milliseconds()); // connect
+        sender.run(time.milliseconds()); // send produce request
+        assertEquals("We should have a single produce request in flight.", 1, client.inFlightRequestCount());
+        client.respond(produceResponse(tp.topic(), tp.partition(), offset, Errors.NONE.code()));
         sender.run(time.milliseconds());
-        assertEquals("We should have connected", 1, selector.connected().size());
-        selector.clear();
-        sender.run(time.milliseconds());
-        assertEquals("Single request should be sent", 1, selector.completedSends().size());
-        RequestSend request = (RequestSend) selector.completedSends().get(0);
-        selector.clear();
-        long offset = 42;
-        selector.completeReceive(produceResponse(request.header().correlationId(),
-                                                 cluster.leaderFor(tp).id(),
-                                                 tp.topic(),
-                                                 tp.partition(),
-                                                 offset,
-                                                 Errors.NONE.code()));
+        assertEquals("All requests completed.", offset, client.inFlightRequestCount());
         sender.run(time.milliseconds());
         assertTrue("Request should be completed", future.isDone());
         assertEquals(offset, future.get().offset());
@@ -106,69 +85,43 @@ public class SenderTest {
     public void testRetries() throws Exception {
         // create a sender with retries = 1
         int maxRetries = 1;
-        Sender sender = new Sender(selector,
+        Sender sender = new Sender(client,
                                    metadata,
                                    this.accumulator,
-                                   CLIENT_ID,
                                    MAX_REQUEST_SIZE,
-                                   RECONNECT_BACKOFF_MS,
                                    ACKS_ALL,
                                    maxRetries,
                                    REQUEST_TIMEOUT_MS,
-                                   SEND_BUFFER_SIZE,
-                                   RECEIVE_BUFFER_SIZE,
-                                   MAX_IN_FLIGHT_REQS,
                                    new Metrics(),
                                    time);
+        // do a successful retry
         Future<RecordMetadata> future = accumulator.append(tp, "key".getBytes(), "value".getBytes(), CompressionType.NONE, null);
-        RequestSend request1 = completeSend(sender);
-        selector.clear();
-        selector.completeReceive(produceResponse(request1.header().correlationId(),
-                                                 cluster.leaderFor(tp).id(),
-                                                 tp.topic(),
-                                                 tp.partition(),
-                                                 -1,
-                                                 Errors.REQUEST_TIMED_OUT.code()));
-        sender.run(time.milliseconds());
-        selector.clear();
-        sender.run(time.milliseconds());
-        RequestSend request2 = completeSend(sender);
-        selector.completeReceive(produceResponse(request2.header().correlationId(),
-                                                 cluster.leaderFor(tp).id(),
-                                                 tp.topic(),
-                                                 tp.partition(),
-                                                 42,
-                                                 Errors.NONE.code()));
+        sender.run(time.milliseconds()); // connect
+        sender.run(time.milliseconds()); // send produce request
+        assertEquals(1, client.inFlightRequestCount());
+        client.disconnect(client.requests().peek().request().destination());
+        assertEquals(0, client.inFlightRequestCount());
+        sender.run(time.milliseconds()); // receive error
+        sender.run(time.milliseconds()); // reconnect
+        sender.run(time.milliseconds()); // resend
+        assertEquals(1, client.inFlightRequestCount());
+        int offset = 0;
+        client.respond(produceResponse(tp.topic(), tp.partition(), offset, Errors.NONE.code()));
         sender.run(time.milliseconds());
-        assertTrue("Request should retry and complete", future.isDone());
-        assertEquals(42, future.get().offset());
-    }
-
-    @Test
-    public void testMetadataRefreshOnNoLeaderException() throws Exception {
-        Future<RecordMetadata> future = accumulator.append(tp, "key".getBytes(), "value".getBytes(), CompressionType.NONE, null);
-        RequestSend request = completeSend();
-        selector.clear();
-        selector.completeReceive(produceResponse(request.header().correlationId(),
-                                                 cluster.leaderFor(tp).id(),
-                                                 tp.topic(),
-                                                 tp.partition(),
-                                                 -1,
-                                                 Errors.NOT_LEADER_FOR_PARTITION.code()));
-        sender.run(time.milliseconds());
-        completedWithError(future, Errors.NOT_LEADER_FOR_PARTITION);
-        assertTrue("Error triggers a metadata update.", metadata.needsUpdate(time.milliseconds()));
-    }
+        assertTrue("Request should have retried and completed", future.isDone());
+        assertEquals(offset, future.get().offset());
 
-    @Test
-    public void testMetadataRefreshOnDisconnect() throws Exception {
-        Future<RecordMetadata> future = accumulator.append(tp, "key".getBytes(), "value".getBytes(), CompressionType.NONE, null);
-        completeSend();
-        selector.clear();
-        selector.disconnect(cluster.leaderFor(tp).id());
+        // do an unsuccessful retry
+        future = accumulator.append(tp, "key".getBytes(), "value".getBytes(), CompressionType.NONE, null);
+        sender.run(time.milliseconds()); // send produce request
+        for (int i = 0; i < maxRetries + 1; i++) {
+            client.disconnect(client.requests().peek().request().destination());
+            sender.run(time.milliseconds()); // receive error
+            sender.run(time.milliseconds()); // reconnect
+            sender.run(time.milliseconds()); // resend
+        }
         sender.run(time.milliseconds());
         completedWithError(future, Errors.NETWORK_EXCEPTION);
-        assertTrue("The disconnection triggers a metadata update.", metadata.needsUpdate(time.milliseconds()));
     }
 
     private void completedWithError(Future<RecordMetadata> future, Errors error) throws Exception {
@@ -181,17 +134,7 @@ public class SenderTest {
         }
     }
 
-    private RequestSend completeSend() {
-        return completeSend(sender);
-    }
-
-    private RequestSend completeSend(Sender sender) {
-        while (selector.completedSends().size() == 0)
-            sender.run(time.milliseconds());
-        return (RequestSend) selector.completedSends().get(0);
-    }
-
-    private NetworkReceive produceResponse(int correlation, int source, String topic, int part, long offset, int error) {
+    private Struct produceResponse(String topic, int part, long offset, int error) {
         Struct struct = new Struct(ProtoUtils.currentResponseSchema(ApiKeys.PRODUCE.id));
         Struct response = struct.instance("responses");
         response.set("topic", topic);
@@ -201,12 +144,7 @@ public class SenderTest {
         partResp.set("base_offset", offset);
         response.set("partition_responses", new Object[] { partResp });
         struct.set("responses", new Object[] { response });
-        ResponseHeader header = new ResponseHeader(correlation);
-        ByteBuffer buffer = ByteBuffer.allocate(header.sizeOf() + struct.sizeOf());
-        header.writeTo(buffer);
-        struct.writeTo(buffer);
-        buffer.rewind();
-        return new NetworkReceive(source, buffer);
+        return struct;
     }
 
 }
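
A condensed recap (hypothetical test, not part of the commit) of the run-loop pattern the rewritten tests rely on: each call to Sender.run(now) performs one iteration of the I/O loop, and MockClient supplies the canned response. All identifiers are the SenderTest fixtures and the produceResponse helper above:

    @Test
    public void runLoopRecapSketch() throws Exception { // hypothetical test name
        Future<RecordMetadata> future = accumulator.append(tp, "key".getBytes(), "value".getBytes(), CompressionType.NONE, null);
        sender.run(time.milliseconds()); // iteration 1: establish the connection
        sender.run(time.milliseconds()); // iteration 2: send the produce request
        client.respond(produceResponse(tp.topic(), tp.partition(), 0, Errors.NONE.code()));
        sender.run(time.milliseconds()); // iteration 3: handle the response and complete the future
        assertTrue("Request should be completed", future.isDone());
        assertEquals(0L, future.get().offset());
    }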

http://git-wip-us.apache.org/repos/asf/kafka/blob/548d1ba0/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java b/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java
index e4e0a04..19bea0f 100644
--- a/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java
@@ -213,7 +213,7 @@ public class MetricsTest {
         public double value = 0.0;
 
         @Override
-        public double measure(MetricConfig config, long nowMs) {
+        public double measure(MetricConfig config, long now) {
             return value;
         }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/548d1ba0/clients/src/test/java/org/apache/kafka/common/utils/MockTime.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/utils/MockTime.java b/clients/src/test/java/org/apache/kafka/common/utils/MockTime.java
index cda8e64..eb7fcf0 100644
--- a/clients/src/test/java/org/apache/kafka/common/utils/MockTime.java
+++ b/clients/src/test/java/org/apache/kafka/common/utils/MockTime.java
@@ -1,25 +1,22 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
  */
 package org.apache.kafka.common.utils;
 
 import java.util.concurrent.TimeUnit;
 
-import org.apache.kafka.common.utils.Time;
-
+/**
+ * A clock that you can manually advance by calling sleep
+ */
 public class MockTime implements Time {
 
     private long nanos = 0;
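
A minimal usage sketch for the manual clock, assuming the Time interface's milliseconds() and sleep(ms) methods (per the javadoc above, MockTime's sleep advances the fake clock instead of blocking):

    import org.apache.kafka.common.utils.MockTime;

    public class MockTimeUsageSketch { // hypothetical example class
        public static void main(String[] args) {
            MockTime time = new MockTime();
            long start = time.milliseconds();
            time.sleep(5000); // advances the fake clock by five seconds without blocking
            System.out.println(time.milliseconds() - start); // prints 5000
        }
    }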

http://git-wip-us.apache.org/repos/asf/kafka/blob/548d1ba0/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
index b9405cf..d146444 100644
--- a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
@@ -127,7 +127,7 @@ class ProducerFailureHandlingTest extends JUnit3Suite with ZooKeeperTestHarness
   * With a non-existent topic the metadata future should return an ExecutionException caused by a TimeoutException
    */
   @Test
-  def testNonExistTopic() {
+  def testNonExistentTopic() {
    // send a record with a non-existent topic
     val record = new ProducerRecord(topic2, null, "key".getBytes, "value".getBytes)
     intercept[ExecutionException] {

http://git-wip-us.apache.org/repos/asf/kafka/blob/548d1ba0/core/src/test/scala/unit/kafka/utils/TestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 12f8045..57b2bd5 100644
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -379,6 +379,7 @@ object TestUtils extends Logging {
     producerProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferSize.toString)
     producerProps.put(ProducerConfig.RETRIES_CONFIG, retries.toString)
     producerProps.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, "1000")
+    producerProps.put(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG, "200")
     return new KafkaProducer(producerProps)
   }
 

