kafka-commits mailing list archives

From gwens...@apache.org
Subject kafka git commit: KAFKA-2964: Split Security Rolling Upgrade Test by Client and Broker Protocols
Date Fri, 18 Dec 2015 02:02:53 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk a2a417caf -> 9220df9f8


KAFKA-2964: Split Security Rolling Upgrade Test by Client and Broker Protocols

The core of this PR is to ensure we test enabling security in a running cluster in which the broker and client protocols differ.
This PR also includes improvements to the validation process in produce_consume_validate.py that make it easier to work out where missing messages were lost (a condensed sketch of how the pieces fit together follows this list):
- Fail fast if the producer or consumer stops running.
- If messages go missing, check the data files to see whether the cause was data loss on the broker or the consumer missing them.
- Make it possible for the ConsoleConsumer to log both what it consumed and when it consumed it (and enable this feature in the produce_consume_validate tests).
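
For orientation, here is how those pieces fit together. This is a condensed, illustrative sketch only, not the code as committed: the standalone run_validation helper and its signature are hypothetical, while producer.alive()/producer.acked, consumer.messages_consumed and kafka.search_data_files() are the attributes and methods exercised in the diff below.

def run_validation(producer, consumer, kafka, topic):
    # Fail fast: if either worker has died, raise instead of just logging a warning.
    dead = [str(n.account) for n in consumer.nodes if not consumer.alive(n)] + \
           [str(n.account) for n in producer.nodes if not producer.alive(n)]
    if dead:
        raise Exception("Producer/consumer terminated or timed out on: %s" % dead)

    # Compare what the producer saw acked with what the consumer actually read.
    acked = set(producer.acked)
    consumed = set(consumer.messages_consumed[1])
    missing = acked - consumed
    if missing:
        # Distinguish broker-side data loss from a consumer that skipped messages by
        # searching the on-disk partition logs for (a sample of) the missing payloads.
        lost = kafka.search_data_files(topic, list(missing)[:1000])
        if lost:
            raise AssertionError("%d acked messages never made it into the data files "
                                 "(data loss), e.g. %s" % (len(lost), lost[:10]))
        raise AssertionError("%d acked messages are in the data files but were never "
                             "consumed" % len(missing))

The committed version does the same in two steps: check_alive raises immediately if either worker has died, and validate builds its failure message via annotate_missing_msgs and annotate_data_lost before failing with assert success, msg.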

Author: Ben Stopford <benstopford@gmail.com>

Reviewers: Gwen Shapira, Geoff Anderson

Closes #667 from benstopford/security-rolling_upgrade-additions


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/9220df9f
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/9220df9f
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/9220df9f

Branch: refs/heads/trunk
Commit: 9220df9f8b58870a2282d3e4ceb2e003667d854b
Parents: a2a417c
Author: Ben Stopford <benstopford@gmail.com>
Authored: Thu Dec 17 18:02:38 2015 -0800
Committer: Gwen Shapira <cshapi@gmail.com>
Committed: Thu Dec 17 18:02:38 2015 -0800

----------------------------------------------------------------------
 .../scala/kafka/tools/ConsoleConsumer.scala     | 14 +++-
 tests/kafkatest/services/console_consumer.py    |  5 ++
 tests/kafkatest/services/kafka/kafka.py         | 33 +++++++-
 .../kafkatest/tests/produce_consume_validate.py | 80 +++++++++++++-------
 .../tests/security_rolling_upgrade_test.py      | 37 +++++----
 5 files changed, 119 insertions(+), 50 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/9220df9f/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
index d560b72..73743aa 100755
--- a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
+++ b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
@@ -29,6 +29,7 @@ import kafka.utils._
 import org.apache.kafka.clients.consumer.ConsumerConfig
 import org.apache.kafka.common.errors.WakeupException
 import org.apache.kafka.common.utils.Utils
+import org.apache.log4j.Logger
 
 import scala.collection.JavaConversions._
 
@@ -333,7 +334,7 @@ object ConsoleConsumer extends Logging {
   }
 }
 
-trait MessageFormatter {
+trait MessageFormatter{
   def writeTo(key: Array[Byte], value: Array[Byte], output: PrintStream)
 
   def init(props: Properties) {}
@@ -365,6 +366,17 @@ class DefaultMessageFormatter extends MessageFormatter {
   }
 }
 
+class LoggingMessageFormatter extends MessageFormatter   {
+  private val defaultWriter: DefaultMessageFormatter = new DefaultMessageFormatter
+  val logger = Logger.getLogger(getClass().getName)
+
+  def writeTo(key: Array[Byte], value: Array[Byte], output: PrintStream): Unit = {
+    defaultWriter.writeTo(key, value, output)
+    if(logger.isInfoEnabled)
+      logger.info(s"key:${if (key == null) "null" else new String(key)}, value:${if (value
== null) "null" else new String(value)}")
+  }
+}
+
 class NoOpMessageFormatter extends MessageFormatter {
   override def init(props: Properties) {}
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/9220df9f/tests/kafkatest/services/console_consumer.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/console_consumer.py b/tests/kafkatest/services/console_consumer.py
index b8ad8ab..4df42b7 100644
--- a/tests/kafkatest/services/console_consumer.py
+++ b/tests/kafkatest/services/console_consumer.py
@@ -135,6 +135,8 @@ class ConsoleConsumer(JmxMixin, BackgroundThreadService):
         self.messages_consumed = {idx: [] for idx in range(1, num_nodes + 1)}
         self.client_id = client_id
         self.print_key = print_key
+        self.log_values = True if version == TRUNK else False
+        self.log_level = "TRACE"
 
     def prop_file(self, node):
         """Return a string which can be used to create a configuration file appropriate for
the given node."""
@@ -190,6 +192,9 @@ class ConsoleConsumer(JmxMixin, BackgroundThreadService):
         if self.print_key:
             cmd += " --property print.key=true"
 
+        if self.log_values:
+            cmd+=" --formatter kafka.tools.LoggingMessageFormatter"
+
         cmd += " 2>> %(stderr)s | tee -a %(stdout)s &" % args
         return cmd
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/9220df9f/tests/kafkatest/services/kafka/kafka.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/kafka/kafka.py b/tests/kafkatest/services/kafka/kafka.py
index 4760065..9a9feda 100644
--- a/tests/kafkatest/services/kafka/kafka.py
+++ b/tests/kafkatest/services/kafka/kafka.py
@@ -114,7 +114,7 @@ class KafkaService(JmxMixin, Service):
         self.port_mappings[protocol] = self.port_mappings[protocol]._replace(open=False)
 
     def start_minikdc(self, add_principals=""):
-        if self.security_config.has_sasl_kerberos:
+        if self.security_config.has_sasl:
             if self.minikdc is None:
                 self.minikdc = MiniKdc(self.context, self.nodes, extra_principals = add_principals)
                 self.minikdc.start()
@@ -357,6 +357,37 @@ class KafkaService(JmxMixin, Service):
         self.logger.debug("Verify partition reassignment:")
         self.logger.debug(output)
 
+    def search_data_files(self, topic, messages):
+        """Check if a set of messages made it into the Kakfa data files. Note that
+        this method takes no account of replication. It simply looks for the
+        payload in all the partition files of the specified topic. 'messages' should be
+        an array of numbers. The list of missing messages is returned.
+        """
+        payload_match = "payload: " + "$|payload: ".join(str(x) for x in messages) + "$"
+        found = set([])
+
+        for node in self.nodes:
+            # Grab all .log files in directories prefixed with this topic
+            files = node.account.ssh_capture("find %s -regex  '.*/%s-.*/[^/]*.log'" % (KafkaService.DATA_LOG_DIR, topic))
+
+            # Check each data file to see if it contains the messages we want
+            for log in files:
+                cmd = "/opt/%s/bin/kafka-run-class.sh kafka.tools.DumpLogSegments --print-data-log
--files %s " \
+                      "| grep -E \"%s\"" % (kafka_dir(node), log.strip(), payload_match)
+
+                for line in node.account.ssh_capture(cmd, allow_fail=True):
+                    for val in messages:
+                        if line.strip().endswith("payload: "+str(val)):
+                            self.logger.debug("Found %s in data-file [%s] in line: [%s]"
% (val, log.strip(), line.strip()))
+                            found.add(val)
+
+        missing = list(set(messages) - found)
+
+        if len(missing) > 0:
+            self.logger.warn("The following values were not found in the data files: " +
str(missing))
+
+        return missing
+
     def restart_node(self, node, clean_shutdown=True):
         """Restart the given node."""
         self.stop_node(node, clean_shutdown)

http://git-wip-us.apache.org/repos/asf/kafka/blob/9220df9f/tests/kafkatest/tests/produce_consume_validate.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/produce_consume_validate.py b/tests/kafkatest/tests/produce_consume_validate.py
index 00988da..f2da000 100644
--- a/tests/kafkatest/tests/produce_consume_validate.py
+++ b/tests/kafkatest/tests/produce_consume_validate.py
@@ -41,19 +41,25 @@ class ProduceConsumeValidateTest(Test):
         wait_until(lambda: len(self.consumer.messages_consumed[1]) > 0, timeout_sec=30,
              err_msg="Consumer failed to start in a reasonable amount of time.")
 
-    def stop_producer_and_consumer(self):
+    def check_alive(self):
+        msg = ""
         for node in self.consumer.nodes:
             if not self.consumer.alive(node):
-                self.logger.warn("Consumer on %s is not alive and probably should be." %
str(node.account))
+                msg = "The consumer has terminated, or timed out, on node %s." % str(node.account)
         for node in self.producer.nodes:
             if not self.producer.alive(node):
-                self.logger.warn("Producer on %s is not alive and probably should be." %
str(node.account))
+                msg += "The producer has terminated, or timed out, on node %s." % str(node.account)
+        if len(msg) > 0:
+            raise Exception(msg)
 
-        # Check that producer is still successfully producing
+    def check_producing(self):
         currently_acked = self.producer.num_acked
         wait_until(lambda: self.producer.num_acked > currently_acked + 5, timeout_sec=30,
-             err_msg="Expected producer to still be producing.")
+                   err_msg="Expected producer to still be producing.")
 
+    def stop_producer_and_consumer(self):
+        self.check_alive()
+        self.check_producing()
         self.producer.stop()
         self.consumer.wait()
 
@@ -72,40 +78,58 @@ class ProduceConsumeValidateTest(Test):
                 self.mark_for_collect(s)
             raise e
 
+    @staticmethod
+    def annotate_missing_msgs(missing, acked, consumed, msg):
+        msg += "%s acked message did not make it to the Consumer. They are: " % len(missing)
+        if len(missing) < 20:
+            msg += str(missing) + ". "
+        else:
+            for i in range(20):
+                msg += str(missing.pop()) + ", "
+            msg += "...plus %s more. Total Acked: %s, Total Consumed: %s. " \
+                   % (len(missing) - 20, len(set(acked)), len(set(consumed)))
+        return msg
+
+    @staticmethod
+    def annotate_data_lost(data_lost, msg, number_validated):
+        print_limit = 10
+        if len(data_lost) > 0:
+            msg += "The first %s missing messages were validated to ensure they are in Kafka's
data files. " \
+                   "%s were missing. This suggests data loss. Here are some of the messages
not found in the data files: %s\n" \
+                   % (number_validated, len(data_lost), str(data_lost[0:print_limit]) if
len(data_lost) > print_limit else str(data_lost))
+        else:
+            msg += "We validated that the first %s of these missing messages correctly made
it into Kafka's data files. " \
+                   "This suggests they were lost on their way to the consumer." % number_validated
+        return msg
+
     def validate(self):
         """Check that each acked message was consumed."""
-
-        self.acked = self.producer.acked
-        self.not_acked = self.producer.not_acked
-
-        # Check produced vs consumed
-        self.consumed = self.consumer.messages_consumed[1]
-        self.logger.info("num consumed:  %d" % len(self.consumed))
-
         success = True
         msg = ""
+        acked = self.producer.acked
+        consumed = self.consumer.messages_consumed[1]
+        missing = set(acked) - set(consumed)
 
-        if len(set(self.consumed)) != len(self.consumed):
-            # There are duplicates. This is ok, so report it but don't fail the test
-            msg += "There are duplicate messages in the log\n"
+        self.logger.info("num consumed:  %d" % len(consumed))
 
-        if not set(self.consumed).issuperset(set(self.acked)):
-            # Every acked message must appear in the logs. I.e. consumed messages must be superset of acked messages.
-            acked_minus_consumed = set(self.producer.acked) - set(self.consumed)
+        # Were all acked messages consumed?
+        if len(missing) > 0:
+            msg = self.annotate_missing_msgs(missing, acked, consumed, msg)
             success = False
 
-            msg += "At least one acked message did not appear in the consumed messages. acked_minus_consumed:
"
-            if len(acked_minus_consumed) < 20:
-                msg += str(acked_minus_consumed)
-            else:
-                for i in range(20):
-                    msg += str(acked_minus_consumed.pop()) + ", "
-                msg += "...plus " + str(len(acked_minus_consumed) - 20) + " more"
+            #Did we miss anything due to data loss?
+            to_validate = list(missing)[0:1000 if len(missing) > 1000 else len(missing)]
+            data_lost = self.kafka.search_data_files(self.topic, to_validate)
+            msg = self.annotate_data_lost(data_lost, msg, len(to_validate))
+
 
-        # collect all logs if validation fails
+        # Are there duplicates?
+        if len(set(consumed)) != len(consumed):
+            msg += "(There are also %s duplicate messages in the log - but that is an acceptable
outcome)\n" % abs(len(set(consumed)) - len(consumed))
+
+        # Collect all logs if validation fails
         if not success:
             for s in self.test_context.services:
                 self.mark_for_collect(s)
 
         assert success, msg
-

http://git-wip-us.apache.org/repos/asf/kafka/blob/9220df9f/tests/kafkatest/tests/security_rolling_upgrade_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/security_rolling_upgrade_test.py b/tests/kafkatest/tests/security_rolling_upgrade_test.py
index 1acf58b..bf01b8a 100644
--- a/tests/kafkatest/tests/security_rolling_upgrade_test.py
+++ b/tests/kafkatest/tests/security_rolling_upgrade_test.py
@@ -43,9 +43,6 @@ class TestSecurityRollingUpgrade(ProduceConsumeValidateTest):
             'configs': {"min.insync.replicas": 2}}})
         self.zk.start()
 
-        #reduce replica.lag.time.max.ms due to KAFKA-2827
-        self.kafka.replica_lag = 2000
-
     def create_producer_and_consumer(self):
         self.producer = VerifiableProducer(
             self.test_context, self.num_producers, self.kafka, self.topic,
@@ -58,32 +55,32 @@ class TestSecurityRollingUpgrade(ProduceConsumeValidateTest):
         self.consumer.group_id = "unique-test-group-" + str(random.random())
 
     def bounce(self):
-        #Sleeps reduce the intermittent failures reported in KAFKA-2891. Should be removed once resolved.
+        self.kafka.start_minikdc()
         for node in self.kafka.nodes:
             self.kafka.stop_node(node)
-            time.sleep(10)
             self.kafka.start_node(node)
             time.sleep(10)
 
-    def roll_in_secured_settings(self, upgrade_protocol):
-        self.kafka.interbroker_security_protocol = upgrade_protocol
+    def roll_in_secured_settings(self, client_protocol, broker_protocol):
 
         # Roll cluster to include inter broker security protocol.
-        self.kafka.open_port(upgrade_protocol)
+        self.kafka.interbroker_security_protocol = broker_protocol
+        self.kafka.open_port(client_protocol)
+        self.kafka.open_port(broker_protocol)
         self.bounce()
 
         # Roll cluster to disable PLAINTEXT port
         self.kafka.close_port('PLAINTEXT')
         self.bounce()
 
-    def open_secured_port(self, upgrade_protocol):
-        self.kafka.security_protocol = upgrade_protocol
-        self.kafka.open_port(upgrade_protocol)
+    def open_secured_port(self, client_protocol):
+        self.kafka.security_protocol = client_protocol
+        self.kafka.open_port(client_protocol)
         self.kafka.start_minikdc()
         self.bounce()
 
-    @matrix(upgrade_protocol=["SSL", "SASL_PLAINTEXT", "SASL_SSL"])
-    def test_rolling_upgrade_phase_one(self, upgrade_protocol):
+    @matrix(client_protocol=["SSL", "SASL_PLAINTEXT", "SASL_SSL"])
+    def test_rolling_upgrade_phase_one(self, client_protocol):
         """
         Start with a PLAINTEXT cluster, open a SECURED port, via a rolling upgrade, ensuring we could produce
         and consume throughout over PLAINTEXT. Finally check we can produce and consume the new secured port.
@@ -92,19 +89,19 @@ class TestSecurityRollingUpgrade(ProduceConsumeValidateTest):
         self.kafka.security_protocol = "PLAINTEXT"
         self.kafka.start()
 
-        #Create PLAINTEXT producer and consumer
+        # Create PLAINTEXT producer and consumer
         self.create_producer_and_consumer()
 
         # Rolling upgrade, opening a secure protocol, ensuring the Plaintext producer/consumer continues to run
-        self.run_produce_consume_validate(self.open_secured_port, upgrade_protocol)
+        self.run_produce_consume_validate(self.open_secured_port, client_protocol)
 
         # Now we can produce and consume via the secured port
-        self.kafka.security_protocol = upgrade_protocol
+        self.kafka.security_protocol = client_protocol
         self.create_producer_and_consumer()
         self.run_produce_consume_validate(lambda: time.sleep(1))
 
-    @matrix(upgrade_protocol=["SSL", "SASL_PLAINTEXT", "SASL_SSL"])
-    def test_rolling_upgrade_phase_two(self, upgrade_protocol):
+    @matrix(client_protocol=["SASL_SSL", "SSL", "SASL_PLAINTEXT"], broker_protocol=["SASL_SSL",
"SSL", "SASL_PLAINTEXT"])
+    def test_rolling_upgrade_phase_two(self, client_protocol, broker_protocol):
         """
         Start with a PLAINTEXT cluster with a second Secured port open (i.e. result of phase one).
         Start an Producer and Consumer via the SECURED port
@@ -113,7 +110,7 @@ class TestSecurityRollingUpgrade(ProduceConsumeValidateTest):
         Ensure the producer and consumer ran throughout
         """
         #Given we have a broker that has both secure and PLAINTEXT ports open
-        self.kafka.security_protocol = upgrade_protocol
+        self.kafka.security_protocol = client_protocol
         self.kafka.interbroker_security_protocol = "PLAINTEXT"
         self.kafka.start()
 
@@ -121,4 +118,4 @@ class TestSecurityRollingUpgrade(ProduceConsumeValidateTest):
         self.create_producer_and_consumer()
 
         #Roll in the security protocol. Disable Plaintext. Ensure we can produce and Consume throughout
-        self.run_produce_consume_validate(self.roll_in_secured_settings, upgrade_protocol)
+        self.run_produce_consume_validate(self.roll_in_secured_settings, client_protocol, broker_protocol)

