kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From maniku...@apache.org
Subject [kafka] branch trunk updated: KAFKA-7500: MirrorMaker 2.0 (KIP-382)
Date Mon, 07 Oct 2019 08:31:55 GMT
This is an automated email from the ASF dual-hosted git repository.

manikumar pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 4ac892c  KAFKA-7500: MirrorMaker 2.0 (KIP-382)
4ac892c is described below

commit 4ac892ca783acab8e574b9b24d17e767eedb3d5f
Author: Ryanne Dolan <ryannedolan@gmail.com>
AuthorDate: Mon Oct 7 13:57:54 2019 +0530

    KAFKA-7500: MirrorMaker 2.0 (KIP-382)
    
    Implementation of [KIP-382 "MirrorMaker 2.0"](https://cwiki.apache.org/confluence/display/KAFKA/KIP-382%3A+MirrorMaker+2.0)
    
    Author: Ryanne Dolan <ryannedolan@gmail.com>
    Author: Arun Mathew <arunmathew88@gmail.com>
    Author: In Park <inpark@cloudera.com>
    Author: Andre Price <obsoleted@users.noreply.github.com>
    Author: christian.hagel@rio.cloud <christian.hagel@rio.cloud>
    
    Reviewers: Eno Thereska <eno.thereska@gmail.com>, William Hammond <william.t.hammond@gmail.com>, Viktor Somogyi <viktorsomogyi@gmail.com>, Jakub Korzeniowski, Tim Carey-Smith, Kamal Chandraprakash <kamal.chandraprakash@gmail.com>, Arun Mathew, Jeremy-l-ford, vpernin, Oleg Kasian <oleg.kasian@gmail.com>, Mickael Maison <mickael.maison@gmail.com>, Qihong Chen, Sriharsha Chintalapani <sriharsha@apache.org>, Jun Rao <junrao@gmail.com>, Randall Hauch <rhauch@gmail.com>, Manikumar Reddy <ma [...]
    
    Closes #6295 from ryannedolan/KIP-382
---
 bin/connect-mirror-maker.sh                        |  45 ++
 bin/kafka-run-class.sh                             |   2 +-
 build.gradle                                       |  86 ++-
 checkstyle/import-control.xml                      |  15 +
 config/connect-mirror-maker.properties             |  37 ++
 .../apache/kafka/connect/source/SourceTask.java    |  38 +-
 .../apache/kafka/connect/mirror/Checkpoint.java    | 184 +++++++
 .../connect/mirror/DefaultReplicationPolicy.java   |  73 +++
 .../org/apache/kafka/connect/mirror/Heartbeat.java | 145 +++++
 .../apache/kafka/connect/mirror/MirrorClient.java  | 243 +++++++++
 .../kafka/connect/mirror/MirrorClientConfig.java   | 135 +++++
 .../kafka/connect/mirror/RemoteClusterUtils.java   |  97 ++++
 .../kafka/connect/mirror/ReplicationPolicy.java    |  60 ++
 .../kafka/connect/mirror/SourceAndTarget.java      |  52 ++
 .../kafka/connect/mirror/MirrorClientTest.java     | 163 ++++++
 connect/mirror/README.md                           | 222 ++++++++
 .../kafka/connect/mirror/ConfigPropertyFilter.java |  37 ++
 .../mirror/DefaultConfigPropertyFilter.java        |  77 +++
 .../kafka/connect/mirror/DefaultGroupFilter.java   |  91 ++++
 .../kafka/connect/mirror/DefaultTopicFilter.java   |  91 ++++
 .../apache/kafka/connect/mirror/GroupFilter.java   |  37 ++
 .../connect/mirror/MirrorCheckpointConnector.java  | 156 ++++++
 .../kafka/connect/mirror/MirrorCheckpointTask.java | 193 +++++++
 .../connect/mirror/MirrorConnectorConfig.java      | 601 +++++++++++++++++++++
 .../connect/mirror/MirrorHeartbeatConnector.java   |  71 +++
 .../kafka/connect/mirror/MirrorHeartbeatTask.java  |  84 +++
 .../apache/kafka/connect/mirror/MirrorMaker.java   | 309 +++++++++++
 .../kafka/connect/mirror/MirrorMakerConfig.java    | 255 +++++++++
 .../apache/kafka/connect/mirror/MirrorMetrics.java | 208 +++++++
 .../connect/mirror/MirrorSourceConnector.java      | 390 +++++++++++++
 .../kafka/connect/mirror/MirrorSourceTask.java     | 293 ++++++++++
 .../kafka/connect/mirror/MirrorTaskConfig.java     |  75 +++
 .../apache/kafka/connect/mirror/MirrorUtils.java   | 116 ++++
 .../apache/kafka/connect/mirror/OffsetSync.java    | 120 ++++
 .../kafka/connect/mirror/OffsetSyncStore.java      |  84 +++
 .../org/apache/kafka/connect/mirror/Scheduler.java | 115 ++++
 .../apache/kafka/connect/mirror/TopicFilter.java   |  37 ++
 .../kafka/connect/mirror/CheckpointTest.java       |  40 ++
 .../apache/kafka/connect/mirror/HeartbeatTest.java |  38 ++
 .../connect/mirror/MirrorCheckpointTaskTest.java   |  67 +++
 .../connect/mirror/MirrorConnectorConfigTest.java  | 109 ++++
 .../mirror/MirrorConnectorsIntegrationTest.java    | 302 +++++++++++
 .../connect/mirror/MirrorMakerConfigTest.java      | 234 ++++++++
 .../connect/mirror/MirrorSourceConnectorTest.java  | 115 ++++
 .../kafka/connect/mirror/MirrorSourceTaskTest.java |  99 ++++
 .../kafka/connect/mirror/OffsetSyncStoreTest.java  |  67 +++
 .../kafka/connect/mirror/OffsetSyncTest.java       |  39 ++
 connect/mirror/src/test/resources/log4j.properties |  34 ++
 .../kafka/connect/runtime/WorkerSourceTask.java    |   8 +-
 .../runtime/distributed/DistributedHerder.java     |   2 +-
 .../connect/runtime/isolation/PluginUtils.java     |   2 +
 .../integration/MonitorableSourceConnector.java    |   3 +-
 .../connect/runtime/WorkerSourceTaskTest.java      |   6 +-
 .../connect/runtime/isolation/PluginUtilsTest.java |   6 +
 settings.gradle                                    |   2 +
 55 files changed, 6197 insertions(+), 13 deletions(-)

diff --git a/bin/connect-mirror-maker.sh b/bin/connect-mirror-maker.sh
new file mode 100755
index 0000000..a2c040d
--- /dev/null
+++ b/bin/connect-mirror-maker.sh
@@ -0,0 +1,45 @@
+#!/bin/sh
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+if [ $# -lt 1 ];
+then
+        echo "USAGE: $0 [-daemon] mm2.properties"
+        exit 1
+fi
+
+base_dir=$(dirname $0)
+
+if [ "x$KAFKA_LOG4J_OPTS" = "x" ]; then
+    export KAFKA_LOG4J_OPTS="-Dlog4j.configuration=file:$base_dir/../config/connect-log4j.properties"
+fi
+
+if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
+  export KAFKA_HEAP_OPTS="-Xms256M -Xmx2G"
+fi
+
+EXTRA_ARGS=${EXTRA_ARGS-'-name mirrorMaker'}
+
+COMMAND=$1
+case $COMMAND in
+  -daemon)
+    EXTRA_ARGS="-daemon "$EXTRA_ARGS
+    shift
+    ;;
+  *)
+    ;;
+esac
+
+exec $(dirname $0)/kafka-run-class.sh $EXTRA_ARGS org.apache.kafka.connect.mirror.MirrorMaker "$@"
diff --git a/bin/kafka-run-class.sh b/bin/kafka-run-class.sh
index 1221860..018e52f 100755
--- a/bin/kafka-run-class.sh
+++ b/bin/kafka-run-class.sh
@@ -139,7 +139,7 @@ do
   CLASSPATH="$CLASSPATH:$dir/*"
 done
 
-for cc_pkg in "api" "transforms" "runtime" "file" "json" "tools" "basic-auth-extension"
+for cc_pkg in "api" "transforms" "runtime" "file" "mirror" "mirror-client" "json" "tools" "basic-auth-extension"
 do
   for file in "$base_dir"/connect/${cc_pkg}/build/libs/connect-${cc_pkg}*.jar;
   do
diff --git a/build.gradle b/build.gradle
index 1c567e6..65e6a69 100644
--- a/build.gradle
+++ b/build.gradle
@@ -617,7 +617,9 @@ def connectPkgs = [
     'connect:file',
     'connect:json',
     'connect:runtime',
-    'connect:transforms'
+    'connect:transforms',
+    'connect:mirror',
+    'connect:mirror-client'
 ]
 
 def pkgs = [
@@ -860,6 +862,10 @@ project(':core') {
     from(project(':connect:file').configurations.runtime) { into("libs/") }
     from(project(':connect:basic-auth-extension').jar) { into("libs/") }
     from(project(':connect:basic-auth-extension').configurations.runtime) { into("libs/") }
+    from(project(':connect:mirror').jar) { into("libs/") }
+    from(project(':connect:mirror').configurations.runtime) { into("libs/") }
+    from(project(':connect:mirror-client').jar) { into("libs/") }
+    from(project(':connect:mirror-client').configurations.runtime) { into("libs/") }
     from(project(':streams').jar) { into("libs/") }
     from(project(':streams').configurations.runtime) { into("libs/") }
     from(project(':streams:streams-scala').jar) { into("libs/") }
@@ -1817,6 +1823,84 @@ project(':connect:basic-auth-extension') {
   }
 }
 
+project(':connect:mirror') {
+  archivesBaseName = "connect-mirror"
+
+  dependencies {
+    compile project(':connect:api')
+    compile project(':connect:runtime')
+    compile project(':connect:mirror-client')
+    compile project(':clients')
+    compile libs.argparse4j
+    compile libs.slf4jApi
+
+    testCompile libs.junit
+    testCompile project(':clients').sourceSets.test.output
+    testCompile project(':connect:runtime').sourceSets.test.output
+    testCompile project(':core')
+    testCompile project(':core').sourceSets.test.output
+
+    testRuntime project(':connect:runtime')
+    testRuntime libs.slf4jlog4j
+  }
+
+  javadoc {
+    enabled = false
+  }
+
+  tasks.create(name: "copyDependantLibs", type: Copy) {
+    from (configurations.testRuntime) {
+      include('slf4j-log4j12*')
+      include('log4j*jar')
+    }
+    from (configurations.runtime) {
+      exclude('kafka-clients*')
+      exclude('connect-*')
+    }
+    into "$buildDir/dependant-libs"
+    duplicatesStrategy 'exclude'
+  }
+
+  jar {
+    dependsOn copyDependantLibs
+  }
+}
+
+project(':connect:mirror-client') {
+  archivesBaseName = "connect-mirror-client"
+
+  dependencies {
+    compile project(':clients')
+    compile libs.slf4jApi
+
+    testCompile libs.junit
+    testCompile project(':clients').sourceSets.test.output
+
+    testRuntime libs.slf4jlog4j
+  }
+
+  javadoc {
+    enabled = true
+  }
+
+  tasks.create(name: "copyDependantLibs", type: Copy) {
+    from (configurations.testRuntime) {
+      include('slf4j-log4j12*')
+      include('log4j*jar')
+    }
+    from (configurations.runtime) {
+      exclude('kafka-clients*')
+      exclude('connect-*')
+    }
+    into "$buildDir/dependant-libs"
+    duplicatesStrategy 'exclude'
+  }
+
+  jar {
+    dependsOn copyDependantLibs
+  }
+}
+
 task aggregatedJavadoc(type: Javadoc) {
   def projectsWithJavadoc = subprojects.findAll { it.javadoc.enabled }
   source = projectsWithJavadoc.collect { it.sourceSets.main.allJava }
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index 5f27dde..17e6f57 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -342,6 +342,21 @@
       </subpackage>
     </subpackage>
 
+    <subpackage name="mirror">
+      <allow pkg="org.apache.kafka.clients.consumer" />
+      <allow pkg="org.apache.kafka.connect.source" />
+      <allow pkg="org.apache.kafka.connect.sink" />
+      <allow pkg="org.apache.kafka.connect.storage" />
+      <allow pkg="org.apache.kafka.connect.connector" />
+      <allow pkg="org.apache.kafka.connect.runtime" />
+      <allow pkg="org.apache.kafka.connect.runtime.distributed" />
+      <allow pkg="org.apache.kafka.connect.util" />
+      <allow pkg="org.apache.kafka.connect.converters" />
+      <allow pkg="net.sourceforge.argparse4j" />
+      <!-- for tests -->
+      <allow pkg="org.apache.kafka.connect.integration" />
+    </subpackage>
+
     <subpackage name="runtime">
       <allow pkg="org.apache.kafka.connect" />
       <allow pkg="org.reflections"/>
diff --git a/config/connect-mirror-maker.properties b/config/connect-mirror-maker.properties
new file mode 100644
index 0000000..16c1b79
--- /dev/null
+++ b/config/connect-mirror-maker.properties
@@ -0,0 +1,37 @@
+# Licensed to the Apache Software Foundation (ASF) under A or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+# 
+#    http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# see org.apache.kafka.clients.consumer.ConsumerConfig for more details
+
+# Sample MirrorMaker 2.0 top-level configuration file
+# Run with ./bin/connect-mirror-maker.sh connect-mirror-maker.properties 
+
+# specify any number of cluster aliases
+clusters = A, B, C
+
+# connection information for each cluster
+A.bootstrap.servers = A_host1:9092, A_host2:9092, A_host3:9092
+B.bootstrap.servers = B_host1:9092, B_host2:9092, B_host3:9092
+C.bootstrap.servers = C_host1:9092, C_host2:9092, C_host3:9092
+
+# enable and configure individual replication flows
+A->B.enabled = true
+A->B.topics = foo-.*
+B->C.enabled = true
+B->C.topics = bar-.*
+
+# customize as needed
+# replication.policy.separator = _
+# sync.topic.acls.enabled = false
+# emit.heartbeats.interval.seconds = 5
diff --git a/connect/api/src/main/java/org/apache/kafka/connect/source/SourceTask.java b/connect/api/src/main/java/org/apache/kafka/connect/source/SourceTask.java
index 8767a62..4dea6ce 100644
--- a/connect/api/src/main/java/org/apache/kafka/connect/source/SourceTask.java
+++ b/connect/api/src/main/java/org/apache/kafka/connect/source/SourceTask.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.connect.source;
 
 import org.apache.kafka.connect.connector.Task;
+import org.apache.kafka.clients.producer.RecordMetadata;
 
 import java.util.List;
 import java.util.Map;
@@ -88,7 +89,13 @@ public abstract class SourceTask implements Task {
 
     /**
      * <p>
-     * Commit an individual {@link SourceRecord} when the callback from the producer client is received, or if a record is filtered by a transformation.
+     * Commit an individual {@link SourceRecord} when the callback from the producer client is received. This method is
+     * also called when a record is filtered by a transformation, and thus will never be ACK'd by a broker.
+     * </p>
+     * <p>
+     * This is an alias for {@link commitRecord(SourceRecord, RecordMetadata)} for backwards compatibility. The default
+     * implementation of {@link commitRecord(SourceRecord, RecordMetadata)} just calls this method. It is not necessary
+     * to override both methods.
      * </p>
      * <p>
      * SourceTasks are not required to implement this functionality; Kafka Connect will record offsets
@@ -96,10 +103,37 @@ public abstract class SourceTask implements Task {
      * in their own system.
      * </p>
      *
-     * @param record {@link SourceRecord} that was successfully sent via the producer.
+     * @param record {@link SourceRecord} that was successfully sent via the producer or filtered by a transformation
      * @throws InterruptedException
+     * @see commitRecord(SourceRecord, RecordMetadata)
      */
     public void commitRecord(SourceRecord record) throws InterruptedException {
         // This space intentionally left blank.
     }
+
+    /**
+     * <p>
+     * Commit an individual {@link SourceRecord} when the callback from the producer client is received. This method is
+     * also called when a record is filtered by a transformation, and thus will never be ACK'd by a broker. In this case
+     * {@code metadata} will be null.
+     * </p>
+     * <p>
+     * SourceTasks are not required to implement this functionality; Kafka Connect will record offsets
+     * automatically. This hook is provided for systems that also need to store offsets internally
+     * in their own system.
+     * </p>
+     * <p>
+     * The default implementation just calls @{link commitRecord(SourceRecord)}, which is a nop by default. It is
+     * not necessary to implement both methods.
+     * </p>
+     *
+     * @param record {@link SourceRecord} that was successfully sent via the producer or filtered by a transformation
+     * @param metadata {@link RecordMetadata} record metadata returned from the broker, or null if the record was filtered
+     * @throws InterruptedException
+     */
+    public void commitRecord(SourceRecord record, RecordMetadata metadata)
+            throws InterruptedException {
+        // by default, just call other method for backwards compatability
+        commitRecord(record);
+    }
 }
diff --git a/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/Checkpoint.java b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/Checkpoint.java
new file mode 100644
index 0000000..74db746
--- /dev/null
+++ b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/Checkpoint.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.mirror;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.protocol.types.Field;
+import org.apache.kafka.common.protocol.types.Schema;
+import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.protocol.types.Type;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+
+import java.util.Map;
+import java.util.HashMap;
+import java.nio.ByteBuffer;
+
+/** Checkpoint records emitted from MirrorCheckpointConnector. Encodes remote consumer group state. */
+public class Checkpoint {
+    public static final String TOPIC_KEY = "topic";
+    public static final String PARTITION_KEY = "partition";
+    public static final String CONSUMER_GROUP_ID_KEY = "group";
+    public static final String UPSTREAM_OFFSET_KEY = "upstreamOffset";
+    public static final String DOWNSTREAM_OFFSET_KEY = "offset";
+    public static final String METADATA_KEY = "metadata";
+    public static final String VERSION_KEY = "version";
+    public static final short VERSION = 0;
+
+    public static final Schema VALUE_SCHEMA_V0 = new Schema(
+            new Field(UPSTREAM_OFFSET_KEY, Type.INT64),
+            new Field(DOWNSTREAM_OFFSET_KEY, Type.INT64),
+            new Field(METADATA_KEY, Type.STRING));
+
+    public static final Schema KEY_SCHEMA = new Schema(
+            new Field(CONSUMER_GROUP_ID_KEY, Type.STRING),
+            new Field(TOPIC_KEY, Type.STRING),
+            new Field(PARTITION_KEY, Type.INT32));
+
+    public static final Schema HEADER_SCHEMA = new Schema(
+            new Field(VERSION_KEY, Type.INT16));
+
+    private String consumerGroupId;
+    private TopicPartition topicPartition;
+    private long upstreamOffset;
+    private long downstreamOffset;
+    private String metadata;
+
+    public Checkpoint(String consumerGroupId, TopicPartition topicPartition, long upstreamOffset,
+            long downstreamOffset, String metadata) {
+        this.consumerGroupId = consumerGroupId;
+        this.topicPartition = topicPartition;
+        this.upstreamOffset = upstreamOffset;
+        this.downstreamOffset = downstreamOffset;
+        this.metadata = metadata;
+    }
+
+    public String consumerGroupId() {
+        return consumerGroupId;
+    }
+
+    public TopicPartition topicPartition() {
+        return topicPartition;
+    }
+
+    public long upstreamOffset() {
+        return upstreamOffset;
+    }
+
+    public long downstreamOffset() {
+        return downstreamOffset;
+    }
+
+    public String metadata() {
+        return metadata;
+    }
+
+    public OffsetAndMetadata offsetAndMetadata() {
+        return new OffsetAndMetadata(downstreamOffset, metadata);
+    }
+
+    @Override
+    public String toString() {
+        return String.format("Checkpoint{consumerGroupId=%s, topicPartition=%s, "
+            + "upstreamOffset=%d, downstreamOffset=%d, metatadata=%s}",
+            consumerGroupId, topicPartition, upstreamOffset, downstreamOffset, metadata);
+    }
+
+    ByteBuffer serializeValue(short version) {
+        Struct header = headerStruct(version);
+        Schema valueSchema = valueSchema(version);
+        Struct valueStruct = valueStruct(valueSchema);
+        ByteBuffer buffer = ByteBuffer.allocate(HEADER_SCHEMA.sizeOf(header) + valueSchema.sizeOf(valueStruct));
+        HEADER_SCHEMA.write(buffer, header);
+        valueSchema.write(buffer, valueStruct);
+        buffer.flip();
+        return buffer;
+    }
+
+    ByteBuffer serializeKey() {
+        Struct struct = keyStruct();
+        ByteBuffer buffer = ByteBuffer.allocate(KEY_SCHEMA.sizeOf(struct));
+        KEY_SCHEMA.write(buffer, struct);
+        buffer.flip();
+        return buffer;
+    }
+
+    public static Checkpoint deserializeRecord(ConsumerRecord<byte[], byte[]> record) {
+        ByteBuffer value = ByteBuffer.wrap(record.value());
+        Struct header = HEADER_SCHEMA.read(value);
+        short version = header.getShort(VERSION_KEY);
+        Schema valueSchema = valueSchema(version);
+        Struct valueStruct = valueSchema.read(value);
+        long upstreamOffset = valueStruct.getLong(UPSTREAM_OFFSET_KEY);
+        long downstreamOffset = valueStruct.getLong(DOWNSTREAM_OFFSET_KEY);
+        String metadata = valueStruct.getString(METADATA_KEY);
+        Struct keyStruct = KEY_SCHEMA.read(ByteBuffer.wrap(record.key()));
+        String group = keyStruct.getString(CONSUMER_GROUP_ID_KEY);
+        String topic = keyStruct.getString(TOPIC_KEY);
+        int partition = keyStruct.getInt(PARTITION_KEY);
+        return new Checkpoint(group, new TopicPartition(topic, partition), upstreamOffset,
+            downstreamOffset, metadata);
+    }
+
+    private static Schema valueSchema(short version) {
+        assert version == 0;
+        return VALUE_SCHEMA_V0;
+    }
+
+    private Struct valueStruct(Schema schema) {
+        Struct struct = new Struct(schema);
+        struct.set(UPSTREAM_OFFSET_KEY, upstreamOffset);
+        struct.set(DOWNSTREAM_OFFSET_KEY, downstreamOffset);
+        struct.set(METADATA_KEY, metadata);
+        return struct;
+    }
+
+    private Struct keyStruct() {
+        Struct struct = new Struct(KEY_SCHEMA);
+        struct.set(CONSUMER_GROUP_ID_KEY, consumerGroupId);
+        struct.set(TOPIC_KEY, topicPartition.topic());
+        struct.set(PARTITION_KEY, topicPartition.partition());
+        return struct;
+    }
+
+    private Struct headerStruct(short version) {
+        Struct struct = new Struct(HEADER_SCHEMA);
+        struct.set(VERSION_KEY, version);
+        return struct;
+    }
+
+    Map<String, ?> connectPartition() {
+        Map<String, Object> partition = new HashMap<>();
+        partition.put(CONSUMER_GROUP_ID_KEY, consumerGroupId);
+        partition.put(TOPIC_KEY, topicPartition.topic());
+        partition.put(PARTITION_KEY, topicPartition.partition());
+        return partition;
+    }
+
+    static String unwrapGroup(Map<String, ?> connectPartition) {
+        return connectPartition.get(CONSUMER_GROUP_ID_KEY).toString();
+    }
+
+    byte[] recordKey() {
+        return serializeKey().array();
+    }
+
+    byte[] recordValue() {
+        return serializeValue(VERSION).array();
+    }
+};
+
diff --git a/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/DefaultReplicationPolicy.java b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/DefaultReplicationPolicy.java
new file mode 100644
index 0000000..30d7534
--- /dev/null
+++ b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/DefaultReplicationPolicy.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.mirror;
+
+import org.apache.kafka.common.Configurable;
+
+import java.util.Map;
+import java.util.regex.Pattern;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Defines remote topics like "us-west.topic1". The separator is customizable and defaults to a period. */
+public class DefaultReplicationPolicy implements ReplicationPolicy, Configurable {
+    
+    private static final Logger log = LoggerFactory.getLogger(DefaultReplicationPolicy.class);
+
+    // In order to work with various metrics stores, we allow custom separators.
+    public static final String SEPARATOR_CONFIG = MirrorClientConfig.REPLICATION_POLICY_SEPARATOR;
+    public static final String SEPARATOR_DEFAULT = ".";
+
+    private String separator = SEPARATOR_DEFAULT;
+    private Pattern separatorPattern = Pattern.compile(Pattern.quote(SEPARATOR_DEFAULT));
+
+    @Override
+    public void configure(Map<String, ?> props) {
+        if (props.containsKey(SEPARATOR_CONFIG)) {
+            separator = (String) props.get(SEPARATOR_CONFIG);
+            log.info("Using custom remote topic separator: '{}'", separator);
+            separatorPattern = Pattern.compile(Pattern.quote(separator));
+        }
+    }
+
+    @Override
+    public String formatRemoteTopic(String sourceClusterAlias, String topic) {
+        return sourceClusterAlias + separator + topic;
+    }
+
+    @Override
+    public String topicSource(String topic) {
+        String[] parts = separatorPattern.split(topic);
+        if (parts.length < 2) {
+            // this is not a remote topic
+            return null;
+        } else {
+            return parts[0];
+        }
+    }
+
+    @Override
+    public String upstreamTopic(String topic) {
+        String source = topicSource(topic);
+        if (source == null) {
+            return null;
+        } else {
+            return topic.substring(source.length() + separator.length());
+        }
+    }
+}
diff --git a/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/Heartbeat.java b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/Heartbeat.java
new file mode 100644
index 0000000..a34ce9e
--- /dev/null
+++ b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/Heartbeat.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.mirror;
+
+import org.apache.kafka.common.protocol.types.Field;
+import org.apache.kafka.common.protocol.types.Schema;
+import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.protocol.types.Type;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+
+import java.util.Map;
+import java.util.HashMap;
+import java.nio.ByteBuffer;
+
+/** Heartbeat message sent from MirrorHeartbeatTask to target cluster. Heartbeats are always replicated. */
+public class Heartbeat {
+    public static final String SOURCE_CLUSTER_ALIAS_KEY = "sourceClusterAlias";
+    public static final String TARGET_CLUSTER_ALIAS_KEY = "targetClusterAlias";
+    public static final String TIMESTAMP_KEY = "timestamp";
+    public static final String VERSION_KEY = "version";
+    public static final short VERSION = 0;
+
+    public static final Schema VALUE_SCHEMA_V0 = new Schema(
+            new Field(TIMESTAMP_KEY, Type.INT64));
+
+    public static final Schema KEY_SCHEMA = new Schema(
+            new Field(SOURCE_CLUSTER_ALIAS_KEY, Type.STRING),
+            new Field(TARGET_CLUSTER_ALIAS_KEY, Type.STRING));
+
+    public static final Schema HEADER_SCHEMA = new Schema(
+            new Field(VERSION_KEY, Type.INT16));
+
+    private String sourceClusterAlias;
+    private String targetClusterAlias;
+    private long timestamp;
+
+    public Heartbeat(String sourceClusterAlias, String targetClusterAlias, long timestamp) {
+        this.sourceClusterAlias = sourceClusterAlias;
+        this.targetClusterAlias = targetClusterAlias;
+        this.timestamp = timestamp;
+    }
+
+    public String sourceClusterAlias() {
+        return sourceClusterAlias;
+    }
+
+    public String targetClusterAlias() {
+        return targetClusterAlias;
+    }
+
+    public long timestamp() {
+        return timestamp;
+    }
+
+    @Override
+    public String toString() {
+        return String.format("Heartbeat{sourceClusterAlias=%s, targetClusterAlias=%s, timestamp=%d}",
+            sourceClusterAlias, targetClusterAlias, timestamp);
+    }
+
+    ByteBuffer serializeValue(short version) {
+        Schema valueSchema = valueSchema(version);
+        Struct header = headerStruct(version);
+        Struct value = valueStruct(valueSchema);
+        ByteBuffer buffer = ByteBuffer.allocate(HEADER_SCHEMA.sizeOf(header) + valueSchema.sizeOf(value));
+        HEADER_SCHEMA.write(buffer, header);
+        valueSchema.write(buffer, value);
+        buffer.flip();
+        return buffer;
+    }
+
+    ByteBuffer serializeKey() {
+        Struct struct = keyStruct();
+        ByteBuffer buffer = ByteBuffer.allocate(KEY_SCHEMA.sizeOf(struct));
+        KEY_SCHEMA.write(buffer, struct);
+        buffer.flip();
+        return buffer;
+    }
+
+    public static Heartbeat deserializeRecord(ConsumerRecord<byte[], byte[]> record) {
+        ByteBuffer value = ByteBuffer.wrap(record.value());
+        Struct headerStruct = HEADER_SCHEMA.read(value);
+        short version = headerStruct.getShort(VERSION_KEY);
+        Struct valueStruct = valueSchema(version).read(value);
+        long timestamp = valueStruct.getLong(TIMESTAMP_KEY);
+        Struct keyStruct = KEY_SCHEMA.read(ByteBuffer.wrap(record.key()));
+        String sourceClusterAlias = keyStruct.getString(SOURCE_CLUSTER_ALIAS_KEY);
+        String targetClusterAlias = keyStruct.getString(TARGET_CLUSTER_ALIAS_KEY); 
+        return new Heartbeat(sourceClusterAlias, targetClusterAlias, timestamp);    
+    } 
+
+    private Struct headerStruct(short version) {
+        Struct struct = new Struct(HEADER_SCHEMA);
+        struct.set(VERSION_KEY, version);
+        return struct;
+    }
+
+    private Struct valueStruct(Schema schema) {
+        Struct struct = new Struct(schema);
+        struct.set(TIMESTAMP_KEY, timestamp);
+        return struct;
+    }
+
+    private Struct keyStruct() {
+        Struct struct = new Struct(KEY_SCHEMA);
+        struct.set(SOURCE_CLUSTER_ALIAS_KEY, sourceClusterAlias);
+        struct.set(TARGET_CLUSTER_ALIAS_KEY, targetClusterAlias);
+        return struct;
+    }
+
+    Map<String, ?> connectPartition() {
+        Map<String, Object> partition = new HashMap<>();
+        partition.put(SOURCE_CLUSTER_ALIAS_KEY, sourceClusterAlias);
+        partition.put(TARGET_CLUSTER_ALIAS_KEY, targetClusterAlias);
+        return partition;
+    }
+
+    byte[] recordKey() {
+        return serializeKey().array();
+    }
+
+    byte[] recordValue() {
+        return serializeValue(VERSION).array();
+    }
+
+    private static Schema valueSchema(short version) {
+        assert version == 0;
+        return VALUE_SCHEMA_V0;
+    }
+};
+
diff --git a/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/MirrorClient.java b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/MirrorClient.java
new file mode 100644
index 0000000..17d18ec
--- /dev/null
+++ b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/MirrorClient.java
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.connect.mirror;
+
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.common.protocol.types.SchemaException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+ 
+import java.time.Duration;
+import java.util.Set;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.Collections;
+import java.util.Collection;
+import java.util.stream.Collectors;
+import java.util.concurrent.ExecutionException;
+
+/** Interprets MM2's internal topics (checkpoints, heartbeats) on a given cluster.
+ *  <p> 
+ *  Given a top-level "mm2.properties" configuration file, MirrorClients can be constructed
+ *  for individual clusters as follows:
+ *  </p> 
+ *  <pre>
+ *    MirrorMakerConfig mmConfig = new MirrorMakerConfig(props);
+ *    MirrorClientConfig mmClientConfig = mmConfig.clientConfig("some-cluster");
+ *    MirrorClient mmClient = new Mirrorclient(mmClientConfig);
+ *  </pre>
+ */
+public class MirrorClient implements AutoCloseable {
+    private static final Logger log = LoggerFactory.getLogger(MirrorClient.class);
+
+    private AdminClient adminClient;
+    private ReplicationPolicy replicationPolicy;
+    private Map<String, Object> consumerConfig;
+
+    public MirrorClient(Map<String, Object> props) {
+        this(new MirrorClientConfig(props));
+    }
+
+    public MirrorClient(MirrorClientConfig config) {
+        adminClient = AdminClient.create(config.adminConfig());
+        consumerConfig = config.consumerConfig();
+        replicationPolicy = config.replicationPolicy();
+    }
+
+    // for testing
+    MirrorClient(AdminClient adminClient, ReplicationPolicy replicationPolicy,
+            Map<String, Object> consumerConfig) {
+        this.adminClient = adminClient;
+        this.replicationPolicy = replicationPolicy;
+        this.consumerConfig = consumerConfig;
+    }
+
+    /** Close internal clients. */
+    public void close() {
+        adminClient.close();
+    }
+
+    /** Get the ReplicationPolicy instance used to interpret remote topics. This instance is constructed based on
+     *  relevant configuration properties, including {@code replication.policy.class}. */
+    public ReplicationPolicy replicationPolicy() {
+        return replicationPolicy;
+    }
+
+    /** Compute shortest number of hops from an upstream source cluster.
+     *  For example, given replication flow A-&gt;B-&gt;C, there are two hops from A to C.
+     *  Returns -1 if upstream cluster is unreachable.
+     */
+    public int replicationHops(String upstreamClusterAlias) throws InterruptedException {
+        return heartbeatTopics().stream()
+            .map(x -> countHopsForTopic(x, upstreamClusterAlias))
+            .filter(x -> x != -1)
+            .mapToInt(x -> x)
+            .min()
+            .orElse(-1);
+    }
+
+    /** Find all heartbeat topics on this cluster. Heartbeat topics are replicated from other clusters. */
+    public Set<String> heartbeatTopics() throws InterruptedException {
+        return listTopics().stream()
+            .filter(this::isHeartbeatTopic)
+            .collect(Collectors.toSet());
+    }
+
+    /** Find all checkpoint topics on this cluster. */
+    public Set<String> checkpointTopics() throws InterruptedException {
+        return listTopics().stream()
+            .filter(this::isCheckpointTopic)
+            .collect(Collectors.toSet());
+    }
+
+    /** Find upstream clusters, which may be multiple hops away, based on incoming heartbeats. */
+    public Set<String> upstreamClusters() throws InterruptedException {
+        return listTopics().stream()
+            .filter(this::isHeartbeatTopic)
+            .flatMap(x -> allSources(x).stream())
+            .distinct()
+            .collect(Collectors.toSet());
+    }
+
+    /** Find all remote topics on this cluster. This does not include internal topics (heartbeats, checkpoints). */
+    public Set<String> remoteTopics() throws InterruptedException {
+        return listTopics().stream()
+            .filter(this::isRemoteTopic)
+            .collect(Collectors.toSet());
+    }
+
+    /** Find all remote topics that have been replicated directly from the given source cluster. */
+    public Set<String> remoteTopics(String source) throws InterruptedException {
+        return listTopics().stream()
+            .filter(this::isRemoteTopic)
+            .filter(x -> source.equals(replicationPolicy.topicSource(x)))
+            .distinct()
+            .collect(Collectors.toSet());
+    }
+
+    /** Translate a remote consumer group's offsets into corresponding local offsets. Topics are automatically
+     *  renamed according to the ReplicationPolicy.
+     *  @param consumerGroupId group ID of remote consumer group
+     *  @param remoteClusterAlias alias of remote cluster
+     *  @param timeout timeout
+     */
+    public Map<TopicPartition, OffsetAndMetadata> remoteConsumerOffsets(String consumerGroupId,
+            String remoteClusterAlias, Duration timeout) {
+        long deadline = System.currentTimeMillis() + timeout.toMillis();
+        Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
+        KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(consumerConfig,
+            new ByteArrayDeserializer(), new ByteArrayDeserializer());
+        try {
+            // checkpoint topics are not "remote topics", as they are not replicated. So we don't need
+            // to use ReplicationPolicy to create the checkpoint topic here.
+            String checkpointTopic = remoteClusterAlias + MirrorClientConfig.CHECKPOINTS_TOPIC_SUFFIX;
+            List<TopicPartition> checkpointAssignment =
+                Collections.singletonList(new TopicPartition(checkpointTopic, 0));
+            consumer.assign(checkpointAssignment);
+            consumer.seekToBeginning(checkpointAssignment);
+            while (System.currentTimeMillis() < deadline && !endOfStream(consumer, checkpointAssignment)) {
+                ConsumerRecords<byte[], byte[]> records = consumer.poll(timeout);
+                for (ConsumerRecord<byte[], byte[]> record : records) {
+                    try {
+                        Checkpoint checkpoint = Checkpoint.deserializeRecord(record);
+                        if (checkpoint.consumerGroupId().equals(consumerGroupId)) {
+                            offsets.put(checkpoint.topicPartition(), checkpoint.offsetAndMetadata());
+                        }
+                    } catch (SchemaException e) {
+                        log.info("Could not deserialize record. Skipping.", e);
+                    }
+                }
+            }
+            log.info("Consumed {} checkpoint records for {} from {}.", offsets.size(),
+                consumerGroupId, checkpointTopic);
+        } finally {
+            consumer.close();
+        }
+        return offsets;
+    }
+
+    Set<String> listTopics() throws InterruptedException {
+        try {
+            return adminClient.listTopics().names().get();
+        } catch (ExecutionException e) {
+            throw new KafkaException(e.getCause());
+        }
+    }
+
+    int countHopsForTopic(String topic, String sourceClusterAlias) {
+        int hops = 0;
+        while (true) {
+            hops++;
+            String source = replicationPolicy.topicSource(topic);
+            if (source == null) {
+                return -1;
+            }
+            if (source.equals(sourceClusterAlias)) {
+                return hops;
+            }
+            topic = replicationPolicy.upstreamTopic(topic);
+        } 
+    }
+
+    boolean isHeartbeatTopic(String topic) {
+        // heartbeats are replicated, so we must use ReplicationPolicy here
+        return MirrorClientConfig.HEARTBEATS_TOPIC.equals(replicationPolicy.originalTopic(topic));
+    }
+
+    boolean isCheckpointTopic(String topic) {
+        // checkpoints are not replicated, so we don't need to use ReplicationPolicy here
+        return topic.endsWith(MirrorClientConfig.CHECKPOINTS_TOPIC_SUFFIX);
+    }
+
+    boolean isRemoteTopic(String topic) {
+        return !replicationPolicy.isInternalTopic(topic)
+            && replicationPolicy.topicSource(topic) != null;
+    }
+
+    Set<String> allSources(String topic) {
+        Set<String> sources = new HashSet<>();
+        String source = replicationPolicy.topicSource(topic);
+        while (source != null) {
+            sources.add(source);
+            topic = replicationPolicy.upstreamTopic(topic);
+            source = replicationPolicy.topicSource(topic);
+        }
+        return sources;
+    }
+
+    static private boolean endOfStream(Consumer<?, ?> consumer, Collection<TopicPartition> assignments) {
+        Map<TopicPartition, Long> endOffsets = consumer.endOffsets(assignments);
+        for (TopicPartition topicPartition : assignments) {
+            if (consumer.position(topicPartition) < endOffsets.get(topicPartition)) {
+                return false;
+            }
+        }
+        return true;
+    }
+}
diff --git a/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/MirrorClientConfig.java b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/MirrorClientConfig.java
new file mode 100644
index 0000000..0c163d8
--- /dev/null
+++ b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/MirrorClientConfig.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.mirror;
+
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigDef.Type;
+import org.apache.kafka.common.config.ConfigDef.Importance;
+import org.apache.kafka.clients.CommonClientConfigs;
+
+import java.util.Map;
+import java.util.HashMap;
+
+/** Configuration required for MirrorClient to talk to a given target cluster.
+ *  <p>
+ *  Generally, these properties come from an mm2.properties configuration file
+ *  (@see MirrorMakerConfig.clientConfig):
+ *  </p>
+ *  <pre>
+ *    MirrorMakerConfig mmConfig = new MirrorMakerConfig(props);
+ *    MirrorClientConfig mmClientConfig = mmConfig.clientConfig("some-cluster");
+ *  </pre>
+ *  <p>
+ *  In addition to the properties defined here, sub-configs are supported for Admin, Consumer, and Producer clients.
+ *  For example:
+ *  </p>
+ *  <pre>
+ *      bootstrap.servers = host1:9092
+ *      consumer.client.id = mm2-client
+ *      replication.policy.separator = __
+ *  </pre>
+ */
+public class MirrorClientConfig extends AbstractConfig {
+    public static final String REPLICATION_POLICY_CLASS = "replication.policy.class";
+    private static final String REPLICATION_POLICY_CLASS_DOC = "Class which defines the remote topic naming convention.";
+    public static final Class REPLICATION_POLICY_CLASS_DEFAULT = DefaultReplicationPolicy.class;
+    public static final String REPLICATION_POLICY_SEPARATOR = "replication.policy.separator";
+    private static final String REPLICATION_POLICY_SEPARATOR_DOC = "Separator used in remote topic naming convention.";
+    public static final String REPLICATION_POLICY_SEPARATOR_DEFAULT =
+        DefaultReplicationPolicy.SEPARATOR_DEFAULT;
+    
+    public static final String ADMIN_CLIENT_PREFIX = "admin.";
+    public static final String CONSUMER_CLIENT_PREFIX = "consumer.";
+    public static final String PRODUCER_CLIENT_PREFIX = "producer.";
+
+    static final String CHECKPOINTS_TOPIC_SUFFIX = ".checkpoints.internal"; // internal so not replicated
+    static final String HEARTBEATS_TOPIC = "heartbeats";
+ 
+    MirrorClientConfig(Map<?, ?> props) {
+        super(CONFIG_DEF, props, true);
+    }
+
+    public ReplicationPolicy replicationPolicy() {
+        return getConfiguredInstance(REPLICATION_POLICY_CLASS, ReplicationPolicy.class);
+    }
+
+    /** Sub-config for Admin clients. */
+    public Map<String, Object> adminConfig() {
+        return clientConfig(ADMIN_CLIENT_PREFIX);
+    }
+
+    /** Sub-config for Consumer clients. */
+    public Map<String, Object> consumerConfig() {
+        return clientConfig(CONSUMER_CLIENT_PREFIX);
+    }
+
+    /** Sub-config for Producer clients. */
+    public Map<String, Object> producerConfig() {
+        return clientConfig(PRODUCER_CLIENT_PREFIX);
+    }
+    
+    private Map<String, Object> clientConfig(String prefix) {
+        Map<String, Object> props = new HashMap<>();
+        props.putAll(valuesWithPrefixOverride(prefix));
+        props.keySet().retainAll(CLIENT_CONFIG_DEF.names());
+        props.entrySet().removeIf(x -> x.getValue() == null);
+        return props;
+    }
+
+    // Properties passed to internal Kafka clients
+    static final ConfigDef CLIENT_CONFIG_DEF = new ConfigDef()
+        .define(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG,
+            Type.LIST,
+            null,
+            Importance.HIGH,
+            CommonClientConfigs.BOOTSTRAP_SERVERS_DOC)
+        // security support
+        .define(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,
+            Type.STRING,
+            CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL,
+            Importance.MEDIUM,
+            CommonClientConfigs.SECURITY_PROTOCOL_DOC)
+        .withClientSslSupport()
+        .withClientSaslSupport();
+ 
+    static final ConfigDef CONFIG_DEF = new ConfigDef()
+        .define(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG,
+            Type.STRING,
+            null,
+            Importance.HIGH,
+            CommonClientConfigs.BOOTSTRAP_SERVERS_DOC) 
+        .define(
+            REPLICATION_POLICY_CLASS,
+            ConfigDef.Type.CLASS,
+            REPLICATION_POLICY_CLASS_DEFAULT,
+            ConfigDef.Importance.LOW,
+            REPLICATION_POLICY_CLASS_DOC)
+        .define(
+            REPLICATION_POLICY_SEPARATOR,
+            ConfigDef.Type.STRING,
+            REPLICATION_POLICY_SEPARATOR_DEFAULT,
+            ConfigDef.Importance.LOW,
+            REPLICATION_POLICY_SEPARATOR_DOC)
+        .define(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,
+                Type.STRING,
+                CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL,
+                Importance.MEDIUM,
+                CommonClientConfigs.SECURITY_PROTOCOL_DOC)
+        .withClientSslSupport()
+        .withClientSaslSupport();
+}
diff --git a/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/RemoteClusterUtils.java b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/RemoteClusterUtils.java
new file mode 100644
index 0000000..f934319
--- /dev/null
+++ b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/RemoteClusterUtils.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.connect.mirror;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeoutException;
+import java.time.Duration;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+ 
+/** Convenience methods for multi-cluster environments. Wraps MirrorClient (@see MirrorClient).
+ *  <p>
+ *  Properties passed to these methods are used to construct internal Admin and Consumer clients.
+ *  Sub-configs like "admin.xyz" are also supported. For example:
+ *  </p>
+ *  <pre>
+ *      bootstrap.servers = host1:9092
+ *      consumer.client.id = mm2-client
+ *  </pre>
+ *  <p>
+ *  @see MirrorClientConfig for additional properties used by the internal MirrorClient.
+ *  </p>
+ */
+public final class RemoteClusterUtils {
+    private static final Logger log = LoggerFactory.getLogger(RemoteClusterUtils.class);
+
+    // utility class
+    private RemoteClusterUtils() {}
+
+    /** Find shortest number of hops from an upstream cluster.
+     *  Returns -1 if the cluster is unreachable */ 
+    public static int replicationHops(Map<String, Object> properties, String upstreamClusterAlias)
+            throws InterruptedException, TimeoutException {
+        try (MirrorClient client = new MirrorClient(properties)) {
+            return client.replicationHops(upstreamClusterAlias);
+        }
+    }
+
+    /** Find all heartbeat topics */
+    public static Set<String> heartbeatTopics(Map<String, Object> properties)
+            throws InterruptedException, TimeoutException {
+        try (MirrorClient client = new MirrorClient(properties)) {
+            return client.heartbeatTopics();
+        }
+    }
+
+    /** Find all checkpoint topics */
+    public static Set<String> checkpointTopics(Map<String, Object> properties)
+            throws InterruptedException, TimeoutException {
+        try (MirrorClient client = new MirrorClient(properties)) {
+            return client.checkpointTopics();
+        }
+    }
+
+    /** Find all upstream clusters */
+    public static Set<String> upstreamClusters(Map<String, Object> properties)
+            throws InterruptedException, TimeoutException {
+        try (MirrorClient client = new MirrorClient(properties)) {
+            return client.upstreamClusters();
+        }
+    }
+
+    /** Translate a remote consumer group's offsets into corresponding local offsets. Topics are automatically
+     *  renamed according to the ReplicationPolicy.
+     *  @param properties @see MirrorClientConfig
+     *  @param consumerGroupId group ID of remote consumer group
+     *  @param remoteClusterAlias alias of remote cluster
+     *  @param timeout timeout
+     */
+    public static Map<TopicPartition, OffsetAndMetadata> translateOffsets(Map<String, Object> properties,
+            String remoteClusterAlias, String consumerGroupId, Duration timeout)
+            throws InterruptedException, TimeoutException {
+        try (MirrorClient client = new MirrorClient(properties)) {
+            return client.remoteConsumerOffsets(consumerGroupId, remoteClusterAlias, timeout);
+        }
+    }
+}
diff --git a/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/ReplicationPolicy.java b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/ReplicationPolicy.java
new file mode 100644
index 0000000..11f73f5
--- /dev/null
+++ b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/ReplicationPolicy.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.connect.mirror;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+/** Defines which topics are "remote topics". e.g. "us-west.topic1". */
+@InterfaceStability.Evolving
+public interface ReplicationPolicy {
+
+    /** How to rename remote topics; generally should be like us-west.topic1. */
+    String formatRemoteTopic(String sourceClusterAlias, String topic);
+
+    /** Source cluster alias of given remote topic, e.g. "us-west" for "us-west.topic1".
+     *  Returns null if not a remote topic.
+     */
+    String topicSource(String topic);
+
+    /** Name of topic on the source cluster, e.g. "topic1" for "us-west.topic1".
+     *
+     *  Topics may be replicated multiple hops, so the immediately upstream topic
+     *  may itself be a remote topic.
+     *
+     *  Returns null if not a remote topic.
+     */
+    String upstreamTopic(String topic); 
+
+    /** The name of the original source-topic, which may have been replicated multiple hops.
+     *  Returns the topic if it is not a remote topic.
+     */
+    default String originalTopic(String topic) {
+        String upstream = upstreamTopic(topic);
+        if (upstream == null) {
+            return topic;
+        } else {
+            return originalTopic(upstream);
+        }
+    }
+
+    /** Internal topics are never replicated. */
+    default boolean isInternalTopic(String topic) {
+        return topic.endsWith(".internal") || topic.endsWith("-internal") || topic.startsWith("__")
+            || topic.startsWith(".");
+    }
+}
diff --git a/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/SourceAndTarget.java b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/SourceAndTarget.java
new file mode 100644
index 0000000..f853dc4
--- /dev/null
+++ b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/SourceAndTarget.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.mirror;
+
+/** Directional pair of clustes, where source is replicated to target. */
+public class SourceAndTarget {
+    private String source;
+    private String target;
+
+    public SourceAndTarget(String source, String target) {
+        this.source = source;
+        this.target = target;
+    }
+
+    public String source() {
+        return source;
+    }
+
+    public String target() {
+        return target;
+    }
+
+    @Override
+    public String toString() {
+        return source + "->" + target;
+    }
+
+    @Override
+    public int hashCode() {
+        return toString().hashCode();
+    }
+
+    @Override
+    public boolean equals(Object other) {
+        return other != null && toString().equals(other.toString());
+    }
+}
+
diff --git a/connect/mirror-client/src/test/java/org/apache/kafka/connect/mirror/MirrorClientTest.java b/connect/mirror-client/src/test/java/org/apache/kafka/connect/mirror/MirrorClientTest.java
new file mode 100644
index 0000000..c2536d5
--- /dev/null
+++ b/connect/mirror-client/src/test/java/org/apache/kafka/connect/mirror/MirrorClientTest.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.mirror;
+
+import org.apache.kafka.common.Configurable;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.HashSet;
+import java.util.Arrays;
+import java.util.concurrent.TimeoutException;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertFalse;
+
+public class MirrorClientTest {
+
+    private static class FakeMirrorClient extends MirrorClient {
+
+        List<String> topics;
+
+        FakeMirrorClient(List<String> topics) {
+            super(null, new DefaultReplicationPolicy(), null);
+            this.topics = topics;
+        }
+
+        FakeMirrorClient() {
+            this(Collections.emptyList());
+        } 
+
+        @Override
+        protected Set<String> listTopics() {
+            return new HashSet<>(topics);
+        }
+    }
+
+    @Test
+    public void testIsHeartbeatTopic() throws InterruptedException, TimeoutException {
+        MirrorClient client = new FakeMirrorClient();
+        assertTrue(client.isHeartbeatTopic("heartbeats"));
+        assertTrue(client.isHeartbeatTopic("source1.heartbeats"));
+        assertTrue(client.isHeartbeatTopic("source2.source1.heartbeats"));
+        assertFalse(client.isHeartbeatTopic("heartbeats!"));
+        assertFalse(client.isHeartbeatTopic("!heartbeats"));
+        assertFalse(client.isHeartbeatTopic("source1heartbeats"));
+        assertFalse(client.isHeartbeatTopic("source1-heartbeats"));
+    }
+
+    @Test
+    public void testIsCheckpointTopic() throws InterruptedException, TimeoutException {
+        MirrorClient client = new FakeMirrorClient();
+        assertTrue(client.isCheckpointTopic("source1.checkpoints.internal"));
+        assertFalse(client.isCheckpointTopic("checkpoints.internal"));
+        assertFalse(client.isCheckpointTopic("checkpoints-internal"));
+        assertFalse(client.isCheckpointTopic("checkpoints.internal!"));
+        assertFalse(client.isCheckpointTopic("!checkpoints.internal"));
+        assertFalse(client.isCheckpointTopic("source1checkpointsinternal"));
+    }
+
+    @Test
+    public void countHopsForTopicTest() throws InterruptedException, TimeoutException {
+        MirrorClient client = new FakeMirrorClient();
+        assertEquals(-1, client.countHopsForTopic("topic", "source"));
+        assertEquals(-1, client.countHopsForTopic("source", "source"));
+        assertEquals(-1, client.countHopsForTopic("sourcetopic", "source"));
+        assertEquals(-1, client.countHopsForTopic("source1.topic", "source2"));
+        assertEquals(1, client.countHopsForTopic("source1.topic", "source1"));
+        assertEquals(1, client.countHopsForTopic("source2.source1.topic", "source2"));
+        assertEquals(2, client.countHopsForTopic("source2.source1.topic", "source1"));
+        assertEquals(3, client.countHopsForTopic("source3.source2.source1.topic", "source1"));
+        assertEquals(-1, client.countHopsForTopic("source3.source2.source1.topic", "source4"));
+    }
+
+    @Test
+    public void heartbeatTopicsTest() throws InterruptedException, TimeoutException {
+        MirrorClient client = new FakeMirrorClient(Arrays.asList("topic1", "topic2", "heartbeats",
+            "source1.heartbeats", "source2.source1.heartbeats", "source3.heartbeats"));
+        Set<String> heartbeatTopics = client.heartbeatTopics();
+        assertEquals(heartbeatTopics, new HashSet<>(Arrays.asList("heartbeats", "source1.heartbeats",
+            "source2.source1.heartbeats", "source3.heartbeats")));
+    }
+
+    @Test
+    public void checkpointsTopicsTest() throws InterruptedException, TimeoutException {
+        MirrorClient client = new FakeMirrorClient(Arrays.asList("topic1", "topic2", "checkpoints.internal",
+            "source1.checkpoints.internal", "source2.source1.checkpoints.internal", "source3.checkpoints.internal"));
+        Set<String> checkpointTopics = client.checkpointTopics();
+        assertEquals(new HashSet<>(Arrays.asList("source1.checkpoints.internal",
+            "source2.source1.checkpoints.internal", "source3.checkpoints.internal")), checkpointTopics);
+    }
+
+    @Test
+    public void replicationHopsTest() throws InterruptedException, TimeoutException {
+        MirrorClient client = new FakeMirrorClient(Arrays.asList("topic1", "topic2", "heartbeats",
+            "source1.heartbeats", "source1.source2.heartbeats", "source3.heartbeats"));
+        assertEquals(1, client.replicationHops("source1"));
+        assertEquals(2, client.replicationHops("source2")); 
+        assertEquals(1, client.replicationHops("source3"));
+        assertEquals(-1, client.replicationHops("source4"));
+    }
+
+    @Test
+    public void upstreamClustersTest() throws InterruptedException {
+        MirrorClient client = new FakeMirrorClient(Arrays.asList("topic1", "topic2", "heartbeats",
+            "source1.heartbeats", "source1.source2.heartbeats", "source3.source4.source5.heartbeats"));
+        Set<String> sources = client.upstreamClusters();
+        assertTrue(sources.contains("source1"));
+        assertTrue(sources.contains("source2"));
+        assertTrue(sources.contains("source3"));
+        assertTrue(sources.contains("source4"));
+        assertTrue(sources.contains("source5"));
+        assertFalse(sources.contains("sourceX"));
+        assertFalse(sources.contains(""));
+        assertFalse(sources.contains(null));
+    }
+
+    @Test
+    public void remoteTopicsTest() throws InterruptedException {
+        MirrorClient client = new FakeMirrorClient(Arrays.asList("topic1", "topic2", "topic3",
+            "source1.topic4", "source1.source2.topic5", "source3.source4.source5.topic6"));
+        Set<String> remoteTopics = client.remoteTopics();
+        assertFalse(remoteTopics.contains("topic1"));
+        assertFalse(remoteTopics.contains("topic2"));
+        assertFalse(remoteTopics.contains("topic3"));
+        assertTrue(remoteTopics.contains("source1.topic4"));
+        assertTrue(remoteTopics.contains("source1.source2.topic5"));
+        assertTrue(remoteTopics.contains("source3.source4.source5.topic6"));
+    }
+
+    @Test
+    public void remoteTopicsSeparatorTest() throws InterruptedException {
+        MirrorClient client = new FakeMirrorClient(Arrays.asList("topic1", "topic2", "topic3",
+            "source1__topic4", "source1__source2__topic5", "source3__source4__source5__topic6"));
+        ((Configurable) client.replicationPolicy()).configure(
+            Collections.singletonMap("replication.policy.separator", "__"));
+        Set<String> remoteTopics = client.remoteTopics();
+        assertFalse(remoteTopics.contains("topic1"));
+        assertFalse(remoteTopics.contains("topic2"));
+        assertFalse(remoteTopics.contains("topic3"));
+        assertTrue(remoteTopics.contains("source1__topic4"));
+        assertTrue(remoteTopics.contains("source1__source2__topic5"));
+        assertTrue(remoteTopics.contains("source3__source4__source5__topic6"));
+    }
+
+}
diff --git a/connect/mirror/README.md b/connect/mirror/README.md
new file mode 100644
index 0000000..68e3536
--- /dev/null
+++ b/connect/mirror/README.md
@@ -0,0 +1,222 @@
+
+# MirrorMaker 2.0
+
+MM2 leverages the Connect framework to replicate topics between Kafka
+clusters. MM2 includes several new features, including:
+
+ - both topics and consumer groups are replicated
+ - topic configuration and ACLs are replicated
+ - cross-cluster offsets are synchronized
+ - partitioning is preserved
+
+## Replication flows
+
+MM2 replicates topics and consumer groups from upstream source clusters
+to downstream target clusters. These directional flows are notated
+`A->B`.
+
+It's possible to create complex replication topologies based on these
+`source->target` flows, including:
+
+ - *fan-out*, e.g. `K->A, K->B, K->C`
+ - *aggregation*, e.g. `A->K, B->K, C->K`
+ - *active/active*, e.g. `A->B, B->A`
+
+Each replication flow can be configured independently, e.g. to replicate
+specific topics or groups:
+
+    A->B.topics = topic-1, topic-2
+    A->B.groups = group-1, group-2
+
+By default, all topics and consumer groups are replicated (except
+blacklisted ones), across all enabled replication flows. Each
+replication flow must be explicitly enabled to begin replication:
+
+    A->B.enabled = true
+    B->A.enabled = true
+
+## Starting an MM2 process
+
+You can run any number of MM2 processes as needed. Any MM2 processes
+which are configured to replicate the same Kafka clusters will find each
+other, share configuration, load balance, etc.
+
+To start an MM2 process, first specify Kafka cluster information in a
+configuration file as follows:
+
+    # mm2.properties
+    clusters = us-west, us-east
+    us-west.bootstrap.servers = host1:9092
+    us-east.bootstrap.servers = host2:9092
+
+You can list any number of clusters this way.
+
+Optionally, you can override default MirrorMaker properties:
+
+    topics = .*
+    groups = group1, group2
+    emit.checkpoints.interval.seconds = 10
+
+These will apply to all replication flows. You can also override default
+properties for specific clusters or replication flows:
+
+    # configure a specific cluster
+    us-west.offset.storage.topic = mm2-offsets
+
+    # configure a specific source->target replication flow
+    us-west->us-east.emit.heartbeats = false
+
+Next, enable individual replication flows as follows:
+
+    us-west->us-east.enabled = true     # disabled by default
+
+Finally, launch one or more MirrorMaker processes with the `connect-mirror-maker.sh`
+script:
+
+    $ ./bin/connect-mirror-maker.sh mm2.properties
+
+## Multicluster environments
+
+MM2 supports replication between multiple Kafka clusters, whether in the
+same data center or across multiple data centers. A single MM2 cluster
+can span multiple data centers, but it is recommended to keep MM2's producers
+as close as possible to their target clusters. To do so, specify a subset
+of clusters for each MM2 node as follows:
+
+    # in west DC:
+    $ ./bin/connect-mirror-maker.sh mm2.properties --clusters west-1 west-2
+
+This signals to the node that the given clusters are nearby, and prevents the
+node from sending records or configuration to clusters in other data centers.
+
+### Example
+
+Say there are three data centers (west, east, north) with two Kafka
+clusters in each data center (west-1, west-2 etc). We can configure MM2
+for active/active replication within each data center, as well as cross data
+center replication (XDCR) as follows:
+
+    # mm2.properties
+    clusters: west-1, west-2, east-1, east-2, north-1, north-2
+
+    west-1.bootstrap.servers = ...
+    ---%<---
+
+    # active/active in west
+    west-1->west-2.enabled = true
+    west-2->west-1.enabled = true
+
+    # active/active in east
+    east-1->east-2.enabled = true
+    east-2->east-1.enabled = true
+
+    # active/active in north
+    north-1->north-2.enabled = true
+    north-2->north-1.enabled = true
+
+    # XDCR via west-1, east-1, north-1
+    west-1->east-1.enabled = true
+    west-1->north-1.enabled = true
+    east-1->west-1.enabled = true
+    east-1->north-1.enabled = true
+    north-1->west-1.enabled = true
+    north-1->east-1.enabled = true
+
+Then, launch MM2 in each data center as follows:
+
+    # in west:
+    $ ./bin/connect-mirror-maker.sh mm2.properties --clusters west-1 west-2
+
+    # in east:
+    $ ./bin/connect-mirror-maker.sh mm2.properties --clusters east-1 east-2
+
+    # in north:
+    $ ./bin/connect-mirror-maker.sh mm2.properties --clusters north-1 north-2
+    
+With this configuration, records produced to any cluster will be replicated
+within the data center, as well as across to other data centers. By providing
+the `--clusters` parameter, we ensure that each node only produces records to
+nearby clusters.
+
+N.B. that the `--clusters` parameter is not technically required here. MM2 will work fine without it; however, throughput may suffer from "producer lag" between
+data centers, and you may incur unnecessary data transfer costs.
+
+## Shared configuration
+
+MM2 processes share configuration via their target Kafka clusters. 
+For example, the following two processes would be racy:
+
+    # process1:
+    A->B.enabled = true
+    A->B.topics = foo
+
+    # process2:
+    A->B.enabled = true
+    A->B.topics = bar
+
+In this case, the two processes will share configuration via cluster `B`.
+Depending on which processes is elected "leader", the result will be
+that either `foo` or `bar` is replicated -- but not both. For this reason,
+it is important to keep configuration consistent across flows to the same
+target cluster. In most cases, your entire organization should use a single
+MM2 configuration file.
+
+## Remote topics
+
+MM2 employs a naming convention to ensure that records from different
+clusters are not written to the same partition. By default, replicated
+topics are renamed based on "source cluster aliases":
+
+    topic-1 --> source.topic-1
+
+This can be customized by overriding the `replication.policy.separator`
+property (default is a period). If you need more control over how
+remote topics are defined, you can implement a custom `ReplicationPolicy`
+and override `replication.policy.class` (default is
+`DefaultReplicationPolicy`).
+
+## Monitoring an MM2 process
+
+MM2 is built on the Connect framework and inherits all of Connect's metrics, e.g.
+`source-record-poll-rate`. In addition, MM2 produces its own metrics under the
+`kafka.connect.mirror` metric group. Metrics are tagged with the following properties:
+
+    - *target*: alias of target cluster
+    - *source*: alias of source cluster
+    - *topic*:  remote topic on target cluster 
+    - *partition*: partition being replicated
+
+Metrics are tracked for each *remote* topic. The source cluster can be inferred
+from the topic name. For example, replicating `topic1` from `A->B` will yield metrics
+like:
+
+    - `target=B`
+    - `topic=A.topic1`
+    - `partition=1`
+
+The following metrics are emitted:
+
+    # MBean: kafka.connect.mirror:type=MirrorSourceConnector,target=([-.w]+),topic=([-.w]+),partition=([0-9]+)
+
+    record-count            # number of records replicated source -> target
+    record-age-ms           # age of records when they are replicated
+    record-age-ms-min
+    record-age-ms-max
+    record-age-ms-avg
+    replication-latecny-ms  # time it takes records to propagate source->target
+    replication-latency-ms-min
+    replication-latency-ms-max
+    replication-latency-ms-avg
+    byte-rate               # average number of bytes/sec in replicated records
+
+
+    # MBean: kafka.connect.mirror:type=MirrorCheckpointConnector,source=([-.w]+),target=([-.w]+)
+
+    checkpoint-latency-ms   # time it takes to replicate consumer offsets
+    checkpoint-latency-ms-min
+    checkpoint-latency-ms-max
+    checkpoint-latency-ms-avg
+
+These metrics do not discern between created-at and log-append timestamps.
+
+
diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/ConfigPropertyFilter.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/ConfigPropertyFilter.java
new file mode 100644
index 0000000..ec6b3b9
--- /dev/null
+++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/ConfigPropertyFilter.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.connect.mirror;
+
+import org.apache.kafka.common.Configurable;
+import org.apache.kafka.common.annotation.InterfaceStability;
+import java.util.Map;
+
+/** Defines which topic configuration properties should be replicated. */
+@InterfaceStability.Evolving
+public interface ConfigPropertyFilter extends Configurable, AutoCloseable {
+
+    boolean shouldReplicateConfigProperty(String prop);
+
+    default void close() {
+        //nop
+    }
+
+    default void configure(Map<String, ?> props) {
+        //nop
+    }
+}
diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/DefaultConfigPropertyFilter.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/DefaultConfigPropertyFilter.java
new file mode 100644
index 0000000..f51db1c
--- /dev/null
+++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/DefaultConfigPropertyFilter.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.mirror;
+
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigDef.Type;
+import org.apache.kafka.common.config.ConfigDef.Importance;
+
+import java.util.Map;
+import java.util.regex.Pattern;
+
+/** Uses a blacklist of property names or regexes. */
+public class DefaultConfigPropertyFilter implements ConfigPropertyFilter {
+    
+    public static final String CONFIG_PROPERTIES_BLACKLIST_CONFIG = "config.properties.blacklist";
+    private static final String CONFIG_PROPERTIES_BLACKLIST_DOC = "List of topic configuration properties and/or regexes "
+        + "that should not be replicated.";
+    public static final String CONFIG_PROPERTIES_BLACKLIST_DEFAULT = "follower\\.replication\\.throttled\\.replicas, "
+        + "leader\\.replication\\.throttled\\.replicas, "
+        + "message\\.timestamp\\.difference\\.max\\.ms, "
+        + "message\\.timestamp\\.type, "
+        + "unclean\\.leader\\.election\\.enable, "
+        + "min\\.insync\\.replicas";
+    private Pattern blacklistPattern = MirrorUtils.compilePatternList(CONFIG_PROPERTIES_BLACKLIST_DEFAULT);
+
+    @Override
+    public void configure(Map<String, ?> props) {
+        ConfigPropertyFilterConfig config = new ConfigPropertyFilterConfig(props);
+        blacklistPattern = config.blacklistPattern();
+    }
+
+    @Override
+    public void close() {
+    }
+
+    private boolean blacklisted(String prop) {
+        return blacklistPattern != null && blacklistPattern.matcher(prop).matches();
+    }
+
+    @Override
+    public boolean shouldReplicateConfigProperty(String prop) {
+        return !blacklisted(prop);
+    }
+
+    static class ConfigPropertyFilterConfig extends AbstractConfig {
+
+        static final ConfigDef DEF = new ConfigDef()
+            .define(CONFIG_PROPERTIES_BLACKLIST_CONFIG,
+                Type.LIST,
+                CONFIG_PROPERTIES_BLACKLIST_DEFAULT,
+                Importance.HIGH,
+                CONFIG_PROPERTIES_BLACKLIST_DOC);
+
+        ConfigPropertyFilterConfig(Map<?, ?> props) {
+            super(DEF, props, false);
+        }
+
+        Pattern blacklistPattern() {
+            return MirrorUtils.compilePatternList(getList(CONFIG_PROPERTIES_BLACKLIST_CONFIG));
+        }
+    }
+}
diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/DefaultGroupFilter.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/DefaultGroupFilter.java
new file mode 100644
index 0000000..acf5115
--- /dev/null
+++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/DefaultGroupFilter.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.mirror;
+
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigDef.Type;
+import org.apache.kafka.common.config.ConfigDef.Importance;
+
+import java.util.Map;
+import java.util.regex.Pattern;
+
+/** Uses a whitelist and blacklist. */
+public class DefaultGroupFilter implements GroupFilter {
+
+    public static final String GROUPS_WHITELIST_CONFIG = "groups";
+    private static final String GROUPS_WHITELIST_DOC = "List of consumer group names and/or regexes to replicate.";
+    public static final String GROUPS_WHITELIST_DEFAULT = ".*";
+
+    public static final String GROUPS_BLACKLIST_CONFIG = "groups.blacklist";
+    private static final String GROUPS_BLACKLIST_DOC = "List of consumer group names and/or regexes that should not be replicated.";
+    public static final String GROUPS_BLACKLIST_DEFAULT = "console-consumer-.*, connect-.*, __.*";
+
+    private Pattern whitelistPattern;
+    private Pattern blacklistPattern;
+
+    @Override
+    public void configure(Map<String, ?> props) {
+        GroupFilterConfig config = new GroupFilterConfig(props);
+        whitelistPattern = config.whitelistPattern();
+        blacklistPattern = config.blacklistPattern();
+    }
+
+    @Override
+    public void close() {
+    }
+
+    private boolean whitelisted(String group) {
+        return whitelistPattern != null && whitelistPattern.matcher(group).matches();
+    }
+
+    private boolean blacklisted(String group) {
+        return blacklistPattern != null && blacklistPattern.matcher(group).matches();
+    }
+
+    @Override
+    public boolean shouldReplicateGroup(String group) {
+        return whitelisted(group) && !blacklisted(group);
+    }
+
+    static class GroupFilterConfig extends AbstractConfig {
+
+        static final ConfigDef DEF = new ConfigDef()
+            .define(GROUPS_WHITELIST_CONFIG,
+                Type.LIST,
+                GROUPS_WHITELIST_DEFAULT,
+                Importance.HIGH,
+                GROUPS_WHITELIST_DOC) 
+            .define(GROUPS_BLACKLIST_CONFIG,
+                Type.LIST,
+                GROUPS_BLACKLIST_DEFAULT,
+                Importance.HIGH,
+                GROUPS_BLACKLIST_DOC);
+
+        GroupFilterConfig(Map<?, ?> props) {
+            super(DEF, props, false);
+        }
+
+        Pattern whitelistPattern() {
+            return MirrorUtils.compilePatternList(getList(GROUPS_WHITELIST_CONFIG));
+        }
+
+        Pattern blacklistPattern() {
+            return MirrorUtils.compilePatternList(getList(GROUPS_BLACKLIST_CONFIG));
+        }
+    }
+}
diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/DefaultTopicFilter.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/DefaultTopicFilter.java
new file mode 100644
index 0000000..308bdbf
--- /dev/null
+++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/DefaultTopicFilter.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.mirror;
+
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigDef.Type;
+import org.apache.kafka.common.config.ConfigDef.Importance;
+
+import java.util.Map;
+import java.util.regex.Pattern;
+
+/** Uses a whitelist and blacklist. */
+public class DefaultTopicFilter implements TopicFilter {
+    
+    public static final String TOPICS_WHITELIST_CONFIG = "topics";
+    private static final String TOPICS_WHITELIST_DOC = "List of topics and/or regexes to replicate.";
+    public static final String TOPICS_WHITELIST_DEFAULT = ".*";
+
+    public static final String TOPICS_BLACKLIST_CONFIG = "topics.blacklist";
+    private static final String TOPICS_BLACKLIST_DOC = "List of topics and/or regexes that should not be replicated.";
+    public static final String TOPICS_BLACKLIST_DEFAULT = ".*[\\-\\.]internal, .*\\.replica, __.*";
+
+    private Pattern whitelistPattern;
+    private Pattern blacklistPattern;
+
+    @Override
+    public void configure(Map<String, ?> props) {
+        TopicFilterConfig config = new TopicFilterConfig(props);
+        whitelistPattern = config.whitelistPattern();
+        blacklistPattern = config.blacklistPattern();
+    }
+
+    @Override
+    public void close() {
+    }
+
+    private boolean whitelisted(String topic) {
+        return whitelistPattern != null && whitelistPattern.matcher(topic).matches();
+    }
+
+    private boolean blacklisted(String topic) {
+        return blacklistPattern != null && blacklistPattern.matcher(topic).matches();
+    }
+
+    @Override
+    public boolean shouldReplicateTopic(String topic) {
+        return whitelisted(topic) && !blacklisted(topic);
+    }
+
+    static class TopicFilterConfig extends AbstractConfig {
+
+        static final ConfigDef DEF = new ConfigDef()
+            .define(TOPICS_WHITELIST_CONFIG,
+                Type.LIST,
+                TOPICS_WHITELIST_DEFAULT,
+                Importance.HIGH,
+                TOPICS_WHITELIST_DOC) 
+            .define(TOPICS_BLACKLIST_CONFIG,
+                Type.LIST,
+                TOPICS_BLACKLIST_DEFAULT,
+                Importance.HIGH,
+                TOPICS_BLACKLIST_DOC);
+
+        TopicFilterConfig(Map<?, ?> props) {
+            super(DEF, props, false);
+        }
+
+        Pattern whitelistPattern() {
+            return MirrorUtils.compilePatternList(getList(TOPICS_WHITELIST_CONFIG));
+        }
+
+        Pattern blacklistPattern() {
+            return MirrorUtils.compilePatternList(getList(TOPICS_BLACKLIST_CONFIG));
+        }
+    }
+}
diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/GroupFilter.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/GroupFilter.java
new file mode 100644
index 0000000..0202dd5
--- /dev/null
+++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/GroupFilter.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.connect.mirror;
+
+import org.apache.kafka.common.Configurable;
+import org.apache.kafka.common.annotation.InterfaceStability;
+import java.util.Map;
+
+/** Defines which consumer groups should be replicated. */
+@InterfaceStability.Evolving
+public interface GroupFilter extends Configurable, AutoCloseable {
+
+    boolean shouldReplicateGroup(String group);
+
+    default void close() {
+        //nop
+    }
+
+    default void configure(Map<String, ?> props) {
+        //nop
+    }
+}
diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnector.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnector.java
new file mode 100644
index 0000000..a358584
--- /dev/null
+++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnector.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.mirror;
+
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.ConsumerGroupListing;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.connector.Task;
+import org.apache.kafka.connect.source.SourceConnector;
+import org.apache.kafka.connect.util.ConnectorUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+/** Replicate consumer group state between clusters. Emits checkpoint records.
+ *
+ *  @see MirrorConnectorConfig for supported config properties.
+ */
+public class MirrorCheckpointConnector extends SourceConnector {
+
+    private static final Logger log = LoggerFactory.getLogger(MirrorCheckpointConnector.class);
+
+    private Scheduler scheduler;
+    private MirrorConnectorConfig config;
+    private GroupFilter groupFilter;
+    private AdminClient sourceAdminClient;
+    private SourceAndTarget sourceAndTarget;
+    private String connectorName;
+    private List<String> knownConsumerGroups = Collections.emptyList();
+
+    @Override
+    public void start(Map<String, String> props) {
+        config = new MirrorConnectorConfig(props);
+        if (!config.enabled()) {
+            return;
+        }
+        connectorName = config.connectorName();
+        sourceAndTarget = new SourceAndTarget(config.sourceClusterAlias(), config.targetClusterAlias());
+        groupFilter = config.groupFilter();
+        sourceAdminClient = AdminClient.create(config.sourceAdminConfig());
+        scheduler = new Scheduler(MirrorCheckpointConnector.class, config.adminTimeout());
+        scheduler.execute(this::createInternalTopics, "creating internal topics");
+        scheduler.execute(this::loadInitialConsumerGroups, "loading initial consumer groups");
+        scheduler.scheduleRepeatingDelayed(this::refreshConsumerGroups, config.refreshGroupsInterval(),
+                "refreshing consumer groups");
+        log.info("Started {} with {} consumer groups.", connectorName, knownConsumerGroups.size());
+        log.debug("Started {} with consumer groups: {}", connectorName, knownConsumerGroups);
+    }
+
+    @Override
+    public void stop() {
+        if (!config.enabled()) {
+            return;
+        }
+        Utils.closeQuietly(scheduler, "scheduler");
+        Utils.closeQuietly(groupFilter, "group filter");
+        Utils.closeQuietly(sourceAdminClient, "source admin client");
+    }
+
+    @Override
+    public Class<? extends Task> taskClass() {
+        return MirrorCheckpointTask.class;
+    }
+
+    // divide consumer groups among tasks
+    @Override
+    public List<Map<String, String>> taskConfigs(int maxTasks) {
+        if (!config.enabled() || knownConsumerGroups.isEmpty()) {
+            return Collections.emptyList();
+        }
+        int numTasks = Math.min(maxTasks, knownConsumerGroups.size());
+        return ConnectorUtils.groupPartitions(knownConsumerGroups, numTasks).stream()
+                .map(config::taskConfigForConsumerGroups)
+                .collect(Collectors.toList());
+    }
+
+    @Override
+    public ConfigDef config() {
+        return MirrorConnectorConfig.CONNECTOR_CONFIG_DEF;
+    }
+
+    @Override
+    public String version() {
+        return "1";
+    }
+
+    private void refreshConsumerGroups()
+            throws InterruptedException, ExecutionException {
+        List<String> consumerGroups = findConsumerGroups();
+        Set<String> newConsumerGroups = new HashSet<>();
+        newConsumerGroups.addAll(consumerGroups);
+        newConsumerGroups.removeAll(knownConsumerGroups);
+        Set<String> deadConsumerGroups = new HashSet<>();
+        deadConsumerGroups.addAll(knownConsumerGroups);
+        deadConsumerGroups.removeAll(consumerGroups);
+        if (!newConsumerGroups.isEmpty() || !deadConsumerGroups.isEmpty()) {
+            log.info("Found {} consumer groups for {}. {} are new. {} were removed. Previously had {}.",
+                    consumerGroups.size(), sourceAndTarget, newConsumerGroups.size(), deadConsumerGroups.size(),
+                    knownConsumerGroups.size());
+            log.debug("Found new consumer groups: {}", newConsumerGroups);
+            knownConsumerGroups = consumerGroups;
+            context.requestTaskReconfiguration();
+        }
+    }
+
+    private void loadInitialConsumerGroups()
+            throws InterruptedException, ExecutionException {
+        knownConsumerGroups = findConsumerGroups();
+    }
+
+    private List<String> findConsumerGroups()
+            throws InterruptedException, ExecutionException {
+        return listConsumerGroups().stream()
+                .filter(x -> !x.isSimpleConsumerGroup())
+                .map(x -> x.groupId())
+                .filter(this::shouldReplicate)
+                .collect(Collectors.toList());
+    }
+
+    private Collection<ConsumerGroupListing> listConsumerGroups()
+            throws InterruptedException, ExecutionException {
+        return sourceAdminClient.listConsumerGroups().valid().get();
+    }
+
+    private void createInternalTopics() {
+        MirrorUtils.createSinglePartitionCompactedTopic(config.checkpointsTopic(),
+            config.checkpointsTopicReplicationFactor(), config.targetAdminConfig());
+    } 
+
+    boolean shouldReplicate(String group) {
+        return groupFilter.shouldReplicateGroup(group);
+    }
+}
diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java
new file mode 100644
index 0000000..47a0569
--- /dev/null
+++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.mirror;
+
+import org.apache.kafka.connect.source.SourceTask;
+import org.apache.kafka.connect.source.SourceRecord;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Set;
+import java.util.Collections;
+import java.util.stream.Collectors;
+import java.util.concurrent.ExecutionException;
+import java.time.Duration;
+
+/** Emits checkpoints for upstream consumer groups. */
+public class MirrorCheckpointTask extends SourceTask {
+
+    private static final Logger log = LoggerFactory.getLogger(MirrorCheckpointTask.class);
+
+    private AdminClient sourceAdminClient;
+    private String sourceClusterAlias;
+    private String targetClusterAlias;
+    private String checkpointsTopic;
+    private Duration interval;
+    private Duration pollTimeout;
+    private Duration adminTimeout;
+    private TopicFilter topicFilter;
+    private Set<String> consumerGroups;
+    private ReplicationPolicy replicationPolicy;
+    private OffsetSyncStore offsetSyncStore;
+    private boolean stopping;
+    private MirrorMetrics metrics;
+
+    public MirrorCheckpointTask() {}
+
+    // for testing
+    MirrorCheckpointTask(String sourceClusterAlias, String targetClusterAlias,
+            ReplicationPolicy replicationPolicy, OffsetSyncStore offsetSyncStore) {
+        this.sourceClusterAlias = sourceClusterAlias;
+        this.targetClusterAlias = targetClusterAlias;
+        this.replicationPolicy = replicationPolicy;
+        this.offsetSyncStore = offsetSyncStore;
+    }
+
+    @Override
+    public void start(Map<String, String> props) {
+        MirrorTaskConfig config = new MirrorTaskConfig(props);
+        stopping = false;
+        sourceClusterAlias = config.sourceClusterAlias();
+        targetClusterAlias = config.targetClusterAlias();
+        consumerGroups = config.taskConsumerGroups();
+        checkpointsTopic = config.checkpointsTopic();
+        topicFilter = config.topicFilter();
+        replicationPolicy = config.replicationPolicy();
+        interval = config.emitCheckpointsInterval();
+        pollTimeout = config.consumerPollTimeout();
+        adminTimeout = config.adminTimeout();
+        offsetSyncStore = new OffsetSyncStore(config);
+        sourceAdminClient = AdminClient.create(config.sourceAdminConfig());
+        metrics = config.metrics();
+    }
+
+    @Override
+    public void commit() throws InterruptedException {
+        // nop
+    }
+
+    @Override
+    public void stop() {
+        long start = System.currentTimeMillis();
+        stopping = true;
+        Utils.closeQuietly(offsetSyncStore, "offset sync store");
+        Utils.closeQuietly(sourceAdminClient, "source admin client");
+        Utils.closeQuietly(metrics, "metrics");
+        log.info("Stopping {} took {} ms.", Thread.currentThread().getName(), System.currentTimeMillis() - start);
+    }
+
+    @Override
+    public String version() {
+        return "1";
+    }
+
+    @Override
+    public List<SourceRecord> poll() throws InterruptedException {
+        try { 
+            long deadline = System.currentTimeMillis() + interval.toMillis();
+            while (!stopping && System.currentTimeMillis() < deadline) {
+                offsetSyncStore.update(pollTimeout);
+            }
+            List<SourceRecord> records = new ArrayList<>();
+            for (String group : consumerGroups) {
+                records.addAll(checkpointsForGroup(group));
+            }
+            if (records.isEmpty()) {
+                // WorkerSourceTask expects non-zero batches or null
+                return null;
+            } else {
+                return records;
+            }
+        } catch (Throwable e) {
+            log.warn("Failure polling consumer state for checkpoints.", e);
+            return null;
+        }
+    }
+
+    private List<SourceRecord> checkpointsForGroup(String group) throws InterruptedException {
+        try {
+            long timestamp = System.currentTimeMillis();
+            return listConsumerGroupOffsets(group).entrySet().stream()
+                .filter(x -> shouldCheckpointTopic(x.getKey().topic()))
+                .map(x -> checkpoint(group, x.getKey(), x.getValue()))
+                .filter(x -> x.downstreamOffset() > 0)  // ignore offsets we cannot translate accurately
+                .map(x -> checkpointRecord(x, timestamp))
+                .collect(Collectors.toList());
+        } catch (ExecutionException e) {
+            log.error("Error querying offsets for consumer group {} on cluster {}.",  group, sourceClusterAlias, e);
+            return Collections.emptyList();
+        }
+    }
+
+    private Map<TopicPartition, OffsetAndMetadata> listConsumerGroupOffsets(String group)
+            throws InterruptedException, ExecutionException {
+        if (stopping) {
+            // short circuit if stopping
+            return Collections.emptyMap();
+        }
+        return sourceAdminClient.listConsumerGroupOffsets(group).partitionsToOffsetAndMetadata().get();
+    }
+
+    Checkpoint checkpoint(String group, TopicPartition topicPartition,
+            OffsetAndMetadata offsetAndMetadata) {
+        long upstreamOffset = offsetAndMetadata.offset();
+        long downstreamOffset = offsetSyncStore.translateDownstream(topicPartition, upstreamOffset);
+        return new Checkpoint(group, renameTopicPartition(topicPartition),
+            upstreamOffset, downstreamOffset, offsetAndMetadata.metadata());
+    }
+
+    SourceRecord checkpointRecord(Checkpoint checkpoint, long timestamp) {
+        return new SourceRecord(
+            checkpoint.connectPartition(), MirrorUtils.wrapOffset(0),
+            checkpointsTopic, 0,
+            Schema.BYTES_SCHEMA, checkpoint.recordKey(),
+            Schema.BYTES_SCHEMA, checkpoint.recordValue(),
+            timestamp);
+    }
+
+    TopicPartition renameTopicPartition(TopicPartition upstreamTopicPartition) {
+        if (targetClusterAlias.equals(replicationPolicy.topicSource(upstreamTopicPartition.topic()))) {
+            // this topic came from the target cluster, so we rename like us-west.topic1 -> topic1
+            return new TopicPartition(replicationPolicy.originalTopic(upstreamTopicPartition.topic()),
+                upstreamTopicPartition.partition());
+        } else {
+            // rename like topic1 -> us-west.topic1
+            return new TopicPartition(replicationPolicy.formatRemoteTopic(sourceClusterAlias,
+                upstreamTopicPartition.topic()), upstreamTopicPartition.partition());
+        }
+    }
+
+    boolean shouldCheckpointTopic(String topic) {
+        return topicFilter.shouldReplicateTopic(topic);
+    }
+
+    @Override
+    public void commitRecord(SourceRecord record) {
+        metrics.checkpointLatency(MirrorUtils.unwrapPartition(record.sourcePartition()),
+            Checkpoint.unwrapGroup(record.sourcePartition()),
+            System.currentTimeMillis() - record.timestamp());
+    }
+}
diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java
new file mode 100644
index 0000000..d922ead
--- /dev/null
+++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java
@@ -0,0 +1,601 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.mirror;
+
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.metrics.MetricsReporter;
+import org.apache.kafka.common.metrics.JmxReporter;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.connect.runtime.ConnectorConfig;
+
+import java.util.Map;
+import java.util.HashMap;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.time.Duration;
+
+/** Shared config properties used by MirrorSourceConnector, MirrorCheckpointConnector, and MirrorHeartbeatConnector.
+ *  <p>
+ *  Generally, these properties are filled-in automatically by MirrorMaker based on a top-level mm2.properties file.
+ *  However, when running MM2 connectors as plugins on a Connect-as-a-Service cluster, these properties must be configured manually,
+ *  e.g. via the Connect REST API.
+ *  </p>
+ *  <p>
+ *  An example configuration when running on Connect (not via MirrorMaker driver):
+ *  </p>
+ *  <pre>
+ *      {
+ *        "name": "MirrorSourceConnector",
+ *        "connector.class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
+ *        "replication.factor": "1",
+ *        "source.cluster.alias": "backup",
+ *        "target.cluster.alias": "primary",
+ *        "source.cluster.bootstrap.servers": "vip1:9092",
+ *        "target.cluster.bootstrap.servers": "vip2:9092",
+ *        "topics": ".*test-topic-.*",
+ *        "groups": "consumer-group-.*",
+ *        "emit.checkpoints.interval.seconds": "1",
+ *        "emit.heartbeats.interval.seconds": "1",
+ *        "sync.topic.acls.enabled": "false"
+ *      }
+ *  </pre>
+ */
+public class MirrorConnectorConfig extends AbstractConfig {
+
+    protected static final String ENABLED_SUFFIX = ".enabled";
+    protected static final String INTERVAL_SECONDS_SUFFIX = ".interval.seconds";
+
+    protected static final String REFRESH_TOPICS = "refresh.topics";
+    protected static final String REFRESH_GROUPS = "refresh.groups";
+    protected static final String SYNC_TOPIC_CONFIGS = "sync.topic.configs";
+    protected static final String SYNC_TOPIC_ACLS = "sync.topic.acls";
+    protected static final String EMIT_HEARTBEATS = "emit.heartbeats";
+    protected static final String EMIT_CHECKPOINTS = "emit.checkpoints";
+
+    public static final String ENABLED = "enabled";
+    private static final String ENABLED_DOC = "Whether to replicate source->target.";
+    public static final String SOURCE_CLUSTER_ALIAS = "source.cluster.alias";
+    private static final String SOURCE_CLUSTER_ALIAS_DOC = "Alias of source cluster";
+    public static final String TARGET_CLUSTER_ALIAS = "target.cluster.alias";
+    public static final String TARGET_CLUSTER_ALIAS_DEFAULT = "target";
+    private static final String TARGET_CLUSTER_ALIAS_DOC = "Alias of target cluster. Used in metrics reporting.";
+    public static final String REPLICATION_POLICY_CLASS = MirrorClientConfig.REPLICATION_POLICY_CLASS;
+    public static final Class REPLICATION_POLICY_CLASS_DEFAULT = MirrorClientConfig.REPLICATION_POLICY_CLASS_DEFAULT;
+    private static final String REPLICATION_POLICY_CLASS_DOC = "Class which defines the remote topic naming convention.";
+    public static final String REPLICATION_POLICY_SEPARATOR = MirrorClientConfig.REPLICATION_POLICY_SEPARATOR;
+    private static final String REPLICATION_POLICY_SEPARATOR_DOC = "Separator used in remote topic naming convention.";
+    public static final String REPLICATION_POLICY_SEPARATOR_DEFAULT =
+            MirrorClientConfig.REPLICATION_POLICY_SEPARATOR_DEFAULT;
+    public static final String REPLICATION_FACTOR = "replication.factor";
+    private static final String REPLICATION_FACTOR_DOC = "Replication factor for newly created remote topics.";
+    public static final int REPLICATION_FACTOR_DEFAULT = 2;
+    public static final String TOPICS = DefaultTopicFilter.TOPICS_WHITELIST_CONFIG;
+    public static final String TOPICS_DEFAULT = DefaultTopicFilter.TOPICS_WHITELIST_DEFAULT;
+    private static final String TOPICS_DOC = "Topics to replicate. Supports comma-separated topic names and regexes.";
+    public static final String TOPICS_BLACKLIST = DefaultTopicFilter.TOPICS_BLACKLIST_CONFIG;
+    public static final String TOPICS_BLACKLIST_DEFAULT = DefaultTopicFilter.TOPICS_BLACKLIST_DEFAULT;
+    private static final String TOPICS_BLACKLIST_DOC = "Blacklisted topics. Supports comma-separated topic names and regexes."
+            + " Blacklists take precedence over whitelists.";
+    public static final String GROUPS = DefaultGroupFilter.GROUPS_WHITELIST_CONFIG;
+    public static final String GROUPS_DEFAULT = DefaultGroupFilter.GROUPS_WHITELIST_DEFAULT;
+    private static final String GROUPS_DOC = "Consumer groups to replicate. Supports comma-separated group IDs and regexes.";
+    public static final String GROUPS_BLACKLIST = DefaultGroupFilter.GROUPS_BLACKLIST_CONFIG;
+    public static final String GROUPS_BLACKLIST_DEFAULT = DefaultGroupFilter.GROUPS_BLACKLIST_DEFAULT;
+    private static final String GROUPS_BLACKLIST_DOC = "Blacklisted groups. Supports comma-separated group IDs and regexes."
+            + " Blacklists take precedence over whitelists.";
+    public static final String CONFIG_PROPERTIES_BLACKLIST = DefaultConfigPropertyFilter.CONFIG_PROPERTIES_BLACKLIST_CONFIG;
+    public static final String CONFIG_PROPERTIES_BLACKLIST_DEFAULT = DefaultConfigPropertyFilter.CONFIG_PROPERTIES_BLACKLIST_DEFAULT;
+    private static final String CONFIG_PROPERTIES_BLACKLIST_DOC = "Topic config properties that should not be replicated. Supports "
+            + "comma-separated property names and regexes.";
+
+    public static final String HEARTBEATS_TOPIC_REPLICATION_FACTOR = "heartbeats.topic.replication.factor";
+    public static final String HEARTBEATS_TOPIC_REPLICATION_FACTOR_DOC = "Replication factor for heartbeats topic.";
+    public static final short HEARTBEATS_TOPIC_REPLICATION_FACTOR_DEFAULT = 3;
+
+    public static final String CHECKPOINTS_TOPIC_REPLICATION_FACTOR = "checkpoints.topic.replication.factor";
+    public static final String CHECKPOINTS_TOPIC_REPLICATION_FACTOR_DOC = "Replication factor for checkpoints topic.";
+    public static final short CHECKPOINTS_TOPIC_REPLICATION_FACTOR_DEFAULT = 3;
+
+    public static final String OFFSET_SYNCS_TOPIC_REPLICATION_FACTOR = "offset-syncs.topic.replication.factor";
+    public static final String OFFSET_SYNCS_TOPIC_REPLICATION_FACTOR_DOC = "Replication factor for offset-syncs topic.";
+    public static final short OFFSET_SYNCS_TOPIC_REPLICATION_FACTOR_DEFAULT = 3;
+
+    protected static final String TASK_TOPIC_PARTITIONS = "task.assigned.partitions";
+    protected static final String TASK_CONSUMER_GROUPS = "task.assigned.groups";
+
+    public static final String CONSUMER_POLL_TIMEOUT_MILLIS = "consumer.poll.timeout.ms";
+    private static final String CONSUMER_POLL_TIMEOUT_MILLIS_DOC = "Timeout when polling source cluster.";
+    public static final long CONSUMER_POLL_TIMEOUT_MILLIS_DEFAULT = 1000L;
+
+    public static final String ADMIN_TASK_TIMEOUT_MILLIS = "admin.timeout.ms";
+    private static final String ADMIN_TASK_TIMEOUT_MILLIS_DOC = "Timeout for administrative tasks, e.g. detecting new topics.";
+    public static final long ADMIN_TASK_TIMEOUT_MILLIS_DEFAULT = 60000L;
+
+    public static final String REFRESH_TOPICS_ENABLED = REFRESH_TOPICS + ENABLED_SUFFIX;
+    private static final String REFRESH_TOPICS_ENABLED_DOC = "Whether to periodically check for new topics and partitions.";
+    public static final boolean REFRESH_TOPICS_ENABLED_DEFAULT = true;
+    public static final String REFRESH_TOPICS_INTERVAL_SECONDS = REFRESH_TOPICS + INTERVAL_SECONDS_SUFFIX;
+    private static final String REFRESH_TOPICS_INTERVAL_SECONDS_DOC = "Frequency of topic refresh.";
+    public static final long REFRESH_TOPICS_INTERVAL_SECONDS_DEFAULT = 10 * 60;
+
+    public static final String REFRESH_GROUPS_ENABLED = REFRESH_GROUPS + ENABLED_SUFFIX;
+    private static final String REFRESH_GROUPS_ENABLED_DOC = "Whether to periodically check for new consumer groups.";
+    public static final boolean REFRESH_GROUPS_ENABLED_DEFAULT = true;
+    public static final String REFRESH_GROUPS_INTERVAL_SECONDS = REFRESH_GROUPS + INTERVAL_SECONDS_SUFFIX;
+    private static final String REFRESH_GROUPS_INTERVAL_SECONDS_DOC = "Frequency of group refresh.";
+    public static final long REFRESH_GROUPS_INTERVAL_SECONDS_DEFAULT = 10 * 60;
+
+    public static final String SYNC_TOPIC_CONFIGS_ENABLED = SYNC_TOPIC_CONFIGS + ENABLED_SUFFIX;
+    private static final String SYNC_TOPIC_CONFIGS_ENABLED_DOC = "Whether to periodically configure remote topics to match their corresponding upstream topics.";
+    public static final boolean SYNC_TOPIC_CONFIGS_ENABLED_DEFAULT = true;
+    public static final String SYNC_TOPIC_CONFIGS_INTERVAL_SECONDS = SYNC_TOPIC_CONFIGS + INTERVAL_SECONDS_SUFFIX;
+    private static final String SYNC_TOPIC_CONFIGS_INTERVAL_SECONDS_DOC = "Frequency of topic config sync.";
+    public static final long SYNC_TOPIC_CONFIGS_INTERVAL_SECONDS_DEFAULT = 10 * 60;
+
+    public static final String SYNC_TOPIC_ACLS_ENABLED = SYNC_TOPIC_ACLS + ENABLED_SUFFIX;
+    private static final String SYNC_TOPIC_ACLS_ENABLED_DOC = "Whether to periodically configure remote topic ACLs to match their corresponding upstream topics.";
+    public static final boolean SYNC_TOPIC_ACLS_ENABLED_DEFAULT = true;
+    public static final String SYNC_TOPIC_ACLS_INTERVAL_SECONDS = SYNC_TOPIC_ACLS + INTERVAL_SECONDS_SUFFIX;
+    private static final String SYNC_TOPIC_ACLS_INTERVAL_SECONDS_DOC = "Frequency of topic ACL sync.";
+    public static final long SYNC_TOPIC_ACLS_INTERVAL_SECONDS_DEFAULT = 10 * 60;
+
+    public static final String EMIT_HEARTBEATS_ENABLED = EMIT_HEARTBEATS + ENABLED_SUFFIX;
+    private static final String EMIT_HEARTBEATS_ENABLED_DOC = "Whether to emit heartbeats to target cluster.";
+    public static final boolean EMIT_HEARTBEATS_ENABLED_DEFAULT = true;
+    public static final String EMIT_HEARTBEATS_INTERVAL_SECONDS = EMIT_HEARTBEATS + INTERVAL_SECONDS_SUFFIX;
+    private static final String EMIT_HEARTBEATS_INTERVAL_SECONDS_DOC = "Frequency of heartbeats.";
+    public static final long EMIT_HEARTBEATS_INTERVAL_SECONDS_DEFAULT = 1;
+
+    public static final String EMIT_CHECKPOINTS_ENABLED = EMIT_CHECKPOINTS + ENABLED_SUFFIX;
+    private static final String EMIT_CHECKPOINTS_ENABLED_DOC = "Whether to replicate consumer offsets to target cluster.";
+    public static final boolean EMIT_CHECKPOINTS_ENABLED_DEFAULT = true;
+    public static final String EMIT_CHECKPOINTS_INTERVAL_SECONDS = EMIT_CHECKPOINTS + INTERVAL_SECONDS_SUFFIX;
+    private static final String EMIT_CHECKPOINTS_INTERVAL_SECONDS_DOC = "Frequency of checkpoints.";
+    public static final long EMIT_CHECKPOINTS_INTERVAL_SECONDS_DEFAULT = 60;
+
+    public static final String TOPIC_FILTER_CLASS = "topic.filter.class";
+    private static final String TOPIC_FILTER_CLASS_DOC = "TopicFilter to use. Selects topics to replicate.";
+    public static final Class TOPIC_FILTER_CLASS_DEFAULT = DefaultTopicFilter.class;
+    public static final String GROUP_FILTER_CLASS = "group.filter.class";
+    private static final String GROUP_FILTER_CLASS_DOC = "GroupFilter to use. Selects consumer groups to replicate.";
+    public static final Class GROUP_FILTER_CLASS_DEFAULT = DefaultGroupFilter.class;
+    public static final String CONFIG_PROPERTY_FILTER_CLASS = "config.property.filter.class";
+    private static final String CONFIG_PROPERTY_FILTER_CLASS_DOC = "ConfigPropertyFilter to use. Selects topic config "
+            + " properties to replicate.";
+    public static final Class CONFIG_PROPERTY_FILTER_CLASS_DEFAULT = DefaultConfigPropertyFilter.class;
+
+    public static final String OFFSET_LAG_MAX = "offset.lag.max";
+    private static final String OFFSET_LAG_MAX_DOC = "How out-of-sync a remote partition can be before it is resynced.";
+    public static final long OFFSET_LAG_MAX_DEFAULT = 100L;
+
+    protected static final String SOURCE_CLUSTER_PREFIX = MirrorMakerConfig.SOURCE_CLUSTER_PREFIX;
+    protected static final String TARGET_CLUSTER_PREFIX = MirrorMakerConfig.TARGET_CLUSTER_PREFIX;
+    protected static final String PRODUCER_CLIENT_PREFIX = "producer.";
+    protected static final String CONSUMER_CLIENT_PREFIX = "consumer.";
+    protected static final String ADMIN_CLIENT_PREFIX = "admin.";
+    protected static final String SOURCE_ADMIN_CLIENT_PREFIX = "source.admin.";
+    protected static final String TARGET_ADMIN_CLIENT_PREFIX = "target.admin.";
+
+    public MirrorConnectorConfig(Map<String, String> props) {
+        this(CONNECTOR_CONFIG_DEF, props);
+    }
+
+    protected MirrorConnectorConfig(ConfigDef configDef, Map<String, String> props) {
+        super(configDef, props, true);
+    }
+
+    String connectorName() {
+        return getString(ConnectorConfig.NAME_CONFIG);
+    }
+
+    boolean enabled() {
+        return getBoolean(ENABLED);
+    }
+
+    Duration consumerPollTimeout() {
+        return Duration.ofMillis(getLong(CONSUMER_POLL_TIMEOUT_MILLIS));
+    }
+
+    Duration adminTimeout() {
+        return Duration.ofMillis(getLong(ADMIN_TASK_TIMEOUT_MILLIS));
+    }
+
+    Map<String, Object> sourceProducerConfig() {
+        Map<String, Object> props = new HashMap<>();
+        props.putAll(originalsWithPrefix(SOURCE_CLUSTER_PREFIX));
+        props.keySet().retainAll(MirrorClientConfig.CLIENT_CONFIG_DEF.names());
+        props.putAll(originalsWithPrefix(PRODUCER_CLIENT_PREFIX));
+        return props;
+    }
+
+    Map<String, Object> sourceConsumerConfig() {
+        Map<String, Object> props = new HashMap<>();
+        props.putAll(originalsWithPrefix(SOURCE_CLUSTER_PREFIX));
+        props.keySet().retainAll(MirrorClientConfig.CLIENT_CONFIG_DEF.names());
+        props.putAll(originalsWithPrefix(CONSUMER_CLIENT_PREFIX));
+        props.put("enable.auto.commit", "false");
+        props.put("auto.offset.reset", "earliest");
+        return props;
+    }
+
+    Map<String, String> taskConfigForTopicPartitions(List<TopicPartition> topicPartitions) {
+        Map<String, String> props = originalsStrings();
+        String topicPartitionsString = topicPartitions.stream()
+                .map(MirrorUtils::encodeTopicPartition)
+                .collect(Collectors.joining(","));
+        props.put(TASK_TOPIC_PARTITIONS, topicPartitionsString);
+        return props;
+    }
+
+    Map<String, String> taskConfigForConsumerGroups(List<String> groups) {
+        Map<String, String> props = originalsStrings();
+        props.put(TASK_CONSUMER_GROUPS, String.join(",", groups));
+        return props;
+    }
+
+    Map<String, Object> targetAdminConfig() {
+        Map<String, Object> props = new HashMap<>();
+        props.putAll(originalsWithPrefix(TARGET_CLUSTER_PREFIX));
+        props.keySet().retainAll(MirrorClientConfig.CLIENT_CONFIG_DEF.names());
+        props.putAll(originalsWithPrefix(ADMIN_CLIENT_PREFIX));
+        props.putAll(originalsWithPrefix(TARGET_ADMIN_CLIENT_PREFIX));
+        return props;
+    }
+
+    Map<String, Object> sourceAdminConfig() {
+        Map<String, Object> props = new HashMap<>();
+        props.putAll(originalsWithPrefix(SOURCE_CLUSTER_PREFIX));
+        props.keySet().retainAll(MirrorClientConfig.CLIENT_CONFIG_DEF.names());
+        props.putAll(originalsWithPrefix(ADMIN_CLIENT_PREFIX));
+        props.putAll(originalsWithPrefix(SOURCE_ADMIN_CLIENT_PREFIX));
+        return props;
+    }
+
+    List<MetricsReporter> metricsReporters() {
+        List<MetricsReporter> reporters = getConfiguredInstances(
+                CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG, MetricsReporter.class);
+        reporters.add(new JmxReporter("kafka.connect.mirror"));
+        return reporters;
+    }
+
+    String sourceClusterAlias() {
+        return getString(SOURCE_CLUSTER_ALIAS);
+    }
+
+    String targetClusterAlias() {
+        return getString(TARGET_CLUSTER_ALIAS);
+    }
+
+    String offsetSyncsTopic() {
+        // ".internal" suffix ensures this doesn't get replicated
+        return "mm2-offset-syncs." + targetClusterAlias() + ".internal";
+    }
+
+    String heartbeatsTopic() {
+        return MirrorClientConfig.HEARTBEATS_TOPIC;
+    }
+
+    // e.g. source1.heartbeats
+    String targetHeartbeatsTopic() {
+        return replicationPolicy().formatRemoteTopic(sourceClusterAlias(), heartbeatsTopic());
+    }
+
+    String checkpointsTopic() {
+        // Checkpoint topics are not "remote topics", as they are not replicated, so we don't
+        // need to use ReplicationPolicy here.
+        return sourceClusterAlias() + MirrorClientConfig.CHECKPOINTS_TOPIC_SUFFIX;
+    }
+
+    long maxOffsetLag() {
+        return getLong(OFFSET_LAG_MAX);
+    }
+
+    Duration emitHeartbeatsInterval() {
+        if (getBoolean(EMIT_HEARTBEATS_ENABLED)) {
+            return Duration.ofSeconds(getLong(EMIT_HEARTBEATS_INTERVAL_SECONDS));
+        } else {
+            // negative interval to disable
+            return Duration.ofMillis(-1);
+        }
+    }
+
+    Duration emitCheckpointsInterval() {
+        if (getBoolean(EMIT_CHECKPOINTS_ENABLED)) {
+            return Duration.ofSeconds(getLong(EMIT_CHECKPOINTS_INTERVAL_SECONDS));
+        } else {
+            // negative interval to disable
+            return Duration.ofMillis(-1);
+        }
+    }
+
+    Duration refreshTopicsInterval() {
+        if (getBoolean(REFRESH_TOPICS_ENABLED)) {
+            return Duration.ofSeconds(getLong(REFRESH_TOPICS_INTERVAL_SECONDS));
+        } else {
+            // negative interval to disable
+            return Duration.ofMillis(-1);
+        }
+    }
+
+    Duration refreshGroupsInterval() {
+        if (getBoolean(REFRESH_GROUPS_ENABLED)) {
+            return Duration.ofSeconds(getLong(REFRESH_GROUPS_INTERVAL_SECONDS));
+        } else {
+            // negative interval to disable
+            return Duration.ofMillis(-1);
+        }
+    }
+
+    Duration syncTopicConfigsInterval() {
+        if (getBoolean(SYNC_TOPIC_CONFIGS_ENABLED)) {
+            return Duration.ofSeconds(getLong(SYNC_TOPIC_CONFIGS_INTERVAL_SECONDS));
+        } else {
+            // negative interval to disable
+            return Duration.ofMillis(-1);
+        }
+    }
+
+    Duration syncTopicAclsInterval() {
+        if (getBoolean(SYNC_TOPIC_ACLS_ENABLED)) {
+            return Duration.ofSeconds(getLong(SYNC_TOPIC_ACLS_INTERVAL_SECONDS));
+        } else {
+            // negative interval to disable
+            return Duration.ofMillis(-1);
+        }
+    }
+
+    ReplicationPolicy replicationPolicy() {
+        return getConfiguredInstance(REPLICATION_POLICY_CLASS, ReplicationPolicy.class);
+    }
+
+    int replicationFactor() {
+        return getInt(REPLICATION_FACTOR);
+    }
+
+    short heartbeatsTopicReplicationFactor() {
+        return getShort(HEARTBEATS_TOPIC_REPLICATION_FACTOR);
+    }
+
+    short checkpointsTopicReplicationFactor() {
+        return getShort(CHECKPOINTS_TOPIC_REPLICATION_FACTOR);
+    }
+
+    short offsetSyncsTopicReplicationFactor() {
+        return getShort(OFFSET_SYNCS_TOPIC_REPLICATION_FACTOR);
+    }
+
+    TopicFilter topicFilter() {
+        return getConfiguredInstance(TOPIC_FILTER_CLASS, TopicFilter.class);
+    }
+
+    GroupFilter groupFilter() {
+        return getConfiguredInstance(GROUP_FILTER_CLASS, GroupFilter.class);
+    }
+
+    ConfigPropertyFilter configPropertyFilter() {
+        return getConfiguredInstance(CONFIG_PROPERTY_FILTER_CLASS, ConfigPropertyFilter.class);
+    }
+
+    protected static final ConfigDef CONNECTOR_CONFIG_DEF = ConnectorConfig.configDef()
+            .define(
+                    ENABLED,
+                    ConfigDef.Type.BOOLEAN,
+                    true,
+                    ConfigDef.Importance.LOW,
+                    ENABLED_DOC)
+            .define(
+                    TOPICS,
+                    ConfigDef.Type.LIST,
+                    TOPICS_DEFAULT,
+                    ConfigDef.Importance.HIGH,
+                    TOPICS_DOC) 
+            .define(
+                    TOPICS_BLACKLIST,
+                    ConfigDef.Type.LIST,
+                    TOPICS_BLACKLIST_DEFAULT,
+                    ConfigDef.Importance.HIGH,
+                    TOPICS_BLACKLIST_DOC)
+            .define(
+                    GROUPS,
+                    ConfigDef.Type.LIST,
+                    GROUPS_DEFAULT,
+                    ConfigDef.Importance.HIGH,
+                    GROUPS_DOC) 
+            .define(
+                    GROUPS_BLACKLIST,
+                    ConfigDef.Type.LIST,
+                    GROUPS_BLACKLIST_DEFAULT,
+                    ConfigDef.Importance.HIGH,
+                    GROUPS_BLACKLIST_DOC)
+            .define(
+                    CONFIG_PROPERTIES_BLACKLIST,
+                    ConfigDef.Type.LIST,
+                    CONFIG_PROPERTIES_BLACKLIST_DEFAULT,
+                    ConfigDef.Importance.HIGH,
+                    CONFIG_PROPERTIES_BLACKLIST_DOC)
+            .define(
+                    TOPIC_FILTER_CLASS,
+                    ConfigDef.Type.CLASS,
+                    TOPIC_FILTER_CLASS_DEFAULT,
+                    ConfigDef.Importance.LOW,
+                    TOPIC_FILTER_CLASS_DOC)
+            .define(
+                    GROUP_FILTER_CLASS,
+                    ConfigDef.Type.CLASS,
+                    GROUP_FILTER_CLASS_DEFAULT,
+                    ConfigDef.Importance.LOW,
+                    GROUP_FILTER_CLASS_DOC)
+            .define(
+                    CONFIG_PROPERTY_FILTER_CLASS,
+                    ConfigDef.Type.CLASS,
+                    CONFIG_PROPERTY_FILTER_CLASS_DEFAULT,
+                    ConfigDef.Importance.LOW,
+                    CONFIG_PROPERTY_FILTER_CLASS_DOC)
+            .define(
+                    SOURCE_CLUSTER_ALIAS,
+                    ConfigDef.Type.STRING,
+                    ConfigDef.Importance.HIGH,
+                    SOURCE_CLUSTER_ALIAS_DOC)
+            .define(
+                    TARGET_CLUSTER_ALIAS,
+                    ConfigDef.Type.STRING,
+                    TARGET_CLUSTER_ALIAS_DEFAULT,
+                    ConfigDef.Importance.HIGH,
+                    TARGET_CLUSTER_ALIAS_DOC)
+            .define(
+                    CONSUMER_POLL_TIMEOUT_MILLIS,
+                    ConfigDef.Type.LONG,
+                    CONSUMER_POLL_TIMEOUT_MILLIS_DEFAULT,
+                    ConfigDef.Importance.LOW,
+                    CONSUMER_POLL_TIMEOUT_MILLIS_DOC)
+            .define(
+                    ADMIN_TASK_TIMEOUT_MILLIS,
+                    ConfigDef.Type.LONG,
+                    ADMIN_TASK_TIMEOUT_MILLIS_DEFAULT,
+                    ConfigDef.Importance.LOW,
+                    ADMIN_TASK_TIMEOUT_MILLIS_DOC)
+            .define(
+                    REFRESH_TOPICS_ENABLED,
+                    ConfigDef.Type.BOOLEAN,
+                    REFRESH_TOPICS_ENABLED_DEFAULT,
+                    ConfigDef.Importance.LOW,
+                    REFRESH_TOPICS_ENABLED_DOC)
+            .define(
+                    REFRESH_TOPICS_INTERVAL_SECONDS,
+                    ConfigDef.Type.LONG,
+                    REFRESH_TOPICS_INTERVAL_SECONDS_DEFAULT,
+                    ConfigDef.Importance.LOW,
+                    REFRESH_TOPICS_INTERVAL_SECONDS_DOC)
+            .define(
+                    REFRESH_GROUPS_ENABLED,
+                    ConfigDef.Type.BOOLEAN,
+                    REFRESH_GROUPS_ENABLED_DEFAULT,
+                    ConfigDef.Importance.LOW,
+                    REFRESH_GROUPS_ENABLED_DOC)
+            .define(
+                    REFRESH_GROUPS_INTERVAL_SECONDS,
+                    ConfigDef.Type.LONG,
+                    REFRESH_GROUPS_INTERVAL_SECONDS_DEFAULT,
+                    ConfigDef.Importance.LOW,
+                    REFRESH_GROUPS_INTERVAL_SECONDS_DOC)
+            .define(
+                    SYNC_TOPIC_CONFIGS_ENABLED,
+                    ConfigDef.Type.BOOLEAN,
+                    SYNC_TOPIC_CONFIGS_ENABLED_DEFAULT,
+                    ConfigDef.Importance.LOW,
+                    SYNC_TOPIC_CONFIGS_ENABLED_DOC)
+            .define(
+                    SYNC_TOPIC_CONFIGS_INTERVAL_SECONDS,
+                    ConfigDef.Type.LONG,
+                    SYNC_TOPIC_CONFIGS_INTERVAL_SECONDS_DEFAULT,
+                    ConfigDef.Importance.LOW,
+                    SYNC_TOPIC_CONFIGS_INTERVAL_SECONDS_DOC)
+            .define(
+                    SYNC_TOPIC_ACLS_ENABLED,
+                    ConfigDef.Type.BOOLEAN,
+                    SYNC_TOPIC_ACLS_ENABLED_DEFAULT,
+                    ConfigDef.Importance.LOW,
+                    SYNC_TOPIC_ACLS_ENABLED_DOC)
+            .define(
+                    SYNC_TOPIC_ACLS_INTERVAL_SECONDS,
+                    ConfigDef.Type.LONG,
+                    SYNC_TOPIC_ACLS_INTERVAL_SECONDS_DEFAULT,
+                    ConfigDef.Importance.LOW,
+                    SYNC_TOPIC_ACLS_INTERVAL_SECONDS_DOC)
+            .define(
+                    EMIT_HEARTBEATS_ENABLED,
+                    ConfigDef.Type.BOOLEAN,
+                    EMIT_HEARTBEATS_ENABLED_DEFAULT,
+                    ConfigDef.Importance.LOW,
+                    EMIT_HEARTBEATS_ENABLED_DOC)
+            .define(
+                    EMIT_HEARTBEATS_INTERVAL_SECONDS,
+                    ConfigDef.Type.LONG,
+                    EMIT_HEARTBEATS_INTERVAL_SECONDS_DEFAULT,
+                    ConfigDef.Importance.LOW,
+                    EMIT_HEARTBEATS_INTERVAL_SECONDS_DOC)
+            .define(
+                    EMIT_CHECKPOINTS_ENABLED,
+                    ConfigDef.Type.BOOLEAN,
+                    EMIT_CHECKPOINTS_ENABLED_DEFAULT,
+                    ConfigDef.Importance.LOW,
+                    EMIT_CHECKPOINTS_ENABLED_DOC)
+            .define(
+                    EMIT_CHECKPOINTS_INTERVAL_SECONDS,
+                    ConfigDef.Type.LONG,
+                    EMIT_CHECKPOINTS_INTERVAL_SECONDS_DEFAULT,
+                    ConfigDef.Importance.LOW,
+                    EMIT_CHECKPOINTS_INTERVAL_SECONDS_DOC)
+            .define(
+                    REPLICATION_POLICY_CLASS,
+                    ConfigDef.Type.CLASS,
+                    REPLICATION_POLICY_CLASS_DEFAULT,
+                    ConfigDef.Importance.LOW,
+                    REPLICATION_POLICY_CLASS_DOC)
+            .define(
+                    REPLICATION_POLICY_SEPARATOR,
+                    ConfigDef.Type.STRING,
+                    REPLICATION_POLICY_SEPARATOR_DEFAULT,
+                    ConfigDef.Importance.LOW,
+                    REPLICATION_POLICY_SEPARATOR_DOC)
+            .define(
+                    REPLICATION_FACTOR,
+                    ConfigDef.Type.INT,
+                    REPLICATION_FACTOR_DEFAULT,
+                    ConfigDef.Importance.LOW,
+                    REPLICATION_FACTOR_DOC)
+            .define(
+                    HEARTBEATS_TOPIC_REPLICATION_FACTOR,
+                    ConfigDef.Type.SHORT,
+                    HEARTBEATS_TOPIC_REPLICATION_FACTOR_DEFAULT,
+                    ConfigDef.Importance.LOW,
+                    HEARTBEATS_TOPIC_REPLICATION_FACTOR_DOC)
+            .define(
+                    CHECKPOINTS_TOPIC_REPLICATION_FACTOR,
+                    ConfigDef.Type.SHORT,
+                    CHECKPOINTS_TOPIC_REPLICATION_FACTOR_DEFAULT,
+                    ConfigDef.Importance.LOW,
+                    CHECKPOINTS_TOPIC_REPLICATION_FACTOR_DOC)
+            .define(
+                    OFFSET_SYNCS_TOPIC_REPLICATION_FACTOR,
+                    ConfigDef.Type.SHORT,
+                    OFFSET_SYNCS_TOPIC_REPLICATION_FACTOR_DEFAULT,
+                    ConfigDef.Importance.LOW,
+                    OFFSET_SYNCS_TOPIC_REPLICATION_FACTOR_DOC)
+            .define(
+                    OFFSET_LAG_MAX,
+                    ConfigDef.Type.LONG,
+                    OFFSET_LAG_MAX_DEFAULT,
+                    ConfigDef.Importance.LOW,
+                    OFFSET_LAG_MAX_DOC)
+            .define(
+                    CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG,
+                    ConfigDef.Type.LIST,
+                    null,
+                    ConfigDef.Importance.LOW,
+                    CommonClientConfigs.METRIC_REPORTER_CLASSES_DOC)
+            .define(
+                    CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,
+                    ConfigDef.Type.STRING,
+                    CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL,
+                    ConfigDef.Importance.MEDIUM,
+                    CommonClientConfigs.SECURITY_PROTOCOL_DOC)
+            .withClientSslSupport()
+            .withClientSaslSupport();
+}
diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorHeartbeatConnector.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorHeartbeatConnector.java
new file mode 100644
index 0000000..3942c84
--- /dev/null
+++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorHeartbeatConnector.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.mirror;
+
+import org.apache.kafka.connect.connector.Task;
+import org.apache.kafka.connect.source.SourceConnector;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.utils.Utils;
+
+import java.util.Map;
+import java.util.List;
+import java.util.Collections;
+
+/** Emits heartbeats to Kafka.
+ */
+public class MirrorHeartbeatConnector extends SourceConnector {
+    private MirrorConnectorConfig config;
+    private Scheduler scheduler;
+
+    @Override
+    public void start(Map<String, String> props) {
+        config = new MirrorConnectorConfig(props);
+        scheduler = new Scheduler(MirrorHeartbeatConnector.class, config.adminTimeout());
+        scheduler.execute(this::createInternalTopics, "creating internal topics");
+    }
+
+    @Override
+    public void stop() {
+        Utils.closeQuietly(scheduler, "scheduler");
+    }
+
+    @Override
+    public Class<? extends Task> taskClass() {
+        return MirrorHeartbeatTask.class;
+    }
+
+    @Override
+    public List<Map<String, String>> taskConfigs(int maxTasks) {
+        // just need a single task
+        return Collections.singletonList(config.originalsStrings());
+    }
+
+    @Override
+    public ConfigDef config() {
+        return MirrorConnectorConfig.CONNECTOR_CONFIG_DEF;
+    }
+
+    @Override
+    public String version() {
+        return "1";
+    }
+
+    private void createInternalTopics() {
+        MirrorUtils.createSinglePartitionCompactedTopic(config.heartbeatsTopic(),
+            config.heartbeatsTopicReplicationFactor(), config.targetAdminConfig());
+    }
+}
diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorHeartbeatTask.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorHeartbeatTask.java
new file mode 100644
index 0000000..6bfe441
--- /dev/null
+++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorHeartbeatTask.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.mirror;
+
+import org.apache.kafka.connect.source.SourceTask;
+import org.apache.kafka.connect.source.SourceRecord;
+import org.apache.kafka.connect.data.Schema;
+
+import java.util.Map;
+import java.util.List;
+import java.util.Collections;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.time.Duration;
+
+/** Emits heartbeats. */
+public class MirrorHeartbeatTask extends SourceTask {
+    private String sourceClusterAlias;
+    private String targetClusterAlias;
+    private String heartbeatsTopic;
+    private Duration interval;
+    private CountDownLatch stopped;
+
+    @Override
+    public void start(Map<String, String> props) {
+        stopped = new CountDownLatch(1);
+        MirrorTaskConfig config = new MirrorTaskConfig(props);
+        sourceClusterAlias = config.sourceClusterAlias();
+        targetClusterAlias = config.targetClusterAlias();
+        heartbeatsTopic = config.heartbeatsTopic();
+        interval = config.emitHeartbeatsInterval();
+    }
+
+    @Override
+    public void commit() throws InterruptedException {
+        // nop
+    }
+
+    @Override
+    public void stop() {
+        stopped.countDown();
+    }
+
+    @Override
+    public String version() {
+        return "1";
+    }
+
+    @Override
+    public List<SourceRecord> poll() throws InterruptedException {
+        // pause to throttle, unless we've stopped
+        if (stopped.await(interval.toMillis(), TimeUnit.MILLISECONDS)) {
+            // SourceWorkerTask expects non-zero batches or null
+            return null;
+        }
+        long timestamp = System.currentTimeMillis();
+        Heartbeat heartbeat = new Heartbeat(sourceClusterAlias, targetClusterAlias, timestamp);
+        SourceRecord record = new SourceRecord(
+            heartbeat.connectPartition(), MirrorUtils.wrapOffset(0),
+            heartbeatsTopic, 0,
+            Schema.BYTES_SCHEMA, heartbeat.recordKey(),
+            Schema.BYTES_SCHEMA, heartbeat.recordValue(),
+            timestamp);
+        return Collections.singletonList(record);
+    }
+
+    @Override
+    public void commitRecord(SourceRecord record) {
+    }
+}
diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMaker.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMaker.java
new file mode 100644
index 0000000..d635b1c
--- /dev/null
+++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMaker.java
@@ -0,0 +1,309 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.mirror;
+
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.connect.runtime.Herder;
+import org.apache.kafka.connect.runtime.isolation.Plugins;
+import org.apache.kafka.connect.runtime.Worker;
+import org.apache.kafka.connect.runtime.WorkerConfigTransformer;
+import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
+import org.apache.kafka.connect.runtime.distributed.DistributedHerder;
+import org.apache.kafka.connect.runtime.distributed.NotLeaderException;
+import org.apache.kafka.connect.storage.KafkaOffsetBackingStore;
+import org.apache.kafka.connect.storage.StatusBackingStore;
+import org.apache.kafka.connect.storage.KafkaStatusBackingStore;
+import org.apache.kafka.connect.storage.ConfigBackingStore;
+import org.apache.kafka.connect.storage.KafkaConfigBackingStore;
+import org.apache.kafka.connect.storage.Converter;
+import org.apache.kafka.connect.util.ConnectUtils;
+import org.apache.kafka.connect.connector.policy.AllConnectorClientConfigOverridePolicy;
+import org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import net.sourceforge.argparse4j.impl.Arguments;
+import net.sourceforge.argparse4j.inf.Namespace;
+import net.sourceforge.argparse4j.inf.ArgumentParser;
+import net.sourceforge.argparse4j.inf.ArgumentParserException;
+import net.sourceforge.argparse4j.ArgumentParsers;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.Set;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Arrays;
+import java.util.Properties;
+import java.util.stream.Collectors;
+import java.io.File;
+
+/**
+ *  Entry point for "MirrorMaker 2.0".
+ *  <p>
+ *  MirrorMaker runs a set of Connectors between multiple clusters, in order to replicate data, configuration,
+ *  ACL rules, and consumer group state.
+ *  </p>
+ *  <p>
+ *  Configuration is via a top-level "mm2.properties" file, which supports per-cluster and per-replication
+ *  sub-configs. Each source->target replication must be explicitly enabled. For example:
+ *  </p>
+ *  <pre>
+ *    clusters = primary, backup
+ *    primary.bootstrap.servers = vip1:9092
+ *    backup.bootstrap.servers = vip2:9092
+ *    primary->backup.enabled = true
+ *    backup->primary.enabled = true
+ *  </pre>
+ *  <p>
+ *  Run as follows:
+ *  </p>
+ *  <pre>
+ *    ./bin/connect-mirror-maker.sh mm2.properties
+ *  </pre>
+ *  <p>
+ *  Additional information and example configurations are provided in ./connect/mirror/README.md
+ *  </p>
+ */
+public class MirrorMaker {
+    private static final Logger log = LoggerFactory.getLogger(MirrorMaker.class);
+
+    private static final long SHUTDOWN_TIMEOUT_SECONDS = 60L;
+    private static final ConnectorClientConfigOverridePolicy CLIENT_CONFIG_OVERRIDE_POLICY =
+            new AllConnectorClientConfigOverridePolicy();
+
+    private static final List<Class> CONNECTOR_CLASSES = Arrays.asList(
+        MirrorSourceConnector.class,
+        MirrorHeartbeatConnector.class,
+        MirrorCheckpointConnector.class);
+ 
+    private final Map<SourceAndTarget, Herder> herders = new HashMap<>();
+    private CountDownLatch startLatch;
+    private CountDownLatch stopLatch;
+    private final AtomicBoolean shutdown = new AtomicBoolean(false);
+    private final ShutdownHook shutdownHook;
+    private final String advertisedBaseUrl;
+    private final Time time;
+    private final MirrorMakerConfig config;
+    private final Set<String> clusters;
+    private final Set<SourceAndTarget> herderPairs;
+
+    /**
+     * @param config    MM2 configuration from mm2.properties file
+     * @param clusters  target clusters for this node. These must match cluster
+     *                  aliases as defined in the config. If null or empty list,
+     *                  uses all clusters in the config.
+     * @param time      time source
+     */
+    public MirrorMaker(MirrorMakerConfig config, List<String> clusters, Time time) {
+        log.debug("Kafka MirrorMaker instance created");
+        this.time = time;
+        this.advertisedBaseUrl = "NOTUSED";
+        this.config = config;
+        if (clusters != null && !clusters.isEmpty()) {
+            this.clusters = new HashSet<>(clusters);
+        } else {
+            // default to all clusters
+            this.clusters = config.clusters();
+        }
+        log.info("Targeting clusters {}", this.clusters);
+        this.herderPairs = config.clusterPairs().stream()
+            .filter(x -> this.clusters.contains(x.target()))
+            .collect(Collectors.toSet());
+        if (herderPairs.isEmpty()) {
+            throw new IllegalArgumentException("No source->target replication flows.");
+        }
+        this.herderPairs.forEach(x -> addHerder(x));
+        shutdownHook = new ShutdownHook();
+    }
+
+    /**
+     * @param config    MM2 configuration from mm2.properties file
+     * @param clusters  target clusters for this node. These must match cluster
+     *                  aliases as defined in the config. If null or empty list,
+     *                  uses all clusters in the config.
+     * @param time      time source
+     */
+    public MirrorMaker(Map<String, String> config, List<String> clusters, Time time) {
+        this(new MirrorMakerConfig(config), clusters, time);
+    }
+
+    public MirrorMaker(Map<String, String> props, List<String> clusters) {
+        this(props, clusters, Time.SYSTEM);
+    }
+
+    public MirrorMaker(Map<String, String> props) {
+        this(props, null);
+    }
+
+
+    public void start() {
+        log.info("Kafka MirrorMaker starting with {} herders.", herders.size());
+        if (startLatch != null) {
+            throw new IllegalStateException("MirrorMaker instance already started");
+        }
+        startLatch = new CountDownLatch(herders.size());
+        stopLatch = new CountDownLatch(herders.size());
+        Runtime.getRuntime().addShutdownHook(shutdownHook);
+        for (Herder herder : herders.values()) {
+            try {
+                herder.start();
+            } finally {
+                startLatch.countDown();
+            }
+        }
+        log.info("Configuring connectors...");
+        herderPairs.forEach(x -> configureConnectors(x));
+        log.info("Kafka MirrorMaker started");
+    }
+
+    public void stop() {
+        boolean wasShuttingDown = shutdown.getAndSet(true);
+        if (!wasShuttingDown) {
+            log.info("Kafka MirrorMaker stopping");
+            for (Herder herder : herders.values()) {
+                try {
+                    herder.stop();
+                } finally {
+                    stopLatch.countDown();
+                }
+            }
+            log.info("Kafka MirrorMaker stopped.");
+        }
+    }
+
+    public void awaitStop() {
+        try {
+            stopLatch.await();
+        } catch (InterruptedException e) {
+            log.error("Interrupted waiting for MirrorMaker to shutdown");
+        }
+    }
+
+    private void configureConnector(SourceAndTarget sourceAndTarget, Class connectorClass) {
+        checkHerder(sourceAndTarget);
+        Map<String, String> connectorProps = config.connectorBaseConfig(sourceAndTarget, connectorClass);
+        herders.get(sourceAndTarget)
+                .putConnectorConfig(connectorClass.getSimpleName(), connectorProps, true, (e, x) -> {
+                    if (e instanceof NotLeaderException) {
+                        // No way to determine if the connector is a leader or not beforehand.
+                        log.info("Connector {} is a follower. Using existing configuration.", sourceAndTarget);
+                    } else {
+                        log.info("Connector {} configured.", sourceAndTarget, e);
+                    }
+                });
+    }
+
+    private void checkHerder(SourceAndTarget sourceAndTarget) {
+        if (!herders.containsKey(sourceAndTarget)) {
+            throw new IllegalArgumentException("No herder for " + sourceAndTarget.toString());
+        }
+    }
+
+    private void configureConnectors(SourceAndTarget sourceAndTarget) {
+        CONNECTOR_CLASSES.forEach(x -> configureConnector(sourceAndTarget, x));
+    }
+
+    private void addHerder(SourceAndTarget sourceAndTarget) {
+        log.info("creating herder for " + sourceAndTarget.toString());
+        Map<String, String> workerProps = config.workerConfig(sourceAndTarget);
+        String advertisedUrl = advertisedBaseUrl + "/" + sourceAndTarget.source();
+        String workerId = sourceAndTarget.toString();
+        Plugins plugins = new Plugins(workerProps);
+        plugins.compareAndSwapWithDelegatingLoader();
+        DistributedConfig distributedConfig = new DistributedConfig(workerProps);
+        String kafkaClusterId = ConnectUtils.lookupKafkaClusterId(distributedConfig);
+        KafkaOffsetBackingStore offsetBackingStore = new KafkaOffsetBackingStore();
+        offsetBackingStore.configure(distributedConfig);
+        Worker worker = new Worker(workerId, time, plugins, distributedConfig, offsetBackingStore, CLIENT_CONFIG_OVERRIDE_POLICY);
+        WorkerConfigTransformer configTransformer = worker.configTransformer();
+        Converter internalValueConverter = worker.getInternalValueConverter();
+        StatusBackingStore statusBackingStore = new KafkaStatusBackingStore(time, internalValueConverter);
+        statusBackingStore.configure(distributedConfig);
+        ConfigBackingStore configBackingStore = new KafkaConfigBackingStore(
+                internalValueConverter,
+                distributedConfig,
+                configTransformer);
+        Herder herder = new DistributedHerder(distributedConfig, time, worker,
+                kafkaClusterId, statusBackingStore, configBackingStore,
+                advertisedUrl, CLIENT_CONFIG_OVERRIDE_POLICY);
+        herders.put(sourceAndTarget, herder);
+    }
+
+    private class ShutdownHook extends Thread {
+        @Override
+        public void run() {
+            try {
+                if (!startLatch.await(SHUTDOWN_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
+                    log.error("Timed out in shutdown hook waiting for MirrorMaker startup to finish. Unable to shutdown cleanly.");
+                }
+            } catch (InterruptedException e) {
+                log.error("Interrupted in shutdown hook while waiting for MirrorMaker startup to finish. Unable to shutdown cleanly.");
+            } finally {
+                MirrorMaker.this.stop();
+            }
+        }
+    }
+
+    public static void main(String[] args) {
+        ArgumentParser parser = ArgumentParsers.newArgumentParser("connect-mirror-maker");
+        parser.description("MirrorMaker 2.0 driver");
+        parser.addArgument("config").type(Arguments.fileType().verifyCanRead())
+            .metavar("mm2.properties").required(true)
+            .help("MM2 configuration file.");
+        parser.addArgument("--clusters").nargs("+").metavar("CLUSTER").required(false)
+            .help("Target cluster to use for this node.");
+        Namespace ns;
+        try {
+            ns = parser.parseArgs(args);
+        } catch (ArgumentParserException e) {
+            parser.handleError(e);
+            System.exit(-1);
+            return;
+        }
+        File configFile = (File) ns.get("config");
+        List<String> clusters = ns.getList("clusters");
+        try {
+            log.info("Kafka MirrorMaker initializing ...");
+
+            Properties props = Utils.loadProps(configFile.getPath());
+            Map<String, String> config = Utils.propsToStringMap(props);
+            MirrorMaker mirrorMaker = new MirrorMaker(config, clusters, Time.SYSTEM);
+            
+            try {
+                mirrorMaker.start();
+            } catch (Exception e) {
+                log.error("Failed to start MirrorMaker", e);
+                mirrorMaker.stop();
+                Exit.exit(3);
+            }
+
+            mirrorMaker.awaitStop();
+
+        } catch (Throwable t) {
+            log.error("Stopping due to error", t);
+            Exit.exit(2);
+        }
+    }
+
+}
diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java
new file mode 100644
index 0000000..df5d38f
--- /dev/null
+++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.mirror;
+
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigDef.Type;
+import org.apache.kafka.common.config.ConfigDef.Importance;
+import org.apache.kafka.common.config.provider.ConfigProvider;
+import org.apache.kafka.common.config.ConfigTransformer;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.connect.runtime.WorkerConfig;
+import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
+import org.apache.kafka.connect.runtime.isolation.Plugins;
+
+import java.util.Map;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Set;
+import java.util.HashSet;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.stream.Collectors;
+
+/** Top-level config describing replication flows between multiple Kafka clusters.
+ *
+ *  Supports cluster-level properties of the form cluster.x.y.z, and replication-level
+ *  properties of the form source->target.x.y.z.
+ *  e.g.
+ *
+ *      clusters = A, B, C
+ *      A.bootstrap.servers = aaa:9092
+ *      A.security.protocol = SSL
+ *      --->%---
+ *      A->B.enabled = true
+ *      A->B.producer.client.id = "A-B-producer"
+ *      --->%---
+ *
+ */
+public class MirrorMakerConfig extends AbstractConfig {
+
+    public static final String CLUSTERS_CONFIG = "clusters";
+    private static final String CLUSTERS_DOC = "List of cluster aliases.";
+    public static final String CONFIG_PROVIDERS_CONFIG = WorkerConfig.CONFIG_PROVIDERS_CONFIG;
+    private static final String CONFIG_PROVIDERS_DOC = "Names of ConfigProviders to use.";
+
+    private static final String NAME = "name";
+    private static final String CONNECTOR_CLASS = "connector.class";
+    private static final String SOURCE_CLUSTER_ALIAS = "source.cluster.alias";
+    private static final String TARGET_CLUSTER_ALIAS = "target.cluster.alias";
+    private static final String GROUP_ID_CONFIG = "group.id";
+    private static final String KEY_CONVERTER_CLASS_CONFIG = "key.converter";
+    private static final String VALUE_CONVERTER_CLASS_CONFIG = "value.converter";
+    private static final String HEADER_CONVERTER_CLASS_CONFIG = "header.converter";
+    private static final String BYTE_ARRAY_CONVERTER_CLASS =
+        "org.apache.kafka.connect.converters.ByteArrayConverter";
+    private static final String REPLICATION_FACTOR = "replication.factor";
+
+    static final String SOURCE_CLUSTER_PREFIX = "source.cluster.";
+    static final String TARGET_CLUSTER_PREFIX = "target.cluster.";
+
+    private final Plugins plugins;
+   
+    public MirrorMakerConfig(Map<?, ?> props) {
+        super(CONFIG_DEF, props, true);
+        plugins = new Plugins(originalsStrings());
+    }
+
+    public Set<String> clusters() {
+        return new HashSet<>(getList(CLUSTERS_CONFIG));
+    }
+
+    public List<SourceAndTarget> clusterPairs() {
+        List<SourceAndTarget> pairs = new ArrayList<>();
+        Set<String> clusters = clusters();
+        for (String source : clusters) {
+            for (String target : clusters) {
+                SourceAndTarget sourceAndTarget = new SourceAndTarget(source, target);
+                if (!source.equals(target)) {
+                    pairs.add(sourceAndTarget);
+                }
+            }
+        }
+        return pairs;
+    }
+
+    /** Construct a MirrorClientConfig from properties of the form cluster.x.y.z.
+      * Use to connect to a cluster based on the MirrorMaker top-level config file.
+      */
+    public MirrorClientConfig clientConfig(String cluster) {
+        Map<String, String> props = new HashMap<>();
+        props.putAll(originalsStrings());
+        props.putAll(clusterProps(cluster));
+        return new MirrorClientConfig(transform(props));
+    }
+
+    // loads properties of the form cluster.x.y.z
+    Map<String, String> clusterProps(String cluster) {
+        Map<String, String> props = new HashMap<>();
+        Map<String, String> strings = originalsStrings();
+
+        props.putAll(stringsWithPrefixStripped(cluster + "."));
+
+        for (String k : MirrorClientConfig.CLIENT_CONFIG_DEF.names()) {
+            String v = props.get(k);
+            if (v != null) {
+                props.putIfAbsent("producer." + k, v);
+                props.putIfAbsent("consumer." + k, v);
+                props.putIfAbsent("admin." + k, v);
+            }
+        }
+
+        for (String k : MirrorClientConfig.CLIENT_CONFIG_DEF.names()) {
+            String v = strings.get(k);
+            if (v != null) {
+                props.putIfAbsent("producer." + k, v);
+                props.putIfAbsent("consumer." + k, v);
+                props.putIfAbsent("admin." + k, v);
+                props.putIfAbsent(k, v);
+            }
+        }
+ 
+        return props;
+    }
+
+    // loads worker configs based on properties of the form x.y.z and cluster.x.y.z 
+    Map<String, String> workerConfig(SourceAndTarget sourceAndTarget) {
+        Map<String, String> props = new HashMap<>();
+        props.putAll(clusterProps(sourceAndTarget.target()));
+      
+        // Accept common top-level configs that are otherwise ignored by MM2.
+        // N.B. all other worker properties should be configured for specific herders,
+        // e.g. primary->backup.client.id
+        props.putAll(stringsWithPrefix("offset.storage"));
+        props.putAll(stringsWithPrefix("config.storage"));
+        props.putAll(stringsWithPrefix("status.storage"));
+        props.putAll(stringsWithPrefix("key.converter")); 
+        props.putAll(stringsWithPrefix("value.converter")); 
+        props.putAll(stringsWithPrefix("header.converter"));
+        props.putAll(stringsWithPrefix("task"));
+        props.putAll(stringsWithPrefix("worker"));
+ 
+        // transform any expression like ${provider:path:key}, since the worker doesn't do so
+        props = transform(props);
+        props.putAll(stringsWithPrefix(CONFIG_PROVIDERS_CONFIG));
+
+        // fill in reasonable defaults
+        props.putIfAbsent(GROUP_ID_CONFIG, sourceAndTarget.source() + "-mm2");
+        props.putIfAbsent(DistributedConfig.OFFSET_STORAGE_TOPIC_CONFIG, "mm2-offsets."
+                + sourceAndTarget.source() + ".internal");
+        props.putIfAbsent(DistributedConfig.STATUS_STORAGE_TOPIC_CONFIG, "mm2-status."
+                + sourceAndTarget.source() + ".internal");
+        props.putIfAbsent(DistributedConfig.CONFIG_TOPIC_CONFIG, "mm2-configs."
+                + sourceAndTarget.source() + ".internal");
+        props.putIfAbsent(KEY_CONVERTER_CLASS_CONFIG, BYTE_ARRAY_CONVERTER_CLASS); 
+        props.putIfAbsent(VALUE_CONVERTER_CLASS_CONFIG, BYTE_ARRAY_CONVERTER_CLASS); 
+        props.putIfAbsent(HEADER_CONVERTER_CLASS_CONFIG, BYTE_ARRAY_CONVERTER_CLASS);
+
+        return props;
+    }
+
+    // loads properties of the form cluster.x.y.z and source->target.x.y.z
+    Map<String, String> connectorBaseConfig(SourceAndTarget sourceAndTarget, Class connectorClass) {
+        Map<String, String> props = new HashMap<>();
+
+        props.putAll(originalsStrings());
+        props.keySet().retainAll(MirrorConnectorConfig.CONNECTOR_CONFIG_DEF.names());
+        
+        props.putAll(stringsWithPrefix(CONFIG_PROVIDERS_CONFIG));
+        
+        props.putAll(withPrefix(SOURCE_CLUSTER_PREFIX, clusterProps(sourceAndTarget.source())));
+        props.putAll(withPrefix(TARGET_CLUSTER_PREFIX, clusterProps(sourceAndTarget.target())));
+
+        props.putIfAbsent(NAME, connectorClass.getSimpleName());
+        props.putIfAbsent(CONNECTOR_CLASS, connectorClass.getName());
+        props.putIfAbsent(SOURCE_CLUSTER_ALIAS, sourceAndTarget.source());
+        props.putIfAbsent(TARGET_CLUSTER_ALIAS, sourceAndTarget.target());
+
+        // override with connector-level properties
+        props.putAll(stringsWithPrefixStripped(sourceAndTarget.source() + "->"
+            + sourceAndTarget.target() + "."));
+
+        // disabled by default
+        props.putIfAbsent(MirrorConnectorConfig.ENABLED, "false");
+
+        // don't transform -- the worker will handle transformation of Connector and Task configs
+        return props;
+    }
+
+    List<String> configProviders() {
+        return getList(CONFIG_PROVIDERS_CONFIG);
+    } 
+
+    Map<String, String> transform(Map<String, String> props) {
+        // transform worker config according to config.providers
+        List<String> providerNames = configProviders();
+        Map<String, ConfigProvider> providers = new HashMap<>();
+        for (String name : providerNames) {
+            ConfigProvider configProvider = plugins.newConfigProvider(
+                    this,
+                    CONFIG_PROVIDERS_CONFIG + "." + name,
+                    Plugins.ClassLoaderUsage.PLUGINS
+            );
+            providers.put(name, configProvider);
+        }
+        ConfigTransformer transformer = new ConfigTransformer(providers);
+        Map<String, String> transformed = transformer.transform(props).data();
+        providers.values().forEach(x -> Utils.closeQuietly(x, "config provider"));
+        return transformed;
+    }
+ 
+    protected static final ConfigDef CONFIG_DEF = new ConfigDef()
+            .define(CLUSTERS_CONFIG, Type.LIST, Importance.HIGH, CLUSTERS_DOC)
+            .define(CONFIG_PROVIDERS_CONFIG, Type.LIST, Collections.emptyList(), Importance.LOW, CONFIG_PROVIDERS_DOC)
+            // security support
+            .define(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,
+                Type.STRING,
+                CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL,
+                Importance.MEDIUM,
+                CommonClientConfigs.SECURITY_PROTOCOL_DOC)
+            .withClientSslSupport()
+            .withClientSaslSupport();
+
+    private Map<String, String> stringsWithPrefixStripped(String prefix) {
+        return originalsStrings().entrySet().stream()
+            .filter(x -> x.getKey().startsWith(prefix))
+            .collect(Collectors.toMap(x -> x.getKey().substring(prefix.length()), x -> x.getValue()));
+    }
+
+    private Map<String, String> stringsWithPrefix(String prefix) {
+        Map<String, String> strings = originalsStrings();
+        strings.keySet().removeIf(x -> !x.startsWith(prefix));
+        return strings;
+    } 
+
+    static Map<String, String> withPrefix(String prefix, Map<String, String> props) {
+        return props.entrySet().stream()
+            .collect(Collectors.toMap(x -> prefix + x.getKey(), x -> x.getValue()));
+    }
+}
diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMetrics.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMetrics.java
new file mode 100644
index 0000000..51ddafc
--- /dev/null
+++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMetrics.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.mirror;
+
+import org.apache.kafka.common.MetricNameTemplate;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.MetricsReporter;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.metrics.stats.WindowedCount;
+import org.apache.kafka.common.metrics.stats.Value;
+import org.apache.kafka.common.metrics.stats.Rate;
+import org.apache.kafka.common.metrics.stats.Min;
+import org.apache.kafka.common.metrics.stats.Max;
+import org.apache.kafka.common.metrics.stats.Avg;
+import org.apache.kafka.common.TopicPartition;
+
+import java.util.Arrays;
+import java.util.Set;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.stream.Collectors;
+
+/** Metrics for replicated topic-partitions */
+class MirrorMetrics implements AutoCloseable {
+
+    private static final String SOURCE_CONNECTOR_GROUP = MirrorSourceConnector.class.getSimpleName();
+    private static final String CHECKPOINT_CONNECTOR_GROUP = MirrorCheckpointConnector.class.getSimpleName();
+
+    private static final Set<String> PARTITION_TAGS = new HashSet<>(Arrays.asList("target", "topic", "partition"));
+    private static final Set<String> GROUP_TAGS = new HashSet<>(Arrays.asList("source", "target", "group", "topic", "partition"));
+    
+    private static final MetricNameTemplate RECORD_COUNT = new MetricNameTemplate(
+            "record-count", SOURCE_CONNECTOR_GROUP,
+            "Number of source records replicated to the target cluster.", PARTITION_TAGS);
+    private static final MetricNameTemplate RECORD_AGE = new MetricNameTemplate(
+            "record-age-ms", SOURCE_CONNECTOR_GROUP,
+            "The age of incoming source records when replicated to the target cluster.", PARTITION_TAGS);
+    private static final MetricNameTemplate RECORD_AGE_MAX = new MetricNameTemplate(
+            "record-age-ms-max", SOURCE_CONNECTOR_GROUP,
+            "The max age of incoming source records when replicated to the target cluster.", PARTITION_TAGS);
+    private static final MetricNameTemplate RECORD_AGE_MIN = new MetricNameTemplate(
+            "record-age-ms-min", SOURCE_CONNECTOR_GROUP,
+            "The min age of incoming source records when replicated to the target cluster.", PARTITION_TAGS);
+    private static final MetricNameTemplate RECORD_AGE_AVG = new MetricNameTemplate(
+            "record-age-ms-avg", SOURCE_CONNECTOR_GROUP,
+            "The average age of incoming source records when replicated to the target cluster.", PARTITION_TAGS);
+    private static final MetricNameTemplate BYTE_RATE = new MetricNameTemplate(
+            "byte-rate", SOURCE_CONNECTOR_GROUP,
+            "Average number of bytes replicated per second.", PARTITION_TAGS);
+    private static final MetricNameTemplate REPLICATION_LATENCY = new MetricNameTemplate(
+            "replication-latency-ms", SOURCE_CONNECTOR_GROUP,
+            "Time it takes records to replicate from source to target cluster.", PARTITION_TAGS);
+    private static final MetricNameTemplate REPLICATION_LATENCY_MAX = new MetricNameTemplate(
+            "replication-latency-ms-max", SOURCE_CONNECTOR_GROUP,
+            "Max time it takes records to replicate from source to target cluster.", PARTITION_TAGS);
+    private static final MetricNameTemplate REPLICATION_LATENCY_MIN = new MetricNameTemplate(
+            "replication-latency-ms-min", SOURCE_CONNECTOR_GROUP,
+            "Min time it takes records to replicate from source to target cluster.", PARTITION_TAGS);
+    private static final MetricNameTemplate REPLICATION_LATENCY_AVG = new MetricNameTemplate(
+            "replication-latency-ms-avg", SOURCE_CONNECTOR_GROUP,
+            "Average time it takes records to replicate from source to target cluster.", PARTITION_TAGS);
+
+    private static final MetricNameTemplate CHECKPOINT_LATENCY = new MetricNameTemplate(
+            "checkpoint-latency-ms", CHECKPOINT_CONNECTOR_GROUP,
+            "Time it takes consumer group offsets to replicate from source to target cluster.", GROUP_TAGS);
+    private static final MetricNameTemplate CHECKPOINT_LATENCY_MAX = new MetricNameTemplate(
+            "checkpoint-latency-ms-max", CHECKPOINT_CONNECTOR_GROUP,
+            "Max time it takes consumer group offsets to replicate from source to target cluster.", GROUP_TAGS);
+    private static final MetricNameTemplate CHECKPOINT_LATENCY_MIN = new MetricNameTemplate(
+            "checkpoint-latency-ms-min", CHECKPOINT_CONNECTOR_GROUP,
+            "Min time it takes consumer group offsets to replicate from source to target cluster.", GROUP_TAGS);
+    private static final MetricNameTemplate CHECKPOINT_LATENCY_AVG = new MetricNameTemplate(
+            "checkpoint-latency-ms-avg", CHECKPOINT_CONNECTOR_GROUP,
+            "Average time it takes consumer group offsets to replicate from source to target cluster.", GROUP_TAGS);
+
+
+    private final Metrics metrics; 
+    private final Map<TopicPartition, PartitionMetrics> partitionMetrics; 
+    private final Map<String, GroupMetrics> groupMetrics = new HashMap<>();
+    private final String source;
+    private final String target;
+    private final Set<String> groups;
+
+    MirrorMetrics(MirrorTaskConfig taskConfig) {
+        this.target = taskConfig.targetClusterAlias();
+        this.source = taskConfig.sourceClusterAlias();
+        this.groups = taskConfig.taskConsumerGroups();
+        this.metrics = new Metrics();
+
+        // for side-effect
+        metrics.sensor("record-count");
+        metrics.sensor("byte-rate");
+        metrics.sensor("record-age");
+        metrics.sensor("replication-latency");
+
+        ReplicationPolicy replicationPolicy = taskConfig.replicationPolicy();
+        partitionMetrics = taskConfig.taskTopicPartitions().stream()
+            .map(x -> new TopicPartition(replicationPolicy.formatRemoteTopic(source, x.topic()), x.partition()))
+            .collect(Collectors.toMap(x -> x, x -> new PartitionMetrics(x)));
+
+    }
+
+    @Override
+    public void close() {
+        metrics.close();
+    }
+
+    void countRecord(TopicPartition topicPartition) {
+        partitionMetrics.get(topicPartition).recordSensor.record();
+    }
+
+    void recordAge(TopicPartition topicPartition, long ageMillis) {
+        partitionMetrics.get(topicPartition).recordAgeSensor.record((double) ageMillis);
+    }
+
+    void replicationLatency(TopicPartition topicPartition, long millis) {
+        partitionMetrics.get(topicPartition).replicationLatencySensor.record((double) millis);
+    }
+
+    void recordBytes(TopicPartition topicPartition, long bytes) {
+        partitionMetrics.get(topicPartition).byteRateSensor.record((double) bytes);
+    }
+
+    void checkpointLatency(TopicPartition topicPartition, String group, long millis) {
+        group(topicPartition, group).checkpointLatencySensor.record((double) millis);
+    }
+
+    GroupMetrics group(TopicPartition topicPartition, String group) {
+        return groupMetrics.computeIfAbsent(String.join("-", topicPartition.toString(), group),
+            x -> new GroupMetrics(topicPartition, group));
+    }
+
+    void addReporter(MetricsReporter reporter) {
+        metrics.addReporter(reporter);
+    }
+
+    private class PartitionMetrics {
+        private final Sensor recordSensor;
+        private final Sensor byteRateSensor;
+        private final Sensor recordAgeSensor;
+        private final Sensor replicationLatencySensor;
+        private final TopicPartition topicPartition;
+     
+        PartitionMetrics(TopicPartition topicPartition) {
+            this.topicPartition = topicPartition;
+
+            Map<String, String> tags = new LinkedHashMap<>();
+            tags.put("target", target); 
+            tags.put("topic", topicPartition.topic());
+            tags.put("partition", Integer.toString(topicPartition.partition()));
+
+            recordSensor = metrics.sensor("record-count");
+            recordSensor.add(metrics.metricInstance(RECORD_COUNT, tags), new WindowedCount());
+
+            byteRateSensor = metrics.sensor("byte-rate");
+            byteRateSensor.add(metrics.metricInstance(BYTE_RATE, tags), new Rate());
+
+            recordAgeSensor = metrics.sensor("record-age");
+            recordAgeSensor.add(metrics.metricInstance(RECORD_AGE, tags), new Value());
+            recordAgeSensor.add(metrics.metricInstance(RECORD_AGE_MAX, tags), new Max());
+            recordAgeSensor.add(metrics.metricInstance(RECORD_AGE_MIN, tags), new Min());
+            recordAgeSensor.add(metrics.metricInstance(RECORD_AGE_AVG, tags), new Avg());
+
+            replicationLatencySensor = metrics.sensor("replication-latency");
+            replicationLatencySensor.add(metrics.metricInstance(REPLICATION_LATENCY, tags), new Value());
+            replicationLatencySensor.add(metrics.metricInstance(REPLICATION_LATENCY_MAX, tags), new Max());
+            replicationLatencySensor.add(metrics.metricInstance(REPLICATION_LATENCY_MIN, tags), new Min());
+            replicationLatencySensor.add(metrics.metricInstance(REPLICATION_LATENCY_AVG, tags), new Avg());
+        }
+
+        
+    }
+
+    private class GroupMetrics {
+        private final Sensor checkpointLatencySensor;
+
+        GroupMetrics(TopicPartition topicPartition, String group) {
+            Map<String, String> tags = new LinkedHashMap<>();
+            tags.put("source", source); 
+            tags.put("target", target); 
+            tags.put("group", group);
+            tags.put("topic", topicPartition.topic());
+            tags.put("partition", Integer.toString(topicPartition.partition()));
+ 
+            checkpointLatencySensor = metrics.sensor("checkpoint-latency");
+            checkpointLatencySensor.add(metrics.metricInstance(CHECKPOINT_LATENCY, tags), new Value());
+            checkpointLatencySensor.add(metrics.metricInstance(CHECKPOINT_LATENCY_MAX, tags), new Max());
+            checkpointLatencySensor.add(metrics.metricInstance(CHECKPOINT_LATENCY_MIN, tags), new Min());
+            checkpointLatencySensor.add(metrics.metricInstance(CHECKPOINT_LATENCY_AVG, tags), new Avg());
+        }
+    }
+}
diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java
new file mode 100644
index 0000000..081bedc
--- /dev/null
+++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java
@@ -0,0 +1,390 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.mirror;
+
+import org.apache.kafka.connect.connector.Task;
+import org.apache.kafka.connect.source.SourceConnector;
+import org.apache.kafka.connect.util.ConnectorUtils;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.acl.AclBinding;
+import org.apache.kafka.common.acl.AclBindingFilter;
+import org.apache.kafka.common.acl.AccessControlEntry;
+import org.apache.kafka.common.acl.AccessControlEntryFilter;
+import org.apache.kafka.common.acl.AclPermissionType;
+import org.apache.kafka.common.acl.AclOperation;
+import org.apache.kafka.common.resource.ResourceType;
+import org.apache.kafka.common.resource.ResourcePattern;
+import org.apache.kafka.common.resource.ResourcePatternFilter;
+import org.apache.kafka.common.resource.PatternType;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.InvalidPartitionsException;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.TopicDescription;
+import org.apache.kafka.clients.admin.Config;
+import org.apache.kafka.clients.admin.ConfigEntry;
+import org.apache.kafka.clients.admin.NewPartitions;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.admin.CreateTopicsOptions;
+
+import java.util.Map;
+import java.util.List;
+import java.util.Set;
+import java.util.HashSet;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import java.util.concurrent.ExecutionException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Replicate data, configuration, and ACLs between clusters.
+ *
+ *  @see MirrorConnectorConfig for supported config properties.
+ */
+public class MirrorSourceConnector extends SourceConnector {
+
+    private static final Logger log = LoggerFactory.getLogger(MirrorSourceConnector.class);
+    private static final ResourcePatternFilter ANY_TOPIC = new ResourcePatternFilter(ResourceType.TOPIC,
+        null, PatternType.ANY);
+    private static final AclBindingFilter ANY_TOPIC_ACL = new AclBindingFilter(ANY_TOPIC, AccessControlEntryFilter.ANY);
+
+    private Scheduler scheduler;
+    private MirrorConnectorConfig config;
+    private SourceAndTarget sourceAndTarget;
+    private String connectorName;
+    private TopicFilter topicFilter;
+    private ConfigPropertyFilter configPropertyFilter;
+    private List<TopicPartition> knownTopicPartitions = Collections.emptyList();
+    private Set<String> knownTargetTopics = Collections.emptySet();
+    private ReplicationPolicy replicationPolicy;
+    private int replicationFactor;
+    private AdminClient sourceAdminClient;
+    private AdminClient targetAdminClient;
+
+    public MirrorSourceConnector() {
+        // nop
+    }
+
+    // visible for testing
+    MirrorSourceConnector(SourceAndTarget sourceAndTarget, ReplicationPolicy replicationPolicy,
+            TopicFilter topicFilter, ConfigPropertyFilter configPropertyFilter) {
+        this.sourceAndTarget = sourceAndTarget;
+        this.replicationPolicy = replicationPolicy;
+        this.topicFilter = topicFilter;
+        this.configPropertyFilter = configPropertyFilter;
+    } 
+
+    @Override
+    public void start(Map<String, String> props) {
+        long start = System.currentTimeMillis();
+        config = new MirrorConnectorConfig(props);
+        if (!config.enabled()) {
+            return;
+        }
+        connectorName = config.connectorName();
+        sourceAndTarget = new SourceAndTarget(config.sourceClusterAlias(), config.targetClusterAlias());
+        topicFilter = config.topicFilter();
+        configPropertyFilter = config.configPropertyFilter();
+        replicationPolicy = config.replicationPolicy();
+        replicationFactor = config.replicationFactor();
+        sourceAdminClient = AdminClient.create(config.sourceAdminConfig());
+        targetAdminClient = AdminClient.create(config.targetAdminConfig());
+        scheduler = new Scheduler(MirrorSourceConnector.class, config.adminTimeout());
+        scheduler.execute(this::createOffsetSyncsTopic, "creating upstream offset-syncs topic");
+        scheduler.execute(this::loadTopicPartitions, "loading initial set of topic-partitions");
+        scheduler.execute(this::createTopicPartitions, "creating downstream topic-partitions");
+        scheduler.execute(this::refreshKnownTargetTopics, "refreshing known target topics");
+        scheduler.scheduleRepeating(this::syncTopicAcls, config.syncTopicAclsInterval(), "syncing topic ACLs");
+        scheduler.scheduleRepeating(this::syncTopicConfigs, config.syncTopicConfigsInterval(),
+            "syncing topic configs");
+        scheduler.scheduleRepeatingDelayed(this::refreshTopicPartitions, config.refreshTopicsInterval(),
+            "refreshing topics");
+        log.info("Started {} with {} topic-partitions.", connectorName, knownTopicPartitions.size());
+        log.info("Starting {} took {} ms.", connectorName, System.currentTimeMillis() - start);
+    }
+
+    @Override
+    public void stop() {
+        long start = System.currentTimeMillis();
+        if (!config.enabled()) {
+            return;
+        }
+        Utils.closeQuietly(scheduler, "scheduler");
+        Utils.closeQuietly(topicFilter, "topic filter");
+        Utils.closeQuietly(configPropertyFilter, "config property filter");
+        Utils.closeQuietly(sourceAdminClient, "source admin client");
+        Utils.closeQuietly(targetAdminClient, "target admin client");
+        log.info("Stopping {} took {} ms.", connectorName, System.currentTimeMillis() - start);
+    }
+
+    @Override
+    public Class<? extends Task> taskClass() {
+        return MirrorSourceTask.class;
+    }
+
+    // divide topic-partitions among tasks
+    @Override
+    public List<Map<String, String>> taskConfigs(int maxTasks) {
+        if (!config.enabled() || knownTopicPartitions.isEmpty()) {
+            return Collections.emptyList();
+        }
+        int numTasks = Math.min(maxTasks, knownTopicPartitions.size());
+        return ConnectorUtils.groupPartitions(knownTopicPartitions, numTasks).stream()
+            .map(config::taskConfigForTopicPartitions)
+            .collect(Collectors.toList());
+    }
+
+    @Override
+    public ConfigDef config() {
+        return MirrorConnectorConfig.CONNECTOR_CONFIG_DEF;
+    }
+
+    @Override
+    public String version() {
+        return "1";
+    }
+
+    private List<TopicPartition> findTopicPartitions()
+            throws InterruptedException, ExecutionException {
+        Set<String> topics = listTopics(sourceAdminClient).stream()
+            .filter(this::shouldReplicateTopic)
+            .collect(Collectors.toSet());
+        return describeTopics(topics).stream()
+            .flatMap(MirrorSourceConnector::expandTopicDescription)
+            .collect(Collectors.toList());
+    }
+
+    private void refreshTopicPartitions()
+            throws InterruptedException, ExecutionException {
+        List<TopicPartition> topicPartitions = findTopicPartitions();
+        Set<TopicPartition> newTopicPartitions = new HashSet<>();
+        newTopicPartitions.addAll(topicPartitions);
+        newTopicPartitions.removeAll(knownTopicPartitions);
+        Set<TopicPartition> deadTopicPartitions = new HashSet<>();
+        deadTopicPartitions.addAll(knownTopicPartitions);
+        deadTopicPartitions.removeAll(topicPartitions);
+        if (!newTopicPartitions.isEmpty() || !deadTopicPartitions.isEmpty()) {
+            log.info("Found {} topic-partitions on {}. {} are new. {} were removed. Previously had {}.",
+                    topicPartitions.size(), sourceAndTarget.source(), newTopicPartitions.size(), 
+                    deadTopicPartitions.size(), knownTopicPartitions.size());
+            log.trace("Found new topic-partitions: {}", newTopicPartitions);
+            knownTopicPartitions = topicPartitions;
+            knownTargetTopics = findExistingTargetTopics(); 
+            createTopicPartitions();
+            context.requestTaskReconfiguration();
+        }
+    }
+
+    private void loadTopicPartitions()
+            throws InterruptedException, ExecutionException {
+        knownTopicPartitions = findTopicPartitions();
+        knownTargetTopics = findExistingTargetTopics(); 
+    }
+
+    private void refreshKnownTargetTopics()
+            throws InterruptedException, ExecutionException {
+        knownTargetTopics = findExistingTargetTopics();
+    }
+
+    private Set<String> findExistingTargetTopics()
+            throws InterruptedException, ExecutionException {
+        return listTopics(targetAdminClient).stream()
+            .filter(x -> sourceAndTarget.source().equals(replicationPolicy.topicSource(x)))
+            .collect(Collectors.toSet());
+    }
+
+    private Set<String> topicsBeingReplicated() {
+        return knownTopicPartitions.stream()
+            .map(x -> x.topic())
+            .distinct()
+            .filter(x -> knownTargetTopics.contains(formatRemoteTopic(x)))
+            .collect(Collectors.toSet());
+    }
+
+    private void syncTopicAcls()
+            throws InterruptedException, ExecutionException {
+        List<AclBinding> bindings = listTopicAclBindings().stream()
+            .filter(x -> x.pattern().resourceType() == ResourceType.TOPIC)
+            .filter(x -> x.pattern().patternType() == PatternType.LITERAL)
+            .filter(this::shouldReplicateAcl)
+            .filter(x -> shouldReplicateTopic(x.pattern().name()))
+            .map(this::targetAclBinding)
+            .collect(Collectors.toList());
+        updateTopicAcls(bindings);
+    }
+
+    private void syncTopicConfigs()
+            throws InterruptedException, ExecutionException {
+        Map<String, Config> sourceConfigs = describeTopicConfigs(topicsBeingReplicated());
+        Map<String, Config> targetConfigs = sourceConfigs.entrySet().stream()
+            .collect(Collectors.toMap(x -> formatRemoteTopic(x.getKey()), x -> targetConfig(x.getValue())));
+        updateTopicConfigs(targetConfigs);
+    }
+
+    private void createOffsetSyncsTopic() {
+        MirrorUtils.createSinglePartitionCompactedTopic(config.offsetSyncsTopic(), config.offsetSyncsTopicReplicationFactor(), config.sourceAdminConfig());
+    }
+
+    private void createTopicPartitions()
+            throws InterruptedException, ExecutionException {
+        Map<String, Long> partitionCounts = knownTopicPartitions.stream()
+            .collect(Collectors.groupingBy(x -> x.topic(), Collectors.counting())).entrySet().stream()
+            .collect(Collectors.toMap(x -> formatRemoteTopic(x.getKey()), x -> x.getValue()));
+        List<NewTopic> newTopics = partitionCounts.entrySet().stream()
+            .filter(x -> !knownTargetTopics.contains(x.getKey()))
+            .map(x -> new NewTopic(x.getKey(), x.getValue().intValue(), (short) replicationFactor))
+            .collect(Collectors.toList());
+        Map<String, NewPartitions> newPartitions = partitionCounts.entrySet().stream()
+            .filter(x -> knownTargetTopics.contains(x.getKey()))
+            .collect(Collectors.toMap(x -> x.getKey(), x -> NewPartitions.increaseTo(x.getValue().intValue())));
+        targetAdminClient.createTopics(newTopics, new CreateTopicsOptions()).values().forEach((k, v) -> v.whenComplete((x, e) -> {
+            if (e != null) {
+                log.warn("Could not create topic {}.", k, e);
+            } else {
+                log.info("Created remote topic {} with {} partitions.", k, partitionCounts.get(k));
+            }
+        }));
+        targetAdminClient.createPartitions(newPartitions).values().forEach((k, v) -> v.whenComplete((x, e) -> {
+            if (e instanceof InvalidPartitionsException) {
+                // swallow, this is normal
+            } else if (e != null) {
+                log.warn("Could not create topic-partitions for {}.", k, e);
+            } else {
+                log.info("Increased size of {} to {} partitions.", k, partitionCounts.get(k));
+            }
+        }));
+    }
+
+    private Set<String> listTopics(AdminClient adminClient)
+            throws InterruptedException, ExecutionException {
+        return adminClient.listTopics().names().get();
+    }
+
+    private Collection<AclBinding> listTopicAclBindings()
+            throws InterruptedException, ExecutionException {
+        return sourceAdminClient.describeAcls(ANY_TOPIC_ACL).values().get();
+    }
+
+    private Collection<TopicDescription> describeTopics(Collection<String> topics)
+            throws InterruptedException, ExecutionException {
+        return sourceAdminClient.describeTopics(topics).all().get().values();
+    }
+
+    @SuppressWarnings("deprecation")
+    // use deprecated alterConfigs API for broker compatibility back to 0.11.0
+    private void updateTopicConfigs(Map<String, Config> topicConfigs)
+            throws InterruptedException, ExecutionException {
+        Map<ConfigResource, Config> configs = topicConfigs.entrySet().stream()
+            .collect(Collectors.toMap(x ->
+                new ConfigResource(ConfigResource.Type.TOPIC, x.getKey()), x -> x.getValue()));
+        log.trace("Syncing configs for {} topics.", configs.size());
+        targetAdminClient.alterConfigs(configs).values().forEach((k, v) -> v.whenComplete((x, e) -> {
+            if (e != null) {
+                log.warn("Could not alter configuration of topic {}.", k.name(), e);
+            }
+        }));
+    }
+
+    private void updateTopicAcls(List<AclBinding> bindings)
+            throws InterruptedException, ExecutionException {
+        log.trace("Syncing {} topic ACL bindings.", bindings.size());
+        targetAdminClient.createAcls(bindings).values().forEach((k, v) -> v.whenComplete((x, e) -> {
+            if (e != null) {
+                log.warn("Could not sync ACL of topic {}.", k.pattern().name(), e);
+            }
+        }));
+    }
+
+    private static Stream<TopicPartition> expandTopicDescription(TopicDescription description) {
+        String topic = description.name();
+        return description.partitions().stream()
+            .map(x -> new TopicPartition(topic, x.partition()));
+    }
+
+    private Map<String, Config> describeTopicConfigs(Set<String> topics)
+            throws InterruptedException, ExecutionException {
+        Set<ConfigResource> resources = topics.stream()
+            .map(x -> new ConfigResource(ConfigResource.Type.TOPIC, x))
+            .collect(Collectors.toSet());
+        return sourceAdminClient.describeConfigs(resources).all().get().entrySet().stream()
+            .collect(Collectors.toMap(x -> x.getKey().name(), x -> x.getValue()));
+    }
+
+    Config targetConfig(Config sourceConfig) {
+        List<ConfigEntry> entries = sourceConfig.entries().stream()
+            .filter(x -> !x.isDefault() && !x.isReadOnly() && !x.isSensitive())
+            .filter(x -> x.source() != ConfigEntry.ConfigSource.STATIC_BROKER_CONFIG)
+            .filter(x -> shouldReplicateTopicConfigurationProperty(x.name()))
+            .collect(Collectors.toList());
+        return new Config(entries);
+    }
+
+    private static AccessControlEntry downgradeAllowAllACL(AccessControlEntry entry) {
+        return new AccessControlEntry(entry.principal(), entry.host(), AclOperation.READ, entry.permissionType());
+    }
+
+    AclBinding targetAclBinding(AclBinding sourceAclBinding) {
+        String targetTopic = formatRemoteTopic(sourceAclBinding.pattern().name());
+        final AccessControlEntry entry;
+        if (sourceAclBinding.entry().permissionType() == AclPermissionType.ALLOW
+                && sourceAclBinding.entry().operation() == AclOperation.ALL) {
+            entry = downgradeAllowAllACL(sourceAclBinding.entry());
+        } else {
+            entry = sourceAclBinding.entry();
+        }
+        return new AclBinding(new ResourcePattern(ResourceType.TOPIC, targetTopic, PatternType.LITERAL), entry);
+    }
+
+    boolean shouldReplicateTopic(String topic) {
+        return (topicFilter.shouldReplicateTopic(topic) || isHeartbeatTopic(topic))
+            && !replicationPolicy.isInternalTopic(topic) && !isCycle(topic);
+    }
+
+    boolean shouldReplicateAcl(AclBinding aclBinding) {
+        return !(aclBinding.entry().permissionType() == AclPermissionType.ALLOW
+            && aclBinding.entry().operation() == AclOperation.WRITE);
+    }
+
+    boolean shouldReplicateTopicConfigurationProperty(String property) {
+        return configPropertyFilter.shouldReplicateConfigProperty(property);
+    }
+
+    // Recurse upstream to detect cycles, i.e. whether this topic is already on the target cluster
+    boolean isCycle(String topic) {
+        String source = replicationPolicy.topicSource(topic);
+        if (source == null) {
+            return false;
+        } else if (source.equals(sourceAndTarget.target())) {
+            return true;
+        } else {
+            return isCycle(replicationPolicy.upstreamTopic(topic));
+        }
+    }
+
+    // e.g. heartbeats, us-west.heartbeats
+    boolean isHeartbeatTopic(String topic) {
+        return MirrorClientConfig.HEARTBEATS_TOPIC.equals(replicationPolicy.originalTopic(topic));
+    }
+
+    String formatRemoteTopic(String topic) {
+        return replicationPolicy.formatRemoteTopic(sourceAndTarget.source(), topic);
+    }
+}
diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java
new file mode 100644
index 0000000..0b86476
--- /dev/null
+++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java
@@ -0,0 +1,293 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.mirror;
+
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.source.SourceTask;
+import org.apache.kafka.connect.source.SourceRecord;
+import org.apache.kafka.connect.header.Headers;
+import org.apache.kafka.connect.header.ConnectHeaders;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.utils.Utils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Set;
+import java.util.ArrayList;
+import java.util.stream.Collectors;
+import java.util.concurrent.Semaphore;
+import java.time.Duration;
+
+/** Replicates a set of topic-partitions. */
+public class MirrorSourceTask extends SourceTask {
+
+    private static final Logger log = LoggerFactory.getLogger(MirrorSourceTask.class);
+
+    private static final int MAX_OUTSTANDING_OFFSET_SYNCS = 10;
+
+    private KafkaConsumer<byte[], byte[]> consumer;
+    private KafkaProducer<byte[], byte[]> offsetProducer;
+    private String sourceClusterAlias;
+    private String offsetSyncsTopic;
+    private Duration pollTimeout;
+    private long maxOffsetLag;
+    private Map<TopicPartition, PartitionState> partitionStates;
+    private ReplicationPolicy replicationPolicy;
+    private MirrorMetrics metrics;
+    private boolean stopping = false;
+    private Semaphore outstandingOffsetSyncs;
+    private Semaphore consumerAccess;
+
+    public MirrorSourceTask() {}
+
+    // for testing
+    MirrorSourceTask(String sourceClusterAlias, ReplicationPolicy replicationPolicy, long maxOffsetLag) {
+        this.sourceClusterAlias = sourceClusterAlias;
+        this.replicationPolicy = replicationPolicy;
+        this.maxOffsetLag = maxOffsetLag;
+    }
+
+    @Override
+    public void start(Map<String, String> props) {
+        MirrorTaskConfig config = new MirrorTaskConfig(props);
+        outstandingOffsetSyncs = new Semaphore(MAX_OUTSTANDING_OFFSET_SYNCS);
+        consumerAccess = new Semaphore(1);  // let one thread at a time access the consumer
+        sourceClusterAlias = config.sourceClusterAlias();
+        metrics = config.metrics();
+        pollTimeout = config.consumerPollTimeout();
+        maxOffsetLag = config.maxOffsetLag();
+        replicationPolicy = config.replicationPolicy();
+        partitionStates = new HashMap<>();
+        offsetSyncsTopic = config.offsetSyncsTopic();
+        consumer = MirrorUtils.newConsumer(config.sourceConsumerConfig());
+        offsetProducer = MirrorUtils.newProducer(config.sourceProducerConfig());
+        Set<TopicPartition> taskTopicPartitions = config.taskTopicPartitions();
+        Map<TopicPartition, Long> topicPartitionOffsets = loadOffsets(taskTopicPartitions);
+        consumer.assign(topicPartitionOffsets.keySet());
+        log.info("Starting with {} previously uncommitted partitions.", topicPartitionOffsets.entrySet().stream()
+            .filter(x -> x.getValue() == 0L).count());
+        log.trace("Seeking offsets: {}", topicPartitionOffsets);
+        topicPartitionOffsets.forEach(consumer::seek);
+        log.info("{} replicating {} topic-partitions {}->{}: {}.", Thread.currentThread().getName(),
+            taskTopicPartitions.size(), sourceClusterAlias, config.targetClusterAlias(), taskTopicPartitions);
+    }
+
+    @Override
+    public void commit() {
+        // nop
+    }
+
+    @Override
+    public void stop() {
+        long start = System.currentTimeMillis();
+        stopping = true;
+        consumer.wakeup();
+        try {
+            consumerAccess.acquire();
+        } catch (InterruptedException e) {
+            log.warn("Interrupted waiting for access to consumer. Will try closing anyway."); 
+        }
+        Utils.closeQuietly(consumer, "source consumer");
+        Utils.closeQuietly(offsetProducer, "offset producer");
+        Utils.closeQuietly(metrics, "metrics");
+        log.info("Stopping {} took {} ms.", Thread.currentThread().getName(), System.currentTimeMillis() - start);
+    }
+   
+    @Override
+    public String version() {
+        return "1";
+    }
+
+    @Override
+    public List<SourceRecord> poll() {
+        if (!consumerAccess.tryAcquire()) {
+            return null;
+        }
+        if (stopping) {
+            return null;
+        }
+        try {
+            ConsumerRecords<byte[], byte[]> records = consumer.poll(pollTimeout);
+            List<SourceRecord> sourceRecords = new ArrayList<>(records.count());
+            for (ConsumerRecord<byte[], byte[]> record : records) {
+                SourceRecord converted = convertRecord(record);
+                sourceRecords.add(converted);
+                TopicPartition topicPartition = new TopicPartition(converted.topic(), converted.kafkaPartition());
+                metrics.recordAge(topicPartition, System.currentTimeMillis() - record.timestamp());
+                metrics.recordBytes(topicPartition, byteSize(record.value()));
+            }
+            if (sourceRecords.isEmpty()) {
+                // WorkerSourceTasks expects non-zero batch size
+                return null;
+            } else {
+                log.trace("Polled {} records from {}.", sourceRecords.size(), records.partitions());
+                return sourceRecords;
+            }
+        } catch (WakeupException e) {
+            return null;
+        } catch (KafkaException e) {
+            log.warn("Failure during poll.", e);
+            return null;
+        } catch (Throwable e)  {
+            log.error("Failure during poll.", e);
+            // allow Connect to deal with the exception
+            throw e;
+        } finally {
+            consumerAccess.release();
+        }
+    }
+ 
+    @Override
+    public void commitRecord(SourceRecord record, RecordMetadata metadata) {
+        try {
+            if (stopping) {
+                return;
+            }
+            if (!metadata.hasOffset()) {
+                log.error("RecordMetadata has no offset -- can't sync offsets for {}.", record.topic());
+                return;
+            }
+            TopicPartition topicPartition = new TopicPartition(record.topic(), record.kafkaPartition());
+            long latency = System.currentTimeMillis() - record.timestamp();
+            metrics.countRecord(topicPartition);
+            metrics.replicationLatency(topicPartition, latency);
+            TopicPartition sourceTopicPartition = MirrorUtils.unwrapPartition(record.sourcePartition());
+            long upstreamOffset = MirrorUtils.unwrapOffset(record.sourceOffset());
+            long downstreamOffset = metadata.offset();
+            maybeSyncOffsets(sourceTopicPartition, upstreamOffset, downstreamOffset);
+        } catch (Throwable e) {
+            log.warn("Failure committing record.", e);
+        }
+    }
+
+    // updates partition state and sends OffsetSync if necessary
+    private void maybeSyncOffsets(TopicPartition topicPartition, long upstreamOffset,
+            long downstreamOffset) {
+        PartitionState partitionState =
+            partitionStates.computeIfAbsent(topicPartition, x -> new PartitionState(maxOffsetLag));
+        if (partitionState.update(upstreamOffset, downstreamOffset)) {
+            sendOffsetSync(topicPartition, upstreamOffset, downstreamOffset);
+        }
+    }
+
+    // sends OffsetSync record upstream to internal offsets topic
+    private void sendOffsetSync(TopicPartition topicPartition, long upstreamOffset,
+            long downstreamOffset) {
+        if (!outstandingOffsetSyncs.tryAcquire()) {
+            // Too many outstanding offset syncs.
+            return;
+        }
+        OffsetSync offsetSync = new OffsetSync(topicPartition, upstreamOffset, downstreamOffset);
+        ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(offsetSyncsTopic, 0,
+                offsetSync.recordKey(), offsetSync.recordValue());
+        offsetProducer.send(record, (x, e) -> {
+            if (e != null) {
+                log.error("Failure sending offset sync.", e);
+            } else {
+                log.trace("Sync'd offsets for {}: {}=={}", topicPartition,
+                    upstreamOffset, downstreamOffset);
+            }
+            outstandingOffsetSyncs.release();
+        });
+    }
+ 
+    private Map<TopicPartition, Long> loadOffsets(Set<TopicPartition> topicPartitions) {
+        return topicPartitions.stream().collect(Collectors.toMap(x -> x, x -> loadOffset(x)));
+    }
+
+    private Long loadOffset(TopicPartition topicPartition) {
+        Map<String, Object> wrappedPartition = MirrorUtils.wrapPartition(topicPartition, sourceClusterAlias);
+        Map<String, Object> wrappedOffset = context.offsetStorageReader().offset(wrappedPartition);
+        return MirrorUtils.unwrapOffset(wrappedOffset) + 1;
+    }
+
+    // visible for testing 
+    SourceRecord convertRecord(ConsumerRecord<byte[], byte[]> record) {
+        String targetTopic = formatRemoteTopic(record.topic());
+        Headers headers = convertHeaders(record);
+        return new SourceRecord(
+                MirrorUtils.wrapPartition(new TopicPartition(record.topic(), record.partition()), sourceClusterAlias),
+                MirrorUtils.wrapOffset(record.offset()),
+                targetTopic, record.partition(),
+                Schema.OPTIONAL_BYTES_SCHEMA, record.key(),
+                Schema.BYTES_SCHEMA, record.value(),
+                record.timestamp(), headers);
+    }
+
+    private Headers convertHeaders(ConsumerRecord<byte[], byte[]> record) {
+        ConnectHeaders headers = new ConnectHeaders();
+        for (Header header : record.headers()) {
+            headers.addBytes(header.key(), header.value());
+        }
+        return headers;
+    }
+
+    private String formatRemoteTopic(String topic) {
+        return replicationPolicy.formatRemoteTopic(sourceClusterAlias, topic);
+    }
+
+    private static int byteSize(byte[] bytes) {
+        if (bytes == null) {
+            return 0;
+        } else {
+            return bytes.length;
+        }
+    }
+
+    static class PartitionState {
+        long previousUpstreamOffset = -1L;
+        long previousDownstreamOffset = -1L;
+        long lastSyncUpstreamOffset = -1L;
+        long lastSyncDownstreamOffset = -1L;
+        long maxOffsetLag;
+
+        PartitionState(long maxOffsetLag) {
+            this.maxOffsetLag = maxOffsetLag;
+        }
+
+        // true if we should emit an offset sync
+        boolean update(long upstreamOffset, long downstreamOffset) {
+            boolean shouldSyncOffsets = false;
+            long upstreamStep = upstreamOffset - lastSyncUpstreamOffset;
+            long downstreamTargetOffset = lastSyncDownstreamOffset + upstreamStep;
+            if (lastSyncDownstreamOffset == -1L
+                    || downstreamOffset - downstreamTargetOffset >= maxOffsetLag
+                    || upstreamOffset - previousUpstreamOffset != 1L
+                    || downstreamOffset < previousDownstreamOffset) {
+                lastSyncUpstreamOffset = upstreamOffset;
+                lastSyncDownstreamOffset = downstreamOffset;
+                shouldSyncOffsets = true;
+            }
+            previousUpstreamOffset = upstreamOffset;
+            previousDownstreamOffset = downstreamOffset;
+            return shouldSyncOffsets;
+        }
+    }
+}
diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorTaskConfig.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorTaskConfig.java
new file mode 100644
index 0000000..839d41e
--- /dev/null
+++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorTaskConfig.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.mirror;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.TopicPartition;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.List;
+import java.util.HashSet;
+import java.util.Collections;
+import java.util.stream.Collectors;
+
+public class MirrorTaskConfig extends MirrorConnectorConfig {
+
+    private static final String TASK_TOPIC_PARTITIONS_DOC = "Topic-partitions assigned to this task to replicate.";
+    private static final String TASK_CONSUMER_GROUPS_DOC = "Consumer groups assigned to this task to replicate.";
+
+    public MirrorTaskConfig(Map<String, String> props) {
+        super(TASK_CONFIG_DEF, props);
+    }
+
+    Set<TopicPartition> taskTopicPartitions() {
+        List<String> fields = getList(TASK_TOPIC_PARTITIONS);
+        if (fields == null || fields.isEmpty()) {
+            return Collections.emptySet();
+        }
+        return fields.stream()
+            .map(MirrorUtils::decodeTopicPartition)
+            .collect(Collectors.toSet());
+    }
+
+    Set<String> taskConsumerGroups() {
+        List<String> fields = getList(TASK_CONSUMER_GROUPS);
+        if (fields == null || fields.isEmpty()) {
+            return Collections.emptySet();
+        }
+        return new HashSet<>(fields);
+    } 
+
+    MirrorMetrics metrics() {
+        MirrorMetrics metrics = new MirrorMetrics(this);
+        metricsReporters().forEach(metrics::addReporter);
+        return metrics;
+    }
+ 
+    protected static final ConfigDef TASK_CONFIG_DEF = CONNECTOR_CONFIG_DEF
+        .define(
+            TASK_TOPIC_PARTITIONS,
+            ConfigDef.Type.LIST,
+            null,
+            ConfigDef.Importance.LOW,
+            TASK_TOPIC_PARTITIONS_DOC)
+        .define(
+            TASK_CONSUMER_GROUPS,
+            ConfigDef.Type.LIST,
+            null,
+            ConfigDef.Importance.LOW,
+            TASK_CONSUMER_GROUPS_DOC);
+}
diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorUtils.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorUtils.java
new file mode 100644
index 0000000..f15dda8
--- /dev/null
+++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorUtils.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.mirror;
+
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.connect.util.TopicAdmin;
+
+import java.util.Arrays;
+import java.util.Map;
+import java.util.List;
+import java.util.HashMap;
+import java.util.Collections;
+import java.util.regex.Pattern;
+
+/** Internal utility methods. */
+final class MirrorUtils {
+
+    // utility class
+    private MirrorUtils() {}
+
+    static KafkaProducer<byte[], byte[]> newProducer(Map<String, Object> props) {
+        return new KafkaProducer<>(props, new ByteArraySerializer(), new ByteArraySerializer());
+    }
+
+    static KafkaConsumer<byte[], byte[]> newConsumer(Map<String, Object> props) {
+        return new KafkaConsumer<>(props, new ByteArrayDeserializer(), new ByteArrayDeserializer());
+    }
+
+    static String encodeTopicPartition(TopicPartition topicPartition) {
+        return topicPartition.toString();
+    }
+
+    static Map<String, Object> wrapPartition(TopicPartition topicPartition, String sourceClusterAlias) {
+        Map<String, Object> wrapped = new HashMap<>();
+        wrapped.put("topic", topicPartition.topic());
+        wrapped.put("partition", topicPartition.partition());
+        wrapped.put("cluster", sourceClusterAlias);
+        return wrapped;
+    }
+
+    static Map<String, Object> wrapOffset(long offset) {
+        return Collections.singletonMap("offset", offset);
+    }
+
+    static TopicPartition unwrapPartition(Map<String, ?> wrapped) {
+        String topic = (String) wrapped.get("topic");
+        int partition = (Integer) wrapped.get("partition");
+        return new TopicPartition(topic, partition);
+    }
+
+    static Long unwrapOffset(Map<String, ?> wrapped) {
+        if (wrapped == null || wrapped.get("offset") == null) {
+            return -1L;
+        }
+        return (Long) wrapped.get("offset");
+    }
+
+    static TopicPartition decodeTopicPartition(String topicPartitionString) {
+        int sep = topicPartitionString.lastIndexOf('-');
+        String topic = topicPartitionString.substring(0, sep);
+        String partitionString = topicPartitionString.substring(sep + 1);
+        int partition = Integer.parseInt(partitionString);
+        return new TopicPartition(topic, partition);
+    }
+
+    // returns null if given empty list
+    static Pattern compilePatternList(List<String> fields) {
+        if (fields.isEmpty()) {
+            // The empty pattern matches _everything_, but a blank
+            // config property should match _nothing_.
+            return null;
+        } else {
+            String joined = String.join("|", fields);
+            return Pattern.compile(joined);
+        }
+    }
+
+    static Pattern compilePatternList(String fields) {
+        return compilePatternList(Arrays.asList(fields.split("\\W*,\\W*")));
+    }
+
+    static void createCompactedTopic(String topicName, short partitions, short replicationFactor, Map<String, Object> adminProps) {
+        NewTopic topicDescription = TopicAdmin.defineTopic(topicName).
+                compacted().
+                partitions(partitions).
+                replicationFactor(replicationFactor).
+                build();
+
+        try (TopicAdmin admin = new TopicAdmin(adminProps)) {
+            admin.createTopics(topicDescription);
+        }
+    }
+
+    static void createSinglePartitionCompactedTopic(String topicName, short replicationFactor, Map<String, Object> adminProps) {
+        createCompactedTopic(topicName, (short) 1, replicationFactor, adminProps);
+    }
+}
diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSync.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSync.java
new file mode 100644
index 0000000..abdc64c
--- /dev/null
+++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSync.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.mirror;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.protocol.types.Field;
+import org.apache.kafka.common.protocol.types.Schema;
+import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.protocol.types.Type;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+
+import java.nio.ByteBuffer;
+
+public class OffsetSync {
+    public static final String TOPIC_KEY = "topic";
+    public static final String PARTITION_KEY = "partition";
+    public static final String UPSTREAM_OFFSET_KEY = "upstreamOffset";
+    public static final String DOWNSTREAM_OFFSET_KEY = "offset";
+
+    public static final Schema VALUE_SCHEMA = new Schema(
+            new Field(UPSTREAM_OFFSET_KEY, Type.INT64),
+            new Field(DOWNSTREAM_OFFSET_KEY, Type.INT64));
+
+    public static final Schema KEY_SCHEMA = new Schema(
+            new Field(TOPIC_KEY, Type.STRING),
+            new Field(PARTITION_KEY, Type.INT32));
+
+    private TopicPartition topicPartition;
+    private long upstreamOffset;
+    private long downstreamOffset;
+
+    public OffsetSync(TopicPartition topicPartition, long upstreamOffset, long downstreamOffset) {
+        this.topicPartition = topicPartition;
+        this.upstreamOffset = upstreamOffset;
+        this.downstreamOffset = downstreamOffset;
+    }
+
+    public TopicPartition topicPartition() {
+        return topicPartition;
+    }
+
+    public long upstreamOffset() {
+        return upstreamOffset;
+    }
+
+    public long downstreamOffset() {
+        return downstreamOffset;
+    }
+
+    @Override
+    public String toString() {
+        return String.format("OffsetSync{topicPartition=%s, upstreamOffset=%d, downstreamOffset=%d}",
+            topicPartition, upstreamOffset, downstreamOffset);
+    }
+
+    ByteBuffer serializeValue() {
+        Struct struct = valueStruct();
+        ByteBuffer buffer = ByteBuffer.allocate(VALUE_SCHEMA.sizeOf(struct));
+        VALUE_SCHEMA.write(buffer, struct);
+        buffer.flip();
+        return buffer;
+    }
+
+    ByteBuffer serializeKey() {
+        Struct struct = keyStruct();
+        ByteBuffer buffer = ByteBuffer.allocate(KEY_SCHEMA.sizeOf(struct));
+        KEY_SCHEMA.write(buffer, struct);
+        buffer.flip();
+        return buffer;
+    }
+
+    static OffsetSync deserializeRecord(ConsumerRecord<byte[], byte[]> record) {
+        Struct keyStruct = KEY_SCHEMA.read(ByteBuffer.wrap(record.key()));
+        String topic = keyStruct.getString(TOPIC_KEY);
+        int partition = keyStruct.getInt(PARTITION_KEY);
+        
+        Struct valueStruct = VALUE_SCHEMA.read(ByteBuffer.wrap(record.value()));
+        long upstreamOffset = valueStruct.getLong(UPSTREAM_OFFSET_KEY);
+        long downstreamOffset = valueStruct.getLong(DOWNSTREAM_OFFSET_KEY);
+        
+        return new OffsetSync(new TopicPartition(topic, partition), upstreamOffset, downstreamOffset);
+    } 
+
+    private Struct valueStruct() {
+        Struct struct = new Struct(VALUE_SCHEMA);
+        struct.set(UPSTREAM_OFFSET_KEY, upstreamOffset);
+        struct.set(DOWNSTREAM_OFFSET_KEY, downstreamOffset);
+        return struct;
+    }
+
+    private Struct keyStruct() {
+        Struct struct = new Struct(KEY_SCHEMA);
+        struct.set(TOPIC_KEY, topicPartition.topic());
+        struct.set(PARTITION_KEY, topicPartition.partition());
+        return struct;
+    }
+
+    byte[] recordKey() {
+        return serializeKey().array();
+    }
+
+    byte[] recordValue() {
+        return serializeValue().array();
+    }
+};
+
diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java
new file mode 100644
index 0000000..fff1abd
--- /dev/null
+++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.mirror;
+
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.utils.Utils;
+
+import java.util.Map;
+import java.util.HashMap;
+import java.util.Collections;
+import java.time.Duration;
+
+/** Used internally by MirrorMaker. Stores offset syncs and performs offset translation. */
+class OffsetSyncStore implements AutoCloseable {
+    private KafkaConsumer<byte[], byte[]> consumer;
+    private Map<TopicPartition, OffsetSync> offsetSyncs = new HashMap<>();
+    private TopicPartition offsetSyncTopicPartition;
+
+    OffsetSyncStore(MirrorConnectorConfig config) {
+        consumer = new KafkaConsumer<>(config.sourceConsumerConfig(),
+            new ByteArrayDeserializer(), new ByteArrayDeserializer());
+        offsetSyncTopicPartition = new TopicPartition(config.offsetSyncsTopic(), 0);
+        consumer.assign(Collections.singleton(offsetSyncTopicPartition));
+    }
+
+    // for testing
+    OffsetSyncStore(KafkaConsumer<byte[], byte[]> consumer, TopicPartition offsetSyncTopicPartition) {
+        this.consumer = consumer;
+        this.offsetSyncTopicPartition = offsetSyncTopicPartition;
+    }
+
+    long translateDownstream(TopicPartition sourceTopicPartition, long upstreamOffset) {
+        OffsetSync offsetSync = latestOffsetSync(sourceTopicPartition);
+        if (offsetSync.upstreamOffset() > upstreamOffset) {
+            // Offset is too far in the past to translate accurately
+            return -1;
+        }
+        long upstreamStep = upstreamOffset - offsetSync.upstreamOffset();
+        return offsetSync.downstreamOffset() + upstreamStep;
+    }
+
+    // poll and handle records
+    synchronized void update(Duration pollTimeout) {
+        try {
+            consumer.poll(pollTimeout).forEach(this::handleRecord);
+        } catch (WakeupException e) {
+            // swallow
+        }
+    }
+
+    public synchronized void close() {
+        consumer.wakeup();
+        Utils.closeQuietly(consumer, "offset sync store consumer");
+    }
+
+    protected void handleRecord(ConsumerRecord<byte[], byte[]> record) {
+        OffsetSync offsetSync = OffsetSync.deserializeRecord(record);
+        TopicPartition sourceTopicPartition = offsetSync.topicPartition();
+        offsetSyncs.put(sourceTopicPartition, offsetSync);
+    }
+
+    private OffsetSync latestOffsetSync(TopicPartition topicPartition) {
+        return offsetSyncs.computeIfAbsent(topicPartition, x -> new OffsetSync(topicPartition,
+            -1, -1));
+    }
+}
diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/Scheduler.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/Scheduler.java
new file mode 100644
index 0000000..cac9a80
--- /dev/null
+++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/Scheduler.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.mirror;
+
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeoutException;
+import java.time.Duration;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class Scheduler implements AutoCloseable {
+    private static Logger log = LoggerFactory.getLogger(Scheduler.class);
+
+    private final String name;
+    private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
+    private final Duration timeout;
+    private boolean closed = false;
+
+    Scheduler(String name, Duration timeout) {
+        this.name = name;
+        this.timeout = timeout;
+    }
+
+    Scheduler(Class clazz, Duration timeout) {
+        this("Scheduler for " + clazz.getSimpleName(), timeout);
+    }
+
+    void scheduleRepeating(Task task, Duration interval, String description) {
+        if (interval.toMillis() < 0L) {
+            return;
+        }
+        executor.scheduleAtFixedRate(() -> executeThread(task, description), 0, interval.toMillis(), TimeUnit.MILLISECONDS);
+    }
+ 
+    void scheduleRepeatingDelayed(Task task, Duration interval, String description) {
+        if (interval.toMillis() < 0L) {
+            return;
+        }
+        executor.scheduleAtFixedRate(() -> executeThread(task, description), interval.toMillis(),
+            interval.toMillis(), TimeUnit.MILLISECONDS);
+    }
+
+    void execute(Task task, String description) {
+        try {
+            executor.submit(() -> executeThread(task, description)).get(timeout.toMillis(), TimeUnit.MILLISECONDS);
+        } catch (InterruptedException e) {
+            log.warn("{} was interrupted running task: {}", name, description);
+        } catch (TimeoutException e) {
+            log.error("{} timed out running task: {}", name, description);
+        } catch (Throwable e) {
+            log.error("{} caught exception in task: {}", name, description, e);
+        }
+    } 
+
+    public void close() {
+        closed = true;
+        executor.shutdown();
+        try {
+            boolean terminated = executor.awaitTermination(timeout.toMillis(), TimeUnit.MILLISECONDS);
+            if (!terminated) {
+                log.error("{} timed out during shutdown of internal scheduler.", name);
+            }
+        } catch (InterruptedException e) {
+            log.warn("{} was interrupted during shutdown of internal scheduler.", name);
+        }
+    }
+
+    interface Task {
+        void run() throws InterruptedException, ExecutionException;
+    }
+
+    private void run(Task task, String description) {
+        try {
+            long start = System.currentTimeMillis();
+            task.run();
+            long elapsed = System.currentTimeMillis() - start;
+            log.info("{} took {} ms", description, elapsed);
+            if (elapsed > timeout.toMillis()) {
+                log.warn("{} took too long ({} ms) running task: {}", name, elapsed, description);
+            }
+        } catch (InterruptedException e) {
+            log.warn("{} was interrupted running task: {}", name, description);
+        } catch (Throwable e) {
+            log.error("{} caught exception in scheduled task: {}", name, description, e);
+        }
+    }
+
+    private void executeThread(Task task, String description) {
+        Thread.currentThread().setName(description);
+        if (closed) {
+            log.info("{} skipping task due to shutdown: {}", name, description);
+            return;
+        }
+        run(task, description);
+    }
+}
+
diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/TopicFilter.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/TopicFilter.java
new file mode 100644
index 0000000..f13453f
--- /dev/null
+++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/TopicFilter.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.connect.mirror;
+
+import org.apache.kafka.common.Configurable;
+import org.apache.kafka.common.annotation.InterfaceStability;
+import java.util.Map;
+
+/** Defines which topics should be replicated. */
+@InterfaceStability.Evolving
+public interface TopicFilter extends Configurable, AutoCloseable {
+
+    boolean shouldReplicateTopic(String topic);
+
+    default void close() {
+        //nop
+    }
+
+    default void configure(Map<String, ?> props) {
+        //nop
+    }
+}
diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/CheckpointTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/CheckpointTest.java
new file mode 100644
index 0000000..1a3f210
--- /dev/null
+++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/CheckpointTest.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.mirror;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.TopicPartition;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class CheckpointTest {
+
+    @Test
+    public void testSerde() {
+        Checkpoint checkpoint = new Checkpoint("group-1", new TopicPartition("topic-2", 3), 4, 5, "metadata-6");
+        byte[] key = checkpoint.recordKey();
+        byte[] value = checkpoint.recordValue();
+        ConsumerRecord<byte[], byte[]> record = new ConsumerRecord<>("any-topic", 7, 8, key, value);
+        Checkpoint deserialized = Checkpoint.deserializeRecord(record);
+        assertEquals(checkpoint.consumerGroupId(), deserialized.consumerGroupId());
+        assertEquals(checkpoint.topicPartition(), deserialized.topicPartition());
+        assertEquals(checkpoint.upstreamOffset(), deserialized.upstreamOffset());
+        assertEquals(checkpoint.downstreamOffset(), deserialized.downstreamOffset());
+    }
+}
diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/HeartbeatTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/HeartbeatTest.java
new file mode 100644
index 0000000..adc6578
--- /dev/null
+++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/HeartbeatTest.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.mirror;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class HeartbeatTest {
+
+    @Test
+    public void testSerde() {
+        Heartbeat heartbeat = new Heartbeat("source-1", "target-2", 1234567890L);
+        byte[] key = heartbeat.recordKey();
+        byte[] value = heartbeat.recordValue();
+        ConsumerRecord<byte[], byte[]> record = new ConsumerRecord<>("any-topic", 6, 7, key, value);
+        Heartbeat deserialized = Heartbeat.deserializeRecord(record);
+        assertEquals(heartbeat.sourceClusterAlias(), deserialized.sourceClusterAlias());
+        assertEquals(heartbeat.targetClusterAlias(), deserialized.targetClusterAlias());
+        assertEquals(heartbeat.timestamp(), deserialized.timestamp());
+    }
+}
diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointTaskTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointTaskTest.java
new file mode 100644
index 0000000..a665744
--- /dev/null
+++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointTaskTest.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.mirror;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.connect.source.SourceRecord;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class MirrorCheckpointTaskTest {
+
+    @Test
+    public void testDownstreamTopicRenaming() {
+        MirrorCheckpointTask mirrorCheckpointTask = new MirrorCheckpointTask("source1", "target2",
+            new DefaultReplicationPolicy(), null);
+        assertEquals(new TopicPartition("source1.topic3", 4),
+            mirrorCheckpointTask.renameTopicPartition(new TopicPartition("topic3", 4)));
+        assertEquals(new TopicPartition("topic3", 5),
+            mirrorCheckpointTask.renameTopicPartition(new TopicPartition("target2.topic3", 5)));
+        assertEquals(new TopicPartition("source1.source6.topic7", 8),
+            mirrorCheckpointTask.renameTopicPartition(new TopicPartition("source6.topic7", 8)));
+    }
+
+    @Test
+    public void testCheckpoint() {
+        OffsetSyncStoreTest.FakeOffsetSyncStore offsetSyncStore = new OffsetSyncStoreTest.FakeOffsetSyncStore();
+        MirrorCheckpointTask mirrorCheckpointTask = new MirrorCheckpointTask("source1", "target2",
+            new DefaultReplicationPolicy(), offsetSyncStore);
+        offsetSyncStore.sync(new TopicPartition("topic1", 2), 3L, 4L);
+        offsetSyncStore.sync(new TopicPartition("target2.topic5", 6), 7L, 8L);
+        Checkpoint checkpoint1 = mirrorCheckpointTask.checkpoint("group9", new TopicPartition("topic1", 2),
+            new OffsetAndMetadata(10, null));
+        SourceRecord sourceRecord1 = mirrorCheckpointTask.checkpointRecord(checkpoint1, 123L);
+        assertEquals(new TopicPartition("source1.topic1", 2), checkpoint1.topicPartition());
+        assertEquals("group9", checkpoint1.consumerGroupId());
+        assertEquals("group9", Checkpoint.unwrapGroup(sourceRecord1.sourcePartition()));
+        assertEquals(10, checkpoint1.upstreamOffset());
+        assertEquals(11, checkpoint1.downstreamOffset());
+        assertEquals(123L, sourceRecord1.timestamp().longValue());
+        Checkpoint checkpoint2 = mirrorCheckpointTask.checkpoint("group11", new TopicPartition("target2.topic5", 6),
+            new OffsetAndMetadata(12, null));
+        SourceRecord sourceRecord2 = mirrorCheckpointTask.checkpointRecord(checkpoint2, 234L);
+        assertEquals(new TopicPartition("topic5", 6), checkpoint2.topicPartition());
+        assertEquals("group11", checkpoint2.consumerGroupId());
+        assertEquals("group11", Checkpoint.unwrapGroup(sourceRecord2.sourcePartition()));
+        assertEquals(12, checkpoint2.upstreamOffset());
+        assertEquals(13, checkpoint2.downstreamOffset());
+        assertEquals(234L, sourceRecord2.timestamp().longValue());
+    }
+}
diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorConfigTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorConfigTest.java
new file mode 100644
index 0000000..6003202
--- /dev/null
+++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorConfigTest.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.mirror;
+
+import org.apache.kafka.common.TopicPartition;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.HashSet;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertEquals;
+
+public class MirrorConnectorConfigTest {
+
+    private Map<String, String> makeProps(String... keyValues) {
+        Map<String, String> props = new HashMap<>();
+        props.put("name", "ConnectorName");
+        props.put("connector.class", "ConnectorClass");
+        props.put("source.cluster.alias", "source1");
+        props.put("target.cluster.alias", "target2");
+        for (int i = 0; i < keyValues.length; i += 2) {
+            props.put(keyValues[i], keyValues[i + 1]);
+        }
+        return props;
+    }
+
+    @Test
+    public void testTaskConfigTopicPartitions() {
+        List<TopicPartition> topicPartitions = Arrays.asList(new TopicPartition("topic-1", 2),
+            new TopicPartition("topic-3", 4), new TopicPartition("topic-5", 6));
+        MirrorConnectorConfig config = new MirrorConnectorConfig(makeProps());
+        Map<String, String> props = config.taskConfigForTopicPartitions(topicPartitions);
+        MirrorTaskConfig taskConfig = new MirrorTaskConfig(props);
+        assertEquals(taskConfig.taskTopicPartitions(), new HashSet<>(topicPartitions));
+    }
+
+    @Test
+    public void testTaskConfigConsumerGroups() {
+        List<String> groups = Arrays.asList("consumer-1", "consumer-2", "consumer-3");
+        MirrorConnectorConfig config = new MirrorConnectorConfig(makeProps());
+        Map<String, String> props = config.taskConfigForConsumerGroups(groups);
+        MirrorTaskConfig taskConfig = new MirrorTaskConfig(props);
+        assertEquals(taskConfig.taskConsumerGroups(), new HashSet<>(groups));
+    }
+
+    @Test
+    public void testTopicMatching() {
+        MirrorConnectorConfig config = new MirrorConnectorConfig(makeProps("topics", "topic1"));
+        assertTrue(config.topicFilter().shouldReplicateTopic("topic1"));
+        assertFalse(config.topicFilter().shouldReplicateTopic("topic2"));
+    }
+
+    @Test
+    public void testGroupMatching() {
+        MirrorConnectorConfig config = new MirrorConnectorConfig(makeProps("groups", "group1"));
+        assertTrue(config.groupFilter().shouldReplicateGroup("group1"));
+        assertFalse(config.groupFilter().shouldReplicateGroup("group2"));
+    }
+
+    @Test
+    public void testConfigPropertyMatching() {
+        MirrorConnectorConfig config = new MirrorConnectorConfig(
+            makeProps("config.properties.blacklist", "prop2"));
+        assertTrue(config.configPropertyFilter().shouldReplicateConfigProperty("prop1"));
+        assertFalse(config.configPropertyFilter().shouldReplicateConfigProperty("prop2"));
+    }
+
+    @Test
+    public void testNoTopics() {
+        MirrorConnectorConfig config = new MirrorConnectorConfig(makeProps("topics", ""));
+        assertFalse(config.topicFilter().shouldReplicateTopic("topic1"));
+        assertFalse(config.topicFilter().shouldReplicateTopic("topic2"));
+        assertFalse(config.topicFilter().shouldReplicateTopic(""));
+    }
+
+    @Test
+    public void testAllTopics() {
+        MirrorConnectorConfig config = new MirrorConnectorConfig(makeProps("topics", ".*"));
+        assertTrue(config.topicFilter().shouldReplicateTopic("topic1"));
+        assertTrue(config.topicFilter().shouldReplicateTopic("topic2"));
+    }
+
+    @Test
+    public void testListOfTopics() {
+        MirrorConnectorConfig config = new MirrorConnectorConfig(makeProps("topics", "topic1, topic2"));
+        assertTrue(config.topicFilter().shouldReplicateTopic("topic1"));
+        assertTrue(config.topicFilter().shouldReplicateTopic("topic2"));
+        assertFalse(config.topicFilter().shouldReplicateTopic("topic3"));
+    }
+}
diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorsIntegrationTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorsIntegrationTest.java
new file mode 100644
index 0000000..11abc14
--- /dev/null
+++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorsIntegrationTest.java
@@ -0,0 +1,302 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.mirror;
+
+import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
+import org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.common.TopicPartition;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Collections;
+import java.util.Properties;
+import java.util.concurrent.TimeoutException;
+import java.time.Duration;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.apache.kafka.test.TestUtils.waitForCondition;
+
+/**
+ * Tests MM2 replication and failover/failback logic.
+ *
+ * MM2 is configured with active/active replication between two Kafka clusters. Tests validate that
+ * records sent to either cluster arrive at the other cluster. Then, a consumer group is migrated from
+ * one cluster to the other and back. Tests validate that consumer offsets are translated and replicated
+ * between clusters during this failover and failback.
+ */
+@Category(IntegrationTest.class)
+public class MirrorConnectorsIntegrationTest {
+
+    private static final Logger log = LoggerFactory.getLogger(MirrorConnectorsIntegrationTest.class);
+
+    private static final int NUM_RECORDS_PRODUCED = 100;  // to save trees
+    private static final int NUM_PARTITIONS = 10;
+    private static final int RECORD_TRANSFER_DURATION_MS = 10_000;
+    private static final int CHECKPOINT_DURATION_MS = 20_000;
+
+    private MirrorMakerConfig mm2Config; 
+    private EmbeddedConnectCluster primary;
+    private EmbeddedConnectCluster backup;
+
+    @Before
+    public void setup() throws IOException {
+        Properties brokerProps = new Properties();
+        brokerProps.put("auto.create.topics.enable", "false");
+
+        Map<String, String> mm2Props = new HashMap<>();
+        mm2Props.put("clusters", "primary, backup");
+        mm2Props.put("max.tasks", "10");
+        mm2Props.put("topics", "test-topic-.*, primary.test-topic-.*, backup.test-topic-.*");
+        mm2Props.put("groups", "consumer-group-.*");
+        mm2Props.put("primary->backup.enabled", "true");
+        mm2Props.put("backup->primary.enabled", "true");
+        mm2Props.put("sync.topic.acls.enabled", "false");
+        mm2Props.put("emit.checkpoints.interval.seconds", "1");
+        mm2Props.put("emit.heartbeats.interval.seconds", "1");
+        mm2Props.put("refresh.topics.interval.seconds", "1");
+        mm2Props.put("refresh.groups.interval.seconds", "1");
+        mm2Props.put("checkpoints.topic.replication.factor", "1");
+        mm2Props.put("heartbeats.topic.replication.factor", "1");
+        mm2Props.put("offset-syncs.topic.replication.factor", "1");
+        mm2Props.put("config.storage.topic.replication.factor", "1");
+        mm2Props.put("offset.stoage.topic.replication.factor", "1");
+        mm2Props.put("status.stoage.topic.replication.factor", "1");
+        mm2Props.put("replication.factor", "1");
+        
+        mm2Config = new MirrorMakerConfig(mm2Props); 
+        Map<String, String> primaryWorkerProps = mm2Config.workerConfig(new SourceAndTarget("backup", "primary"));
+        Map<String, String> backupWorkerProps = mm2Config.workerConfig(new SourceAndTarget("primary", "backup"));
+
+        primary = new EmbeddedConnectCluster.Builder()
+                .name("primary-connect-cluster")
+                .numWorkers(3)
+                .numBrokers(1)
+                .brokerProps(brokerProps)
+                .workerProps(primaryWorkerProps)
+                .build();
+
+        backup = new EmbeddedConnectCluster.Builder()
+                .name("backup-connect-cluster")
+                .numWorkers(3)
+                .numBrokers(1)
+                .brokerProps(brokerProps)
+                .workerProps(backupWorkerProps)
+                .build();
+
+        primary.start();
+        backup.start();
+
+        // create these topics before starting the connectors so we don't need to wait for discovery
+        primary.kafka().createTopic("test-topic-1", NUM_PARTITIONS);
+        primary.kafka().createTopic("backup.test-topic-1", 1);
+        primary.kafka().createTopic("heartbeats", 1);
+        backup.kafka().createTopic("test-topic-1", NUM_PARTITIONS);
+        backup.kafka().createTopic("primary.test-topic-1", 1);
+        backup.kafka().createTopic("heartbeats", 1);
+
+        for (int i = 0; i < NUM_RECORDS_PRODUCED; i++) {
+            primary.kafka().produce("test-topic-1", i % NUM_PARTITIONS, "key", "message-1-" + i);
+            backup.kafka().produce("test-topic-1", i % NUM_PARTITIONS, "key", "message-2-" + i);
+        }
+
+        // create consumers before starting the connectors so we don't need to wait for discovery
+        Consumer<byte[], byte[]> consumer1 = primary.kafka().createConsumerAndSubscribeTo(Collections.singletonMap(
+            "group.id", "consumer-group-1"), "test-topic-1", "backup.test-topic-1");
+        consumer1.poll(Duration.ofMillis(500));
+        consumer1.commitSync();
+        consumer1.close();
+
+        Consumer<byte[], byte[]> consumer2 = backup.kafka().createConsumerAndSubscribeTo(Collections.singletonMap(
+            "group.id", "consumer-group-1"), "test-topic-1", "primary.test-topic-1");
+        consumer2.poll(Duration.ofMillis(500));
+        consumer2.commitSync();
+        consumer2.close();
+
+        log.info("primary REST service: {}", primary.endpointForResource("connectors"));
+        log.info("backup REST service: {}", backup.endpointForResource("connectors"));
+ 
+        log.info("primary brokers: {}", primary.kafka().bootstrapServers());
+        log.info("backup brokers: {}", backup.kafka().bootstrapServers());
+        
+        // now that the brokers are running, we can finish setting up the Connectors
+        mm2Props.put("primary.bootstrap.servers", primary.kafka().bootstrapServers());
+        mm2Props.put("backup.bootstrap.servers", backup.kafka().bootstrapServers());
+        mm2Config = new MirrorMakerConfig(mm2Props);
+
+        backup.configureConnector("MirrorSourceConnector", mm2Config.connectorBaseConfig(new SourceAndTarget("primary", "backup"),
+            MirrorSourceConnector.class));
+
+        backup.configureConnector("MirrorCheckpointConnector", mm2Config.connectorBaseConfig(new SourceAndTarget("primary", "backup"),
+            MirrorCheckpointConnector.class));
+
+        backup.configureConnector("MirrorHeartbeatConnector", mm2Config.connectorBaseConfig(new SourceAndTarget("primary", "backup"),
+            MirrorHeartbeatConnector.class));
+
+        primary.configureConnector("MirrorSourceConnector", mm2Config.connectorBaseConfig(new SourceAndTarget("backup", "primary"),
+            MirrorSourceConnector.class));
+
+        primary.configureConnector("MirrorCheckpointConnector", mm2Config.connectorBaseConfig(new SourceAndTarget("backup", "primary"),
+            MirrorCheckpointConnector.class));
+
+        primary.configureConnector("MirrorHeartbeatConnector", mm2Config.connectorBaseConfig(new SourceAndTarget("backup", "primary"),
+            MirrorHeartbeatConnector.class));
+    }
+
+    @After
+    public void close() throws IOException {
+        for (String x : primary.connectors()) {
+            primary.deleteConnector(x);
+        }
+        for (String x : backup.connectors()) {
+            backup.deleteConnector(x);
+        }
+        deleteAllTopics(primary.kafka());
+        deleteAllTopics(backup.kafka());
+        primary.stop();
+        backup.stop();
+    }
+
+    @Test
+    public void testReplication() throws InterruptedException, TimeoutException {
+        MirrorClient primaryClient = new MirrorClient(mm2Config.clientConfig("primary"));
+        MirrorClient backupClient = new MirrorClient(mm2Config.clientConfig("backup"));
+
+        assertEquals("Records were not produced to primary cluster.", NUM_RECORDS_PRODUCED,
+            primary.kafka().consume(NUM_RECORDS_PRODUCED, RECORD_TRANSFER_DURATION_MS, "test-topic-1").count());
+        assertEquals("Records were not replicated to backup cluster.", NUM_RECORDS_PRODUCED,
+            backup.kafka().consume(NUM_RECORDS_PRODUCED, RECORD_TRANSFER_DURATION_MS, "primary.test-topic-1").count());
+        assertEquals("Records were not produced to backup cluster.", NUM_RECORDS_PRODUCED,
+            backup.kafka().consume(NUM_RECORDS_PRODUCED, RECORD_TRANSFER_DURATION_MS, "test-topic-1").count());
+        assertEquals("Records were not replicated to primary cluster.", NUM_RECORDS_PRODUCED,
+            primary.kafka().consume(NUM_RECORDS_PRODUCED, RECORD_TRANSFER_DURATION_MS, "backup.test-topic-1").count());
+        assertEquals("Primary cluster doesn't have all records from both clusters.", NUM_RECORDS_PRODUCED * 2,
+            primary.kafka().consume(NUM_RECORDS_PRODUCED * 2, RECORD_TRANSFER_DURATION_MS, "backup.test-topic-1", "test-topic-1").count());
+        assertEquals("Backup cluster doesn't have all records from both clusters.", NUM_RECORDS_PRODUCED * 2,
+            backup.kafka().consume(NUM_RECORDS_PRODUCED * 2, RECORD_TRANSFER_DURATION_MS, "primary.test-topic-1", "test-topic-1").count());
+        assertTrue("Heartbeats were not emitted to primary cluster.", primary.kafka().consume(1,
+            RECORD_TRANSFER_DURATION_MS, "heartbeats").count() > 0);
+        assertTrue("Heartbeats were not emitted to backup cluster.", backup.kafka().consume(1,
+            RECORD_TRANSFER_DURATION_MS, "heartbeats").count() > 0);
+        assertTrue("Heartbeats were not replicated downstream to backup cluster.", backup.kafka().consume(1,
+            RECORD_TRANSFER_DURATION_MS, "primary.heartbeats").count() > 0);
+        assertTrue("Heartbeats were not replicated downstream to primary cluster.", primary.kafka().consume(1,
+            RECORD_TRANSFER_DURATION_MS, "backup.heartbeats").count() > 0);
+        assertTrue("Did not find upstream primary cluster.", backupClient.upstreamClusters().contains("primary"));
+        assertEquals("Did not calculate replication hops correctly.", 1, backupClient.replicationHops("primary"));
+        assertTrue("Did not find upstream backup cluster.", primaryClient.upstreamClusters().contains("backup"));
+        assertEquals("Did not calculate replication hops correctly.", 1, primaryClient.replicationHops("backup"));
+        assertTrue("Checkpoints were not emitted downstream to backup cluster.", backup.kafka().consume(1,
+            CHECKPOINT_DURATION_MS, "primary.checkpoints.internal").count() > 0);
+
+        Map<TopicPartition, OffsetAndMetadata> backupOffsets = backupClient.remoteConsumerOffsets("consumer-group-1", "primary",
+            Duration.ofMillis(CHECKPOINT_DURATION_MS));
+
+        assertTrue("Offsets not translated downstream to backup cluster. Found: " + backupOffsets, backupOffsets.containsKey(
+            new TopicPartition("primary.test-topic-1", 0)));
+
+        // Failover consumer group to backup cluster.
+        Consumer<byte[], byte[]> consumer1 = backup.kafka().createConsumer(Collections.singletonMap("group.id", "consumer-group-1"));
+        consumer1.assign(backupOffsets.keySet());
+        backupOffsets.forEach(consumer1::seek);
+        consumer1.poll(Duration.ofMillis(500));
+        consumer1.commitSync();
+
+        assertTrue("Consumer failedover to zero offset.", consumer1.position(new TopicPartition("primary.test-topic-1", 0)) > 0);
+        assertTrue("Consumer failedover beyond expected offset.", consumer1.position(
+            new TopicPartition("primary.test-topic-1", 0)) <= NUM_RECORDS_PRODUCED);
+        assertTrue("Checkpoints were not emitted upstream to primary cluster.", primary.kafka().consume(1,
+            CHECKPOINT_DURATION_MS, "backup.checkpoints.internal").count() > 0);
+
+        consumer1.close();
+
+        waitForCondition(() -> {
+            try {
+                return primaryClient.remoteConsumerOffsets("consumer-group-1", "backup",
+                    Duration.ofMillis(CHECKPOINT_DURATION_MS)).containsKey(new TopicPartition("backup.test-topic-1", 0));
+            } catch (Throwable e) {
+                return false;
+            }
+        }, CHECKPOINT_DURATION_MS, "Offsets not translated downstream to primary cluster.");
+
+        waitForCondition(() -> {
+            try {
+                return primaryClient.remoteConsumerOffsets("consumer-group-1", "backup",
+                    Duration.ofMillis(CHECKPOINT_DURATION_MS)).containsKey(new TopicPartition("test-topic-1", 0));
+            } catch (Throwable e) {
+                return false;
+            }
+        }, CHECKPOINT_DURATION_MS, "Offsets not translated upstream to primary cluster.");
+
+        Map<TopicPartition, OffsetAndMetadata> primaryOffsets = primaryClient.remoteConsumerOffsets("consumer-group-1", "backup",
+                Duration.ofMillis(CHECKPOINT_DURATION_MS));
+ 
+        // Failback consumer group to primary cluster
+        Consumer<byte[], byte[]> consumer2 = primary.kafka().createConsumer(Collections.singletonMap("group.id", "consumer-group-1"));
+        consumer2.assign(primaryOffsets.keySet());
+        primaryOffsets.forEach(consumer2::seek);
+        consumer2.poll(Duration.ofMillis(500));
+
+        assertTrue("Consumer failedback to zero upstream offset.", consumer2.position(new TopicPartition("test-topic-1", 0)) > 0);
+        assertTrue("Consumer failedback to zero downstream offset.", consumer2.position(new TopicPartition("backup.test-topic-1", 0)) > 0);
+        assertTrue("Consumer failedback beyond expected upstream offset.", consumer2.position(
+            new TopicPartition("test-topic-1", 0)) <= NUM_RECORDS_PRODUCED);
+        assertTrue("Consumer failedback beyond expected downstream offset.", consumer2.position(
+            new TopicPartition("backup.test-topic-1", 0)) <= NUM_RECORDS_PRODUCED);
+        
+        consumer2.close();
+      
+        // create more matching topics
+        primary.kafka().createTopic("test-topic-2", NUM_PARTITIONS);
+        backup.kafka().createTopic("test-topic-3", NUM_PARTITIONS);
+
+        for (int i = 0; i < NUM_RECORDS_PRODUCED; i++) {
+            primary.kafka().produce("test-topic-2", 0, "key", "message-2-" + i);
+            backup.kafka().produce("test-topic-3", 0, "key", "message-3-" + i);
+        }
+
+        assertEquals("Records were not produced to primary cluster.", NUM_RECORDS_PRODUCED,
+            primary.kafka().consume(NUM_RECORDS_PRODUCED, RECORD_TRANSFER_DURATION_MS, "test-topic-2").count());
+        assertEquals("Records were not produced to backup cluster.", NUM_RECORDS_PRODUCED,
+            backup.kafka().consume(NUM_RECORDS_PRODUCED, RECORD_TRANSFER_DURATION_MS, "test-topic-3").count());
+ 
+        assertEquals("New topic was not replicated to primary cluster.", NUM_RECORDS_PRODUCED,
+            primary.kafka().consume(NUM_RECORDS_PRODUCED, 2 * RECORD_TRANSFER_DURATION_MS, "backup.test-topic-3").count());
+        assertEquals("New topic was not replicated to backup cluster.", NUM_RECORDS_PRODUCED,
+            backup.kafka().consume(NUM_RECORDS_PRODUCED, 2 * RECORD_TRANSFER_DURATION_MS, "primary.test-topic-2").count());
+    }
+
+    private void deleteAllTopics(EmbeddedKafkaCluster cluster) {
+        Admin client = cluster.createAdminClient();
+        try {
+            client.deleteTopics(client.listTopics().names().get());
+        } catch (Throwable e) {
+        }
+    }
+}
diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorMakerConfigTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorMakerConfigTest.java
new file mode 100644
index 0000000..b618e37
--- /dev/null
+++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorMakerConfigTest.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.mirror;
+
+import org.apache.kafka.common.config.types.Password;
+import org.apache.kafka.common.config.provider.ConfigProvider;
+import org.apache.kafka.common.config.ConfigData;
+import org.apache.kafka.common.metrics.FakeMetricsReporter;
+
+import org.junit.Test;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Arrays;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
+public class MirrorMakerConfigTest {
+
+    private Map<String, String> makeProps(String... keyValues) {
+        Map<String, String> props = new HashMap<>();
+        for (int i = 0; i < keyValues.length; i += 2) {
+            props.put(keyValues[i], keyValues[i + 1]);
+        }
+        return props;
+    }
+
+    @Test
+    public void testClusterConfigProperties() {
+        MirrorMakerConfig mirrorConfig = new MirrorMakerConfig(makeProps(
+            "clusters", "a, b",
+            "a.bootstrap.servers", "servers-one",
+            "b.bootstrap.servers", "servers-two",
+            "security.protocol", "SASL",
+            "replication.factor", "4"));
+        Map<String, String> connectorProps = mirrorConfig.connectorBaseConfig(new SourceAndTarget("a", "b"),
+            MirrorSourceConnector.class);
+        assertEquals("source.cluster.bootstrap.servers is set",
+            "servers-one", connectorProps.get("source.cluster.bootstrap.servers"));
+        assertEquals("target.cluster.bootstrap.servers is set",
+            "servers-two", connectorProps.get("target.cluster.bootstrap.servers"));
+        assertEquals("top-level security.protocol is passed through to connector config",
+            "SASL", connectorProps.get("security.protocol"));
+    }
+
+    @Test
+    public void testReplicationConfigProperties() {
+        MirrorMakerConfig mirrorConfig = new MirrorMakerConfig(makeProps(
+            "clusters", "a, b",
+            "a->b.tasks.max", "123"));
+        Map<String, String> connectorProps = mirrorConfig.connectorBaseConfig(new SourceAndTarget("a", "b"),
+            MirrorSourceConnector.class);
+        assertEquals("connector props should include tasks.max",
+            "123", connectorProps.get("tasks.max"));
+    }
+
+    @Test
+    public void testClientConfigProperties() {
+        MirrorMakerConfig mirrorConfig = new MirrorMakerConfig(makeProps(
+            "clusters", "a, b",
+            "config.providers", "fake",
+            "config.providers.fake.class", FakeConfigProvider.class.getName(),
+            "replication.policy.separator", "__",
+            "ssl.truststore.password", "secret1",
+            "ssl.key.password", "${fake:secret:password}",  // resolves to "secret2"
+            "security.protocol", "SSL", 
+            "a.security.protocol", "PLAINTEXT", 
+            "a.producer.security.protocol", "SASL", 
+            "a.bootstrap.servers", "one:9092, two:9092",
+            "metrics.reporter", FakeMetricsReporter.class.getName(),
+            "a.metrics.reporter", FakeMetricsReporter.class.getName(),
+            "b->a.metrics.reporter", FakeMetricsReporter.class.getName(),
+            "a.xxx", "yyy",
+            "xxx", "zzz"));
+        MirrorClientConfig aClientConfig = mirrorConfig.clientConfig("a");
+        MirrorClientConfig bClientConfig = mirrorConfig.clientConfig("b");
+        assertEquals("replication.policy.separator is picked up in MirrorClientConfig",
+            "__", aClientConfig.getString("replication.policy.separator"));
+        assertEquals("replication.policy.separator is honored",
+            "b__topic1", aClientConfig.replicationPolicy().formatRemoteTopic("b", "topic1"));
+        assertEquals("client configs include boostrap.servers",
+            "one:9092, two:9092", aClientConfig.adminConfig().get("bootstrap.servers"));
+        assertEquals("client configs include security.protocol",
+            "PLAINTEXT", aClientConfig.adminConfig().get("security.protocol"));
+        assertEquals("producer configs include security.protocol",
+            "SASL", aClientConfig.producerConfig().get("security.protocol"));
+        assertFalse("unknown properties aren't included in client configs",
+            aClientConfig.adminConfig().containsKey("xxx"));
+        assertFalse("top-leve metrics reporters aren't included in client configs",
+            aClientConfig.adminConfig().containsKey("metric.reporters"));
+        assertEquals("security properties are picked up in MirrorClientConfig",
+            "secret1", aClientConfig.getPassword("ssl.truststore.password").value());
+        assertEquals("client configs include top-level security properties",
+            "secret1", ((Password) aClientConfig.adminConfig().get("ssl.truststore.password")).value());
+        assertEquals("security properties are translated from external sources",
+            "secret2", aClientConfig.getPassword("ssl.key.password").value());
+        assertEquals("client configs are translated from external sources",
+            "secret2", ((Password) aClientConfig.adminConfig().get("ssl.key.password")).value());
+        assertFalse("client configs should not include metrics reporter",
+            aClientConfig.producerConfig().containsKey("metrics.reporter"));
+        assertFalse("client configs should not include metrics reporter",
+            bClientConfig.adminConfig().containsKey("metrics.reporter"));
+    }
+
+    @Test
+    public void testIncludesConnectorConfigProperties() {
+        MirrorMakerConfig mirrorConfig = new MirrorMakerConfig(makeProps(
+            "clusters", "a, b",
+            "tasks.max", "100",
+            "topics", "topic-1",
+            "groups", "group-2",
+            "replication.policy.separator", "__",
+            "config.properties.blacklist", "property-3",
+            "metric.reporters", "FakeMetricsReporter",
+            "topic.filter.class", DefaultTopicFilter.class.getName(),
+            "xxx", "yyy"));
+        SourceAndTarget sourceAndTarget = new SourceAndTarget("source", "target");
+        Map<String, String> connectorProps = mirrorConfig.connectorBaseConfig(sourceAndTarget,
+            MirrorSourceConnector.class);
+        MirrorConnectorConfig connectorConfig = new MirrorConnectorConfig(connectorProps);
+        assertEquals("Connector properties like tasks.max should be passed through to underlying Connectors.",
+            100, (int) connectorConfig.getInt("tasks.max"));
+        assertEquals("Topics whitelist should be passed through to underlying Connectors.",
+            Arrays.asList("topic-1"), connectorConfig.getList("topics"));
+        assertEquals("Groups whitelist should be passed through to underlying Connectors.",
+            Arrays.asList("group-2"), connectorConfig.getList("groups"));
+        assertEquals("Config properties blacklist should be passed through to underlying Connectors.",
+            Arrays.asList("property-3"), connectorConfig.getList("config.properties.blacklist"));
+        assertEquals("Metrics reporters should be passed through to underlying Connectors.",
+            Arrays.asList("FakeMetricsReporter"), connectorConfig.getList("metric.reporters"));
+        assertEquals("Filters should be passed through to underlying Connectors.",
+            "DefaultTopicFilter", connectorConfig.getClass("topic.filter.class").getSimpleName());
+        assertEquals("replication policy separator should be passed through to underlying Connectors.",
+            "__", connectorConfig.getString("replication.policy.separator"));
+        assertFalse("Unknown properties should not be passed through to Connectors.",
+            connectorConfig.originals().containsKey("xxx"));
+    }
+
+    @Test
+    public void testIncludesTopicFilterProperties() {
+        MirrorMakerConfig mirrorConfig = new MirrorMakerConfig(makeProps(
+            "clusters", "a, b",
+            "source->target.topics", "topic1, topic2",
+            "source->target.topics.blacklist", "topic3"));
+        SourceAndTarget sourceAndTarget = new SourceAndTarget("source", "target");
+        Map<String, String> connectorProps = mirrorConfig.connectorBaseConfig(sourceAndTarget,
+            MirrorSourceConnector.class);
+        DefaultTopicFilter.TopicFilterConfig filterConfig = 
+            new DefaultTopicFilter.TopicFilterConfig(connectorProps);
+        assertEquals("source->target.topics should be passed through to TopicFilters.",
+            Arrays.asList("topic1", "topic2"), filterConfig.getList("topics"));
+        assertEquals("source->target.topics.blacklist should be passed through to TopicFilters.",
+            Arrays.asList("topic3"), filterConfig.getList("topics.blacklist"));
+    }
+
+    @Test
+    public void testWorkerConfigs() {
+        MirrorMakerConfig mirrorConfig = new MirrorMakerConfig(makeProps(
+            "clusters", "a, b",
+            "config.providers", "fake",
+            "config.providers.fake.class", FakeConfigProvider.class.getName(),
+            "replication.policy.separator", "__",
+            "offset.storage.replication.factor", "123",
+            "b.status.storage.replication.factor", "456",
+            "b.producer.client.id", "client-one",
+            "b.security.protocol", "PLAINTEXT",
+            "b.producer.security.protocol", "SASL",
+            "ssl.truststore.password", "secret1",
+            "ssl.key.password", "${fake:secret:password}",  // resolves to "secret2"
+            "b.xxx", "yyy"));
+        SourceAndTarget a = new SourceAndTarget("b", "a");
+        SourceAndTarget b = new SourceAndTarget("a", "b");
+        Map<String, String> aProps = mirrorConfig.workerConfig(a);
+        assertEquals("123", aProps.get("offset.storage.replication.factor"));
+        Map<String, String> bProps = mirrorConfig.workerConfig(b);
+        assertEquals("456", bProps.get("status.storage.replication.factor"));
+        assertEquals("producer props should be passed through to worker producer config: " + bProps,
+            "client-one", bProps.get("producer.client.id"));
+        assertEquals("replication-level security props should be passed through to worker producer config",
+            "SASL", bProps.get("producer.security.protocol"));
+        assertEquals("replication-level security props should be passed through to worker producer config",
+            "SASL", bProps.get("producer.security.protocol"));
+        assertEquals("replication-level security props should be passed through to worker consumer config",
+            "PLAINTEXT", bProps.get("consumer.security.protocol"));
+        assertEquals("security properties should be passed through to worker config: " + bProps,
+            "secret1", bProps.get("ssl.truststore.password"));
+        assertEquals("security properties should be passed through to worker producer config: " + bProps,
+            "secret1", bProps.get("producer.ssl.truststore.password"));
+        assertEquals("security properties should be transformed in worker config",
+            "secret2", bProps.get("ssl.key.password"));
+        assertEquals("security properties should be transformed in worker producer config",
+            "secret2", bProps.get("producer.ssl.key.password"));
+    }
+
+    public static class FakeConfigProvider implements ConfigProvider {
+
+        Map<String, String> secrets = Collections.singletonMap("password", "secret2");
+
+        @Override
+        public void configure(Map<String, ?> props) {
+        }
+
+        @Override
+        public void close() {
+        }
+
+        @Override
+        public ConfigData get(String path) {
+            return new ConfigData(secrets);
+        }
+
+        @Override
+        public ConfigData get(String path, Set<String> keys) {
+            return get(path);
+        }
+    }
+}
diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java
new file mode 100644
index 0000000..b1ccef8
--- /dev/null
+++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.mirror;
+
+import org.apache.kafka.common.acl.AccessControlEntry;
+import org.apache.kafka.common.acl.AclBinding;
+import org.apache.kafka.common.acl.AclOperation;
+import org.apache.kafka.common.acl.AclPermissionType;
+import org.apache.kafka.common.resource.PatternType;
+import org.apache.kafka.common.resource.ResourcePattern;
+import org.apache.kafka.common.resource.ResourceType;
+import org.apache.kafka.clients.admin.Config;
+import org.apache.kafka.clients.admin.ConfigEntry;
+
+import org.junit.Test;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertFalse;
+
+import java.util.ArrayList;
+
+public class MirrorSourceConnectorTest {
+
+    @Test
+    public void testReplicatesHeartbeatsByDefault() {
+        MirrorSourceConnector connector = new MirrorSourceConnector(new SourceAndTarget("source", "target"), 
+            new DefaultReplicationPolicy(), new DefaultTopicFilter(), new DefaultConfigPropertyFilter());
+        assertTrue("should replicate heartbeats", connector.shouldReplicateTopic("heartbeats"));
+        assertTrue("should replicate upstream heartbeats", connector.shouldReplicateTopic("us-west.heartbeats"));
+    }
+
+    @Test
+    public void testReplicatesHeartbeatsDespiteFilter() {
+        MirrorSourceConnector connector = new MirrorSourceConnector(new SourceAndTarget("source", "target"),
+            new DefaultReplicationPolicy(), x -> false, new DefaultConfigPropertyFilter());
+        assertTrue("should replicate heartbeats", connector.shouldReplicateTopic("heartbeats"));
+        assertTrue("should replicate upstream heartbeats", connector.shouldReplicateTopic("us-west.heartbeats"));
+    }
+
+    @Test
+    public void testNoCycles() {
+        MirrorSourceConnector connector = new MirrorSourceConnector(new SourceAndTarget("source", "target"),
+            new DefaultReplicationPolicy(), x -> true, x -> true);
+        assertFalse("should not allow cycles", connector.shouldReplicateTopic("target.topic1"));
+        assertFalse("should not allow cycles", connector.shouldReplicateTopic("target.source.topic1"));
+        assertFalse("should not allow cycles", connector.shouldReplicateTopic("source.target.topic1"));
+        assertTrue("should allow anything else", connector.shouldReplicateTopic("topic1"));
+        assertTrue("should allow anything else", connector.shouldReplicateTopic("source.topic1"));
+    }
+
+    @Test
+    public void testAclFiltering() {
+        MirrorSourceConnector connector = new MirrorSourceConnector(new SourceAndTarget("source", "target"),
+            new DefaultReplicationPolicy(), x -> true, x -> true);
+        assertFalse("should not replicate ALLOW WRITE", connector.shouldReplicateAcl(
+            new AclBinding(new ResourcePattern(ResourceType.TOPIC, "test_topic", PatternType.LITERAL),
+            new AccessControlEntry("kafka", "", AclOperation.WRITE, AclPermissionType.ALLOW))));
+        assertTrue("should replicate ALLOW ALL", connector.shouldReplicateAcl(
+            new AclBinding(new ResourcePattern(ResourceType.TOPIC, "test_topic", PatternType.LITERAL),
+            new AccessControlEntry("kafka", "", AclOperation.ALL, AclPermissionType.ALLOW))));
+    }
+
+    @Test
+    public void testAclTransformation() {
+        MirrorSourceConnector connector = new MirrorSourceConnector(new SourceAndTarget("source", "target"),
+            new DefaultReplicationPolicy(), x -> true, x -> true);
+        AclBinding allowAllAclBinding = new AclBinding(
+            new ResourcePattern(ResourceType.TOPIC, "test_topic", PatternType.LITERAL),
+            new AccessControlEntry("kafka", "", AclOperation.ALL, AclPermissionType.ALLOW));
+        AclBinding processedAllowAllAclBinding = connector.targetAclBinding(allowAllAclBinding);
+        String expectedRemoteTopicName = "source" + DefaultReplicationPolicy.SEPARATOR_DEFAULT
+            + allowAllAclBinding.pattern().name();
+        assertTrue("should change topic name",
+            processedAllowAllAclBinding.pattern().name().equals(expectedRemoteTopicName));
+        assertTrue("should change ALL to READ", processedAllowAllAclBinding.entry().operation() == AclOperation.READ);
+        assertTrue("should not change ALLOW",
+            processedAllowAllAclBinding.entry().permissionType() == AclPermissionType.ALLOW);
+
+        AclBinding denyAllAclBinding = new AclBinding(
+            new ResourcePattern(ResourceType.TOPIC, "test_topic", PatternType.LITERAL),
+            new AccessControlEntry("kafka", "", AclOperation.ALL, AclPermissionType.DENY));
+        AclBinding processedDenyAllAclBinding = connector.targetAclBinding(denyAllAclBinding);
+        assertTrue("should not change ALL", processedDenyAllAclBinding.entry().operation() == AclOperation.ALL);
+        assertTrue("should not change DENY",
+            processedDenyAllAclBinding.entry().permissionType() == AclPermissionType.DENY);
+    }
+    
+    @Test
+    public void testConfigPropertyFiltering() {
+        MirrorSourceConnector connector = new MirrorSourceConnector(new SourceAndTarget("source", "target"),
+            new DefaultReplicationPolicy(), x -> true, new DefaultConfigPropertyFilter());
+        ArrayList<ConfigEntry> entries = new ArrayList<>();
+        entries.add(new ConfigEntry("name-1", "value-1"));
+        entries.add(new ConfigEntry("min.insync.replicas", "2"));
+        Config config = new Config(entries);
+        Config targetConfig = connector.targetConfig(config);
+        assertTrue("should replicate properties", targetConfig.entries().stream()
+            .anyMatch(x -> x.name().equals("name-1")));
+        assertFalse("should not replicate blacklisted properties", targetConfig.entries().stream()
+            .anyMatch(x -> x.name().equals("min.insync.replicas")));
+    }
+}
diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceTaskTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceTaskTest.java
new file mode 100644
index 0000000..0035117
--- /dev/null
+++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceTaskTest.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.mirror;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.connect.source.SourceRecord;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertFalse;
+
+public class MirrorSourceTaskTest {
+
+    @Test
+    public void testSerde() {
+        byte[] key = new byte[]{'a', 'b', 'c', 'd', 'e'};
+        byte[] value = new byte[]{'f', 'g', 'h', 'i', 'j', 'k'};
+        Headers headers = new RecordHeaders();
+        headers.add("header1", new byte[]{'l', 'm', 'n', 'o'});
+        headers.add("header2", new byte[]{'p', 'q', 'r', 's', 't'});
+        ConsumerRecord<byte[], byte[]> consumerRecord = new ConsumerRecord<>("topic1", 2, 3L, 4L,
+            TimestampType.CREATE_TIME, 0L, 5, 6, key, value, headers);
+        MirrorSourceTask mirrorSourceTask = new MirrorSourceTask("cluster7",
+            new DefaultReplicationPolicy(), 50);
+        SourceRecord sourceRecord = mirrorSourceTask.convertRecord(consumerRecord);
+        assertEquals("cluster7.topic1", sourceRecord.topic());
+        assertEquals(2, sourceRecord.kafkaPartition().intValue());
+        assertEquals(new TopicPartition("topic1", 2), MirrorUtils.unwrapPartition(sourceRecord.sourcePartition()));
+        assertEquals(3L, MirrorUtils.unwrapOffset(sourceRecord.sourceOffset()).longValue());
+        assertEquals(4L, sourceRecord.timestamp().longValue());
+        assertEquals(key, sourceRecord.key());
+        assertEquals(value, sourceRecord.value());
+        assertEquals(headers.lastHeader("header1").value(), sourceRecord.headers().lastWithName("header1").value());
+        assertEquals(headers.lastHeader("header2").value(), sourceRecord.headers().lastWithName("header2").value());
+    }
+
+    @Test
+    public void testOffsetSync() {
+        MirrorSourceTask.PartitionState partitionState = new MirrorSourceTask.PartitionState(50);
+
+        assertTrue("always emit offset sync on first update",
+            partitionState.update(0, 100));
+        assertTrue("upstream offset skipped -> resync",
+            partitionState.update(2, 102));
+        assertFalse("no sync",
+            partitionState.update(3, 152));
+        assertFalse("no sync",
+            partitionState.update(4, 153));
+        assertFalse("no sync",
+            partitionState.update(5, 154));
+        assertTrue("one past target offset",
+            partitionState.update(6, 205));
+        assertTrue("upstream reset",
+            partitionState.update(2, 206));
+        assertFalse("no sync",
+            partitionState.update(3, 207));
+        assertTrue("downstream reset",
+                partitionState.update(4, 3));
+        assertFalse("no sync",
+            partitionState.update(5, 4));
+    }
+
+    @Test
+    public void testZeroOffsetSync() {
+        MirrorSourceTask.PartitionState partitionState = new MirrorSourceTask.PartitionState(0);
+
+        // if max offset lag is zero, should always emit offset syncs
+        assertTrue(partitionState.update(0, 100));
+        assertTrue(partitionState.update(2, 102));
+        assertTrue(partitionState.update(3, 153));
+        assertTrue(partitionState.update(4, 154));
+        assertTrue(partitionState.update(5, 155));
+        assertTrue(partitionState.update(6, 207));
+        assertTrue(partitionState.update(2, 208));
+        assertTrue(partitionState.update(3, 209));
+        assertTrue(partitionState.update(4, 3));
+        assertTrue(partitionState.update(5, 4));
+    }
+}
diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/OffsetSyncStoreTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/OffsetSyncStoreTest.java
new file mode 100644
index 0000000..19954cd
--- /dev/null
+++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/OffsetSyncStoreTest.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.mirror;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.TopicPartition;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class OffsetSyncStoreTest {
+
+    static TopicPartition tp = new TopicPartition("topic1", 2);
+
+    static class FakeOffsetSyncStore extends OffsetSyncStore {
+
+        FakeOffsetSyncStore() {
+            super(null, null);
+        }
+
+        void sync(TopicPartition topicPartition, long upstreamOffset, long downstreamOffset) {
+            OffsetSync offsetSync = new OffsetSync(topicPartition, upstreamOffset, downstreamOffset);
+            byte[] key = offsetSync.recordKey();
+            byte[] value = offsetSync.recordValue();
+            ConsumerRecord<byte[], byte[]> record = new ConsumerRecord<>("test.offsets.internal", 0, 3, key, value);
+            handleRecord(record);
+        }
+    }
+
+    @Test
+    public void testOffsetTranslation() {
+        FakeOffsetSyncStore store = new FakeOffsetSyncStore();
+
+        store.sync(tp, 100, 200);
+        assertEquals(store.translateDownstream(tp, 150), 250);
+
+        // Translate exact offsets
+        store.sync(tp, 150, 251);
+        assertEquals(store.translateDownstream(tp, 150), 251);
+
+        // Use old offset (5) prior to any sync -> can't translate
+        assertEquals(-1, store.translateDownstream(tp, 5));
+
+        // Downstream offsets reset
+        store.sync(tp, 200, 10);
+        assertEquals(store.translateDownstream(tp, 200), 10);
+
+        // Upstream offsets reset
+        store.sync(tp, 20, 20);
+        assertEquals(store.translateDownstream(tp, 20), 20);
+    }
+}
diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/OffsetSyncTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/OffsetSyncTest.java
new file mode 100644
index 0000000..5dc4729
--- /dev/null
+++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/OffsetSyncTest.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.mirror;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.TopicPartition;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class OffsetSyncTest {
+
+    @Test
+    public void testSerde() {
+        OffsetSync offsetSync = new OffsetSync(new TopicPartition("topic-1", 2), 3, 4);
+        byte[] key = offsetSync.recordKey();
+        byte[] value = offsetSync.recordValue();
+        ConsumerRecord<byte[], byte[]> record = new ConsumerRecord<>("any-topic", 6, 7, key, value);
+        OffsetSync deserialized = OffsetSync.deserializeRecord(record);
+        assertEquals(offsetSync.topicPartition(), deserialized.topicPartition());
+        assertEquals(offsetSync.upstreamOffset(), deserialized.upstreamOffset());
+        assertEquals(offsetSync.downstreamOffset(), deserialized.downstreamOffset());
+    }
+}
diff --git a/connect/mirror/src/test/resources/log4j.properties b/connect/mirror/src/test/resources/log4j.properties
new file mode 100644
index 0000000..a2ac021
--- /dev/null
+++ b/connect/mirror/src/test/resources/log4j.properties
@@ -0,0 +1,34 @@
+##
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+##
+log4j.rootLogger=ERROR, stdout
+
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+#
+# The `%X{connector.context}` parameter in the layout includes connector-specific and task-specific information
+# in the log message, where appropriate. This makes it easier to identify those log messages that apply to a
+# specific connector. Simply add this parameter to the log layout configuration below to include the contextual information.
+#
+log4j.appender.stdout.layout.ConversionPattern=[%d] %p %X{connector.context}%m (%c:%L)%n
+#
+# The following line includes no MDC context parameters:
+#log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c:%L)%n (%t)
+
+log4j.logger.org.reflections=OFF
+log4j.logger.kafka=OFF
+log4j.logger.state.change.logger=OFF
+log4j.logger.org.apache.kafka.connect.mirror=INFO
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
index 41b56c8..c66429d 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
@@ -309,7 +309,7 @@ class WorkerSourceTask extends WorkerTask {
             final ProducerRecord<byte[], byte[]> producerRecord = convertTransformedRecord(record);
             if (producerRecord == null || retryWithToleranceOperator.failed()) {
                 counter.skipRecord();
-                commitTaskRecord(preTransformRecord);
+                commitTaskRecord(preTransformRecord, null);
                 continue;
             }
 
@@ -347,7 +347,7 @@ class WorkerSourceTask extends WorkerTask {
                                             WorkerSourceTask.this,
                                             recordMetadata.topic(), recordMetadata.partition(),
                                             recordMetadata.offset());
-                                    commitTaskRecord(preTransformRecord);
+                                    commitTaskRecord(preTransformRecord, recordMetadata);
                                 }
                             }
                         });
@@ -381,9 +381,9 @@ class WorkerSourceTask extends WorkerTask {
         return result;
     }
 
-    private void commitTaskRecord(SourceRecord record) {
+    private void commitTaskRecord(SourceRecord record, RecordMetadata metadata) {
         try {
-            task.commitRecord(record);
+            task.commitRecord(record, metadata);
         } catch (Throwable t) {
             log.error("{} Exception thrown while calling task.commitRecord()", this, t);
         }
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
index bce38d7a..eb618fb 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
@@ -427,7 +427,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
                         + "than required by current worker configuration. Distributing new key now.");
                     return true;
                 }
-            } else if (sessionKey == null) {
+            } else if (sessionKey == null && configState.sessionKey() != null) {
                 // This happens on startup for follower workers; the snapshot contains the session key,
                 // but no callback in the config update listener has been fired for it yet.
                 sessionKey = configState.sessionKey().key();
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java
index b9d5470..36feac5 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java
@@ -131,6 +131,8 @@ public class PluginUtils {
             + "transforms\\.(?!Transformation$).*"
             + "|json\\..*"
             + "|file\\..*"
+            + "|mirror\\..*"
+            + "|mirror-client\\..*"
             + "|converters\\..*"
             + "|storage\\.StringConverter"
             + "|storage\\.SimpleHeaderConverter"
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/MonitorableSourceConnector.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/MonitorableSourceConnector.java
index e455783..fbd763a 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/MonitorableSourceConnector.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/MonitorableSourceConnector.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.connect.integration;
 
 import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.connect.connector.Task;
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.runtime.TestSourceConnector;
@@ -150,7 +151,7 @@ public class MonitorableSourceConnector extends TestSourceConnector {
         }
 
         @Override
-        public void commitRecord(SourceRecord record) {
+        public void commitRecord(SourceRecord record, RecordMetadata metadata) {
             log.trace("Committing record: {}", record);
             taskHandle.commit();
         }
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
index 8751f1c..dff267a 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
@@ -883,7 +883,7 @@ public class WorkerSourceTaskTest extends ThreadedTest {
 
         if (sendSuccess) {
             // 3. As a result of a successful producer send callback, we'll notify the source task of the record commit
-            expectTaskCommitRecord(anyTimes, commitSuccess);
+            expectTaskCommitRecordWithOffset(anyTimes, commitSuccess);
         }
 
         return sent;
@@ -932,8 +932,8 @@ public class WorkerSourceTaskTest extends ThreadedTest {
             });
     }
 
-    private void expectTaskCommitRecord(boolean anyTimes, boolean succeed) throws InterruptedException {
-        sourceTask.commitRecord(EasyMock.anyObject(SourceRecord.class));
+    private void expectTaskCommitRecordWithOffset(boolean anyTimes, boolean succeed) throws InterruptedException {
+        sourceTask.commitRecord(EasyMock.anyObject(SourceRecord.class), EasyMock.anyObject(RecordMetadata.class));
         IExpectationSetters<Void> expect = EasyMock.expectLastCall();
         if (!succeed) {
             expect = expect.andThrow(new RuntimeException("Error committing record in source task"));
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginUtilsTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginUtilsTest.java
index bf441ff..c406ead 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginUtilsTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginUtilsTest.java
@@ -142,6 +142,12 @@ public class PluginUtilsTest {
         assertTrue(PluginUtils.shouldLoadInIsolation(
                 "org.apache.kafka.connect.file.FileStreamSinkConnector")
         );
+        assertTrue(PluginUtils.shouldLoadInIsolation(
+                "org.apache.kafka.connect.mirror.MirrorSourceTask")
+        );
+        assertTrue(PluginUtils.shouldLoadInIsolation(
+                "org.apache.kafka.connect.mirror.MirrorSourceConnector")
+        );
         assertTrue(PluginUtils.shouldLoadInIsolation("org.apache.kafka.connect.converters."));
         assertTrue(PluginUtils.shouldLoadInIsolation(
                 "org.apache.kafka.connect.converters.ByteArrayConverter")
diff --git a/settings.gradle b/settings.gradle
index a31ea13..813cf64 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -18,6 +18,8 @@ include 'clients',
     'connect:basic-auth-extension',
     'connect:file',
     'connect:json',
+    'connect:mirror',
+    'connect:mirror-client',
     'connect:runtime',
     'connect:transforms',
     'core',


Mime
View raw message