Repository: kafka
Updated Branches:
refs/heads/trunk eb59c8124 -> 54767bbba
KAFKA-4033; Revise partition assignment semantics on consumer subscription changes (KIP-70)
This PR changes topic subscription semantics so that a change in subscription does not immediately cause a rebalance.
Instead, the assigned partitions are updated on the next poll or the next scheduled metadata refresh.
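For illustration only (not part of this patch), a minimal consumer sketch of the revised semantics: after this change, calling subscribe() with a new topic set only updates the subscription, and the assignment is revised on the next poll(), when the rebalance takes place. The broker address, group id, and topic names below are assumed.

    import java.util.Arrays;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class SubscriptionChangeExample {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed broker
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "kip70-demo");              // assumed group id
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                      "org.apache.kafka.common.serialization.StringDeserializer");
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                      "org.apache.kafka.common.serialization.StringDeserializer");

            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
            consumer.subscribe(Arrays.asList("topic1", "topic2"));   // assumed topics
            consumer.poll(0);                                        // joins the group, receives assignment

            // Changing the subscription no longer triggers an immediate rebalance:
            consumer.subscribe(Arrays.asList("topic1", "topic3"));
            System.out.println(consumer.assignment());               // still the previous assignment

            consumer.poll(0);                                        // rebalance completes during this poll
            System.out.println(consumer.assignment());               // now reflects topic1/topic3

            consumer.close();
        }
    }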
Author: Vahid Hashemian <vahidhashemian@us.ibm.com>
Reviewers: Jason Gustafson
Closes #1726 from vahidhashemian/KAFKA-4033
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/54767bbb
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/54767bbb
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/54767bbb
Branch: refs/heads/trunk
Commit: 54767bbba5bf18c01f50bb40c339433bfed09627
Parents: eb59c81
Author: Vahid Hashemian <vahidhashemian@us.ibm.com>
Authored: Thu Sep 8 19:56:53 2016 -0700
Committer: Jason Gustafson <jason@confluent.io>
Committed: Thu Sep 8 19:56:53 2016 -0700
----------------------------------------------------------------------
.../kafka/clients/consumer/KafkaConsumer.java | 13 +-
.../consumer/internals/ConsumerCoordinator.java | 36 +-
.../consumer/internals/SubscriptionState.java | 9 -
.../clients/consumer/KafkaConsumerTest.java | 653 ++++++++++++++-----
.../internals/SubscriptionStateTest.java | 63 +-
5 files changed, 568 insertions(+), 206 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/54767bbb/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 85d5194..ca8f1f1 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -552,7 +552,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
* A consumer is instantiated by providing a {@link java.util.Properties} object as configuration.
* <p>
* Valid configuration strings are documented at {@link ConsumerConfig}
- *
+ *
* @param properties The consumer configuration properties
*/
public KafkaConsumer(Properties properties) {
@@ -706,9 +706,6 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
Metrics metrics,
SubscriptionState subscriptions,
Metadata metadata,
- boolean autoCommitEnabled,
- int autoCommitIntervalMs,
- int heartbeatIntervalMs,
long retryBackoffMs,
long requestTimeoutMs) {
this.clientId = clientId;
@@ -872,6 +869,10 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
public void unsubscribe() {
acquire();
try {
+ // make sure the offsets of topic partitions the consumer is unsubscribing from
+ // are committed since there will be no following rebalance
+ this.coordinator.maybeAutoCommitOffsetsNow();
+
log.debug("Unsubscribed all topics or patterns and assigned partitions");
this.subscriptions.unsubscribe();
this.coordinator.maybeLeaveGroup();
@@ -913,6 +914,10 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
topics.add(topic);
}
+ // make sure the offsets of topic partitions the consumer is unsubscribing from
+ // are committed since there will be no following rebalance
+ this.coordinator.maybeAutoCommitOffsetsNow();
+
log.debug("Subscribed to partition(s): {}", Utils.join(partitions, ", "));
this.subscriptions.assignFromUser(new HashSet<>(partitions));
metadata.setTopics(topics);
http://git-wip-us.apache.org/repos/asf/kafka/blob/54767bbb/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index b8df50e..ff0d669 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -514,22 +514,31 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
this.nextAutoCommitDeadline = now + retryBackoffMs;
} else if (now >= nextAutoCommitDeadline) {
this.nextAutoCommitDeadline = now + autoCommitIntervalMs;
- commitOffsetsAsync(subscriptions.allConsumed(), new OffsetCommitCallback() {
- @Override
- public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
- if (exception != null) {
- log.warn("Auto offset commit failed for group {}: {}", groupId, exception.getMessage());
- if (exception instanceof RetriableException)
- nextAutoCommitDeadline = Math.min(time.milliseconds() + retryBackoffMs, nextAutoCommitDeadline);
- } else {
- log.debug("Completed autocommit of offsets {} for group {}", offsets, groupId);
- }
- }
- });
+ doAutoCommitOffsetsAsync();
}
}
}
+ public void maybeAutoCommitOffsetsNow() {
+ if (autoCommitEnabled && !coordinatorUnknown())
+ doAutoCommitOffsetsAsync();
+ }
+
+ private void doAutoCommitOffsetsAsync() {
+ commitOffsetsAsync(subscriptions.allConsumed(), new OffsetCommitCallback() {
+ @Override
+ public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
+ if (exception != null) {
+ log.warn("Auto offset commit failed for group {}: {}", groupId, exception.getMessage());
+ if (exception instanceof RetriableException)
+ nextAutoCommitDeadline = Math.min(time.milliseconds() + retryBackoffMs, nextAutoCommitDeadline);
+ } else {
+ log.debug("Completed autocommit of offsets {} for group {}", offsets, groupId);
+ }
+ }
+ });
+ }
+
private void maybeAutoCommitOffsetsSync() {
if (autoCommitEnabled) {
try {
@@ -807,7 +816,8 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
}
public void invoke() {
- callback.onComplete(offsets, exception);
+ if (callback != null)
+ callback.onComplete(offsets, exception);
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/54767bbb/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
index 6d4c01b..dd0bce9 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
@@ -21,7 +21,6 @@ import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
-import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.regex.Pattern;
@@ -128,13 +127,6 @@ public class SubscriptionState {
if (!this.subscription.equals(topicsToSubscribe)) {
this.subscription = topicsToSubscribe;
this.groupSubscription.addAll(topicsToSubscribe);
-
- // Remove any assigned partitions which are no longer subscribed to
- for (Iterator<TopicPartition> it = assignment.keySet().iterator(); it.hasNext(); ) {
- TopicPartition tp = it.next();
- if (!subscription.contains(tp.topic()))
- it.remove();
- }
}
}
@@ -216,7 +208,6 @@ public class SubscriptionState {
this.subscriptionType = SubscriptionType.NONE;
}
-
public Pattern getSubscribedPattern() {
return this.subscribedPattern;
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/54767bbb/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index 8d2ac00..dbe3d67 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -35,11 +35,13 @@ import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.network.Selectable;
+import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.types.Struct;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.requests.FetchResponse;
+import org.apache.kafka.common.requests.FetchResponse.PartitionData;
import org.apache.kafka.common.requests.GroupCoordinatorResponse;
import org.apache.kafka.common.requests.HeartbeatResponse;
import org.apache.kafka.common.requests.JoinGroupResponse;
@@ -76,9 +78,15 @@ import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
public class KafkaConsumerTest {
-
private final String topic = "test";
- private final TopicPartition tp0 = new TopicPartition("test", 0);
+ private final TopicPartition tp0 = new TopicPartition(topic, 0);
+ private final TopicPartition tp1 = new TopicPartition(topic, 1);
+
+ private final String topic2 = "test2";
+ private final TopicPartition t2p0 = new TopicPartition(topic2, 0);
+
+ private final String topic3 = "test3";
+ private final TopicPartition t3p0 = new TopicPartition(topic3, 0);
@Test
public void testConstructorClose() throws Exception {
@@ -318,9 +326,6 @@ public class KafkaConsumerTest {
@Test
public void verifyHeartbeatSent() throws Exception {
- String topic = "topic";
- TopicPartition partition = new TopicPartition(topic, 0);
-
int rebalanceTimeoutMs = 60000;
int sessionTimeoutMs = 30000;
int heartbeatIntervalMs = 1000;
@@ -336,47 +341,18 @@ public class KafkaConsumerTest {
PartitionAssignor assignor = new RoundRobinAssignor();
final KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor,
- rebalanceTimeoutMs, sessionTimeoutMs, heartbeatIntervalMs, autoCommitIntervalMs);
+ rebalanceTimeoutMs, sessionTimeoutMs, heartbeatIntervalMs, true, autoCommitIntervalMs);
- consumer.subscribe(Arrays.asList(topic), new ConsumerRebalanceListener() {
- @Override
- public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
-
- }
-
- @Override
- public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
- // set initial position so we don't need a lookup
- for (TopicPartition partition : partitions)
- consumer.seek(partition, 0);
- }
- });
-
- // lookup coordinator
- client.prepareResponseFrom(new GroupCoordinatorResponse(Errors.NONE.code(), node).toStruct(), node);
-
- Node coordinator = new Node(Integer.MAX_VALUE - node.id(), node.host(), node.port());
-
- // join group
- client.prepareResponseFrom(joinGroupFollowerResponse(assignor, 1, "memberId", "leaderId", Errors.NONE.code()), coordinator);
-
- // sync group
- client.prepareResponseFrom(syncGroupResponse(Arrays.asList(partition), Errors.NONE.code()), coordinator);
+ consumer.subscribe(Arrays.asList(topic), getConsumerRebalanceListener(consumer));
+ Node coordinator = prepareRebalance(client, node, assignor, Arrays.asList(tp0), null);
// initial fetch
- client.prepareResponseFrom(fetchResponse(partition, 0, 0), node);
+ client.prepareResponseFrom(fetchResponse(tp0, 0, 0), node);
consumer.poll(0);
- assertEquals(Collections.singleton(partition), consumer.assignment());
+ assertEquals(Collections.singleton(tp0), consumer.assignment());
- final AtomicBoolean heartbeatReceived = new AtomicBoolean(false);
- client.prepareResponseFrom(new MockClient.RequestMatcher() {
- @Override
- public boolean matches(ClientRequest request) {
- heartbeatReceived.set(true);
- return true;
- }
- }, new HeartbeatResponse(Errors.NONE.code()).toStruct(), coordinator);
+ AtomicBoolean heartbeatReceived = prepareHeartbeatResponse(client, coordinator);
// heartbeat interval is 2 seconds
time.sleep(heartbeatIntervalMs);
@@ -389,9 +365,6 @@ public class KafkaConsumerTest {
@Test
public void verifyHeartbeatSentWhenFetchedDataReady() throws Exception {
- String topic = "topic";
- TopicPartition partition = new TopicPartition(topic, 0);
-
int rebalanceTimeoutMs = 60000;
int sessionTimeoutMs = 30000;
int heartbeatIntervalMs = 1000;
@@ -407,47 +380,19 @@ public class KafkaConsumerTest {
PartitionAssignor assignor = new RoundRobinAssignor();
final KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor,
- rebalanceTimeoutMs, sessionTimeoutMs, heartbeatIntervalMs, autoCommitIntervalMs);
- consumer.subscribe(Arrays.asList(topic), new ConsumerRebalanceListener() {
- @Override
- public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
+ rebalanceTimeoutMs, sessionTimeoutMs, heartbeatIntervalMs, true, autoCommitIntervalMs);
- }
-
- @Override
- public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
- // set initial position so we don't need a lookup
- for (TopicPartition partition : partitions)
- consumer.seek(partition, 0);
- }
- });
-
- // lookup coordinator
- client.prepareResponseFrom(new GroupCoordinatorResponse(Errors.NONE.code(), node).toStruct(), node);
-
- Node coordinator = new Node(Integer.MAX_VALUE - node.id(), node.host(), node.port());
-
- // join group
- client.prepareResponseFrom(joinGroupFollowerResponse(assignor, 1, "memberId", "leaderId", Errors.NONE.code()), coordinator);
-
- // sync group
- client.prepareResponseFrom(syncGroupResponse(Arrays.asList(partition), Errors.NONE.code()), coordinator);
+ consumer.subscribe(Arrays.asList(topic), getConsumerRebalanceListener(consumer));
+ Node coordinator = prepareRebalance(client, node, assignor, Arrays.asList(tp0), null);
consumer.poll(0);
// respond to the outstanding fetch so that we have data available on the next poll
- client.respondFrom(fetchResponse(partition, 0, 5), node);
+ client.respondFrom(fetchResponse(tp0, 0, 5), node);
client.poll(0, time.milliseconds());
- client.prepareResponseFrom(fetchResponse(partition, 5, 0), node);
- final AtomicBoolean heartbeatReceived = new AtomicBoolean(false);
- client.prepareResponseFrom(new MockClient.RequestMatcher() {
- @Override
- public boolean matches(ClientRequest request) {
- heartbeatReceived.set(true);
- return true;
- }
- }, new HeartbeatResponse(Errors.NONE.code()).toStruct(), coordinator);
+ client.prepareResponseFrom(fetchResponse(tp0, 5, 0), node);
+ AtomicBoolean heartbeatReceived = prepareHeartbeatResponse(client, coordinator);
time.sleep(heartbeatIntervalMs);
Thread.sleep(heartbeatIntervalMs);
@@ -459,8 +404,6 @@ public class KafkaConsumerTest {
@Test
public void verifyNoCoordinatorLookupForManualAssignmentWithSeek() {
- String topic = "topic";
- final TopicPartition partition = new TopicPartition(topic, 0);
int rebalanceTimeoutMs = 60000;
int sessionTimeoutMs = 3000;
int heartbeatIntervalMs = 2000;
@@ -476,26 +419,22 @@ public class KafkaConsumerTest {
PartitionAssignor assignor = new RoundRobinAssignor();
final KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor,
- rebalanceTimeoutMs, sessionTimeoutMs, heartbeatIntervalMs, autoCommitIntervalMs);
- consumer.assign(Arrays.asList(partition));
- consumer.seekToBeginning(Arrays.asList(partition));
+ rebalanceTimeoutMs, sessionTimeoutMs, heartbeatIntervalMs, true, autoCommitIntervalMs);
+ consumer.assign(Arrays.asList(tp0));
+ consumer.seekToBeginning(Arrays.asList(tp0));
// there shouldn't be any need to lookup the coordinator or fetch committed offsets.
// we just lookup the starting position and send the record fetch.
- client.prepareResponse(listOffsetsResponse(Collections.singletonMap(partition, 50L), Errors.NONE.code()));
- client.prepareResponse(fetchResponse(partition, 50L, 5));
+ client.prepareResponse(listOffsetsResponse(Collections.singletonMap(tp0, 50L), Errors.NONE.code()));
+ client.prepareResponse(fetchResponse(tp0, 50L, 5));
ConsumerRecords<String, String> records = consumer.poll(0);
assertEquals(5, records.count());
- assertEquals(55L, consumer.position(partition));
+ assertEquals(55L, consumer.position(tp0));
}
@Test
public void testCommitsFetchedDuringAssign() {
- String topic = "topic";
- final TopicPartition partition1 = new TopicPartition(topic, 0);
- final TopicPartition partition2 = new TopicPartition(topic, 1);
-
long offset1 = 10000;
long offset2 = 20000;
@@ -514,8 +453,8 @@ public class KafkaConsumerTest {
PartitionAssignor assignor = new RoundRobinAssignor();
final KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor,
- rebalanceTimeoutMs, sessionTimeoutMs, heartbeatIntervalMs, autoCommitIntervalMs);
- consumer.assign(Arrays.asList(partition1));
+ rebalanceTimeoutMs, sessionTimeoutMs, heartbeatIntervalMs, true, autoCommitIntervalMs);
+ consumer.assign(Arrays.asList(tp0));
// lookup coordinator
client.prepareResponseFrom(new GroupCoordinatorResponse(Errors.NONE.code(), node).toStruct(), node);
@@ -523,28 +462,25 @@ public class KafkaConsumerTest {
// fetch offset for one topic
client.prepareResponseFrom(
- offsetResponse(Collections.singletonMap(partition1, offset1), Errors.NONE.code()),
+ offsetResponse(Collections.singletonMap(tp0, offset1), Errors.NONE.code()),
coordinator);
- assertEquals(offset1, consumer.committed(partition1).offset());
+ assertEquals(offset1, consumer.committed(tp0).offset());
- consumer.assign(Arrays.asList(partition1, partition2));
+ consumer.assign(Arrays.asList(tp0, tp1));
// fetch offset for two topics
Map<TopicPartition, Long> offsets = new HashMap<>();
- offsets.put(partition1, offset1);
- offsets.put(partition2, offset2);
+ offsets.put(tp0, offset1);
+ offsets.put(tp1, offset2);
client.prepareResponseFrom(offsetResponse(offsets, Errors.NONE.code()), coordinator);
- assertEquals(offset1, consumer.committed(partition1).offset());
- assertEquals(offset2, consumer.committed(partition2).offset());
+ assertEquals(offset1, consumer.committed(tp0).offset());
+ assertEquals(offset2, consumer.committed(tp1).offset());
}
@Test
public void testAutoCommitSentBeforePositionUpdate() {
- String topic = "topic";
- final TopicPartition partition = new TopicPartition(topic, 0);
-
int rebalanceTimeoutMs = 60000;
int sessionTimeoutMs = 30000;
int heartbeatIntervalMs = 3000;
@@ -563,56 +499,23 @@ public class KafkaConsumerTest {
PartitionAssignor assignor = new RoundRobinAssignor();
final KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor,
- rebalanceTimeoutMs, sessionTimeoutMs, heartbeatIntervalMs, autoCommitIntervalMs);
- consumer.subscribe(Arrays.asList(topic), new ConsumerRebalanceListener() {
- @Override
- public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
-
- }
-
- @Override
- public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
- // set initial position so we don't need a lookup
- for (TopicPartition partition : partitions)
- consumer.seek(partition, 0);
- }
- });
-
- // lookup coordinator
- client.prepareResponseFrom(new GroupCoordinatorResponse(Errors.NONE.code(), node).toStruct(), node);
-
- Node coordinator = new Node(Integer.MAX_VALUE - node.id(), node.host(), node.port());
-
- // join group
- client.prepareResponseFrom(joinGroupFollowerResponse(assignor, 1, "memberId", "leaderId", Errors.NONE.code()), coordinator);
+ rebalanceTimeoutMs, sessionTimeoutMs, heartbeatIntervalMs, true, autoCommitIntervalMs);
- // sync group
- client.prepareResponseFrom(syncGroupResponse(Arrays.asList(partition), Errors.NONE.code()), coordinator);
+ consumer.subscribe(Arrays.asList(topic), getConsumerRebalanceListener(consumer));
+ Node coordinator = prepareRebalance(client, node, assignor, Arrays.asList(tp0), null);
consumer.poll(0);
// respond to the outstanding fetch so that we have data available on the next poll
- client.respondFrom(fetchResponse(partition, 0, 5), node);
+ client.respondFrom(fetchResponse(tp0, 0, 5), node);
client.poll(0, time.milliseconds());
time.sleep(autoCommitIntervalMs);
- client.prepareResponseFrom(fetchResponse(partition, 5, 0), node);
+ client.prepareResponseFrom(fetchResponse(tp0, 5, 0), node);
// no data has been returned to the user yet, so the committed offset should be 0
- final AtomicBoolean commitReceived = new AtomicBoolean(false);
- client.prepareResponseFrom(new MockClient.RequestMatcher() {
- @Override
- public boolean matches(ClientRequest request) {
- OffsetCommitRequest commitRequest = new OffsetCommitRequest(request.request().body());
- OffsetCommitRequest.PartitionData partitionData = commitRequest.offsetData().get(partition);
- if (partitionData.offset == 0) {
- commitReceived.set(true);
- return true;
- }
- return false;
- }
- }, new OffsetCommitResponse(Collections.singletonMap(partition, Errors.NONE.code())).toStruct(), coordinator);
+ AtomicBoolean commitReceived = prepareOffsetCommitResponse(client, coordinator, tp0, 0);
consumer.poll(0);
@@ -621,9 +524,6 @@ public class KafkaConsumerTest {
@Test
public void testWakeupWithFetchDataAvailable() {
- String topic = "topic";
- final TopicPartition partition = new TopicPartition(topic, 0);
-
int rebalanceTimeoutMs = 60000;
int sessionTimeoutMs = 30000;
int heartbeatIntervalMs = 3000;
@@ -642,8 +542,360 @@ public class KafkaConsumerTest {
PartitionAssignor assignor = new RoundRobinAssignor();
final KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor,
- rebalanceTimeoutMs, sessionTimeoutMs, heartbeatIntervalMs, autoCommitIntervalMs);
- consumer.subscribe(Arrays.asList(topic), new ConsumerRebalanceListener() {
+ rebalanceTimeoutMs, sessionTimeoutMs, heartbeatIntervalMs, true, autoCommitIntervalMs);
+
+ consumer.subscribe(Arrays.asList(topic), getConsumerRebalanceListener(consumer));
+ prepareRebalance(client, node, assignor, Arrays.asList(tp0), null);
+
+ consumer.poll(0);
+
+ // respond to the outstanding fetch so that we have data available on the next poll
+ client.respondFrom(fetchResponse(tp0, 0, 5), node);
+ client.poll(0, time.milliseconds());
+
+ consumer.wakeup();
+
+ try {
+ consumer.poll(0);
+ fail();
+ } catch (WakeupException e) {
+ }
+
+ // make sure the position hasn't been updated
+ assertEquals(0, consumer.position(tp0));
+
+ // the next poll should return the completed fetch
+ ConsumerRecords<String, String> records = consumer.poll(0);
+ assertEquals(5, records.count());
+ }
+
+ /**
+ * Verify that when a consumer changes its topic subscription its assigned partitions
+ * do not immediately change, and the latest consumed offsets of its to-be-revoked
+ * partitions are properly committed (when auto-commit is enabled).
+ * Upon unsubscribing from subscribed topics the consumer subscription and assignment
+ * are both updated right away and its consumed offsets are committed (if auto-commit
+ * is enabled).
+ */
+ @Test
+ public void testSubscriptionChangesWithAutoCommitEnabled() {
+ int rebalanceTimeoutMs = 60000;
+ int sessionTimeoutMs = 30000;
+ int heartbeatIntervalMs = 3000;
+
+ // adjust auto commit interval lower than heartbeat so we don't need to deal with
+ // a concurrent heartbeat request
+ int autoCommitIntervalMs = 1000;
+
+ Time time = new MockTime();
+ MockClient client = new MockClient(time);
+ Map<String, Integer> tpCounts = new HashMap<>();
+ tpCounts.put(topic, 1);
+ tpCounts.put(topic2, 1);
+ tpCounts.put(topic3, 1);
+ Cluster cluster = TestUtils.singletonCluster(tpCounts);
+ Node node = cluster.nodes().get(0);
+ client.setNode(node);
+ Metadata metadata = new Metadata(0, Long.MAX_VALUE);
+ metadata.update(cluster, time.milliseconds());
+ PartitionAssignor assignor = new RangeAssignor();
+
+ final KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor,
+ rebalanceTimeoutMs, sessionTimeoutMs, heartbeatIntervalMs, true, autoCommitIntervalMs);
+
+ // initial subscription
+ consumer.subscribe(Arrays.asList(topic, topic2), getConsumerRebalanceListener(consumer));
+
+ // verify that subscription has changed but assignment is still unchanged
+ assertTrue(consumer.subscription().size() == 2);
+ assertTrue(consumer.subscription().contains(topic) && consumer.subscription().contains(topic2));
+ assertTrue(consumer.assignment().isEmpty());
+
+ // mock rebalance responses
+ Node coordinator = prepareRebalance(client, node, assignor, Arrays.asList(tp0, t2p0), null);
+
+ consumer.poll(0);
+
+ // verify that subscription is still the same, and now assignment has caught up
+ assertTrue(consumer.subscription().size() == 2);
+ assertTrue(consumer.subscription().contains(topic) && consumer.subscription().contains(topic2));
+ assertTrue(consumer.assignment().size() == 2);
+ assertTrue(consumer.assignment().contains(tp0) && consumer.assignment().contains(t2p0));
+
+ // mock a response to the outstanding fetch so that we have data available on the next poll
+ Map<TopicPartition, FetchInfo> fetches1 = new HashMap<>();
+ fetches1.put(tp0, new FetchInfo(0, 1));
+ fetches1.put(t2p0, new FetchInfo(0, 10));
+ client.respondFrom(fetchResponse(fetches1), node);
+ client.poll(0, time.milliseconds());
+
+ ConsumerRecords<String, String> records = consumer.poll(0);
+
+ // verify that the fetch occurred as expected
+ assertEquals(11, records.count());
+ assertEquals(1L, consumer.position(tp0));
+ assertEquals(10L, consumer.position(t2p0));
+
+
+ // subscription change
+ consumer.subscribe(Arrays.asList(topic, topic3), getConsumerRebalanceListener(consumer));
+
+ // verify that subscription has changed but assignment is still unchanged
+ assertTrue(consumer.subscription().size() == 2);
+ assertTrue(consumer.subscription().contains(topic) && consumer.subscription().contains(topic3));
+ assertTrue(consumer.assignment().size() == 2);
+ assertTrue(consumer.assignment().contains(tp0) && consumer.assignment().contains(t2p0));
+
+ // mock the offset commit response for to be revoked partitions
+ Map<TopicPartition, Long> partitionOffsets1 = new HashMap<>();
+ partitionOffsets1.put(tp0, 1L);
+ partitionOffsets1.put(t2p0, 10L);
+ AtomicBoolean commitReceived = prepareOffsetCommitResponse(client, coordinator, partitionOffsets1);
+
+ // mock rebalance responses
+ prepareRebalance(client, node, assignor, Arrays.asList(tp0, t3p0), coordinator);
+
+ // mock a response to the outstanding fetch so that we have data available on the next poll
+ Map<TopicPartition, FetchInfo> fetches2 = new HashMap<>();
+ fetches2.put(tp0, new FetchInfo(1, 1));
+ fetches2.put(t3p0, new FetchInfo(0, 100));
+ client.respondFrom(fetchResponse(fetches2), node);
+ client.poll(0, time.milliseconds());
+ client.prepareResponse(fetchResponse(fetches2));
+
+ records = consumer.poll(0);
+
+ // verify that the fetch occurred as expected
+ assertEquals(101, records.count());
+ assertEquals(2L, consumer.position(tp0));
+ assertEquals(100L, consumer.position(t3p0));
+
+ // verify that the offset commits occurred as expected
+ assertTrue(commitReceived.get());
+
+ // verify that subscription is still the same, and now assignment has caught up
+ assertTrue(consumer.subscription().size() == 2);
+ assertTrue(consumer.subscription().contains(topic) && consumer.subscription().contains(topic3));
+ assertTrue(consumer.assignment().size() == 2);
+ assertTrue(consumer.assignment().contains(tp0) && consumer.assignment().contains(t3p0));
+
+
+ // mock the offset commit response for to be revoked partitions
+ Map<TopicPartition, Long> partitionOffsets2 = new HashMap<>();
+ partitionOffsets2.put(tp0, 2L);
+ partitionOffsets2.put(t3p0, 100L);
+ commitReceived = prepareOffsetCommitResponse(client, coordinator, partitionOffsets2);
+
+ // unsubscribe
+ consumer.unsubscribe();
+
+ // verify that subscription and assignment are both cleared
+ assertTrue(consumer.subscription().isEmpty());
+ assertTrue(consumer.assignment().isEmpty());
+
+ // verify that the offset commits occurred as expected
+ assertTrue(commitReceived.get());
+
+ consumer.close();
+ }
+
+ /**
+ * Verify that when a consumer changes its topic subscription its assigned partitions
+ * do not immediately change, and the consumed offsets of its to-be-revoked partitions
+ * are not committed (when auto-commit is disabled).
+ * Upon unsubscribing from subscribed topics, the assigned partitions immediately
+ * change but if auto-commit is disabled the consumer offsets are not committed.
+ */
+ @Test
+ public void testSubscriptionChangesWithAutoCommitDisabled() {
+ int rebalanceTimeoutMs = 60000;
+ int sessionTimeoutMs = 30000;
+ int heartbeatIntervalMs = 3000;
+ int autoCommitIntervalMs = 1000;
+
+ Time time = new MockTime();
+ MockClient client = new MockClient(time);
+ Map<String, Integer> tpCounts = new HashMap<>();
+ tpCounts.put(topic, 1);
+ tpCounts.put(topic2, 1);
+ Cluster cluster = TestUtils.singletonCluster(tpCounts);
+ Node node = cluster.nodes().get(0);
+ client.setNode(node);
+ Metadata metadata = new Metadata(0, Long.MAX_VALUE);
+ metadata.update(cluster, time.milliseconds());
+ PartitionAssignor assignor = new RangeAssignor();
+
+ final KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor,
+ rebalanceTimeoutMs, sessionTimeoutMs, heartbeatIntervalMs, false, autoCommitIntervalMs);
+
+ // initial subscription
+ consumer.subscribe(Arrays.asList(topic), getConsumerRebalanceListener(consumer));
+
+ // verify that subscription has changed but assignment is still unchanged
+ assertTrue(consumer.subscription().equals(Collections.singleton(topic)));
+ assertTrue(consumer.assignment().isEmpty());
+
+ // mock rebalance responses
+ prepareRebalance(client, node, assignor, Arrays.asList(tp0), null);
+
+ consumer.poll(0);
+
+ // verify that subscription is still the same, and now assignment has caught up
+ assertTrue(consumer.subscription().equals(Collections.singleton(topic)));
+ assertTrue(consumer.assignment().equals(Collections.singleton(tp0)));
+
+ consumer.poll(0);
+
+ // subscription change
+ consumer.subscribe(Arrays.asList(topic2), getConsumerRebalanceListener(consumer));
+
+ // verify that subscription has changed but assignment is still unchanged
+ assertTrue(consumer.subscription().equals(Collections.singleton(topic2)));
+ assertTrue(consumer.assignment().equals(Collections.singleton(tp0)));
+
+ // the auto commit is disabled, so no offset commit request should be sent
+ for (ClientRequest req: client.requests())
+ assertTrue(req.request().header().apiKey() != ApiKeys.OFFSET_COMMIT.id);
+
+ // subscription change
+ consumer.unsubscribe();
+
+ // verify that subscription and assignment are both updated
+ assertTrue(consumer.subscription().isEmpty());
+ assertTrue(consumer.assignment().isEmpty());
+
+ // the auto commit is disabled, so no offset commit request should be sent
+ for (ClientRequest req: client.requests())
+ assertTrue(req.request().header().apiKey() != ApiKeys.OFFSET_COMMIT.id);
+
+ consumer.close();
+ }
+
+ @Test
+ public void testManualAssignmentChangeWithAutoCommitEnabled() {
+ int rebalanceTimeoutMs = 60000;
+ int sessionTimeoutMs = 30000;
+ int heartbeatIntervalMs = 3000;
+ int autoCommitIntervalMs = 1000;
+
+ Time time = new MockTime();
+ MockClient client = new MockClient(time);
+ Map<String, Integer> tpCounts = new HashMap<>();
+ tpCounts.put(topic, 1);
+ tpCounts.put(topic2, 1);
+ Cluster cluster = TestUtils.singletonCluster(tpCounts);
+ Node node = cluster.nodes().get(0);
+ client.setNode(node);
+ Metadata metadata = new Metadata(0, Long.MAX_VALUE);
+ metadata.update(cluster, time.milliseconds());
+ PartitionAssignor assignor = new RangeAssignor();
+
+ final KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor,
+ rebalanceTimeoutMs, sessionTimeoutMs, heartbeatIntervalMs, true, autoCommitIntervalMs);
+
+ // lookup coordinator
+ client.prepareResponseFrom(new GroupCoordinatorResponse(Errors.NONE.code(), node).toStruct(), node);
+ Node coordinator = new Node(Integer.MAX_VALUE - node.id(), node.host(), node.port());
+
+ // manual assignment
+ consumer.assign(Arrays.asList(tp0));
+ consumer.seekToBeginning(Arrays.asList(tp0));
+
+ // fetch offset for one topic
+ client.prepareResponseFrom(
+ offsetResponse(Collections.singletonMap(tp0, 0L), Errors.NONE.code()),
+ coordinator);
+ assertEquals(0, consumer.committed(tp0).offset());
+
+ // verify that assignment immediately changes
+ assertTrue(consumer.assignment().equals(Collections.singleton(tp0)));
+
+ // there shouldn't be any need to lookup the coordinator or fetch committed offsets.
+ // we just lookup the starting position and send the record fetch.
+ client.prepareResponse(listOffsetsResponse(Collections.singletonMap(tp0, 10L), Errors.NONE.code()));
+ client.prepareResponse(fetchResponse(tp0, 10L, 1));
+
+ ConsumerRecords<String, String> records = consumer.poll(0);
+ assertEquals(1, records.count());
+ assertEquals(11L, consumer.position(tp0));
+
+ // mock the offset commit response for to be revoked partitions
+ AtomicBoolean commitReceived = prepareOffsetCommitResponse(client, coordinator, tp0, 11);
+
+ // new manual assignment
+ consumer.assign(Arrays.asList(t2p0));
+
+ // verify that assignment immediately changes
+ assertTrue(consumer.assignment().equals(Collections.singleton(t2p0)));
+ // verify that the offset commits occurred as expected
+ assertTrue(commitReceived.get());
+
+ consumer.close();
+ }
+
+ @Test
+ public void testManualAssignmentChangeWithAutoCommitDisabled() {
+ int rebalanceTimeoutMs = 60000;
+ int sessionTimeoutMs = 30000;
+ int heartbeatIntervalMs = 3000;
+ int autoCommitIntervalMs = 1000;
+
+ Time time = new MockTime();
+ MockClient client = new MockClient(time);
+ Map<String, Integer> tpCounts = new HashMap<>();
+ tpCounts.put(topic, 1);
+ tpCounts.put(topic2, 1);
+ Cluster cluster = TestUtils.singletonCluster(tpCounts);
+ Node node = cluster.nodes().get(0);
+ client.setNode(node);
+ Metadata metadata = new Metadata(0, Long.MAX_VALUE);
+ metadata.update(cluster, time.milliseconds());
+ PartitionAssignor assignor = new RangeAssignor();
+
+ final KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor,
+ rebalanceTimeoutMs, sessionTimeoutMs, heartbeatIntervalMs, false, autoCommitIntervalMs);
+
+ // lookup coordinator
+ client.prepareResponseFrom(new GroupCoordinatorResponse(Errors.NONE.code(), node).toStruct(), node);
+ Node coordinator = new Node(Integer.MAX_VALUE - node.id(), node.host(), node.port());
+
+ // manual assignment
+ consumer.assign(Arrays.asList(tp0));
+ consumer.seekToBeginning(Arrays.asList(tp0));
+
+ // fetch offset for one topic
+ client.prepareResponseFrom(
+ offsetResponse(Collections.singletonMap(tp0, 0L), Errors.NONE.code()),
+ coordinator);
+ assertEquals(0, consumer.committed(tp0).offset());
+
+ // verify that assignment immediately changes
+ assertTrue(consumer.assignment().equals(Collections.singleton(tp0)));
+
+ // there shouldn't be any need to lookup the coordinator or fetch committed offsets.
+ // we just lookup the starting position and send the record fetch.
+ client.prepareResponse(listOffsetsResponse(Collections.singletonMap(tp0, 10L), Errors.NONE.code()));
+ client.prepareResponse(fetchResponse(tp0, 10L, 1));
+
+ ConsumerRecords<String, String> records = consumer.poll(0);
+ assertEquals(1, records.count());
+ assertEquals(11L, consumer.position(tp0));
+
+ // new manual assignment
+ consumer.assign(Arrays.asList(t2p0));
+
+ // verify that assignment immediately changes
+ assertTrue(consumer.assignment().equals(Collections.singleton(t2p0)));
+
+ // the auto commit is disabled, so no offset commit request should be sent
+ for (ClientRequest req: client.requests())
+ assertTrue(req.request().header().apiKey() != ApiKeys.OFFSET_COMMIT.id);
+
+ consumer.close();
+ }
+
+ private ConsumerRebalanceListener getConsumerRebalanceListener(final KafkaConsumer<String, String> consumer) {
+ return new ConsumerRebalanceListener() {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
@@ -655,39 +907,68 @@ public class KafkaConsumerTest {
for (TopicPartition partition : partitions)
consumer.seek(partition, 0);
}
- });
-
- // lookup coordinator
- client.prepareResponseFrom(new GroupCoordinatorResponse(Errors.NONE.code(), node).toStruct(), node);
+ };
+ }
- Node coordinator = new Node(Integer.MAX_VALUE - node.id(), node.host(), node.port());
+ private Node prepareRebalance(MockClient client, Node node, PartitionAssignor assignor, List<TopicPartition> partitions, Node coordinator) {
+ if (coordinator == null) {
+ // lookup coordinator
+ client.prepareResponseFrom(new GroupCoordinatorResponse(Errors.NONE.code(), node).toStruct(), node);
+ coordinator = new Node(Integer.MAX_VALUE - node.id(), node.host(), node.port());
+ }
// join group
client.prepareResponseFrom(joinGroupFollowerResponse(assignor, 1, "memberId", "leaderId", Errors.NONE.code()), coordinator);
// sync group
- client.prepareResponseFrom(syncGroupResponse(Arrays.asList(partition), Errors.NONE.code()), coordinator);
+ client.prepareResponseFrom(syncGroupResponse(partitions, Errors.NONE.code()), coordinator);
- consumer.poll(0);
+ return coordinator;
+ }
- // respond to the outstanding fetch so that we have data available on the next poll
- client.respondFrom(fetchResponse(partition, 0, 5), node);
- client.poll(0, time.milliseconds());
+ private AtomicBoolean prepareHeartbeatResponse(MockClient client, Node coordinator) {
+ final AtomicBoolean heartbeatReceived = new AtomicBoolean(false);
+ client.prepareResponseFrom(new MockClient.RequestMatcher() {
+ @Override
+ public boolean matches(ClientRequest request) {
+ heartbeatReceived.set(true);
+ return true;
+ }
+ }, new HeartbeatResponse(Errors.NONE.code()).toStruct(), coordinator);
+ return heartbeatReceived;
+ }
- consumer.wakeup();
+ private AtomicBoolean prepareOffsetCommitResponse(MockClient client, Node coordinator, final Map<TopicPartition, Long> partitionOffsets) {
+ final AtomicBoolean commitReceived = new AtomicBoolean(true);
+ Map<TopicPartition, Short> response = new HashMap<>();
+ for (TopicPartition partition : partitionOffsets.keySet())
+ response.put(partition, Errors.NONE.code());
- try {
- consumer.poll(0);
- fail();
- } catch (WakeupException e) {
- }
+ client.prepareResponseFrom(new MockClient.RequestMatcher() {
+ @Override
+ public boolean matches(ClientRequest request) {
+ OffsetCommitRequest commitRequest = new OffsetCommitRequest(request.request().body());
+ for (Map.Entry<TopicPartition, Long> partitionOffset : partitionOffsets.entrySet()) {
+ OffsetCommitRequest.PartitionData partitionData = commitRequest.offsetData().get(partitionOffset.getKey());
+ // verify that the expected offset has been committed
+ if (partitionData.offset != partitionOffset.getValue()) {
+ commitReceived.set(false);
+ return false;
+ }
+ }
+ return true;
+ }
+ }, offsetCommitResponse(response), coordinator);
+ return commitReceived;
+ }
- // make sure the position hasn't been updated
- assertEquals(0, consumer.position(partition));
+ private AtomicBoolean prepareOffsetCommitResponse(MockClient client, Node coordinator, final TopicPartition partition, final long offset) {
+ return prepareOffsetCommitResponse(client, coordinator, Collections.singletonMap(partition, offset));
+ }
- // the next poll should return the completed fetch
- ConsumerRecords<String, String> records = consumer.poll(0);
- assertEquals(5, records.count());
+ private Struct offsetCommitResponse(Map<TopicPartition, Short> responseData) {
+ OffsetCommitResponse response = new OffsetCommitResponse(responseData);
+ return response.toStruct();
}
private Struct joinGroupFollowerResponse(PartitionAssignor assignor, int generationId, String memberId, String leaderId, short error) {
@@ -717,16 +998,27 @@ public class KafkaConsumerTest {
return new ListOffsetResponse(partitionData).toStruct();
}
- private Struct fetchResponse(TopicPartition tp, long fetchOffset, int count) {
- MemoryRecords records = MemoryRecords.emptyRecords(ByteBuffer.allocate(1024), CompressionType.NONE);
- for (int i = 0; i < count; i++)
- records.append(fetchOffset + i, 0L, ("key-" + i).getBytes(), ("value-" + i).getBytes());
- records.close();
- FetchResponse response = new FetchResponse(Collections.singletonMap(
- tp, new FetchResponse.PartitionData(Errors.NONE.code(), 5, records.buffer())), 0);
+ private Struct fetchResponse(Map<TopicPartition, FetchInfo> fetches) {
+ Map<TopicPartition, PartitionData> tpResponses = new HashMap<>();
+ for (Map.Entry<TopicPartition, FetchInfo> fetchEntry : fetches.entrySet()) {
+ TopicPartition partition = fetchEntry.getKey();
+ long fetchOffset = fetchEntry.getValue().offset;
+ int fetchCount = fetchEntry.getValue().count;
+ MemoryRecords records = MemoryRecords.emptyRecords(ByteBuffer.allocate(1024), CompressionType.NONE);
+ for (int i = 0; i < fetchCount; i++)
+ records.append(fetchOffset + i, 0L, ("key-" + i).getBytes(), ("value-" + i).getBytes());
+ records.close();
+ tpResponses.put(partition, new FetchResponse.PartitionData(Errors.NONE.code(), 0, records.buffer()));
+ }
+ FetchResponse response = new FetchResponse(tpResponses, 0);
return response.toStruct();
}
+ private Struct fetchResponse(TopicPartition partition, long fetchOffset, int count) {
+ FetchInfo fetchInfo = new FetchInfo(fetchOffset, count);
+ return fetchResponse(Collections.singletonMap(partition, fetchInfo));
+ }
+
private KafkaConsumer<String, String> newConsumer(Time time,
KafkaClient client,
Metadata metadata,
@@ -734,6 +1026,7 @@ public class KafkaConsumerTest {
int rebalanceTimeoutMs,
int sessionTimeoutMs,
int heartbeatIntervalMs,
+ boolean autoCommitEnabled,
int autoCommitIntervalMs) {
// create a consumer with mocked time and mocked network
@@ -742,7 +1035,6 @@ public class KafkaConsumerTest {
String metricGroupPrefix = "consumer";
long retryBackoffMs = 100;
long requestTimeoutMs = 30000;
- boolean autoCommitEnabled = true;
boolean excludeInternalTopics = true;
int minBytes = 1;
int maxWaitMs = 500;
@@ -808,10 +1100,17 @@ public class KafkaConsumerTest {
metrics,
subscriptions,
metadata,
- autoCommitEnabled,
- autoCommitIntervalMs,
- heartbeatIntervalMs,
retryBackoffMs,
requestTimeoutMs);
}
+
+ private static class FetchInfo {
+ long offset;
+ int count;
+
+ FetchInfo(long offset, int count) {
+ this.offset = offset;
+ this.count = count;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/54767bbb/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
index 783f0e6..a950cad 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
@@ -39,8 +39,9 @@ public class SubscriptionStateTest {
private final SubscriptionState state = new SubscriptionState(OffsetResetStrategy.EARLIEST);
private final String topic = "test";
private final String topic1 = "test1";
- private final TopicPartition tp0 = new TopicPartition("test", 0);
- private final TopicPartition tp1 = new TopicPartition("test", 1);
+ private final TopicPartition tp0 = new TopicPartition(topic, 0);
+ private final TopicPartition tp1 = new TopicPartition(topic, 1);
+ private final TopicPartition t1p0 = new TopicPartition(topic1, 0);
private final MockRebalanceListener rebalanceListener = new MockRebalanceListener();
@Test
@@ -60,6 +61,62 @@ public class SubscriptionStateTest {
}
@Test
+ public void partitionAssignmentChangeOnTopicSubscription() {
+ state.assignFromUser(new HashSet<>(Arrays.asList(tp0, tp1)));
+ // assigned partitions should immediately change
+ assertEquals(2, state.assignedPartitions().size());
+ assertTrue(state.assignedPartitions().contains(tp0));
+ assertTrue(state.assignedPartitions().contains(tp1));
+
+ state.unsubscribe();
+ // assigned partitions should immediately change
+ assertTrue(state.assignedPartitions().isEmpty());
+
+ state.subscribe(singleton(topic1), rebalanceListener);
+ // assigned partitions should remain unchanged
+ assertTrue(state.assignedPartitions().isEmpty());
+
+ state.assignFromSubscribed(asList(t1p0));
+ // assigned partitions should immediately change
+ assertEquals(singleton(t1p0), state.assignedPartitions());
+
+ state.subscribe(singleton(topic), rebalanceListener);
+ // assigned partitions should remain unchanged
+ assertEquals(singleton(t1p0), state.assignedPartitions());
+
+ state.unsubscribe();
+ // assigned partitions should immediately change
+ assertTrue(state.assignedPartitions().isEmpty());
+ }
+
+ @Test
+ public void partitionAssignmentChangeOnPatternSubscription() {
+ state.subscribe(Pattern.compile(".*"), rebalanceListener);
+ // assigned partitions should remain unchanged
+ assertTrue(state.assignedPartitions().isEmpty());
+
+ state.subscribeFromPattern(new HashSet<>(Arrays.asList(topic, topic1)));
+ // assigned partitions should remain unchanged
+ assertTrue(state.assignedPartitions().isEmpty());
+
+ state.assignFromSubscribed(asList(tp1));
+ // assigned partitions should immediately change
+ assertEquals(singleton(tp1), state.assignedPartitions());
+
+ state.subscribe(Pattern.compile(".*t"), rebalanceListener);
+ // assigned partitions should remain unchanged
+ assertEquals(singleton(tp1), state.assignedPartitions());
+
+ state.subscribeFromPattern(singleton(topic));
+ // assigned partitions should remain unchanged
+ assertEquals(singleton(tp1), state.assignedPartitions());
+
+ state.unsubscribe();
+ // assigned partitions should immediately change
+ assertTrue(state.assignedPartitions().isEmpty());
+ }
+
+ @Test
public void partitionReset() {
state.assignFromUser(singleton(tp0));
state.seek(tp0, 5);
@@ -217,5 +274,5 @@ public class SubscriptionStateTest {
}
}
-
+
}