Repository: kafka
Updated Branches:
refs/heads/trunk 106a74560 -> 6a13a3dba
KAFKA-3590; Handle not-enough-replicas errors when writing to offsets topic
Author: Jason Gustafson <jason@confluent.io>
Reviewers: Ismael Juma <ismael@juma.me.uk>, Guozhang Wang <wangguoz@gmail.com>
Closes #1859 from hachikuji/KAFKA-3590
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/6a13a3db
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/6a13a3db
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/6a13a3db
Branch: refs/heads/trunk
Commit: 6a13a3dbaddf99850b2583007577fa2a6e1e6d3a
Parents: 106a745
Author: Jason Gustafson <jason@confluent.io>
Authored: Fri Sep 23 13:13:29 2016 -0700
Committer: Jason Gustafson <jason@confluent.io>
Committed: Fri Sep 23 13:13:29 2016 -0700
----------------------------------------------------------------------
.../kafka/coordinator/GroupCoordinator.scala | 24 ++---
.../coordinator/GroupMetadataManager.scala | 92 ++++++++++++--------
.../coordinator/GroupMetadataManagerTest.scala | 71 ++++++++++++---
3 files changed, 127 insertions(+), 60 deletions(-)
----------------------------------------------------------------------
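Reviewer note: below is a condensed, standalone sketch (not the literal patch code) of the error translation this change introduces when writing group metadata to the offsets topic. The new cases are NOT_ENOUGH_REPLICAS and NOT_ENOUGH_REPLICAS_AFTER_APPEND, which now map to GROUP_COORDINATOR_NOT_AVAILABLE so the client can treat the failure as retriable instead of an unexpected error. The function name is illustrative only.

  import org.apache.kafka.common.protocol.Errors

  // Condensed sketch of the group-metadata append error mapping in the diff below.
  def mapGroupStoreError(appendError: Errors): Errors = appendError match {
    case Errors.UNKNOWN_TOPIC_OR_PARTITION
       | Errors.NOT_ENOUGH_REPLICAS
       | Errors.NOT_ENOUGH_REPLICAS_AFTER_APPEND =>
      Errors.GROUP_COORDINATOR_NOT_AVAILABLE   // retriable: client re-discovers the coordinator
    case Errors.NOT_LEADER_FOR_PARTITION =>
      Errors.NOT_COORDINATOR_FOR_GROUP         // retriable: this broker no longer owns the group
    case Errors.REQUEST_TIMED_OUT =>
      Errors.REBALANCE_IN_PROGRESS             // client rejoins the group
    case Errors.MESSAGE_TOO_LARGE
       | Errors.RECORD_LIST_TOO_LARGE
       | Errors.INVALID_FETCH_SIZE =>
      Errors.UNKNOWN                           // unexpected for group metadata writes
    case other => other                        // pass anything else through unchanged
  }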
http://git-wip-us.apache.org/repos/asf/kafka/blob/6a13a3db/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala b/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
index 726426a..48efe39 100644
--- a/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
@@ -267,14 +267,14 @@ class GroupCoordinator(val brokerId: Int,
val missing = group.allMembers -- groupAssignment.keySet
val assignment = groupAssignment ++ missing.map(_ -> Array.empty[Byte]).toMap
- delayedGroupStore = Some(groupManager.prepareStoreGroup(group, assignment, (errorCode: Short) => {
+ delayedGroupStore = Some(groupManager.prepareStoreGroup(group, assignment, (error: Errors) => {
group synchronized {
// another member may have joined the group while we were awaiting this callback,
// so we must ensure we are still in the AwaitingSync state and the same generation
// when it gets invoked. if we have transitioned to another state, then do nothing
if (group.is(AwaitingSync) && generationId == group.generationId) {
- if (errorCode != Errors.NONE.code) {
- resetAndPropagateAssignmentError(group, errorCode)
+ if (error != Errors.NONE) {
+ resetAndPropagateAssignmentError(group, error)
maybePrepareRebalance(group)
} else {
setAndPropagateAssignment(group, assignment)
@@ -549,19 +549,19 @@ class GroupCoordinator(val brokerId: Int,
private def setAndPropagateAssignment(group: GroupMetadata, assignment: Map[String, Array[Byte]]) {
assert(group.is(AwaitingSync))
group.allMemberMetadata.foreach(member => member.assignment = assignment(member.memberId))
- propagateAssignment(group, Errors.NONE.code)
+ propagateAssignment(group, Errors.NONE)
}
- private def resetAndPropagateAssignmentError(group: GroupMetadata, errorCode: Short) {
+ private def resetAndPropagateAssignmentError(group: GroupMetadata, error: Errors) {
assert(group.is(AwaitingSync))
group.allMemberMetadata.foreach(_.assignment = Array.empty[Byte])
- propagateAssignment(group, errorCode)
+ propagateAssignment(group, error)
}
- private def propagateAssignment(group: GroupMetadata, errorCode: Short) {
+ private def propagateAssignment(group: GroupMetadata, error: Errors) {
for (member <- group.allMemberMetadata) {
if (member.awaitingSyncCallback != null) {
- member.awaitingSyncCallback(member.assignment, errorCode)
+ member.awaitingSyncCallback(member.assignment, error.code)
member.awaitingSyncCallback = null
// reset the session timeout for members after propagating the member's assignment.
@@ -645,7 +645,7 @@ class GroupCoordinator(val brokerId: Int,
private def prepareRebalance(group: GroupMetadata) {
// if any members are awaiting sync, cancel their request and have them rejoin
if (group.is(AwaitingSync))
- resetAndPropagateAssignmentError(group, Errors.REBALANCE_IN_PROGRESS.code)
+ resetAndPropagateAssignmentError(group, Errors.REBALANCE_IN_PROGRESS)
group.transitionTo(PreparingRebalance)
info("Preparing to restabilize group %s with old generation %s".format(group.groupId,
group.generationId))
@@ -692,12 +692,12 @@ class GroupCoordinator(val brokerId: Int,
if (group.is(Empty)) {
info(s"Group ${group.groupId} with generation ${group.generationId} is now empty")
- delayedStore = Some(groupManager.prepareStoreGroup(group, Map.empty, errorCode => {
- if (errorCode != Errors.NONE.code) {
+ delayedStore = Some(groupManager.prepareStoreGroup(group, Map.empty, error => {
+ if (error != Errors.NONE) {
// we failed to write the empty group metadata. If the broker fails before another rebalance,
// the previous generation written to the log will become active again (and most likely timeout).
// This should be safe since there are no active members in an empty generation, so we just warn.
- warn(s"Failed to write empty metadata for group ${group.groupId}: ${Errors.forCode(errorCode).message()}")
+ warn(s"Failed to write empty metadata for group ${group.groupId}: ${error.message}")
}
}))
} else {
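For reviewers skimming the coordinator change above: the store-group callback now carries the Errors enum instead of a raw Short code, so call sites compare enum values directly. A minimal, hypothetical illustration of the new callback shape (names are illustrative, not from the patch):

  import org.apache.kafka.common.protocol.Errors

  // Hypothetical call site: the callback receives Errors directly rather than a Short code.
  val onGroupStored: Errors => Unit = {
    case Errors.NONE => println("group metadata stored")
    case error       => println(s"group metadata store failed: ${error.message}")
  }

  onGroupStored(Errors.NOT_ENOUGH_REPLICAS_AFTER_APPEND)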
http://git-wip-us.apache.org/repos/asf/kafka/blob/6a13a3db/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
index 1dc2a49..79d4411 100644
--- a/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
+++ b/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
@@ -175,7 +175,7 @@ class GroupMetadataManager(val brokerId: Int,
def prepareStoreGroup(group: GroupMetadata,
groupAssignment: Map[String, Array[Byte]],
- responseCallback: Short => Unit): DelayedStore = {
+ responseCallback: Errors => Unit): DelayedStore = {
val (magicValue, timestamp) = getMessageFormatVersionAndTimestamp(partitionFor(group.groupId))
val groupMetadataValueVersion = if (interBrokerProtocolVersion < KAFKA_0_10_1_IV0) 0.toShort else GroupMetadataManager.CURRENT_GROUP_VALUE_SCHEMA_VERSION
@@ -202,36 +202,45 @@ class GroupMetadataManager(val brokerId: Int,
// construct the error status in the propagated assignment response
// in the cache
val status = responseStatus(groupMetadataPartition)
+ val statusError = Errors.forCode(status.errorCode)
- var responseCode = Errors.NONE.code
- if (status.errorCode != Errors.NONE.code) {
- debug("Metadata from group %s with generation %d failed when appending to log due
to %s"
- .format(group.groupId, generationId, Errors.forCode(status.errorCode).exceptionName))
+ val responseError = if (statusError == Errors.NONE) {
+ Errors.NONE
+ } else {
+ debug(s"Metadata from group ${group.groupId} with generation $generationId failed
when appending to log " +
+ s"due to ${statusError.exceptionName}")
// transform the log append error code to the corresponding the commit status error code
- responseCode = if (status.errorCode == Errors.UNKNOWN_TOPIC_OR_PARTITION.code) {
- Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code
- } else if (status.errorCode == Errors.NOT_LEADER_FOR_PARTITION.code) {
- Errors.NOT_COORDINATOR_FOR_GROUP.code
- } else if (status.errorCode == Errors.REQUEST_TIMED_OUT.code) {
- Errors.REBALANCE_IN_PROGRESS.code
- } else if (status.errorCode == Errors.MESSAGE_TOO_LARGE.code
- || status.errorCode == Errors.RECORD_LIST_TOO_LARGE.code
- || status.errorCode == Errors.INVALID_FETCH_SIZE.code) {
-
- error("Appending metadata message for group %s generation %d failed due to %s,
returning UNKNOWN error code to the client"
- .format(group.groupId, generationId, Errors.forCode(status.errorCode).exceptionName))
-
- Errors.UNKNOWN.code
- } else {
- error("Appending metadata message for group %s generation %d failed due to unexpected
error: %s"
- .format(group.groupId, generationId, status.errorCode))
+ statusError match {
+ case Errors.UNKNOWN_TOPIC_OR_PARTITION
+ | Errors.NOT_ENOUGH_REPLICAS
+ | Errors.NOT_ENOUGH_REPLICAS_AFTER_APPEND =>
+ Errors.GROUP_COORDINATOR_NOT_AVAILABLE
+
+ case Errors.NOT_LEADER_FOR_PARTITION =>
+ Errors.NOT_COORDINATOR_FOR_GROUP
+
+ case Errors.REQUEST_TIMED_OUT =>
+ Errors.REBALANCE_IN_PROGRESS
+
+ case Errors.MESSAGE_TOO_LARGE
+ | Errors.RECORD_LIST_TOO_LARGE
+ | Errors.INVALID_FETCH_SIZE =>
+
+ error(s"Appending metadata message for group ${group.groupId} generation $generationId
failed due to " +
+ s"${statusError.exceptionName}, returning UNKNOWN error code to the client")
+
+ Errors.UNKNOWN
- status.errorCode
+ case other =>
+ error(s"Appending metadata message for group ${group.groupId} generation $generationId
failed " +
+ s"due to unexpected error: ${statusError.exceptionName}")
+
+ other
}
}
- responseCallback(responseCode)
+ responseCallback(responseError)
}
DelayedStore(groupMetadataMessageSet, putCacheCallback)
@@ -286,10 +295,11 @@ class GroupMetadataManager(val brokerId: Int,
// construct the commit response status and insert
// the offset and metadata to cache if the append status has no error
val status = responseStatus(offsetTopicPartition)
+ val statusError = Errors.forCode(status.errorCode)
val responseCode =
group synchronized {
- if (status.errorCode == Errors.NONE.code) {
+ if (statusError == Errors.NONE) {
if (!group.is(Dead)) {
filteredOffsetMetadata.foreach { case (topicAndPartition, offsetAndMetadata) =>
group.completePendingOffsetWrite(topicAndPartition, offsetAndMetadata)
@@ -303,20 +313,28 @@ class GroupMetadataManager(val brokerId: Int,
}
}
- debug("Offset commit %s from group %s consumer %s with generation %d failed when
appending to log due to %s"
- .format(filteredOffsetMetadata, group.groupId, consumerId, generationId, Errors.forCode(status.errorCode).exceptionName))
+ debug(s"Offset commit $filteredOffsetMetadata from group ${group.groupId}, consumer
$consumerId " +
+ s"with generation $generationId failed when appending to log due to ${statusError.exceptionName}")
// transform the log append error code to the corresponding the commit status error code
- if (status.errorCode == Errors.UNKNOWN_TOPIC_OR_PARTITION.code)
- Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code
- else if (status.errorCode == Errors.NOT_LEADER_FOR_PARTITION.code)
- Errors.NOT_COORDINATOR_FOR_GROUP.code
- else if (status.errorCode == Errors.MESSAGE_TOO_LARGE.code
- || status.errorCode == Errors.RECORD_LIST_TOO_LARGE.code
- || status.errorCode == Errors.INVALID_FETCH_SIZE.code)
- Errors.INVALID_COMMIT_OFFSET_SIZE.code
- else
- status.errorCode
+ val responseError = statusError match {
+ case Errors.UNKNOWN_TOPIC_OR_PARTITION
+ | Errors.NOT_ENOUGH_REPLICAS
+ | Errors.NOT_ENOUGH_REPLICAS_AFTER_APPEND =>
+ Errors.GROUP_COORDINATOR_NOT_AVAILABLE
+
+ case Errors.NOT_LEADER_FOR_PARTITION =>
+ Errors.NOT_COORDINATOR_FOR_GROUP
+
+ case Errors.MESSAGE_TOO_LARGE
+ | Errors.RECORD_LIST_TOO_LARGE
+ | Errors.INVALID_FETCH_SIZE =>
+ Errors.INVALID_COMMIT_OFFSET_SIZE
+
+ case other => other
+ }
+
+ responseError.code
}
}
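The offset-commit path above follows the same pattern, but maps the oversize-message family (MESSAGE_TOO_LARGE, RECORD_LIST_TOO_LARGE, INVALID_FETCH_SIZE) to INVALID_COMMIT_OFFSET_SIZE rather than UNKNOWN, presumably because the client can act on the size of its commit. A condensed sketch of that mapping (not the literal patch code; the function name is illustrative):

  import org.apache.kafka.common.protocol.Errors

  // Condensed sketch of the offset-commit append error mapping above.
  def mapOffsetCommitError(appendError: Errors): Errors = appendError match {
    case Errors.UNKNOWN_TOPIC_OR_PARTITION
       | Errors.NOT_ENOUGH_REPLICAS
       | Errors.NOT_ENOUGH_REPLICAS_AFTER_APPEND =>
      Errors.GROUP_COORDINATOR_NOT_AVAILABLE
    case Errors.NOT_LEADER_FOR_PARTITION =>
      Errors.NOT_COORDINATOR_FOR_GROUP
    case Errors.MESSAGE_TOO_LARGE
       | Errors.RECORD_LIST_TOO_LARGE
       | Errors.INVALID_FETCH_SIZE =>
      Errors.INVALID_COMMIT_OFFSET_SIZE
    case other => other
  }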
http://git-wip-us.apache.org/repos/asf/kafka/blob/6a13a3db/core/src/test/scala/unit/kafka/coordinator/GroupMetadataManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/GroupMetadataManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/GroupMetadataManagerTest.scala
index b4f9ba3..0a1032f 100644
--- a/core/src/test/scala/unit/kafka/coordinator/GroupMetadataManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/GroupMetadataManagerTest.scala
@@ -101,14 +101,48 @@ class GroupMetadataManagerTest {
expectAppendMessage(Errors.NONE)
EasyMock.replay(replicaManager)
- var errorCode: Option[Short] = None
- def callback(error: Short) {
- errorCode = Some(error)
+ var maybeError: Option[Errors] = None
+ def callback(error: Errors) {
+ maybeError = Some(error)
}
val delayedStore = groupMetadataManager.prepareStoreGroup(group, Map.empty, callback)
groupMetadataManager.store(delayedStore)
- assertEquals(Errors.NONE.code, errorCode.get)
+ assertEquals(Some(Errors.NONE), maybeError)
+ }
+
+ @Test
+ def testStoreGroupErrorMapping() {
+ assertStoreGroupErrorMapping(Errors.NONE, Errors.NONE)
+ assertStoreGroupErrorMapping(Errors.UNKNOWN_TOPIC_OR_PARTITION, Errors.GROUP_COORDINATOR_NOT_AVAILABLE)
+ assertStoreGroupErrorMapping(Errors.NOT_ENOUGH_REPLICAS, Errors.GROUP_COORDINATOR_NOT_AVAILABLE)
+ assertStoreGroupErrorMapping(Errors.NOT_ENOUGH_REPLICAS_AFTER_APPEND, Errors.GROUP_COORDINATOR_NOT_AVAILABLE)
+ assertStoreGroupErrorMapping(Errors.NOT_LEADER_FOR_PARTITION, Errors.NOT_COORDINATOR_FOR_GROUP)
+ assertStoreGroupErrorMapping(Errors.MESSAGE_TOO_LARGE, Errors.UNKNOWN)
+ assertStoreGroupErrorMapping(Errors.RECORD_LIST_TOO_LARGE, Errors.UNKNOWN)
+ assertStoreGroupErrorMapping(Errors.INVALID_FETCH_SIZE, Errors.UNKNOWN)
+ assertStoreGroupErrorMapping(Errors.CORRUPT_MESSAGE, Errors.CORRUPT_MESSAGE)
+ }
+
+ private def assertStoreGroupErrorMapping(appendError: Errors, expectedError: Errors) {
+ EasyMock.reset(replicaManager)
+
+ val group = new GroupMetadata(groupId)
+ groupMetadataManager.addGroup(group)
+
+ expectAppendMessage(appendError)
+ EasyMock.replay(replicaManager)
+
+ var maybeError: Option[Errors] = None
+ def callback(error: Errors) {
+ maybeError = Some(error)
+ }
+
+ val delayedStore = groupMetadataManager.prepareStoreGroup(group, Map.empty, callback)
+ groupMetadataManager.store(delayedStore)
+ assertEquals(Some(expectedError), maybeError)
+
+ EasyMock.verify(replicaManager)
}
@Test
@@ -130,14 +164,14 @@ class GroupMetadataManagerTest {
expectAppendMessage(Errors.NONE)
EasyMock.replay(replicaManager)
- var errorCode: Option[Short] = None
- def callback(error: Short) {
- errorCode = Some(error)
+ var maybeError: Option[Errors] = None
+ def callback(error: Errors) {
+ maybeError = Some(error)
}
val delayedStore = groupMetadataManager.prepareStoreGroup(group, Map(memberId -> Array[Byte]()), callback)
groupMetadataManager.store(delayedStore)
- assertEquals(Errors.NONE.code, errorCode.get)
+ assertEquals(Some(Errors.NONE), maybeError)
}
@Test
@@ -183,6 +217,19 @@ class GroupMetadataManagerTest {
@Test
def testCommitOffsetFailure() {
+ assertCommitOffsetErrorMapping(Errors.UNKNOWN_TOPIC_OR_PARTITION, Errors.GROUP_COORDINATOR_NOT_AVAILABLE)
+ assertCommitOffsetErrorMapping(Errors.NOT_ENOUGH_REPLICAS, Errors.GROUP_COORDINATOR_NOT_AVAILABLE)
+ assertCommitOffsetErrorMapping(Errors.NOT_ENOUGH_REPLICAS_AFTER_APPEND, Errors.GROUP_COORDINATOR_NOT_AVAILABLE)
+ assertCommitOffsetErrorMapping(Errors.NOT_LEADER_FOR_PARTITION, Errors.NOT_COORDINATOR_FOR_GROUP)
+ assertCommitOffsetErrorMapping(Errors.MESSAGE_TOO_LARGE, Errors.INVALID_COMMIT_OFFSET_SIZE)
+ assertCommitOffsetErrorMapping(Errors.RECORD_LIST_TOO_LARGE, Errors.INVALID_COMMIT_OFFSET_SIZE)
+ assertCommitOffsetErrorMapping(Errors.INVALID_FETCH_SIZE, Errors.INVALID_COMMIT_OFFSET_SIZE)
+ assertCommitOffsetErrorMapping(Errors.CORRUPT_MESSAGE, Errors.CORRUPT_MESSAGE)
+ }
+
+ private def assertCommitOffsetErrorMapping(appendError: Errors, expectedError: Errors): Unit = {
+ EasyMock.reset(replicaManager)
+
val memberId = ""
val generationId = -1
val topicPartition = new TopicPartition("foo", 0)
@@ -195,7 +242,7 @@ class GroupMetadataManagerTest {
val offsets = immutable.Map(topicPartition -> OffsetAndMetadata(offset))
- expectAppendMessage(Errors.NOT_LEADER_FOR_PARTITION)
+ expectAppendMessage(appendError)
EasyMock.replay(replicaManager)
var commitErrors: Option[immutable.Map[TopicPartition, Short]] = None
@@ -210,11 +257,13 @@ class GroupMetadataManagerTest {
assertFalse(commitErrors.isEmpty)
val maybeError = commitErrors.get.get(topicPartition)
- assertEquals(Some(Errors.NOT_COORDINATOR_FOR_GROUP.code), maybeError)
+ assertEquals(Some(expectedError.code), maybeError)
assertFalse(group.hasOffsets)
val cachedOffsets = groupMetadataManager.getOffsets(groupId, Seq(topicPartition))
assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), cachedOffsets.get(topicPartition).map(_.offset))
+
+ EasyMock.verify(replicaManager)
}
@Test
@@ -400,7 +449,7 @@ class GroupMetadataManagerTest {
new PartitionResponse(error.code, 0L, Record.NO_TIMESTAMP)
)
)})
- EasyMock.expect(replicaManager.getMessageFormatVersion(EasyMock.anyObject())).andReturn(Some(Message.MagicValue_V1)).anyTimes()
+ EasyMock.expect(replicaManager.getMessageFormatVersion(EasyMock.anyObject())).andStubReturn(Some(Message.MagicValue_V1))
}
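One small note on the last hunk: switching the format-version expectation from andReturn(...).anyTimes() to andStubReturn(...) marks it as a stub, so the new EasyMock.verify(replicaManager) calls in the error-mapping tests are not affected by how many times that lookup happens. A minimal standalone illustration (hypothetical trait, assuming EasyMock's standard stub semantics):

  import org.easymock.EasyMock

  trait VersionLookup { def formatVersion(topic: String): Option[Byte] }

  val lookup = EasyMock.createMock(classOf[VersionLookup])
  // A stubbed expectation is satisfied by any number of calls, including zero,
  // so verify() does not fail because of it.
  EasyMock.expect(lookup.formatVersion(EasyMock.anyObject[String]())).andStubReturn(Some(1.toByte))
  EasyMock.replay(lookup)
  EasyMock.verify(lookup)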