kafka-commits mailing list archives

From chia7...@apache.org
Subject [kafka] branch trunk updated: KAFKA-12662: add unit test for ProducerPerformance (#10588)
Date Thu, 17 Jun 2021 12:08:19 GMT
This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 580c111  KAFKA-12662: add unit test for ProducerPerformance (#10588)
580c111 is described below

commit 580c1112582ed6ddaca6c0402a1bb014c36235e5
Author: CHUN-HAO TANG <tang7526@gmail.com>
AuthorDate: Thu Jun 17 20:07:12 2021 +0800

    KAFKA-12662: add unit test for ProducerPerformance (#10588)
    
    Reviewers: Luke Chen <showuon@gmail.com>, wenbingshen <oliver.shen999@gmail.com>, dengziming <dengziming1993@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
---
 build.gradle                                       |   2 +-
 gradle/dependencies.gradle                         |   1 +
 .../apache/kafka/tools/ProducerPerformance.java    | 110 +++++++++------
 .../kafka/tools/ProducerPerformanceTest.java       | 157 +++++++++++++++++++++
 4 files changed, 227 insertions(+), 43 deletions(-)
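
For context, the entry point exercised by the new test corresponds to an ordinary standalone run of the tool. A minimal sketch of such an invocation (the driver class name, topic name, and broker address are placeholders, not taken from this commit):

    // Hypothetical driver; ProducerPerformance.main is the public entry point refactored below.
    import org.apache.kafka.tools.ProducerPerformance;

    public class ProducerPerformanceExample {
        public static void main(String[] args) throws Exception {
            ProducerPerformance.main(new String[] {
                "--topic", "perf-test",                                   // assumed topic name
                "--num-records", "1000",
                "--record-size", "100",
                "--throughput", "-1",                                     // -1 disables throttling
                "--producer-props", "bootstrap.servers=localhost:9092"    // assumed broker address
            });
        }
    }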

diff --git a/build.gradle b/build.gradle
index e579e7a..11f0218 100644
--- a/build.gradle
+++ b/build.gradle
@@ -1582,7 +1582,7 @@ project(':tools') {
     testImplementation libs.junitJupiter
     testImplementation project(':clients').sourceSets.test.output
     testImplementation libs.mockitoInline // supports mocking static methods, final classes, etc.
-
+    testImplementation libs.mockitoJunitJupiter // supports MockitoExtension
     testRuntimeOnly libs.slf4jlog4j
   }
 
diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle
index 069d3f6..82d6798 100644
--- a/gradle/dependencies.gradle
+++ b/gradle/dependencies.gradle
@@ -173,6 +173,7 @@ libs += [
   metrics: "com.yammer.metrics:metrics-core:$versions.metrics",
   mockitoCore: "org.mockito:mockito-core:$versions.mockito",
   mockitoInline: "org.mockito:mockito-inline:$versions.mockito",
+  mockitoJunitJupiter: "org.mockito:mockito-junit-jupiter:$versions.mockito",
   nettyHandler: "io.netty:netty-handler:$versions.netty",
   nettyTransportNativeEpoll: "io.netty:netty-transport-native-epoll:$versions.netty",
   powermockJunit4: "org.powermock:powermock-module-junit4:$versions.powermock",
diff --git a/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java b/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java
index 0ddff32..1f47bbc 100644
--- a/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java
+++ b/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java
@@ -19,6 +19,7 @@ package org.apache.kafka.tools;
 import static net.sourceforge.argparse4j.impl.Arguments.store;
 import static net.sourceforge.argparse4j.impl.Arguments.storeTrue;
 
+import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
 import java.nio.file.Path;
@@ -46,6 +47,11 @@ import org.apache.kafka.common.utils.Utils;
 public class ProducerPerformance {
 
     public static void main(String[] args) throws Exception {
+        ProducerPerformance perf = new ProducerPerformance();
+        perf.start(args);
+    }
+    
+    void start(String[] args) throws IOException {
         ArgumentParser parser = argParser();
 
         try {
@@ -71,41 +77,11 @@ public class ProducerPerformance {
                 throw new ArgumentParserException("Either --producer-props or --producer.config must be specified.", parser);
             }
 
-            List<byte[]> payloadByteList = new ArrayList<>();
-            if (payloadFilePath != null) {
-                Path path = Paths.get(payloadFilePath);
-                System.out.println("Reading payloads from: " + path.toAbsolutePath());
-                if (Files.notExists(path) || Files.size(path) == 0)  {
-                    throw new  IllegalArgumentException("File does not exist or empty file provided.");
-                }
-
-                String[] payloadList = new String(Files.readAllBytes(path), StandardCharsets.UTF_8).split(payloadDelimiter);
-
-                System.out.println("Number of messages read: " + payloadList.length);
+            List<byte[]> payloadByteList = readPayloadFile(payloadFilePath, payloadDelimiter);
 
-                for (String payload : payloadList) {
-                    payloadByteList.add(payload.getBytes(StandardCharsets.UTF_8));
-                }
-            }
+            Properties props = readProps(producerProps, producerConfig, transactionalId, transactionsEnabled);
 
-            Properties props = new Properties();
-            if (producerConfig != null) {
-                props.putAll(Utils.loadProps(producerConfig));
-            }
-            if (producerProps != null)
-                for (String prop : producerProps) {
-                    String[] pieces = prop.split("=");
-                    if (pieces.length != 2)
-                        throw new IllegalArgumentException("Invalid property: " + prop);
-                    props.put(pieces[0], pieces[1]);
-                }
-
-            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
-            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
-            if (transactionsEnabled)
-                props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId);
-
-            KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props);
+            KafkaProducer<byte[], byte[]> producer = createKafkaProducer(props);
 
             if (transactionsEnabled)
                 producer.initTransactions();
@@ -126,14 +102,7 @@ public class ProducerPerformance {
             long transactionStartTime = 0;
             for (long i = 0; i < numRecords; i++) {
 
-                if (payloadFilePath != null) {
-                    payload = payloadByteList.get(random.nextInt(payloadByteList.size()));
-                } else if (recordSize != null) {
-                    for (int j = 0; j < payload.length; ++j)
-                        payload[j] = (byte) (random.nextInt(26) + 65);
-                } else {
-                    throw new IllegalArgumentException("no payload File Path or record Size provided");
-                }
+                payload = generateRandomPayload(recordSize, payloadByteList, payload, random);
 
                 if (transactionsEnabled && currentTransactionSize == 0) {
                     producer.beginTransaction();
@@ -190,8 +159,65 @@ public class ProducerPerformance {
 
     }
 
+    KafkaProducer<byte[], byte[]> createKafkaProducer(Properties props) {
+        return new KafkaProducer<>(props);
+    }
+
+    static byte[] generateRandomPayload(Integer recordSize, List<byte[]> payloadByteList, byte[] payload,
+            Random random) {
+        if (!payloadByteList.isEmpty()) {
+            payload = payloadByteList.get(random.nextInt(payloadByteList.size()));
+        } else if (recordSize != null) {
+            for (int j = 0; j < payload.length; ++j)
+                payload[j] = (byte) (random.nextInt(26) + 65);
+        } else {
+            throw new IllegalArgumentException("no payload File Path or record Size provided");
+        }
+        return payload;
+    }
+    
+    static Properties readProps(List<String> producerProps, String producerConfig, String transactionalId,
+            boolean transactionsEnabled) throws IOException {
+        Properties props = new Properties();
+        if (producerConfig != null) {
+            props.putAll(Utils.loadProps(producerConfig));
+        }
+        if (producerProps != null)
+            for (String prop : producerProps) {
+                String[] pieces = prop.split("=");
+                if (pieces.length != 2)
+                    throw new IllegalArgumentException("Invalid property: " + prop);
+                props.put(pieces[0], pieces[1]);
+            }
+
+        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
+        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
+        if (transactionsEnabled) props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId);
+        return props;
+    }
+
+    static List<byte[]> readPayloadFile(String payloadFilePath, String payloadDelimiter) throws IOException {
+        List<byte[]> payloadByteList = new ArrayList<>();
+        if (payloadFilePath != null) {
+            Path path = Paths.get(payloadFilePath);
+            System.out.println("Reading payloads from: " + path.toAbsolutePath());
+            if (Files.notExists(path) || Files.size(path) == 0)  {
+                throw new IllegalArgumentException("File does not exist or empty file provided.");
+            }
+
+            String[] payloadList = new String(Files.readAllBytes(path), StandardCharsets.UTF_8).split(payloadDelimiter);
+
+            System.out.println("Number of messages read: " + payloadList.length);
+
+            for (String payload : payloadList) {
+                payloadByteList.add(payload.getBytes(StandardCharsets.UTF_8));
+            }
+        }
+        return payloadByteList;
+    }
+
     /** Get the command-line argument parser. */
-    private static ArgumentParser argParser() {
+    static ArgumentParser argParser() {
         ArgumentParser parser = ArgumentParsers
                 .newArgumentParser("producer-performance")
                 .defaultHelp(true)
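
The extracted generateRandomPayload helper keeps the original behaviour: when payloads were read from a file it picks a random entry from the list (note the refactored check keys off the list being non-empty rather than the file path), otherwise it fills the record with random uppercase ASCII letters. A small self-contained illustration of the byte range involved (not part of the patch; the class name is made up):

    import java.nio.charset.StandardCharsets;
    import java.util.Random;

    public class PayloadByteRangeDemo {
        public static void main(String[] args) {
            // Mirrors the loop in generateRandomPayload: each byte is random.nextInt(26) + 65,
            // i.e. ASCII 'A' (65) through 'Z' (90), which is why the new
            // testGenerateRandomPayloadByRecordSize can assert that no byte is zero.
            Random random = new Random(0);
            byte[] payload = new byte[8];
            for (int j = 0; j < payload.length; ++j)
                payload[j] = (byte) (random.nextInt(26) + 65);
            System.out.println(new String(payload, StandardCharsets.US_ASCII));
        }
    }
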
diff --git a/tools/src/test/java/org/apache/kafka/tools/ProducerPerformanceTest.java b/tools/src/test/java/org/apache/kafka/tools/ProducerPerformanceTest.java
new file mode 100644
index 0000000..be037bd
--- /dev/null
+++ b/tools/src/test/java/org/apache/kafka/tools/ProducerPerformanceTest.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.Spy;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import net.sourceforge.argparse4j.inf.ArgumentParser;
+import net.sourceforge.argparse4j.inf.ArgumentParserException;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+import java.util.Random;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+@ExtendWith(MockitoExtension.class)
+public class ProducerPerformanceTest {
+
+    @Mock
+    KafkaProducer<byte[], byte[]> producerMock;
+
+    @Spy
+    ProducerPerformance producerPerformanceSpy;
+
+    private File createTempFile(String contents) throws IOException {
+        File file = File.createTempFile("ProducerPerformanceTest", ".tmp");
+        file.deleteOnExit();
+        Files.write(file.toPath(), contents.getBytes());
+        return file;
+    }
+
+    @Test
+    public void testReadPayloadFile() throws Exception {
+        File payloadFile = createTempFile("Hello\nKafka");
+        String payloadFilePath = payloadFile.getAbsolutePath();
+        String payloadDelimiter = "\n";
+
+        List<byte[]> payloadByteList = ProducerPerformance.readPayloadFile(payloadFilePath, payloadDelimiter);
+
+        assertEquals(2, payloadByteList.size());
+        assertEquals("Hello", new String(payloadByteList.get(0)));
+        assertEquals("Kafka", new String(payloadByteList.get(1)));
+    }
+
+    @Test
+    public void testReadProps() throws Exception {
+        List<String> producerProps = Collections.singletonList("bootstrap.servers=localhost:9000");
+        String producerConfig = createTempFile("acks=1").getAbsolutePath();
+        String transactionalId = "1234";
+        boolean transactionsEnabled = true;
+
+        Properties prop = ProducerPerformance.readProps(producerProps, producerConfig, transactionalId, transactionsEnabled);
+
+        assertNotNull(prop);
+        assertEquals(5, prop.size());
+    }
+
+    @Test
+    public void testNumberOfCallsForSendAndClose() throws IOException {
+        doReturn(null).when(producerMock).send(any(), any());
+        doReturn(producerMock).when(producerPerformanceSpy).createKafkaProducer(any(Properties.class));
+
+        String[] args = new String[] {
+            "--topic", "Hello-Kafka", 
+            "--num-records", "5", 
+            "--throughput", "100", 
+            "--record-size", "100", 
+            "--producer-props", "bootstrap.servers=localhost:9000"};
+        producerPerformanceSpy.start(args);
+        verify(producerMock, times(5)).send(any(), any());
+        verify(producerMock, times(1)).close();
+    }
+
+    @Test
+    public void testUnexpectedArg() {
+        String[] args = new String[] {
+            "--test", "test", 
+            "--topic", "Hello-Kafka", 
+            "--num-records", "5", 
+            "--throughput", "100", 
+            "--record-size", "100", 
+            "--producer-props", "bootstrap.servers=localhost:9000"};
+        ArgumentParser parser = ProducerPerformance.argParser();
+        ArgumentParserException thrown = assertThrows(ArgumentParserException.class, () -> parser.parseArgs(args));
+        assertEquals("unrecognized arguments: '--test'", thrown.getMessage());
+    }
+
+    @Test
+    public void testGenerateRandomPayloadByPayloadFile() {
+        Integer recordSize = null;
+        String inputString = "Hello Kafka";
+        byte[] byteArray = inputString.getBytes(StandardCharsets.UTF_8);
+        List<byte[]> payloadByteList = new ArrayList<>();
+        payloadByteList.add(byteArray);
+        byte[] payload = null;
+        Random random = new Random(0);
+
+        payload = ProducerPerformance.generateRandomPayload(recordSize, payloadByteList, payload, random);
+        assertEquals(inputString, new String(payload));
+    }
+
+    @Test
+    public void testGenerateRandomPayloadByRecordSize() {
+        Integer recordSize = 100;
+        byte[] payload = new byte[recordSize];
+        List<byte[]> payloadByteList = new ArrayList<>();
+        Random random = new Random(0);
+
+        payload = ProducerPerformance.generateRandomPayload(recordSize, payloadByteList, payload, random);
+        for (byte b : payload) {
+            assertNotEquals(0, b);
+        }
+    }
+
+    @Test
+    public void testGenerateRandomPayloadException() {
+        Integer recordSize = null;
+        byte[] payload = null;
+        List<byte[]> payloadByteList = new ArrayList<>();
+        Random random = new Random(0);
+
+        IllegalArgumentException thrown = assertThrows(IllegalArgumentException.class, () -> ProducerPerformance.generateRandomPayload(recordSize, payloadByteList, payload, random));
+        assertEquals("no payload File Path or record Size provided", thrown.getMessage());
+    }
+}
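
For readers who have not used MockitoExtension before, the @Mock and @Spy fields above are what the extension instantiates before each test. A hand-wired equivalent of that setup, shown only for illustration (the class name is made up; it would have to live in the org.apache.kafka.tools package because createKafkaProducer is package-private):

    package org.apache.kafka.tools;

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.mockito.ArgumentMatchers;
    import org.mockito.Mockito;

    public class ManualMockWiringExample {
        @SuppressWarnings("unchecked")
        ProducerPerformance buildSpy() {
            // A mock producer that never talks to a broker ...
            KafkaProducer<byte[], byte[]> producerMock = Mockito.mock(KafkaProducer.class);
            // ... returned through the spy's createKafkaProducer() seam, so start()
            // hands records to the mock instead of constructing a real KafkaProducer.
            ProducerPerformance spy = Mockito.spy(new ProducerPerformance());
            Mockito.doReturn(producerMock).when(spy).createKafkaProducer(ArgumentMatchers.any(Properties.class));
            return spy;
        }
    }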
