kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ij...@apache.org
Subject [kafka] branch trunk updated: MINOR: Fix bugs identified by compiler warnings (#6258)
Date Fri, 15 Feb 2019 01:13:33 GMT
This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new a421dd2  MINOR: Fix bugs identified by compiler warnings (#6258)
a421dd2 is described below

commit a421dd2a26ca140f821cd5be1a4f716cf04beb43
Author: Ismael Juma <ismael@juma.me.uk>
AuthorDate: Thu Feb 14 17:13:20 2019 -0800

    MINOR: Fix bugs identified by compiler warnings (#6258)
    
    - Add missing string interpolation
    - Fix and simplify testElectPreferredLeaders
    - Remove unused code
    - Replace deprecated usage of JUnit `assertThat`
    - Change var to val and fix non-exhaustive pattern match
    - Fix eta warning
    - Simplify code
    - Remove commented out code
    
    Reviewers: Jun Rao <junrao@gmail.com>
---
 core/src/main/scala/kafka/admin/TopicCommand.scala |   3 +-
 .../kafka/coordinator/group/GroupMetadata.scala    |   9 +-
 .../kafka/server/DelayedElectPreferredLeader.scala |   4 -
 core/src/main/scala/kafka/server/KafkaApis.scala   |   2 +-
 core/src/main/scala/kafka/zk/KafkaZkClient.scala   |   3 +-
 .../kafka/api/AdminClientIntegrationTest.scala     | 113 +++++++--------------
 .../integration/kafka/api/ConsumerBounceTest.scala |   2 +-
 .../scala/kafka/tools/CustomDeserializerTest.scala |   4 +-
 ...PreferredReplicaLeaderElectionCommandTest.scala |   4 +-
 .../scala/unit/kafka/admin/TopicCommandTest.scala  |   3 +-
 .../admin/TopicCommandWithAdminClientTest.scala    |   4 +-
 .../scala/unit/kafka/cluster/PartitionTest.scala   |   2 +-
 .../coordinator/group/GroupMetadataTest.scala      |   1 -
 .../server/AbstractCreateTopicsRequestTest.scala   |   1 -
 .../kafka/server/ReplicaFetcherThreadTest.scala    |   4 +-
 15 files changed, 50 insertions(+), 109 deletions(-)

diff --git a/core/src/main/scala/kafka/admin/TopicCommand.scala b/core/src/main/scala/kafka/admin/TopicCommand.scala
index 2fcbe17..a4fa20f 100755
--- a/core/src/main/scala/kafka/admin/TopicCommand.scala
+++ b/core/src/main/scala/kafka/admin/TopicCommand.scala
@@ -559,7 +559,8 @@ object TopicCommand extends Logging {
 
     private val disableRackAware = parser.accepts("disable-rack-aware", "Disable rack aware
replica assignment")
 
-    private val forceOpt = parser.accepts("force", "Suppress console prompts")
+    // This is not currently used, but we keep it for compatibility
+    parser.accepts("force", "Suppress console prompts")
 
     private val excludeInternalTopicOpt = parser.accepts("exclude-internal", "exclude internal
topics when running list or describe command. The internal topics will be listed by default")
 
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala
index 3bc0117..c02a020 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala
@@ -232,13 +232,8 @@ private[group] class GroupMetadata(val groupId: String, initialState:
GroupState
         numMembersAwaitingJoin -= 1
     }
 
-    if (isLeader(memberId)) {
-      leaderId = if (members.isEmpty) {
-        None
-      } else {
-        Some(members.keys.head)
-      }
-    }
+    if (isLeader(memberId))
+      leaderId = members.keys.headOption
   }
 
   def isPendingMember(memberId: String): Boolean = pendingMembers.contains(memberId) &&
!has(memberId)
diff --git a/core/src/main/scala/kafka/server/DelayedElectPreferredLeader.scala b/core/src/main/scala/kafka/server/DelayedElectPreferredLeader.scala
index 38b07ad..f3543a8 100644
--- a/core/src/main/scala/kafka/server/DelayedElectPreferredLeader.scala
+++ b/core/src/main/scala/kafka/server/DelayedElectPreferredLeader.scala
@@ -55,10 +55,6 @@ class DelayedElectPreferredLeader(delayMs: Long,
     responseCallback(timedout ++ fullResults)
   }
 
-  private def timeoutWaiting = {
-    waitingPartitions.map(partition => partition -> new ApiError(Errors.REQUEST_TIMED_OUT,
null)).toMap
-  }
-
   /**
     * Try to complete the delayed operation by first checking if the operation
     * can be completed by now. If yes execute the completion logic by calling
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 75027f9..23225e8 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -1402,7 +1402,7 @@ class KafkaApis(val requestChannel: RequestChannel,
           setTopics(results)
         val responseBody = new CreateTopicsResponse(responseData)
         trace(s"Sending create topics response $responseData for correlation id " +
-          "${request.header.correlationId} to client ${request.header.clientId}.")
+          s"${request.header.correlationId} to client ${request.header.clientId}.")
         responseBody
       }
       sendResponseMaybeThrottle(request, createResponse)
diff --git a/core/src/main/scala/kafka/zk/KafkaZkClient.scala b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
index 09b2e33..6d8d504 100644
--- a/core/src/main/scala/kafka/zk/KafkaZkClient.scala
+++ b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
@@ -1751,7 +1751,7 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure:
Boo
 
     private def reCreate(): Stat = {
       val codeAfterDelete = delete()
-      var codeAfterReCreate = codeAfterDelete
+      val codeAfterReCreate = codeAfterDelete
       debug(s"Result of znode ephemeral deletion at $path is: $codeAfterDelete")
       if (codeAfterDelete == Code.OK || codeAfterDelete == Code.NONODE) {
         create()
@@ -1880,6 +1880,7 @@ object KafkaZkClient {
                   case _ => null
                 }
                 SetDataResponse(resultCode, setDataOp.path, ctx, stat, responseMetadata)
+              case zkOp => throw new IllegalStateException(s"Unexpected zkOp: $zkOp")
             }
           case null => throw KeeperException.create(resultCode)
           case _ => throw new IllegalStateException(s"Cannot unwrap $response because
the first zookeeper op is not check op in original MultiRequest")
diff --git a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
index 5a3278c..96de860 100644
--- a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
@@ -1274,17 +1274,11 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with
Logging {
     waitForLeaderToBecome(partition1, 1)
 
     // topic 2 unchanged
-    try {
-      electResult.partitionResult(partition2).get()
-      fail("topic 2 wasn't requested")
-    } catch {
-      case e: ExecutionException =>
-        val cause = e.getCause
-        assertTrue(cause.getClass.getName, cause.isInstanceOf[UnknownTopicOrPartitionException])
-        assertEquals("Preferred leader election for partition \"elect-preferred-leaders-topic-2-0\"
was not attempted",
-          cause.getMessage)
-        assertEquals(0, currentLeader(partition2))
-    }
+    var e = intercept[ExecutionException](electResult.partitionResult(partition2).get()).getCause
+    assertEquals(classOf[UnknownTopicOrPartitionException], e.getClass)
+    assertEquals("Preferred leader election for partition \"elect-preferred-leaders-topic-2-0\"
was not attempted",
+      e.getMessage)
+    assertEquals(0, currentLeader(partition2))
 
     // meaningful election with null partitions
     electResult = client.electPreferredLeaders(null)
@@ -1298,17 +1292,11 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with
Logging {
     val unknownPartition = new TopicPartition("topic-does-not-exist", 0)
     electResult = client.electPreferredLeaders(asList(unknownPartition))
     assertEquals(Set(unknownPartition).asJava, electResult.partitions.get)
-    try {
-      electResult.partitionResult(unknownPartition).get()
-    } catch {
-      case e: Exception =>
-        val cause = e.getCause
-        assertTrue(cause.isInstanceOf[UnknownTopicOrPartitionException])
-        assertEquals("The partition does not exist.",
-          cause.getMessage)
-        assertEquals(1, currentLeader(partition1))
-        assertEquals(1, currentLeader(partition2))
-    }
+    e = intercept[ExecutionException](electResult.partitionResult(unknownPartition).get()).getCause
+    assertEquals(classOf[UnknownTopicOrPartitionException], e.getClass)
+    assertEquals("The partition does not exist.", e.getMessage)
+    assertEquals(1, currentLeader(partition1))
+    assertEquals(1, currentLeader(partition2))
 
     // Now change the preferred leader to 2
     changePreferredLeader(prefer2)
@@ -1318,15 +1306,9 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with
Logging {
     assertEquals(Set(unknownPartition, partition1).asJava, electResult.partitions.get)
     waitForLeaderToBecome(partition1, 2)
     assertEquals(1, currentLeader(partition2))
-    try {
-      electResult.partitionResult(unknownPartition).get()
-    } catch {
-      case e: Exception =>
-        val cause = e.getCause
-        assertTrue(cause.isInstanceOf[UnknownTopicOrPartitionException])
-        assertEquals("The partition does not exist.",
-          cause.getMessage)
-    }
+    e = intercept[ExecutionException](electResult.partitionResult(unknownPartition).get()).getCause
+    assertEquals(classOf[UnknownTopicOrPartitionException], e.getClass)
+    assertEquals("The partition does not exist.", e.getMessage)
 
     // dupe partitions
     electResult = client.electPreferredLeaders(asList(partition2, partition2))
@@ -1338,63 +1320,36 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with
Logging {
     changePreferredLeader(prefer1)
     // but shut it down...
     servers(1).shutdown()
-    waitUntilTrue (
-      () => {
-        val description = client.describeTopics(Set (partition1.topic(), partition2.topic()).asJava).all().get()
-        return !description.asScala.flatMap{
-          case (topic, description) => description.partitions().asScala.map(
-            partition => partition.isr().asScala).flatten
-        }.exists(node => node.id == 1)
-      },
-      "Expect broker 1 to no longer be in any ISR"
-    )
+    waitUntilTrue (() => {
+      val description = client.describeTopics(Set(partition1.topic, partition2.topic).asJava).all().get()
+      val isr = description.asScala.values.flatMap(_.partitions.asScala.flatMap(_.isr.asScala))
+      !isr.exists(_.id == 1)
+    }, "Expect broker 1 to no longer be in any ISR")
 
     // ... now what happens if we try to elect the preferred leader and it's down?
     val shortTimeout = new ElectPreferredLeadersOptions().timeoutMs(10000)
     electResult = client.electPreferredLeaders(asList(partition1), shortTimeout)
     assertEquals(Set(partition1).asJava, electResult.partitions.get)
-    try {
-      electResult.partitionResult(partition1).get()
-      fail()
-    } catch {
-      case e: Exception =>
-        val cause = e.getCause
-        assertTrue(cause.getClass.getName, cause.isInstanceOf[LeaderNotAvailableException])
-        assertTrue(s"Wrong message ${cause.getMessage}", cause.getMessage.contains(
-          "Failed to elect leader for partition elect-preferred-leaders-topic-1-0 under strategy
PreferredReplicaPartitionLeaderElectionStrategy"))
-    }
+    e = intercept[ExecutionException](electResult.partitionResult(partition1).get()).getCause
+    assertEquals(classOf[PreferredLeaderNotAvailableException], e.getClass)
+    assertTrue(s"Wrong message ${e.getMessage}", e.getMessage.contains(
+      "Failed to elect leader for partition elect-preferred-leaders-topic-1-0 under strategy
PreferredReplicaPartitionLeaderElectionStrategy"))
     assertEquals(2, currentLeader(partition1))
 
     // preferred leader unavailable with null argument
     electResult = client.electPreferredLeaders(null, shortTimeout)
-    try {
-      electResult.partitions.get()
-      fail()
-    } catch {
-      case e: Exception =>
-        val cause = e.getCause
-        assertTrue(cause.getClass.getName, cause.isInstanceOf[LeaderNotAvailableException])
-    }
-    try {
-      electResult.partitionResult(partition1).get()
-      fail()
-    } catch {
-      case e: Exception =>
-        val cause = e.getCause
-        assertTrue(cause.getClass.getName, cause.isInstanceOf[LeaderNotAvailableException])
-        assertTrue(s"Wrong message ${cause.getMessage}", cause.getMessage.contains(
-          "Failed to elect leader for partition elect-preferred-leaders-topic-1-0 under strategy
PreferredReplicaPartitionLeaderElectionStrategy"))
-    }
-    try {
-      electResult.partitionResult(partition2).get()
-      fail()
-    } catch {
-      case e: Exception =>
-        val cause = e.getCause
-        assertTrue(cause.getClass.getName, cause.isInstanceOf[LeaderNotAvailableException])
-        assertTrue(s"Wrong message ${cause.getMessage}", cause.getMessage.contains(
-          "Failed to elect leader for partition elect-preferred-leaders-topic-2-0 under strategy
PreferredReplicaPartitionLeaderElectionStrategy"))
-    }
+    e = intercept[ExecutionException](electResult.partitions.get()).getCause
+    assertEquals(classOf[PreferredLeaderNotAvailableException], e.getClass)
+
+    e = intercept[ExecutionException](electResult.partitionResult(partition1).get()).getCause
+    assertEquals(classOf[PreferredLeaderNotAvailableException], e.getClass)
+    assertTrue(s"Wrong message ${e.getMessage}", e.getMessage.contains(
+      "Failed to elect leader for partition elect-preferred-leaders-topic-1-0 under strategy
PreferredReplicaPartitionLeaderElectionStrategy"))
+
+    e = intercept[ExecutionException](electResult.partitionResult(partition2).get()).getCause
+    assertEquals(classOf[PreferredLeaderNotAvailableException], e.getClass)
+    assertTrue(s"Wrong message ${e.getMessage}", e.getMessage.contains(
+      "Failed to elect leader for partition elect-preferred-leaders-topic-2-0 under strategy
PreferredReplicaPartitionLeaderElectionStrategy"))
 
     assertEquals(2, currentLeader(partition1))
     assertEquals(2, currentLeader(partition2))
diff --git a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
index a382154..e535104 100644
--- a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
@@ -418,7 +418,7 @@ class ConsumerBounceTest extends BaseRequestTest with Logging {
   /**
     * Creates N consumers with the same group ID and ensures the group rebalances properly
at each step
     */
-  private def createConsumersWithGroupId(groupId: String, consumerCount: Int, executor: ExecutorService,
topic: String = topic): ArrayBuffer[KafkaConsumer[Array[Byte], Array[Byte]]] = {
+  private def createConsumersWithGroupId(groupId: String, consumerCount: Int, executor: ExecutorService,
topic: String): ArrayBuffer[KafkaConsumer[Array[Byte], Array[Byte]]] = {
     val stableConsumers = ArrayBuffer[KafkaConsumer[Array[Byte], Array[Byte]]]()
     for (_ <- 1.to(consumerCount)) {
       val newConsumer = createConsumerWithGroupId(groupId)
diff --git a/core/src/test/scala/kafka/tools/CustomDeserializerTest.scala b/core/src/test/scala/kafka/tools/CustomDeserializerTest.scala
index 37b5b79..f94a900 100644
--- a/core/src/test/scala/kafka/tools/CustomDeserializerTest.scala
+++ b/core/src/test/scala/kafka/tools/CustomDeserializerTest.scala
@@ -22,8 +22,8 @@ import java.io.PrintStream
 import org.apache.kafka.clients.consumer.ConsumerRecord
 import org.apache.kafka.common.serialization.Deserializer
 import org.hamcrest.CoreMatchers
+import org.hamcrest.MatcherAssert._
 import org.junit.Test
-import org.junit.Assert.assertThat
 import org.scalatest.mockito.MockitoSugar
 
 class CustomDeserializer extends Deserializer[String] {
@@ -31,7 +31,7 @@ class CustomDeserializer extends Deserializer[String] {
   }
 
   override def deserialize(topic: String, data: Array[Byte]): String = {
-    assertThat("topic must not be null", topic, CoreMatchers.notNullValue())
+    assertThat("topic must not be null", topic, CoreMatchers.notNullValue)
     new String(data)
   }
 
diff --git a/core/src/test/scala/unit/kafka/admin/PreferredReplicaLeaderElectionCommandTest.scala
b/core/src/test/scala/unit/kafka/admin/PreferredReplicaLeaderElectionCommandTest.scala
index 824e8fb..96e7dac 100644
--- a/core/src/test/scala/unit/kafka/admin/PreferredReplicaLeaderElectionCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/PreferredReplicaLeaderElectionCommandTest.scala
@@ -21,7 +21,6 @@ import java.nio.charset.StandardCharsets
 import java.nio.file.{Files, Paths}
 import java.util.Properties
 
-import kafka.admin.PreferredReplicaLeaderElectionCommand
 import kafka.common.{AdminCommandFailedException, TopicAndPartition}
 import kafka.network.RequestChannel
 import kafka.security.auth._
@@ -34,8 +33,7 @@ import org.apache.kafka.common.network.ListenerName
 import org.junit.Assert._
 import org.junit.{After, Test}
 
-class PreferredReplicaLeaderElectionCommandTest extends ZooKeeperTestHarness with Logging
/*with RackAwareTest*/ {
-
+class PreferredReplicaLeaderElectionCommandTest extends ZooKeeperTestHarness with Logging
{
   var servers: Seq[KafkaServer] = Seq()
 
   @After
diff --git a/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala b/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala
index eeb9101..008fd66 100644
--- a/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala
@@ -172,7 +172,6 @@ class TopicCommandTest extends ZooKeeperTestHarness with Logging with
RackAwareT
     val brokers = List(0)
     TestUtils.createBrokersInZk(zkClient, brokers)
 
-    val topic = "testTopic"
     topicService.createTopic(new TopicCommandOptions(
       Array("--partitions", "1", "--replication-factor", "1", "--topic", testTopicName)))
 
@@ -570,4 +569,4 @@ class TopicCommandTest extends ZooKeeperTestHarness with Logging with
RackAwareT
     assertFalse(TestUtils.grabConsoleOutput(topicService.deleteTopic(escapedCommandOpts)).contains(topic2))
     assertTrue(TestUtils.grabConsoleOutput(topicService.deleteTopic(unescapedCommandOpts)).contains(topic2))
   }
-}
\ No newline at end of file
+}
diff --git a/core/src/test/scala/unit/kafka/admin/TopicCommandWithAdminClientTest.scala b/core/src/test/scala/unit/kafka/admin/TopicCommandWithAdminClientTest.scala
index 238afa3..e0597de 100644
--- a/core/src/test/scala/unit/kafka/admin/TopicCommandWithAdminClientTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/TopicCommandWithAdminClientTest.scala
@@ -71,7 +71,7 @@ class TopicCommandWithAdminClientTest extends KafkaServerTestHarness with
Loggin
   }
 
   def assertCheckArgsExitCode(expected: Int, options: TopicCommandOptions) {
-    assertExitCode(expected, options.checkArgs)
+    assertExitCode(expected, options.checkArgs _)
   }
 
   def createAndWaitTopic(opts: TopicCommandOptions): Unit = {
@@ -635,4 +635,4 @@ class TopicCommandWithAdminClientTest extends KafkaServerTestHarness with
Loggin
     assertTrue(output.contains(testTopicName))
     assertFalse(output.contains(Topic.GROUP_METADATA_TOPIC_NAME))
   }
-}
\ No newline at end of file
+}
diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
index c58532d..615b21b 100644
--- a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
+++ b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
@@ -558,7 +558,7 @@ class PartitionTest {
       case Right(Some(offsetAndTimestamp)) => fail("Should have failed")
       case Right(None) => fail("Should have failed")
       case Left(e: OffsetNotAvailableException) => // ok
-      case Left(e: ApiException) => fail("Should have seen OffsetNotAvailableException,
saw $e")
+      case Left(e: ApiException) => fail(s"Should have seen OffsetNotAvailableException,
saw $e")
     }
 
 
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataTest.scala
index 9281ea9..3108b15 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataTest.scala
@@ -17,7 +17,6 @@
 
 package kafka.coordinator.group
 
-import kafka.admin.ConsumerGroupCommand.GroupState
 import kafka.common.OffsetAndMetadata
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.utils.Time
diff --git a/core/src/test/scala/unit/kafka/server/AbstractCreateTopicsRequestTest.scala b/core/src/test/scala/unit/kafka/server/AbstractCreateTopicsRequestTest.scala
index bef894a..ec520f9 100644
--- a/core/src/test/scala/unit/kafka/server/AbstractCreateTopicsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AbstractCreateTopicsRequestTest.scala
@@ -29,7 +29,6 @@ import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.requests.{ApiError, CreateTopicsRequest, CreateTopicsResponse,
MetadataRequest, MetadataResponse}
 import org.junit.Assert.{assertEquals, assertFalse, assertNotNull, assertTrue}
 
-import scala.collection.JavaConverters
 import scala.collection.JavaConverters._
 
 class AbstractCreateTopicsRequestTest extends BaseRequestTest {
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
index 04e0218..e6df8e8 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
@@ -24,7 +24,6 @@ import kafka.cluster.Partition
 import kafka.server.QuotaFactory.UnboundedQuota
 import kafka.server.epoch.util.ReplicaFetcherMockBlockingSend
 import kafka.utils.TestUtils
-import org.apache.kafka.clients.ClientResponse
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
@@ -33,7 +32,7 @@ import org.apache.kafka.common.requests.{EpochEndOffset, OffsetsForLeaderEpochRe
 import org.apache.kafka.common.requests.EpochEndOffset._
 import org.apache.kafka.common.utils.SystemTime
 import org.easymock.EasyMock._
-import org.easymock.{Capture, CaptureType, IAnswer}
+import org.easymock.{Capture, CaptureType}
 import org.junit.Assert._
 import org.junit.Test
 
@@ -46,7 +45,6 @@ class ReplicaFetcherThreadTest {
   private val t1p1 = new TopicPartition("topic1", 1)
   private val t2p1 = new TopicPartition("topic2", 1)
 
-  private var toFail = false
   private val brokerEndPoint = new BrokerEndPoint(0, "localhost", 1000)
 
   private def offsetAndEpoch(fetchOffset: Long, leaderEpoch: Int = 1): OffsetAndEpoch = {


Mime
View raw message