sqoop-commits mailing list archives

From: b...@apache.org
Subject: git commit: SQOOP-740 Provide export submission implementation (Jarek Jarcec Cecho)
Date: Sat, 08 Dec 2012 00:00:34 GMT
Updated Branches:
  refs/heads/sqoop2 e9868cb72 -> 9ac49d95c


SQOOP-740 Provide export submission implementation
(Jarek Jarcec Cecho)


Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/9ac49d95
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/9ac49d95
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/9ac49d95

Branch: refs/heads/sqoop2
Commit: 9ac49d95c21f1bed42ce511d3cd91deeb3c5904d
Parents: e9868cb
Author: Bilung Lee <blee@apache.org>
Authored: Fri Dec 7 15:07:34 2012 -0800
Committer: Bilung Lee <blee@apache.org>
Committed: Fri Dec 7 15:07:34 2012 -0800

----------------------------------------------------------------------
 .../client/shell/CreateConnectionFunction.java     |    2 +-
 .../sqoop/client/shell/CreateJobFunction.java      |    2 +-
 .../connector/jdbc/GenericJdbcExportLoader.java    |    2 +-
 .../sqoop/connector/jdbc/TestExportLoader.java     |    5 +-
 .../apache/sqoop/framework/ExecutionEngine.java    |    8 ++-
 .../apache/sqoop/framework/FrameworkManager.java   |   15 ++++-
 .../configuration/ExportJobConfiguration.java      |    8 ++-
 .../configuration/ImportJobConfiguration.java      |    5 ++
 .../sqoop/framework/configuration/InputForm.java   |   30 +++++++++
 .../main/resources/framework-resources.properties  |    8 +++
 .../mapreduce/MapreduceExecutionEngine.java        |   48 +++++++++++++++
 .../sqoop/job/etl/HdfsExportPartitioner.java       |   41 ++++++-------
 .../sqoop/job/etl/HdfsSequenceImportLoader.java    |    2 +-
 .../apache/sqoop/job/etl/HdfsTextImportLoader.java |    2 +-
 .../job/mr/SqoopOutputFormatLoadExecutor.java      |   23 ++++++-
 .../java/org/apache/sqoop/job/TestHdfsExtract.java |    2 +-
 .../java/org/apache/sqoop/job/TestMapReduce.java   |    2 +-
 .../main/java/org/apache/sqoop/job/etl/Loader.java |   14 ++++-
 18 files changed, 178 insertions(+), 41 deletions(-)
----------------------------------------------------------------------
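
In short: ExportJobConfiguration swaps its OutputForm for a new InputForm (the HDFS directory to export) plus a ThrottlingForm; FrameworkManager and the ExecutionEngine SPI gain a prepareExportSubmission() path that MapreduceExecutionEngine implements on top of HdfsExportPartitioner and HdfsTextExportExtractor; and the Loader SPI's run() becomes load(), which now carries the connection and job configuration objects that SqoopOutputFormatLoadExecutor selects per job type.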


http://git-wip-us.apache.org/repos/asf/sqoop/blob/9ac49d95/client/src/main/java/org/apache/sqoop/client/shell/CreateConnectionFunction.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/sqoop/client/shell/CreateConnectionFunction.java b/client/src/main/java/org/apache/sqoop/client/shell/CreateConnectionFunction.java
index 734276d..4df1c71 100644
--- a/client/src/main/java/org/apache/sqoop/client/shell/CreateConnectionFunction.java
+++ b/client/src/main/java/org/apache/sqoop/client/shell/CreateConnectionFunction.java
@@ -85,7 +85,7 @@ public class CreateConnectionFunction extends SqoopFunction {
     ResourceBundle frameworkBundle = frameworkBean.getResourceBundle();
 
     MConnector connector = connectorBean.getConnectors().get(0);
-    ResourceBundle connectorBundle = connectorBean.getResourceBundles().get(0);
+    ResourceBundle connectorBundle = connectorBean.getResourceBundles().get(connector.getPersistenceId());
 
     MConnection connection = new MConnection(connector.getPersistenceId(),
                                              connector.getConnectionForms(),

http://git-wip-us.apache.org/repos/asf/sqoop/blob/9ac49d95/client/src/main/java/org/apache/sqoop/client/shell/CreateJobFunction.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/sqoop/client/shell/CreateJobFunction.java b/client/src/main/java/org/apache/sqoop/client/shell/CreateJobFunction.java
index 0b685bf..3aa6c4f 100644
--- a/client/src/main/java/org/apache/sqoop/client/shell/CreateJobFunction.java
+++ b/client/src/main/java/org/apache/sqoop/client/shell/CreateJobFunction.java
@@ -103,7 +103,7 @@ public class CreateJobFunction extends  SqoopFunction {
 
     connectorBean = readConnector(String.valueOf(connection.getConnectorId()));
     MConnector connector = connectorBean.getConnectors().get(0);
-    ResourceBundle connectorBundle = connectorBean.getResourceBundles().get(0);
+    ResourceBundle connectorBundle = connectorBean.getResourceBundles().get(connection.getPersistenceId());
 
     MJob.Type jobType = MJob.Type.valueOf(type.toUpperCase());
 

http://git-wip-us.apache.org/repos/asf/sqoop/blob/9ac49d95/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExportLoader.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExportLoader.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExportLoader.java
index 13574b2..b2e59f7 100644
--- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExportLoader.java
+++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExportLoader.java
@@ -29,7 +29,7 @@ public class GenericJdbcExportLoader extends Loader {
   private int batchesPerTransaction = DEFAULT_BATCHES_PER_TRANSACTION;
 
   @Override
-  public void run(ImmutableContext context, DataReader reader) throws Exception{
+  public void load(ImmutableContext context, Object oc, Object oj, DataReader reader) throws Exception{
     String driver = context.getString(
         GenericJdbcConnectorConstants.CONNECTOR_JDBC_DRIVER);
     String url = context.getString(

http://git-wip-us.apache.org/repos/asf/sqoop/blob/9ac49d95/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestExportLoader.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestExportLoader.java b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestExportLoader.java
index c97693d..bf3c5e2 100644
--- a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestExportLoader.java
+++ b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestExportLoader.java
@@ -23,11 +23,8 @@ import junit.framework.TestCase;
 
 import org.apache.sqoop.common.MutableContext;
 import org.apache.sqoop.common.MutableMapContext;
-import org.apache.sqoop.connector.jdbc.configuration.ConnectionConfiguration;
-import org.apache.sqoop.connector.jdbc.configuration.ExportJobConfiguration;
 import org.apache.sqoop.job.etl.Loader;
 import org.apache.sqoop.job.io.DataReader;
-import org.junit.Test;
 
 public class TestExportLoader extends TestCase {
 
@@ -75,7 +72,7 @@ public class TestExportLoader extends TestCase {
     Loader loader = new GenericJdbcExportLoader();
     DummyReader reader = new DummyReader();
 
-    loader.run(context, reader);
+    loader.load(context, null, null, reader);
 
     int index = START;
     ResultSet rs = executor.executeQuery("SELECT * FROM "

http://git-wip-us.apache.org/repos/asf/sqoop/blob/9ac49d95/core/src/main/java/org/apache/sqoop/framework/ExecutionEngine.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/framework/ExecutionEngine.java b/core/src/main/java/org/apache/sqoop/framework/ExecutionEngine.java
index ae14d9a..f43942d 100644
--- a/core/src/main/java/org/apache/sqoop/framework/ExecutionEngine.java
+++ b/core/src/main/java/org/apache/sqoop/framework/ExecutionEngine.java
@@ -52,9 +52,15 @@ public abstract class ExecutionEngine {
   }
 
   /**
-   * Prepare given submission request for import submission.
+   * Prepare given submission request for import job type.
    *
    * @param request Submission request
    */
   public abstract void prepareImportSubmission(SubmissionRequest request);
+
+  /**
+   * Prepare given submission request for export job type.
+   * @param request Submission request
+   */
+  public abstract void prepareExportSubmission(SubmissionRequest request);
 }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/9ac49d95/core/src/main/java/org/apache/sqoop/framework/FrameworkManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/framework/FrameworkManager.java b/core/src/main/java/org/apache/sqoop/framework/FrameworkManager.java
index 6674643..a5ac74f 100644
--- a/core/src/main/java/org/apache/sqoop/framework/FrameworkManager.java
+++ b/core/src/main/java/org/apache/sqoop/framework/FrameworkManager.java
@@ -403,7 +403,7 @@ public final class FrameworkManager {
         prepareImportSubmission(request);
         break;
       case EXPORT:
-        // TODO(jarcec): Implement export path
+        prepareExportSubmission(request);
         break;
       default:
         throw  new SqoopException(FrameworkError.FRAMEWORK_0005,
@@ -450,6 +450,19 @@ public final class FrameworkManager {
     executionEngine.prepareImportSubmission(request);
   }
 
+  private static void prepareExportSubmission(SubmissionRequest request) {
+    ExportJobConfiguration jobConfiguration = (ExportJobConfiguration) request.getConfigFrameworkJob();
+
+    // We're directly moving configured number of extractors and loaders to
+    // underlying request object. In the future we might need to throttle this
+    // count based on other running jobs to meet our SLAs.
+    request.setExtractors(jobConfiguration.throttling.extractors);
+    request.setLoaders(jobConfiguration.throttling.loaders);
+
+    // Delegate rest of the job to execution engine
+    executionEngine.prepareExportSubmission(request);
+  }
+
   /**
    * Callback that will be called only if we failed to submit the job to the
    * remote cluster.
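
ThrottlingForm itself is not touched by this diff. For orientation, here is a sketch of the shape implied by the jobConfiguration.throttling.extractors / jobConfiguration.throttling.loaders accesses above; the annotation usage mirrors InputForm below, and both fields are assumptions rather than the actual source:

import org.apache.sqoop.model.FormClass;
import org.apache.sqoop.model.Input;

@FormClass
public class ThrottlingForm {
  // Assumed fields, inferred from the accesses in prepareExportSubmission()
  @Input public Integer extractors;  // copied into request.setExtractors(...)
  @Input public Integer loaders;     // copied into request.setLoaders(...)
}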

http://git-wip-us.apache.org/repos/asf/sqoop/blob/9ac49d95/core/src/main/java/org/apache/sqoop/framework/configuration/ExportJobConfiguration.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/framework/configuration/ExportJobConfiguration.java b/core/src/main/java/org/apache/sqoop/framework/configuration/ExportJobConfiguration.java
index 330aff0..d533089 100644
--- a/core/src/main/java/org/apache/sqoop/framework/configuration/ExportJobConfiguration.java
+++ b/core/src/main/java/org/apache/sqoop/framework/configuration/ExportJobConfiguration.java
@@ -26,5 +26,11 @@ import org.apache.sqoop.model.Form;
 @ConfigurationClass
 public class ExportJobConfiguration {
 
-  @Form OutputForm output;
+  @Form public InputForm input;
+
+  @Form public ThrottlingForm throttling;
+
+  public ExportJobConfiguration() {
+    throttling = new ThrottlingForm();
+  }
 }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/9ac49d95/core/src/main/java/org/apache/sqoop/framework/configuration/ImportJobConfiguration.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/framework/configuration/ImportJobConfiguration.java b/core/src/main/java/org/apache/sqoop/framework/configuration/ImportJobConfiguration.java
index c674fc2..2a35eb9 100644
--- a/core/src/main/java/org/apache/sqoop/framework/configuration/ImportJobConfiguration.java
+++ b/core/src/main/java/org/apache/sqoop/framework/configuration/ImportJobConfiguration.java
@@ -29,4 +29,9 @@ public class ImportJobConfiguration {
   @Form public OutputForm output;
 
   @Form public ThrottlingForm throttling;
+
+  public ImportJobConfiguration() {
+    output = new OutputForm();
+    throttling = new ThrottlingForm();
+  }
 }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/9ac49d95/core/src/main/java/org/apache/sqoop/framework/configuration/InputForm.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/framework/configuration/InputForm.java b/core/src/main/java/org/apache/sqoop/framework/configuration/InputForm.java
new file mode 100644
index 0000000..c97a5de
--- /dev/null
+++ b/core/src/main/java/org/apache/sqoop/framework/configuration/InputForm.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.framework.configuration;
+
+import org.apache.sqoop.model.FormClass;
+import org.apache.sqoop.model.Input;
+
+/**
+ * Form to gather the input (HDFS) directory for export jobs.
+ */
+@FormClass
+public class InputForm {
+
+  @Input(size = 50) public String inputDirectory;
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/9ac49d95/core/src/main/resources/framework-resources.properties
----------------------------------------------------------------------
diff --git a/core/src/main/resources/framework-resources.properties b/core/src/main/resources/framework-resources.properties
index db40946..cebc90e 100644
--- a/core/src/main/resources/framework-resources.properties
+++ b/core/src/main/resources/framework-resources.properties
@@ -44,6 +44,14 @@ output.outputDirectory.help = Output directory for final data
 output.ignored.label = Ignored
 output.ignored.help = This value is ignored
 
+# Input Form
+#
+input.label = Input configuration
+input.help = Specifies information required to get data from the Hadoop ecosystem
+
+input.inputDirectory.label = Input directory
+input.inputDirectory.help = Directory that should be exported
+
 # Throttling Form
 #
 throttling.label = Throttling resources

http://git-wip-us.apache.org/repos/asf/sqoop/blob/9ac49d95/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java b/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java
index e2163ff..06872ca 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java
@@ -18,32 +18,44 @@
 package org.apache.sqoop.execution.mapreduce;
 
 import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.sqoop.common.MutableMapContext;
 import org.apache.sqoop.common.SqoopException;
 import org.apache.sqoop.framework.ExecutionEngine;
 import org.apache.sqoop.framework.SubmissionRequest;
+import org.apache.sqoop.framework.configuration.ExportJobConfiguration;
 import org.apache.sqoop.framework.configuration.ImportJobConfiguration;
 import org.apache.sqoop.framework.configuration.OutputFormat;
 import org.apache.sqoop.job.JobConstants;
 import org.apache.sqoop.job.MapreduceExecutionError;
+import org.apache.sqoop.job.etl.Exporter;
+import org.apache.sqoop.job.etl.HdfsExportPartitioner;
 import org.apache.sqoop.job.etl.HdfsSequenceImportLoader;
+import org.apache.sqoop.job.etl.HdfsTextExportExtractor;
 import org.apache.sqoop.job.etl.HdfsTextImportLoader;
 import org.apache.sqoop.job.etl.Importer;
 import org.apache.sqoop.job.io.Data;
 import org.apache.sqoop.job.mr.SqoopFileOutputFormat;
 import org.apache.sqoop.job.mr.SqoopInputFormat;
 import org.apache.sqoop.job.mr.SqoopMapper;
+import org.apache.sqoop.job.mr.SqoopNullOutputFormat;
 
 /**
  *
  */
 public class MapreduceExecutionEngine extends ExecutionEngine {
 
+  /**
+   *  {@inheritDoc}
+   */
   @Override
   public SubmissionRequest createSubmissionRequest() {
     return new MRSubmissionRequest();
   }
 
+  /**
+   *  {@inheritDoc}
+   */
   @Override
   public void prepareImportSubmission(SubmissionRequest gRequest) {
     MRSubmissionRequest request = (MRSubmissionRequest) gRequest;
@@ -82,4 +94,40 @@ public class MapreduceExecutionEngine extends ExecutionEngine {
         "Format: " + jobConf.output.outputFormat);
     }
   }
+
+  /**
+   *  {@inheritDoc}
+   */
+  @Override
+  public void prepareExportSubmission(SubmissionRequest gRequest) {
+    MRSubmissionRequest request = (MRSubmissionRequest) gRequest;
+    ExportJobConfiguration jobConf = (ExportJobConfiguration) request.getConfigFrameworkJob();
+
+    // Configure map-reduce classes for export
+    request.setInputFormatClass(SqoopInputFormat.class);
+
+    request.setMapperClass(SqoopMapper.class);
+    request.setMapOutputKeyClass(Data.class);
+    request.setMapOutputValueClass(NullWritable.class);
+
+    request.setOutputFormatClass(SqoopNullOutputFormat.class);
+    request.setOutputKeyClass(Data.class);
+    request.setOutputValueClass(NullWritable.class);
+
+    Exporter exporter = (Exporter)request.getConnectorCallbacks();
+
+    // Set up framework context
+    MutableMapContext context = request.getFrameworkContext();
+    context.setString(JobConstants.JOB_ETL_PARTITIONER, HdfsExportPartitioner.class.getName());
+    context.setString(JobConstants.JOB_ETL_LOADER, exporter.getLoader().getName());
+    context.setString(JobConstants.JOB_ETL_DESTROYER, exporter.getDestroyer().getName());
+
+    // We should make one extractor that will be able to read all supported file types
+    context.setString(JobConstants.JOB_ETL_EXTRACTOR, HdfsTextExportExtractor.class.getName());
+    context.setString(FileInputFormat.INPUT_DIR, jobConf.input.inputDirectory);
+
+    if(request.getExtractors() != null) {
+      context.setInteger(JobConstants.JOB_ETL_EXTRACTOR_NUM, request.getExtractors());
+    }
+  }
 }
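
The Exporter consumed above is supplied by the connector. A hedged sketch of the connector-side wiring follows: only the getLoader()/getDestroyer() accessors are confirmed by this commit, the three-argument constructor is an assumption, and every My* class name is hypothetical:

import org.apache.sqoop.job.etl.Exporter;

// Hypothetical connector code, shown for orientation only.
Exporter exporter = new Exporter(
    MyExportInitializer.class,   // hypothetical Initializer
    MyExportLoader.class,        // a Loader subclass (see Loader.java below)
    MyExportDestroyer.class);    // hypothetical Destroyer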

http://git-wip-us.apache.org/repos/asf/sqoop/blob/9ac49d95/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsExportPartitioner.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsExportPartitioner.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsExportPartitioner.java
index 9e7ea4e..7ffd97c 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsExportPartitioner.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsExportPartitioner.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashSet;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.HashMap;
 import java.util.Set;
@@ -30,7 +31,6 @@ import java.util.Map;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.FileStatus;
@@ -42,7 +42,7 @@ import org.apache.hadoop.net.NodeBase;
 import org.apache.hadoop.net.NetworkTopology;
 import org.apache.sqoop.common.ImmutableContext;
 import org.apache.sqoop.common.SqoopException;
-import org.apache.sqoop.job.Constants;
+import org.apache.sqoop.job.JobConstants;
 import org.apache.sqoop.job.MapreduceExecutionError;
 import org.apache.sqoop.job.PrefixContext;
 
@@ -68,12 +68,10 @@ public class HdfsExportPartitioner extends Partitioner {
 
   @Override
   public List<Partition> getPartitions(ImmutableContext context,
-      long maxPartitions, Object connectionConfiguration, Object jobConfiguration) {
+      long numTasks, Object connectionConfiguration, Object jobConfiguration) {
     Configuration conf = ((PrefixContext)context).getConfiguration();
 
     try {
-      int numTasks = Integer.parseInt(conf.get(
-          Constants.JOB_ETL_NUMBER_PARTITIONS));
       long numInputBytes = getInputSize(conf);
       maxSplitSize = numInputBytes / numTasks;
 
@@ -117,24 +115,21 @@ public class HdfsExportPartitioner extends Partitioner {
       // all the files in input set
       String indir = conf.get(FileInputFormat.INPUT_DIR);
       FileSystem fs = FileSystem.get(conf);
-      Path[] paths = FileUtil.stat2Paths(fs.listStatus(new Path(indir)));
-      List<Partition> partitions = new ArrayList<Partition>();
-      if (paths.length == 0) {
-        return partitions;
+
+      List<Path> paths = new LinkedList<Path>();
+      for(FileStatus status : fs.listStatus(new Path(indir))) {
+        if(!status.isDirectory()) {
+          paths.add(status.getPath());
+        }
       }
 
-      // Convert them to Paths first. This is a costly operation and
-      // we should do it first, otherwise we will incur doing it multiple
-      // times, one time each for each pool in the next loop.
-      List<Path> newpaths = new ArrayList<Path>();
-      for (int i = 0; i < paths.length; i++) {
-        Path p = new Path(paths[i].toUri().getPath());
-        newpaths.add(p);
+      List<Partition> partitions = new ArrayList<Partition>();
+      if (paths.size() == 0) {
+        return partitions;
       }
-      paths = null;
 
       // create splits for all files that are not in any pool.
-      getMoreSplits(conf, newpaths.toArray(new Path[newpaths.size()]),
+      getMoreSplits(conf, paths,
                     maxSize, minSizeNode, minSizeRack, partitions);
 
       // free up rackToNodes map
@@ -161,7 +156,7 @@ public class HdfsExportPartitioner extends Partitioner {
   /**
    * Return all the splits in the specified set of paths
    */
-  private void getMoreSplits(Configuration conf, Path[] paths,
+  private void getMoreSplits(Configuration conf, List<Path> paths,
       long maxSize, long minSizeNode, long minSizeRack,
       List<Partition> partitions) throws IOException {
 
@@ -180,14 +175,14 @@ public class HdfsExportPartitioner extends Partitioner {
     HashMap<String, List<OneBlockInfo>> nodeToBlocks =
                               new HashMap<String, List<OneBlockInfo>>();
 
-    files = new OneFileInfo[paths.length];
-    if (paths.length == 0) {
+    files = new OneFileInfo[paths.size()];
+    if (paths.size() == 0) {
       return;
     }
 
     // populate all the blocks for all files
-    for (int i = 0; i < paths.length; i++) {
-      files[i] = new OneFileInfo(paths[i], conf, isSplitable(conf, paths[i]),
+    for (int i = 0; i < paths.size(); i++) {
+      files[i] = new OneFileInfo(paths.get(i), conf, isSplitable(conf, paths.get(i)),
                                  rackToBlocks, blockToNodes, nodeToBlocks,
                                  rackToNodes, maxSize);
     }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/9ac49d95/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsSequenceImportLoader.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsSequenceImportLoader.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsSequenceImportLoader.java
index a706ea8..a5d6b9c 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsSequenceImportLoader.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsSequenceImportLoader.java
@@ -46,7 +46,7 @@ public class HdfsSequenceImportLoader extends Loader {
   }
 
   @Override
-  public void run(ImmutableContext context, DataReader reader) throws Exception{
+  public void load(ImmutableContext context, Object oc, Object oj, DataReader reader) throws Exception{
     reader.setFieldDelimiter(fieldDelimiter);
 
     Configuration conf = new Configuration();

http://git-wip-us.apache.org/repos/asf/sqoop/blob/9ac49d95/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsTextImportLoader.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsTextImportLoader.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsTextImportLoader.java
index 55eb389..490b1c2 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsTextImportLoader.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsTextImportLoader.java
@@ -46,7 +46,7 @@ public class HdfsTextImportLoader extends Loader {
   }
 
   @Override
-  public void run(ImmutableContext context, DataReader reader) throws Exception{
+  public void load(ImmutableContext context, Object oc, Object oj, DataReader reader) throws Exception{
     reader.setFieldDelimiter(fieldDelimiter);
 
     Configuration conf = new Configuration();

http://git-wip-us.apache.org/repos/asf/sqoop/blob/9ac49d95/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java
index 3bd1e1b..71b4724 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java
@@ -169,11 +169,28 @@ public class SqoopOutputFormatLoadExecutor {
       String loaderName = conf.get(JobConstants.JOB_ETL_LOADER);
       Loader loader = (Loader) ClassUtils.instantiate(loaderName);
 
-      // Get together framework context as configuration prefix by nothing
-      PrefixContext frameworkContext = new PrefixContext(conf, "");
+      // Objects that should be passed to the Loader
+      PrefixContext subContext = null;
+      Object configConnection = null;
+      Object configJob = null;
+
+      switch (ConfigurationUtils.getJobType(conf)) {
+        case EXPORT:
+          subContext = new PrefixContext(conf, JobConstants.PREFIX_CONNECTOR_CONTEXT);
+          configConnection = ConfigurationUtils.getConnectorConnection(conf);
+          configJob = ConfigurationUtils.getConnectorJob(conf);
+          break;
+        case IMPORT:
+          subContext = new PrefixContext(conf, "");
+          configConnection = ConfigurationUtils.getFrameworkConnection(conf);
+          configJob = ConfigurationUtils.getFrameworkJob(conf);
+          break;
+        default:
+          throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0023);
+      }
 
       try {
-        loader.run(frameworkContext, reader);
+        loader.load(subContext, configConnection, configJob, reader);
       } catch (Throwable t) {
         LOG.error("Error while loading data out of MR job.", t);
         throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0018, t);

http://git-wip-us.apache.org/repos/asf/sqoop/blob/9ac49d95/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsExtract.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsExtract.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsExtract.java
index 585fac7..9edf0ba 100644
--- a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsExtract.java
+++ b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsExtract.java
@@ -233,7 +233,7 @@ public class TestHdfsExtract extends TestCase {
 
   public static class DummyLoader extends Loader {
     @Override
-    public void run(ImmutableContext context, DataReader reader)
+    public void load(ImmutableContext context, Object oc, Object oj, DataReader reader)
         throws Exception {
       int index = 1;
       int sum = 0;

http://git-wip-us.apache.org/repos/asf/sqoop/blob/9ac49d95/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java
index c8caecd..8590065 100644
--- a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java
+++ b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java
@@ -216,7 +216,7 @@ public class TestMapReduce extends TestCase {
     private Data actual = new Data();
 
     @Override
-    public void run(ImmutableContext context, DataReader reader) throws Exception{
+    public void load(ImmutableContext context, Object oc, Object oj, DataReader reader) throws Exception{
       Object[] array;
       while ((array = reader.readArrayRecord()) != null) {
         actual.setContent(array, Data.ARRAY_RECORD);

http://git-wip-us.apache.org/repos/asf/sqoop/blob/9ac49d95/spi/src/main/java/org/apache/sqoop/job/etl/Loader.java
----------------------------------------------------------------------
diff --git a/spi/src/main/java/org/apache/sqoop/job/etl/Loader.java b/spi/src/main/java/org/apache/sqoop/job/etl/Loader.java
index 046b939..3148e49 100644
--- a/spi/src/main/java/org/apache/sqoop/job/etl/Loader.java
+++ b/spi/src/main/java/org/apache/sqoop/job/etl/Loader.java
@@ -25,6 +25,18 @@ import org.apache.sqoop.job.io.DataReader;
  */
 public abstract class Loader {
 
-  public abstract void run(ImmutableContext context, DataReader reader) throws Exception;
+  /**
+   * Load data to target.
+   *
+   * @param context Context object
+   * @param connectionConfiguration Connection configuration
+   * @param jobConfiguration Job configuration
+   * @param reader Data reader object
+   * @throws Exception
+   */
+  public abstract void load(ImmutableContext context,
+                            Object connectionConfiguration,
+                            Object jobConfiguration,
+                            DataReader reader) throws Exception;
 
 }
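
For connector authors, the widened load() signature is the visible API change. Below is a minimal loader written against the new contract, patterned on the DummyLoader in TestMapReduce above; the class name and the console output are illustrative, not part of this commit:

import org.apache.sqoop.common.ImmutableContext;
import org.apache.sqoop.job.etl.Loader;
import org.apache.sqoop.job.io.DataReader;

public class ConsoleLoader extends Loader {
  @Override
  public void load(ImmutableContext context, Object connectionConfiguration,
                   Object jobConfiguration, DataReader reader) throws Exception {
    // Both configuration objects arrive untyped; a real connector would cast
    // them to its own connection/job configuration classes before use.
    Object[] record;
    while ((record = reader.readArrayRecord()) != null) {
      System.out.println(java.util.Arrays.toString(record));
    }
  }
}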

