kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mj...@apache.org
Subject [kafka] branch 3.0 updated: KAFKA-12779: rename namedTopology in TaskId to topologyName (#11194)
Date Tue, 10 Aug 2021 00:49:15 GMT
This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.0 by this push:
     new e3bb30e  KAFKA-12779: rename namedTopology in TaskId to topologyName (#11194)
e3bb30e is described below

commit e3bb30eb6bdeba06b5f117bd902206b9fa037f27
Author: Walker Carlson <18128741+wcarlson5@users.noreply.github.com>
AuthorDate: Mon Aug 9 19:47:47 2021 -0500

    KAFKA-12779: rename namedTopology in TaskId to topologyName (#11194)
    
    Update to KIP-740.
    
    Reviewers: A. Sophie Blee-Goldman <sophie@confluent.io>, Matthias J. Sax <matthias@confluent.io>,
Konstantine Karantasis <konstantine@confluent.io>, Israel Ekpo <israelekpo@gmail.com>
---
 .../org/apache/kafka/streams/processor/TaskId.java | 30 +++++++++++-----------
 .../processor/internals/StateDirectory.java        |  2 +-
 .../internals/StreamsPartitionAssignor.java        |  2 +-
 .../assignment/ConsumerProtocolUtils.java          | 16 ++++++------
 .../internals/assignment/SubscriptionInfo.java     |  6 ++---
 5 files changed, 28 insertions(+), 28 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/TaskId.java b/streams/src/main/java/org/apache/kafka/streams/processor/TaskId.java
index f4d8349..c718833 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/TaskId.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/TaskId.java
@@ -45,21 +45,21 @@ public class TaskId implements Comparable<TaskId> {
     public final int partition;
 
     /** The namedTopology that this task belongs to, or null if it does not belong to one
*/
-    private final String namedTopology;
+    private final String topologyName;
 
     public TaskId(final int topicGroupId, final int partition) {
         this(topicGroupId, partition, null);
     }
 
-    public TaskId(final int topicGroupId, final int partition, final String namedTopology)
{
+    public TaskId(final int topicGroupId, final int partition, final String topologyName)
{
         this.topicGroupId = topicGroupId;
         this.partition = partition;
-        if (namedTopology != null && namedTopology.length() == 0) {
+        if (topologyName != null && topologyName.length() == 0) {
             LOG.warn("Empty string passed in for task's namedTopology, since NamedTopology
name cannot be empty, we "
                          + "assume this task does not belong to a NamedTopology and downgrade
this to null");
-            this.namedTopology = null;
+            this.topologyName = null;
         } else {
-            this.namedTopology = namedTopology;
+            this.topologyName = topologyName;
         }
     }
 
@@ -74,13 +74,13 @@ public class TaskId implements Comparable<TaskId> {
     /**
      * Experimental feature -- will return null
      */
-    public String namedTopology() {
-        return namedTopology;
+    public String topologyName() {
+        return topologyName;
     }
 
     @Override
     public String toString() {
-        return namedTopology != null ? namedTopology + "_" + topicGroupId + "_" + partition
: topicGroupId + "_" + partition;
+        return topologyName != null ? topologyName + "_" + topicGroupId + "_" + partition
: topicGroupId + "_" + partition;
     }
 
     /**
@@ -160,26 +160,26 @@ public class TaskId implements Comparable<TaskId> {
             return false;
         }
 
-        if (namedTopology != null && taskId.namedTopology != null) {
-            return namedTopology.equals(taskId.namedTopology);
+        if (topologyName != null && taskId.topologyName != null) {
+            return topologyName.equals(taskId.topologyName);
         } else {
-            return namedTopology == null && taskId.namedTopology == null;
+            return topologyName == null && taskId.topologyName == null;
         }
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(topicGroupId, partition, namedTopology);
+        return Objects.hash(topicGroupId, partition, topologyName);
     }
 
     @Override
     public int compareTo(final TaskId other) {
-        if (namedTopology != null && other.namedTopology != null) {
-            final int comparingNamedTopologies = namedTopology.compareTo(other.namedTopology);
+        if (topologyName != null && other.topologyName != null) {
+            final int comparingNamedTopologies = topologyName.compareTo(other.topologyName);
             if (comparingNamedTopologies != 0) {
                 return comparingNamedTopologies;
             }
-        } else if (namedTopology != null || other.namedTopology != null) {
+        } else if (topologyName != null || other.topologyName != null) {
             LOG.error("Tried to compare this = {} with other = {}, but only one had a valid
named topology", this, other);
             throw new IllegalStateException("Can't compare a TaskId with a namedTopology
to one without");
         }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
index 4cc6f98..a710ac0 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
@@ -259,7 +259,7 @@ public class StateDirectory {
     }
 
     private File getTaskDirectoryParentName(final TaskId taskId) {
-        final String namedTopology = taskId.namedTopology();
+        final String namedTopology = taskId.topologyName();
         if (namedTopology != null) {
             if (!hasNamedTopologies) {
                 throw new IllegalStateException("Tried to lookup taskId with named topology,
but StateDirectory thinks hasNamedTopologies = false");
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
index 2767429..630a431 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
@@ -515,7 +515,7 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor,
Conf
             }
             allAssignedPartitions.addAll(partitions);
 
-            tasksForTopicGroup.computeIfAbsent(new Subtopology(id.subtopology(), id.namedTopology()),
k -> new HashSet<>()).add(id);
+            tasksForTopicGroup.computeIfAbsent(new Subtopology(id.subtopology(), id.topologyName()),
k -> new HashSet<>()).add(id);
         }
 
         checkAllPartitions(allSourceTopics, partitionsForTask, allAssignedPartitions, fullMetadata);
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ConsumerProtocolUtils.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ConsumerProtocolUtils.java
index 467dbfa..8dbedb8 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ConsumerProtocolUtils.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ConsumerProtocolUtils.java
@@ -38,13 +38,13 @@ public class ConsumerProtocolUtils {
         out.writeInt(taskId.subtopology());
         out.writeInt(taskId.partition());
         if (version >= MIN_NAMED_TOPOLOGY_VERSION) {
-            if (taskId.namedTopology() != null) {
-                out.writeInt(taskId.namedTopology().length());
-                out.writeChars(taskId.namedTopology());
+            if (taskId.topologyName() != null) {
+                out.writeInt(taskId.topologyName().length());
+                out.writeChars(taskId.topologyName());
             } else {
                 out.writeInt(0);
             }
-        } else if (taskId.namedTopology() != null) {
+        } else if (taskId.topologyName() != null) {
             throw new TaskAssignmentException("Named topologies are not compatible with protocol
version " + version);
         }
     }
@@ -73,15 +73,15 @@ public class ConsumerProtocolUtils {
         buf.putInt(taskId.subtopology());
         buf.putInt(taskId.partition());
         if (version >= MIN_NAMED_TOPOLOGY_VERSION) {
-            if (taskId.namedTopology() != null) {
-                buf.putInt(taskId.namedTopology().length());
-                for (final char c : taskId.namedTopology().toCharArray()) {
+            if (taskId.topologyName() != null) {
+                buf.putInt(taskId.topologyName().length());
+                for (final char c : taskId.topologyName().toCharArray()) {
                     buf.putChar(c);
                 }
             } else {
                 buf.putInt(0);
             }
-        } else if (taskId.namedTopology() != null) {
+        } else if (taskId.topologyName() != null) {
             throw new TaskAssignmentException("Named topologies are not compatible with protocol
version " + version);
         }
     }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java
index cf9df3c..58d2dbe 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java
@@ -136,7 +136,7 @@ public class SubscriptionInfo {
             final TaskId task = t.getKey();
             taskOffsetSum.setTopicGroupId(task.subtopology());
             taskOffsetSum.setPartition(task.partition());
-            taskOffsetSum.setNamedTopology(task.namedTopology());
+            taskOffsetSum.setNamedTopology(task.topologyName());
             taskOffsetSum.setOffsetSum(t.getValue());
             return taskOffsetSum;
         }).collect(Collectors.toList()));
@@ -147,7 +147,7 @@ public class SubscriptionInfo {
         final Map<Integer, List<SubscriptionInfoData.PartitionToOffsetSum>> topicGroupIdToPartitionOffsetSum
= new HashMap<>();
         for (final Map.Entry<TaskId, Long> taskEntry : taskOffsetSums.entrySet()) {
             final TaskId task = taskEntry.getKey();
-            if (task.namedTopology() != null) {
+            if (task.topologyName() != null) {
                 throw new TaskAssignmentException("Named topologies are not compatible with
older protocol versions");
             }
             topicGroupIdToPartitionOffsetSum.computeIfAbsent(task.subtopology(), t ->
new ArrayList<>()).add(
@@ -170,7 +170,7 @@ public class SubscriptionInfo {
         final Set<TaskId> standbyTasks = new HashSet<>();
 
         for (final Map.Entry<TaskId, Long> taskOffsetSum : taskOffsetSums.entrySet())
{
-            if (taskOffsetSum.getKey().namedTopology() != null) {
+            if (taskOffsetSum.getKey().topologyName() != null) {
                 throw new TaskAssignmentException("Named topologies are not compatible with
older protocol versions");
             }
             if (taskOffsetSum.getValue() == Task.LATEST_OFFSET) {

Mime
View raw message