sqoop-commits mailing list archives

From: b...@apache.org
Subject: git commit: SQOOP-737 Option to set number of extractors and loaders (Jarek Jarcec Cecho)
Date: Tue, 04 Dec 2012 20:29:11 GMT
Updated Branches:
  refs/heads/sqoop2 c4f9ef846 -> a633fb0b3


SQOOP-737 Option to set number of extractors and loaders
(Jarek Jarcec Cecho)


Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/a633fb0b
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/a633fb0b
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/a633fb0b

Branch: refs/heads/sqoop2
Commit: a633fb0b34dece59e9f1420985817a8da8f8cb24
Parents: c4f9ef8
Author: Bilung Lee <blee@apache.org>
Authored: Tue Dec 4 11:50:21 2012 -0800
Committer: Bilung Lee <blee@apache.org>
Committed: Tue Dec 4 12:28:23 2012 -0800

----------------------------------------------------------------------
 .../org/apache/sqoop/common/MutableContext.java    |   15 +++++++
 .../org/apache/sqoop/common/MutableMapContext.java |   10 +++++
 .../jdbc/GenericJdbcImportPartitioner.java         |    7 +--
 .../connector/jdbc/TestImportPartitioner.java      |   16 ++-----
 .../apache/sqoop/framework/FrameworkManager.java   |    6 +++
 .../apache/sqoop/framework/SubmissionRequest.java  |   26 ++++++++++++
 .../configuration/ImportJobConfiguration.java      |    2 +
 .../framework/configuration/ThrottlingForm.java    |   32 +++++++++++++++
 .../main/resources/framework-resources.properties  |   11 +++++
 .../mapreduce/MapreduceExecutionEngine.java        |    4 ++
 .../java/org/apache/sqoop/job/JobConstants.java    |    3 +
 .../apache/sqoop/job/MapreduceExecutionError.java  |    3 +
 .../org/apache/sqoop/job/mr/SqoopInputFormat.java  |   11 +++++-
 .../java/org/apache/sqoop/job/TestHdfsLoad.java    |    2 +-
 .../java/org/apache/sqoop/job/TestMapReduce.java   |    2 +-
 .../java/org/apache/sqoop/job/etl/Partitioner.java |    1 +
 .../mapreduce/MapreduceSubmissionEngine.java       |    9 +++-
 17 files changed, 140 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/sqoop/blob/a633fb0b/common/src/main/java/org/apache/sqoop/common/MutableContext.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/sqoop/common/MutableContext.java b/common/src/main/java/org/apache/sqoop/common/MutableContext.java
index 238bbfd..ecb97b5 100644
--- a/common/src/main/java/org/apache/sqoop/common/MutableContext.java
+++ b/common/src/main/java/org/apache/sqoop/common/MutableContext.java
@@ -30,4 +30,19 @@ public interface MutableContext extends ImmutableContext {
    */
   public void setString(String key, String value);
 
+  /**
+   * Set long value for given key.
+   *
+   * @param key Key
+   * @param value New value
+   */
+  public void setLong(String key, long value);
+
+  /**
+   * Set integer value for given key.
+   *
+   * @param key Key
+   * @param value New value
+   */
+  public void setInteger(String key, int value);
 }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/a633fb0b/common/src/main/java/org/apache/sqoop/common/MutableMapContext.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/sqoop/common/MutableMapContext.java b/common/src/main/java/org/apache/sqoop/common/MutableMapContext.java
index cd9d3e3..cb0c3e1 100644
--- a/common/src/main/java/org/apache/sqoop/common/MutableMapContext.java
+++ b/common/src/main/java/org/apache/sqoop/common/MutableMapContext.java
@@ -43,6 +43,16 @@ public class MutableMapContext extends MapContext implements Iterable<Map.Entry<
   }
 
   @Override
+  public void setLong(String key, long value) {
+    getOptions().put(key, Long.toString(value));
+  }
+
+  @Override
+  public void setInteger(String key, int value) {
+    getOptions().put(key, Integer.toString(value));
+  }
+
+  @Override
   public Iterator<Map.Entry<String, String>> iterator() {
     return getOptions().entrySet().iterator();
   }

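The new setters only complement the typed getters the context already exposes. Purely as an illustration (not part of the commit; the key names are made up and imports from org.apache.sqoop.common are omitted), a round trip looks like:

    MutableMapContext context = new MutableMapContext();

    // Typed values are stored as strings under the hood.
    context.setInteger("example.extractor.count", 4);
    context.setLong("example.row.limit", 1000000L);

    // Read back with a default, the same way connector code does elsewhere in this patch.
    int extractors = context.getInt("example.extractor.count", 10);  // -> 4
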
http://git-wip-us.apache.org/repos/asf/sqoop/blob/a633fb0b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportPartitioner.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportPartitioner.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportPartitioner.java
index a6d3b52..0d9f0c0 100644
--- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportPartitioner.java
+++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportPartitioner.java
@@ -23,21 +23,20 @@ import java.util.List;
 
 import org.apache.sqoop.common.ImmutableContext;
 import org.apache.sqoop.common.SqoopException;
-import org.apache.sqoop.job.Constants;
 import org.apache.sqoop.job.etl.Partition;
 import org.apache.sqoop.job.etl.Partitioner;
 
 public class GenericJdbcImportPartitioner extends Partitioner {
 
-  private int numberPartitions;
+  private long numberPartitions;
   private String partitionColumnName;
   private int partitionColumnType;
   private String partitionMinValue;
   private String partitionMaxValue;
 
   @Override
-  public List<Partition> getPartitions(ImmutableContext context, Object connectionC, Object jobC) {
-    numberPartitions = context.getInt(Constants.JOB_ETL_NUMBER_PARTITIONS, 10);
+  public List<Partition> getPartitions(ImmutableContext context, long maxPartitions, Object connectionC, Object jobC) {
+    numberPartitions = maxPartitions;
     partitionColumnName = context.getString(
         GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNNAME);
     partitionColumnType = Integer.parseInt(context.getString(

http://git-wip-us.apache.org/repos/asf/sqoop/blob/a633fb0b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportPartitioner.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportPartitioner.java b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportPartitioner.java
index d5db190..77c4739 100644
--- a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportPartitioner.java
+++ b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportPartitioner.java
@@ -18,7 +18,6 @@
 package org.apache.sqoop.connector.jdbc;
 
 import java.sql.Types;
-import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 
@@ -31,7 +30,6 @@ import org.apache.sqoop.connector.jdbc.configuration.ImportJobConfiguration;
 import org.apache.sqoop.job.Constants;
 import org.apache.sqoop.job.etl.Partition;
 import org.apache.sqoop.job.etl.Partitioner;
-import org.junit.Test;
 
 public class TestImportPartitioner extends TestCase {
 
@@ -52,13 +50,12 @@ public class TestImportPartitioner extends TestCase {
     context.setString(
         GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MAXVALUE,
         String.valueOf(START + NUMBER_OF_ROWS - 1));
-    context.setString(Constants.JOB_ETL_NUMBER_PARTITIONS, "5");
 
     ConnectionConfiguration connConf = new ConnectionConfiguration();
     ImportJobConfiguration jobConf = new ImportJobConfiguration();
 
     Partitioner partitioner = new GenericJdbcImportPartitioner();
-    List<Partition> partitions = partitioner.getPartitions(context, connConf, jobConf);
+    List<Partition> partitions = partitioner.getPartitions(context, 5, connConf, jobConf);
 
     verifyResult(partitions, new String[] {
         "-5 <= ICOL AND ICOL < -3",
@@ -83,13 +80,12 @@ public class TestImportPartitioner extends TestCase {
     context.setString(
         GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MAXVALUE,
         String.valueOf(START + NUMBER_OF_ROWS - 1));
-    context.setString(Constants.JOB_ETL_NUMBER_PARTITIONS, "3");
 
     ConnectionConfiguration connConf = new ConnectionConfiguration();
     ImportJobConfiguration jobConf = new ImportJobConfiguration();
 
     Partitioner partitioner = new GenericJdbcImportPartitioner();
-    List<Partition> partitions = partitioner.getPartitions(context, connConf, jobConf);
+    List<Partition> partitions = partitioner.getPartitions(context, 3, connConf, jobConf);
 
     verifyResult(partitions, new String[] {
         "-5 <= ICOL AND ICOL < -1",
@@ -112,13 +108,12 @@ public class TestImportPartitioner extends TestCase {
     context.setString(
         GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MAXVALUE,
         String.valueOf(START + NUMBER_OF_ROWS - 1));
-    context.setString(Constants.JOB_ETL_NUMBER_PARTITIONS, "13");
 
     ConnectionConfiguration connConf = new ConnectionConfiguration();
     ImportJobConfiguration jobConf = new ImportJobConfiguration();
 
     Partitioner partitioner = new GenericJdbcImportPartitioner();
-    List<Partition> partitions = partitioner.getPartitions(context, connConf, jobConf);
+    List<Partition> partitions = partitioner.getPartitions(context, 13, connConf, jobConf);
 
     verifyResult(partitions, new String[] {
         "-5 <= ICOL AND ICOL < -4",
@@ -148,13 +143,12 @@ public class TestImportPartitioner extends TestCase {
     context.setString(
         GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MAXVALUE,
         String.valueOf((double)(START + NUMBER_OF_ROWS - 1)));
-    context.setString(Constants.JOB_ETL_NUMBER_PARTITIONS, "5");
 
     ConnectionConfiguration connConf = new ConnectionConfiguration();
     ImportJobConfiguration jobConf = new ImportJobConfiguration();
 
     Partitioner partitioner = new GenericJdbcImportPartitioner();
-    List<Partition> partitions = partitioner.getPartitions(context, connConf, jobConf);
+    List<Partition> partitions = partitioner.getPartitions(context, 5, connConf, jobConf);
 
     verifyResult(partitions, new String[] {
         "-5.0 <= DCOL AND DCOL < -3.0",
@@ -185,7 +179,7 @@ public class TestImportPartitioner extends TestCase {
     ImportJobConfiguration jobConf = new ImportJobConfiguration();
 
     Partitioner partitioner = new GenericJdbcImportPartitioner();
-    List<Partition> partitions = partitioner.getPartitions(context, connConf, jobConf);
+    List<Partition> partitions = partitioner.getPartitions(context, 3, connConf, jobConf);
 
     verifyResult(partitions, new String[] {
         "-5.0 <= DCOL AND DCOL < -1.6666666666666665",

http://git-wip-us.apache.org/repos/asf/sqoop/blob/a633fb0b/core/src/main/java/org/apache/sqoop/framework/FrameworkManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/framework/FrameworkManager.java b/core/src/main/java/org/apache/sqoop/framework/FrameworkManager.java
index 0cd6969..6674643 100644
--- a/core/src/main/java/org/apache/sqoop/framework/FrameworkManager.java
+++ b/core/src/main/java/org/apache/sqoop/framework/FrameworkManager.java
@@ -440,6 +440,12 @@ public final class FrameworkManager {
     // Initialize the map-reduce part (all sort of required classes, ...)
     request.setOutputDirectory(jobConfiguration.output.outputDirectory);
 
+    // We're directly moving configured number of extractors and loaders to
+    // underlying request object. In the future we might need to throttle this
+    // count based on other running jobs to meet our SLAs.
+    request.setExtractors(jobConfiguration.throttling.extractors);
+    request.setLoaders(jobConfiguration.throttling.loaders);
+
     // Delegate rest of the job to execution engine
     executionEngine.prepareImportSubmission(request);
   }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/a633fb0b/core/src/main/java/org/apache/sqoop/framework/SubmissionRequest.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/framework/SubmissionRequest.java b/core/src/main/java/org/apache/sqoop/framework/SubmissionRequest.java
index fb6b6a9..53d0039 100644
--- a/core/src/main/java/org/apache/sqoop/framework/SubmissionRequest.java
+++ b/core/src/main/java/org/apache/sqoop/framework/SubmissionRequest.java
@@ -97,6 +97,16 @@ public class SubmissionRequest {
    */
   String notificationUrl;
 
+  /**
+   * Number of extractors
+   */
+  Integer extractors;
+
+  /**
+   * Number of loaders
+   */
+  Integer loaders;
+
   public SubmissionRequest() {
     this.jars = new LinkedList<String>();
     this.connectorContext = new MutableMapContext();
@@ -222,4 +232,20 @@ public class SubmissionRequest {
   public void setNotificationUrl(String url) {
     this.notificationUrl = url;
   }
+
+  public Integer getExtractors() {
+    return extractors;
+  }
+
+  public void setExtractors(Integer extractors) {
+    this.extractors = extractors;
+  }
+
+  public Integer getLoaders() {
+    return loaders;
+  }
+
+  public void setLoaders(Integer loaders) {
+    this.loaders = loaders;
+  }
 }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/a633fb0b/core/src/main/java/org/apache/sqoop/framework/configuration/ImportJobConfiguration.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/framework/configuration/ImportJobConfiguration.java b/core/src/main/java/org/apache/sqoop/framework/configuration/ImportJobConfiguration.java
index 8c4dcf1..c674fc2 100644
--- a/core/src/main/java/org/apache/sqoop/framework/configuration/ImportJobConfiguration.java
+++ b/core/src/main/java/org/apache/sqoop/framework/configuration/ImportJobConfiguration.java
@@ -27,4 +27,6 @@ import org.apache.sqoop.model.Form;
 public class ImportJobConfiguration {
 
   @Form public OutputForm output;
+
+  @Form public ThrottlingForm throttling;
 }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/a633fb0b/core/src/main/java/org/apache/sqoop/framework/configuration/ThrottlingForm.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/framework/configuration/ThrottlingForm.java b/core/src/main/java/org/apache/sqoop/framework/configuration/ThrottlingForm.java
new file mode 100644
index 0000000..c435f6b
--- /dev/null
+++ b/core/src/main/java/org/apache/sqoop/framework/configuration/ThrottlingForm.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.framework.configuration;
+
+import org.apache.sqoop.model.FormClass;
+import org.apache.sqoop.model.Input;
+
+/**
+ * Form to set up number of loaders and extractors
+ */
+@FormClass
+public class ThrottlingForm {
+
+  @Input public Integer extractors;
+
+  @Input public Integer loaders;
+}

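As a hypothetical wiring sketch (not part of the commit; the values 4 and 2 are made up, and in practice the form objects are populated from user input rather than constructed by hand), the new form's values flow from the import job configuration onto the submission request exactly as the FrameworkManager hunk above does:

    ImportJobConfiguration jobConfiguration = new ImportJobConfiguration();
    jobConfiguration.throttling = new ThrottlingForm();
    jobConfiguration.throttling.extractors = 4;  // at most four map-side extractors
    jobConfiguration.throttling.loaders = 2;     // two reduce-side loaders

    SubmissionRequest request = new SubmissionRequest();
    request.setExtractors(jobConfiguration.throttling.extractors);
    request.setLoaders(jobConfiguration.throttling.loaders);
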
http://git-wip-us.apache.org/repos/asf/sqoop/blob/a633fb0b/core/src/main/resources/framework-resources.properties
----------------------------------------------------------------------
diff --git a/core/src/main/resources/framework-resources.properties b/core/src/main/resources/framework-resources.properties
index 019f5ca..db40946 100644
--- a/core/src/main/resources/framework-resources.properties
+++ b/core/src/main/resources/framework-resources.properties
@@ -44,3 +44,14 @@ output.outputDirectory.help = Output directory for final data
 output.ignored.label = Ignored
 output.ignored.help = This value is ignored
 
+# Throttling Form
+#
+throttling.label = Throttling resources
+throttling.help = Set throttling boundaries to not overload your systems
+
+throttling.extractors.label = Extractors
+throttling.extractors.help = Number of extractors that Sqoop will use
+
+throttling.loaders.label = Loaders
+throttling.loaders.help = Number of loaders that Sqoop will use
+

http://git-wip-us.apache.org/repos/asf/sqoop/blob/a633fb0b/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java b/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java
index 3248e77..e2163ff 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java
@@ -68,6 +68,10 @@ public class MapreduceExecutionEngine extends ExecutionEngine {
     context.setString(JobConstants.JOB_ETL_EXTRACTOR, importer.getExtractor().getName());
     context.setString(JobConstants.JOB_ETL_DESTROYER, importer.getDestroyer().getName());
 
+    if(request.getExtractors() != null) {
+      context.setInteger(JobConstants.JOB_ETL_EXTRACTOR_NUM, request.getExtractors());
+    }
+
     // TODO: This settings should be abstracted to core module at some point
     if(jobConf.output.outputFormat == OutputFormat.TEXT_FILE) {
       context.setString(JobConstants.JOB_ETL_LOADER, HdfsTextImportLoader.class.getName());

http://git-wip-us.apache.org/repos/asf/sqoop/blob/a633fb0b/execution/mapreduce/src/main/java/org/apache/sqoop/job/JobConstants.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/JobConstants.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/JobConstants.java
index d899fce..f5123a2 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/JobConstants.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/JobConstants.java
@@ -49,6 +49,9 @@ public final class JobConstants extends Constants {
       + "mr.output.codec";
 
 
+  public static final String JOB_ETL_EXTRACTOR_NUM = PREFIX_JOB_CONFIG
+    + "etl.extractor.count";
+
   public static final String JOB_CONFIG_CLASS_CONNECTOR_CONNECTION =
     PREFIX_JOB_CONFIG + "config.class.connector.connection";
 

http://git-wip-us.apache.org/repos/asf/sqoop/blob/a633fb0b/execution/mapreduce/src/main/java/org/apache/sqoop/job/MapreduceExecutionError.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/MapreduceExecutionError.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/MapreduceExecutionError.java
index 30956f3..1dc12d1 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/MapreduceExecutionError.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/MapreduceExecutionError.java
@@ -76,6 +76,9 @@ public enum MapreduceExecutionError implements ErrorCode {
   /** Unsupported output format type found **/
   MAPRED_EXEC_0024("Unknown output format type"),
 
+  /** Got invalid number of partitions from Partitioner */
+  MAPRED_EXEC_0025("Retrieved invalid number of partitions from Partitioner"),
+
   ;
 
   private final String message;

http://git-wip-us.apache.org/repos/asf/sqoop/blob/a633fb0b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopInputFormat.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopInputFormat.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopInputFormat.java
index 8fcdc99..d191e03 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopInputFormat.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopInputFormat.java
@@ -30,7 +30,9 @@ import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.sqoop.common.SqoopException;
 import org.apache.sqoop.job.JobConstants;
+import org.apache.sqoop.job.MapreduceExecutionError;
 import org.apache.sqoop.job.PrefixContext;
 import org.apache.sqoop.job.etl.Partition;
 import org.apache.sqoop.job.etl.Partitioner;
@@ -62,7 +64,9 @@ public class SqoopInputFormat extends InputFormat<SqoopSplit, NullWritable> {
     Object connectorConnection = ConfigurationUtils.getConnectorConnection(conf);
     Object connectorJob = ConfigurationUtils.getConnectorJob(conf);
 
-    List<Partition> partitions = partitioner.getPartitions(connectorContext, connectorConnection, connectorJob);
+    long maxPartitions = conf.getLong(JobConstants.JOB_ETL_EXTRACTOR_NUM, 10);
+
+    List<Partition> partitions = partitioner.getPartitions(connectorContext, maxPartitions, connectorConnection, connectorJob);
     List<InputSplit> splits = new LinkedList<InputSplit>();
     for (Partition partition : partitions) {
       LOG.debug("Partition: " + partition);
@@ -71,6 +75,11 @@ public class SqoopInputFormat extends InputFormat<SqoopSplit, NullWritable> {
       splits.add(split);
     }
 
+    if(splits.size() > maxPartitions) {
+      throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0025,
+        String.format("Got %d, max was %d", splits.size(), maxPartitions));
+    }
+
     return splits;
   }
 

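To spell out the new plumbing (an illustrative sketch only, not part of the commit; the indirection through the framework context is elided here): the extractor count written by MapreduceExecutionEngine ends up in the Hadoop Configuration, getSplits() reads it back with a default of 10, and any partitioner that returns more partitions than that now fails the job with MAPRED_EXEC_0025 instead of silently over-allocating map tasks.

    Configuration conf = new Configuration();
    conf.setLong(JobConstants.JOB_ETL_EXTRACTOR_NUM, 4L);

    // What getSplits() does above: honor the configured bound, default to 10 when unset.
    long maxPartitions = conf.getLong(JobConstants.JOB_ETL_EXTRACTOR_NUM, 10);  // -> 4
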
http://git-wip-us.apache.org/repos/asf/sqoop/blob/a633fb0b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsLoad.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsLoad.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsLoad.java
index 21a2be9..4e6209d 100644
--- a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsLoad.java
+++ b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsLoad.java
@@ -204,7 +204,7 @@ public class TestHdfsLoad extends TestCase {
 
   public static class DummyPartitioner extends Partitioner {
     @Override
-    public List<Partition> getPartitions(ImmutableContext context, Object oc, Object oj) {
+    public List<Partition> getPartitions(ImmutableContext context, long maxPartitions, Object oc, Object oj) {
       List<Partition> partitions = new LinkedList<Partition>();
       for (int id = START_ID; id <= NUMBER_OF_IDS; id++) {
         DummyPartition partition = new DummyPartition();

http://git-wip-us.apache.org/repos/asf/sqoop/blob/a633fb0b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java
index 745a3a4..c8caecd 100644
--- a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java
+++ b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java
@@ -120,7 +120,7 @@ public class TestMapReduce extends TestCase {
 
   public static class DummyPartitioner extends Partitioner {
     @Override
-    public List<Partition> getPartitions(ImmutableContext context, Object oc, Object oj) {
+    public List<Partition> getPartitions(ImmutableContext context, long maxPartitions, Object oc, Object oj) {
       List<Partition> partitions = new LinkedList<Partition>();
       for (int id = START_PARTITION; id <= NUMBER_OF_PARTITIONS; id++) {
         DummyPartition partition = new DummyPartition();

http://git-wip-us.apache.org/repos/asf/sqoop/blob/a633fb0b/spi/src/main/java/org/apache/sqoop/job/etl/Partitioner.java
----------------------------------------------------------------------
diff --git a/spi/src/main/java/org/apache/sqoop/job/etl/Partitioner.java b/spi/src/main/java/org/apache/sqoop/job/etl/Partitioner.java
index 3a525c4..9cd000c 100644
--- a/spi/src/main/java/org/apache/sqoop/job/etl/Partitioner.java
+++ b/spi/src/main/java/org/apache/sqoop/job/etl/Partitioner.java
@@ -28,6 +28,7 @@ import java.util.List;
 public abstract class Partitioner {
 
   public abstract List<Partition> getPartitions(ImmutableContext context,
+                                                long maxPartitions,
                                                 Object connectionConfiguration,
                                                 Object jobConfiguration);
 

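For connector authors, a minimal hypothetical sketch of the updated contract (not from the commit; SingleValuePartition stands in for a concrete Partition subclass, and imports mirror the test partitioners above): cap the number of emitted partitions at maxPartitions, since SqoopInputFormat now rejects anything larger with MAPRED_EXEC_0025.

    public class EvenSplitPartitioner extends Partitioner {
      @Override
      public List<Partition> getPartitions(ImmutableContext context,
                                           long maxPartitions,
                                           Object connectionConfiguration,
                                           Object jobConfiguration) {
        List<Partition> partitions = new LinkedList<Partition>();
        // Never hand back more than maxPartitions partitions.
        for (long i = 0; i < maxPartitions; i++) {
          partitions.add(new SingleValuePartition(i));  // hypothetical Partition subclass
        }
        return partitions;
      }
    }
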
http://git-wip-us.apache.org/repos/asf/sqoop/blob/a633fb0b/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java
----------------------------------------------------------------------
diff --git a/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java b/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java
index a64a477..8f7864e 100644
--- a/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java
+++ b/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java
@@ -208,8 +208,13 @@ public class MapreduceSubmissionEngine extends SubmissionEngine {
         FileOutputFormat.setOutputPath(job, new Path(outputDirectory));
       }
 
-      // TODO(jarcec): Harcoded no reducers
-      job.setNumReduceTasks(0);
+      // Set number of reducers as number of configured loaders  or suppress
+      // reduce phase entirely if loaders are not set at all.
+      if(request.getLoaders() != null) {
+        job.setNumReduceTasks(request.getLoaders());
+      } else {
+        job.setNumReduceTasks(0);
+      }
 
       job.setOutputFormatClass(request.getOutputFormatClass());
       job.setOutputKeyClass(request.getOutputKeyClass());

