http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
index b7645dd..097b4fc 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
@@ -12,31 +12,19 @@
*/
package org.apache.kafka.clients.producer.internals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.util.Collections;
-import java.util.LinkedHashMap;
-import java.util.Map;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.MockClient;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.Cluster;
-import org.apache.kafka.common.internals.ClusterResourceListeners;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.internals.ClusterResourceListeners;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.protocol.types.Struct;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.Record;
import org.apache.kafka.common.requests.ProduceResponse;
@@ -46,6 +34,17 @@ import org.junit.After;
import org.junit.Before;
import org.junit.Test;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
public class SenderTest {
private static final int MAX_REQUEST_SIZE = 1024 * 1024;
@@ -69,7 +68,7 @@ public class SenderTest {
@Before
public void setup() {
- Map<String, String> metricTags = new LinkedHashMap<String, String>();
+ Map<String, String> metricTags = new LinkedHashMap<>();
metricTags.put("client-id", CLIENT_ID);
MetricConfig metricConfig = new MetricConfig().tags(metricTags);
metrics = new Metrics(metricConfig, time);
@@ -147,7 +146,7 @@ public class SenderTest {
Future<RecordMetadata> future = accumulator.append(tp, 0L, "key".getBytes(), "value".getBytes(), null, MAX_BLOCK_TIMEOUT).future;
sender.run(time.milliseconds()); // connect
sender.run(time.milliseconds()); // send produce request
- String id = client.requests().peek().request().destination();
+ String id = client.requests().peek().destination();
Node node = new Node(Integer.valueOf(id), "localhost", 0);
assertEquals(1, client.inFlightRequestCount());
assertTrue("Client ready status should be true", client.isReady(node, 0L));
@@ -168,7 +167,7 @@ public class SenderTest {
future = accumulator.append(tp, 0L, "key".getBytes(), "value".getBytes(), null, MAX_BLOCK_TIMEOUT).future;
sender.run(time.milliseconds()); // send produce request
for (int i = 0; i < maxRetries + 1; i++) {
- client.disconnect(client.requests().peek().request().destination());
+ client.disconnect(client.requests().peek().destination());
sender.run(time.milliseconds()); // receive error
sender.run(time.milliseconds()); // reconnect
sender.run(time.milliseconds()); // resend
@@ -205,8 +204,8 @@ public class SenderTest {
accumulator.append(tp2, 0L, "key1".getBytes(), "value1".getBytes(), null, MAX_BLOCK_TIMEOUT);
sender.run(time.milliseconds()); // connect
sender.run(time.milliseconds()); // send produce request
- String id = client.requests().peek().request().destination();
- assertEquals(ApiKeys.PRODUCE.id, client.requests().peek().request().header().apiKey());
+ String id = client.requests().peek().destination();
+ assertEquals(ApiKeys.PRODUCE.id, client.requests().peek().header().apiKey());
Node node = new Node(Integer.valueOf(id), "localhost", 0);
assertEquals(1, client.inFlightRequestCount());
assertTrue("Client ready status should be true", client.isReady(node, 0L));
@@ -272,11 +271,10 @@ public class SenderTest {
}
}
- private Struct produceResponse(TopicPartition tp, long offset, int error, int throttleTimeMs) {
+ private ProduceResponse produceResponse(TopicPartition tp, long offset, int error, int throttleTimeMs) {
ProduceResponse.PartitionResponse resp = new ProduceResponse.PartitionResponse((short) error, offset, Record.NO_TIMESTAMP);
Map<TopicPartition, ProduceResponse.PartitionResponse> partResp = Collections.singletonMap(tp, resp);
- ProduceResponse response = new ProduceResponse(partResp, throttleTimeMs);
- return response.toStruct();
+ return new ProduceResponse(partResp, throttleTimeMs);
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index 30faac1..a5eb22a 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -16,15 +16,20 @@ package org.apache.kafka.common.requests;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.UnknownServerException;
+import org.apache.kafka.common.network.Send;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.ProtoUtils;
import org.apache.kafka.common.protocol.SecurityProtocol;
+import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.Record;
import org.junit.Test;
+import java.io.IOException;
import java.lang.reflect.Method;
import java.nio.ByteBuffer;
+import java.nio.channels.GatheringByteChannel;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -36,6 +41,7 @@ import java.util.Map;
import java.util.Set;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
public class RequestResponseTest {
@@ -113,7 +119,6 @@ public class RequestResponseTest {
for (AbstractRequestResponse req : requestResponseList)
checkSerialization(req, null);
-
checkOlderFetchVersions();
checkSerialization(createMetadataResponse(0), 0);
checkSerialization(createMetadataResponse(1), 1);
@@ -180,7 +185,9 @@ public class RequestResponseTest {
@Test
public void fetchResponseVersionTest() {
Map<TopicPartition, FetchResponse.PartitionData> responseData = new HashMap<>();
- responseData.put(new TopicPartition("test", 0), new FetchResponse.PartitionData(Errors.NONE.code(), 1000000, ByteBuffer.allocate(10)));
+
+ MemoryRecords records = MemoryRecords.readableRecords(ByteBuffer.allocate(10));
+ responseData.put(new TopicPartition("test", 0), new FetchResponse.PartitionData(Errors.NONE.code(), 1000000, records));
FetchResponse v0Response = new FetchResponse(responseData);
FetchResponse v1Response = new FetchResponse(responseData, 10);
@@ -193,6 +200,34 @@ public class RequestResponseTest {
}
@Test
+ public void verifyFetchResponseFullWrite() throws Exception {
+     // Serialize a fetch response through the Send path into an in-memory channel,
+     // then parse it back and check that nothing was lost on the way.
+     FetchResponse fetchResponse = createFetchResponse();
+     RequestHeader header = new RequestHeader(ApiKeys.FETCH.id, "client", 15);
+
+     Send send = fetchResponse.toSend("1", header);
+     ByteBufferChannel channel = new ByteBufferChannel(send.size());
+     send.writeTo(channel);
+     // close() flips the backing buffer so it can be read from the beginning
+     channel.close();
+
+     ByteBuffer buf = channel.buf;
+
+     // read the size
+     int size = buf.getInt();
+     assertTrue(size > 0);
+
+     // read the header (same buffer as above; the position now sits just past the size field)
+     ResponseHeader responseHeader = ResponseHeader.parse(buf);
+     assertEquals(header.correlationId(), responseHeader.correlationId());
+
+     // read the body
+     Struct responseBody = ProtoUtils.responseSchema(ApiKeys.FETCH.id, header.apiVersion()).read(buf);
+     FetchResponse parsedResponse = new FetchResponse(responseBody);
+     // expected value first, so a failure message labels the two responses correctly
+     assertEquals(fetchResponse, parsedResponse);
+
+     // the size field must equal the header size plus the body size
+     assertEquals(size, responseHeader.sizeOf() + parsedResponse.sizeOf());
+ }
+
+ @Test
public void testControlledShutdownResponse() {
ControlledShutdownResponse response = createControlledShutdownResponse();
ByteBuffer buffer = ByteBuffer.allocate(response.sizeOf());
@@ -216,24 +251,24 @@ public class RequestResponseTest {
assertEquals("", deserialized.clientId()); // null is defaulted to ""
}
- private AbstractRequestResponse createRequestHeader() {
+ private RequestHeader createRequestHeader() {
return new RequestHeader((short) 10, (short) 1, "", 10);
}
- private AbstractRequestResponse createResponseHeader() {
+ private ResponseHeader createResponseHeader() {
return new ResponseHeader(10);
}
- private AbstractRequest createGroupCoordinatorRequest() {
+ private GroupCoordinatorRequest createGroupCoordinatorRequest() {
return new GroupCoordinatorRequest("test-group");
}
- private AbstractRequestResponse createGroupCoordinatorResponse() {
+ private GroupCoordinatorResponse createGroupCoordinatorResponse() {
return new GroupCoordinatorResponse(Errors.NONE.code(), new Node(10, "host1", 2014));
}
@SuppressWarnings("deprecation")
- private AbstractRequest createFetchRequest(int version) {
+ private FetchRequest createFetchRequest(int version) {
LinkedHashMap<TopicPartition, FetchRequest.PartitionData> fetchData = new LinkedHashMap<>();
fetchData.put(new TopicPartition("test1", 0), new FetchRequest.PartitionData(100, 1000000));
fetchData.put(new TopicPartition("test2", 0), new FetchRequest.PartitionData(200, 1000000));
@@ -243,22 +278,23 @@ public class RequestResponseTest {
return new FetchRequest(100, 1000, 1000000, fetchData);
}
- private AbstractRequestResponse createFetchResponse() {
+ private FetchResponse createFetchResponse() {
Map<TopicPartition, FetchResponse.PartitionData> responseData = new HashMap<>();
- responseData.put(new TopicPartition("test", 0), new FetchResponse.PartitionData(Errors.NONE.code(), 1000000, ByteBuffer.allocate(10)));
- return new FetchResponse(responseData, 0);
+ MemoryRecords records = MemoryRecords.readableRecords(ByteBuffer.allocate(10));
+ responseData.put(new TopicPartition("test", 0), new FetchResponse.PartitionData(Errors.NONE.code(), 1000000, records));
+ return new FetchResponse(responseData, 25);
}
- private AbstractRequest createHeartBeatRequest() {
+ private HeartbeatRequest createHeartBeatRequest() {
return new HeartbeatRequest("group1", 1, "consumer1");
}
- private AbstractRequestResponse createHeartBeatResponse() {
+ private HeartbeatResponse createHeartBeatResponse() {
return new HeartbeatResponse(Errors.NONE.code());
}
@SuppressWarnings("deprecation")
- private AbstractRequest createJoinGroupRequest(int version) {
+ private JoinGroupRequest createJoinGroupRequest(int version) {
ByteBuffer metadata = ByteBuffer.wrap(new byte[] {});
List<JoinGroupRequest.ProtocolMetadata> protocols = new ArrayList<>();
protocols.add(new JoinGroupRequest.ProtocolMetadata("consumer-range", metadata));
@@ -269,27 +305,27 @@ public class RequestResponseTest {
}
}
- private AbstractRequestResponse createJoinGroupResponse() {
+ private JoinGroupResponse createJoinGroupResponse() {
Map<String, ByteBuffer> members = new HashMap<>();
members.put("consumer1", ByteBuffer.wrap(new byte[]{}));
members.put("consumer2", ByteBuffer.wrap(new byte[]{}));
return new JoinGroupResponse(Errors.NONE.code(), 1, "range", "consumer1", "leader", members);
}
- private AbstractRequest createListGroupsRequest() {
+ private ListGroupsRequest createListGroupsRequest() {
return new ListGroupsRequest();
}
- private AbstractRequestResponse createListGroupsResponse() {
+ private ListGroupsResponse createListGroupsResponse() {
List<ListGroupsResponse.Group> groups = Arrays.asList(new ListGroupsResponse.Group("test-group", "consumer"));
return new ListGroupsResponse(Errors.NONE.code(), groups);
}
- private AbstractRequest createDescribeGroupRequest() {
+ private DescribeGroupsRequest createDescribeGroupRequest() {
return new DescribeGroupsRequest(Collections.singletonList("test-group"));
}
- private AbstractRequestResponse createDescribeGroupResponse() {
+ private DescribeGroupsResponse createDescribeGroupResponse() {
String clientId = "consumer-1";
String clientHost = "localhost";
ByteBuffer empty = ByteBuffer.allocate(0);
@@ -300,16 +336,16 @@ public class RequestResponseTest {
return new DescribeGroupsResponse(Collections.singletonMap("test-group", metadata));
}
- private AbstractRequest createLeaveGroupRequest() {
+ private LeaveGroupRequest createLeaveGroupRequest() {
return new LeaveGroupRequest("group1", "consumer1");
}
- private AbstractRequestResponse createLeaveGroupResponse() {
+ private LeaveGroupResponse createLeaveGroupResponse() {
return new LeaveGroupResponse(Errors.NONE.code());
}
@SuppressWarnings("deprecation")
- private AbstractRequest createListOffsetRequest(int version) {
+ private ListOffsetRequest createListOffsetRequest(int version) {
if (version == 0) {
Map<TopicPartition, ListOffsetRequest.PartitionData> offsetData = new HashMap<>();
offsetData.put(new TopicPartition("test", 0), new ListOffsetRequest.PartitionData(1000000L, 10));
@@ -324,7 +360,7 @@ public class RequestResponseTest {
}
@SuppressWarnings("deprecation")
- private AbstractRequestResponse createListOffsetResponse(int version) {
+ private ListOffsetResponse createListOffsetResponse(int version) {
if (version == 0) {
Map<TopicPartition, ListOffsetResponse.PartitionData> responseData = new HashMap<>();
responseData.put(new TopicPartition("test", 0), new ListOffsetResponse.PartitionData(Errors.NONE.code(), Arrays.asList(100L)));
@@ -338,11 +374,11 @@ public class RequestResponseTest {
}
}
- private AbstractRequest createMetadataRequest(List<String> topics) {
+ private MetadataRequest createMetadataRequest(List<String> topics) {
return new MetadataRequest(topics);
}
- private AbstractRequestResponse createMetadataResponse(int version) {
+ private MetadataResponse createMetadataResponse(int version) {
Node node = new Node(1, "host1", 1001);
List<Node> replicas = Arrays.asList(node);
List<Node> isr = Arrays.asList(node);
@@ -357,7 +393,7 @@ public class RequestResponseTest {
}
@SuppressWarnings("deprecation")
- private AbstractRequest createOffsetCommitRequest(int version) {
+ private OffsetCommitRequest createOffsetCommitRequest(int version) {
Map<TopicPartition, OffsetCommitRequest.PartitionData> commitData = new HashMap<>();
commitData.put(new TopicPartition("test", 0), new OffsetCommitRequest.PartitionData(100, ""));
commitData.put(new TopicPartition("test", 1), new OffsetCommitRequest.PartitionData(200, null));
@@ -371,47 +407,47 @@ public class RequestResponseTest {
throw new IllegalArgumentException("Unknown offset commit request version " + version);
}
- private AbstractRequestResponse createOffsetCommitResponse() {
+ private OffsetCommitResponse createOffsetCommitResponse() {
Map<TopicPartition, Short> responseData = new HashMap<>();
responseData.put(new TopicPartition("test", 0), Errors.NONE.code());
return new OffsetCommitResponse(responseData);
}
- private AbstractRequest createOffsetFetchRequest() {
+ private OffsetFetchRequest createOffsetFetchRequest() {
return new OffsetFetchRequest("group1", Arrays.asList(new TopicPartition("test11", 1)));
}
- private AbstractRequestResponse createOffsetFetchResponse() {
+ private OffsetFetchResponse createOffsetFetchResponse() {
Map<TopicPartition, OffsetFetchResponse.PartitionData> responseData = new HashMap<>();
responseData.put(new TopicPartition("test", 0), new OffsetFetchResponse.PartitionData(100L, "", Errors.NONE.code()));
responseData.put(new TopicPartition("test", 1), new OffsetFetchResponse.PartitionData(100L, null, Errors.NONE.code()));
return new OffsetFetchResponse(responseData);
}
- private AbstractRequest createProduceRequest() {
- Map<TopicPartition, ByteBuffer> produceData = new HashMap<>();
- produceData.put(new TopicPartition("test", 0), ByteBuffer.allocate(10));
+ private ProduceRequest createProduceRequest() {
+ Map<TopicPartition, MemoryRecords> produceData = new HashMap<>();
+ produceData.put(new TopicPartition("test", 0), MemoryRecords.readableRecords(ByteBuffer.allocate(10)));
return new ProduceRequest((short) 1, 5000, produceData);
}
- private AbstractRequestResponse createProduceResponse() {
+ private ProduceResponse createProduceResponse() {
Map<TopicPartition, ProduceResponse.PartitionResponse> responseData = new HashMap<>();
responseData.put(new TopicPartition("test", 0), new ProduceResponse.PartitionResponse(Errors.NONE.code(), 10000, Record.NO_TIMESTAMP));
return new ProduceResponse(responseData, 0);
}
- private AbstractRequest createStopReplicaRequest(boolean deletePartitions) {
+ private StopReplicaRequest createStopReplicaRequest(boolean deletePartitions) {
Set<TopicPartition> partitions = new HashSet<>(Arrays.asList(new TopicPartition("test", 0)));
return new StopReplicaRequest(0, 1, deletePartitions, partitions);
}
- private AbstractRequestResponse createStopReplicaResponse() {
+ private StopReplicaResponse createStopReplicaResponse() {
Map<TopicPartition, Short> responses = new HashMap<>();
responses.put(new TopicPartition("test", 0), Errors.NONE.code());
return new StopReplicaResponse(Errors.NONE.code(), responses);
}
- private AbstractRequest createControlledShutdownRequest() {
+ private ControlledShutdownRequest createControlledShutdownRequest() {
return new ControlledShutdownRequest(10);
}
@@ -423,7 +459,7 @@ public class RequestResponseTest {
return new ControlledShutdownResponse(Errors.NONE.code(), topicPartitions);
}
- private AbstractRequest createLeaderAndIsrRequest() {
+ private LeaderAndIsrRequest createLeaderAndIsrRequest() {
Map<TopicPartition, PartitionState> partitionStates = new HashMap<>();
List<Integer> isr = Arrays.asList(1, 2);
List<Integer> replicas = Arrays.asList(1, 2, 3, 4);
@@ -442,14 +478,14 @@ public class RequestResponseTest {
return new LeaderAndIsrRequest(1, 10, partitionStates, leaders);
}
- private AbstractRequestResponse createLeaderAndIsrResponse() {
+ private LeaderAndIsrResponse createLeaderAndIsrResponse() {
Map<TopicPartition, Short> responses = new HashMap<>();
responses.put(new TopicPartition("test", 0), Errors.NONE.code());
return new LeaderAndIsrResponse(Errors.NONE.code(), responses);
}
@SuppressWarnings("deprecation")
- private AbstractRequest createUpdateMetadataRequest(int version, String rack) {
+ private UpdateMetadataRequest createUpdateMetadataRequest(int version, String rack) {
Map<TopicPartition, PartitionState> partitionStates = new HashMap<>();
List<Integer> isr = Arrays.asList(1, 2);
List<Integer> replicas = Arrays.asList(1, 2, 3, 4);
@@ -482,28 +518,28 @@ public class RequestResponseTest {
}
}
- private AbstractRequestResponse createUpdateMetadataResponse() {
+ private UpdateMetadataResponse createUpdateMetadataResponse() {
return new UpdateMetadataResponse(Errors.NONE.code());
}
- private AbstractRequest createSaslHandshakeRequest() {
+ private SaslHandshakeRequest createSaslHandshakeRequest() {
return new SaslHandshakeRequest("PLAIN");
}
- private AbstractRequestResponse createSaslHandshakeResponse() {
+ private SaslHandshakeResponse createSaslHandshakeResponse() {
return new SaslHandshakeResponse(Errors.NONE.code(), Collections.singletonList("GSSAPI"));
}
- private AbstractRequest createApiVersionRequest() {
+ private ApiVersionsRequest createApiVersionRequest() {
return new ApiVersionsRequest();
}
- private AbstractRequestResponse createApiVersionResponse() {
+ private ApiVersionsResponse createApiVersionResponse() {
List<ApiVersionsResponse.ApiVersion> apiVersions = Arrays.asList(new ApiVersionsResponse.ApiVersion((short) 0, (short) 0, (short) 2));
return new ApiVersionsResponse(Errors.NONE.code(), apiVersions);
}
- private AbstractRequest createCreateTopicRequest() {
+ private CreateTopicsRequest createCreateTopicRequest() {
CreateTopicsRequest.TopicDetails request1 = new CreateTopicsRequest.TopicDetails(3, (short) 5);
Map<Integer, List<Integer>> replicaAssignments = new HashMap<>();
@@ -521,21 +557,65 @@ public class RequestResponseTest {
return new CreateTopicsRequest(request, 0);
}
- private AbstractRequestResponse createCreateTopicResponse() {
+ private CreateTopicsResponse createCreateTopicResponse() {
Map<String, Errors> errors = new HashMap<>();
errors.put("t1", Errors.INVALID_TOPIC_EXCEPTION);
errors.put("t2", Errors.LEADER_NOT_AVAILABLE);
return new CreateTopicsResponse(errors);
}
- private AbstractRequest createDeleteTopicsRequest() {
+ private DeleteTopicsRequest createDeleteTopicsRequest() {
return new DeleteTopicsRequest(new HashSet<>(Arrays.asList("my_t1", "my_t2")), 10000);
}
- private AbstractRequestResponse createDeleteTopicsResponse() {
+ private DeleteTopicsResponse createDeleteTopicsResponse() {
Map<String, Errors> errors = new HashMap<>();
errors.put("t1", Errors.INVALID_TOPIC_EXCEPTION);
errors.put("t2", Errors.TOPIC_AUTHORIZATION_FAILED);
return new DeleteTopicsResponse(errors);
}
+
+ /**
+  * In-memory {@link GatheringByteChannel} that appends everything written to it to a
+  * fixed-capacity heap buffer. {@link #close()} flips that buffer so the captured bytes
+  * can be read back from position 0.
+  */
+ private static class ByteBufferChannel implements GatheringByteChannel {
+     private final ByteBuffer buf;
+     private boolean closed = false;
+
+     /**
+      * @param size capacity of the backing buffer in bytes
+      * @throws IllegalArgumentException if {@code size} is negative or does not fit in an int
+      */
+     private ByteBufferChannel(long size) {
+         // Reject out-of-range sizes instead of silently truncating the long to an int.
+         if (size < 0 || size > Integer.MAX_VALUE)
+             throw new IllegalArgumentException("Invalid buffer size " + size);
+         this.buf = ByteBuffer.allocate((int) size);
+     }
+
+     /**
+      * Appends the remaining bytes of {@code length} buffers, starting with the buffer at
+      * index {@code offset} of {@code srcs}. The source buffers are written through
+      * duplicates, so their own positions are left unchanged.
+      *
+      * @return the number of bytes appended
+      * @throws IndexOutOfBoundsException if {@code offset}/{@code length} do not describe a
+      *         valid sub-range of {@code srcs}
+      */
+     @Override
+     public long write(ByteBuffer[] srcs, int offset, int length) throws IOException {
+         // offset is an index into srcs, not a byte position inside the first buffer
+         if (offset < 0 || length < 0 || offset > srcs.length - length)
+             throw new IndexOutOfBoundsException();
+         int position = buf.position();
+         int end = offset + length;
+         for (int i = offset; i < end; i++)
+             buf.put(srcs[i].duplicate());
+         return buf.position() - position;
+     }
+
+     /**
+      * Appends the remaining bytes of every buffer in {@code srcs}.
+      *
+      * @return the number of bytes appended
+      */
+     @Override
+     public long write(ByteBuffer[] srcs) throws IOException {
+         return write(srcs, 0, srcs.length);
+     }
+
+     /**
+      * Appends the remaining bytes of {@code src}, advancing its position.
+      *
+      * @return the number of bytes appended
+      */
+     @Override
+     public int write(ByteBuffer src) throws IOException {
+         int position = buf.position();
+         buf.put(src);
+         return buf.position() - position;
+     }
+
+     @Override
+     public boolean isOpen() {
+         return !closed;
+     }
+
+     /**
+      * Marks the channel closed and flips the backing buffer for reading. Calling it again
+      * has no effect, so a second close cannot flip the buffer twice and drop its contents.
+      */
+     @Override
+     public void close() throws IOException {
+         if (closed)
+             return;
+         buf.flip();
+         closed = true;
+     }
+ }
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
index 97fe3d8..27c5695 100644
--- a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
@@ -12,18 +12,6 @@
*/
package org.apache.kafka.common.security.authenticator;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.nio.ByteBuffer;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-
import org.apache.kafka.clients.NetworkClient;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.config.SaslConfigs;
@@ -36,17 +24,17 @@ import org.apache.kafka.common.network.NetworkSend;
import org.apache.kafka.common.network.NetworkTestUtils;
import org.apache.kafka.common.network.NioEchoServer;
import org.apache.kafka.common.network.Selector;
+import org.apache.kafka.common.network.Send;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.Protocol;
import org.apache.kafka.common.protocol.SecurityProtocol;
-import org.apache.kafka.common.protocol.types.Struct;
-import org.apache.kafka.common.requests.AbstractRequestResponse;
+import org.apache.kafka.common.requests.AbstractRequest;
+import org.apache.kafka.common.requests.AbstractResponse;
import org.apache.kafka.common.requests.ApiVersionsRequest;
import org.apache.kafka.common.requests.ApiVersionsResponse;
import org.apache.kafka.common.requests.MetadataRequest;
import org.apache.kafka.common.requests.RequestHeader;
-import org.apache.kafka.common.requests.RequestSend;
import org.apache.kafka.common.requests.ResponseHeader;
import org.apache.kafka.common.requests.SaslHandshakeRequest;
import org.apache.kafka.common.requests.SaslHandshakeResponse;
@@ -55,6 +43,18 @@ import org.junit.After;
import org.junit.Before;
import org.junit.Test;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
/**
* Tests for the Sasl authenticator. These use a test harness that runs a simple socket server that echos back responses.
*/
@@ -261,7 +261,8 @@ public class SaslAuthenticatorTest {
String node = "1";
createClientConnection(SecurityProtocol.PLAINTEXT, node);
RequestHeader header = new RequestHeader(ApiKeys.API_VERSIONS.id, Short.MAX_VALUE, "someclient", 1);
- selector.send(new NetworkSend(node, RequestSend.serialize(header, new ApiVersionsRequest().toStruct())));
+ ApiVersionsRequest request = new ApiVersionsRequest();
+ selector.send(request.toSend(node, header));
ByteBuffer responseBuffer = waitForResponse();
ResponseHeader.parse(responseBuffer);
ApiVersionsResponse response = ApiVersionsResponse.parse(responseBuffer);
@@ -291,8 +292,9 @@ public class SaslAuthenticatorTest {
// Send ApiVersionsRequest and validate error response.
String node1 = "invalid1";
createClientConnection(SecurityProtocol.PLAINTEXT, node1);
+ SaslHandshakeRequest request = new SaslHandshakeRequest("PLAIN");
RequestHeader header = new RequestHeader(ApiKeys.SASL_HANDSHAKE.id, Short.MAX_VALUE, "someclient", 2);
- selector.send(new NetworkSend(node1, RequestSend.serialize(header, new SaslHandshakeRequest("PLAIN").toStruct())));
+ selector.send(request.toSend(node1, header));
NetworkTestUtils.waitForChannelClose(selector, node1);
selector.close();
@@ -355,8 +357,9 @@ public class SaslAuthenticatorTest {
createClientConnection(SecurityProtocol.PLAINTEXT, node1);
sendHandshakeRequestReceiveResponse(node1);
+ ApiVersionsRequest request = new ApiVersionsRequest();
RequestHeader versionsHeader = new RequestHeader(ApiKeys.API_VERSIONS.id, "someclient", 2);
- selector.send(new NetworkSend(node1, RequestSend.serialize(versionsHeader, new ApiVersionsRequest().toStruct())));
+ selector.send(request.toSend(node1, versionsHeader));
NetworkTestUtils.waitForChannelClose(selector, node1);
selector.close();
@@ -420,7 +423,7 @@ public class SaslAuthenticatorTest {
createClientConnection(SecurityProtocol.PLAINTEXT, node1);
RequestHeader metadataRequestHeader1 = new RequestHeader(ApiKeys.METADATA.id, "someclient", 1);
MetadataRequest metadataRequest1 = new MetadataRequest(Collections.singletonList("sometopic"));
- selector.send(new NetworkSend(node1, RequestSend.serialize(metadataRequestHeader1, metadataRequest1.toStruct())));
+ selector.send(metadataRequest1.toSend(node1, metadataRequestHeader1));
NetworkTestUtils.waitForChannelClose(selector, node1);
selector.close();
@@ -433,7 +436,7 @@ public class SaslAuthenticatorTest {
sendHandshakeRequestReceiveResponse(node2);
RequestHeader metadataRequestHeader2 = new RequestHeader(ApiKeys.METADATA.id, "someclient", 2);
MetadataRequest metadataRequest2 = new MetadataRequest(Collections.singletonList("sometopic"));
- selector.send(new NetworkSend(node2, RequestSend.serialize(metadataRequestHeader2, metadataRequest2.toStruct())));
+ selector.send(metadataRequest2.toSend(node2, metadataRequestHeader2));
NetworkTestUtils.waitForChannelClose(selector, node2);
selector.close();
@@ -581,25 +584,24 @@ public class SaslAuthenticatorTest {
selector = null;
}
- private Struct sendKafkaRequestReceiveResponse(String node, ApiKeys apiKey, AbstractRequestResponse request) throws IOException {
+ private AbstractResponse sendKafkaRequestReceiveResponse(String node, ApiKeys apiKey, AbstractRequest request) throws IOException {
RequestHeader header = new RequestHeader(apiKey.id, "someclient", 1);
- selector.send(new NetworkSend(node, RequestSend.serialize(header, request.toStruct())));
+ Send send = request.toSend(node, header);
+ selector.send(send);
ByteBuffer responseBuffer = waitForResponse();
return NetworkClient.parseResponse(responseBuffer, header);
}
private SaslHandshakeResponse sendHandshakeRequestReceiveResponse(String node) throws Exception {
SaslHandshakeRequest handshakeRequest = new SaslHandshakeRequest("PLAIN");
- Struct responseStruct = sendKafkaRequestReceiveResponse(node, ApiKeys.SASL_HANDSHAKE, handshakeRequest);
- SaslHandshakeResponse response = new SaslHandshakeResponse(responseStruct);
+ SaslHandshakeResponse response = (SaslHandshakeResponse) sendKafkaRequestReceiveResponse(node, ApiKeys.SASL_HANDSHAKE, handshakeRequest);
assertEquals(Errors.NONE.code(), response.errorCode());
return response;
}
private ApiVersionsResponse sendVersionRequestReceiveResponse(String node) throws Exception {
ApiVersionsRequest handshakeRequest = new ApiVersionsRequest();
- Struct responseStruct = sendKafkaRequestReceiveResponse(node, ApiKeys.API_VERSIONS, handshakeRequest);
- ApiVersionsResponse response = new ApiVersionsResponse(responseStruct);
+ ApiVersionsResponse response = (ApiVersionsResponse) sendKafkaRequestReceiveResponse(node, ApiKeys.API_VERSIONS, handshakeRequest);
assertEquals(Errors.NONE.code(), response.errorCode());
return response;
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java
index 3bfa83f..3aff8f2 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java
@@ -16,7 +16,6 @@
**/
package org.apache.kafka.connect.runtime.distributed;
-import org.apache.kafka.clients.ClientRequest;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.MockClient;
import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient;
@@ -24,7 +23,7 @@ import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.GroupCoordinatorResponse;
import org.apache.kafka.common.requests.JoinGroupRequest.ProtocolMetadata;
import org.apache.kafka.common.requests.JoinGroupResponse;
@@ -172,7 +171,7 @@ public class WorkerCoordinatorTest {
final String consumerId = "leader";
- client.prepareResponse(groupMetadataResponse(node, Errors.NONE.code()));
+ client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code()));
coordinator.ensureCoordinatorReady();
// normal join group
@@ -182,8 +181,8 @@ public class WorkerCoordinatorTest {
client.prepareResponse(joinGroupLeaderResponse(1, consumerId, memberConfigOffsets, Errors.NONE.code()));
client.prepareResponse(new MockClient.RequestMatcher() {
@Override
- public boolean matches(ClientRequest request) {
- SyncGroupRequest sync = new SyncGroupRequest(request.request().body());
+ public boolean matches(AbstractRequest body) {
+ SyncGroupRequest sync = (SyncGroupRequest) body;
return sync.memberId().equals(consumerId) &&
sync.generationId() == 1 &&
sync.groupAssignment().containsKey(consumerId);
@@ -212,15 +211,15 @@ public class WorkerCoordinatorTest {
final String memberId = "member";
- client.prepareResponse(groupMetadataResponse(node, Errors.NONE.code()));
+ client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code()));
coordinator.ensureCoordinatorReady();
// normal join group
client.prepareResponse(joinGroupFollowerResponse(1, memberId, "leader", Errors.NONE.code()));
client.prepareResponse(new MockClient.RequestMatcher() {
@Override
- public boolean matches(ClientRequest request) {
- SyncGroupRequest sync = new SyncGroupRequest(request.request().body());
+ public boolean matches(AbstractRequest body) {
+ SyncGroupRequest sync = (SyncGroupRequest) body;
return sync.memberId().equals(memberId) &&
sync.generationId() == 1 &&
sync.groupAssignment().isEmpty();
@@ -253,15 +252,15 @@ public class WorkerCoordinatorTest {
final String memberId = "member";
- client.prepareResponse(groupMetadataResponse(node, Errors.NONE.code()));
+ client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code()));
coordinator.ensureCoordinatorReady();
// config mismatch results in assignment error
client.prepareResponse(joinGroupFollowerResponse(1, memberId, "leader", Errors.NONE.code()));
MockClient.RequestMatcher matcher = new MockClient.RequestMatcher() {
@Override
- public boolean matches(ClientRequest request) {
- SyncGroupRequest sync = new SyncGroupRequest(request.request().body());
+ public boolean matches(AbstractRequest body) {
+ SyncGroupRequest sync = (SyncGroupRequest) body;
return sync.memberId().equals(memberId) &&
sync.generationId() == 1 &&
sync.groupAssignment().isEmpty();
@@ -284,7 +283,7 @@ public class WorkerCoordinatorTest {
PowerMock.replayAll();
- client.prepareResponse(groupMetadataResponse(node, Errors.NONE.code()));
+ client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code()));
coordinator.ensureCoordinatorReady();
// join the group once
@@ -392,12 +391,11 @@ public class WorkerCoordinatorTest {
}
- private Struct groupMetadataResponse(Node node, short error) {
- GroupCoordinatorResponse response = new GroupCoordinatorResponse(error, node);
- return response.toStruct();
+ private GroupCoordinatorResponse groupCoordinatorResponse(Node node, short error) {
+ return new GroupCoordinatorResponse(error, node);
}
- private Struct joinGroupLeaderResponse(int generationId, String memberId,
+ private JoinGroupResponse joinGroupLeaderResponse(int generationId, String memberId,
Map<String, Long> configOffsets, short error) {
Map<String, ByteBuffer> metadata = new HashMap<>();
for (Map.Entry<String, Long> configStateEntry : configOffsets.entrySet()) {
@@ -407,22 +405,21 @@ public class WorkerCoordinatorTest {
ByteBuffer buf = ConnectProtocol.serializeMetadata(new ConnectProtocol.WorkerState(memberUrl, configOffset));
metadata.put(configStateEntry.getKey(), buf);
}
- return new JoinGroupResponse(error, generationId, WorkerCoordinator.DEFAULT_SUBPROTOCOL, memberId, memberId, metadata).toStruct();
+ return new JoinGroupResponse(error, generationId, WorkerCoordinator.DEFAULT_SUBPROTOCOL, memberId, memberId, metadata);
}
- private Struct joinGroupFollowerResponse(int generationId, String memberId, String leaderId, short error) {
+ private JoinGroupResponse joinGroupFollowerResponse(int generationId, String memberId, String leaderId, short error) {
return new JoinGroupResponse(error, generationId, WorkerCoordinator.DEFAULT_SUBPROTOCOL, memberId, leaderId,
- Collections.<String, ByteBuffer>emptyMap()).toStruct();
+ Collections.<String, ByteBuffer>emptyMap());
}
- private Struct syncGroupResponse(short assignmentError, String leader, long configOffset, List<String> connectorIds,
+ private SyncGroupResponse syncGroupResponse(short assignmentError, String leader, long configOffset, List<String> connectorIds,
List<ConnectorTaskId> taskIds, short error) {
ConnectProtocol.Assignment assignment = new ConnectProtocol.Assignment(assignmentError, leader, LEADER_URL, configOffset, connectorIds, taskIds);
ByteBuffer buf = ConnectProtocol.serializeAssignment(assignment);
- return new SyncGroupResponse(error, buf).toStruct();
+ return new SyncGroupResponse(error, buf);
}
-
private static class MockRebalanceListener implements WorkerRebalanceListener {
public ConnectProtocol.Assignment assignment = null;
http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/core/src/main/scala/kafka/admin/AdminClient.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/AdminClient.scala b/core/src/main/scala/kafka/admin/AdminClient.scala
index 1179557..592fecf 100644
--- a/core/src/main/scala/kafka/admin/AdminClient.scala
+++ b/core/src/main/scala/kafka/admin/AdminClient.scala
@@ -42,7 +42,7 @@ class AdminClient(val time: Time,
private def send(target: Node,
api: ApiKeys,
- request: AbstractRequest): Struct = {
+ request: AbstractRequest): AbstractResponse = {
var future: RequestFuture[ClientResponse] = null
future = client.send(target, api, request)
@@ -54,9 +54,8 @@ class AdminClient(val time: Time,
throw future.exception()
}
- private def sendAnyNode(api: ApiKeys, request: AbstractRequest): Struct = {
- bootstrapBrokers.foreach {
- case broker =>
+ private def sendAnyNode(api: ApiKeys, request: AbstractRequest): AbstractResponse = {
+ bootstrapBrokers.foreach { broker =>
try {
return send(broker, api, request)
} catch {
@@ -69,23 +68,20 @@ class AdminClient(val time: Time,
private def findCoordinator(groupId: String): Node = {
val request = new GroupCoordinatorRequest(groupId)
- val responseBody = sendAnyNode(ApiKeys.GROUP_COORDINATOR, request)
- val response = new GroupCoordinatorResponse(responseBody)
+ val response = sendAnyNode(ApiKeys.GROUP_COORDINATOR, request).asInstanceOf[GroupCoordinatorResponse]
Errors.forCode(response.errorCode()).maybeThrow()
response.node()
}
def listGroups(node: Node): List[GroupOverview] = {
- val responseBody = send(node, ApiKeys.LIST_GROUPS, new ListGroupsRequest())
- val response = new ListGroupsResponse(responseBody)
+ val response = send(node, ApiKeys.LIST_GROUPS, new ListGroupsRequest()).asInstanceOf[ListGroupsResponse]
Errors.forCode(response.errorCode()).maybeThrow()
response.groups().map(group => GroupOverview(group.groupId(), group.protocolType())).toList
}
private def findAllBrokers(): List[Node] = {
val request = new MetadataRequest(List[String]())
- val responseBody = sendAnyNode(ApiKeys.METADATA, request)
- val response = new MetadataResponse(responseBody)
+ val response = sendAnyNode(ApiKeys.METADATA, request).asInstanceOf[MetadataResponse]
val errors = response.errors()
if (!errors.isEmpty)
debug(s"Metadata request contained errors: $errors")
@@ -140,7 +136,7 @@ class AdminClient(val time: Time,
def describeConsumerGroup(groupId: String): ConsumerGroupSummary = {
val coordinator = findCoordinator(groupId)
val responseBody = send(coordinator, ApiKeys.DESCRIBE_GROUPS, new DescribeGroupsRequest(List(groupId).asJava))
- val response = new DescribeGroupsResponse(responseBody)
+ val response = responseBody.asInstanceOf[DescribeGroupsResponse]
val metadata = response.groups.get(groupId)
if (metadata == null)
throw new KafkaException(s"Response from broker contained no metadata for group $groupId")
http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/core/src/main/scala/kafka/api/FetchRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/FetchRequest.scala b/core/src/main/scala/kafka/api/FetchRequest.scala
index 57e99c1..00897db 100644
--- a/core/src/main/scala/kafka/api/FetchRequest.scala
+++ b/core/src/main/scala/kafka/api/FetchRequest.scala
@@ -199,8 +199,7 @@ case class FetchRequest(versionId: Short = FetchRequest.CurrentVersion,
val fetchResponsePartitionData = requestInfo.map { case (topicAndPartition, _) =>
(topicAndPartition, FetchResponsePartitionData(Errors.forException(e).code, -1, MessageSet.Empty))
}
- val fetchRequest = request.requestObj.asInstanceOf[FetchRequest]
- val errorResponse = FetchResponse(correlationId, fetchResponsePartitionData, fetchRequest.versionId)
+ val errorResponse = FetchResponse(correlationId, fetchResponsePartitionData, request.header.apiVersion)
// Magic value does not matter here because the message set is empty
requestChannel.sendResponse(new RequestChannel.Response(request, new FetchResponseSend(request.connectionId, errorResponse)))
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/core/src/main/scala/kafka/api/FetchResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/FetchResponse.scala b/core/src/main/scala/kafka/api/FetchResponse.scala
index d99bbcd..d31d4ba 100644
--- a/core/src/main/scala/kafka/api/FetchResponse.scala
+++ b/core/src/main/scala/kafka/api/FetchResponse.scala
@@ -76,7 +76,8 @@ class PartitionDataSend(val partitionId: Int,
written += channel.write(buffer)
if (!buffer.hasRemaining) {
if (messagesSentSize < messageSize) {
- val bytesSent = partitionData.messages.writeTo(channel, messagesSentSize, messageSize - messagesSentSize)
+ val records = partitionData.messages.asRecords
+ val bytesSent = records.writeTo(channel, messagesSentSize, messageSize - messagesSentSize).toInt
messagesSentSize += bytesSent
written += bytesSent
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/core/src/main/scala/kafka/api/GenericRequestAndHeader.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/GenericRequestAndHeader.scala b/core/src/main/scala/kafka/api/GenericRequestAndHeader.scala
index cb5b95e..3783c29 100644
--- a/core/src/main/scala/kafka/api/GenericRequestAndHeader.scala
+++ b/core/src/main/scala/kafka/api/GenericRequestAndHeader.scala
@@ -14,13 +14,14 @@
package kafka.api
import java.nio.ByteBuffer
-import org.apache.kafka.common.requests.AbstractRequestResponse
+
import kafka.api.ApiUtils._
+import org.apache.kafka.common.requests.AbstractResponse
private[kafka] abstract class GenericRequestAndHeader(val versionId: Short,
val correlationId: Int,
val clientId: String,
- val body: AbstractRequestResponse,
+ val body: AbstractResponse,
val name: String,
override val requestId: Option[Short] = None)
extends RequestOrResponse(requestId) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/core/src/main/scala/kafka/api/GenericResponseAndHeader.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/GenericResponseAndHeader.scala b/core/src/main/scala/kafka/api/GenericResponseAndHeader.scala
index 2835fb6..be0c080 100644
--- a/core/src/main/scala/kafka/api/GenericResponseAndHeader.scala
+++ b/core/src/main/scala/kafka/api/GenericResponseAndHeader.scala
@@ -14,10 +14,11 @@
package kafka.api
import java.nio.ByteBuffer
-import org.apache.kafka.common.requests.AbstractRequestResponse
+
+import org.apache.kafka.common.requests.AbstractResponse
private[kafka] abstract class GenericResponseAndHeader(val correlationId: Int,
- val body: AbstractRequestResponse,
+ val body: AbstractResponse,
val name: String,
override val requestId: Option[Short] = None)
extends RequestOrResponse(requestId) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/core/src/main/scala/kafka/api/ProducerRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/ProducerRequest.scala b/core/src/main/scala/kafka/api/ProducerRequest.scala
index 3ca7bd7..f6e4475 100644
--- a/core/src/main/scala/kafka/api/ProducerRequest.scala
+++ b/core/src/main/scala/kafka/api/ProducerRequest.scala
@@ -128,8 +128,8 @@ case class ProducerRequest(versionId: Short = ProducerRequest.CurrentVersion,
describe(true)
}
- override def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = {
- if(request.requestObj.asInstanceOf[ProducerRequest].requiredAcks == 0) {
+ override def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = {
+ if(request.body.asInstanceOf[org.apache.kafka.common.requests.ProduceRequest].acks == 0) {
requestChannel.closeConnection(request.processor, request)
}
else {
http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/core/src/main/scala/kafka/api/RequestOrResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/RequestOrResponse.scala b/core/src/main/scala/kafka/api/RequestOrResponse.scala
index 65b37fd..d013047 100644
--- a/core/src/main/scala/kafka/api/RequestOrResponse.scala
+++ b/core/src/main/scala/kafka/api/RequestOrResponse.scala
@@ -42,6 +42,6 @@ abstract class RequestOrResponse(val requestId: Option[Short] = None) extends Lo
* This API has no meaning for a Response object.
* @param details If this is false, omit the parts of the request description that are proportional to the number of
* topics or partitions. This is mainly to control the amount of request logging. */
- def describe(details: Boolean):String
+ def describe(details: Boolean): String
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index 03cd98c..1935ea2 100755
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -42,7 +42,7 @@ class ControllerChannelManager(controllerContext: ControllerContext, config: Kaf
private val brokerLock = new Object
this.logIdent = "[Channel manager on controller " + config.brokerId + "]: "
- controllerContext.liveBrokers.foreach(addNewBroker(_))
+ controllerContext.liveBrokers.foreach(addNewBroker)
def startup() = {
brokerLock synchronized {
@@ -56,7 +56,7 @@ class ControllerChannelManager(controllerContext: ControllerContext, config: Kaf
}
}
- def sendRequest(brokerId: Int, apiKey: ApiKeys, apiVersion: Option[Short], request: AbstractRequest, callback: AbstractRequestResponse => Unit = null) {
+ def sendRequest(brokerId: Int, apiKey: ApiKeys, apiVersion: Option[Short], request: AbstractRequest, callback: AbstractResponse => Unit = null) {
brokerLock synchronized {
val stateInfoOpt = brokerStateInfo.get(brokerId)
stateInfoOpt match {
@@ -149,7 +149,7 @@ class ControllerChannelManager(controllerContext: ControllerContext, config: Kaf
}
}
-case class QueueItem(apiKey: ApiKeys, apiVersion: Option[Short], request: AbstractRequest, callback: AbstractRequestResponse => Unit)
+case class QueueItem(apiKey: ApiKeys, apiVersion: Option[Short], request: AbstractRequest, callback: AbstractResponse => Unit)
class RequestSendThread(val controllerId: Int,
val controllerContext: ControllerContext,
@@ -185,28 +185,27 @@ class RequestSendThread(val controllerId: Int,
}
else {
val requestHeader = apiVersion.fold(networkClient.nextRequestHeader(apiKey))(networkClient.nextRequestHeader(apiKey, _))
- val send = new RequestSend(brokerNode.idString, requestHeader, request.toStruct)
- val clientRequest = new ClientRequest(time.milliseconds(), true, send, null)
- clientResponse = networkClient.blockingSendAndReceive(clientRequest)(time)
+ val clientRequest = new ClientRequest(brokerNode.idString, time.milliseconds(), true, requestHeader, request, null)
+ clientResponse = networkClient.blockingSendAndReceive(clientRequest, request)(time)
isSendSuccessful = true
}
} catch {
case e: Throwable => // if the send was not successful, reconnect to broker and resend the message
warn(("Controller %d epoch %d fails to send request %s to broker %s. " +
"Reconnecting to broker.").format(controllerId, controllerContext.epoch,
- request.toString, brokerNode.toString()), e)
+ request.toString, brokerNode.toString), e)
networkClient.close(brokerNode.idString)
isSendSuccessful = false
backoff()
}
}
if (clientResponse != null) {
- val response = ApiKeys.forId(clientResponse.request.request.header.apiKey) match {
- case ApiKeys.LEADER_AND_ISR => new LeaderAndIsrResponse(clientResponse.responseBody)
- case ApiKeys.STOP_REPLICA => new StopReplicaResponse(clientResponse.responseBody)
- case ApiKeys.UPDATE_METADATA_KEY => new UpdateMetadataResponse(clientResponse.responseBody)
- case apiKey => throw new KafkaException(s"Unexpected apiKey received: $apiKey")
- }
+ val api = ApiKeys.forId(clientResponse.requestHeader.apiKey) // API key taken from the header of the request this response answers
+ if (api != ApiKeys.LEADER_AND_ISR && api != ApiKeys.STOP_REPLICA && api != ApiKeys.UPDATE_METADATA_KEY)
+ throw new KafkaException(s"Unexpected apiKey received: $api") // report the key that was actually checked above
+
+ val response = clientResponse.responseBody
+
stateChangeLogger.trace("Controller %d epoch %d received response %s for a request sent to broker %s"
.format(controllerId, controllerContext.epoch, response.toString, brokerNode.toString))
@@ -217,7 +216,7 @@ class RequestSendThread(val controllerId: Int,
}
} catch {
case e: Throwable =>
- error("Controller %d fails to send a request to broker %s".format(controllerId, brokerNode.toString()), e)
+ error("Controller %d fails to send a request to broker %s".format(controllerId, brokerNode.toString), e)
// If there is any socket error (eg, socket timeout), the connection is no longer usable and needs to be recreated.
networkClient.close(brokerNode.idString)
}
@@ -230,13 +229,13 @@ class RequestSendThread(val controllerId: Int,
if (!networkClient.blockingReady(brokerNode, socketTimeoutMs)(time))
throw new SocketTimeoutException(s"Failed to connect within $socketTimeoutMs ms")
- info("Controller %d connected to %s for sending state change requests".format(controllerId, brokerNode.toString()))
+ info("Controller %d connected to %s for sending state change requests".format(controllerId, brokerNode.toString))
}
true
} catch {
case e: Throwable =>
- warn("Controller %d's connection to broker %s was unsuccessful".format(controllerId, brokerNode.toString()), e)
+ warn("Controller %d's connection to broker %s was unsuccessful".format(controllerId, brokerNode.toString), e)
networkClient.close(brokerNode.idString)
false
}
@@ -273,7 +272,7 @@ class ControllerBrokerRequestBatch(controller: KafkaController) extends Logging
def addLeaderAndIsrRequestForBrokers(brokerIds: Seq[Int], topic: String, partition: Int,
leaderIsrAndControllerEpoch: LeaderIsrAndControllerEpoch,
- replicas: Seq[Int], callback: AbstractRequestResponse => Unit = null) {
+ replicas: Seq[Int], callback: AbstractResponse => Unit = null) {
val topicPartition = new TopicPartition(topic, partition)
brokerIds.filter(_ >= 0).foreach { brokerId =>
@@ -286,13 +285,13 @@ class ControllerBrokerRequestBatch(controller: KafkaController) extends Logging
}
def addStopReplicaRequestForBrokers(brokerIds: Seq[Int], topic: String, partition: Int, deletePartition: Boolean,
- callback: (AbstractRequestResponse, Int) => Unit = null) {
+ callback: (AbstractResponse, Int) => Unit = null) {
brokerIds.filter(b => b >= 0).foreach { brokerId =>
stopReplicaRequestMap.getOrElseUpdate(brokerId, Seq.empty[StopReplicaRequestInfo])
val v = stopReplicaRequestMap(brokerId)
if(callback != null)
stopReplicaRequestMap(brokerId) = v :+ StopReplicaRequestInfo(PartitionAndReplica(topic, partition, brokerId),
- deletePartition, (r: AbstractRequestResponse) => callback(r, brokerId))
+ deletePartition, (r: AbstractResponse) => callback(r, brokerId))
else
stopReplicaRequestMap(brokerId) = v :+ StopReplicaRequestInfo(PartitionAndReplica(topic, partition, brokerId),
deletePartition)
@@ -302,7 +301,7 @@ class ControllerBrokerRequestBatch(controller: KafkaController) extends Logging
/** Send UpdateMetadataRequest to the given brokers for the given partitions and partitions that are being deleted */
def addUpdateMetadataRequestForBrokers(brokerIds: Seq[Int],
partitions: collection.Set[TopicAndPartition] = Set.empty[TopicAndPartition],
- callback: AbstractRequestResponse => Unit = null) {
+ callback: AbstractResponse => Unit = null) {
def updateMetadataRequestMapFor(partition: TopicAndPartition, beingDeleted: Boolean) {
val leaderIsrAndControllerEpochOpt = controllerContext.partitionLeadershipInfo.get(partition)
leaderIsrAndControllerEpochOpt match {
@@ -444,29 +443,29 @@ case class ControllerBrokerStateInfo(networkClient: NetworkClient,
messageQueue: BlockingQueue[QueueItem],
requestSendThread: RequestSendThread)
-case class StopReplicaRequestInfo(replica: PartitionAndReplica, deletePartition: Boolean, callback: AbstractRequestResponse => Unit = null)
+case class StopReplicaRequestInfo(replica: PartitionAndReplica, deletePartition: Boolean, callback: AbstractResponse => Unit = null)
-class Callbacks private (var leaderAndIsrResponseCallback: AbstractRequestResponse => Unit = null,
- var updateMetadataResponseCallback: AbstractRequestResponse => Unit = null,
- var stopReplicaResponseCallback: (AbstractRequestResponse, Int) => Unit = null)
+class Callbacks private (var leaderAndIsrResponseCallback: AbstractResponse => Unit = null,
+ var updateMetadataResponseCallback: AbstractResponse => Unit = null,
+ var stopReplicaResponseCallback: (AbstractResponse, Int) => Unit = null)
object Callbacks {
class CallbackBuilder {
- var leaderAndIsrResponseCbk: AbstractRequestResponse => Unit = null
- var updateMetadataResponseCbk: AbstractRequestResponse => Unit = null
- var stopReplicaResponseCbk: (AbstractRequestResponse, Int) => Unit = null
+ var leaderAndIsrResponseCbk: AbstractResponse => Unit = null
+ var updateMetadataResponseCbk: AbstractResponse => Unit = null
+ var stopReplicaResponseCbk: (AbstractResponse, Int) => Unit = null
- def leaderAndIsrCallback(cbk: AbstractRequestResponse => Unit): CallbackBuilder = {
+ def leaderAndIsrCallback(cbk: AbstractResponse => Unit): CallbackBuilder = {
leaderAndIsrResponseCbk = cbk
this
}
- def updateMetadataCallback(cbk: AbstractRequestResponse => Unit): CallbackBuilder = {
+ def updateMetadataCallback(cbk: AbstractResponse => Unit): CallbackBuilder = {
updateMetadataResponseCbk = cbk
this
}
- def stopReplicaCallback(cbk: (AbstractRequestResponse, Int) => Unit): CallbackBuilder = {
+ def stopReplicaCallback(cbk: (AbstractResponse, Int) => Unit): CallbackBuilder = {
stopReplicaResponseCbk = cbk
this
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/core/src/main/scala/kafka/controller/KafkaController.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 4a94aad..730f07c 100755
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -20,7 +20,7 @@ import java.util
import org.apache.kafka.common.errors.{BrokerNotAvailableException, ControllerMovedException}
import org.apache.kafka.common.protocol.ApiKeys
-import org.apache.kafka.common.requests.{AbstractRequest, AbstractRequestResponse}
+import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse}
import scala.collection._
import com.yammer.metrics.core.{Gauge, Meter}
@@ -39,7 +39,7 @@ import kafka.utils.CoreUtils._
import org.apache.zookeeper.Watcher.Event.KeeperState
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.utils.Time
-import org.I0Itec.zkclient.{IZkChildListener, IZkDataListener, IZkStateListener, ZkClient, ZkConnection}
+import org.I0Itec.zkclient.{IZkChildListener, IZkDataListener, IZkStateListener, ZkClient}
import org.I0Itec.zkclient.exception.{ZkNoNodeException, ZkNodeExistsException}
import java.util.concurrent.locks.ReentrantLock
@@ -136,7 +136,7 @@ object KafkaController extends Logging {
Json.parseFull(controllerInfoString) match {
case Some(m) =>
val controllerInfo = m.asInstanceOf[Map[String, Any]]
- controllerInfo.get("brokerid").get.asInstanceOf[Int]
+ controllerInfo("brokerid").asInstanceOf[Int]
case None => throw new KafkaException("Failed to parse the controller info json [%s].".format(controllerInfoString))
}
} catch {
@@ -686,7 +686,7 @@ class KafkaController(val config : KafkaConfig, zkUtils: ZkUtils, val brokerStat
onControllerResignation()
}
- def sendRequest(brokerId: Int, apiKey: ApiKeys, apiVersion: Option[Short], request: AbstractRequest, callback: AbstractRequestResponse => Unit = null) = {
+ def sendRequest(brokerId: Int, apiKey: ApiKeys, apiVersion: Option[Short], request: AbstractRequest, callback: AbstractResponse => Unit = null) = {
controllerContext.controllerChannelManager.sendRequest(brokerId, apiKey, apiVersion, request, callback)
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/TopicDeletionManager.scala b/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
index 8e5f3a1..823f9b4 100755
--- a/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
+++ b/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
@@ -19,13 +19,14 @@ package kafka.controller
import kafka.server.ConfigType
import org.apache.kafka.common.protocol.Errors
-import org.apache.kafka.common.requests.{StopReplicaResponse, AbstractRequestResponse}
+import org.apache.kafka.common.requests.{AbstractResponse, StopReplicaResponse}
import collection.mutable
import collection.JavaConverters._
-import kafka.utils.{ShutdownableThread, Logging}
+import kafka.utils.{Logging, ShutdownableThread}
import kafka.utils.CoreUtils._
import kafka.utils.ZkUtils._
+
import collection.Set
import kafka.common.TopicAndPartition
import java.util.concurrent.locks.ReentrantLock
@@ -378,7 +379,7 @@ class TopicDeletionManager(controller: KafkaController,
startReplicaDeletion(replicasPerPartition)
}
- private def deleteTopicStopReplicaCallback(stopReplicaResponseObj: AbstractRequestResponse, replicaId: Int) {
+ private def deleteTopicStopReplicaCallback(stopReplicaResponseObj: AbstractResponse, replicaId: Int) {
val stopReplicaResponse = stopReplicaResponseObj.asInstanceOf[StopReplicaResponse]
debug("Delete topic callback invoked for %s".format(stopReplicaResponse))
val responseMap = stopReplicaResponse.responses.asScala
http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/core/src/main/scala/kafka/log/FileMessageSet.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/FileMessageSet.scala b/core/src/main/scala/kafka/log/FileMessageSet.scala
index fd9ec5f..c33e376 100755
--- a/core/src/main/scala/kafka/log/FileMessageSet.scala
+++ b/core/src/main/scala/kafka/log/FileMessageSet.scala
@@ -26,9 +26,10 @@ import kafka.utils._
import kafka.message._
import kafka.common.KafkaException
import java.util.concurrent.TimeUnit
-import kafka.metrics.{KafkaTimer, KafkaMetricsGroup}
+
+import kafka.metrics.{KafkaMetricsGroup, KafkaTimer}
import org.apache.kafka.common.errors.CorruptRecordException
-import org.apache.kafka.common.network.TransportLayer
+import org.apache.kafka.common.record.FileRecords
import org.apache.kafka.common.utils.Utils
import scala.collection.mutable.ArrayBuffer
@@ -48,7 +49,6 @@ class FileMessageSet private[kafka](@volatile var file: File,
private[log] val start: Int,
private[log] val end: Int,
isSlice: Boolean) extends MessageSet {
- import FileMessageSet._
/* the size of the message set in bytes */
private val _size =
if(isSlice)
@@ -126,6 +126,8 @@ class FileMessageSet private[kafka](@volatile var file: File,
})
}
+ override def asRecords: FileRecords = new FileRecords(file, channel, start, end, isSlice)
+
/**
* Search forward for the file position of the last offset that is greater than or equal to the target offset
* and return its physical position and the size of the message (including log overhead) at the returned offset. If
@@ -206,31 +208,6 @@ class FileMessageSet private[kafka](@volatile var file: File,
}
/**
- * Write some of this set to the given channel.
- * @param destChannel The channel to write to.
- * @param writePosition The position in the message set to begin writing from.
- * @param size The maximum number of bytes to write
- * @return The number of bytes actually written.
- */
- def writeTo(destChannel: GatheringByteChannel, writePosition: Long, size: Int): Int = {
- // Ensure that the underlying size has not changed.
- val newSize = math.min(channel.size.toInt, end) - start
- if (newSize < _size.get()) {
- throw new KafkaException("Size of FileMessageSet %s has been truncated during write: old size %d, new size %d"
- .format(file.getAbsolutePath, _size.get(), newSize))
- }
- val position = start + writePosition
- val count = math.min(size, sizeInBytes)
- val bytesTransferred = (destChannel match {
- case tl: TransportLayer => tl.transferFrom(channel, position, count)
- case dc => channel.transferTo(position, count, dc)
- }).toInt
- trace("FileMessageSet " + file.getAbsolutePath + " : bytes transferred : " + bytesTransferred
- + " bytes requested for transfer : " + math.min(size, sizeInBytes))
- bytesTransferred
- }
-
- /**
* This method is called before we write messages to the socket using zero-copy transfer. We need to
* make sure all the messages in the message set have the expected magic value.
*
@@ -337,7 +314,7 @@ class FileMessageSet private[kafka](@volatile var file: File,
// increment the location and return the item
location += size + sizeOffsetLength
- new MessageAndOffset(new Message(buffer), offset)
+ MessageAndOffset(new Message(buffer), offset)
}
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala b/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
index a33bc4b..096344d 100644
--- a/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
+++ b/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
@@ -26,7 +26,7 @@ import java.util.ArrayDeque
import kafka.message.ByteBufferMessageSet.FilterResult
import org.apache.kafka.common.errors.InvalidTimestampException
-import org.apache.kafka.common.record.TimestampType
+import org.apache.kafka.common.record.{MemoryRecords, TimestampType}
import org.apache.kafka.common.utils.Utils
import scala.collection.mutable
@@ -352,6 +352,8 @@ class ByteBufferMessageSet(val buffer: ByteBuffer) extends MessageSet with Loggi
def getBuffer = buffer
+ override def asRecords: MemoryRecords = MemoryRecords.readableRecords(buffer.duplicate())
+
private def shallowValidBytes: Int = {
if (shallowValidByteCount < 0) {
this.shallowValidByteCount = this.internalIterator(isShallow = true).map { messageAndOffset =>
@@ -371,19 +373,6 @@ class ByteBufferMessageSet(val buffer: ByteBuffer) extends MessageSet with Loggi
written
}
- /** Write the messages in this set to the given channel starting at the given offset byte.
- * Less than the complete amount may be written, but no more than maxSize can be. The number
- * of bytes written is returned */
- def writeTo(channel: GatheringByteChannel, offset: Long, maxSize: Int): Int = {
- if (offset > Int.MaxValue)
- throw new IllegalArgumentException(s"offset should not be larger than Int.MaxValue: $offset")
- val dup = buffer.duplicate()
- val position = offset.toInt
- dup.position(position)
- dup.limit(math.min(buffer.limit, position + maxSize))
- channel.write(dup)
- }
-
override def isMagicValueInAllWrapperMessages(expectedMagicValue: Byte): Boolean = {
for (messageAndOffset <- shallowIterator) {
if (messageAndOffset.message.magic != expectedMagicValue)
http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/core/src/main/scala/kafka/message/MessageSet.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/message/MessageSet.scala b/core/src/main/scala/kafka/message/MessageSet.scala
index 14c455c..ffa27fa 100644
--- a/core/src/main/scala/kafka/message/MessageSet.scala
+++ b/core/src/main/scala/kafka/message/MessageSet.scala
@@ -20,6 +20,8 @@ package kafka.message
import java.nio._
import java.nio.channels._
+import org.apache.kafka.common.record.Records
+
/**
* Message set helper functions
*/
@@ -70,11 +72,6 @@ case class MagicAndTimestamp(magic: Byte, timestamp: Long)
*/
abstract class MessageSet extends Iterable[MessageAndOffset] {
- /** Write the messages in this set to the given channel starting at the given offset byte.
- * Less than the complete amount may be written, but no more than maxSize can be. The number
- * of bytes written is returned */
- def writeTo(channel: GatheringByteChannel, offset: Long, maxSize: Int): Int
-
/**
* Check if all the wrapper messages in the message set have the expected magic value
*/
@@ -91,6 +88,11 @@ abstract class MessageSet extends Iterable[MessageAndOffset] {
def sizeInBytes: Int
/**
+ * Get the client representation of the message set
+ */
+ def asRecords: Records
+
+ /**
* Print this message set's contents. If the message set has more than 100 messages, just
* print the first 100.
*/
http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/core/src/main/scala/kafka/network/RequestChannel.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala
index dace782..0cece68 100644
--- a/core/src/main/scala/kafka/network/RequestChannel.scala
+++ b/core/src/main/scala/kafka/network/RequestChannel.scala
@@ -23,7 +23,7 @@ import java.util.HashMap
import java.util.concurrent._
import com.yammer.metrics.core.Gauge
-import kafka.api._
+import kafka.api.{ControlledShutdownRequest, RequestOrResponse}
import kafka.metrics.KafkaMetricsGroup
import kafka.server.QuotaId
import kafka.utils.{Logging, SystemTime}
@@ -31,19 +31,19 @@ import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.InvalidRequestException
import org.apache.kafka.common.network.Send
import org.apache.kafka.common.protocol.{ApiKeys, Protocol, SecurityProtocol}
-import org.apache.kafka.common.requests.{AbstractRequest, ApiVersionsRequest, ProduceRequest, RequestHeader, RequestSend}
+import org.apache.kafka.common.record.MemoryRecords
+import org.apache.kafka.common.requests._
import org.apache.kafka.common.security.auth.KafkaPrincipal
import org.apache.log4j.Logger
-
object RequestChannel extends Logging {
- val AllDone = new Request(processor = 1, connectionId = "2", new Session(KafkaPrincipal.ANONYMOUS, InetAddress.getLocalHost()), buffer = getShutdownReceive(), startTimeMs = 0, securityProtocol = SecurityProtocol.PLAINTEXT)
+ val AllDone = Request(processor = 1, connectionId = "2", Session(KafkaPrincipal.ANONYMOUS, InetAddress.getLocalHost), buffer = getShutdownReceive(), startTimeMs = 0, securityProtocol = SecurityProtocol.PLAINTEXT)
private val requestLogger = Logger.getLogger("kafka.request.logger")
def getShutdownReceive() = {
val emptyRequestHeader = new RequestHeader(ApiKeys.PRODUCE.id, "", 0)
- val emptyProduceRequest = new ProduceRequest(0, 0, new HashMap[TopicPartition, ByteBuffer]())
- RequestSend.serialize(emptyRequestHeader, emptyProduceRequest.toStruct)
+ val emptyProduceRequest = new ProduceRequest(0, 0, new HashMap[TopicPartition, MemoryRecords]())
+ AbstractRequestResponse.serialize(emptyRequestHeader, emptyProduceRequest)
}
case class Session(principal: KafkaPrincipal, clientAddress: InetAddress) {
@@ -61,19 +61,12 @@ object RequestChannel extends Logging {
val requestId = buffer.getShort()
- // TODO: this will be removed once we migrated to client-side format
- // for server-side request / response format
- // NOTE: this map only includes the server-side request/response handlers. Newer
- // request types should only use the client-side versions which are parsed with
- // o.a.k.common.requests.AbstractRequest.getRequest()
- private val keyToNameAndDeserializerMap: Map[Short, (ByteBuffer) => RequestOrResponse]=
- Map(ApiKeys.FETCH.id -> FetchRequest.readFrom,
- ApiKeys.CONTROLLED_SHUTDOWN_KEY.id -> ControlledShutdownRequest.readFrom
- )
-
- // TODO: this will be removed once we migrated to client-side format
- val requestObj =
- keyToNameAndDeserializerMap.get(requestId).map(readFrom => readFrom(buffer)).orNull
+ // TODO: this will be removed once we remove support for v0 of ControlledShutdownRequest (which
+ // depends on a non-standard request header)
+ val requestObj: RequestOrResponse = if (requestId == ApiKeys.CONTROLLED_SHUTDOWN_KEY.id)
+ ControlledShutdownRequest.readFrom(buffer)
+ else
+ null
// if we failed to find a server-side mapping, then try using the
// client-side request / response format
@@ -108,7 +101,7 @@ object RequestChannel extends Logging {
if (requestObj != null)
requestObj.describe(details)
else
- header.toString + " -- " + body.toString
+ s"$header -- $body"
}
trace("Processor %d received request : %s".format(processor, requestDesc(true)))
@@ -135,7 +128,7 @@ object RequestChannel extends Logging {
val totalTime = endTimeMs - startTimeMs
val fetchMetricNames =
if (requestId == ApiKeys.FETCH.id) {
- val isFromFollower = requestObj.asInstanceOf[FetchRequest].isFromFollower
+ val isFromFollower = body.asInstanceOf[FetchRequest].isFromFollower
Seq(
if (isFromFollower) RequestMetrics.followFetchMetricName
else RequestMetrics.consumerFetchMetricName
@@ -172,6 +165,9 @@ object RequestChannel extends Logging {
def this(request: Request, send: Send) =
this(request.processor, request, send)
+
+ def this(request: Request, response: AbstractResponse) =
+ this(request, response.toSend(request.connectionId, request.header))
}
trait ResponseAction
@@ -221,14 +217,14 @@ class RequestChannel(val numProcessors: Int, val queueSize: Int) extends KafkaMe
/** No operation to take for the request, need to read more over the network */
def noOperation(processor: Int, request: RequestChannel.Request) {
- responseQueues(processor).put(new RequestChannel.Response(processor, request, null, RequestChannel.NoOpAction))
+ responseQueues(processor).put(RequestChannel.Response(processor, request, null, RequestChannel.NoOpAction))
for(onResponse <- responseListeners)
onResponse(processor)
}
/** Close the connection for the request */
def closeConnection(processor: Int, request: RequestChannel.Request) {
- responseQueues(processor).put(new RequestChannel.Response(processor, request, null, RequestChannel.CloseConnectionAction))
+ responseQueues(processor).put(RequestChannel.Response(processor, request, null, RequestChannel.CloseConnectionAction))
for(onResponse <- responseListeners)
onResponse(processor)
}
@@ -254,7 +250,7 @@ class RequestChannel(val numProcessors: Int, val queueSize: Int) extends KafkaMe
}
def shutdown() {
- requestQueue.clear
+ requestQueue.clear()
}
}
@@ -283,4 +279,3 @@ class RequestMetrics(name: String) extends KafkaMetricsGroup {
val responseSendTimeHist = newHistogram("ResponseSendTimeMs", biased = true, tags)
val totalTimeHist = newHistogram("TotalTimeMs", biased = true, tags)
}
-
http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/core/src/main/scala/kafka/server/DelayedFetch.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/DelayedFetch.scala b/core/src/main/scala/kafka/server/DelayedFetch.scala
index 4bf04e6..001051f 100644
--- a/core/src/main/scala/kafka/server/DelayedFetch.scala
+++ b/core/src/main/scala/kafka/server/DelayedFetch.scala
@@ -20,14 +20,15 @@ package kafka.server
import java.util.concurrent.TimeUnit
import kafka.api.FetchResponsePartitionData
-import kafka.api.PartitionFetchInfo
import kafka.common.TopicAndPartition
import kafka.metrics.KafkaMetricsGroup
+import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.{NotLeaderForPartitionException, UnknownTopicOrPartitionException}
+import org.apache.kafka.common.requests.FetchRequest.PartitionData
import scala.collection._
-case class FetchPartitionStatus(startOffsetMetadata: LogOffsetMetadata, fetchInfo: PartitionFetchInfo) {
+case class FetchPartitionStatus(startOffsetMetadata: LogOffsetMetadata, fetchInfo: PartitionData) {
override def toString = "[startOffsetMetadata: " + startOffsetMetadata + ", " +
"fetchInfo: " + fetchInfo + "]"
@@ -103,7 +104,7 @@ class DelayedFetch(delayMs: Long,
return forceComplete()
} else if (fetchOffset.messageOffset < endOffset.messageOffset) {
// we take the partition fetch size as upper bound when accumulating the bytes (skip if a throttled partition)
- val bytesAvailable = math.min(endOffset.positionDiff(fetchOffset), fetchStatus.fetchInfo.fetchSize)
+ val bytesAvailable = math.min(endOffset.positionDiff(fetchOffset), fetchStatus.fetchInfo.maxBytes)
if (quota.isThrottled(topicAndPartition))
accumulatedThrottledSize += bytesAvailable
else
@@ -146,7 +147,7 @@ class DelayedFetch(delayMs: Long,
readOnlyCommitted = fetchMetadata.fetchOnlyCommitted,
fetchMaxBytes = fetchMetadata.fetchMaxBytes,
hardMaxBytesLimit = fetchMetadata.hardMaxBytesLimit,
- readPartitionInfo = fetchMetadata.fetchPartitionStatus.map { case (tp, status) => tp -> status.fetchInfo },
+ readPartitionInfo = fetchMetadata.fetchPartitionStatus.map { case (tp, status) => new TopicPartition(tp.topic, tp.partition) -> status.fetchInfo },
quota = quota
)
|