kafka-commits mailing list archives

From ewe...@apache.org
Subject kafka git commit: KAFKA-5516: Formatting verifiable producer/consumer output in a similar fashion
Date Mon, 07 Aug 2017 15:10:46 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 2010aa067 -> f15cdc73d


KAFKA-5516: Formatting verifiable producer/consumer output in a similar fashion

Author: ppatierno <ppatierno@live.com>
Author: Paolo Patierno <ppatierno@live.com>

Reviewers: Ismael Juma <ismael@juma.me.uk>, Ewen Cheslack-Postava <ewen@confluent.io>

Closes #3434 from ppatierno/verifiable-consumer-producer
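
For context: this patch replaces the producer's ad-hoc HashMap-to-JSON serialization with
Jackson-annotated event classes like those already used by the verifiable consumer, and pins
the field order so both tools emit events with the same leading fields. A minimal standalone
sketch of that pattern (class and event names here are illustrative, not taken from the patch):

import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonPropertyOrder;
import com.fasterxml.jackson.databind.ObjectMapper;

public class EventOrderSketch {

    // @JsonPropertyOrder pins "timestamp" and "name" first, so every event
    // type serializes with the same leading fields regardless of the order
    // in which its properties are declared.
    @JsonPropertyOrder({ "timestamp", "name" })
    static class DemoEvent {
        private final long timestamp = System.currentTimeMillis();

        // @JsonProperty on a no-arg method exposes it as a JSON property
        // named after the method, the same style the patch uses.
        @JsonProperty
        public String name() {
            return "demo_event";
        }

        @JsonProperty
        public long timestamp() {
            return timestamp;
        }
    }

    public static void main(String[] args) throws Exception {
        // Prints, e.g.: {"timestamp":1502118642000,"name":"demo_event"}
        System.out.println(new ObjectMapper().writeValueAsString(new DemoEvent()));
    }
}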


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/f15cdc73
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/f15cdc73
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/f15cdc73

Branch: refs/heads/trunk
Commit: f15cdc73ddd06e0bf0f319b0d8ad8f63e80cf28d
Parents: 2010aa0
Author: ppatierno <ppatierno@live.com>
Authored: Mon Aug 7 08:10:42 2017 -0700
Committer: Ewen Cheslack-Postava <me@ewencp.org>
Committed: Mon Aug 7 08:10:42 2017 -0700

----------------------------------------------------------------------
 .../apache/kafka/tools/VerifiableConsumer.java  |   3 +
 .../apache/kafka/tools/VerifiableProducer.java  | 370 ++++++++++++-------
 2 files changed, 241 insertions(+), 132 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/f15cdc73/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java b/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java
index 5364720..d926638 100644
--- a/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java
+++ b/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java
@@ -18,6 +18,7 @@ package org.apache.kafka.tools;
 
 import com.fasterxml.jackson.annotation.JsonInclude;
 import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonPropertyOrder;
 import com.fasterxml.jackson.core.JsonGenerator;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.JsonSerializer;
@@ -256,6 +257,7 @@ public class VerifiableConsumer implements Closeable, OffsetCommitCallback, Cons
         }
     }
 
+    @JsonPropertyOrder({ "timestamp", "name" })
     private static abstract class ConsumerEvent {
         private final long timestamp = System.currentTimeMillis();
 
@@ -345,6 +347,7 @@ public class VerifiableConsumer implements Closeable, OffsetCommitCallback, Cons
         }
     }
 
+    @JsonPropertyOrder({ "timestamp", "name", "key", "value", "topic", "partition", "offset" })
     public static class RecordData extends ConsumerEvent {
 
         private final ConsumerRecord<String, String> record;

http://git-wip-us.apache.org/repos/asf/kafka/blob/f15cdc73/tools/src/main/java/org/apache/kafka/tools/VerifiableProducer.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/tools/VerifiableProducer.java b/tools/src/main/java/org/apache/kafka/tools/VerifiableProducer.java
index 5ce6aba..1924755 100644
--- a/tools/src/main/java/org/apache/kafka/tools/VerifiableProducer.java
+++ b/tools/src/main/java/org/apache/kafka/tools/VerifiableProducer.java
@@ -16,6 +16,8 @@
  */
 package org.apache.kafka.tools;
 
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonPropertyOrder;
 import org.apache.kafka.clients.producer.Callback;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.Producer;
@@ -30,8 +32,6 @@ import java.io.FileInputStream;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStream;
-import java.util.HashMap;
-import java.util.Map;
 import java.util.Properties;
 
 import static net.sourceforge.argparse4j.impl.Arguments.store;
@@ -40,6 +40,7 @@ import net.sourceforge.argparse4j.ArgumentParsers;
 import net.sourceforge.argparse4j.inf.ArgumentParser;
 import net.sourceforge.argparse4j.inf.ArgumentParserException;
 import net.sourceforge.argparse4j.inf.Namespace;
+import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.utils.Exit;
 
 /**
@@ -57,8 +58,9 @@ import org.apache.kafka.common.utils.Exit;
  */
 public class VerifiableProducer {
 
-    String topic;
-    private Producer<String, String> producer;
+    private final ObjectMapper mapper = new ObjectMapper();
+    private final String topic;
+    private final Producer<String, String> producer;
     // If maxMessages < 0, produce until the process is killed externally
     private long maxMessages = -1;
 
@@ -69,22 +71,21 @@ public class VerifiableProducer {
     private long numSent = 0;
 
     // Throttle message throughput if this is set >= 0
-    private long throughput;
+    private final long throughput;
 
     // Hook to trigger producing thread to stop sending messages
     private boolean stopProducing = false;
 
     // Prefix (plus a dot separator) added to every value produced by verifiable producer
     // if null, then values are produced without a prefix
-    private Integer valuePrefix;
+    private final Integer valuePrefix;
 
-    public VerifiableProducer(
-            Properties producerProps, String topic, int throughput, int maxMessages, Integer valuePrefix) {
+    public VerifiableProducer(KafkaProducer<String, String> producer, String topic, int throughput, int maxMessages, Integer valuePrefix) {
 
         this.topic = topic;
         this.throughput = throughput;
         this.maxMessages = maxMessages;
-        this.producer = new KafkaProducer<>(producerProps);
+        this.producer = producer;
         this.valuePrefix = valuePrefix;
     }
 
@@ -134,7 +135,7 @@ public class VerifiableProducer {
                 .type(Integer.class)
                 .choices(0, 1, -1)
                 .metavar("ACKS")
-                .help("Acks required on each produced message. See Kafka docs on request.required.acks
for details.");
+                .help("Acks required on each produced message. See Kafka docs on acks for
details.");
 
         parser.addArgument("--producer.config")
                 .action(store())
@@ -157,9 +158,9 @@ public class VerifiableProducer {
     /**
      * Read a properties file from the given path
      * @param filename The path of the file to read
-     *                 
-     * Note: this duplication of org.apache.kafka.common.utils.Utils.loadProps is unfortunate 
-     * but *intentional*. In order to use VerifiableProducer in compatibility and upgrade tests, 
+     *
+     * Note: this duplication of org.apache.kafka.common.utils.Utils.loadProps is unfortunate
+     * but *intentional*. In order to use VerifiableProducer in compatibility and upgrade tests,
      * we use VerifiableProducer from the development tools package, and run it against 0.8.X.X kafka jars.
      * Since this method is not in Utils in the 0.8.X.X jars, we have to cheat a bit and duplicate.
      */
@@ -170,63 +171,50 @@ public class VerifiableProducer {
         }
         return props;
     }
-    
-    /** Construct a VerifiableProducer object from command-line arguments. */
-    public static VerifiableProducer createFromArgs(String[] args) {
-        ArgumentParser parser = argParser();
-        VerifiableProducer producer = null;
-
-        try {
-            Namespace res;
-            res = parser.parseArgs(args);
-
-            int maxMessages = res.getInt("maxMessages");
-            String topic = res.getString("topic");
-            int throughput = res.getInt("throughput");
-            String configFile = res.getString("producer.config");
-            Integer valuePrefix = res.getInt("valuePrefix");
-
-            Properties producerProps = new Properties();
-            producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, res.getString("brokerList"));
-            producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
-                              "org.apache.kafka.common.serialization.StringSerializer");
-            producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
-                              "org.apache.kafka.common.serialization.StringSerializer");
-            producerProps.put(ProducerConfig.ACKS_CONFIG, Integer.toString(res.getInt("acks")));
-            // No producer retries
-            producerProps.put("retries", "0");
-            if (configFile != null) {
-                try {
-                    producerProps.putAll(loadProps(configFile));
-                } catch (IOException e) {
-                    throw new ArgumentParserException(e.getMessage(), parser);
-                }
-            }
 
-            producer = new VerifiableProducer(producerProps, topic, throughput, maxMessages, valuePrefix);
-        } catch (ArgumentParserException e) {
-            if (args.length == 0) {
-                parser.printHelp();
-                Exit.exit(0);
-            } else {
-                parser.handleError(e);
-                Exit.exit(1);
+    /** Construct a VerifiableProducer object from command-line arguments. */
+    public static VerifiableProducer createFromArgs(ArgumentParser parser, String[] args) throws ArgumentParserException {
+        Namespace res = parser.parseArgs(args);
+
+        int maxMessages = res.getInt("maxMessages");
+        String topic = res.getString("topic");
+        int throughput = res.getInt("throughput");
+        String configFile = res.getString("producer.config");
+        Integer valuePrefix = res.getInt("valuePrefix");
+
+        Properties producerProps = new Properties();
+        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, res.getString("brokerList"));
+        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
+                "org.apache.kafka.common.serialization.StringSerializer");
+        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
+                "org.apache.kafka.common.serialization.StringSerializer");
+        producerProps.put(ProducerConfig.ACKS_CONFIG, Integer.toString(res.getInt("acks")));
+        // No producer retries
+        producerProps.put(ProducerConfig.RETRIES_CONFIG, "0");
+        if (configFile != null) {
+            try {
+                producerProps.putAll(loadProps(configFile));
+            } catch (IOException e) {
+                throw new ArgumentParserException(e.getMessage(), parser);
             }
         }
 
-        return producer;
+        StringSerializer serializer = new StringSerializer();
+        KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps, serializer, serializer);
+
+        return new VerifiableProducer(producer, topic, throughput, maxMessages, valuePrefix);
     }
 
     /** Produce a message with given key and value. */
     public void send(String key, String value) {
-        ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic, key, value);
+        ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, value);
         numSent++;
         try {
             producer.send(record, new PrintInfoCallback(key, value));
         } catch (Exception e) {
 
             synchronized (System.out) {
-                System.out.println(errorString(e, key, value, System.currentTimeMillis()));
+                printJson(new FailedSend(key, value, topic, e));
             }
         }
     }
@@ -242,66 +230,174 @@ public class VerifiableProducer {
     /** Close the producer to flush any remaining messages. */
     public void close() {
         producer.close();
-        System.out.println(shutdownString());
+        printJson(new ShutdownComplete());
     }
 
-    String shutdownString() {
-        Map<String, Object> data = new HashMap<>();
-        data.put("name", "shutdown_complete");
-        return toJsonString(data);
+    @JsonPropertyOrder({ "timestamp", "name" })
+    private static abstract class ProducerEvent {
+        private final long timestamp = System.currentTimeMillis();
+
+        @JsonProperty
+        public abstract String name();
+
+        @JsonProperty
+        public long timestamp() {
+            return timestamp;
+        }
     }
 
-    String startupString() {
-        Map<String, Object> data = new HashMap<>();
-        data.put("name", "startup_complete");
-        return toJsonString(data);
+    private static class StartupComplete extends ProducerEvent {
+
+        @Override
+        public String name() {
+            return "startup_complete";
+        }
     }
 
-    /**
-     * Return JSON string encapsulating basic information about the exception, as well
-     * as the key and value which triggered the exception.
-     */
-    String errorString(Exception e, String key, String value, Long nowMs) {
-        assert e != null : "Expected non-null exception.";
+    private static class ShutdownComplete extends ProducerEvent {
 
-        Map<String, Object> errorData = new HashMap<>();
-        errorData.put("name", "producer_send_error");
+        @Override
+        public String name() {
+            return "shutdown_complete";
+        }
+    }
 
-        errorData.put("time_ms", nowMs);
-        errorData.put("exception", e.getClass().toString());
-        errorData.put("message", e.getMessage());
-        errorData.put("topic", this.topic);
-        errorData.put("key", key);
-        errorData.put("value", value);
+    private static class SuccessfulSend extends ProducerEvent {
 
-        return toJsonString(errorData);
+        private String key;
+        private String value;
+        private RecordMetadata recordMetadata;
+
+        public SuccessfulSend(String key, String value, RecordMetadata recordMetadata) {
+            assert recordMetadata != null : "Expected non-null recordMetadata object.";
+            this.key = key;
+            this.value = value;
+            this.recordMetadata = recordMetadata;
+        }
+
+        @Override
+        public String name() {
+            return "producer_send_success";
+        }
+
+        @JsonProperty
+        public String key() {
+            return key;
+        }
+
+        @JsonProperty
+        public String value() {
+            return value;
+        }
+
+        @JsonProperty
+        public String topic() {
+            return recordMetadata.topic();
+        }
+
+        @JsonProperty
+        public int partition() {
+            return recordMetadata.partition();
+        }
+
+        @JsonProperty
+        public long offset() {
+            return recordMetadata.offset();
+        }
     }
 
-    String successString(RecordMetadata recordMetadata, String key, String value, Long nowMs) {
-        assert recordMetadata != null : "Expected non-null recordMetadata object.";
+    private static class FailedSend extends ProducerEvent {
 
-        Map<String, Object> successData = new HashMap<>();
-        successData.put("name", "producer_send_success");
+        private String topic;
+        private String key;
+        private String value;
+        private Exception exception;
 
-        successData.put("time_ms", nowMs);
-        successData.put("topic", this.topic);
-        successData.put("partition", recordMetadata.partition());
-        successData.put("offset", recordMetadata.offset());
-        successData.put("key", key);
-        successData.put("value", value);
+        public FailedSend(String key, String value, String topic, Exception exception) {
+            assert exception != null : "Expected non-null exception.";
+            this.key = key;
+            this.value = value;
+            this.topic = topic;
+            this.exception = exception;
+        }
 
-        return toJsonString(successData);
+        @Override
+        public String name() {
+            return "producer_send_error";
+        }
+
+        @JsonProperty
+        public String key() {
+            return key;
+        }
+
+        @JsonProperty
+        public String value() {
+            return value;
+        }
+
+        @JsonProperty
+        public String topic() {
+            return topic;
+        }
+
+        @JsonProperty
+        public String exception() {
+            return exception.getClass().toString();
+        }
+
+        @JsonProperty
+        public String message() {
+            return exception.getMessage();
+        }
+    }
+
+    private static class ToolData extends ProducerEvent {
+
+        private long sent;
+        private long acked;
+        private long targetThroughput;
+        private double avgThroughput;
+
+        public ToolData(long sent, long acked, long targetThroughput, double avgThroughput) {
+            this.sent = sent;
+            this.acked = acked;
+            this.targetThroughput = targetThroughput;
+            this.avgThroughput = avgThroughput;
+        }
+
+        @Override
+        public String name() {
+            return "tool_data";
+        }
+
+        @JsonProperty
+        public long sent() {
+            return this.sent;
+        }
+
+        @JsonProperty
+        public long acked() {
+            return this.acked;
+        }
+
+        @JsonProperty("target_throughput")
+        public long targetThroughput() {
+            return this.targetThroughput;
+        }
+
+        @JsonProperty("avg_throughput")
+        public double avgThroughput() {
+            return this.avgThroughput;
+        }
     }
 
-    private String toJsonString(Map<String, Object> data) {
-        String json;
+    private void printJson(Object data) {
         try {
-            ObjectMapper mapper = new ObjectMapper();
-            json = mapper.writeValueAsString(data);
+            System.out.println(mapper.writeValueAsString(data));
         } catch (JsonProcessingException e) {
-            json = "Bad data can't be written as json: " + e.getMessage();
+            System.out.println("Bad data can't be written as json: " + e.getMessage());
         }
-        return json;
     }
 
     /** Callback which prints errors to stdout when the producer fails to send. */
@@ -319,54 +415,27 @@ public class VerifiableProducer {
             synchronized (System.out) {
                 if (e == null) {
                     VerifiableProducer.this.numAcked++;
-                    System.out.println(successString(recordMetadata, this.key, this.value, System.currentTimeMillis()));
+                    printJson(new SuccessfulSend(this.key, this.value, recordMetadata));
                 } else {
-                    System.out.println(errorString(e, this.key, this.value, System.currentTimeMillis()));
+                    printJson(new FailedSend(this.key, this.value, topic, e));
                 }
             }
         }
     }
 
-    public static void main(String[] args) throws IOException {
-
-        final VerifiableProducer producer = createFromArgs(args);
-        final long startMs = System.currentTimeMillis();
-        boolean infinite = producer.maxMessages < 0;
-
-        Runtime.getRuntime().addShutdownHook(new Thread() {
-            @Override
-            public void run() {
-                // Trigger main thread to stop producing messages
-                producer.stopProducing = true;
+    public void run(ThroughputThrottler throttler) {
 
-                // Flush any remaining messages
-                producer.close();
+        printJson(new StartupComplete());
+        // negative maxMessages (-1) means "infinite"
+        long maxMessages = (this.maxMessages < 0) ? Long.MAX_VALUE : this.maxMessages;
 
-                // Print a summary
-                long stopMs = System.currentTimeMillis();
-                double avgThroughput = 1000 * ((producer.numAcked) / (double) (stopMs - startMs));
-
-                Map<String, Object> data = new HashMap<>();
-                data.put("name", "tool_data");
-                data.put("sent", producer.numSent);
-                data.put("acked", producer.numAcked);
-                data.put("target_throughput", producer.throughput);
-                data.put("avg_throughput", avgThroughput);
-
-                System.out.println(producer.toJsonString(data));
-            }
-        });
-
-        ThroughputThrottler throttler = new ThroughputThrottler(producer.throughput, startMs);
-        System.out.println(producer.startupString());
-        long maxMessages = infinite ? Long.MAX_VALUE : producer.maxMessages;
         for (long i = 0; i < maxMessages; i++) {
-            if (producer.stopProducing) {
+            if (this.stopProducing) {
                 break;
             }
             long sendStartMs = System.currentTimeMillis();
 
-            producer.send(null, producer.getValue(i));
+            this.send(null, this.getValue(i));
 
             if (throttler.shouldThrottle(i, sendStartMs)) {
                 throttler.throttle();
@@ -374,4 +443,41 @@ public class VerifiableProducer {
         }
     }
 
+    public static void main(String[] args) {
+        ArgumentParser parser = argParser();
+        if (args.length == 0) {
+            parser.printHelp();
+            Exit.exit(0);
+        }
+
+        try {
+            final VerifiableProducer producer = createFromArgs(parser, args);
+
+            final long startMs = System.currentTimeMillis();
+            ThroughputThrottler throttler = new ThroughputThrottler(producer.throughput, startMs);
+
+            Runtime.getRuntime().addShutdownHook(new Thread() {
+                @Override
+                public void run() {
+                    // Trigger main thread to stop producing messages
+                    producer.stopProducing = true;
+
+                    // Flush any remaining messages
+                    producer.close();
+
+                    // Print a summary
+                    long stopMs = System.currentTimeMillis();
+                    double avgThroughput = 1000 * ((producer.numAcked) / (double) (stopMs - startMs));
+
+                    producer.printJson(new ToolData(producer.numSent, producer.numAcked, producer.throughput, avgThroughput));
+                }
+            });
+
+            producer.run(throttler);
+        } catch (ArgumentParserException e) {
+            parser.handleError(e);
+            Exit.exit(1);
+        }
+    }
+
 }
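
Since producer construction now happens in createFromArgs() and the send loop lives in a
public run() method, the refactored class can also be driven without command-line parsing.
A minimal sketch under that reading of the patch (broker address, topic name, throughput,
and message count are illustrative):

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.tools.ThroughputThrottler;
import org.apache.kafka.tools.VerifiableProducer;

public class VerifiableProducerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // illustrative

        StringSerializer serializer = new StringSerializer();
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(props, serializer, serializer);

        // The new constructor takes the producer itself rather than raw Properties:
        // topic "test", target throughput 100 msg/s, 1000 messages, no value prefix.
        VerifiableProducer producer = new VerifiableProducer(kafkaProducer, "test", 100, 1000, null);

        // run() prints startup_complete, then sends and throttles;
        // close() flushes and prints shutdown_complete.
        producer.run(new ThroughputThrottler(100, System.currentTimeMillis()));
        producer.close();
    }
}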

