kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mj...@apache.org
Subject [kafka] branch 1.1 updated: KAFKA-6906: Fixed to commit transactions if data is produced via wall clock punctuation (#5105)
Date Mon, 11 Jun 2018 22:33:36 GMT
This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch 1.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/1.1 by this push:
     new 63515f94 KAFKA-6906: Fixed to commit transactions if data is produced via wall clock punctuation (#5105)
63515f94 is described below

commit 63515f94d63aa0e470854c85eff11f07ce2931b9
Author: Jagadesh Adireddi <adireddijagadesh@gmail.com>
AuthorDate: Tue Jun 12 03:10:03 2018 +0530

    KAFKA-6906: Fixed to commit transactions if data is produced via wall clock punctuation (#5105)
    
    Reviewers: Matthias J. Sax <matthias@confluent.io>, Bill Bejeck <bill@confluent.io>, Guozhang Wang <guozhang@confluent.io>
---
 .../streams/processor/internals/StreamTask.java    | 19 +++++++-----
 .../processor/internals/StreamTaskTest.java        | 36 +++++++++++++++++++++-
 .../processor/internals/StreamThreadTest.java      |  4 +--
 3 files changed, 49 insertions(+), 10 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index a033043..4795a05 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -355,17 +355,22 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
 
                 if (eosEnabled) {
                     producer.sendOffsetsToTransaction(consumedOffsetsAndMetadata, applicationId);
-                    producer.commitTransaction();
-                    transactionInFlight = false;
-                    if (startNewTransaction) {
-                        producer.beginTransaction();
-                        transactionInFlight = true;
-                    }
                 } else {
                     consumer.commitSync(consumedOffsetsAndMetadata);
                 }
                 commitOffsetNeeded = false;
-            } else if (eosEnabled && !startNewTransaction && transactionInFlight) { // need to make sure to commit txn for suspend case
+            }
+
+            if (eosEnabled) {
+                producer.commitTransaction();
+                transactionInFlight = false;
+                if (startNewTransaction) {
+                    producer.beginTransaction();
+                    transactionInFlight = true;
+                }
+            }
+
+            if (eosEnabled && !startNewTransaction && transactionInFlight) { // need to make sure to commit txn for suspend case
                 producer.commitTransaction();
                 transactionInFlight = false;
             }
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index d6a5276..8cd53f9 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -35,6 +35,7 @@ import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.StreamsMetrics;
 import org.apache.kafka.streams.errors.ProductionExceptionHandler;
+import org.apache.kafka.streams.errors.DefaultProductionExceptionHandler;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.processor.PunctuationType;
 import org.apache.kafka.streams.processor.Punctuator;
@@ -920,6 +921,7 @@ public class StreamTaskTest {
     @Test
     public void shouldCloseProducerOnCloseWhenEosEnabled() {
         task = createStatelessTask(true);
+        task.initializeTopology();
         task.close(true, false);
         task = null;
 
@@ -1030,6 +1032,39 @@ public class StreamTaskTest {
         assertThat(map, equalTo(Collections.singletonMap(repartition, 11L)));
     }
 
+    @Test
+    public void shouldThrowOnCleanCloseTaskWhenEosEnabledIfTransactionInFlight() {
+        task = createStatelessTask(createConfig(true));
+        try {
+            task.close(true, false);
+            fail("should have throw IllegalStateException");
+        } catch (final IllegalStateException expected) {
+            // pass
+        }
+        task = null;
+
+        assertTrue(producer.closed());
+    }
+
+    @Test
+    public void shouldAlwaysCommitIfEosEnabled() {
+        final RecordCollectorImpl recordCollector =  new RecordCollectorImpl(producer, "StreamTask",
+                new LogContext("StreamTaskTest "), new DefaultProductionExceptionHandler(), new Metrics().sensor("skipped-records"));
+
+        task = createStatelessTask(createConfig(true));
+        task.initializeStateStores();
+        task.initializeTopology();
+        task.punctuate(processorSystemTime, 5, PunctuationType.WALL_CLOCK_TIME, new Punctuator() {
+            @Override
+            public void punctuate(final long timestamp) {
+                recordCollector.send("result-topic1", 3, 5, null, 0, time.milliseconds(),
+                        new IntegerSerializer(),  new IntegerSerializer());
+            }
+        });
+        task.commit();
+        assertEquals(1, producer.history().size());
+    }
+
     private StreamTask createStatefulTask(final boolean eosEnabled, final boolean logged) {
         final ProcessorTopology topology = ProcessorTopology.with(
                 Utils.<ProcessorNode>mkList(source1, source2),
@@ -1112,5 +1147,4 @@ public class StreamTaskTest {
     private Iterable<ConsumerRecord<byte[], byte[]>> records(final ConsumerRecord<byte[], byte[]>... recs) {
         return Arrays.asList(recs);
     }
-
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index 17159d9..a71aaad 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -630,7 +630,7 @@ public class StreamThreadTest {
             new TestCondition() {
                 @Override
                 public boolean conditionMet() {
-                    return producer.commitCount() == 1;
+                    return producer.commitCount() == 2;
                 }
             },
             "StreamsThread did not commit transaction.");
@@ -651,7 +651,7 @@ public class StreamThreadTest {
             },
             "StreamsThread did not remove fenced zombie task.");
 
-        assertThat(producer.commitCount(), equalTo(1L));
+        assertThat(producer.commitCount(), equalTo(2L));
     }
 
     @Test

-- 
To stop receiving notification emails like this one, please contact
mjsax@apache.org.

Mime
View raw message