kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rha...@apache.org
Subject [kafka] branch 2.3 updated: KAFKA-8449: Restart tasks on reconfiguration under incremental cooperative rebalancing (#6850)
Date Mon, 03 Jun 2019 16:21:58 GMT
This is an automated email from the ASF dual-hosted git repository.

rhauch pushed a commit to branch 2.3
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.3 by this push:
     new 09313b5  KAFKA-8449: Restart tasks on reconfiguration under incremental cooperative
rebalancing (#6850)
09313b5 is described below

commit 09313b5fda6ecf629a29d2e8285c6b21c91ddff2
Author: Konstantine Karantasis <konstantine@confluent.io>
AuthorDate: Mon Jun 3 09:13:40 2019 -0700

    KAFKA-8449: Restart tasks on reconfiguration under incremental cooperative rebalancing
(#6850)
    
    Restart task on reconfiguration under incremental cooperative rebalancing, and keep execution
paths separate for config updates between eager and cooperative. Include the group generation
in the log message when the worker receives its assignment.
    
    Author: Konstantine Karantasis <konstantine@confluent.io>
    Reviewer: Randall Hauch <rhauch@gmail.com>
---
 .../runtime/distributed/DistributedHerder.java     | 203 ++++++++++++++++-----
 .../integration/MonitorableSourceConnector.java    |   5 +-
 .../RebalanceSourceConnectorsIntegrationTest.java  |  63 ++++++-
 3 files changed, 220 insertions(+), 51 deletions(-)

diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
index 585836e..52709f7 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
@@ -80,6 +80,8 @@ import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
 
 import static org.apache.kafka.connect.runtime.distributed.ConnectProtocol.CONNECT_PROTOCOL_V0;
 
@@ -141,6 +143,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable
{
     // and the from other nodes are safe to process
     private boolean rebalanceResolved;
     private ExtendedAssignment runningAssignment = ExtendedAssignment.empty();
+    private Set<ConnectorTaskId> tasksToRestart = new HashSet<>();
     private ExtendedAssignment assignment;
     private boolean canReadConfigs;
     private ClusterConfigState configState;
@@ -151,6 +154,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable
{
     // Config updates can be collected and applied together when possible. Also, we need
to take care to rebalance when
     // needed (e.g. task reconfiguration, which requires everyone to coordinate offset commits).
     private Set<String> connectorConfigUpdates = new HashSet<>();
+    private Set<ConnectorTaskId> taskConfigUpdates = new HashSet<>();
     // Similarly collect target state changes (when observed by the config storage listener)
for handling in the
     // herder's main thread.
     private Set<String> connectorTargetStateChanges = new HashSet<>();
@@ -304,51 +308,47 @@ public class DistributedHerder extends AbstractHerder implements Runnable
{
         }
 
         // Process any configuration updates
-        Set<String> connectorConfigUpdatesCopy = null;
-        Set<String> connectorTargetStateChangesCopy = null;
-        synchronized (this) {
-            if (needsReconfigRebalance || !connectorConfigUpdates.isEmpty() || !connectorTargetStateChanges.isEmpty())
{
-                // Connector reconfigs only need local updates since there is no coordination
between workers required.
-                // However, if connectors were added or removed, work needs to be rebalanced
since we have more work
-                // items to distribute among workers.
-                configState = configBackingStore.snapshot();
-
-                if (needsReconfigRebalance) {
-                    // Task reconfigs require a rebalance. Request the rebalance, clean out
state, and then restart
-                    // this loop, which will then ensure the rebalance occurs without any
other requests being
-                    // processed until it completes.
-                    member.requestRejoin();
-                    // Any connector config updates or target state changes will be addressed
during the rebalance too
-                    connectorConfigUpdates.clear();
-                    connectorTargetStateChanges.clear();
-                    needsReconfigRebalance = false;
-                    log.debug("Requesting rebalance due to reconfiguration of tasks (needsReconfigRebalance:
{})",
-                            needsReconfigRebalance);
-                    return;
-                } else {
-                    if (!connectorConfigUpdates.isEmpty()) {
-                        // We can't start/stop while locked since starting connectors can
cause task updates that will
-                        // require writing configs, which in turn make callbacks into this
class from another thread that
-                        // require acquiring a lock. This leads to deadlock. Instead, just
copy the info we need and process
-                        // the updates after unlocking.
-                        connectorConfigUpdatesCopy = connectorConfigUpdates;
-                        connectorConfigUpdates = new HashSet<>();
-                    }
+        AtomicReference<Set<String>> connectorConfigUpdatesCopy = new AtomicReference<>();
+        AtomicReference<Set<String>> connectorTargetStateChangesCopy = new AtomicReference<>();
+        AtomicReference<Set<ConnectorTaskId>> taskConfigUpdatesCopy = new AtomicReference<>();
+
+        boolean shouldReturn;
+        if (member.currentProtocolVersion() == CONNECT_PROTOCOL_V0) {
+            shouldReturn = updateConfigsWithEager(connectorConfigUpdatesCopy,
+                    connectorTargetStateChangesCopy);
+            // With eager protocol we should return immediately if needsReconfigRebalance
has
+            // been set to retain the old workflow
+            if (shouldReturn) {
+                return;
+            }
 
-                    if (!connectorTargetStateChanges.isEmpty()) {
-                        // Similarly for target state changes which can cause connectors
to be restarted
-                        connectorTargetStateChangesCopy = connectorTargetStateChanges;
-                        connectorTargetStateChanges = new HashSet<>();
-                    }
-                }
+            if (connectorConfigUpdatesCopy.get() != null) {
+                processConnectorConfigUpdates(connectorConfigUpdatesCopy.get());
             }
-        }
 
-        if (connectorConfigUpdatesCopy != null)
-            processConnectorConfigUpdates(connectorConfigUpdatesCopy);
+            if (connectorTargetStateChangesCopy.get() != null) {
+                processTargetStateChanges(connectorTargetStateChangesCopy.get());
+            }
+        } else {
+            shouldReturn = updateConfigsWithIncrementalCooperative(connectorConfigUpdatesCopy,
+                    connectorTargetStateChangesCopy, taskConfigUpdatesCopy);
+
+            if (connectorConfigUpdatesCopy.get() != null) {
+                processConnectorConfigUpdates(connectorConfigUpdatesCopy.get());
+            }
+
+            if (connectorTargetStateChangesCopy.get() != null) {
+                processTargetStateChanges(connectorTargetStateChangesCopy.get());
+            }
 
-        if (connectorTargetStateChangesCopy != null)
-            processTargetStateChanges(connectorTargetStateChangesCopy);
+            if (taskConfigUpdatesCopy.get() != null) {
+                processTaskConfigUpdatesWithIncrementalCooperative(taskConfigUpdatesCopy.get());
+            }
+
+            if (shouldReturn) {
+                return;
+            }
+        }
 
         // Let the group take any actions it needs to
         try {
@@ -360,6 +360,95 @@ public class DistributedHerder extends AbstractHerder implements Runnable
{
         }
     }
 
+    private synchronized boolean updateConfigsWithEager(AtomicReference<Set<String>>
connectorConfigUpdatesCopy,
+                                                        AtomicReference<Set<String>>
connectorTargetStateChangesCopy) {
+        // This branch is here to avoid creating a snapshot if not needed
+        if (needsReconfigRebalance
+                || !connectorConfigUpdates.isEmpty()
+                || !connectorTargetStateChanges.isEmpty()) {
+            // Connector reconfigs only need local updates since there is no coordination
between workers required.
+            // However, if connectors were added or removed, work needs to be rebalanced
since we have more work
+            // items to distribute among workers.
+            configState = configBackingStore.snapshot();
+
+            if (needsReconfigRebalance) {
+                // Task reconfigs require a rebalance. Request the rebalance, clean out state,
and then restart
+                // this loop, which will then ensure the rebalance occurs without any other
requests being
+                // processed until it completes.
+                log.debug("Requesting rebalance due to reconfiguration of tasks (needsReconfigRebalance:
{})",
+                        needsReconfigRebalance);
+                member.requestRejoin();
+                needsReconfigRebalance = false;
+                // Any connector config updates or target state changes will be addressed
during the rebalance too
+                connectorConfigUpdates.clear();
+                connectorTargetStateChanges.clear();
+                return true;
+            } else {
+                if (!connectorConfigUpdates.isEmpty()) {
+                    // We can't start/stop while locked since starting connectors can cause
task updates that will
+                    // require writing configs, which in turn make callbacks into this class
from another thread that
+                    // require acquiring a lock. This leads to deadlock. Instead, just copy
the info we need and process
+                    // the updates after unlocking.
+                    connectorConfigUpdatesCopy.set(connectorConfigUpdates);
+                    connectorConfigUpdates = new HashSet<>();
+                }
+
+                if (!connectorTargetStateChanges.isEmpty()) {
+                    // Similarly for target state changes which can cause connectors to be
restarted
+                    connectorTargetStateChangesCopy.set(connectorTargetStateChanges);
+                    connectorTargetStateChanges = new HashSet<>();
+                }
+            }
+        }
+        return false;
+    }
+
+    private synchronized boolean updateConfigsWithIncrementalCooperative(AtomicReference<Set<String>>
connectorConfigUpdatesCopy,
+                                                                         AtomicReference<Set<String>>
connectorTargetStateChangesCopy,
+                                                                         AtomicReference<Set<ConnectorTaskId>>
taskConfigUpdatesCopy) {
+        boolean retValue = false;
+        // This branch is here to avoid creating a snapshot if not needed
+        if (needsReconfigRebalance
+                || !connectorConfigUpdates.isEmpty()
+                || !connectorTargetStateChanges.isEmpty()
+                || !taskConfigUpdates.isEmpty()) {
+            // Connector reconfigs only need local updates since there is no coordination
between workers required.
+            // However, if connectors were added or removed, work needs to be rebalanced
since we have more work
+            // items to distribute among workers.
+            configState = configBackingStore.snapshot();
+
+            if (needsReconfigRebalance) {
+                log.debug("Requesting rebalance due to reconfiguration of tasks (needsReconfigRebalance:
{})",
+                        needsReconfigRebalance);
+                member.requestRejoin();
+                needsReconfigRebalance = false;
+                retValue = true;
+            }
+
+            if (!connectorConfigUpdates.isEmpty()) {
+                // We can't start/stop while locked since starting connectors can cause task
updates that will
+                // require writing configs, which in turn make callbacks into this class
from another thread that
+                // require acquiring a lock. This leads to deadlock. Instead, just copy the
info we need and process
+                // the updates after unlocking.
+                connectorConfigUpdatesCopy.set(connectorConfigUpdates);
+                connectorConfigUpdates = new HashSet<>();
+            }
+
+            if (!connectorTargetStateChanges.isEmpty()) {
+                // Similarly for target state changes which can cause connectors to be restarted
+                connectorTargetStateChangesCopy.set(connectorTargetStateChanges);
+                connectorTargetStateChanges = new HashSet<>();
+            }
+
+            if (!taskConfigUpdates.isEmpty()) {
+                // Similarly for task config updates
+                taskConfigUpdatesCopy.set(taskConfigUpdates);
+                taskConfigUpdates = new HashSet<>();
+            }
+        }
+        return retValue;
+    }
+
     private void processConnectorConfigUpdates(Set<String> connectorConfigUpdates)
{
         // If we only have connector config updates, we can just bounce the updated connectors
that are
         // currently assigned to this worker.
@@ -396,6 +485,21 @@ public class DistributedHerder extends AbstractHerder implements Runnable
{
         }
     }
 
+    private void processTaskConfigUpdatesWithIncrementalCooperative(Set<ConnectorTaskId>
taskConfigUpdates) {
+        Set<ConnectorTaskId> localTasks = assignment == null
+                                          ? Collections.emptySet()
+                                          : new HashSet<>(assignment.tasks());
+        Set<String> connectorsWhoseTasksToStop = taskConfigUpdates.stream()
+                .map(ConnectorTaskId::connector).collect(Collectors.toSet());
+
+        List<ConnectorTaskId> tasksToStop = localTasks.stream()
+                .filter(taskId -> connectorsWhoseTasksToStop.contains(taskId.connector()))
+                .collect(Collectors.toList());
+        log.info("Handling task config update by restarting tasks {}", tasksToStop);
+        worker.stopAndAwaitTasks(tasksToStop);
+        tasksToRestart.addAll(tasksToStop);
+    }
+
     // public for testing
     public void halt() {
         synchronized (this) {
@@ -900,6 +1004,12 @@ public class DistributedHerder extends AbstractHerder implements Runnable
{
             callables.add(getConnectorStartingCallable(connectorName));
         }
 
+        // These tasks have been stopped by this worker due to task reconfiguration. In order
to
+        // restart them, they are removed just before the overall task startup from the set
of
+        // currently running tasks. Therefore, they'll be restarted only if they are included
in
+        // the assignment that was just received after rebalancing.
+        runningAssignment.tasks().removeAll(tasksToRestart);
+        tasksToRestart.clear();
         for (ConnectorTaskId taskId : assignmentDifference(assignment.tasks(), runningAssignment.tasks()))
{
             callables.add(getTaskStartingCallable(taskId));
         }
@@ -1172,12 +1282,17 @@ public class DistributedHerder extends AbstractHerder implements Runnable
{
         public void onTaskConfigUpdate(Collection<ConnectorTaskId> tasks) {
             log.info("Tasks {} configs updated", tasks);
 
-            // Stage the update and wake up the work thread. No need to record the set of
tasks here because task reconfigs
-            // always need a rebalance to ensure offsets get committed.
+            // Stage the update and wake up the work thread.
+            // The set of tasks is recorded for incremental cooperative rebalancing, in which
+            // tasks don't get restarted unless they are balanced between workers.
+            // With eager rebalancing there's no need to record the set of tasks because
task reconfigs
+            // always need a rebalance to ensure offsets get committed. In eager rebalancing
the
+            // recorded set of tasks remains unused.
             // TODO: As an optimization, some task config updates could avoid a rebalance.
In particular, single-task
             // connectors clearly don't need any coordination.
             synchronized (DistributedHerder.this) {
                 needsReconfigRebalance = true;
+                taskConfigUpdates.addAll(tasks);
             }
             member.wakeup();
         }
@@ -1279,7 +1394,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable
{
             // catch up (or backoff if we fail) not executed in a callback, and so we'll
be able to invoke other
             // group membership actions (e.g., we may need to explicitly leave the group
if we cannot handle the
             // assigned tasks).
-            log.info("Joined group and got assignment: {}", assignment);
+            log.info("Joined group at generation {} and got assignment: {}", generation,
assignment);
             synchronized (DistributedHerder.this) {
                 DistributedHerder.this.assignment = assignment;
                 DistributedHerder.this.generation = generation;
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/MonitorableSourceConnector.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/MonitorableSourceConnector.java
index 8bc8953..2ca7698 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/MonitorableSourceConnector.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/MonitorableSourceConnector.java
@@ -44,6 +44,7 @@ import java.util.stream.LongStream;
 public class MonitorableSourceConnector extends TestSourceConnector {
     private static final Logger log = LoggerFactory.getLogger(MonitorableSourceConnector.class);
 
+    public static final String TOPIC_CONFIG = "topic";
     private String connectorName;
     private ConnectorHandle connectorHandle;
     private Map<String, String> commonConfigs;
@@ -105,7 +106,7 @@ public class MonitorableSourceConnector extends TestSourceConnector {
         public void start(Map<String, String> props) {
             taskId = props.get("task.id");
             connectorName = props.get("connector.name");
-            topicName = props.getOrDefault("topic", "sequential-topic");
+            topicName = props.getOrDefault(TOPIC_CONFIG, "sequential-topic");
             throughput = Long.valueOf(props.getOrDefault("throughput", "-1"));
             batchSize = Integer.valueOf(props.getOrDefault("messages.per.poll", "1"));
             taskHandle = RuntimeHandles.get().connectorHandle(connectorName).taskHandle(taskId);
@@ -113,7 +114,7 @@ public class MonitorableSourceConnector extends TestSourceConnector {
                     context.offsetStorageReader().offset(Collections.singletonMap("task.id",
taskId)))
                     .orElse(Collections.emptyMap());
             startingSeqno = Optional.ofNullable((Long) offset.get("saved")).orElse(0L);
-            log.info("Started {} task {}", this.getClass().getSimpleName(), taskId);
+            log.info("Started {} task {} with properties {}", this.getClass().getSimpleName(),
taskId, props);
             throttler = new ThroughputThrottler(throughput, System.currentTimeMillis());
         }
 
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RebalanceSourceConnectorsIntegrationTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RebalanceSourceConnectorsIntegrationTest.java
index b0125b2..d3cc8db 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RebalanceSourceConnectorsIntegrationTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RebalanceSourceConnectorsIntegrationTest.java
@@ -39,17 +39,18 @@ import java.util.Properties;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
+import static org.apache.kafka.connect.integration.MonitorableSourceConnector.TOPIC_CONFIG;
 import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLASS_CONFIG;
 import static org.apache.kafka.connect.runtime.ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG;
 import static org.apache.kafka.connect.runtime.ConnectorConfig.TASKS_MAX_CONFIG;
 import static org.apache.kafka.connect.runtime.ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG;
-import static org.apache.kafka.connect.runtime.SinkConnectorConfig.TOPICS_CONFIG;
 import static org.apache.kafka.connect.runtime.WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG;
 import static org.apache.kafka.connect.runtime.distributed.ConnectProtocolCompatibility.COMPATIBLE;
 import static org.apache.kafka.connect.runtime.distributed.DistributedConfig.CONNECT_PROTOCOL_CONFIG;
 import static org.apache.kafka.test.TestUtils.waitForCondition;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertTrue;
 
 /**
  * Integration tests for incremental cooperative rebalancing between Connect workers
@@ -109,7 +110,7 @@ public class RebalanceSourceConnectorsIntegrationTest {
         props.put(TASKS_MAX_CONFIG, String.valueOf(NUM_TASKS));
         props.put("throughput", String.valueOf(1));
         props.put("messages.per.poll", String.valueOf(10));
-        props.put(TOPICS_CONFIG, TOPIC_NAME);
+        props.put(TOPIC_CONFIG, TOPIC_NAME);
         props.put(KEY_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
         props.put(VALUE_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
 
@@ -130,6 +131,58 @@ public class RebalanceSourceConnectorsIntegrationTest {
     }
 
     @Test
+    public void testReconfigConnector() throws Exception {
+        ConnectorHandle connectorHandle = RuntimeHandles.get().connectorHandle(CONNECTOR_NAME);
+
+        // create test topics
+        String anotherTopic = "another-topic";
+        connect.kafka().createTopic(TOPIC_NAME, NUM_TOPIC_PARTITIONS);
+        connect.kafka().createTopic(anotherTopic, NUM_TOPIC_PARTITIONS);
+
+        // set up props for the source connector
+        Map<String, String> props = new HashMap<>();
+        props.put(CONNECTOR_CLASS_CONFIG, MonitorableSourceConnector.class.getSimpleName());
+        props.put(TASKS_MAX_CONFIG, String.valueOf(NUM_TASKS));
+        props.put("throughput", String.valueOf(1));
+        props.put("messages.per.poll", String.valueOf(10));
+        props.put(TOPIC_CONFIG, TOPIC_NAME);
+        props.put(KEY_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
+        props.put(VALUE_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
+
+        // start a source connector
+        connect.configureConnector(CONNECTOR_NAME, props);
+
+        waitForCondition(() -> this.assertConnectorAndTasksRunning(CONNECTOR_NAME, NUM_TASKS).orElse(false),
+                CONNECTOR_SETUP_DURATION_MS, "Connector tasks did not start in time.");
+
+        int numRecordsProduced = 100;
+        int recordTransferDurationMs = 5000;
+
+        // consume all records from the source topic or fail, to ensure that they were correctly
produced
+        int recordNum = connect.kafka().consume(numRecordsProduced, recordTransferDurationMs,
TOPIC_NAME).count();
+        assertTrue("Not enough records produced by source connector. Expected at least: "
+ numRecordsProduced + " + but got " + recordNum,
+                recordNum >= numRecordsProduced);
+
+        // Reconfigure the source connector by changing the Kafka topic used as output
+        props.put(TOPIC_CONFIG, anotherTopic);
+        connect.configureConnector(CONNECTOR_NAME, props);
+
+        waitForCondition(() -> this.assertConnectorAndTasksRunning(CONNECTOR_NAME, NUM_TASKS).orElse(false),
+                CONNECTOR_SETUP_DURATION_MS, "Connector tasks did not start in time.");
+
+        // expect all records to be produced by the connector
+        connectorHandle.expectedRecords(numRecordsProduced);
+
+        // expect all records to be committed by the connector
+        connectorHandle.expectedCommits(numRecordsProduced);
+
+        // consume all records from the source topic or fail, to ensure that they were correctly
produced
+        recordNum = connect.kafka().consume(numRecordsProduced, recordTransferDurationMs,
anotherTopic).count();
+        assertTrue("Not enough records produced by source connector. Expected at least: "
+ numRecordsProduced + " + but got " + recordNum,
+                recordNum >= numRecordsProduced);
+    }
+
+    @Test
     public void testDeleteConnector() throws Exception {
         // create test topic
         connect.kafka().createTopic(TOPIC_NAME, NUM_TOPIC_PARTITIONS);
@@ -140,7 +193,7 @@ public class RebalanceSourceConnectorsIntegrationTest {
         props.put(TASKS_MAX_CONFIG, String.valueOf(NUM_TASKS));
         props.put("throughput", String.valueOf(1));
         props.put("messages.per.poll", String.valueOf(10));
-        props.put(TOPICS_CONFIG, TOPIC_NAME);
+        props.put(TOPIC_CONFIG, TOPIC_NAME);
         props.put(KEY_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
         props.put(VALUE_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
 
@@ -181,7 +234,7 @@ public class RebalanceSourceConnectorsIntegrationTest {
         props.put(TASKS_MAX_CONFIG, String.valueOf(NUM_TASKS));
         props.put("throughput", String.valueOf(1));
         props.put("messages.per.poll", String.valueOf(10));
-        props.put(TOPICS_CONFIG, TOPIC_NAME);
+        props.put(TOPIC_CONFIG, TOPIC_NAME);
         props.put(KEY_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
         props.put(VALUE_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
 
@@ -224,7 +277,7 @@ public class RebalanceSourceConnectorsIntegrationTest {
         props.put(TASKS_MAX_CONFIG, String.valueOf(NUM_TASKS));
         props.put("throughput", String.valueOf(1));
         props.put("messages.per.poll", String.valueOf(10));
-        props.put(TOPICS_CONFIG, TOPIC_NAME);
+        props.put(TOPIC_CONFIG, TOPIC_NAME);
         props.put(KEY_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
         props.put(VALUE_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
 


Mime
View raw message