kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ewe...@apache.org
Subject kafka git commit: KAFKA-4673: Fix thread-safety of Python VerifiableConsumer class
Date Tue, 24 Jan 2017 21:19:38 GMT
Repository: kafka
Updated Branches:
  refs/heads/0.10.1 d436fa2ca -> 027455146


KAFKA-4673: Fix thread-safety of Python VerifiableConsumer class

Author: Ewen Cheslack-Postava <me@ewencp.org>

Reviewers: Jason Gustafson <jason@confluent.io>

Closes #2427 from ewencp/kafka-4673-verifiable-consumer-thread-safety

(cherry picked from commit 9b8d99b1b866a90b07c8a9a61fe79a75d01ae272)
Signed-off-by: Ewen Cheslack-Postava <me@ewencp.org>


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/02745514
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/02745514
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/02745514

Branch: refs/heads/0.10.1
Commit: 02745514629a2c8c8956a9e741fa1a1020e4eb71
Parents: d436fa2
Author: Ewen Cheslack-Postava <me@ewencp.org>
Authored: Tue Jan 24 13:19:03 2017 -0800
Committer: Ewen Cheslack-Postava <me@ewencp.org>
Committed: Tue Jan 24 13:19:34 2017 -0800

----------------------------------------------------------------------
 tests/kafkatest/services/verifiable_consumer.py | 19 +++++++++++--------
 1 file changed, 11 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/02745514/tests/kafkatest/services/verifiable_consumer.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/verifiable_consumer.py b/tests/kafkatest/services/verifiable_consumer.py
index 9c6abdd..28175e2 100644
--- a/tests/kafkatest/services/verifiable_consumer.py
+++ b/tests/kafkatest/services/verifiable_consumer.py
@@ -160,10 +160,11 @@ class VerifiableConsumer(KafkaPathResolverMixin, BackgroundThreadService):
             node.version = version
 
     def _worker(self, idx, node):
-        if node not in self.event_handlers:
-            self.event_handlers[node] = ConsumerEventHandler(node)
+        with self.lock:
+            if node not in self.event_handlers:
+                self.event_handlers[node] = ConsumerEventHandler(node)
+            handler = self.event_handlers[node]
 
-        handler = self.event_handlers[node]
         node.account.ssh("mkdir -p %s" % VerifiableConsumer.PERSISTENT_ROOT, allow_fail=False)
 
         # Create and upload log properties
@@ -266,7 +267,8 @@ class VerifiableConsumer(KafkaPathResolverMixin, BackgroundThreadService):
         for pid in self.pids(node):
             node.account.signal(pid, sig, allow_fail)
 
-        self.event_handlers[node].handle_kill_process(clean_shutdown)
+        with self.lock:
+            self.event_handlers[node].handle_kill_process(clean_shutdown)
 
     def stop_node(self, node, clean_shutdown=True):
         self.kill_node(node, clean_shutdown=clean_shutdown)
@@ -292,10 +294,11 @@ class VerifiableConsumer(KafkaPathResolverMixin, BackgroundThreadService):
                 return None
 
     def owner(self, tp):
-        for handler in self.event_handlers.itervalues():
-            if tp in handler.current_assignment():
-                return handler.node
-        return None
+        with self.lock:
+            for handler in self.event_handlers.itervalues():
+                if tp in handler.current_assignment():
+                    return handler.node
+            return None
 
     def last_commit(self, tp):
         with self.lock:


Mime
View raw message