kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ij...@apache.org
Subject [2/2] kafka git commit: KAFKA-5043; Rename GroupCoordinator to FindCoordinator (KIP-98)
Date Tue, 11 Apr 2017 08:31:12 GMT
KAFKA-5043; Rename GroupCoordinator to FindCoordinator (KIP-98)

Also:
1. FindCoordinator is more general and takes a coordinator_type
so that it can be used for the group and transaction coordinators.
2. Include an error message in FindCoordinatorResponse to make the
errors at the client side more informative. This PR only adds the
field to the protocol; a subsequent PR will update the code to
use it.
3. Rename `Errors` names for FindCoordinator to be more generic. This
is a compatible change as the ids remain the same.
4. Since the exception classes for the error codes are in a public
package, we introduce new ones and deprecate the old ones.
The classes were not thrown back to the user (KAFKA-5052 aside),
so this is a compatible change.
5. Update InitPidRequest for transactions. Since this protocol API
was introduced recently and is not used by default, we did not bump
its version.

Author: Apurva Mehta <apurva@confluent.io>

Reviewers: Ismael Juma <ismael@juma.me.uk>

Closes #2825 from apurvam/exactly-once-rpc-stubs


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/d0e7c6b9
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/d0e7c6b9
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/d0e7c6b9

Branch: refs/heads/trunk
Commit: d0e7c6b9304b23ced046934c799df0cba39c28e5
Parents: 749e9e1
Author: Apurva Mehta <apurva@confluent.io>
Authored: Tue Apr 11 09:05:09 2017 +0100
Committer: Ismael Juma <ismael@juma.me.uk>
Committed: Tue Apr 11 09:11:46 2017 +0100

----------------------------------------------------------------------
 .../RetriableCommitFailedException.java         |   7 +-
 .../consumer/internals/AbstractCoordinator.java |  36 ++---
 .../consumer/internals/ConsumerCoordinator.java |  15 +-
 .../consumer/internals/RequestFuture.java       |   2 +-
 .../CoordinatorLoadInProgressException.java     |  39 +++++
 .../CoordinatorNotAvailableException.java       |  43 ++++++
 .../GroupCoordinatorNotAvailableException.java  |   5 +-
 .../errors/GroupLoadInProgressException.java    |   5 +-
 .../errors/InvalidTxnTimeoutException.java      |  33 +++++
 .../common/errors/NotCoordinatorException.java  |  38 +++++
 .../errors/NotCoordinatorForGroupException.java |   5 +-
 .../apache/kafka/common/protocol/ApiKeys.java   |   2 +-
 .../apache/kafka/common/protocol/Errors.java    |  24 +--
 .../apache/kafka/common/protocol/Protocol.java  |  57 +++++---
 .../kafka/common/requests/AbstractRequest.java  |   4 +-
 .../kafka/common/requests/AbstractResponse.java |   4 +-
 .../common/requests/DescribeGroupsResponse.java |   6 +-
 .../common/requests/FindCoordinatorRequest.java | 145 +++++++++++++++++++
 .../requests/FindCoordinatorResponse.java       |  95 ++++++++++++
 .../requests/GroupCoordinatorRequest.java       |  89 ------------
 .../requests/GroupCoordinatorResponse.java      |  85 -----------
 .../common/requests/HeartbeatResponse.java      |   2 +-
 .../kafka/common/requests/InitPidRequest.java   |  31 +++-
 .../common/requests/JoinGroupResponse.java      |   4 +-
 .../common/requests/ListGroupsResponse.java     |   2 +-
 .../common/requests/OffsetCommitResponse.java   |   4 +-
 .../common/requests/OffsetFetchResponse.java    |   6 +-
 .../common/requests/SyncGroupResponse.java      |   2 +-
 .../clients/consumer/KafkaConsumerTest.java     |  14 +-
 .../internals/AbstractCoordinatorTest.java      |   6 +-
 .../internals/ConsumerCoordinatorTest.java      |  42 +++---
 .../common/requests/RequestResponseTest.java    |  30 ++--
 .../distributed/WorkerCoordinatorTest.java      |   6 +-
 .../kafka/connect/util/KafkaBasedLogTest.java   |   2 +-
 .../main/scala/kafka/admin/AdminClient.scala    |  10 +-
 .../kafka/api/GroupCoordinatorRequest.scala     |   4 +-
 .../kafka/common/OffsetMetadataAndError.scala   |   6 +-
 .../consumer/ZookeeperConsumerConnector.scala   |   8 +-
 .../kafka/coordinator/GroupCoordinator.scala    |  62 ++++----
 .../coordinator/GroupMetadataManager.scala      |  12 +-
 .../coordinator/TransactionCoordinator.scala    |   2 +-
 .../src/main/scala/kafka/server/KafkaApis.scala |  42 ++++--
 .../kafka/api/AuthorizerIntegrationTest.scala   |  12 +-
 .../kafka/admin/DescribeConsumerGroupTest.scala |  43 +++---
 .../api/RequestResponseSerializationTest.scala  |   2 +-
 .../GroupCoordinatorResponseTest.scala          |  12 +-
 .../coordinator/GroupMetadataManagerTest.scala  |  20 +--
 47 files changed, 717 insertions(+), 408 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/d0e7c6b9/clients/src/main/java/org/apache/kafka/clients/consumer/RetriableCommitFailedException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/RetriableCommitFailedException.java b/clients/src/main/java/org/apache/kafka/clients/consumer/RetriableCommitFailedException.java
index 510362a..69f21a4 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/RetriableCommitFailedException.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/RetriableCommitFailedException.java
@@ -22,12 +22,17 @@ public class RetriableCommitFailedException extends RetriableException {
 
     private static final long serialVersionUID = 1L;
 
+    public static RetriableCommitFailedException withUnderlyingMessage(String additionalMessage) {
+        return new RetriableCommitFailedException("Offset commit failed with a retriable exception. " +
+                "You should retry committing offsets. The underlying error was: " + additionalMessage);
+    }
+
     public RetriableCommitFailedException(Throwable t) {
         super("Offset commit failed with a retriable exception. You should retry committing offsets.", t);
     }
 
     public RetriableCommitFailedException(String message) {
-        super("Offset commit failed with a retriable exception. You should retry committing offsets. The underlying error was: " + message);
+        super(message);
     }
 
     public RetriableCommitFailedException(String message, Throwable t) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0e7c6b9/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index ffafddc..d0de4bb 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -21,7 +21,7 @@ import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.errors.DisconnectException;
 import org.apache.kafka.common.errors.GroupAuthorizationException;
-import org.apache.kafka.common.errors.GroupCoordinatorNotAvailableException;
+import org.apache.kafka.common.errors.CoordinatorNotAvailableException;
 import org.apache.kafka.common.errors.IllegalGenerationException;
 import org.apache.kafka.common.errors.RebalanceInProgressException;
 import org.apache.kafka.common.errors.RetriableException;
@@ -35,8 +35,8 @@ import org.apache.kafka.common.metrics.stats.Count;
 import org.apache.kafka.common.metrics.stats.Max;
 import org.apache.kafka.common.metrics.stats.Rate;
 import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.requests.GroupCoordinatorRequest;
-import org.apache.kafka.common.requests.GroupCoordinatorResponse;
+import org.apache.kafka.common.requests.FindCoordinatorRequest;
+import org.apache.kafka.common.requests.FindCoordinatorResponse;
 import org.apache.kafka.common.requests.HeartbeatRequest;
 import org.apache.kafka.common.requests.HeartbeatResponse;
 import org.apache.kafka.common.requests.JoinGroupRequest;
@@ -465,7 +465,7 @@ public abstract class AbstractCoordinator implements Closeable {
                         }
                     }
                 }
-            } else if (error == Errors.GROUP_LOAD_IN_PROGRESS) {
+            } else if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS) {
                 log.debug("Attempt to join group {} rejected since coordinator {} is loading the group.", groupId,
                         coordinator());
                 // backoff and retry
@@ -475,8 +475,8 @@ public abstract class AbstractCoordinator implements Closeable {
                 resetGeneration();
                 log.debug("Attempt to join group {} failed due to unknown member id.", groupId);
                 future.raise(Errors.UNKNOWN_MEMBER_ID);
-            } else if (error == Errors.GROUP_COORDINATOR_NOT_AVAILABLE
-                    || error == Errors.NOT_COORDINATOR_FOR_GROUP) {
+            } else if (error == Errors.COORDINATOR_NOT_AVAILABLE
+                    || error == Errors.NOT_COORDINATOR) {
                 // re-discover the coordinator and retry with backoff
                 coordinatorDead();
                 log.debug("Attempt to join group {} failed due to obsolete coordinator information: {}", groupId, error.message());
@@ -550,8 +550,8 @@ public abstract class AbstractCoordinator implements Closeable {
                     log.debug("SyncGroup for group {} failed due to {}", groupId, error);
                     resetGeneration();
                     future.raise(error);
-                } else if (error == Errors.GROUP_COORDINATOR_NOT_AVAILABLE
-                        || error == Errors.NOT_COORDINATOR_FOR_GROUP) {
+                } else if (error == Errors.COORDINATOR_NOT_AVAILABLE
+                        || error == Errors.NOT_COORDINATOR) {
                     log.debug("SyncGroup for group {} failed due to {}", groupId, error);
                     coordinatorDead();
                     future.raise(error);
@@ -570,8 +570,8 @@ public abstract class AbstractCoordinator implements Closeable {
     private RequestFuture<Void> sendGroupCoordinatorRequest(Node node) {
         // initiate the group metadata request
         log.debug("Sending GroupCoordinator request for group {} to broker {}", groupId, node);
-        GroupCoordinatorRequest.Builder requestBuilder =
-                new GroupCoordinatorRequest.Builder(this.groupId);
+        FindCoordinatorRequest.Builder requestBuilder =
+                new FindCoordinatorRequest.Builder(FindCoordinatorRequest.CoordinatorType.GROUP, this.groupId);
         return client.send(node, requestBuilder)
                      .compose(new GroupCoordinatorResponseHandler());
     }
@@ -582,18 +582,18 @@ public abstract class AbstractCoordinator implements Closeable {
         public void onSuccess(ClientResponse resp, RequestFuture<Void> future) {
             log.debug("Received GroupCoordinator response {} for group {}", resp, groupId);
 
-            GroupCoordinatorResponse groupCoordinatorResponse = (GroupCoordinatorResponse) resp.responseBody();
+            FindCoordinatorResponse findCoordinatorResponse = (FindCoordinatorResponse) resp.responseBody();
             // use MAX_VALUE - node.id as the coordinator id to mimic separate connections
             // for the coordinator in the underlying network client layer
             // TODO: this needs to be better handled in KAFKA-1935
-            Errors error = groupCoordinatorResponse.error();
+            Errors error = findCoordinatorResponse.error();
             clearFindCoordinatorFuture();
             if (error == Errors.NONE) {
                 synchronized (AbstractCoordinator.this) {
                     AbstractCoordinator.this.coordinator = new Node(
-                            Integer.MAX_VALUE - groupCoordinatorResponse.node().id(),
-                            groupCoordinatorResponse.node().host(),
-                            groupCoordinatorResponse.node().port());
+                            Integer.MAX_VALUE - findCoordinatorResponse.node().id(),
+                            findCoordinatorResponse.node().host(),
+                            findCoordinatorResponse.node().port());
                     log.info("Discovered coordinator {} for group {}.", coordinator, groupId);
                     client.tryConnect(coordinator);
                     heartbeat.resetTimeouts(time.milliseconds());
@@ -640,7 +640,7 @@ public abstract class AbstractCoordinator implements Closeable {
     protected synchronized void coordinatorDead() {
         if (this.coordinator != null) {
             log.info("Marking the coordinator {} dead for group {}", this.coordinator, groupId);
-            client.failUnsentRequests(this.coordinator, GroupCoordinatorNotAvailableException.INSTANCE);
+            client.failUnsentRequests(this.coordinator, CoordinatorNotAvailableException.INSTANCE);
             this.coordinator = null;
         }
     }
@@ -749,8 +749,8 @@ public abstract class AbstractCoordinator implements Closeable {
             if (error == Errors.NONE) {
                 log.debug("Received successful Heartbeat response for group {}", groupId);
                 future.complete(null);
-            } else if (error == Errors.GROUP_COORDINATOR_NOT_AVAILABLE
-                    || error == Errors.NOT_COORDINATOR_FOR_GROUP) {
+            } else if (error == Errors.COORDINATOR_NOT_AVAILABLE
+                    || error == Errors.NOT_COORDINATOR) {
                 log.debug("Attempt to heartbeat failed for group {} since coordinator {} is either not started or not valid.",
                         groupId, coordinator());
                 coordinatorDead();

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0e7c6b9/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index 7d21767..ca2108d 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -521,7 +521,8 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
                 @Override
                 public void onFailure(RuntimeException e) {
                     pendingAsyncCommits.decrementAndGet();
-                    completedOffsetCommits.add(new OffsetCommitCompletion(callback, offsets, new RetriableCommitFailedException(e.getMessage())));
+                    completedOffsetCommits.add(new OffsetCommitCompletion(callback, offsets,
+                            RetriableCommitFailedException.withUnderlyingMessage(e.getMessage())));
                 }
             });
         }
@@ -550,7 +551,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
                 Exception commitException = e;
 
                 if (e instanceof RetriableException)
-                    commitException = new RetriableCommitFailedException(e.getMessage());
+                    commitException = RetriableCommitFailedException.withUnderlyingMessage(e.getMessage());
 
                 completedOffsetCommits.add(new OffsetCommitCompletion(cb, offsets, commitException));
             }
@@ -754,13 +755,13 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
                     log.debug("Offset commit for group {} failed on partition {}: {}", groupId, tp, error.message());
                     future.raise(error);
                     return;
-                } else if (error == Errors.GROUP_LOAD_IN_PROGRESS) {
+                } else if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS) {
                     // just retry
                     log.debug("Offset commit for group {} failed: {}", groupId, error.message());
                     future.raise(error);
                     return;
-                } else if (error == Errors.GROUP_COORDINATOR_NOT_AVAILABLE
-                        || error == Errors.NOT_COORDINATOR_FOR_GROUP
+                } else if (error == Errors.COORDINATOR_NOT_AVAILABLE
+                        || error == Errors.NOT_COORDINATOR
                         || error == Errors.REQUEST_TIMED_OUT) {
                     log.debug("Offset commit for group {} failed: {}", groupId, error.message());
                     coordinatorDead();
@@ -823,10 +824,10 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
                 Errors error = response.error();
                 log.debug("Offset fetch for group {} failed: {}", groupId, error.message());
 
-                if (error == Errors.GROUP_LOAD_IN_PROGRESS) {
+                if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS) {
                     // just retry
                     future.raise(error);
-                } else if (error == Errors.NOT_COORDINATOR_FOR_GROUP) {
+                } else if (error == Errors.NOT_COORDINATOR) {
                     // re-discover the coordinator and retry
                     coordinatorDead();
                     future.raise(error);

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0e7c6b9/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFuture.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFuture.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFuture.java
index 8515c95..f7e8ca1 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFuture.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFuture.java
@@ -239,7 +239,7 @@ public class RequestFuture<T> implements ConsumerNetworkClient.PollCondition {
     }
 
     public static <T> RequestFuture<T> coordinatorNotAvailable() {
-        return failure(Errors.GROUP_COORDINATOR_NOT_AVAILABLE.exception());
+        return failure(Errors.COORDINATOR_NOT_AVAILABLE.exception());
     }
 
     public static <T> RequestFuture<T> leaderNotAvailable() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0e7c6b9/clients/src/main/java/org/apache/kafka/common/errors/CoordinatorLoadInProgressException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/CoordinatorLoadInProgressException.java b/clients/src/main/java/org/apache/kafka/common/errors/CoordinatorLoadInProgressException.java
new file mode 100644
index 0000000..4bdb978
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/errors/CoordinatorLoadInProgressException.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.errors;
+
+/**
+ * In the context of the group coordinator, the broker returns this error code for any coordinator request if
+ * it is still loading the group metadata (e.g. after a leader change for that group metadata topic partition).
+ *
+ * In the context of the transactional coordinator, this error will be returned if there is a pending transactional
+ * request with the same transactional id, or if the transaction cache is currently being populated from the transaction
+ * log.
+ */
+public class CoordinatorLoadInProgressException extends RetriableException {
+
+    private static final long serialVersionUID = 1L;
+
+    public CoordinatorLoadInProgressException(String message) {
+        super(message);
+    }
+
+    public CoordinatorLoadInProgressException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0e7c6b9/clients/src/main/java/org/apache/kafka/common/errors/CoordinatorNotAvailableException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/CoordinatorNotAvailableException.java b/clients/src/main/java/org/apache/kafka/common/errors/CoordinatorNotAvailableException.java
new file mode 100644
index 0000000..827ce54
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/errors/CoordinatorNotAvailableException.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.errors;
+
+/**
+ * In the context of the group coordinator, the broker returns this error code for metadata or offset commit
+ * requests if the group metadata topic has not been created yet.
+ *
+ * In the context of the transactional coordinator, this error will be returned if the underlying transactional log
+ * is under-replicated or if an append to the log times out.
+ */
+public class CoordinatorNotAvailableException extends RetriableException {
+    public static final CoordinatorNotAvailableException INSTANCE = new CoordinatorNotAvailableException();
+
+    private static final long serialVersionUID = 1L;
+
+    private CoordinatorNotAvailableException() {
+        super();
+    }
+
+    public CoordinatorNotAvailableException(String message) {
+        super(message);
+    }
+
+    public CoordinatorNotAvailableException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0e7c6b9/clients/src/main/java/org/apache/kafka/common/errors/GroupCoordinatorNotAvailableException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/GroupCoordinatorNotAvailableException.java b/clients/src/main/java/org/apache/kafka/common/errors/GroupCoordinatorNotAvailableException.java
index 3409b68..03a7719 100644
--- a/clients/src/main/java/org/apache/kafka/common/errors/GroupCoordinatorNotAvailableException.java
+++ b/clients/src/main/java/org/apache/kafka/common/errors/GroupCoordinatorNotAvailableException.java
@@ -19,7 +19,10 @@ package org.apache.kafka.common.errors;
 /**
  * The broker returns this error code for consumer metadata requests or offset commit requests if the offsets topic has
  * not yet been created.
+ *
+ * @deprecated As of Kafka 0.11, this has been replaced by {@link CoordinatorNotAvailableException}
  */
+@Deprecated
 public class GroupCoordinatorNotAvailableException extends RetriableException {
     public static final GroupCoordinatorNotAvailableException INSTANCE = new GroupCoordinatorNotAvailableException();
 
@@ -41,4 +44,4 @@ public class GroupCoordinatorNotAvailableException extends RetriableException {
         super(cause);
     }
 
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0e7c6b9/clients/src/main/java/org/apache/kafka/common/errors/GroupLoadInProgressException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/GroupLoadInProgressException.java b/clients/src/main/java/org/apache/kafka/common/errors/GroupLoadInProgressException.java
index f579a37..73daa5f 100644
--- a/clients/src/main/java/org/apache/kafka/common/errors/GroupLoadInProgressException.java
+++ b/clients/src/main/java/org/apache/kafka/common/errors/GroupLoadInProgressException.java
@@ -19,7 +19,10 @@ package org.apache.kafka.common.errors;
 /**
  * The broker returns this error code for any coordinator request if it is still loading the metadata (after a leader change
  * for that offsets topic partition) for this group.
+ *
+ * @deprecated As of Kafka 0.11, this has been replaced by {@link CoordinatorLoadInProgressException}
  */
+@Deprecated
 public class GroupLoadInProgressException extends RetriableException {
 
     private static final long serialVersionUID = 1L;
@@ -40,4 +43,4 @@ public class GroupLoadInProgressException extends RetriableException {
         super(cause);
     }
 
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0e7c6b9/clients/src/main/java/org/apache/kafka/common/errors/InvalidTxnTimeoutException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/InvalidTxnTimeoutException.java b/clients/src/main/java/org/apache/kafka/common/errors/InvalidTxnTimeoutException.java
new file mode 100644
index 0000000..12d873e
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/errors/InvalidTxnTimeoutException.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.errors;
+
+/**
+ * The transaction coordinator returns this error code if the timeout received via the InitPidRequest is larger than
+ * the `max.transaction.timeout.ms` config value.
+ */
+public class InvalidTxnTimeoutException extends ApiException {
+    private static final long serialVersionUID = 1L;
+
+    public InvalidTxnTimeoutException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    public InvalidTxnTimeoutException(String message) {
+        super(message);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0e7c6b9/clients/src/main/java/org/apache/kafka/common/errors/NotCoordinatorException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/NotCoordinatorException.java b/clients/src/main/java/org/apache/kafka/common/errors/NotCoordinatorException.java
new file mode 100644
index 0000000..00ca32c
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/errors/NotCoordinatorException.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.errors;
+
+/**
+ * In the context of the group coordinator, the broker returns this error code if it receives an offset fetch
+ * or commit request for a group it's not the coordinator of.
+ *
+ * In the context of the transactional coordinator, it returns this error when it receives a transactional
+ * request with a transactionalId the coordinator doesn't own.
+ */
+public class NotCoordinatorException extends RetriableException {
+
+    private static final long serialVersionUID = 1L;
+
+    public NotCoordinatorException(String message) {
+        super(message);
+    }
+
+    public NotCoordinatorException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0e7c6b9/clients/src/main/java/org/apache/kafka/common/errors/NotCoordinatorForGroupException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/NotCoordinatorForGroupException.java b/clients/src/main/java/org/apache/kafka/common/errors/NotCoordinatorForGroupException.java
index d2ffaea..cee6495 100644
--- a/clients/src/main/java/org/apache/kafka/common/errors/NotCoordinatorForGroupException.java
+++ b/clients/src/main/java/org/apache/kafka/common/errors/NotCoordinatorForGroupException.java
@@ -19,7 +19,10 @@ package org.apache.kafka.common.errors;
 /**
  * The broker returns this error code if it receives an offset fetch or commit request for a consumer group that it is
  * not a coordinator for.
+ *
+ * @deprecated As of Kafka 0.11, this has been replaced by {@link NotCoordinatorException}
  */
+@Deprecated
 public class NotCoordinatorForGroupException extends RetriableException {
 
     private static final long serialVersionUID = 1L;
@@ -40,4 +43,4 @@ public class NotCoordinatorForGroupException extends RetriableException {
         super(cause);
     }
 
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0e7c6b9/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
index 63bcfec..9e7ce1d 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
@@ -35,7 +35,7 @@ public enum ApiKeys {
     CONTROLLED_SHUTDOWN_KEY(7, "ControlledShutdown"),
     OFFSET_COMMIT(8, "OffsetCommit"),
     OFFSET_FETCH(9, "OffsetFetch"),
-    GROUP_COORDINATOR(10, "GroupCoordinator"),
+    FIND_COORDINATOR(10, "FindCoordinator"),
     JOIN_GROUP(11, "JoinGroup"),
     HEARTBEAT(12, "Heartbeat"),
     LEAVE_GROUP(13, "LeaveGroup"),

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0e7c6b9/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
index ccebd93..375cf16 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
@@ -23,8 +23,8 @@ import org.apache.kafka.common.errors.ControllerMovedException;
 import org.apache.kafka.common.errors.CorruptRecordException;
 import org.apache.kafka.common.errors.DuplicateSequenceNumberException;
 import org.apache.kafka.common.errors.GroupAuthorizationException;
-import org.apache.kafka.common.errors.GroupCoordinatorNotAvailableException;
-import org.apache.kafka.common.errors.GroupLoadInProgressException;
+import org.apache.kafka.common.errors.CoordinatorNotAvailableException;
+import org.apache.kafka.common.errors.CoordinatorLoadInProgressException;
 import org.apache.kafka.common.errors.IllegalGenerationException;
 import org.apache.kafka.common.errors.IllegalSaslStateException;
 import org.apache.kafka.common.errors.InconsistentGroupProtocolException;
@@ -42,10 +42,11 @@ import org.apache.kafka.common.errors.InvalidSessionTimeoutException;
 import org.apache.kafka.common.errors.InvalidTimestampException;
 import org.apache.kafka.common.errors.InvalidTopicException;
 import org.apache.kafka.common.errors.InvalidTxnStateException;
+import org.apache.kafka.common.errors.InvalidTxnTimeoutException;
 import org.apache.kafka.common.errors.LeaderNotAvailableException;
 import org.apache.kafka.common.errors.NetworkException;
 import org.apache.kafka.common.errors.NotControllerException;
-import org.apache.kafka.common.errors.NotCoordinatorForGroupException;
+import org.apache.kafka.common.errors.NotCoordinatorException;
 import org.apache.kafka.common.errors.NotEnoughReplicasAfterAppendException;
 import org.apache.kafka.common.errors.NotEnoughReplicasException;
 import org.apache.kafka.common.errors.NotLeaderForPartitionException;
@@ -109,12 +110,12 @@ public enum Errors {
             new OffsetMetadataTooLarge("The metadata field of the offset request was too large.")),
     NETWORK_EXCEPTION(13,
             new NetworkException("The server disconnected before a response was received.")),
-    GROUP_LOAD_IN_PROGRESS(14,
-            new GroupLoadInProgressException("The coordinator is loading and hence can't process requests for this group.")),
-    GROUP_COORDINATOR_NOT_AVAILABLE(15,
-            new GroupCoordinatorNotAvailableException("The group coordinator is not available.")),
-    NOT_COORDINATOR_FOR_GROUP(16,
-            new NotCoordinatorForGroupException("This is not the correct coordinator for this group.")),
+    COORDINATOR_LOAD_IN_PROGRESS(14,
+            new CoordinatorLoadInProgressException("The coordinator is loading and hence can't process requests.")),
+    COORDINATOR_NOT_AVAILABLE(15,
+            new CoordinatorNotAvailableException("The coordinator is not available.")),
+    NOT_COORDINATOR(16,
+            new NotCoordinatorException("This is not the correct coordinator.")),
     INVALID_TOPIC_EXCEPTION(17,
             new InvalidTopicException("The request attempted to perform an operation on an invalid topic.")),
     RECORD_LIST_TOO_LARGE(18,
@@ -182,7 +183,10 @@ public enum Errors {
     INVALID_TXN_STATE(48,
         new InvalidTxnStateException("The producer attempted a transactional operation in an invalid state")),
     INVALID_PID_MAPPING(49,
-        new InvalidPidMappingException("The PID mapping is invalid"));
+        new InvalidPidMappingException("The PID mapping is invalid")),
+    INVALID_TRANSACTION_TIMEOUT(50,
+        new InvalidTxnTimeoutException("The transaction timeout is larger than the maximum value allowed by the broker " +
+            "(as configured by max.transaction.timeout.ms)."));
 
     private static final Logger log = LoggerFactory.getLogger(Errors.class);
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0e7c6b9/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
index 4c58bb8..54f533e 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
@@ -774,23 +774,43 @@ public class Protocol {
     public static final Schema[] DESCRIBE_GROUPS_REQUEST = new Schema[] {DESCRIBE_GROUPS_REQUEST_V0};
     public static final Schema[] DESCRIBE_GROUPS_RESPONSE = new Schema[] {DESCRIBE_GROUPS_RESPONSE_V0};
 
-    /* Group coordinator api */
-    public static final Schema GROUP_COORDINATOR_REQUEST_V0 = new Schema(new Field("group_id",
-                                                                                   STRING,
-                                                                                   "The unique group id."));
+    /* Find coordinator api */
+    public static final Schema FIND_COORDINATOR_REQUEST_V0 = new Schema(
+            new Field("group_id",
+                    STRING,
+                    "The unique group id."));
+
+    public static final Schema FIND_COORDINATOR_REQUEST_V1 = new Schema(
+            new Field("coordinator_key",
+                    STRING,
+                    "Id to use for finding the coordinator (for groups, this is the groupId, " +
+                            "for transactional producers, this is the transactional id)"),
+            new Field("coordinator_type",
+                    INT8,
+                    "The type of coordinator to find (0 = group, 1 = transaction)"));
 
-    public static final Schema GROUP_COORDINATOR_BROKER_V0 = new Schema(new Field("node_id", INT32, "The broker id."),
-                                                                        new Field("host", STRING, "The hostname of the broker."),
-                                                                        new Field("port", INT32,
-                                                                            "The port on which the broker accepts requests."));
+    public static final Schema FIND_COORDINATOR_BROKER_V0 = new Schema(
+            new Field("node_id", INT32, "The broker id."),
+            new Field("host", STRING, "The hostname of the broker."),
+            new Field("port", INT32,
+                    "The port on which the broker accepts requests."));
 
-    public static final Schema GROUP_COORDINATOR_RESPONSE_V0 = new Schema(new Field("error_code", INT16),
-                                                                          new Field("coordinator",
-                                                                                    GROUP_COORDINATOR_BROKER_V0,
-                                                                                    "Host and port information for the coordinator for a consumer group."));
+    public static final Schema FIND_COORDINATOR_RESPONSE_V0 = new Schema(
+            new Field("error_code", INT16),
+            new Field("coordinator",
+                    FIND_COORDINATOR_BROKER_V0,
+                    "Host and port information for the coordinator for a consumer group."));
+
+    public static final Schema FIND_COORDINATOR_RESPONSE_V1 = new Schema(
+            new Field("error_code", INT16),
+            new Field("error_message", NULLABLE_STRING),
+            new Field("coordinator",
+                    FIND_COORDINATOR_BROKER_V0,
+                    "Host and port information for the coordinator."));
 
-    public static final Schema[] GROUP_COORDINATOR_REQUEST = new Schema[] {GROUP_COORDINATOR_REQUEST_V0};
-    public static final Schema[] GROUP_COORDINATOR_RESPONSE = new Schema[] {GROUP_COORDINATOR_RESPONSE_V0};
+
+    public static final Schema[] FIND_COORDINATOR_REQUEST = new Schema[] {FIND_COORDINATOR_REQUEST_V0, FIND_COORDINATOR_REQUEST_V1};
+    public static final Schema[] FIND_COORDINATOR_RESPONSE = new Schema[] {FIND_COORDINATOR_RESPONSE_V0, FIND_COORDINATOR_RESPONSE_V1};
 
     /* Controlled shutdown api */
     public static final Schema CONTROLLED_SHUTDOWN_REQUEST_V1 = new Schema(new Field("broker_id",
@@ -1180,7 +1200,10 @@ public class Protocol {
     public static final Schema INIT_PRODUCER_ID_REQUEST_V0 = new Schema(
             new Field("transactional_id",
                     NULLABLE_STRING,
-                    "The transactional id whose producer id we want to retrieve or generate.")
+                    "The transactional id whose producer id we want to retrieve or generate."),
+            new Field("transaction_timeout_ms",
+                    INT32,
+                    "The time in ms to wait for before aborting idle transactions sent by this producer.")
     );
 
     public static final Schema INIT_PRODUCER_ID_RESPONSE_V0 = new Schema(
@@ -1432,7 +1455,7 @@ public class Protocol {
         REQUESTS[ApiKeys.CONTROLLED_SHUTDOWN_KEY.id] = CONTROLLED_SHUTDOWN_REQUEST;
         REQUESTS[ApiKeys.OFFSET_COMMIT.id] = OFFSET_COMMIT_REQUEST;
         REQUESTS[ApiKeys.OFFSET_FETCH.id] = OFFSET_FETCH_REQUEST;
-        REQUESTS[ApiKeys.GROUP_COORDINATOR.id] = GROUP_COORDINATOR_REQUEST;
+        REQUESTS[ApiKeys.FIND_COORDINATOR.id] = FIND_COORDINATOR_REQUEST;
         REQUESTS[ApiKeys.JOIN_GROUP.id] = JOIN_GROUP_REQUEST;
         REQUESTS[ApiKeys.HEARTBEAT.id] = HEARTBEAT_REQUEST;
         REQUESTS[ApiKeys.LEAVE_GROUP.id] = LEAVE_GROUP_REQUEST;
@@ -1462,7 +1485,7 @@ public class Protocol {
         RESPONSES[ApiKeys.CONTROLLED_SHUTDOWN_KEY.id] = CONTROLLED_SHUTDOWN_RESPONSE;
         RESPONSES[ApiKeys.OFFSET_COMMIT.id] = OFFSET_COMMIT_RESPONSE;
         RESPONSES[ApiKeys.OFFSET_FETCH.id] = OFFSET_FETCH_RESPONSE;
-        RESPONSES[ApiKeys.GROUP_COORDINATOR.id] = GROUP_COORDINATOR_RESPONSE;
+        RESPONSES[ApiKeys.FIND_COORDINATOR.id] = FIND_COORDINATOR_RESPONSE;
         RESPONSES[ApiKeys.JOIN_GROUP.id] = JOIN_GROUP_RESPONSE;
         RESPONSES[ApiKeys.HEARTBEAT.id] = HEARTBEAT_RESPONSE;
         RESPONSES[ApiKeys.LEAVE_GROUP.id] = LEAVE_GROUP_RESPONSE;

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0e7c6b9/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
index bd4bc49..07bde63 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
@@ -123,8 +123,8 @@ public abstract class AbstractRequest extends AbstractRequestResponse {
             case OFFSET_FETCH:
                 request = new OffsetFetchRequest(struct, version);
                 break;
-            case GROUP_COORDINATOR:
-                request = new GroupCoordinatorRequest(struct, version);
+            case FIND_COORDINATOR:
+                request = new FindCoordinatorRequest(struct, version);
                 break;
             case JOIN_GROUP:
                 request = new JoinGroupRequest(struct, version);

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0e7c6b9/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
index 433539c..2286783 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
@@ -61,8 +61,8 @@ public abstract class AbstractResponse extends AbstractRequestResponse {
                 return new OffsetCommitResponse(struct);
             case OFFSET_FETCH:
                 return new OffsetFetchResponse(struct);
-            case GROUP_COORDINATOR:
-                return new GroupCoordinatorResponse(struct);
+            case FIND_COORDINATOR:
+                return new FindCoordinatorResponse(struct);
             case JOIN_GROUP:
                 return new JoinGroupResponse(struct);
             case HEARTBEAT:

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0e7c6b9/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java
index 5496d66..797ed58 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java
@@ -51,9 +51,9 @@ public class DescribeGroupsResponse extends AbstractResponse {
     /**
      * Possible per-group error codes:
      *
-     * GROUP_LOAD_IN_PROGRESS (14)
-     * GROUP_COORDINATOR_NOT_AVAILABLE (15)
-     * NOT_COORDINATOR_FOR_GROUP (16)
+     * COORDINATOR_LOAD_IN_PROGRESS (14)
+     * COORDINATOR_NOT_AVAILABLE (15)
+     * NOT_COORDINATOR (16)
      * AUTHORIZATION_FAILED (29)
      */
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0e7c6b9/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorRequest.java
new file mode 100644
index 0000000..46f3426
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorRequest.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.Struct;
+
+import java.nio.ByteBuffer;
+
+public class FindCoordinatorRequest extends AbstractRequest {
+    private static final String GROUP_ID_KEY_NAME = "group_id";
+    private static final String COORDINATOR_KEY_KEY_NAME = "coordinator_key";
+    private static final String COORDINATOR_TYPE_KEY_NAME = "coordinator_type";
+
+    public static class Builder extends AbstractRequest.Builder<FindCoordinatorRequest> {
+        private final String coordinatorKey;
+        private final CoordinatorType coordinatorType;
+        private final short minVersion;
+
+        public Builder(CoordinatorType coordinatorType, String coordinatorKey) {
+            super(ApiKeys.FIND_COORDINATOR);
+            this.coordinatorType = coordinatorType;
+            this.coordinatorKey = coordinatorKey;
+            this.minVersion = coordinatorType == CoordinatorType.TRANSACTION ? (short) 1 : (short) 0;
+        }
+
+        @Override
+        public FindCoordinatorRequest build(short version) {
+            if (version < minVersion)
+                throw new UnsupportedVersionException("Cannot create a v" + version + " FindCoordinator request " +
+                        "because we require features supported only in " + minVersion + " or later.");
+            return new FindCoordinatorRequest(coordinatorType, coordinatorKey, version);
+        }
+
+        @Override
+        public String toString() {
+            StringBuilder bld = new StringBuilder();
+            bld.append("(type=FindCoordinatorRequest, coordinatorKey=");
+            bld.append(coordinatorKey);
+            bld.append(", coordinatorType=");
+            bld.append(coordinatorType);
+            bld.append(")");
+            return bld.toString();
+        }
+    }
+
+    private final String coordinatorKey;
+    private final CoordinatorType coordinatorType;
+
+    private FindCoordinatorRequest(CoordinatorType coordinatorType, String coordinatorKey, short version) {
+        super(version);
+        this.coordinatorType = coordinatorType;
+        this.coordinatorKey = coordinatorKey;
+    }
+
+    public FindCoordinatorRequest(Struct struct, short version) {
+        super(version);
+
+        if (struct.hasField(COORDINATOR_TYPE_KEY_NAME))
+            this.coordinatorType = CoordinatorType.forId(struct.getByte(COORDINATOR_TYPE_KEY_NAME));
+        else
+            this.coordinatorType = CoordinatorType.GROUP;
+        if (struct.hasField(GROUP_ID_KEY_NAME))
+            this.coordinatorKey = struct.getString(GROUP_ID_KEY_NAME);
+        else
+            this.coordinatorKey = struct.getString(COORDINATOR_KEY_KEY_NAME);
+    }
+
+    @Override
+    public AbstractResponse getErrorResponse(Throwable e) {
+        short versionId = version();
+        switch (versionId) {
+            case 0:
+            case 1:
+                return new FindCoordinatorResponse(Errors.forException(e), Node.noNode());
+
+            default:
+                throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
+                        versionId, this.getClass().getSimpleName(), ApiKeys.FIND_COORDINATOR.latestVersion()));
+        }
+    }
+
+    public String coordinatorKey() {
+        return coordinatorKey;
+    }
+
+    public CoordinatorType coordinatorType() {
+        return coordinatorType;
+    }
+
+    public static FindCoordinatorRequest parse(ByteBuffer buffer, short version) {
+        return new FindCoordinatorRequest(ApiKeys.FIND_COORDINATOR.parseRequest(version, buffer), version);
+    }
+
+    @Override
+    protected Struct toStruct() {
+        Struct struct = new Struct(ApiKeys.FIND_COORDINATOR.requestSchema(version()));
+        if (struct.hasField(GROUP_ID_KEY_NAME))
+            struct.set(GROUP_ID_KEY_NAME, coordinatorKey);
+        else
+            struct.set(COORDINATOR_KEY_KEY_NAME, coordinatorKey);
+        if (struct.hasField(COORDINATOR_TYPE_KEY_NAME))
+            struct.set(COORDINATOR_TYPE_KEY_NAME, coordinatorType.id);
+        return struct;
+    }
+
+    public enum CoordinatorType {
+        GROUP((byte) 0), TRANSACTION((byte) 1);
+
+        final byte id;
+
+        CoordinatorType(byte id) {
+            this.id = id;
+        }
+
+        public static CoordinatorType forId(byte id) {
+            switch (id) {
+                case 0:
+                    return GROUP;
+                case 1:
+                    return TRANSACTION;
+                default:
+                    throw new IllegalArgumentException("Unknown coordinator type received: " + id);
+            }
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0e7c6b9/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorResponse.java
new file mode 100644
index 0000000..f96f123
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorResponse.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.Struct;
+
+import java.nio.ByteBuffer;
+
+public class FindCoordinatorResponse extends AbstractResponse {
+
+    private static final String ERROR_CODE_KEY_NAME = "error_code";
+    private static final String ERROR_MESSAGE_KEY_NAME = "error_message";
+    private static final String COORDINATOR_KEY_NAME = "coordinator";
+
+    /**
+     * Possible error codes:
+     *
+     * COORDINATOR_NOT_AVAILABLE (15)
+     * NOT_COORDINATOR (16)
+     * GROUP_AUTHORIZATION_FAILED (30)
+     */
+
+    // coordinator level field names
+    private static final String NODE_ID_KEY_NAME = "node_id";
+    private static final String HOST_KEY_NAME = "host";
+    private static final String PORT_KEY_NAME = "port";
+
+    private final String errorMessage;
+    private final Errors error;
+    private final Node node;
+
+    public FindCoordinatorResponse(Errors error, Node node) {
+        this.error = error;
+        this.node = node;
+        this.errorMessage = null;
+    }
+
+    public FindCoordinatorResponse(Struct struct) {
+        error = Errors.forCode(struct.getShort(ERROR_CODE_KEY_NAME));
+        if (struct.hasField(ERROR_MESSAGE_KEY_NAME))
+            errorMessage = struct.getString(ERROR_MESSAGE_KEY_NAME);
+        else
+            errorMessage = null;
+
+        Struct broker = (Struct) struct.get(COORDINATOR_KEY_NAME);
+        int nodeId = broker.getInt(NODE_ID_KEY_NAME);
+        String host = broker.getString(HOST_KEY_NAME);
+        int port = broker.getInt(PORT_KEY_NAME);
+        node = new Node(nodeId, host, port);
+    }
+
+    public Errors error() {
+        return error;
+    }
+
+    public Node node() {
+        return node;
+    }
+
+    @Override
+    protected Struct toStruct(short version) {
+        Struct struct = new Struct(ApiKeys.FIND_COORDINATOR.responseSchema(version));
+        struct.set(ERROR_CODE_KEY_NAME, error.code());
+        if (struct.hasField(ERROR_MESSAGE_KEY_NAME))
+            struct.set(ERROR_MESSAGE_KEY_NAME, errorMessage);
+
+        Struct coordinator = struct.instance(COORDINATOR_KEY_NAME);
+        coordinator.set(NODE_ID_KEY_NAME, node.id());
+        coordinator.set(HOST_KEY_NAME, node.host());
+        coordinator.set(PORT_KEY_NAME, node.port());
+        struct.set(COORDINATOR_KEY_NAME, coordinator);
+        return struct;
+    }
+
+    public static FindCoordinatorResponse parse(ByteBuffer buffer, short version) {
+        return new FindCoordinatorResponse(ApiKeys.FIND_COORDINATOR.responseSchema(version).read(buffer));
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0e7c6b9/clients/src/main/java/org/apache/kafka/common/requests/GroupCoordinatorRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/GroupCoordinatorRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/GroupCoordinatorRequest.java
deleted file mode 100644
index b45054c..0000000
--- a/clients/src/main/java/org/apache/kafka/common/requests/GroupCoordinatorRequest.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.common.requests;
-
-import org.apache.kafka.common.Node;
-import org.apache.kafka.common.protocol.ApiKeys;
-import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.protocol.types.Struct;
-
-import java.nio.ByteBuffer;
-
-public class GroupCoordinatorRequest extends AbstractRequest {
-    private static final String GROUP_ID_KEY_NAME = "group_id";
-
-    public static class Builder extends AbstractRequest.Builder<GroupCoordinatorRequest> {
-        private final String groupId;
-
-        public Builder(String groupId) {
-            super(ApiKeys.GROUP_COORDINATOR);
-            this.groupId = groupId;
-        }
-
-        @Override
-        public GroupCoordinatorRequest build(short version) {
-            return new GroupCoordinatorRequest(this.groupId, version);
-        }
-
-        @Override
-        public String toString() {
-            StringBuilder bld = new StringBuilder();
-            bld.append("(type=GroupCoordinatorRequest, groupId=");
-            bld.append(groupId).append(")");
-            return bld.toString();
-        }
-    }
-
-    private final String groupId;
-
-    private GroupCoordinatorRequest(String groupId, short version) {
-        super(version);
-        this.groupId = groupId;
-    }
-
-    public GroupCoordinatorRequest(Struct struct, short versionId) {
-        super(versionId);
-        groupId = struct.getString(GROUP_ID_KEY_NAME);
-    }
-
-    @Override
-    public AbstractResponse getErrorResponse(Throwable e) {
-        short versionId = version();
-        switch (versionId) {
-            case 0:
-                return new GroupCoordinatorResponse(Errors.GROUP_COORDINATOR_NOT_AVAILABLE, Node.noNode());
-            default:
-                throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
-                        versionId, this.getClass().getSimpleName(), ApiKeys.GROUP_COORDINATOR.latestVersion()));
-        }
-    }
-
-    public String groupId() {
-        return groupId;
-    }
-
-    public static GroupCoordinatorRequest parse(ByteBuffer buffer, short version) {
-        return new GroupCoordinatorRequest(ApiKeys.GROUP_COORDINATOR.parseRequest(version, buffer), version);
-    }
-
-    @Override
-    protected Struct toStruct() {
-        Struct struct = new Struct(ApiKeys.GROUP_COORDINATOR.requestSchema(version()));
-        struct.set(GROUP_ID_KEY_NAME, groupId);
-        return struct;
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0e7c6b9/clients/src/main/java/org/apache/kafka/common/requests/GroupCoordinatorResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/GroupCoordinatorResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/GroupCoordinatorResponse.java
deleted file mode 100644
index f8a9f8f..0000000
--- a/clients/src/main/java/org/apache/kafka/common/requests/GroupCoordinatorResponse.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.common.requests;
-
-import org.apache.kafka.common.Node;
-import org.apache.kafka.common.protocol.ApiKeys;
-import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.protocol.types.Struct;
-
-import java.nio.ByteBuffer;
-
-public class GroupCoordinatorResponse extends AbstractResponse {
-
-    private static final String ERROR_CODE_KEY_NAME = "error_code";
-    private static final String COORDINATOR_KEY_NAME = "coordinator";
-
-    /**
-     * Possible error codes:
-     *
-     * GROUP_COORDINATOR_NOT_AVAILABLE (15)
-     * NOT_COORDINATOR_FOR_GROUP (16)
-     * GROUP_AUTHORIZATION_FAILED (30)
-     */
-
-
-    // coordinator level field names
-    private static final String NODE_ID_KEY_NAME = "node_id";
-    private static final String HOST_KEY_NAME = "host";
-    private static final String PORT_KEY_NAME = "port";
-
-    private final Errors error;
-    private final Node node;
-
-    public GroupCoordinatorResponse(Errors error, Node node) {
-        this.error = error;
-        this.node = node;
-    }
-
-    public GroupCoordinatorResponse(Struct struct) {
-        error = Errors.forCode(struct.getShort(ERROR_CODE_KEY_NAME));
-        Struct broker = (Struct) struct.get(COORDINATOR_KEY_NAME);
-        int nodeId = broker.getInt(NODE_ID_KEY_NAME);
-        String host = broker.getString(HOST_KEY_NAME);
-        int port = broker.getInt(PORT_KEY_NAME);
-        node = new Node(nodeId, host, port);
-    }
-
-    public Errors error() {
-        return error;
-    }
-
-    public Node node() {
-        return node;
-    }
-
-    @Override
-    protected Struct toStruct(short version) {
-        Struct struct = new Struct(ApiKeys.GROUP_COORDINATOR.responseSchema(version));
-        struct.set(ERROR_CODE_KEY_NAME, error.code());
-        Struct coordinator = struct.instance(COORDINATOR_KEY_NAME);
-        coordinator.set(NODE_ID_KEY_NAME, node.id());
-        coordinator.set(HOST_KEY_NAME, node.host());
-        coordinator.set(PORT_KEY_NAME, node.port());
-        struct.set(COORDINATOR_KEY_NAME, coordinator);
-        return struct;
-    }
-
-    public static GroupCoordinatorResponse parse(ByteBuffer buffer, short version) {
-        return new GroupCoordinatorResponse(ApiKeys.GROUP_COORDINATOR.parseResponse(version, buffer));
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0e7c6b9/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java
index 18d63f8..9bc400c 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java
@@ -30,7 +30,7 @@ public class HeartbeatResponse extends AbstractResponse {
      * Possible error codes:
      *
      * GROUP_COORDINATOR_NOT_AVAILABLE (15)
-     * NOT_COORDINATOR_FOR_GROUP (16)
+     * NOT_COORDINATOR (16)
      * ILLEGAL_GENERATION (22)
      * UNKNOWN_MEMBER_ID (25)
      * REBALANCE_IN_PROGRESS (27)

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0e7c6b9/clients/src/main/java/org/apache/kafka/common/requests/InitPidRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/InitPidRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/InitPidRequest.java
index 284107f..dedbc0f 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/InitPidRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/InitPidRequest.java
@@ -23,27 +23,45 @@ import org.apache.kafka.common.protocol.types.Struct;
 import java.nio.ByteBuffer;
 
 public class InitPidRequest extends AbstractRequest {
+    public static final int NO_TRANSACTION_TIMEOUT_MS = Integer.MAX_VALUE;
+
     private static final String TRANSACTIONAL_ID_KEY_NAME = "transactional_id";
+    private static final String TRANSACTION_TIMEOUT_KEY_NAME = "transaction_timeout_ms";
+
 
     private final String transactionalId;
+    private final int transactionTimeoutMs;
 
     public static class Builder extends AbstractRequest.Builder<InitPidRequest> {
         private final String transactionalId;
+        private final int transactionTimeoutMs;
+
         public Builder(String transactionalId) {
+            this(transactionalId, NO_TRANSACTION_TIMEOUT_MS);
+        }
+
+        public Builder(String transactionalId, int transactionTimeoutMs) {
             super(ApiKeys.INIT_PRODUCER_ID);
+
+            if (transactionTimeoutMs <= 0)
+                throw new IllegalArgumentException("transaction timeout value is not positive: " + transactionTimeoutMs);
+
             if (transactionalId != null && transactionalId.isEmpty())
                 throw new IllegalArgumentException("Must set either a null or a non-empty transactional id.");
+
             this.transactionalId = transactionalId;
+            this.transactionTimeoutMs = transactionTimeoutMs;
         }
 
         @Override
         public InitPidRequest build(short version) {
-            return new InitPidRequest(this.transactionalId, version);
+            return new InitPidRequest(version, transactionalId, transactionTimeoutMs);
         }
 
         @Override
         public String toString() {
-            return "(type=InitPidRequest)";
+            return "(type=InitPidRequest, transactionalId=" + transactionalId + ", transactionTimeoutMs=" +
+                    transactionTimeoutMs + ")";
         }
 
     }
@@ -51,11 +69,13 @@ public class InitPidRequest extends AbstractRequest {
     public InitPidRequest(Struct struct, short version) {
         super(version);
         this.transactionalId = struct.getString(TRANSACTIONAL_ID_KEY_NAME);
+        this.transactionTimeoutMs = struct.getInt(TRANSACTION_TIMEOUT_KEY_NAME);
     }
 
-    private InitPidRequest(String transactionalId, short version) {
+    private InitPidRequest(short version, String transactionalId, int transactionTimeoutMs) {
         super(version);
         this.transactionalId = transactionalId;
+        this.transactionTimeoutMs = transactionTimeoutMs;
     }
 
     @Override
@@ -71,10 +91,15 @@ public class InitPidRequest extends AbstractRequest {
         return transactionalId;
     }
 
+    public int transactionTimeoutMs() {
+        return transactionTimeoutMs;
+    }
+
     @Override
     protected Struct toStruct() {
         Struct struct = new Struct(ApiKeys.INIT_PRODUCER_ID.requestSchema(version()));
         struct.set(TRANSACTIONAL_ID_KEY_NAME, transactionalId);
+        struct.set(TRANSACTION_TIMEOUT_KEY_NAME, transactionTimeoutMs);
         return struct;
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0e7c6b9/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java
index bd55e9b..1f702c7 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java
@@ -33,9 +33,9 @@ public class JoinGroupResponse extends AbstractResponse {
     /**
      * Possible error codes:
      *
-     * GROUP_LOAD_IN_PROGRESS (14)
+     * COORDINATOR_LOAD_IN_PROGRESS (14)
      * GROUP_COORDINATOR_NOT_AVAILABLE (15)
-     * NOT_COORDINATOR_FOR_GROUP (16)
+     * NOT_COORDINATOR (16)
      * INCONSISTENT_GROUP_PROTOCOL (23)
      * UNKNOWN_MEMBER_ID (25)
      * INVALID_SESSION_TIMEOUT (26)

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0e7c6b9/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsResponse.java
index f2ac33b..d0409ef 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsResponse.java
@@ -35,7 +35,7 @@ public class ListGroupsResponse extends AbstractResponse {
     /**
      * Possible error codes:
      *
-     * GROUP_COORDINATOR_NOT_AVAILABLE (15)
+     * COORDINATOR_NOT_AVAILABLE (15)
      * AUTHORIZATION_FAILED (29)
      */
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0e7c6b9/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java
index b30505b..b1dae37 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java
@@ -45,9 +45,9 @@ public class OffsetCommitResponse extends AbstractResponse {
      *
      * UNKNOWN_TOPIC_OR_PARTITION (3)
      * OFFSET_METADATA_TOO_LARGE (12)
-     * GROUP_LOAD_IN_PROGRESS (14)
+     * COORDINATOR_LOAD_IN_PROGRESS (14)
      * GROUP_COORDINATOR_NOT_AVAILABLE (15)
-     * NOT_COORDINATOR_FOR_GROUP (16)
+     * NOT_COORDINATOR (16)
      * ILLEGAL_GENERATION (22)
      * UNKNOWN_MEMBER_ID (25)
      * REBALANCE_IN_PROGRESS (27)

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0e7c6b9/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
index b16b233..69507f0 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
@@ -56,9 +56,9 @@ public class OffsetFetchResponse extends AbstractResponse {
      *   - UNKNOWN_TOPIC_OR_PARTITION (3)
      *
      * - Group or coordinator errors:
-     *   - GROUP_LOAD_IN_PROGRESS (14)
-     *   - GROUP_COORDINATOR_NOT_AVAILABLE (15)
-     *   - NOT_COORDINATOR_FOR_GROUP (16)
+     *   - COORDINATOR_LOAD_IN_PROGRESS (14)
+     *   - COORDINATOR_NOT_AVAILABLE (15)
+     *   - NOT_COORDINATOR (16)
      *   - GROUP_AUTHORIZATION_FAILED (30)
      */
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0e7c6b9/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupResponse.java
index 5d50c5c..4a06491 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupResponse.java
@@ -31,7 +31,7 @@ public class SyncGroupResponse extends AbstractResponse {
      * Possible error codes:
      *
      * GROUP_COORDINATOR_NOT_AVAILABLE (15)
-     * NOT_COORDINATOR_FOR_GROUP (16)
+     * NOT_COORDINATOR (16)
      * ILLEGAL_GENERATION (22)
      * UNKNOWN_MEMBER_ID (25)
      * REBALANCE_IN_PROGRESS (27)

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0e7c6b9/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index 3dd3983..6a9f3eb 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -47,7 +47,7 @@ import org.apache.kafka.common.requests.AbstractRequest;
 import org.apache.kafka.common.requests.AbstractResponse;
 import org.apache.kafka.common.requests.FetchResponse;
 import org.apache.kafka.common.requests.FetchResponse.PartitionData;
-import org.apache.kafka.common.requests.GroupCoordinatorResponse;
+import org.apache.kafka.common.requests.FindCoordinatorResponse;
 import org.apache.kafka.common.requests.HeartbeatResponse;
 import org.apache.kafka.common.requests.JoinGroupRequest;
 import org.apache.kafka.common.requests.JoinGroupResponse;
@@ -494,7 +494,7 @@ public class KafkaConsumerTest {
         consumer.assign(singletonList(tp0));
 
         // lookup coordinator
-        client.prepareResponseFrom(new GroupCoordinatorResponse(Errors.NONE, node), node);
+        client.prepareResponseFrom(new FindCoordinatorResponse(Errors.NONE, node), node);
         Node coordinator = new Node(Integer.MAX_VALUE - node.id(), node.host(), node.port());
 
         // fetch offset for one topic
@@ -1003,7 +1003,7 @@ public class KafkaConsumerTest {
                 rebalanceTimeoutMs, sessionTimeoutMs, heartbeatIntervalMs, true, autoCommitIntervalMs);
 
         // lookup coordinator
-        client.prepareResponseFrom(new GroupCoordinatorResponse(Errors.NONE, node), node);
+        client.prepareResponseFrom(new FindCoordinatorResponse(Errors.NONE, node), node);
         Node coordinator = new Node(Integer.MAX_VALUE - node.id(), node.host(), node.port());
 
         // manual assignment
@@ -1068,7 +1068,7 @@ public class KafkaConsumerTest {
                 rebalanceTimeoutMs, sessionTimeoutMs, heartbeatIntervalMs, false, autoCommitIntervalMs);
 
         // lookup coordinator
-        client.prepareResponseFrom(new GroupCoordinatorResponse(Errors.NONE, node), node);
+        client.prepareResponseFrom(new FindCoordinatorResponse(Errors.NONE, node), node);
         Node coordinator = new Node(Integer.MAX_VALUE - node.id(), node.host(), node.port());
 
         // manual assignment
@@ -1129,7 +1129,7 @@ public class KafkaConsumerTest {
                 rebalanceTimeoutMs, sessionTimeoutMs, heartbeatIntervalMs, true, autoCommitIntervalMs);
 
         // lookup coordinator
-        client.prepareResponseFrom(new GroupCoordinatorResponse(Errors.NONE, node), node);
+        client.prepareResponseFrom(new FindCoordinatorResponse(Errors.NONE, node), node);
         Node coordinator = new Node(Integer.MAX_VALUE - node.id(), node.host(), node.port());
 
         // manual assignment
@@ -1344,7 +1344,7 @@ public class KafkaConsumerTest {
     private Node prepareRebalance(MockClient client, Node node, final Set<String> subscribedTopics, PartitionAssignor assignor, List<TopicPartition> partitions, Node coordinator) {
         if (coordinator == null) {
             // lookup coordinator
-            client.prepareResponseFrom(new GroupCoordinatorResponse(Errors.NONE, node), node);
+            client.prepareResponseFrom(new FindCoordinatorResponse(Errors.NONE, node), node);
             coordinator = new Node(Integer.MAX_VALUE - node.id(), node.host(), node.port());
         }
 
@@ -1367,7 +1367,7 @@ public class KafkaConsumerTest {
     private Node prepareRebalance(MockClient client, Node node, PartitionAssignor assignor, List<TopicPartition> partitions, Node coordinator) {
         if (coordinator == null) {
             // lookup coordinator
-            client.prepareResponseFrom(new GroupCoordinatorResponse(Errors.NONE, node), node);
+            client.prepareResponseFrom(new FindCoordinatorResponse(Errors.NONE, node), node);
             coordinator = new Node(Integer.MAX_VALUE - node.id(), node.host(), node.port());
         }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0e7c6b9/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
index 62801d0..4779f43 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
@@ -24,7 +24,7 @@ import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.requests.AbstractRequest;
-import org.apache.kafka.common.requests.GroupCoordinatorResponse;
+import org.apache.kafka.common.requests.FindCoordinatorResponse;
 import org.apache.kafka.common.requests.HeartbeatRequest;
 import org.apache.kafka.common.requests.HeartbeatResponse;
 import org.apache.kafka.common.requests.JoinGroupRequest;
@@ -466,8 +466,8 @@ public class AbstractCoordinatorTest {
         }, 3000, "Should have received a heartbeat request after joining the group");
     }
 
-    private GroupCoordinatorResponse groupCoordinatorResponse(Node node, Errors error) {
-        return new GroupCoordinatorResponse(error, node);
+    private FindCoordinatorResponse groupCoordinatorResponse(Node node, Errors error) {
+        return new FindCoordinatorResponse(error, node);
     }
 
     private HeartbeatResponse heartbeatResponse(Errors error) {


Mime
View raw message