kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jkr...@apache.org
Subject [1/2] KAFKA-1251: Add metrics to the producer.
Date Fri, 28 Mar 2014 03:53:54 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 9bc47bc13 -> 23d7fc470


http://git-wip-us.apache.org/repos/asf/kafka/blob/23d7fc47/clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java
index 12c9500..a2b7722 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java
@@ -70,6 +70,7 @@ public class SenderTest {
                                        REQUEST_TIMEOUT_MS,
                                        SEND_BUFFER_SIZE,
                                        RECEIVE_BUFFER_SIZE,
+                                       metrics,
                                        time);
 
     @Before
@@ -114,6 +115,7 @@ public class SenderTest {
                                    REQUEST_TIMEOUT_MS,
                                    SEND_BUFFER_SIZE,
                                    RECEIVE_BUFFER_SIZE,
+                                   new Metrics(),
                                    time);
         Future<RecordMetadata> future = accumulator.append(tp, "key".getBytes(), "value".getBytes(), CompressionType.NONE, null);
         RequestSend request1 = completeSend(sender);

http://git-wip-us.apache.org/repos/asf/kafka/blob/23d7fc47/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java b/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java
index fdd8914..9ff73f4 100644
--- a/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java
@@ -1,18 +1,14 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
  */
 package org.apache.kafka.common.metrics;
 
@@ -22,25 +18,16 @@ import static org.junit.Assert.fail;
 import java.util.Arrays;
 import java.util.concurrent.TimeUnit;
 
-
 import org.apache.kafka.common.Metric;
-import org.apache.kafka.common.metrics.JmxReporter;
-import org.apache.kafka.common.metrics.Measurable;
-import org.apache.kafka.common.metrics.MetricConfig;
-import org.apache.kafka.common.metrics.Metrics;
-import org.apache.kafka.common.metrics.MetricsReporter;
-import org.apache.kafka.common.metrics.Quota;
-import org.apache.kafka.common.metrics.QuotaViolationException;
-import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.metrics.stats.Avg;
 import org.apache.kafka.common.metrics.stats.Count;
 import org.apache.kafka.common.metrics.stats.Max;
 import org.apache.kafka.common.metrics.stats.Min;
 import org.apache.kafka.common.metrics.stats.Percentile;
 import org.apache.kafka.common.metrics.stats.Percentiles;
+import org.apache.kafka.common.metrics.stats.Percentiles.BucketSizing;
 import org.apache.kafka.common.metrics.stats.Rate;
 import org.apache.kafka.common.metrics.stats.Total;
-import org.apache.kafka.common.metrics.stats.Percentiles.BucketSizing;
 import org.apache.kafka.common.utils.MockTime;
 import org.junit.Test;
 
@@ -154,9 +141,10 @@ public class MetricsTest {
     public void testOldDataHasNoEffect() {
         Max max = new Max();
         long windowMs = 100;
-        MetricConfig config = new MetricConfig().timeWindow(windowMs, TimeUnit.MILLISECONDS);
+        int samples = 2;
+        MetricConfig config = new MetricConfig().timeWindow(windowMs, TimeUnit.MILLISECONDS).samples(samples);
         max.record(config, 50, time.nanoseconds());
-        time.sleep(windowMs);
+        time.sleep(samples * windowMs);
         assertEquals(Double.NEGATIVE_INFINITY, max.measure(config, time.nanoseconds()), EPS);
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/23d7fc47/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
index 90e2dcf..99856e9 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
@@ -1,18 +1,14 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
  */
 package org.apache.kafka.common.network;
 
@@ -27,16 +23,12 @@ import java.net.InetSocketAddress;
 import java.net.ServerSocket;
 import java.net.Socket;
 import java.nio.ByteBuffer;
-import java.nio.channels.UnresolvedAddressException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 
-
-import org.apache.kafka.common.network.NetworkReceive;
-import org.apache.kafka.common.network.NetworkSend;
-import org.apache.kafka.common.network.Selectable;
-import org.apache.kafka.common.network.Selector;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.test.TestUtils;
 import org.junit.After;
@@ -58,7 +50,7 @@ public class SelectorTest {
     public void setup() throws Exception {
         this.server = new EchoServer();
         this.server.start();
-        this.selector = new Selector();
+        this.selector = new Selector(new Metrics(), new MockTime());
     }
 
     @After

http://git-wip-us.apache.org/repos/asf/kafka/blob/23d7fc47/clients/src/test/java/org/apache/kafka/test/MetricsBench.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/test/MetricsBench.java b/clients/src/test/java/org/apache/kafka/test/MetricsBench.java
index 7239b4a..9d98c11 100644
--- a/clients/src/test/java/org/apache/kafka/test/MetricsBench.java
+++ b/clients/src/test/java/org/apache/kafka/test/MetricsBench.java
@@ -1,18 +1,14 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
  */
 package org.apache.kafka.test;
 
@@ -27,7 +23,6 @@ import org.apache.kafka.common.metrics.stats.Percentile;
 import org.apache.kafka.common.metrics.stats.Percentiles;
 import org.apache.kafka.common.metrics.stats.Percentiles.BucketSizing;
 
-
 public class MetricsBench {
 
     public static void main(String[] args) {
@@ -48,7 +43,7 @@ public class MetricsBench {
         }
         long start = System.nanoTime();
         for (int i = 0; i < iters; i++)
-            child.record(i);
+            parent.record(i);
         double ellapsed = (System.nanoTime() - start) / (double) iters;
         System.out.println(String.format("%.2f ns per metric recording.", ellapsed));
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/23d7fc47/clients/src/test/java/org/apache/kafka/test/Microbenchmarks.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/test/Microbenchmarks.java b/clients/src/test/java/org/apache/kafka/test/Microbenchmarks.java
index 46cf86e..6aab854 100644
--- a/clients/src/test/java/org/apache/kafka/test/Microbenchmarks.java
+++ b/clients/src/test/java/org/apache/kafka/test/Microbenchmarks.java
@@ -1,18 +1,14 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
  */
 package org.apache.kafka.test;
 
@@ -24,11 +20,11 @@ import java.util.Map;
 import java.util.Random;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.kafka.common.utils.CopyOnWriteMap;
 import org.apache.kafka.common.utils.SystemTime;
 
-
 public class Microbenchmarks {
 
     public static void main(String[] args) throws Exception {
@@ -112,6 +108,43 @@ public class Microbenchmarks {
         t1.join();
         t2.join();
 
+        System.out.println("Testing locks");
+        done.set(false);
+        final ReentrantLock lock2 = new ReentrantLock();
+        Thread t3 = new Thread() {
+            public void run() {
+                time.sleep(1);
+                int counter = 0;
+                long start = time.nanoseconds();
+                for (int i = 0; i < iters; i++) {
+                    lock2.lock();
+                    counter++;
+                    lock2.unlock();
+                }
+                System.out.println("lock: " + ((System.nanoTime() - start) / iters));
+                System.out.println(counter);
+                done.set(true);
+            }
+        };
+
+        Thread t4 = new Thread() {
+            public void run() {
+                int counter = 0;
+                while (!done.get()) {
+                    time.sleep(1);
+                    lock2.lock();
+                    counter++;
+                    lock2.unlock();
+                }
+                System.out.println("Counter: " + counter);
+            }
+        };
+
+        t3.start();
+        t4.start();
+        t3.join();
+        t4.join();
+
         Map<String, Integer> values = new HashMap<String, Integer>();
         for (int i = 0; i < 100; i++)
             values.put(Integer.toString(i), i);

http://git-wip-us.apache.org/repos/asf/kafka/blob/23d7fc47/config/log4j.properties
----------------------------------------------------------------------
diff --git a/config/log4j.properties b/config/log4j.properties
index baa698b..9502254 100644
--- a/config/log4j.properties
+++ b/config/log4j.properties
@@ -41,7 +41,7 @@ log4j.appender.requestAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
 
 log4j.appender.cleanerAppender=org.apache.log4j.DailyRollingFileAppender
 log4j.appender.cleanerAppender.DatePattern='.'yyyy-MM-dd-HH
-log4j.appender.cleanerAppender.File=log-cleaner.log
+log4j.appender.cleanerAppender.File=${kafka.logs.dir}/log-cleaner.log
 log4j.appender.cleanerAppender.layout=org.apache.log4j.PatternLayout
 log4j.appender.cleanerAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
 


Mime
View raw message