kafka-commits mailing list archives

From ij...@apache.org
Subject [1/2] kafka git commit: KAFKA-5043; Rename GroupCoordinator to FindCoordinator (KIP-98)
Date Tue, 11 Apr 2017 08:31:11 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 749e9e14c -> d0e7c6b93


http://git-wip-us.apache.org/repos/asf/kafka/blob/d0e7c6b9/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index 2a1f368..8b582ca 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -39,7 +39,7 @@ import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.requests.AbstractRequest;
-import org.apache.kafka.common.requests.GroupCoordinatorResponse;
+import org.apache.kafka.common.requests.FindCoordinatorResponse;
 import org.apache.kafka.common.requests.HeartbeatResponse;
 import org.apache.kafka.common.requests.JoinGroupRequest;
 import org.apache.kafka.common.requests.JoinGroupRequest.ProtocolMetadata;
@@ -178,19 +178,19 @@ public class ConsumerCoordinatorTest {
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
         coordinator.ensureCoordinatorReady();
 
-        // GROUP_COORDINATOR_NOT_AVAILABLE will mark coordinator as unknown
+        // COORDINATOR_NOT_AVAILABLE will mark coordinator as unknown
         time.sleep(sessionTimeoutMs);
         RequestFuture<Void> future = coordinator.sendHeartbeatRequest(); // should send out the heartbeat
         assertEquals(1, consumerClient.pendingRequestCount());
         assertFalse(future.isDone());
 
-        client.prepareResponse(heartbeatResponse(Errors.GROUP_COORDINATOR_NOT_AVAILABLE));
+        client.prepareResponse(heartbeatResponse(Errors.COORDINATOR_NOT_AVAILABLE));
         time.sleep(sessionTimeoutMs);
         consumerClient.poll(0);
 
         assertTrue(future.isDone());
         assertTrue(future.failed());
-        assertEquals(Errors.GROUP_COORDINATOR_NOT_AVAILABLE.exception(), future.exception());
+        assertEquals(Errors.COORDINATOR_NOT_AVAILABLE.exception(), future.exception());
         assertTrue(coordinator.coordinatorUnknown());
     }
 
@@ -205,13 +205,13 @@ public class ConsumerCoordinatorTest {
         assertEquals(1, consumerClient.pendingRequestCount());
         assertFalse(future.isDone());
 
-        client.prepareResponse(heartbeatResponse(Errors.NOT_COORDINATOR_FOR_GROUP));
+        client.prepareResponse(heartbeatResponse(Errors.NOT_COORDINATOR));
         time.sleep(sessionTimeoutMs);
         consumerClient.poll(0);
 
         assertTrue(future.isDone());
         assertTrue(future.failed());
-        assertEquals(Errors.NOT_COORDINATOR_FOR_GROUP.exception(), future.exception());
+        assertEquals(Errors.NOT_COORDINATOR.exception(), future.exception());
         assertTrue(coordinator.coordinatorUnknown());
     }
 
@@ -1121,7 +1121,7 @@ public class ConsumerCoordinatorTest {
         int invokedBeforeTest = mockOffsetCommitCallback.invoked;
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
         coordinator.ensureCoordinatorReady();
-        client.prepareResponse(offsetCommitResponse(Collections.singletonMap(t1p, Errors.GROUP_COORDINATOR_NOT_AVAILABLE)));
+        client.prepareResponse(offsetCommitResponse(Collections.singletonMap(t1p, Errors.COORDINATOR_NOT_AVAILABLE)));
         coordinator.commitOffsetsAsync(Collections.singletonMap(t1p, new OffsetAndMetadata(100L)), mockOffsetCommitCallback);
         coordinator.invokeCompletedOffsetCommitCallbacks();
         assertEquals(invokedBeforeTest + 1, mockOffsetCommitCallback.invoked);
@@ -1135,7 +1135,7 @@ public class ConsumerCoordinatorTest {
 
         // async commit with coordinator not available
         MockCommitCallback cb = new MockCommitCallback();
-        client.prepareResponse(offsetCommitResponse(Collections.singletonMap(t1p, Errors.GROUP_COORDINATOR_NOT_AVAILABLE)));
+        client.prepareResponse(offsetCommitResponse(Collections.singletonMap(t1p, Errors.COORDINATOR_NOT_AVAILABLE)));
         coordinator.commitOffsetsAsync(Collections.singletonMap(t1p, new OffsetAndMetadata(100L)), cb);
         coordinator.invokeCompletedOffsetCommitCallbacks();
 
@@ -1151,7 +1151,7 @@ public class ConsumerCoordinatorTest {
 
         // async commit with not coordinator
         MockCommitCallback cb = new MockCommitCallback();
-        client.prepareResponse(offsetCommitResponse(Collections.singletonMap(t1p, Errors.NOT_COORDINATOR_FOR_GROUP)));
+        client.prepareResponse(offsetCommitResponse(Collections.singletonMap(t1p, Errors.NOT_COORDINATOR)));
         coordinator.commitOffsetsAsync(Collections.singletonMap(t1p, new OffsetAndMetadata(100L)), cb);
         coordinator.invokeCompletedOffsetCommitCallbacks();
 
@@ -1182,7 +1182,7 @@ public class ConsumerCoordinatorTest {
         coordinator.ensureCoordinatorReady();
 
         // sync commit with coordinator disconnected (should connect, get metadata, and then submit the commit request)
-        client.prepareResponse(offsetCommitResponse(Collections.singletonMap(t1p, Errors.NOT_COORDINATOR_FOR_GROUP)));
+        client.prepareResponse(offsetCommitResponse(Collections.singletonMap(t1p, Errors.NOT_COORDINATOR)));
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
         client.prepareResponse(offsetCommitResponse(Collections.singletonMap(t1p, Errors.NONE)));
         coordinator.commitOffsetsSync(Collections.singletonMap(t1p, new OffsetAndMetadata(100L)), Long.MAX_VALUE);
@@ -1194,7 +1194,7 @@ public class ConsumerCoordinatorTest {
         coordinator.ensureCoordinatorReady();
 
         // sync commit with coordinator disconnected (should connect, get metadata, and then submit the commit request)
-        client.prepareResponse(offsetCommitResponse(Collections.singletonMap(t1p, Errors.GROUP_COORDINATOR_NOT_AVAILABLE)));
+        client.prepareResponse(offsetCommitResponse(Collections.singletonMap(t1p, Errors.COORDINATOR_NOT_AVAILABLE)));
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
         client.prepareResponse(offsetCommitResponse(Collections.singletonMap(t1p, Errors.NONE)));
         coordinator.commitOffsetsSync(Collections.singletonMap(t1p, new OffsetAndMetadata(100L)), Long.MAX_VALUE);
@@ -1307,7 +1307,7 @@ public class ConsumerCoordinatorTest {
 
         subscriptions.assignFromUser(singleton(t1p));
         subscriptions.needRefreshCommits();
-        client.prepareResponse(offsetFetchResponse(Errors.GROUP_LOAD_IN_PROGRESS));
+        client.prepareResponse(offsetFetchResponse(Errors.COORDINATOR_LOAD_IN_PROGRESS));
         client.prepareResponse(offsetFetchResponse(t1p, Errors.NONE, "", 100L));
         coordinator.refreshCommittedOffsetsIfNeeded();
         assertFalse(subscriptions.refreshCommitsNeeded());
@@ -1348,7 +1348,7 @@ public class ConsumerCoordinatorTest {
 
         subscriptions.assignFromUser(singleton(t1p));
         subscriptions.needRefreshCommits();
-        client.prepareResponse(offsetFetchResponse(Errors.NOT_COORDINATOR_FOR_GROUP));
+        client.prepareResponse(offsetFetchResponse(Errors.NOT_COORDINATOR));
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
         client.prepareResponse(offsetFetchResponse(t1p, Errors.NONE, "", 100L));
         coordinator.refreshCommittedOffsetsIfNeeded();
@@ -1414,7 +1414,7 @@ public class ConsumerCoordinatorTest {
     @Test
     public void testCloseCoordinatorNotKnownManualAssignment() throws Exception {
         ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(false, true, true);
-        makeCoordinatorUnknown(coordinator, Errors.NOT_COORDINATOR_FOR_GROUP);
+        makeCoordinatorUnknown(coordinator, Errors.NOT_COORDINATOR);
         time.sleep(autoCommitIntervalMs);
         closeVerifyTimeout(coordinator, 1000, 60000, 1000, 1000);
     }
@@ -1422,14 +1422,14 @@ public class ConsumerCoordinatorTest {
     @Test
     public void testCloseCoordinatorNotKnownNoCommits() throws Exception {
         ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, false, true);
-        makeCoordinatorUnknown(coordinator, Errors.NOT_COORDINATOR_FOR_GROUP);
+        makeCoordinatorUnknown(coordinator, Errors.NOT_COORDINATOR);
         closeVerifyTimeout(coordinator, 1000, 60000, 0, 0);
     }
 
     @Test
     public void testCloseCoordinatorNotKnownWithCommits() throws Exception {
         ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, true, true);
-        makeCoordinatorUnknown(coordinator, Errors.NOT_COORDINATOR_FOR_GROUP);
+        makeCoordinatorUnknown(coordinator, Errors.NOT_COORDINATOR);
         time.sleep(autoCommitIntervalMs);
         closeVerifyTimeout(coordinator, 1000, 60000, 1000, 1000);
     }
@@ -1437,14 +1437,14 @@ public class ConsumerCoordinatorTest {
     @Test
     public void testCloseCoordinatorUnavailableNoCommits() throws Exception {
         ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, false, true);
-        makeCoordinatorUnknown(coordinator, Errors.GROUP_COORDINATOR_NOT_AVAILABLE);
+        makeCoordinatorUnknown(coordinator, Errors.COORDINATOR_NOT_AVAILABLE);
         closeVerifyTimeout(coordinator, 1000, 60000, 0, 0);
     }
 
     @Test
     public void testCloseTimeoutCoordinatorUnavailableForCommit() throws Exception {
         ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, true, true);
-        makeCoordinatorUnknown(coordinator, Errors.GROUP_COORDINATOR_NOT_AVAILABLE);
+        makeCoordinatorUnknown(coordinator, Errors.COORDINATOR_NOT_AVAILABLE);
         time.sleep(autoCommitIntervalMs);
         closeVerifyTimeout(coordinator, 1000, 60000, 1000, 1000);
     }
@@ -1452,7 +1452,7 @@ public class ConsumerCoordinatorTest {
     @Test
     public void testCloseMaxWaitCoordinatorUnavailableForCommit() throws Exception {
         ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, true, true);
-        makeCoordinatorUnknown(coordinator, Errors.GROUP_COORDINATOR_NOT_AVAILABLE);
+        makeCoordinatorUnknown(coordinator, Errors.COORDINATOR_NOT_AVAILABLE);
         time.sleep(autoCommitIntervalMs);
         closeVerifyTimeout(coordinator, Long.MAX_VALUE, 60000, 60000, 60000);
     }
@@ -1608,8 +1608,8 @@ public class ConsumerCoordinatorTest {
                 leaveGroup);
     }
 
-    private GroupCoordinatorResponse groupCoordinatorResponse(Node node, Errors error) {
-        return new GroupCoordinatorResponse(error, node);
+    private FindCoordinatorResponse groupCoordinatorResponse(Node node, Errors error) {
+        return new FindCoordinatorResponse(error, node);
     }
 
     private HeartbeatResponse heartbeatResponse(Errors error) {

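The hunks above are a mechanical rename: GroupCoordinatorResponse becomes FindCoordinatorResponse, and the GROUP_COORDINATOR_NOT_AVAILABLE / NOT_COORDINATOR_FOR_GROUP / GROUP_LOAD_IN_PROGRESS codes become COORDINATOR_NOT_AVAILABLE / NOT_COORDINATOR / COORDINATOR_LOAD_IN_PROGRESS. A minimal standalone Java sketch of the renamed response class, mirroring the groupCoordinatorResponse helper in the test (illustrative only, not code from the patch):

    import org.apache.kafka.common.Node;
    import org.apache.kafka.common.protocol.Errors;
    import org.apache.kafka.common.requests.FindCoordinatorResponse;

    public class FindCoordinatorResponseSketch {
        public static void main(String[] args) {
            Node coordinator = new Node(0, "localhost", 9092);

            // Successful lookup: the coordinator node is returned with Errors.NONE.
            FindCoordinatorResponse found = new FindCoordinatorResponse(Errors.NONE, coordinator);

            // Failed lookup: the renamed retriable error plus a placeholder node.
            FindCoordinatorResponse unavailable =
                    new FindCoordinatorResponse(Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode());

            System.out.println(found.error() + " -> " + found.node());
            System.out.println(unavailable.error() + " -> " + unavailable.node());
        }
    }
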
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0e7c6b9/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index 2e1a79d..e41c38e 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -18,7 +18,7 @@ package org.apache.kafka.common.requests;
 
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.errors.NotCoordinatorForGroupException;
+import org.apache.kafka.common.errors.NotCoordinatorException;
 import org.apache.kafka.common.errors.NotEnoughReplicasException;
 import org.apache.kafka.common.errors.UnknownServerException;
 import org.apache.kafka.common.errors.UnsupportedVersionException;
@@ -63,9 +63,12 @@ public class RequestResponseTest {
 
     @Test
     public void testSerialization() throws Exception {
-        checkRequest(createGroupCoordinatorRequest());
-        checkErrorResponse(createGroupCoordinatorRequest(), new UnknownServerException());
-        checkResponse(createGroupCoordinatorResponse(), 0);
+        checkRequest(createFindCoordinatorRequest(0));
+        checkRequest(createFindCoordinatorRequest(1));
+        checkErrorResponse(createFindCoordinatorRequest(0), new UnknownServerException());
+        checkErrorResponse(createFindCoordinatorRequest(1), new UnknownServerException());
+        checkResponse(createFindCoordinatorResponse(), 0);
+        checkResponse(createFindCoordinatorResponse(), 1);
         checkRequest(createControlledShutdownRequest());
         checkResponse(createControlledShutdownResponse(), 1);
         checkErrorResponse(createControlledShutdownRequest(), new UnknownServerException());
@@ -100,7 +103,7 @@ public class RequestResponseTest {
         checkErrorResponse(createOffsetCommitRequest(2), new UnknownServerException());
         checkResponse(createOffsetCommitResponse(), 0);
         checkRequest(OffsetFetchRequest.forAllPartitions("group1"));
-        checkErrorResponse(OffsetFetchRequest.forAllPartitions("group1"), new NotCoordinatorForGroupException());
+        checkErrorResponse(OffsetFetchRequest.forAllPartitions("group1"), new NotCoordinatorException("Not Coordinator"));
         checkRequest(createOffsetFetchRequest(0));
         checkRequest(createOffsetFetchRequest(1));
         checkRequest(createOffsetFetchRequest(2));
@@ -262,6 +265,12 @@ public class RequestResponseTest {
         return buffer;
     }
 
+    @Test(expected = UnsupportedVersionException.class)
+    public void cannotUseFindCoordinatorV0ToFindTransactionCoordinator() {
+        FindCoordinatorRequest.Builder builder = new FindCoordinatorRequest.Builder(FindCoordinatorRequest.CoordinatorType.TRANSACTION, "foobar");
+        builder.build((short) 0);
+    }
+
     @Test
     public void produceRequestToStringTest() {
         ProduceRequest request = createProduceRequest(ApiKeys.PRODUCE.latestVersion());
@@ -495,12 +504,13 @@ public class RequestResponseTest {
         return new ResponseHeader(10);
     }
 
-    private GroupCoordinatorRequest createGroupCoordinatorRequest() {
-        return new GroupCoordinatorRequest.Builder("test-group").build();
+    private FindCoordinatorRequest createFindCoordinatorRequest(int version) {
+        return new FindCoordinatorRequest.Builder(FindCoordinatorRequest.CoordinatorType.GROUP, "test-group")
+                .build((short) version);
     }
 
-    private GroupCoordinatorResponse createGroupCoordinatorResponse() {
-        return new GroupCoordinatorResponse(Errors.NONE, new Node(10, "host1", 2014));
+    private FindCoordinatorResponse createFindCoordinatorResponse() {
+        return new FindCoordinatorResponse(Errors.NONE, new Node(10, "host1", 2014));
     }
 
     private FetchRequest createFetchRequest(int version) {
@@ -827,7 +837,7 @@ public class RequestResponseTest {
     }
 
     private InitPidRequest createInitPidRequest() {
-        return new InitPidRequest.Builder(null).build();
+        return new InitPidRequest.Builder(null, 100).build();
     }
 
     private InitPidResponse createInitPidResponse() {

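As the new serialization tests above indicate, FindCoordinator is versioned: v0 keeps the old group-only semantics, while v1 adds a coordinator type so transaction coordinators can be located as well, and building a TRANSACTION lookup at v0 fails. A small Java sketch of that gate (the group and transactional ids here are made up):

    import org.apache.kafka.common.errors.UnsupportedVersionException;
    import org.apache.kafka.common.requests.FindCoordinatorRequest;
    import org.apache.kafka.common.requests.FindCoordinatorRequest.CoordinatorType;

    public class FindCoordinatorVersionSketch {
        public static void main(String[] args) {
            // A group lookup can be built at either version.
            new FindCoordinatorRequest.Builder(CoordinatorType.GROUP, "test-group").build((short) 0);
            new FindCoordinatorRequest.Builder(CoordinatorType.GROUP, "test-group").build((short) 1);

            try {
                // v0 has no coordinator-type field, so a transaction lookup is rejected at build time.
                new FindCoordinatorRequest.Builder(CoordinatorType.TRANSACTION, "txn-1").build((short) 0);
            } catch (UnsupportedVersionException expected) {
                System.out.println("FindCoordinator v0 cannot locate a transaction coordinator");
            }
        }
    }
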
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0e7c6b9/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java
index 977cc21..ab042de 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java
@@ -24,7 +24,7 @@ import org.apache.kafka.common.Node;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.requests.AbstractRequest;
-import org.apache.kafka.common.requests.GroupCoordinatorResponse;
+import org.apache.kafka.common.requests.FindCoordinatorResponse;
 import org.apache.kafka.common.requests.JoinGroupRequest.ProtocolMetadata;
 import org.apache.kafka.common.requests.JoinGroupResponse;
 import org.apache.kafka.common.requests.SyncGroupRequest;
@@ -459,8 +459,8 @@ public class WorkerCoordinatorTest {
     }
 
 
-    private GroupCoordinatorResponse groupCoordinatorResponse(Node node, Errors error) {
-        return new GroupCoordinatorResponse(error, node);
+    private FindCoordinatorResponse groupCoordinatorResponse(Node node, Errors error) {
+        return new FindCoordinatorResponse(error, node);
     }
 
     private JoinGroupResponse joinGroupLeaderResponse(int generationId, String memberId,

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0e7c6b9/connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java
index a837e66..f734032 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java
@@ -384,7 +384,7 @@ public class KafkaBasedLogTest {
                 consumer.schedulePollTask(new Runnable() {
                     @Override
                     public void run() {
-                        consumer.setException(Errors.GROUP_COORDINATOR_NOT_AVAILABLE.exception());
+                        consumer.setException(Errors.COORDINATOR_NOT_AVAILABLE.exception());
                     }
                 });
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0e7c6b9/core/src/main/scala/kafka/admin/AdminClient.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/AdminClient.scala b/core/src/main/scala/kafka/admin/AdminClient.scala
index 8a9660b..4d218c1 100644
--- a/core/src/main/scala/kafka/admin/AdminClient.scala
+++ b/core/src/main/scala/kafka/admin/AdminClient.scala
@@ -100,15 +100,15 @@ class AdminClient(val time: Time,
 
   def findCoordinator(groupId: String, timeoutMs: Long = 0): Node = {
     val startTime = time.milliseconds
-    val requestBuilder = new GroupCoordinatorRequest.Builder(groupId)
-    var response = sendAnyNode(ApiKeys.GROUP_COORDINATOR, requestBuilder).asInstanceOf[GroupCoordinatorResponse]
+    val requestBuilder = new FindCoordinatorRequest.Builder(org.apache.kafka.common.requests.FindCoordinatorRequest.CoordinatorType.GROUP, groupId)
+    var response = sendAnyNode(ApiKeys.FIND_COORDINATOR, requestBuilder).asInstanceOf[FindCoordinatorResponse]
 
-    while (response.error == Errors.GROUP_COORDINATOR_NOT_AVAILABLE && time.milliseconds - startTime < timeoutMs) {
+    while (response.error == Errors.COORDINATOR_NOT_AVAILABLE && time.milliseconds - startTime < timeoutMs) {
       Thread.sleep(retryBackoffMs)
-      response = sendAnyNode(ApiKeys.GROUP_COORDINATOR, requestBuilder).asInstanceOf[GroupCoordinatorResponse]
+      response = sendAnyNode(ApiKeys.FIND_COORDINATOR, requestBuilder).asInstanceOf[FindCoordinatorResponse]
     }
 
-    if (response.error == Errors.GROUP_COORDINATOR_NOT_AVAILABLE)
+    if (response.error == Errors.COORDINATOR_NOT_AVAILABLE)
       throw new TimeoutException("The consumer group command timed out while waiting for group to initialize: ", response.error.exception)
 
     response.error.maybeThrow()

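AdminClient.findCoordinator above keeps its shape and simply retries while the broker reports the renamed retriable error. Roughly the same loop transcribed to Java, with the Supplier standing in for sendAnyNode(ApiKeys.FIND_COORDINATOR, requestBuilder); this is a sketch, not a real client API:

    import java.util.function.Supplier;

    import org.apache.kafka.common.Node;
    import org.apache.kafka.common.protocol.Errors;
    import org.apache.kafka.common.requests.FindCoordinatorResponse;

    public class FindCoordinatorRetrySketch {
        // Keep asking any broker for the coordinator until it is available or the timeout expires.
        static Node findCoordinator(Supplier<FindCoordinatorResponse> sendFindCoordinator,
                                    long timeoutMs,
                                    long retryBackoffMs) throws InterruptedException {
            long start = System.currentTimeMillis();
            FindCoordinatorResponse response = sendFindCoordinator.get();

            // COORDINATOR_NOT_AVAILABLE (formerly GROUP_COORDINATOR_NOT_AVAILABLE) is retriable:
            // the internal offsets topic may still be being created or electing a leader.
            while (response.error() == Errors.COORDINATOR_NOT_AVAILABLE
                    && System.currentTimeMillis() - start < timeoutMs) {
                Thread.sleep(retryBackoffMs);
                response = sendFindCoordinator.get();
            }

            if (response.error() != Errors.NONE)
                throw response.error().exception();
            return response.node();
        }
    }
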
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0e7c6b9/core/src/main/scala/kafka/api/GroupCoordinatorRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/GroupCoordinatorRequest.scala b/core/src/main/scala/kafka/api/GroupCoordinatorRequest.scala
index 2082e44..d99474d 100644
--- a/core/src/main/scala/kafka/api/GroupCoordinatorRequest.scala
+++ b/core/src/main/scala/kafka/api/GroupCoordinatorRequest.scala
@@ -44,7 +44,7 @@ case class GroupCoordinatorRequest(group: String,
                                    versionId: Short = GroupCoordinatorRequest.CurrentVersion,
                                    correlationId: Int = 0,
                                    clientId: String = GroupCoordinatorRequest.DefaultClientId)
-  extends RequestOrResponse(Some(ApiKeys.GROUP_COORDINATOR.id)) {
+  extends RequestOrResponse(Some(ApiKeys.FIND_COORDINATOR.id)) {
 
   def sizeInBytes =
     2 + /* versionId */
@@ -64,7 +64,7 @@ case class GroupCoordinatorRequest(group: String,
 
   override def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = {
     // return ConsumerCoordinatorNotAvailable for all uncaught errors
-    val errorResponse = GroupCoordinatorResponse(None, Errors.GROUP_COORDINATOR_NOT_AVAILABLE, correlationId)
+    val errorResponse = GroupCoordinatorResponse(None, Errors.COORDINATOR_NOT_AVAILABLE, correlationId)
     requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(request.connectionId, errorResponse)))
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0e7c6b9/core/src/main/scala/kafka/common/OffsetMetadataAndError.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/common/OffsetMetadataAndError.scala b/core/src/main/scala/kafka/common/OffsetMetadataAndError.scala
index e0aa46d..2cf9bb4 100644
--- a/core/src/main/scala/kafka/common/OffsetMetadataAndError.scala
+++ b/core/src/main/scala/kafka/common/OffsetMetadataAndError.scala
@@ -63,10 +63,10 @@ case class OffsetMetadataAndError(offsetMetadata: OffsetMetadata, error: Errors
 
 object OffsetMetadataAndError {
   val NoOffset = OffsetMetadataAndError(OffsetMetadata.InvalidOffsetMetadata, Errors.NONE)
-  val GroupLoading = OffsetMetadataAndError(OffsetMetadata.InvalidOffsetMetadata, Errors.GROUP_LOAD_IN_PROGRESS)
+  val GroupLoading = OffsetMetadataAndError(OffsetMetadata.InvalidOffsetMetadata, Errors.COORDINATOR_LOAD_IN_PROGRESS)
   val UnknownMember = OffsetMetadataAndError(OffsetMetadata.InvalidOffsetMetadata, Errors.UNKNOWN_MEMBER_ID)
-  val NotCoordinatorForGroup = OffsetMetadataAndError(OffsetMetadata.InvalidOffsetMetadata, Errors.NOT_COORDINATOR_FOR_GROUP)
-  val GroupCoordinatorNotAvailable = OffsetMetadataAndError(OffsetMetadata.InvalidOffsetMetadata, Errors.GROUP_COORDINATOR_NOT_AVAILABLE)
+  val NotCoordinatorForGroup = OffsetMetadataAndError(OffsetMetadata.InvalidOffsetMetadata, Errors.NOT_COORDINATOR)
+  val GroupCoordinatorNotAvailable = OffsetMetadataAndError(OffsetMetadata.InvalidOffsetMetadata, Errors.COORDINATOR_NOT_AVAILABLE)
   val UnknownTopicOrPartition = OffsetMetadataAndError(OffsetMetadata.InvalidOffsetMetadata, Errors.UNKNOWN_TOPIC_OR_PARTITION)
   val IllegalGroupGenerationId = OffsetMetadataAndError(OffsetMetadata.InvalidOffsetMetadata, Errors.ILLEGAL_GENERATION)
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0e7c6b9/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
index c5ad94a..b810f81 100755
--- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
+++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
@@ -370,8 +370,8 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
                       (error != Errors.NONE && error != Errors.OFFSET_METADATA_TOO_LARGE),
 
                     folded._3 || // update shouldRefreshCoordinator
-                      error == Errors.NOT_COORDINATOR_FOR_GROUP ||
-                      error == Errors.GROUP_COORDINATOR_NOT_AVAILABLE,
+                      error == Errors.NOT_COORDINATOR ||
+                      error == Errors.COORDINATOR_NOT_AVAILABLE,
 
                     // update error count
                     folded._4 + (if (error != Errors.NONE) 1 else 0))
@@ -444,8 +444,8 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
 
             val (leaderChanged, loadInProgress) =
               offsetFetchResponse.requestInfo.values.foldLeft(false, false) { case (folded, offsetMetadataAndError) =>
-                (folded._1 || (offsetMetadataAndError.error == Errors.NOT_COORDINATOR_FOR_GROUP),
-                 folded._2 || (offsetMetadataAndError.error == Errors.GROUP_LOAD_IN_PROGRESS))
+                (folded._1 || (offsetMetadataAndError.error == Errors.NOT_COORDINATOR),
+                 folded._2 || (offsetMetadataAndError.error == Errors.COORDINATOR_LOAD_IN_PROGRESS))
               }
 
             if (leaderChanged) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0e7c6b9/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala b/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
index 70f4724..d78d1df 100644
--- a/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
@@ -99,13 +99,13 @@ class GroupCoordinator(val brokerId: Int,
                       protocols: List[(String, Array[Byte])],
                       responseCallback: JoinCallback) {
     if (!isActive.get) {
-      responseCallback(joinError(memberId, Errors.GROUP_COORDINATOR_NOT_AVAILABLE))
+      responseCallback(joinError(memberId, Errors.COORDINATOR_NOT_AVAILABLE))
     } else if (!validGroupId(groupId)) {
       responseCallback(joinError(memberId, Errors.INVALID_GROUP_ID))
     } else if (!isCoordinatorForGroup(groupId)) {
-      responseCallback(joinError(memberId, Errors.NOT_COORDINATOR_FOR_GROUP))
-    } else if (isCoordinatorLoadingInProgress(groupId)) {
-      responseCallback(joinError(memberId, Errors.GROUP_LOAD_IN_PROGRESS))
+      responseCallback(joinError(memberId, Errors.NOT_COORDINATOR))
+    } else if (isCoordinatorLoadInProgress(groupId)) {
+      responseCallback(joinError(memberId, Errors.COORDINATOR_LOAD_IN_PROGRESS))
     } else if (sessionTimeoutMs < groupConfig.groupMinSessionTimeoutMs ||
                sessionTimeoutMs > groupConfig.groupMaxSessionTimeoutMs) {
       responseCallback(joinError(memberId, Errors.INVALID_SESSION_TIMEOUT))
@@ -225,9 +225,9 @@ class GroupCoordinator(val brokerId: Int,
                       groupAssignment: Map[String, Array[Byte]],
                       responseCallback: SyncCallback) {
     if (!isActive.get) {
-      responseCallback(Array.empty, Errors.GROUP_COORDINATOR_NOT_AVAILABLE)
+      responseCallback(Array.empty, Errors.COORDINATOR_NOT_AVAILABLE)
     } else if (!isCoordinatorForGroup(groupId)) {
-      responseCallback(Array.empty, Errors.NOT_COORDINATOR_FOR_GROUP)
+      responseCallback(Array.empty, Errors.NOT_COORDINATOR)
     } else {
       groupManager.getGroup(groupId) match {
         case None => responseCallback(Array.empty, Errors.UNKNOWN_MEMBER_ID)
@@ -302,11 +302,11 @@ class GroupCoordinator(val brokerId: Int,
 
   def handleLeaveGroup(groupId: String, memberId: String, responseCallback: Errors => Unit) {
     if (!isActive.get) {
-      responseCallback(Errors.GROUP_COORDINATOR_NOT_AVAILABLE)
+      responseCallback(Errors.COORDINATOR_NOT_AVAILABLE)
     } else if (!isCoordinatorForGroup(groupId)) {
-      responseCallback(Errors.NOT_COORDINATOR_FOR_GROUP)
-    } else if (isCoordinatorLoadingInProgress(groupId)) {
-      responseCallback(Errors.GROUP_LOAD_IN_PROGRESS)
+      responseCallback(Errors.NOT_COORDINATOR)
+    } else if (isCoordinatorLoadInProgress(groupId)) {
+      responseCallback(Errors.COORDINATOR_LOAD_IN_PROGRESS)
     } else {
       groupManager.getGroup(groupId) match {
         case None =>
@@ -336,10 +336,10 @@ class GroupCoordinator(val brokerId: Int,
                       generationId: Int,
                       responseCallback: Errors => Unit) {
     if (!isActive.get) {
-      responseCallback(Errors.GROUP_COORDINATOR_NOT_AVAILABLE)
+      responseCallback(Errors.COORDINATOR_NOT_AVAILABLE)
     } else if (!isCoordinatorForGroup(groupId)) {
-      responseCallback(Errors.NOT_COORDINATOR_FOR_GROUP)
-    } else if (isCoordinatorLoadingInProgress(groupId)) {
+      responseCallback(Errors.NOT_COORDINATOR)
+    } else if (isCoordinatorLoadInProgress(groupId)) {
       // the group is still loading, so respond just blindly
       responseCallback(Errors.NONE)
     } else {
@@ -399,11 +399,11 @@ class GroupCoordinator(val brokerId: Int,
                           offsetMetadata: immutable.Map[TopicPartition, OffsetAndMetadata],
                           responseCallback: immutable.Map[TopicPartition, Errors] => Unit) {
     if (!isActive.get) {
-      responseCallback(offsetMetadata.mapValues(_ => Errors.GROUP_COORDINATOR_NOT_AVAILABLE))
+      responseCallback(offsetMetadata.mapValues(_ => Errors.COORDINATOR_NOT_AVAILABLE))
     } else if (!isCoordinatorForGroup(groupId)) {
-      responseCallback(offsetMetadata.mapValues(_ => Errors.NOT_COORDINATOR_FOR_GROUP))
-    } else if (isCoordinatorLoadingInProgress(groupId)) {
-      responseCallback(offsetMetadata.mapValues(_ => Errors.GROUP_LOAD_IN_PROGRESS))
+      responseCallback(offsetMetadata.mapValues(_ => Errors.NOT_COORDINATOR))
+    } else if (isCoordinatorLoadInProgress(groupId)) {
+      responseCallback(offsetMetadata.mapValues(_ => Errors.COORDINATOR_LOAD_IN_PROGRESS))
     } else {
       groupManager.getGroup(groupId) match {
         case None =>
@@ -457,12 +457,12 @@ class GroupCoordinator(val brokerId: Int,
   def handleFetchOffsets(groupId: String,
                          partitions: Option[Seq[TopicPartition]] = None): (Errors, Map[TopicPartition, OffsetFetchResponse.PartitionData]) = {
     if (!isActive.get)
-      (Errors.GROUP_COORDINATOR_NOT_AVAILABLE, Map())
+      (Errors.COORDINATOR_NOT_AVAILABLE, Map())
     else if (!isCoordinatorForGroup(groupId)) {
       debug("Could not fetch offsets for group %s (not group coordinator).".format(groupId))
-      (Errors.NOT_COORDINATOR_FOR_GROUP, Map())
-    } else if (isCoordinatorLoadingInProgress(groupId))
-      (Errors.GROUP_LOAD_IN_PROGRESS, Map())
+      (Errors.NOT_COORDINATOR, Map())
+    } else if (isCoordinatorLoadInProgress(groupId))
+      (Errors.COORDINATOR_LOAD_IN_PROGRESS, Map())
     else {
       // return offsets blindly regardless the current group state since the group may be using
       // Kafka commit storage without automatic group management
@@ -472,20 +472,20 @@ class GroupCoordinator(val brokerId: Int,
 
   def handleListGroups(): (Errors, List[GroupOverview]) = {
     if (!isActive.get) {
-      (Errors.GROUP_COORDINATOR_NOT_AVAILABLE, List[GroupOverview]())
+      (Errors.COORDINATOR_NOT_AVAILABLE, List[GroupOverview]())
     } else {
-      val error = if (groupManager.isLoading()) Errors.GROUP_LOAD_IN_PROGRESS else Errors.NONE
-      (error, groupManager.currentGroups.map(_.overview).toList)
+      val errorCode = if (groupManager.isLoading) Errors.COORDINATOR_LOAD_IN_PROGRESS else Errors.NONE
+      (errorCode, groupManager.currentGroups.map(_.overview).toList)
     }
   }
 
   def handleDescribeGroup(groupId: String): (Errors, GroupSummary) = {
     if (!isActive.get) {
-      (Errors.GROUP_COORDINATOR_NOT_AVAILABLE, GroupCoordinator.EmptyGroup)
+      (Errors.COORDINATOR_NOT_AVAILABLE, GroupCoordinator.EmptyGroup)
     } else if (!isCoordinatorForGroup(groupId)) {
-      (Errors.NOT_COORDINATOR_FOR_GROUP, GroupCoordinator.EmptyGroup)
-    } else if (isCoordinatorLoadingInProgress(groupId)) {
-      (Errors.GROUP_LOAD_IN_PROGRESS, GroupCoordinator.EmptyGroup)
+      (Errors.NOT_COORDINATOR, GroupCoordinator.EmptyGroup)
+    } else if (isCoordinatorLoadInProgress(groupId)) {
+      (Errors.COORDINATOR_LOAD_IN_PROGRESS, GroupCoordinator.EmptyGroup)
     } else {
       groupManager.getGroup(groupId) match {
         case None => (Errors.NONE, GroupCoordinator.DeadGroup)
@@ -512,7 +512,7 @@ class GroupCoordinator(val brokerId: Int,
         case PreparingRebalance =>
           for (member <- group.allMemberMetadata) {
             if (member.awaitingJoinCallback != null) {
-              member.awaitingJoinCallback(joinError(member.memberId, Errors.NOT_COORDINATOR_FOR_GROUP))
+              member.awaitingJoinCallback(joinError(member.memberId, Errors.NOT_COORDINATOR))
               member.awaitingJoinCallback = null
             }
           }
@@ -521,7 +521,7 @@ class GroupCoordinator(val brokerId: Int,
         case Stable | AwaitingSync =>
           for (member <- group.allMemberMetadata) {
             if (member.awaitingSyncCallback != null) {
-              member.awaitingSyncCallback(Array.empty[Byte], Errors.NOT_COORDINATOR_FOR_GROUP)
+              member.awaitingSyncCallback(Array.empty[Byte], Errors.NOT_COORDINATOR)
               member.awaitingSyncCallback = null
             }
             heartbeatPurgatory.checkAndComplete(MemberKey(member.groupId, member.memberId))
@@ -754,7 +754,7 @@ class GroupCoordinator(val brokerId: Int,
 
   private def isCoordinatorForGroup(groupId: String) = groupManager.isGroupLocal(groupId)
 
-  private def isCoordinatorLoadingInProgress(groupId: String) = groupManager.isGroupLoading(groupId)
+  private def isCoordinatorLoadInProgress(groupId: String) = groupManager.isGroupLoading(groupId)
 }
 
 object GroupCoordinator {

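Every handler above follows the same preamble: an inactive coordinator maps to COORDINATOR_NOT_AVAILABLE, the wrong broker maps to NOT_COORDINATOR, and a coordinator still loading group state maps to COORDINATOR_LOAD_IN_PROGRESS. From the client's side the first two mean the coordinator must be rediscovered, while the third only calls for a retry against the same broker, as the ConsumerCoordinatorTest changes earlier in this patch exercise. A rough Java illustration (the helper name is made up):

    import org.apache.kafka.common.protocol.Errors;

    public class CoordinatorErrorReactionSketch {
        // True when the client should forget the cached coordinator and send FindCoordinator again.
        static boolean shouldRediscoverCoordinator(Errors error) {
            switch (error) {
                case COORDINATOR_NOT_AVAILABLE:    // coordinator inactive or offsets topic not ready
                case NOT_COORDINATOR:              // this broker no longer owns the group's partition
                    return true;
                case COORDINATOR_LOAD_IN_PROGRESS: // coordinator is loading group state; retry later
                default:
                    return false;
            }
        }
    }
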
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0e7c6b9/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
index 2bc0c21..97dc7bc 100644
--- a/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
+++ b/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
@@ -184,10 +184,10 @@ class GroupMetadataManager(val brokerId: Int,
               case Errors.UNKNOWN_TOPIC_OR_PARTITION
                    | Errors.NOT_ENOUGH_REPLICAS
                    | Errors.NOT_ENOUGH_REPLICAS_AFTER_APPEND =>
-                Errors.GROUP_COORDINATOR_NOT_AVAILABLE
+                Errors.COORDINATOR_NOT_AVAILABLE
 
               case Errors.NOT_LEADER_FOR_PARTITION =>
-                Errors.NOT_COORDINATOR_FOR_GROUP
+                Errors.NOT_COORDINATOR
 
               case Errors.REQUEST_TIMED_OUT =>
                 Errors.REBALANCE_IN_PROGRESS
@@ -214,7 +214,7 @@ class GroupMetadataManager(val brokerId: Int,
         Some(DelayedStore(groupMetadataRecords, putCacheCallback))
 
       case None =>
-        responseCallback(Errors.NOT_COORDINATOR_FOR_GROUP)
+        responseCallback(Errors.NOT_COORDINATOR)
         None
     }
   }
@@ -300,10 +300,10 @@ class GroupMetadataManager(val brokerId: Int,
                   case Errors.UNKNOWN_TOPIC_OR_PARTITION
                        | Errors.NOT_ENOUGH_REPLICAS
                        | Errors.NOT_ENOUGH_REPLICAS_AFTER_APPEND =>
-                    Errors.GROUP_COORDINATOR_NOT_AVAILABLE
+                    Errors.COORDINATOR_NOT_AVAILABLE
 
                   case Errors.NOT_LEADER_FOR_PARTITION =>
-                    Errors.NOT_COORDINATOR_FOR_GROUP
+                    Errors.NOT_COORDINATOR
 
                   case Errors.MESSAGE_TOO_LARGE
                        | Errors.RECORD_LIST_TOO_LARGE
@@ -335,7 +335,7 @@ class GroupMetadataManager(val brokerId: Int,
 
         case None =>
           val commitStatus = offsetMetadata.map { case (topicPartition, _) =>
-            (topicPartition, Errors.NOT_COORDINATOR_FOR_GROUP)
+            (topicPartition, Errors.NOT_COORDINATOR)
           }
           responseCallback(commitStatus)
           None

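The GroupMetadataManager hunks translate errors from appending to the offsets topic into the renamed coordinator errors before they reach the client. Only the renamed cases are shown in this Java sketch; the remaining mappings (REQUEST_TIMED_OUT, MESSAGE_TOO_LARGE, and so on) are left as a pass-through, and the method name is invented for illustration:

    import org.apache.kafka.common.protocol.Errors;

    public class OffsetsAppendErrorMappingSketch {
        static Errors mapOffsetsTopicAppendError(Errors appendError) {
            switch (appendError) {
                case UNKNOWN_TOPIC_OR_PARTITION:
                case NOT_ENOUGH_REPLICAS:
                case NOT_ENOUGH_REPLICAS_AFTER_APPEND:
                    // The offsets partition cannot take writes yet, so the coordinator
                    // as a whole is reported as unavailable (retriable).
                    return Errors.COORDINATOR_NOT_AVAILABLE;
                case NOT_LEADER_FOR_PARTITION:
                    // Leadership of the offsets partition moved, so this broker is
                    // no longer the coordinator for the group.
                    return Errors.NOT_COORDINATOR;
                default:
                    return appendError;
            }
        }
    }
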
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0e7c6b9/core/src/main/scala/kafka/coordinator/TransactionCoordinator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/TransactionCoordinator.scala b/core/src/main/scala/kafka/coordinator/TransactionCoordinator.scala
index 41b4323..eced9fb 100644
--- a/core/src/main/scala/kafka/coordinator/TransactionCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/TransactionCoordinator.scala
@@ -59,7 +59,7 @@ class TransactionCoordinator(val brokerId: Int,
       responseCallback(InitPidResult(pid, epoch = 0, Errors.NONE))
     } else {
       // check if it is the assigned coordinator for the transactional id
-      responseCallback(initPidError(Errors.NOT_COORDINATOR_FOR_GROUP))
+      responseCallback(initPidError(Errors.NOT_COORDINATOR))
     }
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0e7c6b9/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index c75e1b9..e6e2d4d 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -35,7 +35,7 @@ import kafka.network.RequestChannel.{Request, Response, Session}
 import kafka.security.auth
 import kafka.security.auth.{Authorizer, ClusterAction, Create, Delete, Describe, Group, Operation, Read, Resource, Write}
 import kafka.utils.{Exit, Logging, ZKGroupTopicDirs, ZkUtils}
-import org.apache.kafka.common.errors.{ClusterAuthorizationException, NotLeaderForPartitionException, TopicExistsException, UnknownTopicOrPartitionException, UnsupportedForMessageFormatException}
+import org.apache.kafka.common.errors.{ClusterAuthorizationException, InvalidRequestException, NotLeaderForPartitionException, TopicExistsException, UnknownTopicOrPartitionException, UnsupportedForMessageFormatException}
 import org.apache.kafka.common.internals.FatalExitError
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.network.ListenerName
@@ -90,7 +90,7 @@ class KafkaApis(val requestChannel: RequestChannel,
         case ApiKeys.CONTROLLED_SHUTDOWN_KEY => handleControlledShutdownRequest(request)
         case ApiKeys.OFFSET_COMMIT => handleOffsetCommitRequest(request)
         case ApiKeys.OFFSET_FETCH => handleOffsetFetchRequest(request)
-        case ApiKeys.GROUP_COORDINATOR => handleGroupCoordinatorRequest(request)
+        case ApiKeys.FIND_COORDINATOR => handleFindCoordinatorRequest(request)
         case ApiKeys.JOIN_GROUP => handleJoinGroupRequest(request)
         case ApiKeys.HEARTBEAT => handleHeartbeatRequest(request)
         case ApiKeys.LEAVE_GROUP => handleLeaveGroupRequest(request)
@@ -796,7 +796,7 @@ class KafkaApis(val requestChannel: RequestChannel,
         s"'${config.offsetsTopicReplicationFactor}' for the offsets topic (configured via " +
         s"'${KafkaConfig.OffsetsTopicReplicationFactorProp}'). This error can be ignored if the cluster is starting up " +
         s"and not all brokers are up yet.")
-      new MetadataResponse.TopicMetadata(Errors.GROUP_COORDINATOR_NOT_AVAILABLE, Topic.GroupMetadataTopicName, true,
+      new MetadataResponse.TopicMetadata(Errors.COORDINATOR_NOT_AVAILABLE, Topic.GroupMetadataTopicName, true,
         java.util.Collections.emptyList())
     } else {
       createTopic(Topic.GroupMetadataTopicName, config.offsetsTopicPartitions,
@@ -818,7 +818,7 @@ class KafkaApis(val requestChannel: RequestChannel,
       val responsesForNonExistentTopics = nonExistentTopics.map { topic =>
         if (topic == Topic.GroupMetadataTopicName) {
           val topicMetadata = createGroupMetadataTopic()
-          if (topicMetadata.error == Errors.GROUP_COORDINATOR_NOT_AVAILABLE) {
+          if (topicMetadata.error == Errors.COORDINATOR_NOT_AVAILABLE) {
             new MetadataResponse.TopicMetadata(Errors.INVALID_REPLICATION_FACTOR, topic, Topic.isInternal(topic),
               java.util.Collections.emptyList())
           } else topicMetadata
@@ -981,34 +981,44 @@ class KafkaApis(val requestChannel: RequestChannel,
     requestChannel.sendResponse(new Response(request, offsetFetchResponse))
   }
 
-  def handleGroupCoordinatorRequest(request: RequestChannel.Request) {
-    val groupCoordinatorRequest = request.body[GroupCoordinatorRequest]
+  def handleFindCoordinatorRequest(request: RequestChannel.Request) {
+    val findCoordinatorRequest = request.body[FindCoordinatorRequest]
 
-    if (!authorize(request.session, Describe, new Resource(Group, groupCoordinatorRequest.groupId))) {
-      val responseBody = new GroupCoordinatorResponse(Errors.GROUP_AUTHORIZATION_FAILED, Node.noNode)
+    if (findCoordinatorRequest.coordinatorType == FindCoordinatorRequest.CoordinatorType.GROUP &&
+      !authorize(request.session, Describe, new Resource(Group, findCoordinatorRequest.coordinatorKey))) {
+
+      val responseBody = new FindCoordinatorResponse(Errors.GROUP_AUTHORIZATION_FAILED, Node.noNode)
       requestChannel.sendResponse(new RequestChannel.Response(request, responseBody))
     } else {
-      val partition = groupCoordinator.partitionFor(groupCoordinatorRequest.groupId)
+      // TODO: Authorize by transactional id if coordinator type is TRANSACTION
 
       // get metadata (and create the topic if necessary)
-      val offsetsTopicMetadata = getOrCreateGroupMetadataTopic(request.listenerName)
+      val (partition, topicMetadata) = findCoordinatorRequest.coordinatorType match {
+        case FindCoordinatorRequest.CoordinatorType.GROUP =>
+          val partition = groupCoordinator.partitionFor(findCoordinatorRequest.coordinatorKey)
+          val metadata = getOrCreateGroupMetadataTopic(request.listenerName)
+          (partition, metadata)
+
+        case _ =>
+          throw new InvalidRequestException("Unknown coordinator type in FindCoordinator request")
+      }
 
-      val responseBody = if (offsetsTopicMetadata.error != Errors.NONE) {
-        new GroupCoordinatorResponse(Errors.GROUP_COORDINATOR_NOT_AVAILABLE, Node.noNode)
+      val responseBody = if (topicMetadata.error != Errors.NONE) {
+        new FindCoordinatorResponse(Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode)
       } else {
-        val coordinatorEndpoint = offsetsTopicMetadata.partitionMetadata().asScala
+        val coordinatorEndpoint = topicMetadata.partitionMetadata.asScala
           .find(_.partition == partition)
           .map(_.leader())
 
         coordinatorEndpoint match {
           case Some(endpoint) if !endpoint.isEmpty =>
-            new GroupCoordinatorResponse(Errors.NONE, endpoint)
+            new FindCoordinatorResponse(Errors.NONE, endpoint)
           case _ =>
-            new GroupCoordinatorResponse(Errors.GROUP_COORDINATOR_NOT_AVAILABLE, Node.noNode)
+            new FindCoordinatorResponse(Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode)
         }
       }
 
-      trace("Sending consumer metadata %s for correlation id %d to client %s."
+      trace("Sending FindCoordinator response %s for correlation id %d to client %s."
         .format(responseBody, request.header.correlationId, request.header.clientId))
       requestChannel.sendResponse(new RequestChannel.Response(request, responseBody))
     }

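handleFindCoordinatorRequest above now dispatches on the coordinator type: GROUP requests are authorized against the group resource and resolved through the offsets topic metadata, while any other type is rejected until transaction coordinators are wired in (see the TODO). A condensed Java sketch of that decision, with authorizedForGroup, offsetsTopicError and coordinatorLeader standing in for the broker-side authorizer and metadata lookups (they are not real Kafka APIs):

    import org.apache.kafka.common.Node;
    import org.apache.kafka.common.errors.InvalidRequestException;
    import org.apache.kafka.common.protocol.Errors;
    import org.apache.kafka.common.requests.FindCoordinatorRequest.CoordinatorType;
    import org.apache.kafka.common.requests.FindCoordinatorResponse;

    public class FindCoordinatorHandlerSketch {
        static FindCoordinatorResponse respond(CoordinatorType type,
                                               boolean authorizedForGroup,
                                               Errors offsetsTopicError,
                                               Node coordinatorLeader) {
            if (type == CoordinatorType.GROUP && !authorizedForGroup)
                return new FindCoordinatorResponse(Errors.GROUP_AUTHORIZATION_FAILED, Node.noNode());

            if (type != CoordinatorType.GROUP) {
                // Only group coordinators are resolvable at this point in the patch series.
                throw new InvalidRequestException("Unknown coordinator type in FindCoordinator request");
            }

            if (offsetsTopicError != Errors.NONE || coordinatorLeader == null || coordinatorLeader.isEmpty())
                return new FindCoordinatorResponse(Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode());

            return new FindCoordinatorResponse(Errors.NONE, coordinatorLeader);
        }
    }
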
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0e7c6b9/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index ec3eb88..1300629 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -92,7 +92,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
       ApiKeys.LIST_OFFSETS -> classOf[requests.ListOffsetResponse],
       ApiKeys.OFFSET_COMMIT -> classOf[requests.OffsetCommitResponse],
       ApiKeys.OFFSET_FETCH -> classOf[requests.OffsetFetchResponse],
-      ApiKeys.GROUP_COORDINATOR -> classOf[requests.GroupCoordinatorResponse],
+      ApiKeys.FIND_COORDINATOR -> classOf[FindCoordinatorResponse],
       ApiKeys.UPDATE_METADATA_KEY -> classOf[requests.UpdateMetadataResponse],
       ApiKeys.JOIN_GROUP -> classOf[JoinGroupResponse],
       ApiKeys.SYNC_GROUP -> classOf[SyncGroupResponse],
@@ -113,7 +113,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
     ApiKeys.LIST_OFFSETS -> ((resp: requests.ListOffsetResponse) => resp.responseData().asScala.find(_._1 == tp).get._2.error),
     ApiKeys.OFFSET_COMMIT -> ((resp: requests.OffsetCommitResponse) => resp.responseData().asScala.find(_._1 == tp).get._2),
     ApiKeys.OFFSET_FETCH -> ((resp: requests.OffsetFetchResponse) => resp.error),
-    ApiKeys.GROUP_COORDINATOR -> ((resp: requests.GroupCoordinatorResponse) => resp.error),
+    ApiKeys.FIND_COORDINATOR -> ((resp: FindCoordinatorResponse) => resp.error),
     ApiKeys.UPDATE_METADATA_KEY -> ((resp: requests.UpdateMetadataResponse) => resp.error),
     ApiKeys.JOIN_GROUP -> ((resp: JoinGroupResponse) => resp.error),
     ApiKeys.SYNC_GROUP -> ((resp: SyncGroupResponse) => resp.error),
@@ -134,7 +134,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
     ApiKeys.LIST_OFFSETS -> TopicDescribeAcl,
     ApiKeys.OFFSET_COMMIT -> (TopicReadAcl ++ GroupReadAcl),
     ApiKeys.OFFSET_FETCH -> (TopicReadAcl ++ GroupReadAcl),
-    ApiKeys.GROUP_COORDINATOR -> (TopicReadAcl ++ GroupReadAcl),
+    ApiKeys.FIND_COORDINATOR -> (TopicReadAcl ++ GroupReadAcl),
     ApiKeys.UPDATE_METADATA_KEY -> ClusterAcl,
     ApiKeys.JOIN_GROUP -> GroupReadAcl,
     ApiKeys.SYNC_GROUP -> GroupReadAcl,
@@ -212,8 +212,8 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
     new requests.OffsetFetchRequest.Builder(group, List(tp).asJava).build()
   }
 
-  private def createGroupCoordinatorRequest = {
-    new requests.GroupCoordinatorRequest.Builder(group).build()
+  private def createFindCoordinatorRequest = {
+    new FindCoordinatorRequest.Builder(FindCoordinatorRequest.CoordinatorType.GROUP, group).build()
   }
 
   private def createUpdateMetadataRequest = {
@@ -281,7 +281,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
       ApiKeys.FETCH -> createFetchRequest,
       ApiKeys.LIST_OFFSETS -> createListOffsetsRequest,
       ApiKeys.OFFSET_FETCH -> createOffsetFetchRequest,
-      ApiKeys.GROUP_COORDINATOR -> createGroupCoordinatorRequest,
+      ApiKeys.FIND_COORDINATOR -> createFindCoordinatorRequest,
       ApiKeys.UPDATE_METADATA_KEY -> createUpdateMetadataRequest,
       ApiKeys.JOIN_GROUP -> createJoinGroupRequest,
       ApiKeys.SYNC_GROUP -> createSyncGroupRequest,

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0e7c6b9/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala b/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala
index 905d113..2bceeaa 100644
--- a/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala
@@ -26,7 +26,6 @@ import org.easymock.EasyMock
 import org.junit.Assert._
 import org.junit.Before
 import org.junit.Test
-
 import kafka.admin.ConsumerGroupCommand.ConsumerGroupCommandOptions
 import kafka.admin.ConsumerGroupCommand.KafkaConsumerGroupService
 import kafka.admin.ConsumerGroupCommand.ZkConsumerGroupService
@@ -37,9 +36,8 @@ import kafka.server.KafkaConfig
 import kafka.utils.TestUtils
 
 import org.apache.kafka.clients.consumer.KafkaConsumer
-import org.apache.kafka.common.errors.GroupCoordinatorNotAvailableException
 import org.apache.kafka.common.errors.TimeoutException
-import org.apache.kafka.common.errors.WakeupException
+import org.apache.kafka.common.errors.{CoordinatorNotAvailableException, WakeupException}
 import org.apache.kafka.common.serialization.StringDeserializer
 
 
@@ -184,6 +182,7 @@ class DescribeConsumerGroupTest extends KafkaServerTestHarness {
     val (state, assignments) = consumerGroupCommand.describeGroup()
     assertTrue("Expected the state to be 'Dead' with no members in the group.", state == Some("Dead") && assignments == Some(List()))
     consumerGroupCommand.close()
+    executor.shutdown()
   }
 
   @Test
@@ -203,9 +202,10 @@ class DescribeConsumerGroupTest extends KafkaServerTestHarness {
         assignments.get.filter(_.group == group).head.consumerId.exists(_.trim != ConsumerGroupCommand.MISSING_COLUMN_VALUE) &&
         assignments.get.filter(_.group == group).head.clientId.exists(_.trim != ConsumerGroupCommand.MISSING_COLUMN_VALUE) &&
         assignments.get.filter(_.group == group).head.host.exists(_.trim != ConsumerGroupCommand.MISSING_COLUMN_VALUE)
-      }, "Expected a 'Stable' group status, rows and valid values for consumer id / client id / host columns in describe group results.")
+    }, "Expected a 'Stable' group status, rows and valid values for consumer id / client id / host columns in describe group results.")
 
     consumerGroupCommand.close()
+    executor.shutdown()
   }
 
   @Test
@@ -218,9 +218,9 @@ class DescribeConsumerGroupTest extends KafkaServerTestHarness {
     val consumerGroupCommand = new KafkaConsumerGroupService(opts)
 
     TestUtils.waitUntilTrue(() => {
-        val (state, _) = consumerGroupCommand.describeGroup()
-        state == Some("Stable")
-      }, "Expected the group to initially become stable.")
+      val (state, _) = consumerGroupCommand.describeGroup()
+      state == Some("Stable")
+    }, "Expected the group to initially become stable.")
 
     // stop the consumer so the group has no active member anymore
     executor.shutdown()
@@ -233,7 +233,7 @@ class DescribeConsumerGroupTest extends KafkaServerTestHarness {
         assignments.get.filter(_.group == group).head.consumerId.exists(_.trim == ConsumerGroupCommand.MISSING_COLUMN_VALUE) && // the member should be gone
         assignments.get.filter(_.group == group).head.clientId.exists(_.trim == ConsumerGroupCommand.MISSING_COLUMN_VALUE) &&
         assignments.get.filter(_.group == group).head.host.exists(_.trim == ConsumerGroupCommand.MISSING_COLUMN_VALUE)
-      }, "Expected no active member in describe group results.")
+    }, "Expected no active member in describe group results.")
 
     consumerGroupCommand.close()
   }
@@ -254,9 +254,10 @@ class DescribeConsumerGroupTest extends KafkaServerTestHarness {
         assignments.get.count(_.group == group) == 2 &&
         assignments.get.count{ x => x.group == group && x.partition.isDefined} == 1 &&
         assignments.get.count{ x => x.group == group && !x.partition.isDefined} == 1
-      }, "Expected rows for consumers with no assigned partitions in describe group results.")
+    }, "Expected rows for consumers with no assigned partitions in describe group results.")
 
     consumerGroupCommand.close()
+    executor.shutdown()
   }
 
   @Test
@@ -272,15 +273,16 @@ class DescribeConsumerGroupTest extends KafkaServerTestHarness {
     val consumerGroupCommand = new KafkaConsumerGroupService(opts)
 
     TestUtils.waitUntilTrue(() => {
-          val (state, assignments) = consumerGroupCommand.describeGroup()
-          state == Some("Stable") &&
-          assignments.isDefined &&
-          assignments.get.count(_.group == group) == 2 &&
-          assignments.get.count{ x => x.group == group && x.partition.isDefined} == 2 &&
-          assignments.get.count{ x => x.group == group && !x.partition.isDefined} == 0
-      }, "Expected two rows (one row per consumer) in describe group results.")
+      val (state, assignments) = consumerGroupCommand.describeGroup()
+      state == Some("Stable") &&
+      assignments.isDefined &&
+      assignments.get.count(_.group == group) == 2 &&
+      assignments.get.count{ x => x.group == group && x.partition.isDefined} == 2 &&
+      assignments.get.count{ x => x.group == group && !x.partition.isDefined} == 0
+    }, "Expected two rows (one row per consumer) in describe group results.")
 
     consumerGroupCommand.close()
+    executor.shutdown()
   }
 
   @Test
@@ -294,16 +296,17 @@ class DescribeConsumerGroupTest extends KafkaServerTestHarness {
     val consumerGroupCommand = new KafkaConsumerGroupService(opts)
 
     try {
-      val (state, assignments) = consumerGroupCommand.describeGroup()
+      consumerGroupCommand.describeGroup()
       fail("The consumer group command should fail due to low initialization timeout")
     } catch {
-      case e: TimeoutException =>
+      case _: TimeoutException =>
         // OK
       case e: Throwable =>
         fail("An unexpected exception occurred: " + e.getMessage)
         throw e
     } finally {
       consumerGroupCommand.close()
+      executor.shutdown()
     }
   }
 }
@@ -341,7 +344,7 @@ class ConsumerGroupExecutor(broker: String, numConsumers: Int, groupId: String,
   for (i <- 1 to numConsumers) {
     val consumer = new ConsumerThread(broker, i, groupId, topic)
     consumers ++= List(consumer)
-    executor.submit(consumer);
+    executor.submit(consumer)
   }
 
   Runtime.getRuntime().addShutdownHook(new Thread() {
@@ -352,7 +355,7 @@ class ConsumerGroupExecutor(broker: String, numConsumers: Int, groupId: String,
 
   def shutdown() {
     consumers.foreach(_.shutdown)
-    executor.shutdown();
+    executor.shutdown()
     try {
       executor.awaitTermination(5000, TimeUnit.MILLISECONDS);
     } catch {

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0e7c6b9/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
index 5342dac..90b67ba 100644
--- a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
+++ b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
@@ -175,7 +175,7 @@ class RequestResponseSerializationTest extends JUnitSuite {
   private val offsetFetchResponse = SerializationTestUtils.createTestOffsetFetchResponse
   private val consumerMetadataRequest = SerializationTestUtils.createConsumerMetadataRequest
   private val consumerMetadataResponse = SerializationTestUtils.createConsumerMetadataResponse
-  private val consumerMetadataResponseNoCoordinator = GroupCoordinatorResponse(None, Errors.GROUP_COORDINATOR_NOT_AVAILABLE, 0)
+  private val consumerMetadataResponseNoCoordinator = GroupCoordinatorResponse(None, Errors.COORDINATOR_NOT_AVAILABLE, 0)
 
   @Test
   def testSerializationAndDeserialization() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0e7c6b9/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala b/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala
index 61199c2..ccbba5c 100644
--- a/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala
@@ -114,7 +114,7 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
 
     val joinGroupResult = joinGroup(otherGroupId, memberId, protocolType, protocols)
     val joinGroupError = joinGroupResult.error
-    assertEquals(Errors.NOT_COORDINATOR_FOR_GROUP, joinGroupError)
+    assertEquals(Errors.NOT_COORDINATOR, joinGroupError)
   }
 
   @Test
@@ -204,7 +204,7 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
   def testHeartbeatWrongCoordinator() {
 
     val heartbeatResult = heartbeat(otherGroupId, memberId, -1)
-    assertEquals(Errors.NOT_COORDINATOR_FOR_GROUP, heartbeatResult)
+    assertEquals(Errors.NOT_COORDINATOR, heartbeatResult)
   }
 
   @Test
@@ -471,7 +471,7 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
     val generation = 1
 
     val syncGroupResult = syncGroupFollower(otherGroupId, generation, memberId)
-    assertEquals(Errors.NOT_COORDINATOR_FOR_GROUP, syncGroupResult._2)
+    assertEquals(Errors.NOT_COORDINATOR, syncGroupResult._2)
   }
 
   @Test
@@ -771,7 +771,7 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
   def testFetchOffsetNotCoordinatorForGroup(): Unit = {
     val tp = new TopicPartition("topic", 0)
     val (error, partitionData) = groupCoordinator.handleFetchOffsets(otherGroupId, Some(Seq(tp)))
-    assertEquals(Errors.NOT_COORDINATOR_FOR_GROUP, error)
+    assertEquals(Errors.NOT_COORDINATOR, error)
     assertTrue(partitionData.isEmpty)
   }
 
@@ -864,7 +864,7 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
     val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
 
     val leaveGroupResult = leaveGroup(otherGroupId, memberId)
-    assertEquals(Errors.NOT_COORDINATOR_FOR_GROUP, leaveGroupResult)
+    assertEquals(Errors.NOT_COORDINATOR, leaveGroupResult)
   }
 
   @Test
@@ -937,7 +937,7 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
   def testDescribeGroupWrongCoordinator() {
     EasyMock.reset(replicaManager)
     val (error, _) = groupCoordinator.handleDescribeGroup(otherGroupId)
-    assertEquals(Errors.NOT_COORDINATOR_FOR_GROUP, error)
+    assertEquals(Errors.NOT_COORDINATOR, error)
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0e7c6b9/core/src/test/scala/unit/kafka/coordinator/GroupMetadataManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/GroupMetadataManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/GroupMetadataManagerTest.scala
index 9d38485..b1f68bb 100644
--- a/core/src/test/scala/unit/kafka/coordinator/GroupMetadataManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/GroupMetadataManagerTest.scala
@@ -259,10 +259,10 @@ class GroupMetadataManagerTest {
   @Test
   def testStoreGroupErrorMapping() {
     assertStoreGroupErrorMapping(Errors.NONE, Errors.NONE)
-    assertStoreGroupErrorMapping(Errors.UNKNOWN_TOPIC_OR_PARTITION, Errors.GROUP_COORDINATOR_NOT_AVAILABLE)
-    assertStoreGroupErrorMapping(Errors.NOT_ENOUGH_REPLICAS, Errors.GROUP_COORDINATOR_NOT_AVAILABLE)
-    assertStoreGroupErrorMapping(Errors.NOT_ENOUGH_REPLICAS_AFTER_APPEND, Errors.GROUP_COORDINATOR_NOT_AVAILABLE)
-    assertStoreGroupErrorMapping(Errors.NOT_LEADER_FOR_PARTITION, Errors.NOT_COORDINATOR_FOR_GROUP)
+    assertStoreGroupErrorMapping(Errors.UNKNOWN_TOPIC_OR_PARTITION, Errors.COORDINATOR_NOT_AVAILABLE)
+    assertStoreGroupErrorMapping(Errors.NOT_ENOUGH_REPLICAS, Errors.COORDINATOR_NOT_AVAILABLE)
+    assertStoreGroupErrorMapping(Errors.NOT_ENOUGH_REPLICAS_AFTER_APPEND, Errors.COORDINATOR_NOT_AVAILABLE)
+    assertStoreGroupErrorMapping(Errors.NOT_LEADER_FOR_PARTITION, Errors.NOT_COORDINATOR)
     assertStoreGroupErrorMapping(Errors.MESSAGE_TOO_LARGE, Errors.UNKNOWN)
     assertStoreGroupErrorMapping(Errors.RECORD_LIST_TOO_LARGE, Errors.UNKNOWN)
     assertStoreGroupErrorMapping(Errors.INVALID_FETCH_SIZE, Errors.UNKNOWN)
@@ -343,7 +343,7 @@ class GroupMetadataManagerTest {
     }
 
     groupMetadataManager.prepareStoreGroup(group, Map(memberId -> Array[Byte]()), callback)
-    assertEquals(Some(Errors.NOT_COORDINATOR_FOR_GROUP), maybeError)
+    assertEquals(Some(Errors.NOT_COORDINATOR), maybeError)
     EasyMock.verify(replicaManager)
   }
 
@@ -414,16 +414,16 @@ class GroupMetadataManagerTest {
 
     assertFalse(commitErrors.isEmpty)
     val maybeError = commitErrors.get.get(topicPartition)
-    assertEquals(Some(Errors.NOT_COORDINATOR_FOR_GROUP), maybeError)
+    assertEquals(Some(Errors.NOT_COORDINATOR), maybeError)
     EasyMock.verify(replicaManager)
   }
 
   @Test
   def testCommitOffsetFailure() {
-    assertCommitOffsetErrorMapping(Errors.UNKNOWN_TOPIC_OR_PARTITION, Errors.GROUP_COORDINATOR_NOT_AVAILABLE)
-    assertCommitOffsetErrorMapping(Errors.NOT_ENOUGH_REPLICAS, Errors.GROUP_COORDINATOR_NOT_AVAILABLE)
-    assertCommitOffsetErrorMapping(Errors.NOT_ENOUGH_REPLICAS_AFTER_APPEND, Errors.GROUP_COORDINATOR_NOT_AVAILABLE)
-    assertCommitOffsetErrorMapping(Errors.NOT_LEADER_FOR_PARTITION, Errors.NOT_COORDINATOR_FOR_GROUP)
+    assertCommitOffsetErrorMapping(Errors.UNKNOWN_TOPIC_OR_PARTITION, Errors.COORDINATOR_NOT_AVAILABLE)
+    assertCommitOffsetErrorMapping(Errors.NOT_ENOUGH_REPLICAS, Errors.COORDINATOR_NOT_AVAILABLE)
+    assertCommitOffsetErrorMapping(Errors.NOT_ENOUGH_REPLICAS_AFTER_APPEND, Errors.COORDINATOR_NOT_AVAILABLE)
+    assertCommitOffsetErrorMapping(Errors.NOT_LEADER_FOR_PARTITION, Errors.NOT_COORDINATOR)
     assertCommitOffsetErrorMapping(Errors.MESSAGE_TOO_LARGE, Errors.INVALID_COMMIT_OFFSET_SIZE)
     assertCommitOffsetErrorMapping(Errors.RECORD_LIST_TOO_LARGE, Errors.INVALID_COMMIT_OFFSET_SIZE)
     assertCommitOffsetErrorMapping(Errors.INVALID_FETCH_SIZE, Errors.INVALID_COMMIT_OFFSET_SIZE)

