kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From maniku...@apache.org
Subject [kafka] branch trunk updated: MINOR: hygiene cleanup in TransactionManagerTest (#5951)
Date Mon, 03 Dec 2018 05:32:19 GMT
This is an automated email from the ASF dual-hosted git repository.

manikumar pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new c482264  MINOR: hygiene cleanup in TransactionManagerTest (#5951)
c482264 is described below

commit c4822648ef1bb0cd6825d8dbe465c8a5a26a76bc
Author: Viktor Somogyi <viktorsomogyi@gmail.com>
AuthorDate: Mon Dec 3 06:32:05 2018 +0100

    MINOR: hygiene cleanup in TransactionManagerTest (#5951)
    
    
    Reviewers: Andras Katona <41361962+akatona84@users.noreply.github.com>, Manikumar Reddy <manikumar.reddy@gmail.com>
---
 .../producer/internals/TransactionManagerTest.java | 177 +++++++++------------
 1 file changed, 75 insertions(+), 102 deletions(-)

diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
index 72c0a0b..b8f5cae 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
@@ -42,7 +42,6 @@ import org.apache.kafka.common.record.MemoryRecords;
 import org.apache.kafka.common.record.MutableRecordBatch;
 import org.apache.kafka.common.record.Record;
 import org.apache.kafka.common.record.RecordBatch;
-import org.apache.kafka.common.requests.AbstractRequest;
 import org.apache.kafka.common.requests.AddOffsetsToTxnRequest;
 import org.apache.kafka.common.requests.AddOffsetsToTxnResponse;
 import org.apache.kafka.common.requests.AddPartitionsToTxnRequest;
@@ -634,7 +633,7 @@ public class TransactionManagerTest {
         prepareFindCoordinatorResponse(Errors.NONE, false, CoordinatorType.GROUP, consumerGroupId);
         prepareTxnOffsetCommitResponse(consumerGroupId, pid, epoch, txnOffsetCommitResponse);
 
-        assertEquals(null, transactionManager.coordinator(CoordinatorType.GROUP));
+        assertNull(transactionManager.coordinator(CoordinatorType.GROUP));
         sender.run(time.milliseconds());  // try to send TxnOffsetCommitRequest, but find
we don't have a group coordinator.
         sender.run(time.milliseconds());  // send find coordinator for group request
         assertNotNull(transactionManager.coordinator(CoordinatorType.GROUP));
@@ -671,14 +670,11 @@ public class TransactionManagerTest {
     @Test
     public void testUnsupportedFindCoordinator() {
         transactionManager.initializeTransactions();
-        client.prepareUnsupportedVersionResponse(new MockClient.RequestMatcher() {
-            @Override
-            public boolean matches(AbstractRequest body) {
-                FindCoordinatorRequest findCoordinatorRequest = (FindCoordinatorRequest)
body;
-                assertEquals(findCoordinatorRequest.coordinatorType(), CoordinatorType.TRANSACTION);
-                assertEquals(findCoordinatorRequest.coordinatorKey(), transactionalId);
-                return true;
-            }
+        client.prepareUnsupportedVersionResponse(body -> {
+            FindCoordinatorRequest findCoordinatorRequest = (FindCoordinatorRequest) body;
+            assertEquals(findCoordinatorRequest.coordinatorType(), CoordinatorType.TRANSACTION);
+            assertEquals(findCoordinatorRequest.coordinatorKey(), transactionalId);
+            return true;
         });
 
         sender.run(time.milliseconds()); // InitProducerRequest is queued
@@ -697,14 +693,11 @@ public class TransactionManagerTest {
         assertFalse(transactionManager.hasError());
         assertNotNull(transactionManager.coordinator(CoordinatorType.TRANSACTION));
 
-        client.prepareUnsupportedVersionResponse(new MockClient.RequestMatcher() {
-            @Override
-            public boolean matches(AbstractRequest body) {
-                InitProducerIdRequest initProducerIdRequest = (InitProducerIdRequest) body;
-                assertEquals(initProducerIdRequest.transactionalId(), transactionalId);
-                assertEquals(initProducerIdRequest.transactionTimeoutMs(), transactionTimeoutMs);
-                return true;
-            }
+        client.prepareUnsupportedVersionResponse(body -> {
+            InitProducerIdRequest initProducerIdRequest = (InitProducerIdRequest) body;
+            assertEquals(initProducerIdRequest.transactionalId(), transactionalId);
+            assertEquals(initProducerIdRequest.transactionTimeoutMs(), transactionTimeoutMs);
+            return true;
         });
 
         sender.run(time.milliseconds()); // InitProducerRequest is dequeued
@@ -759,7 +752,7 @@ public class TransactionManagerTest {
         // FindCoordinator and InitPid requests.
         sender.run(time.milliseconds());
 
-        assertEquals(null, transactionManager.coordinator(CoordinatorType.TRANSACTION));
+        assertNull(transactionManager.coordinator(CoordinatorType.TRANSACTION));
         assertFalse(initPidResult.isCompleted());
         assertFalse(transactionManager.hasProducerId());
 
@@ -795,7 +788,7 @@ public class TransactionManagerTest {
         sender.run(time.milliseconds());
         time.sleep(110);  // waiting for the blackout period for the node to expire.
 
-        assertEquals(null, transactionManager.coordinator(CoordinatorType.TRANSACTION));
+        assertNull(transactionManager.coordinator(CoordinatorType.TRANSACTION));
         assertFalse(initPidResult.isCompleted());
         assertFalse(transactionManager.hasProducerId());
 
@@ -827,7 +820,7 @@ public class TransactionManagerTest {
         prepareInitPidResponse(Errors.NOT_COORDINATOR, false, pid, epoch);
         sender.run(time.milliseconds());  // send pid, get not coordinator. Should resend
the FindCoordinator and InitPid requests
 
-        assertEquals(null, transactionManager.coordinator(CoordinatorType.TRANSACTION));
+        assertNull(transactionManager.coordinator(CoordinatorType.TRANSACTION));
         assertFalse(initPidResult.isCompleted());
         assertFalse(transactionManager.hasProducerId());
 
@@ -1402,6 +1395,7 @@ public class TransactionManagerTest {
             commitResult.await();
             fail();  // the get() must throw an exception.
         } catch (KafkaException e) {
+            // Expected
         }
 
         try {
@@ -1835,7 +1829,7 @@ public class TransactionManagerTest {
     }
 
     @Test
-    public void shouldNotSendAbortTxnRequestWhenOnlyAddOffsetsRequestFailed() throws Exception
{
+    public void shouldNotSendAbortTxnRequestWhenOnlyAddOffsetsRequestFailed() {
         final long pid = 13131L;
         final short epoch = 1;
 
@@ -1861,7 +1855,7 @@ public class TransactionManagerTest {
     }
 
     @Test
-    public void shouldFailAbortIfAddOffsetsFailsWithFatalError() throws Exception {
+    public void shouldFailAbortIfAddOffsetsFailsWithFatalError() {
         final long pid = 13131L;
         final short epoch = 1;
 
@@ -1908,7 +1902,7 @@ public class TransactionManagerTest {
         PartitionInfo part2 = new PartitionInfo(topic, 1, node2, null, null);
 
         Cluster cluster = new Cluster(null, Arrays.asList(node1, node2), Arrays.asList(part1,
part2),
-                Collections.<String>emptySet(), Collections.<String>emptySet());
+                Collections.emptySet(), Collections.emptySet());
         Set<Node> nodes = new HashSet<>();
         nodes.add(node1);
         nodes.add(node2);
@@ -1945,8 +1939,8 @@ public class TransactionManagerTest {
         // Try to drain a message destined for tp1, it should get drained.
         Node node1 = new Node(1, "localhost", 1112);
         PartitionInfo part1 = new PartitionInfo(topic, 1, node1, null, null);
-        Cluster cluster = new Cluster(null, Arrays.asList(node1), Arrays.asList(part1),
-                Collections.<String>emptySet(), Collections.<String>emptySet());
+        Cluster cluster = new Cluster(null, Collections.singletonList(node1), Collections.singletonList(part1),
+                Collections.emptySet(), Collections.emptySet());
         accumulator.append(tp1, time.milliseconds(), "key".getBytes(),
                 "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT);
         Map<Integer, List<ProducerBatch>> drainedBatches = accumulator.drain(cluster,
Collections.singleton(node1),
@@ -1972,8 +1966,8 @@ public class TransactionManagerTest {
         Node node1 = new Node(0, "localhost", 1111);
         PartitionInfo part1 = new PartitionInfo(topic, 0, node1, null, null);
 
-        Cluster cluster = new Cluster(null, Arrays.asList(node1), Arrays.asList(part1),
-                Collections.<String>emptySet(), Collections.<String>emptySet());
+        Cluster cluster = new Cluster(null, Collections.singletonList(node1), Collections.singletonList(part1),
+                Collections.emptySet(), Collections.emptySet());
         Set<Node> nodes = new HashSet<>();
         nodes.add(node1);
         Map<Integer, List<ProducerBatch>> drainedBatches = accumulator.drain(cluster,
nodes, Integer.MAX_VALUE,
@@ -2012,7 +2006,7 @@ public class TransactionManagerTest {
     }
 
     @Test
-    public void testTransitionToAbortableErrorOnBatchExpiry() throws InterruptedException,
ExecutionException {
+    public void testTransitionToAbortableErrorOnBatchExpiry() throws InterruptedException
{
         final long pid = 13131L;
         final short epoch = 1;
 
@@ -2058,7 +2052,7 @@ public class TransactionManagerTest {
     }
 
     @Test
-    public void testTransitionToAbortableErrorOnMultipleBatchExpiry() throws InterruptedException,
ExecutionException {
+    public void testTransitionToAbortableErrorOnMultipleBatchExpiry() throws InterruptedException
{
         final long pid = 13131L;
         final short epoch = 1;
 
@@ -2123,7 +2117,7 @@ public class TransactionManagerTest {
     }
 
     @Test
-    public void testDropCommitOnBatchExpiry() throws InterruptedException, ExecutionException
{
+    public void testDropCommitOnBatchExpiry() throws InterruptedException {
         final long pid = 13131L;
         final short epoch = 1;
 
@@ -2190,7 +2184,7 @@ public class TransactionManagerTest {
     }
 
     @Test
-    public void testTransitionToFatalErrorWhenRetriedBatchIsExpired() throws InterruptedException,
ExecutionException {
+    public void testTransitionToFatalErrorWhenRetriedBatchIsExpired() throws InterruptedException
{
         final long pid = 13131L;
         final short epoch = 1;
 
@@ -2249,7 +2243,7 @@ public class TransactionManagerTest {
     }
 
     @Test
-    public void testShouldResetProducerStateAfterResolvingSequences() throws InterruptedException,
ExecutionException {
+    public void testShouldResetProducerStateAfterResolvingSequences() {
         // Create a TransactionManager without a transactionalId to test
         // shouldResetProducerStateAfterResolvingSequences.
         TransactionManager manager = new TransactionManager(logContext, null, transactionTimeoutMs,
@@ -2294,13 +2288,10 @@ public class TransactionManagerTest {
     }
 
     private void prepareAddPartitionsToTxn(final Map<TopicPartition, Errors> errors)
{
-        client.prepareResponse(new MockClient.RequestMatcher() {
-            @Override
-            public boolean matches(AbstractRequest body) {
-                AddPartitionsToTxnRequest request = (AddPartitionsToTxnRequest) body;
-                assertEquals(new HashSet<>(request.partitions()), new HashSet<>(errors.keySet()));
-                return true;
-            }
+        client.prepareResponse(body -> {
+            AddPartitionsToTxnRequest request = (AddPartitionsToTxnRequest) body;
+            assertEquals(new HashSet<>(request.partitions()), new HashSet<>(errors.keySet()));
+            return true;
         }, new AddPartitionsToTxnResponse(0, errors));
     }
 
@@ -2311,26 +2302,20 @@ public class TransactionManagerTest {
     private void prepareFindCoordinatorResponse(Errors error, boolean shouldDisconnect,
                                                 final CoordinatorType coordinatorType,
                                                 final String coordinatorKey) {
-        client.prepareResponse(new MockClient.RequestMatcher() {
-            @Override
-            public boolean matches(AbstractRequest body) {
-                FindCoordinatorRequest findCoordinatorRequest = (FindCoordinatorRequest)
body;
-                assertEquals(findCoordinatorRequest.coordinatorType(), coordinatorType);
-                assertEquals(findCoordinatorRequest.coordinatorKey(), coordinatorKey);
-                return true;
-            }
+        client.prepareResponse(body -> {
+            FindCoordinatorRequest findCoordinatorRequest = (FindCoordinatorRequest) body;
+            assertEquals(findCoordinatorRequest.coordinatorType(), coordinatorType);
+            assertEquals(findCoordinatorRequest.coordinatorKey(), coordinatorKey);
+            return true;
         }, new FindCoordinatorResponse(error, brokerNode), shouldDisconnect);
     }
 
     private void prepareInitPidResponse(Errors error, boolean shouldDisconnect, long pid,
short epoch) {
-        client.prepareResponse(new MockClient.RequestMatcher() {
-            @Override
-            public boolean matches(AbstractRequest body) {
-                InitProducerIdRequest initProducerIdRequest = (InitProducerIdRequest) body;
-                assertEquals(initProducerIdRequest.transactionalId(), transactionalId);
-                assertEquals(initProducerIdRequest.transactionTimeoutMs(), transactionTimeoutMs);
-                return true;
-            }
+        client.prepareResponse(body -> {
+            InitProducerIdRequest initProducerIdRequest = (InitProducerIdRequest) body;
+            assertEquals(initProducerIdRequest.transactionalId(), transactionalId);
+            assertEquals(initProducerIdRequest.transactionTimeoutMs(), transactionTimeoutMs);
+            return true;
         }, new InitProducerIdResponse(0, error, pid, epoch), shouldDisconnect);
     }
 
@@ -2342,22 +2327,19 @@ public class TransactionManagerTest {
         client.prepareResponse(produceRequestMatcher(pid, epoch), produceResponse(tp0, 0,
error, 0));
     }
     private MockClient.RequestMatcher produceRequestMatcher(final long pid, final short epoch)
{
-        return new MockClient.RequestMatcher() {
-            @Override
-            public boolean matches(AbstractRequest body) {
-                ProduceRequest produceRequest = (ProduceRequest) body;
-                MemoryRecords records = produceRequest.partitionRecordsOrFail().get(tp0);
-                assertNotNull(records);
-                Iterator<MutableRecordBatch> batchIterator = records.batches().iterator();
-                assertTrue(batchIterator.hasNext());
-                MutableRecordBatch batch = batchIterator.next();
-                assertFalse(batchIterator.hasNext());
-                assertTrue(batch.isTransactional());
-                assertEquals(pid, batch.producerId());
-                assertEquals(epoch, batch.producerEpoch());
-                assertEquals(transactionalId, produceRequest.transactionalId());
-                return true;
-            }
+        return body -> {
+            ProduceRequest produceRequest = (ProduceRequest) body;
+            MemoryRecords records = produceRequest.partitionRecordsOrFail().get(tp0);
+            assertNotNull(records);
+            Iterator<MutableRecordBatch> batchIterator = records.batches().iterator();
+            assertTrue(batchIterator.hasNext());
+            MutableRecordBatch batch = batchIterator.next();
+            assertFalse(batchIterator.hasNext());
+            assertTrue(batch.isTransactional());
+            assertEquals(pid, batch.producerId());
+            assertEquals(epoch, batch.producerEpoch());
+            assertEquals(transactionalId, produceRequest.transactionalId());
+            return true;
         };
     }
 
@@ -2375,16 +2357,13 @@ public class TransactionManagerTest {
 
     private MockClient.RequestMatcher addPartitionsRequestMatcher(final TopicPartition topicPartition,
                                                                   final short epoch, final
long pid) {
-        return new MockClient.RequestMatcher() {
-            @Override
-            public boolean matches(AbstractRequest body) {
-                AddPartitionsToTxnRequest addPartitionsToTxnRequest = (AddPartitionsToTxnRequest)
body;
-                assertEquals(pid, addPartitionsToTxnRequest.producerId());
-                assertEquals(epoch, addPartitionsToTxnRequest.producerEpoch());
-                assertEquals(singletonList(topicPartition), addPartitionsToTxnRequest.partitions());
-                assertEquals(transactionalId, addPartitionsToTxnRequest.transactionalId());
-                return true;
-            }
+        return body -> {
+            AddPartitionsToTxnRequest addPartitionsToTxnRequest = (AddPartitionsToTxnRequest)
body;
+            assertEquals(pid, addPartitionsToTxnRequest.producerId());
+            assertEquals(epoch, addPartitionsToTxnRequest.producerEpoch());
+            assertEquals(singletonList(topicPartition), addPartitionsToTxnRequest.partitions());
+            assertEquals(transactionalId, addPartitionsToTxnRequest.transactionalId());
+            return true;
         };
     }
 
@@ -2397,16 +2376,13 @@ public class TransactionManagerTest {
     }
 
     private MockClient.RequestMatcher endTxnMatcher(final TransactionResult result, final
long pid, final short epoch) {
-        return new MockClient.RequestMatcher() {
-            @Override
-            public boolean matches(AbstractRequest body) {
-                EndTxnRequest endTxnRequest = (EndTxnRequest) body;
-                assertEquals(transactionalId, endTxnRequest.transactionalId());
-                assertEquals(pid, endTxnRequest.producerId());
-                assertEquals(epoch, endTxnRequest.producerEpoch());
-                assertEquals(result, endTxnRequest.command());
-                return true;
-            }
+        return body -> {
+            EndTxnRequest endTxnRequest = (EndTxnRequest) body;
+            assertEquals(transactionalId, endTxnRequest.transactionalId());
+            assertEquals(pid, endTxnRequest.producerId());
+            assertEquals(epoch, endTxnRequest.producerEpoch());
+            assertEquals(result, endTxnRequest.command());
+            return true;
         };
     }
 
@@ -2414,16 +2390,13 @@ public class TransactionManagerTest {
                                                 final String consumerGroupId,
                                                 final long producerId,
                                                 final short producerEpoch) {
-        client.prepareResponse(new MockClient.RequestMatcher() {
-            @Override
-            public boolean matches(AbstractRequest body) {
-                AddOffsetsToTxnRequest addOffsetsToTxnRequest = (AddOffsetsToTxnRequest)
body;
-                assertEquals(consumerGroupId, addOffsetsToTxnRequest.consumerGroupId());
-                assertEquals(transactionalId, addOffsetsToTxnRequest.transactionalId());
-                assertEquals(producerId, addOffsetsToTxnRequest.producerId());
-                assertEquals(producerEpoch, addOffsetsToTxnRequest.producerEpoch());
-                return true;
-            }
+        client.prepareResponse(body -> {
+            AddOffsetsToTxnRequest addOffsetsToTxnRequest = (AddOffsetsToTxnRequest) body;
+            assertEquals(consumerGroupId, addOffsetsToTxnRequest.consumerGroupId());
+            assertEquals(transactionalId, addOffsetsToTxnRequest.transactionalId());
+            assertEquals(producerId, addOffsetsToTxnRequest.producerId());
+            assertEquals(producerEpoch, addOffsetsToTxnRequest.producerEpoch());
+            return true;
         }, new AddOffsetsToTxnResponse(0, error));
     }
 


Mime
View raw message