kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject kafka git commit: KAFKA-5970; Use ReentrantLock for delayed operation lock to avoid blocking
Date Wed, 04 Oct 2017 18:35:48 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk d985513b2 -> f8621b417


KAFKA-5970; Use ReentrantLock for delayed operation lock to avoid blocking

Author: Rajini Sivaram <rajinisivaram@googlemail.com>

Reviewers: Guozhang Wang <wangguoz@gmail.com>, Ismael Juma <ismael@juma.me.uk>, Jason Gustafson <jason@confluent.io>

Closes #3956 from rajinisivaram/KAFKA-5970-delayedproduce-deadlock


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/f8621b41
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/f8621b41
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/f8621b41

Branch: refs/heads/trunk
Commit: f8621b4174ddb14f9e8377da34e81e1b7ddd205f
Parents: d985513
Author: Rajini Sivaram <rajinisivaram@googlemail.com>
Authored: Wed Oct 4 11:35:37 2017 -0700
Committer: Jason Gustafson <jason@confluent.io>
Committed: Wed Oct 4 11:35:37 2017 -0700

----------------------------------------------------------------------
 .../coordinator/group/DelayedHeartbeat.scala    |  4 -
 .../kafka/coordinator/group/DelayedJoin.scala   |  4 -
 .../group/GroupMetadataManager.scala            |  3 +-
 .../transaction/DelayedTxnMarker.scala          |  4 -
 .../transaction/TransactionStateManager.scala   |  6 +-
 .../scala/kafka/server/DelayedOperation.scala   | 29 ++++---
 .../scala/kafka/server/DelayedProduce.scala     | 11 +--
 .../scala/kafka/server/ReplicaManager.scala     |  3 +-
 .../group/GroupCoordinatorTest.scala            |  3 -
 .../group/GroupMetadataManagerTest.scala        |  2 -
 .../TransactionStateManagerTest.scala           |  2 -
 .../kafka/server/DelayedOperationTest.scala     | 81 +++++++++++++++++++-
 .../scala/unit/kafka/server/KafkaApisTest.scala |  3 -
 13 files changed, 105 insertions(+), 50 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/f8621b41/core/src/main/scala/kafka/coordinator/group/DelayedHeartbeat.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/group/DelayedHeartbeat.scala b/core/src/main/scala/kafka/coordinator/group/DelayedHeartbeat.scala
index 2cbdf30..73d5d0f 100644
--- a/core/src/main/scala/kafka/coordinator/group/DelayedHeartbeat.scala
+++ b/core/src/main/scala/kafka/coordinator/group/DelayedHeartbeat.scala
@@ -30,10 +30,6 @@ private[group] class DelayedHeartbeat(coordinator: GroupCoordinator,
                                       sessionTimeout: Long)
   extends DelayedOperation(sessionTimeout) {
 
-  // overridden since tryComplete already synchronizes on the group. This makes it safe to
-  // call purgatory operations while holding the group lock.
-  override def safeTryComplete(): Boolean = tryComplete()
-
   override def tryComplete(): Boolean = coordinator.tryCompleteHeartbeat(group, member, heartbeatDeadline, forceComplete _)
   override def onExpiration() = coordinator.onExpireHeartbeat(group, member, heartbeatDeadline)
   override def onComplete() = coordinator.onCompleteHeartbeat()

http://git-wip-us.apache.org/repos/asf/kafka/blob/f8621b41/core/src/main/scala/kafka/coordinator/group/DelayedJoin.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/group/DelayedJoin.scala b/core/src/main/scala/kafka/coordinator/group/DelayedJoin.scala
index 6a81242..5232287 100644
--- a/core/src/main/scala/kafka/coordinator/group/DelayedJoin.scala
+++ b/core/src/main/scala/kafka/coordinator/group/DelayedJoin.scala
@@ -35,10 +35,6 @@ private[group] class DelayedJoin(coordinator: GroupCoordinator,
                                  group: GroupMetadata,
                                  rebalanceTimeout: Long) extends DelayedOperation(rebalanceTimeout) {
 
-  // overridden since tryComplete already synchronizes on the group. This makes it safe to
-  // call purgatory operations while holding the group lock.
-  override def safeTryComplete(): Boolean = tryComplete()
-
   override def tryComplete(): Boolean = coordinator.tryCompleteJoin(group, forceComplete _)
   override def onExpiration() = coordinator.onExpireJoin()
   override def onComplete() = coordinator.onCompleteJoin(group)

http://git-wip-us.apache.org/repos/asf/kafka/blob/f8621b41/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
index c818b57..7519dc4 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
@@ -243,8 +243,7 @@ class GroupMetadataManager(brokerId: Int,
       internalTopicsAllowed = true,
       isFromClient = false,
       entriesPerPartition = records,
-      responseCallback = callback,
-      delayedProduceLock = Some(group))
+      responseCallback = callback)
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/f8621b41/core/src/main/scala/kafka/coordinator/transaction/DelayedTxnMarker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/transaction/DelayedTxnMarker.scala b/core/src/main/scala/kafka/coordinator/transaction/DelayedTxnMarker.scala
index 82c4a8c..bc0f1b7 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/DelayedTxnMarker.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/DelayedTxnMarker.scala
@@ -28,10 +28,6 @@ private[transaction] class DelayedTxnMarker(txnMetadata: TransactionMetadata,
                                            completionCallback: Errors => Unit)
   extends DelayedOperation(TimeUnit.DAYS.toMillis(100 * 365)) {
 
-  // overridden since tryComplete already synchronizes on the existing txn metadata. This makes it safe to
-  // call purgatory operations while holding the group lock.
-  override def safeTryComplete(): Boolean = tryComplete()
-
   override def tryComplete(): Boolean = {
     txnMetadata synchronized {
       if (txnMetadata.topicPartitions.isEmpty)

http://git-wip-us.apache.org/repos/asf/kafka/blob/f8621b41/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
index 394817c..ad5d33b 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
@@ -196,8 +196,7 @@ class TransactionStateManager(brokerId: Int,
           internalTopicsAllowed = true,
           isFromClient = false,
           recordsPerPartition,
-          removeFromCacheCallback,
-          None
+          removeFromCacheCallback
         )
       }
 
@@ -601,8 +600,7 @@ class TransactionStateManager(brokerId: Int,
                 internalTopicsAllowed = true,
                 isFromClient = false,
                 recordsPerPartition,
-                updateCacheCallback,
-                delayedProduceLock = Some(newMetadata))
+                updateCacheCallback)
 
               trace(s"Appending new metadata $newMetadata for transaction id $transactionalId with coordinator epoch $coordinatorEpoch to the local transaction log")
             }

http://git-wip-us.apache.org/repos/asf/kafka/blob/f8621b41/core/src/main/scala/kafka/server/DelayedOperation.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/DelayedOperation.scala b/core/src/main/scala/kafka/server/DelayedOperation.scala
index 8997395..86bf1ff 100644
--- a/core/src/main/scala/kafka/server/DelayedOperation.scala
+++ b/core/src/main/scala/kafka/server/DelayedOperation.scala
@@ -19,7 +19,7 @@ package kafka.server
 
 import java.util.concurrent._
 import java.util.concurrent.atomic._
-import java.util.concurrent.locks.ReentrantReadWriteLock
+import java.util.concurrent.locks.{ReentrantLock, ReentrantReadWriteLock}
 
 import com.yammer.metrics.core.Gauge
 import kafka.metrics.KafkaMetricsGroup
@@ -46,6 +46,8 @@ import scala.collection.mutable.ListBuffer
 abstract class DelayedOperation(override val delayMs: Long) extends TimerTask with Logging {
 
   private val completed = new AtomicBoolean(false)
+  // Visible for testing
+  private[server] val lock: ReentrantLock = new ReentrantLock
 
   /*
    * Force completing the delayed operation, if not already completed.
@@ -96,13 +98,18 @@ abstract class DelayedOperation(override val delayMs: Long) extends TimerTask with Logging {
   def tryComplete(): Boolean
 
   /**
-   * Thread-safe variant of tryComplete(). This can be overridden if the operation provides its
-   * own synchronization.
+   * Thread-safe variant of tryComplete() that attempts completion only if the lock can be acquired
+   * without blocking.
    */
-  def safeTryComplete(): Boolean = {
-    synchronized {
-      tryComplete()
-    }
+  private[server] def maybeTryComplete(): Boolean = {
+    if (lock.tryLock()) {
+      try {
+        tryComplete()
+      } finally {
+        lock.unlock()
+      }
+    } else
+      false
   }
 
   /*
@@ -196,7 +203,9 @@ final class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: String,
     // operation is unnecessarily added for watch. However, this is a less severe issue since the
     // expire reaper will clean it up periodically.
 
-    var isCompletedByMe = operation.safeTryComplete()
+    // At this point the only thread that can attempt this operation is this current thread
+    // Hence it is safe to tryComplete() without a lock
+    var isCompletedByMe = operation.tryComplete()
     if (isCompletedByMe)
       return true
 
@@ -213,7 +222,7 @@ final class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: String,
       }
     }
 
-    isCompletedByMe = operation.safeTryComplete()
+    isCompletedByMe = operation.maybeTryComplete()
     if (isCompletedByMe)
       return true
 
@@ -335,7 +344,7 @@ final class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: String,
         if (curr.isCompleted) {
           // another thread has completed this operation, just remove it
           iter.remove()
-        } else if (curr.safeTryComplete()) {
+        } else if (curr.maybeTryComplete()) {
           iter.remove()
           completed += 1
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/f8621b41/core/src/main/scala/kafka/server/DelayedProduce.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/DelayedProduce.scala b/core/src/main/scala/kafka/server/DelayedProduce.scala
index 0d452cc..ebbd9ee 100644
--- a/core/src/main/scala/kafka/server/DelayedProduce.scala
+++ b/core/src/main/scala/kafka/server/DelayedProduce.scala
@@ -23,6 +23,7 @@ import java.util.concurrent.TimeUnit
 import com.yammer.metrics.core.Meter
 import kafka.metrics.KafkaMetricsGroup
 import kafka.utils.Pool
+
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
@@ -53,12 +54,9 @@ case class ProduceMetadata(produceRequiredAcks: Short,
 class DelayedProduce(delayMs: Long,
                      produceMetadata: ProduceMetadata,
                      replicaManager: ReplicaManager,
-                     responseCallback: Map[TopicPartition, PartitionResponse] => Unit,
-                     lockOpt: Option[Object] = None)
+                     responseCallback: Map[TopicPartition, PartitionResponse] => Unit)
   extends DelayedOperation(delayMs) {
 
-  val lock = lockOpt.getOrElse(this)
-
   // first update the acks pending variable according to the error code
   produceMetadata.produceStatus.foreach { case (topicPartition, status) =>
     if (status.responseStatus.error == Errors.NONE) {
@@ -72,11 +70,6 @@ class DelayedProduce(delayMs: Long,
     trace("Initial partition status for %s is %s".format(topicPartition, status))
   }
 
-  override def safeTryComplete(): Boolean = lock synchronized {
-    tryComplete()
-  }
-
-
   /**
    * The delayed produce operation can be completed if every partition
    * it produces to is satisfied by one of the following:

http://git-wip-us.apache.org/repos/asf/kafka/blob/f8621b41/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 3a4ecef..98a4be1 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -450,7 +450,6 @@ class ReplicaManager(val config: KafkaConfig,
                     isFromClient: Boolean,
                     entriesPerPartition: Map[TopicPartition, MemoryRecords],
                     responseCallback: Map[TopicPartition, PartitionResponse] => Unit,
-                    delayedProduceLock: Option[Object] = None,
                     processingStatsCallback: Map[TopicPartition, RecordsProcessingStats] => Unit = _ => ()) {
     if (isValidRequiredAcks(requiredAcks)) {
       val sTime = time.milliseconds
@@ -470,7 +469,7 @@ class ReplicaManager(val config: KafkaConfig,
       if (delayedProduceRequestRequired(requiredAcks, entriesPerPartition, localProduceResults)) {
         // create delayed produce operation
         val produceMetadata = ProduceMetadata(requiredAcks, produceStatus)
-        val delayedProduce = new DelayedProduce(timeout, produceMetadata, this, responseCallback, delayedProduceLock)
+        val delayedProduce = new DelayedProduce(timeout, produceMetadata, this, responseCallback)
 
         // create a list of (topic, partition) pairs to use as keys for this delayed produce operation
         val producerRequestKeys = entriesPerPartition.keys.map(new TopicPartitionOperationKey(_)).toSeq

http://git-wip-us.apache.org/repos/asf/kafka/blob/f8621b41/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
index 85d72c3..3fed45d 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
@@ -1367,7 +1367,6 @@ class GroupCoordinatorTest extends JUnitSuite {
       isFromClient = EasyMock.eq(false),
       EasyMock.anyObject().asInstanceOf[Map[TopicPartition, MemoryRecords]],
       EasyMock.capture(capturedArgument),
-      EasyMock.anyObject().asInstanceOf[Option[Object]],
       EasyMock.anyObject())).andAnswer(new IAnswer[Unit] {
       override def answer = capturedArgument.getValue.apply(
         Map(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId) ->
@@ -1451,7 +1450,6 @@ class GroupCoordinatorTest extends JUnitSuite {
       isFromClient = EasyMock.eq(false),
       EasyMock.anyObject().asInstanceOf[Map[TopicPartition, MemoryRecords]],
       EasyMock.capture(capturedArgument),
-      EasyMock.anyObject().asInstanceOf[Option[Object]],
       EasyMock.anyObject())
     ).andAnswer(new IAnswer[Unit] {
       override def answer = capturedArgument.getValue.apply(
@@ -1481,7 +1479,6 @@ class GroupCoordinatorTest extends JUnitSuite {
       isFromClient = EasyMock.eq(false),
       EasyMock.anyObject().asInstanceOf[Map[TopicPartition, MemoryRecords]],
       EasyMock.capture(capturedArgument),
-      EasyMock.anyObject().asInstanceOf[Option[Object]],
       EasyMock.anyObject())
     ).andAnswer(new IAnswer[Unit] {
       override def answer = capturedArgument.getValue.apply(

http://git-wip-us.apache.org/repos/asf/kafka/blob/f8621b41/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
index 4a509ed..46a1878 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
@@ -1306,7 +1306,6 @@ class GroupMetadataManagerTest {
       isFromClient = EasyMock.eq(false),
       EasyMock.anyObject().asInstanceOf[Map[TopicPartition, MemoryRecords]],
       EasyMock.capture(capturedArgument),
-      EasyMock.anyObject().asInstanceOf[Option[Object]],
       EasyMock.anyObject())
     )
     EasyMock.expect(replicaManager.getMagic(EasyMock.anyObject())).andStubReturn(Some(RecordBatch.CURRENT_MAGIC_VALUE))
@@ -1321,7 +1320,6 @@ class GroupMetadataManagerTest {
       isFromClient = EasyMock.eq(false),
       EasyMock.anyObject().asInstanceOf[Map[TopicPartition, MemoryRecords]],
       EasyMock.capture(capturedArgument),
-      EasyMock.anyObject().asInstanceOf[Option[Object]],
       EasyMock.anyObject())
     ).andAnswer(new IAnswer[Unit] {
       override def answer = capturedArgument.getValue.apply(

http://git-wip-us.apache.org/repos/asf/kafka/blob/f8621b41/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
index ed1636c..0a2b641 100644
--- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
@@ -498,7 +498,6 @@ class TransactionStateManagerTest {
           EasyMock.eq(false),
           EasyMock.eq(recordsByPartition),
           EasyMock.capture(capturedArgument),
-          EasyMock.eq(None),
           EasyMock.anyObject()
         )).andAnswer(new IAnswer[Unit] {
           override def answer(): Unit = {
@@ -599,7 +598,6 @@ class TransactionStateManagerTest {
       isFromClient = EasyMock.eq(false),
       EasyMock.anyObject().asInstanceOf[Map[TopicPartition, MemoryRecords]],
       EasyMock.capture(capturedArgument),
-      EasyMock.anyObject(),
       EasyMock.anyObject())
     ).andAnswer(new IAnswer[Unit] {
         override def answer(): Unit = capturedArgument.getValue.apply(

http://git-wip-us.apache.org/repos/asf/kafka/blob/f8621b41/core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala b/core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala
index 82cf642..fdfb582 100644
--- a/core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala
@@ -17,6 +17,11 @@
 
 package kafka.server
 
+import java.util.concurrent.{Executors, Future}
+import java.util.concurrent.locks.ReentrantLock
+
+import kafka.utils.CoreUtils.inLock
+
 import org.apache.kafka.common.utils.Time
 import org.junit.{After, Before, Test}
 import org.junit.Assert._
@@ -117,9 +122,83 @@ class DelayedOperationTest {
     assertEquals(Nil, cancelledOperations)
   }
 
+  @Test
+  def testDelayedOperationLock() {
+    val key = "key"
+    val executorService = Executors.newSingleThreadExecutor
+    try {
+      def createDelayedOperations(count: Int): Seq[MockDelayedOperation] = {
+        (1 to count).map { _ =>
+          val op = new MockDelayedOperation(100000L)
+          purgatory.tryCompleteElseWatch(op, Seq(key))
+          assertFalse("Not completable", op.isCompleted)
+          op
+        }
+      }
+
+      def createCompletableOperations(count: Int): Seq[MockDelayedOperation] = {
+        (1 to count).map { _ =>
+          val op = new MockDelayedOperation(100000L)
+          op.completable = true
+          op
+        }
+      }
+
+      def runOnAnotherThread(fun: => Unit, shouldComplete: Boolean): Future[_] = {
+        val future = executorService.submit(new Runnable {
+          def run() = fun
+        })
+        if (shouldComplete)
+          future.get()
+        else
+          assertFalse("Should not have completed", future.isDone)
+        future
+      }
+
+      def checkAndComplete(completableOps: Seq[MockDelayedOperation], expectedComplete: Seq[MockDelayedOperation]): Unit = {
+        completableOps.foreach(op => op.completable = true)
+        val completed = purgatory.checkAndComplete(key)
+        assertEquals(expectedComplete.size, completed)
+        expectedComplete.foreach(op => assertTrue("Should have completed", op.isCompleted))
+        val expectedNotComplete = completableOps.toSet -- expectedComplete
+        expectedNotComplete.foreach(op => assertFalse("Should not have completed", op.isCompleted))
+      }
+
+      // If locks are free all completable operations should complete
+      var ops = createDelayedOperations(2)
+      checkAndComplete(ops, ops)
+
+      // Lock held by current thread, completable operations should complete
+      ops = createDelayedOperations(2)
+      inLock(ops(1).lock) {
+        checkAndComplete(ops, ops)
+      }
+
+      // Lock held by another thread, should not block, only operations that can be
+      // locked without blocking on the current thread should complete
+      ops = createDelayedOperations(2)
+      runOnAnotherThread(ops(0).lock.lock(), true)
+      try {
+        checkAndComplete(ops, Seq(ops(1)))
+      } finally {
+        runOnAnotherThread(ops(0).lock.unlock(), true)
+      }
+
+      // Immediately completable operations should complete without locking
+      ops = createCompletableOperations(2)
+      ops.foreach { op =>
+        assertTrue("Should have completed", purgatory.tryCompleteElseWatch(op, Seq(key)))
+        assertTrue("Should have completed", op.isCompleted)
+      }
+
+    } finally {
+      executorService.shutdown()
+    }
+  }
 
 
-  class MockDelayedOperation(delayMs: Long) extends DelayedOperation(delayMs) {
+  class MockDelayedOperation(delayMs: Long)
+    extends DelayedOperation(delayMs) {
     var completable = false
 
     def awaitExpiration() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/f8621b41/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index 508bc35..76ae35b 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -180,7 +180,6 @@ class KafkaApisTest {
       EasyMock.eq(false),
       EasyMock.anyObject(),
       EasyMock.capture(responseCallback),
-      EasyMock.anyObject(),
       EasyMock.anyObject())).andAnswer(new IAnswer[Unit] {
       override def answer(): Unit = {
         responseCallback.getValue.apply(Map(tp2 -> new PartitionResponse(Errors.NONE)))
@@ -219,7 +218,6 @@ class KafkaApisTest {
       EasyMock.eq(false),
       EasyMock.anyObject(),
       EasyMock.capture(responseCallback),
-      EasyMock.anyObject(),
       EasyMock.anyObject())).andAnswer(new IAnswer[Unit] {
       override def answer(): Unit = {
         responseCallback.getValue.apply(Map(tp2 -> new PartitionResponse(Errors.NONE)))
@@ -250,7 +248,6 @@ class KafkaApisTest {
       EasyMock.eq(false),
       EasyMock.anyObject(),
       EasyMock.anyObject(),
-      EasyMock.anyObject(),
       EasyMock.anyObject()))
 
     EasyMock.replay(replicaManager)


Mime
View raw message