kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject [4/5] kafka git commit: MINOR: Use an explicit `Errors` object when possible instead of a numeric error code
Date Fri, 10 Feb 2017 05:21:52 GMT
http://git-wip-us.apache.org/repos/asf/kafka/blob/9898d665/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataResponse.java
index e1c3634..865d6c6 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataResponse.java
@@ -14,6 +14,7 @@
 package org.apache.kafka.common.requests;
 
 import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.ProtoUtils;
 import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
@@ -31,21 +32,21 @@ public class UpdateMetadataResponse extends AbstractResponse {
      *
      * STALE_CONTROLLER_EPOCH (11)
      */
-    private final short errorCode;
+    private final Errors error;
 
-    public UpdateMetadataResponse(short errorCode) {
+    public UpdateMetadataResponse(Errors error) {
         super(new Struct(CURRENT_SCHEMA));
-        struct.set(ERROR_CODE_KEY_NAME, errorCode);
-        this.errorCode = errorCode;
+        struct.set(ERROR_CODE_KEY_NAME, error.code());
+        this.error = error;
     }
 
     public UpdateMetadataResponse(Struct struct) {
         super(struct);
-        errorCode = struct.getShort(ERROR_CODE_KEY_NAME);
+        error = Errors.forCode(struct.getShort(ERROR_CODE_KEY_NAME));
     }
 
-    public short errorCode() {
-        return errorCode;
+    public Errors error() {
+        return error;
     }
 
     public static UpdateMetadataResponse parse(ByteBuffer buffer) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/9898d665/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java
index 59eee83..2b445e5 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java
@@ -324,7 +324,7 @@ public class SaslClientAuthenticator implements Authenticator {
     }
 
     private void handleSaslHandshakeResponse(SaslHandshakeResponse response) {
-        Errors error = Errors.forCode(response.errorCode());
+        Errors error = response.error();
         switch (error) {
             case NONE:
                 break;
@@ -336,7 +336,7 @@ public class SaslClientAuthenticator implements Authenticator {
                     mechanism, response.enabledMechanisms()));
             default:
                 throw new AuthenticationException(String.format("Unknown error code %d, client mechanism is %s, enabled mechanisms are %s",
-                    response.errorCode(), mechanism, response.enabledMechanisms()));
+                    response.error(), mechanism, response.enabledMechanisms()));
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/9898d665/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
index 07792d2..7f6b7aa 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
@@ -359,11 +359,11 @@ public class SaslServerAuthenticator implements Authenticator {
         String clientMechanism = handshakeRequest.mechanism();
         if (enabledMechanisms.contains(clientMechanism)) {
             LOG.debug("Using SASL mechanism '{}' provided by client", clientMechanism);
-            sendKafkaResponse(requestHeader, new SaslHandshakeResponse((short) 0, enabledMechanisms));
+            sendKafkaResponse(requestHeader, new SaslHandshakeResponse(Errors.NONE, enabledMechanisms));
             return clientMechanism;
         } else {
             LOG.debug("SASL mechanism '{}' requested by client is not supported", clientMechanism);
-            sendKafkaResponse(requestHeader, new SaslHandshakeResponse(Errors.UNSUPPORTED_SASL_MECHANISM.code(), enabledMechanisms));
+            sendKafkaResponse(requestHeader, new SaslHandshakeResponse(Errors.UNSUPPORTED_SASL_MECHANISM, enabledMechanisms));
             throw new UnsupportedSaslMechanismException("Unsupported SASL mechanism " + clientMechanism);
         }
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/9898d665/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index 4aaa172..eb0b5c8 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -459,7 +459,7 @@ public class KafkaConsumerTest {
 
         // there shouldn't be any need to lookup the coordinator or fetch committed offsets.
         // we just lookup the starting position and send the record fetch.
-        client.prepareResponse(listOffsetsResponse(Collections.singletonMap(tp0, 50L), Errors.NONE.code()));
+        client.prepareResponse(listOffsetsResponse(Collections.singletonMap(tp0, 50L), Errors.NONE));
         client.prepareResponse(fetchResponse(tp0, 50L, 5));
 
         ConsumerRecords<String, String> records = consumer.poll(0);
@@ -493,7 +493,7 @@ public class KafkaConsumerTest {
         consumer.assign(singletonList(tp0));
 
         // lookup coordinator
-        client.prepareResponseFrom(new GroupCoordinatorResponse(Errors.NONE.code(), node), node);
+        client.prepareResponseFrom(new GroupCoordinatorResponse(Errors.NONE, node), node);
         Node coordinator = new Node(Integer.MAX_VALUE - node.id(), node.host(), node.port());
 
         // fetch offset for one topic
@@ -991,7 +991,7 @@ public class KafkaConsumerTest {
                 rebalanceTimeoutMs, sessionTimeoutMs, heartbeatIntervalMs, true, autoCommitIntervalMs);
 
         // lookup coordinator
-        client.prepareResponseFrom(new GroupCoordinatorResponse(Errors.NONE.code(), node), node);
+        client.prepareResponseFrom(new GroupCoordinatorResponse(Errors.NONE, node), node);
         Node coordinator = new Node(Integer.MAX_VALUE - node.id(), node.host(), node.port());
 
         // manual assignment
@@ -1009,7 +1009,7 @@ public class KafkaConsumerTest {
 
         // there shouldn't be any need to lookup the coordinator or fetch committed offsets.
         // we just lookup the starting position and send the record fetch.
-        client.prepareResponse(listOffsetsResponse(Collections.singletonMap(tp0, 10L), Errors.NONE.code()));
+        client.prepareResponse(listOffsetsResponse(Collections.singletonMap(tp0, 10L), Errors.NONE));
         client.prepareResponse(fetchResponse(tp0, 10L, 1));
 
         ConsumerRecords<String, String> records = consumer.poll(0);
@@ -1056,7 +1056,7 @@ public class KafkaConsumerTest {
                 rebalanceTimeoutMs, sessionTimeoutMs, heartbeatIntervalMs, false, autoCommitIntervalMs);
 
         // lookup coordinator
-        client.prepareResponseFrom(new GroupCoordinatorResponse(Errors.NONE.code(), node), node);
+        client.prepareResponseFrom(new GroupCoordinatorResponse(Errors.NONE, node), node);
         Node coordinator = new Node(Integer.MAX_VALUE - node.id(), node.host(), node.port());
 
         // manual assignment
@@ -1074,7 +1074,7 @@ public class KafkaConsumerTest {
 
         // there shouldn't be any need to lookup the coordinator or fetch committed offsets.
         // we just lookup the starting position and send the record fetch.
-        client.prepareResponse(listOffsetsResponse(Collections.singletonMap(tp0, 10L), Errors.NONE.code()));
+        client.prepareResponse(listOffsetsResponse(Collections.singletonMap(tp0, 10L), Errors.NONE));
         client.prepareResponse(fetchResponse(tp0, 10L, 1));
 
         ConsumerRecords<String, String> records = consumer.poll(0);
@@ -1117,7 +1117,7 @@ public class KafkaConsumerTest {
                 rebalanceTimeoutMs, sessionTimeoutMs, heartbeatIntervalMs, true, autoCommitIntervalMs);
 
         // lookup coordinator
-        client.prepareResponseFrom(new GroupCoordinatorResponse(Errors.NONE.code(), node), node);
+        client.prepareResponseFrom(new GroupCoordinatorResponse(Errors.NONE, node), node);
         Node coordinator = new Node(Integer.MAX_VALUE - node.id(), node.host(), node.port());
 
         // manual assignment
@@ -1139,8 +1139,8 @@ public class KafkaConsumerTest {
         assertEquals(0, consumer.committed(tp1).offset());
 
         // fetch and verify consumer's position in the two partitions
-        client.prepareResponse(listOffsetsResponse(Collections.singletonMap(tp0, 3L), Errors.NONE.code()));
-        client.prepareResponse(listOffsetsResponse(Collections.singletonMap(tp1, 3L), Errors.NONE.code()));
+        client.prepareResponse(listOffsetsResponse(Collections.singletonMap(tp0, 3L), Errors.NONE));
+        client.prepareResponse(listOffsetsResponse(Collections.singletonMap(tp1, 3L), Errors.NONE));
         assertEquals(3L, consumer.position(tp0));
         assertEquals(3L, consumer.position(tp1));
 
@@ -1183,10 +1183,10 @@ public class KafkaConsumerTest {
 
     @Test
     public void testGracefulClose() throws Exception {
-        Map<TopicPartition, Short> response = new HashMap<>();
-        response.put(tp0, Errors.NONE.code());
+        Map<TopicPartition, Errors> response = new HashMap<>();
+        response.put(tp0, Errors.NONE);
         OffsetCommitResponse commitResponse = offsetCommitResponse(response);
-        LeaveGroupResponse leaveGroupResponse = new LeaveGroupResponse(Errors.NONE.code());
+        LeaveGroupResponse leaveGroupResponse = new LeaveGroupResponse(Errors.NONE);
         consumerCloseTest(5000, Arrays.asList(commitResponse, leaveGroupResponse), 0, false);
     }
 
@@ -1197,8 +1197,8 @@ public class KafkaConsumerTest {
 
     @Test
     public void testLeaveGroupTimeout() throws Exception {
-        Map<TopicPartition, Short> response = new HashMap<>();
-        response.put(tp0, Errors.NONE.code());
+        Map<TopicPartition, Errors> response = new HashMap<>();
+        response.put(tp0, Errors.NONE);
         OffsetCommitResponse commitResponse = offsetCommitResponse(response);
         consumerCloseTest(5000, Arrays.asList(commitResponse), 5000, false);
     }
@@ -1332,7 +1332,7 @@ public class KafkaConsumerTest {
     private Node prepareRebalance(MockClient client, Node node, final Set<String> subscribedTopics, PartitionAssignor assignor, List<TopicPartition> partitions, Node coordinator) {
         if (coordinator == null) {
             // lookup coordinator
-            client.prepareResponseFrom(new GroupCoordinatorResponse(Errors.NONE.code(), node), node);
+            client.prepareResponseFrom(new GroupCoordinatorResponse(Errors.NONE, node), node);
             coordinator = new Node(Integer.MAX_VALUE - node.id(), node.host(), node.port());
         }
 
@@ -1344,10 +1344,10 @@ public class KafkaConsumerTest {
                 PartitionAssignor.Subscription subscription = ConsumerProtocol.deserializeSubscription(joinGroupRequest.groupProtocols().get(0).metadata());
                 return subscribedTopics.equals(new HashSet<>(subscription.topics()));
             }
-        }, joinGroupFollowerResponse(assignor, 1, "memberId", "leaderId", Errors.NONE.code()), coordinator);
+        }, joinGroupFollowerResponse(assignor, 1, "memberId", "leaderId", Errors.NONE), coordinator);
 
         // sync group
-        client.prepareResponseFrom(syncGroupResponse(partitions, Errors.NONE.code()), coordinator);
+        client.prepareResponseFrom(syncGroupResponse(partitions, Errors.NONE), coordinator);
 
         return coordinator;
     }
@@ -1355,15 +1355,15 @@ public class KafkaConsumerTest {
     private Node prepareRebalance(MockClient client, Node node, PartitionAssignor assignor, List<TopicPartition> partitions, Node coordinator) {
         if (coordinator == null) {
             // lookup coordinator
-            client.prepareResponseFrom(new GroupCoordinatorResponse(Errors.NONE.code(), node), node);
+            client.prepareResponseFrom(new GroupCoordinatorResponse(Errors.NONE, node), node);
             coordinator = new Node(Integer.MAX_VALUE - node.id(), node.host(), node.port());
         }
 
         // join group
-        client.prepareResponseFrom(joinGroupFollowerResponse(assignor, 1, "memberId", "leaderId", Errors.NONE.code()), coordinator);
+        client.prepareResponseFrom(joinGroupFollowerResponse(assignor, 1, "memberId", "leaderId", Errors.NONE), coordinator);
 
         // sync group
-        client.prepareResponseFrom(syncGroupResponse(partitions, Errors.NONE.code()), coordinator);
+        client.prepareResponseFrom(syncGroupResponse(partitions, Errors.NONE), coordinator);
 
         return coordinator;
     }
@@ -1376,15 +1376,15 @@ public class KafkaConsumerTest {
                 heartbeatReceived.set(true);
                 return true;
             }
-        }, new HeartbeatResponse(Errors.NONE.code()), coordinator);
+        }, new HeartbeatResponse(Errors.NONE), coordinator);
         return heartbeatReceived;
     }
 
     private AtomicBoolean prepareOffsetCommitResponse(MockClient client, Node coordinator, final Map<TopicPartition, Long> partitionOffsets) {
         final AtomicBoolean commitReceived = new AtomicBoolean(true);
-        Map<TopicPartition, Short> response = new HashMap<>();
+        Map<TopicPartition, Errors> response = new HashMap<>();
         for (TopicPartition partition : partitionOffsets.keySet())
-            response.put(partition, Errors.NONE.code());
+            response.put(partition, Errors.NONE);
 
         client.prepareResponseFrom(new MockClient.RequestMatcher() {
             @Override
@@ -1408,16 +1408,16 @@ public class KafkaConsumerTest {
         return prepareOffsetCommitResponse(client, coordinator, Collections.singletonMap(partition, offset));
     }
 
-    private OffsetCommitResponse offsetCommitResponse(Map<TopicPartition, Short> responseData) {
+    private OffsetCommitResponse offsetCommitResponse(Map<TopicPartition, Errors> responseData) {
         return new OffsetCommitResponse(responseData);
     }
 
-    private JoinGroupResponse joinGroupFollowerResponse(PartitionAssignor assignor, int generationId, String memberId, String leaderId, short error) {
+    private JoinGroupResponse joinGroupFollowerResponse(PartitionAssignor assignor, int generationId, String memberId, String leaderId, Errors error) {
         return new JoinGroupResponse(error, generationId, assignor.name(), memberId, leaderId,
                 Collections.<String, ByteBuffer>emptyMap());
     }
 
-    private SyncGroupResponse syncGroupResponse(List<TopicPartition> partitions, short error) {
+    private SyncGroupResponse syncGroupResponse(List<TopicPartition> partitions, Errors error) {
         ByteBuffer buf = ConsumerProtocol.serializeAssignment(new PartitionAssignor.Assignment(partitions));
         return new SyncGroupResponse(error, buf);
     }
@@ -1430,7 +1430,7 @@ public class KafkaConsumerTest {
         return new OffsetFetchResponse(Errors.NONE, partitionData);
     }
 
-    private ListOffsetResponse listOffsetsResponse(Map<TopicPartition, Long> offsets, short error) {
+    private ListOffsetResponse listOffsetsResponse(Map<TopicPartition, Long> offsets, Errors error) {
         Map<TopicPartition, ListOffsetResponse.PartitionData> partitionData = new HashMap<>();
         for (Map.Entry<TopicPartition, Long> partitionOffset : offsets.entrySet()) {
             partitionData.put(partitionOffset.getKey(), new ListOffsetResponse.PartitionData(error,
@@ -1448,7 +1448,7 @@ public class KafkaConsumerTest {
             MemoryRecordsBuilder records = MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE, TimestampType.CREATE_TIME, fetchOffset);
             for (int i = 0; i < fetchCount; i++)
                 records.append(0L, ("key-" + i).getBytes(), ("value-" + i).getBytes());
-            tpResponses.put(partition, new FetchResponse.PartitionData(Errors.NONE.code(), 0, records.build()));
+            tpResponses.put(partition, new FetchResponse.PartitionData(Errors.NONE, 0, records.build()));
         }
         return new FetchResponse(tpResponses, 0);
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/9898d665/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
index bb617ae..afdecfc 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
@@ -467,20 +467,20 @@ public class AbstractCoordinatorTest {
     }
 
     private GroupCoordinatorResponse groupCoordinatorResponse(Node node, Errors error) {
-        return new GroupCoordinatorResponse(error.code(), node);
+        return new GroupCoordinatorResponse(error, node);
     }
 
     private HeartbeatResponse heartbeatResponse(Errors error) {
-        return new HeartbeatResponse(error.code());
+        return new HeartbeatResponse(error);
     }
 
     private JoinGroupResponse joinGroupFollowerResponse(int generationId, String memberId, String leaderId, Errors error) {
-        return new JoinGroupResponse(error.code(), generationId, "dummy-subprotocol", memberId, leaderId,
+        return new JoinGroupResponse(error, generationId, "dummy-subprotocol", memberId, leaderId,
                 Collections.<String, ByteBuffer>emptyMap());
     }
 
     private SyncGroupResponse syncGroupResponse(Errors error) {
-        return new SyncGroupResponse(error.code(), ByteBuffer.allocate(0));
+        return new SyncGroupResponse(error, ByteBuffer.allocate(0));
     }
 
     public class DummyCoordinator extends AbstractCoordinator {

http://git-wip-us.apache.org/repos/asf/kafka/blob/9898d665/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index e11bf30..33f496b 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -139,7 +139,7 @@ public class ConsumerCoordinatorTest {
 
     @Test
     public void testNormalHeartbeat() {
-        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code()));
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
         coordinator.ensureCoordinatorReady();
 
         // normal heartbeat
@@ -148,7 +148,7 @@ public class ConsumerCoordinatorTest {
         assertEquals(1, consumerClient.pendingRequestCount());
         assertFalse(future.isDone());
 
-        client.prepareResponse(heartbeatResponse(Errors.NONE.code()));
+        client.prepareResponse(heartbeatResponse(Errors.NONE));
         consumerClient.poll(0);
 
         assertTrue(future.isDone());
@@ -157,7 +157,7 @@ public class ConsumerCoordinatorTest {
 
     @Test(expected = GroupAuthorizationException.class)
     public void testGroupDescribeUnauthorized() {
-        client.prepareResponse(groupCoordinatorResponse(node, Errors.GROUP_AUTHORIZATION_FAILED.code()));
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.GROUP_AUTHORIZATION_FAILED));
         coordinator.ensureCoordinatorReady();
     }
 
@@ -165,17 +165,17 @@ public class ConsumerCoordinatorTest {
     public void testGroupReadUnauthorized() {
         subscriptions.subscribe(singleton(topic1), rebalanceListener);
 
-        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code()));
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
         coordinator.ensureCoordinatorReady();
 
         client.prepareResponse(joinGroupLeaderResponse(0, "memberId", Collections.<String, List<String>>emptyMap(),
-                Errors.GROUP_AUTHORIZATION_FAILED.code()));
+                Errors.GROUP_AUTHORIZATION_FAILED));
         coordinator.poll(time.milliseconds());
     }
 
     @Test
     public void testCoordinatorNotAvailable() {
-        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code()));
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
         coordinator.ensureCoordinatorReady();
 
         // GROUP_COORDINATOR_NOT_AVAILABLE will mark coordinator as unknown
@@ -184,7 +184,7 @@ public class ConsumerCoordinatorTest {
         assertEquals(1, consumerClient.pendingRequestCount());
         assertFalse(future.isDone());
 
-        client.prepareResponse(heartbeatResponse(Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code()));
+        client.prepareResponse(heartbeatResponse(Errors.GROUP_COORDINATOR_NOT_AVAILABLE));
         time.sleep(sessionTimeoutMs);
         consumerClient.poll(0);
 
@@ -196,7 +196,7 @@ public class ConsumerCoordinatorTest {
 
     @Test
     public void testNotCoordinator() {
-        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code()));
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
         coordinator.ensureCoordinatorReady();
 
         // not_coordinator will mark coordinator as unknown
@@ -205,7 +205,7 @@ public class ConsumerCoordinatorTest {
         assertEquals(1, consumerClient.pendingRequestCount());
         assertFalse(future.isDone());
 
-        client.prepareResponse(heartbeatResponse(Errors.NOT_COORDINATOR_FOR_GROUP.code()));
+        client.prepareResponse(heartbeatResponse(Errors.NOT_COORDINATOR_FOR_GROUP));
         time.sleep(sessionTimeoutMs);
         consumerClient.poll(0);
 
@@ -217,7 +217,7 @@ public class ConsumerCoordinatorTest {
 
     @Test
     public void testIllegalGeneration() {
-        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code()));
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
         coordinator.ensureCoordinatorReady();
 
         // illegal_generation will cause re-partition
@@ -229,7 +229,7 @@ public class ConsumerCoordinatorTest {
         assertEquals(1, consumerClient.pendingRequestCount());
         assertFalse(future.isDone());
 
-        client.prepareResponse(heartbeatResponse(Errors.ILLEGAL_GENERATION.code()));
+        client.prepareResponse(heartbeatResponse(Errors.ILLEGAL_GENERATION));
         time.sleep(sessionTimeoutMs);
         consumerClient.poll(0);
 
@@ -241,7 +241,7 @@ public class ConsumerCoordinatorTest {
 
     @Test
     public void testUnknownConsumerId() {
-        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code()));
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
         coordinator.ensureCoordinatorReady();
 
         // illegal_generation will cause re-partition
@@ -253,7 +253,7 @@ public class ConsumerCoordinatorTest {
         assertEquals(1, consumerClient.pendingRequestCount());
         assertFalse(future.isDone());
 
-        client.prepareResponse(heartbeatResponse(Errors.UNKNOWN_MEMBER_ID.code()));
+        client.prepareResponse(heartbeatResponse(Errors.UNKNOWN_MEMBER_ID));
         time.sleep(sessionTimeoutMs);
         consumerClient.poll(0);
 
@@ -265,7 +265,7 @@ public class ConsumerCoordinatorTest {
 
     @Test
     public void testCoordinatorDisconnect() {
-        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code()));
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
         coordinator.ensureCoordinatorReady();
 
         // coordinator disconnect will mark coordinator as unknown
@@ -274,7 +274,7 @@ public class ConsumerCoordinatorTest {
         assertEquals(1, consumerClient.pendingRequestCount());
         assertFalse(future.isDone());
 
-        client.prepareResponse(heartbeatResponse(Errors.NONE.code()), true); // return disconnected
+        client.prepareResponse(heartbeatResponse(Errors.NONE), true); // return disconnected
         time.sleep(sessionTimeoutMs);
         consumerClient.poll(0);
 
@@ -294,11 +294,11 @@ public class ConsumerCoordinatorTest {
         metadata.setTopics(singletonList(topic1));
         metadata.update(cluster, time.milliseconds());
 
-        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code()));
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
         coordinator.ensureCoordinatorReady();
 
         client.prepareResponse(joinGroupLeaderResponse(0, consumerId, Collections.<String, List<String>>emptyMap(),
-                Errors.INVALID_GROUP_ID.code()));
+                Errors.INVALID_GROUP_ID));
         coordinator.poll(time.milliseconds());
     }
 
@@ -312,14 +312,14 @@ public class ConsumerCoordinatorTest {
         metadata.setTopics(singletonList(topic1));
         metadata.update(cluster, time.milliseconds());
 
-        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code()));
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
         coordinator.ensureCoordinatorReady();
 
         // normal join group
         Map<String, List<String>> memberSubscriptions = Collections.singletonMap(consumerId, singletonList(topic1));
         partitionAssignor.prepare(Collections.singletonMap(consumerId, singletonList(t1p)));
 
-        client.prepareResponse(joinGroupLeaderResponse(1, consumerId, memberSubscriptions, Errors.NONE.code()));
+        client.prepareResponse(joinGroupLeaderResponse(1, consumerId, memberSubscriptions, Errors.NONE));
         client.prepareResponse(new MockClient.RequestMatcher() {
             @Override
             public boolean matches(AbstractRequest body) {
@@ -328,7 +328,7 @@ public class ConsumerCoordinatorTest {
                         sync.generationId() == 1 &&
                         sync.groupAssignment().containsKey(consumerId);
             }
-        }, syncGroupResponse(singletonList(t1p), Errors.NONE.code()));
+        }, syncGroupResponse(singletonList(t1p), Errors.NONE));
         coordinator.poll(time.milliseconds());
 
         assertFalse(coordinator.needRejoin());
@@ -351,14 +351,14 @@ public class ConsumerCoordinatorTest {
         metadata.setTopics(singletonList(topic1));
         metadata.update(TestUtils.singletonCluster(topic1, 1), time.milliseconds());
 
-        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code()));
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
         coordinator.ensureCoordinatorReady();
 
         // normal join group
         Map<String, List<String>> memberSubscriptions = Collections.singletonMap(consumerId, singletonList(topic1));
         partitionAssignor.prepare(Collections.singletonMap(consumerId, Arrays.asList(t1p, t2p)));
 
-        client.prepareResponse(joinGroupLeaderResponse(1, consumerId, memberSubscriptions, Errors.NONE.code()));
+        client.prepareResponse(joinGroupLeaderResponse(1, consumerId, memberSubscriptions, Errors.NONE));
         client.prepareResponse(new MockClient.RequestMatcher() {
             @Override
             public boolean matches(AbstractRequest body) {
@@ -367,7 +367,7 @@ public class ConsumerCoordinatorTest {
                         sync.generationId() == 1 &&
                         sync.groupAssignment().containsKey(consumerId);
             }
-        }, syncGroupResponse(Arrays.asList(t1p, t2p), Errors.NONE.code()));
+        }, syncGroupResponse(Arrays.asList(t1p, t2p), Errors.NONE));
         // expect client to force updating the metadata, if yes gives it both topics
         client.prepareMetadataUpdate(cluster);
 
@@ -393,7 +393,7 @@ public class ConsumerCoordinatorTest {
 
         assertEquals(singleton(topic1), subscriptions.subscription());
 
-        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code()));
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
         coordinator.ensureCoordinatorReady();
 
         Map<String, List<String>> initialSubscription = singletonMap(consumerId, singletonList(topic1));
@@ -403,7 +403,7 @@ public class ConsumerCoordinatorTest {
         final List<String> updatedSubscription = Arrays.asList(topic1, topic2);
         final Set<String> updatedSubscriptionSet = new HashSet<>(updatedSubscription);
 
-        client.prepareResponse(joinGroupLeaderResponse(1, consumerId, initialSubscription, Errors.NONE.code()));
+        client.prepareResponse(joinGroupLeaderResponse(1, consumerId, initialSubscription, Errors.NONE));
         client.prepareResponse(new MockClient.RequestMatcher() {
             @Override
             public boolean matches(AbstractRequest body) {
@@ -413,7 +413,7 @@ public class ConsumerCoordinatorTest {
                 metadata.update(TestUtils.clusterWith(1, updatedPartitions), time.milliseconds());
                 return true;
             }
-        }, syncGroupResponse(singletonList(t1p), Errors.NONE.code()));
+        }, syncGroupResponse(singletonList(t1p), Errors.NONE));
 
         List<TopicPartition> newAssignment = Arrays.asList(t1p, t2p);
         Set<TopicPartition> newAssignmentSet = new HashSet<>(newAssignment);
@@ -431,8 +431,8 @@ public class ConsumerCoordinatorTest {
                 protocolMetadata.metadata().rewind();
                 return subscription.topics().containsAll(updatedSubscriptionSet);
             }
-        }, joinGroupLeaderResponse(2, consumerId, updatedSubscriptions, Errors.NONE.code()));
-        client.prepareResponse(syncGroupResponse(newAssignment, Errors.NONE.code()));
+        }, joinGroupLeaderResponse(2, consumerId, updatedSubscriptions, Errors.NONE));
+        client.prepareResponse(syncGroupResponse(newAssignment, Errors.NONE));
 
         coordinator.poll(time.milliseconds());
 
@@ -455,14 +455,14 @@ public class ConsumerCoordinatorTest {
         metadata.setTopics(singletonList(topic1));
         metadata.update(cluster, time.milliseconds());
 
-        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code()));
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
         coordinator.ensureCoordinatorReady();
 
         Map<String, List<String>> memberSubscriptions = Collections.singletonMap(consumerId, singletonList(topic1));
         partitionAssignor.prepare(Collections.singletonMap(consumerId, singletonList(t1p)));
 
         // prepare only the first half of the join and then trigger the wakeup
-        client.prepareResponse(joinGroupLeaderResponse(1, consumerId, memberSubscriptions, Errors.NONE.code()));
+        client.prepareResponse(joinGroupLeaderResponse(1, consumerId, memberSubscriptions, Errors.NONE));
         consumerClient.wakeup();
 
         try {
@@ -472,7 +472,7 @@ public class ConsumerCoordinatorTest {
         }
 
         // now complete the second half
-        client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE.code()));
+        client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE));
         coordinator.poll(time.milliseconds());
 
         assertFalse(coordinator.needRejoin());
@@ -489,11 +489,11 @@ public class ConsumerCoordinatorTest {
 
         subscriptions.subscribe(singleton(topic1), rebalanceListener);
 
-        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code()));
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
         coordinator.ensureCoordinatorReady();
 
         // normal join group
-        client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE.code()));
+        client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE));
         client.prepareResponse(new MockClient.RequestMatcher() {
             @Override
             public boolean matches(AbstractRequest body) {
@@ -502,7 +502,7 @@ public class ConsumerCoordinatorTest {
                         sync.generationId() == 1 &&
                         sync.groupAssignment().isEmpty();
             }
-        }, syncGroupResponse(singletonList(t1p), Errors.NONE.code()));
+        }, syncGroupResponse(singletonList(t1p), Errors.NONE));
 
         coordinator.joinGroupIfNeeded();
 
@@ -526,11 +526,11 @@ public class ConsumerCoordinatorTest {
         metadata.setTopics(singletonList(topic1));
         metadata.update(TestUtils.singletonCluster(topic1, 1), time.milliseconds());
 
-        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code()));
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
         coordinator.ensureCoordinatorReady();
 
         // normal join group
-        client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE.code()));
+        client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE));
         client.prepareResponse(new MockClient.RequestMatcher() {
             @Override
             public boolean matches(AbstractRequest body) {
@@ -539,7 +539,7 @@ public class ConsumerCoordinatorTest {
                         sync.generationId() == 1 &&
                         sync.groupAssignment().isEmpty();
             }
-        }, syncGroupResponse(Arrays.asList(t1p, t2p), Errors.NONE.code()));
+        }, syncGroupResponse(Arrays.asList(t1p, t2p), Errors.NONE));
         // expect client to force updating the metadata, if yes gives it both topics
         client.prepareMetadataUpdate(cluster);
 
@@ -559,11 +559,11 @@ public class ConsumerCoordinatorTest {
 
         subscriptions.subscribe(singleton(topic1), rebalanceListener);
 
-        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code()));
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
         coordinator.ensureCoordinatorReady();
 
-        client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE.code()));
-        client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE.code()));
+        client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE));
+        client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE));
         coordinator.joinGroupIfNeeded();
 
         final AtomicBoolean received = new AtomicBoolean(false);
@@ -575,7 +575,7 @@ public class ConsumerCoordinatorTest {
                 return leaveRequest.memberId().equals(consumerId) &&
                         leaveRequest.groupId().equals(groupId);
             }
-        }, new LeaveGroupResponse(Errors.NONE.code()));
+        }, new LeaveGroupResponse(Errors.NONE));
         coordinator.close(0);
         assertTrue(received.get());
     }
@@ -586,11 +586,11 @@ public class ConsumerCoordinatorTest {
 
         subscriptions.subscribe(singleton(topic1), rebalanceListener);
 
-        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code()));
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
         coordinator.ensureCoordinatorReady();
 
-        client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE.code()));
-        client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE.code()));
+        client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE));
+        client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE));
         coordinator.joinGroupIfNeeded();
 
         final AtomicBoolean received = new AtomicBoolean(false);
@@ -602,7 +602,7 @@ public class ConsumerCoordinatorTest {
                 return leaveRequest.memberId().equals(consumerId) &&
                         leaveRequest.groupId().equals(groupId);
             }
-        }, new LeaveGroupResponse(Errors.NONE.code()));
+        }, new LeaveGroupResponse(Errors.NONE));
         coordinator.maybeLeaveGroup();
         assertTrue(received.get());
 
@@ -616,12 +616,12 @@ public class ConsumerCoordinatorTest {
 
         subscriptions.subscribe(singleton(topic1), rebalanceListener);
 
-        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code()));
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
         coordinator.ensureCoordinatorReady();
 
         // join initially, but let coordinator rebalance on sync
-        client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE.code()));
-        client.prepareResponse(syncGroupResponse(Collections.<TopicPartition>emptyList(), Errors.UNKNOWN.code()));
+        client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE));
+        client.prepareResponse(syncGroupResponse(Collections.<TopicPartition>emptyList(), Errors.UNKNOWN));
         coordinator.joinGroupIfNeeded();
     }
 
@@ -631,12 +631,12 @@ public class ConsumerCoordinatorTest {
 
         subscriptions.subscribe(singleton(topic1), rebalanceListener);
 
-        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code()));
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
         coordinator.ensureCoordinatorReady();
 
         // join initially, but let coordinator returns unknown member id
-        client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE.code()));
-        client.prepareResponse(syncGroupResponse(Collections.<TopicPartition>emptyList(), Errors.UNKNOWN_MEMBER_ID.code()));
+        client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE));
+        client.prepareResponse(syncGroupResponse(Collections.<TopicPartition>emptyList(), Errors.UNKNOWN_MEMBER_ID));
 
         // now we should see a new join with the empty UNKNOWN_MEMBER_ID
         client.prepareResponse(new MockClient.RequestMatcher() {
@@ -645,8 +645,8 @@ public class ConsumerCoordinatorTest {
                 JoinGroupRequest joinRequest = (JoinGroupRequest) body;
                 return joinRequest.memberId().equals(JoinGroupRequest.UNKNOWN_MEMBER_ID);
             }
-        }, joinGroupFollowerResponse(2, consumerId, "leader", Errors.NONE.code()));
-        client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE.code()));
+        }, joinGroupFollowerResponse(2, consumerId, "leader", Errors.NONE));
+        client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE));
 
         coordinator.joinGroupIfNeeded();
 
@@ -660,16 +660,16 @@ public class ConsumerCoordinatorTest {
 
         subscriptions.subscribe(singleton(topic1), rebalanceListener);
 
-        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code()));
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
         coordinator.ensureCoordinatorReady();
 
         // join initially, but let coordinator rebalance on sync
-        client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE.code()));
-        client.prepareResponse(syncGroupResponse(Collections.<TopicPartition>emptyList(), Errors.REBALANCE_IN_PROGRESS.code()));
+        client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE));
+        client.prepareResponse(syncGroupResponse(Collections.<TopicPartition>emptyList(), Errors.REBALANCE_IN_PROGRESS));
 
         // then let the full join/sync finish successfully
-        client.prepareResponse(joinGroupFollowerResponse(2, consumerId, "leader", Errors.NONE.code()));
-        client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE.code()));
+        client.prepareResponse(joinGroupFollowerResponse(2, consumerId, "leader", Errors.NONE));
+        client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE));
 
         coordinator.joinGroupIfNeeded();
 
@@ -683,12 +683,12 @@ public class ConsumerCoordinatorTest {
 
         subscriptions.subscribe(singleton(topic1), rebalanceListener);
 
-        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code()));
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
         coordinator.ensureCoordinatorReady();
 
         // join initially, but let coordinator rebalance on sync
-        client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE.code()));
-        client.prepareResponse(syncGroupResponse(Collections.<TopicPartition>emptyList(), Errors.ILLEGAL_GENERATION.code()));
+        client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE));
+        client.prepareResponse(syncGroupResponse(Collections.<TopicPartition>emptyList(), Errors.ILLEGAL_GENERATION));
 
         // then let the full join/sync finish successfully
         client.prepareResponse(new MockClient.RequestMatcher() {
@@ -697,8 +697,8 @@ public class ConsumerCoordinatorTest {
                 JoinGroupRequest joinRequest = (JoinGroupRequest) body;
                 return joinRequest.memberId().equals(JoinGroupRequest.UNKNOWN_MEMBER_ID);
             }
-        }, joinGroupFollowerResponse(2, consumerId, "leader", Errors.NONE.code()));
-        client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE.code()));
+        }, joinGroupFollowerResponse(2, consumerId, "leader", Errors.NONE));
+        client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE));
 
         coordinator.joinGroupIfNeeded();
 
@@ -716,15 +716,15 @@ public class ConsumerCoordinatorTest {
 
         subscriptions.subscribe(singleton(topic1), rebalanceListener);
 
-        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code()));
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
         coordinator.ensureCoordinatorReady();
 
         Map<String, List<String>> memberSubscriptions = Collections.singletonMap(consumerId, singletonList(topic1));
         partitionAssignor.prepare(Collections.singletonMap(consumerId, singletonList(t1p)));
 
         // the leader is responsible for picking up metadata changes and forcing a group rebalance
-        client.prepareResponse(joinGroupLeaderResponse(1, consumerId, memberSubscriptions, Errors.NONE.code()));
-        client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE.code()));
+        client.prepareResponse(joinGroupLeaderResponse(1, consumerId, memberSubscriptions, Errors.NONE));
+        client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE));
 
         coordinator.poll(time.milliseconds());
 
@@ -753,14 +753,14 @@ public class ConsumerCoordinatorTest {
         // we only have metadata for one topic initially
         metadata.update(TestUtils.singletonCluster(topic1, 1), time.milliseconds());
 
-        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code()));
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
         coordinator.ensureCoordinatorReady();
 
         // prepare initial rebalance
         Map<String, List<String>> memberSubscriptions = Collections.singletonMap(consumerId, topics);
         partitionAssignor.prepare(Collections.singletonMap(consumerId, Collections.singletonList(tp1)));
 
-        client.prepareResponse(joinGroupLeaderResponse(1, consumerId, memberSubscriptions, Errors.NONE.code()));
+        client.prepareResponse(joinGroupLeaderResponse(1, consumerId, memberSubscriptions, Errors.NONE));
         client.prepareResponse(new MockClient.RequestMatcher() {
             @Override
             public boolean matches(AbstractRequest body) {
@@ -777,11 +777,11 @@ public class ConsumerCoordinatorTest {
                 }
                 return false;
             }
-        }, syncGroupResponse(Collections.singletonList(tp1), Errors.NONE.code()));
+        }, syncGroupResponse(Collections.singletonList(tp1), Errors.NONE));
 
         // the metadata update should trigger a second rebalance
-        client.prepareResponse(joinGroupLeaderResponse(2, consumerId, memberSubscriptions, Errors.NONE.code()));
-        client.prepareResponse(syncGroupResponse(Arrays.asList(tp1, tp2), Errors.NONE.code()));
+        client.prepareResponse(joinGroupLeaderResponse(2, consumerId, memberSubscriptions, Errors.NONE));
+        client.prepareResponse(syncGroupResponse(Arrays.asList(tp1, tp2), Errors.NONE));
 
         coordinator.poll(time.milliseconds());
 
@@ -815,12 +815,12 @@ public class ConsumerCoordinatorTest {
 
         subscriptions.subscribe(singleton(topic1), rebalanceListener);
 
-        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code()));
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
         coordinator.ensureCoordinatorReady();
 
         // join the group once
-        client.prepareResponse(joinGroupFollowerResponse(1, "consumer", "leader", Errors.NONE.code()));
-        client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE.code()));
+        client.prepareResponse(joinGroupFollowerResponse(1, "consumer", "leader", Errors.NONE));
+        client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE));
         coordinator.joinGroupIfNeeded();
 
         assertEquals(1, rebalanceListener.revokedCount);
@@ -830,8 +830,8 @@ public class ConsumerCoordinatorTest {
 
         // and join the group again
         subscriptions.subscribe(new HashSet<>(Arrays.asList(topic1, otherTopic)), rebalanceListener);
-        client.prepareResponse(joinGroupFollowerResponse(2, "consumer", "leader", Errors.NONE.code()));
-        client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE.code()));
+        client.prepareResponse(joinGroupFollowerResponse(2, "consumer", "leader", Errors.NONE));
+        client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE));
         coordinator.joinGroupIfNeeded();
 
         assertEquals(2, rebalanceListener.revokedCount);
@@ -844,14 +844,14 @@ public class ConsumerCoordinatorTest {
     public void testDisconnectInJoin() {
         subscriptions.subscribe(singleton(topic1), rebalanceListener);
 
-        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code()));
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
         coordinator.ensureCoordinatorReady();
 
         // disconnected from original coordinator will cause re-discover and join again
-        client.prepareResponse(joinGroupFollowerResponse(1, "consumer", "leader", Errors.NONE.code()), true);
-        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code()));
-        client.prepareResponse(joinGroupFollowerResponse(1, "consumer", "leader", Errors.NONE.code()));
-        client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE.code()));
+        client.prepareResponse(joinGroupFollowerResponse(1, "consumer", "leader", Errors.NONE), true);
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+        client.prepareResponse(joinGroupFollowerResponse(1, "consumer", "leader", Errors.NONE));
+        client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE));
         coordinator.joinGroupIfNeeded();
 
         assertFalse(coordinator.needRejoin());
@@ -865,11 +865,11 @@ public class ConsumerCoordinatorTest {
     public void testInvalidSessionTimeout() {
         subscriptions.subscribe(singleton(topic1), rebalanceListener);
 
-        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code()));
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
         coordinator.ensureCoordinatorReady();
 
         // coordinator doesn't like the session timeout
-        client.prepareResponse(joinGroupFollowerResponse(0, "consumer", "", Errors.INVALID_SESSION_TIMEOUT.code()));
+        client.prepareResponse(joinGroupFollowerResponse(0, "consumer", "", Errors.INVALID_SESSION_TIMEOUT));
         coordinator.joinGroupIfNeeded();
     }
 
@@ -877,10 +877,10 @@ public class ConsumerCoordinatorTest {
     public void testCommitOffsetOnly() {
         subscriptions.assignFromUser(singleton(t1p));
 
-        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code()));
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
         coordinator.ensureCoordinatorReady();
 
-        client.prepareResponse(offsetCommitResponse(Collections.singletonMap(t1p, Errors.NONE.code())));
+        client.prepareResponse(offsetCommitResponse(Collections.singletonMap(t1p, Errors.NONE)));
 
         AtomicBoolean success = new AtomicBoolean(false);
         coordinator.commitOffsetsAsync(Collections.singletonMap(t1p, new OffsetAndMetadata(100L)), callback(success));
@@ -899,16 +899,16 @@ public class ConsumerCoordinatorTest {
 
         subscriptions.subscribe(singleton(topic1), rebalanceListener);
 
-        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code()));
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
         coordinator.ensureCoordinatorReady();
 
-        client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE.code()));
-        client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE.code()));
+        client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE));
+        client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE));
         coordinator.joinGroupIfNeeded();
 
         subscriptions.seek(t1p, 100);
 
-        client.prepareResponse(offsetCommitResponse(Collections.singletonMap(t1p, Errors.NONE.code())));
+        client.prepareResponse(offsetCommitResponse(Collections.singletonMap(t1p, Errors.NONE)));
         time.sleep(autoCommitIntervalMs);
         coordinator.poll(time.milliseconds());
 
@@ -924,20 +924,20 @@ public class ConsumerCoordinatorTest {
 
         subscriptions.subscribe(singleton(topic1), rebalanceListener);
 
-        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code()));
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
         coordinator.ensureCoordinatorReady();
 
         // haven't joined, so should not cause a commit
         time.sleep(autoCommitIntervalMs);
         consumerClient.poll(0);
 
-        client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE.code()));
-        client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE.code()));
+        client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE));
+        client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE));
         coordinator.joinGroupIfNeeded();
 
         subscriptions.seek(t1p, 100);
 
-        client.prepareResponse(offsetCommitResponse(Collections.singletonMap(t1p, Errors.NONE.code())));
+        client.prepareResponse(offsetCommitResponse(Collections.singletonMap(t1p, Errors.NONE)));
         time.sleep(autoCommitIntervalMs);
         coordinator.poll(time.milliseconds());
 
@@ -952,10 +952,10 @@ public class ConsumerCoordinatorTest {
         subscriptions.assignFromUser(singleton(t1p));
         subscriptions.seek(t1p, 100);
 
-        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code()));
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
         coordinator.ensureCoordinatorReady();
 
-        client.prepareResponse(offsetCommitResponse(Collections.singletonMap(t1p, Errors.NONE.code())));
+        client.prepareResponse(offsetCommitResponse(Collections.singletonMap(t1p, Errors.NONE)));
         time.sleep(autoCommitIntervalMs);
         coordinator.poll(time.milliseconds());
 
@@ -978,12 +978,12 @@ public class ConsumerCoordinatorTest {
         assertNull(subscriptions.committed(t1p));
 
         // now find the coordinator
-        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code()));
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
         coordinator.ensureCoordinatorReady();
 
         // sleep only for the retry backoff
         time.sleep(retryBackoffMs);
-        client.prepareResponse(offsetCommitResponse(Collections.singletonMap(t1p, Errors.NONE.code())));
+        client.prepareResponse(offsetCommitResponse(Collections.singletonMap(t1p, Errors.NONE)));
         coordinator.poll(time.milliseconds());
 
         assertEquals(100L, subscriptions.committed(t1p).offset());
@@ -993,10 +993,10 @@ public class ConsumerCoordinatorTest {
     public void testCommitOffsetMetadata() {
         subscriptions.assignFromUser(singleton(t1p));
 
-        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code()));
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
         coordinator.ensureCoordinatorReady();
 
-        client.prepareResponse(offsetCommitResponse(Collections.singletonMap(t1p, Errors.NONE.code())));
+        client.prepareResponse(offsetCommitResponse(Collections.singletonMap(t1p, Errors.NONE)));
 
         AtomicBoolean success = new AtomicBoolean(false);
         coordinator.commitOffsetsAsync(Collections.singletonMap(t1p, new OffsetAndMetadata(100L, "hello")), callback(success));
@@ -1010,9 +1010,9 @@ public class ConsumerCoordinatorTest {
     @Test
     public void testCommitOffsetAsyncWithDefaultCallback() {
         int invokedBeforeTest = mockOffsetCommitCallback.invoked;
-        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code()));
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
         coordinator.ensureCoordinatorReady();
-        client.prepareResponse(offsetCommitResponse(Collections.singletonMap(t1p, Errors.NONE.code())));
+        client.prepareResponse(offsetCommitResponse(Collections.singletonMap(t1p, Errors.NONE)));
         coordinator.commitOffsetsAsync(Collections.singletonMap(t1p, new OffsetAndMetadata(100L)), mockOffsetCommitCallback);
         coordinator.invokeCompletedOffsetCommitCallbacks();
         assertEquals(invokedBeforeTest + 1, mockOffsetCommitCallback.invoked);
@@ -1024,18 +1024,18 @@ public class ConsumerCoordinatorTest {
         // enable auto-assignment
         subscriptions.subscribe(singleton(topic1), rebalanceListener);
 
-        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code()));
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
         coordinator.ensureCoordinatorReady();
 
-        client.prepareResponse(joinGroupFollowerResponse(1, "consumer", "leader", Errors.NONE.code()));
-        client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE.code()));
+        client.prepareResponse(joinGroupFollowerResponse(1, "consumer", "leader", Errors.NONE));
+        client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE));
 
         client.prepareMetadataUpdate(cluster);
 
         coordinator.joinGroupIfNeeded();
 
         // now switch to manual assignment
-        client.prepareResponse(new LeaveGroupResponse(Errors.NONE.code()));
+        client.prepareResponse(new LeaveGroupResponse(Errors.NONE));
         subscriptions.unsubscribe();
         coordinator.maybeLeaveGroup();
         subscriptions.assignFromUser(singleton(t1p));
@@ -1048,7 +1048,7 @@ public class ConsumerCoordinatorTest {
                 return commitRequest.memberId().equals(OffsetCommitRequest.DEFAULT_MEMBER_ID) &&
                         commitRequest.generationId() == OffsetCommitRequest.DEFAULT_GENERATION_ID;
             }
-        }, offsetCommitResponse(Collections.singletonMap(t1p, Errors.NONE.code())));
+        }, offsetCommitResponse(Collections.singletonMap(t1p, Errors.NONE)));
 
         AtomicBoolean success = new AtomicBoolean(false);
         coordinator.commitOffsetsAsync(Collections.singletonMap(t1p, new OffsetAndMetadata(100L)), callback(success));
@@ -1059,9 +1059,9 @@ public class ConsumerCoordinatorTest {
     @Test
     public void testCommitOffsetAsyncFailedWithDefaultCallback() {
         int invokedBeforeTest = mockOffsetCommitCallback.invoked;
-        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code()));
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
         coordinator.ensureCoordinatorReady();
-        client.prepareResponse(offsetCommitResponse(Collections.singletonMap(t1p, Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code())));
+        client.prepareResponse(offsetCommitResponse(Collections.singletonMap(t1p, Errors.GROUP_COORDINATOR_NOT_AVAILABLE)));
         coordinator.commitOffsetsAsync(Collections.singletonMap(t1p, new OffsetAndMetadata(100L)), mockOffsetCommitCallback);
         coordinator.invokeCompletedOffsetCommitCallbacks();
         assertEquals(invokedBeforeTest + 1, mockOffsetCommitCallback.invoked);
@@ -1070,12 +1070,12 @@ public class ConsumerCoordinatorTest {
 
     @Test
     public void testCommitOffsetAsyncCoordinatorNotAvailable() {
-        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code()));
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
         coordinator.ensureCoordinatorReady();
 
         // async commit with coordinator not available
         MockCommitCallback cb = new MockCommitCallback();
-        client.prepareResponse(offsetCommitResponse(Collections.singletonMap(t1p, Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code())));
+        client.prepareResponse(offsetCommitResponse(Collections.singletonMap(t1p, Errors.GROUP_COORDINATOR_NOT_AVAILABLE)));
         coordinator.commitOffsetsAsync(Collections.singletonMap(t1p, new OffsetAndMetadata(100L)), cb);
         coordinator.invokeCompletedOffsetCommitCallbacks();
 
@@ -1086,12 +1086,12 @@ public class ConsumerCoordinatorTest {
 
     @Test
     public void testCommitOffsetAsyncNotCoordinator() {
-        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code()));
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
         coordinator.ensureCoordinatorReady();
 
         // async commit with not coordinator
         MockCommitCallback cb = new MockCommitCallback();
-        client.prepareResponse(offsetCommitResponse(Collections.singletonMap(t1p, Errors.NOT_COORDINATOR_FOR_GROUP.code())));
+        client.prepareResponse(offsetCommitResponse(Collections.singletonMap(t1p, Errors.NOT_COORDINATOR_FOR_GROUP)));
         coordinator.commitOffsetsAsync(Collections.singletonMap(t1p, new OffsetAndMetadata(100L)), cb);
         coordinator.invokeCompletedOffsetCommitCallbacks();
 
@@ -1102,12 +1102,12 @@ public class ConsumerCoordinatorTest {
 
     @Test
     public void testCommitOffsetAsyncDisconnected() {
-        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code()));
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
         coordinator.ensureCoordinatorReady();
 
         // async commit with coordinator disconnected
         MockCommitCallback cb = new MockCommitCallback();
-        client.prepareResponse(offsetCommitResponse(Collections.singletonMap(t1p, Errors.NONE.code())), true);
+        client.prepareResponse(offsetCommitResponse(Collections.singletonMap(t1p, Errors.NONE)), true);
         coordinator.commitOffsetsAsync(Collections.singletonMap(t1p, new OffsetAndMetadata(100L)), cb);
         coordinator.invokeCompletedOffsetCommitCallbacks();
 
@@ -1118,109 +1118,109 @@ public class ConsumerCoordinatorTest {
 
     @Test
     public void testCommitOffsetSyncNotCoordinator() {
-        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code()));
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
         coordinator.ensureCoordinatorReady();
 
         // sync commit with coordinator disconnected (should connect, get metadata, and then submit the commit request)
-        client.prepareResponse(offsetCommitResponse(Collections.singletonMap(t1p, Errors.NOT_COORDINATOR_FOR_GROUP.code())));
-        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code()));
-        client.prepareResponse(offsetCommitResponse(Collections.singletonMap(t1p, Errors.NONE.code())));
+        client.prepareResponse(offsetCommitResponse(Collections.singletonMap(t1p, Errors.NOT_COORDINATOR_FOR_GROUP)));
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+        client.prepareResponse(offsetCommitResponse(Collections.singletonMap(t1p, Errors.NONE)));
         coordinator.commitOffsetsSync(Collections.singletonMap(t1p, new OffsetAndMetadata(100L)), Long.MAX_VALUE);
     }
 
     @Test
     public void testCommitOffsetSyncCoordinatorNotAvailable() {
-        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code()));
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
         coordinator.ensureCoordinatorReady();
 
         // sync commit with coordinator disconnected (should connect, get metadata, and then submit the commit request)
-        client.prepareResponse(offsetCommitResponse(Collections.singletonMap(t1p, Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code())));
-        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code()));
-        client.prepareResponse(offsetCommitResponse(Collections.singletonMap(t1p, Errors.NONE.code())));
+        client.prepareResponse(offsetCommitResponse(Collections.singletonMap(t1p, Errors.GROUP_COORDINATOR_NOT_AVAILABLE)));
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+        client.prepareResponse(offsetCommitResponse(Collections.singletonMap(t1p, Errors.NONE)));
         coordinator.commitOffsetsSync(Collections.singletonMap(t1p, new OffsetAndMetadata(100L)), Long.MAX_VALUE);
     }
 
     @Test
     public void testCommitOffsetSyncCoordinatorDisconnected() {
-        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code()));
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
         coordinator.ensureCoordinatorReady();
 
         // sync commit with coordinator disconnected (should connect, get metadata, and then submit the commit request)
-        client.prepareResponse(offsetCommitResponse(Collections.singletonMap(t1p, Errors.NONE.code())), true);
-        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code()));
-        client.prepareResponse(offsetCommitResponse(Collections.singletonMap(t1p, Errors.NONE.code())));
+        client.prepareResponse(offsetCommitResponse(Collections.singletonMap(t1p, Errors.NONE)), true);
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+        client.prepareResponse(offsetCommitResponse(Collections.singletonMap(t1p, Errors.NONE)));
         coordinator.commitOffsetsSync(Collections.singletonMap(t1p, new OffsetAndMetadata(100L)), Long.MAX_VALUE);
     }
 
     @Test(expected = KafkaException.class)
     public void testCommitUnknownTopicOrPartition() {
-        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code()));
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
         coordinator.ensureCoordinatorReady();
 
-        client.prepareResponse(offsetCommitResponse(Collections.singletonMap(t1p, Errors.UNKNOWN_TOPIC_OR_PARTITION.code())));
+        client.prepareResponse(offsetCommitResponse(Collections.singletonMap(t1p, Errors.UNKNOWN_TOPIC_OR_PARTITION)));
         coordinator.commitOffsetsSync(Collections.singletonMap(t1p, new OffsetAndMetadata(100L, "metadata")), Long.MAX_VALUE);
     }
 
     @Test(expected = OffsetMetadataTooLarge.class)
     public void testCommitOffsetMetadataTooLarge() {
         // since offset metadata is provided by the user, we have to propagate the exception so they can handle it
-        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code()));
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
         coordinator.ensureCoordinatorReady();
 
-        client.prepareResponse(offsetCommitResponse(Collections.singletonMap(t1p, Errors.OFFSET_METADATA_TOO_LARGE.code())));
+        client.prepareResponse(offsetCommitResponse(Collections.singletonMap(t1p, Errors.OFFSET_METADATA_TOO_LARGE)));
         coordinator.commitOffsetsSync(Collections.singletonMap(t1p, new OffsetAndMetadata(100L, "metadata")), Long.MAX_VALUE);
     }
 
     @Test(expected = CommitFailedException.class)
     public void testCommitOffsetIllegalGeneration() {
         // we cannot retry if a rebalance occurs before the commit completed
-        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code()));
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
         coordinator.ensureCoordinatorReady();
 
-        client.prepareResponse(offsetCommitResponse(Collections.singletonMap(t1p, Errors.ILLEGAL_GENERATION.code())));
+        client.prepareResponse(offsetCommitResponse(Collections.singletonMap(t1p, Errors.ILLEGAL_GENERATION)));
         coordinator.commitOffsetsSync(Collections.singletonMap(t1p, new OffsetAndMetadata(100L, "metadata")), Long.MAX_VALUE);
     }
 
     @Test(expected = CommitFailedException.class)
     public void testCommitOffsetUnknownMemberId() {
         // we cannot retry if a rebalance occurs before the commit completed
-        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code()));
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
         coordinator.ensureCoordinatorReady();
 
-        client.prepareResponse(offsetCommitResponse(Collections.singletonMap(t1p, Errors.UNKNOWN_MEMBER_ID.code())));
+        client.prepareResponse(offsetCommitResponse(Collections.singletonMap(t1p, Errors.UNKNOWN_MEMBER_ID)));
         coordinator.commitOffsetsSync(Collections.singletonMap(t1p, new OffsetAndMetadata(100L, "metadata")), Long.MAX_VALUE);
     }
 
     @Test(expected = CommitFailedException.class)
     public void testCommitOffsetRebalanceInProgress() {
         // we cannot retry if a rebalance occurs before the commit completed
-        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code()));
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
         coordinator.ensureCoordinatorReady();
 
-        client.prepareResponse(offsetCommitResponse(Collections.singletonMap(t1p, Errors.REBALANCE_IN_PROGRESS.code())));
+        client.prepareResponse(offsetCommitResponse(Collections.singletonMap(t1p, Errors.REBALANCE_IN_PROGRESS)));
         coordinator.commitOffsetsSync(Collections.singletonMap(t1p, new OffsetAndMetadata(100L, "metadata")), Long.MAX_VALUE);
     }
 
     @Test(expected = KafkaException.class)
     public void testCommitOffsetSyncCallbackWithNonRetriableException() {
-        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code()));
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
         coordinator.ensureCoordinatorReady();
 
         // sync commit with invalid partitions should throw if we have no callback
-        client.prepareResponse(offsetCommitResponse(Collections.singletonMap(t1p, Errors.UNKNOWN.code())), false);
+        client.prepareResponse(offsetCommitResponse(Collections.singletonMap(t1p, Errors.UNKNOWN)), false);
         coordinator.commitOffsetsSync(Collections.singletonMap(t1p, new OffsetAndMetadata(100L)), Long.MAX_VALUE);
     }
 
     @Test(expected = IllegalArgumentException.class)
     public void testCommitSyncNegativeOffset() {
-        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code()));
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
         coordinator.commitOffsetsSync(Collections.singletonMap(t1p, new OffsetAndMetadata(-1L)), Long.MAX_VALUE);
     }
 
     @Test
     public void testCommitAsyncNegativeOffset() {
         int invokedBeforeTest = mockOffsetCommitCallback.invoked;
-        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code()));
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
         coordinator.commitOffsetsAsync(Collections.singletonMap(t1p, new OffsetAndMetadata(-1L)), mockOffsetCommitCallback);
         coordinator.invokeCompletedOffsetCommitCallbacks();
         assertEquals(invokedBeforeTest + 1, mockOffsetCommitCallback.invoked);
@@ -1229,7 +1229,7 @@ public class ConsumerCoordinatorTest {
 
     @Test
     public void testRefreshOffset() {
-        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code()));
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
         coordinator.ensureCoordinatorReady();
 
         subscriptions.assignFromUser(singleton(t1p));
@@ -1242,7 +1242,7 @@ public class ConsumerCoordinatorTest {
 
     @Test
     public void testRefreshOffsetLoadInProgress() {
-        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code()));
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
         coordinator.ensureCoordinatorReady();
 
         subscriptions.assignFromUser(singleton(t1p));
@@ -1256,7 +1256,7 @@ public class ConsumerCoordinatorTest {
 
     @Test
     public void testRefreshOffsetsGroupNotAuthorized() {
-        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code()));
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
         coordinator.ensureCoordinatorReady();
 
         subscriptions.assignFromUser(singleton(t1p));
@@ -1272,7 +1272,7 @@ public class ConsumerCoordinatorTest {
 
     @Test(expected = KafkaException.class)
     public void testRefreshOffsetUnknownTopicOrPartition() {
-        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code()));
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
         coordinator.ensureCoordinatorReady();
 
         subscriptions.assignFromUser(singleton(t1p));
@@ -1283,13 +1283,13 @@ public class ConsumerCoordinatorTest {
 
     @Test
     public void testRefreshOffsetNotCoordinatorForConsumer() {
-        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code()));
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
         coordinator.ensureCoordinatorReady();
 
         subscriptions.assignFromUser(singleton(t1p));
         subscriptions.needRefreshCommits();
         client.prepareResponse(offsetFetchResponse(Errors.NOT_COORDINATOR_FOR_GROUP));
-        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code()));
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
         client.prepareResponse(offsetFetchResponse(t1p, Errors.NONE, "", 100L));
         coordinator.refreshCommittedOffsetsIfNeeded();
         assertFalse(subscriptions.refreshCommitsNeeded());
@@ -1298,7 +1298,7 @@ public class ConsumerCoordinatorTest {
 
     @Test
     public void testRefreshOffsetWithNoFetchableOffsets() {
-        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code()));
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
         coordinator.ensureCoordinatorReady();
 
         subscriptions.assignFromUser(singleton(t1p));
@@ -1429,12 +1429,12 @@ public class ConsumerCoordinatorTest {
         final String consumerId = "consumer";
         ConsumerCoordinator coordinator = buildCoordinator(new Metrics(), assignors,
                 ConsumerConfig.DEFAULT_EXCLUDE_INTERNAL_TOPICS, autoCommit);
-        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code()));
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
         coordinator.ensureCoordinatorReady();
         if (useGroupManagement) {
             subscriptions.subscribe(singleton(topic1), rebalanceListener);
-            client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE.code()));
-            client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE.code()));
+            client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE));
+            client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE));
             coordinator.joinGroupIfNeeded();
         } else
             subscriptions.assignFromUser(singleton(t1p));
@@ -1445,10 +1445,10 @@ public class ConsumerCoordinatorTest {
         return coordinator;
     }
 
-    private void makeCoordinatorUnknown(ConsumerCoordinator coordinator, Errors errorCode) {
+    private void makeCoordinatorUnknown(ConsumerCoordinator coordinator, Errors error) {
         time.sleep(sessionTimeoutMs);
         coordinator.sendHeartbeatRequest();
-        client.prepareResponse(heartbeatResponse(errorCode.code()));
+        client.prepareResponse(heartbeatResponse(error));
         time.sleep(sessionTimeoutMs);
         consumerClient.poll(0);
         assertTrue(coordinator.coordinatorUnknown());
@@ -1497,7 +1497,7 @@ public class ConsumerCoordinatorTest {
                 OffsetCommitRequest commitRequest = (OffsetCommitRequest) body;
                 return commitRequest.groupId().equals(groupId);
             }
-        }, new OffsetCommitResponse(new HashMap<TopicPartition, Short>()));
+        }, new OffsetCommitResponse(new HashMap<TopicPartition, Errors>()));
         client.prepareResponse(new MockClient.RequestMatcher() {
             @Override
             public boolean matches(AbstractRequest body) {
@@ -1505,7 +1505,7 @@ public class ConsumerCoordinatorTest {
                 LeaveGroupRequest leaveRequest = (LeaveGroupRequest) body;
                 return leaveRequest.groupId().equals(groupId);
             }
-        }, new LeaveGroupResponse(Errors.NONE.code()));
+        }, new LeaveGroupResponse(Errors.NONE));
 
         coordinator.close();
         assertTrue("Commit not requested", commitRequested.get());
@@ -1536,18 +1536,18 @@ public class ConsumerCoordinatorTest {
                 excludeInternalTopics);
     }
 
-    private GroupCoordinatorResponse groupCoordinatorResponse(Node node, short error) {
+    private GroupCoordinatorResponse groupCoordinatorResponse(Node node, Errors error) {
         return new GroupCoordinatorResponse(error, node);
     }
 
-    private HeartbeatResponse heartbeatResponse(short error) {
+    private HeartbeatResponse heartbeatResponse(Errors error) {
         return new HeartbeatResponse(error);
     }
 
     private JoinGroupResponse joinGroupLeaderResponse(int generationId,
                                                       String memberId,
                                                       Map<String, List<String>> subscriptions,
-                                                      short error) {
+                                                      Errors error) {
         Map<String, ByteBuffer> metadata = new HashMap<>();
         for (Map.Entry<String, List<String>> subscriptionEntry : subscriptions.entrySet()) {
             PartitionAssignor.Subscription subscription = new PartitionAssignor.Subscription(subscriptionEntry.getValue());
@@ -1557,17 +1557,17 @@ public class ConsumerCoordinatorTest {
         return new JoinGroupResponse(error, generationId, partitionAssignor.name(), memberId, memberId, metadata);
     }
 
-    private JoinGroupResponse joinGroupFollowerResponse(int generationId, String memberId, String leaderId, short error) {
+    private JoinGroupResponse joinGroupFollowerResponse(int generationId, String memberId, String leaderId, Errors error) {
         return new JoinGroupResponse(error, generationId, partitionAssignor.name(), memberId, leaderId,
                 Collections.<String, ByteBuffer>emptyMap());
     }
 
-    private SyncGroupResponse syncGroupResponse(List<TopicPartition> partitions, short error) {
+    private SyncGroupResponse syncGroupResponse(List<TopicPartition> partitions, Errors error) {
         ByteBuffer buf = ConsumerProtocol.serializeAssignment(new PartitionAssignor.Assignment(partitions));
         return new SyncGroupResponse(error, buf);
     }
 
-    private OffsetCommitResponse offsetCommitResponse(Map<TopicPartition, Short> responseData) {
+    private OffsetCommitResponse offsetCommitResponse(Map<TopicPartition, Errors> responseData) {
         return new OffsetCommitResponse(responseData);
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/9898d665/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java
index 3694714..83bb145 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java
@@ -47,7 +47,7 @@ public class ConsumerNetworkClientTest {
 
     @Test
     public void send() {
-        client.prepareResponse(heartbeatResponse(Errors.NONE.code()));
+        client.prepareResponse(heartbeatResponse(Errors.NONE));
         RequestFuture<ClientResponse> future = consumerClient.send(node, heartbeat());
         assertEquals(1, consumerClient.pendingRequestCount());
         assertEquals(1, consumerClient.pendingRequestCount(node));
@@ -59,13 +59,13 @@ public class ConsumerNetworkClientTest {
 
         ClientResponse clientResponse = future.value();
         HeartbeatResponse response = (HeartbeatResponse) clientResponse.responseBody();
-        assertEquals(Errors.NONE.code(), response.errorCode());
+        assertEquals(Errors.NONE, response.error());
     }
 
     @Test
     public void multiSend() {
-        client.prepareResponse(heartbeatResponse(Errors.NONE.code()));
-        client.prepareResponse(heartbeatResponse(Errors.NONE.code()));
+        client.prepareResponse(heartbeatResponse(Errors.NONE));
+        client.prepareResponse(heartbeatResponse(Errors.NONE));
         RequestFuture<ClientResponse> future1 = consumerClient.send(node, heartbeat());
         RequestFuture<ClientResponse> future2 = consumerClient.send(node, heartbeat());
         assertEquals(2, consumerClient.pendingRequestCount());
@@ -150,7 +150,7 @@ public class ConsumerNetworkClientTest {
         } catch (WakeupException e) {
         }
 
-        client.respond(heartbeatResponse(Errors.NONE.code()));
+        client.respond(heartbeatResponse(Errors.NONE));
         consumerClient.poll(future);
         assertTrue(future.isDone());
     }
@@ -201,11 +201,11 @@ public class ConsumerNetworkClientTest {
 
         // Enable send, the un-expired send should succeed on poll
         isReady.set(true);
-        client.prepareResponse(heartbeatResponse(Errors.NONE.code()));
+        client.prepareResponse(heartbeatResponse(Errors.NONE));
         consumerClient.poll(future2);
         ClientResponse clientResponse = future2.value();
         HeartbeatResponse response = (HeartbeatResponse) clientResponse.responseBody();
-        assertEquals(Errors.NONE.code(), response.errorCode());
+        assertEquals(Errors.NONE, response.error());
 
         // Disable ready flag to delay send and queue another send. Disconnection should remove pending send
         isReady.set(false);
@@ -224,7 +224,7 @@ public class ConsumerNetworkClientTest {
         return new HeartbeatRequest.Builder("group", 1, "memberId");
     }
 
-    private HeartbeatResponse heartbeatResponse(short error) {
+    private HeartbeatResponse heartbeatResponse(Errors error) {
         return new HeartbeatResponse(error);
     }
 


Mime
View raw message