kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [3/3] kafka git commit: KAFKA-2687: Add support for ListGroups and DescribeGroup APIs
Date Tue, 03 Nov 2015 22:40:27 GMT
KAFKA-2687: Add support for ListGroups and DescribeGroup APIs

Author: Jason Gustafson <jason@confluent.io>

Reviewers: Guozhang Wang, Jun Rao

Closes #388 from hachikuji/K2687


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/596c203a
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/596c203a
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/596c203a

Branch: refs/heads/trunk
Commit: 596c203af1f33360c04f4be7c466310d11343f78
Parents: f413143
Author: Jason Gustafson <jason@confluent.io>
Authored: Tue Nov 3 14:46:04 2015 -0800
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Tue Nov 3 14:46:04 2015 -0800

----------------------------------------------------------------------
 .../kafka/clients/consumer/ConsumerConfig.java  |  34 +--
 .../consumer/internals/AbstractCoordinator.java |  20 +-
 .../consumer/internals/ConsumerCoordinator.java |   2 +-
 .../consumer/internals/ConsumerProtocol.java    |   2 +
 .../kafka/clients/producer/ProducerConfig.java  |  70 +++---
 .../apache/kafka/common/config/ConfigDef.java   |  17 ++
 .../apache/kafka/common/config/SaslConfigs.java |   8 +
 .../apache/kafka/common/config/SslConfigs.java  |  17 ++
 .../apache/kafka/common/protocol/ApiKeys.java   |   6 +-
 .../apache/kafka/common/protocol/Errors.java    |   3 +-
 .../apache/kafka/common/protocol/Protocol.java  |  82 ++++++-
 .../kafka/common/protocol/types/Struct.java     |   8 +
 .../kafka/common/requests/AbstractRequest.java  |   8 +-
 .../common/requests/DescribeGroupsRequest.java  |  68 ++++++
 .../common/requests/DescribeGroupsResponse.java | 224 +++++++++++++++++
 .../requests/GroupCoordinatorRequest.java       |  65 +++++
 .../requests/GroupCoordinatorResponse.java      |  70 ++++++
 .../common/requests/GroupMetadataRequest.java   |  65 -----
 .../common/requests/GroupMetadataResponse.java  |  70 ------
 .../common/requests/ListGroupsRequest.java      |  57 +++++
 .../common/requests/ListGroupsResponse.java     | 107 ++++++++
 .../internals/ConsumerCoordinatorTest.java      |   4 +-
 .../common/requests/RequestResponseTest.java    |  45 +++-
 .../distributed/WorkerCoordinatorTest.java      |   4 +-
 .../main/scala/kafka/admin/AdminClient.scala    | 242 +++++++++++++++++++
 .../kafka/api/GroupCoordinatorRequest.scala     |  80 ++++++
 .../kafka/api/GroupCoordinatorResponse.scala    |  58 +++++
 .../scala/kafka/api/GroupMetadataRequest.scala  |  80 ------
 .../scala/kafka/api/GroupMetadataResponse.scala |  58 -----
 core/src/main/scala/kafka/api/RequestKeys.scala |  10 +-
 .../main/scala/kafka/client/ClientUtils.scala   |   4 +-
 .../main/scala/kafka/common/ErrorMapping.scala  |   1 +
 .../scala/kafka/consumer/SimpleConsumer.scala   |   4 +-
 .../kafka/coordinator/GroupCoordinator.scala    | 104 ++++++--
 .../scala/kafka/coordinator/GroupMetadata.scala |  36 ++-
 .../coordinator/GroupMetadataManager.scala      |  38 +--
 .../kafka/coordinator/MemberMetadata.scala      |  24 +-
 .../javaapi/GroupCoordinatorResponse.scala      |  47 ++++
 .../kafka/javaapi/GroupMetadataResponse.scala   |  47 ----
 .../scala/kafka/network/RequestChannel.scala    |   3 +-
 .../src/main/scala/kafka/server/KafkaApis.scala |  90 +++++--
 .../integration/kafka/api/AdminClientTest.scala | 114 +++++++++
 .../kafka/api/AuthorizerIntegrationTest.scala   |   8 +-
 .../scala/other/kafka/TestOffsetManager.scala   |   4 +-
 .../api/RequestResponseSerializationTest.scala  |  10 +-
 .../GroupCoordinatorResponseTest.scala          | 144 +++++++++--
 .../kafka/coordinator/GroupMetadataTest.scala   |  23 +-
 .../kafka/coordinator/MemberMetadataTest.scala  |  31 +--
 .../unit/kafka/server/OffsetCommitTest.scala    |   4 +-
 49 files changed, 1765 insertions(+), 555 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/596c203a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
index b366efd..4131352 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
@@ -17,8 +17,6 @@ import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.ConfigDef.Importance;
 import org.apache.kafka.common.config.ConfigDef.Type;
-import org.apache.kafka.common.config.SslConfigs;
-import org.apache.kafka.common.config.SaslConfigs;
 import org.apache.kafka.common.serialization.Deserializer;
 
 import java.util.HashMap;
@@ -286,27 +284,6 @@ public class ConsumerConfig extends AbstractConfig {
                                         Type.CLASS,
                                         Importance.HIGH,
                                         VALUE_DESERIALIZER_CLASS_DOC)
-                                .define(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, Type.STRING, CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL, Importance.MEDIUM, CommonClientConfigs.SECURITY_PROTOCOL_DOC)
-                                .define(SslConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, Type.CLASS, SslConfigs.DEFAULT_PRINCIPAL_BUILDER_CLASS, Importance.LOW, SslConfigs.PRINCIPAL_BUILDER_CLASS_DOC)
-                                .define(SslConfigs.SSL_PROTOCOL_CONFIG, Type.STRING, SslConfigs.DEFAULT_SSL_PROTOCOL, Importance.MEDIUM, SslConfigs.SSL_PROTOCOL_DOC)
-                                .define(SslConfigs.SSL_PROVIDER_CONFIG, Type.STRING, Importance.MEDIUM, SslConfigs.SSL_PROVIDER_DOC, false)
-                                .define(SslConfigs.SSL_CIPHER_SUITES_CONFIG, Type.LIST, Importance.LOW, SslConfigs.SSL_CIPHER_SUITES_DOC, false)
-                                .define(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, Type.LIST, SslConfigs.DEFAULT_SSL_ENABLED_PROTOCOLS, Importance.MEDIUM, SslConfigs.SSL_ENABLED_PROTOCOLS_DOC)
-                                .define(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, Type.STRING, SslConfigs.DEFAULT_SSL_KEYSTORE_TYPE, Importance.MEDIUM, SslConfigs.SSL_KEYSTORE_TYPE_DOC)
-                                .define(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, Type.STRING, Importance.HIGH, SslConfigs.SSL_KEYSTORE_LOCATION_DOC, false)
-                                .define(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, Type.STRING, Importance.HIGH, SslConfigs.SSL_KEYSTORE_PASSWORD_DOC, false)
-                                .define(SslConfigs.SSL_KEY_PASSWORD_CONFIG, Type.STRING, Importance.HIGH, SslConfigs.SSL_KEY_PASSWORD_DOC, false)
-                                .define(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, Type.STRING, SslConfigs.DEFAULT_SSL_TRUSTSTORE_TYPE, Importance.MEDIUM, SslConfigs.SSL_TRUSTSTORE_TYPE_DOC)
-                                .define(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, Type.STRING, Importance.HIGH, SslConfigs.SSL_TRUSTSTORE_LOCATION_DOC, false)
-                                .define(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, Type.STRING, Importance.HIGH, SslConfigs.SSL_TRUSTSTORE_PASSWORD_DOC, false)
-                                .define(SslConfigs.SSL_KEYMANAGER_ALGORITHM_CONFIG, Type.STRING, SslConfigs.DEFAULT_SSL_KEYMANGER_ALGORITHM, Importance.LOW, SslConfigs.SSL_KEYMANAGER_ALGORITHM_DOC)
-                                .define(SslConfigs.SSL_TRUSTMANAGER_ALGORITHM_CONFIG, Type.STRING, SslConfigs.DEFAULT_SSL_TRUSTMANAGER_ALGORITHM, Importance.LOW, SslConfigs.SSL_TRUSTMANAGER_ALGORITHM_DOC)
-                                .define(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, Type.STRING, Importance.LOW, SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_DOC, false)
-                                .define(SaslConfigs.SASL_KERBEROS_SERVICE_NAME, Type.STRING, Importance.MEDIUM, SaslConfigs.SASL_KERBEROS_SERVICE_NAME_DOC, false)
-                                .define(SaslConfigs.SASL_KERBEROS_KINIT_CMD, Type.STRING, SaslConfigs.DEFAULT_KERBEROS_KINIT_CMD, Importance.LOW, SaslConfigs.SASL_KERBEROS_KINIT_CMD_DOC)
-                                .define(SaslConfigs.SASL_KERBEROS_TICKET_RENEW_WINDOW_FACTOR, Type.DOUBLE, SaslConfigs.DEFAULT_KERBEROS_TICKET_RENEW_WINDOW_FACTOR, Importance.LOW, SaslConfigs.SASL_KERBEROS_TICKET_RENEW_WINDOW_FACTOR_DOC)
-                                .define(SaslConfigs.SASL_KERBEROS_TICKET_RENEW_JITTER, Type.DOUBLE, SaslConfigs.DEFAULT_KERBEROS_TICKET_RENEW_JITTER, Importance.LOW, SaslConfigs.SASL_KERBEROS_TICKET_RENEW_JITTER_DOC)
-                                .define(SaslConfigs.SASL_KERBEROS_MIN_TIME_BEFORE_RELOGIN, Type.LONG, SaslConfigs.DEFAULT_KERBEROS_MIN_TIME_BEFORE_RELOGIN, Importance.LOW, SaslConfigs.SASL_KERBEROS_MIN_TIME_BEFORE_RELOGIN_DOC)
                                 .define(REQUEST_TIMEOUT_MS_CONFIG,
                                         Type.INT,
                                         40 * 1000,
@@ -318,7 +295,16 @@ public class ConsumerConfig extends AbstractConfig {
                                         Type.LONG,
                                         9 * 60 * 1000,
                                         Importance.MEDIUM,
-                                        CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_DOC);
+                                        CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_DOC)
+
+                                // security support
+                                .define(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,
+                                        Type.STRING,
+                                        CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL,
+                                        Importance.MEDIUM,
+                                        CommonClientConfigs.SECURITY_PROTOCOL_DOC)
+                                .withClientSslSupport()
+                                .withClientSaslSupport();
 
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/596c203a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index d8f3c25..9cf825c 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -30,8 +30,8 @@ import org.apache.kafka.common.metrics.stats.Max;
 import org.apache.kafka.common.metrics.stats.Rate;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.requests.GroupMetadataRequest;
-import org.apache.kafka.common.requests.GroupMetadataResponse;
+import org.apache.kafka.common.requests.GroupCoordinatorRequest;
+import org.apache.kafka.common.requests.GroupCoordinatorResponse;
 import org.apache.kafka.common.requests.HeartbeatRequest;
 import org.apache.kafka.common.requests.HeartbeatResponse;
 import org.apache.kafka.common.requests.JoinGroupRequest;
@@ -450,8 +450,8 @@ public abstract class AbstractCoordinator {
         } else {
             // create a group  metadata request
             log.debug("Issuing group metadata request to broker {}", node.id());
-            GroupMetadataRequest metadataRequest = new GroupMetadataRequest(this.groupId);
-            return client.send(node, ApiKeys.GROUP_METADATA, metadataRequest)
+            GroupCoordinatorRequest metadataRequest = new GroupCoordinatorRequest(this.groupId);
+            return client.send(node, ApiKeys.GROUP_COORDINATOR, metadataRequest)
                     .compose(new RequestFutureAdapter<ClientResponse, Void>() {
                         @Override
                         public void onSuccess(ClientResponse response, RequestFuture<Void> future) {
@@ -472,14 +472,14 @@ public abstract class AbstractCoordinator {
             // We already found the coordinator, so ignore the request
             future.complete(null);
         } else {
-            GroupMetadataResponse groupMetadataResponse = new GroupMetadataResponse(resp.responseBody());
+            GroupCoordinatorResponse groupCoordinatorResponse = new GroupCoordinatorResponse(resp.responseBody());
             // use MAX_VALUE - node.id as the coordinator id to mimic separate connections
             // for the coordinator in the underlying network client layer
             // TODO: this needs to be better handled in KAFKA-1935
-            if (groupMetadataResponse.errorCode() == Errors.NONE.code()) {
-                this.coordinator = new Node(Integer.MAX_VALUE - groupMetadataResponse.node().id(),
-                        groupMetadataResponse.node().host(),
-                        groupMetadataResponse.node().port());
+            if (groupCoordinatorResponse.errorCode() == Errors.NONE.code()) {
+                this.coordinator = new Node(Integer.MAX_VALUE - groupCoordinatorResponse.node().id(),
+                        groupCoordinatorResponse.node().host(),
+                        groupCoordinatorResponse.node().port());
 
                 client.tryConnect(coordinator);
 
@@ -488,7 +488,7 @@ public abstract class AbstractCoordinator {
                     heartbeatTask.reset();
                 future.complete(null);
             } else {
-                future.raise(Errors.forCode(groupMetadataResponse.errorCode()));
+                future.raise(Errors.forCode(groupCoordinatorResponse.errorCode()));
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/596c203a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index 97d25c3..5b5d6ff 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -119,7 +119,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator implements Cl
 
     @Override
     public String protocolType() {
-        return "consumer";
+        return ConsumerProtocol.PROTOCOL_TYPE;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/596c203a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocol.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocol.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocol.java
index 0020993..4728a50 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocol.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocol.java
@@ -54,6 +54,8 @@ import java.util.Map;
  */
 public class ConsumerProtocol {
 
+    public static final String PROTOCOL_TYPE = "consumer";
+
     public static final String VERSION_KEY_NAME = "version";
     public static final String TOPICS_KEY_NAME = "topics";
     public static final String TOPIC_KEY_NAME = "topic";

http://git-wip-us.apache.org/repos/asf/kafka/blob/596c203a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
index e9d9fad..3eaea09 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
@@ -12,23 +12,22 @@
  */
 package org.apache.kafka.clients.producer;
 
-import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
-import static org.apache.kafka.common.config.ConfigDef.Range.between;
-import static org.apache.kafka.common.config.ConfigDef.ValidString.in;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Properties;
-
 import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.producer.internals.DefaultPartitioner;
 import org.apache.kafka.common.config.AbstractConfig;
-import org.apache.kafka.common.config.SslConfigs;
-import org.apache.kafka.common.config.SaslConfigs;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.ConfigDef.Importance;
 import org.apache.kafka.common.config.ConfigDef.Type;
 import org.apache.kafka.common.serialization.Serializer;
 
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
+import static org.apache.kafka.common.config.ConfigDef.Range.between;
+import static org.apache.kafka.common.config.ConfigDef.ValidString.in;
+
 /**
  * Configuration for the Kafka Producer. Documentation for these configurations can be found in the <a
  * href="http://kafka.apache.org/documentation.html#newproducerconfigs">Kafka documentation</a>
@@ -262,32 +261,33 @@ public class ProducerConfig extends AbstractConfig {
                                         atLeast(1),
                                         Importance.LOW,
                                         MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_DOC)
-                                .define(KEY_SERIALIZER_CLASS_CONFIG, Type.CLASS, Importance.HIGH, KEY_SERIALIZER_CLASS_DOC)
-                                .define(VALUE_SERIALIZER_CLASS_CONFIG, Type.CLASS, Importance.HIGH, VALUE_SERIALIZER_CLASS_DOC)
-                                .define(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, Type.STRING, CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL, Importance.MEDIUM, CommonClientConfigs.SECURITY_PROTOCOL_DOC)
-                                .define(SslConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, Type.CLASS, SslConfigs.DEFAULT_PRINCIPAL_BUILDER_CLASS, Importance.LOW, SslConfigs.PRINCIPAL_BUILDER_CLASS_DOC)
-                                .define(SslConfigs.SSL_PROTOCOL_CONFIG, Type.STRING, SslConfigs.DEFAULT_SSL_PROTOCOL, Importance.MEDIUM, SslConfigs.SSL_PROTOCOL_DOC)
-                                .define(SslConfigs.SSL_PROVIDER_CONFIG, Type.STRING, Importance.MEDIUM, SslConfigs.SSL_PROVIDER_DOC, false)
-                                .define(SslConfigs.SSL_CIPHER_SUITES_CONFIG, Type.LIST, Importance.LOW, SslConfigs.SSL_CIPHER_SUITES_DOC, false)
-                                .define(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, Type.LIST, SslConfigs.DEFAULT_SSL_ENABLED_PROTOCOLS, Importance.MEDIUM, SslConfigs.SSL_ENABLED_PROTOCOLS_DOC)
-                                .define(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, Type.STRING, SslConfigs.DEFAULT_SSL_KEYSTORE_TYPE, Importance.MEDIUM, SslConfigs.SSL_KEYSTORE_TYPE_DOC)
-                                .define(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, Type.STRING, Importance.HIGH, SslConfigs.SSL_KEYSTORE_LOCATION_DOC, false)
-                                .define(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, Type.STRING, Importance.HIGH, SslConfigs.SSL_KEYSTORE_PASSWORD_DOC, false)
-                                .define(SslConfigs.SSL_KEY_PASSWORD_CONFIG, Type.STRING, Importance.HIGH, SslConfigs.SSL_KEY_PASSWORD_DOC, false)
-                                .define(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, Type.STRING, SslConfigs.DEFAULT_SSL_TRUSTSTORE_TYPE, Importance.MEDIUM, SslConfigs.SSL_TRUSTSTORE_TYPE_DOC)
-                                .define(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, Type.STRING, Importance.HIGH, SslConfigs.SSL_TRUSTSTORE_LOCATION_DOC, false)
-                                .define(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, Type.STRING, Importance.HIGH, SslConfigs.SSL_TRUSTSTORE_PASSWORD_DOC, false)
-                                .define(SslConfigs.SSL_KEYMANAGER_ALGORITHM_CONFIG, Type.STRING, SslConfigs.DEFAULT_SSL_KEYMANGER_ALGORITHM, Importance.LOW, SslConfigs.SSL_KEYMANAGER_ALGORITHM_DOC)
-                                .define(SslConfigs.SSL_TRUSTMANAGER_ALGORITHM_CONFIG, Type.STRING, SslConfigs.DEFAULT_SSL_TRUSTMANAGER_ALGORITHM, Importance.LOW, SslConfigs.SSL_TRUSTMANAGER_ALGORITHM_DOC)
-                                .define(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, Type.STRING, Importance.LOW, SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_DOC, false)
-                                .define(SaslConfigs.SASL_KERBEROS_SERVICE_NAME, Type.STRING, Importance.MEDIUM, SaslConfigs.SASL_KERBEROS_SERVICE_NAME_DOC, false)
-                                .define(SaslConfigs.SASL_KERBEROS_KINIT_CMD, Type.STRING, SaslConfigs.DEFAULT_KERBEROS_KINIT_CMD, Importance.LOW, SaslConfigs.SASL_KERBEROS_KINIT_CMD_DOC)
-                                .define(SaslConfigs.SASL_KERBEROS_TICKET_RENEW_WINDOW_FACTOR, Type.DOUBLE, SaslConfigs.DEFAULT_KERBEROS_TICKET_RENEW_WINDOW_FACTOR, Importance.LOW, SaslConfigs.SASL_KERBEROS_TICKET_RENEW_WINDOW_FACTOR_DOC)
-                                .define(SaslConfigs.SASL_KERBEROS_TICKET_RENEW_JITTER, Type.DOUBLE, SaslConfigs.DEFAULT_KERBEROS_TICKET_RENEW_JITTER, Importance.LOW, SaslConfigs.SASL_KERBEROS_TICKET_RENEW_JITTER_DOC)
-                                .define(SaslConfigs.SASL_KERBEROS_MIN_TIME_BEFORE_RELOGIN, Type.LONG, SaslConfigs.DEFAULT_KERBEROS_MIN_TIME_BEFORE_RELOGIN, Importance.LOW, SaslConfigs.SASL_KERBEROS_MIN_TIME_BEFORE_RELOGIN_DOC)
+                                .define(KEY_SERIALIZER_CLASS_CONFIG,
+                                        Type.CLASS,
+                                        Importance.HIGH,
+                                        KEY_SERIALIZER_CLASS_DOC)
+                                .define(VALUE_SERIALIZER_CLASS_CONFIG,
+                                        Type.CLASS,
+                                        Importance.HIGH,
+                                        VALUE_SERIALIZER_CLASS_DOC)
                                 /* default is set to be a bit lower than the server default (10 min), to avoid both client and server closing connection at same time */
-                                .define(CONNECTIONS_MAX_IDLE_MS_CONFIG, Type.LONG, 9 * 60 * 1000, Importance.MEDIUM, CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_DOC)
-                                .define(PARTITIONER_CLASS_CONFIG, Type.CLASS, "org.apache.kafka.clients.producer.internals.DefaultPartitioner", Importance.MEDIUM, PARTITIONER_CLASS_DOC);
+                                .define(CONNECTIONS_MAX_IDLE_MS_CONFIG,
+                                        Type.LONG,
+                                        9 * 60 * 1000,
+                                        Importance.MEDIUM,
+                                        CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_DOC)
+                                .define(PARTITIONER_CLASS_CONFIG,
+                                        Type.CLASS,
+                                        DefaultPartitioner.class.getName(),
+                                        Importance.MEDIUM, PARTITIONER_CLASS_DOC)
+
+                                // security support
+                                .define(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,
+                                        Type.STRING,
+                                        CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL,
+                                        Importance.MEDIUM,
+                                        CommonClientConfigs.SECURITY_PROTOCOL_DOC)
+                                .withClientSslSupport()
+                                .withClientSaslSupport();
 
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/596c203a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
index 2e820dd..f01ed28 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
@@ -151,6 +151,23 @@ public class ConfigDef {
         return define(name, type, NO_DEFAULT_VALUE, null, importance, documentation, required);
     }
 
+    /**
+     * Add standard SSL client configuration options.
+     * @return this
+     */
+    public ConfigDef withClientSslSupport() {
+        SslConfigs.addClientSslSupport(this);
+        return this;
+    }
+
+    /**
+     * Add standard SASL client configuration options.
+     * @return this
+     */
+    public ConfigDef withClientSaslSupport() {
+        SaslConfigs.addClientSaslSupport(this);
+        return this;
+    }
 
     /**
      * Parse and validate configs against this configuration definition. The input is a map of configs. It is expected

http://git-wip-us.apache.org/repos/asf/kafka/blob/596c203a/clients/src/main/java/org/apache/kafka/common/config/SaslConfigs.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/config/SaslConfigs.java b/clients/src/main/java/org/apache/kafka/common/config/SaslConfigs.java
index 657c6d3..0046868 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/SaslConfigs.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/SaslConfigs.java
@@ -49,4 +49,12 @@ public class SaslConfigs {
             "By default, principal names of the form <username>/<hostname>@<REALM> are mapped to <username>.";
     public static final List<String> DEFAULT_SASL_KERBEROS_PRINCIPAL_TO_LOCAL_RULES = Collections.singletonList("DEFAULT");
 
+    public static void addClientSaslSupport(ConfigDef config) {
+        config.define(SaslConfigs.SASL_KERBEROS_SERVICE_NAME, ConfigDef.Type.STRING, ConfigDef.Importance.MEDIUM, SaslConfigs.SASL_KERBEROS_SERVICE_NAME_DOC, false)
+                .define(SaslConfigs.SASL_KERBEROS_KINIT_CMD, ConfigDef.Type.STRING, SaslConfigs.DEFAULT_KERBEROS_KINIT_CMD, ConfigDef.Importance.LOW, SaslConfigs.SASL_KERBEROS_KINIT_CMD_DOC)
+                .define(SaslConfigs.SASL_KERBEROS_TICKET_RENEW_WINDOW_FACTOR, ConfigDef.Type.DOUBLE, SaslConfigs.DEFAULT_KERBEROS_TICKET_RENEW_WINDOW_FACTOR, ConfigDef.Importance.LOW, SaslConfigs.SASL_KERBEROS_TICKET_RENEW_WINDOW_FACTOR_DOC)
+                .define(SaslConfigs.SASL_KERBEROS_TICKET_RENEW_JITTER, ConfigDef.Type.DOUBLE, SaslConfigs.DEFAULT_KERBEROS_TICKET_RENEW_JITTER, ConfigDef.Importance.LOW, SaslConfigs.SASL_KERBEROS_TICKET_RENEW_JITTER_DOC)
+                .define(SaslConfigs.SASL_KERBEROS_MIN_TIME_BEFORE_RELOGIN, ConfigDef.Type.LONG, SaslConfigs.DEFAULT_KERBEROS_MIN_TIME_BEFORE_RELOGIN, ConfigDef.Importance.LOW, SaslConfigs.SASL_KERBEROS_MIN_TIME_BEFORE_RELOGIN_DOC);
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/596c203a/clients/src/main/java/org/apache/kafka/common/config/SslConfigs.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/config/SslConfigs.java b/clients/src/main/java/org/apache/kafka/common/config/SslConfigs.java
index 60e1eb3..8f93301 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/SslConfigs.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/SslConfigs.java
@@ -96,4 +96,21 @@ public class SslConfigs {
                                            + " unlike requested , if this option is set client can choose not to provide authentication information about itself"
                                            + " <li><code>ssl.client.auth=none</code> This means client authentication is not needed.";
 
+    public static void addClientSslSupport(ConfigDef config) {
+        config.define(SslConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, ConfigDef.Type.CLASS, SslConfigs.DEFAULT_PRINCIPAL_BUILDER_CLASS, ConfigDef.Importance.LOW, SslConfigs.PRINCIPAL_BUILDER_CLASS_DOC)
+                .define(SslConfigs.SSL_PROTOCOL_CONFIG, ConfigDef.Type.STRING, SslConfigs.DEFAULT_SSL_PROTOCOL, ConfigDef.Importance.MEDIUM, SslConfigs.SSL_PROTOCOL_DOC)
+                .define(SslConfigs.SSL_PROVIDER_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.MEDIUM, SslConfigs.SSL_PROVIDER_DOC, false)
+                .define(SslConfigs.SSL_CIPHER_SUITES_CONFIG, ConfigDef.Type.LIST, ConfigDef.Importance.LOW, SslConfigs.SSL_CIPHER_SUITES_DOC, false)
+                .define(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, ConfigDef.Type.LIST, SslConfigs.DEFAULT_SSL_ENABLED_PROTOCOLS, ConfigDef.Importance.MEDIUM, SslConfigs.SSL_ENABLED_PROTOCOLS_DOC)
+                .define(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, ConfigDef.Type.STRING, SslConfigs.DEFAULT_SSL_KEYSTORE_TYPE, ConfigDef.Importance.MEDIUM, SslConfigs.SSL_KEYSTORE_TYPE_DOC)
+                .define(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, SslConfigs.SSL_KEYSTORE_LOCATION_DOC, false)
+                .define(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, SslConfigs.SSL_KEYSTORE_PASSWORD_DOC, false)
+                .define(SslConfigs.SSL_KEY_PASSWORD_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, SslConfigs.SSL_KEY_PASSWORD_DOC, false)
+                .define(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, ConfigDef.Type.STRING, SslConfigs.DEFAULT_SSL_TRUSTSTORE_TYPE, ConfigDef.Importance.MEDIUM, SslConfigs.SSL_TRUSTSTORE_TYPE_DOC)
+                .define(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, SslConfigs.SSL_TRUSTSTORE_LOCATION_DOC, false)
+                .define(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, SslConfigs.SSL_TRUSTSTORE_PASSWORD_DOC, false)
+                .define(SslConfigs.SSL_KEYMANAGER_ALGORITHM_CONFIG, ConfigDef.Type.STRING, SslConfigs.DEFAULT_SSL_KEYMANGER_ALGORITHM, ConfigDef.Importance.LOW, SslConfigs.SSL_KEYMANAGER_ALGORITHM_DOC)
+                .define(SslConfigs.SSL_TRUSTMANAGER_ALGORITHM_CONFIG, ConfigDef.Type.STRING, SslConfigs.DEFAULT_SSL_TRUSTMANAGER_ALGORITHM, ConfigDef.Importance.LOW, SslConfigs.SSL_TRUSTMANAGER_ALGORITHM_DOC)
+                .define(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.LOW, SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_DOC, false);
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/596c203a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
index af7b266..5bd3c96 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
@@ -30,11 +30,13 @@ public enum ApiKeys {
     CONTROLLED_SHUTDOWN_KEY(7, "ControlledShutdown"),
     OFFSET_COMMIT(8, "OffsetCommit"),
     OFFSET_FETCH(9, "OffsetFetch"),
-    GROUP_METADATA(10, "GroupMetadata"),
+    GROUP_COORDINATOR(10, "GroupCoordinator"),
     JOIN_GROUP(11, "JoinGroup"),
     HEARTBEAT(12, "Heartbeat"),
     LEAVE_GROUP(13, "LeaveGroup"),
-    SYNC_GROUP(14, "SyncGroup");
+    SYNC_GROUP(14, "SyncGroup"),
+    DESCRIBE_GROUPS(15, "DescribeGroups"),
+    LIST_GROUPS(16, "ListGroups");
 
     private static ApiKeys[] codeToType;
     public static final int MAX_API_KEY;

http://git-wip-us.apache.org/repos/asf/kafka/blob/596c203a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
index 2c9cb20..d4eb1f9 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
@@ -86,7 +86,8 @@ public enum Errors {
             new ApiException("The session timeout is not within an acceptable range.")),
     INVALID_COMMIT_OFFSET_SIZE(28,
             new ApiException("The committing offset data size is not valid")),
-    AUTHORIZATION_FAILED(29, new ApiException("Request is not authorized.")),
+    AUTHORIZATION_FAILED(29,
+            new ApiException("Request is not authorized.")),
     REBALANCE_IN_PROGRESS(30,
             new RebalanceInProgressException("The group is rebalancing, so a rejoin is needed."));
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/596c203a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
index 36094b0..00560db 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
@@ -388,18 +388,71 @@ public class Protocol {
     public static final Schema[] FETCH_REQUEST = new Schema[] {FETCH_REQUEST_V0, FETCH_REQUEST_V1};
     public static final Schema[] FETCH_RESPONSE = new Schema[] {FETCH_RESPONSE_V0, FETCH_RESPONSE_V1};
 
-    /* Consumer metadata api */
-    public static final Schema GROUP_METADATA_REQUEST_V0 = new Schema(new Field("group_id",
-                                                                                STRING,
-                                                                                "The unique group id."));
+    /* List groups api */
+    public static final Schema LIST_GROUPS_REQUEST_V0 = new Schema();
 
-    public static final Schema GROUP_METADATA_RESPONSE_V0 = new Schema(new Field("error_code", INT16),
-                                                                       new Field("coordinator",
-                                                                                 BROKER,
-                                                                                 "Host and port information for the coordinator for a consumer group."));
+    public static final Schema LIST_GROUPS_RESPONSE_GROUP_V0 = new Schema(new Field("group_id", STRING),
+                                                                          new Field("protocol_type", STRING));
+    public static final Schema LIST_GROUPS_RESPONSE_V0 = new Schema(new Field("error_code", INT16),
+                                                                    new Field("groups", new ArrayOf(LIST_GROUPS_RESPONSE_GROUP_V0)));
 
-    public static final Schema[] GROUP_METADATA_REQUEST = new Schema[] {GROUP_METADATA_REQUEST_V0};
-    public static final Schema[] GROUP_METADATA_RESPONSE = new Schema[] {GROUP_METADATA_RESPONSE_V0};
+    public static final Schema[] LIST_GROUPS_REQUEST = new Schema[] {LIST_GROUPS_REQUEST_V0};
+    public static final Schema[] LIST_GROUPS_RESPONSE = new Schema[] {LIST_GROUPS_RESPONSE_V0};
+
+    /* Describe group api */
+    public static final Schema DESCRIBE_GROUPS_REQUEST_V0 = new Schema(new Field("group_ids",
+                                                                                 new ArrayOf(STRING),
+                                                                                 "List of groupIds to request metadata for (an empty groupId array will return empty group metadata)."));
+
+    public static final Schema DESCRIBE_GROUPS_RESPONSE_MEMBER_V0 = new Schema(new Field("member_id",
+                                                                                         STRING,
+                                                                                         "The memberId assigned by the coordinator"),
+                                                                               new Field("client_id",
+                                                                                         STRING,
+                                                                                         "The client id used in the member's latest join group request"),
+                                                                               new Field("client_host",
+                                                                                         STRING,
+                                                                                         "The client host used in the request session corresponding to the member's join group."),
+                                                                               new Field("member_metadata",
+                                                                                         BYTES,
+                                                                                         "The metadata corresponding to the current group protocol in use (will only be present if the group is stable)."),
+                                                                               new Field("member_assignment",
+                                                                                         BYTES,
+                                                                                         "The current assignment provided by the group leader (will only be present if the group is stable)."));
+
+    public static final Schema DESCRIBE_GROUPS_RESPONSE_GROUP_METADATA_V0 = new Schema(new Field("error_code", INT16),
+                                                                                       new Field("group_id",
+                                                                                                 STRING),
+                                                                                       new Field("state",
+                                                                                                 STRING,
+                                                                                                 "The current state of the group (one of: Dead, Stable, AwaitingSync, or PreparingRebalance, or empty if there is no active group)"),
+                                                                                       new Field("protocol_type",
+                                                                                                 STRING,
+                                                                                                 "The current group protocol type (will be empty if the there is no active group)"),
+                                                                                       new Field("protocol",
+                                                                                                 STRING,
+                                                                                                 "The current group protocol (only provided if the group is Stable)"),
+                                                                                       new Field("members",
+                                                                                                 new ArrayOf(DESCRIBE_GROUPS_RESPONSE_MEMBER_V0),
+                                                                                                 "Current group members (only provided if the group is not Dead)"));
+
+    public static final Schema DESCRIBE_GROUPS_RESPONSE_V0 = new Schema(new Field("groups", new ArrayOf(DESCRIBE_GROUPS_RESPONSE_GROUP_METADATA_V0)));
+
+    public static final Schema[] DESCRIBE_GROUPS_REQUEST = new Schema[] {DESCRIBE_GROUPS_REQUEST_V0};
+    public static final Schema[] DESCRIBE_GROUPS_RESPONSE = new Schema[] {DESCRIBE_GROUPS_RESPONSE_V0};
+
+    /* Group coordinator api */
+    public static final Schema GROUP_COORDINATOR_REQUEST_V0 = new Schema(new Field("group_id",
+                                                                                   STRING,
+                                                                                   "The unique group id."));
+
+    public static final Schema GROUP_COORDINATOR_RESPONSE_V0 = new Schema(new Field("error_code", INT16),
+                                                                          new Field("coordinator",
+                                                                                    BROKER,
+                                                                                    "Host and port information for the coordinator for a consumer group."));
+
+    public static final Schema[] GROUP_COORDINATOR_REQUEST = new Schema[] {GROUP_COORDINATOR_REQUEST_V0};
+    public static final Schema[] GROUP_COORDINATOR_RESPONSE = new Schema[] {GROUP_COORDINATOR_RESPONSE_V0};
 
     /* Controlled shutdown api */
     public static final Schema CONTROLLED_SHUTDOWN_REQUEST_V1 = new Schema(new Field("broker_id",
@@ -616,12 +669,13 @@ public class Protocol {
         REQUESTS[ApiKeys.CONTROLLED_SHUTDOWN_KEY.id] = CONTROLLED_SHUTDOWN_REQUEST;
         REQUESTS[ApiKeys.OFFSET_COMMIT.id] = OFFSET_COMMIT_REQUEST;
         REQUESTS[ApiKeys.OFFSET_FETCH.id] = OFFSET_FETCH_REQUEST;
-        REQUESTS[ApiKeys.GROUP_METADATA.id] = GROUP_METADATA_REQUEST;
+        REQUESTS[ApiKeys.GROUP_COORDINATOR.id] = GROUP_COORDINATOR_REQUEST;
         REQUESTS[ApiKeys.JOIN_GROUP.id] = JOIN_GROUP_REQUEST;
         REQUESTS[ApiKeys.HEARTBEAT.id] = HEARTBEAT_REQUEST;
         REQUESTS[ApiKeys.LEAVE_GROUP.id] = LEAVE_GROUP_REQUEST;
         REQUESTS[ApiKeys.SYNC_GROUP.id] = SYNC_GROUP_REQUEST;
-
+        REQUESTS[ApiKeys.DESCRIBE_GROUPS.id] = DESCRIBE_GROUPS_REQUEST;
+        REQUESTS[ApiKeys.LIST_GROUPS.id] = LIST_GROUPS_REQUEST;
 
         RESPONSES[ApiKeys.PRODUCE.id] = PRODUCE_RESPONSE;
         RESPONSES[ApiKeys.FETCH.id] = FETCH_RESPONSE;
@@ -633,11 +687,13 @@ public class Protocol {
         RESPONSES[ApiKeys.CONTROLLED_SHUTDOWN_KEY.id] = CONTROLLED_SHUTDOWN_RESPONSE;
         RESPONSES[ApiKeys.OFFSET_COMMIT.id] = OFFSET_COMMIT_RESPONSE;
         RESPONSES[ApiKeys.OFFSET_FETCH.id] = OFFSET_FETCH_RESPONSE;
-        RESPONSES[ApiKeys.GROUP_METADATA.id] = GROUP_METADATA_RESPONSE;
+        RESPONSES[ApiKeys.GROUP_COORDINATOR.id] = GROUP_COORDINATOR_RESPONSE;
         RESPONSES[ApiKeys.JOIN_GROUP.id] = JOIN_GROUP_RESPONSE;
         RESPONSES[ApiKeys.HEARTBEAT.id] = HEARTBEAT_RESPONSE;
         RESPONSES[ApiKeys.LEAVE_GROUP.id] = LEAVE_GROUP_RESPONSE;
         RESPONSES[ApiKeys.SYNC_GROUP.id] = SYNC_GROUP_RESPONSE;
+        RESPONSES[ApiKeys.DESCRIBE_GROUPS.id] = DESCRIBE_GROUPS_RESPONSE;
+        RESPONSES[ApiKeys.LIST_GROUPS.id] = LIST_GROUPS_RESPONSE;
 
         /* set the maximum version of each api */
         for (ApiKeys api : ApiKeys.values())

http://git-wip-us.apache.org/repos/asf/kafka/blob/596c203a/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java b/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java
index ef2525e..54c3deb 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java
@@ -98,6 +98,14 @@ public class Struct {
         return (Struct) get(name);
     }
 
+    public Byte getByte(Field field) {
+        return (Byte) get(field);
+    }
+
+    public byte getByte(String name) {
+        return (Byte) get(name);
+    }
+
     public Short getShort(Field field) {
         return (Short) get(field);
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/596c203a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
index 03e77a5..8dfa3f6 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
@@ -49,8 +49,8 @@ public abstract class AbstractRequest extends AbstractRequestResponse {
                 return OffsetCommitRequest.parse(buffer, versionId);
             case OFFSET_FETCH:
                 return OffsetFetchRequest.parse(buffer, versionId);
-            case GROUP_METADATA:
-                return GroupMetadataRequest.parse(buffer, versionId);
+            case GROUP_COORDINATOR:
+                return GroupCoordinatorRequest.parse(buffer, versionId);
             case JOIN_GROUP:
                 return JoinGroupRequest.parse(buffer, versionId);
             case HEARTBEAT:
@@ -67,6 +67,10 @@ public abstract class AbstractRequest extends AbstractRequestResponse {
                 return UpdateMetadataRequest.parse(buffer, versionId);
             case LEADER_AND_ISR:
                 return LeaderAndIsrRequest.parse(buffer, versionId);
+            case DESCRIBE_GROUPS:
+                return DescribeGroupsRequest.parse(buffer, versionId);
+            case LIST_GROUPS:
+                return ListGroupsRequest.parse(buffer, versionId);
             default:
                 return null;
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/596c203a/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsRequest.java
new file mode 100644
index 0000000..a545cca
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsRequest.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.ProtoUtils;
+import org.apache.kafka.common.protocol.types.Schema;
+import org.apache.kafka.common.protocol.types.Struct;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+public class DescribeGroupsRequest extends AbstractRequest {
+    private static final Schema CURRENT_SCHEMA = ProtoUtils.currentRequestSchema(ApiKeys.DESCRIBE_GROUPS.id);
+    private static final String GROUP_IDS_KEY_NAME = "group_ids";
+
+    private final List<String> groupIds;
+
+    public DescribeGroupsRequest(List<String> groupIds) {
+        super(new Struct(CURRENT_SCHEMA));
+        struct.set(GROUP_IDS_KEY_NAME, groupIds.toArray());
+        this.groupIds = groupIds;
+    }
+
+    public DescribeGroupsRequest(Struct struct) {
+        super(struct);
+        this.groupIds = new ArrayList<>();
+        for (Object groupId : struct.getArray(GROUP_IDS_KEY_NAME))
+            this.groupIds.add((String) groupId);
+    }
+
+    public List<String> groupIds() {
+        return groupIds;
+    }
+
+    @Override
+    public AbstractRequestResponse getErrorResponse(int versionId, Throwable e) {
+        switch (versionId) {
+            case 0:
+                return DescribeGroupsResponse.fromError(Errors.forException(e), groupIds);
+
+            default:
+                throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
+                        versionId, this.getClass().getSimpleName(), ProtoUtils.latestVersion(ApiKeys.DESCRIBE_GROUPS.id)));
+        }
+    }
+
+    public static DescribeGroupsRequest parse(ByteBuffer buffer, int versionId) {
+        return new DescribeGroupsRequest(ProtoUtils.parseRequest(ApiKeys.DESCRIBE_GROUPS.id, versionId, buffer));
+    }
+
+    public static DescribeGroupsRequest parse(ByteBuffer buffer) {
+        return new DescribeGroupsRequest((Struct) CURRENT_SCHEMA.read(buffer));
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/596c203a/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java
new file mode 100644
index 0000000..c71e7d2
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java
@@ -0,0 +1,224 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.ProtoUtils;
+import org.apache.kafka.common.protocol.types.Schema;
+import org.apache.kafka.common.protocol.types.Struct;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class DescribeGroupsResponse extends AbstractRequestResponse {
+
+    private static final Schema CURRENT_SCHEMA = ProtoUtils.currentResponseSchema(ApiKeys.DESCRIBE_GROUPS.id);
+
+    private static final String GROUPS_KEY_NAME = "groups";
+
+    private static final String ERROR_CODE_KEY_NAME = "error_code";
+    private static final String GROUP_ID_KEY_NAME = "group_id";
+    private static final String GROUP_STATE_KEY_NAME = "state";
+    private static final String PROTOCOL_TYPE_KEY_NAME = "protocol_type";
+    private static final String PROTOCOL_KEY_NAME = "protocol";
+
+    private static final String MEMBERS_KEY_NAME = "members";
+    private static final String MEMBER_ID_KEY_NAME = "member_id";
+    private static final String CLIENT_ID_KEY_NAME = "client_id";
+    private static final String CLIENT_HOST_KEY_NAME = "client_host";
+    private static final String MEMBER_METADATA_KEY_NAME = "member_metadata";
+    private static final String MEMBER_ASSIGNMENT_KEY_NAME = "member_assignment";
+
+    public static final String UNKNOWN_STATE = "";
+    public static final String UNKNOWN_PROTOCOL_TYPE = "";
+    public static final String UNKNOWN_PROTOCOL = "";
+
+    /**
+     * Possible per-group error codes:
+     *
+     * GROUP_LOAD_IN_PROGRESS (14)
+     * GROUP_COORDINATOR_NOT_AVAILABLE (15)
+     * NOT_COORDINATOR_FOR_GROUP (16)
+     * AUTHORIZATION_FAILED (29)
+     */
+
+    private final Map<String, GroupMetadata> groups;
+
+    public DescribeGroupsResponse(Map<String, GroupMetadata> groups) {
+        super(new Struct(CURRENT_SCHEMA));
+
+        List<Struct> groupStructs = new ArrayList<>();
+        for (Map.Entry<String, GroupMetadata> groupEntry : groups.entrySet()) {
+            Struct groupStruct = struct.instance(GROUPS_KEY_NAME);
+            GroupMetadata group = groupEntry.getValue();
+            groupStruct.set(GROUP_ID_KEY_NAME, groupEntry.getKey());
+            groupStruct.set(ERROR_CODE_KEY_NAME, group.errorCode);
+            groupStruct.set(GROUP_STATE_KEY_NAME, group.state);
+            groupStruct.set(PROTOCOL_TYPE_KEY_NAME, group.protocolType);
+            groupStruct.set(PROTOCOL_KEY_NAME, group.protocol);
+            List<Struct> membersList = new ArrayList<>();
+            for (GroupMember member : group.members) {
+                Struct memberStruct = groupStruct.instance(MEMBERS_KEY_NAME);
+                memberStruct.set(MEMBER_ID_KEY_NAME, member.memberId);
+                memberStruct.set(CLIENT_ID_KEY_NAME, member.clientId);
+                memberStruct.set(CLIENT_HOST_KEY_NAME, member.clientHost);
+                memberStruct.set(MEMBER_METADATA_KEY_NAME, member.memberMetadata);
+                memberStruct.set(MEMBER_ASSIGNMENT_KEY_NAME, member.memberAssignment);
+                membersList.add(memberStruct);
+            }
+            groupStruct.set(MEMBERS_KEY_NAME, membersList.toArray());
+            groupStructs.add(groupStruct);
+        }
+        struct.set(GROUPS_KEY_NAME, groupStructs.toArray());
+        this.groups = groups;
+    }
+
+    public DescribeGroupsResponse(Struct struct) {
+        super(struct);
+        this.groups = new HashMap<>();
+        for (Object groupObj : struct.getArray(GROUPS_KEY_NAME)) {
+            Struct groupStruct = (Struct) groupObj;
+
+            String groupId = groupStruct.getString(GROUP_ID_KEY_NAME);
+            short errorCode = groupStruct.getShort(ERROR_CODE_KEY_NAME);
+            String state = groupStruct.getString(GROUP_STATE_KEY_NAME);
+            String protocolType = groupStruct.getString(PROTOCOL_TYPE_KEY_NAME);
+            String protocol = groupStruct.getString(PROTOCOL_KEY_NAME);
+
+            List<GroupMember> members = new ArrayList<>();
+            for (Object memberObj : groupStruct.getArray(MEMBERS_KEY_NAME)) {
+                Struct memberStruct = (Struct) memberObj;
+                String memberId = memberStruct.getString(MEMBER_ID_KEY_NAME);
+                String clientId = memberStruct.getString(CLIENT_ID_KEY_NAME);
+                String clientHost = memberStruct.getString(CLIENT_HOST_KEY_NAME);
+                ByteBuffer memberMetadata = memberStruct.getBytes(MEMBER_METADATA_KEY_NAME);
+                ByteBuffer memberAssignment = memberStruct.getBytes(MEMBER_ASSIGNMENT_KEY_NAME);
+                members.add(new GroupMember(memberId, clientId, clientHost,
+                        memberMetadata, memberAssignment));
+            }
+            this.groups.put(groupId, new GroupMetadata(errorCode, state, protocolType, protocol, members));
+        }
+    }
+
+    public Map<String, GroupMetadata> groups() {
+        return groups;
+    }
+
+
+    public static class GroupMetadata {
+        private final short errorCode;
+        private final String state;
+        private final String protocolType;
+        private final String protocol;
+        private final List<GroupMember> members;
+
+        public GroupMetadata(short errorCode,
+                             String state,
+                             String protocolType,
+                             String protocol,
+                             List<GroupMember> members) {
+            this.errorCode = errorCode;
+            this.state = state;
+            this.protocolType = protocolType;
+            this.protocol = protocol;
+            this.members = members;
+        }
+
+        public short errorCode() {
+            return errorCode;
+        }
+
+        public String state() {
+            return state;
+        }
+
+        public String protocolType() {
+            return protocolType;
+        }
+
+        public String protocol() {
+            return protocol;
+        }
+
+        public List<GroupMember> members() {
+            return members;
+        }
+
+        public static GroupMetadata forError(Errors error) {
+            return new DescribeGroupsResponse.GroupMetadata(
+                    error.code(),
+                    DescribeGroupsResponse.UNKNOWN_STATE,
+                    DescribeGroupsResponse.UNKNOWN_PROTOCOL_TYPE,
+                    DescribeGroupsResponse.UNKNOWN_PROTOCOL,
+                    Collections.<DescribeGroupsResponse.GroupMember>emptyList());
+        }
+    }
+
+    public static class GroupMember {
+        private final String memberId;
+        private final String clientId;
+        private final String clientHost;
+        private final ByteBuffer memberMetadata;
+        private final ByteBuffer memberAssignment;
+
+        public GroupMember(String memberId,
+                           String clientId,
+                           String clientHost,
+                           ByteBuffer memberMetadata,
+                           ByteBuffer memberAssignment) {
+            this.memberId = memberId;
+            this.clientId = clientId;
+            this.clientHost = clientHost;
+            this.memberMetadata = memberMetadata;
+            this.memberAssignment = memberAssignment;
+        }
+
+        public String memberId() {
+            return memberId;
+        }
+
+        public String clientId() {
+            return clientId;
+        }
+
+        public String clientHost() {
+            return clientHost;
+        }
+
+        public ByteBuffer memberMetadata() {
+            return memberMetadata;
+        }
+
+        public ByteBuffer memberAssignment() {
+            return memberAssignment;
+        }
+    }
+
+    public static DescribeGroupsResponse parse(ByteBuffer buffer) {
+        return new DescribeGroupsResponse((Struct) CURRENT_SCHEMA.read(buffer));
+    }
+
+    public static DescribeGroupsResponse fromError(Errors error, List<String> groupIds) {
+        GroupMetadata errorMetadata = GroupMetadata.forError(error);
+        Map<String, GroupMetadata> groups = new HashMap<>();
+        for (String groupId : groupIds)
+            groups.put(groupId, errorMetadata);
+        return new DescribeGroupsResponse(groups);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/596c203a/clients/src/main/java/org/apache/kafka/common/requests/GroupCoordinatorRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/GroupCoordinatorRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/GroupCoordinatorRequest.java
new file mode 100644
index 0000000..8c56e7f
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/GroupCoordinatorRequest.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.ProtoUtils;
+import org.apache.kafka.common.protocol.types.Schema;
+import org.apache.kafka.common.protocol.types.Struct;
+
+import java.nio.ByteBuffer;
+
+public class GroupCoordinatorRequest extends AbstractRequest {
+    
+    private static final Schema CURRENT_SCHEMA = ProtoUtils.currentRequestSchema(ApiKeys.GROUP_COORDINATOR.id);
+    private static final String GROUP_ID_KEY_NAME = "group_id";
+
+    private final String groupId;
+
+    public GroupCoordinatorRequest(String groupId) {
+        super(new Struct(CURRENT_SCHEMA));
+
+        struct.set(GROUP_ID_KEY_NAME, groupId);
+        this.groupId = groupId;
+    }
+
+    public GroupCoordinatorRequest(Struct struct) {
+        super(struct);
+        groupId = struct.getString(GROUP_ID_KEY_NAME);
+    }
+
+    @Override
+    public AbstractRequestResponse getErrorResponse(int versionId, Throwable e) {
+        switch (versionId) {
+            case 0:
+                return new GroupCoordinatorResponse(Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code(), Node.noNode());
+            default:
+                throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
+                        versionId, this.getClass().getSimpleName(), ProtoUtils.latestVersion(ApiKeys.GROUP_COORDINATOR.id)));
+        }
+    }
+
+    public String groupId() {
+        return groupId;
+    }
+
+    public static GroupCoordinatorRequest parse(ByteBuffer buffer, int versionId) {
+        return new GroupCoordinatorRequest(ProtoUtils.parseRequest(ApiKeys.GROUP_COORDINATOR.id, versionId, buffer));
+    }
+
+    public static GroupCoordinatorRequest parse(ByteBuffer buffer) {
+        return new GroupCoordinatorRequest((Struct) CURRENT_SCHEMA.read(buffer));
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/596c203a/clients/src/main/java/org/apache/kafka/common/requests/GroupCoordinatorResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/GroupCoordinatorResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/GroupCoordinatorResponse.java
new file mode 100644
index 0000000..c28de70
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/GroupCoordinatorResponse.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.ProtoUtils;
+import org.apache.kafka.common.protocol.types.Schema;
+import org.apache.kafka.common.protocol.types.Struct;
+
+import java.nio.ByteBuffer;
+
+public class GroupCoordinatorResponse extends AbstractRequestResponse {
+    
+    private static final Schema CURRENT_SCHEMA = ProtoUtils.currentResponseSchema(ApiKeys.GROUP_COORDINATOR.id);
+    private static final String ERROR_CODE_KEY_NAME = "error_code";
+    private static final String COORDINATOR_KEY_NAME = "coordinator";
+
+    // coordinator level field names
+    private static final String NODE_ID_KEY_NAME = "node_id";
+    private static final String HOST_KEY_NAME = "host";
+    private static final String PORT_KEY_NAME = "port";
+
+    private final short errorCode;
+    private final Node node;
+
+    public GroupCoordinatorResponse(short errorCode, Node node) {
+        super(new Struct(CURRENT_SCHEMA));
+        struct.set(ERROR_CODE_KEY_NAME, errorCode);
+        Struct coordinator = struct.instance(COORDINATOR_KEY_NAME);
+        coordinator.set(NODE_ID_KEY_NAME, node.id());
+        coordinator.set(HOST_KEY_NAME, node.host());
+        coordinator.set(PORT_KEY_NAME, node.port());
+        struct.set(COORDINATOR_KEY_NAME, coordinator);
+        this.errorCode = errorCode;
+        this.node = node;
+    }
+
+    public GroupCoordinatorResponse(Struct struct) {
+        super(struct);
+        errorCode = struct.getShort(ERROR_CODE_KEY_NAME);
+        Struct broker = (Struct) struct.get(COORDINATOR_KEY_NAME);
+        int nodeId = broker.getInt(NODE_ID_KEY_NAME);
+        String host = broker.getString(HOST_KEY_NAME);
+        int port = broker.getInt(PORT_KEY_NAME);
+        node = new Node(nodeId, host, port);
+    }
+
+    public short errorCode() {
+        return errorCode;
+    }
+
+    public Node node() {
+        return node;
+    }
+
+    public static GroupCoordinatorResponse parse(ByteBuffer buffer) {
+        return new GroupCoordinatorResponse((Struct) CURRENT_SCHEMA.read(buffer));
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/596c203a/clients/src/main/java/org/apache/kafka/common/requests/GroupMetadataRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/GroupMetadataRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/GroupMetadataRequest.java
deleted file mode 100644
index fd54c5a..0000000
--- a/clients/src/main/java/org/apache/kafka/common/requests/GroupMetadataRequest.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.kafka.common.requests;
-
-import org.apache.kafka.common.Node;
-import org.apache.kafka.common.protocol.ApiKeys;
-import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.protocol.ProtoUtils;
-import org.apache.kafka.common.protocol.types.Schema;
-import org.apache.kafka.common.protocol.types.Struct;
-
-import java.nio.ByteBuffer;
-
-public class GroupMetadataRequest extends AbstractRequest {
-    
-    private static final Schema CURRENT_SCHEMA = ProtoUtils.currentRequestSchema(ApiKeys.GROUP_METADATA.id);
-    private static final String GROUP_ID_KEY_NAME = "group_id";
-
-    private final String groupId;
-
-    public GroupMetadataRequest(String groupId) {
-        super(new Struct(CURRENT_SCHEMA));
-
-        struct.set(GROUP_ID_KEY_NAME, groupId);
-        this.groupId = groupId;
-    }
-
-    public GroupMetadataRequest(Struct struct) {
-        super(struct);
-        groupId = struct.getString(GROUP_ID_KEY_NAME);
-    }
-
-    @Override
-    public AbstractRequestResponse getErrorResponse(int versionId, Throwable e) {
-        switch (versionId) {
-            case 0:
-                return new GroupMetadataResponse(Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code(), Node.noNode());
-            default:
-                throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
-                        versionId, this.getClass().getSimpleName(), ProtoUtils.latestVersion(ApiKeys.GROUP_METADATA.id)));
-        }
-    }
-
-    public String groupId() {
-        return groupId;
-    }
-
-    public static GroupMetadataRequest parse(ByteBuffer buffer, int versionId) {
-        return new GroupMetadataRequest(ProtoUtils.parseRequest(ApiKeys.GROUP_METADATA.id, versionId, buffer));
-    }
-
-    public static GroupMetadataRequest parse(ByteBuffer buffer) {
-        return new GroupMetadataRequest((Struct) CURRENT_SCHEMA.read(buffer));
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/596c203a/clients/src/main/java/org/apache/kafka/common/requests/GroupMetadataResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/GroupMetadataResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/GroupMetadataResponse.java
deleted file mode 100644
index a5ef478..0000000
--- a/clients/src/main/java/org/apache/kafka/common/requests/GroupMetadataResponse.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.kafka.common.requests;
-
-import org.apache.kafka.common.Node;
-import org.apache.kafka.common.protocol.ApiKeys;
-import org.apache.kafka.common.protocol.ProtoUtils;
-import org.apache.kafka.common.protocol.types.Schema;
-import org.apache.kafka.common.protocol.types.Struct;
-
-import java.nio.ByteBuffer;
-
-public class GroupMetadataResponse extends AbstractRequestResponse {
-    
-    private static final Schema CURRENT_SCHEMA = ProtoUtils.currentResponseSchema(ApiKeys.GROUP_METADATA.id);
-    private static final String ERROR_CODE_KEY_NAME = "error_code";
-    private static final String COORDINATOR_KEY_NAME = "coordinator";
-
-    // coordinator level field names
-    private static final String NODE_ID_KEY_NAME = "node_id";
-    private static final String HOST_KEY_NAME = "host";
-    private static final String PORT_KEY_NAME = "port";
-
-    private final short errorCode;
-    private final Node node;
-
-    public GroupMetadataResponse(short errorCode, Node node) {
-        super(new Struct(CURRENT_SCHEMA));
-        struct.set(ERROR_CODE_KEY_NAME, errorCode);
-        Struct coordinator = struct.instance(COORDINATOR_KEY_NAME);
-        coordinator.set(NODE_ID_KEY_NAME, node.id());
-        coordinator.set(HOST_KEY_NAME, node.host());
-        coordinator.set(PORT_KEY_NAME, node.port());
-        struct.set(COORDINATOR_KEY_NAME, coordinator);
-        this.errorCode = errorCode;
-        this.node = node;
-    }
-
-    public GroupMetadataResponse(Struct struct) {
-        super(struct);
-        errorCode = struct.getShort(ERROR_CODE_KEY_NAME);
-        Struct broker = (Struct) struct.get(COORDINATOR_KEY_NAME);
-        int nodeId = broker.getInt(NODE_ID_KEY_NAME);
-        String host = broker.getString(HOST_KEY_NAME);
-        int port = broker.getInt(PORT_KEY_NAME);
-        node = new Node(nodeId, host, port);
-    }
-
-    public short errorCode() {
-        return errorCode;
-    }
-
-    public Node node() {
-        return node;
-    }
-
-    public static GroupMetadataResponse parse(ByteBuffer buffer) {
-        return new GroupMetadataResponse((Struct) CURRENT_SCHEMA.read(buffer));
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/596c203a/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsRequest.java
new file mode 100644
index 0000000..439720f
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsRequest.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.ProtoUtils;
+import org.apache.kafka.common.protocol.types.Schema;
+import org.apache.kafka.common.protocol.types.Struct;
+
+import java.nio.ByteBuffer;
+import java.util.Collections;
+
+public class ListGroupsRequest extends AbstractRequest {
+
+    private static final Schema CURRENT_SCHEMA = ProtoUtils.currentRequestSchema(ApiKeys.LIST_GROUPS.id);
+
+    public ListGroupsRequest() {
+        super(new Struct(CURRENT_SCHEMA));
+    }
+
+    public ListGroupsRequest(Struct struct) {
+        super(struct);
+    }
+
+    @Override
+    public AbstractRequestResponse getErrorResponse(int versionId, Throwable e) {
+        switch (versionId) {
+            case 0:
+                short errorCode = Errors.forException(e).code();
+                return new ListGroupsResponse(errorCode, Collections.<ListGroupsResponse.Group>emptyList());
+            default:
+                throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
+                        versionId, this.getClass().getSimpleName(), ProtoUtils.latestVersion(ApiKeys.LIST_GROUPS.id)));
+        }
+    }
+
+    public static ListGroupsRequest parse(ByteBuffer buffer, int versionId) {
+        return new ListGroupsRequest(ProtoUtils.parseRequest(ApiKeys.LIST_GROUPS.id, versionId, buffer));
+    }
+
+    public static ListGroupsRequest parse(ByteBuffer buffer) {
+        return new ListGroupsRequest((Struct) CURRENT_SCHEMA.read(buffer));
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/596c203a/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsResponse.java
new file mode 100644
index 0000000..d07f0d1
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsResponse.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.ProtoUtils;
+import org.apache.kafka.common.protocol.types.Schema;
+import org.apache.kafka.common.protocol.types.Struct;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+public class ListGroupsResponse extends AbstractRequestResponse {
+
+    private static final Schema CURRENT_SCHEMA = ProtoUtils.currentResponseSchema(ApiKeys.LIST_GROUPS.id);
+
+    public static final String ERROR_CODE_KEY_NAME = "error_code";
+    public static final String GROUPS_KEY_NAME = "groups";
+    public static final String GROUP_ID_KEY_NAME = "group_id";
+    public static final String PROTOCOL_TYPE_KEY_NAME = "protocol_type";
+
+    /**
+     * Possible error codes:
+     *
+     * GROUP_COORDINATOR_NOT_AVAILABLE (15)
+     * AUTHORIZATION_FAILED (29)
+     */
+
+    private final short errorCode;
+    private final List<Group> groups;
+
+    public ListGroupsResponse(short errorCode, List<Group> groups) {
+        super(new Struct(CURRENT_SCHEMA));
+        struct.set(ERROR_CODE_KEY_NAME, errorCode);
+        List<Struct> groupList = new ArrayList<>();
+        for (Group group : groups) {
+            Struct groupStruct = struct.instance(GROUPS_KEY_NAME);
+            groupStruct.set(GROUP_ID_KEY_NAME, group.groupId);
+            groupStruct.set(PROTOCOL_TYPE_KEY_NAME, group.protocolType);
+            groupList.add(groupStruct);
+        }
+        struct.set(GROUPS_KEY_NAME, groupList.toArray());
+        this.errorCode = errorCode;
+        this.groups = groups;
+    }
+
+    public ListGroupsResponse(Struct struct) {
+        super(struct);
+        this.errorCode = struct.getShort(ERROR_CODE_KEY_NAME);
+        this.groups = new ArrayList<>();
+        for (Object groupObj : struct.getArray(GROUPS_KEY_NAME)) {
+            Struct groupStruct = (Struct) groupObj;
+            String groupId = groupStruct.getString(GROUP_ID_KEY_NAME);
+            String protocolType = groupStruct.getString(PROTOCOL_TYPE_KEY_NAME);
+            this.groups.add(new Group(groupId, protocolType));
+        }
+    }
+
+    public List<Group> groups() {
+        return groups;
+    }
+
+    public short errorCode() {
+        return errorCode;
+    }
+
+    public static class Group {
+        private final String groupId;
+        private final String protocolType;
+
+        public Group(String groupId, String protocolType) {
+            this.groupId = groupId;
+            this.protocolType = protocolType;
+        }
+
+        public String groupId() {
+            return groupId;
+        }
+
+        public String protocolType() {
+            return protocolType;
+        }
+
+    }
+
+    public static ListGroupsResponse parse(ByteBuffer buffer) {
+        return new ListGroupsResponse((Struct) CURRENT_SCHEMA.read(buffer));
+    }
+
+    public static ListGroupsResponse fromError(Errors error) {
+        return new ListGroupsResponse(error.code(), Collections.<Group>emptyList());
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/596c203a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index 2029e92..8667f22 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -32,7 +32,7 @@ import org.apache.kafka.common.errors.OffsetMetadataTooLarge;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.types.Struct;
-import org.apache.kafka.common.requests.GroupMetadataResponse;
+import org.apache.kafka.common.requests.GroupCoordinatorResponse;
 import org.apache.kafka.common.requests.HeartbeatResponse;
 import org.apache.kafka.common.requests.JoinGroupResponse;
 import org.apache.kafka.common.requests.OffsetCommitRequest;
@@ -729,7 +729,7 @@ public class ConsumerCoordinatorTest {
     }
 
     private Struct consumerMetadataResponse(Node node, short error) {
-        GroupMetadataResponse response = new GroupMetadataResponse(error, node);
+        GroupCoordinatorResponse response = new GroupCoordinatorResponse(error, node);
         return response.toStruct();
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/596c203a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index fb21802..761b9db 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -28,6 +28,7 @@ import java.lang.reflect.Method;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -43,9 +44,9 @@ public class RequestResponseTest {
         List<AbstractRequestResponse> requestResponseList = Arrays.asList(
                 createRequestHeader(),
                 createResponseHeader(),
-                createConsumerMetadataRequest(),
-                createConsumerMetadataRequest().getErrorResponse(0, new UnknownServerException()),
-                createConsumerMetadataResponse(),
+                createGroupCoordinatorRequest(),
+                createGroupCoordinatorRequest().getErrorResponse(0, new UnknownServerException()),
+                createGroupCoordinatorResponse(),
                 createControlledShutdownRequest(),
                 createControlledShutdownResponse(),
                 createControlledShutdownRequest().getErrorResponse(1, new UnknownServerException()),
@@ -61,6 +62,12 @@ public class RequestResponseTest {
                 createLeaveGroupRequest(),
                 createLeaveGroupRequest().getErrorResponse(0, new UnknownServerException()),
                 createLeaveGroupResponse(),
+                createListGroupsRequest(),
+                createListGroupsRequest().getErrorResponse(0, new UnknownServerException()),
+                createListGroupsResponse(),
+                createDescribeGroupRequest(),
+                createDescribeGroupRequest().getErrorResponse(0, new UnknownServerException()),
+                createDescribeGroupResponse(),
                 createListOffsetRequest(),
                 createListOffsetRequest().getErrorResponse(0, new UnknownServerException()),
                 createListOffsetResponse(),
@@ -150,12 +157,12 @@ public class RequestResponseTest {
         return new ResponseHeader(10);
     }
 
-    private AbstractRequest createConsumerMetadataRequest() {
-        return new GroupMetadataRequest("test-group");
+    private AbstractRequest createGroupCoordinatorRequest() {
+        return new GroupCoordinatorRequest("test-group");
     }
 
-    private AbstractRequestResponse createConsumerMetadataResponse() {
-        return new GroupMetadataResponse(Errors.NONE.code(), new Node(10, "host1", 2014));
+    private AbstractRequestResponse createGroupCoordinatorResponse() {
+        return new GroupCoordinatorResponse(Errors.NONE.code(), new Node(10, "host1", 2014));
     }
 
     private AbstractRequest createFetchRequest() {
@@ -193,6 +200,30 @@ public class RequestResponseTest {
         return new JoinGroupResponse(Errors.NONE.code(), 1, "range", "consumer1", "leader", members);
     }
 
+    private AbstractRequest createListGroupsRequest() {
+        return new ListGroupsRequest();
+    }
+
+    private AbstractRequestResponse createListGroupsResponse() {
+        List<ListGroupsResponse.Group> groups = Arrays.asList(new ListGroupsResponse.Group("test-group", "consumer"));
+        return new ListGroupsResponse(Errors.NONE.code(), groups);
+    }
+
+    private AbstractRequest createDescribeGroupRequest() {
+        return new DescribeGroupsRequest(Collections.singletonList("test-group"));
+    }
+
+    private AbstractRequestResponse createDescribeGroupResponse() {
+        String clientId = "consumer-1";
+        String clientHost = "localhost";
+        ByteBuffer empty = ByteBuffer.allocate(0);
+        DescribeGroupsResponse.GroupMember member = new DescribeGroupsResponse.GroupMember("memberId",
+                clientId, clientHost, empty, empty);
+        DescribeGroupsResponse.GroupMetadata metadata = new DescribeGroupsResponse.GroupMetadata(Errors.NONE.code(),
+                "STABLE", "consumer", "roundrobin", Arrays.asList(member));
+        return new DescribeGroupsResponse(Collections.singletonMap("test-group", metadata));
+    }
+
     private AbstractRequest createLeaveGroupRequest() {
         return new LeaveGroupRequest("group1", "consumer1");
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/596c203a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/distributed/WorkerCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/distributed/WorkerCoordinatorTest.java b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/distributed/WorkerCoordinatorTest.java
index ca53674..ac9df44 100644
--- a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/distributed/WorkerCoordinatorTest.java
+++ b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/distributed/WorkerCoordinatorTest.java
@@ -25,7 +25,7 @@ import org.apache.kafka.common.Node;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.types.Struct;
-import org.apache.kafka.common.requests.GroupMetadataResponse;
+import org.apache.kafka.common.requests.GroupCoordinatorResponse;
 import org.apache.kafka.common.requests.JoinGroupResponse;
 import org.apache.kafka.common.requests.SyncGroupRequest;
 import org.apache.kafka.common.requests.SyncGroupResponse;
@@ -386,7 +386,7 @@ public class WorkerCoordinatorTest {
 
 
     private Struct groupMetadataResponse(Node node, short error) {
-        GroupMetadataResponse response = new GroupMetadataResponse(error, node);
+        GroupCoordinatorResponse response = new GroupCoordinatorResponse(error, node);
         return response.toStruct();
     }
 


Mime
View raw message