kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From maniku...@apache.org
Subject [kafka] branch trunk updated: MINOR: Increase timeout for flaky ResetConsumerGroupOffsetTest (#6900)
Date Fri, 07 Jun 2019 07:48:48 GMT
This is an automated email from the ASF dual-hosted git repository.

manikumar pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 07dd7db  MINOR: Increase timeout for flaky ResetConsumerGroupOffsetTest (#6900)
07dd7db is described below

commit 07dd7db324ac623190e48720ce446d82320edc36
Author: Jason Gustafson <jason@confluent.io>
AuthorDate: Fri Jun 7 00:48:18 2019 -0700

    MINOR: Increase timeout for flaky ResetConsumerGroupOffsetTest (#6900)
    
    Reviewers: Manikumar Reddy <manikumar.reddy@gmail.com>
---
 .../kafka/admin/ResetConsumerGroupOffsetTest.scala | 104 ++++++++++-----------
 1 file changed, 49 insertions(+), 55 deletions(-)

diff --git a/core/src/test/scala/unit/kafka/admin/ResetConsumerGroupOffsetTest.scala b/core/src/test/scala/unit/kafka/admin/ResetConsumerGroupOffsetTest.scala
index baf1d05..8b1bad6 100644
--- a/core/src/test/scala/unit/kafka/admin/ResetConsumerGroupOffsetTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/ResetConsumerGroupOffsetTest.scala
@@ -22,6 +22,7 @@ import kafka.server.KafkaConfig
 import kafka.utils.TestUtils
 import org.apache.kafka.clients.producer.ProducerRecord
 import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.test
 import org.junit.Assert._
 import org.junit.Test
 
@@ -81,11 +82,29 @@ class ResetConsumerGroupOffsetTest extends ConsumerGroupCommandTest {
       .map(KafkaConfig.fromProps(_, overridingProps))
   }
 
+  private def basicArgs: Array[String] = {
+    Array("--reset-offsets",
+      "--bootstrap-server", brokerList,
+      "--timeout", test.TestUtils.DEFAULT_MAX_WAIT_MS.toString)
+  }
+
+  private def buildArgsForGroups(groups: Seq[String], args: String*): Array[String] = {
+    val groupArgs = groups.flatMap(group => Seq("--group", group)).toArray
+    basicArgs ++ groupArgs ++ args
+  }
+
+  private def buildArgsForGroup(group: String, args: String*): Array[String] = {
+    buildArgsForGroups(Seq(group), args: _*)
+  }
+
+  private def buildArgsForAllGroups(args: String*): Array[String] = {
+    basicArgs ++ Array("--all-groups") ++ args
+  }
+
   @Test
   def testResetOffsetsNotExistingGroup() {
     val group = "missing.group"
-    val args = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group,
"--all-topics",
-      "--to-current", "--execute")
+    val args = buildArgsForGroup(group, "--all-topics", "--to-current", "--execute")
     val consumerGroupCommand = getConsumerGroupService(args)
     // Make sure we got a coordinator
     TestUtils.waitUntilTrue(() => {
@@ -99,8 +118,7 @@ class ResetConsumerGroupOffsetTest extends ConsumerGroupCommandTest {
   @Test
   def testResetOffsetsExistingTopic(): Unit = {
     val group = "new.group"
-    val args = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group,
"--topic", topic,
-      "--to-offset", "50")
+    val args = buildArgsForGroup(group, "--topic", topic, "--to-offset", "50")
     produceMessages(topic, 100)
     resetAndAssertOffsets(args, expectedOffset = 50, dryRun = true)
     resetAndAssertOffsets(args ++ Array("--dry-run"), expectedOffset = 50, dryRun = true)
@@ -110,16 +128,15 @@ class ResetConsumerGroupOffsetTest extends ConsumerGroupCommandTest
{
   @Test
   def testResetOffsetsExistingTopicSelectedGroups(): Unit = {
     produceMessages(topic, 100)
-    val groups = (
+    val groups =
       for (id <- 1 to 3) yield {
         val group = this.group + id
         val executor = addConsumerGroupExecutor(numConsumers = 1, topic = topic, group =
group)
         awaitConsumerProgress(count = 100L, group = group)
         executor.shutdown()
-        Array("--group", group)
-      }).toArray.flatten
-    val args = Array("--bootstrap-server", brokerList, "--reset-offsets", "--topic", topic,
-      "--to-offset", "50") ++ groups
+        group
+      }
+    val args = buildArgsForGroups(groups,"--topic", topic, "--to-offset", "50")
     resetAndAssertOffsets(args, expectedOffset = 50, dryRun = true)
     resetAndAssertOffsets(args ++ Array("--dry-run"), expectedOffset = 50, dryRun = true)
     resetAndAssertOffsets(args ++ Array("--execute"), expectedOffset = 50)
@@ -127,8 +144,7 @@ class ResetConsumerGroupOffsetTest extends ConsumerGroupCommandTest {
 
   @Test
   def testResetOffsetsExistingTopicAllGroups(): Unit = {
-    val args = Array("--bootstrap-server", brokerList, "--reset-offsets", "--all-groups",
"--topic", topic,
-      "--to-offset", "50")
+    val args = buildArgsForAllGroups("--topic", topic, "--to-offset", "50")
     produceMessages(topic, 100)
     for (group <- 1 to 3 map (group + _)) {
       val executor = addConsumerGroupExecutor(numConsumers = 1, topic = topic, group = group)
@@ -142,8 +158,7 @@ class ResetConsumerGroupOffsetTest extends ConsumerGroupCommandTest {
 
   @Test
   def testResetOffsetsAllTopicsAllGroups(): Unit = {
-    val args = Array("--bootstrap-server", brokerList, "--reset-offsets", "--all-groups",
"--all-topics",
-      "--to-offset", "50")
+    val args = buildArgsForAllGroups("--all-topics", "--to-offset", "50")
     val topics = 1 to 3 map (topic + _)
     val groups = 1 to 3 map (group + _)
     topics foreach (topic => produceMessages(topic, 100))
@@ -172,8 +187,7 @@ class ResetConsumerGroupOffsetTest extends ConsumerGroupCommandTest {
     awaitConsumerProgress(count = 100L)
     executor.shutdown()
 
-    val args = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group,
"--all-topics",
-      "--to-datetime", format.format(calendar.getTime), "--execute")
+    val args = buildArgsForGroup(group, "--all-topics", "--to-datetime", format.format(calendar.getTime),
"--execute")
     resetAndAssertOffsets(args, expectedOffset = 0)
   }
 
@@ -189,39 +203,34 @@ class ResetConsumerGroupOffsetTest extends ConsumerGroupCommandTest
{
     awaitConsumerProgress(count = 100L)
     executor.shutdown()
 
-    val args = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group,
"--all-topics",
-      "--to-datetime", format.format(checkpoint), "--execute")
+    val args = buildArgsForGroup(group, "--all-topics", "--to-datetime", format.format(checkpoint),
"--execute")
     resetAndAssertOffsets(args, expectedOffset = 50)
   }
 
   @Test
   def testResetOffsetsByDuration() {
-    val args = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group,
"--all-topics",
-      "--by-duration", "PT1M", "--execute")
+    val args = buildArgsForGroup(group, "--all-topics", "--by-duration", "PT1M", "--execute")
     produceConsumeAndShutdown(topic, group, totalMessages = 100)
     resetAndAssertOffsets(args, expectedOffset = 0)
   }
 
   @Test
   def testResetOffsetsByDurationToEarliest() {
-    val args = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group,
"--all-topics",
-      "--by-duration", "PT0.1S", "--execute")
+    val args = buildArgsForGroup(group, "--all-topics", "--by-duration", "PT0.1S", "--execute")
     produceConsumeAndShutdown(topic, group, totalMessages = 100)
     resetAndAssertOffsets(args, expectedOffset = 100)
   }
 
   @Test
   def testResetOffsetsToEarliest() {
-    val args = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group,
"--all-topics",
-      "--to-earliest", "--execute")
+    val args = buildArgsForGroup(group, "--all-topics", "--to-earliest", "--execute")
     produceConsumeAndShutdown(topic, group, totalMessages = 100)
     resetAndAssertOffsets(args, expectedOffset = 0)
   }
 
   @Test
   def testResetOffsetsToLatest() {
-    val args = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group,
"--all-topics",
-      "--to-latest", "--execute")
+    val args = buildArgsForGroup(group, "--all-topics", "--to-latest", "--execute")
     produceConsumeAndShutdown(topic, group, totalMessages = 100)
     produceMessages(topic, 100)
     resetAndAssertOffsets(args, expectedOffset = 200)
@@ -229,8 +238,7 @@ class ResetConsumerGroupOffsetTest extends ConsumerGroupCommandTest {
 
   @Test
   def testResetOffsetsToCurrentOffset() {
-    val args = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group,
"--all-topics",
-      "--to-current", "--execute")
+    val args = buildArgsForGroup(group, "--all-topics", "--to-current", "--execute")
     produceConsumeAndShutdown(topic, group, totalMessages = 100)
     produceMessages(topic, 100)
     resetAndAssertOffsets(args, expectedOffset = 100)
@@ -238,16 +246,14 @@ class ResetConsumerGroupOffsetTest extends ConsumerGroupCommandTest
{
 
   @Test
   def testResetOffsetsToSpecificOffset() {
-    val args = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group,
"--all-topics",
-      "--to-offset", "1", "--execute")
+    val args = buildArgsForGroup(group, "--all-topics", "--to-offset", "1", "--execute")
     produceConsumeAndShutdown(topic, group, totalMessages = 100)
     resetAndAssertOffsets(args, expectedOffset = 1)
   }
 
   @Test
   def testResetOffsetsShiftPlus() {
-    val args = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group,
"--all-topics",
-      "--shift-by", "50", "--execute")
+    val args = buildArgsForGroup(group, "--all-topics", "--shift-by", "50", "--execute")
     produceConsumeAndShutdown(topic, group, totalMessages = 100)
     produceMessages(topic, 100)
     resetAndAssertOffsets(args, expectedOffset = 150)
@@ -255,8 +261,7 @@ class ResetConsumerGroupOffsetTest extends ConsumerGroupCommandTest {
 
   @Test
   def testResetOffsetsShiftMinus() {
-    val args = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group,
"--all-topics",
-      "--shift-by", "-50", "--execute")
+    val args = buildArgsForGroup(group, "--all-topics", "--shift-by", "-50", "--execute")
     produceConsumeAndShutdown(topic, group, totalMessages = 100)
     produceMessages(topic, 100)
     resetAndAssertOffsets(args, expectedOffset = 50)
@@ -264,8 +269,7 @@ class ResetConsumerGroupOffsetTest extends ConsumerGroupCommandTest {
 
   @Test
   def testResetOffsetsShiftByLowerThanEarliest() {
-    val args = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group,
"--all-topics",
-      "--shift-by", "-150", "--execute")
+    val args = buildArgsForGroup(group, "--all-topics", "--shift-by", "-150", "--execute")
     produceConsumeAndShutdown(topic, group, totalMessages = 100)
     produceMessages(topic, 100)
     resetAndAssertOffsets(args, expectedOffset = 0)
@@ -273,8 +277,7 @@ class ResetConsumerGroupOffsetTest extends ConsumerGroupCommandTest {
 
   @Test
   def testResetOffsetsShiftByHigherThanLatest() {
-    val args = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group,
"--all-topics",
-      "--shift-by", "150", "--execute")
+    val args = buildArgsForGroup(group, "--all-topics", "--shift-by", "150", "--execute")
     produceConsumeAndShutdown(topic, group, totalMessages = 100)
     produceMessages(topic, 100)
     resetAndAssertOffsets(args, expectedOffset = 200)
@@ -282,8 +285,7 @@ class ResetConsumerGroupOffsetTest extends ConsumerGroupCommandTest {
 
   @Test
   def testResetOffsetsToEarliestOnOneTopic() {
-    val args = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group,
"--topic", topic,
-      "--to-earliest", "--execute")
+    val args = buildArgsForGroup(group, "--topic", topic, "--to-earliest", "--execute")
     produceConsumeAndShutdown(topic, group, totalMessages = 100)
     resetAndAssertOffsets(args, expectedOffset = 0)
   }
@@ -293,8 +295,7 @@ class ResetConsumerGroupOffsetTest extends ConsumerGroupCommandTest {
     val topic = "bar"
     createTopic(topic, 2, 1)
 
-    val args = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group,
"--topic",
-      s"$topic:1", "--to-earliest", "--execute")
+    val args = buildArgsForGroup(group, "--topic", s"$topic:1", "--to-earliest", "--execute")
     val consumerGroupCommand = getConsumerGroupService(args)
 
     produceConsumeAndShutdown(topic, group, totalMessages = 100, numConsumers = 2)
@@ -315,8 +316,7 @@ class ResetConsumerGroupOffsetTest extends ConsumerGroupCommandTest {
     createTopic(topic1, 1, 1)
     createTopic(topic2, 1, 1)
 
-    val args = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group,
"--topic", topic1,
-      "--topic", topic2, "--to-earliest", "--execute")
+    val args = buildArgsForGroup(group, "--topic", topic1, "--topic", topic2, "--to-earliest",
"--execute")
     val consumerGroupCommand = getConsumerGroupService(args)
 
     produceConsumeAndShutdown(topic1, group, 100, 1)
@@ -342,8 +342,7 @@ class ResetConsumerGroupOffsetTest extends ConsumerGroupCommandTest {
     createTopic(topic1, 2, 1)
     createTopic(topic2, 2, 1)
 
-    val args = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group,
"--topic",
-      s"$topic1:1", "--topic", s"$topic2:1", "--to-earliest", "--execute")
+    val args = buildArgsForGroup(group, "--topic", s"$topic1:1", "--topic", s"$topic2:1",
"--to-earliest", "--execute")
     val consumerGroupCommand = getConsumerGroupService(args)
 
     produceConsumeAndShutdown(topic1, group, 100, 2)
@@ -372,8 +371,7 @@ class ResetConsumerGroupOffsetTest extends ConsumerGroupCommandTest {
     val tp1 = new TopicPartition(topic, 1)
     createTopic(topic, 2, 1)
 
-    val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group,
"--all-topics",
-      "--to-offset", "2", "--export")
+    val cgcArgs = buildArgsForGroup(group, "--all-topics", "--to-offset", "2", "--export")
     val consumerGroupCommand = getConsumerGroupService(cgcArgs)
 
     produceConsumeAndShutdown(topic = topic, group = group, totalMessages = 100, numConsumers
= 2)
@@ -387,8 +385,7 @@ class ResetConsumerGroupOffsetTest extends ConsumerGroupCommandTest {
     bw.close()
     assertEquals(Map(tp0 -> 2L, tp1 -> 2L), exportedOffsets(group).mapValues(_.offset))
 
-    val cgcArgsExec = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group",
group, "--all-topics",
-      "--from-file", file.getCanonicalPath, "--dry-run")
+    val cgcArgsExec = buildArgsForGroup(group, "--all-topics", "--from-file", file.getCanonicalPath,
"--dry-run")
     val consumerGroupCommandExec = getConsumerGroupService(cgcArgsExec)
     val importedOffsets = consumerGroupCommandExec.resetOffsets()
     assertEquals(Map(tp0 -> 2L, tp1 -> 2L), importedOffsets(group).mapValues(_.offset))
@@ -411,8 +408,7 @@ class ResetConsumerGroupOffsetTest extends ConsumerGroupCommandTest {
     createTopic(topic1, 2, 1)
     createTopic(topic2, 2, 1)
 
-    val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group1,
"--group", group2, "--all-topics",
-      "--to-offset", "2", "--export")
+    val cgcArgs = buildArgsForGroups(Seq(group1, group2), "--all-topics", "--to-offset",
"2", "--export")
     val consumerGroupCommand = getConsumerGroupService(cgcArgs)
 
     produceConsumeAndShutdown(topic = topic1, group = group1, totalMessages = 100, numConsumers
= 2)
@@ -429,16 +425,14 @@ class ResetConsumerGroupOffsetTest extends ConsumerGroupCommandTest
{
     assertEquals(Map(t2p0 -> 2L, t2p1 -> 2L), exportedOffsets(group2).mapValues(_.offset))
 
     // Multiple --group's offset import
-    val cgcArgsExec = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group",
group1, "--group", group2, "--all-topics",
-      "--from-file", file.getCanonicalPath, "--dry-run")
+    val cgcArgsExec = buildArgsForGroups(Seq(group1, group2), "--all-topics", "--from-file",
file.getCanonicalPath, "--dry-run")
     val consumerGroupCommandExec = getConsumerGroupService(cgcArgsExec)
     val importedOffsets = consumerGroupCommandExec.resetOffsets()
     assertEquals(Map(t1p0 -> 2L, t1p1 -> 2L), importedOffsets(group1).mapValues(_.offset))
     assertEquals(Map(t2p0 -> 2L, t2p1 -> 2L), importedOffsets(group2).mapValues(_.offset))
 
     // Single --group offset import using "group,topic,partition,offset" csv format
-    val cgcArgsExec2 = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group",
group1, "--all-topics",
-      "--from-file", file.getCanonicalPath, "--dry-run")
+    val cgcArgsExec2 = buildArgsForGroup(group1, "--all-topics", "--from-file", file.getCanonicalPath,
"--dry-run")
     val consumerGroupCommandExec2 = getConsumerGroupService(cgcArgsExec2)
     val importedOffsets2 = consumerGroupCommandExec2.resetOffsets()
     assertEquals(Map(t1p0 -> 2L, t1p1 -> 2L), importedOffsets2(group1).mapValues(_.offset))


Mime
View raw message