gobblin-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ibuen...@apache.org
Subject incubator-gobblin git commit: [GOBBLIN-282] Azkaban templates
Date Wed, 11 Oct 2017 15:27:33 GMT
Repository: incubator-gobblin
Updated Branches:
  refs/heads/master 7d970606b -> a9c9f781f


[GOBBLIN-282] Azkaban templates

Closes #2135 from ibuenros/azkaban-templates


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/a9c9f781
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/a9c9f781
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/a9c9f781

Branch: refs/heads/master
Commit: a9c9f781f43bef85f96178999e266bcb5b5fb3ff
Parents: 7d97060
Author: ibuenros <issac.buenrostro@gmail.com>
Authored: Wed Oct 11 08:27:25 2017 -0700
Committer: Issac Buenrostro <ibuenros@apache.org>
Committed: Wed Oct 11 08:27:25 2017 -0700

----------------------------------------------------------------------
 .../gobblin/azkaban/AzkabanJobLauncher.java     | 25 +++++++++++++++-----
 1 file changed, 19 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/a9c9f781/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanJobLauncher.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanJobLauncher.java
b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanJobLauncher.java
index bdbb04f..20b630b 100644
--- a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanJobLauncher.java
+++ b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanJobLauncher.java
@@ -19,6 +19,7 @@ package org.apache.gobblin.azkaban;
 
 import java.io.File;
 import java.io.IOException;
+import java.net.URI;
 import java.util.List;
 import java.util.Properties;
 import java.util.Set;
@@ -30,6 +31,8 @@ import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
+import org.apache.gobblin.runtime.job_catalog.PackagedTemplatesJobCatalogDecorator;
+import org.apache.gobblin.util.ConfigUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
@@ -42,6 +45,7 @@ import com.google.common.base.Strings;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import com.google.common.io.Closer;
+import com.typesafe.config.Config;
 
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.configuration.State;
@@ -88,6 +92,7 @@ public class AzkabanJobLauncher extends AbstractJob implements ApplicationLaunch
   private static final Logger LOG = Logger.getLogger(AzkabanJobLauncher.class);
 
   public static final String GOBBLIN_LOG_LEVEL_KEY = "gobblin.log.levelOverride";
+  public static final String TEMPLATE_KEY = "gobblin.template.uri";
 
   private static final String HADOOP_FS_DEFAULT_NAME = "fs.default.name";
   private static final String AZKABAN_LINK_JOBEXEC_URL = "azkaban.link.jobexec.url";
@@ -158,30 +163,38 @@ public class AzkabanJobLauncher extends AbstractJob implements ApplicationLaunch
       this.props.setProperty("env." + HADOOP_TOKEN_FILE_LOCATION, tokenFile.getAbsolutePath());
     }
 
+    Properties jobProps = this.props;
+    if (jobProps.containsKey(TEMPLATE_KEY)) {
+      URI templateUri = new URI(jobProps.getProperty(TEMPLATE_KEY));
+      Config resolvedJob = new PackagedTemplatesJobCatalogDecorator().getTemplate(templateUri)
+          .getResolvedConfig(ConfigUtils.propertiesToConfig(jobProps));
+      jobProps = ConfigUtils.configToProperties(resolvedJob);
+    }
+
     List<Tag<?>> tags = Lists.newArrayList();
     tags.addAll(Tag.fromMap(AzkabanTags.getAzkabanTags()));
     RootMetricContext.get(tags);
-    GobblinMetrics.addCustomTagsToProperties(this.props, tags);
+    GobblinMetrics.addCustomTagsToProperties(jobProps, tags);
 
     // If the job launcher type is not specified in the job configuration,
     // override the default to use the MAPREDUCE launcher.
-    if (!this.props.containsKey(ConfigurationKeys.JOB_LAUNCHER_TYPE_KEY)) {
-      this.props.setProperty(ConfigurationKeys.JOB_LAUNCHER_TYPE_KEY,
+    if (!jobProps.containsKey(ConfigurationKeys.JOB_LAUNCHER_TYPE_KEY)) {
+      jobProps.setProperty(ConfigurationKeys.JOB_LAUNCHER_TYPE_KEY,
           JobLauncherFactory.JobLauncherType.MAPREDUCE.toString());
     }
 
     this.ownAzkabanSla = Long.parseLong(
-        this.props.getProperty(AZKABAN_GOBBLIN_JOB_SLA_IN_SECONDS, DEFAULT_AZKABAN_GOBBLIN_JOB_SLA_IN_SECONDS));
+        jobProps.getProperty(AZKABAN_GOBBLIN_JOB_SLA_IN_SECONDS, DEFAULT_AZKABAN_GOBBLIN_JOB_SLA_IN_SECONDS));
 
     // Create a JobLauncher instance depending on the configuration. The same properties
object is
     // used for both system and job configuration properties because Azkaban puts configuration
     // properties in the .job file and in the .properties file into the same Properties object.
-    this.jobLauncher = this.closer.register(JobLauncherFactory.newJobLauncher(this.props,
this.props));
+    this.jobLauncher = this.closer.register(JobLauncherFactory.newJobLauncher(jobProps, jobProps));
 
     // Since Java classes cannot extend multiple classes and Azkaban jobs must extend AbstractJob,
we must use composition
     // verses extending ServiceBasedAppLauncher
     this.applicationLauncher =
-        this.closer.register(new ServiceBasedAppLauncher(this.props, "Azkaban-" + UUID.randomUUID()));
+        this.closer.register(new ServiceBasedAppLauncher(jobProps, "Azkaban-" + UUID.randomUUID()));
   }
 
   @Override


Mime
View raw message