sqoop-commits mailing list archives

From b...@apache.org
Subject [1/3] git commit: SQOOP-666 Separate execution engine module (Jarek Jarcec Cecho)
Date Tue, 06 Nov 2012 20:16:09 GMT
Updated Branches:
  refs/heads/sqoop2 0976713f0 -> 2255e721a


SQOOP-666 Separate execution engine module
(Jarek Jarcec Cecho)


Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/2255e721
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/2255e721
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/2255e721

Branch: refs/heads/sqoop2
Commit: 2255e721a0d01a449dc0d2092c99091632ba5466
Parents: 25f3fd3
Author: Bilung Lee <blee@apache.org>
Authored: Tue Nov 6 11:40:42 2012 -0800
Committer: Bilung Lee <blee@apache.org>
Committed: Tue Nov 6 11:40:42 2012 -0800

----------------------------------------------------------------------
 core/pom.xml                                       |   14 +-
 .../apache/sqoop/framework/ExecutionEngine.java    |   72 +++
 .../apache/sqoop/framework/FrameworkConstants.java |    9 +
 .../org/apache/sqoop/framework/FrameworkError.java |    4 +
 .../apache/sqoop/framework/FrameworkManager.java   |  136 ++++--
 .../apache/sqoop/framework/SubmissionEngine.java   |    9 +
 .../apache/sqoop/framework/SubmissionRequest.java  |   60 +---
 dist/src/main/server/conf/sqoop.properties         |    5 +
 execution/mapreduce/pom.xml                        |   67 +++
 .../execution/mapreduce/MRSubmissionRequest.java   |  110 +++++
 .../mapreduce/MapreduceExecutionEngine.java        |   74 +++
 .../java/org/apache/sqoop/job/JobConstants.java    |   82 ++++
 .../java/org/apache/sqoop/job/PrefixContext.java   |   62 +++
 .../sqoop/job/etl/HdfsSequenceImportLoader.java    |  108 ++++
 .../apache/sqoop/job/etl/HdfsTextImportLoader.java |  103 ++++
 .../main/java/org/apache/sqoop/job/io/Data.java    |  378 +++++++++++++++
 .../java/org/apache/sqoop/job/io/FieldTypes.java   |   42 ++
 .../apache/sqoop/job/mr/ConfigurationUtils.java    |   65 +++
 .../apache/sqoop/job/mr/SqoopFileOutputFormat.java |   69 +++
 .../org/apache/sqoop/job/mr/SqoopInputFormat.java  |  118 +++++
 .../java/org/apache/sqoop/job/mr/SqoopMapper.java  |  109 +++++
 .../apache/sqoop/job/mr/SqoopNullOutputFormat.java |   77 +++
 .../job/mr/SqoopOutputFormatLoadExecutor.java      |  228 +++++++++
 .../java/org/apache/sqoop/job/mr/SqoopReducer.java |   35 ++
 .../java/org/apache/sqoop/job/mr/SqoopSplit.java   |   82 ++++
 .../test/java/org/apache/sqoop/job/FileUtils.java  |   69 +++
 .../test/java/org/apache/sqoop/job/JobUtils.java   |   69 +++
 .../java/org/apache/sqoop/job/TestHdfsLoad.java    |  232 +++++++++
 .../java/org/apache/sqoop/job/TestJobEngine.java   |  196 ++++++++
 .../java/org/apache/sqoop/job/TestMapReduce.java   |  229 +++++++++
 .../java/org/apache/sqoop/job/io/TestData.java     |   75 +++
 execution/pom.xml                                  |   36 ++
 pom.xml                                            |    1 +
 submission/mapreduce/pom.xml                       |    6 +
 .../mapreduce/MapreduceSubmissionEngine.java       |   20 +-
 35 files changed, 2941 insertions(+), 110 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/sqoop/blob/2255e721/core/pom.xml
----------------------------------------------------------------------
diff --git a/core/pom.xml b/core/pom.xml
index 028c240..0732b2c 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -36,29 +36,23 @@ limitations under the License.
       <groupId>org.apache.sqoop</groupId>
       <artifactId>sqoop-spi</artifactId>
     </dependency>
+
     <dependency>
       <groupId>org.apache.sqoop</groupId>
       <artifactId>sqoop-common</artifactId>
     </dependency>
+
     <dependency>
       <groupId>commons-dbcp</groupId>
       <artifactId>commons-dbcp</artifactId>
     </dependency>
-    <dependency>
-      <groupId>org.apache.hadoop</groupId>
-      <artifactId>hadoop-common</artifactId>
-      <scope>provided</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.hadoop</groupId>
-      <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
-      <scope>provided</scope>
-    </dependency>
+
     <dependency>
       <groupId>junit</groupId>
       <artifactId>junit</artifactId>
       <scope>test</scope>
     </dependency>
+
   </dependencies>
 
   <build>

http://git-wip-us.apache.org/repos/asf/sqoop/blob/2255e721/core/src/main/java/org/apache/sqoop/framework/ExecutionEngine.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/framework/ExecutionEngine.java b/core/src/main/java/org/apache/sqoop/framework/ExecutionEngine.java
new file mode 100644
index 0000000..e1ccdf6
--- /dev/null
+++ b/core/src/main/java/org/apache/sqoop/framework/ExecutionEngine.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.framework;
+
+import org.apache.sqoop.common.ImmutableContext;
+import org.apache.sqoop.connector.spi.SqoopConnector;
+import org.apache.sqoop.model.MSubmission;
+
+/**
+ * An execution engine drives execution of a Sqoop submission (job). It's
+ * responsible for executing all defined steps in the import/export workflow.
+ */
+public abstract class ExecutionEngine {
+
+  /**
+   * Initialize execution engine
+   *
+   * @param context Configuration context
+   */
+  public void initialize(ImmutableContext context, String prefix) {
+  }
+
+  /**
+   * Destroy execution engine when stopping server
+   */
+  public void destroy() {
+  }
+
+  /**
+   * Return a new SubmissionRequest instance, or any subclass of it, if needed
+   * by the particular execution and submission engine combination.
+   *
+   * @param summary Submission summary
+   * @param connector Appropriate connector structure
+   * @param connectorConnection Connector connection configuration
+   * @param connectorJob Connector job configuration
+   * @param frameworkConnection Framework connection configuration
+   * @param frameworkJob Framework job configuration
+   * @return New Submission request object
+   */
+  public SubmissionRequest createSubmissionRequest(MSubmission summary,
+                                                   SqoopConnector connector,
+                                                   Object connectorConnection,
+                                                   Object connectorJob,
+                                                   Object frameworkConnection,
+                                                   Object frameworkJob) {
+    return new SubmissionRequest(summary, connector,
+      connectorConnection, connectorJob, frameworkConnection, frameworkJob);
+  }
+
+  /**
+   * Prepare given submission request for import submission.
+   *
+   * @param request Submission request
+   */
+  public abstract void prepareImportSubmission(SubmissionRequest request);
+}

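The contract above is intentionally small: initialize() and destroy() have empty default bodies, createSubmissionRequest() has a usable default, and only prepareImportSubmission() is abstract. A minimal engine therefore looks roughly like the sketch below; the class name and package are illustrative, not part of this commit:

    package org.apache.sqoop.execution.example;

    import org.apache.sqoop.framework.ExecutionEngine;
    import org.apache.sqoop.framework.SubmissionRequest;

    // Illustrative sketch only -- not part of this patch.
    public class NoopExecutionEngine extends ExecutionEngine {
      @Override
      public void prepareImportSubmission(SubmissionRequest request) {
        // Configure engine-specific parts of the request here;
        // generic fields (e.g. the output directory) are already
        // filled in by the framework before this call.
      }
    }
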
http://git-wip-us.apache.org/repos/asf/sqoop/blob/2255e721/core/src/main/java/org/apache/sqoop/framework/FrameworkConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/framework/FrameworkConstants.java b/core/src/main/java/org/apache/sqoop/framework/FrameworkConstants.java
index d6e70ca..32da4e8 100644
--- a/core/src/main/java/org/apache/sqoop/framework/FrameworkConstants.java
+++ b/core/src/main/java/org/apache/sqoop/framework/FrameworkConstants.java
@@ -29,6 +29,9 @@ public final class FrameworkConstants {
   public static final String PREFIX_SUBMISSION_CONFIG =
     ConfigurationConstants.PREFIX_GLOBAL_CONFIG + "submission.";
 
+  public static final String PREFIX_EXECUTION_CONFIG =
+    ConfigurationConstants.PREFIX_GLOBAL_CONFIG + "execution.";
+
   public static final String SYSCFG_SUBMISSION_ENGINE =
     PREFIX_SUBMISSION_CONFIG + "engine";
 
@@ -50,6 +53,12 @@ public final class FrameworkConstants {
   public static final String SYSCFG_SUBMISSION_UPDATE_SLEEP =
     PREFIX_SUBMISSION_UPDATE_CONFIG + "sleep";
 
+  public static final String SYSCFG_EXECUTION_ENGINE =
+    PREFIX_EXECUTION_CONFIG + "engine";
+
+  public static final String PREFIX_EXECUTION_ENGINE_CONFIG =
+    SYSCFG_EXECUTION_ENGINE + ".";
+
   // Connection/Job Configuration forms
 
   public static final String FORM_SECURITY =

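Given that PREFIX_GLOBAL_CONFIG is org.apache.sqoop., the new constants resolve to the following property keys (the first is what sqoop.properties sets later in this patch; the second is the namespace handed to the engine's initialize() call):

    org.apache.sqoop.execution.engine       (SYSCFG_EXECUTION_ENGINE)
    org.apache.sqoop.execution.engine.*     (PREFIX_EXECUTION_ENGINE_CONFIG)
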
http://git-wip-us.apache.org/repos/asf/sqoop/blob/2255e721/core/src/main/java/org/apache/sqoop/framework/FrameworkError.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/framework/FrameworkError.java b/core/src/main/java/org/apache/sqoop/framework/FrameworkError.java
index 19d0d87..4277311 100644
--- a/core/src/main/java/org/apache/sqoop/framework/FrameworkError.java
+++ b/core/src/main/java/org/apache/sqoop/framework/FrameworkError.java
@@ -38,6 +38,10 @@ public enum FrameworkError implements ErrorCode {
 
   FRAMEWORK_0006("Can't bootstrap job"),
 
+  FRAMEWORK_0007("Invalid execution engine"),
+
+  FRAMEWORK_0008("Invalid combination of submission and execution engines"),
+
   ;
 
   private final String message;

http://git-wip-us.apache.org/repos/asf/sqoop/blob/2255e721/core/src/main/java/org/apache/sqoop/framework/FrameworkManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/framework/FrameworkManager.java b/core/src/main/java/org/apache/sqoop/framework/FrameworkManager.java
index 604d403..7e10ddc 100644
--- a/core/src/main/java/org/apache/sqoop/framework/FrameworkManager.java
+++ b/core/src/main/java/org/apache/sqoop/framework/FrameworkManager.java
@@ -19,7 +19,6 @@ package org.apache.sqoop.framework;
 
 import org.apache.log4j.Logger;
 import org.apache.sqoop.common.MapContext;
-import org.apache.sqoop.common.MutableMapContext;
 import org.apache.sqoop.common.SqoopException;
 import org.apache.sqoop.connector.ConnectorManager;
 import org.apache.sqoop.connector.spi.SqoopConnector;
@@ -27,11 +26,8 @@ import org.apache.sqoop.core.SqoopConfiguration;
 import org.apache.sqoop.framework.configuration.ConnectionConfiguration;
 import org.apache.sqoop.framework.configuration.ExportJobConfiguration;
 import org.apache.sqoop.framework.configuration.ImportJobConfiguration;
-import org.apache.sqoop.job.JobConstants;
 import org.apache.sqoop.job.etl.CallbackBase;
 import org.apache.sqoop.job.etl.Destroyer;
-import org.apache.sqoop.job.etl.HdfsTextImportLoader;
-import org.apache.sqoop.job.etl.Importer;
 import org.apache.sqoop.job.etl.Initializer;
 import org.apache.sqoop.model.FormUtils;
 import org.apache.sqoop.model.MConnection;
@@ -57,47 +53,93 @@ import java.util.ResourceBundle;
 /**
  * Manager for Sqoop framework itself.
  *
- * All Sqoop internals (job execution engine, metadata) should be handled
- * within this manager.
+ * All Sqoop internals are handled in this class:
+ * * Submission engine
+ * * Execution engine
+ * * Framework metadata
  *
  * Current implementation of entire submission engine is using repository
- * for keep of current track, so that server might be restarted at any time
- * without any affect on running jobs. This approach however might not be the
- * fastest way and we might want to introduce internal structures with running
- * jobs in case that this approach will be too slow.
+ * for keeping track of running submissions. Thus, the server might be restarted
+ * at any time without any effect on running jobs. This approach, however, might
+ * not be the fastest, and we might want to introduce internal structures for
+ * running jobs if it proves too slow.
  */
 public final class FrameworkManager {
 
   private static final Logger LOG = Logger.getLogger(FrameworkManager.class);
 
+  /**
+   * Default age threshold for purging old submissions from the repository.
+   */
   private static final long DEFAULT_PURGE_THRESHOLD = 24*60*60*1000;
 
+  /**
+   * Default sleep interval for purge thread.
+   */
   private static final long DEFAULT_PURGE_SLEEP = 24*60*60*1000;
 
+  /**
+   * Default interval for update thread.
+   */
   private static final long DEFAULT_UPDATE_SLEEP = 60*5*1000;
 
+  /**
+   * Framework metadata structures in MForm format
+   */
   private static MFramework mFramework;
 
+  /**
+   * Validator instance
+   */
   private static final Validator validator;
 
+  /**
+   * Configured submission engine instance
+   */
   private static SubmissionEngine submissionEngine;
 
+  /**
+   * Configured execution engine instance
+   */
+  private static ExecutionEngine executionEngine;
+
+  /**
+   * Purge thread that will periodically remove old submissions from repository.
+   */
   private static PurgeThread purgeThread = null;
 
+  /**
+   * Update thread that will periodically check status of running submissions.
+   */
   private static UpdateThread updateThread = null;
 
+  /**
+   * Synchronization variable between threads.
+   */
   private static boolean running = true;
 
+  /**
+   * Age threshold: submissions older than this are removed from the repository.
+   */
   private static long purgeThreshold;
 
+  /**
+   * Number of milliseconds for purge thread to sleep.
+   */
   private static long purgeSleep;
 
+  /**
+   * Number of milliseconds for update thread to sleep.
+   */
   private static long updateSleep;
 
+  /**
+   * Mutex for creating new submissions. We're not allowing more than one
+   * running submission per job.
+   */
   private static final Object submissionMutex = new Object();
 
   static {
-
     MConnectionForms connectionForms = new MConnectionForms(
       FormUtils.toForms(getConnectionConfigurationClass())
     );
@@ -123,22 +165,31 @@ public final class FrameworkManager {
     String submissionEngineClassName =
       context.getString(FrameworkConstants.SYSCFG_SUBMISSION_ENGINE);
 
-    Class<?> submissionEngineClass =
-        ClassUtils.loadClass(submissionEngineClassName);
-
-    if (submissionEngineClass == null) {
+    submissionEngine = (SubmissionEngine) ClassUtils.instantiate(submissionEngineClassName);
+    if(submissionEngine == null) {
       throw new SqoopException(FrameworkError.FRAMEWORK_0001,
-          submissionEngineClassName);
+        submissionEngineClassName);
     }
 
-    try {
-      submissionEngine = (SubmissionEngine)submissionEngineClass.newInstance();
-    } catch (Exception ex) {
-      throw new SqoopException(FrameworkError.FRAMEWORK_0001,
-          submissionEngineClassName, ex);
+    submissionEngine.initialize(context, FrameworkConstants.PREFIX_SUBMISSION_ENGINE_CONFIG);
+
+    // Execution engine
+    String executionEngineClassName =
+      context.getString(FrameworkConstants.SYSCFG_EXECUTION_ENGINE);
+
+    executionEngine = (ExecutionEngine) ClassUtils.instantiate(executionEngineClassName);
+    if(executionEngine == null) {
+      throw new SqoopException(FrameworkError.FRAMEWORK_0007,
+        executionEngineClassName);
     }
 
-    submissionEngine.initialize(context, FrameworkConstants.PREFIX_SUBMISSION_ENGINE_CONFIG);
+    // We need to make sure that the user has configured a compatible
+    // combination of submission and execution engines
+    if(! submissionEngine.isExecutionEngineSupported(executionEngine.getClass())) {
+      throw new SqoopException(FrameworkError.FRAMEWORK_0008);
+    }
+
+    executionEngine.initialize(context, FrameworkConstants.PREFIX_EXECUTION_ENGINE_CONFIG);
 
     // Set up worker threads
     purgeThreshold = context.getLong(
@@ -161,7 +212,6 @@ public final class FrameworkManager {
     updateThread = new UpdateThread();
     updateThread.start();
 
-
     LOG.info("Submission manager initialized: OK");
   }
 
@@ -189,6 +239,10 @@ public final class FrameworkManager {
     if(submissionEngine != null) {
       submissionEngine.destroy();
     }
+
+    if(executionEngine != null) {
+      executionEngine.destroy();
+    }
   }
 
   public static Validator getValidator() {
@@ -253,22 +307,26 @@ public final class FrameworkManager {
 
     // Create request object
     MSubmission summary = new MSubmission(jobId);
-    SubmissionRequest request = new SubmissionRequest(summary, connector,
-      connectorConnection, connectorJob, frameworkConnection, frameworkJob);
+    SubmissionRequest request = executionEngine.createSubmissionRequest(
+      summary, connector,
+      connectorConnection, connectorJob,
+      frameworkConnection, frameworkJob);
     request.setJobName(job.getName());
 
     // Let's register all important jars
     // sqoop-common
-    request.addJar(ClassUtils.jarForClass(MapContext.class));
+    request.addJarForClass(MapContext.class);
     // sqoop-core
-    request.addJar(ClassUtils.jarForClass(FrameworkManager.class));
+    request.addJarForClass(FrameworkManager.class);
     // sqoop-spi
-    request.addJar(ClassUtils.jarForClass(SqoopConnector.class));
-    // particular connector in use
-    request.addJar(ClassUtils.jarForClass(connector.getClass()));
+    request.addJarForClass(SqoopConnector.class);
+    // Execution engine jar
+    request.addJarForClass(executionEngine.getClass());
+    // Connector in use
+    request.addJarForClass(connector.getClass());
 
     // Extra libraries that Sqoop code requires
-    request.addJar(ClassUtils.jarForClass(JSONValue.class));
+    request.addJarForClass(JSONValue.class);
 
     switch (job.getType()) {
       case IMPORT:
@@ -308,7 +366,7 @@ public final class FrameworkManager {
     // Bootstrap job from framework perspective
     switch (job.getType()) {
       case IMPORT:
-        bootstrapImportSubmission(request);
+        prepareImportSubmission(request);
         break;
       case EXPORT:
         // TODO(jarcec): Implement export path
@@ -342,22 +400,14 @@ public final class FrameworkManager {
     return summary;
   }
 
-  private static void bootstrapImportSubmission(SubmissionRequest request) {
-    Importer importer = (Importer)request.getConnectorCallbacks();
+  private static void prepareImportSubmission(SubmissionRequest request) {
     ImportJobConfiguration jobConfiguration = (ImportJobConfiguration) request.getConfigFrameworkJob();
 
     // Initialize the map-reduce part (all sort of required classes, ...)
     request.setOutputDirectory(jobConfiguration.outputDirectory);
 
-    // Defaults for classes are mostly fine for now.
-
-
-    // Set up framework context
-    MutableMapContext context = request.getFrameworkContext();
-    context.setString(JobConstants.JOB_ETL_PARTITIONER, importer.getPartitioner().getName());
-    context.setString(JobConstants.JOB_ETL_EXTRACTOR, importer.getExtractor().getName());
-    context.setString(JobConstants.JOB_ETL_DESTROYER, importer.getDestroyer().getName());
-    context.setString(JobConstants.JOB_ETL_LOADER, HdfsTextImportLoader.class.getName());
+    // Delegate rest of the job to execution engine
+    executionEngine.prepareImportSubmission(request);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/sqoop/blob/2255e721/core/src/main/java/org/apache/sqoop/framework/SubmissionEngine.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/framework/SubmissionEngine.java b/core/src/main/java/org/apache/sqoop/framework/SubmissionEngine.java
index f4ad3f5..71e4ec9 100644
--- a/core/src/main/java/org/apache/sqoop/framework/SubmissionEngine.java
+++ b/core/src/main/java/org/apache/sqoop/framework/SubmissionEngine.java
@@ -42,6 +42,15 @@ public abstract class SubmissionEngine {
   }
 
   /**
+   * Callback to verify that configured submission engine and execution engine
+   * are compatible.
+   *
+   * @param executionEngineClass Configured execution engine class.
+   * @return True if such execution engine is supported
+   */
+  public abstract boolean isExecutionEngineSupported(Class executionEngineClass);
+
+  /**
    * Submit new job to remote (hadoop) cluster. This method *must* fill
    * submission.getSummary.setExternalId(), otherwise Sqoop framework won't
    * be able to track progress on this job!

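The matching implementation in MapreduceSubmissionEngine is part of this commit (see the diffstat) but not quoted in this message; a plausible shape, shown here only as a sketch, is:

    @Override
    public boolean isExecutionEngineSupported(Class executionEngineClass) {
      // A map-reduce submission engine can only run map-reduce execution plans.
      return executionEngineClass == MapreduceExecutionEngine.class;
    }
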
http://git-wip-us.apache.org/repos/asf/sqoop/blob/2255e721/core/src/main/java/org/apache/sqoop/framework/SubmissionRequest.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/framework/SubmissionRequest.java b/core/src/main/java/org/apache/sqoop/framework/SubmissionRequest.java
index 27b0566..c70a5cc 100644
--- a/core/src/main/java/org/apache/sqoop/framework/SubmissionRequest.java
+++ b/core/src/main/java/org/apache/sqoop/framework/SubmissionRequest.java
@@ -17,15 +17,11 @@
  */
 package org.apache.sqoop.framework;
 
-import org.apache.hadoop.io.NullWritable;
 import org.apache.sqoop.common.MutableMapContext;
 import org.apache.sqoop.connector.spi.SqoopConnector;
 import org.apache.sqoop.job.etl.CallbackBase;
-import org.apache.sqoop.job.io.Data;
-import org.apache.sqoop.job.mr.SqoopFileOutputFormat;
-import org.apache.sqoop.job.mr.SqoopInputFormat;
-import org.apache.sqoop.job.mr.SqoopMapper;
 import org.apache.sqoop.model.MSubmission;
+import org.apache.sqoop.utils.ClassUtils;
 
 import java.util.LinkedList;
 import java.util.List;
@@ -85,20 +81,6 @@ public class SubmissionRequest {
    */
   String outputDirectory;
 
-  /**
-   * Map-reduce specific options.
-   *
-   * I'm using strings so that this class won't have direct dependency on
-   * hadoop libraries.
-   */
-  Class inputFormatClass;
-  Class mapperClass;
-  Class mapOutputKeyClass;
-  Class mapOutputValueClass;
-  Class outputFormatClass;
-  Class outputKeyClass;
-  Class outputValueClass;
-
 
   public SubmissionRequest(MSubmission submission,
                            SqoopConnector connector,
@@ -115,15 +97,6 @@ public class SubmissionRequest {
     this.configConnectorJob = configConnectorJob;
     this.configFrameworkConnection = configFrameworkConnection;
     this.configFrameworkJob = configFrameworkJob;
-
-    // TODO(Jarcec): Move this to job execution engine
-    this.inputFormatClass = SqoopInputFormat.class;
-    this.mapperClass = SqoopMapper.class;
-    this.mapOutputKeyClass = Data.class;
-    this.mapOutputValueClass = NullWritable.class;
-    this.outputFormatClass = SqoopFileOutputFormat.class;
-    this.outputKeyClass = Data.class;
-    this.outputValueClass = NullWritable.class;
   }
 
   public MSubmission getSummary() {
@@ -150,6 +123,10 @@ public class SubmissionRequest {
     jars.add(jar);
   }
 
+  public void addJarForClass(Class klass) {
+    jars.add(ClassUtils.jarForClass(klass));
+  }
+
   public void addJars(List<String> jars) {
     this.jars.addAll(jars);
   }
@@ -193,31 +170,4 @@ public class SubmissionRequest {
   public void setOutputDirectory(String outputDirectory) {
     this.outputDirectory = outputDirectory;
   }
-  public Class getInputFormatClass() {
-    return inputFormatClass;
-  }
-
-  public Class getMapperClass() {
-    return mapperClass;
-  }
-
-  public Class getMapOutputKeyClass() {
-    return mapOutputKeyClass;
-  }
-
-  public Class getMapOutputValueClass() {
-    return mapOutputValueClass;
-  }
-
-  public Class getOutputFormatClass() {
-    return outputFormatClass;
-  }
-
-  public Class getOutputKeyClass() {
-    return outputKeyClass;
-  }
-
-  public Class getOutputValueClass() {
-    return outputValueClass;
-  }
 }

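The new addJarForClass() helper simply folds the ClassUtils.jarForClass() lookup into the request, so callers such as FrameworkManager go from:

    request.addJar(ClassUtils.jarForClass(SqoopConnector.class));

to:

    request.addJarForClass(SqoopConnector.class);
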
http://git-wip-us.apache.org/repos/asf/sqoop/blob/2255e721/dist/src/main/server/conf/sqoop.properties
----------------------------------------------------------------------
diff --git a/dist/src/main/server/conf/sqoop.properties b/dist/src/main/server/conf/sqoop.properties
index d429c3a..5131aad 100755
--- a/dist/src/main/server/conf/sqoop.properties
+++ b/dist/src/main/server/conf/sqoop.properties
@@ -108,3 +108,8 @@ org.apache.sqoop.submission.engine=org.apache.sqoop.submission.mapreduce.Mapredu
 
 # Hadoop configuration directory
 org.apache.sqoop.submission.engine.mapreduce.configuration.directory=/etc/hadoop/conf/
+
+#
+# Execution engine configuration
+#
+org.apache.sqoop.execution.engine=org.apache.sqoop.execution.mapreduce.MapreduceExecutionEngine

http://git-wip-us.apache.org/repos/asf/sqoop/blob/2255e721/execution/mapreduce/pom.xml
----------------------------------------------------------------------
diff --git a/execution/mapreduce/pom.xml b/execution/mapreduce/pom.xml
new file mode 100644
index 0000000..e529f55
--- /dev/null
+++ b/execution/mapreduce/pom.xml
@@ -0,0 +1,67 @@
+<?xml version="1.0"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" xmlns="http://maven.apache.org/POM/4.0.0"
+    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.sqoop</groupId>
+    <artifactId>execution</artifactId>
+    <version>2.0.0-SNAPSHOT</version>
+  </parent>
+
+  <groupId>org.apache.sqoop.execution</groupId>
+  <artifactId>sqoop-execution-mapreduce</artifactId>
+  <version>2.0.0-SNAPSHOT</version>
+  <name>Sqoop Mapreduce Execution Engine</name>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.sqoop</groupId>
+      <artifactId>sqoop-core</artifactId>
+      <version>2.0.0-SNAPSHOT</version>
+    </dependency>
+
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.sqoop</groupId>
+      <artifactId>sqoop-core</artifactId>
+      <version>2.0.0-SNAPSHOT</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-common</artifactId>
+      <scope>provided</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
+      <scope>provided</scope>
+    </dependency>
+
+  </dependencies>
+
+</project>

http://git-wip-us.apache.org/repos/asf/sqoop/blob/2255e721/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MRSubmissionRequest.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MRSubmissionRequest.java b/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MRSubmissionRequest.java
new file mode 100644
index 0000000..3f37222
--- /dev/null
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MRSubmissionRequest.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.execution.mapreduce;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.sqoop.connector.spi.SqoopConnector;
+import org.apache.sqoop.framework.SubmissionRequest;
+import org.apache.sqoop.model.MSubmission;
+
+/**
+ * Map-reduce specific submission request containing all extra information
+ * needed for bootstrapping a map-reduce job.
+ */
+public class MRSubmissionRequest extends SubmissionRequest {
+
+  /**
+   * Map-reduce specific options.
+   */
+  Class<? extends InputFormat> inputFormatClass;
+  Class<? extends Mapper> mapperClass;
+  Class<? extends Writable> mapOutputKeyClass;
+  Class<? extends Writable> mapOutputValueClass;
+  Class<? extends OutputFormat> outputFormatClass;
+  Class<? extends Writable> outputKeyClass;
+  Class<? extends Writable> outputValueClass;
+
+  public MRSubmissionRequest(MSubmission submission,
+                             SqoopConnector connector,
+                             Object configConnectorConnection,
+                             Object configConnectorJob,
+                             Object configFrameworkConnection,
+                             Object configFrameworkJob) {
+    super(submission, connector, configConnectorConnection, configConnectorJob,
+      configFrameworkConnection, configFrameworkJob);
+  }
+
+  public Class<? extends InputFormat> getInputFormatClass() {
+    return inputFormatClass;
+  }
+
+  public void setInputFormatClass(Class<? extends InputFormat> inputFormatClass) {
+    this.inputFormatClass = inputFormatClass;
+  }
+
+  public Class<? extends Mapper> getMapperClass() {
+    return mapperClass;
+  }
+
+  public void setMapperClass(Class<? extends Mapper> mapperClass) {
+    this.mapperClass = mapperClass;
+  }
+
+  public Class<? extends Writable> getMapOutputKeyClass() {
+    return mapOutputKeyClass;
+  }
+
+  public void setMapOutputKeyClass(Class<? extends Writable> mapOutputKeyClass) {
+    this.mapOutputKeyClass = mapOutputKeyClass;
+  }
+
+  public Class<? extends Writable> getMapOutputValueClass() {
+    return mapOutputValueClass;
+  }
+
+  public void setMapOutputValueClass(Class<? extends Writable> mapOutputValueClass) {
+    this.mapOutputValueClass = mapOutputValueClass;
+  }
+
+  public Class<? extends OutputFormat> getOutputFormatClass() {
+    return outputFormatClass;
+  }
+
+  public void setOutputFormatClass(Class<? extends OutputFormat> outputFormatClass) {
+    this.outputFormatClass = outputFormatClass;
+  }
+
+  public Class<? extends Writable> getOutputKeyClass() {
+    return outputKeyClass;
+  }
+
+  public void setOutputKeyClass(Class<? extends Writable> outputKeyClass) {
+    this.outputKeyClass = outputKeyClass;
+  }
+
+  public Class<? extends Writable> getOutputValueClass() {
+    return outputValueClass;
+  }
+
+  public void setOutputValueClass(Class<? extends Writable> outputValueClass) {
+    this.outputValueClass = outputValueClass;
+  }
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/2255e721/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java b/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java
new file mode 100644
index 0000000..77ca59b
--- /dev/null
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.execution.mapreduce;
+
+import org.apache.hadoop.io.NullWritable;
+import org.apache.sqoop.common.MutableMapContext;
+import org.apache.sqoop.connector.spi.SqoopConnector;
+import org.apache.sqoop.framework.ExecutionEngine;
+import org.apache.sqoop.framework.SubmissionRequest;
+import org.apache.sqoop.job.JobConstants;
+import org.apache.sqoop.job.etl.HdfsTextImportLoader;
+import org.apache.sqoop.job.etl.Importer;
+import org.apache.sqoop.job.io.Data;
+import org.apache.sqoop.job.mr.SqoopFileOutputFormat;
+import org.apache.sqoop.job.mr.SqoopInputFormat;
+import org.apache.sqoop.job.mr.SqoopMapper;
+import org.apache.sqoop.model.MSubmission;
+
+/**
+ * Map-reduce based execution engine.
+ */
+public class MapreduceExecutionEngine extends ExecutionEngine {
+
+  @Override
+  public SubmissionRequest createSubmissionRequest(MSubmission summary,
+                                                   SqoopConnector connector,
+                                                   Object connectorConnection,
+                                                   Object connectorJob,
+                                                   Object frameworkConnection,
+                                                   Object frameworkJob) {
+    return new MRSubmissionRequest(summary, connector, connectorConnection,
+      connectorJob, frameworkConnection, frameworkJob);
+  }
+
+  @Override
+  public void prepareImportSubmission(SubmissionRequest gRequest) {
+    MRSubmissionRequest request = (MRSubmissionRequest) gRequest;
+
+    // Configure map-reduce classes for import
+    request.setInputFormatClass(SqoopInputFormat.class);
+
+    request.setMapperClass(SqoopMapper.class);
+    request.setMapOutputKeyClass(Data.class);
+    request.setMapOutputValueClass(NullWritable.class);
+
+    request.setOutputFormatClass(SqoopFileOutputFormat.class);
+    request.setOutputKeyClass(Data.class);
+    request.setOutputValueClass(NullWritable.class);
+
+    Importer importer = (Importer)request.getConnectorCallbacks();
+
+    // Set up framework context
+    MutableMapContext context = request.getFrameworkContext();
+    context.setString(JobConstants.JOB_ETL_PARTITIONER, importer.getPartitioner().getName());
+    context.setString(JobConstants.JOB_ETL_EXTRACTOR, importer.getExtractor().getName());
+    context.setString(JobConstants.JOB_ETL_DESTROYER, importer.getDestroyer().getName());
+    context.setString(JobConstants.JOB_ETL_LOADER, HdfsTextImportLoader.class.getName());
+  }
+}

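Note the downcast pattern: the framework holds only the SubmissionRequest base type, and the engine casts back to the MRSubmissionRequest it created itself in createSubmissionRequest(). Condensed from the code above:

    // Framework side -- engine-agnostic:
    SubmissionRequest request = executionEngine.createSubmissionRequest(
        summary, connector, connectorConnection, connectorJob,
        frameworkConnection, frameworkJob);
    executionEngine.prepareImportSubmission(request);

    // Engine side -- the cast is safe because this engine created the object:
    MRSubmissionRequest mrRequest = (MRSubmissionRequest) gRequest;
    mrRequest.setInputFormatClass(SqoopInputFormat.class);
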
http://git-wip-us.apache.org/repos/asf/sqoop/blob/2255e721/execution/mapreduce/src/main/java/org/apache/sqoop/job/JobConstants.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/JobConstants.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/JobConstants.java
new file mode 100644
index 0000000..19ac91e
--- /dev/null
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/JobConstants.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.job;
+
+import org.apache.sqoop.core.ConfigurationConstants;
+
+public final class JobConstants extends Constants {
+  /**
+   * All job related configuration is prefixed with this:
+   * <tt>org.apache.sqoop.job.</tt>
+   */
+  public static final String PREFIX_JOB_CONFIG =
+      ConfigurationConstants.PREFIX_GLOBAL_CONFIG + "job.";
+
+
+  public static final String JOB_ETL_PARTITIONER = PREFIX_JOB_CONFIG
+      + "etl.partitioner";
+
+  public static final String JOB_ETL_EXTRACTOR = PREFIX_JOB_CONFIG
+      + "etl.extractor";
+
+  public static final String JOB_ETL_LOADER = PREFIX_JOB_CONFIG
+      + "etl.loader";
+
+  public static final String JOB_ETL_DESTROYER = PREFIX_JOB_CONFIG
+      + "etl.destroyer";
+
+
+  public static final String JOB_MR_OUTPUT_FILE = PREFIX_JOB_CONFIG
+      + "mr.output.file";
+
+  public static final String JOB_MR_OUTPUT_CODEC = PREFIX_JOB_CONFIG
+      + "mr.output.codec";
+
+
+  public static final String JOB_CONFIG_CLASS_CONNECTOR_CONNECTION =
+    PREFIX_JOB_CONFIG + "config.class.connector.connection";
+
+  public static final String JOB_CONFIG_CLASS_CONNECTOR_JOB =
+    PREFIX_JOB_CONFIG + "config.class.connector.job";
+
+  public static final String JOB_CONFIG_CLASS_FRAMEWORK_CONNECTION =
+    PREFIX_JOB_CONFIG + "config.class.framework.connection";
+
+  public static final String JOB_CONFIG_CLASS_FRAMEWORK_JOB =
+    PREFIX_JOB_CONFIG + "config.class.framework.job";
+
+  public static final String JOB_CONFIG_CONNECTOR_CONNECTION =
+    PREFIX_JOB_CONFIG + "config.connector.connection";
+
+  public static final String JOB_CONFIG_CONNECTOR_JOB =
+    PREFIX_JOB_CONFIG + "config.connector.job";
+
+  public static final String JOB_CONFIG_FRAMEWORK_CONNECTION =
+    PREFIX_JOB_CONFIG + "config.framework.connection";
+
+  public static final String JOB_CONFIG_FRAMEWORK_JOB =
+    PREFIX_JOB_CONFIG + "config.framework.job";
+
+  public static final String PREFIX_CONNECTOR_CONTEXT =
+    PREFIX_JOB_CONFIG + "connector.context.";
+
+
+  private JobConstants() {
+    // Disable explicit object creation
+  }
+}

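As with the framework constants, these keys expand against PREFIX_GLOBAL_CONFIG (org.apache.sqoop.), for example:

    JOB_ETL_PARTITIONER       ->  org.apache.sqoop.job.etl.partitioner
    PREFIX_CONNECTOR_CONTEXT  ->  org.apache.sqoop.job.connector.context.
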
http://git-wip-us.apache.org/repos/asf/sqoop/blob/2255e721/execution/mapreduce/src/main/java/org/apache/sqoop/job/PrefixContext.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/PrefixContext.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/PrefixContext.java
new file mode 100644
index 0000000..5488b46
--- /dev/null
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/PrefixContext.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.job;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.sqoop.common.ImmutableContext;
+
+/**
+ * Implementation of immutable context that is based on Hadoop configuration
+ * object. Each context property is stored under a given prefix and is loaded
+ * directly from the underlying configuration.
+ */
+public class PrefixContext implements ImmutableContext {
+
+  Configuration configuration;
+  String prefix;
+
+  public PrefixContext(Configuration configuration, String prefix) {
+    this.configuration = configuration;
+    this.prefix = prefix;
+  }
+
+  @Override
+  public String getString(String key) {
+    return configuration.get(prefix + key);
+  }
+
+  @Override
+  public String getString(String key, String defaultValue) {
+    return configuration.get(prefix + key, defaultValue);
+  }
+
+  @Override
+  public long getLong(String key, long defaultValue) {
+    return configuration.getLong(prefix + key, defaultValue);
+  }
+
+  @Override
+  public int getInt(String key, int defaultValue) {
+    return configuration.getInt(prefix + key, defaultValue);
+  }
+
+  @Override
+  public boolean getBoolean(String key, boolean defaultValue) {
+    return configuration.getBoolean(prefix + key, defaultValue);
+  }
+}

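A short usage sketch, assuming the framework has copied connector context entries into the Hadoop Configuration under PREFIX_CONNECTOR_CONTEXT (the key and value below are illustrative):

    Configuration conf = new Configuration();
    conf.set(JobConstants.PREFIX_CONNECTOR_CONTEXT + "table.name", "employees");

    // Connector code then sees an unprefixed key space:
    ImmutableContext ctx =
        new PrefixContext(conf, JobConstants.PREFIX_CONNECTOR_CONTEXT);
    String table = ctx.getString("table.name");   // "employees"
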
http://git-wip-us.apache.org/repos/asf/sqoop/blob/2255e721/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsSequenceImportLoader.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsSequenceImportLoader.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsSequenceImportLoader.java
new file mode 100644
index 0000000..1235d1d
--- /dev/null
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsSequenceImportLoader.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.job.etl;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.sqoop.common.ImmutableContext;
+import org.apache.sqoop.common.SqoopException;
+import org.apache.sqoop.core.CoreError;
+import org.apache.sqoop.job.JobConstants;
+import org.apache.sqoop.job.io.Data;
+import org.apache.sqoop.job.io.DataReader;
+import org.apache.sqoop.utils.ClassUtils;
+
+public class HdfsSequenceImportLoader extends Loader {
+
+  public static final String EXTENSION = ".seq";
+
+  private final char fieldDelimiter;
+
+  public HdfsSequenceImportLoader() {
+    fieldDelimiter = Data.DEFAULT_FIELD_DELIMITER;
+  }
+
+  @Override
+  public void run(ImmutableContext context, DataReader reader) {
+    reader.setFieldDelimiter(fieldDelimiter);
+
+    Configuration conf = new Configuration();
+//    Configuration conf = ((EtlContext)context).getConfiguration();
+    String filename =
+        context.getString(JobConstants.JOB_MR_OUTPUT_FILE);
+    String codecname = context.getString(JobConstants.JOB_MR_OUTPUT_CODEC);
+
+    CompressionCodec codec = null;
+    if (codecname != null) {
+      Class<?> clz = ClassUtils.loadClass(codecname);
+      if (clz == null) {
+        throw new SqoopException(CoreError.CORE_0009, codecname);
+      }
+
+      try {
+        codec = (CompressionCodec) clz.newInstance();
+        if (codec instanceof Configurable) {
+          ((Configurable) codec).setConf(conf);
+        }
+      } catch (Exception e) {
+        throw new SqoopException(CoreError.CORE_0010, codecname, e);
+      }
+    }
+
+    filename += EXTENSION;
+
+    try {
+      Path filepath = new Path(filename);
+      SequenceFile.Writer filewriter;
+      if (codec != null) {
+        filewriter = SequenceFile.createWriter(conf,
+            SequenceFile.Writer.file(filepath),
+            SequenceFile.Writer.keyClass(Text.class),
+            SequenceFile.Writer.valueClass(NullWritable.class),
+            SequenceFile.Writer.compression(CompressionType.BLOCK, codec));
+      } else {
+        filewriter = SequenceFile.createWriter(conf,
+          SequenceFile.Writer.file(filepath),
+          SequenceFile.Writer.keyClass(Text.class),
+          SequenceFile.Writer.valueClass(NullWritable.class),
+          SequenceFile.Writer.compression(CompressionType.NONE));
+      }
+
+      String csv;
+      Text text = new Text();
+      while ((csv = reader.readCsvRecord()) != null) {
+        text.set(csv);
+        filewriter.append(text, NullWritable.get());
+      }
+      filewriter.close();
+
+    } catch (IOException e) {
+      throw new SqoopException(CoreError.CORE_0018, e);
+    }
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/2255e721/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsTextImportLoader.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsTextImportLoader.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsTextImportLoader.java
new file mode 100644
index 0000000..36aa11f
--- /dev/null
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsTextImportLoader.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.job.etl;
+
+import java.io.BufferedWriter;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.sqoop.common.ImmutableContext;
+import org.apache.sqoop.common.SqoopException;
+import org.apache.sqoop.core.CoreError;
+import org.apache.sqoop.job.JobConstants;
+import org.apache.sqoop.job.io.Data;
+import org.apache.sqoop.job.io.DataReader;
+import org.apache.sqoop.utils.ClassUtils;
+
+public class HdfsTextImportLoader extends Loader {
+
+  private final char fieldDelimiter;
+  private final char recordDelimiter;
+
+  public HdfsTextImportLoader() {
+    fieldDelimiter = Data.DEFAULT_FIELD_DELIMITER;
+    recordDelimiter = Data.DEFAULT_RECORD_DELIMITER;
+  }
+
+  @Override
+  public void run(ImmutableContext context, DataReader reader) {
+    reader.setFieldDelimiter(fieldDelimiter);
+
+    Configuration conf = new Configuration();
+//    Configuration conf = ((EtlContext)context).getConfiguration();
+    String filename = context.getString(JobConstants.JOB_MR_OUTPUT_FILE);
+    String codecname = context.getString(JobConstants.JOB_MR_OUTPUT_CODEC);
+
+    CompressionCodec codec = null;
+    if (codecname != null) {
+      Class<?> clz = ClassUtils.loadClass(codecname);
+      if (clz == null) {
+        throw new SqoopException(CoreError.CORE_0009, codecname);
+      }
+
+      try {
+        codec = (CompressionCodec) clz.newInstance();
+        if (codec instanceof Configurable) {
+          ((Configurable) codec).setConf(conf);
+        }
+      } catch (Exception e) {
+        throw new SqoopException(CoreError.CORE_0010, codecname, e);
+      }
+
+      filename += codec.getDefaultExtension();
+    }
+
+    try {
+      Path filepath = new Path(filename);
+      FileSystem fs = filepath.getFileSystem(conf);
+
+      BufferedWriter filewriter;
+      DataOutputStream filestream = fs.create(filepath, false);
+      if (codec != null) {
+        filewriter = new BufferedWriter(new OutputStreamWriter(
+            codec.createOutputStream(filestream, codec.createCompressor()),
+            Data.CHARSET_NAME));
+      } else {
+        filewriter = new BufferedWriter(new OutputStreamWriter(
+            filestream, Data.CHARSET_NAME));
+      }
+
+      String csv;
+      while ((csv = reader.readCsvRecord()) != null) {
+        filewriter.write(csv + recordDelimiter);
+      }
+      filewriter.close();
+
+    } catch (IOException e) {
+      throw new SqoopException(CoreError.CORE_0018, e);
+    }
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/2255e721/execution/mapreduce/src/main/java/org/apache/sqoop/job/io/Data.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/io/Data.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/io/Data.java
new file mode 100644
index 0000000..4ddd132
--- /dev/null
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/io/Data.java
@@ -0,0 +1,378 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.job.io;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.regex.Matcher;
+
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.sqoop.common.SqoopException;
+import org.apache.sqoop.core.CoreError;
+
+public class Data implements WritableComparable<Data> {
+
+  // The content is an Object to accommodate different kinds of data.
+  // For example, it can be:
+  // - Object[] for an array of object record
+  // - String for a text of CSV record
+  private Object content = null;
+
+  public static final int EMPTY_DATA = 0;
+  public static final int CSV_RECORD = 1;
+  public static final int ARRAY_RECORD = 2;
+  private int type = EMPTY_DATA;
+
+  public static final String CHARSET_NAME = "UTF-8";
+
+  public static final char DEFAULT_RECORD_DELIMITER = '\n';
+  public static final char DEFAULT_FIELD_DELIMITER = ',';
+  public static final char DEFAULT_STRING_DELIMITER = '\'';
+  public static final char DEFAULT_STRING_ESCAPE = '\\';
+  private char fieldDelimiter = DEFAULT_FIELD_DELIMITER;
+  private char stringDelimiter = DEFAULT_STRING_DELIMITER;
+  private char stringEscape = DEFAULT_STRING_ESCAPE;
+  private String escapedStringDelimiter = String.valueOf(new char[] {
+      stringEscape, stringDelimiter
+  });
+
+  public void setFieldDelimiter(char fieldDelimiter) {
+    this.fieldDelimiter = fieldDelimiter;
+  }
+
+  public void setContent(Object content, int type) {
+    switch (type) {
+    case EMPTY_DATA:
+    case CSV_RECORD:
+    case ARRAY_RECORD:
+      this.type = type;
+      this.content = content;
+      break;
+    default:
+      throw new SqoopException(CoreError.CORE_0012, String.valueOf(type));
+    }
+  }
+
+  public Object getContent(int targetType) {
+    switch (targetType) {
+    case CSV_RECORD:
+      return format();
+    case ARRAY_RECORD:
+      return parse();
+    default:
+      throw new SqoopException(CoreError.CORE_0012, String.valueOf(targetType));
+    }
+  }
+
+  public int getType() {
+    return type;
+  }
+
+  public boolean isEmpty() {
+    return (type == EMPTY_DATA);
+  }
+
+  @Override
+  public String toString() {
+    return (String)getContent(CSV_RECORD);
+  }
+
+  @Override
+  public int compareTo(Data other) {
+    byte[] myBytes = toString().getBytes(Charset.forName(CHARSET_NAME));
+    byte[] otherBytes = other.toString().getBytes(
+        Charset.forName(CHARSET_NAME));
+    return WritableComparator.compareBytes(
+        myBytes, 0, myBytes.length, otherBytes, 0, otherBytes.length);
+  }
+
+  @Override
+  public boolean equals(Object other) {
+    if (!(other instanceof Data)) {
+      return false;
+    }
+
+    Data data = (Data)other;
+    if (type != data.getType()) {
+      return false;
+    }
+
+    return toString().equals(data.toString());
+  }
+
+  @Override
+  public int hashCode() {
+    int result = super.hashCode();
+    switch (type) {
+    case CSV_RECORD:
+      result += 31 * content.hashCode();
+      return result;
+    case ARRAY_RECORD:
+      Object[] array = (Object[])content;
+      for (int i = 0; i < array.length; i++) {
+        result += 31 * array[i].hashCode();
+      }
+      return result;
+    default:
+      throw new SqoopException(CoreError.CORE_0012, String.valueOf(type));
+    }
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    type = readType(in);
+    switch (type) {
+    case CSV_RECORD:
+      readCsv(in);
+      break;
+    case ARRAY_RECORD:
+      readArray(in);
+      break;
+    default:
+      throw new SqoopException(CoreError.CORE_0012, String.valueOf(type));
+    }
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    writeType(out, type);
+    switch (type) {
+    case CSV_RECORD:
+      writeCsv(out);
+      break;
+    case ARRAY_RECORD:
+      writeArray(out);
+      break;
+    default:
+      throw new SqoopException(CoreError.CORE_0012, String.valueOf(type));
+    }
+  }
+
+  private int readType(DataInput in) throws IOException {
+    return WritableUtils.readVInt(in);
+  }
+
+  private void writeType(DataOutput out, int type) throws IOException {
+    WritableUtils.writeVInt(out, type);
+  }
+
+  private void readCsv(DataInput in) throws IOException {
+    content = in.readUTF();
+  }
+
+  private void writeCsv(DataOutput out) throws IOException {
+    out.writeUTF((String)content);
+  }
+
+  private void readArray(DataInput in) throws IOException {
+    // read number of columns
+    int columns = in.readInt();
+    content = new Object[columns];
+    Object[] array = (Object[])content;
+    // read each column
+    for (int i = 0; i < array.length; i++) {
+      int type = readType(in);
+      switch (type) {
+      case FieldTypes.UTF:
+        array[i] = in.readUTF();
+        break;
+
+      case FieldTypes.BIN:
+        int length = in.readInt();
+        byte[] bytes = new byte[length];
+        in.readFully(bytes);
+        array[i] = bytes;
+        break;
+
+      case FieldTypes.DOUBLE:
+        array[i] = in.readDouble();
+        break;
+
+      case FieldTypes.FLOAT:
+        array[i] = in.readFloat();
+        break;
+
+      case FieldTypes.LONG:
+        array[i] = in.readLong();
+        break;
+
+      case FieldTypes.INT:
+        array[i] = in.readInt();
+        break;
+
+      case FieldTypes.SHORT:
+        array[i] = in.readShort();
+        break;
+
+      case FieldTypes.CHAR:
+        array[i] = in.readChar();
+        break;
+
+      case FieldTypes.BYTE:
+        array[i] = in.readByte();
+        break;
+
+      case FieldTypes.BOOLEAN:
+        array[i] = in.readBoolean();
+        break;
+
+      case FieldTypes.NULL:
+        array[i] = null;
+        break;
+
+      default:
+        throw new IOException(
+          new SqoopException(CoreError.CORE_0012, Integer.toString(type))
+        );
+      }
+    }
+  }
+
+  private void writeArray(DataOutput out) throws IOException {
+    Object[] array = (Object[])content;
+    // write number of columns
+    out.writeInt(array.length);
+    // write each column
+    for (int i = 0; i < array.length; i++) {
+      if (array[i] instanceof String) {
+        writeType(out, FieldTypes.UTF);
+        out.writeUTF((String)array[i]);
+
+      } else if (array[i] instanceof byte[]) {
+        writeType(out, FieldTypes.BIN);
+        out.writeInt(((byte[])array[i]).length);
+        out.write((byte[])array[i]);
+
+      } else if (array[i] instanceof Double) {
+        writeType(out, FieldTypes.DOUBLE);
+        out.writeDouble((Double)array[i]);
+
+      } else if (array[i] instanceof Float) {
+        writeType(out, FieldTypes.FLOAT);
+        out.writeFloat((Float)array[i]);
+
+      } else if (array[i] instanceof Long) {
+        writeType(out, FieldTypes.LONG);
+        out.writeLong((Long)array[i]);
+
+      } else if (array[i] instanceof Integer) {
+        writeType(out, FieldTypes.INT);
+        out.writeInt((Integer)array[i]);
+
+      } else if (array[i] instanceof Short) {
+        writeType(out, FieldTypes.SHORT);
+        out.writeShort((Short)array[i]);
+
+      } else if (array[i] instanceof Character) {
+        writeType(out, FieldTypes.CHAR);
+        out.writeChar((Character)array[i]);
+
+      } else if (array[i] instanceof Byte) {
+        writeType(out, FieldTypes.BYTE);
+        out.writeByte((Byte)array[i]);
+
+      } else if (array[i] instanceof Boolean) {
+        writeType(out, FieldTypes.BOOLEAN);
+        out.writeBoolean((Boolean)array[i]);
+
+      } else if (array[i] == null) {
+        writeType(out, FieldTypes.NULL);
+
+      } else {
+        throw new IOException(
+          new SqoopException(
+              CoreError.CORE_0012, array[i].getClass().getName()
+          )
+        );
+      }
+    }
+  }
+
+  private String format() {
+    switch (type) {
+    case EMPTY_DATA:
+      return null;
+
+    case CSV_RECORD:
+      if (fieldDelimiter == DEFAULT_FIELD_DELIMITER) {
+        return (String)content;
+      } else {
+        // TODO: need to exclude the case where comma is part of a string.
+        return ((String)content).replaceAll(
+            String.valueOf(DEFAULT_FIELD_DELIMITER),
+            Matcher.quoteReplacement(String.valueOf(fieldDelimiter)));
+      }
+
+    case ARRAY_RECORD:
+      StringBuilder sb = new StringBuilder();
+      Object[] array = (Object[])content;
+      for (int i = 0; i < array.length; i++) {
+        if (i != 0) {
+          sb.append(fieldDelimiter);
+        }
+
+        if (array[i] instanceof String) {
+          sb.append(stringDelimiter);
+          sb.append(escape((String)array[i]));
+          sb.append(stringDelimiter);
+        } else if (array[i] instanceof byte[]) {
+          sb.append(Arrays.toString((byte[])array[i]));
+        } else {
+          sb.append(String.valueOf(array[i]));
+        }
+      }
+      return sb.toString();
+
+    default:
+      throw new SqoopException(CoreError.CORE_0012, String.valueOf(type));
+    }
+  }
+
+  private Object[] parse() {
+    switch (type) {
+    case EMPTY_DATA:
+      return null;
+
+    case CSV_RECORD:
+      ArrayList<Object> list = new ArrayList<Object>();
+      // TODO: parse the CSV into an array; until implemented, an empty
+      // array is returned.
+      return list.toArray();
+
+    case ARRAY_RECORD:
+      return (Object[])content;
+
+    default:
+      throw new SqoopException(CoreError.CORE_0012, String.valueOf(type));
+    }
+  }
+
+  private String escape(String string) {
+    // TODO: Also need to escape those special characters as documented in:
+    // https://cwiki.apache.org/confluence/display/SQOOP/Sqoop2+Intermediate+representation#Sqoop2Intermediaterepresentation-Intermediateformatrepresentationproposal
+    // Quote the delimiter so a regex metacharacter is matched literally.
+    String regex = java.util.regex.Pattern.quote(String.valueOf(stringDelimiter));
+    String replacement = Matcher.quoteReplacement(escapedStringDelimiter);
+    return string.replaceAll(regex, replacement);
+  }
+
+}
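
A minimal usage sketch (not part of the patch) of the conversion between the
two record forms, assuming the default delimiters declared at the top of
Data.java (comma-separated fields, strings wrapped in the string delimiter):

    Data data = new Data();
    data.setContent(new Object[] {1, "a,b", 3.5}, Data.ARRAY_RECORD);
    // format() wraps strings in the string delimiter, so the embedded
    // comma survives the conversion to CSV form:
    String csv = (String) data.getContent(Data.CSV_RECORD);
    // Marking the record consumed, as SqoopOutputFormatLoadExecutor does:
    data.setContent(null, Data.EMPTY_DATA);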

http://git-wip-us.apache.org/repos/asf/sqoop/blob/2255e721/execution/mapreduce/src/main/java/org/apache/sqoop/job/io/FieldTypes.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/io/FieldTypes.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/io/FieldTypes.java
new file mode 100644
index 0000000..e96dc6e
--- /dev/null
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/io/FieldTypes.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.job.io;
+
+public final class FieldTypes {
+
+  public static final int NULL    = 0;
+
+  public static final int BOOLEAN = 1;
+
+  public static final int BYTE    = 10;
+  public static final int CHAR    = 11;
+
+  public static final int SHORT   = 20;
+  public static final int INT     = 21;
+  public static final int LONG    = 22;
+
+  public static final int FLOAT   = 50;
+  public static final int DOUBLE  = 51;
+
+  public static final int BIN     = 100;
+  public static final int UTF     = 101;
+
+  private FieldTypes() {
+    // Disable explicit object creation
+  }
+}
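
These codes are the per-column tags that Data.writeArray() emits ahead of
each value. A hypothetical helper (not in the patch) mirroring that
instanceof dispatch makes the type-to-tag mapping explicit:

    static int tagOf(Object value) {
      if (value == null)              return FieldTypes.NULL;
      if (value instanceof Boolean)   return FieldTypes.BOOLEAN;
      if (value instanceof Byte)      return FieldTypes.BYTE;
      if (value instanceof Character) return FieldTypes.CHAR;
      if (value instanceof Short)     return FieldTypes.SHORT;
      if (value instanceof Integer)   return FieldTypes.INT;
      if (value instanceof Long)      return FieldTypes.LONG;
      if (value instanceof Float)     return FieldTypes.FLOAT;
      if (value instanceof Double)    return FieldTypes.DOUBLE;
      if (value instanceof byte[])    return FieldTypes.BIN;
      if (value instanceof String)    return FieldTypes.UTF;
      throw new IllegalArgumentException(value.getClass().getName());
    }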

http://git-wip-us.apache.org/repos/asf/sqoop/blob/2255e721/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/ConfigurationUtils.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/ConfigurationUtils.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/ConfigurationUtils.java
new file mode 100644
index 0000000..59baaf6
--- /dev/null
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/ConfigurationUtils.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.job.mr;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.sqoop.job.JobConstants;
+import org.apache.sqoop.model.FormUtils;
+import org.apache.sqoop.utils.ClassUtils;
+
+/**
+ * Helper class to load configuration specific objects from job configuration
+ */
+public final class ConfigurationUtils {
+
+  public static Object getConnectorConnection(Configuration configuration) {
+    return loadConfiguration(configuration,
+      JobConstants.JOB_CONFIG_CLASS_CONNECTOR_CONNECTION,
+      JobConstants.JOB_CONFIG_CONNECTOR_CONNECTION);
+  }
+
+  public static Object getConnectorJob(Configuration configuration) {
+    return loadConfiguration(configuration,
+      JobConstants.JOB_CONFIG_CLASS_CONNECTOR_JOB,
+      JobConstants.JOB_CONFIG_CONNECTOR_JOB);
+  }
+
+  public static Object getFrameworkConnection(Configuration configuration) {
+    return loadConfiguration(configuration,
+      JobConstants.JOB_CONFIG_CLASS_FRAMEWORK_CONNECTION,
+      JobConstants.JOB_CONFIG_FRAMEWORK_CONNECTION);
+  }
+
+  public static Object getFrameworkJob(Configuration configuration) {
+    return loadConfiguration(configuration,
+      JobConstants.JOB_CONFIG_CLASS_FRAMEWORK_JOB,
+      JobConstants.JOB_CONFIG_FRAMEWORK_JOB);
+  }
+
+  private static Object loadConfiguration(Configuration configuration,
+                                          String classProperty,
+                                          String valueProperty) {
+    Object object = ClassUtils.instantiate(configuration.get(classProperty));
+    FormUtils.fillValues(configuration.get(valueProperty), object);
+    return object;
+  }
+
+  private ConfigurationUtils() {
+    // Instantiation is prohibited
+  }
+}
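
Both halves of the contract, as a sketch: the submitting side stores the
configuration class name and its serialized form values under the paired
JobConstants keys, and the MR-side code rehydrates them. The connector class
name and the serializedForms variable below are hypothetical:

    // Producer side, before the job is submitted:
    conf.set(JobConstants.JOB_CONFIG_CLASS_CONNECTOR_JOB,
        "org.example.connector.JobConfiguration");
    conf.set(JobConstants.JOB_CONFIG_CONNECTOR_JOB, serializedForms);

    // Consumer side, inside a mapper or output format:
    Object connectorJob = ConfigurationUtils.getConnectorJob(conf);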

http://git-wip-us.apache.org/repos/asf/sqoop/blob/2255e721/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopFileOutputFormat.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopFileOutputFormat.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopFileOutputFormat.java
new file mode 100644
index 0000000..c465f10
--- /dev/null
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopFileOutputFormat.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.job.mr;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.DefaultCodec;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.sqoop.job.JobConstants;
+import org.apache.sqoop.job.io.Data;
+
+/**
+ * A file-based output format for MapReduce jobs; the actual record writing
+ * is delegated to the configured Loader via SqoopOutputFormatLoadExecutor.
+ */
+public class SqoopFileOutputFormat
+    extends FileOutputFormat<Data, NullWritable> {
+
+  public static final Log LOG =
+      LogFactory.getLog(SqoopFileOutputFormat.class.getName());
+
+  public static final Class<? extends CompressionCodec> DEFAULT_CODEC =
+      DefaultCodec.class;
+
+  @Override
+  public RecordWriter<Data, NullWritable> getRecordWriter(
+      TaskAttemptContext context) throws IOException {
+    Configuration conf = context.getConfiguration();
+
+    Path filepath = getDefaultWorkFile(context, "");
+    String filename = filepath.toString();
+    conf.set(JobConstants.JOB_MR_OUTPUT_FILE, filename);
+
+    boolean isCompressed = getCompressOutput(context);
+    if (isCompressed) {
+      String codecname =
+          conf.get(FileOutputFormat.COMPRESS_CODEC, DEFAULT_CODEC.getName());
+      conf.set(JobConstants.JOB_MR_OUTPUT_CODEC, codecname);
+    }
+
+    SqoopOutputFormatLoadExecutor executor =
+        new SqoopOutputFormatLoadExecutor(context);
+    return executor.getRecordWriter();
+  }
+
+}
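
A sketch of how the pieces in this patch could be wired into a Hadoop job
(the actual wiring belongs to the submission engine and is not shown here;
the output path is illustrative):

    Job job = new Job(conf);
    job.setInputFormatClass(SqoopInputFormat.class);
    job.setMapperClass(SqoopMapper.class);
    job.setMapOutputKeyClass(Data.class);
    job.setMapOutputValueClass(NullWritable.class);
    job.setOutputFormatClass(SqoopFileOutputFormat.class);
    FileOutputFormat.setOutputPath(job, new Path("/tmp/sqoop-import"));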

http://git-wip-us.apache.org/repos/asf/sqoop/blob/2255e721/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopInputFormat.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopInputFormat.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopInputFormat.java
new file mode 100644
index 0000000..8fcdc99
--- /dev/null
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopInputFormat.java
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.job.mr;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.sqoop.job.JobConstants;
+import org.apache.sqoop.job.PrefixContext;
+import org.apache.sqoop.job.etl.Partition;
+import org.apache.sqoop.job.etl.Partitioner;
+import org.apache.sqoop.utils.ClassUtils;
+
+/**
+ * An InputFormat whose splits are produced by the connector's Partitioner;
+ * each Partition becomes one SqoopSplit.
+ */
+public class SqoopInputFormat extends InputFormat<SqoopSplit, NullWritable> {
+
+  public static final Log LOG =
+      LogFactory.getLog(SqoopInputFormat.class.getName());
+
+  @Override
+  public RecordReader<SqoopSplit, NullWritable> createRecordReader(
+      InputSplit split, TaskAttemptContext context) {
+    return new SqoopRecordReader();
+  }
+
+  @Override
+  public List<InputSplit> getSplits(JobContext context)
+      throws IOException, InterruptedException {
+    Configuration conf = context.getConfiguration();
+
+    String partitionerName = conf.get(JobConstants.JOB_ETL_PARTITIONER);
+    Partitioner partitioner = (Partitioner) ClassUtils.instantiate(partitionerName);
+
+    PrefixContext connectorContext = new PrefixContext(conf, JobConstants.PREFIX_CONNECTOR_CONTEXT);
+    Object connectorConnection = ConfigurationUtils.getConnectorConnection(conf);
+    Object connectorJob = ConfigurationUtils.getConnectorJob(conf);
+
+    List<Partition> partitions = partitioner.getPartitions(connectorContext, connectorConnection, connectorJob);
+    List<InputSplit> splits = new LinkedList<InputSplit>();
+    for (Partition partition : partitions) {
+      LOG.debug("Partition: " + partition);
+      SqoopSplit split = new SqoopSplit();
+      split.setPartition(partition);
+      splits.add(split);
+    }
+
+    return splits;
+  }
+
+  public static class SqoopRecordReader
+      extends RecordReader<SqoopSplit, NullWritable> {
+
+    private boolean delivered = false;
+    private SqoopSplit split = null;
+
+    @Override
+    public boolean nextKeyValue() {
+      if (delivered) {
+        return false;
+      } else {
+        delivered = true;
+        return true;
+      }
+    }
+
+    @Override
+    public SqoopSplit getCurrentKey() {
+      return split;
+    }
+
+    @Override
+    public NullWritable getCurrentValue() {
+      return NullWritable.get();
+    }
+
+    @Override
+    public void close() {
+    }
+
+    @Override
+    public float getProgress() {
+      return delivered ? 1.0f : 0.0f;
+    }
+
+    @Override
+    public void initialize(InputSplit split, TaskAttemptContext context) {
+      this.split = (SqoopSplit)split;
+    }
+  }
+
+}
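
getSplits() delegates partitioning entirely to the connector. A hypothetical
Partitioner matching the call above; the context parameter type comes from
the connector SPI and is assumed here, and RangePartition is a hypothetical
Partition subclass:

    public class TwoWayPartitioner extends Partitioner {
      @Override
      public List<Partition> getPartitions(ImmutableContext context,
          Object connectionConfiguration, Object jobConfiguration) {
        List<Partition> partitions = new LinkedList<Partition>();
        partitions.add(new RangePartition(0, 50));
        partitions.add(new RangePartition(50, 100));
        return partitions;
      }
    }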

http://git-wip-us.apache.org/repos/asf/sqoop/blob/2255e721/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java
new file mode 100644
index 0000000..6892b4b
--- /dev/null
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.job.mr;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.sqoop.common.SqoopException;
+import org.apache.sqoop.core.CoreError;
+import org.apache.sqoop.job.JobConstants;
+import org.apache.sqoop.job.PrefixContext;
+import org.apache.sqoop.job.etl.Extractor;
+import org.apache.sqoop.job.io.Data;
+import org.apache.sqoop.job.io.DataWriter;
+import org.apache.sqoop.utils.ClassUtils;
+
+/**
+ * A mapper that runs the connector's Extractor and forwards each extracted
+ * record to the output as a Data key.
+ */
+public class SqoopMapper
+    extends Mapper<SqoopSplit, NullWritable, Data, NullWritable> {
+
+  public static final Log LOG =
+      LogFactory.getLog(SqoopMapper.class.getName());
+
+  @Override
+  public void run(Context context) throws IOException, InterruptedException {
+    Configuration conf = context.getConfiguration();
+
+    String extractorName = conf.get(JobConstants.JOB_ETL_EXTRACTOR);
+    Extractor extractor = (Extractor) ClassUtils.instantiate(extractorName);
+
+    PrefixContext connectorContext = new PrefixContext(conf, JobConstants.PREFIX_CONNECTOR_CONTEXT);
+    Object connectorConnection = ConfigurationUtils.getConnectorConnection(conf);
+    Object connectorJob = ConfigurationUtils.getConnectorJob(conf);
+
+    SqoopSplit split = context.getCurrentKey();
+
+    try {
+      extractor.run(connectorContext, connectorConnection, connectorJob, split.getPartition(),
+        new MapDataWriter(context));
+
+    } catch (Exception e) {
+      throw new SqoopException(CoreError.CORE_0017, e);
+    }
+  }
+
+  public class MapDataWriter extends DataWriter {
+    private Context context;
+    private Data data;
+
+    public MapDataWriter(Context context) {
+      this.context = context;
+    }
+
+    @Override
+    public void setFieldDelimiter(char fieldDelimiter) {
+      if (data == null) {
+        data = new Data();
+      }
+
+      data.setFieldDelimiter(fieldDelimiter);
+    }
+
+    @Override
+    public void writeArrayRecord(Object[] array) {
+      writeContent(array, Data.ARRAY_RECORD);
+    }
+
+    @Override
+    public void writeCsvRecord(String csv) {
+      writeContent(csv, Data.CSV_RECORD);
+    }
+
+    @Override
+    public void writeContent(Object content, int type) {
+      if (data == null) {
+        data = new Data();
+      }
+
+      data.setContent(content, type);
+      try {
+        context.write(data, NullWritable.get());
+      } catch (Exception e) {
+        throw new SqoopException(CoreError.CORE_0013, e);
+      }
+    }
+  }
+
+}
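
A hypothetical Extractor matching the extractor.run(...) invocation above
(the context parameter type is assumed from the connector SPI); every record
handed to the DataWriter becomes one map output pair:

    public class SingleRowExtractor extends Extractor {
      @Override
      public void run(ImmutableContext context, Object connectionConfiguration,
          Object jobConfiguration, Partition partition, DataWriter writer) {
        writer.writeArrayRecord(new Object[] {1L, "example"});
      }
    }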

http://git-wip-us.apache.org/repos/asf/sqoop/blob/2255e721/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopNullOutputFormat.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopNullOutputFormat.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopNullOutputFormat.java
new file mode 100644
index 0000000..1242f90
--- /dev/null
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopNullOutputFormat.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.job.mr;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.sqoop.job.io.Data;
+
+/**
+ * An output format that produces no MapReduce output of its own; records
+ * are consumed by the configured Loader through
+ * SqoopOutputFormatLoadExecutor.
+ */
+public class SqoopNullOutputFormat extends OutputFormat<Data, NullWritable> {
+
+  public static final Log LOG =
+      LogFactory.getLog(SqoopNullOutputFormat.class.getName());
+
+  @Override
+  public void checkOutputSpecs(JobContext context) {
+    // do nothing
+  }
+
+  @Override
+  public RecordWriter<Data, NullWritable> getRecordWriter(
+      TaskAttemptContext context) {
+    SqoopOutputFormatLoadExecutor executor =
+        new SqoopOutputFormatLoadExecutor(context);
+    return executor.getRecordWriter();
+  }
+
+  @Override
+  public OutputCommitter getOutputCommitter(TaskAttemptContext context) {
+    // return an output committer that does nothing
+    return new NullOutputCommitter();
+  }
+
+  class NullOutputCommitter extends OutputCommitter {
+    @Override
+    public void setupJob(JobContext jobContext) { }
+
+    @Override
+    public void setupTask(TaskAttemptContext taskContext) { }
+
+    @Override
+    public void commitTask(TaskAttemptContext taskContext) { }
+
+    @Override
+    public void abortTask(TaskAttemptContext taskContext) { }
+
+    @Override
+    public boolean needsTaskCommit(TaskAttemptContext taskContext) {
+      return false;
+    }
+  }
+
+}
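
A sketch of the choice between the two output formats in this patch: when
the Loader writes to its own destination (e.g. a JDBC export) rather than to
HDFS files, the job can use the null variant so MapReduce itself produces
no output:

    job.setOutputFormatClass(SqoopNullOutputFormat.class);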

http://git-wip-us.apache.org/repos/asf/sqoop/blob/2255e721/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java
new file mode 100644
index 0000000..96e1533
--- /dev/null
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java
@@ -0,0 +1,228 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.job.mr;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.sqoop.common.SqoopException;
+import org.apache.sqoop.core.CoreError;
+import org.apache.sqoop.job.JobConstants;
+import org.apache.sqoop.job.PrefixContext;
+import org.apache.sqoop.job.etl.Loader;
+import org.apache.sqoop.job.io.Data;
+import org.apache.sqoop.job.io.DataReader;
+import org.apache.sqoop.utils.ClassUtils;
+
+public class SqoopOutputFormatLoadExecutor {
+
+  public static final Log LOG =
+      LogFactory.getLog(SqoopOutputFormatLoadExecutor.class.getName());
+
+  private boolean readerFinished;
+  private boolean writerFinished;
+  private Data data;
+  private JobContext context;
+  private SqoopRecordWriter producer;
+  private ConsumerThread consumer;
+
+  public SqoopOutputFormatLoadExecutor(JobContext jobctx) {
+    data = new Data();
+    context = jobctx;
+    producer = new SqoopRecordWriter();
+    consumer = new ConsumerThread();
+  }
+
+  public RecordWriter<Data, NullWritable> getRecordWriter() {
+    consumer.setDaemon(true);
+    consumer.start();
+    return producer;
+  }
+
+  public class SqoopRecordWriter extends RecordWriter<Data, NullWritable> {
+    @Override
+    public void write(Data key, NullWritable value) {
+      synchronized (data) {
+        if (readerFinished) {
+          consumer.checkException();
+          return;
+        }
+
+        try {
+          if (!data.isEmpty()) {
+            // wait for reader to consume data
+            data.wait();
+          }
+
+          int type = key.getType();
+          data.setContent(key.getContent(type), type);
+
+          // notify reader that the data is ready
+          data.notify();
+
+        } catch (InterruptedException e) {
+          // inform reader that writer is finished
+          writerFinished = true;
+
+          // unlock reader so it can continue
+          data.notify();
+
+          // throw exception
+          throw new SqoopException(CoreError.CORE_0015, e);
+        }
+      }
+    }
+
+    @Override
+    public void close(TaskAttemptContext context) {
+      synchronized (data) {
+        if (readerFinished) {
+          consumer.checkException();
+          return;
+        }
+
+        try {
+          if (!data.isEmpty()) {
+            // wait for reader to consume data
+            data.wait();
+          }
+
+          writerFinished = true;
+
+          data.notify();
+
+        } catch (InterruptedException e) {
+          // inform reader that writer is finished
+          writerFinished = true;
+
+          // unlock reader so it can continue
+          data.notify();
+
+          // throw exception
+          throw new SqoopException(CoreError.CORE_0015, e);
+        }
+      }
+    }
+  }
+
+  public class OutputFormatDataReader extends DataReader {
+    @Override
+    public void setFieldDelimiter(char fieldDelimiter) {
+      data.setFieldDelimiter(fieldDelimiter);
+    }
+
+    @Override
+    public Object[] readArrayRecord() {
+      return (Object[])readContent(Data.ARRAY_RECORD);
+    }
+
+    @Override
+    public String readCsvRecord() {
+      return (String)readContent(Data.CSV_RECORD);
+    }
+
+    @Override
+    public Object readContent(int type) {
+      synchronized (data) {
+        if (writerFinished) {
+          return null;
+        }
+
+        try {
+          if (data.isEmpty()) {
+            // wait for writer to produce data
+            data.wait();
+          }
+
+          Object content = data.getContent(type);
+          data.setContent(null, Data.EMPTY_DATA);
+
+          // notify writer that data is consumed
+          data.notify();
+
+          return content;
+
+        } catch (InterruptedException e) {
+          // inform writer that reader is finished
+          readerFinished = true;
+
+          // unlock writer so it can continue
+          data.notify();
+
+          // throw exception
+          throw new SqoopException(CoreError.CORE_0016, e);
+        }
+      }
+    }
+  }
+
+  public class ConsumerThread extends Thread {
+    private SqoopException exception = null;
+
+    public void checkException() {
+      if (exception != null) {
+        throw exception;
+      }
+    }
+
+    @Override
+    public void run() {
+      DataReader reader = new OutputFormatDataReader();
+
+      Configuration conf = context.getConfiguration();
+
+      String loaderName = conf.get(JobConstants.JOB_ETL_LOADER);
+      Loader loader = (Loader) ClassUtils.instantiate(loaderName);
+
+      // Assemble the framework context; it is backed by the job
+      // configuration with an empty property prefix.
+      PrefixContext frameworkContext = new PrefixContext(conf, "");
+
+      try {
+        loader.run(frameworkContext, reader);
+      } catch (Throwable t) {
+        throw new SqoopException(CoreError.CORE_0018, t);
+      }
+
+      synchronized (data) {
+        // inform writer that reader is finished
+        readerFinished = true;
+
+        // unlock writer so it can continue
+        data.notify();
+
+        // If the writer has not finished, not all produced data was consumed.
+        if (exception == null && !writerFinished) {
+          exception = new SqoopException(CoreError.CORE_0019);
+        }
+
+        // Re-throw the deferred exception, if any.
+        if (exception != null) {
+          throw exception;
+        }
+      }
+    }
+  }
+
+}
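
The executor is a one-slot producer/consumer pair: the RecordWriter fills
the shared Data object and the ConsumerThread drains it through a Loader.
A hypothetical Loader matching loader.run(frameworkContext, reader) above;
readCsvRecord() returns null once the writer side reports completion:

    public class StdoutLoader extends Loader {
      @Override
      public void run(ImmutableContext context, DataReader reader) {
        String record;
        while ((record = reader.readCsvRecord()) != null) {
          System.out.println(record);  // a real loader writes to its target
        }
      }
    }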

