kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject [kafka] branch trunk updated: MINOR: Use functional patterns in PartitionStates (#6089)
Date Fri, 04 Jan 2019 22:42:07 GMT
This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new fcdde2c  MINOR: Use functional patterns in PartitionStates (#6089)
fcdde2c is described below

commit fcdde2c604b7b9488862b3a2dba3b47be7a3d239
Author: Jason Gustafson <jason@confluent.io>
AuthorDate: Fri Jan 4 14:41:59 2019 -0800

    MINOR: Use functional patterns in PartitionStates (#6089)
    
    * MINOR: Use functional patterns in PartitionStates
    
    * Can't use fluent consumers yet in scala 2.11
---
 .../consumer/internals/SubscriptionState.java      | 61 +++++++++-------------
 .../kafka/common/internals/PartitionStates.java    |  5 ++
 .../scala/kafka/server/AbstractFetcherThread.scala | 24 +++++----
 3 files changed, 43 insertions(+), 47 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
index 1a5dd92..45712b0 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
@@ -32,7 +32,10 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.function.Predicate;
 import java.util.regex.Pattern;
+import java.util.stream.Collector;
+import java.util.stream.Collectors;
 
 /**
  * A class for tracking the topics, partitions, and offsets for the consumer. A partition
@@ -218,7 +221,7 @@ public class SubscriptionState {
         this.assignment.clear();
         this.subscribedPattern = null;
         this.subscriptionType = SubscriptionType.NONE;
-        fireOnAssignment(Collections.<TopicPartition>emptySet());
+        fireOnAssignment(Collections.emptySet());
     }
 
     public Pattern subscribedPattern() {
@@ -230,13 +233,7 @@ public class SubscriptionState {
     }
 
     public Set<TopicPartition> pausedPartitions() {
-        HashSet<TopicPartition> paused = new HashSet<>();
-        for (PartitionStates.PartitionState<TopicPartitionState> state : assignment.partitionStates()) {
-            if (state.value().paused) {
-                paused.add(state.topicPartition());
-            }
-        }
-        return paused;
+        return collectPartitions(TopicPartitionState::isPaused, Collectors.toSet());
     }
 
     /**
@@ -280,12 +277,7 @@ public class SubscriptionState {
     }
 
     public List<TopicPartition> fetchablePartitions() {
-        List<TopicPartition> fetchable = new ArrayList<>(assignment.size());
-        for (PartitionStates.PartitionState<TopicPartitionState> state : assignment.partitionStates()) {
-            if (state.value().isFetchable())
-                fetchable.add(state.topicPartition());
-        }
-        return fetchable;
+        return collectPartitions(TopicPartitionState::isFetchable, Collectors.toList());
     }
 
     public boolean partitionsAutoAssigned() {
@@ -327,10 +319,10 @@ public class SubscriptionState {
 
     public Map<TopicPartition, OffsetAndMetadata> allConsumed() {
         Map<TopicPartition, OffsetAndMetadata> allConsumed = new HashMap<>();
-        for (PartitionStates.PartitionState<TopicPartitionState> state : assignment.partitionStates()) {
+        assignment.stream().forEach(state -> {
             if (state.value().hasValidPosition())
                 allConsumed.put(state.topicPartition(), new OffsetAndMetadata(state.value().position));
-        }
+        });
         return allConsumed;
     }
 
@@ -361,25 +353,23 @@ public class SubscriptionState {
     }
 
     public boolean hasAllFetchPositions() {
-        for (PartitionStates.PartitionState<TopicPartitionState> state : assignment.partitionStates()) {
-            if (!state.value().hasValidPosition())
-                return false;
-        }
-        return true;
+        return assignment.stream().allMatch(state -> state.value().hasValidPosition());
     }
 
     public Set<TopicPartition> missingFetchPositions() {
-        Set<TopicPartition> missing = new HashSet<>();
-        for (PartitionStates.PartitionState<TopicPartitionState> state : assignment.partitionStates()) {
-            if (state.value().isMissingPosition())
-                missing.add(state.topicPartition());
-        }
-        return missing;
+        return collectPartitions(TopicPartitionState::isMissingPosition, Collectors.toSet());
+    }
+
+    private <T extends Collection<TopicPartition>> T collectPartitions(Predicate<TopicPartitionState> filter, Collector<TopicPartition, ?, T> collector) {
+        return assignment.stream()
+                .filter(state -> filter.test(state.value()))
+                .map(PartitionStates.PartitionState::topicPartition)
+                .collect(collector);
     }
 
     public void resetMissingPositions() {
         final Set<TopicPartition> partitionsWithNoOffsets = new HashSet<>();
-        for (PartitionStates.PartitionState<TopicPartitionState> state : assignment.partitionStates()) {
+        assignment.stream().forEach(state -> {
             TopicPartition tp = state.topicPartition();
             TopicPartitionState partitionState = state.value();
             if (partitionState.isMissingPosition()) {
@@ -388,20 +378,15 @@ public class SubscriptionState {
                 else
                     partitionState.reset(defaultResetStrategy);
             }
-        }
+        });
 
         if (!partitionsWithNoOffsets.isEmpty())
             throw new NoOffsetForPartitionException(partitionsWithNoOffsets);
     }
 
     public Set<TopicPartition> partitionsNeedingReset(long nowMs) {
-        Set<TopicPartition> partitions = new HashSet<>();
-        for (PartitionStates.PartitionState<TopicPartitionState> state : assignment.partitionStates()) {
-            TopicPartitionState partitionState = state.value();
-            if (partitionState.awaitingReset() && partitionState.isResetAllowed(nowMs))
-                partitions.add(state.topicPartition());
-        }
-        return partitions;
+        return collectPartitions(state -> state.awaitingReset() && state.isResetAllowed(nowMs),
+                Collectors.toSet());
     }
 
     public boolean isAssigned(TopicPartition tp) {
@@ -506,6 +491,10 @@ public class SubscriptionState {
             return !hasValidPosition() && !awaitingReset();
         }
 
+        private boolean isPaused() {
+            return paused;
+        }
+
         private void seek(long offset) {
             this.position = offset;
             this.resetStrategy = null;
diff --git a/clients/src/main/java/org/apache/kafka/common/internals/PartitionStates.java b/clients/src/main/java/org/apache/kafka/common/internals/PartitionStates.java
index 5339a28..22e183a 100644
--- a/clients/src/main/java/org/apache/kafka/common/internals/PartitionStates.java
+++ b/clients/src/main/java/org/apache/kafka/common/internals/PartitionStates.java
@@ -25,6 +25,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
+import java.util.stream.Stream;
 
 /**
  * This class is a useful building block for doing fetch requests where topic partitions have to be rotated via
@@ -95,6 +96,10 @@ public class PartitionStates<S> {
         return result;
     }
 
+    public Stream<PartitionState<S>> stream() {
+        return map.entrySet().stream().map(entry -> new PartitionState<>(entry.getKey(), entry.getValue()));
+    }
+
     public LinkedHashMap<TopicPartition, S> partitionStateMap() {
         return new LinkedHashMap<>(map);
     }
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index 02158fa..342a392 100755
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -35,6 +35,7 @@ import scala.collection.{Map, Set, mutable}
 import scala.collection.JavaConverters._
 import java.util.concurrent.TimeUnit
 import java.util.concurrent.atomic.AtomicLong
+import java.util.function.Consumer
 
 import com.yammer.metrics.core.Gauge
 import kafka.log.LogAppendInfo
@@ -43,7 +44,6 @@ import org.apache.kafka.common.internals.PartitionStates
 import org.apache.kafka.common.record.{FileRecords, MemoryRecords, Records}
 import org.apache.kafka.common.requests._
 
-
 import scala.math._
 
 /**
@@ -145,18 +145,20 @@ abstract class AbstractFetcherThread(name: String,
     val partitionsWithoutEpochs = mutable.Set.empty[TopicPartition]
     val partitionsWithEpochs = mutable.Map.empty[TopicPartition, EpochData]
 
-    partitionStates.partitionStates.asScala.foreach { state =>
-      val tp = state.topicPartition
-      if (state.value.isTruncating) {
-        latestEpoch(tp) match {
-          case Some(latestEpoch) =>
-            val partitionData = new EpochData(Optional.of(state.value.currentLeaderEpoch), latestEpoch)
-            partitionsWithEpochs += tp -> partitionData
-          case None =>
-            partitionsWithoutEpochs += tp
+    partitionStates.stream().forEach(new Consumer[PartitionStates.PartitionState[PartitionFetchState]] {
+      override def accept(state: PartitionStates.PartitionState[PartitionFetchState]): Unit = {
+        val tp = state.topicPartition
+        if (state.value.isTruncating) {
+          latestEpoch(tp) match {
+            case Some(latestEpoch) =>
+              val partitionData = new EpochData(Optional.of(state.value.currentLeaderEpoch), latestEpoch)
+              partitionsWithEpochs += tp -> partitionData
+            case None =>
+              partitionsWithoutEpochs += tp
+          }
         }
       }
-    }
+    })
 
     debug(s"Build leaderEpoch request $partitionsWithEpochs")
     ResultWithPartitions(partitionsWithEpochs, partitionsWithoutEpochs)


Mime
View raw message