kafka-commits mailing list archives

From guozh...@apache.org
Subject kafka git commit: KAFKA-2275: Add ListTopics() API to the Java consumer; reviewed by Jason Gustafson, Edward Ribeiro and Guozhang Wang
Date Tue, 28 Jul 2015 22:49:05 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 3df46bf4c -> 594b96393


KAFKA-2275: Add ListTopics() API to the Java consumer; reviewed by Jason Gustafson, Edward Ribeiro and Guozhang Wang


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/594b9639
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/594b9639
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/594b9639

Branch: refs/heads/trunk
Commit: 594b963930c2054199ed54203415d1cb7917df27
Parents: 3df46bf
Author: Ashish Singh <asingh@cloudera.com>
Authored: Tue Jul 28 15:49:22 2015 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Tue Jul 28 15:49:22 2015 -0700

----------------------------------------------------------------------
 .../org/apache/kafka/clients/ClientRequest.java | 29 +++++++++++--
 .../org/apache/kafka/clients/NetworkClient.java |  4 +-
 .../apache/kafka/clients/consumer/Consumer.java |  5 +++
 .../kafka/clients/consumer/KafkaConsumer.java   | 16 +++++++
 .../kafka/clients/consumer/MockConsumer.java    |  6 +++
 .../clients/consumer/internals/Fetcher.java     | 44 ++++++++++++++++++++
 .../clients/consumer/internals/FetcherTest.java | 13 ++++++
 .../integration/kafka/api/ConsumerTest.scala    | 18 ++++++++
 8 files changed, 130 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
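
For reviewers who want to see the new API from the application side, a minimal usage sketch follows (not part of this commit; the broker address, group id, and deserializer classes below are placeholder values):

    import java.util.List;
    import java.util.Map;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.PartitionInfo;

    public class ListTopicsExample {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");  // placeholder broker address
            props.put("group.id", "topic-lister");             // placeholder group id
            props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");

            // listTopics() issues a blocking metadata request to the cluster and
            // returns the available partitions of every topic in the cluster.
            try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
                Map<String, List<PartitionInfo>> topics = consumer.listTopics();
                for (Map.Entry<String, List<PartitionInfo>> entry : topics.entrySet())
                    System.out.println(entry.getKey() + ": " + entry.getValue().size() + " partition(s)");
            }
        }
    }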


http://git-wip-us.apache.org/repos/asf/kafka/blob/594b9639/clients/src/main/java/org/apache/kafka/clients/ClientRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/ClientRequest.java b/clients/src/main/java/org/apache/kafka/clients/ClientRequest.java
index ed4c0d9..dc8f0f1 100644
--- a/clients/src/main/java/org/apache/kafka/clients/ClientRequest.java
+++ b/clients/src/main/java/org/apache/kafka/clients/ClientRequest.java
@@ -23,6 +23,7 @@ public final class ClientRequest {
     private final boolean expectResponse;
     private final RequestSend request;
     private final RequestCompletionHandler callback;
+    private final boolean isInitiatedByNetworkClient;
 
     /**
      * @param createdMs The unix timestamp in milliseconds for the time at which this request was created.
@@ -30,17 +31,35 @@ public final class ClientRequest {
      * @param request The request
      * @param callback A callback to execute when the response has been received (or null if no callback is necessary)
      */
-    public ClientRequest(long createdMs, boolean expectResponse, RequestSend request, RequestCompletionHandler callback) {
+    public ClientRequest(long createdMs, boolean expectResponse, RequestSend request,
+                         RequestCompletionHandler callback) {
+        this(createdMs, expectResponse, request, callback, false);
+    }
+
+    /**
+     * @param createdMs The unix timestamp in milliseconds for the time at which this request was created.
+     * @param expectResponse Should we expect a response message or is this request complete once it is sent?
+     * @param request The request
+     * @param callback A callback to execute when the response has been received (or null if no callback is necessary)
+     * @param isInitiatedByNetworkClient Is request initiated by network client, if yes, its
+     *                                   response will be consumed by network client
+     */
+    public ClientRequest(long createdMs, boolean expectResponse, RequestSend request,
+                         RequestCompletionHandler callback, boolean isInitiatedByNetworkClient) {
         this.createdMs = createdMs;
         this.callback = callback;
         this.request = request;
         this.expectResponse = expectResponse;
+        this.isInitiatedByNetworkClient = isInitiatedByNetworkClient;
     }
 
     @Override
     public String toString() {
-        return "ClientRequest(expectResponse=" + expectResponse + ", callback=" + callback + ", request=" + request
-                + ")";
+        return "ClientRequest(expectResponse=" + expectResponse +
+            ", callback=" + callback +
+            ", request=" + request +
+            (isInitiatedByNetworkClient ? ", isInitiatedByNetworkClient" : "") +
+            ")";
     }
 
     public boolean expectResponse() {
@@ -63,4 +82,8 @@ public final class ClientRequest {
         return createdMs;
     }
 
+    public boolean isInitiatedByNetworkClient() {
+        return isInitiatedByNetworkClient;
+    }
+
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/594b9639/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
index 48fe796..0e51d7b 100644
--- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
@@ -378,7 +378,7 @@ public class NetworkClient implements KafkaClient {
             short apiKey = req.request().header().apiKey();
             Struct body = (Struct) ProtoUtils.currentResponseSchema(apiKey).read(receive.payload());
             correlate(req.request().header(), header);
-            if (apiKey == ApiKeys.METADATA.id) {
+            if (apiKey == ApiKeys.METADATA.id && req.isInitiatedByNetworkClient()) {
                 handleMetadataResponse(req.request().header(), body, now);
             } else {
                 // need to add body/header to response here
@@ -454,7 +454,7 @@ public class NetworkClient implements KafkaClient {
     private ClientRequest metadataRequest(long now, String node, Set<String> topics) {
         MetadataRequest metadata = new MetadataRequest(new ArrayList<String>(topics));
         RequestSend send = new RequestSend(node, nextRequestHeader(ApiKeys.METADATA), metadata.toStruct());
-        return new ClientRequest(now, true, send, null);
+        return new ClientRequest(now, true, send, null, true);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/594b9639/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
index 252b759..23e410b 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
@@ -114,6 +114,11 @@ public interface Consumer<K, V> extends Closeable {
     public List<PartitionInfo> partitionsFor(String topic);
 
     /**
+     * @see KafkaConsumer#listTopics()
+     */
+    public Map<String, List<PartitionInfo>> listTopics();
+
+    /**
      * @see KafkaConsumer#close()
      */
     public void close();

http://git-wip-us.apache.org/repos/asf/kafka/blob/594b9639/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index bea3d73..923ff99 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -1024,6 +1024,22 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
         }
     }
 
+    /**
+     * Get metadata about partitions for all topics. This method will issue a remote call to the
+     * server.
+     *
+     * @return The map of topics and its partitions
+     */
+    @Override
+    public Map<String, List<PartitionInfo>> listTopics() {
+        acquire();
+        try {
+            return fetcher.getAllTopics(requestTimeoutMs);
+        } finally {
+            release();
+        }
+    }
+
     @Override
     public void close() {
         acquire();

http://git-wip-us.apache.org/repos/asf/kafka/blob/594b9639/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
index c14eed1..5b22fa0 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
@@ -177,6 +177,12 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
             return parts;
     }
 
+    @Override
+    public Map<String, List<PartitionInfo>> listTopics() {
+        ensureNotClosed();
+        return partitions;
+    }
+
     public synchronized void updatePartitions(String topic, List<PartitionInfo> partitions) {
         ensureNotClosed();
         this.partitions.put(topic, partitions);

http://git-wip-us.apache.org/repos/asf/kafka/blob/594b9639/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index d2a0e2b..9f71451 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -39,6 +39,8 @@ import org.apache.kafka.common.requests.FetchRequest;
 import org.apache.kafka.common.requests.FetchResponse;
 import org.apache.kafka.common.requests.ListOffsetRequest;
 import org.apache.kafka.common.requests.ListOffsetResponse;
+import org.apache.kafka.common.requests.MetadataRequest;
+import org.apache.kafka.common.requests.MetadataResponse;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
@@ -160,6 +162,48 @@ public class Fetcher<K, V> {
         }
     }
 
+
+
+    /**
+     * Get metadata for all topics present in Kafka cluster
+     *
+     * @param timeout time for which getting all topics is attempted
+     * @return The map of topics and its partitions
+     */
+    public Map<String, List<PartitionInfo>> getAllTopics(long timeout) {
+        final HashMap<String, List<PartitionInfo>> topicsPartitionInfos = new HashMap<>();
+        long startTime = time.milliseconds();
+
+        while (time.milliseconds() - startTime < timeout) {
+            final Node node = client.leastLoadedNode();
+            if (node != null) {
+                MetadataRequest metadataRequest = new MetadataRequest(Collections.<String>emptyList());
+                final RequestFuture<ClientResponse> requestFuture =
+                    client.send(node, ApiKeys.METADATA, metadataRequest);
+
+                client.poll(requestFuture);
+
+                if (requestFuture.succeeded()) {
+                    MetadataResponse response =
+                        new MetadataResponse(requestFuture.value().responseBody());
+
+                    for (String topic : response.cluster().topics())
+                        topicsPartitionInfos.put(
+                            topic, response.cluster().availablePartitionsForTopic(topic));
+
+                    return topicsPartitionInfos;
+                }
+
+                if (!requestFuture.isRetriable())
+                    throw requestFuture.exception();
+            }
+
+            Utils.sleep(retryBackoffMs);
+        }
+
+        return topicsPartitionInfos;
+    }
+
     /**
      * Reset offsets for the given partition using the offset reset strategy.
      *

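One behavioural note on getAllTopics() above: if the timeout elapses before a metadata response is received, it returns the still-empty accumulator map rather than failing. A caller that needs to tell "no topics exist" apart from "metadata not yet available" could wrap listTopics() along these lines (hypothetical helper, not part of this commit; the 100 ms backoff is arbitrary):

    import java.util.List;
    import java.util.Map;

    import org.apache.kafka.clients.consumer.Consumer;
    import org.apache.kafka.common.PartitionInfo;

    public final class TopicListing {
        // Hypothetical helper: keep calling listTopics() until it returns a
        // non-empty map or the deadline passes.  An empty result can mean either
        // "no topics exist" or "no broker answered within the request timeout".
        public static Map<String, List<PartitionInfo>> listTopicsWithDeadline(Consumer<?, ?> consumer,
                                                                              long deadlineMs) {
            Map<String, List<PartitionInfo>> topics = consumer.listTopics();
            while (topics.isEmpty() && System.currentTimeMillis() < deadlineMs) {
                try {
                    Thread.sleep(100L);  // fixed backoff between attempts (illustrative)
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;
                }
                topics = consumer.listTopics();
            }
            return topics;
        }
    }
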
http://git-wip-us.apache.org/repos/asf/kafka/blob/594b9639/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index 4002679..06e2990 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -22,6 +22,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.OffsetResetStrategy;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.protocol.Errors;
@@ -29,6 +30,7 @@ import org.apache.kafka.common.protocol.types.Struct;
 import org.apache.kafka.common.record.CompressionType;
 import org.apache.kafka.common.record.MemoryRecords;
 import org.apache.kafka.common.requests.FetchResponse;
+import org.apache.kafka.common.requests.MetadataResponse;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.test.TestUtils;
@@ -180,6 +182,17 @@ public class FetcherTest {
         assertEquals(null, subscriptions.consumed(tp));
     }
 
+    @Test
+    public void testGetAllTopics() throws InterruptedException {
+        // sending response before request, as getAllTopics is a blocking call
+        client.prepareResponse(
+            new MetadataResponse(cluster, Collections.<String, Errors>emptyMap()).toStruct());
+
+        Map<String, List<PartitionInfo>> allTopics = fetcher.getAllTopics(5000L);
+
+        assertEquals(cluster.topics().size(), allTopics.size());
+    }
+
     private Struct fetchResponse(ByteBuffer buffer, short error, long hw) {
         FetchResponse response = new FetchResponse(Collections.singletonMap(tp, new FetchResponse.PartitionData(error, hw, buffer)));
         return response.toStruct();

http://git-wip-us.apache.org/repos/asf/kafka/blob/594b9639/core/src/test/scala/integration/kafka/api/ConsumerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/ConsumerTest.scala b/core/src/test/scala/integration/kafka/api/ConsumerTest.scala
index cca6e94..0c2755f 100644
--- a/core/src/test/scala/integration/kafka/api/ConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ConsumerTest.scala
@@ -186,6 +186,24 @@ class ConsumerTest extends IntegrationTestHarness with Logging {
     assertNull(this.consumers(0).partitionsFor("non-exist-topic"))
   }
 
+  def testListTopics() {
+    val numParts = 2
+    val topic1: String = "part-test-topic-1"
+    val topic2: String = "part-test-topic-2"
+    val topic3: String = "part-test-topic-3"
+    TestUtils.createTopic(this.zkClient, topic1, numParts, 1, this.servers)
+    TestUtils.createTopic(this.zkClient, topic2, numParts, 1, this.servers)
+    TestUtils.createTopic(this.zkClient, topic3, numParts, 1, this.servers)
+
+    val topics = this.consumers.head.listTopics()
+    assertNotNull(topics)
+    assertEquals(5, topics.size())
+    assertEquals(5, topics.keySet().size())
+    assertEquals(2, topics.get(topic1).length)
+    assertEquals(2, topics.get(topic2).length)
+    assertEquals(2, topics.get(topic3).length)
+  }
+
   def testPartitionReassignmentCallback() {
     val callback = new TestConsumerReassignmentCallback()
    this.consumerConfig.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "100"); // timeout quickly to avoid slow test

