sqoop-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From b...@apache.org
Subject [1/2] git commit: SQOOP-684 Encode type of the job into executed map reduce job (Jarek Jarcec Cecho)
Date Fri, 09 Nov 2012 22:28:34 GMT
Updated Branches:
  refs/heads/sqoop2 fdfc18c83 -> 30a1af0eb


SQOOP-684 Encode type of the job into executed map reduce job
(Jarek Jarcec Cecho)


Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/30a1af0e
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/30a1af0e
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/30a1af0e

Branch: refs/heads/sqoop2
Commit: 30a1af0ebff33d505d72956f5400e79254915e93
Parents: c47a153
Author: Bilung Lee <blee@apache.org>
Authored: Fri Nov 9 10:11:57 2012 -0800
Committer: Bilung Lee <blee@apache.org>
Committed: Fri Nov 9 10:11:57 2012 -0800

----------------------------------------------------------------------
 .../main/java/org/apache/sqoop/core/CoreError.java |    7 ++-
 .../apache/sqoop/framework/ExecutionEngine.java    |   16 +----
 .../apache/sqoop/framework/FrameworkManager.java   |   16 +++--
 .../apache/sqoop/framework/SubmissionRequest.java  |   51 +++++++++++----
 .../execution/mapreduce/MRSubmissionRequest.java   |   12 +---
 .../mapreduce/MapreduceExecutionEngine.java        |   12 +---
 .../java/org/apache/sqoop/job/JobConstants.java    |    1 +
 .../apache/sqoop/job/mr/ConfigurationUtils.java    |    5 ++
 .../java/org/apache/sqoop/job/mr/SqoopMapper.java  |   25 ++++++-
 .../java/org/apache/sqoop/job/TestHdfsLoad.java    |    4 +
 .../java/org/apache/sqoop/job/TestMapReduce.java   |    3 +
 .../mapreduce/MapreduceSubmissionEngine.java       |    9 +--
 12 files changed, 100 insertions(+), 61 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/sqoop/blob/30a1af0e/core/src/main/java/org/apache/sqoop/core/CoreError.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/core/CoreError.java b/core/src/main/java/org/apache/sqoop/core/CoreError.java
index 29c0809..08034ed 100644
--- a/core/src/main/java/org/apache/sqoop/core/CoreError.java
+++ b/core/src/main/java/org/apache/sqoop/core/CoreError.java
@@ -93,7 +93,12 @@ public enum CoreError implements ErrorCode {
   CORE_0021("Error occurs during partitioner run"),
 
   /** Unable to parse because it is not properly delimited */
-  CORE_0022("Unable to parse because it is not properly delimited");
+  CORE_0022("Unable to parse because it is not properly delimited"),
+
+  /** Unknown job type */
+  CORE_0023("Unknown job type"),
+
+  ;
 
   private final String message;
 

http://git-wip-us.apache.org/repos/asf/sqoop/blob/30a1af0e/core/src/main/java/org/apache/sqoop/framework/ExecutionEngine.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/framework/ExecutionEngine.java b/core/src/main/java/org/apache/sqoop/framework/ExecutionEngine.java
index e1ccdf6..ae14d9a 100644
--- a/core/src/main/java/org/apache/sqoop/framework/ExecutionEngine.java
+++ b/core/src/main/java/org/apache/sqoop/framework/ExecutionEngine.java
@@ -45,22 +45,10 @@ public abstract class ExecutionEngine {
    * Return new SubmissionRequest class or any subclass if it's needed by
    * execution and submission engine combination.
    *
-   * @param summary Submission summary
-   * @param connector Appropriate connector structure
-   * @param connectorConnection Connector connection configuration
-   * @param connectorJob Connector job configuration
-   * @param frameworkConnection Framework connection configuration
-   * @param frameworkJob Framework job configuration
    * @return New Submission request object
    */
-  public SubmissionRequest createSubmissionRequest(MSubmission summary,
-                                                   SqoopConnector connector,
-                                                   Object connectorConnection,
-                                                   Object connectorJob,
-                                                   Object frameworkConnection,
-                                                   Object frameworkJob) {
-    return new SubmissionRequest(summary, connector,
-      connectorConnection, connectorJob, frameworkConnection, frameworkJob);
+  public SubmissionRequest createSubmissionRequest() {
+    return new SubmissionRequest();
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/sqoop/blob/30a1af0e/core/src/main/java/org/apache/sqoop/framework/FrameworkManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/framework/FrameworkManager.java b/core/src/main/java/org/apache/sqoop/framework/FrameworkManager.java
index 575a8bb..d04a100 100644
--- a/core/src/main/java/org/apache/sqoop/framework/FrameworkManager.java
+++ b/core/src/main/java/org/apache/sqoop/framework/FrameworkManager.java
@@ -307,10 +307,16 @@ public final class FrameworkManager {
 
     // Create request object
     MSubmission summary = new MSubmission(jobId);
-    SubmissionRequest request = executionEngine.createSubmissionRequest(
-      summary, connector,
-      connectorConnection, connectorJob,
-      frameworkConnection, frameworkJob);
+    SubmissionRequest request = executionEngine.createSubmissionRequest();
+
+    // Save important variables to the submission request
+    request.setSummary(summary);
+    request.setConnector(connector);
+    request.setConfigConnectorConnection(connectorConnection);
+    request.setConfigConnectorJob(connectorJob);
+    request.setConfigFrameworkConnection(frameworkConnection);
+    request.setConfigFrameworkJob(frameworkJob);
+    request.setJobType(job.getType());
     request.setJobName(job.getName());
     request.setJobId(job.getPersistenceId());
 
@@ -329,6 +335,7 @@ public final class FrameworkManager {
     // Extra libraries that Sqoop code requires
     request.addJarForClass(JSONValue.class);
 
+    // Get connector callbacks
     switch (job.getType()) {
       case IMPORT:
         request.setConnectorCallbacks(connector.getImporter());
@@ -340,7 +347,6 @@ public final class FrameworkManager {
         throw  new SqoopException(FrameworkError.FRAMEWORK_0005,
           "Unsupported job type " + job.getType().name());
     }
-
     LOG.debug("Using callbacks: " + request.getConnectorCallbacks());
 
     // Initialize submission from connector perspective

http://git-wip-us.apache.org/repos/asf/sqoop/blob/30a1af0e/core/src/main/java/org/apache/sqoop/framework/SubmissionRequest.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/framework/SubmissionRequest.java b/core/src/main/java/org/apache/sqoop/framework/SubmissionRequest.java
index 9f471b5..8392a10 100644
--- a/core/src/main/java/org/apache/sqoop/framework/SubmissionRequest.java
+++ b/core/src/main/java/org/apache/sqoop/framework/SubmissionRequest.java
@@ -20,6 +20,7 @@ package org.apache.sqoop.framework;
 import org.apache.sqoop.common.MutableMapContext;
 import org.apache.sqoop.connector.spi.SqoopConnector;
 import org.apache.sqoop.job.etl.CallbackBase;
+import org.apache.sqoop.model.MJob;
 import org.apache.sqoop.model.MSubmission;
 import org.apache.sqoop.utils.ClassUtils;
 
@@ -49,6 +50,11 @@ public class SubmissionRequest {
   long jobId;
 
   /**
+   * Job type
+   */
+  MJob.Type jobType;
+
+  /**
    * Connector instance associated with this submission request
    */
   SqoopConnector connector;
@@ -87,27 +93,20 @@ public class SubmissionRequest {
   String outputDirectory;
 
 
-  public SubmissionRequest(MSubmission submission,
-                           SqoopConnector connector,
-                           Object configConnectorConnection,
-                           Object configConnectorJob,
-                           Object configFrameworkConnection,
-                           Object configFrameworkJob) {
-    this.summary = submission;
-    this.connector = connector;
+  public SubmissionRequest() {
     this.jars = new LinkedList<String>();
     this.connectorContext = new MutableMapContext();
     this.frameworkContext = new MutableMapContext();
-    this.configConnectorConnection = configConnectorConnection;
-    this.configConnectorJob = configConnectorJob;
-    this.configFrameworkConnection = configFrameworkConnection;
-    this.configFrameworkJob = configFrameworkJob;
   }
 
   public MSubmission getSummary() {
     return summary;
   }
 
+  public void setSummary(MSubmission summary) {
+    this.summary = summary;
+  }
+
   public String getJobName() {
     return jobName;
   }
@@ -124,10 +123,22 @@ public class SubmissionRequest {
     this.jobId = jobId;
   }
 
+  public MJob.Type getJobType() {
+    return jobType;
+  }
+
+  public void setJobType(MJob.Type jobType) {
+    this.jobType = jobType;
+  }
+
   public SqoopConnector getConnector() {
     return connector;
   }
 
+  public void setConnector(SqoopConnector connector) {
+    this.connector = connector;
+  }
+
   public List<String> getJars() {
     return jars;
   }
@@ -156,18 +167,34 @@ public class SubmissionRequest {
     return configConnectorConnection;
   }
 
+  public void setConfigConnectorConnection(Object config) {
+    configConnectorConnection = config;
+  }
+
   public Object getConfigConnectorJob() {
     return configConnectorJob;
   }
 
+  public void setConfigConnectorJob(Object config) {
+    configConnectorJob = config;
+  }
+
   public Object getConfigFrameworkConnection() {
     return configFrameworkConnection;
   }
 
+  public void setConfigFrameworkConnection(Object config) {
+    configFrameworkConnection = config;
+  }
+
   public Object getConfigFrameworkJob() {
     return configFrameworkJob;
   }
 
+  public void setConfigFrameworkJob(Object config) {
+    configFrameworkJob = config;
+  }
+
   public MutableMapContext getConnectorContext() {
     return connectorContext;
   }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/30a1af0e/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MRSubmissionRequest.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MRSubmissionRequest.java
b/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MRSubmissionRequest.java
index 3f37222..32d598c 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MRSubmissionRequest.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MRSubmissionRequest.java
@@ -21,9 +21,7 @@ import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.mapreduce.OutputFormat;
-import org.apache.sqoop.connector.spi.SqoopConnector;
 import org.apache.sqoop.framework.SubmissionRequest;
-import org.apache.sqoop.model.MSubmission;
 
 /**
  * Map-reduce specific submission request containing all extra information
@@ -42,14 +40,8 @@ public class MRSubmissionRequest extends SubmissionRequest {
   Class<? extends Writable> outputKeyClass;
   Class<? extends Writable> outputValueClass;
 
-  public MRSubmissionRequest(MSubmission submission,
-                             SqoopConnector connector,
-                             Object configConnectorConnection,
-                             Object configConnectorJob,
-                             Object configFrameworkConnection,
-                             Object configFrameworkJob) {
-    super(submission, connector, configConnectorConnection, configConnectorJob,
-      configFrameworkConnection, configFrameworkJob);
+  public MRSubmissionRequest() {
+    super();
   }
 
   public Class<? extends InputFormat> getInputFormatClass() {

http://git-wip-us.apache.org/repos/asf/sqoop/blob/30a1af0e/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java
b/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java
index 77ca59b..4a5b305 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java
@@ -19,7 +19,6 @@ package org.apache.sqoop.execution.mapreduce;
 
 import org.apache.hadoop.io.NullWritable;
 import org.apache.sqoop.common.MutableMapContext;
-import org.apache.sqoop.connector.spi.SqoopConnector;
 import org.apache.sqoop.framework.ExecutionEngine;
 import org.apache.sqoop.framework.SubmissionRequest;
 import org.apache.sqoop.job.JobConstants;
@@ -29,7 +28,6 @@ import org.apache.sqoop.job.io.Data;
 import org.apache.sqoop.job.mr.SqoopFileOutputFormat;
 import org.apache.sqoop.job.mr.SqoopInputFormat;
 import org.apache.sqoop.job.mr.SqoopMapper;
-import org.apache.sqoop.model.MSubmission;
 
 /**
  *
@@ -37,14 +35,8 @@ import org.apache.sqoop.model.MSubmission;
 public class MapreduceExecutionEngine extends ExecutionEngine {
 
   @Override
-  public SubmissionRequest createSubmissionRequest(MSubmission summary,
-                                                   SqoopConnector connector,
-                                                   Object connectorConnection,
-                                                   Object connectorJob,
-                                                   Object frameworkConnection,
-                                                   Object frameworkJob) {
-    return new MRSubmissionRequest(summary, connector, connectorConnection,
-      connectorJob, frameworkConnection, frameworkJob);
+  public SubmissionRequest createSubmissionRequest() {
+    return new MRSubmissionRequest();
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/sqoop/blob/30a1af0e/execution/mapreduce/src/main/java/org/apache/sqoop/job/JobConstants.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/JobConstants.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/JobConstants.java
index 19ac91e..58b2a42 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/JobConstants.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/JobConstants.java
@@ -27,6 +27,7 @@ public final class JobConstants extends Constants {
   public static final String PREFIX_JOB_CONFIG =
       ConfigurationConstants.PREFIX_GLOBAL_CONFIG + "job.";
 
+  public static final String JOB_TYPE = PREFIX_JOB_CONFIG + "type";
 
   public static final String JOB_ETL_PARTITIONER = PREFIX_JOB_CONFIG
       + "etl.partitioner";

http://git-wip-us.apache.org/repos/asf/sqoop/blob/30a1af0e/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/ConfigurationUtils.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/ConfigurationUtils.java
b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/ConfigurationUtils.java
index ae647ce..4aa2128 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/ConfigurationUtils.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/ConfigurationUtils.java
@@ -20,6 +20,7 @@ package org.apache.sqoop.job.mr;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.sqoop.job.JobConstants;
 import org.apache.sqoop.model.FormUtils;
+import org.apache.sqoop.model.MJob;
 import org.apache.sqoop.utils.ClassUtils;
 
 /**
@@ -27,6 +28,10 @@ import org.apache.sqoop.utils.ClassUtils;
  */
 public final class ConfigurationUtils {
 
+  public static MJob.Type getJobType(Configuration configuration) {
+    return MJob.Type.valueOf(configuration.get(JobConstants.JOB_TYPE));
+  }
+
   public static Object getConnectorConnection(Configuration configuration) {
     return loadConfiguration(configuration,
       JobConstants.JOB_CONFIG_CLASS_CONNECTOR_CONNECTION,

http://git-wip-us.apache.org/repos/asf/sqoop/blob/30a1af0e/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java
index 6892b4b..dbe832a 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java
@@ -49,14 +49,31 @@ public class SqoopMapper
     String extractorName = conf.get(JobConstants.JOB_ETL_EXTRACTOR);
     Extractor extractor = (Extractor) ClassUtils.instantiate(extractorName);
 
-    PrefixContext connectorContext = new PrefixContext(conf, JobConstants.PREFIX_CONNECTOR_CONTEXT);
-    Object connectorConnection = ConfigurationUtils.getConnectorConnection(conf);
-    Object connectorJob = ConfigurationUtils.getConnectorJob(conf);
+    // Objects that should be passed to the Extractor execution
+    PrefixContext subContext = null;
+    Object configConnection = null;
+    Object configJob = null;
+
+    // Extractor is in connector space for IMPORT and in framework space for EXPORT
+    switch (ConfigurationUtils.getJobType(conf)) {
+      case IMPORT:
+        subContext = new PrefixContext(conf, JobConstants.PREFIX_CONNECTOR_CONTEXT);
+        configConnection = ConfigurationUtils.getConnectorConnection(conf);
+        configJob = ConfigurationUtils.getConnectorJob(conf);
+        break;
+      case EXPORT:
+        subContext = new PrefixContext(conf, "");
+        configConnection = ConfigurationUtils.getFrameworkConnection(conf);
+        configJob = ConfigurationUtils.getFrameworkJob(conf);
+        break;
+      default:
+        throw new SqoopException(CoreError.CORE_0023);
+    }
 
     SqoopSplit split = context.getCurrentKey();
 
     try {
-      extractor.run(connectorContext, connectorConnection, connectorJob, split.getPartition(),
+      extractor.run(subContext, configConnection, configJob, split.getPartition(),
         new MapDataWriter(context));
 
     } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/sqoop/blob/30a1af0e/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsLoad.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsLoad.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsLoad.java
index 812dd8e..875a123 100644
--- a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsLoad.java
+++ b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsLoad.java
@@ -63,6 +63,7 @@ public class TestHdfsLoad extends TestCase {
     FileUtils.delete(outdir);
 
     Configuration conf = new Configuration();
+    conf.set(JobConstants.JOB_TYPE, "IMPORT");
     conf.set(JobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName());
     conf.set(JobConstants.JOB_ETL_EXTRACTOR, DummyExtractor.class.getName());
     conf.set(JobConstants.JOB_ETL_LOADER, HdfsTextImportLoader.class.getName());
@@ -80,6 +81,7 @@ public class TestHdfsLoad extends TestCase {
     FileUtils.delete(outdir);
 
     Configuration conf = new Configuration();
+    conf.set(JobConstants.JOB_TYPE, "IMPORT");
     conf.set(JobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName());
     conf.set(JobConstants.JOB_ETL_EXTRACTOR, DummyExtractor.class.getName());
     conf.set(JobConstants.JOB_ETL_LOADER, HdfsTextImportLoader.class.getName());
@@ -122,6 +124,7 @@ public class TestHdfsLoad extends TestCase {
     FileUtils.delete(outdir);
 
     Configuration conf = new Configuration();
+    conf.set(JobConstants.JOB_TYPE, "IMPORT");
     conf.set(JobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName());
     conf.set(JobConstants.JOB_ETL_EXTRACTOR, DummyExtractor.class.getName());
     conf.set(JobConstants.JOB_ETL_LOADER, HdfsSequenceImportLoader.class.getName());
@@ -139,6 +142,7 @@ public class TestHdfsLoad extends TestCase {
     FileUtils.delete(outdir);
 
     Configuration conf = new Configuration();
+    conf.set(JobConstants.JOB_TYPE, "IMPORT");
     conf.set(JobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName());
     conf.set(JobConstants.JOB_ETL_EXTRACTOR, DummyExtractor.class.getName());
     conf.set(JobConstants.JOB_ETL_LOADER, HdfsSequenceImportLoader.class.getName());

http://git-wip-us.apache.org/repos/asf/sqoop/blob/30a1af0e/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java
index e269899..6e49cc2 100644
--- a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java
+++ b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java
@@ -55,6 +55,7 @@ public class TestMapReduce extends TestCase {
 
   public void testInputFormat() throws Exception {
     Configuration conf = new Configuration();
+    conf.set(JobConstants.JOB_TYPE, "IMPORT");
     conf.set(JobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName());
     Job job = Job.getInstance(conf);
 
@@ -71,6 +72,7 @@ public class TestMapReduce extends TestCase {
 
   public void testMapper() throws Exception {
     Configuration conf = new Configuration();
+    conf.set(JobConstants.JOB_TYPE, "IMPORT");
     conf.set(JobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName());
     conf.set(JobConstants.JOB_ETL_EXTRACTOR, DummyExtractor.class.getName());
 
@@ -80,6 +82,7 @@ public class TestMapReduce extends TestCase {
 
   public void testOutputFormat() throws Exception {
     Configuration conf = new Configuration();
+    conf.set(JobConstants.JOB_TYPE, "IMPORT");
     conf.set(JobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName());
     conf.set(JobConstants.JOB_ETL_EXTRACTOR, DummyExtractor.class.getName());
     conf.set(JobConstants.JOB_ETL_LOADER, DummyLoader.class.getName());

http://git-wip-us.apache.org/repos/asf/sqoop/blob/30a1af0e/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java
----------------------------------------------------------------------
diff --git a/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java
b/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java
index 15cb476..48dc073 100644
--- a/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java
+++ b/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java
@@ -119,11 +119,7 @@ public class MapreduceSubmissionEngine extends SubmissionEngine {
    */
   @Override
   public boolean isExecutionEngineSupported(Class executionEngineClass) {
-    if(executionEngineClass == MapreduceExecutionEngine.class) {
-      return true;
-    }
-
-    return false;
+    return executionEngineClass == MapreduceExecutionEngine.class;
   }
 
   /**
@@ -137,6 +133,9 @@ public class MapreduceSubmissionEngine extends SubmissionEngine {
     // Clone global configuration
     Configuration configuration = new Configuration(globalConfiguration);
 
+    // Serialize job type as it will be needed by underlying execution engine
+    configuration.set(JobConstants.JOB_TYPE, request.getJobType().name());
+
     // Serialize framework context into job configuration
     for(Map.Entry<String, String> entry: request.getFrameworkContext()) {
       configuration.set(entry.getKey(), entry.getValue());


Mime
View raw message