kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cmcc...@apache.org
Subject [kafka] 01/06: MINOR: Add the KIP-500 metadata shell
Date Thu, 11 Feb 2021 17:57:11 GMT
This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch metashell
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit cd01042ac8c6cfb76e0e1719f056aff720971a13
Author: Colin P. Mccabe <cmccabe@confluent.io>
AuthorDate: Tue Feb 9 14:11:35 2021 -0800

    MINOR: Add the KIP-500 metadata shell
    
    The Kafka Metadata shell is a new command which allows users to
    interactively examine the metadata stored in a KIP-500 cluster.
    
    It can read the metadata from the controllers directly, by connecting to
    them, or from a metadata snapshot on disk.  In the former case, the
    quorum voters must be specified by passing the --controllers flag; in
    the latter case, the snapshot file should be specified via --snapshot.
    
    The metadata tool works by replaying the log and storing the state into
    in-memory nodes.  These nodes are presented in a fashion similar to
    filesystem directories.
    
    This PR currently includes the metalog/ directory since that is a
    dependency of the metadata shell.  Eventually we want to migrate to
    using the Raft API directly, however.
---
 bin/kafka-shell.sh                                 |  17 +
 build.gradle                                       |  44 +++
 checkstyle/import-control.xml                      |  26 ++
 checkstyle/suppressions.xml                        |   4 +
 core/src/main/scala/kafka/server/Server.scala      |   1 +
 gradle/dependencies.gradle                         |   2 +
 .../org/apache/kafka/metalog/LocalLogManager.java  | 378 +++++++++++++++++++++
 .../org/apache/kafka/metalog/MetaLogLeader.java    |  58 ++++
 .../org/apache/kafka/metalog/MetaLogListener.java  |  55 +++
 .../org/apache/kafka/metalog/MetaLogManager.java   |  80 +++++
 .../resources/common/metadata/IsrChangeRecord.json |   4 +-
 .../resources/common/metadata/PartitionRecord.json |   4 +-
 .../apache/kafka/metadata/MetadataParserTest.java  |   2 +-
 .../apache/kafka/metalog/LocalLogManagerTest.java  | 154 +++++++++
 .../kafka/metalog/LocalLogManagerTestEnv.java      | 143 ++++++++
 .../kafka/metalog/MockMetaLogManagerListener.java  |  77 +++++
 settings.gradle                                    |   1 +
 .../org/apache/kafka/shell/CatCommandHandler.java  | 120 +++++++
 .../org/apache/kafka/shell/CdCommandHandler.java   | 117 +++++++
 .../java/org/apache/kafka/shell/CommandUtils.java  | 151 ++++++++
 .../main/java/org/apache/kafka/shell/Commands.java | 154 +++++++++
 .../kafka/shell/ErroneousCommandHandler.java       |  58 ++++
 .../org/apache/kafka/shell/ExitCommandHandler.java |  88 +++++
 .../org/apache/kafka/shell/FindCommandHandler.java | 121 +++++++
 .../java/org/apache/kafka/shell/GlobComponent.java | 179 ++++++++++
 .../java/org/apache/kafka/shell/GlobVisitor.java   | 148 ++++++++
 .../org/apache/kafka/shell/HelpCommandHandler.java |  88 +++++
 .../apache/kafka/shell/HistoryCommandHandler.java  | 108 ++++++
 .../org/apache/kafka/shell/InteractiveShell.java   | 174 ++++++++++
 .../org/apache/kafka/shell/LsCommandHandler.java   | 299 ++++++++++++++++
 .../org/apache/kafka/shell/ManCommandHandler.java  | 109 ++++++
 .../java/org/apache/kafka/shell/MetadataNode.java  | 140 ++++++++
 .../apache/kafka/shell/MetadataNodeManager.java    | 308 +++++++++++++++++
 .../java/org/apache/kafka/shell/MetadataShell.java | 274 +++++++++++++++
 .../org/apache/kafka/shell/NoOpCommandHandler.java |  43 +++
 .../apache/kafka/shell/NotDirectoryException.java  |  30 ++
 .../org/apache/kafka/shell/NotFileException.java   |  30 ++
 .../org/apache/kafka/shell/PwdCommandHandler.java  |  89 +++++
 .../org/apache/kafka/shell/SnapshotReader.java     | 183 ++++++++++
 .../java/org/apache/kafka/shell/CommandTest.java   |  70 ++++
 .../org/apache/kafka/shell/CommandUtilsTest.java   |  37 ++
 .../org/apache/kafka/shell/GlobComponentTest.java  |  75 ++++
 .../org/apache/kafka/shell/GlobVisitorTest.java    | 144 ++++++++
 .../apache/kafka/shell/LsCommandHandlerTest.java   |  99 ++++++
 .../org/apache/kafka/shell/MetadataNodeTest.java   |  73 ++++
 45 files changed, 4556 insertions(+), 3 deletions(-)

diff --git a/bin/kafka-shell.sh b/bin/kafka-shell.sh
new file mode 100755
index 0000000..289f0c1
--- /dev/null
+++ b/bin/kafka-shell.sh
@@ -0,0 +1,17 @@
#!/bin/bash
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Launch the metadata shell via kafka-run-class.sh, forwarding all arguments.
# The dirname expansion is quoted so the script still works when Kafka is
# installed under a path containing spaces.
exec "$(dirname "$0")/kafka-run-class.sh" org.apache.kafka.shell.MetadataShell "$@"
diff --git a/build.gradle b/build.gradle
index 790cd11..ca9b341 100644
--- a/build.gradle
+++ b/build.gradle
@@ -1351,6 +1351,50 @@ project(':tools') {
   }
 }
 
+project(':shell') {
+  archivesBaseName = "kafka-shell"
+
+  dependencies {
+    compile libs.argparse4j
+    compile libs.jacksonDatabind
+    compile libs.jacksonJDK8Datatypes
+    compile libs.jline
+    compile libs.slf4jApi
+    compile project(':clients')
+    compile project(':core')
+    compile project(':log4j-appender')
+    compile project(':metadata')
+    compile project(':raft')
+
+    compile libs.jacksonJaxrsJsonProvider
+
+    testCompile project(':clients')
+    testCompile libs.junitJupiter
+    testCompile project(':clients').sourceSets.test.output
+
+    testRuntime libs.slf4jlog4j
+  }
+
+  javadoc {
+    enabled = false
+  }
+
+  tasks.create(name: "copyDependantLibs", type: Copy) {
+    from (configurations.testRuntime) {
+      include('jline-*jar')
+    }
+    from (configurations.runtime) {
+      include('jline-*jar')
+    }
+    into "$buildDir/dependant-libs-${versions.scala}"
+    duplicatesStrategy 'exclude'
+  }
+
+  jar {
+    dependsOn 'copyDependantLibs'
+  }
+}
+
 project(':streams') {
   archivesBaseName = "kafka-streams"
   ext.buildStreamsVersionFileName = "kafka-streams-version.properties"
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index 9cc432e..41fbbc1 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -201,6 +201,15 @@
     <allow pkg="org.apache.kafka.test" />
   </subpackage>
 
+  <subpackage name="metalog">
+    <allow pkg="org.apache.kafka.common.metadata" />
+    <allow pkg="org.apache.kafka.common.protocol" />
+    <allow pkg="org.apache.kafka.metadata" />
+    <allow pkg="org.apache.kafka.metalog" />
+    <allow pkg="org.apache.kafka.queue" />
+    <allow pkg="org.apache.kafka.test" />
+  </subpackage>
+
   <subpackage name="clients">
     <allow pkg="org.slf4j" />
     <allow pkg="org.apache.kafka.common" />
@@ -229,6 +238,23 @@
     <allow pkg="org.apache.kafka.test" />
   </subpackage>
 
+  <subpackage name="shell">
+    <allow pkg="com.fasterxml.jackson" />
+    <allow pkg="kafka.raft"/>
+    <allow pkg="kafka.server"/>
+    <allow pkg="kafka.tools"/>
+    <allow pkg="net.sourceforge.argparse4j" />
+    <allow pkg="org.apache.kafka.common"/>
+    <allow pkg="org.apache.kafka.metadata"/>
+    <allow pkg="org.apache.kafka.metalog"/>
+    <allow pkg="org.apache.kafka.queue"/>
+    <allow pkg="org.apache.kafka.raft"/>
+    <allow pkg="org.apache.kafka.shell"/>
+    <allow pkg="org.apache.log4j" />
+    <allow pkg="org.jline"/>
+    <allow pkg="scala.compat"/>
+  </subpackage>
+
   <subpackage name="tools">
     <allow pkg="org.apache.kafka.common"/>
     <allow pkg="org.apache.kafka.clients.admin" />
diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 76690bb..1cfc630 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -253,6 +253,10 @@
     <suppress id="dontUseSystemExit"
               files="VerifiableProducer.java"/>
 
+    <!-- Shell -->
+    <suppress checks="CyclomaticComplexity"
+              files="(GlobComponent).java"/>
+
     <!-- Log4J-Appender -->
     <suppress checks="CyclomaticComplexity"
               files="KafkaLog4jAppender.java"/>
diff --git a/core/src/main/scala/kafka/server/Server.scala b/core/src/main/scala/kafka/server/Server.scala
index 9126114..11cc528 100644
--- a/core/src/main/scala/kafka/server/Server.scala
+++ b/core/src/main/scala/kafka/server/Server.scala
@@ -29,6 +29,7 @@ trait Server {
 }
 
 object Server {
+  val metadataTopicName = "@metadata"
 
   val MetricsPrefix: String = "kafka.server"
   val ClusterIdLabel: String = "kafka.cluster.id"
diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle
index 954a495..ff8ac86 100644
--- a/gradle/dependencies.gradle
+++ b/gradle/dependencies.gradle
@@ -71,6 +71,7 @@ versions += [
   jacoco: "0.8.5",
   jetty: "9.4.33.v20201020",
   jersey: "2.31",
+  jline: "3.12.1",
   jmh: "1.27",
   hamcrest: "2.2",
   log4j: "1.2.17",
@@ -149,6 +150,7 @@ libs += [
   jettyServlets: "org.eclipse.jetty:jetty-servlets:$versions.jetty",
   jerseyContainerServlet: "org.glassfish.jersey.containers:jersey-container-servlet:$versions.jersey",
   jerseyHk2: "org.glassfish.jersey.inject:jersey-hk2:$versions.jersey",
+  jline: "org.jline:jline:$versions.jline",
   jmhCore: "org.openjdk.jmh:jmh-core:$versions.jmh",
   jmhCoreBenchmarks: "org.openjdk.jmh:jmh-core-benchmarks:$versions.jmh",
   jmhGeneratorAnnProcess: "org.openjdk.jmh:jmh-generator-annprocess:$versions.jmh",
diff --git a/metadata/src/main/java/org/apache/kafka/metalog/LocalLogManager.java b/metadata/src/main/java/org/apache/kafka/metalog/LocalLogManager.java
new file mode 100644
index 0000000..ef85314
--- /dev/null
+++ b/metadata/src/main/java/org/apache/kafka/metalog/LocalLogManager.java
@@ -0,0 +1,378 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.metalog;
+
+import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.metadata.ApiMessageAndVersion;
+import org.apache.kafka.queue.EventQueue;
+import org.apache.kafka.queue.KafkaEventQueue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.AbstractMap.SimpleImmutableEntry;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.TreeMap;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.stream.Collectors;
+
+/**
+ * The LocalLogManager is a test implementation that relies on the contents of memory.
+ */
+public final class LocalLogManager implements MetaLogManager, AutoCloseable {
+    interface LocalBatch {
+        int size();
+    }
+
+    static class LeaderChangeBatch implements LocalBatch {
+        private final MetaLogLeader newLeader;
+
+        LeaderChangeBatch(MetaLogLeader newLeader) {
+            this.newLeader = newLeader;
+        }
+
+        @Override
+        public int size() {
+            return 1;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (!(o instanceof LeaderChangeBatch)) return false;
+            LeaderChangeBatch other = (LeaderChangeBatch) o;
+            if (!other.newLeader.equals(newLeader)) return false;
+            return true;
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(newLeader);
+        }
+
+        @Override
+        public String toString() {
+            return "LeaderChangeBatch(newLeader=" + newLeader + ")";
+        }
+    }
+
+    static class LocalRecordBatch implements LocalBatch {
+        private final List<ApiMessage> records;
+
+        LocalRecordBatch(List<ApiMessage> records) {
+            this.records = records;
+        }
+
+        @Override
+        public int size() {
+            return records.size();
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (!(o instanceof LocalRecordBatch)) return false;
+            LocalRecordBatch other = (LocalRecordBatch) o;
+            if (!other.records.equals(records)) return false;
+            return true;
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(records);
+        }
+
+        @Override
+        public String toString() {
+            return "LocalRecordBatch(records=" + records + ")";
+        }
+    }
+
+    public static class SharedLogData {
+        private final Logger log = LoggerFactory.getLogger(SharedLogData.class);
+        private final HashMap<Integer, LocalLogManager> logManagers = new HashMap<>();
+        private final TreeMap<Long, LocalBatch> batches = new TreeMap<>();
+        private MetaLogLeader leader = new MetaLogLeader(-1, -1);
+        private long prevOffset = -1;
+
+        synchronized void registerLogManager(LocalLogManager logManager) {
+            if (logManagers.put(logManager.nodeId(), logManager) != null) {
+                throw new RuntimeException("Can't have multiple LocalLogManagers " +
+                    "with id " + logManager.nodeId());
+            }
+            electLeaderIfNeeded();
+        }
+
+        synchronized void unregisterLogManager(LocalLogManager logManager) {
+            if (!logManagers.remove(logManager.nodeId(), logManager)) {
+                throw new RuntimeException("Log manager " + logManager.nodeId() +
+                    " was not found.");
+            }
+        }
+
+        synchronized long tryAppend(int nodeId, long epoch, LocalBatch batch) {
+            if (epoch != leader.epoch()) {
+                log.trace("tryAppend(nodeId={}, epoch={}): the provided epoch does not " +
+                    "match the current leader epoch of {}.", nodeId, epoch, leader.epoch());
+                return Long.MAX_VALUE;
+            }
+            if (nodeId != leader.nodeId()) {
+                log.trace("tryAppend(nodeId={}, epoch={}): the given node id does not " +
+                    "match the current leader id of {}.", nodeId, leader.nodeId());
+                return Long.MAX_VALUE;
+            }
+            log.trace("tryAppend(nodeId={}): appending {}.", nodeId, batch);
+            long offset = append(batch);
+            electLeaderIfNeeded();
+            return offset;
+        }
+
+        synchronized long append(LocalBatch batch) {
+            prevOffset += batch.size();
+            log.debug("append(batch={}, prevOffset={})", batch, prevOffset);
+            batches.put(prevOffset, batch);
+            if (batch instanceof LeaderChangeBatch) {
+                LeaderChangeBatch leaderChangeBatch = (LeaderChangeBatch) batch;
+                leader = leaderChangeBatch.newLeader;
+            }
+            for (LocalLogManager logManager : logManagers.values()) {
+                logManager.scheduleLogCheck();
+            }
+            return prevOffset;
+        }
+
+        synchronized void electLeaderIfNeeded() {
+            if (leader.nodeId() != -1 || logManagers.isEmpty()) {
+                return;
+            }
+            int nextLeaderIndex = ThreadLocalRandom.current().nextInt(logManagers.size());
+            Iterator<Integer> iter = logManagers.keySet().iterator();
+            Integer nextLeaderNode = null;
+            for (int i = 0; i <= nextLeaderIndex; i++) {
+                nextLeaderNode = iter.next();
+            }
+            MetaLogLeader newLeader = new MetaLogLeader(nextLeaderNode, leader.epoch() + 1);
+            log.info("Elected new leader: {}.", newLeader);
+            append(new LeaderChangeBatch(newLeader));
+        }
+
+        synchronized Entry<Long, LocalBatch> nextBatch(long offset) {
+            Entry<Long, LocalBatch> entry = batches.higherEntry(offset);
+            if (entry == null) {
+                return null;
+            }
+            return new SimpleImmutableEntry<>(entry.getKey(), entry.getValue());
+        }
+    }
+
+    private static class MetaLogListenerData {
+        private long offset = -1;
+        private final MetaLogListener listener;
+
+        MetaLogListenerData(MetaLogListener listener) {
+            this.listener = listener;
+        }
+    }
+
+    private final Logger log;
+
+    private final int nodeId;
+
+    private final SharedLogData shared;
+
+    private final EventQueue eventQueue;
+
+    private boolean initialized = false;
+
+    private boolean shutdown = false;
+
+    private long maxReadOffset = Long.MAX_VALUE;
+
+    private final List<MetaLogListenerData> listeners = new ArrayList<>();
+
+    private volatile MetaLogLeader leader = new MetaLogLeader(-1, -1);
+
+    public LocalLogManager(LogContext logContext,
+                           int nodeId,
+                           SharedLogData shared,
+                           String threadNamePrefix) {
+        this.log = logContext.logger(LocalLogManager.class);
+        this.nodeId = nodeId;
+        this.shared = shared;
+        this.eventQueue = new KafkaEventQueue(Time.SYSTEM, logContext, threadNamePrefix);
+        shared.registerLogManager(this);
+    }
+
+    private void scheduleLogCheck() {
+        eventQueue.append(() -> {
+            try {
+                log.debug("Node {}: running log check.", nodeId);
+                int numEntriesFound = 0;
+                for (MetaLogListenerData listenerData : listeners) {
+                    while (true) {
+                        Entry<Long, LocalBatch> entry = shared.nextBatch(listenerData.offset);
+                        if (entry == null) {
+                            log.trace("Node {}: reached the end of the log after finding " +
+                                "{} entries.", nodeId, numEntriesFound);
+                            break;
+                        }
+                        long entryOffset = entry.getKey();
+                        if (entryOffset > maxReadOffset) {
+                            log.trace("Node {}: after {} entries, not reading the next " +
+                                "entry because its offset is {}, and maxReadOffset is {}.",
+                                nodeId, numEntriesFound, entryOffset, maxReadOffset);
+                            break;
+                        }
+                        if (entry.getValue() instanceof LeaderChangeBatch) {
+                            LeaderChangeBatch batch = (LeaderChangeBatch) entry.getValue();
+                            log.trace("Node {}: handling LeaderChange to {}.",
+                                nodeId, batch.newLeader);
+                            listenerData.listener.handleNewLeader(batch.newLeader);
+                            if (batch.newLeader.epoch() > leader.epoch()) {
+                                leader = batch.newLeader;
+                            }
+                        } else if (entry.getValue() instanceof LocalRecordBatch) {
+                            LocalRecordBatch batch = (LocalRecordBatch) entry.getValue();
+                            log.trace("Node {}: handling LocalRecordBatch with offset {}.",
+                                nodeId, entryOffset);
+                            listenerData.listener.handleCommits(entryOffset, batch.records);
+                        }
+                        numEntriesFound++;
+                        listenerData.offset = entryOffset;
+                    }
+                }
+                log.trace("Completed log check for node " + nodeId);
+            } catch (Exception e) {
+                log.error("Exception while handling log check", e);
+            }
+        });
+    }
+
+    public void beginShutdown() {
+        eventQueue.beginShutdown("beginShutdown", () -> {
+            try {
+                if (initialized && !shutdown) {
+                    log.debug("Node {}: beginning shutdown.", nodeId);
+                    renounce(leader.epoch());
+                    for (MetaLogListenerData listenerData : listeners) {
+                        listenerData.listener.beginShutdown();
+                    }
+                    shared.unregisterLogManager(this);
+                }
+            } catch (Exception e) {
+                log.error("Unexpected exception while sending beginShutdown callbacks", e);
+            }
+            shutdown = true;
+        });
+    }
+
+    @Override
+    public void close() throws InterruptedException {
+        log.debug("Node {}: closing.", nodeId);
+        beginShutdown();
+        eventQueue.close();
+    }
+
+    @Override
+    public void initialize() throws Exception {
+        eventQueue.append(() -> {
+            log.debug("initialized local log manager for node " + nodeId);
+            initialized = true;
+        });
+    }
+
+    @Override
+    public void register(MetaLogListener listener) throws Exception {
+        CompletableFuture<Void> future = new CompletableFuture<>();
+        eventQueue.append(() -> {
+            if (shutdown) {
+                log.info("Node {}: can't register because local log manager has " +
+                    "already been shut down.", nodeId);
+                future.complete(null);
+            } else if (initialized) {
+                log.info("Node {}: registered MetaLogListener.", nodeId);
+                listeners.add(new MetaLogListenerData(listener));
+                shared.electLeaderIfNeeded();
+                scheduleLogCheck();
+                future.complete(null);
+            } else {
+                log.info("Node {}: can't register because local log manager has not " +
+                    "been initialized.", nodeId);
+                future.completeExceptionally(new RuntimeException(
+                    "LocalLogManager was not initialized."));
+            }
+        });
+        future.get();
+    }
+
+    @Override
+    public long scheduleWrite(long epoch, List<ApiMessageAndVersion> batch) {
+        return shared.tryAppend(nodeId, leader.epoch(), new LocalRecordBatch(
+            batch.stream().map(r -> r.message()).collect(Collectors.toList())));
+    }
+
+    @Override
+    public void renounce(long epoch) {
+        MetaLogLeader curLeader = leader;
+        MetaLogLeader nextLeader = new MetaLogLeader(-1, curLeader.epoch() + 1);
+        shared.tryAppend(nodeId, curLeader.epoch(), new LeaderChangeBatch(nextLeader));
+    }
+
+    @Override
+    public MetaLogLeader leader() {
+        return leader;
+    }
+
+    @Override
+    public int nodeId() {
+        return nodeId;
+    }
+
+    public List<MetaLogListener> listeners() {
+        final CompletableFuture<List<MetaLogListener>> future = new CompletableFuture<>();
+        eventQueue.append(() -> {
+            future.complete(listeners.stream().map(l -> l.listener).collect(Collectors.toList()));
+        });
+        try {
+            return future.get();
+        } catch (ExecutionException | InterruptedException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    public void setMaxReadOffset(long maxReadOffset) {
+        CompletableFuture<Void> future = new CompletableFuture<>();
+        eventQueue.append(() -> {
+            log.trace("Node {}: set maxReadOffset to {}.", nodeId, maxReadOffset);
+            this.maxReadOffset = maxReadOffset;
+            scheduleLogCheck();
+            future.complete(null);
+        });
+        try {
+            future.get();
+        } catch (ExecutionException | InterruptedException e) {
+            throw new RuntimeException(e);
+        }
+    }
+}
diff --git a/metadata/src/main/java/org/apache/kafka/metalog/MetaLogLeader.java b/metadata/src/main/java/org/apache/kafka/metalog/MetaLogLeader.java
new file mode 100644
index 0000000..2bf4f7c
--- /dev/null
+++ b/metadata/src/main/java/org/apache/kafka/metalog/MetaLogLeader.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.metalog;
+
+import java.util.Objects;
+
+/**
+ * The current leader of the MetaLog.
+ */
+public class MetaLogLeader {
+    private final int nodeId;
+    private final long epoch;
+
+    public MetaLogLeader(int nodeId, long epoch) {
+        this.nodeId = nodeId;
+        this.epoch = epoch;
+    }
+
+    public int nodeId() {
+        return nodeId;
+    }
+
+    public long epoch() {
+        return epoch;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (!(o instanceof MetaLogLeader)) return false;
+        MetaLogLeader other = (MetaLogLeader) o;
+        return other.nodeId == nodeId && other.epoch == epoch;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(nodeId, epoch);
+    }
+
+    @Override
+    public String toString() {
+        return "MetaLogLeader(nodeId=" + nodeId + ", epoch=" + epoch + ")";
+    }
+}
diff --git a/metadata/src/main/java/org/apache/kafka/metalog/MetaLogListener.java b/metadata/src/main/java/org/apache/kafka/metalog/MetaLogListener.java
new file mode 100644
index 0000000..9374420
--- /dev/null
+++ b/metadata/src/main/java/org/apache/kafka/metalog/MetaLogListener.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.metalog;
+
+import org.apache.kafka.common.protocol.ApiMessage;
+
+import java.util.List;
+
+/**
+ * Listeners receive notifications from the MetaLogManager.
+ */
/**
 * Listeners receive notifications from the MetaLogManager they are registered with:
 * committed messages, leadership changes, and shutdown notifications.
 */
public interface MetaLogListener {
    /**
     * Called when the MetaLogManager commits some messages.
     *
     * @param lastOffset    The last offset found in all the given messages.
     * @param messages      The committed messages, in log order.
     */
    void handleCommits(long lastOffset, List<ApiMessage> messages);

    /**
     * Called when a new leader is elected.  The default implementation does nothing.
     *
     * @param leader        The new leader id and epoch.
     */
    default void handleNewLeader(MetaLogLeader leader) {}

    /**
     * Called when the MetaLogManager has renounced the leadership.  The default
     * implementation does nothing.
     *
     * @param epoch         The controller epoch that has ended.
     */
    default void handleRenounce(long epoch) {}

    /**
     * Called when the MetaLogManager is beginning to shut down, and wants to tell its
     * listener that it is safe to shut down as well.  (Despite the original wording,
     * this is invoked at the start of manager shutdown, not after it completes.)
     * The default implementation does nothing.
     */
    default void beginShutdown() {}
}
diff --git a/metadata/src/main/java/org/apache/kafka/metalog/MetaLogManager.java b/metadata/src/main/java/org/apache/kafka/metalog/MetaLogManager.java
new file mode 100644
index 0000000..0d062a7
--- /dev/null
+++ b/metadata/src/main/java/org/apache/kafka/metalog/MetaLogManager.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.metalog;
+
+import org.apache.kafka.metadata.ApiMessageAndVersion;
+
+import java.util.List;
+
+
+/**
+ * The MetaLogManager handles storing metadata and electing leaders.
+ */
/**
 * The MetaLogManager handles storing metadata and electing leaders.
 */
public interface MetaLogManager {

    /**
     * Start this meta log manager.
     * The manager must be ready to accept incoming calls after this function returns.
     * It is an error to initialize a MetaLogManager more than once.
     *
     * @throws Exception    If initialization fails.
     */
    void initialize() throws Exception;

    /**
     * Register the listener.  The manager must be initialized already.
     * The listener must be ready to accept incoming calls immediately.
     *
     * @param listener      The listener to register.
     *
     * @throws Exception    If the listener could not be registered.
     */
    void register(MetaLogListener listener) throws Exception;

    /**
     * Schedule a write to the log.
     *
     * The write will be scheduled to happen at some time in the future.  There is no
     * error return or exception thrown if the write fails.  Instead, the listener may
     * regard the write as successful if and only if the MetaLogManager reaches the given
     * index before renouncing its leadership.  The listener should determine this by
     * monitoring the committed indexes.
     *
     * @param epoch         The controller epoch.
     * @param batch         The batch of messages to write.
     *
     * @return              The index of the message.
     */
    long scheduleWrite(long epoch, List<ApiMessageAndVersion> batch);

    /**
     * Renounce the leadership.
     *
     * @param epoch         The epoch.  If this does not match the current epoch, this
     *                      call will be ignored.
     */
    void renounce(long epoch);

    /**
     * Returns the current leader.  The active node may change immediately after this
     * function is called, of course.
     */
    MetaLogLeader leader();

    /**
     * Returns the node id.
     */
    int nodeId();

}
diff --git a/metadata/src/main/resources/common/metadata/IsrChangeRecord.json b/metadata/src/main/resources/common/metadata/IsrChangeRecord.json
index e458390..fd8d834 100644
--- a/metadata/src/main/resources/common/metadata/IsrChangeRecord.json
+++ b/metadata/src/main/resources/common/metadata/IsrChangeRecord.json
@@ -28,6 +28,8 @@
     { "name": "Leader", "type": "int32", "versions": "0+", "default": "-1",
       "about": "The lead replica, or -1 if there is no leader." },
     { "name": "LeaderEpoch", "type": "int32", "versions": "0+", "default": "-1",
-      "about": "An epoch that gets incremented each time we change the ISR." }
+      "about": "An epoch that gets incremented each time we change the partition leader." },
+    { "name": "PartitionEpoch", "type": "int32", "versions": "0+", "default": "-1",
+      "about": "An epoch that gets incremented each time we change anything in the partition." }
   ]
 }
diff --git a/metadata/src/main/resources/common/metadata/PartitionRecord.json b/metadata/src/main/resources/common/metadata/PartitionRecord.json
index 79c24c2..5cc7d13 100644
--- a/metadata/src/main/resources/common/metadata/PartitionRecord.json
+++ b/metadata/src/main/resources/common/metadata/PartitionRecord.json
@@ -34,6 +34,8 @@
     { "name": "Leader", "type": "int32", "versions": "0+", "default": "-1",
       "about": "The lead replica, or -1 if there is no leader." },
     { "name": "LeaderEpoch", "type": "int32", "versions": "0+", "default": "-1",
-      "about": "An epoch that gets incremented each time we change the ISR." }
+      "about": "An epoch that gets incremented each time we change the partition leader." },
+    { "name": "PartitionEpoch", "type": "int32", "versions": "0+", "default": "-1",
+      "about": "An epoch that gets incremented each time we change anything in the partition." }
   ]
 }
diff --git a/metadata/src/test/java/org/apache/kafka/metadata/MetadataParserTest.java b/metadata/src/test/java/org/apache/kafka/metadata/MetadataParserTest.java
index 6d673b3..41e968c 100644
--- a/metadata/src/test/java/org/apache/kafka/metadata/MetadataParserTest.java
+++ b/metadata/src/test/java/org/apache/kafka/metadata/MetadataParserTest.java
@@ -82,7 +82,7 @@ public class MetadataParserTest {
         PartitionRecord partitionRecord = new PartitionRecord().
             setReplicas(longReplicaList);
         ObjectSerializationCache cache = new ObjectSerializationCache();
-        assertEquals("Event size would be 33554478, but the maximum serialized event " +
+        assertEquals("Event size would be 33554482, but the maximum serialized event " +
             "size is 33554432", assertThrows(RuntimeException.class, () -> {
                 MetadataParser.size(partitionRecord, (short) 0, cache);
             }).getMessage());
diff --git a/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManagerTest.java b/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManagerTest.java
new file mode 100644
index 0000000..3aa08dd
--- /dev/null
+++ b/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManagerTest.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.metalog;
+
+import org.apache.kafka.common.metadata.RegisterBrokerRecord;
+import org.apache.kafka.metadata.ApiMessageAndVersion;
+import org.apache.kafka.test.TestUtils;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.kafka.metalog.MockMetaLogManagerListener.COMMIT;
+import static org.apache.kafka.metalog.MockMetaLogManagerListener.LAST_COMMITTED_OFFSET;
+import static org.apache.kafka.metalog.MockMetaLogManagerListener.SHUTDOWN;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+
+@Timeout(value = 40)
+public class LocalLogManagerTest {
+    private static final Logger log = LoggerFactory.getLogger(LocalLogManagerTest.class);
+
+    /**
+     * Test creating a LocalLogManager and closing it.
+     */
+    @Test
+    public void testCreateAndClose() throws Exception {
+        try (LocalLogManagerTestEnv env =
+                 LocalLogManagerTestEnv.createWithMockListeners(1)) {
+            env.close();
+            assertEquals(null, env.firstError.get());
+        }
+    }
+
+    /**
+     * Test that the local log manager will claim leadership.
+     */
+    @Test
+    public void testClaimsLeadership() throws Exception {
+        try (LocalLogManagerTestEnv env =
+                 LocalLogManagerTestEnv.createWithMockListeners(1)) {
+            assertEquals(new MetaLogLeader(0, 0), env.waitForLeader());
+            env.close();
+            assertEquals(null, env.firstError.get());
+        }
+    }
+
+    /**
+     * Test that we can pass leadership back and forth between log managers.
+     */
+    @Test
+    public void testPassLeadership() throws Exception {
+        try (LocalLogManagerTestEnv env =
+                 LocalLogManagerTestEnv.createWithMockListeners(3)) {
+            MetaLogLeader first = env.waitForLeader();
+            MetaLogLeader cur = first;
+            do {
+                env.logManagers().get(cur.nodeId()).renounce(cur.epoch());
+                MetaLogLeader next = env.waitForLeader();
+                while (next.epoch() == cur.epoch()) {
+                    Thread.sleep(1);
+                    next = env.waitForLeader();
+                }
+                long expectedNextEpoch = cur.epoch() + 2;
+                assertEquals(expectedNextEpoch, next.epoch(), "Expected next epoch to be " + expectedNextEpoch +
+                    ", but found  " + next);
+                cur = next;
+            } while (cur.nodeId() == first.nodeId());
+            env.close();
+            assertEquals(null, env.firstError.get());
+        }
+    }
+
+    private static void waitForLastCommittedOffset(long targetOffset,
+                LocalLogManager logManager) throws InterruptedException {
+        TestUtils.retryOnExceptionWithTimeout(3, 20000, () -> {
+            MockMetaLogManagerListener listener =
+                (MockMetaLogManagerListener) logManager.listeners().get(0);
+            long highestOffset = -1;
+            for (String event : listener.serializedEvents()) {
+                if (event.startsWith(LAST_COMMITTED_OFFSET)) {
+                    long offset = Long.valueOf(
+                        event.substring(LAST_COMMITTED_OFFSET.length() + 1));
+                    if (offset < highestOffset) {
+                        throw new RuntimeException("Invalid offset: " + offset +
+                            " is less than the previous offset of " + highestOffset);
+                    }
+                    highestOffset = offset;
+                }
+            }
+            if (highestOffset < targetOffset) {
+                throw new RuntimeException("Offset for log manager " +
+                    logManager.nodeId() + " only reached " + highestOffset);
+            }
+        });
+    }
+
+    /**
+     * Test that all the log managers see all the commits.
+     */
+    @Test
+    public void testCommits() throws Exception {
+        try (LocalLogManagerTestEnv env =
+                 LocalLogManagerTestEnv.createWithMockListeners(3)) {
+            MetaLogLeader leaderInfo = env.waitForLeader();
+            LocalLogManager activeLogManager = env.logManagers().get(leaderInfo.nodeId());
+            long epoch = activeLogManager.leader().epoch();
+            List<ApiMessageAndVersion> messages = Arrays.asList(
+                new ApiMessageAndVersion(new RegisterBrokerRecord().setBrokerId(0), (short) 0),
+                new ApiMessageAndVersion(new RegisterBrokerRecord().setBrokerId(1), (short) 0),
+                new ApiMessageAndVersion(new RegisterBrokerRecord().setBrokerId(2), (short) 0));
+            assertEquals(3, activeLogManager.scheduleWrite(epoch, messages));
+            for (LocalLogManager logManager : env.logManagers()) {
+                waitForLastCommittedOffset(3, logManager);
+            }
+            List<MockMetaLogManagerListener> listeners = env.logManagers().stream().
+                map(m -> (MockMetaLogManagerListener) m.listeners().get(0)).
+                collect(Collectors.toList());
+            env.close();
+            for (MockMetaLogManagerListener listener : listeners) {
+                List<String> events = listener.serializedEvents();
+                assertEquals(SHUTDOWN, events.get(events.size() - 1));
+                int foundIndex = 0;
+                for (String event : events) {
+                    if (event.startsWith(COMMIT)) {
+                        assertEquals(messages.get(foundIndex).message().toString(),
+                            event.substring(COMMIT.length() + 1));
+                        foundIndex++;
+                    }
+                }
+                assertEquals(messages.size(), foundIndex);
+            }
+        }
+    }
+}
diff --git a/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManagerTestEnv.java b/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManagerTestEnv.java
new file mode 100644
index 0000000..52aeea0
--- /dev/null
+++ b/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManagerTestEnv.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.metalog;
+
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.metalog.LocalLogManager.SharedLogData;
+import org.apache.kafka.test.TestUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicReference;
+
+public class LocalLogManagerTestEnv implements AutoCloseable {
+    private static final Logger log =
+        LoggerFactory.getLogger(LocalLogManagerTestEnv.class);
+
+    /**
+     * The first error we encountered during this test, or null if we have
+     * not encountered any.
+     */
+    final AtomicReference<String> firstError = new AtomicReference<>(null);
+
+    /**
+     * The test directory, which we will delete once the test is over.
+     */
+    private final File dir;
+
+    /**
+     * The shared data for our LocalLogManager instances.
+     */
+    private final SharedLogData shared;
+
+    /**
+     * A list of log managers.
+     */
+    private final List<LocalLogManager> logManagers;
+
+    public static LocalLogManagerTestEnv createWithMockListeners(int numManagers) throws Exception {
+        LocalLogManagerTestEnv testEnv = new LocalLogManagerTestEnv(numManagers);
+        try {
+            for (LocalLogManager logManager : testEnv.logManagers) {
+                logManager.register(new MockMetaLogManagerListener());
+            }
+        } catch (Exception e) {
+            testEnv.close();
+            throw e;
+        }
+        return testEnv;
+    }
+
+    public LocalLogManagerTestEnv(int numManagers) throws Exception {
+        dir = TestUtils.tempDirectory();
+        shared = new SharedLogData();
+        List<LocalLogManager> newLogManagers = new ArrayList<>(numManagers);
+        try {
+            for (int nodeId = 0; nodeId < numManagers; nodeId++) {
+                newLogManagers.add(new LocalLogManager(
+                    new LogContext(String.format("[LocalLogManager %d] ", nodeId)),
+                    nodeId,
+                    shared,
+                    String.format("LocalLogManager-%d_", nodeId)));
+            }
+            for (LocalLogManager logManager : newLogManagers) {
+                logManager.initialize();
+            }
+        } catch (Throwable t) {
+            for (LocalLogManager logManager : newLogManagers) {
+                logManager.close();
+            }
+            throw t;
+        }
+        this.logManagers = newLogManagers;
+    }
+
+    AtomicReference<String> firstError() {
+        return firstError;
+    }
+
+    File dir() {
+        return dir;
+    }
+
+    MetaLogLeader waitForLeader() throws InterruptedException {
+        AtomicReference<MetaLogLeader> value = new AtomicReference<>(null);
+        TestUtils.retryOnExceptionWithTimeout(3, 20000, () -> {
+            MetaLogLeader result = null;
+            for (LocalLogManager logManager : logManagers) {
+                MetaLogLeader leader = logManager.leader();
+                if (leader.nodeId() == logManager.nodeId()) {
+                    if (result != null) {
+                        throw new RuntimeException("node " + leader.nodeId() +
+                            " thinks it's the leader, but so does " + result.nodeId());
+                    }
+                    result = leader;
+                }
+            }
+            if (result == null) {
+                throw new RuntimeException("No leader found.");
+            }
+            value.set(result);
+        });
+        return value.get();
+    }
+
+    public List<LocalLogManager> logManagers() {
+        return logManagers;
+    }
+
+    @Override
+    public void close() throws InterruptedException {
+        try {
+            for (LocalLogManager logManager : logManagers) {
+                logManager.beginShutdown();
+            }
+            for (LocalLogManager logManager : logManagers) {
+                logManager.close();
+            }
+            Utils.delete(dir);
+        } catch (IOException e) {
+            log.error("Error deleting {}", dir.getAbsolutePath(), e);
+        }
+    }
+}
diff --git a/metadata/src/test/java/org/apache/kafka/metalog/MockMetaLogManagerListener.java b/metadata/src/test/java/org/apache/kafka/metalog/MockMetaLogManagerListener.java
new file mode 100644
index 0000000..fe61ec0
--- /dev/null
+++ b/metadata/src/test/java/org/apache/kafka/metalog/MockMetaLogManagerListener.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.metalog;
+
+import org.apache.kafka.common.protocol.ApiMessage;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class MockMetaLogManagerListener implements MetaLogListener {
+    public static final String COMMIT = "COMMIT";
+    public static final String LAST_COMMITTED_OFFSET = "LAST_COMMITTED_OFFSET";
+    public static final String NEW_LEADER = "NEW_LEADER";
+    public static final String RENOUNCE = "RENOUNCE";
+    public static final String SHUTDOWN = "SHUTDOWN";
+
+    private final List<String> serializedEvents = new ArrayList<>();
+
+    @Override
+    public synchronized void handleCommits(long lastCommittedOffset, List<ApiMessage> messages) {
+        for (ApiMessage message : messages) {
+            StringBuilder bld = new StringBuilder();
+            bld.append(COMMIT).append(" ").append(message.toString());
+            serializedEvents.add(bld.toString());
+        }
+        StringBuilder bld = new StringBuilder();
+        bld.append(LAST_COMMITTED_OFFSET).append(" ").append(lastCommittedOffset);
+        serializedEvents.add(bld.toString());
+    }
+
+    @Override
+    public void handleNewLeader(MetaLogLeader leader) {
+        StringBuilder bld = new StringBuilder();
+        bld.append(NEW_LEADER).append(" ").
+            append(leader.nodeId()).append(" ").append(leader.epoch());
+        synchronized (this) {
+            serializedEvents.add(bld.toString());
+        }
+    }
+
+    @Override
+    public void handleRenounce(long epoch) {
+        StringBuilder bld = new StringBuilder();
+        bld.append(RENOUNCE).append(" ").append(epoch);
+        synchronized (this) {
+            serializedEvents.add(bld.toString());
+        }
+    }
+
+    @Override
+    public void beginShutdown() {
+        StringBuilder bld = new StringBuilder();
+        bld.append(SHUTDOWN);
+        synchronized (this) {
+            serializedEvents.add(bld.toString());
+        }
+    }
+
+    public synchronized List<String> serializedEvents() {
+        return new ArrayList<>(serializedEvents);
+    }
+}
diff --git a/settings.gradle b/settings.gradle
index 55f77f3..fedfa9a 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -29,6 +29,7 @@ include 'clients',
     'log4j-appender',
     'metadata',
     'raft',
+    'shell',
     'streams',
     'streams:examples',
     'streams:streams-scala',
diff --git a/shell/src/main/java/org/apache/kafka/shell/CatCommandHandler.java b/shell/src/main/java/org/apache/kafka/shell/CatCommandHandler.java
new file mode 100644
index 0000000..3fc9427
--- /dev/null
+++ b/shell/src/main/java/org/apache/kafka/shell/CatCommandHandler.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.shell;
+
+import net.sourceforge.argparse4j.inf.ArgumentParser;
+import net.sourceforge.argparse4j.inf.Namespace;
+import org.apache.kafka.shell.MetadataNode.DirectoryNode;
+import org.apache.kafka.shell.MetadataNode.FileNode;
+import org.jline.reader.Candidate;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.PrintWriter;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+
+/**
+ * Implements the cat command.
+ */
+public final class CatCommandHandler implements Commands.Handler {
+    private static final Logger log = LoggerFactory.getLogger(CatCommandHandler.class);
+
+    public final static Commands.Type TYPE = new CatCommandType();
+
+    public static class CatCommandType implements Commands.Type {
+        private CatCommandType() {
+        }
+
+        @Override
+        public String name() {
+            return "cat";
+        }
+
+        @Override
+        public String description() {
+            return "Show the contents of metadata nodes.";
+        }
+
+        @Override
+        public boolean shellOnly() {
+            return false;
+        }
+
+        @Override
+        public void addArguments(ArgumentParser parser) {
+            parser.addArgument("targets").
+                nargs("+").
+                help("The metadata nodes to display.");
+        }
+
+        @Override
+        public Commands.Handler createHandler(Namespace namespace) {
+            return new CatCommandHandler(namespace.getList("targets"));
+        }
+
+        @Override
+        public void completeNext(MetadataNodeManager nodeManager, List<String> nextWords,
+                                 List<Candidate> candidates) throws Exception {
+            CommandUtils.completePath(nodeManager, nextWords.get(nextWords.size() - 1),
+                candidates);
+        }
+    }
+
+    private final List<String> targets;
+
+    public CatCommandHandler(List<String> targets) {
+        this.targets = targets;
+    }
+
+    @Override
+    public void run(Optional<InteractiveShell> shell,
+                    PrintWriter writer,
+                    MetadataNodeManager manager) throws Exception {
+        log.trace("cat " + targets);
+        for (String target : targets) {
+            manager.visit(new GlobVisitor(target, entryOption -> {
+                if (entryOption.isPresent()) {
+                    MetadataNode node = entryOption.get().node();
+                    if (node instanceof DirectoryNode) {
+                        writer.println("cat: " + target + ": Is a directory");
+                    } else if (node instanceof FileNode) {
+                        FileNode fileNode = (FileNode) node;
+                        writer.println(fileNode.contents());
+                    }
+                } else {
+                    writer.println("cat: " + target + ": No such file or directory.");
+                }
+            }));
+        }
+    }
+
+    @Override
+    public int hashCode() {
+        return targets.hashCode();
+    }
+
+    @Override
+    public boolean equals(Object other) {
+        if (!(other instanceof CatCommandHandler)) return false;
+        CatCommandHandler o = (CatCommandHandler) other;
+        if (!Objects.equals(o.targets, targets)) return false;
+        return true;
+    }
+}
diff --git a/shell/src/main/java/org/apache/kafka/shell/CdCommandHandler.java b/shell/src/main/java/org/apache/kafka/shell/CdCommandHandler.java
new file mode 100644
index 0000000..8d270e5
--- /dev/null
+++ b/shell/src/main/java/org/apache/kafka/shell/CdCommandHandler.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.shell;
+
+import net.sourceforge.argparse4j.inf.ArgumentParser;
+import net.sourceforge.argparse4j.inf.Namespace;
+import org.apache.kafka.shell.MetadataNode.DirectoryNode;
+import org.jline.reader.Candidate;
+
+import java.io.PrintWriter;
+import java.util.List;
+import java.util.Optional;
+import java.util.function.Consumer;
+
+/**
+ * Implements the cd command.
+ */
+public final class CdCommandHandler implements Commands.Handler {
+    public final static Commands.Type TYPE = new CdCommandType();
+
+    public static class CdCommandType implements Commands.Type {
+        private CdCommandType() {
+        }
+
+        @Override
+        public String name() {
+            return "cd";
+        }
+
+        @Override
+        public String description() {
+            return "Set the current working directory.";
+        }
+
+        @Override
+        public boolean shellOnly() {
+            return true;
+        }
+
+        @Override
+        public void addArguments(ArgumentParser parser) {
+            parser.addArgument("target").
+                nargs("?").
+                help("The directory to change to.");
+        }
+
+        @Override
+        public Commands.Handler createHandler(Namespace namespace) {
+            return new CdCommandHandler(Optional.ofNullable(namespace.getString("target")));
+        }
+
+        @Override
+        public void completeNext(MetadataNodeManager nodeManager, List<String> nextWords,
+                                 List<Candidate> candidates) throws Exception {
+            if (nextWords.size() == 1) {
+                CommandUtils.completePath(nodeManager, nextWords.get(0), candidates);
+            }
+        }
+    }
+
+    private final Optional<String> target;
+
+    public CdCommandHandler(Optional<String> target) {
+        this.target = target;
+    }
+
+    @Override
+    public void run(Optional<InteractiveShell> shell,
+                    PrintWriter writer,
+                    MetadataNodeManager manager) throws Exception {
+        String effectiveTarget = target.orElse("/");
+        manager.visit(new Consumer<MetadataNodeManager.Data>() {
+            @Override
+            public void accept(MetadataNodeManager.Data data) {
+                new GlobVisitor(effectiveTarget, entryOption -> {
+                    if (entryOption.isPresent()) {
+                        if (!(entryOption.get().node() instanceof DirectoryNode)) {
+                            writer.println("cd: " + effectiveTarget + ": not a directory.");
+                        } else {
+                            data.setWorkingDirectory(entryOption.get().absolutePath());
+                        }
+                    } else {
+                        writer.println("cd: " + effectiveTarget + ": no such directory.");
+                    }
+                }).accept(data);
+            }
+        });
+    }
+
+    @Override
+    public int hashCode() {
+        return target.hashCode();
+    }
+
+    @Override
+    public boolean equals(Object other) {
+        if (!(other instanceof CdCommandHandler)) return false;
+        CdCommandHandler o = (CdCommandHandler) other;
+        if (!o.target.equals(target)) return false;
+        return true;
+    }
+}
diff --git a/shell/src/main/java/org/apache/kafka/shell/CommandUtils.java b/shell/src/main/java/org/apache/kafka/shell/CommandUtils.java
new file mode 100644
index 0000000..8e5bc21
--- /dev/null
+++ b/shell/src/main/java/org/apache/kafka/shell/CommandUtils.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.shell;
+
+import org.apache.kafka.shell.MetadataNode.DirectoryNode;
+import org.jline.reader.Candidate;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map.Entry;
+
+/**
+ * Utility functions for command handlers.
+ */
+public final class CommandUtils {
+    /**
+     * Convert a list of paths into the effective list of paths which should be used.
+     * Empty strings will be removed.  If no paths are given, the current working
+     * directory will be used.
+     *
+     * @param paths     The input paths.  Non-null.
+     *
+     * @return          The output paths.
+     */
+    public static List<String> getEffectivePaths(List<String> paths) {
+        List<String> effectivePaths = new ArrayList<>();
+        for (String path : paths) {
+            if (!path.isEmpty()) {
+                effectivePaths.add(path);
+            }
+        }
+        if (effectivePaths.isEmpty()) {
+            effectivePaths.add(".");
+        }
+        return effectivePaths;
+    }
+
+    /**
+     * Generate a list of potential completions for a prefix of a command name.
+     *
+     * @param commandPrefix     The command prefix.  Non-null.
+     * @param candidates        The list to add the output completions to.
+     */
+    public static void completeCommand(String commandPrefix, List<Candidate> candidates) {
+        String command = Commands.TYPES.ceilingKey(commandPrefix);
+        while (true) {
+            if (command == null || !command.startsWith(commandPrefix)) {
+                return;
+            }
+            candidates.add(new Candidate(command));
+            command = Commands.TYPES.higherKey(command);
+        }
+    }
+
+    /**
+     * Convert a path to a list of path components.
+     * Multiple slashes in a row are treated the same as a single slash.
+     * Trailing slashes are ignored.
+     */
+    public static List<String> splitPath(String path) {
+        List<String> results = new ArrayList<>();
+        String[] components = path.split("/");
+        for (int i = 0; i < components.length; i++) {
+            if (!components[i].isEmpty()) {
+                results.add(components[i]);
+            }
+        }
+        return results;
+    }
+
+    public static List<String> stripDotPathComponents(List<String> input) {
+        List<String> output = new ArrayList<>();
+        for (String string : input) {
+            if (string.equals("..")) {
+                if (output.size() > 0) {
+                    output.remove(output.size() - 1);
+                }
+            } else if (!string.equals(".")) {
+                output.add(string);
+            }
+        }
+        return output;
+    }
+
+    /**
+     * Generate a list of potential completions for a path.
+     *
+     * @param nodeManager       The NodeManager.
+     * @param pathPrefix        The path prefix.  Non-null.
+     * @param candidates        The list to add the output completions to.
+     */
+    public static void completePath(MetadataNodeManager nodeManager,
+                                    String pathPrefix,
+                                    List<Candidate> candidates) throws Exception {
+        nodeManager.visit(data -> {
+            String absolutePath = pathPrefix.startsWith("/") ?
+                pathPrefix : data.workingDirectory() + "/" + pathPrefix;
+            List<String> pathComponents = stripDotPathComponents(splitPath(absolutePath));
+            DirectoryNode directory = data.root();
+            int numDirectories = pathPrefix.endsWith("/") ?
+                pathComponents.size() : pathComponents.size() - 1;
+            for (int i = 0; i < numDirectories; i++) {
+                MetadataNode node = directory.child(pathComponents.get(i));
+                if (node == null || !(node instanceof DirectoryNode)) {
+                    return;
+                }
+                directory = (DirectoryNode) node;
+            }
+            String lastComponent = "";
+            if (numDirectories >= 0 && numDirectories < pathComponents.size()) {
+                lastComponent = pathComponents.get(numDirectories);
+            }
+            Entry<String, MetadataNode> candidate =
+                directory.children().ceilingEntry(lastComponent);
+            String effectivePrefix;
+            int lastSlash = pathPrefix.lastIndexOf('/');
+            if (lastSlash < 0) {
+                effectivePrefix = "";
+            } else {
+                effectivePrefix = pathPrefix.substring(0, lastSlash + 1);
+            }
+            while (candidate != null && candidate.getKey().startsWith(lastComponent)) {
+                StringBuilder candidateBuilder = new StringBuilder();
+                candidateBuilder.append(effectivePrefix).append(candidate.getKey());
+                boolean complete = true;
+                if (candidate.getValue() instanceof DirectoryNode) {
+                    candidateBuilder.append("/");
+                    complete = false;
+                }
+                candidates.add(new Candidate(candidateBuilder.toString(),
+                    candidateBuilder.toString(), null, null, null, null, complete));
+                candidate = directory.children().higherEntry(candidate.getKey());
+            }
+        });
+    }
+}
diff --git a/shell/src/main/java/org/apache/kafka/shell/Commands.java b/shell/src/main/java/org/apache/kafka/shell/Commands.java
new file mode 100644
index 0000000..db16411
--- /dev/null
+++ b/shell/src/main/java/org/apache/kafka/shell/Commands.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.shell;
+
+import net.sourceforge.argparse4j.ArgumentParsers;
+import net.sourceforge.argparse4j.inf.ArgumentParser;
+import net.sourceforge.argparse4j.inf.ArgumentParserException;
+import net.sourceforge.argparse4j.inf.Namespace;
+import net.sourceforge.argparse4j.inf.Subparser;
+import net.sourceforge.argparse4j.inf.Subparsers;
+import net.sourceforge.argparse4j.internal.HelpScreenException;
+import org.jline.reader.Candidate;
+
+import java.io.PrintWriter;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.NavigableMap;
+import java.util.Optional;
+import java.util.TreeMap;
+
+/**
+ * The commands for the Kafka metadata tool.
+ *
+ * Maintains the registry of known command types and the argparse4j parser
+ * that turns an input line into an executable Handler.
+ */
+public final class Commands {
+    /**
+     * A map from command names to command types.
+     */
+    static final NavigableMap<String, Type> TYPES;
+
+    static {
+        TreeMap<String, Type> typesMap = new TreeMap<>();
+        // Register every command type under its name so that both parser
+        // setup and tab-completion can look commands up in sorted order.
+        for (Type type : Arrays.asList(
+                CatCommandHandler.TYPE,
+                CdCommandHandler.TYPE,
+                ExitCommandHandler.TYPE,
+                FindCommandHandler.TYPE,
+                HelpCommandHandler.TYPE,
+                HistoryCommandHandler.TYPE,
+                LsCommandHandler.TYPE,
+                ManCommandHandler.TYPE,
+                PwdCommandHandler.TYPE)) {
+            typesMap.put(type.name(), type);
+        }
+        TYPES = Collections.unmodifiableNavigableMap(typesMap);
+    }
+
+    /**
+     * Command handler objects are instantiated with specific arguments to
+     * execute commands.
+     */
+    public interface Handler {
+        void run(Optional<InteractiveShell> shell,
+                 PrintWriter writer,
+                 MetadataNodeManager manager) throws Exception;
+    }
+
+    /**
+     * An object which describes a type of command handler. This includes
+     * information like its name, help text, and whether it should be accessible
+     * from non-interactive mode.
+     */
+    public interface Type {
+        String name();
+        String description();
+        boolean shellOnly();
+        void addArguments(ArgumentParser parser);
+        Handler createHandler(Namespace namespace);
+        // Fill in completion candidates for the word currently being typed.
+        void completeNext(MetadataNodeManager nodeManager,
+                          List<String> nextWords,
+                          List<Candidate> candidates) throws Exception;
+    }
+
+    private final ArgumentParser parser;
+
+    /**
+     * Create the commands instance.
+     *
+     * @param addShellCommands  True if we should include the shell-only commands.
+     */
+    public Commands(boolean addShellCommands) {
+        this.parser = ArgumentParsers.newArgumentParser("", false);
+        Subparsers subparsers = this.parser.addSubparsers().dest("command");
+        // Each command contributes its own subparser; shell-only commands
+        // (exit, help, history, ...) are omitted in non-interactive mode.
+        for (Type type : TYPES.values()) {
+            if (addShellCommands || !type.shellOnly()) {
+                Subparser subParser = subparsers.addParser(type.name());
+                subParser.help(type.description());
+                type.addArguments(subParser);
+            }
+        }
+    }
+
+    ArgumentParser parser() {
+        return parser;
+    }
+
+    /**
+     * Handle the given command.
+     *
+     * In general this function should not throw exceptions. Instead, it should
+     * return ErroneousCommandHandler if the input was invalid.
+     *
+     * @param arguments     The command line arguments.
+     * @return              The command handler.
+     */
+    public Handler parseCommand(List<String> arguments) {
+        List<String> trimmedArguments = new ArrayList<>(arguments);
+        // Drop trailing empty strings; a fully empty input line is a no-op
+        // rather than a parse error.
+        while (true) {
+            if (trimmedArguments.isEmpty()) {
+                return new NoOpCommandHandler();
+            }
+            String last = trimmedArguments.get(trimmedArguments.size() - 1);
+            if (!last.isEmpty()) {
+                break;
+            }
+            trimmedArguments.remove(trimmedArguments.size() - 1);
+        }
+        Namespace namespace;
+        try {
+            namespace = parser.parseArgs(trimmedArguments.toArray(new String[0]));
+        } catch (HelpScreenException e) {
+            // argparse4j throws this after printing a help screen; treat it
+            // as "nothing more to do", not as an error.
+            return new NoOpCommandHandler();
+        } catch (ArgumentParserException e) {
+            return new ErroneousCommandHandler(e.getMessage());
+        }
+        String command = namespace.get("command");
+        // NOTE(review): this check appears to reject abbreviated subcommand
+        // names that argparse4j would otherwise accept; also confirm that
+        // "command" is always non-null here once parsing succeeds.
+        if (!command.equals(trimmedArguments.get(0))) {
+            return new ErroneousCommandHandler("invalid choice: '" +
+                trimmedArguments.get(0) + "': did you mean '" + command + "'?");
+        }
+        Type type = TYPES.get(command);
+        if (type == null) {
+            return new ErroneousCommandHandler("Unknown command specified: " + command);
+        } else {
+            return type.createHandler(namespace);
+        }
+    }
+}
diff --git a/shell/src/main/java/org/apache/kafka/shell/ErroneousCommandHandler.java b/shell/src/main/java/org/apache/kafka/shell/ErroneousCommandHandler.java
new file mode 100644
index 0000000..d52c55f
--- /dev/null
+++ b/shell/src/main/java/org/apache/kafka/shell/ErroneousCommandHandler.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.shell;
+
+import java.io.PrintWriter;
+import java.util.Objects;
+import java.util.Optional;
+
+/**
+ * Handles erroneous commands.
+ *
+ * Returned by Commands#parseCommand when the input line could not be
+ * parsed; running it just prints the stored error message.
+ */
+public final class ErroneousCommandHandler implements Commands.Handler {
+    private final String message;
+
+    public ErroneousCommandHandler(String message) {
+        this.message = message;
+    }
+
+    @Override
+    public void run(Optional<InteractiveShell> shell,
+                    PrintWriter writer,
+                    MetadataNodeManager manager) {
+        // Report the problem to the user; no other side effects.
+        writer.println(message);
+    }
+
+    @Override
+    public int hashCode() {
+        // Objects.hashCode tolerates a null message.
+        return Objects.hashCode(message);
+    }
+
+    @Override
+    public boolean equals(Object other) {
+        if (!(other instanceof ErroneousCommandHandler)) return false;
+        ErroneousCommandHandler o = (ErroneousCommandHandler) other;
+        if (!Objects.equals(o.message, message)) return false;
+        return true;
+    }
+
+    @Override
+    public String toString() {
+        return "ErroneousCommandHandler(" + message + ")";
+    }
+}
diff --git a/shell/src/main/java/org/apache/kafka/shell/ExitCommandHandler.java b/shell/src/main/java/org/apache/kafka/shell/ExitCommandHandler.java
new file mode 100644
index 0000000..2b11b35
--- /dev/null
+++ b/shell/src/main/java/org/apache/kafka/shell/ExitCommandHandler.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.shell;
+
+import net.sourceforge.argparse4j.inf.ArgumentParser;
+import net.sourceforge.argparse4j.inf.Namespace;
+import org.apache.kafka.common.utils.Exit;
+import org.jline.reader.Candidate;
+
+import java.io.PrintWriter;
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * Implements the exit command.
+ */
+public final class ExitCommandHandler implements Commands.Handler {
+    public final static Commands.Type TYPE = new ExitCommandType();
+
+    public static class ExitCommandType implements Commands.Type {
+        // Only the TYPE singleton above may instantiate this.
+        private ExitCommandType() {
+        }
+
+        @Override
+        public String name() {
+            return "exit";
+        }
+
+        @Override
+        public String description() {
+            return "Exit the metadata shell.";
+        }
+
+        @Override
+        public boolean shellOnly() {
+            // Exiting only makes sense from the interactive shell.
+            return true;
+        }
+
+        @Override
+        public void addArguments(ArgumentParser parser) {
+            // nothing to do
+        }
+
+        @Override
+        public Commands.Handler createHandler(Namespace namespace) {
+            return new ExitCommandHandler();
+        }
+
+        @Override
+        public void completeNext(MetadataNodeManager nodeManager, List<String> nextWords,
+                                 List<Candidate> candidates) throws Exception {
+            // nothing to do
+        }
+    }
+
+    @Override
+    public void run(Optional<InteractiveShell> shell,
+                    PrintWriter writer,
+                    MetadataNodeManager manager) {
+        // Exit with status 0 via Kafka's Exit wrapper; presumably the wrapper
+        // lets tests intercept the exit -- see o.a.k.common.utils.Exit.
+        Exit.exit(0);
+    }
+
+    @Override
+    public int hashCode() {
+        // All instances are interchangeable, so a constant hash is fine.
+        return 0;
+    }
+
+    @Override
+    public boolean equals(Object other) {
+        if (!(other instanceof ExitCommandHandler)) return false;
+        return true;
+    }
+}
diff --git a/shell/src/main/java/org/apache/kafka/shell/FindCommandHandler.java b/shell/src/main/java/org/apache/kafka/shell/FindCommandHandler.java
new file mode 100644
index 0000000..6d9ae44
--- /dev/null
+++ b/shell/src/main/java/org/apache/kafka/shell/FindCommandHandler.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.shell;
+
+import net.sourceforge.argparse4j.inf.ArgumentParser;
+import net.sourceforge.argparse4j.inf.Namespace;
+import org.apache.kafka.shell.MetadataNode.DirectoryNode;
+import org.jline.reader.Candidate;
+
+import java.io.PrintWriter;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.Optional;
+
+/**
+ * Implements the find command.
+ *
+ * Prints the path of every node reachable from each starting path,
+ * similar in spirit to the UNIX find(1) utility.
+ */
+public final class FindCommandHandler implements Commands.Handler {
+    public final static Commands.Type TYPE = new FindCommandType();
+
+    public static class FindCommandType implements Commands.Type {
+        // Only the TYPE singleton above may instantiate this.
+        private FindCommandType() {
+        }
+
+        @Override
+        public String name() {
+            return "find";
+        }
+
+        @Override
+        public String description() {
+            return "Search for nodes in the directory hierarchy.";
+        }
+
+        @Override
+        public boolean shellOnly() {
+            return false;
+        }
+
+        @Override
+        public void addArguments(ArgumentParser parser) {
+            // Zero or more starting paths; an empty list falls back to the
+            // defaults chosen by CommandUtils.getEffectivePaths.
+            parser.addArgument("paths").
+                nargs("*").
+                help("The paths to start at.");
+        }
+
+        @Override
+        public Commands.Handler createHandler(Namespace namespace) {
+            return new FindCommandHandler(namespace.getList("paths"));
+        }
+
+        @Override
+        public void completeNext(MetadataNodeManager nodeManager, List<String> nextWords,
+                                 List<Candidate> candidates) throws Exception {
+            // Complete the final word as a metadata node path.
+            CommandUtils.completePath(nodeManager, nextWords.get(nextWords.size() - 1),
+                candidates);
+        }
+    }
+
+    private final List<String> paths;
+
+    public FindCommandHandler(List<String> paths) {
+        this.paths = paths;
+    }
+
+    @Override
+    public void run(Optional<InteractiveShell> shell,
+                    PrintWriter writer,
+                    MetadataNodeManager manager) throws Exception {
+        // Expand each path as a glob and recurse from every match.
+        for (String path : CommandUtils.getEffectivePaths(paths)) {
+            manager.visit(new GlobVisitor(path, entryOption -> {
+                if (entryOption.isPresent()) {
+                    find(writer, path, entryOption.get().node());
+                } else {
+                    writer.println("find: " + path + ": no such file or directory.");
+                }
+            }));
+        }
+    }
+
+    // Print this node's path, then descend into children if it is a directory.
+    private void find(PrintWriter writer, String path, MetadataNode node) {
+        writer.println(path);
+        if (node instanceof DirectoryNode) {
+            DirectoryNode directory = (DirectoryNode) node;
+            for (Entry<String, MetadataNode> entry : directory.children().entrySet()) {
+                // Special-case the root so we don't emit a double slash.
+                String nextPath = path.equals("/") ?
+                    path + entry.getKey() : path + "/" + entry.getKey();
+                find(writer, nextPath, entry.getValue());
+            }
+        }
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hashCode(paths);
+    }
+
+    @Override
+    public boolean equals(Object other) {
+        if (!(other instanceof FindCommandHandler)) return false;
+        FindCommandHandler o = (FindCommandHandler) other;
+        if (!Objects.equals(o.paths, paths)) return false;
+        return true;
+    }
+}
diff --git a/shell/src/main/java/org/apache/kafka/shell/GlobComponent.java b/shell/src/main/java/org/apache/kafka/shell/GlobComponent.java
new file mode 100644
index 0000000..b93382b
--- /dev/null
+++ b/shell/src/main/java/org/apache/kafka/shell/GlobComponent.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.shell;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.regex.Pattern;
+
+/**
+ * Implements a per-path-component glob.
+ *
+ * A component that contains no active glob syntax is treated as a literal
+ * and matched with String#equals; otherwise the glob is compiled into a
+ * regular expression.
+ */
+public final class GlobComponent {
+    private static final Logger log = LoggerFactory.getLogger(GlobComponent.class);
+
+    /**
+     * Returns true if the character is a special character for regular expressions.
+     */
+    private static boolean isRegularExpressionSpecialCharacter(char ch) {
+        switch (ch) {
+            case '$':
+            case '(':
+            case ')':
+            case '+':
+            case '.':
+            case '[':
+            case ']':
+            case '^':
+            case '{':
+            case '|':
+                return true;
+            default:
+                break;
+        }
+        return false;
+    }
+
+    /**
+     * Returns true if the character is a special character for globs.
+     */
+    private static boolean isGlobSpecialCharacter(char ch) {
+        switch (ch) {
+            case '*':
+            case '?':
+            case '\\':
+            case '{':
+            case '}':
+                return true;
+            default:
+                break;
+        }
+        return false;
+    }
+
+    /**
+     * Converts a glob string to a regular expression string.
+     * Returns null if the glob should be handled as a literal (can only match one string).
+     * Throws an exception if the glob is malformed.
+     */
+    static String toRegularExpression(String glob) {
+        StringBuilder output = new StringBuilder("^");
+        // "literal" stays true until we see an active glob construct; a
+        // literal component is matched by string equality, not by regex.
+        boolean literal = true;
+        boolean processingGroup = false;
+
+        for (int i = 0; i < glob.length(); ) {
+            char c = glob.charAt(i++);
+            switch (c) {
+                case '?':
+                    literal = false;
+                    output.append(".");
+                    break;
+                case '*':
+                    literal = false;
+                    output.append(".*");
+                    break;
+                case '\\':
+                    // NOTE(review): a trailing backslash is emitted as-is,
+                    // which will escape the regex's closing '$' below --
+                    // confirm this edge case is intended.
+                    if (i == glob.length()) {
+                        output.append(c);
+                    } else {
+                        char next = glob.charAt(i);
+                        i++;
+                        // Escape the next character if the regex engine would
+                        // otherwise interpret it.
+                        if (isGlobSpecialCharacter(next) ||
+                                isRegularExpressionSpecialCharacter(next)) {
+                            output.append('\\');
+                        }
+                        output.append(next);
+                    }
+                    break;
+                case '{':
+                    if (processingGroup) {
+                        throw new RuntimeException("Can't nest glob groups.");
+                    }
+                    literal = false;
+                    // {a,b} becomes (?:(?:a)|(?:b)) in regex form.
+                    output.append("(?:(?:");
+                    processingGroup = true;
+                    break;
+                case ',':
+                    // A comma only acts as a separator inside a group.
+                    if (processingGroup) {
+                        literal = false;
+                        output.append(")|(?:");
+                    } else {
+                        output.append(c);
+                    }
+                    break;
+                case '}':
+                    if (processingGroup) {
+                        literal = false;
+                        output.append("))");
+                        processingGroup = false;
+                    } else {
+                        output.append(c);
+                    }
+                    break;
+                // TODO: handle character ranges
+                default:
+                    if (isRegularExpressionSpecialCharacter(c)) {
+                        output.append('\\');
+                    }
+                    output.append(c);
+            }
+        }
+        if (processingGroup) {
+            throw new RuntimeException("Unterminated glob group.");
+        }
+        if (literal) {
+            return null;
+        }
+        output.append('$');
+        return output.toString();
+    }
+
+    private final String component;
+    private final Pattern pattern;
+
+    public GlobComponent(String component) {
+        this.component = component;
+        Pattern newPattern = null;
+        try {
+            String regularExpression = toRegularExpression(component);
+            if (regularExpression != null) {
+                newPattern = Pattern.compile(regularExpression);
+            }
+        } catch (RuntimeException e) {
+            // A malformed glob falls back to literal matching rather than
+            // failing the whole command.
+            log.debug("Invalid glob pattern: " + e.getMessage());
+        }
+        this.pattern = newPattern;
+    }
+
+    public String component() {
+        return component;
+    }
+
+    // True if this component is matched by string equality rather than regex.
+    public boolean literal() {
+        return pattern == null;
+    }
+
+    public boolean matches(String nodeName) {
+        if (pattern == null) {
+            return component.equals(nodeName);
+        } else {
+            return pattern.matcher(nodeName).matches();
+        }
+    }
+}
diff --git a/shell/src/main/java/org/apache/kafka/shell/GlobVisitor.java b/shell/src/main/java/org/apache/kafka/shell/GlobVisitor.java
new file mode 100644
index 0000000..8081b7e
--- /dev/null
+++ b/shell/src/main/java/org/apache/kafka/shell/GlobVisitor.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.shell;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.function.Consumer;
+
+/**
+ * Visits metadata paths based on a glob string.
+ *
+ * Relative globs are resolved against the current working directory.  The
+ * handler is invoked once per matching node; if nothing matches, it is
+ * invoked a single time with Optional.empty().
+ */
+public final class GlobVisitor implements Consumer<MetadataNodeManager.Data> {
+    private final String glob;
+    private final Consumer<Optional<MetadataNodeInfo>> handler;
+
+    public GlobVisitor(String glob,
+                       Consumer<Optional<MetadataNodeInfo>> handler) {
+        this.glob = glob;
+        this.handler = handler;
+    }
+
+    /**
+     * A matched metadata node together with the path components leading to
+     * it.  An empty path array denotes the root node.
+     */
+    public static class MetadataNodeInfo {
+        private final String[] path;
+        private final MetadataNode node;
+
+        MetadataNodeInfo(String[] path, MetadataNode node) {
+            this.path = path;
+            this.node = node;
+        }
+
+        public String[] path() {
+            return path;
+        }
+
+        public MetadataNode node() {
+            return node;
+        }
+
+        public String lastPathComponent() {
+            if (path.length == 0) {
+                return "/";
+            } else {
+                return path[path.length - 1];
+            }
+        }
+
+        public String absolutePath() {
+            return "/" + String.join("/", path);
+        }
+
+        @Override
+        public int hashCode() {
+            // Hash the array contents rather than the array reference.
+            // Objects.hash(path, node) would use the array's identity hash
+            // code, which is inconsistent with equals() below (Arrays.equals):
+            // two equal infos could then hash differently.
+            return Objects.hash(Arrays.hashCode(path), node);
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (!(o instanceof MetadataNodeInfo)) return false;
+            MetadataNodeInfo other = (MetadataNodeInfo) o;
+            if (!Arrays.equals(path, other.path)) return false;
+            if (!node.equals(other.node)) return false;
+            return true;
+        }
+
+        @Override
+        public String toString() {
+            StringBuilder bld = new StringBuilder("MetadataNodeInfo(path=");
+            for (int i = 0; i < path.length; i++) {
+                bld.append("/");
+                bld.append(path[i]);
+            }
+            bld.append(", node=").append(node).append(")");
+            return bld.toString();
+        }
+    }
+
+    @Override
+    public void accept(MetadataNodeManager.Data data) {
+        // Make the glob absolute before splitting it into components.
+        String fullGlob = glob.startsWith("/") ? glob :
+            data.workingDirectory() + "/" + glob;
+        List<String> globComponents =
+            CommandUtils.stripDotPathComponents(CommandUtils.splitPath(fullGlob));
+        if (!accept(globComponents, 0, data.root(), new String[0])) {
+            // Nothing matched anywhere; tell the handler explicitly.
+            handler.accept(Optional.empty());
+        }
+    }
+
+    /**
+     * Recursively match glob components against the node tree.
+     *
+     * @return true if at least one node matched at or below this point.
+     */
+    private boolean accept(List<String> globComponents,
+                           int componentIndex,
+                           MetadataNode node,
+                           String[] path) {
+        if (componentIndex >= globComponents.size()) {
+            // All components matched: report this node.
+            handler.accept(Optional.of(new MetadataNodeInfo(path, node)));
+            return true;
+        }
+        String globComponentString = globComponents.get(componentIndex);
+        GlobComponent globComponent = new GlobComponent(globComponentString);
+        if (globComponent.literal()) {
+            // Fast path: a literal component is a direct child lookup.
+            if (!(node instanceof MetadataNode.DirectoryNode)) {
+                return false;
+            }
+            MetadataNode.DirectoryNode directory = (MetadataNode.DirectoryNode) node;
+            MetadataNode child = directory.child(globComponent.component());
+            if (child == null) {
+                return false;
+            }
+            String[] newPath = new String[path.length + 1];
+            System.arraycopy(path, 0, newPath, 0, path.length);
+            newPath[path.length] = globComponent.component();
+            return accept(globComponents, componentIndex + 1, child, newPath);
+        }
+        if (!(node instanceof MetadataNode.DirectoryNode)) {
+            return false;
+        }
+        // Pattern component: try every child and collect all matches.
+        MetadataNode.DirectoryNode directory = (MetadataNode.DirectoryNode) node;
+        boolean matchedAny = false;
+        for (Entry<String, MetadataNode> entry : directory.children().entrySet()) {
+            String nodeName = entry.getKey();
+            if (globComponent.matches(nodeName)) {
+                String[] newPath = new String[path.length + 1];
+                System.arraycopy(path, 0, newPath, 0, path.length);
+                newPath[path.length] = nodeName;
+                if (accept(globComponents, componentIndex + 1, entry.getValue(), newPath)) {
+                    matchedAny = true;
+                }
+            }
+        }
+        return matchedAny;
+    }
+}
diff --git a/shell/src/main/java/org/apache/kafka/shell/HelpCommandHandler.java b/shell/src/main/java/org/apache/kafka/shell/HelpCommandHandler.java
new file mode 100644
index 0000000..829274e
--- /dev/null
+++ b/shell/src/main/java/org/apache/kafka/shell/HelpCommandHandler.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.shell;
+
+import net.sourceforge.argparse4j.inf.ArgumentParser;
+import net.sourceforge.argparse4j.inf.Namespace;
+import org.jline.reader.Candidate;
+
+import java.io.PrintWriter;
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * Implements the help command.
+ */
+public final class HelpCommandHandler implements Commands.Handler {
+    public final static Commands.Type TYPE = new HelpCommandType();
+
+    public static class HelpCommandType implements Commands.Type {
+        // Only the TYPE singleton above may instantiate this.
+        private HelpCommandType() {
+        }
+
+        @Override
+        public String name() {
+            return "help";
+        }
+
+        @Override
+        public String description() {
+            return "Display this help message.";
+        }
+
+        @Override
+        public boolean shellOnly() {
+            // Help for the interactive shell is only shown in the shell.
+            return true;
+        }
+
+        @Override
+        public void addArguments(ArgumentParser parser) {
+            // nothing to do
+        }
+
+        @Override
+        public Commands.Handler createHandler(Namespace namespace) {
+            return new HelpCommandHandler();
+        }
+
+        @Override
+        public void completeNext(MetadataNodeManager nodeManager, List<String> nextWords,
+                                 List<Candidate> candidates) throws Exception {
+            // nothing to do
+        }
+    }
+
+    @Override
+    public void run(Optional<InteractiveShell> shell,
+                    PrintWriter writer,
+                    MetadataNodeManager manager) {
+        writer.printf("Welcome to the Apache Kafka metadata shell.%n%n");
+        // Build a shell-mode Commands instance so the help output also
+        // covers the shell-only commands.
+        new Commands(true).parser().printHelp(writer);
+    }
+
+    @Override
+    public int hashCode() {
+        // All instances are interchangeable, so a constant hash is fine.
+        return 0;
+    }
+
+    @Override
+    public boolean equals(Object other) {
+        if (!(other instanceof HelpCommandHandler)) return false;
+        return true;
+    }
+}
diff --git a/shell/src/main/java/org/apache/kafka/shell/HistoryCommandHandler.java b/shell/src/main/java/org/apache/kafka/shell/HistoryCommandHandler.java
new file mode 100644
index 0000000..edf9def
--- /dev/null
+++ b/shell/src/main/java/org/apache/kafka/shell/HistoryCommandHandler.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.shell;
+
+import net.sourceforge.argparse4j.inf.ArgumentParser;
+import net.sourceforge.argparse4j.inf.Namespace;
+import org.jline.reader.Candidate;
+
+import java.io.PrintWriter;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * Implements the history command.
+ */
+public final class HistoryCommandHandler implements Commands.Handler {
+    public static final Commands.Type TYPE = new HistoryCommandType();
+
+    /**
+     * Describes the history command and how to instantiate its handler.
+     */
+    public static class HistoryCommandType implements Commands.Type {
+        // Only reachable through HistoryCommandHandler.TYPE.
+        private HistoryCommandType() {
+        }
+
+        @Override
+        public String name() {
+            return "history";
+        }
+
+        @Override
+        public String description() {
+            return "Print command history.";
+        }
+
+        @Override
+        public boolean shellOnly() {
+            return true;
+        }
+
+        @Override
+        public void addArguments(ArgumentParser parser) {
+            parser.addArgument("numEntriesToShow").
+                nargs("?").
+                type(Integer.class).
+                help("The number of entries to show.");
+        }
+
+        @Override
+        public Commands.Handler createHandler(Namespace namespace) {
+            // When no limit was supplied, show the entire history.
+            Integer limit = namespace.getInt("numEntriesToShow");
+            return new HistoryCommandHandler(limit == null ? Integer.MAX_VALUE : limit);
+        }
+
+        @Override
+        public void completeNext(MetadataNodeManager nodeManager, List<String> nextWords,
+                                 List<Candidate> candidates) throws Exception {
+            // The numeric argument cannot be completed.
+        }
+    }
+
+    // The maximum number of history entries to display.
+    private final int numEntriesToShow;
+
+    public HistoryCommandHandler(int numEntriesToShow) {
+        this.numEntriesToShow = numEntriesToShow;
+    }
+
+    @Override
+    public void run(Optional<InteractiveShell> shell,
+                    PrintWriter writer,
+                    MetadataNodeManager manager) throws Exception {
+        InteractiveShell interactiveShell = shell.orElseThrow(
+            () -> new RuntimeException("The history command requires a shell."));
+        Iterator<Map.Entry<Integer, String>> iter =
+            interactiveShell.history(numEntriesToShow);
+        while (iter.hasNext()) {
+            Map.Entry<Integer, String> entry = iter.next();
+            writer.printf("% 5d  %s%n", entry.getKey(), entry.getValue());
+        }
+    }
+
+    @Override
+    public int hashCode() {
+        return numEntriesToShow;
+    }
+
+    @Override
+    public boolean equals(Object other) {
+        if (!(other instanceof HistoryCommandHandler)) return false;
+        return ((HistoryCommandHandler) other).numEntriesToShow == numEntriesToShow;
+    }
+}
diff --git a/shell/src/main/java/org/apache/kafka/shell/InteractiveShell.java b/shell/src/main/java/org/apache/kafka/shell/InteractiveShell.java
new file mode 100644
index 0000000..36f00a4
--- /dev/null
+++ b/shell/src/main/java/org/apache/kafka/shell/InteractiveShell.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.shell;
+
+import org.jline.reader.Candidate;
+import org.jline.reader.Completer;
+import org.jline.reader.EndOfFileException;
+import org.jline.reader.History;
+import org.jline.reader.LineReader;
+import org.jline.reader.LineReaderBuilder;
+import org.jline.reader.ParsedLine;
+import org.jline.reader.Parser;
+import org.jline.reader.UserInterruptException;
+import org.jline.reader.impl.DefaultParser;
+import org.jline.reader.impl.history.DefaultHistory;
+import org.jline.terminal.Terminal;
+import org.jline.terminal.TerminalBuilder;
+
+import java.io.IOException;
+import java.util.AbstractMap;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.NoSuchElementException;
+import java.util.Optional;
+
+/**
+ * The Kafka metadata shell.
+ */
+public final class InteractiveShell implements AutoCloseable {
+    static class MetadataShellCompleter implements Completer {
+        private final MetadataNodeManager nodeManager;
+
+        MetadataShellCompleter(MetadataNodeManager nodeManager) {
+            this.nodeManager = nodeManager;
+        }
+
+        @Override
+        public void complete(LineReader reader, ParsedLine line, List<Candidate> candidates) {
+            if (line.words().size() == 0) {
+                CommandUtils.completeCommand("", candidates);
+            } else if (line.words().size() == 1) {
+                CommandUtils.completeCommand(line.words().get(0), candidates);
+            } else {
+                Iterator<String> iter = line.words().iterator();
+                String command = iter.next();
+                List<String> nextWords = new ArrayList<>();
+                while (iter.hasNext()) {
+                    nextWords.add(iter.next());
+                }
+                Commands.Type type = Commands.TYPES.get(command);
+                if (type == null) {
+                    return;
+                }
+                try {
+                    type.completeNext(nodeManager, nextWords, candidates);
+                } catch (Exception e) {
+                    throw new RuntimeException(e);
+                }
+            }
+        }
+    }
+
+    private final MetadataNodeManager nodeManager;
+    private final Terminal terminal;
+    private final Parser parser;
+    private final History history;
+    private final MetadataShellCompleter completer;
+    private final LineReader reader;
+
+    public InteractiveShell(MetadataNodeManager nodeManager) throws IOException {
+        this.nodeManager = nodeManager;
+        TerminalBuilder builder = TerminalBuilder.builder().
+            system(true).
+            nativeSignals(true);
+        this.terminal = builder.build();
+        this.parser = new DefaultParser();
+        this.history = new DefaultHistory();
+        this.completer = new MetadataShellCompleter(nodeManager);
+        this.reader = LineReaderBuilder.builder().
+            terminal(terminal).
+            parser(parser).
+            history(history).
+            completer(completer).
+            option(LineReader.Option.AUTO_FRESH_LINE, false).
+            build();
+    }
+
+    public void runMainLoop() throws Exception {
+        terminal.writer().println("[ Kafka Metadata Shell ]");
+        terminal.flush();
+        Commands commands = new Commands(true);
+        while (true) {
+            try {
+                reader.readLine(">> ");
+                ParsedLine parsedLine = reader.getParsedLine();
+                Commands.Handler handler = commands.parseCommand(parsedLine.words());
+                handler.run(Optional.of(this), terminal.writer(), nodeManager);
+                terminal.writer().flush();
+            } catch (UserInterruptException eof) {
+                // Handle ths user pressing Control-C.
+                // TODO: how can we print this on the same line as the prompt like
+                // bash does?
+                terminal.writer().println("^C");
+            } catch (EndOfFileException eof) {
+                return;
+            }
+        }
+    }
+
+    public int screenWidth() {
+        return terminal.getWidth();
+    }
+
+    public Iterator<Entry<Integer, String>> history(int numEntriesToShow) {
+        if (numEntriesToShow < 0) {
+            numEntriesToShow = 0;
+        }
+        int last = history.last();
+        if (numEntriesToShow > last + 1) {
+            numEntriesToShow = last + 1;
+        }
+        int first = last - numEntriesToShow + 1;
+        if (first < history.first()) {
+            first = history.first();
+        }
+        return new HistoryIterator(first, last);
+    }
+
+    public class HistoryIterator implements  Iterator<Entry<Integer, String>> {
+        private int index;
+        private int last;
+
+        HistoryIterator(int index, int last) {
+            this.index = index;
+            this.last = last;
+        }
+
+        @Override
+        public boolean hasNext() {
+            return index <= last;
+        }
+
+        @Override
+        public Entry<Integer, String> next() {
+            if (index > last) {
+                throw new NoSuchElementException();
+            }
+            int p = index++;
+            return new AbstractMap.SimpleImmutableEntry<>(p, history.get(p));
+        }
+    }
+
+    @Override
+    public void close() throws IOException {
+        terminal.close();
+    }
+}
diff --git a/shell/src/main/java/org/apache/kafka/shell/LsCommandHandler.java b/shell/src/main/java/org/apache/kafka/shell/LsCommandHandler.java
new file mode 100644
index 0000000..6260d12
--- /dev/null
+++ b/shell/src/main/java/org/apache/kafka/shell/LsCommandHandler.java
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.shell;
+
+import net.sourceforge.argparse4j.inf.ArgumentParser;
+import net.sourceforge.argparse4j.inf.Namespace;
+import org.apache.kafka.shell.GlobVisitor.MetadataNodeInfo;
+import org.apache.kafka.shell.MetadataNode.DirectoryNode;
+import org.apache.kafka.shell.MetadataNode.FileNode;
+import org.jline.reader.Candidate;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.PrintWriter;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.OptionalInt;
+
+/**
+ * Implements the ls command.
+ */
+public final class LsCommandHandler implements Commands.Handler {
+    private static final Logger log = LoggerFactory.getLogger(LsCommandHandler.class);
+
+    public final static Commands.Type TYPE = new LsCommandType();
+
+    /**
+     * Describes the ls command and how to instantiate its handler.
+     */
+    public static class LsCommandType implements Commands.Type {
+        private LsCommandType() {
+        }
+
+        @Override
+        public String name() {
+            return "ls";
+        }
+
+        @Override
+        public String description() {
+            return "List metadata nodes.";
+        }
+
+        @Override
+        public boolean shellOnly() {
+            return false;
+        }
+
+        @Override
+        public void addArguments(ArgumentParser parser) {
+            parser.addArgument("targets").
+                nargs("*").
+                help("The metadata node paths to list.");
+        }
+
+        @Override
+        public Commands.Handler createHandler(Namespace namespace) {
+            return new LsCommandHandler(namespace.getList("targets"));
+        }
+
+        @Override
+        public void completeNext(MetadataNodeManager nodeManager, List<String> nextWords,
+                                 List<Candidate> candidates) throws Exception {
+            // Complete the path component the user is currently typing.
+            CommandUtils.completePath(nodeManager, nextWords.get(nextWords.size() - 1),
+                candidates);
+        }
+    }
+
+    // The metadata node paths given on the command line.
+    private final List<String> targets;
+
+    public LsCommandHandler(List<String> targets) {
+        this.targets = targets;
+    }
+
+    /**
+     * A directory that was named as a target, along with its child names.
+     */
+    static class TargetDirectory {
+        private final String name;
+        private final List<String> children;
+
+        TargetDirectory(String name, List<String> children) {
+            this.name = name;
+            this.children = children;
+        }
+    }
+
+    @Override
+    public void run(Optional<InteractiveShell> shell,
+                    PrintWriter writer,
+                    MetadataNodeManager manager) throws Exception {
+        // Resolve each target path, separating the results into plain files
+        // and directories, similar to the UNIX ls command.
+        List<String> targetFiles = new ArrayList<>();
+        List<TargetDirectory> targetDirectories = new ArrayList<>();
+        for (String target : CommandUtils.getEffectivePaths(targets)) {
+            manager.visit(new GlobVisitor(target, entryOption -> {
+                if (entryOption.isPresent()) {
+                    MetadataNodeInfo info = entryOption.get();
+                    MetadataNode node = info.node();
+                    if (node instanceof DirectoryNode) {
+                        DirectoryNode directory = (DirectoryNode) node;
+                        List<String> children = new ArrayList<>();
+                        children.addAll(directory.children().keySet());
+                        targetDirectories.add(
+                            new TargetDirectory(info.lastPathComponent(), children));
+                    } else if (node instanceof FileNode) {
+                        targetFiles.add(info.lastPathComponent());
+                    }
+                } else {
+                    writer.println("ls: " + target + ": no such file or directory.");
+                }
+            }));
+        }
+        // Only lay results out in columns when we know the screen width.
+        OptionalInt screenWidth = shell.isPresent() ?
+            OptionalInt.of(shell.get().screenWidth()) : OptionalInt.empty();
+        log.trace("LS : targetFiles = {}, targetDirectories = {}, screenWidth = {}",
+            targetFiles, targetDirectories, screenWidth);
+        printTargets(writer, screenWidth, targetFiles, targetDirectories);
+    }
+
+    /**
+     * Prints the target files first, then each target directory's listing,
+     * preceding each directory with a "name:" introduction when more than one
+     * listing is shown.
+     */
+    static void printTargets(PrintWriter writer,
+                             OptionalInt screenWidth,
+                             List<String> targetFiles,
+                             List<TargetDirectory> targetDirectories) {
+        printEntries(writer, "", screenWidth, targetFiles);
+        boolean needIntro = targetFiles.size() > 0 || targetDirectories.size() > 1;
+        boolean firstIntro = targetFiles.isEmpty();
+        for (TargetDirectory targetDirectory : targetDirectories) {
+            String intro = "";
+            if (needIntro) {
+                if (!firstIntro) {
+                    intro = intro + String.format("%n");
+                }
+                intro = intro + targetDirectory.name + ":";
+                firstIntro = false;
+            }
+            log.trace("LS : targetDirectory name = {}, children = {}",
+                targetDirectory.name, targetDirectory.children);
+            printEntries(writer, intro, screenWidth, targetDirectory.children);
+        }
+    }
+
+    /**
+     * Prints the given entries in columns, preceded by the introduction line,
+     * if any.  Entries run down each column rather than across each row.
+     */
+    static void printEntries(PrintWriter writer,
+                             String intro,
+                             OptionalInt screenWidth,
+                             List<String> entries) {
+        if (entries.isEmpty()) {
+            return;
+        }
+        if (!intro.isEmpty()) {
+            writer.println(intro);
+        }
+        ColumnSchema columnSchema = calculateColumnSchema(screenWidth, entries);
+        int numColumns = columnSchema.numColumns();
+        int numLines = (entries.size() + numColumns - 1) / numColumns;
+        for (int line = 0; line < numLines; line++) {
+            StringBuilder output = new StringBuilder();
+            for (int column = 0; column < numColumns; column++) {
+                int entryIndex = line + (column * columnSchema.entriesPerColumn());
+                if (entryIndex < entries.size()) {
+                    String entry = entries.get(entryIndex);
+                    output.append(entry);
+                    // Pad to the column width, except after the final column.
+                    if (column < numColumns - 1) {
+                        int width = columnSchema.columnWidth(column);
+                        for (int i = 0; i < width - entry.length(); i++) {
+                            output.append(" ");
+                        }
+                    }
+                }
+            }
+            writer.println(output.toString());
+        }
+    }
+
+    /**
+     * Chooses the column layout: the largest number of columns whose total
+     * width still fits on the screen.  With no known screen width, everything
+     * goes into a single column.
+     */
+    static ColumnSchema calculateColumnSchema(OptionalInt screenWidth,
+                                              List<String> entries) {
+        if (!screenWidth.isPresent()) {
+            return new ColumnSchema(1, entries.size());
+        }
+        int maxColumns = screenWidth.getAsInt() / 4;
+        if (maxColumns <= 1) {
+            return new ColumnSchema(1, entries.size());
+        }
+        ColumnSchema[] schemas = new ColumnSchema[maxColumns];
+        for (int numColumns = 1; numColumns <= maxColumns; numColumns++) {
+            schemas[numColumns - 1] = new ColumnSchema(numColumns,
+                (entries.size() + numColumns - 1) / numColumns);
+        }
+        for (int i = 0; i < entries.size(); i++) {
+            String entry = entries.get(i);
+            for (int s = 0; s < schemas.length; s++) {
+                ColumnSchema schema = schemas[s];
+                schema.process(i, entry);
+            }
+        }
+        // Prefer the widest layout that fits; a schema whose last column is
+        // empty wastes columns, so skip it.
+        for (int s = schemas.length - 1; s > 0; s--) {
+            ColumnSchema schema = schemas[s];
+            if (schema.columnWidths[schema.columnWidths.length - 1] != 0 &&
+                    schema.totalWidth() <= screenWidth.getAsInt()) {
+                return schema;
+            }
+        }
+        return schemas[0];
+    }
+
+    /**
+     * Describes a column layout: the width of each column and how many
+     * entries each column holds.
+     */
+    static class ColumnSchema {
+        private final int[] columnWidths;
+        private final int entriesPerColumn;
+
+        ColumnSchema(int numColumns, int entriesPerColumn) {
+            this.columnWidths = new int[numColumns];
+            this.entriesPerColumn = entriesPerColumn;
+        }
+
+        ColumnSchema setColumnWidths(Integer... widths) {
+            for (int i = 0; i < widths.length; i++) {
+                columnWidths[i] = widths[i];
+            }
+            return this;
+        }
+
+        void process(int entryIndex, String output) {
+            int columnIndex = entryIndex / entriesPerColumn;
+            // Reserve two spaces of padding between columns.
+            columnWidths[columnIndex] = Math.max(
+                columnWidths[columnIndex], output.length() + 2);
+        }
+
+        int totalWidth() {
+            int total = 0;
+            for (int i = 0; i < columnWidths.length; i++) {
+                total += columnWidths[i];
+            }
+            return total;
+        }
+
+        int numColumns() {
+            return columnWidths.length;
+        }
+
+        int columnWidth(int columnIndex) {
+            return columnWidths[columnIndex];
+        }
+
+        int entriesPerColumn() {
+            return entriesPerColumn;
+        }
+
+        @Override
+        public int hashCode() {
+            // Hash the array's contents rather than its identity, so that
+            // hashCode stays consistent with equals, which uses Arrays.equals.
+            // Objects.hash(columnWidths, ...) would use the array's identity
+            // hash, breaking the hashCode/equals contract.
+            return Objects.hash(Arrays.hashCode(columnWidths), entriesPerColumn);
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (!(o instanceof ColumnSchema)) return false;
+            ColumnSchema other = (ColumnSchema) o;
+            if (entriesPerColumn != other.entriesPerColumn) return false;
+            if (!Arrays.equals(columnWidths, other.columnWidths)) return false;
+            return true;
+        }
+
+        @Override
+        public String toString() {
+            StringBuilder bld = new StringBuilder("ColumnSchema(columnWidths=[");
+            String prefix = "";
+            for (int i = 0; i < columnWidths.length; i++) {
+                bld.append(prefix);
+                bld.append(columnWidths[i]);
+                prefix = ", ";
+            }
+            bld.append("], entriesPerColumn=").append(entriesPerColumn).append(")");
+            return bld.toString();
+        }
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hashCode(targets);
+    }
+
+    @Override
+    public boolean equals(Object other) {
+        if (!(other instanceof LsCommandHandler)) return false;
+        LsCommandHandler o = (LsCommandHandler) other;
+        if (!Objects.equals(o.targets, targets)) return false;
+        return true;
+    }
+}
diff --git a/shell/src/main/java/org/apache/kafka/shell/ManCommandHandler.java b/shell/src/main/java/org/apache/kafka/shell/ManCommandHandler.java
new file mode 100644
index 0000000..dcd0b8c
--- /dev/null
+++ b/shell/src/main/java/org/apache/kafka/shell/ManCommandHandler.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.shell;
+
+import net.sourceforge.argparse4j.ArgumentParsers;
+import net.sourceforge.argparse4j.inf.ArgumentParser;
+import net.sourceforge.argparse4j.inf.Namespace;
+import org.jline.reader.Candidate;
+
+import java.io.PrintWriter;
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * Implements the manual command.
+ */
+public final class ManCommandHandler implements Commands.Handler {
+    // The name of the command to print the help text for.
+    private final String cmd;
+
+    public static final Commands.Type TYPE = new ManCommandType();
+
+    /**
+     * Describes the man command and how to instantiate its handler.
+     */
+    public static class ManCommandType implements Commands.Type {
+        // Only reachable through ManCommandHandler.TYPE.
+        private ManCommandType() {
+        }
+
+        @Override
+        public String name() {
+            return "man";
+        }
+
+        @Override
+        public String description() {
+            return "Show the help text for a specific command.";
+        }
+
+        @Override
+        public boolean shellOnly() {
+            return true;
+        }
+
+        @Override
+        public void addArguments(ArgumentParser parser) {
+            parser.addArgument("cmd").
+                nargs(1).
+                help("The command to get help text for.");
+        }
+
+        @Override
+        public Commands.Handler createHandler(Namespace namespace) {
+            List<String> args = namespace.getList("cmd");
+            return new ManCommandHandler(args.get(0));
+        }
+
+        @Override
+        public void completeNext(MetadataNodeManager nodeManager, List<String> nextWords,
+                                 List<Candidate> candidates) throws Exception {
+            // Only the first argument, the command name, can be completed.
+            if (nextWords.size() == 1) {
+                CommandUtils.completeCommand(nextWords.get(0), candidates);
+            }
+        }
+    }
+
+    public ManCommandHandler(String cmd) {
+        this.cmd = cmd;
+    }
+
+    @Override
+    public void run(Optional<InteractiveShell> shell,
+                    PrintWriter writer,
+                    MetadataNodeManager manager) {
+        Commands.Type type = Commands.TYPES.get(cmd);
+        if (type == null) {
+            writer.println("man: unknown command " + cmd +
+                ". Type help to get a list of commands.");
+            return;
+        }
+        ArgumentParser parser = ArgumentParsers.newArgumentParser(type.name(), false);
+        type.addArguments(parser);
+        writer.printf("%s: %s%n%n", cmd, type.description());
+        parser.printHelp(writer);
+    }
+
+    @Override
+    public int hashCode() {
+        return cmd.hashCode();
+    }
+
+    @Override
+    public boolean equals(Object other) {
+        if (!(other instanceof ManCommandHandler)) return false;
+        return ((ManCommandHandler) other).cmd.equals(cmd);
+    }
+}
diff --git a/shell/src/main/java/org/apache/kafka/shell/MetadataNode.java b/shell/src/main/java/org/apache/kafka/shell/MetadataNode.java
new file mode 100644
index 0000000..3764a17
--- /dev/null
+++ b/shell/src/main/java/org/apache/kafka/shell/MetadataNode.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.shell;
+
+import java.util.NavigableMap;
+import java.util.TreeMap;
+
+/**
+ * A node in the metadata tool.
+ */
+public interface MetadataNode {
+    /**
+     * A metadata node which contains other nodes, like a filesystem directory.
+     */
+    class DirectoryNode implements MetadataNode {
+        private final TreeMap<String, MetadataNode> children = new TreeMap<>();
+
+        /**
+         * Creates (or finds) the chain of directories named by the path,
+         * returning the final one.
+         */
+        public DirectoryNode mkdirs(String... names) {
+            if (names.length == 0) {
+                throw new RuntimeException("Invalid zero-length path");
+            }
+            DirectoryNode current = this;
+            for (String name : names) {
+                MetadataNode next = current.children.get(name);
+                if (next == null) {
+                    next = new DirectoryNode();
+                    current.children.put(name, next);
+                } else if (!(next instanceof DirectoryNode)) {
+                    throw new NotDirectoryException();
+                }
+                current = (DirectoryNode) next;
+            }
+            return current;
+        }
+
+        /**
+         * Removes the node named by the path, along with anything under it.
+         */
+        public void rmrf(String... names) {
+            if (names.length == 0) {
+                throw new RuntimeException("Invalid zero-length path");
+            }
+            DirectoryNode parent = this;
+            for (int i = 0; i + 1 < names.length; i++) {
+                // instanceof is false for null, covering a missing child too.
+                MetadataNode next = parent.children.get(names[i]);
+                if (!(next instanceof DirectoryNode)) {
+                    throw new RuntimeException("Unable to locate directory /" +
+                        String.join("/", names));
+                }
+                parent = (DirectoryNode) next;
+            }
+            parent.children.remove(names[names.length - 1]);
+        }
+
+        /**
+         * Creates (or finds) a file that is a direct child of this directory.
+         */
+        public FileNode create(String name) {
+            MetadataNode child = children.get(name);
+            if (child == null) {
+                child = new FileNode();
+                children.put(name, child);
+            } else if (!(child instanceof FileNode)) {
+                throw new NotFileException();
+            }
+            return (FileNode) child;
+        }
+
+        public MetadataNode child(String component) {
+            return children.get(component);
+        }
+
+        public NavigableMap<String, MetadataNode> children() {
+            return children;
+        }
+
+        public void addChild(String name, DirectoryNode child) {
+            children.put(name, child);
+        }
+
+        /**
+         * Resolves the path to an existing directory, or throws.
+         */
+        public DirectoryNode directory(String... names) {
+            if (names.length == 0) {
+                throw new RuntimeException("Invalid zero-length path");
+            }
+            DirectoryNode current = this;
+            for (String name : names) {
+                MetadataNode next = current.children.get(name);
+                if (!(next instanceof DirectoryNode)) {
+                    throw new RuntimeException("Unable to locate directory /" +
+                        String.join("/", names));
+                }
+                current = (DirectoryNode) next;
+            }
+            return current;
+        }
+
+        /**
+         * Resolves the path to an existing file, or throws.
+         */
+        public FileNode file(String... names) {
+            if (names.length == 0) {
+                throw new RuntimeException("Invalid zero-length path");
+            }
+            DirectoryNode current = this;
+            for (int i = 0; i + 1 < names.length; i++) {
+                MetadataNode next = current.children.get(names[i]);
+                if (!(next instanceof DirectoryNode)) {
+                    throw new RuntimeException("Unable to locate file /" +
+                        String.join("/", names));
+                }
+                current = (DirectoryNode) next;
+            }
+            MetadataNode last = current.child(names[names.length - 1]);
+            if (!(last instanceof FileNode)) {
+                throw new RuntimeException("Unable to locate file /" +
+                    String.join("/", names));
+            }
+            return (FileNode) last;
+        }
+    }
+
+    /**
+     * A metadata node which holds a string payload, like a filesystem file.
+     */
+    class FileNode implements MetadataNode {
+        private String contents;
+
+        void setContents(String contents) {
+            this.contents = contents;
+        }
+
+        String contents() {
+            return contents;
+        }
+    }
+}
diff --git a/shell/src/main/java/org/apache/kafka/shell/MetadataNodeManager.java b/shell/src/main/java/org/apache/kafka/shell/MetadataNodeManager.java
new file mode 100644
index 0000000..991712a
--- /dev/null
+++ b/shell/src/main/java/org/apache/kafka/shell/MetadataNodeManager.java
@@ -0,0 +1,308 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.shell;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.datatype.jdk8.Jdk8Module;
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.metadata.ConfigRecord;
+import org.apache.kafka.common.metadata.FenceBrokerRecord;
+import org.apache.kafka.common.metadata.IsrChangeRecord;
+import org.apache.kafka.common.metadata.MetadataRecordType;
+import org.apache.kafka.common.metadata.PartitionRecord;
+import org.apache.kafka.common.metadata.PartitionRecordJsonConverter;
+import org.apache.kafka.common.metadata.RegisterBrokerRecord;
+import org.apache.kafka.common.metadata.RemoveTopicRecord;
+import org.apache.kafka.common.metadata.TopicRecord;
+import org.apache.kafka.common.metadata.UnfenceBrokerRecord;
+import org.apache.kafka.common.metadata.UnregisterBrokerRecord;
+import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.common.utils.AppInfoParser;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.metadata.ApiMessageAndVersion;
+import org.apache.kafka.metalog.MetaLogLeader;
+import org.apache.kafka.metalog.MetaLogListener;
+import org.apache.kafka.queue.EventQueue;
+import org.apache.kafka.queue.KafkaEventQueue;
+import org.apache.kafka.raft.BatchReader;
+import org.apache.kafka.raft.RaftClient;
+import org.apache.kafka.shell.MetadataNode.DirectoryNode;
+import org.apache.kafka.shell.MetadataNode.FileNode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
+
+/**
+ * Maintains the in-memory metadata for the metadata tool.
+ */
+public final class MetadataNodeManager implements AutoCloseable {
+    private static final Logger log = LoggerFactory.getLogger(MetadataNodeManager.class);
+
+    public static class Data {
+        private final DirectoryNode root = new DirectoryNode();
+        private String workingDirectory = "/";
+
+        public DirectoryNode root() {
+            return root;
+        }
+
+        public String workingDirectory() {
+            return workingDirectory;
+        }
+
+        public void setWorkingDirectory(String workingDirectory) {
+            this.workingDirectory = workingDirectory;
+        }
+    }
+
+    class LogListener implements MetaLogListener, RaftClient.Listener<ApiMessageAndVersion> {
+        @Override
+        public void handleCommit(BatchReader<ApiMessageAndVersion> reader) {
+            try {
+                // TODO: handle lastOffset
+                while (reader.hasNext()) {
+                    BatchReader.Batch<ApiMessageAndVersion> batch = reader.next();
+                    for (ApiMessageAndVersion messageAndVersion : batch.records()) {
+                        handleMessage(messageAndVersion.message());
+                    }
+                }
+            } finally {
+                reader.close();
+            }
+        }
+
+        @Override
+        public void handleCommits(long lastOffset, List<ApiMessage> messages) {
+            appendEvent("handleCommits", () -> {
+                log.error("handleCommits " + messages + " at offset " + lastOffset);
+                DirectoryNode dir = data.root.mkdirs("metadataQuorum");
+                dir.create("offset").setContents(String.valueOf(lastOffset));
+                for (ApiMessage message : messages) {
+                    handleMessage(message);
+                }
+            }, null);
+        }
+
+        @Override
+        public void handleNewLeader(MetaLogLeader leader) {
+            appendEvent("handleNewLeader", () -> {
+                log.error("handleNewLeader " + leader);
+                DirectoryNode dir = data.root.mkdirs("metadataQuorum");
+                dir.create("leader").setContents(leader.toString());
+            }, null);
+        }
+
+        @Override
+        public void handleClaim(int epoch) {
+            // This shouldn't happen because we should never be the leader.
+            log.debug("RaftClient.Listener sent handleClaim(epoch=" + epoch + ")");
+        }
+
+        @Override
+        public void handleRenounce(long epoch) {
+            // This shouldn't happen because we should never be the leader.
+            log.debug("MetaLogListener sent handleRenounce(epoch=" + epoch + ")");
+        }
+
+        @Override
+        public void handleResign() {
+            // This shouldn't happen because we should never be the leader.
+            log.debug("MetaLogListener sent handleResign()");
+        }
+
+        @Override
+        public void beginShutdown() {
+            log.debug("MetaLogListener sent beginShutdown");
+        }
+    }
+
+    private final Data data = new Data();
+    private final LogListener logListener = new LogListener();
+    private final ObjectMapper objectMapper;
+    private final KafkaEventQueue queue;
+
+    public MetadataNodeManager() {
+        this.objectMapper = new ObjectMapper();
+        this.objectMapper.registerModule(new Jdk8Module());
+        this.queue = new KafkaEventQueue(Time.SYSTEM,
+            new LogContext("[node-manager-event-queue] "), "");
+    }
+
+    public void setup() throws Exception {
+        CompletableFuture<Void> future = new CompletableFuture<>();
+        appendEvent("createShellNodes", () -> {
+            DirectoryNode directory = data.root().mkdirs("local");
+            directory.create("version").setContents(AppInfoParser.getVersion());
+            directory.create("commitId").setContents(AppInfoParser.getCommitId());
+            future.complete(null);
+        }, future);
+        future.get();
+    }
+
+    public LogListener logListener() {
+        return logListener;
+    }
+
+    @Override
+    public void close() throws Exception {
+        queue.close();
+    }
+
+    public void visit(Consumer<Data> consumer) throws Exception {
+        CompletableFuture<Void> future = new CompletableFuture<>();
+        appendEvent("visit", () -> {
+            consumer.accept(data);
+            future.complete(null);
+        }, future);
+        future.get();
+    }
+
+    private void appendEvent(String name, Runnable runnable, CompletableFuture<?> future) {
+        queue.append(new EventQueue.Event() {
+            @Override
+            public void run() throws Exception {
+                runnable.run();
+            }
+
+            @Override
+            public void handleException(Throwable e) {
+                log.error("Unexpected error while handling event " + name, e);
+                if (future != null) {
+                    future.completeExceptionally(e);
+                }
+            }
+        });
+    }
+
+    private void handleMessage(ApiMessage message) {
+        try {
+            MetadataRecordType type = MetadataRecordType.fromId(message.apiKey());
+            handleCommitImpl(type, message);
+        } catch (Exception e) {
+            log.error("Error processing record of type " + message.apiKey(), e);
+        }
+    }
+
+    private void handleCommitImpl(MetadataRecordType type, ApiMessage message)
+            throws Exception {
+        switch (type) {
+            case REGISTER_BROKER_RECORD: {
+                DirectoryNode brokersNode = data.root.mkdirs("brokers");
+                RegisterBrokerRecord record = (RegisterBrokerRecord) message;
+                DirectoryNode brokerNode = brokersNode.
+                    mkdirs(Integer.toString(record.brokerId()));
+                FileNode registrationNode = brokerNode.create("registration");
+                registrationNode.setContents(record.toString());
+                brokerNode.create("isFenced").setContents("true");
+                break;
+            }
+            case UNREGISTER_BROKER_RECORD: {
+                UnregisterBrokerRecord record = (UnregisterBrokerRecord) message;
+                data.root.rmrf("brokers", Integer.toString(record.brokerId()));
+                break;
+            }
+            case TOPIC_RECORD: {
+                TopicRecord record = (TopicRecord) message;
+                DirectoryNode topicsDirectory = data.root.mkdirs("topics");
+                DirectoryNode topicDirectory = topicsDirectory.mkdirs(record.name());
+                topicDirectory.create("id").setContents(record.topicId().toString());
+                topicDirectory.create("name").setContents(record.name().toString());
+                DirectoryNode topicIdsDirectory = data.root.mkdirs("topicIds");
+                topicIdsDirectory.addChild(record.topicId().toString(), topicDirectory);
+                break;
+            }
+            case PARTITION_RECORD: {
+                PartitionRecord record = (PartitionRecord) message;
+                DirectoryNode topicDirectory =
+                    data.root.mkdirs("topicIds").mkdirs(record.topicId().toString());
+                DirectoryNode partitionDirectory =
+                    topicDirectory.mkdirs(Integer.toString(record.partitionId()));
+                JsonNode node = PartitionRecordJsonConverter.
+                    write(record, PartitionRecord.HIGHEST_SUPPORTED_VERSION);
+                partitionDirectory.create("data").setContents(node.toPrettyString());
+                break;
+            }
+            case CONFIG_RECORD: {
+                ConfigRecord record = (ConfigRecord) message;
+                String typeString = "";
+                switch (ConfigResource.Type.forId(record.resourceType())) {
+                    case BROKER:
+                        typeString = "broker";
+                        break;
+                    case TOPIC:
+                        typeString = "topic";
+                        break;
+                    default:
+                        throw new RuntimeException("Error processing CONFIG_RECORD: " +
+                            "Can't handle ConfigResource.Type " + record.resourceType());
+                }
+                DirectoryNode configDirectory = data.root.mkdirs("configs").
+                    mkdirs(typeString).mkdirs(record.resourceName());
+                if (record.value() == null) {
+                    configDirectory.rmrf(record.name());
+                } else {
+                    configDirectory.create(record.name()).setContents(record.value());
+                }
+                break;
+            }
+            case ISR_CHANGE_RECORD: {
+                IsrChangeRecord record = (IsrChangeRecord) message;
+                FileNode file = data.root.file("topicIds", record.topicId().toString(),
+                    Integer.toString(record.partitionId()), "data");
+                JsonNode node = objectMapper.readTree(file.contents());
+                PartitionRecord partition = PartitionRecordJsonConverter.
+                    read(node, PartitionRecord.HIGHEST_SUPPORTED_VERSION);
+                partition.setIsr(record.isr());
+                partition.setLeader(record.leader());
+                partition.setLeaderEpoch(record.leaderEpoch());
+                partition.setPartitionEpoch(record.partitionEpoch());
+                file.setContents(PartitionRecordJsonConverter.write(partition,
+                    PartitionRecord.HIGHEST_SUPPORTED_VERSION).toPrettyString());
+                break;
+            }
+            case FENCE_BROKER_RECORD: {
+                FenceBrokerRecord record = (FenceBrokerRecord) message;
+                data.root.mkdirs("brokers", Integer.toString(record.id())).
+                    create("isFenced").setContents("true");
+                break;
+            }
+            case UNFENCE_BROKER_RECORD: {
+                UnfenceBrokerRecord record = (UnfenceBrokerRecord) message;
+                data.root.mkdirs("brokers", Integer.toString(record.id())).
+                    create("isFenced").setContents("false");
+                break;
+            }
+            case REMOVE_TOPIC_RECORD: {
+                RemoveTopicRecord record = (RemoveTopicRecord) message;
+                DirectoryNode topicsDirectory =
+                    data.root.directory("topicIds", record.topicId().toString());
+                String name = topicsDirectory.file("name").contents();
+                data.root.rmrf("topics", name);
+                data.root.rmrf("topicIds", record.topicId().toString());
+                break;
+            }
+            default:
+                throw new RuntimeException("Unhandled metadata record type");
+        }
+    }
+}
diff --git a/shell/src/main/java/org/apache/kafka/shell/MetadataShell.java b/shell/src/main/java/org/apache/kafka/shell/MetadataShell.java
new file mode 100644
index 0000000..16e5e31
--- /dev/null
+++ b/shell/src/main/java/org/apache/kafka/shell/MetadataShell.java
@@ -0,0 +1,274 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.shell;
+
+import kafka.raft.KafkaRaftManager;
+import kafka.server.KafkaConfig$;
+import kafka.server.KafkaConfig;
+import kafka.server.MetaProperties;
+import kafka.server.Server;
+import kafka.tools.TerseFailure;
+import net.sourceforge.argparse4j.ArgumentParsers;
+import net.sourceforge.argparse4j.inf.ArgumentParser;
+import net.sourceforge.argparse4j.inf.Namespace;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.metadata.ApiMessageAndVersion;
+import org.apache.kafka.raft.RaftConfig;
+import org.apache.kafka.raft.metadata.MetadataRecordSerde;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.compat.java8.OptionConverters;
+
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.OutputStreamWriter;
+import java.io.PrintWriter;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.List;
+import java.util.Optional;
+import java.util.Properties;
+
+
+/**
+ * The Kafka metadata tool.
+ */
+public final class MetadataShell {
+    private static final Logger log = LoggerFactory.getLogger(MetadataShell.class);
+
+    public static class Builder {
+        private String controllers;
+        private String configPath;
+        private File tempDir;
+        private String snapshotPath;
+
+        public Builder setControllers(String controllers) {
+            this.controllers = controllers;
+            return this;
+        }
+
+        public Builder setConfigPath(String configPath) {
+            this.configPath = configPath;
+            return this;
+        }
+
+        public Builder setSnapshotPath(String snapshotPath) {
+            this.snapshotPath = snapshotPath;
+            return this;
+        }
+
+        public Builder setTempDir(File tempDir) {
+            this.tempDir = tempDir;
+            return this;
+        }
+
+        public MetadataShell build() throws Exception {
+            if (snapshotPath != null) {
+                if (controllers != null) {
+                    throw new RuntimeException("If you specify a snapshot path, you " +
+                        "must not also specify controllers to connect to.");
+                }
+                return buildWithSnapshotReader();
+            } else {
+                return buildWithControllerConnect();
+            }
+        }
+
+        public MetadataShell buildWithControllerConnect() throws Exception {
+            Properties properties = null;
+            if (configPath != null) {
+                properties = Utils.loadProps(configPath);
+            } else {
+                properties = new Properties();
+            }
+            if (controllers != null) {
+                properties.setProperty(RaftConfig.QUORUM_VOTERS_CONFIG,
+                    controllers);
+            }
+            if (properties.getProperty(RaftConfig.QUORUM_VOTERS_CONFIG) == null) {
+                throw new TerseFailure("Please use --controllers to specify the quorum voters.");
+            }
+            // TODO: we really shouldn't have to set up a fake broker config like this.
+            // In particular, it should be possible to run the KafkRaftManager without
+            // using a log directory at all.  And we should be able to set -1 as our ID,
+            // since we're not a voter.
+            final int fakeId = 123456;
+            properties.setProperty(KafkaConfig$.MODULE$.MetadataLogDirProp(),
+                tempDir.getAbsolutePath());
+            properties.remove(KafkaConfig$.MODULE$.LogDirProp());
+            properties.remove(KafkaConfig$.MODULE$.LogDirsProp());
+            properties.remove(KafkaConfig$.MODULE$.NodeIdProp());
+            properties.setProperty(KafkaConfig$.MODULE$.NodeIdProp(), Integer.toString(fakeId));
+            properties.setProperty(KafkaConfig$.MODULE$.ProcessRolesProp(), "broker");
+            KafkaConfig config = new KafkaConfig(properties);
+            MetaProperties metaProperties = new MetaProperties(Uuid.ZERO_UUID, fakeId);
+            TopicPartition metadataPartition =
+                new TopicPartition(Server.metadataTopicName(), 0);
+            KafkaRaftManager<ApiMessageAndVersion> raftManager = null;
+            MetadataNodeManager nodeManager = null;
+            try {
+                raftManager = new KafkaRaftManager<ApiMessageAndVersion>(metaProperties,
+                    config,
+                    new MetadataRecordSerde(),
+                    metadataPartition,
+                    Time.SYSTEM,
+                    new Metrics(),
+                    OptionConverters.toScala(Optional.empty()));
+                nodeManager = new MetadataNodeManager();
+            } catch (Throwable e) {
+                log.error("Initialization error", e);
+                if (raftManager != null) {
+                    raftManager.shutdown();
+                }
+                if (nodeManager != null) {
+                    nodeManager.close();
+                }
+                throw e;
+            }
+            return new MetadataShell(raftManager, null, nodeManager);
+        }
+
+        public MetadataShell buildWithSnapshotReader() throws Exception {
+            MetadataNodeManager nodeManager = null;
+            SnapshotReader snapshotReader = null;
+            try {
+                nodeManager = new MetadataNodeManager();
+                snapshotReader = new SnapshotReader(snapshotPath, nodeManager.logListener());
+                return new MetadataShell(null, snapshotReader, nodeManager);
+            } catch (Throwable e) {
+                log.error("Initialization error", e);
+                if (snapshotReader != null) {
+                    snapshotReader.close();
+                }
+                if (nodeManager != null) {
+                    nodeManager.close();
+                }
+                throw e;
+            }
+        }
+    }
+
+    private final KafkaRaftManager<ApiMessageAndVersion> raftManager;
+
+    private final SnapshotReader snapshotReader;
+
+    private final MetadataNodeManager nodeManager;
+
+    public MetadataShell(KafkaRaftManager<ApiMessageAndVersion> raftManager,
+                        SnapshotReader snapshotReader,
+                        MetadataNodeManager nodeManager) {
+        this.raftManager = raftManager;
+        this.snapshotReader = snapshotReader;
+        this.nodeManager = nodeManager;
+    }
+
+    public void run(List<String> args) throws Exception {
+        nodeManager.setup();
+        if (raftManager != null) {
+            raftManager.startup();
+            raftManager.register(nodeManager.logListener());
+        } else if (snapshotReader != null) {
+            snapshotReader.startup();
+        } else {
+            throw new RuntimeException("Expected either a raft manager or snapshot reader");
+        }
+        if (args == null || args.isEmpty()) {
+            // Interactive mode.
+            try (InteractiveShell shell = new InteractiveShell(nodeManager)) {
+                shell.runMainLoop();
+            }
+        } else {
+            // Non-interactive mode.
+            Commands commands = new Commands(false);
+            try (PrintWriter writer = new PrintWriter(new BufferedWriter(
+                    new OutputStreamWriter(System.out, StandardCharsets.UTF_8)))) {
+                Commands.Handler handler = commands.parseCommand(args);
+                handler.run(Optional.empty(), writer, nodeManager);
+                writer.flush();
+            }
+        }
+    }
+
+    public void close() throws Exception {
+        if (raftManager != null) {
+            raftManager.shutdown();
+        }
+        if (snapshotReader != null) {
+            snapshotReader.close();
+        }
+        nodeManager.close();
+    }
+
+    public static void main(String[] args) throws Exception {
+        ArgumentParser parser = ArgumentParsers
+            .newArgumentParser("metadata-tool")
+            .defaultHelp(true)
+            .description("The Apache Kafka metadata tool");
+        parser.addArgument("--controllers", "-C")
+            .type(String.class)
+            .help("The quorum voter connection string to use.");
+        parser.addArgument("--config", "-c")
+            .type(String.class)
+            .help("The configuration file to use.");
+        parser.addArgument("--snapshot", "-s")
+            .type(String.class)
+            .help("The snapshot file to read.");
+        parser.addArgument("command")
+            .nargs("*")
+            .help("The command to run.");
+        Namespace res = parser.parseArgsOrFail(args);
+        try {
+            Builder builder = new Builder();
+            builder.setControllers(res.getString("controllers"));
+            builder.setConfigPath(res.getString("config"));
+            builder.setSnapshotPath(res.getString("snapshot"));
+            Path tempDir = Files.createTempDirectory("MetadataShell");
+            Exit.addShutdownHook("agent-shutdown-hook", () -> {
+                log.debug("Removing temporary directory " + tempDir.toAbsolutePath().toString());
+                try {
+                    Utils.delete(tempDir.toFile());
+                } catch (Exception e) {
+                    log.error("Got exception while removing temporary directory " +
+                        tempDir.toAbsolutePath().toString());
+                }
+            });
+            builder.setTempDir(tempDir.toFile());
+            MetadataShell shell = builder.build();
+            try {
+                shell.run(res.getList("command"));
+            } finally {
+                shell.close();
+            }
+            Exit.exit(0);
+        } catch (TerseFailure e) {
+            System.err.println("Error: " + e.getMessage());
+            Exit.exit(1);
+        } catch (Throwable e) {
+            System.err.println("Unexpected error: " +
+                (e.getMessage() == null ? "" : e.getMessage()));
+            e.printStackTrace(System.err);
+            Exit.exit(1);
+        }
+    }
+}
diff --git a/shell/src/main/java/org/apache/kafka/shell/NoOpCommandHandler.java b/shell/src/main/java/org/apache/kafka/shell/NoOpCommandHandler.java
new file mode 100644
index 0000000..1756ba7
--- /dev/null
+++ b/shell/src/main/java/org/apache/kafka/shell/NoOpCommandHandler.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.shell;
+
+import java.io.PrintWriter;
+import java.util.Optional;
+
+/**
+ * Does nothing.
+ */
+public final class NoOpCommandHandler implements Commands.Handler {
+    @Override
+    public void run(Optional<InteractiveShell> shell,
+                    PrintWriter writer,
+                    MetadataNodeManager manager) {
+    }
+
+    @Override
+    public int hashCode() {
+        return 0;
+    }
+
+    @Override
+    public boolean equals(Object other) {
+        if (!(other instanceof NoOpCommandHandler)) return false;
+        return true;
+    }
+}
diff --git a/shell/src/main/java/org/apache/kafka/shell/NotDirectoryException.java b/shell/src/main/java/org/apache/kafka/shell/NotDirectoryException.java
new file mode 100644
index 0000000..6925347
--- /dev/null
+++ b/shell/src/main/java/org/apache/kafka/shell/NotDirectoryException.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.shell;
+
+/**
+ * An exception that is thrown when a non-directory node is treated like a
+ * directory.
+ */
public class NotDirectoryException extends RuntimeException {
    private static final long serialVersionUID = 1L;

    public NotDirectoryException() {
        super();
    }

    /**
     * Create an exception with a detail message, typically naming the node
     * that was expected to be a directory.
     *
     * @param message the detail message.
     */
    public NotDirectoryException(String message) {
        super(message);
    }
}
diff --git a/shell/src/main/java/org/apache/kafka/shell/NotFileException.java b/shell/src/main/java/org/apache/kafka/shell/NotFileException.java
new file mode 100644
index 0000000..cbc2a83
--- /dev/null
+++ b/shell/src/main/java/org/apache/kafka/shell/NotFileException.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.shell;
+
+/**
+ * An exception that is thrown when a non-file node is treated like a
+ * file.
+ */
public class NotFileException extends RuntimeException {
    private static final long serialVersionUID = 1L;

    public NotFileException() {
        super();
    }

    /**
     * Create an exception with a detail message, typically naming the node
     * that was expected to be a file.
     *
     * @param message the detail message.
     */
    public NotFileException(String message) {
        super(message);
    }
}
diff --git a/shell/src/main/java/org/apache/kafka/shell/PwdCommandHandler.java b/shell/src/main/java/org/apache/kafka/shell/PwdCommandHandler.java
new file mode 100644
index 0000000..1e5b5da
--- /dev/null
+++ b/shell/src/main/java/org/apache/kafka/shell/PwdCommandHandler.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.shell;
+
+import net.sourceforge.argparse4j.inf.ArgumentParser;
+import net.sourceforge.argparse4j.inf.Namespace;
+import org.jline.reader.Candidate;
+
+import java.io.PrintWriter;
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * Implements the pwd command.
+ */
+public final class PwdCommandHandler implements Commands.Handler {
+    public final static Commands.Type TYPE = new PwdCommandType();
+
+    public static class PwdCommandType implements Commands.Type {
+        private PwdCommandType() {
+        }
+
+        @Override
+        public String name() {
+            return "pwd";
+        }
+
+        @Override
+        public String description() {
+            return "Print the current working directory.";
+        }
+
+        @Override
+        public boolean shellOnly() {
+            return true;
+        }
+
+        @Override
+        public void addArguments(ArgumentParser parser) {
+            // nothing to do
+        }
+
+        @Override
+        public Commands.Handler createHandler(Namespace namespace) {
+            return new PwdCommandHandler();
+        }
+
+        @Override
+        public void completeNext(MetadataNodeManager nodeManager, List<String> nextWords,
+                                 List<Candidate> candidates) throws Exception {
+            // nothing to do
+        }
+    }
+
+    @Override
+    public void run(Optional<InteractiveShell> shell,
+                    PrintWriter writer,
+                    MetadataNodeManager manager) throws Exception {
+        manager.visit(data -> {
+            writer.println(data.workingDirectory());
+        });
+    }
+
+    @Override
+    public int hashCode() {
+        return 0;
+    }
+
+    @Override
+    public boolean equals(Object other) {
+        if (!(other instanceof PwdCommandHandler)) return false;
+        return true;
+    }
+}
diff --git a/shell/src/main/java/org/apache/kafka/shell/SnapshotReader.java b/shell/src/main/java/org/apache/kafka/shell/SnapshotReader.java
new file mode 100644
index 0000000..5de1dfb
--- /dev/null
+++ b/shell/src/main/java/org/apache/kafka/shell/SnapshotReader.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.shell;
+
+import org.apache.kafka.common.message.LeaderChangeMessage;
+import org.apache.kafka.common.metadata.MetadataRecordType;
+import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.common.protocol.ByteBufferAccessor;
+import org.apache.kafka.common.record.ControlRecordType;
+import org.apache.kafka.common.record.FileLogInputStream.FileChannelRecordBatch;
+import org.apache.kafka.common.record.FileRecords;
+import org.apache.kafka.common.record.Record;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.metalog.MetaLogLeader;
+import org.apache.kafka.metalog.MetaLogListener;
+import org.apache.kafka.queue.EventQueue;
+import org.apache.kafka.queue.KafkaEventQueue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+
+/**
+ * Reads a metadata snapshot file from disk and replays its record batches
+ * to a {@link MetaLogListener}.
+ */
+public final class SnapshotReader implements AutoCloseable {
+    private static final Logger log = LoggerFactory.getLogger(SnapshotReader.class);
+
+    // Path of the on-disk snapshot file to replay.
+    private final String snapshotPath;
+    // Receives leadership changes and committed metadata messages as the
+    // snapshot is replayed.
+    private final MetaLogListener listener;
+    // Single-threaded event queue: all file access and listener callbacks run
+    // on it, so the mutable fields below are only touched from queue events.
+    private final KafkaEventQueue queue;
+    private FileRecords fileRecords;
+    private Iterator<FileChannelRecordBatch> batchIterator;
+
+    public SnapshotReader(String snapshotPath, MetaLogListener listener) {
+        this.snapshotPath = snapshotPath;
+        this.listener = listener;
+        this.queue = new KafkaEventQueue(Time.SYSTEM,
+            new LogContext("[snapshotReaderQueue] "), "snapshotReaderQueue_");
+    }
+
+    /**
+     * Opens the snapshot file and schedules replay of its batches.
+     * Blocks until the file has been opened; replay itself then proceeds
+     * asynchronously on the event queue.  An open failure triggers shutdown
+     * and is rethrown to the caller via future.get().
+     */
+    public void startup() throws Exception {
+        CompletableFuture<Void> future = new CompletableFuture<>();
+        queue.append(new EventQueue.Event() {
+            @Override
+            public void run() throws Exception {
+                // false = do not create the file; the snapshot must exist.
+                fileRecords = FileRecords.open(new File(snapshotPath), false);
+                batchIterator = fileRecords.batches().iterator();
+                scheduleHandleNextBatch();
+                future.complete(null);
+            }
+
+            @Override
+            public void handleException(Throwable e) {
+                future.completeExceptionally(e);
+                beginShutdown("startup error");
+            }
+        });
+        future.get();
+    }
+
+    // Processes one record batch and reschedules itself; begins shutdown once
+    // the iterator is exhausted.  Must only run on the event queue.
+    private void handleNextBatch() {
+        if (!batchIterator.hasNext()) {
+            beginShutdown("done");
+            return;
+        }
+        FileChannelRecordBatch batch = batchIterator.next();
+        if (batch.isControlBatch()) {
+            handleControlBatch(batch);
+        } else {
+            handleMetadataBatch(batch);
+        }
+        scheduleHandleNextBatch();
+    }
+
+    // Enqueues a handleNextBatch call.  Scheduling one batch per queue event
+    // keeps the queue responsive to a concurrent beginShutdown request.
+    private void scheduleHandleNextBatch() {
+        queue.append(new EventQueue.Event() {
+            @Override
+            public void run() throws Exception {
+                handleNextBatch();
+            }
+
+            @Override
+            public void handleException(Throwable e) {
+                log.error("Unexpected error while handling a batch of events", e);
+                beginShutdown("handleBatch error");
+            }
+        });
+    }
+
+    // Decodes control records in the batch.  Only LEADER_CHANGE is handled:
+    // it is forwarded to the listener as a new MetaLogLeader, using the
+    // batch's partition leader epoch as the leader's epoch.  Other control
+    // types and unreadable records are logged and skipped.
+    private void handleControlBatch(FileChannelRecordBatch batch) {
+        for (Iterator<Record> iter = batch.iterator(); iter.hasNext(); ) {
+            Record record = iter.next();
+            try {
+                short typeId = ControlRecordType.parseTypeId(record.key());
+                ControlRecordType type = ControlRecordType.fromTypeId(typeId);
+                switch (type) {
+                    case LEADER_CHANGE:
+                        LeaderChangeMessage message = new LeaderChangeMessage();
+                        message.read(new ByteBufferAccessor(record.value()), (short) 0);
+                        listener.handleNewLeader(new MetaLogLeader(message.leaderId(),
+                            batch.partitionLeaderEpoch()));
+                        break;
+                    default:
+                        log.error("Ignoring control record with type {} at offset {}",
+                            type, record.offset());
+                }
+            } catch (Throwable e) {
+                log.error("unable to read control record at offset {}", record.offset(), e);
+            }
+        }
+    }
+
+    // Decodes each metadata record as (apiKey varint, apiVersion varint,
+    // payload), collects the resulting ApiMessages, then delivers the whole
+    // batch to the listener with the batch's last offset.  Malformed records
+    // are logged and skipped rather than aborting the replay.
+    private void handleMetadataBatch(FileChannelRecordBatch batch) {
+        List<ApiMessage> messages = new ArrayList<>();
+        for (Iterator<Record> iter = batch.iterator(); iter.hasNext(); ) {
+            Record record = iter.next();
+            ByteBufferAccessor accessor = new ByteBufferAccessor(record.value());
+            try {
+                int apiKey = accessor.readUnsignedVarint();
+                if (apiKey > Short.MAX_VALUE || apiKey < 0) {
+                    throw new RuntimeException("Invalid apiKey value " + apiKey);
+                }
+                int apiVersion = accessor.readUnsignedVarint();
+                if (apiVersion > Short.MAX_VALUE || apiVersion < 0) {
+                    throw new RuntimeException("Invalid apiVersion value " + apiVersion);
+                }
+                ApiMessage message = MetadataRecordType.fromId((short) apiKey).newMetadataRecord();
+                message.read(accessor, (short) apiVersion);
+                messages.add(message);
+            } catch (Throwable e) {
+                log.error("unable to read metadata record at offset {}", record.offset(), e);
+            }
+        }
+        listener.handleCommits(batch.lastOffset(), messages);
+    }
+
+    /**
+     * Initiates asynchronous shutdown: notifies the listener, closes the
+     * snapshot file, and clears the iterator as the queue's final event.
+     * Safe to call from any thread; does not block.
+     */
+    public void beginShutdown(String reason) {
+        queue.beginShutdown(reason, new EventQueue.Event() {
+            @Override
+            public void run() throws Exception {
+                listener.beginShutdown();
+                if (fileRecords != null) {
+                    fileRecords.close();
+                    fileRecords = null;
+                }
+                batchIterator = null;
+            }
+
+            @Override
+            public void handleException(Throwable e) {
+                log.error("shutdown error", e);
+            }
+        });
+    }
+
+    // Begins shutdown and then closes the queue (NOTE(review): presumably
+    // queue.close() waits for the queue thread to finish -- confirm against
+    // KafkaEventQueue's contract).
+    @Override
+    public void close() throws Exception {
+        beginShutdown("closing");
+        queue.close();
+    }
+}
diff --git a/shell/src/test/java/org/apache/kafka/shell/CommandTest.java b/shell/src/test/java/org/apache/kafka/shell/CommandTest.java
new file mode 100644
index 0000000..c896a06
--- /dev/null
+++ b/shell/src/test/java/org/apache/kafka/shell/CommandTest.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.shell;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Optional;
+
+@Timeout(value = 120000, unit = MILLISECONDS)
+public class CommandTest {
+    // Verifies that each known command line parses to the expected handler.
+    // Relies on the handlers' value-style equals() implementations.
+    @Test
+    public void testParseCommands() {
+        assertEquals(new CatCommandHandler(Arrays.asList("foo")),
+            new Commands(true).parseCommand(Arrays.asList("cat", "foo")));
+        assertEquals(new CdCommandHandler(Optional.empty()),
+            new Commands(true).parseCommand(Arrays.asList("cd")));
+        assertEquals(new CdCommandHandler(Optional.of("foo")),
+            new Commands(true).parseCommand(Arrays.asList("cd", "foo")));
+        assertEquals(new ExitCommandHandler(),
+            new Commands(true).parseCommand(Arrays.asList("exit")));
+        assertEquals(new HelpCommandHandler(),
+            new Commands(true).parseCommand(Arrays.asList("help")));
+        assertEquals(new HistoryCommandHandler(3),
+            new Commands(true).parseCommand(Arrays.asList("history", "3")));
+        // history with no argument defaults to showing everything.
+        assertEquals(new HistoryCommandHandler(Integer.MAX_VALUE),
+            new Commands(true).parseCommand(Arrays.asList("history")));
+        assertEquals(new LsCommandHandler(Collections.emptyList()),
+            new Commands(true).parseCommand(Arrays.asList("ls")));
+        assertEquals(new LsCommandHandler(Arrays.asList("abc", "123")),
+            new Commands(true).parseCommand(Arrays.asList("ls", "abc", "123")));
+        assertEquals(new PwdCommandHandler(),
+            new Commands(true).parseCommand(Arrays.asList("pwd")));
+    }
+
+    // An unknown command yields an ErroneousCommandHandler carrying the
+    // parser's error message, rather than throwing.
+    @Test
+    public void testParseInvalidCommand() {
+        assertEquals(new ErroneousCommandHandler("invalid choice: 'blah' (choose " +
+            "from 'cat', 'cd', 'exit', 'find', 'help', 'history', 'ls', 'man', 'pwd')"),
+            new Commands(true).parseCommand(Arrays.asList("blah")));
+    }
+
+    // Empty input (blank token or no tokens at all) parses to a no-op.
+    @Test
+    public void testEmptyCommandLine() {
+        assertEquals(new NoOpCommandHandler(),
+            new Commands(true).parseCommand(Arrays.asList("")));
+        assertEquals(new NoOpCommandHandler(),
+            new Commands(true).parseCommand(Collections.emptyList()));
+    }
+}
diff --git a/shell/src/test/java/org/apache/kafka/shell/CommandUtilsTest.java b/shell/src/test/java/org/apache/kafka/shell/CommandUtilsTest.java
new file mode 100644
index 0000000..90c3b5c
--- /dev/null
+++ b/shell/src/test/java/org/apache/kafka/shell/CommandUtilsTest.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.shell;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+import java.util.Arrays;
+
+@Timeout(value = 120000, unit = MILLISECONDS)
+public class CommandUtilsTest {
+    // splitPath should split on '/' and ignore leading, trailing, and
+    // repeated separators.
+    @Test
+    public void testSplitPath() {
+        assertEquals(Arrays.asList("alpha", "beta"),
+            CommandUtils.splitPath("/alpha/beta"));
+        assertEquals(Arrays.asList("alpha", "beta"),
+            CommandUtils.splitPath("//alpha/beta/"));
+    }
+}
diff --git a/shell/src/test/java/org/apache/kafka/shell/GlobComponentTest.java b/shell/src/test/java/org/apache/kafka/shell/GlobComponentTest.java
new file mode 100644
index 0000000..da3a7ec
--- /dev/null
+++ b/shell/src/test/java/org/apache/kafka/shell/GlobComponentTest.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.shell;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+@Timeout(value = 120000, unit = MILLISECONDS)
+public class GlobComponentTest {
+    // Asserts that a component with no glob metacharacters is treated as a
+    // literal: it matches exactly itself and nothing else.
+    private void verifyIsLiteral(GlobComponent globComponent, String component) {
+        assertTrue(globComponent.literal());
+        assertEquals(component, globComponent.component());
+        assertTrue(globComponent.matches(component));
+        assertFalse(globComponent.matches(component + "foo"));
+    }
+
+    @Test
+    public void testLiteralComponent() {
+        verifyIsLiteral(new GlobComponent("abc"), "abc");
+        verifyIsLiteral(new GlobComponent(""), "");
+        verifyIsLiteral(new GlobComponent("foobar_123"), "foobar_123");
+        // '$' and '+' are regex metacharacters but not glob ones, so this
+        // component is still a literal.
+        verifyIsLiteral(new GlobComponent("$blah+"), "$blah+");
+    }
+
+    // toRegularExpression returns null for pure literals and an anchored
+    // regex when glob syntax ('*', '{a,b}') is present.
+    @Test
+    public void testToRegularExpression() {
+        assertEquals(null, GlobComponent.toRegularExpression("blah"));
+        assertEquals(null, GlobComponent.toRegularExpression(""));
+        assertEquals(null, GlobComponent.toRegularExpression("does not need a regex, actually"));
+        assertEquals("^\\$blah.*$", GlobComponent.toRegularExpression("$blah*"));
+        assertEquals("^.*$", GlobComponent.toRegularExpression("*"));
+        assertEquals("^foo(?:(?:bar)|(?:baz))$", GlobComponent.toRegularExpression("foo{bar,baz}"));
+    }
+
+    // Exercises matching for the '*', '?', and '{a,b}' glob forms.
+    @Test
+    public void testGlobMatch() {
+        GlobComponent star = new GlobComponent("*");
+        assertFalse(star.literal());
+        assertTrue(star.matches(""));
+        assertTrue(star.matches("anything"));
+        GlobComponent question = new GlobComponent("b?b");
+        assertFalse(question.literal());
+        assertFalse(question.matches(""));
+        assertTrue(question.matches("bob"));
+        assertTrue(question.matches("bib"));
+        assertFalse(question.matches("bic"));
+        GlobComponent foobarOrFoobaz = new GlobComponent("foo{bar,baz}");
+        assertFalse(foobarOrFoobaz.literal());
+        assertTrue(foobarOrFoobaz.matches("foobar"));
+        assertTrue(foobarOrFoobaz.matches("foobaz"));
+        assertFalse(foobarOrFoobaz.matches("foobah"));
+        assertFalse(foobarOrFoobaz.matches("foo"));
+        assertFalse(foobarOrFoobaz.matches("baz"));
+    }
+}
diff --git a/shell/src/test/java/org/apache/kafka/shell/GlobVisitorTest.java b/shell/src/test/java/org/apache/kafka/shell/GlobVisitorTest.java
new file mode 100644
index 0000000..59eeb5d
--- /dev/null
+++ b/shell/src/test/java/org/apache/kafka/shell/GlobVisitorTest.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.shell;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import org.apache.kafka.shell.GlobVisitor.MetadataNodeInfo;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+import java.util.function.Consumer;
+
+@Timeout(value = 120000, unit = MILLISECONDS)
+public class GlobVisitorTest {
+    // Shared read-only fixture tree:
+    //   /alpha/beta/gamma, /alpha/theta, /foo/a, /foo/beta,
+    //   /zeta (containing file c), and file /zzz.
+    // The working directory is set to /foo, so relative globs resolve there.
+    static private final MetadataNodeManager.Data DATA;
+
+    static {
+        DATA = new MetadataNodeManager.Data();
+        DATA.root().mkdirs("alpha", "beta", "gamma");
+        DATA.root().mkdirs("alpha", "theta");
+        DATA.root().mkdirs("foo", "a");
+        DATA.root().mkdirs("foo", "beta");
+        DATA.root().mkdirs("zeta").create("c");
+        DATA.root().mkdirs("zeta");
+        DATA.root().create("zzz");
+        DATA.setWorkingDirectory("foo");
+    }
+
+    // Collects the node infos delivered by GlobVisitor.  An empty Optional
+    // as the first callback means "no match"; a non-empty info after an
+    // empty one is a protocol violation and fails the test.
+    static class InfoConsumer implements Consumer<Optional<MetadataNodeInfo>> {
+        private Optional<List<MetadataNodeInfo>> infos = null;
+
+        @Override
+        public void accept(Optional<MetadataNodeInfo> info) {
+            if (infos == null) {
+                if (info.isPresent()) {
+                    infos = Optional.of(new ArrayList<>());
+                    infos.get().add(info.get());
+                } else {
+                    infos = Optional.empty();
+                }
+            } else {
+                if (info.isPresent()) {
+                    infos.get().add(info.get());
+                } else {
+                    throw new RuntimeException("Saw non-empty info after seeing empty info");
+                }
+            }
+        }
+    }
+
+    // '*' relative to the working directory (/foo) lists foo's children.
+    @Test
+    public void testStarGlob() {
+        InfoConsumer consumer = new InfoConsumer();
+        GlobVisitor visitor = new GlobVisitor("*", consumer);
+        visitor.accept(DATA);
+        assertEquals(Optional.of(Arrays.asList(
+            new MetadataNodeInfo(new String[] {"foo", "a"},
+                DATA.root().directory("foo").child("a")),
+            new MetadataNodeInfo(new String[] {"foo", "beta"},
+                DATA.root().directory("foo").child("beta")))), consumer.infos);
+    }
+
+    // '..' from /foo resolves to the root.
+    @Test
+    public void testDotDot() {
+        InfoConsumer consumer = new InfoConsumer();
+        GlobVisitor visitor = new GlobVisitor("..", consumer);
+        visitor.accept(DATA);
+        assertEquals(Optional.of(Arrays.asList(
+            new MetadataNodeInfo(new String[0], DATA.root()))), consumer.infos);
+    }
+
+    // '..' applied at the root stays at the root.
+    @Test
+    public void testDoubleDotDot() {
+        InfoConsumer consumer = new InfoConsumer();
+        GlobVisitor visitor = new GlobVisitor("../..", consumer);
+        visitor.accept(DATA);
+        assertEquals(Optional.of(Arrays.asList(
+            new MetadataNodeInfo(new String[0], DATA.root()))), consumer.infos);
+    }
+
+    // A glob component can match both directories (zeta) and files (zzz).
+    @Test
+    public void testZGlob() {
+        InfoConsumer consumer = new InfoConsumer();
+        GlobVisitor visitor = new GlobVisitor("../z*", consumer);
+        visitor.accept(DATA);
+        assertEquals(Optional.of(Arrays.asList(
+            new MetadataNodeInfo(new String[] {"zeta"},
+                DATA.root().child("zeta")),
+            new MetadataNodeInfo(new String[] {"zzz"},
+                DATA.root().child("zzz")))), consumer.infos);
+    }
+
+    // Alternation across multiple directories: '*/{beta,theta}' matches in
+    // both /alpha and /foo.
+    @Test
+    public void testBetaOrThetaGlob() {
+        InfoConsumer consumer = new InfoConsumer();
+        GlobVisitor visitor = new GlobVisitor("../*/{beta,theta}", consumer);
+        visitor.accept(DATA);
+        assertEquals(Optional.of(Arrays.asList(
+            new MetadataNodeInfo(new String[] {"alpha", "beta"},
+                DATA.root().directory("alpha").child("beta")),
+            new MetadataNodeInfo(new String[] {"alpha", "theta"},
+                DATA.root().directory("alpha").child("theta")),
+            new MetadataNodeInfo(new String[] {"foo", "beta"},
+                DATA.root().directory("foo").child("beta")))), consumer.infos);
+    }
+
+    // No match is reported as a single empty Optional callback.
+    @Test
+    public void testNotFoundGlob() {
+        InfoConsumer consumer = new InfoConsumer();
+        GlobVisitor visitor = new GlobVisitor("epsilon", consumer);
+        visitor.accept(DATA);
+        assertEquals(Optional.empty(), consumer.infos);
+    }
+
+    // A leading '/' makes the glob absolute, ignoring the working directory.
+    @Test
+    public void testAbsoluteGlob() {
+        InfoConsumer consumer = new InfoConsumer();
+        GlobVisitor visitor = new GlobVisitor("/a?pha", consumer);
+        visitor.accept(DATA);
+        assertEquals(Optional.of(Arrays.asList(
+            new MetadataNodeInfo(new String[] {"alpha"},
+                DATA.root().directory("alpha")))), consumer.infos);
+    }
+}
diff --git a/shell/src/test/java/org/apache/kafka/shell/LsCommandHandlerTest.java b/shell/src/test/java/org/apache/kafka/shell/LsCommandHandlerTest.java
new file mode 100644
index 0000000..c845706
--- /dev/null
+++ b/shell/src/test/java/org/apache/kafka/shell/LsCommandHandlerTest.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.shell;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import org.apache.kafka.shell.LsCommandHandler.ColumnSchema;
+import org.apache.kafka.shell.LsCommandHandler.TargetDirectory;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+import java.io.ByteArrayOutputStream;
+import java.io.OutputStreamWriter;
+import java.io.PrintWriter;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.OptionalInt;
+
+@Timeout(value = 120000, unit = MILLISECONDS)
+public class LsCommandHandlerTest {
+    // calculateColumnSchema picks the number of columns and rows for a given
+    // screen width (empty/zero width degenerates to one column).
+    @Test
+    public void testCalculateColumnSchema() {
+        assertEquals(new ColumnSchema(1, 3),
+            LsCommandHandler.calculateColumnSchema(OptionalInt.empty(),
+                Arrays.asList("abc", "def", "ghi")));
+        assertEquals(new ColumnSchema(1, 2),
+            LsCommandHandler.calculateColumnSchema(OptionalInt.of(0),
+                Arrays.asList("abc", "def")));
+        assertEquals(new ColumnSchema(3, 1).setColumnWidths(3, 8, 6),
+            LsCommandHandler.calculateColumnSchema(OptionalInt.of(80),
+                Arrays.asList("a", "abcdef", "beta")));
+        assertEquals(new ColumnSchema(2, 3).setColumnWidths(10, 7),
+            LsCommandHandler.calculateColumnSchema(OptionalInt.of(18),
+                Arrays.asList("alphabet", "beta", "gamma", "theta", "zeta")));
+    }
+
+    // Entries are laid out column-major (down, then across) within the
+    // requested width; the writer is closed before the stream is inspected
+    // so the output is fully flushed.
+    @Test
+    public void testPrintEntries() throws Exception {
+        try (ByteArrayOutputStream stream = new ByteArrayOutputStream()) {
+            try (PrintWriter writer = new PrintWriter(new OutputStreamWriter(
+                    stream, StandardCharsets.UTF_8))) {
+                LsCommandHandler.printEntries(writer, "", OptionalInt.of(18),
+                    Arrays.asList("alphabet", "beta", "gamma", "theta", "zeta"));
+            }
+            assertEquals(String.join(String.format("%n"), Arrays.asList(
+                "alphabet  theta",
+                "beta      zeta",
+                "gamma")), stream.toString().trim());
+        }
+    }
+
+    // With both plain targets and directory targets, the plain entries are
+    // printed first, then each directory with a "path:" header and its
+    // children listed underneath.
+    @Test
+    public void testPrintTargets() throws Exception {
+        try (ByteArrayOutputStream stream = new ByteArrayOutputStream()) {
+            try (PrintWriter writer = new PrintWriter(new OutputStreamWriter(
+                    stream, StandardCharsets.UTF_8))) {
+                LsCommandHandler.printTargets(writer, OptionalInt.of(18),
+                    Arrays.asList("foo", "foobarbaz", "quux"), Arrays.asList(
+                        new TargetDirectory("/some/dir",
+                            Collections.singletonList("supercalifragalistic")),
+                        new TargetDirectory("/some/other/dir",
+                            Arrays.asList("capability", "delegation", "elephant",
+                                "fungible", "green"))));
+            }
+            assertEquals(String.join(String.format("%n"), Arrays.asList(
+                "foo        quux",
+                "foobarbaz  ",
+                "",
+                "/some/dir:",
+                "supercalifragalistic",
+                "",
+                "/some/other/dir:",
+                "capability",
+                "delegation",
+                "elephant",
+                "fungible",
+                "green")), stream.toString().trim());
+        }
+    }
+}
+
diff --git a/shell/src/test/java/org/apache/kafka/shell/MetadataNodeTest.java b/shell/src/test/java/org/apache/kafka/shell/MetadataNodeTest.java
new file mode 100644
index 0000000..42223c7
--- /dev/null
+++ b/shell/src/test/java/org/apache/kafka/shell/MetadataNodeTest.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.shell;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import org.apache.kafka.shell.MetadataNode.DirectoryNode;
+import org.apache.kafka.shell.MetadataNode.FileNode;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+
+@Timeout(value = 120000, unit = MILLISECONDS)
+public class MetadataNodeTest {
+    // mkdirs is idempotent: repeated calls return the same node instance,
+    // and directory() resolves to that same instance.
+    @Test
+    public void testMkdirs() {
+        DirectoryNode root = new DirectoryNode();
+        DirectoryNode defNode = root.mkdirs("abc", "def");
+        DirectoryNode defNode2 = root.mkdirs("abc", "def");
+        assertTrue(defNode == defNode2);
+        DirectoryNode defNode3 = root.directory("abc", "def");
+        assertTrue(defNode == defNode3);
+        root.mkdirs("ghi");
+        assertEquals(new HashSet<>(Arrays.asList("abc", "ghi")), root.children().keySet());
+        assertEquals(Collections.singleton("def"), root.mkdirs("abc").children().keySet());
+        assertEquals(Collections.emptySet(), defNode.children().keySet());
+    }
+
+    // rmrf removes a single child or an entire subtree by path.
+    @Test
+    public void testRmrf() {
+        DirectoryNode root = new DirectoryNode();
+        DirectoryNode foo = root.mkdirs("foo");
+        foo.mkdirs("a");
+        foo.mkdirs("b");
+        root.mkdirs("baz");
+        assertEquals(new HashSet<>(Arrays.asList("foo", "baz")), root.children().keySet());
+        root.rmrf("foo", "a");
+        assertEquals(new HashSet<>(Arrays.asList("b")), foo.children().keySet());
+        root.rmrf("foo");
+        assertEquals(new HashSet<>(Collections.singleton("baz")), root.children().keySet());
+    }
+
+    // Files hold string contents; mkdirs through a path component that is a
+    // file (not a directory) fails with NotDirectoryException.
+    @Test
+    public void testCreateFiles() {
+        DirectoryNode root = new DirectoryNode();
+        DirectoryNode abcdNode = root.mkdirs("abcd");
+        FileNode quuxNodde = abcdNode.create("quux");
+        quuxNodde.setContents("quux contents");
+        assertEquals("quux contents", quuxNodde.contents());
+        assertThrows(NotDirectoryException.class, () -> root.mkdirs("abcd", "quux"));
+    }
+}


Mime
View raw message