kafka-commits mailing list archives

From ij...@apache.org
Subject [kafka] branch trunk updated: MINOR: Update JUnit to 4.13 and annotate log cleaner integration test (#6248)
Date Tue, 12 Feb 2019 06:06:27 GMT
This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new c7f99bc  MINOR: Update JUnit to 4.13 and annotate log cleaner integration test (#6248)
c7f99bc is described below

commit c7f99bc2bd9af5eb6ca9e20a02d5806c52d434b3
Author: Ismael Juma <ismael@juma.me.uk>
AuthorDate: Mon Feb 11 22:06:14 2019 -0800

    MINOR: Update JUnit to 4.13 and annotate log cleaner integration test (#6248)
    
    JUnit 4.13 fixes the issue where `Category` and `Parameterized` annotations
    could not be used together. It also deprecates `ExpectedException` and
    `assertThat`. Given this, we:
    
    - Replace `ExpectedException` with the newly introduced `assertThrows`.
    - Replace `Assert.assertThat` with `MatcherAssert.assertThat`.
    - Annotate `AbstractLogCleanerIntegrationTest` with `IntegrationTest` category.
    
    Reviewers: Ewen Cheslack-Postava <ewen@confluent.io>, David Arthur <mumrah@gmail.com>
---
 .../kafka/clients/consumer/KafkaConsumerTest.java  |  11 +-
 .../kafka/clients/consumer/MockConsumerTest.java   |   2 +-
 .../kafka/common/metrics/KafkaMbeanTest.java       |   2 +-
 .../apache/kafka/common/record/KafkaLZ4Test.java   |  49 +++----
 .../common/record/MemoryRecordsBuilderTest.java    | 159 +++++++++++----------
 .../kafka/common/record/MemoryRecordsTest.java     | 140 +++++++++---------
 .../org/apache/kafka/connect/data/StructTest.java  |  36 +++--
 .../log/AbstractLogCleanerIntegrationTest.scala    |   3 +
 gradle/dependencies.gradle                         |   2 +-
 .../apache/kafka/streams/StreamsConfigTest.java    |   2 +-
 .../FineGrainedAutoResetIntegrationTest.java       |   2 +-
 .../integration/GlobalKTableIntegrationTest.java   |   2 +-
 .../integration/RegexSourceIntegrationTest.java    |   2 +-
 .../RepartitionOptimizingIntegrationTest.java      |   2 +-
 .../kafka/streams/internals/ApiUtilsTest.java      |   4 +-
 .../apache/kafka/streams/kstream/PrintedTest.java  |   2 +-
 .../kstream/RepartitionTopicNamingTest.java        |   2 +-
 .../internals/InternalStreamsBuilderTest.java      |   2 +-
 .../internals/KStreamFlatTransformTest.java        |   9 +-
 .../streams/kstream/internals/KStreamImplTest.java |  18 +--
 .../kstream/internals/KStreamKStreamJoinTest.java  |   2 +-
 .../kstream/internals/KTableSourceTest.java        |   2 +-
 .../internals/TransformerSupplierAdapterTest.java  |   2 +-
 .../internals/graph/GraphGraceSearchUtilTest.java  |   2 +-
 .../internals/CompositeRestoreListenerTest.java    |   4 +-
 .../internals/GlobalProcessorContextImplTest.java  |   2 +-
 .../internals/InternalTopologyBuilderTest.java     |   2 +-
 .../processor/internals/ProcessorContextTest.java  |   2 +-
 .../processor/internals/StreamThreadTest.java      |   2 +-
 .../internals/StreamsPartitionAssignorTest.java    |   2 +-
 .../internals/metrics/StreamsMetricsImplTest.java  |   2 +-
 .../state/internals/AbstractKeyValueStoreTest.java |   2 +-
 .../state/internals/CachingKeyValueStoreTest.java  |   4 +-
 .../CompositeReadOnlyWindowStoreTest.java          |  15 +-
 .../state/internals/InMemoryLRUCacheStoreTest.java |   2 +-
 .../state/internals/MeteredKeyValueStoreTest.java  |   4 +-
 .../state/internals/MeteredSessionStoreTest.java   |   4 +-
 ...sToDbOptionsColumnFamilyOptionsAdapterTest.java |   2 +-
 .../internals/RocksDBSegmentedBytesStoreTest.java  |   7 +-
 .../streams/state/internals/RocksDBStoreTest.java  |   2 +-
 .../internals/RocksDBTimestampedStoreTest.java     |   2 +-
 .../kafka/streams/TopologyTestDriverTest.java      |   8 +-
 42 files changed, 251 insertions(+), 276 deletions(-)
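
Before the per-file diffs, a minimal before/after sketch of the core migration the commit message describes, `ExpectedException` to `assertThrows` (the `parse` helper is hypothetical and exists only to throw):

    import static org.junit.Assert.assertEquals;
    import static org.junit.Assert.assertThrows;

    import org.junit.Test;

    public class MigrationSketchTest {

        // Before (JUnit 4.12), the rule's expectation applied to the rest of
        // the test body rather than to one specific call:
        //
        //     @Rule
        //     public ExpectedException thrown = ExpectedException.none();
        //
        //     @Test
        //     public void rejectsBadInput() {
        //         thrown.expect(IllegalArgumentException.class);
        //         thrown.expectMessage("bad input");
        //         parse("bad input"); // anything after this line is unreachable
        //     }

        // After (JUnit 4.13): assertThrows scopes the expectation to a single
        // lambda and returns the exception for follow-up assertions.
        @Test
        public void rejectsBadInput() {
            IllegalArgumentException e =
                assertThrows(IllegalArgumentException.class, () -> parse("bad input"));
            assertEquals("bad input", e.getMessage());
        }

        // Hypothetical helper, not part of this commit.
        private static void parse(String input) {
            throw new IllegalArgumentException(input);
        }
    }
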

diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index 0f36781..138d206 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -78,9 +78,7 @@ import org.apache.kafka.test.MockMetricsReporter;
 import org.apache.kafka.test.TestCondition;
 import org.apache.kafka.test.TestUtils;
 import org.junit.Assert;
-import org.junit.Rule;
 import org.junit.Test;
-import org.junit.rules.ExpectedException;
 
 import java.nio.ByteBuffer;
 import java.time.Duration;
@@ -115,6 +113,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotSame;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.ArgumentMatchers.any;
@@ -143,9 +142,6 @@ public class KafkaConsumerTest {
 
     private final String groupId = "mock-group";
 
-    @Rule
-    public ExpectedException expectedException = ExpectedException.none();
-
     @Test
     public void testMetricsReporterAutoGeneratedClientId() {
         Properties props = new Properties();
@@ -840,13 +836,12 @@ public class KafkaConsumerTest {
         // interrupt the thread and call poll
         try {
             Thread.currentThread().interrupt();
-            expectedException.expect(InterruptException.class);
-            consumer.poll(Duration.ZERO);
+            assertThrows(InterruptException.class, () -> consumer.poll(Duration.ZERO));
         } finally {
             // clear interrupted state again since this thread may be reused by JUnit
             Thread.interrupted();
+            consumer.close(Duration.ofMillis(0));
         }
-        consumer.close(Duration.ofMillis(0));
     }
 
     @Test
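
Note a second fix in the hunk above: under the rule, the trailing `consumer.close(Duration.ofMillis(0))` was unreachable, because the `InterruptException` left the test body before the rule verified it. `assertThrows` catches the exception inside the test, so the close moves into `finally` and is guaranteed to run. A standalone sketch of that shape, with `failingCall` as a hypothetical stand-in for the throwing `poll`:

    import static org.junit.Assert.assertThrows;

    import org.junit.Test;

    public class CleanupSketchTest {

        @Test
        public void cleanupStillRuns() {
            StringBuilder log = new StringBuilder();
            try {
                // assertThrows consumes the expected exception, so control
                // returns here instead of leaving the test method.
                assertThrows(IllegalStateException.class, CleanupSketchTest::failingCall);
            } finally {
                log.append("closed"); // reachable; stand-in for consumer.close(...)
            }
        }

        // Hypothetical stand-in for consumer.poll(Duration.ZERO).
        private static void failingCall() {
            throw new IllegalStateException("interrupted");
        }
    }
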
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java
index 03013e6..aad4d29 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java
@@ -27,9 +27,9 @@ import java.util.HashMap;
 import java.util.Iterator;
 
 import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertThat;
 
 public class MockConsumerTest {
     
diff --git a/clients/src/test/java/org/apache/kafka/common/metrics/KafkaMbeanTest.java b/clients/src/test/java/org/apache/kafka/common/metrics/KafkaMbeanTest.java
index 963a66a..dd494c8 100644
--- a/clients/src/test/java/org/apache/kafka/common/metrics/KafkaMbeanTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/metrics/KafkaMbeanTest.java
@@ -33,8 +33,8 @@ import java.lang.management.ManagementFactory;
 import java.util.List;
 
 import static org.hamcrest.core.IsInstanceOf.instanceOf;
+import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertThat;
 import static org.junit.Assert.fail;
 
 public class KafkaMbeanTest {
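
The `assertThat` half of the migration, repeated across most of the files below, is purely an import swap; the call sites are untouched. A minimal sketch:

    // JUnit 4.13 deprecates org.junit.Assert.assertThat in favor of Hamcrest's
    // own MatcherAssert.assertThat, which junit 4.x already pulls in via
    // hamcrest-core; only the static import changes.
    import static org.hamcrest.CoreMatchers.is;
    import static org.hamcrest.MatcherAssert.assertThat; // was: org.junit.Assert.assertThat

    import org.junit.Test;

    public class AssertThatSketchTest {
        @Test
        public void callSiteIsUnchanged() {
            assertThat(2 + 2, is(4));
        }
    }
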
diff --git a/clients/src/test/java/org/apache/kafka/common/record/KafkaLZ4Test.java b/clients/src/test/java/org/apache/kafka/common/record/KafkaLZ4Test.java
index 222599b..5e44c49 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/KafkaLZ4Test.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/KafkaLZ4Test.java
@@ -19,9 +19,7 @@ package org.apache.kafka.common.record;
 import net.jpountz.xxhash.XXHashFactory;
 
 import org.hamcrest.CoreMatchers;
-import org.junit.Rule;
 import org.junit.Test;
-import org.junit.rules.ExpectedException;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 import org.junit.runners.Parameterized.Parameters;
@@ -37,9 +35,11 @@ import java.util.List;
 import java.util.Random;
 
 import static org.apache.kafka.common.record.KafkaLZ4BlockOutputStream.LZ4_FRAME_INCOMPRESSIBLE_MASK;
+import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.assertTrue;
 
 @RunWith(value = Parameterized.class)
@@ -71,9 +71,6 @@ public class KafkaLZ4Test {
         }
     }
 
-    @Rule
-    public ExpectedException thrown = ExpectedException.none();
-
     @Parameters(name = "{index} useBrokenFlagDescriptorChecksum={0}, ignoreFlagDescriptorChecksum={1}, blockChecksum={2}, close={3}, payload={4}")
     public static Collection<Object[]> data() {
         List<Payload> payloads = new ArrayList<>();
@@ -111,12 +108,10 @@ public class KafkaLZ4Test {
     }
 
     @Test
-    public void testHeaderPrematureEnd() throws Exception {
-        thrown.expect(IOException.class);
-        thrown.expectMessage(KafkaLZ4BlockInputStream.PREMATURE_EOS);
-
-        final ByteBuffer buffer = ByteBuffer.allocate(2);
-        makeInputStream(buffer);
+    public void testHeaderPrematureEnd() {
+        ByteBuffer buffer = ByteBuffer.allocate(2);
+        IOException e = assertThrows(IOException.class, () -> makeInputStream(buffer));
+        assertEquals(KafkaLZ4BlockInputStream.PREMATURE_EOS, e.getMessage());
     }
 
     private KafkaLZ4BlockInputStream makeInputStream(ByteBuffer buffer) throws IOException {
@@ -125,43 +120,41 @@ public class KafkaLZ4Test {
 
     @Test
     public void testNotSupported() throws Exception {
-        thrown.expect(IOException.class);
-        thrown.expectMessage(KafkaLZ4BlockInputStream.NOT_SUPPORTED);
-
         byte[] compressed = compressedBytes();
         compressed[0] = 0x00;
-
-        makeInputStream(ByteBuffer.wrap(compressed));
+        ByteBuffer buffer = ByteBuffer.wrap(compressed);
+        IOException e = assertThrows(IOException.class, () -> makeInputStream(buffer));
+        assertEquals(KafkaLZ4BlockInputStream.NOT_SUPPORTED, e.getMessage());
     }
 
     @Test
     public void testBadFrameChecksum() throws Exception {
-        if (!ignoreFlagDescriptorChecksum) {
-            thrown.expect(IOException.class);
-            thrown.expectMessage(KafkaLZ4BlockInputStream.DESCRIPTOR_HASH_MISMATCH);
-        }
-
         byte[] compressed = compressedBytes();
         compressed[6] = (byte) 0xFF;
+        ByteBuffer buffer = ByteBuffer.wrap(compressed);
 
-        makeInputStream(ByteBuffer.wrap(compressed));
+        if (ignoreFlagDescriptorChecksum) {
+            makeInputStream(buffer);
+        } else {
+            IOException e = assertThrows(IOException.class, () -> makeInputStream(buffer));
+            assertEquals(KafkaLZ4BlockInputStream.DESCRIPTOR_HASH_MISMATCH, e.getMessage());
+        }
     }
 
     @Test
     public void testBadBlockSize() throws Exception {
-        if (!close || (useBrokenFlagDescriptorChecksum && !ignoreFlagDescriptorChecksum)) return;
-
-        thrown.expect(IOException.class);
-        thrown.expectMessage(CoreMatchers.containsString("exceeded max"));
+        if (!close || (useBrokenFlagDescriptorChecksum && !ignoreFlagDescriptorChecksum))
+            return;
 
         byte[] compressed = compressedBytes();
-        final ByteBuffer buffer = ByteBuffer.wrap(compressed).order(ByteOrder.LITTLE_ENDIAN);
+        ByteBuffer buffer = ByteBuffer.wrap(compressed).order(ByteOrder.LITTLE_ENDIAN);
 
         int blockSize = buffer.getInt(7);
         blockSize = (blockSize & LZ4_FRAME_INCOMPRESSIBLE_MASK) | (1 << 24 & ~LZ4_FRAME_INCOMPRESSIBLE_MASK);
         buffer.putInt(7, blockSize);
 
-        testDecompression(buffer);
+        IOException e = assertThrows(IOException.class, () -> testDecompression(buffer));
+        assertThat(e.getMessage(), CoreMatchers.containsString("exceeded max"));
     }
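
One subtlety in the conversions above: `ExpectedException.expectMessage(String)` matched a substring of the message, whereas the new `assertEquals` on `e.getMessage()` requires exact equality, so those tests got slightly stricter. `testBadBlockSize` keeps the substring semantics by matching with Hamcrest. A sketch of both styles:

    import static org.hamcrest.CoreMatchers.containsString;
    import static org.hamcrest.MatcherAssert.assertThat;
    import static org.junit.Assert.assertEquals;
    import static org.junit.Assert.assertThrows;

    import org.junit.Test;

    public class MessageCheckSketchTest {

        @Test
        public void exactAndSubstringChecks() {
            Exception e = assertThrows(IllegalStateException.class, () -> {
                throw new IllegalStateException("block size exceeded max allowed");
            });

            // Substring check, equivalent to the old expectMessage(String):
            assertThat(e.getMessage(), containsString("exceeded max"));
            // Exact check, as in the converted tests above:
            assertEquals("block size exceeded max allowed", e.getMessage());
        }
    }
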
 
 
diff --git a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java
index c3560f9..522915f 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java
@@ -20,9 +20,7 @@ import org.apache.kafka.common.errors.UnsupportedCompressionTypeException;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.test.TestUtils;
-import org.junit.Rule;
 import org.junit.Test;
-import org.junit.rules.ExpectedException;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
@@ -30,6 +28,7 @@ import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.function.Supplier;
 import java.util.List;
 import java.util.Random;
 
@@ -38,14 +37,13 @@ import static org.apache.kafka.common.utils.Utils.utf8;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+import static org.junit.Assume.assumeTrue;
 
 @RunWith(value = Parameterized.class)
 public class MemoryRecordsBuilderTest {
-    @Rule
-    public ExpectedException exceptionRule = ExpectedException.none();
-
     private final CompressionType compressionType;
     private final int bufferOffset;
     private final Time time;
@@ -58,17 +56,25 @@ public class MemoryRecordsBuilderTest {
 
     @Test
     public void testWriteEmptyRecordSet() {
-        expectExceptionWithZStd(compressionType, RecordBatch.MAGIC_VALUE_V0);
+        byte magic = RecordBatch.MAGIC_VALUE_V0;
+        assumeAtLeastV2OrNotZstd(magic);
 
         ByteBuffer buffer = ByteBuffer.allocate(128);
         buffer.position(bufferOffset);
 
-        MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, RecordBatch.MAGIC_VALUE_V0, compressionType,
-                TimestampType.CREATE_TIME, 0L, 0L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE,
-                false, false, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity());
-        MemoryRecords records = builder.build();
-        assertEquals(0, records.sizeInBytes());
-        assertEquals(bufferOffset, buffer.position());
+        Supplier<MemoryRecordsBuilder> builderSupplier = () -> new MemoryRecordsBuilder(buffer, magic,
+            compressionType, TimestampType.CREATE_TIME, 0L, 0L,
+            RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE,
+            false, false, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity());
+
+        if (compressionType != CompressionType.ZSTD) {
+            MemoryRecords records = builderSupplier.get().build();
+            assertEquals(0, records.sizeInBytes());
+            assertEquals(bufferOffset, buffer.position());
+        } else {
+            Exception e = assertThrows(IllegalArgumentException.class, () -> builderSupplier.get().build());
+            assertEquals(e.getMessage(), "ZStandard compression is not supported for magic " + magic);
+        }
     }
 
     @Test
@@ -215,18 +221,19 @@ public class MemoryRecordsBuilderTest {
 
     @Test
     public void testCompressionRateV0() {
-        expectExceptionWithZStd(compressionType, RecordBatch.MAGIC_VALUE_V0);
+        byte magic = RecordBatch.MAGIC_VALUE_V0;
+        assumeAtLeastV2OrNotZstd(magic);
 
         ByteBuffer buffer = ByteBuffer.allocate(1024);
         buffer.position(bufferOffset);
 
         LegacyRecord[] records = new LegacyRecord[] {
-                LegacyRecord.create(RecordBatch.MAGIC_VALUE_V0, 0L, "a".getBytes(), "1".getBytes()),
-                LegacyRecord.create(RecordBatch.MAGIC_VALUE_V0, 1L, "b".getBytes(), "2".getBytes()),
-                LegacyRecord.create(RecordBatch.MAGIC_VALUE_V0, 2L, "c".getBytes(), "3".getBytes()),
+                LegacyRecord.create(magic, 0L, "a".getBytes(), "1".getBytes()),
+                LegacyRecord.create(magic, 1L, "b".getBytes(), "2".getBytes()),
+                LegacyRecord.create(magic, 2L, "c".getBytes(), "3".getBytes()),
         };
 
-        MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, RecordBatch.MAGIC_VALUE_V0, compressionType,
+        MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, magic, compressionType,
                 TimestampType.CREATE_TIME, 0L, 0L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE,
                 false, false, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity());
 
@@ -272,18 +279,19 @@ public class MemoryRecordsBuilderTest {
 
     @Test
     public void testCompressionRateV1() {
-        expectExceptionWithZStd(compressionType, RecordBatch.MAGIC_VALUE_V1);
+        byte magic = RecordBatch.MAGIC_VALUE_V1;
+        assumeAtLeastV2OrNotZstd(magic);
 
         ByteBuffer buffer = ByteBuffer.allocate(1024);
         buffer.position(bufferOffset);
 
         LegacyRecord[] records = new LegacyRecord[] {
-                LegacyRecord.create(RecordBatch.MAGIC_VALUE_V1, 0L, "a".getBytes(), "1".getBytes()),
-                LegacyRecord.create(RecordBatch.MAGIC_VALUE_V1, 1L, "b".getBytes(), "2".getBytes()),
-                LegacyRecord.create(RecordBatch.MAGIC_VALUE_V1, 2L, "c".getBytes(), "3".getBytes()),
+                LegacyRecord.create(magic, 0L, "a".getBytes(), "1".getBytes()),
+                LegacyRecord.create(magic, 1L, "b".getBytes(), "2".getBytes()),
+                LegacyRecord.create(magic, 2L, "c".getBytes(), "3".getBytes()),
         };
 
-        MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, RecordBatch.MAGIC_VALUE_V1, compressionType,
+        MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, magic, compressionType,
                 TimestampType.CREATE_TIME, 0L, 0L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE,
                 false, false, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity());
 
@@ -305,13 +313,14 @@ public class MemoryRecordsBuilderTest {
 
     @Test
     public void buildUsingLogAppendTime() {
-        expectExceptionWithZStd(compressionType, RecordBatch.MAGIC_VALUE_V1);
+        byte magic = RecordBatch.MAGIC_VALUE_V1;
+        assumeAtLeastV2OrNotZstd(magic);
 
         ByteBuffer buffer = ByteBuffer.allocate(1024);
         buffer.position(bufferOffset);
 
         long logAppendTime = System.currentTimeMillis();
-        MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, RecordBatch.MAGIC_VALUE_V1, compressionType,
+        MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, magic, compressionType,
                 TimestampType.LOG_APPEND_TIME, 0L, logAppendTime, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH,
                 RecordBatch.NO_SEQUENCE, false, false, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity());
         builder.append(0L, "a".getBytes(), "1".getBytes());
@@ -336,13 +345,14 @@ public class MemoryRecordsBuilderTest {
 
     @Test
     public void buildUsingCreateTime() {
-        expectExceptionWithZStd(compressionType, RecordBatch.MAGIC_VALUE_V1);
+        byte magic = RecordBatch.MAGIC_VALUE_V1;
+        assumeAtLeastV2OrNotZstd(magic);
 
         ByteBuffer buffer = ByteBuffer.allocate(1024);
         buffer.position(bufferOffset);
 
         long logAppendTime = System.currentTimeMillis();
-        MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, RecordBatch.MAGIC_VALUE_V1, compressionType,
+        MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, magic, compressionType,
                 TimestampType.CREATE_TIME, 0L, logAppendTime, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE,
                 false, false, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity());
         builder.append(0L, "a".getBytes(), "1".getBytes());
@@ -369,7 +379,8 @@ public class MemoryRecordsBuilderTest {
 
     @Test
     public void testAppendedChecksumConsistency() {
-        expectExceptionWithZStd(compressionType, RecordBatch.MAGIC_VALUE_V0);
+        assumeAtLeastV2OrNotZstd(RecordBatch.MAGIC_VALUE_V0);
+        assumeAtLeastV2OrNotZstd(RecordBatch.MAGIC_VALUE_V1);
 
         ByteBuffer buffer = ByteBuffer.allocate(512);
         for (byte magic : Arrays.asList(RecordBatch.MAGIC_VALUE_V0, RecordBatch.MAGIC_VALUE_V1, RecordBatch.MAGIC_VALUE_V2)) {
@@ -415,13 +426,14 @@ public class MemoryRecordsBuilderTest {
 
     @Test
     public void writePastLimit() {
-        expectExceptionWithZStd(compressionType, RecordBatch.MAGIC_VALUE_V1);
+        byte magic = RecordBatch.MAGIC_VALUE_V1;
+        assumeAtLeastV2OrNotZstd(magic);
 
         ByteBuffer buffer = ByteBuffer.allocate(64);
         buffer.position(bufferOffset);
 
         long logAppendTime = System.currentTimeMillis();
-        MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, RecordBatch.MAGIC_VALUE_V1, compressionType,
+        MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, magic, compressionType,
                 TimestampType.CREATE_TIME, 0L, logAppendTime, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE,
                 false, false, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity());
         builder.setEstimatedCompressionRatio(0.5f);
@@ -462,11 +474,6 @@ public class MemoryRecordsBuilderTest {
 
     @Test
     public void convertV2ToV1UsingMixedCreateAndLogAppendTime() {
-        if (compressionType == CompressionType.ZSTD) {
-            exceptionRule.expect(UnsupportedCompressionTypeException.class);
-            exceptionRule.expectMessage("Down-conversion of zstandard-compressed batches is not supported");
-        }
-
         ByteBuffer buffer = ByteBuffer.allocate(512);
         MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V2,
                 compressionType, TimestampType.LOG_APPEND_TIME, 0L);
@@ -493,36 +500,44 @@ public class MemoryRecordsBuilderTest {
 
         buffer.flip();
 
-        ConvertedRecords<MemoryRecords> convertedRecords = MemoryRecords.readableRecords(buffer)
-                .downConvert(RecordBatch.MAGIC_VALUE_V1, 0, time);
-        MemoryRecords records = convertedRecords.records();
+        Supplier<ConvertedRecords<MemoryRecords>> convertedRecordsSupplier = () ->
+            MemoryRecords.readableRecords(buffer).downConvert(RecordBatch.MAGIC_VALUE_V1, 0, time);
 
-        // Transactional markers are skipped when down converting to V1, so exclude them from size
-        verifyRecordsProcessingStats(convertedRecords.recordConversionStats(),
+        if (compressionType != CompressionType.ZSTD) {
+            ConvertedRecords<MemoryRecords> convertedRecords = convertedRecordsSupplier.get();
+            MemoryRecords records = convertedRecords.records();
+
+            // Transactional markers are skipped when down converting to V1, so exclude them from size
+            verifyRecordsProcessingStats(convertedRecords.recordConversionStats(),
                 3, 3, records.sizeInBytes(), sizeExcludingTxnMarkers);
 
-        List<? extends RecordBatch> batches = Utils.toList(records.batches().iterator());
-        if (compressionType != CompressionType.NONE) {
-            assertEquals(2, batches.size());
-            assertEquals(TimestampType.LOG_APPEND_TIME, batches.get(0).timestampType());
-            assertEquals(TimestampType.CREATE_TIME, batches.get(1).timestampType());
+            List<? extends RecordBatch> batches = Utils.toList(records.batches().iterator());
+            if (compressionType != CompressionType.NONE) {
+                assertEquals(2, batches.size());
+                assertEquals(TimestampType.LOG_APPEND_TIME, batches.get(0).timestampType());
+                assertEquals(TimestampType.CREATE_TIME, batches.get(1).timestampType());
+            } else {
+                assertEquals(3, batches.size());
+                assertEquals(TimestampType.LOG_APPEND_TIME, batches.get(0).timestampType());
+                assertEquals(TimestampType.CREATE_TIME, batches.get(1).timestampType());
+                assertEquals(TimestampType.CREATE_TIME, batches.get(2).timestampType());
+            }
+
+            List<Record> logRecords = Utils.toList(records.records().iterator());
+            assertEquals(3, logRecords.size());
+            assertEquals(ByteBuffer.wrap("1".getBytes()), logRecords.get(0).key());
+            assertEquals(ByteBuffer.wrap("2".getBytes()), logRecords.get(1).key());
+            assertEquals(ByteBuffer.wrap("3".getBytes()), logRecords.get(2).key());
         } else {
-            assertEquals(3, batches.size());
-            assertEquals(TimestampType.LOG_APPEND_TIME, batches.get(0).timestampType());
-            assertEquals(TimestampType.CREATE_TIME, batches.get(1).timestampType());
-            assertEquals(TimestampType.CREATE_TIME, batches.get(2).timestampType());
+            Exception e = assertThrows(UnsupportedCompressionTypeException.class, convertedRecordsSupplier::get);
+            assertEquals("Down-conversion of zstandard-compressed batches is not supported", e.getMessage());
         }
-
-        List<Record> logRecords = Utils.toList(records.records().iterator());
-        assertEquals(3, logRecords.size());
-        assertEquals(ByteBuffer.wrap("1".getBytes()), logRecords.get(0).key());
-        assertEquals(ByteBuffer.wrap("2".getBytes()), logRecords.get(1).key());
-        assertEquals(ByteBuffer.wrap("3".getBytes()), logRecords.get(2).key());
     }
 
     @Test
     public void convertToV1WithMixedV0AndV2Data() {
-        expectExceptionWithZStd(compressionType, RecordBatch.MAGIC_VALUE_V0);
+        assumeAtLeastV2OrNotZstd(RecordBatch.MAGIC_VALUE_V0);
+        assumeAtLeastV2OrNotZstd(RecordBatch.MAGIC_VALUE_V1);
 
         ByteBuffer buffer = ByteBuffer.allocate(512);
         MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V0,
@@ -598,31 +613,28 @@ public class MemoryRecordsBuilderTest {
 
     @Test
     public void shouldThrowIllegalStateExceptionOnBuildWhenAborted() {
-        expectExceptionWithZStd(compressionType, RecordBatch.MAGIC_VALUE_V0);
+        byte magic = RecordBatch.MAGIC_VALUE_V0;
+        assumeAtLeastV2OrNotZstd(magic);
 
         ByteBuffer buffer = ByteBuffer.allocate(128);
         buffer.position(bufferOffset);
 
-        MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, RecordBatch.MAGIC_VALUE_V0, compressionType,
+        MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, magic, compressionType,
                 TimestampType.CREATE_TIME, 0L, 0L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH,
                 RecordBatch.NO_SEQUENCE, false, false, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity());
         builder.abort();
-        try {
-            builder.build();
-            fail("Should have thrown KafkaException");
-        } catch (IllegalStateException e) {
-            // ok
-        }
+        assertThrows(IllegalStateException.class, builder::build);
     }
 
     @Test
     public void shouldResetBufferToInitialPositionOnAbort() {
-        expectExceptionWithZStd(compressionType, RecordBatch.MAGIC_VALUE_V0);
+        byte magic = RecordBatch.MAGIC_VALUE_V0;
+        assumeAtLeastV2OrNotZstd(magic);
 
         ByteBuffer buffer = ByteBuffer.allocate(128);
         buffer.position(bufferOffset);
 
-        MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, RecordBatch.MAGIC_VALUE_V0, compressionType,
+        MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, magic, compressionType,
                                                                 TimestampType.CREATE_TIME, 0L, 0L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE,
                                                                 false, false, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity());
         builder.append(0L, "a".getBytes(), "1".getBytes());
@@ -632,12 +644,13 @@ public class MemoryRecordsBuilderTest {
 
     @Test
     public void shouldThrowIllegalStateExceptionOnCloseWhenAborted() {
-        expectExceptionWithZStd(compressionType, RecordBatch.MAGIC_VALUE_V0);
+        byte magic = RecordBatch.MAGIC_VALUE_V0;
+        assumeAtLeastV2OrNotZstd(magic);
 
         ByteBuffer buffer = ByteBuffer.allocate(128);
         buffer.position(bufferOffset);
 
-        MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, RecordBatch.MAGIC_VALUE_V0, compressionType,
+        MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, magic, compressionType,
                                                                 TimestampType.CREATE_TIME, 0L, 0L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE,
                                                                 false, false, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity());
         builder.abort();
@@ -651,12 +664,13 @@ public class MemoryRecordsBuilderTest {
 
     @Test
     public void shouldThrowIllegalStateExceptionOnAppendWhenAborted() {
-        expectExceptionWithZStd(compressionType, RecordBatch.MAGIC_VALUE_V0);
+        byte magic = RecordBatch.MAGIC_VALUE_V0;
+        assumeAtLeastV2OrNotZstd(magic);
 
         ByteBuffer buffer = ByteBuffer.allocate(128);
         buffer.position(bufferOffset);
 
-        MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, RecordBatch.MAGIC_VALUE_V0, compressionType,
+        MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, magic, compressionType,
                                                                 TimestampType.CREATE_TIME, 0L, 0L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE,
                                                                 false, false, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity());
         builder.abort();
@@ -734,10 +748,7 @@ public class MemoryRecordsBuilderTest {
         }
     }
 
-    private void expectExceptionWithZStd(CompressionType compressionType, byte magic) {
-        if (compressionType == CompressionType.ZSTD && magic < MAGIC_VALUE_V2) {
-            exceptionRule.expect(IllegalArgumentException.class);
-            exceptionRule.expectMessage("ZStandard compression is not supported for magic " + magic);
-        }
+    private void assumeAtLeastV2OrNotZstd(byte magic) {
+        assumeTrue(compressionType != CompressionType.ZSTD || magic >= MAGIC_VALUE_V2);
     }
 }
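
The new `assumeAtLeastV2OrNotZstd` helper changes what these tests assert: the old guard expected an `IllegalArgumentException` for zstd with a pre-v2 magic value, while the new one skips that parameter combination outright. A minimal sketch of how a failed JUnit assumption skips a test:

    import static org.junit.Assume.assumeTrue;

    import org.junit.Test;

    public class AssumptionSketchTest {

        // Stand-ins for the parameterized fields (compressionType, magic).
        private final boolean zstd = true;
        private final byte magic = 0;

        @Test
        public void skipsUnsupportedCombinations() {
            // A failed assumption reports the test as skipped rather than
            // failed, so unsupported combinations drop out of the run.
            assumeTrue(!zstd || magic >= 2);
            // ... body runs only for supported combinations ...
        }
    }
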
diff --git a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
index 5f16acf..3d5a4f1 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
@@ -22,9 +22,7 @@ import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.record.MemoryRecords.RecordFilter.BatchRetention;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.test.TestUtils;
-import org.junit.Rule;
 import org.junit.Test;
-import org.junit.rules.ExpectedException;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
@@ -32,6 +30,7 @@ import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.function.Supplier;
 import java.util.List;
 
 import static java.util.Arrays.asList;
@@ -40,14 +39,13 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+import static org.junit.Assume.assumeTrue;
 
 @RunWith(value = Parameterized.class)
 public class MemoryRecordsTest {
-    @Rule
-    public ExpectedException exceptionRule = ExpectedException.none();
-
     private CompressionType compression;
     private byte magic;
     private long firstOffset;
@@ -74,7 +72,7 @@ public class MemoryRecordsTest {
 
     @Test
     public void testIterator() {
-        expectExceptionWithZStd(compression, magic);
+        assumeAtLeastV2OrNotZstd();
 
         ByteBuffer buffer = ByteBuffer.allocate(1024);
 
@@ -159,8 +157,7 @@ public class MemoryRecordsTest {
 
     @Test
     public void testHasRoomForMethod() {
-        expectExceptionWithZStd(compression, magic);
-
+        assumeAtLeastV2OrNotZstd();
         MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(1024), magic, compression,
                 TimestampType.CREATE_TIME, 0L);
         builder.append(0L, "a".getBytes(), "1".getBytes());
@@ -447,60 +444,59 @@ public class MemoryRecordsTest {
 
     @Test
     public void testFilterToBatchDiscard() {
-        if (compression != CompressionType.NONE || magic >= RecordBatch.MAGIC_VALUE_V2) {
-            expectExceptionWithZStd(compression, magic);
+        assumeAtLeastV2OrNotZstd();
+        assumeTrue(compression != CompressionType.NONE || magic >= MAGIC_VALUE_V2);
 
-            ByteBuffer buffer = ByteBuffer.allocate(2048);
-            MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, magic, compression, TimestampType.CREATE_TIME, 0L);
-            builder.append(10L, "1".getBytes(), "a".getBytes());
-            builder.close();
+        ByteBuffer buffer = ByteBuffer.allocate(2048);
+        MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, magic, compression, TimestampType.CREATE_TIME, 0L);
+        builder.append(10L, "1".getBytes(), "a".getBytes());
+        builder.close();
 
-            builder = MemoryRecords.builder(buffer, magic, compression, TimestampType.CREATE_TIME, 1L);
-            builder.append(11L, "2".getBytes(), "b".getBytes());
-            builder.append(12L, "3".getBytes(), "c".getBytes());
-            builder.close();
+        builder = MemoryRecords.builder(buffer, magic, compression, TimestampType.CREATE_TIME, 1L);
+        builder.append(11L, "2".getBytes(), "b".getBytes());
+        builder.append(12L, "3".getBytes(), "c".getBytes());
+        builder.close();
 
-            builder = MemoryRecords.builder(buffer, magic, compression, TimestampType.CREATE_TIME, 3L);
-            builder.append(13L, "4".getBytes(), "d".getBytes());
-            builder.append(20L, "5".getBytes(), "e".getBytes());
-            builder.append(15L, "6".getBytes(), "f".getBytes());
-            builder.close();
+        builder = MemoryRecords.builder(buffer, magic, compression, TimestampType.CREATE_TIME, 3L);
+        builder.append(13L, "4".getBytes(), "d".getBytes());
+        builder.append(20L, "5".getBytes(), "e".getBytes());
+        builder.append(15L, "6".getBytes(), "f".getBytes());
+        builder.close();
 
-            builder = MemoryRecords.builder(buffer, magic, compression, TimestampType.CREATE_TIME, 6L);
-            builder.append(16L, "7".getBytes(), "g".getBytes());
-            builder.close();
+        builder = MemoryRecords.builder(buffer, magic, compression, TimestampType.CREATE_TIME, 6L);
+        builder.append(16L, "7".getBytes(), "g".getBytes());
+        builder.close();
 
-            buffer.flip();
+        buffer.flip();
 
-            ByteBuffer filtered = ByteBuffer.allocate(2048);
-            MemoryRecords.readableRecords(buffer).filterTo(new TopicPartition("foo", 0), new MemoryRecords.RecordFilter() {
-                @Override
-                protected BatchRetention checkBatchRetention(RecordBatch batch) {
-                    // discard the second and fourth batches
-                    if (batch.lastOffset() == 2L || batch.lastOffset() == 6L)
-                        return BatchRetention.DELETE;
-                    return BatchRetention.DELETE_EMPTY;
-                }
+        ByteBuffer filtered = ByteBuffer.allocate(2048);
+        MemoryRecords.readableRecords(buffer).filterTo(new TopicPartition("foo", 0), new MemoryRecords.RecordFilter() {
+            @Override
+            protected BatchRetention checkBatchRetention(RecordBatch batch) {
+                // discard the second and fourth batches
+                if (batch.lastOffset() == 2L || batch.lastOffset() == 6L)
+                    return BatchRetention.DELETE;
+                return BatchRetention.DELETE_EMPTY;
+            }
 
-                @Override
-                protected boolean shouldRetainRecord(RecordBatch recordBatch, Record record) {
-                    return true;
-                }
-            }, filtered, Integer.MAX_VALUE, BufferSupplier.NO_CACHING);
+            @Override
+            protected boolean shouldRetainRecord(RecordBatch recordBatch, Record record) {
+                return true;
+            }
+        }, filtered, Integer.MAX_VALUE, BufferSupplier.NO_CACHING);
 
-            filtered.flip();
-            MemoryRecords filteredRecords = MemoryRecords.readableRecords(filtered);
+        filtered.flip();
+        MemoryRecords filteredRecords = MemoryRecords.readableRecords(filtered);
 
-            List<MutableRecordBatch> batches = TestUtils.toList(filteredRecords.batches());
-            assertEquals(2, batches.size());
-            assertEquals(0L, batches.get(0).lastOffset());
-            assertEquals(5L, batches.get(1).lastOffset());
-        }
+        List<MutableRecordBatch> batches = TestUtils.toList(filteredRecords.batches());
+        assertEquals(2, batches.size());
+        assertEquals(0L, batches.get(0).lastOffset());
+        assertEquals(5L, batches.get(1).lastOffset());
     }
 
     @Test
     public void testFilterToAlreadyCompactedLog() {
-        expectExceptionWithZStd(compression, magic);
+        assumeAtLeastV2OrNotZstd();
 
         ByteBuffer buffer = ByteBuffer.allocate(2048);
 
@@ -642,7 +638,7 @@ public class MemoryRecordsTest {
 
     @Test
     public void testFilterToWithUndersizedBuffer() {
-        expectExceptionWithZStd(compression, magic);
+        assumeAtLeastV2OrNotZstd();
 
         ByteBuffer buffer = ByteBuffer.allocate(1024);
         MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, magic, compression, TimestampType.CREATE_TIME, 0L);
@@ -694,7 +690,7 @@ public class MemoryRecordsTest {
 
     @Test
     public void testToString() {
-        expectExceptionWithZStd(compression, magic);
+        assumeAtLeastV2OrNotZstd();
 
         long timestamp = 1000000;
         MemoryRecords memoryRecords = MemoryRecords.withRecords(magic, compression,
@@ -726,7 +722,7 @@ public class MemoryRecordsTest {
 
     @Test
     public void testFilterTo() {
-        expectExceptionWithZStd(compression, magic);
+        assumeAtLeastV2OrNotZstd();
 
         ByteBuffer buffer = ByteBuffer.allocate(2048);
         MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, magic, compression, TimestampType.CREATE_TIME, 0L);
@@ -841,7 +837,7 @@ public class MemoryRecordsTest {
 
     @Test
     public void testFilterToPreservesLogAppendTime() {
-        expectExceptionWithZStd(compression, magic);
+        assumeAtLeastV2OrNotZstd();
 
         long logAppendTime = System.currentTimeMillis();
 
@@ -887,7 +883,7 @@ public class MemoryRecordsTest {
 
     @Test
     public void testNextBatchSize() {
-        expectExceptionWithZStd(compression, magic);
+        assumeAtLeastV2OrNotZstd();
 
         ByteBuffer buffer = ByteBuffer.allocate(2048);
         MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, magic, compression,
@@ -902,39 +898,39 @@ public class MemoryRecordsTest {
         assertEquals(0, buffer.position());
 
         buffer.limit(1); // size not in buffer
-        assertEquals(null, records.firstBatchSize());
+        assertNull(records.firstBatchSize());
         buffer.limit(Records.LOG_OVERHEAD); // magic not in buffer
-        assertEquals(null, records.firstBatchSize());
+        assertNull(records.firstBatchSize());
         buffer.limit(Records.HEADER_SIZE_UP_TO_MAGIC); // payload not in buffer
         assertEquals(size, records.firstBatchSize().intValue());
 
         buffer.limit(size);
         byte magic = buffer.get(Records.MAGIC_OFFSET);
         buffer.put(Records.MAGIC_OFFSET, (byte) 10);
-        try {
-            records.firstBatchSize();
-            fail("Did not fail with corrupt magic");
-        } catch (CorruptRecordException e) {
-            // Expected exception
-        }
+        assertThrows(CorruptRecordException.class, records::firstBatchSize);
         buffer.put(Records.MAGIC_OFFSET, magic);
 
         buffer.put(Records.SIZE_OFFSET + 3, (byte) 0);
-        try {
-            records.firstBatchSize();
-            fail("Did not fail with corrupt size");
-        } catch (CorruptRecordException e) {
-            // Expected exception
-        }
+        assertThrows(CorruptRecordException.class, records::firstBatchSize);
     }
 
-    private void expectExceptionWithZStd(CompressionType compressionType, byte magic) {
-        if (compressionType == CompressionType.ZSTD && magic < MAGIC_VALUE_V2) {
-            exceptionRule.expect(IllegalArgumentException.class);
-            exceptionRule.expectMessage("ZStandard compression is not supported for magic " + magic);
+    @Test
+    public void testWithRecords() {
+        Supplier<MemoryRecords> recordsSupplier = () -> MemoryRecords.withRecords(magic, compression,
+            new SimpleRecord(10L, "key1".getBytes(), "value1".getBytes()));
+        if (compression != CompressionType.ZSTD || magic >= MAGIC_VALUE_V2) {
+            MemoryRecords memoryRecords = recordsSupplier.get();
+            String key = Utils.utf8(memoryRecords.batches().iterator().next().iterator().next().key());
+            assertEquals("key1", key);
+        } else {
+            assertThrows(IllegalArgumentException.class, recordsSupplier::get);
         }
     }
 
+    private void assumeAtLeastV2OrNotZstd() {
+        assumeTrue(compression != CompressionType.ZSTD || magic >= MAGIC_VALUE_V2);
+    }
+
     @Parameterized.Parameters(name = "{index} magic={0}, firstOffset={1}, compressionType={2}")
     public static Collection<Object[]> data() {
         List<Object[]> values = new ArrayList<>();
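
The new `testWithRecords` above shows the deferral pattern used throughout this commit: wrapping construction in a `Supplier` lets one expression feed both the happy-path assertions and `assertThrows`. A generic sketch of that pattern:

    import static org.junit.Assert.assertEquals;
    import static org.junit.Assert.assertThrows;

    import java.util.function.Supplier;

    import org.junit.Test;

    public class SupplierBranchSketchTest {

        private final boolean supported = true; // stand-in for "not zstd, or magic >= v2"

        @Test
        public void oneExpressionBothBranches() {
            // Construction is deferred behind the Supplier, so nothing throws
            // until get() is called inside the chosen branch.
            Supplier<String> recordsSupplier = () -> {
                if (!supported)
                    throw new IllegalArgumentException("unsupported combination");
                return "records";
            };

            if (supported)
                assertEquals("records", recordsSupplier.get());
            else
                assertThrows(IllegalArgumentException.class, recordsSupplier::get);
        }
    }
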
diff --git a/connect/api/src/test/java/org/apache/kafka/connect/data/StructTest.java b/connect/api/src/test/java/org/apache/kafka/connect/data/StructTest.java
index a46226d..415b295 100644
--- a/connect/api/src/test/java/org/apache/kafka/connect/data/StructTest.java
+++ b/connect/api/src/test/java/org/apache/kafka/connect/data/StructTest.java
@@ -17,9 +17,7 @@
 package org.apache.kafka.connect.data;
 
 import org.apache.kafka.connect.errors.DataException;
-import org.junit.Rule;
 import org.junit.Test;
-import org.junit.rules.ExpectedException;
 
 import java.nio.ByteBuffer;
 import java.util.Arrays;
@@ -30,6 +28,7 @@ import java.util.Map;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThrows;
 
 
 public class StructTest {
@@ -285,9 +284,6 @@ public class StructTest {
         assertNotEquals(struct2.hashCode(), struct3.hashCode());
     }
 
-    @Rule
-    public ExpectedException thrown = ExpectedException.none();
-
     @Test
     public void testValidateStructWithNullValue() {
         Schema schema = SchemaBuilder.struct()
@@ -297,9 +293,9 @@ public class StructTest {
                 .build();
 
         Struct struct = new Struct(schema);
-        thrown.expect(DataException.class);
-        thrown.expectMessage("Invalid value: null used for required field: \"one\", schema type: STRING");
-        struct.validate();
+        Exception e = assertThrows(DataException.class, struct::validate);
+        assertEquals("Invalid value: null used for required field: \"one\", schema type: STRING",
+            e.getMessage());
     }
 
     @Test
@@ -307,13 +303,15 @@ public class StructTest {
         String fieldName = "field";
         FakeSchema fakeSchema = new FakeSchema();
 
-        thrown.expect(DataException.class);
-        thrown.expectMessage("Invalid Java object for schema type null: class java.lang.Object for field: \"field\"");
-        ConnectSchema.validateValue(fieldName, fakeSchema, new Object());
+        Exception e = assertThrows(DataException.class, () -> ConnectSchema.validateValue(fieldName,
+            fakeSchema, new Object()));
+        assertEquals("Invalid Java object for schema type null: class java.lang.Object for field: \"field\"",
+            e.getMessage());
 
-        thrown.expect(DataException.class);
-        thrown.expectMessage("Invalid Java object for schema type INT8: class java.lang.Object for field: \"field\"");
-        ConnectSchema.validateValue(fieldName, Schema.INT8_SCHEMA, new Object());
+        e = assertThrows(DataException.class, () -> ConnectSchema.validateValue(fieldName,
+            Schema.INT8_SCHEMA, new Object()));
+        assertEquals("Invalid Java object for schema type INT8: class java.lang.Object for field: \"field\"",
+            e.getMessage());
     }
 
     @Test
@@ -323,9 +321,7 @@ public class StructTest {
             .field(fieldName, Schema.STRING_SCHEMA);
         Struct struct = new Struct(testSchema);
 
-        thrown.expect(DataException.class);
-        Field field = null;
-        struct.put(field, "valid");
+        assertThrows(DataException.class, () -> struct.put((Field) null, "valid"));
     }
 
     @Test
@@ -335,8 +331,8 @@ public class StructTest {
             .field(fieldName, Schema.STRING_SCHEMA);
         Struct struct = new Struct(testSchema);
 
-        thrown.expect(DataException.class);
-        thrown.expectMessage("Invalid value: null used for required field: \"fieldName\", schema type: STRING");
-        struct.put(fieldName, null);
+        Exception e = assertThrows(DataException.class, () -> struct.put(fieldName, null));
+        assertEquals("Invalid value: null used for required field: \"fieldName\", schema type: STRING",
+            e.getMessage());
     }
 }
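
The second StructTest hunk also closes a latent coverage gap: with the rule, the first `validateValue` call threw and satisfied the expectation, so the second `expect()`/`validateValue` pair was dead code; the `assertThrows` form runs and verifies both. A standalone sketch, with `reject` as a hypothetical stand-in:

    import static org.junit.Assert.assertEquals;
    import static org.junit.Assert.assertThrows;

    import org.junit.Test;

    public class SequentialChecksSketchTest {

        @Test
        public void secondCheckActuallyRuns() {
            Exception e = assertThrows(IllegalArgumentException.class, () -> reject("first"));
            assertEquals("first", e.getMessage());

            e = assertThrows(IllegalArgumentException.class, () -> reject("second"));
            assertEquals("second", e.getMessage()); // reached, unlike under the rule
        }

        // Hypothetical stand-in for ConnectSchema.validateValue(...).
        private static void reject(String message) {
            throw new IllegalArgumentException(message);
        }
    }
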
diff --git a/core/src/test/scala/unit/kafka/log/AbstractLogCleanerIntegrationTest.scala b/core/src/test/scala/unit/kafka/log/AbstractLogCleanerIntegrationTest.scala
index cc35f1d..e778336 100644
--- a/core/src/test/scala/unit/kafka/log/AbstractLogCleanerIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/log/AbstractLogCleanerIntegrationTest.scala
@@ -26,12 +26,15 @@ import kafka.utils.Implicits._
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.record.{CompressionType, MemoryRecords, RecordBatch}
 import org.apache.kafka.common.utils.Utils
+import org.apache.kafka.test.IntegrationTest
 import org.junit.After
+import org.junit.experimental.categories.Category
 
 import scala.collection.Seq
 import scala.collection.mutable.ListBuffer
 import scala.util.Random
 
+@Category(Array(classOf[IntegrationTest]))
 abstract class AbstractLogCleanerIntegrationTest {
 
   var cleaner: LogCleaner = _
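
This annotation is the part of the change the new JUnit makes possible: per the commit message, `@Category` and the `Parameterized` runner could not be used together before 4.13. A sketch of the now-working combination (the local `IntegrationTest` marker stands in for `org.apache.kafka.test.IntegrationTest`):

    import java.util.Arrays;
    import java.util.Collection;

    import org.junit.Test;
    import org.junit.experimental.categories.Category;
    import org.junit.runner.RunWith;
    import org.junit.runners.Parameterized;
    import org.junit.runners.Parameterized.Parameters;

    import static org.junit.Assert.assertTrue;

    // Local stand-in for the org.apache.kafka.test.IntegrationTest marker.
    interface IntegrationTest { }

    // Before 4.13 this pairing was broken; now categorized parameterized
    // tests can be filtered by the build like any other categorized test.
    @Category(IntegrationTest.class)
    @RunWith(Parameterized.class)
    public class CategorizedParameterizedSketchTest {

        private final int value;

        public CategorizedParameterizedSketchTest(int value) {
            this.value = value;
        }

        @Parameters
        public static Collection<Object[]> data() {
            return Arrays.asList(new Object[][] {{1}, {2}});
        }

        @Test
        public void runsOncePerParameter() {
            assertTrue(value > 0);
        }
    }

A build can then include or exclude the marked group, e.g. via Gradle's `useJUnit { excludeCategories 'org.apache.kafka.test.IntegrationTest' }`.
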
diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle
index 9eac3d4..67b9c3a 100644
--- a/gradle/dependencies.gradle
+++ b/gradle/dependencies.gradle
@@ -64,7 +64,7 @@ versions += [
   jaxrs: "2.1.1",
   jfreechart: "1.0.0",
   jopt: "5.0.4",
-  junit: "4.12",
+  junit: "4.13-beta-2",
   kafka_0100: "0.10.0.1",
   kafka_0101: "0.10.1.1",
   kafka_0102: "0.10.2.2",
diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
index 19d5ae0..afebfdb 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
@@ -51,9 +51,9 @@ import static org.apache.kafka.streams.StreamsConfig.consumerPrefix;
 import static org.apache.kafka.streams.StreamsConfig.producerPrefix;
 import static org.apache.kafka.test.StreamsTestUtils.getStreamsConfig;
 import static org.hamcrest.core.IsEqual.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/FineGrainedAutoResetIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/FineGrainedAutoResetIntegrationTest.java
index 6539168..a598400 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/FineGrainedAutoResetIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/FineGrainedAutoResetIntegrationTest.java
@@ -59,7 +59,7 @@ import kafka.utils.MockTime;
 
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.CoreMatchers.is;
-import static org.junit.Assert.assertThat;
+import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.fail;
 
 @Category({IntegrationTest.class})
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
index 2190d70..5007fa9 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
@@ -53,7 +53,7 @@ import java.util.Map;
 import java.util.Properties;
 
 import static org.hamcrest.core.IsEqual.equalTo;
-import static org.junit.Assert.assertThat;
+import static org.hamcrest.MatcherAssert.assertThat;
 
 @Category({IntegrationTest.class})
 public class GlobalKTableIntegrationTest {
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
index 2873593..264cd35 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
@@ -65,7 +65,7 @@ import java.util.regex.Pattern;
 
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.CoreMatchers.is;
-import static org.junit.Assert.assertThat;
+import static org.hamcrest.MatcherAssert.assertThat;
 
 /**
  * End-to-end integration test based on using regex and named topics for creating sources, using
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RepartitionOptimizingIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/RepartitionOptimizingIntegrationTest.java
index ec9df19..6fa1bff 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/RepartitionOptimizingIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/RepartitionOptimizingIntegrationTest.java
@@ -61,8 +61,8 @@ import java.util.regex.Pattern;
 import static java.time.Duration.ofMillis;
 import static java.time.Duration.ofSeconds;
 import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertThat;
 
 @Category({IntegrationTest.class})
 public class RepartitionOptimizingIntegrationTest {
diff --git a/streams/src/test/java/org/apache/kafka/streams/internals/ApiUtilsTest.java b/streams/src/test/java/org/apache/kafka/streams/internals/ApiUtilsTest.java
index 5fe30dd..6e4cdef 100644
--- a/streams/src/test/java/org/apache/kafka/streams/internals/ApiUtilsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/internals/ApiUtilsTest.java
@@ -25,8 +25,8 @@ import static org.apache.kafka.streams.internals.ApiUtils.prepareMillisCheckFail
 import static org.apache.kafka.streams.internals.ApiUtils.validateMillisecondDuration;
 import static org.apache.kafka.streams.internals.ApiUtils.validateMillisecondInstant;
 import static org.hamcrest.CoreMatchers.containsString;
+import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertThat;
 import static org.junit.Assert.fail;
 
 
@@ -107,4 +107,4 @@ public class ApiUtilsTest {
         assertThat(failMsgPrefix, containsString("variableName"));
         assertThat(failMsgPrefix, containsString("someValue"));
     }
-}
\ No newline at end of file
+}
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/PrintedTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/PrintedTest.java
index 2526138..a113681 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/PrintedTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/PrintedTest.java
@@ -36,7 +36,7 @@ import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
 
 import static org.hamcrest.CoreMatchers.equalTo;
-import static org.junit.Assert.assertThat;
+import static org.hamcrest.MatcherAssert.assertThat;
 
 public class PrintedTest {
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/RepartitionTopicNamingTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/RepartitionTopicNamingTest.java
index fe75191..0b8627f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/RepartitionTopicNamingTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/RepartitionTopicNamingTest.java
@@ -35,7 +35,7 @@ import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
 import static org.hamcrest.CoreMatchers.is;
-import static org.junit.Assert.assertThat;
+import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java
index 8c14d17..d06b028 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java
@@ -47,10 +47,10 @@ import java.util.regex.Pattern;
 import static java.util.Arrays.asList;
 import static org.apache.kafka.streams.Topology.AutoOffsetReset;
 import static org.hamcrest.core.IsInstanceOf.instanceOf;
+import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 
 @SuppressWarnings("unchecked")
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatTransformTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatTransformTest.java
index bedf3d8..d18a7a8 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatTransformTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatTransformTest.java
@@ -25,9 +25,7 @@ import org.apache.kafka.streams.processor.ProcessorContext;
 import org.easymock.EasyMock;
 import org.easymock.EasyMockSupport;
 import org.junit.Before;
-import org.junit.Rule;
 import org.junit.Test;
-import org.junit.rules.ExpectedException;
 
 import java.util.Arrays;
 import java.util.Collections;
@@ -44,16 +42,13 @@ public class KStreamFlatTransformTest extends EasyMockSupport {
 
     private KStreamFlatTransformProcessor<Number, Number, Integer, Integer> processor;
 
-    @Rule
-    public final ExpectedException exception = ExpectedException.none();
-
     @Before
     public void setUp() {
         inputKey = 1;
         inputValue = 10;
         transformer = mock(Transformer.class);
         context = strictMock(ProcessorContext.class);
-        processor = new KStreamFlatTransformProcessor<Number, Number, Integer, Integer>(transformer);
+        processor = new KStreamFlatTransformProcessor<>(transformer);
     }
 
     @Test
@@ -139,4 +134,4 @@ public class KStreamFlatTransformTest extends EasyMockSupport {
         verifyAll();
         assertTrue(processor instanceof KStreamFlatTransformProcessor);
     }
-}
\ No newline at end of file
+}
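
Alongside the rule removal, this hunk simplifies the processor construction with the diamond operator (the compiler infers the type arguments from the assignment target) and adds the missing newline at end of file. A hypothetical sketch of the same inference, with Holder standing in for the generic class:

    class Holder<K, V> {
        Holder(final V value) { }
    }

    class DiamondExample {
        // Before: new Holder<Number, Integer>(10)
        // After: <Number, Integer> is inferred from the declared type.
        private final Holder<Number, Integer> holder = new Holder<>(10);
    }
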
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
index 14d92d1..76a01cd 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
@@ -53,9 +53,7 @@ import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.MockValueJoiner;
 import org.apache.kafka.test.StreamsTestUtils;
 import org.junit.Before;
-import org.junit.Rule;
 import org.junit.Test;
-import org.junit.rules.ExpectedException;
 
 import java.time.Duration;
 import java.util.Arrays;
@@ -71,9 +69,10 @@ import static java.util.Arrays.asList;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.CoreMatchers.notNullValue;
 import static org.hamcrest.core.IsInstanceOf.instanceOf;
+import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -91,9 +90,6 @@ public class KStreamImplTest {
 
     private Serde<String> mySerde = new Serdes.StringSerde();
 
-    @Rule
-    public final ExpectedException exception = ExpectedException.none();
-
     @Before
     public void before() {
         builder = new StreamsBuilder();
@@ -547,16 +543,14 @@ public class KStreamImplTest {
 
     @Test
     public void shouldNotAllowNullTransformSupplierOnTransform() {
-        exception.expect(NullPointerException.class);
-        exception.expectMessage("transformerSupplier can't be null");
-        testStream.transform(null);
+        final Exception e = assertThrows(NullPointerException.class, () -> testStream.transform(null));
+        assertEquals("transformerSupplier can't be null", e.getMessage());
     }
 
     @Test
     public void shouldNotAllowNullTransformSupplierOnFlatTransform() {
-        exception.expect(NullPointerException.class);
-        exception.expectMessage("transformerSupplier can't be null");
-        testStream.flatTransform(null);
+        final Exception e = assertThrows(NullPointerException.class, () -> testStream.flatTransform(null));
+        assertEquals("transformerSupplier can't be null", e.getMessage());
     }
 
     @Test(expected = NullPointerException.class)
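
The two rewritten tests above show the core of the ExpectedException-to-assertThrows migration: assertThrows runs the lambda, fails the test unless the expected exception type is thrown, and returns the caught exception, so the message can be verified with a plain assertEquals after the call rather than an expectation declared before it. A self-contained sketch using only JDK and JUnit 4.13 APIs (the requireNonNull call is illustrative):

    import java.util.Objects;

    import org.junit.Test;

    import static org.junit.Assert.assertEquals;
    import static org.junit.Assert.assertThrows;

    public class AssertThrowsExample {
        @Test
        public void nullArgumentIsRejected() {
            // assertThrows returns the thrown exception for further checks.
            final NullPointerException e = assertThrows(
                NullPointerException.class,
                () -> Objects.requireNonNull(null, "argument can't be null"));
            assertEquals("argument can't be null", e.getMessage());
        }
    }
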
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
index 133bd55..71367b2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
@@ -43,8 +43,8 @@ import java.util.Set;
 import static java.time.Duration.ofMillis;
 import static org.apache.kafka.test.StreamsTestUtils.getMetricByName;
 import static org.hamcrest.CoreMatchers.hasItem;
+import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertThat;
 
 public class KStreamKStreamJoinTest {
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java
index 0d42e44..ec739e0 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java
@@ -40,9 +40,9 @@ import java.util.Properties;
 import static java.util.Arrays.asList;
 import static org.apache.kafka.test.StreamsTestUtils.getMetricByName;
 import static org.hamcrest.CoreMatchers.hasItem;
+import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 
 public class KTableSourceTest {
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TransformerSupplierAdapterTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TransformerSupplierAdapterTest.java
index def6ca1..9d4ba3d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TransformerSupplierAdapterTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TransformerSupplierAdapterTest.java
@@ -30,7 +30,7 @@ import org.junit.Test;
 import static org.hamcrest.core.IsEqual.equalTo;
 import static org.hamcrest.core.IsSame.sameInstance;
 import static org.hamcrest.core.IsNot.not;
-import static org.junit.Assert.assertThat;
+import static org.hamcrest.MatcherAssert.assertThat;
 
 @SuppressWarnings("unchecked")
 public class TransformerSupplierAdapterTest extends EasyMockSupport {
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtilTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtilTest.java
index 5e426d9..1612cb9 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtilTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtilTest.java
@@ -30,7 +30,7 @@ import org.junit.Test;
 
 import static java.time.Duration.ofMillis;
 import static org.hamcrest.CoreMatchers.is;
-import static org.junit.Assert.assertThat;
+import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.fail;
 
 public class GraphGraceSearchUtilTest {
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/CompositeRestoreListenerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/CompositeRestoreListenerTest.java
index 5d7078c..c981107 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/CompositeRestoreListenerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/CompositeRestoreListenerTest.java
@@ -34,7 +34,7 @@ import static org.apache.kafka.test.MockStateRestoreListener.RESTORE_BATCH;
 import static org.apache.kafka.test.MockStateRestoreListener.RESTORE_END;
 import static org.apache.kafka.test.MockStateRestoreListener.RESTORE_START;
 import static org.hamcrest.core.Is.is;
-import static org.junit.Assert.assertThat;
+import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertTrue;
 
 
@@ -219,4 +219,4 @@ public class CompositeRestoreListenerTest {
         }
     }
 
-}
\ No newline at end of file
+}
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImplTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImplTest.java
index 21b6623..6803978 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImplTest.java
@@ -32,8 +32,8 @@ import static org.easymock.EasyMock.expectLastCall;
 import static org.easymock.EasyMock.mock;
 import static org.easymock.EasyMock.replay;
 import static org.easymock.EasyMock.verify;
+import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertThat;
 
 public class GlobalProcessorContextImplTest {
     private static final String GLOBAL_STORE_NAME = "global-store";
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
index 1be5231..1a46af8 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
@@ -51,11 +51,11 @@ import static java.time.Duration.ofSeconds;
 import static java.util.Arrays.asList;
 import static org.apache.kafka.common.utils.Utils.mkSet;
 import static org.hamcrest.core.IsInstanceOf.instanceOf;
+import static org.hamcrest.MatcherAssert.assertThat;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotEquals;
-import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextTest.java
index e652ee5..45e0165 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextTest.java
@@ -31,7 +31,7 @@ import static org.easymock.EasyMock.expect;
 import static org.easymock.EasyMock.mock;
 import static org.easymock.EasyMock.replay;
 import static org.hamcrest.Matchers.equalTo;
-import static org.junit.Assert.assertThat;
+import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.fail;
 
 public class ProcessorContextTest {
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index f3d151c..11485e4 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -91,12 +91,12 @@ import static org.apache.kafka.common.utils.Utils.mkProperties;
 import static org.apache.kafka.streams.processor.internals.AbstractStateManager.CHECKPOINT_FILE_NAME;
 import static org.apache.kafka.streams.processor.internals.StreamThread.getSharedAdminClientId;
 import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertSame;
-import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
index 140c219..284e396 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
@@ -61,9 +61,9 @@ import static java.time.Duration.ofMillis;
 import static java.util.Arrays.asList;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.CoreMatchers.not;
+import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotEquals;
-import static org.junit.Assert.assertThat;
 import static org.junit.Assert.fail;
 
 @SuppressWarnings("unchecked")
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImplTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImplTest.java
index cadfdb0..e1d6f0c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImplTest.java
@@ -34,10 +34,10 @@ import static org.apache.kafka.common.utils.Utils.mkMap;
 import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.PROCESSOR_NODE_METRICS_GROUP;
 import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addAvgMaxLatency;
 import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addInvocationRateAndCount;
+import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.greaterThan;
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertThat;
 
 public class StreamsMetricsImplTest {
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java
index 44ff3a2..9d1284e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java
@@ -39,9 +39,9 @@ import java.util.List;
 import java.util.Map;
 
 import static org.hamcrest.core.IsEqual.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertThat;
 import static org.junit.Assert.fail;
 
 public abstract class AbstractKeyValueStoreTest {
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java
index cf15f9d..3ac05a0 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java
@@ -47,11 +47,11 @@ import java.util.Map;
 
 import static org.apache.kafka.streams.state.internals.ThreadCacheTest.memoryCacheEntrySize;
 import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -346,4 +346,4 @@ public class CachingKeyValueStoreTest extends AbstractKeyValueStoreTest {
             forwarded.put(key, new Change<>(newValue, oldValue));
         }
     }
-}
\ No newline at end of file
+}
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStoreTest.java
index 915d2a3..2307b71 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStoreTest.java
@@ -26,9 +26,7 @@ import org.apache.kafka.test.StateStoreProviderStub;
 import org.apache.kafka.test.StreamsTestUtils;
 import org.junit.Assert;
 import org.junit.Before;
-import org.junit.Rule;
 import org.junit.Test;
-import org.junit.rules.ExpectedException;
 
 import java.util.Arrays;
 import java.util.Collections;
@@ -40,10 +38,12 @@ import static java.util.Arrays.asList;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.core.IsEqual.equalTo;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThrows;
 
 public class CompositeReadOnlyWindowStoreTest {
 
     private static final long WINDOW_SIZE = 30_000;
+
     private final String storeName = "window-store";
     private StateStoreProviderStub stubProviderOne;
     private StateStoreProviderStub stubProviderTwo;
@@ -51,9 +51,6 @@ public class CompositeReadOnlyWindowStoreTest {
     private ReadOnlyWindowStoreStub<String, String> underlyingWindowStore;
     private ReadOnlyWindowStoreStub<String, String> otherUnderlyingStore;
 
-    @Rule
-    public final ExpectedException windowStoreIteratorException = ExpectedException.none();
-
     @Before
     public void before() {
         stubProviderOne = new StateStoreProviderStub(false);
@@ -151,9 +148,7 @@ public class CompositeReadOnlyWindowStoreTest {
         final CompositeReadOnlyWindowStore<Object, Object> store = new CompositeReadOnlyWindowStore<>(new
                 StateStoreProviderStub(false), QueryableStoreTypes.windowStore(), "foo");
         final WindowStoreIterator<Object> windowStoreIterator = store.fetch("key", ofEpochMilli(1), ofEpochMilli(10));
-
-        windowStoreIteratorException.expect(NoSuchElementException.class);
-        windowStoreIterator.peekNextKey();
+        assertThrows(NoSuchElementException.class, windowStoreIterator::peekNextKey);
     }
 
     @Test
@@ -161,9 +156,7 @@ public class CompositeReadOnlyWindowStoreTest {
         final CompositeReadOnlyWindowStore<Object, Object> store = new CompositeReadOnlyWindowStore<>(new
                 StateStoreProviderStub(false), QueryableStoreTypes.windowStore(), "foo");
         final WindowStoreIterator<Object> windowStoreIterator = store.fetch("key", ofEpochMilli(1), ofEpochMilli(10));
-
-        windowStoreIteratorException.expect(NoSuchElementException.class);
-        windowStoreIterator.next();
+        assertThrows(NoSuchElementException.class, windowStoreIterator::next);
     }
 
     @Test
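
When the expected failure is a single no-argument call, as in the two iterator tests above, a method reference can serve directly as the ThrowingRunnable argument. A minimal sketch against a plain JDK iterator (a stand-in for the window store iterator):

    import java.util.Collections;
    import java.util.Iterator;
    import java.util.NoSuchElementException;

    import org.junit.Test;

    import static org.junit.Assert.assertThrows;

    public class MethodReferenceExample {
        @Test
        public void exhaustedIteratorThrowsOnNext() {
            final Iterator<String> iterator = Collections.<String>emptyList().iterator();
            // iterator::next matches JUnit's ThrowingRunnable interface.
            assertThrows(NoSuchElementException.class, iterator::next);
        }
    }
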
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreTest.java
index 91eda9f..1bd4045 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreTest.java
@@ -30,8 +30,8 @@ import java.util.Arrays;
 import java.util.List;
 
 import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 
 public class InMemoryLRUCacheStoreTest extends AbstractKeyValueStoreTest {
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java
index 0dca634..23effc9 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java
@@ -47,8 +47,8 @@ import java.util.Map;
 import static org.apache.kafka.common.utils.Utils.mkEntry;
 import static org.apache.kafka.common.utils.Utils.mkMap;
 import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 
 @RunWith(EasyMockRunner.class)
@@ -229,4 +229,4 @@ public class MeteredKeyValueStoreTest {
     }
 
 
-}
\ No newline at end of file
+}
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java
index 92056fa..fd04968 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java
@@ -48,8 +48,8 @@ import java.util.Map;
 import static org.apache.kafka.common.utils.Utils.mkEntry;
 import static org.apache.kafka.common.utils.Utils.mkMap;
 import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 
 @RunWith(EasyMockRunner.class)
@@ -248,4 +248,4 @@ public class MeteredSessionStoreTest {
         return this.metrics.metric(new MetricName(name, "stream-scope-metrics", "", this.tags));
     }
 
-}
\ No newline at end of file
+}
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest.java
index 897f94f..74e5cd5 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest.java
@@ -51,9 +51,9 @@ import java.util.List;
 import static org.easymock.EasyMock.replay;
 import static org.easymock.EasyMock.reset;
 import static org.easymock.EasyMock.verify;
+import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.instanceOf;
 import static org.hamcrest.Matchers.matchesPattern;
-import static org.junit.Assert.assertThat;
 import static org.junit.Assert.fail;
 
 /**
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java
index 8097d74..3be20de 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java
@@ -36,7 +36,6 @@ import org.apache.kafka.test.InternalMockProcessorContext;
 import org.apache.kafka.test.NoOpRecordCollector;
 import org.apache.kafka.test.TestUtils;
 import org.junit.After;
-import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -377,7 +376,7 @@ public class RocksDBSegmentedBytesStoreTest {
 
         // Bulk loading is enabled during recovery.
         for (final KeyValueSegment segment : bytesStore.getSegments()) {
-            Assert.assertThat(segment.getOptions().level0FileNumCompactionTrigger(), equalTo(1 << 30));
+            assertThat(segment.getOptions().level0FileNumCompactionTrigger(), equalTo(1 << 30));
         }
 
         final List<KeyValue<Windowed<String>, Long>> expected = new ArrayList<>();
@@ -401,12 +400,12 @@ public class RocksDBSegmentedBytesStoreTest {
         restoreListener.onRestoreStart(null, bytesStore.name(), 0L, 0L);
 
         for (final KeyValueSegment segment : bytesStore.getSegments()) {
-            Assert.assertThat(segment.getOptions().level0FileNumCompactionTrigger(), equalTo(1 << 30));
+            assertThat(segment.getOptions().level0FileNumCompactionTrigger(), equalTo(1 << 30));
         }
 
         restoreListener.onRestoreEnd(null, bytesStore.name(), 0L);
         for (final KeyValueSegment segment : bytesStore.getSegments()) {
-            Assert.assertThat(segment.getOptions().level0FileNumCompactionTrigger(), equalTo(4));
+            assertThat(segment.getOptions().level0FileNumCompactionTrigger(), equalTo(4));
         }
     }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
index 4785673..db458eb 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
@@ -51,9 +51,9 @@ import java.util.Set;
 import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreTest.java
index 3347d02..f49527b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreTest.java
@@ -36,9 +36,9 @@ import java.util.List;
 import static java.util.Arrays.asList;
 import static org.hamcrest.CoreMatchers.hasItem;
 import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertThat;
 
 public class RocksDBTimestampedStoreTest extends RocksDBStoreTest {
 
diff --git a/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java b/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
index 84aaa8a..9de7798 100644
--- a/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
+++ b/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
@@ -74,10 +74,10 @@ import static org.apache.kafka.common.utils.Utils.mkEntry;
 import static org.apache.kafka.common.utils.Utils.mkMap;
 import static org.apache.kafka.common.utils.Utils.mkProperties;
 import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -808,7 +808,7 @@ public class TopologyTestDriverTest {
     public void shouldNotUpdateStoreForSmallerValue() {
         setup();
         testDriver.pipeInput(recordFactory.create("input-topic", "a", 1L, 9999L));
-        Assert.assertThat(store.get("a"), equalTo(21L));
+        assertThat(store.get("a"), equalTo(21L));
         OutputVerifier.compareKeyValue(testDriver.readOutput("result-topic", stringDeserializer, longDeserializer), "a", 21L);
         Assert.assertNull(testDriver.readOutput("result-topic", stringDeserializer, longDeserializer));
     }
@@ -817,7 +817,7 @@ public class TopologyTestDriverTest {
     public void shouldNotUpdateStoreForLargerValue() {
         setup();
         testDriver.pipeInput(recordFactory.create("input-topic", "a", 42L, 9999L));
-        Assert.assertThat(store.get("a"), equalTo(42L));
+        assertThat(store.get("a"), equalTo(42L));
         OutputVerifier.compareKeyValue(testDriver.readOutput("result-topic", stringDeserializer, longDeserializer), "a", 42L);
         Assert.assertNull(testDriver.readOutput("result-topic", stringDeserializer, longDeserializer));
     }
@@ -826,7 +826,7 @@ public class TopologyTestDriverTest {
     public void shouldUpdateStoreForNewKey() {
         setup();
         testDriver.pipeInput(recordFactory.create("input-topic", "b", 21L, 9999L));
-        Assert.assertThat(store.get("b"), equalTo(21L));
+        assertThat(store.get("b"), equalTo(21L));
         OutputVerifier.compareKeyValue(testDriver.readOutput("result-topic", stringDeserializer, longDeserializer), "a", 21L);
         OutputVerifier.compareKeyValue(testDriver.readOutput("result-topic", stringDeserializer, longDeserializer), "b", 21L);
         Assert.assertNull(testDriver.readOutput("result-topic", stringDeserializer, longDeserializer));
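
These hunks also drop the Assert. qualifier from the migrated calls, since the single static import of MatcherAssert.assertThat now covers every call site, while non-deprecated assertions such as Assert.assertNull may legitimately stay qualified. A hypothetical sketch of the resulting mixed style:

    import java.util.HashMap;

    import org.junit.Assert;
    import org.junit.Test;

    import static org.hamcrest.CoreMatchers.equalTo;
    import static org.hamcrest.MatcherAssert.assertThat;

    public class MixedAssertionStylesExample {
        @Test
        public void mixedStyles() {
            // Hamcrest's statically imported assertThat replaces the
            // deprecated org.junit.Assert.assertThat...
            assertThat("a".concat("b"), equalTo("ab"));
            // ...while a non-deprecated JUnit assertion stays qualified.
            Assert.assertNull(new HashMap<String, String>().get("missing"));
        }
    }
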

