kafka-commits mailing list archives

From j...@apache.org
Subject kafka git commit: KAFKA-5756; Connect WorkerSourceTask synchronization issue on flush
Date Wed, 06 Sep 2017 18:26:45 GMT
Repository: kafka
Updated Branches:
  refs/heads/0.11.0 fc3eeb004 -> d93bd1aff


KAFKA-5756; Connect WorkerSourceTask synchronization issue on flush

Author: oleg <oleg@nexla.com>

Reviewers: Randall Hauch <rhauch@gmail.com>, Jason Gustafson <jason@confluent.io>

Closes #3702 from oleg-smith/KAFKA-5756

(cherry picked from commit 51025764601760684f3dbdc0171fd7aa2ebe70e7)
Signed-off-by: Jason Gustafson <jason@confluent.io>
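
Background on the synchronization issue, for readers of the diff below:
OffsetStorageWriter state is touched from two threads. The connector task's
processing thread records offsets via offset(), while the offset commit
thread calls beginFlush()/doFlush() (and cancelFlush() on timeout). Before
this patch only part of that state was guarded, so a flush racing with a
task write or a cancellation could observe a half-swapped data/toFlush
pair. The fix is to guard all of it with the writer's own monitor,
including the serialization loop in doFlush(). Below is a minimal sketch of
the pattern, assuming simplified types -- this is not the actual Connect
class, and only the field/method names that also appear in the diff are
real:

    import java.util.HashMap;
    import java.util.Map;

    // Simplified model of the synchronized offset-writer pattern; the real
    // class is org.apache.kafka.connect.storage.OffsetStorageWriter.
    class ToyOffsetWriter {
        private Map<String, Long> data = new HashMap<>(); // written by the task thread
        private Map<String, Long> toFlush = null;         // snapshot taken by the commit thread
        private long currentFlushId = 0;                  // invalidates stale flush callbacks

        // Task thread: record the latest offset for a source partition.
        public synchronized void offset(String partition, long offset) {
            data.put(partition, offset);
        }

        // Commit thread: atomically swap pending offsets into the flush snapshot.
        public synchronized boolean beginFlush() {
            if (toFlush != null)
                throw new IllegalStateException("Flush already in progress");
            if (data.isEmpty())
                return false;
            toFlush = data;
            data = new HashMap<>();
            return true;
        }

        // Commit thread, after a failed or timed-out flush: merge the snapshot
        // back so those offsets are retried, and bump the id so any late
        // callback from the abandoned flush is recognized as stale.
        public synchronized void cancelFlush() {
            if (toFlush == null)
                return; // no flush in progress
            toFlush.putAll(data);
            data = toFlush;
            toFlush = null;
            currentFlushId++;
        }
    }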


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/d93bd1af
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/d93bd1af
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/d93bd1af

Branch: refs/heads/0.11.0
Commit: d93bd1aff96b418c22cdbf9af355c6185d1e9192
Parents: fc3eeb0
Author: oleg <oleg@nexla.com>
Authored: Wed Sep 6 11:12:23 2017 -0700
Committer: Jason Gustafson <jason@confluent.io>
Committed: Wed Sep 6 11:25:26 2017 -0700

----------------------------------------------------------------------
 .../kafka/connect/runtime/WorkerSourceTask.java |  2 +-
 .../connect/storage/OffsetStorageWriter.java    | 65 +++++++++++---------
 2 files changed, 36 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/d93bd1af/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
index 5627145..163c7d0 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
@@ -159,7 +159,7 @@ class WorkerSourceTask extends WorkerTask {
                 }
 
                 if (toSend == null) {
-                    log.debug("Nothing to send to Kafka. Polling source for additional records");
+                    log.trace("Nothing to send to Kafka. Polling source for additional records");
                     toSend = task.poll();
                 }
                 if (toSend == null)

http://git-wip-us.apache.org/repos/asf/kafka/blob/d93bd1af/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetStorageWriter.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetStorageWriter.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetStorageWriter.java
index c5d1467..3239b67 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetStorageWriter.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetStorageWriter.java
@@ -59,7 +59,7 @@ import java.util.concurrent.Future;
  * time.
  * </p>
  * <p>
- * This class is not thread-safe. It should only be accessed from a Task's processing thread.
+ * This class is thread-safe.
  * </p>
  */
 public class OffsetStorageWriter {
@@ -72,7 +72,6 @@ public class OffsetStorageWriter {
     // Offset data in Connect format
     private Map<Map<String, Object>, Map<String, Object>> data = new HashMap<>();
 
-    // Not synchronized, should only be accessed by flush thread
     private Map<Map<String, Object>, Map<String, Object>> toFlush = null;
     // Unique ID for each flush request to handle callbacks after timeouts
     private long currentFlushId = 0;
@@ -129,44 +128,50 @@ public class OffsetStorageWriter {
      * @return a Future, or null if there are no offsets to commitOffsets
      */
     public Future<Void> doFlush(final Callback<Void> callback) {
-        final long flushId = currentFlushId;
 
+        final long flushId;
         // Serialize
-        Map<ByteBuffer, ByteBuffer> offsetsSerialized;
-        try {
-            offsetsSerialized = new HashMap<>();
-            for (Map.Entry<Map<String, Object>, Map<String, Object>> entry : toFlush.entrySet()) {
-                // Offsets are specified as schemaless to the converter, using whatever internal schema is appropriate
-                // for that data. The only enforcement of the format is here.
-                OffsetUtils.validateFormat(entry.getKey());
-                OffsetUtils.validateFormat(entry.getValue());
-                // When serializing the key, we add in the namespace information so the key is [namespace, real key]
-                byte[] key = keyConverter.fromConnectData(namespace, null, Arrays.asList(namespace, entry.getKey()));
-                ByteBuffer keyBuffer = (key != null) ? ByteBuffer.wrap(key) : null;
-                byte[] value = valueConverter.fromConnectData(namespace, null, entry.getValue());
-                ByteBuffer valueBuffer = (value != null) ? ByteBuffer.wrap(value) : null;
-                offsetsSerialized.put(keyBuffer, valueBuffer);
+        final Map<ByteBuffer, ByteBuffer> offsetsSerialized;
+
+        synchronized (this) {
+            flushId = currentFlushId;
+
+            try {
+                offsetsSerialized = new HashMap<>(toFlush.size());
+                for (Map.Entry<Map<String, Object>, Map<String, Object>> entry : toFlush.entrySet()) {
+                    // Offsets are specified as schemaless to the converter, using whatever internal schema is appropriate
+                    // for that data. The only enforcement of the format is here.
+                    OffsetUtils.validateFormat(entry.getKey());
+                    OffsetUtils.validateFormat(entry.getValue());
+                    // When serializing the key, we add in the namespace information so the key is [namespace, real key]
+                    byte[] key = keyConverter.fromConnectData(namespace, null, Arrays.asList(namespace, entry.getKey()));
+                    ByteBuffer keyBuffer = (key != null) ? ByteBuffer.wrap(key) : null;
+                    byte[] value = valueConverter.fromConnectData(namespace, null, entry.getValue());
+                    ByteBuffer valueBuffer = (value != null) ? ByteBuffer.wrap(value) : null;
+                    offsetsSerialized.put(keyBuffer, valueBuffer);
+                }
+            } catch (Throwable t) {
+                // Must handle errors properly here or the writer will be left mid-flush forever and be
+                // unable to make progress.
+                log.error("CRITICAL: Failed to serialize offset data, making it impossible to commit "
+                        + "offsets under namespace {}. This likely won't recover unless the "
+                        + "unserializable partition or offset information is overwritten.", namespace);
+                log.error("Cause of serialization failure:", t);
+                callback.onCompletion(t, null);
+                return null;
             }
-        } catch (Throwable t) {
-            // Must handle errors properly here or the writer will be left mid-flush forever and be
-            // unable to make progress.
-            log.error("CRITICAL: Failed to serialize offset data, making it impossible to commit "
-                    + "offsets under namespace {}. This likely won't recover unless the "
-                    + "unserializable partition or offset information is overwritten.", namespace);
-            log.error("Cause of serialization failure:", t);
-            callback.onCompletion(t, null);
-            return null;
+
+            // And submit the data
+            log.debug("Submitting {} entries to backing store. The offsets are: {}", offsetsSerialized.size(), toFlush);
         }
 
-        // And submit the data
-        log.debug("Submitting {} entries to backing store", offsetsSerialized.size());
-        log.debug("The offsets are: " + toFlush.toString());
         return backingStore.set(offsetsSerialized, new Callback<Void>() {
             @Override
             public void onCompletion(Throwable error, Void result) {
                 boolean isCurrent = handleFinishWrite(flushId, error, result);
-                if (isCurrent && callback != null)
+                if (isCurrent && callback != null) {
                     callback.onCompletion(error, result);
+                }
             }
         });
     }
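
Note how flushId is now captured inside the synchronized block. It is the
token the completion callback passes back to handleFinishWrite(), which lets
a callback that arrives after the flush was cancelled (for example on a
commit timeout) be recognized as stale and ignored. A minimal sketch of that
guard, assuming simplified types; apart from handleFinishWrite() and
cancelFlush(), the names here are illustrative:

    // Simplified model of the stale-flush guard behind handleFinishWrite().
    class FlushGuard {
        private long currentFlushId = 0;
        private Object toFlush = null; // non-null while a flush is in progress

        synchronized long beginFlush(Object snapshot) {
            toFlush = snapshot;
            return currentFlushId; // token carried by the async callback
        }

        // Abandon the current flush and invalidate its token. (The real
        // writer also merges the snapshot back into pending data for retry.)
        synchronized void cancelFlush() {
            toFlush = null;
            currentFlushId++;
        }

        // Called from the backing store's callback thread. Only the flush
        // whose token is still current may complete the caller's callback.
        synchronized boolean handleFinishWrite(long flushId, Throwable error) {
            if (flushId != currentFlushId)
                return false; // stale callback from a cancelled flush
            if (error != null) {
                cancelFlush(); // in the real writer, offsets are kept for retry
            } else {
                toFlush = null;
                currentFlushId++; // ready for the next flush
            }
            return true;
        }
    }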

