gobblin-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hut...@apache.org
Subject [1/2] incubator-gobblin git commit: [GOBBLIN-298] Add metric and event reporters that emit using a KafkaProducer
Date Fri, 27 Oct 2017 19:00:50 GMT
Repository: incubator-gobblin
Updated Branches:
  refs/heads/master 90d8495ae -> ee770f5c5


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ee770f5c/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaReporter.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaReporter.java
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaReporter.java
new file mode 100644
index 0000000..1c935b4
--- /dev/null
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaReporter.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.metrics.kafka;
+
+import java.io.IOException;
+import java.util.Properties;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.Lists;
+import com.typesafe.config.Config;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metrics.MetricReport;
+import org.apache.gobblin.metrics.reporter.MetricReportReporter;
+import org.apache.gobblin.metrics.reporter.util.AvroJsonSerializer;
+import org.apache.gobblin.metrics.reporter.util.AvroSerializer;
+import org.apache.gobblin.metrics.reporter.util.FixedSchemaVersionWriter;
+import org.apache.gobblin.metrics.reporter.util.SchemaVersionWriter;
+import org.apache.gobblin.util.ClassAliasResolver;
+import org.apache.gobblin.util.ConfigUtils;
+
+import lombok.extern.slf4j.Slf4j;
+
+
+/**
+ * Kafka reporter for metrics.
+ *
+ * @author ibuenros
+ */
+@Slf4j
+public class KafkaReporter extends MetricReportReporter {
+  public static final String SCHEMA_VERSION_WRITER_TYPE = "metrics.kafka.schemaVersionWriterType";
+  private static final String METRICS_KAFKA_PREFIX = "metrics.kafka";
+
+  protected final AvroSerializer<MetricReport> serializer;
+  protected final Pusher kafkaPusher;
+
+  protected KafkaReporter(Builder<?> builder, Config config) throws IOException {
+    super(builder, config);
+
+    SchemaVersionWriter versionWriter;
+    if (config.hasPath(SCHEMA_VERSION_WRITER_TYPE)) {
+      try {
+        ClassAliasResolver<SchemaVersionWriter> resolver = new ClassAliasResolver<>(SchemaVersionWriter.class);
+        Class<? extends SchemaVersionWriter> klazz = resolver.resolveClass(config.getString(SCHEMA_VERSION_WRITER_TYPE));
+        versionWriter = klazz.newInstance();
+      } catch (ReflectiveOperationException roe) {
+        throw new IOException("Could not instantiate version writer.", roe);
+      }
+    } else {
+      versionWriter = new FixedSchemaVersionWriter();
+    }
+
+    log.info("Schema version writer: " + versionWriter.getClass().getName());
+    this.serializer = this.closer.register(createSerializer(versionWriter));
+
+    if (builder.kafkaPusher.isPresent()) {
+      this.kafkaPusher = builder.kafkaPusher.get();
+    } else {
+      Config kafkaConfig = ConfigUtils.getConfigOrEmpty(config, PusherUtils.METRICS_REPORTING_KAFKA_CONFIG_PREFIX);
+      String pusherClassName = ConfigUtils.getString(config, PusherUtils.KAFKA_PUSHER_CLASS_NAME_KEY,
+          PusherUtils.DEFAULT_KAFKA_PUSHER_CLASS_NAME);
+
+      this.kafkaPusher = PusherUtils.getPusher(pusherClassName, builder.brokers, builder.topic,
Optional.of(kafkaConfig));
+    }
+  }
+
+  protected AvroSerializer<MetricReport> createSerializer(SchemaVersionWriter schemaVersionWriter)
throws IOException {
+    return new AvroJsonSerializer<>(MetricReport.SCHEMA$, schemaVersionWriter);
+  }
+
+  /**
+   * A static factory class for obtaining new {@link Builder}s
+   *
+   * @see Builder
+   */
+  public static class BuilderFactory {
+
+    public static BuilderImpl newBuilder() {
+      return new BuilderImpl();
+    }
+  }
+
+  public static class BuilderImpl extends Builder<BuilderImpl> {
+
+    @Override
+    protected BuilderImpl self() {
+      return this;
+    }
+  }
+
+  /**
+   * Builder for {@link KafkaReporter}. Defaults to no filter, reporting rates in seconds
and times in milliseconds.
+   */
+  public static abstract class Builder<T extends MetricReportReporter.Builder<T>>
+      extends MetricReportReporter.Builder<T> {
+
+    protected String brokers;
+    protected String topic;
+    protected Optional<Pusher> kafkaPusher;
+
+    protected Builder() {
+      super();
+      this.name = "KafkaReporter";
+      this.kafkaPusher = Optional.absent();
+    }
+
+    /**
+     * Set {@link Pusher} to use.
+     */
+    public T withKafkaPusher(Pusher pusher) {
+      this.kafkaPusher = Optional.of(pusher);
+      return self();
+    }
+
+    /**
+     * Builds and returns {@link KafkaReporter}.
+     *
+     * @param brokers string of Kafka brokers
+     * @param topic topic to send metrics to
+     * @return KafkaReporter
+     */
+    public KafkaReporter build(String brokers, String topic, Properties props) throws IOException
{
+      this.brokers = brokers;
+      this.topic = topic;
+
+      return new KafkaReporter(this, ConfigUtils.propertiesToConfig(props, Optional.of(ConfigurationKeys.METRICS_CONFIGURATIONS_PREFIX)));
+    }
+  }
+
+  @Override
+  protected void emitReport(MetricReport report) {
+    this.kafkaPusher.pushMessages(Lists.newArrayList(this.serializer.serializeRecord(report)));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ee770f5c/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaReporterFactory.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaReporterFactory.java
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaReporterFactory.java
new file mode 100644
index 0000000..9faac33
--- /dev/null
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaReporterFactory.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.metrics.kafka;
+
+import java.io.IOException;
+import java.util.Properties;
+
+import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.ScheduledReporter;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.typesafe.config.Config;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metrics.CustomCodahaleReporterFactory;
+import org.apache.gobblin.metrics.KafkaReportingFormats;
+import org.apache.gobblin.metrics.RootMetricContext;
+import org.apache.gobblin.util.ConfigUtils;
+
+import lombok.extern.slf4j.Slf4j;
+
+
+@Slf4j
+public class KafkaReporterFactory implements CustomCodahaleReporterFactory {
+  @Override
+  public ScheduledReporter newScheduledReporter(MetricRegistry registry, Properties properties)
+      throws IOException {
+    if (!Boolean.valueOf(properties.getProperty(ConfigurationKeys.METRICS_REPORTING_KAFKA_ENABLED_KEY,
+        ConfigurationKeys.DEFAULT_METRICS_REPORTING_KAFKA_ENABLED))) {
+      return null;
+    }
+    log.info("Reporting metrics to Kafka");
+
+    Optional<String> defaultTopic = Optional.fromNullable(properties.getProperty(ConfigurationKeys.METRICS_KAFKA_TOPIC));
+    Optional<String> metricsTopic = Optional.fromNullable(
+        properties.getProperty(ConfigurationKeys.METRICS_KAFKA_TOPIC_METRICS));
+    Optional<String> eventsTopic = Optional.fromNullable(
+        properties.getProperty(ConfigurationKeys.METRICS_KAFKA_TOPIC_EVENTS));
+
+    boolean metricsEnabled = metricsTopic.or(defaultTopic).isPresent();
+    if (metricsEnabled) log.info("Reporting metrics to Kafka");
+    boolean eventsEnabled = eventsTopic.or(defaultTopic).isPresent();
+    if (eventsEnabled) log.info("Reporting events to Kafka");
+
+    try {
+      Preconditions.checkArgument(properties.containsKey(ConfigurationKeys.METRICS_KAFKA_BROKERS),
+          "Kafka metrics brokers missing.");
+      Preconditions.checkArgument(metricsTopic.or(eventsTopic).or(defaultTopic).isPresent(),
"Kafka topic missing.");
+    } catch (IllegalArgumentException exception) {
+      log.error("Not reporting metrics to Kafka due to missing Kafka configuration(s).",
exception);
+      return null;
+    }
+
+    String brokers = properties.getProperty(ConfigurationKeys.METRICS_KAFKA_BROKERS);
+
+    String reportingFormat = properties.getProperty(ConfigurationKeys.METRICS_REPORTING_KAFKA_FORMAT,
+        ConfigurationKeys.DEFAULT_METRICS_REPORTING_KAFKA_FORMAT);
+
+    KafkaReportingFormats formatEnum;
+    try {
+      formatEnum = KafkaReportingFormats.valueOf(reportingFormat.toUpperCase());
+    } catch (IllegalArgumentException exception) {
+      log.warn("Kafka metrics reporting format " + reportingFormat +
+          " not recognized. Will report in json format.", exception);
+      formatEnum = KafkaReportingFormats.JSON;
+    }
+
+    if (metricsEnabled) {
+      try {
+        formatEnum.metricReporterBuilder(properties)
+            .build(brokers, metricsTopic.or(defaultTopic).get(), properties);
+      } catch (IOException exception) {
+        log.error("Failed to create Kafka metrics reporter. Will not report metrics to Kafka.",
exception);
+      }
+    }
+
+    if (eventsEnabled) {
+      try {
+        KafkaEventReporter.Builder<?> builder = formatEnum.eventReporterBuilder(RootMetricContext.get(),
+            properties);
+
+        Config kafkaConfig = ConfigUtils.getConfigOrEmpty(ConfigUtils.propertiesToConfig(properties),
+            PusherUtils.METRICS_REPORTING_KAFKA_CONFIG_PREFIX);
+        builder.withConfig(kafkaConfig);
+
+        builder.withPusherClassName(properties.getProperty(PusherUtils.KAFKA_PUSHER_CLASS_NAME_KEY,
+            PusherUtils.DEFAULT_KAFKA_PUSHER_CLASS_NAME));
+
+        return builder.build(brokers, eventsTopic.or(defaultTopic).get());
+      } catch (IOException exception) {
+        log.error("Failed to create Kafka events reporter. Will not report events to Kafka.",
exception);
+      }
+    }
+
+    log.info("Will start reporting metrics to Kafka");
+    return null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ee770f5c/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/Pusher.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/Pusher.java
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/Pusher.java
new file mode 100644
index 0000000..5abd503
--- /dev/null
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/Pusher.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.metrics.kafka;
+
+import java.io.Closeable;
+import java.util.List;
+
+
+/**
+ * Establish a connection to a Kafka cluster and push byte messages to a specified topic.
+ */
+public interface Pusher extends Closeable {
+  /**
+   * Push all byte array messages to the Kafka topic.
+   * @param messages List of byte array messages to push to Kakfa.
+   */
+  void pushMessages(List<byte[]> messages);
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ee770f5c/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/PusherUtils.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/PusherUtils.java
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/PusherUtils.java
new file mode 100644
index 0000000..a76c750
--- /dev/null
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/PusherUtils.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.metrics.kafka;
+
+import com.google.common.base.Optional;
+import com.typesafe.config.Config;
+
+import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
+
+public class PusherUtils {
+  public static final String METRICS_REPORTING_KAFKA_CONFIG_PREFIX = "metrics.reporting.kafka.config";
+  public static final String KAFKA_PUSHER_CLASS_NAME_KEY = "metrics.reporting.kafkaPusherClass";
+  public static final String DEFAULT_KAFKA_PUSHER_CLASS_NAME = "org.apache.gobblin.metrics.kafka.KafkaPusher";
+
+  /**
+   * Create a {@link Pusher}
+   * @param pusherClassName the {@link Pusher} class to instantiate
+   * @param brokers brokers to connect to
+   * @param topic the topic to write to
+   * @param config additional configuration for configuring the {@link Pusher}
+   * @return a {@link Pusher}
+   */
+  public static Pusher getPusher(String pusherClassName, String brokers, String topic, Optional<Config>
config) {
+    try {
+      Class<?> pusherClass = Class.forName(pusherClassName);
+
+     return (Pusher) GobblinConstructorUtils.invokeLongestConstructor(pusherClass,
+          brokers, topic, config);
+    } catch (ReflectiveOperationException e) {
+      throw new RuntimeException("Could not instantiate kafka pusher", e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ee770f5c/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/reporter/KafkaAvroEventReporterTest.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/reporter/KafkaAvroEventReporterTest.java
b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/reporter/KafkaAvroEventReporterTest.java
new file mode 100644
index 0000000..e240a53
--- /dev/null
+++ b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/reporter/KafkaAvroEventReporterTest.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.metrics.reporter;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import org.apache.gobblin.metrics.GobblinTrackingEvent;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.kafka.KafkaAvroEventReporter;
+import org.apache.gobblin.metrics.kafka.KafkaEventReporter;
+import org.apache.gobblin.metrics.kafka.Pusher;
+import org.apache.gobblin.metrics.reporter.util.EventUtils;
+
+
+@Test(groups = {"gobblin.metrics"})
+public class KafkaAvroEventReporterTest extends KafkaEventReporterTest {
+
+  @Override
+  public KafkaEventReporter.Builder<? extends KafkaEventReporter.Builder> getBuilder(MetricContext
context,
+      Pusher pusher) {
+    return KafkaAvroEventReporter.forContext(context).withKafkaPusher(pusher);
+  }
+
+  @Override
+  @SuppressWarnings("unchecked")
+  protected GobblinTrackingEvent nextEvent(Iterator<byte[]> it)
+      throws IOException {
+    Assert.assertTrue(it.hasNext());
+    return EventUtils.deserializeReportFromAvroSerialization(new GobblinTrackingEvent(),
it.next());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ee770f5c/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/reporter/KafkaAvroReporterTest.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/reporter/KafkaAvroReporterTest.java
b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/reporter/KafkaAvroReporterTest.java
new file mode 100644
index 0000000..e7be31d
--- /dev/null
+++ b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/reporter/KafkaAvroReporterTest.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.metrics.reporter;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import org.apache.gobblin.metrics.MetricReport;
+import org.apache.gobblin.metrics.kafka.KafkaAvroReporter;
+import org.apache.gobblin.metrics.kafka.KafkaReporter;
+import org.apache.gobblin.metrics.kafka.Pusher;
+import org.apache.gobblin.metrics.reporter.util.MetricReportUtils;
+
+/**
+ * Test for KafkaAvroReporter
+ * Extends KafkaReporterTest and just redefines the builder and the metrics deserializer
+ *
+ * @author ibuenros
+ */
+@Test(groups = {"gobblin.metrics"})
+public class KafkaAvroReporterTest extends KafkaReporterTest {
+
+  public KafkaAvroReporterTest(String topic)
+      throws IOException, InterruptedException {
+    super();
+  }
+
+  public KafkaAvroReporterTest() throws IOException, InterruptedException {
+    this("KafkaAvroReporterTest");
+  }
+
+  @Override
+  public KafkaReporter.Builder<? extends KafkaReporter.Builder> getBuilder(Pusher pusher)
{
+    return KafkaAvroReporter.BuilderFactory.newBuilder().withKafkaPusher(pusher);
+  }
+
+  @Override
+  public KafkaReporter.Builder<? extends KafkaReporter.Builder> getBuilderFromContext(Pusher
pusher) {
+    return KafkaAvroReporter.BuilderFactory.newBuilder().withKafkaPusher(pusher);
+  }
+
+  @Override
+  @SuppressWarnings("unchecked")
+  protected MetricReport nextReport(Iterator<byte[]> it)
+      throws IOException {
+    Assert.assertTrue(it.hasNext());
+    return MetricReportUtils.deserializeReportFromAvroSerialization(new MetricReport(), it.next());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ee770f5c/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/reporter/KafkaEventReporterTest.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/reporter/KafkaEventReporterTest.java
b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/reporter/KafkaEventReporterTest.java
new file mode 100644
index 0000000..add42f4
--- /dev/null
+++ b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/reporter/KafkaEventReporterTest.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.metrics.reporter;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.google.common.collect.Maps;
+
+import org.apache.gobblin.metrics.GobblinTrackingEvent;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.Tag;
+import org.apache.gobblin.metrics.kafka.KafkaEventReporter;
+import org.apache.gobblin.metrics.kafka.Pusher;
+import org.apache.gobblin.metrics.reporter.util.EventUtils;
+
+
+@Test(groups = {"gobblin.metrics"})
+public class KafkaEventReporterTest {
+
+  /**
+   * Get builder for KafkaReporter (override if testing an extension of KafkaReporter)
+   * @param context metricregistry
+   * @return KafkaReporter builder
+   */
+  public KafkaEventReporter.Builder<? extends KafkaEventReporter.Builder> getBuilder(MetricContext
context,
+      Pusher pusher) {
+    return KafkaEventReporter.Factory.forContext(context).withKafkaPusher(pusher);
+  }
+
+
+  @Test
+  public void testKafkaEventReporter() throws IOException {
+    MetricContext context = MetricContext.builder("context").build();
+
+    MockKafkaPusher pusher = new MockKafkaPusher();
+    KafkaEventReporter kafkaReporter = getBuilder(context, pusher).build("localhost:0000",
"topic");
+
+    String namespace = "gobblin.metrics.test";
+    String eventName = "testEvent";
+
+    GobblinTrackingEvent event = new GobblinTrackingEvent();
+    event.setName(eventName);
+    event.setNamespace(namespace);
+    Map<String, String> metadata = Maps.newHashMap();
+    metadata.put("m1", "v1");
+    metadata.put("m2", null);
+    event.setMetadata(metadata);
+    context.submitEvent(event);
+
+    try {
+      Thread.sleep(100);
+    } catch(InterruptedException ex) {
+      Thread.currentThread().interrupt();
+    }
+
+    kafkaReporter.report();
+
+    try {
+      Thread.sleep(100);
+    } catch(InterruptedException ex) {
+      Thread.currentThread().interrupt();
+    }
+
+    GobblinTrackingEvent retrievedEvent = nextEvent(pusher.messageIterator());
+    Assert.assertEquals(retrievedEvent.getNamespace(), namespace);
+    Assert.assertEquals(retrievedEvent.getName(), eventName);
+    Assert.assertEquals(retrievedEvent.getMetadata().size(), 4);
+
+  }
+
+  @Test
+  public void testTagInjection() throws IOException {
+
+    String tag1 = "tag1";
+    String value1 = "value1";
+    String metadataValue1 = "metadata1";
+    String tag2 = "tag2";
+    String value2 = "value2";
+
+    MetricContext context = MetricContext.builder("context").addTag(new Tag<String>(tag1,
value1)).
+        addTag(new Tag<String>(tag2, value2)).build();
+
+    MockKafkaPusher pusher = new MockKafkaPusher();
+    KafkaEventReporter kafkaReporter = getBuilder(context, pusher).build("localhost:0000",
"topic");
+
+    String namespace = "gobblin.metrics.test";
+    String eventName = "testEvent";
+
+    GobblinTrackingEvent event = new GobblinTrackingEvent();
+    event.setName(eventName);
+    event.setNamespace(namespace);
+    Map<String, String> metadata = Maps.newHashMap();
+    metadata.put(tag1, metadataValue1);
+    event.setMetadata(metadata);
+    context.submitEvent(event);
+
+    try {
+      Thread.sleep(100);
+    } catch(InterruptedException ex) {
+      Thread.currentThread().interrupt();
+    }
+
+    kafkaReporter.report();
+
+    try {
+      Thread.sleep(100);
+    } catch(InterruptedException ex) {
+      Thread.currentThread().interrupt();
+    }
+
+    GobblinTrackingEvent retrievedEvent = nextEvent(pusher.messageIterator());
+    Assert.assertEquals(retrievedEvent.getNamespace(), namespace);
+    Assert.assertEquals(retrievedEvent.getName(), eventName);
+    Assert.assertEquals(retrievedEvent.getMetadata().size(), 4);
+    Assert.assertEquals(retrievedEvent.getMetadata().get(tag1), metadataValue1);
+    Assert.assertEquals(retrievedEvent.getMetadata().get(tag2), value2);
+  }
+
+  /**
+   * Extract the next metric from the Kafka iterator
+   * Assumes existence of the metric has already been checked.
+   * @param it Kafka ConsumerIterator
+   * @return next metric in the stream
+   * @throws IOException
+   */
+  protected GobblinTrackingEvent nextEvent(Iterator<byte[]> it) throws IOException
{
+    Assert.assertTrue(it.hasNext());
+    return EventUtils.deserializeReportFromJson(new GobblinTrackingEvent(), it.next());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ee770f5c/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/reporter/KafkaReporterTest.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/reporter/KafkaReporterTest.java
b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/reporter/KafkaReporterTest.java
new file mode 100644
index 0000000..f653dd9
--- /dev/null
+++ b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/reporter/KafkaReporterTest.java
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.metrics.reporter;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.codahale.metrics.Counter;
+import com.codahale.metrics.Histogram;
+import com.codahale.metrics.Meter;
+import com.google.common.collect.Lists;
+
+import org.apache.gobblin.metrics.Measurements;
+import org.apache.gobblin.metrics.Metric;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.MetricReport;
+import org.apache.gobblin.metrics.Tag;
+import org.apache.gobblin.metrics.kafka.KafkaReporter;
+import org.apache.gobblin.metrics.kafka.Pusher;
+import org.apache.gobblin.metrics.reporter.util.MetricReportUtils;
+
+@Test(groups = { "gobblin.metrics" })
+public class KafkaReporterTest {
+
+  public KafkaReporterTest() throws IOException, InterruptedException {}
+
+  /**
+   * Get builder for KafkaReporter (override if testing an extension of KafkaReporter)
+   * @return KafkaReporter builder
+   */
+  public KafkaReporter.Builder<? extends KafkaReporter.Builder> getBuilder(Pusher pusher)
{
+    return KafkaReporter.BuilderFactory.newBuilder().withKafkaPusher(pusher);
+  }
+
+  public KafkaReporter.Builder<? extends KafkaReporter.Builder> getBuilderFromContext(Pusher
pusher) {
+    return KafkaReporter.BuilderFactory.newBuilder().withKafkaPusher(pusher);
+  }
+
+  @Test
+  public void testKafkaReporter() throws IOException {
+    MetricContext metricContext =
+        MetricContext.builder(this.getClass().getCanonicalName() + ".testKafkaReporter").build();
+    Counter counter = metricContext.counter("com.linkedin.example.counter");
+    Meter meter = metricContext.meter("com.linkedin.example.meter");
+    Histogram histogram = metricContext.histogram("com.linkedin.example.histogram");
+
+    MockKafkaPusher pusher = new MockKafkaPusher();
+    KafkaReporter kafkaReporter = getBuilder(pusher).build("localhost:0000", "topic", new
Properties());
+
+    counter.inc();
+    meter.mark(2);
+    histogram.update(1);
+    histogram.update(1);
+    histogram.update(2);
+
+    kafkaReporter.report(metricContext);
+
+    try {
+      Thread.sleep(1000);
+    } catch (InterruptedException ex) {
+      Thread.currentThread().interrupt();
+    }
+
+    Map<String, Double> expected = new HashMap<>();
+    expected.put("com.linkedin.example.counter." + Measurements.COUNT, 1.0);
+    expected.put("com.linkedin.example.meter." + Measurements.COUNT, 2.0);
+    expected.put("com.linkedin.example.histogram." + Measurements.COUNT, 3.0);
+
+    MetricReport nextReport = nextReport(pusher.messageIterator());
+
+    expectMetricsWithValues(nextReport, expected);
+
+    kafkaReporter.report(metricContext);
+
+    try {
+      Thread.sleep(1000);
+    } catch (InterruptedException ex) {
+      Thread.currentThread().interrupt();
+    }
+
+    Set<String> expectedSet = new HashSet<>();
+    expectedSet.add("com.linkedin.example.counter." + Measurements.COUNT);
+    expectedSet.add("com.linkedin.example.meter." + Measurements.COUNT);
+    expectedSet.add("com.linkedin.example.meter." + Measurements.MEAN_RATE);
+    expectedSet.add("com.linkedin.example.meter." + Measurements.RATE_1MIN);
+    expectedSet.add("com.linkedin.example.meter." + Measurements.RATE_5MIN);
+    expectedSet.add("com.linkedin.example.meter." + Measurements.RATE_15MIN);
+    expectedSet.add("com.linkedin.example.histogram." + Measurements.MEAN);
+    expectedSet.add("com.linkedin.example.histogram." + Measurements.MIN);
+    expectedSet.add("com.linkedin.example.histogram." + Measurements.MAX);
+    expectedSet.add("com.linkedin.example.histogram." + Measurements.MEDIAN);
+    expectedSet.add("com.linkedin.example.histogram." + Measurements.PERCENTILE_75TH);
+    expectedSet.add("com.linkedin.example.histogram." + Measurements.PERCENTILE_95TH);
+    expectedSet.add("com.linkedin.example.histogram." + Measurements.PERCENTILE_99TH);
+    expectedSet.add("com.linkedin.example.histogram." + Measurements.PERCENTILE_999TH);
+    expectedSet.add("com.linkedin.example.histogram." + Measurements.COUNT);
+
+    nextReport = nextReport(pusher.messageIterator());
+    expectMetrics(nextReport, expectedSet, true);
+
+    kafkaReporter.close();
+
+  }
+
+  @Test
+  public void kafkaReporterTagsTest() throws IOException {
+    MetricContext metricContext =
+        MetricContext.builder(this.getClass().getCanonicalName() + ".kafkaReporterTagsTest").build();
+    Counter counter = metricContext.counter("com.linkedin.example.counter");
+
+    Tag<?> tag1 = new Tag<>("tag1", "value1");
+    Tag<?> tag2 = new Tag<>("tag2", 2);
+
+    MockKafkaPusher pusher = new MockKafkaPusher();
+    KafkaReporter kafkaReporter =
+        getBuilder(pusher).withTags(Lists.newArrayList(tag1, tag2)).build("localhost:0000",
"topic", new Properties());
+
+    counter.inc();
+
+    kafkaReporter.report(metricContext);
+
+    try {
+      Thread.sleep(1000);
+    } catch (InterruptedException ex) {
+      Thread.currentThread().interrupt();
+    }
+
+    MetricReport metricReport = nextReport(pusher.messageIterator());
+
+    Assert.assertEquals(4, metricReport.getTags().size());
+    Assert.assertTrue(metricReport.getTags().containsKey(tag1.getKey()));
+    Assert.assertEquals(metricReport.getTags().get(tag1.getKey()), tag1.getValue().toString());
+    Assert.assertTrue(metricReport.getTags().containsKey(tag2.getKey()));
+    Assert.assertEquals(metricReport.getTags().get(tag2.getKey()), tag2.getValue().toString());
+  }
+
+  @Test
+  public void kafkaReporterContextTest() throws IOException {
+    Tag<?> tag1 = new Tag<>("tag1", "value1");
+    MetricContext context = MetricContext.builder("context").addTag(tag1).build();
+    Counter counter = context.counter("com.linkedin.example.counter");
+
+    MockKafkaPusher pusher = new MockKafkaPusher();
+    KafkaReporter kafkaReporter = getBuilderFromContext(pusher).build("localhost:0000", "topic",
new Properties());
+
+    counter.inc();
+
+    kafkaReporter.report(context);
+
+    try {
+      Thread.sleep(1000);
+    } catch (InterruptedException ex) {
+      Thread.currentThread().interrupt();
+    }
+
+    MetricReport metricReport = nextReport(pusher.messageIterator());
+
+    Assert.assertEquals(3, metricReport.getTags().size());
+    Assert.assertTrue(metricReport.getTags().containsKey(tag1.getKey()));
+    Assert.assertEquals(metricReport.getTags().get(tag1.getKey()), tag1.getValue().toString());
+
+  }
+
+  /**
+   * Expect a list of metrics with specific values.
+   * Fail if not all metrics are received, or some metric has the wrong value.
+   * @param report MetricReport.
+   * @param expected map of expected metric names and their values
+   * @throws IOException
+   */
+  private void expectMetricsWithValues(MetricReport report, Map<String, Double> expected)
throws IOException {
+    List<Metric> metricIterator = report.getMetrics();
+
+    for (Metric metric : metricIterator) {
+      if (expected.containsKey(metric.getName())) {
+        Assert.assertEquals(expected.get(metric.getName()), metric.getValue());
+        expected.remove(metric.getName());
+      }
+    }
+
+    Assert.assertTrue(expected.isEmpty());
+
+  }
+
+  /**
+   * Expect a set of metric names. Will fail if not all of these metrics are received.
+   * @param report MetricReport
+   * @param expected set of expected metric names
+   * @param strict if set to true, will fail if receiving any metric that is not expected
+   * @throws IOException
+   */
+  private void expectMetrics(MetricReport report, Set<String> expected, boolean strict)
throws IOException {
+    List<Metric> metricIterator = report.getMetrics();
+    for (Metric metric : metricIterator) {
+      //System.out.println(String.format("expectedSet.add(\"%s\")", metric.name));
+      if (expected.contains(metric.getName())) {
+        expected.remove(metric.getName());
+      } else if (strict && !metric.getName().contains(MetricContext.GOBBLIN_METRICS_NOTIFICATIONS_TIMER_NAME))
{
+        Assert.assertTrue(false, "Metric present in report not expected: " + metric.toString());
+      }
+    }
+    Assert.assertTrue(expected.isEmpty());
+  }
+
+  /**
+   * Extract the next metric from the Kafka iterator
+   * Assumes existence of the metric has already been checked.
+   * @param it Kafka ConsumerIterator
+   * @return next metric in the stream
+   * @throws IOException
+   */
+  protected MetricReport nextReport(Iterator<byte[]> it) throws IOException {
+    Assert.assertTrue(it.hasNext());
+    return MetricReportUtils.deserializeReportFromJson(new MetricReport(), it.next());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ee770f5c/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/reporter/MockKafkaPusher.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/reporter/MockKafkaPusher.java
b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/reporter/MockKafkaPusher.java
new file mode 100644
index 0000000..71decbb
--- /dev/null
+++ b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/reporter/MockKafkaPusher.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.metrics.reporter;
+
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Queue;
+
+import com.google.common.collect.Queues;
+
+import org.apache.gobblin.metrics.kafka.Pusher;
+
+
+/**
+ * Mock instance of {@link org.apache.gobblin.metrics.kafka.Pusher} used for testing.
+ */
+public class MockKafkaPusher implements Pusher {
+
+  Queue<byte[]> messages = Queues.newLinkedBlockingQueue();
+
+  public MockKafkaPusher() {
+  }
+
+  @Override
+  public void pushMessages(List<byte[]> messages) {
+    this.messages.addAll(messages);
+  }
+
+  @Override
+  public void close()
+      throws IOException {
+  }
+
+  public Iterator<byte[]> messageIterator() {
+    return this.messages.iterator();
+  }
+
+}


Mime
View raw message