kafka-commits mailing list archives

From jkr...@apache.org
Subject git commit: KAFKA-1337 Rationalize the producer configs.
Date Fri, 04 Apr 2014 20:56:16 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 372182110 -> 640f3b05e


KAFKA-1337 Rationalize the producer configs.


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/640f3b05
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/640f3b05
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/640f3b05

Branch: refs/heads/trunk
Commit: 640f3b05efd00d5e1d7bfc8fe9c90c7aadc3d087
Parents: 3721821
Author: Jay Kreps <jay.kreps@gmail.com>
Authored: Thu Mar 27 12:12:32 2014 -0700
Committer: Jay Kreps <jay.kreps@gmail.com>
Committed: Fri Apr 4 13:55:47 2014 -0700

----------------------------------------------------------------------
 .../kafka/clients/producer/KafkaProducer.java   |  38 ++-
 .../kafka/clients/producer/ProducerConfig.java  | 285 ++++++++++---------
 .../clients/tools/ProducerPerformance.java      |  10 +-
 .../kafka/common/config/AbstractConfig.java     |  21 ++
 .../apache/kafka/common/config/ConfigDef.java   | 130 +++++++--
 .../kafka/common/metrics/JmxReporter.java       |   6 +-
 .../kafka/common/metrics/MetricsReporter.java   |  30 +-
 .../kafka/common/config/ConfigDefTest.java      |  52 ++--
 .../kafka/api/ProducerCompressionTest.scala     |   2 +-
 .../kafka/api/ProducerFailureHandlingTest.scala |  14 +-
 .../kafka/api/ProducerSendTest.scala            |  10 +-
 .../scala/kafka/perf/ProducerPerformance.scala  |  17 +-
 .../config/mirror_producer.properties           |   7 +-
 13 files changed, 376 insertions(+), 246 deletions(-)
----------------------------------------------------------------------
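
For orientation, here is a sketch (not part of this commit) of constructing a producer against the renamed configs. It assumes the public KafkaProducer(Properties) constructor already in trunk; hosts and values are placeholders:

    Properties props = new Properties();
    // formerly "metadata.broker.list"
    props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "host1:9092,host2:9092");
    // formerly "request.required.acks" (an int); now a string so "all" can be expressed
    props.setProperty(ProducerConfig.ACKS_CONFIG, "all");
    // formerly "total.memory.bytes" and "max.partition.bytes"
    props.setProperty(ProducerConfig.BUFFER_MEMORY_CONFIG, Long.toString(32 * 1024 * 1024L));
    props.setProperty(ProducerConfig.BATCH_SIZE_CONFIG, Integer.toString(16384));
    // formerly "request.retries"
    props.setProperty(ProducerConfig.RETRIES_CONFIG, "3");
    KafkaProducer producer = new KafkaProducer(props);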


http://git-wip-us.apache.org/repos/asf/kafka/blob/640f3b05/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index e6eb5b0..a6423f4 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -95,28 +95,30 @@ public class KafkaProducer implements Producer {
     private KafkaProducer(ProducerConfig config) {
         log.trace("Starting the Kafka producer");
         Time time = new SystemTime();
-        MetricConfig metricConfig = new MetricConfig().samples(config.getInt(ProducerConfig.METRICS_NUM_SAMPLES))
-                                                      .timeWindow(config.getLong(ProducerConfig.METRICS_SAMPLE_WINDOW_MS),
+        MetricConfig metricConfig = new MetricConfig().samples(config.getInt(ProducerConfig.METRICS_NUM_SAMPLES_CONFIG))
+                                                      .timeWindow(config.getLong(ProducerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG),
                                                                   TimeUnit.MILLISECONDS);
         String clientId = config.getString(ProducerConfig.CLIENT_ID_CONFIG);
         String jmxPrefix = "kafka.producer." + (clientId.length() > 0 ? clientId + "." : "");
-        List<MetricsReporter> reporters = Collections.singletonList((MetricsReporter) new JmxReporter(jmxPrefix));
+        List<MetricsReporter> reporters = config.getConfiguredInstances(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG,
+                                                                        MetricsReporter.class);
+        reporters.add(new JmxReporter(jmxPrefix));
         this.metrics = new Metrics(metricConfig, reporters, time);
         this.partitioner = new Partitioner();
+        long retryBackoffMs = config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG);
         this.metadataFetchTimeoutMs = config.getLong(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG);
-        this.metadata = new Metadata(config.getLong(ProducerConfig.METADATA_FETCH_BACKOFF_CONFIG),
-                                     config.getLong(ProducerConfig.METADATA_EXPIRY_CONFIG));
+        this.metadata = new Metadata(retryBackoffMs, config.getLong(ProducerConfig.METADATA_MAX_AGE_CONFIG));
         this.maxRequestSize = config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG);
-        this.totalMemorySize = config.getLong(ProducerConfig.TOTAL_BUFFER_MEMORY_CONFIG);
+        this.totalMemorySize = config.getLong(ProducerConfig.BUFFER_MEMORY_CONFIG);
         this.compressionType = CompressionType.forName(config.getString(ProducerConfig.COMPRESSION_TYPE_CONFIG));
-        this.accumulator = new RecordAccumulator(config.getInt(ProducerConfig.MAX_PARTITION_SIZE_CONFIG),
+        this.accumulator = new RecordAccumulator(config.getInt(ProducerConfig.BATCH_SIZE_CONFIG),
                                                  this.totalMemorySize,
                                                  config.getLong(ProducerConfig.LINGER_MS_CONFIG),
-                                                 config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG),
+                                                 retryBackoffMs,
                                                  config.getBoolean(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG),
                                                  metrics,
                                                  time);
-        List<InetSocketAddress> addresses = parseAndValidateAddresses(config.getList(ProducerConfig.BROKER_LIST_CONFIG));
+        List<InetSocketAddress> addresses = parseAndValidateAddresses(config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG));
         this.metadata.update(Cluster.bootstrap(addresses), time.milliseconds());
         this.sender = new Sender(new Selector(this.metrics, time),
                                  this.metadata,
@@ -124,9 +126,9 @@ public class KafkaProducer implements Producer {
                                  clientId,
                                  config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG),
                                  config.getLong(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG),
-                                 (short) config.getInt(ProducerConfig.REQUIRED_ACKS_CONFIG),
-                                 config.getInt(ProducerConfig.MAX_RETRIES_CONFIG),
-                                 config.getInt(ProducerConfig.REQUEST_TIMEOUT_CONFIG),
+                                 (short) parseAcks(config.getString(ProducerConfig.ACKS_CONFIG)),
+                                 config.getInt(ProducerConfig.RETRIES_CONFIG),
+                                 config.getInt(ProducerConfig.TIMEOUT_CONFIG),
                                  config.getInt(ProducerConfig.SEND_BUFFER_CONFIG),
                                  config.getInt(ProducerConfig.RECEIVE_BUFFER_CONFIG),
                                  this.metrics,
@@ -140,13 +142,21 @@ public class KafkaProducer implements Producer {
         log.debug("Kafka producer started");
     }
 
+    private static int parseAcks(String acksString) {
+        try {
+            return acksString.trim().toLowerCase().equals("all") ? -1 : Integer.parseInt(acksString.trim());
+        } catch (NumberFormatException e) {
+            throw new ConfigException("Invalid configuration value for 'acks': " + acksString);
+        }
+    }
+
     private static List<InetSocketAddress> parseAndValidateAddresses(List<String> urls) {
         List<InetSocketAddress> addresses = new ArrayList<InetSocketAddress>();
         for (String url : urls) {
             if (url != null && url.length() > 0) {
                 String[] pieces = url.split(":");
                 if (pieces.length != 2)
-                    throw new ConfigException("Invalid url in " + ProducerConfig.BROKER_LIST_CONFIG + ": " + url);
+                    throw new ConfigException("Invalid url in " + ProducerConfig.BOOTSTRAP_SERVERS_CONFIG + ": " + url);
                 try {
                     InetSocketAddress address = new InetSocketAddress(pieces[0], Integer.parseInt(pieces[1]));
                     if (address.isUnresolved())
@@ -268,7 +278,7 @@ public class KafkaProducer implements Producer {
         if (size > this.totalMemorySize)
             throw new RecordTooLargeException("The message is " + size +
                                               " bytes when serialized which is larger than the total memory buffer you have configured with the " +
-                                              ProducerConfig.TOTAL_BUFFER_MEMORY_CONFIG +
+                                              ProducerConfig.BUFFER_MEMORY_CONFIG +
                                               " configuration.");
     }
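
Since acks is now a string rather than an int, parseAcks above defines the mapping back to the wire-format short. Its behavior, illustrated (the method is private, so these lines are invariants rather than calls user code can make):

    parseAcks("all"); // returns -1: wait for the full set of in-sync replicas
    parseAcks("1");   // returns 1: leader-only acknowledgement (the default)
    parseAcks("0");   // returns 0: no acknowledgement
    parseAcks("2");   // returns 2: any numeric value still parses
    parseAcks("foo"); // throws ConfigException: Invalid configuration value for 'acks': foo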
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/640f3b05/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
index 259c14b..bc4074e 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
@@ -19,171 +19,198 @@ import java.util.Map;
 
 import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigDef.Importance;
 import org.apache.kafka.common.config.ConfigDef.Type;
 
 /**
- * The producer configuration keys
+ * Configuration for the Kafka Producer. Documentation for these configurations can be found in the <a
+ * href="http://kafka.apache.org/documentation.html#new-producer">Kafka documentation</a>
  */
 public class ProducerConfig extends AbstractConfig {
 
-    private static final ConfigDef config;
-
-    /**
-     * A list of URLs to use for establishing the initial connection to the cluster. This list should be in the form
-     * <code>host1:port1,host2:port2,...</code>. These urls are just used for the initial connection to discover the
-     * full cluster membership (which may change dynamically) so this list need not contain the full set of servers (you
-     * may want more than one, though, in case a server is down).
-     */
-    public static final String BROKER_LIST_CONFIG = "metadata.broker.list";
-
-    /**
-     * The amount of time to block waiting to fetch metadata about a topic the first time a record is sent to that
-     * topic.
-     */
-    public static final String METADATA_FETCH_TIMEOUT_CONFIG = "metadata.fetch.timeout.ms";
-
-    /**
-     * The minimum amount of time between metadata fetches. This prevents polling for metadata too quickly.
-     */
-    public static final String METADATA_FETCH_BACKOFF_CONFIG = "metadata.fetch.backoff.ms";
-
-    /**
-     * The period of time in milliseconds after which we force a refresh of metadata even if we haven't seen any
-     * leadership changes.
-     */
-    public static final String METADATA_EXPIRY_CONFIG = "metadata.expiry.ms";
-
-    /**
-     * The buffer size allocated for a partition. When records are received which are smaller than this size the
-     * producer will attempt to optimistically group them together until this size is reached.
+    /*
+     * NOTE: DO NOT CHANGE EITHER CONFIG STRINGS OR THEIR JAVA VARIABLE NAMES AS THESE ARE PART OF THE PUBLIC API AND
+     * CHANGE WILL BREAK USER CODE.
      */
-    public static final String MAX_PARTITION_SIZE_CONFIG = "max.partition.bytes";
 
-    /**
-     * The total memory used by the producer to buffer records waiting to be sent to the server. If records are sent
-     * faster than they can be delivered to the server the producer will either block or throw an exception based on the
-     * preference specified by {@link #BLOCK_ON_BUFFER_FULL_CONFIG}.
-     */
-    public static final String TOTAL_BUFFER_MEMORY_CONFIG = "total.memory.bytes";
+    private static final ConfigDef config;
 
-    /**
-     * The number of acknowledgments the producer requires from the server before considering a request complete.
-     */
-    public static final String REQUIRED_ACKS_CONFIG = "request.required.acks";
+    /** <code>bootstrap.servers</code> */
+    public static final String BOOTSTRAP_SERVERS_CONFIG = "bootstrap.servers";
+    private static final String BOOSTRAP_SERVERS_DOC = "A list of host/port pairs to use for establishing the initial connection to the Kafka cluster. Data will be load " + "balanced over all servers irrespective of which servers are specified here for bootstrapping&mdash;this list only "
+                                                       + "impacts the initial hosts used to discover the full set of servers. This list should be in the form "
+                                                       + "<code>host1:port1,host2:port2,...</code>. Since these servers are just used for the initial connection to "
+                                                       + "discover the full cluster membership (which may change dynamically), this list need not contain the full set of "
+                                                       + "servers (you may want more than one, though, in case a server is down). If no server in this list is available, sending "
+                                                       + "data will fail until one becomes available.";
 
-    /**
-     * The maximum amount of time the server will wait for acknowledgments from followers to meet the acknowledgment
-     * requirements the producer has specified. If the requested number of acknowledgments are not met an error will be
-     * returned.
-     */
-    public static final String REQUEST_TIMEOUT_CONFIG = "request.timeout.ms";
-
-    /**
-     * The producer groups together any records that arrive in between request sends. Normally this occurs only under
-     * load when records arrive faster than they can be sent out. However the client can reduce the number of requests
-     * and increase throughput by adding a small amount of artificial delay to force more records to batch together.
-     * This setting gives an upper bound on this delay. If we get {@link #MAX_PARTITION_SIZE_CONFIG} worth of records
-     * for a partition it will be sent immediately regardless of this setting, however if we have fewer than this many
-     * bytes accumulated for this partition we will "linger" for the specified time waiting for more records to show up.
-     * This setting defaults to 0.
-     */
+    /** <code>metadata.fetch.timeout.ms</code> */
+    public static final String METADATA_FETCH_TIMEOUT_CONFIG = "metadata.fetch.timeout.ms";
+    private static final String METADATA_FETCH_TIMEOUT_DOC = "The first time data is sent to a topic we must fetch metadata about that topic to know which servers host the " + "topic's partitions. This configuration controls the maximum amount of time we will block waiting for the metadata "
+                                                             + "fetch to succeed before throwing an exception back to the client.";
+
+    /** <code>metadata.max.age.ms</code> */
+    public static final String METADATA_MAX_AGE_CONFIG = "metadata.max.age.ms";
+    private static final String METADATA_MAX_AGE_DOC = "The period of time in milliseconds after which we force a refresh of metadata even if we haven't seen any " + " partition leadership changes to proactively discover any new brokers or partitions.";
+
+    /** <code>batch.size</code> */
+    public static final String BATCH_SIZE_CONFIG = "batch.size";
+    private static final String BATCH_SIZE_DOC = "The producer will attempt to batch records together into fewer requests whenever multiple records are being sent" + " to the same partition. This helps performance on both the client and the server. This configuration controls the "
+                                                 + "default batch size in bytes. "
+                                                 + "<p>"
+                                                 + "No attempt will be made to batch records larger than this size. "
+                                                 + "<p>"
+                                                 + "Requests sent to brokers will contain multiple batches, one for each partition with data available to be sent. "
+                                                 + "<p>"
+                                                 + "A small batch size will make batching less common and may reduce throughput (a batch size of zero will disable "
+                                                 + "batching entirely). A very large batch size may use memory a bit more wastefully as we will always allocate a "
+                                                 + "buffer of the specified batch size in anticipation of additional records.";
+
+    /** <code>buffer.memory</code> */
+    public static final String BUFFER_MEMORY_CONFIG = "buffer.memory";
+    private static final String BUFFER_MEMORY_DOC = "The total bytes of memory the producer can use to buffer records waiting to be sent to the server. If records are " + "sent faster than they can be delivered to the server the producer will either block or throw an exception based "
+                                                    + "on the preference specified by <code>block.on.buffer.full</code>. "
+                                                    + "<p>"
+                                                    + "This setting should correspond roughly to the total memory the producer will use, but is not a hard bound since "
+                                                    + "not all memory the producer uses is used for buffering. Some additional memory will be used for compression (if "
+                                                    + "compression is enabled) as well as for maintaining in-flight requests.";
+
+    /** <code>acks</code> */
+    public static final String ACKS_CONFIG = "acks";
+    private static final String ACKS_DOC = "The number of acknowledgments the producer requires the leader to have received before considering a request complete. This controls the " + " durability of records that are sent. The following settings are common: "
+                                           + " <ul>"
+                                           + " <li><code>acks=0</code> If set to zero then the producer will not wait for any acknowledgment from the"
+                                           + " server at all. The record will be immediately added to the socket buffer and considered sent. No guarantee can be"
+                                           + " made that the server has received the record in this case, and the <code>retries</code> configuration will not"
+                                           + " take effect (as the client won't generally know of any failures). The offset given back for each record will"
+                                           + " always be set to -1."
+                                           + " <li><code>acks=1</code> This will mean the leader will write the record to its local log but will respond"
+                                           + " without awaiting full acknowledgement from all followers. In this case should the leader fail immediately after"
+                                           + " acknowledging the record but before the followers have replicated it then the record will be lost."
+                                           + " <li><code>acks=all</code> This means the leader will wait for the full set of in-sync replicas to"
+                                           + " acknowledge the record. This guarantees that the record will not be lost as long as at least one in-sync replica"
+                                           + " remains alive. This is the strongest available guarantee."
+                                           + " <li>Other settings such as <code>acks=2</code> are also possible, and will require the given number of"
+                                           + " acknowledgements but this is generally less useful.</ul>";
+
+    /** <code>timeout.ms</code> */
+    public static final String TIMEOUT_CONFIG = "timeout.ms";
+    private static final String TIMEOUT_DOC = "The configuration controls the maximum amount of time the server will wait for acknowledgments from followers to " + "meet the acknowledgment requirements the producer has specified with the <code>acks</code> configuration. If the "
+                                              + "requested number of acknowledgments are not met when the timeout elapses an error will be returned. This timeout "
+                                              + "is measured on the server side and does not include the network latency of the request.";
+
+    /** <code>linger.ms</code> */
     public static final String LINGER_MS_CONFIG = "linger.ms";
-
-    /**
-     * The id string to pass to the server when making requests. The purpose of this is to be able to track the source
-     * of requests beyond just ip/port by allowing a logical application name to be included.
-     */
+    private static final String LINGER_MS_DOC = "The producer groups together any records that arrive in between request transmissions into a single batched request. " + "Normally this occurs only under load when records arrive faster than they can be sent out. However in some circumstances the client may want to "
+                                                + "reduce the number of requests even under moderate load. This setting accomplishes this by adding a small amount "
+                                                + "of artificial delay&mdash;that is, rather than immediately sending out a record the producer will wait for up to "
+                                                + "the given delay to allow other records to be sent so that the sends can be batched together. This can be thought "
+                                                + "of as analogous to Nagle's algorithm in TCP. This setting gives the upper bound on the delay for batching: once "
+                                                + "we get <code>batch.size</code> worth of records for a partition it will be sent immediately regardless of this "
+                                                + "setting, however if we have fewer than this many bytes accumulated for this partition we will 'linger' for the "
+                                                + "specified time waiting for more records to show up. This setting defaults to 0 (i.e. no delay). Setting <code>linger.ms=5</code>, "
+                                                + "for example, would have the effect of reducing the number of requests sent but would add up to 5ms of latency to records sent in the absence of load.";
+
+    /** <code>client.id</code> */
     public static final String CLIENT_ID_CONFIG = "client.id";
+    private static final String CLIENT_ID_DOC = "The id string to pass to the server when making requests. The purpose of this is to be able to track the source " + "of requests beyond just ip/port by allowing a logical application name to be included with the request. The "
+                                                + "application can set any string it wants as this has no functional purpose other than in logging and metrics.";
 
-    /**
-     * The size of the TCP send buffer to use when sending data
-     */
+    /** <code>send.buffer.bytes</code> */
     public static final String SEND_BUFFER_CONFIG = "send.buffer.bytes";
+    private static final String SEND_BUFFER_DOC = "The size of the TCP send buffer to use when sending data";
 
-    /**
-     * The size of the TCP receive buffer to use when reading data (you generally shouldn't need to change this)
-     */
+    /** <code>receive.buffer.bytes</code> */
     public static final String RECEIVE_BUFFER_CONFIG = "receive.buffer.bytes";
+    private static final String RECEIVE_BUFFER_DOC = "The size of the TCP receive buffer to use when reading data";
 
-    /**
-     * The maximum size of a request. This is also effectively a cap on the maximum record size. Note that the server
-     * has its own cap on record size which may be different from this.
-     */
+    /** <code>max.request.size</code> */
     public static final String MAX_REQUEST_SIZE_CONFIG = "max.request.size";
+    private static final String MAX_REQUEST_SIZE_DOC = "The maximum size of a request. This is also effectively a cap on the maximum record size. Note that the server " + "has its own cap on record size which may be different from this. This setting will limit the number of record "
+                                                       + "batches the producer will send in a single request to avoid sending huge requests.";
 
-    /**
-     * The amount of time to wait before attempting to reconnect to a given host. This avoids repeated connecting to a
-     * host in a tight loop.
-     */
+    /** <code>reconnect.backoff.ms</code> */
     public static final String RECONNECT_BACKOFF_MS_CONFIG = "reconnect.backoff.ms";
+    private static final String RECONNECT_BACKOFF_MS_DOC = "The amount of time to wait before attempting to reconnect to a given host when a connection fails." + " This avoids a scenario where the client repeatedly attempts to connect to a host in a tight loop.";
 
-    /**
-     * When our memory buffer is exhausted we must either stop accepting new records (block) or throw errors. By default
-     * this setting is true and we block, however users who want to guarantee we never block can turn this into an
-     * error.
-     */
+    /** <code>block.on.buffer.full</code> */
     public static final String BLOCK_ON_BUFFER_FULL_CONFIG = "block.on.buffer.full";
+    private static final String BLOCK_ON_BUFFER_FULL_DOC = "When our memory buffer is exhausted we must either stop accepting new records (block) or throw errors. By default " + "this setting is true and we block, however in some scenarios blocking is not desirable and it is better to "
+                                                           + "immediately give an error. Setting this to <code>false</code> will accomplish that: the producer will throw a BufferExhaustedException if a record is sent and the buffer space is full.";
 
-    /**
-     * The maximum number of times to attempt resending the request before giving up.
-     */
-    public static final String MAX_RETRIES_CONFIG = "request.retries";
+    /** <code>retries</code> */
+    public static final String RETRIES_CONFIG = "retries";
+    private static final String RETRIES_DOC = "Setting a value greater than zero will cause the client to resend any record whose send fails with a potentially transient error." + " Note that this retry is no different than if the client resent the record upon receiving the "
+                                              + "error. Allowing retries will potentially change the ordering of records because if two records are "
+                                              + "sent to a single partition, and the first fails and is retried but the second succeeds, then the second record "
+                                              + "may appear first.";
 
-    /**
-     * The amount of time to wait before attempting to resend produce request to a given topic partition. This avoids
-     * repeated sending-and-failing in a tight loop
-     */
+    /** <code>retry.backoff.ms</code> */
     public static final String RETRY_BACKOFF_MS_CONFIG = "retry.backoff.ms";
+    private static final String RETRY_BACKOFF_MS_DOC = "The amount of time to wait before attempting to retry a failed produce request to a given topic partition." + " This avoids repeated sending-and-failing in a tight loop.";
 
-    /**
-     * The compression type for all data generated. The default is none (i.e. no compression)
-     */
+    /** <code>compression.type</code> */
     public static final String COMPRESSION_TYPE_CONFIG = "compression.type";
+    private static final String COMPRESSION_TYPE_DOC = "The compression type for all data generated by the producer. The default is none (i.e. no compression). Valid " + " values are <code>none</code>, <code>gzip</code>, or <code>snappy</code>. Compression is of full batches of data, "
+                                                       + " so the efficacy of batching will also impact the compression ratio (more batching means better compression).";
 
-    /**
-     * The window size for a single metrics sample in ms. Defaults to 30 seconds.
-     */
-    public static final String METRICS_SAMPLE_WINDOW_MS = "metrics.sample.window.ms";
+    /** <code>metrics.sample.window.ms</code> */
+    public static final String METRICS_SAMPLE_WINDOW_MS_CONFIG = "metrics.sample.window.ms";
+    private static final String METRICS_SAMPLE_WINDOW_MS_DOC = "The metrics system maintains a configurable number of samples over a fixed window size. This configuration " + "controls the size of the window. For example we might maintain two samples each measured over a 30 second period. "
+                                                               + "When a window expires we erase and overwrite the oldest window.";
 
-    /**
-     * The number of samples used when reporting metrics. Defaults to two. So by default we use two 30 second windows,
-     * so metrics are computed over up to 60 seconds.
-     */
-    public static final String METRICS_NUM_SAMPLES = "metrics.num.samples";
+    /** <code>metrics.num.samples</code> */
+    public static final String METRICS_NUM_SAMPLES_CONFIG = "metrics.num.samples";
+    private static final String METRICS_NUM_SAMPLES_DOC = "The number of samples maintained to compute metrics.";
 
-    /**
-     * Should we register the Kafka metrics as JMX mbeans?
-     */
-    public static final String ENABLE_JMX_CONFIG = "enable.jmx";
+    /** <code>metric.reporters</code> */
+    public static final String METRIC_REPORTER_CLASSES_CONFIG = "metric.reporters";
+    private static final String METRIC_REPORTER_CLASSES_DOC = "A list of classes to use as metrics reporters. Implementing the <code>MetricReporter</code> interface allows " + "plugging in classes that will be notified of new metric creation. The JmxReporter is always included to register JMX statistics.";
 
     static {
-        /* TODO: add docs */
-        config = new ConfigDef().define(BROKER_LIST_CONFIG, Type.LIST, "blah blah")
-                                .define(METADATA_FETCH_TIMEOUT_CONFIG, Type.LONG, 60 * 1000, atLeast(0), "blah blah")
-                                .define(METADATA_FETCH_BACKOFF_CONFIG, Type.LONG, 50, atLeast(0), "blah blah")
-                                .define(METADATA_EXPIRY_CONFIG, Type.LONG, 5 * 60 * 1000, atLeast(0), "blah blah")
-                                .define(MAX_PARTITION_SIZE_CONFIG, Type.INT, 16384, atLeast(0), "blah blah")
-                                .define(TOTAL_BUFFER_MEMORY_CONFIG, Type.LONG, 32 * 1024 * 1024L, atLeast(0L), "blah blah")
-                                /* TODO: should be a string to handle acks=in-sync */
-                                .define(REQUIRED_ACKS_CONFIG, Type.INT, 1, between(-1, Short.MAX_VALUE), "blah blah")
-                                .define(REQUEST_TIMEOUT_CONFIG, Type.INT, 30 * 1000, atLeast(0), "blah blah")
-                                .define(LINGER_MS_CONFIG, Type.LONG, 0, atLeast(0L), "blah blah")
-                                .define(CLIENT_ID_CONFIG, Type.STRING, "", "blah blah")
-                                .define(SEND_BUFFER_CONFIG, Type.INT, 128 * 1024, atLeast(0), "blah blah")
-                                .define(RECEIVE_BUFFER_CONFIG, Type.INT, 32 * 1024, atLeast(0), "blah blah")
-                                .define(MAX_REQUEST_SIZE_CONFIG, Type.INT, 1 * 1024 * 1024, atLeast(0), "blah blah")
-                                .define(RECONNECT_BACKOFF_MS_CONFIG, Type.LONG, 10L, atLeast(0L), "blah blah")
-                                .define(BLOCK_ON_BUFFER_FULL_CONFIG, Type.BOOLEAN, true, "blah blah")
-                                .define(MAX_RETRIES_CONFIG, Type.INT, 0, between(0, Integer.MAX_VALUE), "")
-                                .define(RETRY_BACKOFF_MS_CONFIG, Type.LONG, 100L, atLeast(0L), "blah blah")
-                                .define(COMPRESSION_TYPE_CONFIG, Type.STRING, "none", "blah blah")
-                                .define(ENABLE_JMX_CONFIG, Type.BOOLEAN, true, "")
-                                .define(METRICS_SAMPLE_WINDOW_MS, Type.LONG, 30000, atLeast(0), "")
-                                .define(METRICS_NUM_SAMPLES, Type.INT, 2, atLeast(1), "");
+        config = new ConfigDef().define(BOOTSTRAP_SERVERS_CONFIG, Type.LIST, Importance.HIGH, BOOSTRAP_SERVERS_DOC)
+                                .define(BUFFER_MEMORY_CONFIG, Type.LONG, 32 * 1024 * 1024L, atLeast(0L), Importance.HIGH, BUFFER_MEMORY_DOC)
+                                .define(RETRIES_CONFIG, Type.INT, 0, between(0, Integer.MAX_VALUE), Importance.HIGH, RETRIES_DOC)
+                                .define(ACKS_CONFIG, Type.STRING, "1", Importance.HIGH, ACKS_DOC)
+                                .define(COMPRESSION_TYPE_CONFIG, Type.STRING, "none", Importance.HIGH, COMPRESSION_TYPE_DOC)
+                                .define(BATCH_SIZE_CONFIG, Type.INT, 16384, atLeast(0), Importance.MEDIUM, BATCH_SIZE_DOC)
+                                .define(TIMEOUT_CONFIG, Type.INT, 30 * 1000, atLeast(0), Importance.MEDIUM, TIMEOUT_DOC)
+                                .define(LINGER_MS_CONFIG, Type.LONG, 0, atLeast(0L), Importance.MEDIUM, LINGER_MS_DOC)
+                                .define(CLIENT_ID_CONFIG, Type.STRING, "", Importance.MEDIUM, CLIENT_ID_DOC)
+                                .define(SEND_BUFFER_CONFIG, Type.INT, 128 * 1024, atLeast(0), Importance.MEDIUM, SEND_BUFFER_DOC)
+                                .define(RECEIVE_BUFFER_CONFIG, Type.INT, 32 * 1024, atLeast(0), Importance.MEDIUM, RECEIVE_BUFFER_DOC)
+                                .define(MAX_REQUEST_SIZE_CONFIG,
+                                        Type.INT,
+                                        1 * 1024 * 1024,
+                                        atLeast(0),
+                                        Importance.MEDIUM,
+                                        MAX_REQUEST_SIZE_DOC)
+                                .define(BLOCK_ON_BUFFER_FULL_CONFIG, Type.BOOLEAN, true, Importance.LOW, BLOCK_ON_BUFFER_FULL_DOC)
+                                .define(RECONNECT_BACKOFF_MS_CONFIG, Type.LONG, 10L, atLeast(0L), Importance.LOW, RECONNECT_BACKOFF_MS_DOC)
+                                .define(METRIC_REPORTER_CLASSES_CONFIG, Type.LIST, "", Importance.LOW, METRIC_REPORTER_CLASSES_DOC)
+                                .define(RETRY_BACKOFF_MS_CONFIG, Type.LONG, 100L, atLeast(0L), Importance.LOW, RETRY_BACKOFF_MS_DOC)
+                                .define(METADATA_FETCH_TIMEOUT_CONFIG,
+                                        Type.LONG,
+                                        60 * 1000,
+                                        atLeast(0),
+                                        Importance.LOW,
+                                        METADATA_FETCH_TIMEOUT_DOC)
+                                .define(METADATA_MAX_AGE_CONFIG, Type.LONG, 5 * 60 * 1000, atLeast(0), Importance.LOW, METADATA_MAX_AGE_DOC)
+                                .define(METRICS_SAMPLE_WINDOW_MS_CONFIG,
+                                        Type.LONG,
+                                        30000,
+                                        atLeast(0),
+                                        Importance.LOW,
+                                        METRICS_SAMPLE_WINDOW_MS_DOC)
+                                .define(METRICS_NUM_SAMPLES_CONFIG, Type.INT, 2, atLeast(1), Importance.LOW, METRICS_NUM_SAMPLES_DOC);
     }
 
     ProducerConfig(Map<? extends Object, ? extends Object> props) {
         super(config, props);
     }
 
+    public static void main(String[] args) {
+        System.out.println(config.toHtmlTable());
+    }
+
 }
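
The new main() method makes the config reference self-generating: it prints an HTML table of every producer config, sorted so that required settings (those with no default) come first, then by importance, then alphabetically (see ConfigDef.toHtmlTable() further down). A minimal usage sketch:

    // Writes the <table> of name/type/default/importance/description to stdout.
    ProducerConfig.main(new String[0]);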

http://git-wip-us.apache.org/repos/asf/kafka/blob/640f3b05/clients/src/main/java/org/apache/kafka/clients/tools/ProducerPerformance.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/tools/ProducerPerformance.java b/clients/src/main/java/org/apache/kafka/clients/tools/ProducerPerformance.java
index 3b3fb2c..eb18739 100644
--- a/clients/src/main/java/org/apache/kafka/clients/tools/ProducerPerformance.java
+++ b/clients/src/main/java/org/apache/kafka/clients/tools/ProducerPerformance.java
@@ -36,12 +36,12 @@ public class ProducerPerformance {
         int recordSize = Integer.parseInt(args[3]);
         int acks = Integer.parseInt(args[4]);
         Properties props = new Properties();
-        props.setProperty(ProducerConfig.REQUIRED_ACKS_CONFIG, Integer.toString(acks));
-        props.setProperty(ProducerConfig.BROKER_LIST_CONFIG, url);
+        props.setProperty(ProducerConfig.ACKS_CONFIG, Integer.toString(acks));
+        props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, url);
         props.setProperty(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG, Integer.toString(5 * 1000));
-        props.setProperty(ProducerConfig.REQUEST_TIMEOUT_CONFIG, Integer.toString(Integer.MAX_VALUE));
-        props.setProperty(ProducerConfig.TOTAL_BUFFER_MEMORY_CONFIG, Integer.toString(64 * 1024 * 1024));
-        props.setProperty(ProducerConfig.MAX_PARTITION_SIZE_CONFIG, Integer.toString(64 * 1024));
+        props.setProperty(ProducerConfig.TIMEOUT_CONFIG, Integer.toString(Integer.MAX_VALUE));
+        props.setProperty(ProducerConfig.BUFFER_MEMORY_CONFIG, Integer.toString(64 * 1024 * 1024));
+        props.setProperty(ProducerConfig.BATCH_SIZE_CONFIG, Integer.toString(64 * 1024));
         if (args.length == 6)
             props.setProperty(ProducerConfig.COMPRESSION_TYPE_CONFIG, args[5]);
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/640f3b05/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
index 84a327e..8d88610 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
@@ -12,6 +12,7 @@
  */
 package org.apache.kafka.common.config;
 
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
@@ -111,6 +112,9 @@ public class AbstractConfig {
         log.info(b.toString());
     }
 
+    /**
+     * Log warnings for any unused configurations
+     */
     public void logUnused() {
         for (String key : unused())
             log.warn("The configuration {} = {} was supplied but isn't a known config.", key, this.values.get(key));
@@ -136,4 +140,21 @@ public class AbstractConfig {
         return t.cast(o);
     }
 
+    public <T> List<T> getConfiguredInstances(String key, Class<T> t) {
+        List<String> klasses = getList(key);
+        List<T> objects = new ArrayList<T>();
+        for (String klass : klasses) {
+            Class<?> c = getClass(klass);
+            if (c == null)
+                return null;
+            Object o = Utils.newInstance(c);
+            if (!t.isInstance(o))
+                throw new KafkaException(c.getName() + " is not an instance of " + t.getName());
+            if (o instanceof Configurable)
+                ((Configurable) o).configure(this.originals);
+            objects.add(t.cast(o));
+        }
+        return objects;
+    }
+
 }
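
This helper is what wires up metric.reporters in the KafkaProducer hunk above: each class name in the list is instantiated via its no-arg constructor, type-checked against the requested interface, and, if it implements Configurable, handed the original config map. A sketch of the call, with a hypothetical reporter class:

    // e.g. with metric.reporters=com.example.MyReporter in the client config
    List<MetricsReporter> reporters =
        config.getConfiguredInstances(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG,
                                      MetricsReporter.class);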

http://git-wip-us.apache.org/repos/asf/kafka/blob/640f3b05/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
index 67b349d..9ba7ee7 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
@@ -1,22 +1,21 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
  */
 package org.apache.kafka.common.config;
 
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -45,7 +44,7 @@ import java.util.Map;
  */
 public class ConfigDef {
 
-    private static final Object NO_DEFAULT_VALUE = new Object();
+    private static final Object NO_DEFAULT_VALUE = new String("");
 
     private final Map<String, ConfigKey> configKeys = new HashMap<String, ConfigKey>();
 
@@ -55,14 +54,15 @@ public class ConfigDef {
      * @param type The type of the config
      * @param defaultValue The default value to use if this config isn't present
      * @param validator A validator to use in checking the correctness of the config
+     * @param importance The importance of this config: is this something you will likely need to change.
      * @param documentation The documentation string for the config
      * @return This ConfigDef so you can chain calls
      */
-    public ConfigDef define(String name, Type type, Object defaultValue, Validator validator, String documentation) {
+    public ConfigDef define(String name, Type type, Object defaultValue, Validator validator, Importance importance, String documentation) {
         if (configKeys.containsKey(name))
             throw new ConfigException("Configuration " + name + " is defined twice.");
         Object parsedDefault = defaultValue == NO_DEFAULT_VALUE ? NO_DEFAULT_VALUE : parseType(name, defaultValue, type);
-        configKeys.put(name, new ConfigKey(name, type, parsedDefault, validator, documentation));
+        configKeys.put(name, new ConfigKey(name, type, parsedDefault, validator, importance, documentation));
         return this;
     }
 
@@ -71,11 +71,12 @@ public class ConfigDef {
      * @param name The name of the config parameter
      * @param type The type of the config
      * @param defaultValue The default value to use if this config isn't present
+     * @param importance The importance of this config: is this something you will likely need to change.
      * @param documentation The documentation string for the config
      * @return This ConfigDef so you can chain calls
      */
-    public ConfigDef define(String name, Type type, Object defaultValue, String documentation) {
-        return define(name, type, defaultValue, null, documentation);
+    public ConfigDef define(String name, Type type, Object defaultValue, Importance importance, String documentation) {
+        return define(name, type, defaultValue, null, importance, documentation);
     }
 
     /**
@@ -83,22 +84,24 @@ public class ConfigDef {
      * @param name The name of the config parameter
      * @param type The type of the config
      * @param validator A validator to use in checking the correctness of the config
+     * @param importance The importance of this config: is this something you will likely need to change.
      * @param documentation The documentation string for the config
      * @return This ConfigDef so you can chain calls
      */
-    public ConfigDef define(String name, Type type, Validator validator, String documentation) {
-        return define(name, type, NO_DEFAULT_VALUE, validator, documentation);
+    public ConfigDef define(String name, Type type, Validator validator, Importance importance, String documentation) {
+        return define(name, type, NO_DEFAULT_VALUE, validator, importance, documentation);
     }
 
     /**
      * Define a required parameter with no default value and no special validation logic
      * @param name The name of the config parameter
      * @param type The type of the config
+     * @param importance The importance of this config: is this something you will likely need to change.
      * @param documentation The documentation string for the config
      * @return This ConfigDef so you can chain calls
      */
-    public ConfigDef define(String name, Type type, String documentation) {
-        return define(name, type, NO_DEFAULT_VALUE, null, documentation);
+    public ConfigDef define(String name, Type type, Importance importance, String documentation) {
+        return define(name, type, NO_DEFAULT_VALUE, null, importance, documentation);
     }
 
     /**
@@ -206,6 +209,10 @@ public class ConfigDef {
         BOOLEAN, STRING, INT, LONG, DOUBLE, LIST, CLASS;
     }
 
+    public enum Importance {
+        HIGH, MEDIUM, LOW
+    }
+
     /**
      * Validation logic the user may provide
      */
@@ -230,7 +237,7 @@ public class ConfigDef {
          * @param min The minimum acceptable value
          */
         public static Range atLeast(Number min) {
-            return new Range(min, Double.MAX_VALUE);
+            return new Range(min, null);
         }
 
         /**
@@ -242,8 +249,19 @@ public class ConfigDef {
 
         public void ensureValid(String name, Object o) {
             Number n = (Number) o;
-            if (n.doubleValue() < min.doubleValue() || n.doubleValue() > max.doubleValue())
-                throw new ConfigException(name, o, "Value must be in the range [" + min + ", " + max + "]");
+            if (min != null && n.doubleValue() < min.doubleValue())
+                throw new ConfigException(name, o, "Value must be at least " + min);
+            if (max != null && n.doubleValue() > max.doubleValue())
+                throw new ConfigException(name, o, "Value must be no more than " + max);
+        }
+
+        public String toString() {
+            if (min == null)
+                return "[...," + max + "]";
+            else if (max == null)
+                return "[" + min + ",...]";
+            else
+                return "[" + min + ",...," + max + "]";
         }
     }
 
@@ -253,17 +271,75 @@ public class ConfigDef {
         public final String documentation;
         public final Object defaultValue;
         public final Validator validator;
+        public final Importance importance;
 
-        public ConfigKey(String name, Type type, Object defaultValue, Validator validator, String documentation) {
+        public ConfigKey(String name, Type type, Object defaultValue, Validator validator, Importance importance, String documentation) {
             super();
             this.name = name;
             this.type = type;
             this.defaultValue = defaultValue;
             this.validator = validator;
+            this.importance = importance;
             if (this.validator != null)
                 this.validator.ensureValid(name, defaultValue);
             this.documentation = documentation;
         }
 
+        public boolean hasDefault() {
+            return this.defaultValue != NO_DEFAULT_VALUE;
+        }
+
+    }
+
+    public String toHtmlTable() {
+        // sort first required fields, then by importance, then name
+        List<ConfigDef.ConfigKey> configs = new ArrayList<ConfigDef.ConfigKey>(this.configKeys.values());
+        Collections.sort(configs, new Comparator<ConfigDef.ConfigKey>() {
+            public int compare(ConfigDef.ConfigKey k1, ConfigDef.ConfigKey k2) {
+                // first take anything with no default value
+                if (!k1.hasDefault() && k2.hasDefault())
+                    return -1;
+                else if (!k2.hasDefault() && k1.hasDefault())
+                    return 1;
+
+                // then sort by importance
+                int cmp = k1.importance.compareTo(k2.importance);
+                if (cmp == 0)
+                    // then sort in alphabetical order
+                    return k1.name.compareTo(k2.name);
+                else
+                    return cmp;
+            }
+        });
+        StringBuilder b = new StringBuilder();
+        b.append("<table>\n");
+        b.append("<tr>\n");
+        b.append("<th>Name</th>\n");
+        b.append("<th>Type</th>\n");
+        b.append("<th>Default</th>\n");
+        b.append("<th>Importance</th>\n");
+        b.append("<th>Description</th>\n");
+        b.append("</tr>\n");
+        for (ConfigKey def : configs) {
+            b.append("<tr>\n");
+            b.append("<td>");
+            b.append(def.name);
+            b.append("</td>");
+            b.append("<td>");
+            b.append(def.type.toString().toLowerCase());
+            b.append("</td>");
+            b.append("<td>");
+            b.append(def.defaultValue == null ? "" : def.defaultValue);
+            b.append("</td>");
+            b.append("<td>");
+            b.append(def.importance.toString().toLowerCase());
+            b.append("</td>");
+            b.append("<td>");
+            b.append(def.documentation);
+            b.append("</td>");
+            b.append("</tr>\n");
+        }
+        b.append("</table>");
+        return b.toString();
     }
 }
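
Range.atLeast now models an open upper bound with null instead of Double.MAX_VALUE, and validation errors name only the violated bound. A sketch of the observable behavior, using a hypothetical key "x":

    ConfigDef def = new ConfigDef()
        .define("x", Type.LONG, 0L, Range.atLeast(0L), Importance.LOW, "docs");
    // Parsing a map with x = -1 now throws ConfigException("Value must be at least 0"),
    // and Range.atLeast(0L).toString() renders the open end as "[0,...]" in generated docs.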

http://git-wip-us.apache.org/repos/asf/kafka/blob/640f3b05/clients/src/main/java/org/apache/kafka/common/metrics/JmxReporter.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/JmxReporter.java b/clients/src/main/java/org/apache/kafka/common/metrics/JmxReporter.java
index 3950eb1..3c31201 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/JmxReporter.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/JmxReporter.java
@@ -42,7 +42,7 @@ public class JmxReporter implements MetricsReporter {
 
     private static final Logger log = LoggerFactory.getLogger(JmxReporter.class);
     private static final Object lock = new Object();
-    private final String prefix;
+    private String prefix;
     private final Map<String, KafkaMbean> mbeans = new HashMap<String, KafkaMbean>();
 
     public JmxReporter() {
@@ -57,6 +57,10 @@ public class JmxReporter implements MetricsReporter {
     }
 
     @Override
+    public void configure(Map<String, ?> configs) {
+    }
+
+    @Override
     public void init(List<KafkaMetric> metrics) {
         synchronized (lock) {
             for (KafkaMetric metric : metrics)

http://git-wip-us.apache.org/repos/asf/kafka/blob/640f3b05/clients/src/main/java/org/apache/kafka/common/metrics/MetricsReporter.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/MetricsReporter.java b/clients/src/main/java/org/apache/kafka/common/metrics/MetricsReporter.java
index 2c395b1..7acc19e 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/MetricsReporter.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/MetricsReporter.java
@@ -1,27 +1,25 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
  */
 package org.apache.kafka.common.metrics;
 
 import java.util.List;
 
+import org.apache.kafka.common.Configurable;
+
 /**
- * A plugin interface to allow things to listen as new metrics are created so they can be reported
+ * A plugin interface to allow things to listen as new metrics are created so they can be reported.
  */
-public interface MetricsReporter {
+public interface MetricsReporter extends Configurable {
 
     /**
      * This is called when the reporter is first registered to initially register all existing metrics
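
With MetricsReporter now extending Configurable, a pluggable reporter can read the client's configs before use. A minimal hypothetical implementation; the metricChange and close callbacks (and KafkaMetric.name()) are assumed from the full interface, which this hunk truncates:

    package com.example;

    import java.util.List;
    import java.util.Map;

    import org.apache.kafka.common.metrics.KafkaMetric;
    import org.apache.kafka.common.metrics.MetricsReporter;

    public class LoggingReporter implements MetricsReporter {
        private String clientId;

        @Override
        public void configure(Map<String, ?> configs) {
            // Handed the original client configs by AbstractConfig.getConfiguredInstances()
            this.clientId = String.valueOf(configs.get("client.id"));
        }

        @Override
        public void init(List<KafkaMetric> metrics) {
            for (KafkaMetric metric : metrics)
                System.out.println(clientId + ": registered " + metric.name());
        }

        @Override
        public void metricChange(KafkaMetric metric) {
            System.out.println(clientId + ": metric updated " + metric.name());
        }

        @Override
        public void close() {
        }
    }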

http://git-wip-us.apache.org/repos/asf/kafka/blob/640f3b05/clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java b/clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java
index 29543df..09a82fe 100644
--- a/clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java
@@ -1,18 +1,14 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
  */
 package org.apache.kafka.common.config;
 
@@ -24,9 +20,7 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.Properties;
 
-
-import org.apache.kafka.common.config.ConfigDef;
-import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.config.ConfigDef.Importance;
 import org.apache.kafka.common.config.ConfigDef.Range;
 import org.apache.kafka.common.config.ConfigDef.Type;
 import org.junit.Test;
@@ -35,13 +29,13 @@ public class ConfigDefTest {
 
     @Test
     public void testBasicTypes() {
-        ConfigDef def = new ConfigDef().define("a", Type.INT, 5, Range.between(0, 14), "docs")
-                                       .define("b", Type.LONG, "docs")
-                                       .define("c", Type.STRING, "hello", "docs")
-                                       .define("d", Type.LIST, "docs")
-                                       .define("e", Type.DOUBLE, "docs")
-                                       .define("f", Type.CLASS, "docs")
-                                       .define("g", Type.BOOLEAN, "docs");
+        ConfigDef def = new ConfigDef().define("a", Type.INT, 5, Range.between(0, 14), Importance.HIGH, "docs")
+                                       .define("b", Type.LONG, Importance.HIGH, "docs")
+                                       .define("c", Type.STRING, "hello", Importance.HIGH, "docs")
+                                       .define("d", Type.LIST, Importance.HIGH, "docs")
+                                       .define("e", Type.DOUBLE, Importance.HIGH, "docs")
+                                       .define("f", Type.CLASS, Importance.HIGH, "docs")
+                                       .define("g", Type.BOOLEAN, Importance.HIGH, "docs");
 
         Properties props = new Properties();
         props.put("a", "1   ");
@@ -63,22 +57,22 @@ public class ConfigDefTest {
 
     @Test(expected = ConfigException.class)
     public void testInvalidDefault() {
-        new ConfigDef().define("a", Type.INT, "hello", "docs");
+        new ConfigDef().define("a", Type.INT, "hello", Importance.HIGH, "docs");
     }
 
     @Test(expected = ConfigException.class)
     public void testNullDefault() {
-        new ConfigDef().define("a", Type.INT, null, null, "docs");
+        new ConfigDef().define("a", Type.INT, null, null, null, "docs");
     }
 
     @Test(expected = ConfigException.class)
     public void testMissingRequired() {
-        new ConfigDef().define("a", Type.INT, "docs").parse(new HashMap<String, Object>());
+        new ConfigDef().define("a", Type.INT, Importance.HIGH, "docs").parse(new HashMap<String, Object>());
     }
 
     @Test(expected = ConfigException.class)
     public void testDefinedTwice() {
-        new ConfigDef().define("a", Type.STRING, "docs").define("a", Type.INT, "docs");
+        new ConfigDef().define("a", Type.STRING, Importance.HIGH, "docs").define("a", Type.INT, Importance.HIGH, "docs");
     }
 
     @Test
@@ -94,7 +88,7 @@ public class ConfigDefTest {
         for (Object value : values) {
             Map<String, Object> m = new HashMap<String, Object>();
             m.put("name", value);
-            ConfigDef def = new ConfigDef().define("name", type, "docs");
+            ConfigDef def = new ConfigDef().define("name", type, Importance.HIGH, "docs");
             try {
                 def.parse(m);
                 fail("Expected a config exception on bad input for value " + value);

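The updated test above exercises every new define() overload; application code defining its own configs now reads roughly like the sketch below. Key names, defaults, and doc strings are illustrative, and the typed return of parse() is assumed from its use in the test:

import java.util.Map;
import java.util.Properties;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Range;
import org.apache.kafka.common.config.ConfigDef.Type;

public class ConfigDefSketch {
    public static void main(String[] args) {
        // Every define() now carries an Importance level, which can drive generated docs
        ConfigDef def = new ConfigDef()
            .define("retries", Type.INT, 0, Range.between(0, Integer.MAX_VALUE), Importance.HIGH,
                    "How many times to retry a failed send.")
            .define("client.id", Type.STRING, "", Importance.MEDIUM,
                    "Id string sent with every request.");

        Properties props = new Properties();
        props.put("retries", "3"); // string values are trimmed and coerced, as testBasicTypes shows

        Map<String, Object> parsed = def.parse(props);
        System.out.println(parsed.get("retries"));   // Integer 3
        System.out.println(parsed.get("client.id")); // default ""
    }
}
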
http://git-wip-us.apache.org/repos/asf/kafka/blob/640f3b05/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala b/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala
index 1d73aca..2dad20e 100644
--- a/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala
@@ -71,7 +71,7 @@ class ProducerCompressionTest(compression: String) extends JUnit3Suite with ZooK
   def testCompression() {
 
     val props = new Properties()
-    props.put(ProducerConfig.BROKER_LIST_CONFIG, TestUtils.getBrokerListStrFromConfigs(Seq(config)))
+    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, TestUtils.getBrokerListStrFromConfigs(Seq(config)))
     props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, compression)
     var producer = new KafkaProducer(props)
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/640f3b05/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
index 525a060..ef56044 100644
--- a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
@@ -67,11 +67,11 @@ class ProducerFailureHandlingTest extends JUnit3Suite with ZooKeeperTestHarness
   private def makeProducer(brokerList: String, acks: Int, metadataFetchTimeout: Long,
                            blockOnBufferFull: Boolean, bufferSize: Long) : KafkaProducer = {
     val producerProps = new Properties()
-    producerProps.put(ProducerConfig.BROKER_LIST_CONFIG, brokerList)
-    producerProps.put(ProducerConfig.REQUIRED_ACKS_CONFIG, acks.toString)
+    producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
+    producerProps.put(ProducerConfig.ACKS_CONFIG, acks.toString)
     producerProps.put(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG, metadataFetchTimeout.toString)
     producerProps.put(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG, blockOnBufferFull.toString)
-    producerProps.put(ProducerConfig.TOTAL_BUFFER_MEMORY_CONFIG, bufferSize.toString)
+    producerProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferSize.toString)
     return new KafkaProducer(producerProps)
   }
 
@@ -314,10 +314,10 @@ class ProducerFailureHandlingTest extends JUnit3Suite with ZooKeeperTestHarness
     var failed = false
 
     val producerProps = new Properties()
-    producerProps.put(ProducerConfig.BROKER_LIST_CONFIG, brokerList)
-    producerProps.put(ProducerConfig.REQUIRED_ACKS_CONFIG, (-1).toString)
-    producerProps.put(ProducerConfig.TOTAL_BUFFER_MEMORY_CONFIG, bufferSize.toString)
-    producerProps.put(ProducerConfig.MAX_RETRIES_CONFIG, 10.toString)
+    producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
+    producerProps.put(ProducerConfig.ACKS_CONFIG, (-1).toString)
+    producerProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferSize.toString)
+    producerProps.put(ProducerConfig.RETRIES_CONFIG, 10.toString)
     producerProps.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 1000.toString)
     val producer = new KafkaProducer(producerProps)
 

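The renames in these test hunks summarize the user-facing half of KAFKA-1337. Collected in one place, a producer configured with the new constants looks like the sketch below; the broker address and values are placeholders, and each comment names the constant that line replaces:

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;

public class RenamedConfigsSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // was BROKER_LIST_CONFIG
        props.put(ProducerConfig.ACKS_CONFIG, "-1");                          // was REQUIRED_ACKS_CONFIG
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, "33554432");           // was TOTAL_BUFFER_MEMORY_CONFIG
        props.put(ProducerConfig.RETRIES_CONFIG, "10");                       // was MAX_RETRIES_CONFIG
        props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, "1000");            // unchanged

        KafkaProducer producer = new KafkaProducer(props);
        producer.close();
    }
}
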
http://git-wip-us.apache.org/repos/asf/kafka/blob/640f3b05/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala
index 3c37330..60e68c7 100644
--- a/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ProducerSendTest.scala
@@ -91,7 +91,7 @@ class ProducerSendTest extends JUnit3Suite with ZooKeeperTestHarness {
   @Test
   def testSendOffset() {
     val props = new Properties()
-    props.put(ProducerConfig.BROKER_LIST_CONFIG, TestUtils.getBrokerListStrFromConfigs(Seq(config1, config2)))
+    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, TestUtils.getBrokerListStrFromConfigs(Seq(config1, config2)))
     var producer = new KafkaProducer(props)
 
     val callback = new CheckErrorCallback
@@ -149,7 +149,7 @@ class ProducerSendTest extends JUnit3Suite with ZooKeeperTestHarness {
   @Test
   def testClose() {
     val props = new Properties()
-    props.put(ProducerConfig.BROKER_LIST_CONFIG, TestUtils.getBrokerListStrFromConfigs(Seq(config1, config2)))
+    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, TestUtils.getBrokerListStrFromConfigs(Seq(config1, config2)))
     var producer = new KafkaProducer(props)
 
     try {
@@ -187,8 +187,8 @@ class ProducerSendTest extends JUnit3Suite with ZooKeeperTestHarness {
   @Test
   def testSendToPartition() {
     val props = new Properties()
-    props.put(ProducerConfig.BROKER_LIST_CONFIG, TestUtils.getBrokerListStrFromConfigs(Seq(config1, config2)))
-    props.put(ProducerConfig.REQUIRED_ACKS_CONFIG, "-1")
+    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, TestUtils.getBrokerListStrFromConfigs(Seq(config1, config2)))
+    props.put(ProducerConfig.ACKS_CONFIG, "-1")
     var producer = new KafkaProducer(props)
 
     try {
@@ -245,7 +245,7 @@ class ProducerSendTest extends JUnit3Suite with ZooKeeperTestHarness {
   @Test
   def testAutoCreateTopic() {
     val props = new Properties()
-    props.put(ProducerConfig.BROKER_LIST_CONFIG, TestUtils.getBrokerListStrFromConfigs(Seq(config1, config2)))
+    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, TestUtils.getBrokerListStrFromConfigs(Seq(config1, config2)))
     var producer = new KafkaProducer(props)
 
     try {

http://git-wip-us.apache.org/repos/asf/kafka/blob/640f3b05/perf/src/main/scala/kafka/perf/ProducerPerformance.scala
----------------------------------------------------------------------
diff --git a/perf/src/main/scala/kafka/perf/ProducerPerformance.scala b/perf/src/main/scala/kafka/perf/ProducerPerformance.scala
index 9e4ebaf..1490bdb 100644
--- a/perf/src/main/scala/kafka/perf/ProducerPerformance.scala
+++ b/perf/src/main/scala/kafka/perf/ProducerPerformance.scala
@@ -204,15 +204,16 @@ object ProducerPerformance extends Logging {
   }
 
   class NewShinyProducer(config: ProducerPerfConfig) extends Producer {
+    import org.apache.kafka.clients.producer.ProducerConfig
     val props = new Properties()
-    props.put("metadata.broker.list", config.brokerList)
-    props.put("send.buffer.bytes", (64 * 1024).toString)
-    props.put("client.id", "perf-test")
-    props.put("request.required.acks", config.producerRequestRequiredAcks.toString)
-    props.put("request.timeout.ms", config.producerRequestTimeoutMs.toString)
-    props.put("request.retries", config.producerNumRetries.toString)
-    props.put("retry.backoff.ms", config.producerRetryBackoffMs.toString)
-    props.put("compression.type", config.compressionCodec.name)
+    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, config.brokerList)
+    props.put(ProducerConfig.SEND_BUFFER_CONFIG, (64 * 1024).toString)
+    props.put(ProducerConfig.CLIENT_ID_CONFIG, "perf-test")
+    props.put(ProducerConfig.ACKS_CONFIG, config.producerRequestRequiredAcks.toString)
+    props.put(ProducerConfig.TIMEOUT_CONFIG, config.producerRequestTimeoutMs.toString)
+    props.put(ProducerConfig.RETRIES_CONFIG, config.producerNumRetries.toString)
+    props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, config.producerRetryBackoffMs.toString)
+    props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, config.compressionCodec.name)
     val producer = new KafkaProducer(props)
 
     def send(topic: String, partition: Long, bytes: Array[Byte]) {

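One nicety of the constant-based style adopted above: the constants resolve to the plain key strings, so code and properties files stay interchangeable. A quick check of that equivalence, inferred from the mirror_producer.properties hunk below where the same settings appear as bare keys:

import java.util.Properties;

import org.apache.kafka.clients.producer.ProducerConfig;

public class KeyStyleCheck {
    public static void main(String[] args) {
        Properties byConstant = new Properties();
        byConstant.put(ProducerConfig.ACKS_CONFIG, "1");
        byConstant.put(ProducerConfig.RETRIES_CONFIG, "3");

        Properties byLiteral = new Properties();
        byLiteral.put("acks", "1");       // as in mirror_producer.properties below
        byLiteral.put("retries", "3");

        // The constants resolve to the bare key names, so the two maps match
        System.out.println(byConstant.equals(byLiteral)); // expected: true
    }
}
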
http://git-wip-us.apache.org/repos/asf/kafka/blob/640f3b05/system_test/mirror_maker_testsuite/config/mirror_producer.properties
----------------------------------------------------------------------
diff --git a/system_test/mirror_maker_testsuite/config/mirror_producer.properties b/system_test/mirror_maker_testsuite/config/mirror_producer.properties
index 3ec69fa..7f48b07 100644
--- a/system_test/mirror_maker_testsuite/config/mirror_producer.properties
+++ b/system_test/mirror_maker_testsuite/config/mirror_producer.properties
@@ -1,6 +1,5 @@
 block.on.buffer.full=true
-metadata.broker.list=localhost:9094
-compression.codec=0
+bootstrap.servers=localhost:9094
 compression.type=none
-request.retries=3
-request.required.acks=1
+retries=3
+acks=1
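
For file-based setups like this one the migration is purely a key rename; the updated file loads straight into the new producer unchanged. A sketch, assuming the file is deployed at the placeholder path shown:

import java.io.FileInputStream;
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;

public class MirrorProducerBoot {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // Placeholder path: wherever the migrated properties file lives
        FileInputStream in = new FileInputStream("config/mirror_producer.properties");
        try {
            props.load(in);
        } finally {
            in.close();
        }
        KafkaProducer producer = new KafkaProducer(props);
        producer.close();
    }
}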

