Repository: kafka
Updated Branches:
refs/heads/trunk 3df46bf4c -> 594b96393
KAFKA-2275: Add ListTopics() API to the Java consumer; reviewed by Jason Gustafson, Edward
Ribeiro and Guozhang Wang
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/594b9639
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/594b9639
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/594b9639
Branch: refs/heads/trunk
Commit: 594b963930c2054199ed54203415d1cb7917df27
Parents: 3df46bf
Author: Ashish Singh <asingh@cloudera.com>
Authored: Tue Jul 28 15:49:22 2015 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Tue Jul 28 15:49:22 2015 -0700
----------------------------------------------------------------------
.../org/apache/kafka/clients/ClientRequest.java | 29 +++++++++++--
.../org/apache/kafka/clients/NetworkClient.java | 4 +-
.../apache/kafka/clients/consumer/Consumer.java | 5 +++
.../kafka/clients/consumer/KafkaConsumer.java | 16 +++++++
.../kafka/clients/consumer/MockConsumer.java | 6 +++
.../clients/consumer/internals/Fetcher.java | 44 ++++++++++++++++++++
.../clients/consumer/internals/FetcherTest.java | 13 ++++++
.../integration/kafka/api/ConsumerTest.scala | 18 ++++++++
8 files changed, 130 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/594b9639/clients/src/main/java/org/apache/kafka/clients/ClientRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/ClientRequest.java b/clients/src/main/java/org/apache/kafka/clients/ClientRequest.java
index ed4c0d9..dc8f0f1 100644
--- a/clients/src/main/java/org/apache/kafka/clients/ClientRequest.java
+++ b/clients/src/main/java/org/apache/kafka/clients/ClientRequest.java
@@ -23,6 +23,7 @@ public final class ClientRequest {
private final boolean expectResponse;
private final RequestSend request;
private final RequestCompletionHandler callback;
+ private final boolean isInitiatedByNetworkClient;
/**
* @param createdMs The unix timestamp in milliseconds for the time at which this request
was created.
@@ -30,17 +31,35 @@ public final class ClientRequest {
* @param request The request
* @param callback A callback to execute when the response has been received (or null
if no callback is necessary)
*/
- public ClientRequest(long createdMs, boolean expectResponse, RequestSend request, RequestCompletionHandler callback) {
+ public ClientRequest(long createdMs, boolean expectResponse, RequestSend request,
+ RequestCompletionHandler callback) {
+ this(createdMs, expectResponse, request, callback, false); // delegates with isInitiatedByNetworkClient = false
+ }
+
+ /**
+ * @param createdMs The unix timestamp in milliseconds for the time at which this request was created.
+ * @param expectResponse Should we expect a response message or is this request complete once it is sent?
+ * @param request The request
+ * @param callback A callback to execute when the response has been received (or null if no callback is necessary)
+ * @param isInitiatedByNetworkClient Whether the request was initiated by the network client itself; if so, its
+ * response will be consumed by the network client
+ */
+ public ClientRequest(long createdMs, boolean expectResponse, RequestSend request,
+ RequestCompletionHandler callback, boolean isInitiatedByNetworkClient) {
this.createdMs = createdMs;
this.callback = callback;
this.request = request;
this.expectResponse = expectResponse;
+ this.isInitiatedByNetworkClient = isInitiatedByNetworkClient;
}
@Override
public String toString() {
- return "ClientRequest(expectResponse=" + expectResponse + ", callback=" + callback + ", request=" + request
- + ")";
+ return "ClientRequest(expectResponse=" + expectResponse +
+ ", callback=" + callback +
+ ", request=" + request +
+ (isInitiatedByNetworkClient ? ", isInitiatedByNetworkClient" : "") + // the flag name is appended only when the flag is true
+ ")";
}
public boolean expectResponse() {
@@ -63,4 +82,8 @@ public final class ClientRequest {
return createdMs;
}
+ public boolean isInitiatedByNetworkClient() { // value fixed at construction; the four-argument constructor always passes false
+ return isInitiatedByNetworkClient;
+ }
+
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/594b9639/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
index 48fe796..0e51d7b 100644
--- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
@@ -378,7 +378,7 @@ public class NetworkClient implements KafkaClient {
short apiKey = req.request().header().apiKey();
Struct body = (Struct) ProtoUtils.currentResponseSchema(apiKey).read(receive.payload());
correlate(req.request().header(), header);
- if (apiKey == ApiKeys.METADATA.id) {
+ if (apiKey == ApiKeys.METADATA.id && req.isInitiatedByNetworkClient())
{
handleMetadataResponse(req.request().header(), body, now);
} else {
// need to add body/header to response here
@@ -454,7 +454,7 @@ public class NetworkClient implements KafkaClient {
private ClientRequest metadataRequest(long now, String node, Set<String> topics) {
MetadataRequest metadata = new MetadataRequest(new ArrayList<String>(topics));
RequestSend send = new RequestSend(node, nextRequestHeader(ApiKeys.METADATA), metadata.toStruct());
- return new ClientRequest(now, true, send, null);
+ return new ClientRequest(now, true, send, null, true); // expectResponse = true, no callback, isInitiatedByNetworkClient = true
}
/**
http://git-wip-us.apache.org/repos/asf/kafka/blob/594b9639/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
index 252b759..23e410b 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
@@ -114,6 +114,11 @@ public interface Consumer<K, V> extends Closeable {
public List<PartitionInfo> partitionsFor(String topic);
/**
+ * @see KafkaConsumer#listTopics()
+ */
+ public Map<String, List<PartitionInfo>> listTopics();
+
+ /**
* @see KafkaConsumer#close()
*/
public void close();
http://git-wip-us.apache.org/repos/asf/kafka/blob/594b9639/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index bea3d73..923ff99 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -1024,6 +1024,22 @@ public class KafkaConsumer<K, V> implements Consumer<K, V>
{
}
}
+ /**
+ * Get metadata about partitions for all topics. This method will issue a remote call to the
+ * server, passing requestTimeoutMs as the time for which the lookup is attempted.
+ *
+ * @return The map from each topic to the list of its partitions
+ */
+ @Override
+ public Map<String, List<PartitionInfo>> listTopics() {
+ acquire();
+ try {
+ return fetcher.getAllTopics(requestTimeoutMs);
+ } finally {
+ release(); // runs even when the fetch throws, so every acquire() is paired with a release()
+ }
+ }
+
@Override
public void close() {
acquire();
http://git-wip-us.apache.org/repos/asf/kafka/blob/594b9639/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
index c14eed1..5b22fa0 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
@@ -177,6 +177,12 @@ public class MockConsumer<K, V> implements Consumer<K, V>
{
return parts;
}
+ @Override
+ public synchronized Map<String, List<PartitionInfo>> listTopics() { // synchronized like updatePartitions(), which writes to the same map
+ ensureNotClosed();
+ return partitions; // NOTE(review): hands out the internal map itself, not a copy
+ }
+
public synchronized void updatePartitions(String topic, List<PartitionInfo> partitions)
{
ensureNotClosed();
this.partitions.put(topic, partitions);
http://git-wip-us.apache.org/repos/asf/kafka/blob/594b9639/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index d2a0e2b..9f71451 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -39,6 +39,8 @@ import org.apache.kafka.common.requests.FetchRequest;
import org.apache.kafka.common.requests.FetchResponse;
import org.apache.kafka.common.requests.ListOffsetRequest;
import org.apache.kafka.common.requests.ListOffsetResponse;
+import org.apache.kafka.common.requests.MetadataRequest;
+import org.apache.kafka.common.requests.MetadataResponse;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
@@ -160,6 +162,48 @@ public class Fetcher<K, V> {
}
}
+
+
+ /**
+ * Get metadata for all topics present in Kafka cluster, retrying retriable failures after a backoff
+ *
+ * @param timeout maximum time, in milliseconds, for which getting all topics is attempted
+ * @return The map of topics and their available partitions; empty if no attempt succeeded within the timeout
+ */
+ public Map<String, List<PartitionInfo>> getAllTopics(long timeout) {
+ final HashMap<String, List<PartitionInfo>> topicsPartitionInfos = new HashMap<>();
+ long startTime = time.milliseconds();
+
+ while (time.milliseconds() - startTime < timeout) {
+ final Node node = client.leastLoadedNode();
+ if (node != null) {
+ MetadataRequest metadataRequest = new MetadataRequest(Collections.<String>emptyList()); // empty topic list: request metadata for every topic
+ final RequestFuture<ClientResponse> requestFuture =
+ client.send(node, ApiKeys.METADATA, metadataRequest);
+
+ client.poll(requestFuture);
+
+ if (requestFuture.succeeded()) {
+ MetadataResponse response =
+ new MetadataResponse(requestFuture.value().responseBody());
+
+ for (String topic : response.cluster().topics())
+ topicsPartitionInfos.put(
+ topic, response.cluster().availablePartitionsForTopic(topic));
+
+ return topicsPartitionInfos;
+ }
+
+ if (!requestFuture.isRetriable())
+ throw requestFuture.exception();
+ }
+ // back off before the next attempt (also when no node was available), but never sleep past the caller's deadline
+ Utils.sleep(Math.min(retryBackoffMs, Math.max(0L, timeout - (time.milliseconds() - startTime))));
+ }
+
+ return topicsPartitionInfos;
+ }
+
/**
* Reset offsets for the given partition using the offset reset strategy.
*
http://git-wip-us.apache.org/repos/asf/kafka/blob/594b9639/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index 4002679..06e2990 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -22,6 +22,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.protocol.Errors;
@@ -29,6 +30,7 @@ import org.apache.kafka.common.protocol.types.Struct;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.requests.FetchResponse;
+import org.apache.kafka.common.requests.MetadataResponse;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.test.TestUtils;
@@ -180,6 +182,17 @@ public class FetcherTest {
assertEquals(null, subscriptions.consumed(tp));
}
+ @Test
+ public void testGetAllTopics() throws InterruptedException {
+ // getAllTopics is a blocking call, so the metadata response has to be queued before the request is made
+ final Struct metadataBody = new MetadataResponse(cluster, Collections.<String, Errors>emptyMap()).toStruct();
+ client.prepareResponse(metadataBody);
+
+ final Map<String, List<PartitionInfo>> topicsByName = fetcher.getAllTopics(5000L);
+
+ assertEquals(cluster.topics().size(), topicsByName.size());
+ }
+
private Struct fetchResponse(ByteBuffer buffer, short error, long hw) {
FetchResponse response = new FetchResponse(Collections.singletonMap(tp, new FetchResponse.PartitionData(error,
hw, buffer)));
return response.toStruct();
http://git-wip-us.apache.org/repos/asf/kafka/blob/594b9639/core/src/test/scala/integration/kafka/api/ConsumerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/ConsumerTest.scala b/core/src/test/scala/integration/kafka/api/ConsumerTest.scala
index cca6e94..0c2755f 100644
--- a/core/src/test/scala/integration/kafka/api/ConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ConsumerTest.scala
@@ -186,6 +186,24 @@ class ConsumerTest extends IntegrationTestHarness with Logging {
assertNull(this.consumers(0).partitionsFor("non-exist-topic"))
}
+ def testListTopics() {
+ val numParts = 2
+ val topic1: String = "part-test-topic-1"
+ val topic2: String = "part-test-topic-2"
+ val topic3: String = "part-test-topic-3"
+ TestUtils.createTopic(this.zkClient, topic1, numParts, 1, this.servers)
+ TestUtils.createTopic(this.zkClient, topic2, numParts, 1, this.servers)
+ TestUtils.createTopic(this.zkClient, topic3, numParts, 1, this.servers)
+
+ val topics = this.consumers.head.listTopics()
+ assertNotNull(topics)
+ assertEquals(5, topics.size()) // NOTE(review): only 3 topics are created here; the other 2 presumably come from the test harness setup - confirm
+ assertEquals(5, topics.keySet().size())
+ assertEquals(2, topics.get(topic1).length) // each topic above was created with numParts = 2 partitions
+ assertEquals(2, topics.get(topic2).length)
+ assertEquals(2, topics.get(topic3).length)
+ }
+
def testPartitionReassignmentCallback() {
val callback = new TestConsumerReassignmentCallback()
this.consumerConfig.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "100"); //
timeout quickly to avoid slow test
|