kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kkaranta...@apache.org
Subject [kafka] 02/02: KAFKA-10602: Make RetryWithToleranceOperator thread safe (#9422)
Date Thu, 15 Oct 2020 19:20:31 GMT
This is an automated email from the ASF dual-hosted git repository.

kkarantasis pushed a commit to branch 2.6
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit e9b645bdd41795e6ab7c32a104614e605cd13053
Author: Tom Bentley <tombentley@users.noreply.github.com>
AuthorDate: Thu Oct 15 19:54:46 2020 +0100

    KAFKA-10602: Make RetryWithToleranceOperator thread safe (#9422)
    
    ErrantRecordReporter uses a RetryWithToleranceOperator instance, which is necessarily
    stateful: it holds a ProcessingContext, of which there is supposed to be one per task.
    That ProcessingContext is used by both RetryWithToleranceOperator.executeFailed() and
    execute(), so it is not enough to synchronize only executeFailed().
    
    Therefore, make all public methods of RetryWithToleranceOperator synchronized, so that
    RetryWithToleranceOperator is now thread safe.
    
    Tested by adding a multithreaded test case that fails consistently if the methods
    are not properly synchronized.
    
    Reviewers: Konstantine Karantasis <k.karantasis@gmail.com>
---
 .../runtime/errors/RetryWithToleranceOperator.java | 32 ++++---
 .../errors/RetryWithToleranceOperatorTest.java     | 99 ++++++++++++++++++++++
 2 files changed, 120 insertions(+), 11 deletions(-)

diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java
index 4fb633d..ce4c1e2 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java
@@ -51,6 +51,9 @@ import java.util.concurrent.ThreadLocalRandom;
  * set appropriate error reason in the {@link ProcessingContext} and return null, or (2) if the exception is not tolerated,
  * then it is wrapped into a ConnectException and rethrown to the caller.
  * <p>
+ *
+ * Instances of this class are thread safe.
+ * <p>
  */
 public class RetryWithToleranceOperator implements AutoCloseable {
 
@@ -74,17 +77,24 @@ public class RetryWithToleranceOperator implements AutoCloseable {
     private final Time time;
     private ErrorHandlingMetrics errorHandlingMetrics;
 
-    protected ProcessingContext context = new ProcessingContext();
+    protected final ProcessingContext context;
 
     public RetryWithToleranceOperator(long errorRetryTimeout, long errorMaxDelayInMillis,
                                       ToleranceType toleranceType, Time time) {
+        this(errorRetryTimeout, errorMaxDelayInMillis, toleranceType, time, new ProcessingContext());
+    }
+
+    RetryWithToleranceOperator(long errorRetryTimeout, long errorMaxDelayInMillis,
+                               ToleranceType toleranceType, Time time,
+                               ProcessingContext context) {
         this.errorRetryTimeout = errorRetryTimeout;
         this.errorMaxDelayInMillis = errorMaxDelayInMillis;
         this.errorToleranceType = toleranceType;
         this.time = time;
+        this.context = context;
     }
 
-    public Future<Void> executeFailed(Stage stage, Class<?> executingClass,
+    public synchronized Future<Void> executeFailed(Stage stage, Class<?> executingClass,
                                       ConsumerRecord<byte[], byte[]> consumerRecord,
                                       Throwable error) {
 
@@ -109,7 +119,7 @@ public class RetryWithToleranceOperator implements AutoCloseable {
      * @param <V> return type of the result of the operation.
      * @return result of the operation
      */
-    public <V> V execute(Operation<V> operation, Stage stage, Class<?> executingClass) {
+    public synchronized <V> V execute(Operation<V> operation, Stage stage, Class<?> executingClass) {
         context.currentContext(stage, executingClass);
 
         if (context.failed()) {
@@ -208,7 +218,7 @@ public class RetryWithToleranceOperator implements AutoCloseable {
     }
 
     @SuppressWarnings("fallthrough")
-    public boolean withinToleranceLimits() {
+    public synchronized boolean withinToleranceLimits() {
         switch (errorToleranceType) {
             case NONE:
                 if (totalFailures > 0) return false;
@@ -238,7 +248,7 @@ public class RetryWithToleranceOperator implements AutoCloseable {
         time.sleep(delay);
     }
 
-    public void metrics(ErrorHandlingMetrics errorHandlingMetrics) {
+    public synchronized void metrics(ErrorHandlingMetrics errorHandlingMetrics) {
         this.errorHandlingMetrics = errorHandlingMetrics;
     }
 
@@ -259,7 +269,7 @@ public class RetryWithToleranceOperator implements AutoCloseable {
      *
      * @param reporters the error reporters (should not be null).
      */
-    public void reporters(List<ErrorReporter> reporters) {
+    public synchronized void reporters(List<ErrorReporter> reporters) {
         this.context.reporters(reporters);
     }
 
@@ -268,7 +278,7 @@ public class RetryWithToleranceOperator implements AutoCloseable {
      *
      * @param preTransformRecord the source record
      */
-    public void sourceRecord(SourceRecord preTransformRecord) {
+    public synchronized void sourceRecord(SourceRecord preTransformRecord) {
         this.context.sourceRecord(preTransformRecord);
     }
 
@@ -277,14 +287,14 @@ public class RetryWithToleranceOperator implements AutoCloseable {
      *
      * @param consumedMessage the record
      */
-    public void consumerRecord(ConsumerRecord<byte[], byte[]> consumedMessage) {
+    public synchronized void consumerRecord(ConsumerRecord<byte[], byte[]> consumedMessage) {
         this.context.consumerRecord(consumedMessage);
     }
 
     /**
      * @return true, if the last operation encountered an error; false otherwise
      */
-    public boolean failed() {
+    public synchronized boolean failed() {
         return this.context.failed();
     }
 
@@ -293,12 +303,12 @@ public class RetryWithToleranceOperator implements AutoCloseable {
      *
      * @return the error encountered when processing the current stage
      */
-    public Throwable error() {
+    public synchronized Throwable error() {
         return this.context.error();
     }
 
     @Override
-    public void close() {
+    public synchronized void close() {
         this.context.close();
     }
 }
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperatorTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperatorTest.java
index f4c4299..9e44321 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperatorTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperatorTest.java
@@ -41,8 +41,17 @@ import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
 
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 
 import static java.util.Collections.emptyMap;
 import static java.util.Collections.singletonMap;
@@ -342,6 +351,96 @@ public class RetryWithToleranceOperatorTest {
         PowerMock.verifyAll();
     }
 
+    @Test
+    public void testThreadSafety() throws Throwable {
+        long runtimeMs = 5_000;
+        int numThreads = 10;
+        // Check that multiple threads using RetryWithToleranceOperator concurrently
+        // can't corrupt the state of the ProcessingContext
+        AtomicReference<Throwable> failed = new AtomicReference<>(null);
+        RetryWithToleranceOperator retryWithToleranceOperator = new RetryWithToleranceOperator(0,
+                ERRORS_RETRY_MAX_DELAY_DEFAULT, ALL, SYSTEM, new ProcessingContext() {
+                    private AtomicInteger count = new AtomicInteger();
+                    private AtomicInteger attempt = new AtomicInteger();
+
+                    @Override
+                    public void error(Throwable error) {
+                        if (count.getAndIncrement() > 0) {
+                            failed.compareAndSet(null, new AssertionError("Concurrent call to error()"));
+                        }
+                        super.error(error);
+                    }
+
+                    @Override
+                    public Future<Void> report() {
+                        if (count.getAndSet(0) > 1) {
+                            failed.compareAndSet(null, new AssertionError("Concurrent call to error() in report()"));
+                        }
+
+                        return super.report();
+                    }
+
+                    @Override
+                    public void currentContext(Stage stage, Class<?> klass) {
+                        this.attempt.set(0);
+                        super.currentContext(stage, klass);
+                    }
+
+                    @Override
+                    public void attempt(int attempt) {
+                        if (!this.attempt.compareAndSet(attempt - 1, attempt)) {
+                            failed.compareAndSet(null, new AssertionError(
+                                    "Concurrent call to attempt(): Attempts should increase monotonically " +
+                                            "within the scope of a given currentContext()"));
+                        }
+                        super.attempt(attempt);
+                    }
+                });
+        retryWithToleranceOperator.metrics(errorHandlingMetrics);
+
+        ExecutorService pool = Executors.newFixedThreadPool(numThreads);
+        List<? extends Future<?>> futures = IntStream.range(0, numThreads).boxed()
+                .map(id ->
+                        pool.submit(() -> {
+                            long t0 = System.currentTimeMillis();
+                            long i = 0;
+                            while (true) {
+                                if (++i % 10000 == 0 && System.currentTimeMillis() > t0 + runtimeMs) {
+                                    break;
+                                }
+                                if (failed.get() != null) {
+                                    break;
+                                }
+                                try {
+                                    if (id < numThreads / 2) {
+                                        retryWithToleranceOperator.executeFailed(Stage.TASK_PUT,
+                                                SinkTask.class, consumerRecord, new Throwable()).get();
+                                    } else {
+                                        retryWithToleranceOperator.execute(() -> null, Stage.TRANSFORMATION,
+                                                SinkTask.class);
+                                    }
+                                } catch (Exception e) {
+                                    failed.compareAndSet(null, e);
+                                }
+                            }
+                        }))
+                .collect(Collectors.toList());
+        pool.shutdown();
+        pool.awaitTermination((long) (1.5 * runtimeMs), TimeUnit.MILLISECONDS);
+        futures.forEach(future -> {
+            try {
+                future.get();
+            } catch (Exception e) {
+                failed.compareAndSet(null, e);
+            }
+        });
+        Throwable exception = failed.get();
+        if (exception != null) {
+            throw exception;
+        }
+    }
+
+
     private static class ExceptionThrower implements Operation<Object> {
         private Exception e;
 


Mime
View raw message