kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [kafka] branch 2.3 updated: KAFKA-8816: Make offsets immutable to users of RecordCollector.offsets (#7223)
Date Mon, 09 Sep 2019 23:42:12 GMT
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch 2.3
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.3 by this push:
     new 29514bd  KAFKA-8816: Make offsets immutable to users of RecordCollector.offsets (#7223)
29514bd is described below

commit 29514bd45c8c2eadfb1bee112917ec0bbc68f884
Author: cpettitt-confluent <53191309+cpettitt-confluent@users.noreply.github.com>
AuthorDate: Mon Aug 26 13:59:49 2019 -0700

    KAFKA-8816: Make offsets immutable to users of RecordCollector.offsets (#7223)
    
    Make offsets immutable to users of RecordCollector.offsets. Fix up an
    existing case where offsets could be modified in this way. Add a simple
    test to verify offsets cannot be changed externally.
    
    Reviewers: Bruno Cadonna <bruno@confluent.io>, Guozhang Wang <guozhang@confluent.io>,
Matthias J. Sax <matthias@confluent.io>
---
 .../processor/internals/RecordCollector.java       |  2 +-
 .../processor/internals/RecordCollectorImpl.java   |  3 ++-
 .../streams/processor/internals/StreamTask.java    |  3 ++-
 .../processor/internals/RecordCollectorTest.java   | 28 ++++++++++++++++++++++
 4 files changed, 33 insertions(+), 3 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java
index bbfb049..b8b99a3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java
@@ -63,7 +63,7 @@ public interface RecordCollector extends AutoCloseable {
     /**
      * The last acked offsets from the internal {@link Producer}.
      *
-     * @return the map from TopicPartition to offset
+     * @return an immutable map from TopicPartition to offset
      */
     Map<TopicPartition, Long> offsets();
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
index 2e9ead8..45dda41 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.processor.internals;
 
+import java.util.Collections;
 import org.apache.kafka.clients.producer.Callback;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerRecord;
@@ -281,7 +282,7 @@ public class RecordCollectorImpl implements RecordCollector {
 
     @Override
     public Map<TopicPartition, Long> offsets() {
-        return offsets;
+        return Collections.unmodifiableMap(offsets);
     }
 
     // for testing only
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 3ab2524..067b36c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -504,7 +504,8 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
 
     @Override
     protected Map<TopicPartition, Long> activeTaskCheckpointableOffsets() {
-        final Map<TopicPartition, Long> checkpointableOffsets = recordCollector.offsets();
+        final Map<TopicPartition, Long> checkpointableOffsets =
+            new HashMap<>(recordCollector.offsets());
         for (final Map.Entry<TopicPartition, Long> entry : consumedOffsets.entrySet()) {
             checkpointableOffsets.putIfAbsent(entry.getKey(), entry.getValue());
         }
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
index a7da2cb..22fb2ec 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
@@ -50,7 +50,10 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Future;
 
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -145,6 +148,31 @@ public class RecordCollectorTest {
         assertEquals((Long) 0L, offsets.get(new TopicPartition("topic1", 2)));
     }
 
+    @Test
+    public void shouldNotAllowOffsetsToBeUpdatedExternally() {
+        final String topic = "topic1";
+        final TopicPartition topicPartition = new TopicPartition(topic, 0);
+
+        final RecordCollectorImpl collector = new RecordCollectorImpl(
+            "RecordCollectorTest-TestSpecificPartition",
+            new LogContext("RecordCollectorTest-TestSpecificPartition "),
+            new DefaultProductionExceptionHandler(),
+            new Metrics().sensor("skipped-records")
+        );
+        collector.init(new MockProducer<>(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer));
+
+        collector.send(topic, "999", "0", null, 0, null, stringSerializer, stringSerializer);
+        collector.send(topic, "999", "0", null, 0, null, stringSerializer, stringSerializer);
+        collector.send(topic, "999", "0", null, 0, null, stringSerializer, stringSerializer);
+
+        final Map<TopicPartition, Long> offsets = collector.offsets();
+
+        assertThat(offsets.get(topicPartition), equalTo(2L));
+        assertThrows(UnsupportedOperationException.class, () -> offsets.put(new TopicPartition(topic, 0), 50L));
+
+        assertThat(collector.offsets().get(topicPartition), equalTo(2L));
+    }
+
     @SuppressWarnings("unchecked")
     @Test(expected = StreamsException.class)
     public void shouldThrowStreamsExceptionOnAnyExceptionButProducerFencedException() {


Mime
View raw message