kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jkr...@apache.org
Subject [12/13] Rename client package from kafka.* to org.apache.kafka.*
Date Fri, 07 Feb 2014 00:26:43 GMT
http://git-wip-us.apache.org/repos/asf/kafka/blob/fa6339c1/clients/src/main/java/kafka/clients/producer/internals/Sender.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/kafka/clients/producer/internals/Sender.java
deleted file mode 100644
index 5ac487b..0000000
--- a/clients/src/main/java/kafka/clients/producer/internals/Sender.java
+++ /dev/null
@@ -1,504 +0,0 @@
-package kafka.clients.producer.internals;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.nio.ByteBuffer;
-import java.util.ArrayDeque;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Deque;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import kafka.common.Cluster;
-import kafka.common.Node;
-import kafka.common.TopicPartition;
-import kafka.common.errors.NetworkException;
-import kafka.common.network.NetworkReceive;
-import kafka.common.network.NetworkSend;
-import kafka.common.network.Selectable;
-import kafka.common.protocol.ApiKeys;
-import kafka.common.protocol.Errors;
-import kafka.common.protocol.ProtoUtils;
-import kafka.common.protocol.types.Struct;
-import kafka.common.requests.RequestHeader;
-import kafka.common.requests.RequestSend;
-import kafka.common.requests.ResponseHeader;
-import kafka.common.utils.Time;
-
-/**
- * The background thread that handles the sending of produce requests to the Kafka cluster. This thread makes metadata
- * requests to renew its view of the cluster and then sends produce requests to the appropriate nodes.
- */
-public class Sender implements Runnable {
-
-    private final Map<Integer, NodeState> nodeState;
-    private final RecordAccumulator accumulator;
-    private final Selectable selector;
-    private final String clientId;
-    private final int maxRequestSize;
-    private final long reconnectBackoffMs;
-    private final short acks;
-    private final int requestTimeout;
-    private final InFlightRequests inFlightRequests;
-    private final Metadata metadata;
-    private final Time time;
-    private int correlation;
-    private boolean metadataFetchInProgress;
-    private volatile boolean running;
-
-    public Sender(Selectable selector,
-                  Metadata metadata,
-                  RecordAccumulator accumulator,
-                  String clientId,
-                  int maxRequestSize,
-                  long reconnectBackoffMs,
-                  short acks,
-                  int requestTimeout,
-                  Time time) {
-        this.nodeState = new HashMap<Integer, NodeState>();
-        this.accumulator = accumulator;
-        this.selector = selector;
-        this.maxRequestSize = maxRequestSize;
-        this.reconnectBackoffMs = reconnectBackoffMs;
-        this.metadata = metadata;
-        this.clientId = clientId;
-        this.running = true;
-        this.requestTimeout = requestTimeout;
-        this.acks = acks;
-        this.inFlightRequests = new InFlightRequests();
-        this.correlation = 0;
-        this.metadataFetchInProgress = false;
-        this.time = time;
-    }
-
-    /**
-     * The main run loop for the sender thread
-     */
-    public void run() {
-        // main loop, runs until close is called
-        while (running) {
-            try {
-                run(time.milliseconds());
-            } catch (Exception e) {
-                e.printStackTrace();
-            }
-        }
-
-        // send anything left in the accumulator
-        int unsent = 0;
-        do {
-            try {
-                unsent = run(time.milliseconds());
-            } catch (Exception e) {
-                e.printStackTrace();
-            }
-        } while (unsent > 0);
-
-        // close all the connections
-        this.selector.close();
-    }
-
-    /**
-     * Run a single iteration of sending
-     * 
-     * @param now The current time
-     * @return The total number of topic/partitions that had data ready (regardless of what we actually sent)
-     */
-    public int run(long now) {
-        Cluster cluster = metadata.fetch();
-        // get the list of partitions with data ready to send
-        List<TopicPartition> ready = this.accumulator.ready(now);
-
-        // prune the list of ready topics to eliminate any that we aren't ready to send yet
-        List<TopicPartition> sendable = processReadyPartitions(cluster, ready, now);
-
-        // should we update our metadata?
-        List<NetworkSend> sends = new ArrayList<NetworkSend>(sendable.size());
-        InFlightRequest metadataReq = maybeMetadataRequest(cluster, now);
-        if (metadataReq != null) {
-            sends.add(metadataReq.request);
-            this.inFlightRequests.add(metadataReq);
-        }
-
-        // create produce requests
-        List<RecordBatch> batches = this.accumulator.drain(sendable, this.maxRequestSize);
-        List<InFlightRequest> requests = collate(cluster, batches);
-        for (int i = 0; i < requests.size(); i++) {
-            InFlightRequest request = requests.get(i);
-            this.inFlightRequests.add(request);
-            sends.add(request.request);
-        }
-
-        // do the I/O
-        try {
-            this.selector.poll(5L, sends);
-        } catch (IOException e) {
-            e.printStackTrace();
-        }
-
-        // handle responses, connections, and disconnections
-        handleSends(this.selector.completedSends());
-        handleResponses(this.selector.completedReceives(), now);
-        handleDisconnects(this.selector.disconnected());
-        handleConnects(this.selector.connected());
-
-        return ready.size();
-    }
-
-    private InFlightRequest maybeMetadataRequest(Cluster cluster, long now) {
-        if (this.metadataFetchInProgress || !metadata.needsUpdate(now))
-            return null;
-        Node node = cluster.nextNode();
-        NodeState state = nodeState.get(node.id());
-        if (state == null || (state.state == ConnectionState.DISCONNECTED && now - state.lastConnectAttempt > this.reconnectBackoffMs)) {
-            // we don't have a connection to this node right now, make one
-            initiateConnect(node, now);
-            return null;
-        } else if (state.state == ConnectionState.CONNECTED) {
-            this.metadataFetchInProgress = true;
-            return metadataRequest(node.id(), metadata.topics());
-        } else {
-            return null;
-        }
-    }
-
-    /**
-     * Start closing the sender (won't actually complete until all data is sent out)
-     */
-    public void initiateClose() {
-        this.running = false;
-        this.accumulator.close();
-    }
-
-    /**
-     * Process the set of topic-partitions with data ready to send. If we have a connection to the appropriate node, add
-     * it to the returned set. For any partitions we have no connection to either make one, fetch the appropriate
-     * metdata to be able to do so
-     */
-    private List<TopicPartition> processReadyPartitions(Cluster cluster, List<TopicPartition> ready, long now) {
-        List<TopicPartition> sendable = new ArrayList<TopicPartition>(ready.size());
-        for (TopicPartition tp : ready) {
-            Node node = cluster.leaderFor(tp);
-            if (node == null) {
-                // we don't know about this topic/partition or it has no leader, re-fetch metadata
-                metadata.forceUpdate();
-            } else {
-                NodeState state = nodeState.get(node.id());
-                // TODO: encapsulate this logic somehow
-                if (state == null || (state.state == ConnectionState.DISCONNECTED && now - state.lastConnectAttempt > this.reconnectBackoffMs)) {
-                    // we don't have a connection to this node right now, make one
-                    initiateConnect(node, now);
-                } else if (state.state == ConnectionState.CONNECTED && inFlightRequests.canSendMore(node.id())) {
-                    sendable.add(tp);
-                }
-            }
-        }
-        return sendable;
-    }
-
-    /**
-     * Initiate a connection to the given node
-     */
-    private void initiateConnect(Node node, long now) {
-        try {
-            selector.connect(node.id(), new InetSocketAddress(node.host(), node.port()), 64 * 1024 * 1024, 64 * 1024 * 1024); // TODO
-                                                                                                                              // socket
-                                                                                                                              // buffers
-            nodeState.put(node.id(), new NodeState(ConnectionState.CONNECTING, now));
-        } catch (IOException e) {
-            /* attempt failed, we'll try again after the backoff */
-            nodeState.put(node.id(), new NodeState(ConnectionState.DISCONNECTED, now));
-            /* maybe the problem is our metadata, update it */
-            metadata.forceUpdate();
-        }
-    }
-
-    /**
-     * Handle any closed connections
-     */
-    private void handleDisconnects(List<Integer> disconnects) {
-        for (int node : disconnects) {
-            for (InFlightRequest request : this.inFlightRequests.clearAll(node)) {
-                if (request.batches != null) {
-                    for (RecordBatch batch : request.batches.values())
-                        batch.done(-1L, new NetworkException("The server disconnected unexpectedly without sending a response."));
-                    this.accumulator.deallocate(request.batches.values());
-                }
-                NodeState state = this.nodeState.get(request.request.destination());
-                if (state != null)
-                    state.state = ConnectionState.DISCONNECTED;
-            }
-        }
-    }
-
-    /**
-     * Record any connections that completed in our node state
-     */
-    private void handleConnects(List<Integer> connects) {
-        for (Integer id : connects)
-            this.nodeState.get(id).state = ConnectionState.CONNECTED;
-    }
-
-    /**
-     * Process completed sends
-     */
-    public void handleSends(List<NetworkSend> sends) {
-        /* if acks = 0 then the request is satisfied once sent */
-        for (NetworkSend send : sends) {
-            Deque<InFlightRequest> requests = this.inFlightRequests.requestQueue(send.destination());
-            InFlightRequest request = requests.peekFirst();
-            if (!request.expectResponse) {
-                requests.pollFirst();
-                if (request.request.header().apiKey() == ApiKeys.PRODUCE.id) {
-                    for (RecordBatch batch : request.batches.values())
-                        batch.done(-1L, Errors.NONE.exception());
-                    this.accumulator.deallocate(request.batches.values());
-                }
-            }
-        }
-    }
-
-    /**
-     * Handle responses from the server
-     */
-    private void handleResponses(List<NetworkReceive> receives, long now) {
-        for (NetworkReceive receive : receives) {
-            int source = receive.source();
-            InFlightRequest req = inFlightRequests.nextCompleted(source);
-            ResponseHeader header = ResponseHeader.parse(receive.payload());
-            short apiKey = req.request.header().apiKey();
-            Struct body = (Struct) ProtoUtils.currentResponseSchema(apiKey).read(receive.payload());
-            correlate(req.request.header(), header);
-            if (req.request.header().apiKey() == ApiKeys.PRODUCE.id)
-                handleProduceResponse(req, body);
-            else if (req.request.header().apiKey() == ApiKeys.METADATA.id)
-                handleMetadataResponse(body, now);
-            else
-                throw new IllegalStateException("Unexpected response type: " + req.request.header().apiKey());
-        }
-    }
-
-    private void handleMetadataResponse(Struct body, long now) {
-        this.metadataFetchInProgress = false;
-        Cluster cluster = ProtoUtils.parseMetadataResponse(body);
-        this.metadata.update(cluster, now);
-    }
-
-    /**
-     * Handle a produce response
-     */
-    private void handleProduceResponse(InFlightRequest request, Struct response) {
-        for (Object topicResponse : (Object[]) response.get("responses")) {
-            Struct topicRespStruct = (Struct) topicResponse;
-            String topic = (String) topicRespStruct.get("topic");
-            for (Object partResponse : (Object[]) topicRespStruct.get("partition_responses")) {
-                Struct partRespStruct = (Struct) partResponse;
-                int partition = (Integer) partRespStruct.get("partition");
-                short errorCode = (Short) partRespStruct.get("error_code");
-                long offset = (Long) partRespStruct.get("base_offset");
-                RecordBatch batch = request.batches.get(new TopicPartition(topic, partition));
-                batch.done(offset, Errors.forCode(errorCode).exception());
-            }
-        }
-        this.accumulator.deallocate(request.batches.values());
-    }
-
-    /**
-     * Validate that the response corresponds to the request we expect or else explode
-     */
-    private void correlate(RequestHeader requestHeader, ResponseHeader responseHeader) {
-        if (requestHeader.correlationId() != responseHeader.correlationId())
-            throw new IllegalStateException("Correlation id for response (" + responseHeader.correlationId()
-                                            + ") does not match request ("
-                                            + requestHeader.correlationId()
-                                            + ")");
-    }
-
-    /**
-     * Create a metadata request for the given topics
-     */
-    private InFlightRequest metadataRequest(int node, Set<String> topics) {
-        String[] ts = new String[topics.size()];
-        topics.toArray(ts);
-        Struct body = new Struct(ProtoUtils.currentRequestSchema(ApiKeys.METADATA.id));
-        body.set("topics", topics.toArray());
-        RequestSend send = new RequestSend(node, new RequestHeader(ApiKeys.METADATA.id, clientId, correlation++), body);
-        return new InFlightRequest(true, send, null);
-    }
-
-    /**
-     * Collate the record batches into a list of produce requests on a per-node basis
-     */
-    private List<InFlightRequest> collate(Cluster cluster, List<RecordBatch> batches) {
-        Map<Integer, List<RecordBatch>> collated = new HashMap<Integer, List<RecordBatch>>();
-        for (RecordBatch batch : batches) {
-            Node node = cluster.leaderFor(batch.topicPartition);
-            List<RecordBatch> found = collated.get(node.id());
-            if (found == null) {
-                found = new ArrayList<RecordBatch>();
-                collated.put(node.id(), found);
-            }
-            found.add(batch);
-        }
-        List<InFlightRequest> requests = new ArrayList<InFlightRequest>(collated.size());
-        for (Map.Entry<Integer, List<RecordBatch>> entry : collated.entrySet())
-            requests.add(produceRequest(entry.getKey(), acks, requestTimeout, entry.getValue()));
-        return requests;
-    }
-
-    /**
-     * Create a produce request from the given record batches
-     */
-    private InFlightRequest produceRequest(int destination, short acks, int timeout, List<RecordBatch> batches) {
-        Map<TopicPartition, RecordBatch> batchesByPartition = new HashMap<TopicPartition, RecordBatch>();
-        Map<String, List<RecordBatch>> batchesByTopic = new HashMap<String, List<RecordBatch>>();
-        for (RecordBatch batch : batches) {
-            batchesByPartition.put(batch.topicPartition, batch);
-            List<RecordBatch> found = batchesByTopic.get(batch.topicPartition.topic());
-            if (found == null) {
-                found = new ArrayList<RecordBatch>();
-                batchesByTopic.put(batch.topicPartition.topic(), found);
-            }
-            found.add(batch);
-        }
-        Struct produce = new Struct(ProtoUtils.currentRequestSchema(ApiKeys.PRODUCE.id));
-        produce.set("acks", acks);
-        produce.set("timeout", timeout);
-        List<Struct> topicDatas = new ArrayList<Struct>(batchesByTopic.size());
-        for (Map.Entry<String, List<RecordBatch>> entry : batchesByTopic.entrySet()) {
-            Struct topicData = produce.instance("topic_data");
-            topicData.set("topic", entry.getKey());
-            List<RecordBatch> parts = entry.getValue();
-            Object[] partitionData = new Object[parts.size()];
-            for (int i = 0; i < parts.size(); i++) {
-                ByteBuffer buffer = parts.get(i).records.buffer();
-                buffer.flip();
-                Struct part = topicData.instance("data")
-                                       .set("partition", parts.get(i).topicPartition.partition())
-                                       .set("record_set", buffer);
-                partitionData[i] = part;
-            }
-            topicData.set("data", partitionData);
-            topicDatas.add(topicData);
-        }
-        produce.set("topic_data", topicDatas.toArray());
-
-        RequestHeader header = new RequestHeader(ApiKeys.PRODUCE.id, clientId, correlation++);
-        RequestSend send = new RequestSend(destination, header, produce);
-        return new InFlightRequest(acks != 0, send, batchesByPartition);
-    }
-
-    /**
-     * Wake up the selector associated with this send thread
-     */
-    public void wakeup() {
-        this.selector.wakeup();
-    }
-
-    /**
-     * The states of a node connection
-     */
-    private static enum ConnectionState {
-        DISCONNECTED, CONNECTING, CONNECTED
-    }
-
-    /**
-     * The state of a node
-     */
-    private static final class NodeState {
-        private ConnectionState state;
-        private long lastConnectAttempt;
-
-        public NodeState(ConnectionState state, long lastConnectAttempt) {
-            this.state = state;
-            this.lastConnectAttempt = lastConnectAttempt;
-        }
-
-        public String toString() {
-            return "NodeState(" + state + ", " + lastConnectAttempt + ")";
-        }
-    }
-
-    /**
-     * An request that hasn't been fully processed yet
-     */
-    private static final class InFlightRequest {
-        public boolean expectResponse;
-        public Map<TopicPartition, RecordBatch> batches;
-        public RequestSend request;
-
-        /**
-         * @param expectResponse Should we expect a response message or is this request complete once it is sent?
-         * @param request The request
-         * @param batches The record batches contained in the request if it is a produce request
-         */
-        public InFlightRequest(boolean expectResponse, RequestSend request, Map<TopicPartition, RecordBatch> batches) {
-            this.batches = batches;
-            this.request = request;
-            this.expectResponse = expectResponse;
-        }
-    }
-
-    /**
-     * A set of outstanding request queues for each node that have not yet received responses
-     */
-    private static final class InFlightRequests {
-        private final Map<Integer, Deque<InFlightRequest>> requests = new HashMap<Integer, Deque<InFlightRequest>>();
-
-        /**
-         * Add the given request to the queue for the node it was directed to
-         */
-        public void add(InFlightRequest request) {
-            Deque<InFlightRequest> reqs = this.requests.get(request.request.destination());
-            if (reqs == null) {
-                reqs = new ArrayDeque<InFlightRequest>();
-                this.requests.put(request.request.destination(), reqs);
-            }
-            reqs.addFirst(request);
-        }
-
-        public Deque<InFlightRequest> requestQueue(int node) {
-            Deque<InFlightRequest> reqs = requests.get(node);
-            if (reqs == null || reqs.isEmpty())
-                throw new IllegalStateException("Response from server for which there are no in-flight requests.");
-            return reqs;
-        }
-
-        /**
-         * Get the oldest request (the one that that will be completed next) for the given node
-         */
-        public InFlightRequest nextCompleted(int node) {
-            return requestQueue(node).pollLast();
-        }
-
-        /**
-         * Can we send more requests to this node?
-         * 
-         * @param node Node in question
-         * @return true iff we have no requests still being sent to the given node
-         */
-        public boolean canSendMore(int node) {
-            Deque<InFlightRequest> queue = requests.get(node);
-            return queue == null || queue.isEmpty() || queue.peekFirst().request.complete();
-        }
-
-        /**
-         * Clear out all the in-flight requests for the given node and return them
-         * 
-         * @param node The node
-         * @return All the in-flight requests for that node that have been removed
-         */
-        public Iterable<InFlightRequest> clearAll(int node) {
-            Deque<InFlightRequest> reqs = requests.get(node);
-            if (reqs == null) {
-                return Collections.emptyList();
-            } else {
-                return requests.remove(node);
-            }
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/fa6339c1/clients/src/main/java/kafka/clients/tools/ProducerPerformance.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/clients/tools/ProducerPerformance.java b/clients/src/main/java/kafka/clients/tools/ProducerPerformance.java
deleted file mode 100644
index 973eb5e..0000000
--- a/clients/src/main/java/kafka/clients/tools/ProducerPerformance.java
+++ /dev/null
@@ -1,65 +0,0 @@
-package kafka.clients.tools;
-
-import java.util.Arrays;
-import java.util.Properties;
-
-import kafka.clients.producer.Callback;
-import kafka.clients.producer.KafkaProducer;
-import kafka.clients.producer.ProducerConfig;
-import kafka.clients.producer.ProducerRecord;
-import kafka.clients.producer.RecordMetadata;
-import kafka.common.record.Records;
-
-public class ProducerPerformance {
-
-    public static void main(String[] args) throws Exception {
-        if (args.length != 3) {
-            System.err.println("USAGE: java " + ProducerPerformance.class.getName() + " url num_records record_size");
-            System.exit(1);
-        }
-        String url = args[0];
-        int numRecords = Integer.parseInt(args[1]);
-        int recordSize = Integer.parseInt(args[2]);
-        Properties props = new Properties();
-        props.setProperty(ProducerConfig.REQUIRED_ACKS_CONFIG, "1");
-        props.setProperty(ProducerConfig.BROKER_LIST_CONFIG, url);
-        props.setProperty(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG, Integer.toString(5 * 1000));
-        props.setProperty(ProducerConfig.REQUEST_TIMEOUT_CONFIG, Integer.toString(Integer.MAX_VALUE));
-
-        KafkaProducer producer = new KafkaProducer(props);
-        Callback callback = new Callback() {
-            public void onCompletion(RecordMetadata metadata, Exception e) {
-                if (e != null)
-                    e.printStackTrace();
-            }
-        };
-        byte[] payload = new byte[recordSize];
-        Arrays.fill(payload, (byte) 1);
-        ProducerRecord record = new ProducerRecord("test", payload);
-        long start = System.currentTimeMillis();
-        long maxLatency = -1L;
-        long totalLatency = 0;
-        int reportingInterval = 1000000;
-        for (int i = 0; i < numRecords; i++) {
-            long sendStart = System.currentTimeMillis();
-            producer.send(record, callback);
-            long sendEllapsed = System.currentTimeMillis() - sendStart;
-            maxLatency = Math.max(maxLatency, sendEllapsed);
-            totalLatency += sendEllapsed;
-            if (i % reportingInterval == 0) {
-                System.out.printf("%d  max latency = %d ms, avg latency = %.5f\n",
-                                  i,
-                                  maxLatency,
-                                  (totalLatency / (double) reportingInterval));
-                totalLatency = 0L;
-                maxLatency = -1L;
-            }
-        }
-        long ellapsed = System.currentTimeMillis() - start;
-        double msgsSec = 1000.0 * numRecords / (double) ellapsed;
-        double mbSec = msgsSec * (recordSize + Records.LOG_OVERHEAD) / (1024.0 * 1024.0);
-        System.out.printf("%d records sent in %d ms ms. %.2f records per second (%.2f mb/sec).", numRecords, ellapsed, msgsSec, mbSec);
-        producer.close();
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/fa6339c1/clients/src/main/java/kafka/common/Cluster.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/Cluster.java b/clients/src/main/java/kafka/common/Cluster.java
deleted file mode 100644
index 8d045d5..0000000
--- a/clients/src/main/java/kafka/common/Cluster.java
+++ /dev/null
@@ -1,123 +0,0 @@
-package kafka.common;
-
-import java.net.InetSocketAddress;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import kafka.common.utils.Utils;
-
-/**
- * A representation of a subset of the nodes, topics, and partitions in the Kafka cluster.
- */
-public final class Cluster {
-
-    private final AtomicInteger counter = new AtomicInteger(0);
-    private final List<Node> nodes;
-    private final Map<TopicPartition, PartitionInfo> partitionsByTopicPartition;
-    private final Map<String, List<PartitionInfo>> partitionsByTopic;
-
-    /**
-     * Create a new cluster with the given nodes and partitions
-     * @param nodes The nodes in the cluster
-     * @param partitions Information about a subset of the topic-partitions this cluster hosts
-     */
-    public Cluster(Collection<Node> nodes, Collection<PartitionInfo> partitions) {
-        // make a randomized, unmodifiable copy of the nodes
-        List<Node> copy = new ArrayList<Node>(nodes);
-        Collections.shuffle(copy);
-        this.nodes = Collections.unmodifiableList(copy);
-
-        // index the partitions by topic/partition for quick lookup
-        this.partitionsByTopicPartition = new HashMap<TopicPartition, PartitionInfo>(partitions.size());
-        for (PartitionInfo p : partitions)
-            this.partitionsByTopicPartition.put(new TopicPartition(p.topic(), p.partition()), p);
-
-        // index the partitions by topic and make the lists unmodifiable so we can handle them out in
-        // user-facing apis without risk of the client modifying the contents
-        HashMap<String, List<PartitionInfo>> parts = new HashMap<String, List<PartitionInfo>>();
-        for (PartitionInfo p : partitions) {
-            if (!parts.containsKey(p.topic()))
-                parts.put(p.topic(), new ArrayList<PartitionInfo>());
-            List<PartitionInfo> ps = parts.get(p.topic());
-            ps.add(p);
-        }
-        this.partitionsByTopic = new HashMap<String, List<PartitionInfo>>(parts.size());
-        for (Map.Entry<String, List<PartitionInfo>> entry : parts.entrySet())
-            this.partitionsByTopic.put(entry.getKey(), Collections.unmodifiableList(entry.getValue()));
-    }
-
-    /**
-     * Create an empty cluster instance with no nodes and no topic-partitions.
-     */
-    public static Cluster empty() {
-        return new Cluster(new ArrayList<Node>(0), new ArrayList<PartitionInfo>(0));
-    }
-
-    /**
-     * Create a "bootstrap" cluster using the given list of host/ports
-     * @param addresses The addresses
-     * @return A cluster for these hosts/ports
-     */
-    public static Cluster bootstrap(List<InetSocketAddress> addresses) {
-        List<Node> nodes = new ArrayList<Node>();
-        int nodeId = Integer.MIN_VALUE;
-        for (InetSocketAddress address : addresses)
-            nodes.add(new Node(nodeId++, address.getHostName(), address.getPort()));
-        return new Cluster(nodes, new ArrayList<PartitionInfo>(0));
-    }
-
-    /**
-     * @return The known set of nodes
-     */
-    public List<Node> nodes() {
-        return this.nodes;
-    }
-
-    /**
-     * Get the current leader for the given topic-partition
-     * @param topicPartition The topic and partition we want to know the leader for
-     * @return The node that is the leader for this topic-partition, or null if there is currently no leader
-     */
-    public Node leaderFor(TopicPartition topicPartition) {
-        PartitionInfo info = partitionsByTopicPartition.get(topicPartition);
-        if (info == null)
-            return null;
-        else
-            return info.leader();
-    }
-
-    /**
-     * Get the metadata for the specified partition
-     * @param topicPartition The topic and partition to fetch info for
-     * @return The metadata about the given topic and partition
-     */
-    public PartitionInfo partition(TopicPartition topicPartition) {
-        return partitionsByTopicPartition.get(topicPartition);
-    }
-
-    /**
-     * Get the list of partitions for this topic
-     * @param topic The topic name
-     * @return A list of partitions
-     */
-    public List<PartitionInfo> partitionsFor(String topic) {
-        return this.partitionsByTopic.get(topic);
-    }
-
-    /**
-     * Round-robin over the nodes in this cluster
-     */
-    public Node nextNode() {
-        int size = nodes.size();
-        if (size == 0)
-            throw new IllegalStateException("No known nodes.");
-        int idx = Utils.abs(counter.getAndIncrement()) % size;
-        return this.nodes.get(idx);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/fa6339c1/clients/src/main/java/kafka/common/Configurable.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/Configurable.java b/clients/src/main/java/kafka/common/Configurable.java
deleted file mode 100644
index 1af9dd4..0000000
--- a/clients/src/main/java/kafka/common/Configurable.java
+++ /dev/null
@@ -1,15 +0,0 @@
-package kafka.common;
-
-import java.util.Map;
-
-/**
- * A Mix-in style interface for classes that are instantiated by reflection and need to take configuration parameters
- */
-public interface Configurable {
-
-    /**
-     * Configure this class with the given key-value pairs
-     */
-    public void configure(Map<String, ?> configs);
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/fa6339c1/clients/src/main/java/kafka/common/KafkaException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/KafkaException.java b/clients/src/main/java/kafka/common/KafkaException.java
deleted file mode 100644
index 7182cac..0000000
--- a/clients/src/main/java/kafka/common/KafkaException.java
+++ /dev/null
@@ -1,26 +0,0 @@
-package kafka.common;
-
-/**
- * The base class of all other Kafka exceptions
- */
-public class KafkaException extends RuntimeException {
-
-    private final static long serialVersionUID = 1L;
-
-    public KafkaException(String message, Throwable cause) {
-        super(message, cause);
-    }
-
-    public KafkaException(String message) {
-        super(message);
-    }
-
-    public KafkaException(Throwable cause) {
-        super(cause);
-    }
-
-    public KafkaException() {
-        super();
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/fa6339c1/clients/src/main/java/kafka/common/Metric.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/Metric.java b/clients/src/main/java/kafka/common/Metric.java
deleted file mode 100644
index c29e331..0000000
--- a/clients/src/main/java/kafka/common/Metric.java
+++ /dev/null
@@ -1,23 +0,0 @@
-package kafka.common;
-
-/**
- * A numerical metric tracked for monitoring purposes
- */
-public interface Metric {
-
-    /**
-     * A unique name for this metric
-     */
-    public String name();
-
-    /**
-     * A description of what is measured...this will be "" if no description was given
-     */
-    public String description();
-
-    /**
-     * The value of the metric
-     */
-    public double value();
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/fa6339c1/clients/src/main/java/kafka/common/Node.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/Node.java b/clients/src/main/java/kafka/common/Node.java
deleted file mode 100644
index 81fc907..0000000
--- a/clients/src/main/java/kafka/common/Node.java
+++ /dev/null
@@ -1,76 +0,0 @@
-package kafka.common;
-
-/**
- * Information about a Kafka node
- */
-public class Node {
-
-    private final int id;
-    private final String host;
-    private final int port;
-
-    public Node(int id, String host, int port) {
-        super();
-        this.id = id;
-        this.host = host;
-        this.port = port;
-    }
-
-    /**
-     * The node id of this node
-     */
-    public int id() {
-        return id;
-    }
-
-    /**
-     * The host name for this node
-     */
-    public String host() {
-        return host;
-    }
-
-    /**
-     * The port for this node
-     */
-    public int port() {
-        return port;
-    }
-
-    @Override
-    public int hashCode() {
-        final int prime = 31;
-        int result = 1;
-        result = prime * result + ((host == null) ? 0 : host.hashCode());
-        result = prime * result + id;
-        result = prime * result + port;
-        return result;
-    }
-
-    @Override
-    public boolean equals(Object obj) {
-        if (this == obj)
-            return true;
-        if (obj == null)
-            return false;
-        if (getClass() != obj.getClass())
-            return false;
-        Node other = (Node) obj;
-        if (host == null) {
-            if (other.host != null)
-                return false;
-        } else if (!host.equals(other.host))
-            return false;
-        if (id != other.id)
-            return false;
-        if (port != other.port)
-            return false;
-        return true;
-    }
-
-    @Override
-    public String toString() {
-        return "Node(" + id + ", " + host + ", " + port + ")";
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/fa6339c1/clients/src/main/java/kafka/common/PartitionInfo.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/PartitionInfo.java b/clients/src/main/java/kafka/common/PartitionInfo.java
deleted file mode 100644
index 0e50ed7..0000000
--- a/clients/src/main/java/kafka/common/PartitionInfo.java
+++ /dev/null
@@ -1,58 +0,0 @@
-package kafka.common;
-
-/**
- * Information about a topic-partition.
- */
-public class PartitionInfo {
-
-    private final String topic;
-    private final int partition;
-    private final Node leader;
-    private final Node[] replicas;
-    private final Node[] inSyncReplicas;
-
-    public PartitionInfo(String topic, int partition, Node leader, Node[] replicas, Node[] inSyncReplicas) {
-        this.topic = topic;
-        this.partition = partition;
-        this.leader = leader;
-        this.replicas = replicas;
-        this.inSyncReplicas = inSyncReplicas;
-    }
-
-    /**
-     * The topic name
-     */
-    public String topic() {
-        return topic;
-    }
-
-    /**
-     * The partition id
-     */
-    public int partition() {
-        return partition;
-    }
-
-    /**
-     * The node id of the node currently acting as a leader for this partition or -1 if there is no leader
-     */
-    public Node leader() {
-        return leader;
-    }
-
-    /**
-     * The complete set of replicas for this partition regardless of whether they are alive or up-to-date
-     */
-    public Node[] replicas() {
-        return replicas;
-    }
-
-    /**
-     * The subset of the replicas that are in sync, that is caught-up to the leader and ready to take over as leader if
-     * the leader should fail
-     */
-    public Node[] inSyncReplicas() {
-        return inSyncReplicas;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/fa6339c1/clients/src/main/java/kafka/common/TopicPartition.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/TopicPartition.java b/clients/src/main/java/kafka/common/TopicPartition.java
deleted file mode 100644
index e7be96c..0000000
--- a/clients/src/main/java/kafka/common/TopicPartition.java
+++ /dev/null
@@ -1,61 +0,0 @@
-package kafka.common;
-
-/**
- * A topic name and partition number
- */
-public final class TopicPartition {
-
-    private int hash = 0;
-    private final int partition;
-    private final String topic;
-
-    public TopicPartition(String topic, int partition) {
-        this.partition = partition;
-        this.topic = topic;
-    }
-
-    public int partition() {
-        return partition;
-    }
-
-    public String topic() {
-        return topic;
-    }
-
-    @Override
-    public int hashCode() {
-        if (hash != 0)
-            return hash;
-        final int prime = 31;
-        int result = 1;
-        result = prime * result + partition;
-        result = prime * result + ((topic == null) ? 0 : topic.hashCode());
-        this.hash = result;
-        return result;
-    }
-
-    @Override
-    public boolean equals(Object obj) {
-        if (this == obj)
-            return true;
-        if (obj == null)
-            return false;
-        if (getClass() != obj.getClass())
-            return false;
-        TopicPartition other = (TopicPartition) obj;
-        if (partition != other.partition)
-            return false;
-        if (topic == null) {
-            if (other.topic != null)
-                return false;
-        } else if (!topic.equals(other.topic))
-            return false;
-        return true;
-    }
-
-    @Override
-    public String toString() {
-        return topic + "-" + partition;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/fa6339c1/clients/src/main/java/kafka/common/config/AbstractConfig.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/config/AbstractConfig.java b/clients/src/main/java/kafka/common/config/AbstractConfig.java
deleted file mode 100644
index 5db302d..0000000
--- a/clients/src/main/java/kafka/common/config/AbstractConfig.java
+++ /dev/null
@@ -1,93 +0,0 @@
-package kafka.common.config;
-
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import kafka.common.Configurable;
-import kafka.common.KafkaException;
-import kafka.common.utils.Utils;
-
-/**
- * A convenient base class for configurations to extend.
- * <p>
- * This class holds both the original configuration that was provided as well as the parsed
- */
-public class AbstractConfig {
-
-    private final Set<String> used;
-    private final Map<String, Object> values;
-    private final Map<String, ?> originals;
-
-    @SuppressWarnings("unchecked")
-    public AbstractConfig(ConfigDef definition, Map<?, ?> originals) {
-        /* check that all the keys are really strings */
-        for (Object key : originals.keySet())
-            if (!(key instanceof String))
-                throw new ConfigException(key.toString(), originals.get(key), "Key must be a string.");
-        this.originals = (Map<String, ?>) originals;
-        this.values = definition.parse(this.originals);
-        this.used = Collections.synchronizedSet(new HashSet<String>());
-    }
-
-    protected Object get(String key) {
-        if (!values.containsKey(key))
-            throw new ConfigException(String.format("Unknown configuration '%s'", key));
-        used.add(key);
-        return values.get(key);
-    }
-
-    public int getInt(String key) {
-        return (Integer) get(key);
-    }
-
-    public Long getLong(String key) {
-        return (Long) get(key);
-    }
-
-    @SuppressWarnings("unchecked")
-    public List<String> getList(String key) {
-        return (List<String>) get(key);
-    }
-
-    public boolean getBoolean(String key) {
-        return (Boolean) get(key);
-    }
-
-    public String getString(String key) {
-        return (String) get(key);
-    }
-
-    public Class<?> getClass(String key) {
-        return (Class<?>) get(key);
-    }
-
-    public Set<String> unused() {
-        Set<String> keys = new HashSet<String>(originals.keySet());
-        keys.remove(used);
-        return keys;
-    }
-
-    /**
-     * Get a configured instance of the give class specified by the given configuration key. If the object implements
-     * Configurable configure it using the configuration.
-     * 
-     * @param key The configuration key for the class
-     * @param t The interface the class should implement
-     * @return A configured instance of the class
-     */
-    public <T> T getConfiguredInstance(String key, Class<T> t) {
-        Class<?> c = getClass(key);
-        if (c == null)
-            return null;
-        Object o = Utils.newInstance(c);
-        if (!t.isInstance(o))
-            throw new KafkaException(c.getName() + " is not an instance of " + t.getName());
-        if (o instanceof Configurable)
-            ((Configurable) o).configure(this.originals);
-        return t.cast(o);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/fa6339c1/clients/src/main/java/kafka/common/config/ConfigDef.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/config/ConfigDef.java b/clients/src/main/java/kafka/common/config/ConfigDef.java
deleted file mode 100644
index 2507c9c..0000000
--- a/clients/src/main/java/kafka/common/config/ConfigDef.java
+++ /dev/null
@@ -1,253 +0,0 @@
-package kafka.common.config;
-
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-/**
- * This class is used for specifying the set of expected configurations, their type, their defaults, their
- * documentation, and any special validation logic used for checking the correctness of the values the user provides.
- * <p>
- * Usage of this class looks something like this:
- * 
- * <pre>
- * ConfigDef defs = new ConfigDef();
- * defs.define(&quot;config_name&quot;, Type.STRING, &quot;default string value&quot;, &quot;This configuration is used for blah blah blah.&quot;);
- * defs.define(&quot;another_config_name&quot;, Type.INT, 42, Range.atLeast(0), &quot;More documentation on this config&quot;);
- * 
- * Properties props = new Properties();
- * props.setProperty(&quot;config_name&quot;, &quot;some value&quot;);
- * Map&lt;String, Object&gt; configs = defs.parse(props);
- * 
- * String someConfig = (String) configs.get(&quot;config_name&quot;); // will return &quot;some value&quot;
- * int anotherConfig = (Integer) configs.get(&quot;another_config_name&quot;); // will return default value of 42
- * </pre>
- * 
- * This class can be used stand-alone or in combination with {@link AbstractConfig} which provides some additional
- * functionality for accessing configs.
- */
-public class ConfigDef {
-
-    private static final Object NO_DEFAULT_VALUE = new Object();
-
-    private final Map<String, ConfigKey> configKeys = new HashMap<String, ConfigKey>();
-
-    /**
-     * Define a new configuration
-     * @param name The name of the config parameter
-     * @param type The type of the config
-     * @param defaultValue The default value to use if this config isn't present
-     * @param validator A validator to use in checking the correctness of the config
-     * @param documentation The documentation string for the config
-     * @return This ConfigDef so you can chain calls
-     */
-    public ConfigDef define(String name, Type type, Object defaultValue, Validator validator, String documentation) {
-        if (configKeys.containsKey(name))
-            throw new ConfigException("Configuration " + name + " is defined twice.");
-        Object parsedDefault = defaultValue == NO_DEFAULT_VALUE ? NO_DEFAULT_VALUE : parseType(name, defaultValue, type);
-        configKeys.put(name, new ConfigKey(name, type, parsedDefault, validator, documentation));
-        return this;
-    }
-
-    /**
-     * Define a new configuration with no special validation logic
-     * @param name The name of the config parameter
-     * @param type The type of the config
-     * @param defaultValue The default value to use if this config isn't present
-     * @param documentation The documentation string for the config
-     * @return This ConfigDef so you can chain calls
-     */
-    public ConfigDef define(String name, Type type, Object defaultValue, String documentation) {
-        return define(name, type, defaultValue, null, documentation);
-    }
-
-    /**
-     * Define a required parameter with no default value
-     * @param name The name of the config parameter
-     * @param type The type of the config
-     * @param validator A validator to use in checking the correctness of the config
-     * @param documentation The documentation string for the config
-     * @return This ConfigDef so you can chain calls
-     */
-    public ConfigDef define(String name, Type type, Validator validator, String documentation) {
-        return define(name, type, NO_DEFAULT_VALUE, validator, documentation);
-    }
-
-    /**
-     * Define a required parameter with no default value and no special validation logic
-     * @param name The name of the config parameter
-     * @param type The type of the config
-     * @param documentation The documentation string for the config
-     * @return This ConfigDef so you can chain calls
-     */
-    public ConfigDef define(String name, Type type, String documentation) {
-        return define(name, type, NO_DEFAULT_VALUE, null, documentation);
-    }
-
-    /**
-     * Parse and validate configs against this configuration definition. The input is a map of configs. It is expected
-     * that the keys of the map are strings, but the values can either be strings or they may already be of the
-     * appropriate type (int, string, etc). This will work equally well with either java.util.Properties instances or a
-     * programmatically constructed map.
-     * @param props The configs to parse and validate
-     * @return Parsed and validated configs. The key will be the config name and the value will be the value parsed into
-     *         the appropriate type (int, string, etc)
-     */
-    public Map<String, Object> parse(Map<?, ?> props) {
-        /* parse all known keys */
-        Map<String, Object> values = new HashMap<String, Object>();
-        for (ConfigKey key : configKeys.values()) {
-            Object value;
-            if (props.containsKey(key.name))
-                value = parseType(key.name, props.get(key.name), key.type);
-            else if (key.defaultValue == NO_DEFAULT_VALUE)
-                throw new ConfigException("Missing required configuration \"" + key.name + "\" which has no default value.");
-            else
-                value = key.defaultValue;
-            values.put(key.name, value);
-        }
-        return values;
-    }
-
-    /**
-     * Parse a value according to its expected type.
-     * @param name The config name
-     * @param value The config value
-     * @param type The expected type
-     * @return The parsed object
-     */
-    private Object parseType(String name, Object value, Type type) {
-        try {
-            String trimmed = null;
-            if (value instanceof String)
-                trimmed = ((String) value).trim();
-            switch (type) {
-                case BOOLEAN:
-                    if (value instanceof String)
-                        return Boolean.parseBoolean(trimmed);
-                    else if (value instanceof Boolean)
-                        return value;
-                    else
-                        throw new ConfigException(name, value, "Expected value to be either true or false");
-                case STRING:
-                    if (value instanceof String)
-                        return trimmed;
-                    else
-                        throw new ConfigException(name, value, "Expected value to be a string, but it was a " + value.getClass().getName());
-                case INT:
-                    if (value instanceof Integer) {
-                        return (Integer) value;
-                    } else if (value instanceof String) {
-                        return Integer.parseInt(trimmed);
-                    } else {
-                        throw new ConfigException(name, value, "Expected value to be an number.");
-                    }
-                case LONG:
-                    if (value instanceof Integer)
-                        return ((Integer) value).longValue();
-                    if (value instanceof Long)
-                        return (Long) value;
-                    else if (value instanceof String)
-                        return Long.parseLong(trimmed);
-                    else
-                        throw new ConfigException(name, value, "Expected value to be an number.");
-                case DOUBLE:
-                    if (value instanceof Number)
-                        return ((Number) value).doubleValue();
-                    else if (value instanceof String)
-                        return Double.parseDouble(trimmed);
-                    else
-                        throw new ConfigException(name, value, "Expected value to be an number.");
-                case LIST:
-                    if (value instanceof List)
-                        return (List<?>) value;
-                    else if (value instanceof String)
-                        return Arrays.asList(trimmed.split("\\s*,\\s*", -1));
-                    else
-                        throw new ConfigException(name, value, "Expected a comma seperated list.");
-                case CLASS:
-                    if (value instanceof Class)
-                        return (Class<?>) value;
-                    else if (value instanceof String)
-                        return Class.forName(trimmed);
-                    else
-                        throw new ConfigException(name, value, "Expected a Class instance or class name.");
-                default:
-                    throw new IllegalStateException("Unknown type.");
-            }
-        } catch (NumberFormatException e) {
-            throw new ConfigException(name, value, "Not a number of type " + type);
-        } catch (ClassNotFoundException e) {
-            throw new ConfigException(name, value, "Class " + value + " could not be found.");
-        }
-    }
-
-    /**
-     * The config types
-     */
-    public enum Type {
-        BOOLEAN, STRING, INT, LONG, DOUBLE, LIST, CLASS;
-    }
-
-    /**
-     * Validation logic the user may provide
-     */
-    public interface Validator {
-        public void ensureValid(String name, Object o);
-    }
-
-    /**
-     * Validation logic for numeric ranges
-     */
-    public static class Range implements Validator {
-        private final Number min;
-        private final Number max;
-
-        private Range(Number min, Number max) {
-            this.min = min;
-            this.max = max;
-        }
-
-        /**
-         * A numeric range that checks only the lower bound
-         * @param min The minimum acceptable value
-         */
-        public static Range atLeast(Number min) {
-            return new Range(min, Double.MAX_VALUE);
-        }
-
-        /**
-         * A numeric range that checks both the upper and lower bound
-         */
-        public static Range between(Number min, Number max) {
-            return new Range(min, max);
-        }
-
-        public void ensureValid(String name, Object o) {
-            Number n = (Number) o;
-            if (n.doubleValue() < min.doubleValue() || n.doubleValue() > max.doubleValue())
-                throw new ConfigException(name, o, "Value must be in the range [" + min + ", " + max + "]");
-        }
-    }
-
-    private static class ConfigKey {
-        public final String name;
-        public final Type type;
-        public final String documentation;
-        public final Object defaultValue;
-        public final Validator validator;
-
-        public ConfigKey(String name, Type type, Object defaultValue, Validator validator, String documentation) {
-            super();
-            this.name = name;
-            this.type = type;
-            this.defaultValue = defaultValue;
-            this.validator = validator;
-            if (this.validator != null)
-                this.validator.ensureValid(name, defaultValue);
-            this.documentation = documentation;
-        }
-
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/fa6339c1/clients/src/main/java/kafka/common/config/ConfigException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/config/ConfigException.java b/clients/src/main/java/kafka/common/config/ConfigException.java
deleted file mode 100644
index fad141e..0000000
--- a/clients/src/main/java/kafka/common/config/ConfigException.java
+++ /dev/null
@@ -1,24 +0,0 @@
-package kafka.common.config;
-
-import kafka.common.KafkaException;
-
-/**
- * Thrown if the user supplies an invalid configuration
- */
-public class ConfigException extends KafkaException {
-
-    private static final long serialVersionUID = 1L;
-
-    public ConfigException(String message) {
-        super(message);
-    }
-
-    public ConfigException(String name, Object value) {
-        this(name, value, null);
-    }
-
-    public ConfigException(String name, Object value, String message) {
-        super("Invalid value " + value + " for configuration " + name + (message == null ? "" : ": " + message));
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/fa6339c1/clients/src/main/java/kafka/common/errors/ApiException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/errors/ApiException.java b/clients/src/main/java/kafka/common/errors/ApiException.java
deleted file mode 100644
index 28f5411..0000000
--- a/clients/src/main/java/kafka/common/errors/ApiException.java
+++ /dev/null
@@ -1,35 +0,0 @@
-package kafka.common.errors;
-
-import kafka.common.KafkaException;
-
-/**
- * Any API exception that is part of the public protocol and should be a subclass of this class and be part of this
- * package.
- */
-public abstract class ApiException extends KafkaException {
-
-    private static final long serialVersionUID = 1L;
-
-    public ApiException(String message, Throwable cause) {
-        super(message, cause);
-    }
-
-    public ApiException(String message) {
-        super(message);
-    }
-
-    public ApiException(Throwable cause) {
-        super(cause);
-    }
-
-    public ApiException() {
-        super();
-    }
-
-    /* avoid the expensive and useless stack trace for api exceptions */
-    @Override
-    public Throwable fillInStackTrace() {
-        return this;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/fa6339c1/clients/src/main/java/kafka/common/errors/CorruptRecordException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/errors/CorruptRecordException.java b/clients/src/main/java/kafka/common/errors/CorruptRecordException.java
deleted file mode 100644
index 492f2e3..0000000
--- a/clients/src/main/java/kafka/common/errors/CorruptRecordException.java
+++ /dev/null
@@ -1,23 +0,0 @@
-package kafka.common.errors;
-
-public class CorruptRecordException extends ApiException {
-
-    private static final long serialVersionUID = 1L;
-
-    public CorruptRecordException() {
-        super("This message has failed it's CRC checksum or is otherwise corrupt.");
-    }
-
-    public CorruptRecordException(String message) {
-        super(message);
-    }
-
-    public CorruptRecordException(Throwable cause) {
-        super(cause);
-    }
-
-    public CorruptRecordException(String message, Throwable cause) {
-        super(message, cause);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/fa6339c1/clients/src/main/java/kafka/common/errors/LeaderNotAvailableException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/errors/LeaderNotAvailableException.java b/clients/src/main/java/kafka/common/errors/LeaderNotAvailableException.java
deleted file mode 100644
index d7b86f8..0000000
--- a/clients/src/main/java/kafka/common/errors/LeaderNotAvailableException.java
+++ /dev/null
@@ -1,19 +0,0 @@
-package kafka.common.errors;
-
-public class LeaderNotAvailableException extends RetryableException {
-
-    private static final long serialVersionUID = 1L;
-
-    public LeaderNotAvailableException(String message, Throwable cause) {
-        super(message, cause);
-    }
-
-    public LeaderNotAvailableException(String message) {
-        super(message);
-    }
-
-    public LeaderNotAvailableException(Throwable cause) {
-        super(cause);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/fa6339c1/clients/src/main/java/kafka/common/errors/NetworkException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/errors/NetworkException.java b/clients/src/main/java/kafka/common/errors/NetworkException.java
deleted file mode 100644
index daedbf4..0000000
--- a/clients/src/main/java/kafka/common/errors/NetworkException.java
+++ /dev/null
@@ -1,23 +0,0 @@
-package kafka.common.errors;
-
-public class NetworkException extends ApiException {
-
-    private static final long serialVersionUID = 1L;
-
-    public NetworkException() {
-        super();
-    }
-
-    public NetworkException(String message, Throwable cause) {
-        super(message, cause);
-    }
-
-    public NetworkException(String message) {
-        super(message);
-    }
-
-    public NetworkException(Throwable cause) {
-        super(cause);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/fa6339c1/clients/src/main/java/kafka/common/errors/NotLeaderForPartitionException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/errors/NotLeaderForPartitionException.java b/clients/src/main/java/kafka/common/errors/NotLeaderForPartitionException.java
deleted file mode 100644
index 5d750fd..0000000
--- a/clients/src/main/java/kafka/common/errors/NotLeaderForPartitionException.java
+++ /dev/null
@@ -1,23 +0,0 @@
-package kafka.common.errors;
-
-public class NotLeaderForPartitionException extends RetryableException {
-
-    private static final long serialVersionUID = 1L;
-
-    public NotLeaderForPartitionException() {
-        super();
-    }
-
-    public NotLeaderForPartitionException(String message) {
-        super(message);
-    }
-
-    public NotLeaderForPartitionException(Throwable cause) {
-        super(cause);
-    }
-
-    public NotLeaderForPartitionException(String message, Throwable cause) {
-        super(message, cause);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/fa6339c1/clients/src/main/java/kafka/common/errors/OffsetMetadataTooLarge.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/errors/OffsetMetadataTooLarge.java b/clients/src/main/java/kafka/common/errors/OffsetMetadataTooLarge.java
deleted file mode 100644
index ab9cd62..0000000
--- a/clients/src/main/java/kafka/common/errors/OffsetMetadataTooLarge.java
+++ /dev/null
@@ -1,22 +0,0 @@
-package kafka.common.errors;
-
-public class OffsetMetadataTooLarge extends ApiException {
-
-    private static final long serialVersionUID = 1L;
-
-    public OffsetMetadataTooLarge() {
-    }
-
-    public OffsetMetadataTooLarge(String message) {
-        super(message);
-    }
-
-    public OffsetMetadataTooLarge(Throwable cause) {
-        super(cause);
-    }
-
-    public OffsetMetadataTooLarge(String message, Throwable cause) {
-        super(message, cause);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/fa6339c1/clients/src/main/java/kafka/common/errors/OffsetOutOfRangeException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/errors/OffsetOutOfRangeException.java b/clients/src/main/java/kafka/common/errors/OffsetOutOfRangeException.java
deleted file mode 100644
index 93210cd..0000000
--- a/clients/src/main/java/kafka/common/errors/OffsetOutOfRangeException.java
+++ /dev/null
@@ -1,22 +0,0 @@
-package kafka.common.errors;
-
-public class OffsetOutOfRangeException extends ApiException {
-
-    private static final long serialVersionUID = 1L;
-
-    public OffsetOutOfRangeException() {
-    }
-
-    public OffsetOutOfRangeException(String message) {
-        super(message);
-    }
-
-    public OffsetOutOfRangeException(Throwable cause) {
-        super(cause);
-    }
-
-    public OffsetOutOfRangeException(String message, Throwable cause) {
-        super(message, cause);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/fa6339c1/clients/src/main/java/kafka/common/errors/RecordTooLargeException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/errors/RecordTooLargeException.java b/clients/src/main/java/kafka/common/errors/RecordTooLargeException.java
deleted file mode 100644
index bef4293..0000000
--- a/clients/src/main/java/kafka/common/errors/RecordTooLargeException.java
+++ /dev/null
@@ -1,23 +0,0 @@
-package kafka.common.errors;
-
-public class RecordTooLargeException extends ApiException {
-
-    private static final long serialVersionUID = 1L;
-
-    public RecordTooLargeException() {
-        super();
-    }
-
-    public RecordTooLargeException(String message, Throwable cause) {
-        super(message, cause);
-    }
-
-    public RecordTooLargeException(String message) {
-        super(message);
-    }
-
-    public RecordTooLargeException(Throwable cause) {
-        super(cause);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/fa6339c1/clients/src/main/java/kafka/common/errors/RetryableException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/errors/RetryableException.java b/clients/src/main/java/kafka/common/errors/RetryableException.java
deleted file mode 100644
index 5aa8684..0000000
--- a/clients/src/main/java/kafka/common/errors/RetryableException.java
+++ /dev/null
@@ -1,31 +0,0 @@
-package kafka.common.errors;
-
-/**
- * A retryable exception is an exception that is safe to retry. To be retryable an exception should be
- * <ol>
- * <li>Transient, there is no point retrying a error due to a non-existant topic or message too large
- * <li>Idempotent, the exception is known to not change any state on the server
- * </ol>
- * A client may choose to retry any request they like, but exceptions extending this class are always safe and sane to
- * retry.
- */
-public abstract class RetryableException extends ApiException {
-
-    private static final long serialVersionUID = 1L;
-
-    public RetryableException(String message, Throwable cause) {
-        super(message, cause);
-    }
-
-    public RetryableException(String message) {
-        super(message);
-    }
-
-    public RetryableException(Throwable cause) {
-        super(cause);
-    }
-
-    public RetryableException() {
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/fa6339c1/clients/src/main/java/kafka/common/errors/TimeoutException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/errors/TimeoutException.java b/clients/src/main/java/kafka/common/errors/TimeoutException.java
deleted file mode 100644
index da27a98..0000000
--- a/clients/src/main/java/kafka/common/errors/TimeoutException.java
+++ /dev/null
@@ -1,23 +0,0 @@
-package kafka.common.errors;
-
-public class TimeoutException extends ApiException {
-
-    private static final long serialVersionUID = 1L;
-
-    public TimeoutException() {
-        super();
-    }
-
-    public TimeoutException(String message, Throwable cause) {
-        super(message, cause);
-    }
-
-    public TimeoutException(String message) {
-        super(message);
-    }
-
-    public TimeoutException(Throwable cause) {
-        super(cause);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/fa6339c1/clients/src/main/java/kafka/common/errors/UnknownServerException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/errors/UnknownServerException.java b/clients/src/main/java/kafka/common/errors/UnknownServerException.java
deleted file mode 100644
index d0b56d6..0000000
--- a/clients/src/main/java/kafka/common/errors/UnknownServerException.java
+++ /dev/null
@@ -1,22 +0,0 @@
-package kafka.common.errors;
-
-public class UnknownServerException extends ApiException {
-
-    private static final long serialVersionUID = 1L;
-
-    public UnknownServerException() {
-    }
-
-    public UnknownServerException(String message) {
-        super(message);
-    }
-
-    public UnknownServerException(Throwable cause) {
-        super(cause);
-    }
-
-    public UnknownServerException(String message, Throwable cause) {
-        super(message, cause);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/fa6339c1/clients/src/main/java/kafka/common/errors/UnknownTopicOrPartitionException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/errors/UnknownTopicOrPartitionException.java b/clients/src/main/java/kafka/common/errors/UnknownTopicOrPartitionException.java
deleted file mode 100644
index 5c1ca12..0000000
--- a/clients/src/main/java/kafka/common/errors/UnknownTopicOrPartitionException.java
+++ /dev/null
@@ -1,22 +0,0 @@
-package kafka.common.errors;
-
-public class UnknownTopicOrPartitionException extends ApiException {
-
-    private static final long serialVersionUID = 1L;
-
-    public UnknownTopicOrPartitionException() {
-    }
-
-    public UnknownTopicOrPartitionException(String message) {
-        super(message);
-    }
-
-    public UnknownTopicOrPartitionException(Throwable throwable) {
-        super(throwable);
-    }
-
-    public UnknownTopicOrPartitionException(String message, Throwable throwable) {
-        super(message, throwable);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/fa6339c1/clients/src/main/java/kafka/common/metrics/CompoundStat.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/metrics/CompoundStat.java b/clients/src/main/java/kafka/common/metrics/CompoundStat.java
deleted file mode 100644
index 5541e32..0000000
--- a/clients/src/main/java/kafka/common/metrics/CompoundStat.java
+++ /dev/null
@@ -1,40 +0,0 @@
-package kafka.common.metrics;
-
-import java.util.List;
-
-/**
- * A compound stat is a stat where a single measurement and associated data structure feeds many metrics. This is the
- * example for a histogram which has many associated percentiles.
- */
-public interface CompoundStat extends Stat {
-
-    public List<NamedMeasurable> stats();
-
-    public static class NamedMeasurable {
-
-        private final String name;
-        private final String description;
-        private final Measurable stat;
-
-        public NamedMeasurable(String name, String description, Measurable stat) {
-            super();
-            this.name = name;
-            this.description = description;
-            this.stat = stat;
-        }
-
-        public String name() {
-            return name;
-        }
-
-        public String description() {
-            return description;
-        }
-
-        public Measurable stat() {
-            return stat;
-        }
-
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/fa6339c1/clients/src/main/java/kafka/common/metrics/JmxReporter.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/metrics/JmxReporter.java b/clients/src/main/java/kafka/common/metrics/JmxReporter.java
deleted file mode 100644
index a0cee01..0000000
--- a/clients/src/main/java/kafka/common/metrics/JmxReporter.java
+++ /dev/null
@@ -1,184 +0,0 @@
-package kafka.common.metrics;
-
-import java.lang.management.ManagementFactory;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import javax.management.Attribute;
-import javax.management.AttributeList;
-import javax.management.AttributeNotFoundException;
-import javax.management.DynamicMBean;
-import javax.management.InvalidAttributeValueException;
-import javax.management.JMException;
-import javax.management.MBeanAttributeInfo;
-import javax.management.MBeanException;
-import javax.management.MBeanInfo;
-import javax.management.MBeanServer;
-import javax.management.MalformedObjectNameException;
-import javax.management.ObjectName;
-import javax.management.ReflectionException;
-
-import kafka.common.KafkaException;
-
-/**
- * Register metrics in JMX as dynamic mbeans based on the metric names
- */
-public class JmxReporter implements MetricsReporter {
-
-    private final String prefix;
-    private final Map<String, KafkaMbean> mbeans = new HashMap<String, KafkaMbean>();
-
-    public JmxReporter() {
-        this("");
-    }
-
-    /**
-     * Create a JMX reporter that prefixes all metrics with the given string.
-     */
-    public JmxReporter(String prefix) {
-        this.prefix = prefix;
-    }
-
-    @Override
-    public synchronized void init(List<KafkaMetric> metrics) {
-        for (KafkaMetric metric : metrics)
-            addAttribute(metric);
-        for (KafkaMbean mbean : mbeans.values())
-            reregister(mbean);
-
-    }
-
-    @Override
-    public synchronized void metricChange(KafkaMetric metric) {
-        KafkaMbean mbean = addAttribute(metric);
-        reregister(mbean);
-    }
-
-    private KafkaMbean addAttribute(KafkaMetric metric) {
-        try {
-            String[] names = split(prefix + metric.name());
-            String qualifiedName = names[0] + "." + names[1];
-            if (!this.mbeans.containsKey(qualifiedName))
-                mbeans.put(qualifiedName, new KafkaMbean(names[0], names[1]));
-            KafkaMbean mbean = this.mbeans.get(qualifiedName);
-            mbean.setAttribute(names[2], metric);
-            return mbean;
-        } catch (JMException e) {
-            throw new KafkaException("Error creating mbean attribute " + metric.name(), e);
-        }
-    }
-
-    public synchronized void close() {
-        for (KafkaMbean mbean : this.mbeans.values())
-            unregister(mbean);
-
-    }
-
-    private void unregister(KafkaMbean mbean) {
-        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
-        try {
-            if (server.isRegistered(mbean.name()))
-                server.unregisterMBean(mbean.name());
-        } catch (JMException e) {
-            throw new KafkaException("Error unregistering mbean", e);
-        }
-    }
-
-    private void reregister(KafkaMbean mbean) {
-        unregister(mbean);
-        try {
-            ManagementFactory.getPlatformMBeanServer().registerMBean(mbean, mbean.name());
-        } catch (JMException e) {
-            throw new KafkaException("Error registering mbean " + mbean.name(), e);
-        }
-    }
-
-    private String[] split(String name) {
-        int attributeStart = name.lastIndexOf('.');
-        if (attributeStart < 0)
-            throw new IllegalArgumentException("No MBean name in metric name: " + name);
-        String attributeName = name.substring(attributeStart + 1, name.length());
-        String remainder = name.substring(0, attributeStart);
-        int beanStart = remainder.lastIndexOf('.');
-        if (beanStart < 0)
-            return new String[] { "", remainder, attributeName };
-        String packageName = remainder.substring(0, beanStart);
-        String beanName = remainder.substring(beanStart + 1, remainder.length());
-        return new String[] { packageName, beanName, attributeName };
-    }
-
-    private static class KafkaMbean implements DynamicMBean {
-        private final String beanName;
-        private final ObjectName objectName;
-        private final Map<String, KafkaMetric> metrics;
-
-        public KafkaMbean(String packageName, String beanName) throws MalformedObjectNameException {
-            this.beanName = beanName;
-            this.metrics = new HashMap<String, KafkaMetric>();
-            this.objectName = new ObjectName(packageName + ":type=" + beanName);
-        }
-
-        public ObjectName name() {
-            return objectName;
-        }
-
-        public void setAttribute(String name, KafkaMetric metric) {
-            this.metrics.put(name, metric);
-        }
-
-        @Override
-        public Object getAttribute(String name) throws AttributeNotFoundException, MBeanException, ReflectionException {
-            if (this.metrics.containsKey(name))
-                return this.metrics.get(name).value();
-            else
-                throw new AttributeNotFoundException("Could not find attribute " + name);
-        }
-
-        @Override
-        public AttributeList getAttributes(String[] names) {
-            try {
-                AttributeList list = new AttributeList();
-                for (String name : names)
-                    list.add(new Attribute(name, getAttribute(name)));
-                return list;
-            } catch (Exception e) {
-                e.printStackTrace();
-                return new AttributeList();
-            }
-        }
-
-        @Override
-        public MBeanInfo getMBeanInfo() {
-            MBeanAttributeInfo[] attrs = new MBeanAttributeInfo[metrics.size()];
-            int i = 0;
-            for (Map.Entry<String, KafkaMetric> entry : this.metrics.entrySet()) {
-                String attribute = entry.getKey();
-                KafkaMetric metric = entry.getValue();
-                attrs[i] = new MBeanAttributeInfo(attribute, double.class.getName(), metric.description(), true, false, false);
-                i += 1;
-            }
-            return new MBeanInfo(beanName, "", attrs, null, null, null);
-        }
-
-        @Override
-        public Object invoke(String name, Object[] params, String[] sig) throws MBeanException, ReflectionException {
-            throw new UnsupportedOperationException("Set not allowed.");
-        }
-
-        @Override
-        public void setAttribute(Attribute attribute) throws AttributeNotFoundException,
-                                                     InvalidAttributeValueException,
-                                                     MBeanException,
-                                                     ReflectionException {
-            throw new UnsupportedOperationException("Set not allowed.");
-        }
-
-        @Override
-        public AttributeList setAttributes(AttributeList list) {
-            throw new UnsupportedOperationException("Set not allowed.");
-        }
-
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/fa6339c1/clients/src/main/java/kafka/common/metrics/KafkaMetric.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/metrics/KafkaMetric.java b/clients/src/main/java/kafka/common/metrics/KafkaMetric.java
deleted file mode 100644
index 33212b0..0000000
--- a/clients/src/main/java/kafka/common/metrics/KafkaMetric.java
+++ /dev/null
@@ -1,55 +0,0 @@
-package kafka.common.metrics;
-
-import kafka.common.Metric;
-import kafka.common.utils.Time;
-
-public final class KafkaMetric implements Metric {
-
-    private final String name;
-    private final String description;
-    private final Object lock;
-    private final Time time;
-    private final Measurable measurable;
-    private MetricConfig config;
-
-    KafkaMetric(Object lock, String name, String description, Measurable measurable, MetricConfig config, Time time) {
-        super();
-        this.name = name;
-        this.description = description;
-        this.lock = lock;
-        this.measurable = measurable;
-        this.config = config;
-        this.time = time;
-    }
-
-    MetricConfig config() {
-        return this.config;
-    }
-
-    @Override
-    public String name() {
-        return this.name;
-    }
-
-    @Override
-    public String description() {
-        return this.description;
-    }
-
-    @Override
-    public double value() {
-        synchronized (this.lock) {
-            return value(time.nanoseconds());
-        }
-    }
-
-    double value(long time) {
-        return this.measurable.measure(config, time);
-    }
-
-    public void config(MetricConfig config) {
-        synchronized (lock) {
-            this.config = config;
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/fa6339c1/clients/src/main/java/kafka/common/metrics/Measurable.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/metrics/Measurable.java b/clients/src/main/java/kafka/common/metrics/Measurable.java
deleted file mode 100644
index f5511ea..0000000
--- a/clients/src/main/java/kafka/common/metrics/Measurable.java
+++ /dev/null
@@ -1,16 +0,0 @@
-package kafka.common.metrics;
-
-/**
- * A measurable quantity that can be registered as a metric
- */
-public interface Measurable {
-
-    /**
-     * Measure this quantity and return the result as a double
-     * @param config The configuration for this metric
-     * @param now The time the measurement is being taken
-     * @return The measured value
-     */
-    public double measure(MetricConfig config, long now);
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/fa6339c1/clients/src/main/java/kafka/common/metrics/MeasurableStat.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/metrics/MeasurableStat.java b/clients/src/main/java/kafka/common/metrics/MeasurableStat.java
deleted file mode 100644
index 74d3bb4..0000000
--- a/clients/src/main/java/kafka/common/metrics/MeasurableStat.java
+++ /dev/null
@@ -1,10 +0,0 @@
-package kafka.common.metrics;
-
-/**
- * A MeasurableStat is a {@link Stat} that is also {@link Measurable} (i.e. can produce a single floating point value).
- * This is the interface used for most of the simple statistics such as {@link kafka.common.metrics.stats.Avg},
- * {@link kafka.common.metrics.stats.Max}, {@link kafka.common.metrics.stats.Count}, etc.
- */
-public interface MeasurableStat extends Stat, Measurable {
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/fa6339c1/clients/src/main/java/kafka/common/metrics/MetricConfig.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/kafka/common/metrics/MetricConfig.java b/clients/src/main/java/kafka/common/metrics/MetricConfig.java
deleted file mode 100644
index 92f67f0..0000000
--- a/clients/src/main/java/kafka/common/metrics/MetricConfig.java
+++ /dev/null
@@ -1,71 +0,0 @@
-package kafka.common.metrics;
-
-import java.util.concurrent.TimeUnit;
-
-/**
- * Configuration values for metrics
- */
-public class MetricConfig {
-
-    private Quota quota;
-    private int samples;
-    private long eventWindow;
-    private long timeWindowNs;
-    private TimeUnit unit;
-
-    public MetricConfig() {
-        super();
-        this.quota = null;
-        this.samples = 2;
-        this.eventWindow = Long.MAX_VALUE;
-        this.timeWindowNs = TimeUnit.NANOSECONDS.convert(30, TimeUnit.SECONDS);
-        this.unit = TimeUnit.SECONDS;
-    }
-
-    public Quota quota() {
-        return this.quota;
-    }
-
-    public MetricConfig quota(Quota quota) {
-        this.quota = quota;
-        return this;
-    }
-
-    public long eventWindow() {
-        return eventWindow;
-    }
-
-    public MetricConfig eventWindow(long window) {
-        this.eventWindow = window;
-        return this;
-    }
-
-    public long timeWindowNs() {
-        return timeWindowNs;
-    }
-
-    public MetricConfig timeWindow(long window, TimeUnit unit) {
-        this.timeWindowNs = TimeUnit.NANOSECONDS.convert(window, unit);
-        return this;
-    }
-
-    public int samples() {
-        return this.samples;
-    }
-
-    public MetricConfig samples(int samples) {
-        if (samples < 1)
-            throw new IllegalArgumentException("The number of samples must be at least 1.");
-        this.samples = samples;
-        return this;
-    }
-
-    public TimeUnit timeUnit() {
-        return unit;
-    }
-
-    public MetricConfig timeUnit(TimeUnit unit) {
-        this.unit = unit;
-        return this;
-    }
-}


Mime
View raw message