kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject [kafka] branch trunk updated: MINOR: Remove AbstractFetcherThread.PartitionData (#5233)
Date Mon, 13 Aug 2018 17:41:39 GMT
This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new a2bc237  MINOR: Remove AbstractFetcherThread.PartitionData (#5233)
a2bc237 is described below

commit a2bc237cef86ca701e371196883429efb7f4074a
Author: Ismael Juma <ismael@juma.me.uk>
AuthorDate: Mon Aug 13 10:41:33 2018 -0700

    MINOR: Remove AbstractFetcherThread.PartitionData (#5233)
    
    Since ConsumerFetcherThread has been removed, we have
    an opportunity to simplify the *FetcherThread classes. This
    is an unambitious first step which removes the now unneeded
    `PartitionData` indirection.
---
 .../scala/kafka/server/AbstractFetcherThread.scala | 33 +++++++++--------
 .../kafka/server/ReplicaAlterLogDirsThread.scala   | 42 ++++------------------
 .../scala/kafka/server/ReplicaFetcherThread.scala  | 29 ++-------------
 .../ReplicaFetcherThreadFatalErrorTest.scala       | 10 +++---
 .../kafka/server/AbstractFetcherThreadTest.scala   | 41 ++++++++++-----------
 5 files changed, 53 insertions(+), 102 deletions(-)

diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index e056ad6..fe9fc06 100755
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -17,6 +17,7 @@
 
 package kafka.server
 
+import java.nio.ByteBuffer
 import java.util.concurrent.locks.ReentrantLock
 
 import kafka.cluster.{BrokerEndPoint, Replica}
@@ -37,8 +38,8 @@ import java.util.concurrent.atomic.AtomicLong
 import com.yammer.metrics.core.Gauge
 import org.apache.kafka.common.{KafkaException, TopicPartition}
 import org.apache.kafka.common.internals.{FatalExitError, PartitionStates}
-import org.apache.kafka.common.record.MemoryRecords
-import org.apache.kafka.common.requests.EpochEndOffset
+import org.apache.kafka.common.record.{FileRecords, MemoryRecords, Records}
+import org.apache.kafka.common.requests.{EpochEndOffset, FetchResponse}
 
 import scala.math._
 
@@ -54,7 +55,7 @@ abstract class AbstractFetcherThread(name: String,
   extends ShutdownableThread(name, isInterruptible) {
 
   type REQ <: FetchRequest
-  type PD <: PartitionData
+  type PD = FetchResponse.PartitionData[Records]
 
   private[server] val partitionStates = new PartitionStates[PartitionFetchState]
   private val partitionMapLock = new ReentrantLock
@@ -67,7 +68,8 @@ abstract class AbstractFetcherThread(name: String,
   /* callbacks to be defined in subclass */
 
   // process fetched data
-  protected def processPartitionData(topicPartition: TopicPartition, fetchOffset: Long, partitionData: PD)
+  protected def processPartitionData(topicPartition: TopicPartition, fetchOffset: Long, partitionData: PD,
+                                     records: MemoryRecords)
 
   // handle a partition whose offset is out of range and return a new fetch offset
   protected def handleOffsetOutOfRange(topicPartition: TopicPartition): Long
@@ -177,13 +179,13 @@ abstract class AbstractFetcherThread(name: String,
               partitionData.error match {
                 case Errors.NONE =>
                   try {
-                    val records = partitionData.toRecords
+                    val records = toMemoryRecords(partitionData.records)
                     val newOffset = records.batches.asScala.lastOption.map(_.nextOffset).getOrElse(
                       currentPartitionFetchState.fetchOffset)
 
                    fetcherLagStats.getAndMaybePut(topic, partitionId).lag = Math.max(0L, partitionData.highWatermark - newOffset)
                    // Once we hand off the partition data to the subclass, we can't mess with it any more in this thread
-                    processPartitionData(topicPartition, currentPartitionFetchState.fetchOffset, partitionData)
+                    processPartitionData(topicPartition, currentPartitionFetchState.fetchOffset, partitionData, records)
 
                     val validBytes = records.validBytes
                    // ReplicaDirAlterThread may have removed topicPartition from the partitionStates after processing the partition data
@@ -227,7 +229,7 @@ abstract class AbstractFetcherThread(name: String,
 
                 case _ =>
                   error(s"Error for partition $topicPartition at offset ${currentPartitionFetchState.fetchOffset}",
-                    partitionData.exception.get)
+                    partitionData.error.exception)
                   partitionsWithError += topicPartition
               }
             })
@@ -400,6 +402,16 @@ abstract class AbstractFetcherThread(name: String,
     }.toMap
   }
 
+  private def toMemoryRecords(records: Records): MemoryRecords = {
+    records match {
+      case r: MemoryRecords => r
+      case r: FileRecords =>
+        val buffer = ByteBuffer.allocate(r.sizeInBytes)
+        r.readInto(buffer, 0)
+        MemoryRecords.readableRecords(buffer)
+    }
+  }
+
 }
 
 object AbstractFetcherThread {
@@ -411,13 +423,6 @@ object AbstractFetcherThread {
     def offset(topicPartition: TopicPartition): Long
   }
 
-  trait PartitionData {
-    def error: Errors
-    def exception: Option[Throwable]
-    def toRecords: MemoryRecords
-    def highWatermark: Long
-  }
-
 }
 
 object FetcherMetrics {
diff --git a/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala b/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala
index e46473b..08c4a17 100644
--- a/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala
@@ -17,20 +17,20 @@
 
 package kafka.server
 
-import java.nio.ByteBuffer
 import java.util
 
 import kafka.api.Request
 import kafka.cluster.BrokerEndPoint
 import kafka.server.AbstractFetcherThread.ResultWithPartitions
 import kafka.server.QuotaFactory.UnboundedQuota
-import kafka.server.ReplicaAlterLogDirsThread.{FetchRequest, PartitionData}
+import kafka.server.ReplicaAlterLogDirsThread.FetchRequest
 import kafka.server.epoch.LeaderEpochCache
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.errors.KafkaStorageException
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
-import org.apache.kafka.common.record.{FileRecords, MemoryRecords, Records}
+import org.apache.kafka.common.record.{MemoryRecords, Records}
 import org.apache.kafka.common.requests.EpochEndOffset._
+import org.apache.kafka.common.requests.FetchResponse.PartitionData
 import org.apache.kafka.common.requests.{EpochEndOffset, FetchResponse, FetchRequest => JFetchRequest}
 
 import scala.collection.JavaConverters._
@@ -50,13 +50,12 @@ class ReplicaAlterLogDirsThread(name: String,
                                 includeLogTruncation = true) {
 
   type REQ = FetchRequest
-  type PD = PartitionData
 
   private val replicaId = brokerConfig.brokerId
   private val maxBytes = brokerConfig.replicaFetchResponseMaxBytes
   private val fetchSize = brokerConfig.replicaFetchMaxBytes
 
-  def fetch(fetchRequest: FetchRequest): Seq[(TopicPartition, PartitionData)] = {
+  def fetch(fetchRequest: FetchRequest): Seq[(TopicPartition, PD)] = {
     var partitionData: Seq[(TopicPartition, FetchResponse.PartitionData[Records])] = null
     val request = fetchRequest.underlying.build()
 
@@ -83,16 +82,14 @@ class ReplicaAlterLogDirsThread(name: String,
     if (partitionData == null)
       throw new IllegalStateException(s"Failed to fetch data for partitions ${request.fetchData.keySet().toArray.mkString(",")}")
 
-    partitionData.map { case (key, value) =>
-      key -> new PartitionData(value)
-    }
+    partitionData
   }
 
   // process fetched data
-  def processPartitionData(topicPartition: TopicPartition, fetchOffset: Long, partitionData: PartitionData) {
+  def processPartitionData(topicPartition: TopicPartition, fetchOffset: Long, partitionData: PartitionData[Records],
+                           records: MemoryRecords) {
     val futureReplica = replicaMgr.getReplicaOrException(topicPartition, Request.FutureLocalReplicaId)
     val partition = replicaMgr.getPartition(topicPartition).get
-    val records = partitionData.toRecords
 
     if (fetchOffset != futureReplica.logEndOffset.messageOffset)
       throw new IllegalStateException("Offset mismatch for the future replica %s: fetched offset = %d, log end offset = %d.".format(
@@ -254,29 +251,4 @@ object ReplicaAlterLogDirsThread {
     override def toString = underlying.toString
   }
 
-  private[server] class PartitionData(val underlying: FetchResponse.PartitionData[Records]) extends AbstractFetcherThread.PartitionData {
-
-    def error = underlying.error
-
-    def toRecords: MemoryRecords = {
-      if (underlying.records == MemoryRecords.EMPTY)
-        underlying.records.asInstanceOf[MemoryRecords]
-      else {
-        val buffer = ByteBuffer.allocate(underlying.records.sizeInBytes())
-        underlying.records.asInstanceOf[FileRecords].readInto(buffer, 0)
-        MemoryRecords.readableRecords(buffer)
-      }
-    }
-
-    def highWatermark: Long = underlying.highWatermark
-
-    def logStartOffset: Long = underlying.logStartOffset
-
-    def exception: Option[Throwable] = error match {
-      case Errors.NONE => None
-      case e => Some(e.exception)
-    }
-
-    override def toString = underlying.toString
-  }
 }
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index 27defd3..56335a6 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -57,7 +57,6 @@ class ReplicaFetcherThread(name: String,
                                 includeLogTruncation = true) {
 
   type REQ = FetchRequest
-  type PD = PartitionData
 
   private val replicaId = brokerConfig.brokerId
   private val logContext = new LogContext(s"[ReplicaFetcher replicaId=$replicaId, leaderId=${sourceBroker.id}, " +
@@ -110,10 +109,9 @@ class ReplicaFetcherThread(name: String,
   }
 
   // process fetched data
-  def processPartitionData(topicPartition: TopicPartition, fetchOffset: Long, partitionData: PartitionData) {
+  def processPartitionData(topicPartition: TopicPartition, fetchOffset: Long, partitionData: PD, records: MemoryRecords) {
     val replica = replicaMgr.getReplicaOrException(topicPartition)
     val partition = replicaMgr.getPartition(topicPartition).get
-    val records = partitionData.toRecords
 
     maybeWarnIfOversizedRecords(records, topicPartition)
 
@@ -235,16 +233,14 @@ class ReplicaFetcherThread(name: String,
       delayPartitions(partitions, brokerConfig.replicaFetchBackoffMs.toLong)
   }
 
-  protected def fetch(fetchRequest: FetchRequest): Seq[(TopicPartition, PartitionData)] = {
+  protected def fetch(fetchRequest: FetchRequest): Seq[(TopicPartition, PD)] = {
     try {
       val clientResponse = leaderEndpoint.sendRequest(fetchRequest.underlying)
       val fetchResponse = clientResponse.responseBody.asInstanceOf[FetchResponse[Records]]
       if (!fetchSessionHandler.handleResponse(fetchResponse)) {
         Nil
       } else {
-        fetchResponse.responseData.asScala.toSeq.map { case (key, value) =>
-          key -> new PartitionData(value)
-        }
+        fetchResponse.responseData.asScala.toSeq
       }
     } catch {
       case t: Throwable =>
@@ -403,23 +399,4 @@ object ReplicaFetcherThread {
     override def toString = underlying.toString
   }
 
-  private[server] class PartitionData(val underlying: FetchResponse.PartitionData[Records]) extends AbstractFetcherThread.PartitionData {
-
-    def error = underlying.error
-
-    def toRecords: MemoryRecords = {
-      underlying.records.asInstanceOf[MemoryRecords]
-    }
-
-    def highWatermark: Long = underlying.highWatermark
-
-    def logStartOffset: Long = underlying.logStartOffset
-
-    def exception: Option[Throwable] = error match {
-      case Errors.NONE => None
-      case e => Some(e.exception)
-    }
-
-    override def toString = underlying.toString
-  }
 }
diff --git a/core/src/test/scala/integration/kafka/server/ReplicaFetcherThreadFatalErrorTest.scala b/core/src/test/scala/integration/kafka/server/ReplicaFetcherThreadFatalErrorTest.scala
index 5e81dc5..2f6db61 100644
--- a/core/src/test/scala/integration/kafka/server/ReplicaFetcherThreadFatalErrorTest.scala
+++ b/core/src/test/scala/integration/kafka/server/ReplicaFetcherThreadFatalErrorTest.scala
@@ -20,7 +20,7 @@ package kafka.server
 import java.util.concurrent.atomic.AtomicBoolean
 
 import kafka.cluster.BrokerEndPoint
-import kafka.server.ReplicaFetcherThread.{FetchRequest, PartitionData}
+import kafka.server.ReplicaFetcherThread.FetchRequest
 import kafka.utils.{Exit, TestUtils}
 import kafka.utils.TestUtils.createBrokerConfigs
 import kafka.zk.ZooKeeperTestHarness
@@ -28,6 +28,7 @@ import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.internals.FatalExitError
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.record.Records
 import org.apache.kafka.common.requests.FetchResponse
 import org.apache.kafka.common.utils.Time
 import org.junit.{After, Test}
@@ -88,10 +89,11 @@ class ReplicaFetcherThreadFatalErrorTest extends ZooKeeperTestHarness {
       import params._
       new ReplicaFetcherThread(threadName, fetcherId, sourceBroker, config, replicaManager, metrics, time, quotaManager) {
         override def handleOffsetOutOfRange(topicPartition: TopicPartition): Long = throw new FatalExitError
-        override protected def fetch(fetchRequest: FetchRequest): Seq[(TopicPartition, PartitionData)] = {
+        override protected def fetch(fetchRequest: FetchRequest): Seq[(TopicPartition, PD)] = {
           fetchRequest.underlying.fetchData.asScala.keys.toSeq.map { tp =>
-            (tp, new PartitionData(new FetchResponse.PartitionData(Errors.OFFSET_OUT_OF_RANGE,
-              FetchResponse.INVALID_HIGHWATERMARK, FetchResponse.INVALID_LAST_STABLE_OFFSET, FetchResponse.INVALID_LOG_START_OFFSET, null, null)))
+            (tp, new FetchResponse.PartitionData[Records](Errors.OFFSET_OUT_OF_RANGE,
+              FetchResponse.INVALID_HIGHWATERMARK, FetchResponse.INVALID_LAST_STABLE_OFFSET,
+              FetchResponse.INVALID_LOG_START_OFFSET, null, null))
           }
         }
       }
diff --git a/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala b/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
index 7ad9371..db98a87 100644
--- a/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
@@ -20,12 +20,13 @@ package kafka.server
 import AbstractFetcherThread._
 import com.yammer.metrics.Metrics
 import kafka.cluster.BrokerEndPoint
-import kafka.server.AbstractFetcherThread.{FetchRequest, PartitionData}
+import kafka.server.AbstractFetcherThread.FetchRequest
 import kafka.utils.TestUtils
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.protocol.Errors
-import org.apache.kafka.common.record.{CompressionType, MemoryRecords, SimpleRecord}
+import org.apache.kafka.common.record.{CompressionType, MemoryRecords, Records, SimpleRecord}
 import org.apache.kafka.common.requests.EpochEndOffset
+import org.apache.kafka.common.requests.FetchResponse.PartitionData
 import org.junit.Assert.{assertFalse, assertTrue}
 import org.junit.{Before, Test}
 
@@ -92,16 +93,6 @@ class AbstractFetcherThreadTest {
     override def offset(topicPartition: TopicPartition): Long = offsets(topicPartition)
   }
 
-  class TestPartitionData(records: MemoryRecords = MemoryRecords.EMPTY) extends PartitionData {
-    override def error: Errors = Errors.NONE
-
-    override def toRecords: MemoryRecords = records
-
-    override def highWatermark: Long = 0L
-
-    override def exception: Option[Throwable] = None
-  }
-
   class DummyFetcherThread(name: String,
                            clientId: String,
                            sourceBroker: BrokerEndPoint,
@@ -109,18 +100,19 @@ class AbstractFetcherThreadTest {
     extends AbstractFetcherThread(name, clientId, sourceBroker, fetchBackOffMs, isInterruptible = true, includeLogTruncation = false) {
 
     type REQ = DummyFetchRequest
-    type PD = PartitionData
 
     override def processPartitionData(topicPartition: TopicPartition,
                                       fetchOffset: Long,
-                                      partitionData: PartitionData): Unit = {}
+                                      partitionData: PD,
+                                      records: MemoryRecords): Unit = {}
 
     override def handleOffsetOutOfRange(topicPartition: TopicPartition): Long = 0L
 
    override def handlePartitionsWithErrors(partitions: Iterable[TopicPartition]): Unit = {}
 
-    override protected def fetch(fetchRequest: DummyFetchRequest): Seq[(TopicPartition, TestPartitionData)] =
-      fetchRequest.offsets.mapValues(_ => new TestPartitionData()).toSeq
+    override protected def fetch(fetchRequest: DummyFetchRequest): Seq[(TopicPartition, PD)] =
+      fetchRequest.offsets.mapValues(_ => new PartitionData[Records](Errors.NONE, 0, 0, 0,
+        Seq.empty.asJava, MemoryRecords.EMPTY)).toSeq
 
    override protected def buildFetchRequest(partitionMap: collection.Seq[(TopicPartition, PartitionFetchState)]): ResultWithPartitions[DummyFetchRequest] =
      ResultWithPartitions(new DummyFetchRequest(partitionMap.map { case (k, v) => (k, v.fetchOffset) }.toMap), Set())
@@ -166,14 +158,17 @@ class AbstractFetcherThreadTest {
     @volatile var logEndOffset = 0L
     @volatile var fetchCount = 0
 
-    private val normalPartitionDataSet = List(
-      new TestPartitionData(MemoryRecords.withRecords(0L, CompressionType.NONE, new SimpleRecord("hello".getBytes()))),
-      new TestPartitionData(MemoryRecords.withRecords(1L, CompressionType.NONE, new SimpleRecord("hello".getBytes())))
+    private val normalPartitionDataSet = List[PartitionData[Records]](
+      new PartitionData(Errors.NONE, 0L, 0L, 0L, Seq.empty.asJava,
+        MemoryRecords.withRecords(0L, CompressionType.NONE, new SimpleRecord("hello".getBytes))),
+      new PartitionData(Errors.NONE, 0L, 0L, 0L, Seq.empty.asJava,
+        MemoryRecords.withRecords(1L, CompressionType.NONE, new SimpleRecord("hello".getBytes)))
     )
 
     override def processPartitionData(topicPartition: TopicPartition,
                                       fetchOffset: Long,
-                                      partitionData: PartitionData): Unit = {
+                                      partitionData: PD,
+                                      records: MemoryRecords): Unit = {
       // Throw exception if the fetchOffset does not match the fetcherThread partition state
       if (fetchOffset != logEndOffset)
         throw new RuntimeException(
@@ -181,14 +176,13 @@ class AbstractFetcherThreadTest {
             .format(topicPartition, fetchOffset, logEndOffset))
 
       // Now check message's crc
-      val records = partitionData.toRecords
       for (batch <- records.batches.asScala) {
         batch.ensureValid()
         logEndOffset = batch.nextOffset
       }
     }
 
-    override protected def fetch(fetchRequest: DummyFetchRequest): Seq[(TopicPartition, TestPartitionData)] = {
+    override protected def fetch(fetchRequest: DummyFetchRequest): Seq[(TopicPartition, PD)] = {
       fetchCount += 1
       // Set the first fetch to get a corrupted message
       if (fetchCount == 1) {
@@ -199,7 +193,8 @@ class AbstractFetcherThreadTest {
         // flip some bits in the message to ensure the crc fails
         buffer.putInt(15, buffer.getInt(15) ^ 23422)
         buffer.putInt(30, buffer.getInt(30) ^ 93242)
-        fetchRequest.offsets.mapValues(_ => new TestPartitionData(records)).toSeq
+        fetchRequest.offsets.mapValues(_ => new PartitionData[Records](Errors.NONE, 0L, 0L, 0L,
+          Seq.empty.asJava, records)).toSeq
       } else {
         // Then, the following fetches get the normal data
         fetchRequest.offsets.mapValues(v => normalPartitionDataSet(v.toInt)).toSeq


Mime
View raw message