kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: KAFKA-2208; add consumer side error handling upon coordinator failure; reviewed by Onur Karaman
Date Wed, 03 Jun 2015 20:47:39 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 48edeca33 -> d22987f01


KAFKA-2208; add consumer side error handling upon coordinator failure; reviewed by Onur Karaman


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/d22987f0
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/d22987f0
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/d22987f0

Branch: refs/heads/trunk
Commit: d22987f01d50549d855ae092b69f520d75bfeb7b
Parents: 48edeca
Author: Guozhang Wang <wangguoz@gmail.com>
Authored: Wed Jun 3 13:47:01 2015 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Wed Jun 3 13:47:08 2015 -0700

----------------------------------------------------------------------
 .../clients/consumer/internals/Coordinator.java | 91 ++++++++++++++------
 .../clients/consumer/internals/Fetcher.java     |  7 +-
 .../common/requests/HeartbeatResponse.java      |  5 +-
 .../common/requests/JoinGroupResponse.java      |  7 +-
 .../kafka/coordinator/ConsumerCoordinator.scala | 27 +++---
 .../coordinator/ConsumerGroupMetadata.scala     | 18 ++--
 .../scala/kafka/network/RequestChannel.scala    |  9 +-
 .../main/scala/kafka/server/KafkaConfig.scala   | 28 ++++++
 .../kafka/api/ConsumerBounceTest.scala          | 10 ++-
 .../integration/kafka/api/ConsumerTest.scala    | 14 +--
 .../kafka/server/KafkaConfigConfigDefTest.scala |  5 ++
 11 files changed, 156 insertions(+), 65 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/d22987f0/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
index b2764df..fac7995 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
@@ -120,30 +120,58 @@ public final class Coordinator {
         // send a join group request to the coordinator
         log.debug("(Re-)joining group {} with subscribed topics {}", groupId, subscribedTopics);
 
-        JoinGroupRequest request = new JoinGroupRequest(groupId,
-            (int) this.sessionTimeoutMs,
-            subscribedTopics,
-            this.consumerId,
-            this.assignmentStrategy);
-        ClientResponse resp = this.blockingCoordinatorRequest(ApiKeys.JOIN_GROUP, request.toStruct(), null, now);
-
-        // process the response
-        JoinGroupResponse response = new JoinGroupResponse(resp.responseBody());
-        // TODO: needs to handle disconnects and errors, should not just throw exceptions
-        Errors.forCode(response.errorCode()).maybeThrow();
-        this.consumerId = response.consumerId();
-        this.generation = response.generationId();
-
-        // set the flag to refresh last committed offsets
-        this.subscriptions.needRefreshCommits();
-
-        log.debug("Joined group: {}", response);
-
-        // record re-assignment time
-        this.sensors.partitionReassignments.record(time.milliseconds() - now);
-
-        // return assigned partitions
-        return response.assignedPartitions();
+        // repeat processing the response until succeed or fatal error
+        do {
+            JoinGroupRequest request = new JoinGroupRequest(groupId,
+                (int) this.sessionTimeoutMs,
+                subscribedTopics,
+                this.consumerId,
+                this.assignmentStrategy);
+
+            ClientResponse resp = this.blockingCoordinatorRequest(ApiKeys.JOIN_GROUP, request.toStruct(), null, now);
+            JoinGroupResponse response = new JoinGroupResponse(resp.responseBody());
+            short errorCode = response.errorCode();
+
+            if (errorCode == Errors.NONE.code()) {
+                this.consumerId = response.consumerId();
+                this.generation = response.generationId();
+
+                // set the flag to refresh last committed offsets
+                this.subscriptions.needRefreshCommits();
+
+                log.debug("Joined group: {}", response);
+
+                // record re-assignment time
+                this.sensors.partitionReassignments.record(time.milliseconds() - now);
+
+                // return assigned partitions
+                return response.assignedPartitions();
+            } else if (errorCode == Errors.UNKNOWN_CONSUMER_ID.code()) {
+                // reset the consumer id and retry immediately
+                this.consumerId = JoinGroupRequest.UNKNOWN_CONSUMER_ID;
+                log.info("Attempt to join group {} failed due to unknown consumer id, resetting and retrying.",
+                    groupId);
+            } else if (errorCode == Errors.CONSUMER_COORDINATOR_NOT_AVAILABLE.code()
+                    || errorCode == Errors.NOT_COORDINATOR_FOR_CONSUMER.code()) {
+                // re-discover the coordinator and retry with backoff
+                coordinatorDead();
+                Utils.sleep(this.retryBackoffMs);
+
+                log.info("Attempt to join group {} failed due to obsolete coordinator information, retrying.",
+                    groupId);
+            } else if (errorCode == Errors.UNKNOWN_PARTITION_ASSIGNMENT_STRATEGY.code()
+                    || errorCode == Errors.INCONSISTENT_PARTITION_ASSIGNMENT_STRATEGY.code()
+                    || errorCode == Errors.INVALID_SESSION_TIMEOUT.code()) {
+                // log the error and re-throw the exception
+                log.error("Attempt to join group {} failed due to: {}",
+                    groupId, Errors.forCode(errorCode).exception().getMessage());
+                Errors.forCode(errorCode).maybeThrow();
+            } else {
+                // unexpected error, throw the exception
+                throw new KafkaException("Unexpected error in join group response: "
+                    + Errors.forCode(response.errorCode()).exception().getMessage());
+            }
+        } while (true);
     }
 
     /**
@@ -217,7 +245,6 @@ public final class Coordinator {
             // parse the response to get the offsets
             boolean offsetsReady = true;
             OffsetFetchResponse response = new OffsetFetchResponse(resp.responseBody());
-            // TODO: needs to handle disconnects
             Map<TopicPartition, Long> offsets = new HashMap<TopicPartition, Long>(response.responseData().size());
             for (Map.Entry<TopicPartition, OffsetFetchResponse.PartitionData> entry : response.responseData().entrySet()) {
                 TopicPartition tp = entry.getKey();
@@ -239,7 +266,8 @@ public final class Coordinator {
                         // just ignore this partition
                         log.debug("Unknown topic or partition for " + tp);
                     } else {
-                        throw new IllegalStateException("Unexpected error code " + data.errorCode + " while fetching offset");
+                        throw new KafkaException("Unexpected error in fetch offset response: "
+                            + Errors.forCode(data.errorCode).exception().getMessage());
                     }
                 } else if (data.offset >= 0) {
                     // record the position with the offset (-1 indicates no committed offset to fetch)
@@ -471,9 +499,15 @@ public final class Coordinator {
                 if (response.errorCode() == Errors.NONE.code()) {
                     log.debug("Received successful heartbeat response.");
                 } else if (response.errorCode() == Errors.CONSUMER_COORDINATOR_NOT_AVAILABLE.code()
-                    || response.errorCode() == Errors.NOT_COORDINATOR_FOR_CONSUMER.code()) {
+                        || response.errorCode() == Errors.NOT_COORDINATOR_FOR_CONSUMER.code()) {
+                    log.info("Attempt to heart beat failed since coordinator is either not started or not valid, marking it as dead.");
                     coordinatorDead();
                 } else if (response.errorCode() == Errors.ILLEGAL_GENERATION.code()) {
+                    log.info("Attempt to heart beat failed since generation id is not legal, try to re-join group.");
+                    subscriptions.needReassignment();
+                } else if (response.errorCode() == Errors.UNKNOWN_CONSUMER_ID.code()) {
+                    log.info("Attempt to heart beat failed since consumer id is not valid, reset it and try to re-join group.");
+                    consumerId = JoinGroupRequest.UNKNOWN_CONSUMER_ID;
                     subscriptions.needReassignment();
                 } else {
                     throw new KafkaException("Unexpected error in heartbeat response: "
@@ -506,9 +540,10 @@ public final class Coordinator {
                         log.debug("Committed offset {} for partition {}", offset, tp);
                         subscriptions.committed(tp, offset);
                     } else if (errorCode == Errors.CONSUMER_COORDINATOR_NOT_AVAILABLE.code()
-                        || errorCode == Errors.NOT_COORDINATOR_FOR_CONSUMER.code()) {
+                            || errorCode == Errors.NOT_COORDINATOR_FOR_CONSUMER.code()) {
                         coordinatorDead();
                     } else {
+                        // do not need to throw the exception but just log the error
                         log.error("Error committing partition {} at offset {}: {}",
                             tp,
                             offset,

http://git-wip-us.apache.org/repos/asf/kafka/blob/d22987f0/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index ef9dd52..c5e577f 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -231,13 +231,14 @@ public class Fetcher<K, V> {
                         log.debug("Fetched offset {} for partition {}", offset, topicPartition);
                         return offset;
                     } else if (errorCode == Errors.NOT_LEADER_FOR_PARTITION.code()
-                        || errorCode == Errors.UNKNOWN_TOPIC_OR_PARTITION.code()) {
+                            || errorCode == Errors.UNKNOWN_TOPIC_OR_PARTITION.code()) {
                         log.warn("Attempt to fetch offsets for partition {} failed due to obsolete leadership information, retrying.",
                             topicPartition);
                         awaitMetadataUpdate();
                     } else {
-                        // TODO: we should not just throw exceptions but should handle and log it.
-                        Errors.forCode(errorCode).maybeThrow();
+                        log.error("Attempt to fetch offsets for partition {} failed due to: {}",
+                            topicPartition, Errors.forCode(errorCode).exception().getMessage());
+                        awaitMetadataUpdate();
                     }
                 }
             } else {

http://git-wip-us.apache.org/repos/asf/kafka/blob/d22987f0/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java
index f548cd0..96e6ab0 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java
@@ -27,7 +27,10 @@ public class HeartbeatResponse extends AbstractRequestResponse {
     /**
      * Possible error code:
      *
-     * TODO
+     * CONSUMER_COORDINATOR_NOT_AVAILABLE (15)
+     * NOT_COORDINATOR_FOR_CONSUMER (16)
+     * ILLEGAL_GENERATION (22)
+     * UNKNOWN_CONSUMER_ID (25)
      */
 
     private final short errorCode;

http://git-wip-us.apache.org/repos/asf/kafka/blob/d22987f0/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java
index fd9c545..8d418cd 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java
@@ -30,7 +30,12 @@ public class JoinGroupResponse extends AbstractRequestResponse {
     /**
      * Possible error code:
      *
-     * TODO
+     * CONSUMER_COORDINATOR_NOT_AVAILABLE (15)
+     * NOT_COORDINATOR_FOR_CONSUMER (16)
+     * INCONSISTENT_PARTITION_ASSIGNMENT_STRATEGY (23)
+     * UNKNOWN_PARTITION_ASSIGNMENT_STRATEGY (24)
+     * UNKNOWN_CONSUMER_ID (25)
+     * INVALID_SESSION_TIMEOUT (26)
      */
 
     private static final String GENERATION_ID_KEY_NAME = "group_generation_id";

http://git-wip-us.apache.org/repos/asf/kafka/blob/d22987f0/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala b/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala
index af06ad4..51e89c8 100644
--- a/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala
@@ -25,11 +25,6 @@ import org.apache.kafka.common.requests.JoinGroupRequest
 import org.I0Itec.zkclient.ZkClient
 import java.util.concurrent.atomic.AtomicBoolean
 
-// TODO: expose MinSessionTimeoutMs and MaxSessionTimeoutMs in broker configs
-object ConsumerCoordinator {
-  private val MinSessionTimeoutMs = 6000
-  private val MaxSessionTimeoutMs = 30000
-}
 
 /**
  * ConsumerCoordinator handles consumer group and consumer offset management.
@@ -41,7 +36,6 @@ object ConsumerCoordinator {
 class ConsumerCoordinator(val config: KafkaConfig,
                           val zkClient: ZkClient,
                           val offsetManager: OffsetManager) extends Logging {
-  import ConsumerCoordinator._
 
   this.logIdent = "[ConsumerCoordinator " + config.brokerId + "]: "
 
@@ -93,15 +87,18 @@ class ConsumerCoordinator(val config: KafkaConfig,
       responseCallback(Set.empty, consumerId, 0, Errors.NOT_COORDINATOR_FOR_CONSUMER.code)
     } else if (!PartitionAssignor.strategies.contains(partitionAssignmentStrategy)) {
       responseCallback(Set.empty, consumerId, 0, Errors.UNKNOWN_PARTITION_ASSIGNMENT_STRATEGY.code)
-    } else if (sessionTimeoutMs < MinSessionTimeoutMs || sessionTimeoutMs > MaxSessionTimeoutMs) {
+    } else if (sessionTimeoutMs < config.consumerMinSessionTimeoutMs || sessionTimeoutMs > config.consumerMaxSessionTimeoutMs) {
       responseCallback(Set.empty, consumerId, 0, Errors.INVALID_SESSION_TIMEOUT.code)
     } else {
-      val group = coordinatorMetadata.getGroup(groupId)
+      // only try to create the group if the group is not unknown AND
+      // the consumer id is UNKNOWN, if consumer is specified but group does not
+      // exist we should reject the request
+      var group = coordinatorMetadata.getGroup(groupId)
       if (group == null) {
         if (consumerId != JoinGroupRequest.UNKNOWN_CONSUMER_ID) {
           responseCallback(Set.empty, consumerId, 0, Errors.UNKNOWN_CONSUMER_ID.code)
         } else {
-          val group = coordinatorMetadata.addGroup(groupId, partitionAssignmentStrategy)
+          group = coordinatorMetadata.addGroup(groupId, partitionAssignmentStrategy)
           doJoinGroup(group, consumerId, topics, sessionTimeoutMs, partitionAssignmentStrategy, responseCallback)
         }
       } else {
@@ -118,10 +115,16 @@ class ConsumerCoordinator(val config: KafkaConfig,
                           responseCallback:(Set[TopicAndPartition], String, Int, Short) => Unit) {
     group synchronized {
       if (group.is(Dead)) {
+        // if the group is marked as dead, it means some other thread has just removed the group
+        // from the coordinator metadata; this is likely that the group has migrated to some other
+        // coordinator OR the group is in a transient unstable phase. Let the consumer to retry
+        // joining without specified consumer id,
         responseCallback(Set.empty, consumerId, 0, Errors.UNKNOWN_CONSUMER_ID.code)
       } else if (partitionAssignmentStrategy != group.partitionAssignmentStrategy) {
         responseCallback(Set.empty, consumerId, 0, Errors.INCONSISTENT_PARTITION_ASSIGNMENT_STRATEGY.code)
       } else if (consumerId != JoinGroupRequest.UNKNOWN_CONSUMER_ID && !group.has(consumerId)) {
+        // if the consumer trying to register with a un-recognized id, send the response to let
+        // it reset its consumer id and retry
         responseCallback(Set.empty, consumerId, 0, Errors.UNKNOWN_CONSUMER_ID.code)
       } else if (group.has(consumerId) && group.is(Stable) && topics == group.get(consumerId).topics) {
         /*
@@ -170,6 +173,10 @@ class ConsumerCoordinator(val config: KafkaConfig,
     } else {
       val group = coordinatorMetadata.getGroup(groupId)
       if (group == null) {
+        // if the group is marked as dead, it means some other thread has just removed the group
+        // from the coordinator metadata; this is likely that the group has migrated to some other
+        // coordinator OR the group is in a transient unstable phase. Let the consumer to retry
+        // joining without specified consumer id,
         responseCallback(Errors.UNKNOWN_CONSUMER_ID.code)
       } else {
         group synchronized {
@@ -304,7 +311,7 @@ class ConsumerCoordinator(val config: KafkaConfig,
 
         if (group.isEmpty) {
           group.transitionTo(Dead)
-          info("Group %s generation %s is dead".format(group.groupId, group.generationId))
+          info("Group %s generation %s is dead and removed".format(group.groupId, group.generationId))
           coordinatorMetadata.removeGroup(group.groupId, group.topics)
         }
       }

http://git-wip-us.apache.org/repos/asf/kafka/blob/d22987f0/core/src/main/scala/kafka/coordinator/ConsumerGroupMetadata.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/ConsumerGroupMetadata.scala b/core/src/main/scala/kafka/coordinator/ConsumerGroupMetadata.scala
index 47bdfa7..0e3657f 100644
--- a/core/src/main/scala/kafka/coordinator/ConsumerGroupMetadata.scala
+++ b/core/src/main/scala/kafka/coordinator/ConsumerGroupMetadata.scala
@@ -62,6 +62,14 @@ private[coordinator] case object Stable extends GroupState { val state: Byte = 3
 private[coordinator] case object Dead extends GroupState { val state: Byte = 4 }
 
 
+private object ConsumerGroupMetadata {
+  private val validPreviousStates: Map[GroupState, Set[GroupState]] =
+    Map(Dead -> Set(PreparingRebalance),
+      Stable -> Set(Rebalancing),
+      PreparingRebalance -> Set(Stable),
+      Rebalancing -> Set(PreparingRebalance))
+}
+
 /**
  * Group contains the following metadata:
  *
@@ -77,12 +85,6 @@ private[coordinator] case object Dead extends GroupState { val state: Byte = 4 }
 private[coordinator] class ConsumerGroupMetadata(val groupId: String,
                                                  val partitionAssignmentStrategy: String) {
 
-  private val validPreviousStates: Map[GroupState, Set[GroupState]] =
-    Map(Dead -> Set(PreparingRebalance),
-      Stable -> Set(Rebalancing),
-      PreparingRebalance -> Set(Stable),
-      Rebalancing -> Set(PreparingRebalance))
-
   private val consumers = new mutable.HashMap[String, ConsumerMetadata]
   private var state: GroupState = Stable
   var generationId = 0
@@ -124,8 +126,8 @@ private[coordinator] class ConsumerGroupMetadata(val groupId: String,
   }
 
   private def assertValidTransition(targetState: GroupState) {
-    if (!validPreviousStates(targetState).contains(state))
+    if (!ConsumerGroupMetadata.validPreviousStates(targetState).contains(state))
       throw new IllegalStateException("Group %s should be in the %s states before moving to %s state. Instead it is in %s state"
-        .format(groupId, validPreviousStates(targetState).mkString(","), targetState, state))
+        .format(groupId, ConsumerGroupMetadata.validPreviousStates(targetState).mkString(","), targetState, state))
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/d22987f0/core/src/main/scala/kafka/network/RequestChannel.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala
index 1d0024c..24edb61 100644
--- a/core/src/main/scala/kafka/network/RequestChannel.scala
+++ b/core/src/main/scala/kafka/network/RequestChannel.scala
@@ -49,11 +49,14 @@ object RequestChannel extends Logging {
     @volatile var responseCompleteTimeMs = -1L
     @volatile var responseDequeueTimeMs = -1L
     val requestId = buffer.getShort()
+    // for server-side request / response format
+    // TODO: this will be removed once we migrated to client-side format
     val requestObj =
       if ( RequestKeys.keyToNameAndDeserializerMap.contains(requestId))
         RequestKeys.deserializerForKey(requestId)(buffer)
       else
         null
+    // for client-side request / response format
     val header: RequestHeader =
       if (requestObj == null) {
         buffer.rewind
@@ -68,7 +71,7 @@ object RequestChannel extends Logging {
 
     buffer = null
     private val requestLogger = Logger.getLogger("kafka.request.logger")
-    trace("Processor %d received request : %s".format(processor, requestObj))
+    trace("Processor %d received request : %s".format(processor, if (requestObj != null) requestObj.describe(false) else header.toString + " : " + body.toString))
 
     def updateRequestMetrics() {
       val endTimeMs = SystemTime.milliseconds
@@ -101,10 +104,10 @@ object RequestChannel extends Logging {
       }
       if(requestLogger.isTraceEnabled)
         requestLogger.trace("Completed request:%s from client %s;totalTime:%d,requestQueueTime:%d,localTime:%d,remoteTime:%d,responseQueueTime:%d,sendTime:%d"
-          .format(requestObj.describe(true), remoteAddress, totalTime, requestQueueTime, apiLocalTime, apiRemoteTime, responseQueueTime, responseSendTime))
+          .format(if (requestObj != null) requestObj.describe(true) else header.toString + " : " + body.toString, remoteAddress, totalTime, requestQueueTime, apiLocalTime, apiRemoteTime, responseQueueTime, responseSendTime))
       else if(requestLogger.isDebugEnabled) {
         requestLogger.debug("Completed request:%s from client %s;totalTime:%d,requestQueueTime:%d,localTime:%d,remoteTime:%d,responseQueueTime:%d,sendTime:%d"
-          .format(requestObj.describe(false), remoteAddress, totalTime, requestQueueTime, apiLocalTime, apiRemoteTime, responseQueueTime, responseSendTime))
+          .format(if (requestObj != null) requestObj.describe(false) else header.toString + " : " + body.toString, remoteAddress, totalTime, requestQueueTime, apiLocalTime, apiRemoteTime, responseQueueTime, responseSendTime))
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/d22987f0/core/src/main/scala/kafka/server/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 9efa15c..6f25afd 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -111,6 +111,10 @@ object Defaults {
   val ControlledShutdownRetryBackoffMs = 5000
   val ControlledShutdownEnable = true
 
+  /** ********* Consumer coordinator configuration ***********/
+  val ConsumerMinSessionTimeoutMs = 6000
+  val ConsumerMaxSessionTimeoutMs = 30000
+
   /** ********* Offset management configuration ***********/
   val OffsetMetadataMaxSize = OffsetManagerConfig.DefaultMaxMetadataSize
   val OffsetsLoadBufferSize = OffsetManagerConfig.DefaultLoadBufferSize
@@ -218,6 +222,9 @@ object KafkaConfig {
   val ControlledShutdownMaxRetriesProp = "controlled.shutdown.max.retries"
   val ControlledShutdownRetryBackoffMsProp = "controlled.shutdown.retry.backoff.ms"
   val ControlledShutdownEnableProp = "controlled.shutdown.enable"
+  /** ********* Consumer coordinator configuration ***********/
+  val ConsumerMinSessionTimeoutMsProp = "consumer.min.session.timeout.ms"
+  val ConsumerMaxSessionTimeoutMsProp = "consumer.max.session.timeout.ms"
   /** ********* Offset management configuration ***********/
   val OffsetMetadataMaxSizeProp = "offset.metadata.max.bytes"
   val OffsetsLoadBufferSizeProp = "offsets.load.buffer.size"
@@ -343,6 +350,9 @@ object KafkaConfig {
   val ControlledShutdownMaxRetriesDoc = "Controlled shutdown can fail for multiple reasons. This determines the number of retries when such failure happens"
   val ControlledShutdownRetryBackoffMsDoc = "Before each retry, the system needs time to recover from the state that caused the previous failure (Controller fail over, replica lag etc). This config determines the amount of time to wait before retrying."
   val ControlledShutdownEnableDoc = "Enable controlled shutdown of the server"
+  /** ********* Consumer coordinator configuration ***********/
+  val ConsumerMinSessionTimeoutMsDoc = "The minimum allowed session timeout for registered consumers"
+  val ConsumerMaxSessionTimeoutMsDoc = "The maximum allowed session timeout for registered consumers"
   /** ********* Offset management configuration ***********/
   val OffsetMetadataMaxSizeDoc = "The maximum size for a metadata entry associated with an offset commit"
   val OffsetsLoadBufferSizeDoc = "Batch size for reading from the offsets segments when loading offsets into the cache."
@@ -461,11 +471,16 @@ object KafkaConfig {
       .define(UncleanLeaderElectionEnableProp, BOOLEAN, Defaults.UncleanLeaderElectionEnable, HIGH, UncleanLeaderElectionEnableDoc)
       .define(InterBrokerSecurityProtocolProp, STRING, Defaults.InterBrokerSecurityProtocol, MEDIUM, InterBrokerSecurityProtocolDoc)
       .define(InterBrokerProtocolVersionProp, STRING, Defaults.InterBrokerProtocolVersion, MEDIUM, InterBrokerProtocolVersionDoc)
+
       /** ********* Controlled shutdown configuration ***********/
       .define(ControlledShutdownMaxRetriesProp, INT, Defaults.ControlledShutdownMaxRetries, MEDIUM, ControlledShutdownMaxRetriesDoc)
       .define(ControlledShutdownRetryBackoffMsProp, INT, Defaults.ControlledShutdownRetryBackoffMs, MEDIUM, ControlledShutdownRetryBackoffMsDoc)
       .define(ControlledShutdownEnableProp, BOOLEAN, Defaults.ControlledShutdownEnable, MEDIUM, ControlledShutdownEnableDoc)
 
+      /** ********* Consumer coordinator configuration ***********/
+      .define(ConsumerMinSessionTimeoutMsProp, INT, Defaults.ConsumerMinSessionTimeoutMs, MEDIUM, ConsumerMinSessionTimeoutMsDoc)
+      .define(ConsumerMaxSessionTimeoutMsProp, INT, Defaults.ConsumerMaxSessionTimeoutMs, MEDIUM, ConsumerMaxSessionTimeoutMsDoc)
+
       /** ********* Offset management configuration ***********/
       .define(OffsetMetadataMaxSizeProp, INT, Defaults.OffsetMetadataMaxSize, HIGH, OffsetMetadataMaxSizeDoc)
       .define(OffsetsLoadBufferSizeProp, INT, Defaults.OffsetsLoadBufferSize, atLeast(1), HIGH, OffsetsLoadBufferSizeDoc)
@@ -581,11 +596,16 @@ object KafkaConfig {
       uncleanLeaderElectionEnable = parsed.get(UncleanLeaderElectionEnableProp).asInstanceOf[Boolean],
       interBrokerSecurityProtocol = SecurityProtocol.valueOf(parsed.get(InterBrokerSecurityProtocolProp).asInstanceOf[String]),
       interBrokerProtocolVersion =  ApiVersion(parsed.get(InterBrokerProtocolVersionProp).asInstanceOf[String]),
+
       /** ********* Controlled shutdown configuration ***********/
       controlledShutdownMaxRetries = parsed.get(ControlledShutdownMaxRetriesProp).asInstanceOf[Int],
       controlledShutdownRetryBackoffMs = parsed.get(ControlledShutdownRetryBackoffMsProp).asInstanceOf[Int],
       controlledShutdownEnable = parsed.get(ControlledShutdownEnableProp).asInstanceOf[Boolean],
 
+      /** ********* Consumer coordinator configuration ***********/
+      consumerMinSessionTimeoutMs = parsed.get(ConsumerMinSessionTimeoutMsProp).asInstanceOf[Int],
+      consumerMaxSessionTimeoutMs = parsed.get(ConsumerMaxSessionTimeoutMsProp).asInstanceOf[Int],
+
       /** ********* Offset management configuration ***********/
       offsetMetadataMaxSize = parsed.get(OffsetMetadataMaxSizeProp).asInstanceOf[Int],
       offsetsLoadBufferSize = parsed.get(OffsetsLoadBufferSizeProp).asInstanceOf[Int],
@@ -729,6 +749,10 @@ class KafkaConfig(/** ********* Zookeeper Configuration ***********/
                   val controlledShutdownRetryBackoffMs: Int = Defaults.ControlledShutdownRetryBackoffMs,
                   val controlledShutdownEnable: Boolean = Defaults.ControlledShutdownEnable,
 
+                  /** ********* Consumer coordinator configuration ***********/
+                  val consumerMinSessionTimeoutMs: Int = Defaults.ConsumerMinSessionTimeoutMs,
+                  val consumerMaxSessionTimeoutMs: Int = Defaults.ConsumerMaxSessionTimeoutMs,
+
                   /** ********* Offset management configuration ***********/
                   val offsetMetadataMaxSize: Int = Defaults.OffsetMetadataMaxSize,
                   val offsetsLoadBufferSize: Int = Defaults.OffsetsLoadBufferSize,
@@ -951,6 +975,10 @@ class KafkaConfig(/** ********* Zookeeper Configuration ***********/
     props.put(ControlledShutdownRetryBackoffMsProp, controlledShutdownRetryBackoffMs.toString)
     props.put(ControlledShutdownEnableProp, controlledShutdownEnable.toString)
 
+    /** ********* Consumer coordinator configuration ***********/
+    props.put(ConsumerMinSessionTimeoutMsProp, consumerMinSessionTimeoutMs.toString)
+    props.put(ConsumerMaxSessionTimeoutMsProp, consumerMaxSessionTimeoutMs.toString)
+
     /** ********* Offset management configuration ***********/
     props.put(OffsetMetadataMaxSizeProp, offsetMetadataMaxSize.toString)
     props.put(OffsetsLoadBufferSizeProp, offsetsLoadBufferSize.toString)

http://git-wip-us.apache.org/repos/asf/kafka/blob/d22987f0/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
index fbc6706..f56096b 100644
--- a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
@@ -40,12 +40,14 @@ class ConsumerBounceTest extends IntegrationTestHarness with Logging {
   val tp = new TopicPartition(topic, part)
 
   // configure the servers and clients
-  this.serverConfig.setProperty("controlled.shutdown.enable", "false") // speed up shutdown
-  this.serverConfig.setProperty("offsets.topic.replication.factor", "3") // don't want to lose offset
-  this.serverConfig.setProperty("offsets.topic.num.partitions", "1")
+  this.serverConfig.setProperty(KafkaConfig.ControlledShutdownEnableProp, "false") // speed up shutdown
+  this.serverConfig.setProperty(KafkaConfig.OffsetsTopicReplicationFactorProp, "3") // don't want to lose offset
+  this.serverConfig.setProperty(KafkaConfig.OffsetsTopicPartitionsProp, "1")
+  this.serverConfig.setProperty(KafkaConfig.ConsumerMinSessionTimeoutMsProp, "10") // set small enough session timeout
   this.producerConfig.setProperty(ProducerConfig.ACKS_CONFIG, "all")
   this.consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "my-test")
   this.consumerConfig.setProperty(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 4096.toString)
+  this.consumerConfig.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "20")
   this.consumerConfig.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
 
   override def generateConfigs() = {
@@ -60,7 +62,7 @@ class ConsumerBounceTest extends IntegrationTestHarness with Logging {
     TestUtils.createTopic(this.zkClient, topic, 1, serverCount, this.servers)
   }
 
-  def testConsumptionWithBrokerFailures() = consumeWithBrokerFailures(5)
+  def testConsumptionWithBrokerFailures() = consumeWithBrokerFailures(20)
 
   /*
    * 1. Produce a bunch of messages

http://git-wip-us.apache.org/repos/asf/kafka/blob/d22987f0/core/src/test/scala/integration/kafka/api/ConsumerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/ConsumerTest.scala b/core/src/test/scala/integration/kafka/api/ConsumerTest.scala
index a1eed96..17b17b9 100644
--- a/core/src/test/scala/integration/kafka/api/ConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ConsumerTest.scala
@@ -24,8 +24,8 @@ import org.apache.kafka.common.serialization.ByteArrayDeserializer
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.clients.consumer.NoOffsetForPartitionException
 
-import kafka.utils.{ShutdownableThread, TestUtils, Logging}
-import kafka.server.OffsetManager
+import kafka.utils.{TestUtils, Logging}
+import kafka.server.{KafkaConfig, OffsetManager}
 
 import java.util.ArrayList
 import org.junit.Assert._
@@ -47,9 +47,10 @@ class ConsumerTest extends IntegrationTestHarness with Logging {
   val tp = new TopicPartition(topic, part)
 
   // configure the servers and clients
-  this.serverConfig.setProperty("controlled.shutdown.enable", "false") // speed up shutdown
-  this.serverConfig.setProperty("offsets.topic.replication.factor", "3") // don't want to lose offset
-  this.serverConfig.setProperty("offsets.topic.num.partitions", "1")
+  this.serverConfig.setProperty(KafkaConfig.ControlledShutdownEnableProp, "false") // speed up shutdown
+  this.serverConfig.setProperty(KafkaConfig.OffsetsTopicReplicationFactorProp, "3") // don't want to lose offset
+  this.serverConfig.setProperty(KafkaConfig.OffsetsTopicPartitionsProp, "1")
+  this.serverConfig.setProperty(KafkaConfig.ConsumerMinSessionTimeoutMsProp, "100") // set small enough session timeout
   this.producerConfig.setProperty(ProducerConfig.ACKS_CONFIG, "all")
   this.consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "my-test")
   this.consumerConfig.setProperty(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 4096.toString)
@@ -146,8 +147,7 @@ class ConsumerTest extends IntegrationTestHarness with Logging {
     assertNull(this.consumers(0).partitionsFor("non-exist-topic"))
   }
 
-  // TODO: fix test after fixing consumer-side Coordinator logic
-  def failingTestPartitionReassignmentCallback() {
+  def testPartitionReassignmentCallback() {
     val callback = new TestConsumerReassignmentCallback()
     this.consumerConfig.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "200"); // timeout quickly to avoid slow test
     val consumer0 = new KafkaConsumer(this.consumerConfig, callback, new ByteArrayDeserializer(), new ByteArrayDeserializer())

http://git-wip-us.apache.org/repos/asf/kafka/blob/d22987f0/core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala
index 8014a5a..71f48c0 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala
@@ -133,6 +133,9 @@ class KafkaConfigConfigDefTest extends JUnit3Suite {
     Assert.assertEquals(expectedConfig.controlledShutdownRetryBackoffMs, actualConfig.controlledShutdownRetryBackoffMs)
     Assert.assertEquals(expectedConfig.controlledShutdownEnable, actualConfig.controlledShutdownEnable)
 
+    Assert.assertEquals(expectedConfig.consumerMinSessionTimeoutMs, actualConfig.consumerMinSessionTimeoutMs)
+    Assert.assertEquals(expectedConfig.consumerMaxSessionTimeoutMs, actualConfig.consumerMaxSessionTimeoutMs)
+
     Assert.assertEquals(expectedConfig.offsetMetadataMaxSize, actualConfig.offsetMetadataMaxSize)
     Assert.assertEquals(expectedConfig.offsetsLoadBufferSize, actualConfig.offsetsLoadBufferSize)
     Assert.assertEquals(expectedConfig.offsetsTopicReplicationFactor, actualConfig.offsetsTopicReplicationFactor)
@@ -330,6 +333,8 @@ class KafkaConfigConfigDefTest extends JUnit3Suite {
         case KafkaConfig.ControlledShutdownMaxRetriesProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
         case KafkaConfig.ControlledShutdownRetryBackoffMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
         case KafkaConfig.ControlledShutdownEnableProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_boolean", "0")
+        case KafkaConfig.ConsumerMinSessionTimeoutMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
+        case KafkaConfig.ConsumerMaxSessionTimeoutMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
         case KafkaConfig.OffsetMetadataMaxSizeProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
         case KafkaConfig.OffsetsLoadBufferSizeProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0")
         case KafkaConfig.OffsetsTopicReplicationFactorProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0")


Mime
View raw message