kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jjko...@apache.org
Subject kafka git commit: KAFKA-2960 KAFKA-1148; Clear purgatory for partitions before becoming follower
Date Fri, 11 Mar 2016 19:22:28 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 764d8ca9e -> 02d4da5f6


KAFKA-2960 KAFKA-1148; Clear purgatory for partitions before becoming follower

Author: Jiangjie Qin <becket.qin@gmail.com>

Reviewers: Aditya Auradkar <aauradkar@linkedin.com>, Ismael Juma <ismael@juma.me.uk>,
Joel Koshy <jjkoshy.w@gmail.com>, Jun Rao <junrao@gmail.com>, Guozhang Wang <wangguoz@gmail.com>

Closes #1018 from becketqin/KAFKA-2960


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/02d4da5f
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/02d4da5f
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/02d4da5f

Branch: refs/heads/trunk
Commit: 02d4da5f64989b41358cdfd94d95b94fb4e20198
Parents: 764d8ca
Author: Jiangjie Qin <becket.qin@gmail.com>
Authored: Fri Mar 11 11:22:15 2016 -0800
Committer: Joel Koshy <jjkoshy@gmail.com>
Committed: Fri Mar 11 11:22:15 2016 -0800

----------------------------------------------------------------------
 .../scala/kafka/server/ReplicaManager.scala     |   8 +-
 .../unit/kafka/server/ReplicaManagerTest.scala  | 109 ++++++++++++++++---
 2 files changed, 100 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/02d4da5f/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 5655313..de58e56 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -737,7 +737,8 @@ class ReplicaManager(val config: KafkaConfig,
    * 2. Mark the replicas as followers so that no more data can be added from the producer clients.
    * 3. Stop fetchers for these partitions so that no more data can be added by the replica fetcher threads.
    * 4. Truncate the log and checkpoint offsets for these partitions.
-   * 5. If the broker is not shutting down, add the fetcher to the new leaders.
+   * 5. Clear the produce and fetch requests in the purgatory
+   * 6. If the broker is not shutting down, add the fetcher to the new leaders.
    *
    * The ordering of doing these steps make sure that the replicas in transition will not
    * take any more messages before checkpointing offsets so that all messages before the checkpoint
@@ -800,6 +801,11 @@ class ReplicaManager(val config: KafkaConfig,
       }
 
       logManager.truncateTo(partitionsToMakeFollower.map(partition => (new TopicAndPartition(partition), partition.getOrCreateReplica().highWatermark.messageOffset)).toMap)
+      partitionsToMakeFollower.foreach { partition =>
+        val topicPartitionOperationKey = new TopicPartitionOperationKey(partition.topic, partition.partitionId)
+        tryCompleteDelayedProduce(topicPartitionOperationKey)
+        tryCompleteDelayedFetch(topicPartitionOperationKey)
+      }
 
       partitionsToMakeFollower.foreach { partition =>
         stateChangeLogger.trace(("Broker %d truncated logs and checkpointed recovery boundaries for partition [%s,%d] as part of " +

http://git-wip-us.apache.org/repos/asf/kafka/blob/02d4da5f/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 32085f6..a5a8df1 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -18,25 +18,28 @@
 package kafka.server
 
 
-import kafka.api.SerializationTestUtils
-import kafka.message.{Message, ByteBufferMessageSet}
-import kafka.utils.{ZkUtils, MockScheduler, MockTime, TestUtils}
-import org.apache.kafka.common.requests.ProduceRequest
-
-import java.util.concurrent.atomic.AtomicBoolean
 import java.io.File
+import java.util.concurrent.atomic.AtomicBoolean
 
+import kafka.api.{FetchResponsePartitionData, PartitionFetchInfo}
+import kafka.cluster.Broker
+import kafka.common.TopicAndPartition
+import kafka.message.{ByteBufferMessageSet, Message}
+import kafka.utils.{MockScheduler, MockTime, TestUtils, ZkUtils}
+import org.I0Itec.zkclient.ZkClient
 import org.apache.kafka.common.metrics.Metrics
-import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.requests.LeaderAndIsrRequest
+import org.apache.kafka.common.requests.LeaderAndIsrRequest.PartitionState
 import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
 import org.apache.kafka.common.utils.{MockTime => JMockTime}
+import org.apache.kafka.common.{BrokerEndPoint, TopicPartition}
 import org.easymock.EasyMock
-import org.I0Itec.zkclient.ZkClient
+import org.junit.Assert.{assertEquals, assertTrue}
 import org.junit.Test
 
-import scala.collection.Map
 import scala.collection.JavaConverters._
+import scala.collection.Map
 
 class ReplicaManagerTest {
 
@@ -47,9 +50,9 @@ class ReplicaManagerTest {
     val props = TestUtils.createBrokerConfig(1, TestUtils.MockZkConnect)
     val config = KafkaConfig.fromProps(props)
     val zkClient = EasyMock.createMock(classOf[ZkClient])
-    val zkUtils = ZkUtils(zkClient, false)
+    val zkUtils = ZkUtils(zkClient, isZkSecurityEnabled = false)
     val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)).toArray)
-    val time: MockTime = new MockTime()
+    val time = new MockTime()
     val jTime = new JMockTime
     val metrics = new Metrics
     val rm = new ReplicaManager(config, metrics, time, jTime, zkUtils, new MockScheduler(time), mockLogMgr,
@@ -73,7 +76,7 @@ class ReplicaManagerTest {
     val zkClient = EasyMock.createMock(classOf[ZkClient])
     val zkUtils = ZkUtils(zkClient, false)
     val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)).toArray)
-    val time: MockTime = new MockTime()
+    val time = new MockTime()
     val jTime = new JMockTime
     val metrics = new Metrics
     val rm = new ReplicaManager(config, metrics, time, jTime, zkUtils, new MockScheduler(time), mockLogMgr,
@@ -84,7 +87,7 @@ class ReplicaManagerTest {
       rm.checkpointHighWatermarks()
     } finally {
       // shutdown the replica manager upon test completion
-      rm.shutdown(false)
+      rm.shutdown(checkpointHW = false)
       metrics.close()
     }
   }
@@ -94,9 +97,9 @@ class ReplicaManagerTest {
     val props = TestUtils.createBrokerConfig(1, TestUtils.MockZkConnect)
     val config = KafkaConfig.fromProps(props)
     val zkClient = EasyMock.createMock(classOf[ZkClient])
-    val zkUtils = ZkUtils(zkClient, false)
+    val zkUtils = ZkUtils(zkClient, isZkSecurityEnabled = false)
     val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)).toArray)
-    val time: MockTime = new MockTime()
+    val time = new MockTime()
     val jTime = new JMockTime
     val metrics = new Metrics
     val rm = new ReplicaManager(config, metrics, time, jTime, zkUtils, new MockScheduler(time), mockLogMgr,
@@ -112,10 +115,84 @@ class ReplicaManagerTest {
         messagesPerPartition = Map(new TopicPartition("test1", 0) -> new ByteBufferMessageSet(new Message("first message".getBytes))),
         responseCallback = callback)
     } finally {
-      rm.shutdown(false)
+      rm.shutdown(checkpointHW = false)
       metrics.close()
     }
 
     TestUtils.verifyNonDaemonThreadsStatus(this.getClass.getName)
   }
+
+  @Test
+  def testClearPurgatoryOnBecomingFollower() {
+    val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect)
+    props.put("log.dir", TestUtils.tempRelativeDir("data").getAbsolutePath)
+    val config = KafkaConfig.fromProps(props)
+    val zkClient = EasyMock.createMock(classOf[ZkClient])
+    val zkUtils = ZkUtils(zkClient, isZkSecurityEnabled = false)
+    val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)).toArray)
+    val time = new MockTime()
+    val jTime = new JMockTime
+    val metrics = new Metrics
+    val rm = new ReplicaManager(config, metrics, time, jTime, zkUtils, new MockScheduler(time), mockLogMgr,
+      new AtomicBoolean(false))
+
+    try {
+      var produceCallbackFired = false
+      def produceCallback(responseStatus: Map[TopicPartition, PartitionResponse]) = {
+        assertEquals("Should give NotLeaderForPartitionException", Errors.NOT_LEADER_FOR_PARTITION.code, responseStatus.values.head.errorCode)
+        produceCallbackFired = true
+      }
+
+      var fetchCallbackFired = false
+      def fetchCallback(responseStatus: Map[TopicAndPartition, FetchResponsePartitionData]) = {
+        assertEquals("Should give NotLeaderForPartitionException", Errors.NOT_LEADER_FOR_PARTITION.code, responseStatus.values.head.error)
+        fetchCallbackFired = true
+      }
+
+      val aliveBrokers = Seq(new Broker(0, "host0", 0), new Broker(1, "host1", 1))
+      val metadataCache = EasyMock.createMock(classOf[MetadataCache])
+      EasyMock.expect(metadataCache.getAliveBrokers).andReturn(aliveBrokers).anyTimes()
+      EasyMock.replay(metadataCache)
+
+      val brokerList : java.util.List[Integer] = Seq[Integer](0, 1).asJava
+      val brokerSet : java.util.Set[Integer] = Set[Integer](0, 1).asJava
+
+      val partition = rm.getOrCreatePartition(topic, 0)
+      partition.getOrCreateReplica(0)
+      // Make this replica the leader.
+      val leaderAndIsrRequest1 = new LeaderAndIsrRequest(0, 0,
+        collection.immutable.Map(new TopicPartition(topic, 0) -> new PartitionState(0, 0, 0, brokerList, 0, brokerSet)).asJava,
+        Set(new BrokerEndPoint(0, "host1", 0), new BrokerEndPoint(1, "host2", 1)).asJava)
+      rm.becomeLeaderOrFollower(0, leaderAndIsrRequest1, metadataCache, (_, _) => {})
+      rm.getLeaderReplicaIfLocal(topic, 0)
+
+      // Append a message.
+      rm.appendMessages(
+        timeout = 1000,
+        requiredAcks = -1,
+        internalTopicsAllowed = false,
+        messagesPerPartition = Map(new TopicPartition(topic, 0) -> new ByteBufferMessageSet(new Message("first message".getBytes))),
+        responseCallback = produceCallback)
+
+      // Fetch some messages
+      rm.fetchMessages(
+        timeout = 1000,
+        replicaId = -1,
+        fetchMinBytes = 100000,
+        fetchInfo = collection.immutable.Map(new TopicAndPartition(topic, 0) -> new PartitionFetchInfo(0, 100000)),
+        responseCallback = fetchCallback)
+
+      // Make this replica the follower
+      val leaderAndIsrRequest2 = new LeaderAndIsrRequest(0, 0,
+        collection.immutable.Map(new TopicPartition(topic, 0) -> new PartitionState(0, 1, 1, brokerList, 0, brokerSet)).asJava,
+        Set(new BrokerEndPoint(0, "host1", 0), new BrokerEndPoint(1, "host2", 1)).asJava)
+      rm.becomeLeaderOrFollower(1, leaderAndIsrRequest2, metadataCache, (_, _) => {})
+
+      assertTrue(produceCallbackFired)
+      assertTrue(fetchCallbackFired)
+    } finally {
+      rm.shutdown(checkpointHW = false)
+      metrics.close()
+    }
+  }
 }


Mime
View raw message