kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject [2/5] kafka git commit: MINOR: Use an explicit `Errors` object when possible instead of a numeric error code
Date Fri, 10 Feb 2017 05:21:50 GMT
http://git-wip-us.apache.org/repos/asf/kafka/blob/9898d665/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala b/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
index 36b0c86..891896a 100644
--- a/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
@@ -46,7 +46,7 @@ class GroupCoordinator(val brokerId: Int,
                        val joinPurgatory: DelayedOperationPurgatory[DelayedJoin],
                        time: Time) extends Logging {
   type JoinCallback = JoinGroupResult => Unit
-  type SyncCallback = (Array[Byte], Short) => Unit
+  type SyncCallback = (Array[Byte], Errors) => Unit
 
   this.logIdent = "[GroupCoordinator " + brokerId + "]: "
 
@@ -99,16 +99,16 @@ class GroupCoordinator(val brokerId: Int,
                       protocols: List[(String, Array[Byte])],
                       responseCallback: JoinCallback) {
     if (!isActive.get) {
-      responseCallback(joinError(memberId, Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code))
+      responseCallback(joinError(memberId, Errors.GROUP_COORDINATOR_NOT_AVAILABLE))
     } else if (!validGroupId(groupId)) {
-      responseCallback(joinError(memberId, Errors.INVALID_GROUP_ID.code))
+      responseCallback(joinError(memberId, Errors.INVALID_GROUP_ID))
     } else if (!isCoordinatorForGroup(groupId)) {
-      responseCallback(joinError(memberId, Errors.NOT_COORDINATOR_FOR_GROUP.code))
+      responseCallback(joinError(memberId, Errors.NOT_COORDINATOR_FOR_GROUP))
     } else if (isCoordinatorLoadingInProgress(groupId)) {
-      responseCallback(joinError(memberId, Errors.GROUP_LOAD_IN_PROGRESS.code))
+      responseCallback(joinError(memberId, Errors.GROUP_LOAD_IN_PROGRESS))
     } else if (sessionTimeoutMs < groupConfig.groupMinSessionTimeoutMs ||
                sessionTimeoutMs > groupConfig.groupMaxSessionTimeoutMs) {
-      responseCallback(joinError(memberId, Errors.INVALID_SESSION_TIMEOUT.code))
+      responseCallback(joinError(memberId, Errors.INVALID_SESSION_TIMEOUT))
     } else {
       // only try to create the group if the group is not unknown AND
       // the member id is UNKNOWN, if member is specified but group does not
@@ -116,7 +116,7 @@ class GroupCoordinator(val brokerId: Int,
       groupManager.getGroup(groupId) match {
         case None =>
           if (memberId != JoinGroupRequest.UNKNOWN_MEMBER_ID) {
-            responseCallback(joinError(memberId, Errors.UNKNOWN_MEMBER_ID.code))
+            responseCallback(joinError(memberId, Errors.UNKNOWN_MEMBER_ID))
           } else {
             val group = groupManager.addGroup(new GroupMetadata(groupId))
             doJoinGroup(group, memberId, clientId, clientHost, rebalanceTimeoutMs, sessionTimeoutMs, protocolType, protocols, responseCallback)
@@ -140,11 +140,11 @@ class GroupCoordinator(val brokerId: Int,
     group synchronized {
       if (!group.is(Empty) && (group.protocolType != Some(protocolType) || !group.supportsProtocols(protocols.map(_._1).toSet))) {
         // if the new member does not support the group protocol, reject it
-        responseCallback(joinError(memberId, Errors.INCONSISTENT_GROUP_PROTOCOL.code))
+        responseCallback(joinError(memberId, Errors.INCONSISTENT_GROUP_PROTOCOL))
       } else if (memberId != JoinGroupRequest.UNKNOWN_MEMBER_ID && !group.has(memberId)) {
         // if the member trying to register with a un-recognized id, send the response to let
         // it reset its member id and retry
-        responseCallback(joinError(memberId, Errors.UNKNOWN_MEMBER_ID.code))
+        responseCallback(joinError(memberId, Errors.UNKNOWN_MEMBER_ID))
       } else {
         group.currentState match {
           case Dead =>
@@ -152,7 +152,7 @@ class GroupCoordinator(val brokerId: Int,
             // from the coordinator metadata; this is likely that the group has migrated to some other
             // coordinator OR the group is in a transient unstable phase. Let the member retry
             // joining without the specified member id,
-            responseCallback(joinError(memberId, Errors.UNKNOWN_MEMBER_ID.code))
+            responseCallback(joinError(memberId, Errors.UNKNOWN_MEMBER_ID))
 
           case PreparingRebalance =>
             if (memberId == JoinGroupRequest.UNKNOWN_MEMBER_ID) {
@@ -181,7 +181,7 @@ class GroupCoordinator(val brokerId: Int,
                   generationId = group.generationId,
                   subProtocol = group.protocol,
                   leaderId = group.leaderId,
-                  errorCode = Errors.NONE.code))
+                  error = Errors.NONE))
               } else {
                 // member has changed metadata, so force a rebalance
                 updateMemberAndRebalance(group, member, protocols, responseCallback)
@@ -208,7 +208,7 @@ class GroupCoordinator(val brokerId: Int,
                   generationId = group.generationId,
                   subProtocol = group.protocol,
                   leaderId = group.leaderId,
-                  errorCode = Errors.NONE.code))
+                  error = Errors.NONE))
               }
             }
         }
@@ -225,12 +225,12 @@ class GroupCoordinator(val brokerId: Int,
                       groupAssignment: Map[String, Array[Byte]],
                       responseCallback: SyncCallback) {
     if (!isActive.get) {
-      responseCallback(Array.empty, Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code)
+      responseCallback(Array.empty, Errors.GROUP_COORDINATOR_NOT_AVAILABLE)
     } else if (!isCoordinatorForGroup(groupId)) {
-      responseCallback(Array.empty, Errors.NOT_COORDINATOR_FOR_GROUP.code)
+      responseCallback(Array.empty, Errors.NOT_COORDINATOR_FOR_GROUP)
     } else {
       groupManager.getGroup(groupId) match {
-        case None => responseCallback(Array.empty, Errors.UNKNOWN_MEMBER_ID.code)
+        case None => responseCallback(Array.empty, Errors.UNKNOWN_MEMBER_ID)
         case Some(group) => doSyncGroup(group, generation, memberId, groupAssignment, responseCallback)
       }
     }
@@ -245,16 +245,16 @@ class GroupCoordinator(val brokerId: Int,
 
     group synchronized {
       if (!group.has(memberId)) {
-        responseCallback(Array.empty, Errors.UNKNOWN_MEMBER_ID.code)
+        responseCallback(Array.empty, Errors.UNKNOWN_MEMBER_ID)
       } else if (generationId != group.generationId) {
-        responseCallback(Array.empty, Errors.ILLEGAL_GENERATION.code)
+        responseCallback(Array.empty, Errors.ILLEGAL_GENERATION)
       } else {
         group.currentState match {
           case Empty | Dead =>
-            responseCallback(Array.empty, Errors.UNKNOWN_MEMBER_ID.code)
+            responseCallback(Array.empty, Errors.UNKNOWN_MEMBER_ID)
 
           case PreparingRebalance =>
-            responseCallback(Array.empty, Errors.REBALANCE_IN_PROGRESS.code)
+            responseCallback(Array.empty, Errors.REBALANCE_IN_PROGRESS)
 
           case AwaitingSync =>
             group.get(memberId).awaitingSyncCallback = responseCallback
@@ -288,7 +288,7 @@ class GroupCoordinator(val brokerId: Int,
           case Stable =>
             // if the group is stable, we just return the current assignment
             val memberMetadata = group.get(memberId)
-            responseCallback(memberMetadata.assignment, Errors.NONE.code)
+            responseCallback(memberMetadata.assignment, Errors.NONE)
             completeAndScheduleNextHeartbeatExpiration(group, group.get(memberId))
         }
       }
@@ -300,13 +300,13 @@ class GroupCoordinator(val brokerId: Int,
     delayedGroupStore.foreach(groupManager.store)
   }
 
-  def handleLeaveGroup(groupId: String, memberId: String, responseCallback: Short => Unit) {
+  def handleLeaveGroup(groupId: String, memberId: String, responseCallback: Errors => Unit) {
     if (!isActive.get) {
-      responseCallback(Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code)
+      responseCallback(Errors.GROUP_COORDINATOR_NOT_AVAILABLE)
     } else if (!isCoordinatorForGroup(groupId)) {
-      responseCallback(Errors.NOT_COORDINATOR_FOR_GROUP.code)
+      responseCallback(Errors.NOT_COORDINATOR_FOR_GROUP)
     } else if (isCoordinatorLoadingInProgress(groupId)) {
-      responseCallback(Errors.GROUP_LOAD_IN_PROGRESS.code)
+      responseCallback(Errors.GROUP_LOAD_IN_PROGRESS)
     } else {
       groupManager.getGroup(groupId) match {
         case None =>
@@ -314,17 +314,17 @@ class GroupCoordinator(val brokerId: Int,
           // from the coordinator metadata; this is likely that the group has migrated to some other
           // coordinator OR the group is in a transient unstable phase. Let the consumer to retry
           // joining without specified consumer id,
-          responseCallback(Errors.UNKNOWN_MEMBER_ID.code)
+          responseCallback(Errors.UNKNOWN_MEMBER_ID)
 
         case Some(group) =>
           group synchronized {
             if (group.is(Dead) || !group.has(memberId)) {
-              responseCallback(Errors.UNKNOWN_MEMBER_ID.code)
+              responseCallback(Errors.UNKNOWN_MEMBER_ID)
             } else {
               val member = group.get(memberId)
               removeHeartbeatForLeavingMember(group, member)
               onMemberFailure(group, member)
-              responseCallback(Errors.NONE.code)
+              responseCallback(Errors.NONE)
             }
           }
       }
@@ -334,18 +334,18 @@ class GroupCoordinator(val brokerId: Int,
   def handleHeartbeat(groupId: String,
                       memberId: String,
                       generationId: Int,
-                      responseCallback: Short => Unit) {
+                      responseCallback: Errors => Unit) {
     if (!isActive.get) {
-      responseCallback(Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code)
+      responseCallback(Errors.GROUP_COORDINATOR_NOT_AVAILABLE)
     } else if (!isCoordinatorForGroup(groupId)) {
-      responseCallback(Errors.NOT_COORDINATOR_FOR_GROUP.code)
+      responseCallback(Errors.NOT_COORDINATOR_FOR_GROUP)
     } else if (isCoordinatorLoadingInProgress(groupId)) {
       // the group is still loading, so respond just blindly
-      responseCallback(Errors.NONE.code)
+      responseCallback(Errors.NONE)
     } else {
       groupManager.getGroup(groupId) match {
         case None =>
-          responseCallback(Errors.UNKNOWN_MEMBER_ID.code)
+          responseCallback(Errors.UNKNOWN_MEMBER_ID)
 
         case Some(group) =>
           group synchronized {
@@ -355,37 +355,37 @@ class GroupCoordinator(val brokerId: Int,
                 // from the coordinator metadata; this is likely that the group has migrated to some other
                 // coordinator OR the group is in a transient unstable phase. Let the member retry
                 // joining without the specified member id,
-                responseCallback(Errors.UNKNOWN_MEMBER_ID.code)
+                responseCallback(Errors.UNKNOWN_MEMBER_ID)
 
               case Empty =>
-                responseCallback(Errors.UNKNOWN_MEMBER_ID.code)
+                responseCallback(Errors.UNKNOWN_MEMBER_ID)
 
               case AwaitingSync =>
                 if (!group.has(memberId))
-                  responseCallback(Errors.UNKNOWN_MEMBER_ID.code)
+                  responseCallback(Errors.UNKNOWN_MEMBER_ID)
                 else
-                  responseCallback(Errors.REBALANCE_IN_PROGRESS.code)
+                  responseCallback(Errors.REBALANCE_IN_PROGRESS)
 
               case PreparingRebalance =>
                 if (!group.has(memberId)) {
-                  responseCallback(Errors.UNKNOWN_MEMBER_ID.code)
+                  responseCallback(Errors.UNKNOWN_MEMBER_ID)
                 } else if (generationId != group.generationId) {
-                  responseCallback(Errors.ILLEGAL_GENERATION.code)
+                  responseCallback(Errors.ILLEGAL_GENERATION)
                 } else {
                   val member = group.get(memberId)
                   completeAndScheduleNextHeartbeatExpiration(group, member)
-                  responseCallback(Errors.REBALANCE_IN_PROGRESS.code)
+                  responseCallback(Errors.REBALANCE_IN_PROGRESS)
                 }
 
               case Stable =>
                 if (!group.has(memberId)) {
-                  responseCallback(Errors.UNKNOWN_MEMBER_ID.code)
+                  responseCallback(Errors.UNKNOWN_MEMBER_ID)
                 } else if (generationId != group.generationId) {
-                  responseCallback(Errors.ILLEGAL_GENERATION.code)
+                  responseCallback(Errors.ILLEGAL_GENERATION)
                 } else {
                   val member = group.get(memberId)
                   completeAndScheduleNextHeartbeatExpiration(group, member)
-                  responseCallback(Errors.NONE.code)
+                  responseCallback(Errors.NONE)
                 }
             }
           }
@@ -397,13 +397,13 @@ class GroupCoordinator(val brokerId: Int,
                           memberId: String,
                           generationId: Int,
                           offsetMetadata: immutable.Map[TopicPartition, OffsetAndMetadata],
-                          responseCallback: immutable.Map[TopicPartition, Short] => Unit) {
+                          responseCallback: immutable.Map[TopicPartition, Errors] => Unit) {
     if (!isActive.get) {
-      responseCallback(offsetMetadata.mapValues(_ => Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code))
+      responseCallback(offsetMetadata.mapValues(_ => Errors.GROUP_COORDINATOR_NOT_AVAILABLE))
     } else if (!isCoordinatorForGroup(groupId)) {
-      responseCallback(offsetMetadata.mapValues(_ => Errors.NOT_COORDINATOR_FOR_GROUP.code))
+      responseCallback(offsetMetadata.mapValues(_ => Errors.NOT_COORDINATOR_FOR_GROUP))
     } else if (isCoordinatorLoadingInProgress(groupId)) {
-      responseCallback(offsetMetadata.mapValues(_ => Errors.GROUP_LOAD_IN_PROGRESS.code))
+      responseCallback(offsetMetadata.mapValues(_ => Errors.GROUP_LOAD_IN_PROGRESS))
     } else {
       groupManager.getGroup(groupId) match {
         case None =>
@@ -413,7 +413,7 @@ class GroupCoordinator(val brokerId: Int,
             doCommitOffsets(group, memberId, generationId, offsetMetadata, responseCallback)
           } else {
             // or this is a request coming from an older generation. either way, reject the commit
-            responseCallback(offsetMetadata.mapValues(_ => Errors.ILLEGAL_GENERATION.code))
+            responseCallback(offsetMetadata.mapValues(_ => Errors.ILLEGAL_GENERATION))
           }
 
         case Some(group) =>
@@ -426,22 +426,22 @@ class GroupCoordinator(val brokerId: Int,
                       memberId: String,
                       generationId: Int,
                       offsetMetadata: immutable.Map[TopicPartition, OffsetAndMetadata],
-                      responseCallback: immutable.Map[TopicPartition, Short] => Unit) {
+                      responseCallback: immutable.Map[TopicPartition, Errors] => Unit) {
     var delayedOffsetStore: Option[DelayedStore] = None
 
     group synchronized {
       if (group.is(Dead)) {
-        responseCallback(offsetMetadata.mapValues(_ => Errors.UNKNOWN_MEMBER_ID.code))
+        responseCallback(offsetMetadata.mapValues(_ => Errors.UNKNOWN_MEMBER_ID))
       } else if (generationId < 0 && group.is(Empty)) {
         // the group is only using Kafka to store offsets
         delayedOffsetStore = groupManager.prepareStoreOffsets(group, memberId, generationId,
           offsetMetadata, responseCallback)
       } else if (group.is(AwaitingSync)) {
-        responseCallback(offsetMetadata.mapValues(_ => Errors.REBALANCE_IN_PROGRESS.code))
+        responseCallback(offsetMetadata.mapValues(_ => Errors.REBALANCE_IN_PROGRESS))
       } else if (!group.has(memberId)) {
-        responseCallback(offsetMetadata.mapValues(_ => Errors.UNKNOWN_MEMBER_ID.code))
+        responseCallback(offsetMetadata.mapValues(_ => Errors.UNKNOWN_MEMBER_ID))
       } else if (generationId != group.generationId) {
-        responseCallback(offsetMetadata.mapValues(_ => Errors.ILLEGAL_GENERATION.code))
+        responseCallback(offsetMetadata.mapValues(_ => Errors.ILLEGAL_GENERATION))
       } else {
         val member = group.get(memberId)
         completeAndScheduleNextHeartbeatExpiration(group, member)
@@ -512,7 +512,7 @@ class GroupCoordinator(val brokerId: Int,
         case PreparingRebalance =>
           for (member <- group.allMemberMetadata) {
             if (member.awaitingJoinCallback != null) {
-              member.awaitingJoinCallback(joinError(member.memberId, Errors.NOT_COORDINATOR_FOR_GROUP.code))
+              member.awaitingJoinCallback(joinError(member.memberId, Errors.NOT_COORDINATOR_FOR_GROUP))
               member.awaitingJoinCallback = null
             }
           }
@@ -521,7 +521,7 @@ class GroupCoordinator(val brokerId: Int,
         case Stable | AwaitingSync =>
           for (member <- group.allMemberMetadata) {
             if (member.awaitingSyncCallback != null) {
-              member.awaitingSyncCallback(Array.empty[Byte], Errors.NOT_COORDINATOR_FOR_GROUP.code)
+              member.awaitingSyncCallback(Array.empty[Byte], Errors.NOT_COORDINATOR_FOR_GROUP)
               member.awaitingSyncCallback = null
             }
             heartbeatPurgatory.checkAndComplete(MemberKey(member.groupId, member.memberId))
@@ -561,7 +561,7 @@ class GroupCoordinator(val brokerId: Int,
   private def propagateAssignment(group: GroupMetadata, error: Errors) {
     for (member <- group.allMemberMetadata) {
       if (member.awaitingSyncCallback != null) {
-        member.awaitingSyncCallback(member.assignment, error.code)
+        member.awaitingSyncCallback(member.assignment, error)
         member.awaitingSyncCallback = null
 
         // reset the session timeout for members after propagating the member's assignment.
@@ -577,14 +577,14 @@ class GroupCoordinator(val brokerId: Int,
     groupId != null && !groupId.isEmpty
   }
 
-  private def joinError(memberId: String, errorCode: Short): JoinGroupResult = {
+  private def joinError(memberId: String, error: Errors): JoinGroupResult = {
     JoinGroupResult(
-      members=Map.empty,
-      memberId=memberId,
-      generationId=0,
-      subProtocol=GroupCoordinator.NoProtocol,
-      leaderId=GroupCoordinator.NoLeader,
-      errorCode=errorCode)
+      members = Map.empty,
+      memberId = memberId,
+      generationId = 0,
+      subProtocol = GroupCoordinator.NoProtocol,
+      leaderId = GroupCoordinator.NoLeader,
+      error = error)
   }
 
   /**
@@ -707,12 +707,12 @@ class GroupCoordinator(val brokerId: Int,
           for (member <- group.allMemberMetadata) {
             assert(member.awaitingJoinCallback != null)
             val joinResult = JoinGroupResult(
-              members=if (member.memberId == group.leaderId) { group.currentMemberMetadata } else { Map.empty },
-              memberId=member.memberId,
-              generationId=group.generationId,
-              subProtocol=group.protocol,
-              leaderId=group.leaderId,
-              errorCode=Errors.NONE.code)
+              members = if (member.memberId == group.leaderId) { group.currentMemberMetadata } else { Map.empty },
+              memberId = member.memberId,
+              generationId = group.generationId,
+              subProtocol = group.protocol,
+              leaderId = group.leaderId,
+              error = Errors.NONE)
 
             member.awaitingJoinCallback(joinResult)
             member.awaitingJoinCallback = null
@@ -814,4 +814,4 @@ case class JoinGroupResult(members: Map[String, Array[Byte]],
                            generationId: Int,
                            subProtocol: String,
                            leaderId: String,
-                           errorCode: Short)
+                           error: Errors)

http://git-wip-us.apache.org/repos/asf/kafka/blob/9898d665/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
index c66ce74..a6ed6a9 100644
--- a/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
+++ b/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
@@ -224,7 +224,7 @@ class GroupMetadataManager(val brokerId: Int,
                           consumerId: String,
                           generationId: Int,
                           offsetMetadata: immutable.Map[TopicPartition, OffsetAndMetadata],
-                          responseCallback: immutable.Map[TopicPartition, Short] => Unit): Option[DelayedStore] = {
+                          responseCallback: immutable.Map[TopicPartition, Errors] => Unit): Option[DelayedStore] = {
     // first filter out partitions with offset metadata size exceeding limit
     val filteredOffsetMetadata = offsetMetadata.filter { case (_, offsetAndMetadata) =>
       validateOffsetMetadataLength(offsetAndMetadata.metadata)
@@ -254,7 +254,7 @@ class GroupMetadataManager(val brokerId: Int,
           // the offset and metadata to cache if the append status has no error
           val status = responseStatus(offsetTopicPartition)
 
-          val responseCode =
+          val response =
             group synchronized {
               if (status.error == Errors.NONE) {
                 if (!group.is(Dead)) {
@@ -262,7 +262,7 @@ class GroupMetadataManager(val brokerId: Int,
                     group.completePendingOffsetWrite(topicPartition, offsetAndMetadata)
                   }
                 }
-                Errors.NONE.code
+                Errors.NONE
               } else {
                 if (!group.is(Dead)) {
                   filteredOffsetMetadata.foreach { case (topicPartition, offsetAndMetadata) =>
@@ -291,16 +291,16 @@ class GroupMetadataManager(val brokerId: Int,
                   case other => other
                 }
 
-                responseError.code
+                responseError
               }
             }
 
           // compute the final error codes for the commit response
           val commitStatus = offsetMetadata.map { case (topicPartition, offsetAndMetadata) =>
             if (validateOffsetMetadataLength(offsetAndMetadata.metadata))
-              (topicPartition, responseCode)
+              (topicPartition, response)
             else
-              (topicPartition, Errors.OFFSET_METADATA_TOO_LARGE.code)
+              (topicPartition, Errors.OFFSET_METADATA_TOO_LARGE)
           }
 
           // finally trigger the callback logic passed from the API layer
@@ -315,7 +315,7 @@ class GroupMetadataManager(val brokerId: Int,
 
       case None =>
         val commitStatus = offsetMetadata.map { case (topicPartition, offsetAndMetadata) =>
-          (topicPartition, Errors.NOT_COORDINATOR_FOR_GROUP.code)
+          (topicPartition, Errors.NOT_COORDINATOR_FOR_GROUP)
         }
         responseCallback(commitStatus)
         None

http://git-wip-us.apache.org/repos/asf/kafka/blob/9898d665/core/src/main/scala/kafka/coordinator/MemberMetadata.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/MemberMetadata.scala b/core/src/main/scala/kafka/coordinator/MemberMetadata.scala
index 6149276..729e483 100644
--- a/core/src/main/scala/kafka/coordinator/MemberMetadata.scala
+++ b/core/src/main/scala/kafka/coordinator/MemberMetadata.scala
@@ -23,6 +23,8 @@ import kafka.utils.nonthreadsafe
 
 import scala.collection.Map
 
+import org.apache.kafka.common.protocol.Errors
+
 
 case class MemberSummary(memberId: String,
                          clientId: String,
@@ -62,7 +64,7 @@ private[coordinator] class MemberMetadata(val memberId: String,
 
   var assignment: Array[Byte] = Array.empty[Byte]
   var awaitingJoinCallback: JoinGroupResult => Unit = null
-  var awaitingSyncCallback: (Array[Byte], Short) => Unit = null
+  var awaitingSyncCallback: (Array[Byte], Errors) => Unit = null
   var latestHeartbeat: Long = -1
   var isLeaving: Boolean = false
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/9898d665/core/src/main/scala/kafka/javaapi/FetchResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/javaapi/FetchResponse.scala b/core/src/main/scala/kafka/javaapi/FetchResponse.scala
index 37db3f7..9c67dd8 100644
--- a/core/src/main/scala/kafka/javaapi/FetchResponse.scala
+++ b/core/src/main/scala/kafka/javaapi/FetchResponse.scala
@@ -28,7 +28,9 @@ class FetchResponse(private val underlying: kafka.api.FetchResponse) {
 
   def hasError = underlying.hasError
 
-  def errorCode(topic: String, partition: Int) = underlying.errorCode(topic, partition)
+  def error(topic: String, partition: Int) = underlying.error(topic, partition)
+
+  def errorCode(topic: String, partition: Int) = error(topic, partition).code
 
   override def equals(other: Any) = canEqual(other) && {
     val otherFetchResponse = other.asInstanceOf[kafka.javaapi.FetchResponse]

http://git-wip-us.apache.org/repos/asf/kafka/blob/9898d665/core/src/main/scala/kafka/javaapi/GroupCoordinatorResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/javaapi/GroupCoordinatorResponse.scala b/core/src/main/scala/kafka/javaapi/GroupCoordinatorResponse.scala
index 0e14758..9871ca0 100644
--- a/core/src/main/scala/kafka/javaapi/GroupCoordinatorResponse.scala
+++ b/core/src/main/scala/kafka/javaapi/GroupCoordinatorResponse.scala
@@ -22,7 +22,9 @@ import kafka.cluster.BrokerEndPoint
 
 class GroupCoordinatorResponse(private val underlying: kafka.api.GroupCoordinatorResponse) {
 
-  def errorCode = underlying.errorCode
+  def error = underlying.error
+
+  def errorCode = error.code
 
   def coordinator: BrokerEndPoint = {
     import kafka.javaapi.Implicits._

http://git-wip-us.apache.org/repos/asf/kafka/blob/9898d665/core/src/main/scala/kafka/javaapi/OffsetCommitResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/javaapi/OffsetCommitResponse.scala b/core/src/main/scala/kafka/javaapi/OffsetCommitResponse.scala
index c79f5b6..c348eba 100644
--- a/core/src/main/scala/kafka/javaapi/OffsetCommitResponse.scala
+++ b/core/src/main/scala/kafka/javaapi/OffsetCommitResponse.scala
@@ -20,16 +20,18 @@ package kafka.javaapi
 import java.nio.ByteBuffer
 
 import kafka.common.TopicAndPartition
+import org.apache.kafka.common.protocol.Errors
 import scala.collection.JavaConverters._
 
 class OffsetCommitResponse(private val underlying: kafka.api.OffsetCommitResponse) {
 
-  def errors: java.util.Map[TopicAndPartition, Short] = underlying.commitStatus.asJava
+  def errors: java.util.Map[TopicAndPartition, Errors] = underlying.commitStatus.asJava
 
   def hasError = underlying.hasError
 
-  def errorCode(topicAndPartition: TopicAndPartition) = underlying.commitStatus(topicAndPartition)
+  def error(topicAndPartition: TopicAndPartition) = underlying.commitStatus(topicAndPartition)
 
+  def errorCode(topicAndPartition: TopicAndPartition) = error(topicAndPartition).code
 }
 
 object OffsetCommitResponse {

http://git-wip-us.apache.org/repos/asf/kafka/blob/9898d665/core/src/main/scala/kafka/javaapi/OffsetResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/javaapi/OffsetResponse.scala b/core/src/main/scala/kafka/javaapi/OffsetResponse.scala
index 8b1847e..42ee2ab 100644
--- a/core/src/main/scala/kafka/javaapi/OffsetResponse.scala
+++ b/core/src/main/scala/kafka/javaapi/OffsetResponse.scala
@@ -23,10 +23,10 @@ class OffsetResponse(private val underlying: kafka.api.OffsetResponse) {
 
   def hasError = underlying.hasError
 
-
-  def errorCode(topic: String, partition: Int) =
+  def error(topic: String, partition: Int) =
     underlying.partitionErrorAndOffsets(TopicAndPartition(topic, partition)).error
 
+  def errorCode(topic: String, partition: Int) = error(topic, partition).code
 
   def offsets(topic: String, partition: Int) =
     underlying.partitionErrorAndOffsets(TopicAndPartition(topic, partition)).offsets.toArray

http://git-wip-us.apache.org/repos/asf/kafka/blob/9898d665/core/src/main/scala/kafka/javaapi/TopicMetadata.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/javaapi/TopicMetadata.scala b/core/src/main/scala/kafka/javaapi/TopicMetadata.scala
index 4e2631f..c9ec48a 100644
--- a/core/src/main/scala/kafka/javaapi/TopicMetadata.scala
+++ b/core/src/main/scala/kafka/javaapi/TopicMetadata.scala
@@ -17,6 +17,7 @@
 package kafka.javaapi
 
 import kafka.cluster.BrokerEndPoint
+import org.apache.kafka.common.protocol.Errors
 import scala.collection.JavaConverters._
 
 private[javaapi] object MetadataListImplicits {
@@ -35,7 +36,9 @@ class TopicMetadata(private val underlying: kafka.api.TopicMetadata) {
     underlying.partitionsMetadata
   }
 
-  def errorCode: Short = underlying.errorCode
+  def error = underlying.error
+
+  def errorCode = error.code
 
   def sizeInBytes: Int = underlying.sizeInBytes
 
@@ -55,7 +58,9 @@ class PartitionMetadata(private val underlying: kafka.api.PartitionMetadata) {
 
   def isr: java.util.List[BrokerEndPoint] = underlying.isr.asJava
 
-  def errorCode: Short = underlying.errorCode
+  def error = underlying.error
+
+  def errorCode = error.code
 
   def sizeInBytes: Int = underlying.sizeInBytes
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/9898d665/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala b/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala
index 97289a1..e77a50c 100644
--- a/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala
+++ b/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala
@@ -56,8 +56,8 @@ class BrokerPartitionInfo(producerConfig: ProducerConfig,
       }
     val partitionMetadata = metadata.partitionsMetadata
     if(partitionMetadata.isEmpty) {
-      if(metadata.errorCode != Errors.NONE.code) {
-        throw new KafkaException(Errors.forCode(metadata.errorCode).exception)
+      if(metadata.error != Errors.NONE) {
+        throw new KafkaException(metadata.error.exception)
       } else {
         throw new KafkaException("Topic metadata %s has empty partition metadata and no error code".format(metadata))
       }
@@ -85,14 +85,14 @@ class BrokerPartitionInfo(producerConfig: ProducerConfig,
     // throw partition specific exception
     topicsMetadata.foreach(tmd =>{
       trace("Metadata for topic %s is %s".format(tmd.topic, tmd))
-      if(tmd.errorCode == Errors.NONE.code) {
+      if(tmd.error == Errors.NONE) {
         topicPartitionInfo.put(tmd.topic, tmd)
       } else
-        warn("Error while fetching metadata [%s] for topic [%s]: %s ".format(tmd, tmd.topic, Errors.forCode(tmd.errorCode).exception.getClass))
+        warn("Error while fetching metadata [%s] for topic [%s]: %s ".format(tmd, tmd.topic, tmd.error.exception.getClass))
       tmd.partitionsMetadata.foreach(pmd =>{
-        if (pmd.errorCode != Errors.NONE.code && pmd.errorCode == Errors.LEADER_NOT_AVAILABLE.code) {
+        if (pmd.error != Errors.NONE && pmd.error == Errors.LEADER_NOT_AVAILABLE) {
           warn("Error while fetching metadata %s for topic partition [%s,%d]: [%s]".format(pmd, tmd.topic, pmd.partitionId,
-            Errors.forCode(pmd.errorCode).exception.getClass))
+            pmd.error.exception.getClass))
         } // any other error code (e.g. ReplicaNotAvailable) can be ignored since the producer does not need to access the replica and isr metadata
       })
     })

http://git-wip-us.apache.org/repos/asf/kafka/blob/9898d665/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
index 380b1c8..77c3b7d 100755
--- a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
+++ b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
@@ -281,11 +281,11 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
           if (response.status.size != producerRequest.data.size)
             throw new KafkaException("Incomplete response (%s) for producer request (%s)".format(response, producerRequest))
           if (logger.isTraceEnabled) {
-            val successfullySentData = response.status.filter(_._2.error == Errors.NONE.code)
+            val successfullySentData = response.status.filter(_._2.error == Errors.NONE)
             successfullySentData.foreach(m => messagesPerTopic(m._1).foreach(message =>
               trace("Successfully sent message: %s".format(if(message.message.isNull) null else message.message.toString()))))
           }
-          val failedPartitionsAndStatus = response.status.filter(_._2.error != Errors.NONE.code).toSeq
+          val failedPartitionsAndStatus = response.status.filter(_._2.error != Errors.NONE).toSeq
           failedTopicPartitions = failedPartitionsAndStatus.map(partitionStatus => partitionStatus._1)
           if(failedTopicPartitions.nonEmpty) {
             val errorString = failedPartitionsAndStatus
@@ -293,7 +293,7 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
                                     (p1._1.topic.compareTo(p2._1.topic) == 0 && p1._1.partition < p2._1.partition))
               .map{
                 case(topicAndPartition, status) =>
-                  topicAndPartition.toString + ": " + Errors.forCode(status.error).exceptionName
+                  topicAndPartition.toString + ": " + status.error.exceptionName
               }.mkString(",")
             warn("Produce request with correlation id %d failed due to %s".format(currentCorrelationId, errorString))
           }

http://git-wip-us.apache.org/repos/asf/kafka/blob/9898d665/core/src/main/scala/kafka/security/auth/ResourceType.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/security/auth/ResourceType.scala b/core/src/main/scala/kafka/security/auth/ResourceType.scala
index 4e264ca..9630c82 100644
--- a/core/src/main/scala/kafka/security/auth/ResourceType.scala
+++ b/core/src/main/scala/kafka/security/auth/ResourceType.scala
@@ -24,21 +24,21 @@ import org.apache.kafka.common.protocol.Errors
  */
 
 
-sealed trait ResourceType extends BaseEnum { def errorCode: Short }
+sealed trait ResourceType extends BaseEnum { def error: Errors }
 
 case object Cluster extends ResourceType {
   val name = "Cluster"
-  val errorCode = Errors.CLUSTER_AUTHORIZATION_FAILED.code
+  val error = Errors.CLUSTER_AUTHORIZATION_FAILED
 }
 
 case object Topic extends ResourceType {
   val name = "Topic"
-  val errorCode = Errors.TOPIC_AUTHORIZATION_FAILED.code
+  val error = Errors.TOPIC_AUTHORIZATION_FAILED
 }
 
 case object Group extends ResourceType {
   val name = "Group"
-  val errorCode = Errors.GROUP_AUTHORIZATION_FAILED.code
+  val error = Errors.GROUP_AUTHORIZATION_FAILED
 }
 
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/9898d665/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index febe9da..6462968 100755
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -141,7 +141,7 @@ abstract class AbstractFetcherThread(name: String,
           Option(partitionStates.stateValue(topicPartition)).foreach(currentPartitionFetchState =>
             // we append to the log if the current offset is defined and it is the same as the offset requested during fetch
             if (fetchRequest.offset(topicPartition) == currentPartitionFetchState.offset) {
-              Errors.forCode(partitionData.errorCode) match {
+              partitionData.error match {
                 case Errors.NONE =>
                   try {
                     val records = partitionData.toRecords
@@ -259,7 +259,7 @@ object AbstractFetcherThread {
   }
 
   trait PartitionData {
-    def errorCode: Short
+    def error: Errors
     def exception: Option[Throwable]
     def toRecords: MemoryRecords
     def highWatermark: Long

http://git-wip-us.apache.org/repos/asf/kafka/blob/9898d665/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index fad75ec..1308216 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -148,10 +148,10 @@ class KafkaApis(val requestChannel: RequestChannel,
       val leaderAndIsrResponse =
         if (authorize(request.session, ClusterAction, Resource.ClusterResource)) {
           val result = replicaManager.becomeLeaderOrFollower(correlationId, leaderAndIsrRequest, metadataCache, onLeadershipChange)
-          new LeaderAndIsrResponse(result.errorCode, result.responseMap.mapValues(new JShort(_)).asJava)
+          new LeaderAndIsrResponse(result.error, result.responseMap.asJava)
         } else {
-          val result = leaderAndIsrRequest.partitionStates.asScala.keys.map((_, new JShort(Errors.CLUSTER_AUTHORIZATION_FAILED.code))).toMap
-          new LeaderAndIsrResponse(Errors.CLUSTER_AUTHORIZATION_FAILED.code, result.asJava)
+          val result = leaderAndIsrRequest.partitionStates.asScala.keys.map((_, Errors.CLUSTER_AUTHORIZATION_FAILED)).toMap
+          new LeaderAndIsrResponse(Errors.CLUSTER_AUTHORIZATION_FAILED, result.asJava)
         }
 
       requestChannel.sendResponse(new Response(request, leaderAndIsrResponse))
@@ -178,15 +178,15 @@ class KafkaApis(val requestChannel: RequestChannel,
         // Consider old replicas : {[1,2,3], Leader = 1} is reassigned to new replicas : {[2,3,4], Leader = 2}, broker 1 does not receive a LeaderAndIsr
         // request to become a follower due to which cache for groups that belong to an offsets topic partition for which broker 1 was the leader,
         // is not cleared.
-        result.foreach { case (topicPartition, errorCode) =>
-          if (errorCode == Errors.NONE.code && stopReplicaRequest.deletePartitions() && topicPartition.topic == Topic.GroupMetadataTopicName) {
+        result.foreach { case (topicPartition, error) =>
+          if (error == Errors.NONE && stopReplicaRequest.deletePartitions() && topicPartition.topic == Topic.GroupMetadataTopicName) {
             coordinator.handleGroupEmigration(topicPartition.partition)
           }
         }
-        new StopReplicaResponse(error, result.asInstanceOf[Map[TopicPartition, JShort]].asJava)
+        new StopReplicaResponse(error, result.asJava)
       } else {
-        val result = stopReplicaRequest.partitions.asScala.map((_, new JShort(Errors.CLUSTER_AUTHORIZATION_FAILED.code))).toMap
-        new StopReplicaResponse(Errors.CLUSTER_AUTHORIZATION_FAILED.code, result.asJava)
+        val result = stopReplicaRequest.partitions.asScala.map((_, Errors.CLUSTER_AUTHORIZATION_FAILED)).toMap
+        new StopReplicaResponse(Errors.CLUSTER_AUTHORIZATION_FAILED, result.asJava)
       }
 
     requestChannel.sendResponse(new RequestChannel.Response(request, response))
@@ -208,9 +208,9 @@ class KafkaApis(val requestChannel: RequestChannel,
             adminManager.tryCompleteDelayedTopicOperations(topic)
           }
         }
-        new UpdateMetadataResponse(Errors.NONE.code)
+        new UpdateMetadataResponse(Errors.NONE)
       } else {
-        new UpdateMetadataResponse(Errors.CLUSTER_AUTHORIZATION_FAILED.code)
+        new UpdateMetadataResponse(Errors.CLUSTER_AUTHORIZATION_FAILED)
       }
 
     requestChannel.sendResponse(new Response(request, updateMetadataResponse))
@@ -226,7 +226,7 @@ class KafkaApis(val requestChannel: RequestChannel,
 
     val partitionsRemaining = controller.shutdownBroker(controlledShutdownRequest.brokerId)
     val controlledShutdownResponse = new ControlledShutdownResponse(controlledShutdownRequest.correlationId,
-      Errors.NONE.code, partitionsRemaining)
+      Errors.NONE, partitionsRemaining)
     requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(request.connectionId, controlledShutdownResponse)))
   }
 
@@ -239,9 +239,9 @@ class KafkaApis(val requestChannel: RequestChannel,
 
     // reject the request if not authorized to the group
     if (!authorize(request.session, Read, new Resource(Group, offsetCommitRequest.groupId))) {
-      val errorCode = new JShort(Errors.GROUP_AUTHORIZATION_FAILED.code)
+      val error = Errors.GROUP_AUTHORIZATION_FAILED
       val results = offsetCommitRequest.offsetData.keySet.asScala.map { topicPartition =>
-        (topicPartition, errorCode)
+        (topicPartition, error)
       }.toMap
       val response = new OffsetCommitResponse(results.asJava)
       requestChannel.sendResponse(new RequestChannel.Response(request, response))
@@ -262,16 +262,16 @@ class KafkaApis(val requestChannel: RequestChannel,
       }
 
       // the callback for sending an offset commit response
-      def sendResponseCallback(commitStatus: immutable.Map[TopicPartition, Short]) {
-        val combinedCommitStatus = commitStatus.mapValues(new JShort(_)) ++
-          unauthorizedForReadTopics.mapValues(_ => new JShort(Errors.TOPIC_AUTHORIZATION_FAILED.code)) ++
-          nonExistingOrUnauthorizedForDescribeTopics.mapValues(_ => new JShort(Errors.UNKNOWN_TOPIC_OR_PARTITION.code))
+      def sendResponseCallback(commitStatus: immutable.Map[TopicPartition, Errors]) {
+        val combinedCommitStatus = commitStatus ++
+          unauthorizedForReadTopics.mapValues(_ => Errors.TOPIC_AUTHORIZATION_FAILED) ++
+          nonExistingOrUnauthorizedForDescribeTopics.mapValues(_ => Errors.UNKNOWN_TOPIC_OR_PARTITION)
 
         if (isDebugEnabled)
-          combinedCommitStatus.foreach { case (topicPartition, errorCode) =>
-            if (errorCode != Errors.NONE.code) {
+          combinedCommitStatus.foreach { case (topicPartition, error) =>
+            if (error != Errors.NONE) {
               debug(s"Offset commit request with correlation id ${header.correlationId} from client ${header.clientId} " +
-                s"on partition $topicPartition failed due to ${Errors.forCode(errorCode).exceptionName}")
+                s"on partition $topicPartition failed due to ${error.exceptionName}")
             }
           }
         val response = new OffsetCommitResponse(combinedCommitStatus.asJava)
@@ -287,13 +287,13 @@ class KafkaApis(val requestChannel: RequestChannel,
             val topicDirs = new ZKGroupTopicDirs(offsetCommitRequest.groupId, topicPartition.topic)
             try {
               if (partitionData.metadata != null && partitionData.metadata.length > config.offsetMetadataMaxSize)
-                (topicPartition, Errors.OFFSET_METADATA_TOO_LARGE.code)
+                (topicPartition, Errors.OFFSET_METADATA_TOO_LARGE)
               else {
                 zkUtils.updatePersistentPath(s"${topicDirs.consumerOffsetDir}/${topicPartition.partition}", partitionData.offset.toString)
-                (topicPartition, Errors.NONE.code)
+                (topicPartition, Errors.NONE)
               }
             } catch {
-              case e: Throwable => (topicPartition, Errors.forException(e).code)
+              case e: Throwable => (topicPartition, Errors.forException(e))
             }
         }
         sendResponseCallback(responseInfo)
@@ -458,11 +458,11 @@ class KafkaApis(val requestChannel: RequestChannel,
     }
 
     val nonExistingOrUnauthorizedForDescribePartitionData = nonExistingOrUnauthorizedForDescribeTopics.map {
-      case (tp, _) => (tp, new FetchResponse.PartitionData(Errors.UNKNOWN_TOPIC_OR_PARTITION.code, -1, MemoryRecords.EMPTY))
+      case (tp, _) => (tp, new FetchResponse.PartitionData(Errors.UNKNOWN_TOPIC_OR_PARTITION, -1, MemoryRecords.EMPTY))
     }
 
     val unauthorizedForReadPartitionData = unauthorizedForReadRequestInfo.map {
-      case (tp, _) => (tp, new FetchResponse.PartitionData(Errors.TOPIC_AUTHORIZATION_FAILED.code, -1, MemoryRecords.EMPTY))
+      case (tp, _) => (tp, new FetchResponse.PartitionData(Errors.TOPIC_AUTHORIZATION_FAILED, -1, MemoryRecords.EMPTY))
     }
 
     // the callback for sending a fetch response
@@ -485,7 +485,7 @@ class KafkaApis(val requestChannel: RequestChannel,
             FetchPartitionData(data.error, data.hw, downConvertedRecords)
           } else data
 
-          tp -> new FetchResponse.PartitionData(convertedData.error.code, convertedData.hw, convertedData.records)
+          tp -> new FetchResponse.PartitionData(convertedData.error, convertedData.hw, convertedData.records)
         }
       }
 
@@ -494,9 +494,9 @@ class KafkaApis(val requestChannel: RequestChannel,
       val fetchedPartitionData = new util.LinkedHashMap[TopicPartition, FetchResponse.PartitionData]()
 
       mergedPartitionData.foreach { case (topicPartition, data) =>
-        if (data.errorCode != Errors.NONE.code)
+        if (data.error != Errors.NONE)
           debug(s"Fetch request with correlation id ${request.header.correlationId} from client $clientId " +
-            s"on partition $topicPartition failed due to ${Errors.forCode(data.errorCode).exceptionName}")
+            s"on partition $topicPartition failed due to ${data.error.exceptionName}")
 
         fetchedPartitionData.put(topicPartition, data)
 
@@ -584,7 +584,7 @@ class KafkaApis(val requestChannel: RequestChannel,
     }
 
     val unauthorizedResponseStatus = unauthorizedRequestInfo.mapValues(_ =>
-      new ListOffsetResponse.PartitionData(Errors.UNKNOWN_TOPIC_OR_PARTITION.code, List[JLong]().asJava)
+      new ListOffsetResponse.PartitionData(Errors.UNKNOWN_TOPIC_OR_PARTITION, List[JLong]().asJava)
     )
 
     val responseMap = authorizedRequestInfo.map {case (topicPartition, partitionData) =>
@@ -609,17 +609,17 @@ class KafkaApis(val requestChannel: RequestChannel,
               allOffsets
           }
         }
-        (topicPartition, new ListOffsetResponse.PartitionData(Errors.NONE.code, offsets.map(new JLong(_)).asJava))
+        (topicPartition, new ListOffsetResponse.PartitionData(Errors.NONE, offsets.map(new JLong(_)).asJava))
       } catch {
         // NOTE: UnknownTopicOrPartitionException and NotLeaderForPartitionException are special cased since these error messages
         // are typically transient and there is no value in logging the entire stack trace for the same
         case e @ ( _ : UnknownTopicOrPartitionException | _ : NotLeaderForPartitionException) =>
           debug("Offset request with correlation id %d from client %s on partition %s failed due to %s".format(
             correlationId, clientId, topicPartition, e.getMessage))
-          (topicPartition, new ListOffsetResponse.PartitionData(Errors.forException(e).code, List[JLong]().asJava))
+          (topicPartition, new ListOffsetResponse.PartitionData(Errors.forException(e), List[JLong]().asJava))
         case e: Throwable =>
           error("Error while responding to offset request", e)
-          (topicPartition, new ListOffsetResponse.PartitionData(Errors.forException(e).code, List[JLong]().asJava))
+          (topicPartition, new ListOffsetResponse.PartitionData(Errors.forException(e), List[JLong]().asJava))
       }
     }
     responseMap ++ unauthorizedResponseStatus
@@ -635,7 +635,7 @@ class KafkaApis(val requestChannel: RequestChannel,
     }
 
     val unauthorizedResponseStatus = unauthorizedRequestInfo.mapValues(_ => {
-      new ListOffsetResponse.PartitionData(Errors.UNKNOWN_TOPIC_OR_PARTITION.code,
+      new ListOffsetResponse.PartitionData(Errors.UNKNOWN_TOPIC_OR_PARTITION,
                                            ListOffsetResponse.UNKNOWN_TIMESTAMP,
                                            ListOffsetResponse.UNKNOWN_OFFSET)
     })
@@ -644,7 +644,7 @@ class KafkaApis(val requestChannel: RequestChannel,
       if (offsetRequest.duplicatePartitions().contains(topicPartition)) {
         debug(s"OffsetRequest with correlation id $correlationId from client $clientId on partition $topicPartition " +
             s"failed because the partition is duplicated in the request.")
-        (topicPartition, new ListOffsetResponse.PartitionData(Errors.INVALID_REQUEST.code,
+        (topicPartition, new ListOffsetResponse.PartitionData(Errors.INVALID_REQUEST,
                                                               ListOffsetResponse.UNKNOWN_TIMESTAMP,
                                                               ListOffsetResponse.UNKNOWN_OFFSET))
       } else {
@@ -671,7 +671,7 @@ class KafkaApis(val requestChannel: RequestChannel,
             }
           }
 
-          (topicPartition, new ListOffsetResponse.PartitionData(Errors.NONE.code, found.timestamp, found.offset))
+          (topicPartition, new ListOffsetResponse.PartitionData(Errors.NONE, found.timestamp, found.offset))
         } catch {
           // NOTE: These exceptions are special cased since these error messages are typically transient or the client
           // would have received a clear exception and there is no value in logging the entire stack trace for the same
@@ -680,12 +680,12 @@ class KafkaApis(val requestChannel: RequestChannel,
                     _ : UnsupportedForMessageFormatException) =>
             debug(s"Offset request with correlation id $correlationId from client $clientId on " +
                 s"partition $topicPartition failed due to ${e.getMessage}")
-            (topicPartition, new ListOffsetResponse.PartitionData(Errors.forException(e).code,
+            (topicPartition, new ListOffsetResponse.PartitionData(Errors.forException(e),
                                                                   ListOffsetResponse.UNKNOWN_TIMESTAMP,
                                                                   ListOffsetResponse.UNKNOWN_OFFSET))
           case e: Throwable =>
             error("Error while responding to offset request", e)
-            (topicPartition, new ListOffsetResponse.PartitionData(Errors.forException(e).code,
+            (topicPartition, new ListOffsetResponse.PartitionData(Errors.forException(e),
                                                                   ListOffsetResponse.UNKNOWN_TIMESTAMP,
                                                                   ListOffsetResponse.UNKNOWN_OFFSET))
         }
@@ -805,7 +805,7 @@ class KafkaApis(val requestChannel: RequestChannel,
       val responsesForNonExistentTopics = nonExistentTopics.map { topic =>
         if (topic == Topic.GroupMetadataTopicName) {
           val topicMetadata = createGroupMetadataTopic()
-          if (topicMetadata.error() == Errors.GROUP_COORDINATOR_NOT_AVAILABLE) {
+          if (topicMetadata.error == Errors.GROUP_COORDINATOR_NOT_AVAILABLE) {
             new MetadataResponse.TopicMetadata(Errors.INVALID_REPLICATION_FACTOR, topic, Topic.isInternal(topic),
               java.util.Collections.emptyList())
           } else topicMetadata
@@ -973,7 +973,7 @@ class KafkaApis(val requestChannel: RequestChannel,
     val groupCoordinatorRequest = request.body.asInstanceOf[GroupCoordinatorRequest]
 
     if (!authorize(request.session, Describe, new Resource(Group, groupCoordinatorRequest.groupId))) {
-      val responseBody = new GroupCoordinatorResponse(Errors.GROUP_AUTHORIZATION_FAILED.code, Node.noNode)
+      val responseBody = new GroupCoordinatorResponse(Errors.GROUP_AUTHORIZATION_FAILED, Node.noNode)
       requestChannel.sendResponse(new RequestChannel.Response(request, responseBody))
     } else {
       val partition = coordinator.partitionFor(groupCoordinatorRequest.groupId)
@@ -982,7 +982,7 @@ class KafkaApis(val requestChannel: RequestChannel,
       val offsetsTopicMetadata = getOrCreateGroupMetadataTopic(request.listenerName)
 
       val responseBody = if (offsetsTopicMetadata.error != Errors.NONE) {
-        new GroupCoordinatorResponse(Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code, Node.noNode)
+        new GroupCoordinatorResponse(Errors.GROUP_COORDINATOR_NOT_AVAILABLE, Node.noNode)
       } else {
         val coordinatorEndpoint = offsetsTopicMetadata.partitionMetadata().asScala
           .find(_.partition == partition)
@@ -990,9 +990,9 @@ class KafkaApis(val requestChannel: RequestChannel,
 
         coordinatorEndpoint match {
           case Some(endpoint) if !endpoint.isEmpty =>
-            new GroupCoordinatorResponse(Errors.NONE.code, endpoint)
+            new GroupCoordinatorResponse(Errors.NONE, endpoint)
           case _ =>
-            new GroupCoordinatorResponse(Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code, Node.noNode)
+            new GroupCoordinatorResponse(Errors.GROUP_COORDINATOR_NOT_AVAILABLE, Node.noNode)
         }
       }
 
@@ -1015,7 +1015,7 @@ class KafkaApis(val requestChannel: RequestChannel,
             val assignment = ByteBuffer.wrap(member.assignment)
             new DescribeGroupsResponse.GroupMember(member.memberId, member.clientId, member.clientHost, metadata, assignment)
           }
-          groupId -> new DescribeGroupsResponse.GroupMetadata(error.code, summary.state, summary.protocolType,
+          groupId -> new DescribeGroupsResponse.GroupMetadata(error, summary.state, summary.protocolType,
             summary.protocol, members.asJava)
         }
     }.toMap
@@ -1030,7 +1030,7 @@ class KafkaApis(val requestChannel: RequestChannel,
     } else {
       val (error, groups) = coordinator.handleListGroups()
       val allGroups = groups.map { group => new ListGroupsResponse.Group(group.groupId, group.protocolType) }
-      new ListGroupsResponse(error.code, allGroups.asJava)
+      new ListGroupsResponse(error, allGroups.asJava)
     }
     requestChannel.sendResponse(new RequestChannel.Response(request, responseBody))
   }
@@ -1041,7 +1041,7 @@ class KafkaApis(val requestChannel: RequestChannel,
     // the callback for sending a join-group response
     def sendResponseCallback(joinResult: JoinGroupResult) {
       val members = joinResult.members map { case (memberId, metadataArray) => (memberId, ByteBuffer.wrap(metadataArray)) }
-      val responseBody = new JoinGroupResponse(request.header.apiVersion, joinResult.errorCode, joinResult.generationId,
+      val responseBody = new JoinGroupResponse(request.header.apiVersion, joinResult.error, joinResult.generationId,
         joinResult.subProtocol, joinResult.memberId, joinResult.leaderId, members.asJava)
 
       trace("Sending join group response %s for correlation id %d to client %s."
@@ -1052,7 +1052,7 @@ class KafkaApis(val requestChannel: RequestChannel,
     if (!authorize(request.session, Read, new Resource(Group, joinGroupRequest.groupId()))) {
       val responseBody = new JoinGroupResponse(
         request.header.apiVersion,
-        Errors.GROUP_AUTHORIZATION_FAILED.code,
+        Errors.GROUP_AUTHORIZATION_FAILED,
         JoinGroupResponse.UNKNOWN_GENERATION_ID,
         JoinGroupResponse.UNKNOWN_PROTOCOL,
         JoinGroupResponse.UNKNOWN_MEMBER_ID, // memberId
@@ -1079,13 +1079,13 @@ class KafkaApis(val requestChannel: RequestChannel,
   def handleSyncGroupRequest(request: RequestChannel.Request) {
     val syncGroupRequest = request.body.asInstanceOf[SyncGroupRequest]
 
-    def sendResponseCallback(memberState: Array[Byte], errorCode: Short) {
-      val responseBody = new SyncGroupResponse(errorCode, ByteBuffer.wrap(memberState))
+    def sendResponseCallback(memberState: Array[Byte], error: Errors) {
+      val responseBody = new SyncGroupResponse(error, ByteBuffer.wrap(memberState))
       requestChannel.sendResponse(new Response(request, responseBody))
     }
 
     if (!authorize(request.session, Read, new Resource(Group, syncGroupRequest.groupId()))) {
-      sendResponseCallback(Array[Byte](), Errors.GROUP_AUTHORIZATION_FAILED.code)
+      sendResponseCallback(Array[Byte](), Errors.GROUP_AUTHORIZATION_FAILED)
     } else {
       coordinator.handleSyncGroup(
         syncGroupRequest.groupId(),
@@ -1101,15 +1101,15 @@ class KafkaApis(val requestChannel: RequestChannel,
     val heartbeatRequest = request.body.asInstanceOf[HeartbeatRequest]
 
     // the callback for sending a heartbeat response
-    def sendResponseCallback(errorCode: Short) {
-      val response = new HeartbeatResponse(errorCode)
+    def sendResponseCallback(error: Errors) {
+      val response = new HeartbeatResponse(error)
       trace("Sending heartbeat response %s for correlation id %d to client %s."
         .format(response, request.header.correlationId, request.header.clientId))
       requestChannel.sendResponse(new RequestChannel.Response(request, response))
     }
 
     if (!authorize(request.session, Read, new Resource(Group, heartbeatRequest.groupId))) {
-      val heartbeatResponse = new HeartbeatResponse(Errors.GROUP_AUTHORIZATION_FAILED.code)
+      val heartbeatResponse = new HeartbeatResponse(Errors.GROUP_AUTHORIZATION_FAILED)
       requestChannel.sendResponse(new Response(request, heartbeatResponse))
     }
     else {
@@ -1126,15 +1126,15 @@ class KafkaApis(val requestChannel: RequestChannel,
     val leaveGroupRequest = request.body.asInstanceOf[LeaveGroupRequest]
 
     // the callback for sending a leave-group response
-    def sendResponseCallback(errorCode: Short) {
-      val response = new LeaveGroupResponse(errorCode)
+    def sendResponseCallback(error: Errors) {
+      val response = new LeaveGroupResponse(error)
       trace("Sending leave group response %s for correlation id %d to client %s."
                     .format(response, request.header.correlationId, request.header.clientId))
       requestChannel.sendResponse(new RequestChannel.Response(request, response))
     }
 
     if (!authorize(request.session, Read, new Resource(Group, leaveGroupRequest.groupId))) {
-      val leaveGroupResponse = new LeaveGroupResponse(Errors.GROUP_AUTHORIZATION_FAILED.code)
+      val leaveGroupResponse = new LeaveGroupResponse(Errors.GROUP_AUTHORIZATION_FAILED)
       requestChannel.sendResponse(new Response(request, leaveGroupResponse))
     } else {
       // let the coordinator to handle leave-group
@@ -1146,7 +1146,7 @@ class KafkaApis(val requestChannel: RequestChannel,
   }
 
   def handleSaslHandshakeRequest(request: RequestChannel.Request) {
-    val response = new SaslHandshakeResponse(Errors.ILLEGAL_SASL_STATE.code, config.saslEnabledMechanisms)
+    val response = new SaslHandshakeResponse(Errors.ILLEGAL_SASL_STATE, config.saslEnabledMechanisms)
     requestChannel.sendResponse(new RequestChannel.Response(request, response))
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/9898d665/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 779f489..54431d9 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -439,13 +439,13 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
               val clientResponse = networkClient.blockingSendAndReceive(request)(time)
 
               val shutdownResponse = clientResponse.responseBody.asInstanceOf[ControlledShutdownResponse]
-              if (shutdownResponse.errorCode == Errors.NONE.code && shutdownResponse.partitionsRemaining.isEmpty) {
+              if (shutdownResponse.error == Errors.NONE && shutdownResponse.partitionsRemaining.isEmpty) {
                 shutdownSucceeded = true
                 info("Controlled shutdown succeeded")
               }
               else {
                 info("Remaining partitions to move: %s".format(shutdownResponse.partitionsRemaining.asScala.mkString(",")))
-                info("Error code from controller: %d".format(shutdownResponse.errorCode))
+                info("Error code from controller: %d".format(shutdownResponse.error.code))
               }
             }
             catch {
@@ -511,14 +511,14 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
 
               response = channel.receive()
               val shutdownResponse = kafka.api.ControlledShutdownResponse.readFrom(response.payload())
-              if (shutdownResponse.errorCode == Errors.NONE.code && shutdownResponse.partitionsRemaining != null &&
+              if (shutdownResponse.error == Errors.NONE && shutdownResponse.partitionsRemaining != null &&
                 shutdownResponse.partitionsRemaining.isEmpty) {
                 shutdownSucceeded = true
                 info ("Controlled shutdown succeeded")
               }
               else {
                 info("Remaining partitions to move: %s".format(shutdownResponse.partitionsRemaining.mkString(",")))
-                info("Error code from controller: %d".format(shutdownResponse.errorCode))
+                info("Error code from controller: %d".format(shutdownResponse.error.code))
               }
             }
             catch {

http://git-wip-us.apache.org/repos/asf/kafka/blob/9898d665/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index 6e6cffa..c99d7c5 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -276,7 +276,7 @@ class ReplicaFetcherThread(name: String,
     val clientResponse = sendRequest(requestBuilder)
     val response = clientResponse.responseBody.asInstanceOf[ListOffsetResponse]
     val partitionData = response.responseData.get(topicPartition)
-    Errors.forCode(partitionData.errorCode) match {
+    partitionData.error match {
       case Errors.NONE =>
         if (brokerConfig.interBrokerProtocolVersion >= KAFKA_0_10_1_IV2)
           partitionData.offset
@@ -321,7 +321,7 @@ object ReplicaFetcherThread {
 
   private[server] class PartitionData(val underlying: FetchResponse.PartitionData) extends AbstractFetcherThread.PartitionData {
 
-    def errorCode: Short = underlying.errorCode
+    def error = underlying.error
 
     def toRecords: MemoryRecords = {
       underlying.records.asInstanceOf[MemoryRecords]
@@ -329,7 +329,7 @@ object ReplicaFetcherThread {
 
     def highWatermark: Long = underlying.highWatermark
 
-    def exception: Option[Throwable] = Errors.forCode(errorCode) match {
+    def exception: Option[Throwable] = error match {
       case Errors.NONE => None
       case e => Some(e.exception)
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/9898d665/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 475b2ed..1cec4a2 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -88,10 +88,10 @@ object LogReadResult {
                                            readSize = -1)
 }
 
-case class BecomeLeaderOrFollowerResult(responseMap: collection.Map[TopicPartition, Short], errorCode: Short) {
+case class BecomeLeaderOrFollowerResult(responseMap: collection.Map[TopicPartition, Errors], error: Errors) {
 
   override def toString = {
-    "update results: [%s], global error: [%d]".format(responseMap, errorCode)
+    "update results: [%s], global error: [%d]".format(responseMap, error.code)
   }
 }
 
@@ -219,9 +219,9 @@ class ReplicaManager(val config: KafkaConfig,
     scheduler.schedule("isr-change-propagation", maybePropagateIsrChanges, period = 2500L, unit = TimeUnit.MILLISECONDS)
   }
 
-  def stopReplica(topicPartition: TopicPartition, deletePartition: Boolean): Short  = {
+  def stopReplica(topicPartition: TopicPartition, deletePartition: Boolean): Errors  = {
     stateChangeLogger.trace(s"Broker $localBrokerId handling stop replica (delete=$deletePartition) for partition $topicPartition")
-    val errorCode = Errors.NONE.code
+    val error = Errors.NONE
     getPartition(topicPartition) match {
       case Some(_) =>
         if (deletePartition) {
@@ -241,26 +241,26 @@ class ReplicaManager(val config: KafkaConfig,
         stateChangeLogger.trace(s"Broker $localBrokerId ignoring stop replica (delete=$deletePartition) for partition $topicPartition as replica doesn't exist on broker")
     }
     stateChangeLogger.trace(s"Broker $localBrokerId finished handling stop replica (delete=$deletePartition) for partition $topicPartition")
-    errorCode
+    error
   }
 
-  def stopReplicas(stopReplicaRequest: StopReplicaRequest): (mutable.Map[TopicPartition, Short], Short) = {
+  def stopReplicas(stopReplicaRequest: StopReplicaRequest): (mutable.Map[TopicPartition, Errors], Errors) = {
     replicaStateChangeLock synchronized {
-      val responseMap = new collection.mutable.HashMap[TopicPartition, Short]
+      val responseMap = new collection.mutable.HashMap[TopicPartition, Errors]
       if(stopReplicaRequest.controllerEpoch() < controllerEpoch) {
         stateChangeLogger.warn("Broker %d received stop replica request from an old controller epoch %d. Latest known controller epoch is %d"
           .format(localBrokerId, stopReplicaRequest.controllerEpoch, controllerEpoch))
-        (responseMap, Errors.STALE_CONTROLLER_EPOCH.code)
+        (responseMap, Errors.STALE_CONTROLLER_EPOCH)
       } else {
         val partitions = stopReplicaRequest.partitions.asScala
         controllerEpoch = stopReplicaRequest.controllerEpoch
         // First stop fetchers for all partitions, then stop the corresponding replicas
         replicaFetcherManager.removeFetcherForPartitions(partitions)
         for (topicPartition <- partitions){
-          val errorCode = stopReplica(topicPartition, stopReplicaRequest.deletePartitions)
-          responseMap.put(topicPartition, errorCode)
+          val error = stopReplica(topicPartition, stopReplicaRequest.deletePartitions)
+          responseMap.put(topicPartition, error)
         }
-        (responseMap, Errors.NONE.code)
+        (responseMap, Errors.NONE)
       }
     }
   }
@@ -649,12 +649,12 @@ class ReplicaManager(val config: KafkaConfig,
                                         leaderAndISRRequest.controllerId, leaderAndISRRequest.controllerEpoch, topicPartition.topic, topicPartition.partition))
     }
     replicaStateChangeLock synchronized {
-      val responseMap = new mutable.HashMap[TopicPartition, Short]
+      val responseMap = new mutable.HashMap[TopicPartition, Errors]
       if (leaderAndISRRequest.controllerEpoch < controllerEpoch) {
         stateChangeLogger.warn(("Broker %d ignoring LeaderAndIsr request from controller %d with correlation id %d since " +
           "its controller epoch %d is old. Latest known controller epoch is %d").format(localBrokerId, leaderAndISRRequest.controllerId,
           correlationId, leaderAndISRRequest.controllerEpoch, controllerEpoch))
-        BecomeLeaderOrFollowerResult(responseMap, Errors.STALE_CONTROLLER_EPOCH.code)
+        BecomeLeaderOrFollowerResult(responseMap, Errors.STALE_CONTROLLER_EPOCH)
       } else {
         val controllerId = leaderAndISRRequest.controllerId
         controllerEpoch = leaderAndISRRequest.controllerEpoch
@@ -674,7 +674,7 @@ class ReplicaManager(val config: KafkaConfig,
                 "epoch %d for partition [%s,%d] as itself is not in assigned replica list %s")
                 .format(localBrokerId, controllerId, correlationId, leaderAndISRRequest.controllerEpoch,
                   topicPartition.topic, topicPartition.partition, stateInfo.replicas.asScala.mkString(",")))
-              responseMap.put(topicPartition, Errors.UNKNOWN_TOPIC_OR_PARTITION.code)
+              responseMap.put(topicPartition, Errors.UNKNOWN_TOPIC_OR_PARTITION)
             }
           } else {
             // Otherwise record the error code in response
@@ -682,7 +682,7 @@ class ReplicaManager(val config: KafkaConfig,
               "epoch %d for partition [%s,%d] since its associated leader epoch %d is not higher than the current leader epoch %d")
               .format(localBrokerId, controllerId, correlationId, leaderAndISRRequest.controllerEpoch,
                 topicPartition.topic, topicPartition.partition, stateInfo.leaderEpoch, partitionLeaderEpoch))
-            responseMap.put(topicPartition, Errors.STALE_CONTROLLER_EPOCH.code)
+            responseMap.put(topicPartition, Errors.STALE_CONTROLLER_EPOCH)
           }
         }
 
@@ -709,7 +709,7 @@ class ReplicaManager(val config: KafkaConfig,
         replicaFetcherManager.shutdownIdleFetcherThreads()
 
         onLeadershipChange(partitionsBecomeLeader, partitionsBecomeFollower)
-        BecomeLeaderOrFollowerResult(responseMap, Errors.NONE.code)
+        BecomeLeaderOrFollowerResult(responseMap, Errors.NONE)
       }
     }
   }
@@ -731,7 +731,7 @@ class ReplicaManager(val config: KafkaConfig,
                           epoch: Int,
                           partitionState: Map[Partition, PartitionState],
                           correlationId: Int,
-                          responseMap: mutable.Map[TopicPartition, Short]): Set[Partition] = {
+                          responseMap: mutable.Map[TopicPartition, Errors]): Set[Partition] = {
     partitionState.keys.foreach { partition =>
       stateChangeLogger.trace(("Broker %d handling LeaderAndIsr request correlationId %d from controller %d epoch %d " +
         "starting the become-leader transition for partition %s")
@@ -739,7 +739,7 @@ class ReplicaManager(val config: KafkaConfig,
     }
 
     for (partition <- partitionState.keys)
-      responseMap.put(partition.topicPartition, Errors.NONE.code)
+      responseMap.put(partition.topicPartition, Errors.NONE)
 
     val partitionsToMakeLeaders: mutable.Set[Partition] = mutable.Set()
 
@@ -802,7 +802,7 @@ class ReplicaManager(val config: KafkaConfig,
                             epoch: Int,
                             partitionState: Map[Partition, PartitionState],
                             correlationId: Int,
-                            responseMap: mutable.Map[TopicPartition, Short],
+                            responseMap: mutable.Map[TopicPartition, Errors],
                             metadataCache: MetadataCache) : Set[Partition] = {
     partitionState.keys.foreach { partition =>
       stateChangeLogger.trace(("Broker %d handling LeaderAndIsr request correlationId %d from controller %d epoch %d " +
@@ -811,7 +811,7 @@ class ReplicaManager(val config: KafkaConfig,
     }
 
     for (partition <- partitionState.keys)
-      responseMap.put(partition.topicPartition, Errors.NONE.code)
+      responseMap.put(partition.topicPartition, Errors.NONE)
 
     val partitionsToMakeFollower: mutable.Set[Partition] = mutable.Set()
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/9898d665/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala b/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala
index 7cc2ea7..b269966 100644
--- a/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala
+++ b/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala
@@ -179,10 +179,10 @@ object ConsumerOffsetChecker extends Logging {
                 throw z
           }
         }
-        else if (offsetAndMetadata.error == Errors.NONE.code)
+        else if (offsetAndMetadata.error == Errors.NONE)
           offsetMap.put(topicAndPartition, offsetAndMetadata.offset)
         else {
-          println("Could not fetch offset for %s due to %s.".format(topicAndPartition, Errors.forCode(offsetAndMetadata.error).exception))
+          println("Could not fetch offset for %s due to %s.".format(topicAndPartition, offsetAndMetadata.error.exception))
         }
       }
       channel.disconnect()

http://git-wip-us.apache.org/repos/asf/kafka/blob/9898d665/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala b/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
index 7e31ac7..492b304 100644
--- a/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
+++ b/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
@@ -239,7 +239,7 @@ private class ReplicaBuffer(expectedReplicasPerTopicAndPartition: Map[TopicAndPa
 
   private def offsetResponseStringWithError(offsetResponse: OffsetResponse): String = {
     offsetResponse.partitionErrorAndOffsets.filter { case (_, partitionOffsetsResponse) =>
-      partitionOffsetsResponse.error != Errors.NONE.code
+      partitionOffsetsResponse.error != Errors.NONE
     }.mkString
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/9898d665/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index c9d35af..3285bf2 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -105,24 +105,24 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
       ApiKeys.DELETE_TOPICS -> classOf[requests.DeleteTopicsResponse]
   )
 
-  val RequestKeyToErrorCode = Map[ApiKeys, (Nothing) => Short](
-    ApiKeys.METADATA -> ((resp: requests.MetadataResponse) => resp.errors().asScala.find(_._1 == topic).getOrElse(("test", Errors.NONE))._2.code),
-    ApiKeys.PRODUCE -> ((resp: requests.ProduceResponse) => resp.responses().asScala.find(_._1 == tp).get._2.error.code),
-    ApiKeys.FETCH -> ((resp: requests.FetchResponse) => resp.responseData().asScala.find(_._1 == tp).get._2.errorCode),
-    ApiKeys.LIST_OFFSETS -> ((resp: requests.ListOffsetResponse) => resp.responseData().asScala.find(_._1 == tp).get._2.errorCode),
+  val RequestKeyToError = Map[ApiKeys, (Nothing) => Errors](
+    ApiKeys.METADATA -> ((resp: requests.MetadataResponse) => resp.errors().asScala.find(_._1 == topic).getOrElse(("test", Errors.NONE))._2),
+    ApiKeys.PRODUCE -> ((resp: requests.ProduceResponse) => resp.responses().asScala.find(_._1 == tp).get._2.error),
+    ApiKeys.FETCH -> ((resp: requests.FetchResponse) => resp.responseData().asScala.find(_._1 == tp).get._2.error),
+    ApiKeys.LIST_OFFSETS -> ((resp: requests.ListOffsetResponse) => resp.responseData().asScala.find(_._1 == tp).get._2.error),
     ApiKeys.OFFSET_COMMIT -> ((resp: requests.OffsetCommitResponse) => resp.responseData().asScala.find(_._1 == tp).get._2),
-    ApiKeys.OFFSET_FETCH -> ((resp: requests.OffsetFetchResponse) => resp.error.code),
-    ApiKeys.GROUP_COORDINATOR -> ((resp: requests.GroupCoordinatorResponse) => resp.errorCode()),
-    ApiKeys.UPDATE_METADATA_KEY -> ((resp: requests.UpdateMetadataResponse) => resp.errorCode()),
-    ApiKeys.JOIN_GROUP -> ((resp: JoinGroupResponse) => resp.errorCode()),
-    ApiKeys.SYNC_GROUP -> ((resp: SyncGroupResponse) => resp.errorCode()),
-    ApiKeys.HEARTBEAT -> ((resp: HeartbeatResponse) => resp.errorCode()),
-    ApiKeys.LEAVE_GROUP -> ((resp: LeaveGroupResponse) => resp.errorCode()),
+    ApiKeys.OFFSET_FETCH -> ((resp: requests.OffsetFetchResponse) => resp.error),
+    ApiKeys.GROUP_COORDINATOR -> ((resp: requests.GroupCoordinatorResponse) => resp.error),
+    ApiKeys.UPDATE_METADATA_KEY -> ((resp: requests.UpdateMetadataResponse) => resp.error),
+    ApiKeys.JOIN_GROUP -> ((resp: JoinGroupResponse) => resp.error),
+    ApiKeys.SYNC_GROUP -> ((resp: SyncGroupResponse) => resp.error),
+    ApiKeys.HEARTBEAT -> ((resp: HeartbeatResponse) => resp.error),
+    ApiKeys.LEAVE_GROUP -> ((resp: LeaveGroupResponse) => resp.error),
     ApiKeys.LEADER_AND_ISR -> ((resp: requests.LeaderAndIsrResponse) => resp.responses().asScala.find(_._1 == tp).get._2),
     ApiKeys.STOP_REPLICA -> ((resp: requests.StopReplicaResponse) => resp.responses().asScala.find(_._1 == tp).get._2),
-    ApiKeys.CONTROLLED_SHUTDOWN_KEY -> ((resp: requests.ControlledShutdownResponse) => resp.errorCode()),
-    ApiKeys.CREATE_TOPICS -> ((resp: CreateTopicsResponse) => resp.errors().asScala.find(_._1 == createTopic).get._2.error.code),
-    ApiKeys.DELETE_TOPICS -> ((resp: requests.DeleteTopicsResponse) => resp.errors().asScala.find(_._1 == deleteTopic).get._2.code)
+    ApiKeys.CONTROLLED_SHUTDOWN_KEY -> ((resp: requests.ControlledShutdownResponse) => resp.error),
+    ApiKeys.CREATE_TOPICS -> ((resp: CreateTopicsResponse) => resp.errors().asScala.find(_._1 == createTopic).get._2.error),
+    ApiKeys.DELETE_TOPICS -> ((resp: requests.DeleteTopicsResponse) => resp.errors().asScala.find(_._1 == deleteTopic).get._2)
   )
 
   val RequestKeysToAcls = Map[ApiKeys, Map[Resource, Set[Acl]]](
@@ -809,18 +809,18 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
                                             topicExists: Boolean = true): AbstractResponse = {
     val resp = send(request, apiKey)
     val response = RequestKeyToResponseDeserializer(apiKey).getMethod("parse", classOf[ByteBuffer]).invoke(null, resp).asInstanceOf[AbstractResponse]
-    val error = Errors.forCode(RequestKeyToErrorCode(apiKey).asInstanceOf[(AbstractResponse) => Short](response))
+    val error = RequestKeyToError(apiKey).asInstanceOf[(AbstractResponse) => Errors](response)
 
     val authorizationErrorCodes = resources.flatMap { resourceType =>
       if (resourceType == Topic) {
         if (isAuthorized)
-          Set(Errors.UNKNOWN_TOPIC_OR_PARTITION, Errors.forCode(Topic.errorCode))
+          Set(Errors.UNKNOWN_TOPIC_OR_PARTITION, Topic.error)
         else if (!isAuthorizedTopicDescribe)
           Set(Errors.UNKNOWN_TOPIC_OR_PARTITION)
         else
-          Set(Errors.forCode(Topic.errorCode))
+          Set(Topic.error)
       } else {
-        Set(Errors.forCode(resourceType.errorCode))
+        Set(resourceType.error)
       }
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/9898d665/core/src/test/scala/integration/kafka/server/ReplicaFetcherThreadFatalErrorTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/server/ReplicaFetcherThreadFatalErrorTest.scala b/core/src/test/scala/integration/kafka/server/ReplicaFetcherThreadFatalErrorTest.scala
index 3cab534..853dad6 100644
--- a/core/src/test/scala/integration/kafka/server/ReplicaFetcherThreadFatalErrorTest.scala
+++ b/core/src/test/scala/integration/kafka/server/ReplicaFetcherThreadFatalErrorTest.scala
@@ -91,7 +91,7 @@ class ReplicaFetcherThreadFatalErrorTest extends ZooKeeperTestHarness {
         override def handleOffsetOutOfRange(topicPartition: TopicPartition): Long = throw new FatalExitError
         override protected def fetch(fetchRequest: FetchRequest): Seq[(TopicPartition, PartitionData)] = {
           fetchRequest.underlying.fetchData.asScala.keys.toSeq.map { tp =>
-            (tp, new PartitionData(new FetchResponse.PartitionData(Errors.OFFSET_OUT_OF_RANGE.code, -1, null)))
+            (tp, new PartitionData(new FetchResponse.PartitionData(Errors.OFFSET_OUT_OF_RANGE, -1, null)))
           }
         }
       }

http://git-wip-us.apache.org/repos/asf/kafka/blob/9898d665/core/src/test/scala/kafka/tools/ReplicaVerificationToolTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/kafka/tools/ReplicaVerificationToolTest.scala b/core/src/test/scala/kafka/tools/ReplicaVerificationToolTest.scala
index ffa3474..4788b4a 100644
--- a/core/src/test/scala/kafka/tools/ReplicaVerificationToolTest.scala
+++ b/core/src/test/scala/kafka/tools/ReplicaVerificationToolTest.scala
@@ -45,7 +45,7 @@ class ReplicaVerificationToolTest {
         }
         val initialOffset = 4
         val memoryRecords = MemoryRecords.withRecords(initialOffset, records: _*)
-        replicaBuffer.addFetchedData(tp, replicaId, new FetchResponsePartitionData(Errors.NONE.code(), hw = 20,
+        replicaBuffer.addFetchedData(tp, replicaId, new FetchResponsePartitionData(Errors.NONE, hw = 20,
           new ByteBufferMessageSet(memoryRecords.buffer)))
       }
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/9898d665/core/src/test/scala/other/kafka/TestOffsetManager.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/other/kafka/TestOffsetManager.scala b/core/src/test/scala/other/kafka/TestOffsetManager.scala
index 324b440..d908175 100644
--- a/core/src/test/scala/other/kafka/TestOffsetManager.scala
+++ b/core/src/test/scala/other/kafka/TestOffsetManager.scala
@@ -92,7 +92,7 @@ object TestOffsetManager {
         numCommits.getAndIncrement
         commitTimer.time {
           val response = OffsetCommitResponse.readFrom(offsetsChannel.receive().payload())
-          if (response.commitStatus.exists(_._2 != Errors.NONE.code)) numErrors.getAndIncrement
+          if (response.commitStatus.exists(_._2 != Errors.NONE)) numErrors.getAndIncrement
         }
         offset += 1
       }
@@ -155,7 +155,7 @@ object TestOffsetManager {
 
           fetchTimer.time {
             val response = OffsetFetchResponse.readFrom(channel.receive().payload())
-            if (response.requestInfo.exists(_._2.error != Errors.NONE.code)) {
+            if (response.requestInfo.exists(_._2.error != Errors.NONE)) {
               numErrors.getAndIncrement
             }
           }

http://git-wip-us.apache.org/repos/asf/kafka/blob/9898d665/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
index e6090c1..5342dac 100644
--- a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
+++ b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
@@ -85,8 +85,8 @@ object SerializationTestUtils {
 
   def createTestProducerResponse: ProducerResponse =
     ProducerResponse(1, Map(
-      TopicAndPartition(topic1, 0) -> ProducerResponseStatus(0.toShort, 10001),
-      TopicAndPartition(topic2, 0) -> ProducerResponseStatus(0.toShort, 20001)
+      TopicAndPartition(topic1, 0) -> ProducerResponseStatus(Errors.forCode(0.toShort), 10001),
+      TopicAndPartition(topic2, 0) -> ProducerResponseStatus(Errors.forCode(0.toShort), 20001)
     ), ProducerRequest.CurrentVersion, 100)
 
   def createTestFetchRequest: FetchRequest = new FetchRequest(requestInfo = requestInfos.toVector)
@@ -100,7 +100,7 @@ object SerializationTestUtils {
 
   def createTestOffsetResponse: OffsetResponse = {
     new OffsetResponse(0, collection.immutable.Map(
-      TopicAndPartition(topic1, 1) -> PartitionOffsetsResponse(Errors.NONE.code, Seq(1000l, 2000l, 3000l, 4000l)))
+      TopicAndPartition(topic1, 1) -> PartitionOffsetsResponse(Errors.NONE, Seq(1000l, 2000l, 3000l, 4000l)))
     )
   }
 
@@ -135,8 +135,8 @@ object SerializationTestUtils {
   }
 
   def createTestOffsetCommitResponse: OffsetCommitResponse = {
-    new OffsetCommitResponse(collection.immutable.Map(TopicAndPartition(topic1, 0) -> Errors.NONE.code,
-                                 TopicAndPartition(topic1, 1) -> Errors.NONE.code))
+    new OffsetCommitResponse(collection.immutable.Map(TopicAndPartition(topic1, 0) -> Errors.NONE,
+                                 TopicAndPartition(topic1, 1) -> Errors.NONE))
   }
 
   def createTestOffsetFetchRequest: OffsetFetchRequest = {
@@ -148,16 +148,16 @@ object SerializationTestUtils {
 
   def createTestOffsetFetchResponse: OffsetFetchResponse = {
     new OffsetFetchResponse(collection.immutable.Map(
-      TopicAndPartition(topic1, 0) -> OffsetMetadataAndError(42L, "some metadata", Errors.NONE.code),
-      TopicAndPartition(topic1, 1) -> OffsetMetadataAndError(100L, OffsetMetadata.NoMetadata, Errors.UNKNOWN_TOPIC_OR_PARTITION.code)
-    ), errorCode = Errors.NONE.code)
+      TopicAndPartition(topic1, 0) -> OffsetMetadataAndError(42L, "some metadata", Errors.NONE),
+      TopicAndPartition(topic1, 1) -> OffsetMetadataAndError(100L, OffsetMetadata.NoMetadata, Errors.UNKNOWN_TOPIC_OR_PARTITION)
+    ), error = Errors.NONE)
   }
 
   def createConsumerMetadataRequest: GroupCoordinatorRequest = GroupCoordinatorRequest("group 1", clientId = "client 1")
 
   def createConsumerMetadataResponse: GroupCoordinatorResponse = {
     GroupCoordinatorResponse(Some(
-      brokers.head.getBrokerEndPoint(ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT))), Errors.NONE.code, 0)
+      brokers.head.getBrokerEndPoint(ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT))), Errors.NONE, 0)
   }
 }
 
@@ -175,7 +175,7 @@ class RequestResponseSerializationTest extends JUnitSuite {
   private val offsetFetchResponse = SerializationTestUtils.createTestOffsetFetchResponse
   private val consumerMetadataRequest = SerializationTestUtils.createConsumerMetadataRequest
   private val consumerMetadataResponse = SerializationTestUtils.createConsumerMetadataResponse
-  private val consumerMetadataResponseNoCoordinator = GroupCoordinatorResponse(None, ErrorMapping.ConsumerCoordinatorNotAvailableCode, 0)
+  private val consumerMetadataResponseNoCoordinator = GroupCoordinatorResponse(None, Errors.GROUP_COORDINATOR_NOT_AVAILABLE, 0)
 
   @Test
   def testSerializationAndDeserialization() {
@@ -203,13 +203,13 @@ class RequestResponseSerializationTest extends JUnitSuite {
   @Test
   def testProduceResponseVersion() {
     val oldClientResponse = ProducerResponse(1, Map(
-      TopicAndPartition("t1", 0) -> ProducerResponseStatus(0.toShort, 10001),
-      TopicAndPartition("t2", 0) -> ProducerResponseStatus(0.toShort, 20001)
+      TopicAndPartition("t1", 0) -> ProducerResponseStatus(Errors.NONE, 10001),
+      TopicAndPartition("t2", 0) -> ProducerResponseStatus(Errors.NONE, 20001)
     ))
 
     val newClientResponse = ProducerResponse(1, Map(
-      TopicAndPartition("t1", 0) -> ProducerResponseStatus(0.toShort, 10001),
-      TopicAndPartition("t2", 0) -> ProducerResponseStatus(0.toShort, 20001)
+      TopicAndPartition("t1", 0) -> ProducerResponseStatus(Errors.NONE, 10001),
+      TopicAndPartition("t2", 0) -> ProducerResponseStatus(Errors.NONE, 20001)
     ), 1, 100)
 
     // new response should have 4 bytes more than the old response since delayTime is an INT32


Mime
View raw message