kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: KAFKA-2428: Add sanity check in KafkaConsumer for the timeouts
Date Thu, 08 Oct 2015 01:26:39 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 2254f2bfa -> 693d4ca1a


KAFKA-2428: Add sanity check in KafkaConsumer for the timeouts

Add a sanity check in KafkaConsumer for the timeouts. This is a follow-up ticket for KAFKA-2120.

Author: Mayuresh Gharat <mgharat@mgharat-ld1.linkedin.biz>

Reviewers: Dong Lin, Ismael Juma, Guozhang Wang

Closes #282 from MayureshGharat/Kafka-2428


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/693d4ca1
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/693d4ca1
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/693d4ca1

Branch: refs/heads/trunk
Commit: 693d4ca1a6fbb883fef5a2e7228ad56dd2f95fc8
Parents: 2254f2b
Author: Mayuresh Gharat <mgharat@linkedin.com>
Authored: Wed Oct 7 18:30:53 2015 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Wed Oct 7 18:30:53 2015 -0700

----------------------------------------------------------------------
 .../java/org/apache/kafka/clients/consumer/KafkaConsumer.java   | 5 +++++
 1 file changed, 5 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/693d4ca1/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 0771687..a0d04bc 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -27,6 +27,7 @@ import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.annotation.InterfaceStability;
+import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.metrics.JmxReporter;
 import org.apache.kafka.common.metrics.MetricConfig;
 import org.apache.kafka.common.metrics.Metrics;
@@ -494,6 +495,10 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
         try {
             log.debug("Starting the Kafka consumer");
             this.requestTimeoutMs = config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG);
+            int sessionTimeOutMs = config.getInt(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG);
+            int fetchMaxWaitMs = config.getInt(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG);
+            if (this.requestTimeoutMs <= sessionTimeOutMs || this.requestTimeoutMs <= fetchMaxWaitMs)
+                throw new ConfigException(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG + " should be greater than " + ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG + " and " + ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG);
             this.time = new SystemTime();
 
             MetricConfig metricConfig = new MetricConfig().samples(config.getInt(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG))


Mime
View raw message