sqoop-commits mailing list archives

From jar...@apache.org
Subject git commit: SQOOP-749: Exports Using Stored Procedures (Functions)
Date Tue, 22 Jan 2013 20:25:34 GMT
Updated Branches:
  refs/heads/trunk 482f39253 -> 20af67ef6


SQOOP-749: Exports Using Stored Procedures (Functions)

(Nick White via Jarek Jarcec Cecho)


Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/20af67ef
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/20af67ef
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/20af67ef

Branch: refs/heads/trunk
Commit: 20af67ef60096b17e1d9585670e5ec787eb760e2
Parents: 482f392
Author: Jarek Jarcec Cecho <jarcec@apache.org>
Authored: Tue Jan 22 12:24:40 2013 -0800
Committer: Jarek Jarcec Cecho <jarcec@apache.org>
Committed: Tue Jan 22 12:24:40 2013 -0800

----------------------------------------------------------------------
 ivy.xml                                            |    5 +
 ivy/libraries.properties                           |    2 +
 src/docs/user/export-purpose.txt                   |    5 +-
 src/docs/user/export.txt                           |   20 +-
 src/java/org/apache/sqoop/SqoopOptions.java        |    9 +
 src/java/org/apache/sqoop/manager/ConnManager.java |   31 ++
 src/java/org/apache/sqoop/manager/SqlManager.java  |   90 ++++
 .../sqoop/mapreduce/ExportCallOutputFormat.java    |  154 +++++++
 .../apache/sqoop/mapreduce/JdbcCallExportJob.java  |  104 +++++
 .../org/apache/sqoop/mapreduce/JdbcExportJob.java  |   11 +-
 src/java/org/apache/sqoop/orm/ClassWriter.java     |   10 +-
 src/java/org/apache/sqoop/tool/BaseSqoopTool.java  |    1 +
 src/java/org/apache/sqoop/tool/ExportTool.java     |   37 ++-
 src/test/com/cloudera/sqoop/SmokeTests.java        |    3 +
 src/test/com/cloudera/sqoop/TestConnFactory.java   |   10 +
 src/test/com/cloudera/sqoop/TestExport.java        |   13 +-
 .../sqoop/manager/PostgresqlExportTest.java        |  136 +++++-
 .../cloudera/sqoop/testutil/ExportJobTestCase.java |   10 +-
 .../org/apache/sqoop/TestExportUsingProcedure.java |  326 +++++++++++++++
 19 files changed, 940 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/sqoop/blob/20af67ef/ivy.xml
----------------------------------------------------------------------
diff --git a/ivy.xml b/ivy.xml
index 1ee60df..1fa4dd1 100644
--- a/ivy.xml
+++ b/ivy.xml
@@ -124,6 +124,11 @@ under the License.
       conf="common->master" />
     <dependency org="junit" name="junit" rev="${junit.version}"
       conf="test->default"/>
+    <!-- We're only using H2 for tests as it supports stored
+         procedures; once we move to HSQLDB 2.x we can drop
+         this -->
+    <dependency org="com.h2database" name="h2" rev="${h2.version}"
+      conf="test->default"/>
     <dependency org="hsqldb" name="hsqldb" rev="${hsqldb.version}"
       conf="common->default;redist->default"/>
     <dependency org="commons-io" name="commons-io" rev="${commons-io.version}"

http://git-wip-us.apache.org/repos/asf/sqoop/blob/20af67ef/ivy/libraries.properties
----------------------------------------------------------------------
diff --git a/ivy/libraries.properties b/ivy/libraries.properties
index 4c9e37d..430d554 100644
--- a/ivy/libraries.properties
+++ b/ivy/libraries.properties
@@ -34,6 +34,8 @@ ivy.version=2.1.0
 
 junit.version=4.5
 
+h2.version=1.3.170
+
 log4j.version=1.2.16
 
 mvn.version=2.0.10

http://git-wip-us.apache.org/repos/asf/sqoop/blob/20af67ef/src/docs/user/export-purpose.txt
----------------------------------------------------------------------
diff --git a/src/docs/user/export-purpose.txt b/src/docs/user/export-purpose.txt
index c26eaa7..def6ead 100644
--- a/src/docs/user/export-purpose.txt
+++ b/src/docs/user/export-purpose.txt
@@ -25,6 +25,5 @@ user-specified delimiters.
 The default operation is to transform these into a set of +INSERT+
 statements that inject the records into the database. In "update mode,"
 Sqoop will generate +UPDATE+ statements that replace existing records
-in the database.
-
-
+in the database, and in "call mode" Sqoop will make a stored procedure
+call for each record.

http://git-wip-us.apache.org/repos/asf/sqoop/blob/20af67ef/src/docs/user/export.txt
----------------------------------------------------------------------
diff --git a/src/docs/user/export.txt b/src/docs/user/export.txt
index 9f600fe..8b9e473 100644
--- a/src/docs/user/export.txt
+++ b/src/docs/user/export.txt
@@ -54,6 +54,7 @@ Argument                                 Description
 +-m,\--num-mappers <n>+                  Use 'n' map tasks to export in\
                                          parallel
 +\--table <table-name>+                  Table to populate
++\--call <stored-proc-name>+             Stored Procedure to call
 +\--update-key <col-name>+               Anchor column to use for updates.\
                                          Use a comma separated list of columns\
                                          if there are more than one column.
@@ -76,9 +77,10 @@ Argument                                 Description
                                          statement execution.
 ------------------------------------------------------------------------
 
-The +\--table+ and +\--export-dir+ arguments are required. These
-specify the table to populate in the database, and the
-directory in HDFS that contains the source data.
+The +\--export-dir+ argument and one of +\--table+ or +\--call+ are
+required. These specify the table to populate in the database (or the
+stored procedure to call), and the directory in HDFS that contains
+the source data.
 
 You can control the number of mappers independently from the number of
 files present in the directory. Export performance depends on the
@@ -126,7 +128,8 @@ specified, Sqoop will delete all of the data before starting the export job.
 
 NOTE: Support for staging data prior to pushing it into the destination
 table is not available for +--direct+ exports. It is also not available when
-export is invoked using the +--update-key+ option for updating existing data.
+export is invoked using the +--update-key+ option for updating existing data,
+or when stored procedures are used to insert the data.
 
 
 Inserts vs. Updates
@@ -275,3 +278,12 @@ Another basic export to populate a table named +bar+ with validation enabled:
 $ sqoop export --connect jdbc:mysql://db.example.com/foo --table bar  \
     --export-dir /results/bar_data --validate
 ----
+
+An export that calls a stored procedure named +barproc+ for every record in
++/results/bar_data+ would look like:
+
+----
+$ sqoop export --connect jdbc:mysql://db.example.com/foo --call barproc \
+    --export-dir /results/bar_data
+----
+
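A procedure like +barproc+ takes one IN argument per exported column, in
declaration order. As a sketch of what it could look like, created over JDBC
against an open Connection +conn+ (MySQL flavour; "bar", "id" and "msg" are
illustrative names, not taken from this patch - the patch's own test
procedures are in PostgresqlExportTest and TestExportUsingProcedure below):

    // Hypothetical definition of barproc; one IN argument per column.
    Statement st = conn.createStatement();
    st.execute("CREATE PROCEDURE barproc(IN id INT, IN msg VARCHAR(32)) "
        + "INSERT INTO bar (id, msg) VALUES (id, msg)");
    st.close();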

http://git-wip-us.apache.org/repos/asf/sqoop/blob/20af67ef/src/java/org/apache/sqoop/SqoopOptions.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/SqoopOptions.java b/src/java/org/apache/sqoop/SqoopOptions.java
index b0fdfa0..addc889 100644
--- a/src/java/org/apache/sqoop/SqoopOptions.java
+++ b/src/java/org/apache/sqoop/SqoopOptions.java
@@ -99,6 +99,7 @@ public class SqoopOptions implements Cloneable {
   @StoredAsProperty("db.username") private String username;
   @StoredAsProperty("db.export.staging.table") private String stagingTableName;
   @StoredAsProperty("db.clear.staging.table") private boolean clearStagingTable;
+  @StoredAsProperty("db.export.call") private String call;
   private Properties connectionParams; //Properties stored as db.connect.params
 
 
@@ -2012,4 +2013,12 @@ public class SqoopOptions implements Cloneable {
       Class validationFailureHandlerClazz) {
     this.validationFailureHandlerClass = validationFailureHandlerClazz;
   }
+
+  public String getCall() {
+    return call;
+  }
+
+  public void setCall(String theCall) {
+    this.call = theCall;
+  }
 }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/20af67ef/src/java/org/apache/sqoop/manager/ConnManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/manager/ConnManager.java b/src/java/org/apache/sqoop/manager/ConnManager.java
index 358981e..1b32dc9 100644
--- a/src/java/org/apache/sqoop/manager/ConnManager.java
+++ b/src/java/org/apache/sqoop/manager/ConnManager.java
@@ -70,6 +70,15 @@ public abstract class ConnManager {
   public abstract String [] getColumnNames(String tableName);
 
   /**
+   * Return a list of stored procedure argument names in the order
+   * that they are declared.
+   */
+  public String [] getColumnNamesForProcedure(String procedureName) {
+    throw new UnsupportedOperationException(
+        "No stored procedure support for this database");
+  }
+
+  /**
    * Return a list of column names in query in the order returned by the db.
    */
   public String [] getColumnNamesForQuery(String query) {
@@ -247,6 +256,18 @@ public abstract class ConnManager {
 
   /**
    * Return an unordered mapping from colname to sqltype for
+   * all the input arguments for a stored procedure.
+   *
+   * The Integer type id is a constant from java.sql.Types
+   */
+  public Map<String, Integer> getColumnTypesForProcedure(
+      String procedureName) {
+    throw new UnsupportedOperationException(
+        "No stored procedure support for this database");
+  }
+
+  /**
+   * Return an unordered mapping from colname to sqltype for
    * all columns in a table or query.
    *
    * The Integer type id is a constant from java.sql.Types
@@ -475,6 +496,16 @@ public abstract class ConnManager {
   }
 
   /**
+   * Export data stored in HDFS into a table in a database. This calls a stored
+   * procedure to insert rows into the target table.
+   */
+  public void callTable(com.cloudera.sqoop.manager.ExportJobContext context)
+      throws IOException, ExportException {
+    throw new ExportException("This database does not support exports "
+        + "using stored procedures");
+  }
+
+  /**
    * Export updated data stored in HDFS into a database table.
    * This updates existing rows in the target table, based on the
    * updateKeyCol specified in the context's SqoopOptions.
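A usage sketch of the two new metadata hooks (hypothetical procedure name
"BARPROC" and an already-initialized ConnManager; the Integer values are
java.sql.Types constants, as the javadoc above notes):

    // Argument names in declaration order, and their SQL types.
    String[] argNames = manager.getColumnNamesForProcedure("BARPROC");
    Map<String, Integer> argTypes =
        manager.getColumnTypesForProcedure("BARPROC");
    for (String arg : argNames) {
      // e.g. "ID -> 4" where 4 == java.sql.Types.INTEGER
      System.out.println(arg + " -> " + argTypes.get(arg));
    }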

http://git-wip-us.apache.org/repos/asf/sqoop/blob/20af67ef/src/java/org/apache/sqoop/manager/SqlManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/manager/SqlManager.java b/src/java/org/apache/sqoop/manager/SqlManager.java
index 3a52c6d..03c9e64 100644
--- a/src/java/org/apache/sqoop/manager/SqlManager.java
+++ b/src/java/org/apache/sqoop/manager/SqlManager.java
@@ -45,12 +45,15 @@ import java.sql.SQLException;
 import java.sql.Types;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
+import java.util.TreeMap;
 import java.util.Properties;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.util.StringUtils;
+import org.apache.sqoop.mapreduce.JdbcCallExportJob;
 import org.apache.sqoop.util.SqlTypeMap;
 
 /**
@@ -161,6 +164,48 @@ public abstract class SqlManager
     }
   }
 
+  @Override
+  public String[] getColumnNamesForProcedure(String procedureName) {
+    List<String> ret = new ArrayList<String>();
+    try {
+      DatabaseMetaData metaData = this.getConnection().getMetaData();
+      ResultSet results = metaData.getProcedureColumns(null, null,
+          procedureName, null);
+      if (null == results) {
+        return null;
+      }
+
+      try {
+        while (results.next()) {
+          if (results.getInt("COLUMN_TYPE")
+              != DatabaseMetaData.procedureColumnReturn) {
+            int index = results.getInt("ORDINAL_POSITION") - 1;
+            if (index < 0) {
+              continue; // actually the return type
+            }
+            for (int i = ret.size(); i < index; ++i) {
+              ret.add(null);
+            }
+            String name = results.getString("COLUMN_NAME");
+            if (index == ret.size()) {
+              ret.add(name);
+            } else {
+              ret.set(index, name);
+            }
+          }
+        }
+        return ret.toArray(new String[ret.size()]);
+      } finally {
+        results.close();
+        getConnection().commit();
+      }
+    } catch (SQLException sqlException) {
+      LOG.error("Error reading procedure metadata: "
+          + sqlException.toString());
+      return null;
+    }
+  }
+
   /**
    * @return the SQL query to use in getColumnTypes() in case this logic must
    * be tuned per-database, but the main extraction loop is still inheritable.
@@ -322,6 +367,42 @@ public abstract class SqlManager
   }
 
   @Override
+  public Map<String, Integer> getColumnTypesForProcedure(String procedureName) {
+    Map<String, Integer> ret = new TreeMap<String, Integer>();
+    try {
+      DatabaseMetaData metaData = this.getConnection().getMetaData();
+      ResultSet results = metaData.getProcedureColumns(null, null,
+          procedureName, null);
+      if (null == results) {
+        return null;
+      }
+
+      try {
+        while (results.next()) {
+          if (results.getInt("COLUMN_TYPE")
+            != DatabaseMetaData.procedureColumnReturn
+          && results.getInt("ORDINAL_POSITION") > 0) {
+            // we don't care if we get several rows for the
+            // same ORDINAL_POSITION (e.g. like H2 gives us)
+            // as we'll just overwrite the entry in the map:
+            ret.put(
+              results.getString("COLUMN_NAME"),
+              results.getInt("DATA_TYPE"));
+          }
+        }
+        return ret.isEmpty() ? null : ret;
+      } finally {
+        results.close();
+        getConnection().commit();
+      }
+    } catch (SQLException sqlException) {
+      LOG.error("Error reading procedure metadata: "
+          + sqlException.toString());
+      return null;
+    }
+  }
+
+  @Override
   public String[] listTables() {
     ResultSet results = null;
     String [] tableTypes = {"TABLE"};
@@ -692,6 +773,15 @@ public abstract class SqlManager
     exportJob.runExport();
   }
 
+  @Override
+  public void callTable(com.cloudera.sqoop.manager.ExportJobContext context)
+      throws IOException,
+      ExportException {
+    context.setConnManager(this);
+    JdbcCallExportJob exportJob = new JdbcCallExportJob(context);
+    exportJob.runExport();
+  }
+
   public void release() {
     if (null != this.lastStatement) {
       try {
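Both methods above drive off DatabaseMetaData.getProcedureColumns; standalone,
the underlying JDBC call looks like this (a sketch against an open Connection
+conn+; drivers differ on whether the return value appears as its own row,
which is why the loops above skip COLUMN_TYPE == procedureColumnReturn and
ORDINAL_POSITION 0):

    DatabaseMetaData md = conn.getMetaData();
    ResultSet rs = md.getProcedureColumns(null, null, "BARPROC", null);
    while (rs.next()) {
      // ORDINAL_POSITION 0 is the return value; 1..n are the arguments.
      // DATA_TYPE is a java.sql.Types constant.
      System.out.println(rs.getInt("ORDINAL_POSITION") + " "
          + rs.getString("COLUMN_NAME") + " "
          + rs.getInt("DATA_TYPE"));
    }
    rs.close();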

http://git-wip-us.apache.org/repos/asf/sqoop/blob/20af67ef/src/java/org/apache/sqoop/mapreduce/ExportCallOutputFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/ExportCallOutputFormat.java b/src/java/org/apache/sqoop/mapreduce/ExportCallOutputFormat.java
new file mode 100644
index 0000000..7dc3453
--- /dev/null
+++ b/src/java/org/apache/sqoop/mapreduce/ExportCallOutputFormat.java
@@ -0,0 +1,154 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.sqoop.mapreduce.db.DBConfiguration;
+
+import com.cloudera.sqoop.lib.SqoopRecord;
+
+/**
+ * Call a stored procedure for each emitted key to insert records
+ * into a database table. This supports a configurable "spill
+ * threshold" at which point intermediate transactions are committed.
+ *
+ * Record objects are buffered before actually performing the
+ * procedure calls; this requires that the key implement the
+ * SqoopRecord interface.
+ *
+ * Uses DBOutputFormat/DBConfiguration for configuring the output.
+ */
+public class ExportCallOutputFormat<K extends SqoopRecord, V>
+    extends AsyncSqlOutputFormat<K, V> {
+
+  private static final Log LOG = LogFactory.getLog(
+      ExportCallOutputFormat.class);
+
+  @Override
+  /** {@inheritDoc} */
+  public void checkOutputSpecs(JobContext context)
+      throws IOException, InterruptedException {
+    Configuration conf = context.getConfiguration();
+    DBConfiguration dbConf = new DBConfiguration(conf);
+
+    // Sanity check all the configuration values we need.
+    if (null == conf.get(DBConfiguration.URL_PROPERTY)) {
+      throw new IOException("Database connection URL is not set.");
+    } else if (null == dbConf.getOutputTableName()) {
+      throw new IOException("Procedure name is not set for export");
+    } else if (null == dbConf.getOutputFieldNames()
+        && 0 == dbConf.getOutputFieldCount()) {
+      throw new IOException(
+          "Output field names are null and zero output field count set.");
+    }
+  }
+
+  @Override
+  /** {@inheritDoc} */
+  public RecordWriter<K, V> getRecordWriter(TaskAttemptContext context)
+      throws IOException {
+    try {
+      return new ExportCallRecordWriter(context);
+    } catch (Exception e) {
+      throw new IOException(e);
+    }
+  }
+
+  /**
+   * RecordWriter to write the output to a row in a database table.
+   * The actual database updates are executed in a second thread.
+   */
+  public class ExportCallRecordWriter extends AsyncSqlRecordWriter<K, V> {
+
+    protected String procedureName;
+    protected String [] columnNames; // The columns to insert into.
+    protected int columnCount; // If columnNames is null, tells # of cols.
+
+    public ExportCallRecordWriter(TaskAttemptContext context)
+        throws ClassNotFoundException, SQLException {
+      super(context);
+
+      Configuration conf = getConf();
+
+      DBConfiguration dbConf = new DBConfiguration(conf);
+      procedureName = dbConf.getOutputTableName();
+      columnNames = dbConf.getOutputFieldNames();
+      columnCount = dbConf.getOutputFieldCount();
+    }
+
+    @Override
+    /** {@inheritDoc} */
+    protected PreparedStatement getPreparedStatement(
+        List<SqoopRecord> userRecords) throws SQLException {
+
+      PreparedStatement stmt = null;
+
+      // Synchronize on connection to ensure this does not conflict
+      // with the operations in the update thread.
+      Connection conn = getConnection();
+      synchronized (conn) {
+        stmt = conn.prepareCall(getCallStatement(userRecords.size()));
+      }
+
+      for (SqoopRecord record : userRecords) {
+        record.write(stmt, 0);
+        stmt.addBatch();
+      }
+
+      return stmt;
+    }
+
+    @Override
+    protected boolean isBatchExec() {
+      return true;
+    }
+
+    /**
+     * @return a CALL statement with one '?' placeholder per column.
+     */
+    protected String getCallStatement(int numRows) {
+      StringBuilder sb = new StringBuilder();
+
+      sb.append("{call " + procedureName + " (");
+
+      int numSlots = columnNames == null ? columnCount : columnNames.length;
+      if (numSlots > 0) {
+        sb.append("?");
+      }
+      for (int i = 1; i < numSlots; ++i) {
+        sb.append(", ?");
+      }
+
+      sb.append(")}");
+
+      return sb.toString();
+    }
+  }
+}
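The writer above prepares JDBC escape syntax and batches one call per record;
condensed, each flush amounts to the following (a sketch for a hypothetical
two-argument procedure, with +records+ standing in for the buffered
SqoopRecord list):

    // getCallStatement(n) yields "{call barproc (?, ?)}": one '?' per column.
    CallableStatement stmt = conn.prepareCall("{call barproc (?, ?)}");
    for (SqoopRecord record : records) {
      record.write(stmt, 0); // bind this record's fields to the placeholders
      stmt.addBatch();
    }
    stmt.executeBatch(); // in Sqoop this runs on the async writer thread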

http://git-wip-us.apache.org/repos/asf/sqoop/blob/20af67ef/src/java/org/apache/sqoop/mapreduce/JdbcCallExportJob.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/JdbcCallExportJob.java b/src/java/org/apache/sqoop/mapreduce/JdbcCallExportJob.java
new file mode 100644
index 0000000..2459698
--- /dev/null
+++ b/src/java/org/apache/sqoop/mapreduce/JdbcCallExportJob.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.sqoop.mapreduce.db.DBConfiguration;
+import org.apache.sqoop.mapreduce.db.DBOutputFormat;
+
+import com.cloudera.sqoop.manager.ConnManager;
+import com.cloudera.sqoop.manager.ExportJobContext;
+import com.google.common.base.Strings;
+
+/**
+ * Run an export using JDBC (JDBC-based ExportCallOutputFormat) to
+ * call the stored procedure.
+ */
+public class JdbcCallExportJob extends JdbcExportJob {
+  public static final String SQOOP_EXPORT_CALL_KEY = "sqoop.export.call";
+
+  public static final Log LOG = LogFactory.getLog(
+      JdbcCallExportJob.class.getName());
+
+  public JdbcCallExportJob(final ExportJobContext context) {
+    super(context, null, null, ExportCallOutputFormat.class);
+  }
+
+  public JdbcCallExportJob(final ExportJobContext ctxt,
+      final Class<? extends Mapper> mapperClass,
+      final Class<? extends InputFormat> inputFormatClass,
+      final Class<? extends OutputFormat> outputFormatClass) {
+    super(ctxt, mapperClass, inputFormatClass, outputFormatClass);
+  }
+
+  /**
+   * Makes sure the job knows what stored procedure to call.
+   */
+  @Override
+  protected void propagateOptionsToJob(Job job) {
+    super.propagateOptionsToJob(job);
+    job.getConfiguration().set(SQOOP_EXPORT_CALL_KEY, options.getCall());
+  }
+
+  @Override
+  protected void configureOutputFormat(Job job, String tableName,
+      String tableClassName) throws IOException {
+    String procedureName = job.getConfiguration().get(SQOOP_EXPORT_CALL_KEY);
+
+    ConnManager mgr = context.getConnManager();
+    try {
+      if (Strings.isNullOrEmpty(options.getUsername())) {
+        DBConfiguration.configureDB(job.getConfiguration(),
+            mgr.getDriverClass(),
+            options.getConnectString(),
+            options.getConnectionParams());
+      } else {
+        DBConfiguration.configureDB(job.getConfiguration(),
+            mgr.getDriverClass(),
+            options.getConnectString(),
+            options.getUsername(),
+            options.getPassword(),
+            options.getConnectionParams());
+      }
+
+      String [] colNames = options.getColumns();
+      if (null == colNames) {
+        colNames = mgr.getColumnNamesForProcedure(procedureName);
+      }
+      DBOutputFormat.setOutput(
+        job,
+        mgr.escapeTableName(procedureName),
+        colNames);
+
+      job.setOutputFormatClass(getOutputFormatClass());
+      job.getConfiguration().set(SQOOP_EXPORT_TABLE_CLASS_KEY, tableClassName);
+    } catch (ClassNotFoundException cnfe) {
+      throw new IOException("Could not load OutputFormat", cnfe);
+    }
+  }
+
+}
+

http://git-wip-us.apache.org/repos/asf/sqoop/blob/20af67ef/src/java/org/apache/sqoop/mapreduce/JdbcExportJob.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/JdbcExportJob.java b/src/java/org/apache/sqoop/mapreduce/JdbcExportJob.java
index 00bd910..20636a0 100644
--- a/src/java/org/apache/sqoop/mapreduce/JdbcExportJob.java
+++ b/src/java/org/apache/sqoop/mapreduce/JdbcExportJob.java
@@ -68,8 +68,15 @@ public class JdbcExportJob extends ExportJobBase {
     if (fileType == FileType.AVRO_DATA_FILE) {
       LOG.debug("Configuring for Avro export");
       ConnManager connManager = context.getConnManager();
-      Map<String, Integer> columnTypeInts =
-        connManager.getColumnTypes(tableName, options.getSqlQuery());
+      Map<String, Integer> columnTypeInts;
+      if (options.getCall() == null) {
+        columnTypeInts = connManager.getColumnTypes(
+          tableName,
+          options.getSqlQuery());
+      } else {
+        columnTypeInts = connManager.getColumnTypesForProcedure(
+          options.getCall());
+      }
       MapWritable columnTypes = new MapWritable();
       for (Map.Entry<String, Integer> e : columnTypeInts.entrySet()) {
         Text columnName = new Text(e.getKey());

http://git-wip-us.apache.org/repos/asf/sqoop/blob/20af67ef/src/java/org/apache/sqoop/orm/ClassWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/orm/ClassWriter.java b/src/java/org/apache/sqoop/orm/ClassWriter.java
index 47e1221..126b406 100644
--- a/src/java/org/apache/sqoop/orm/ClassWriter.java
+++ b/src/java/org/apache/sqoop/orm/ClassWriter.java
@@ -1209,6 +1209,10 @@ public class ClassWriter {
       if (null != tableName) {
         // Table-based import. Read column names from table.
         colNames = connManager.getColumnNames(tableName);
+      } else if (options.getCall() != null) {
+        // Read procedure arguments from metadata
+        colNames = connManager.getColumnNamesForProcedure(
+            this.options.getCall());
       } else {
         // Infer/assign column names for arbitrary query.
         colNames = connManager.getColumnNamesForQuery(
@@ -1236,7 +1240,11 @@ public class ClassWriter {
   }
 
   protected Map<String, Integer> getColumnTypes() throws IOException {
-    return connManager.getColumnTypes(tableName, options.getSqlQuery());
+    if (options.getCall() == null) {
+      return connManager.getColumnTypes(tableName, options.getSqlQuery());
+    } else {
+      return connManager.getColumnTypesForProcedure(options.getCall());
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/sqoop/blob/20af67ef/src/java/org/apache/sqoop/tool/BaseSqoopTool.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/tool/BaseSqoopTool.java b/src/java/org/apache/sqoop/tool/BaseSqoopTool.java
index b4b2213..684d4a5 100644
--- a/src/java/org/apache/sqoop/tool/BaseSqoopTool.java
+++ b/src/java/org/apache/sqoop/tool/BaseSqoopTool.java
@@ -142,6 +142,7 @@ public abstract class BaseSqoopTool extends com.cloudera.sqoop.tool.SqoopTool {
   public static final String HELP_ARG = "help";
   public static final String UPDATE_KEY_ARG = "update-key";
   public static final String UPDATE_MODE_ARG = "update-mode";
+  public static final String CALL_ARG = "call";
 
   // Arguments for validation.
   public static final String VALIDATE_ARG = "validate";

http://git-wip-us.apache.org/repos/asf/sqoop/blob/20af67ef/src/java/org/apache/sqoop/tool/ExportTool.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/tool/ExportTool.java b/src/java/org/apache/sqoop/tool/ExportTool.java
index acd296d..215addd 100644
--- a/src/java/org/apache/sqoop/tool/ExportTool.java
+++ b/src/java/org/apache/sqoop/tool/ExportTool.java
@@ -73,6 +73,9 @@ public class ExportTool extends com.cloudera.sqoop.tool.BaseSqoopTool {
         // Mixed update/insert export
         manager.upsertTable(context);
       }
+    } else if (options.getCall() != null) {
+      // Stored procedure-based export.
+      manager.callTable(context);
     } else {
       // INSERT-based export.
       manager.exportTable(context);
@@ -176,6 +179,12 @@ public class ExportTool extends com.cloudera.sqoop.tool.BaseSqoopTool {
             + "new rows are found with non-matching keys in database")
         .withLongOpt(UPDATE_MODE_ARG)
         .create());
+    exportOpts.addOption(OptionBuilder
+         .hasArg()
+         .withDescription("Populate the table using this stored "
+             + "procedure (one call per row)")
+         .withLongOpt(CALL_ARG)
+         .create());
 
     addValidationOpts(exportOpts);
 
@@ -273,6 +282,10 @@ public class ExportTool extends com.cloudera.sqoop.tool.BaseSqoopTool {
         out.setClearStagingTable(true);
       }
 
+      if (in.hasOption(CALL_ARG)) {
+          out.setCall(in.getOptionValue(CALL_ARG));
+      }
+
       applyValidationOptions(in, out);
       applyNewUpdateOptions(in, out);
       applyInputFormatOptions(in, out);
@@ -290,8 +303,9 @@ public class ExportTool extends com.cloudera.sqoop.tool.BaseSqoopTool {
    */
   protected void validateExportOptions(SqoopOptions options)
       throws InvalidOptionsException {
-    if (options.getTableName() == null) {
-      throw new InvalidOptionsException("Export requires a --table argument."
+    if (options.getTableName() == null && options.getCall() == null) {
+      throw new InvalidOptionsException(
+          "Export requires a --table or a --call argument."
           + HELP_STR);
     } else if (options.getExportDir() == null) {
       throw new InvalidOptionsException(
@@ -322,8 +336,25 @@ public class ExportTool extends com.cloudera.sqoop.tool.BaseSqoopTool {
     } else if (options.doClearStagingTable()
         && options.getStagingTableName() == null) {
       // Option to clear staging table specified but not the staging table name
-      throw new InvalidOptionsException("Option to clear the staging table is "
+      throw new InvalidOptionsException(
+          "Option to clear the staging table is "
           + "specified but the staging table name is not.");
+    } else if (options.getCall() != null
+        && options.getStagingTableName() != null) {
+      // using a stored procedure to insert rows is incompatible with using
+      // a staging table (as we don't know where the procedure will put the
+      // data, or what transactions it'll perform)
+      throw new InvalidOptionsException(
+          "Option the use a staging table is "
+          + "specified as well as a call option.");
+    } else if (options.getCall() != null && options.getUpdateKeyCol() != null) {
+      throw new InvalidOptionsException(
+          "Option to call a stored procedure "
+          + "can't be used in update mode.");
+    } else if (options.getCall() != null && options.getTableName() != null) {
+      // we don't know if the stored procedure will insert rows into
+      // a given table
+      throw new InvalidOptionsException("Can't specify --call and --table.");
     }
   }
 

http://git-wip-us.apache.org/repos/asf/sqoop/blob/20af67ef/src/test/com/cloudera/sqoop/SmokeTests.java
----------------------------------------------------------------------
diff --git a/src/test/com/cloudera/sqoop/SmokeTests.java b/src/test/com/cloudera/sqoop/SmokeTests.java
index 76df6cf..c5dc860 100644
--- a/src/test/com/cloudera/sqoop/SmokeTests.java
+++ b/src/test/com/cloudera/sqoop/SmokeTests.java
@@ -18,6 +18,8 @@
 
 package com.cloudera.sqoop;
 
+import org.apache.sqoop.TestExportUsingProcedure;
+
 import com.cloudera.sqoop.hive.TestHiveImport;
 import com.cloudera.sqoop.hive.TestTableDefWriter;
 import com.cloudera.sqoop.io.TestLobFile;
@@ -82,6 +84,7 @@ public final class SmokeTests {
     suite.addTestSuite(TestBooleanParser.class);
     suite.addTestSuite(TestMerge.class);
     suite.addTestSuite(TestToolPlugin.class);
+    suite.addTestSuite(TestExportUsingProcedure.class);
     suite.addTest(MapreduceTests.suite());
 
     return suite;

http://git-wip-us.apache.org/repos/asf/sqoop/blob/20af67ef/src/test/com/cloudera/sqoop/TestConnFactory.java
----------------------------------------------------------------------
diff --git a/src/test/com/cloudera/sqoop/TestConnFactory.java b/src/test/com/cloudera/sqoop/TestConnFactory.java
index 893b388..c0b295e 100644
--- a/src/test/com/cloudera/sqoop/TestConnFactory.java
+++ b/src/test/com/cloudera/sqoop/TestConnFactory.java
@@ -122,6 +122,10 @@ public class TestConnFactory extends TestCase {
       return null;
     }
 
+    public String[] getColumnNamesForProcedure(String procedureName) {
+      return null;
+    }
+
     public String getPrimaryKey(String tableName) {
       return null;
     }
@@ -148,6 +152,12 @@ public class TestConnFactory extends TestCase {
       return null;
     }
 
+    @Override
+    public Map<String, Integer> getColumnTypesForProcedure(
+        String procedureName) {
+      return null;
+    }
+
     public ResultSet readTable(String tableName, String [] columns) {
       return null;
     }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/20af67ef/src/test/com/cloudera/sqoop/TestExport.java
----------------------------------------------------------------------
diff --git a/src/test/com/cloudera/sqoop/TestExport.java b/src/test/com/cloudera/sqoop/TestExport.java
index eba10aa..0b650af 100644
--- a/src/test/com/cloudera/sqoop/TestExport.java
+++ b/src/test/com/cloudera/sqoop/TestExport.java
@@ -795,6 +795,18 @@ public class TestExport extends ExportJobTestCase {
   }
 
   public void testColumnsExport() throws IOException, SQLException {
+    testColumnsExport("id,msg," + forIdx(0) + "," + forIdx(2));
+  }
+
+  /**
+   * It's possible to change the column string that
+   * {@link #testColumnsExport()} uses - you might want to do
+   * this if your database generates its own column names instead
+   * of using the given ones (e.g. stored procedure parameter
+   * names in H2).
+   */
+  protected void testColumnsExport(
+      String columnsStr) throws IOException, SQLException {
     final int TOTAL_COLUMNS = 3;
     final int TOTAL_RECORDS = 10;
 
@@ -842,7 +854,6 @@ public class TestExport extends ExportJobTestCase {
     createTextFile(0, TOTAL_RECORDS, false, gen0, gen2);
     createTable(gen0, gen1, gen2);
 
-    String columnsStr = "id,msg," + forIdx(0) + "," + forIdx(2);
     runExport(getArgv(true, 10, 10, "--columns", columnsStr));
 
     ColumnGenerator genNull = new NullColumnGenerator();

http://git-wip-us.apache.org/repos/asf/sqoop/blob/20af67ef/src/test/com/cloudera/sqoop/manager/PostgresqlExportTest.java
----------------------------------------------------------------------
diff --git a/src/test/com/cloudera/sqoop/manager/PostgresqlExportTest.java b/src/test/com/cloudera/sqoop/manager/PostgresqlExportTest.java
index be449e4..e85e62a 100644
--- a/src/test/com/cloudera/sqoop/manager/PostgresqlExportTest.java
+++ b/src/test/com/cloudera/sqoop/manager/PostgresqlExportTest.java
@@ -50,6 +50,7 @@ public class PostgresqlExportTest extends ExportJobTestCase {
   static final String DATABASE_USER = "sqooptest";
   static final String DATABASE_NAME = "sqooptest";
   static final String TABLE_NAME = "EMPLOYEES_PG";
+  static final String PROCEDURE_NAME = "INSERT_AN_EMPLOYEE";
   static final String STAGING_TABLE_NAME = "STAGING";
   static final String SCHEMA_PUBLIC = "public";
   static final String SCHEMA_SPECIAL = "special";
@@ -80,6 +81,7 @@ public class PostgresqlExportTest extends ExportJobTestCase {
     createTable(STAGING_TABLE_NAME, SCHEMA_PUBLIC);
     createTable(TABLE_NAME, SCHEMA_SPECIAL);
     createTable(STAGING_TABLE_NAME, SCHEMA_SPECIAL);
+    createProcedure(PROCEDURE_NAME, SCHEMA_PUBLIC);
 
     LOG.debug("setUp complete.");
   }
@@ -95,8 +97,86 @@ public class PostgresqlExportTest extends ExportJobTestCase {
     }
   }
 
-  public void createTable(String tableName, String schema) {
-    SqoopOptions options = new SqoopOptions(CONNECT_STRING, tableName);
+  private interface CreateIt {
+    void createIt(
+        Statement st,
+        String fullName,
+        ConnManager manager) throws SQLException;
+  }
+
+  private void createTable(String tableName, String schema) {
+    CreateIt createIt = new CreateIt() {
+      @Override
+      public void createIt(
+          Statement st,
+          String fullName,
+          ConnManager manager) throws SQLException {
+        st.executeUpdate("CREATE TABLE " + fullName + " ("
+          + manager.escapeColName("id") + " INT NOT NULL PRIMARY KEY, "
+          + manager.escapeColName("name") + " VARCHAR(24) NOT NULL, "
+          + manager.escapeColName("start_date") + " DATE, "
+          + manager.escapeColName("salary") + " FLOAT, "
+          + manager.escapeColName("dept") + " VARCHAR(32))");
+      }
+    };
+    create(tableName, "TABLE", schema, createIt);
+  }
+
+  private void createProcedure(String procedureName, String schema) {
+    CreateIt createIt = new CreateIt() {
+      @Override
+      public void createIt(
+          Statement st,
+          String fullName,
+          ConnManager manager) throws SQLException {
+        st.executeUpdate("CREATE OR REPLACE FUNCTION " + fullName + " ("
+          + "IN " + manager.escapeColName("id") + " INT,"
+          + "IN " + manager.escapeColName("name") + " VARCHAR(24),"
+          + "IN " + manager.escapeColName("start_date") + " DATE,"
+          + "IN " + manager.escapeColName("salary") + " FLOAT,"
+          + "IN " + manager.escapeColName("dept") + " VARCHAR(32)"
+          + ") "
+          + "RETURNS VOID "
+          + "AS $$ "
+          + "BEGIN "
+          + "INSERT INTO "
+          + escapeTableOrSchemaName(SCHEMA_PUBLIC)
+          + "."
+          + escapeTableOrSchemaName(TABLE_NAME)
+          + " ("
+          + manager.escapeColName("id")
+          +", "
+          + manager.escapeColName("name")
+          +", "
+          + manager.escapeColName("start_date")
+          +", "
+          + manager.escapeColName("salary")
+          +", "
+          + manager.escapeColName("dept")
+          + ") VALUES ("
+          + manager.escapeColName("id")
+          +", "
+          + manager.escapeColName("name")
+          +", "
+          + manager.escapeColName("start_date")
+          +", "
+          + manager.escapeColName("salary")
+          +", "
+          + manager.escapeColName("dept")
+          + ");"
+          + "END;"
+          + "$$ LANGUAGE plpgsql;");
+      }
+    };
+    create(procedureName, "FUNCTION", schema, createIt);
+  }
+
+  private void create(
+      String name,
+      String type,
+      String schema,
+      CreateIt createIt) {
+    SqoopOptions options = new SqoopOptions(CONNECT_STRING, name);
     options.setUsername(DATABASE_USER);
 
     ConnManager manager = null;
@@ -118,26 +198,24 @@ public class PostgresqlExportTest extends ExportJobTestCase {
       }
 
       String fullTableName = escapeTableOrSchemaName(schema)
-        + "." + escapeTableOrSchemaName(tableName);
+        + "." + escapeTableOrSchemaName(name);
 
       try {
         // Try to remove the table first. DROP TABLE IF EXISTS didn't
         // get added until pg 8.3, so we just use "DROP TABLE" and ignore
         // any exception here if one occurs.
-        st.executeUpdate("DROP TABLE " + fullTableName);
+        st.executeUpdate("DROP " + type + " " + fullTableName);
       } catch (SQLException e) {
-        LOG.info("Couldn't drop table " + schema + "." + tableName + " (ok)",
+        LOG.info("Couldn't drop "
+            + type.toLowerCase()
+            + " " +fullTableName
+            + " (ok)",
           e);
         // Now we need to reset the transaction.
         connection.rollback();
       }
 
-      st.executeUpdate("CREATE TABLE " + fullTableName + " ("
-        + manager.escapeColName("id") + " INT NOT NULL PRIMARY KEY, "
-        + manager.escapeColName("name") + " VARCHAR(24) NOT NULL, "
-        + manager.escapeColName("start_date") + " DATE, "
-        + manager.escapeColName("salary") + " FLOAT, "
-        + manager.escapeColName("dept") + " VARCHAR(32))");
+      createIt.createIt(st, fullTableName, manager);
 
       connection.commit();
     } catch (SQLException sqlE) {
@@ -161,14 +239,19 @@ public class PostgresqlExportTest extends ExportJobTestCase {
     LOG.debug("setUp complete.");
   }
 
-  private String [] getArgv(String tableName,
+  private String [] getArgv(boolean useTable,
                             String... extraArgs) {
     ArrayList<String> args = new ArrayList<String>();
 
     CommonArgs.addHadoopFlags(args);
 
-    args.add("--table");
-    args.add(tableName);
+    if (useTable) {
+      args.add("--table");
+      args.add(TABLE_NAME);
+    } else {
+      args.add("--call");
+      args.add(PROCEDURE_NAME);
+    }
     args.add("--export-dir");
     args.add(getWarehouseDir());
     args.add("--fields-terminated-by");
@@ -208,7 +291,18 @@ public class PostgresqlExportTest extends ExportJobTestCase {
       "3,Fred,2009-01-23,15,marketing",
     });
 
-    runExport(getArgv(TABLE_NAME));
+    runExport(getArgv(true));
+
+    assertRowCount(2, escapeTableOrSchemaName(TABLE_NAME), connection);
+  }
+
+  public void testExportUsingProcedure() throws IOException, SQLException {
+    createTestFile("inputFile", new String[] {
+        "2,Bob,2009-04-20,400,sales",
+        "3,Fred,2009-01-23,15,marketing",
+    });
+
+    runExport(getArgv(false));
 
     assertRowCount(2, escapeTableOrSchemaName(TABLE_NAME), connection);
   }
@@ -221,7 +315,7 @@ public class PostgresqlExportTest extends ExportJobTestCase {
 
     String[] extra = new String[] {"--staging-table", STAGING_TABLE_NAME, };
 
-    runExport(getArgv(TABLE_NAME, extra));
+    runExport(getArgv(true, extra));
 
     assertRowCount(2, escapeTableOrSchemaName(TABLE_NAME), connection);
   }
@@ -234,7 +328,7 @@ public class PostgresqlExportTest extends ExportJobTestCase {
 
     String[] extra = new String[] {"--direct"};
 
-    runExport(getArgv(TABLE_NAME, extra));
+    runExport(getArgv(true, extra));
 
     assertRowCount(2, escapeTableOrSchemaName(TABLE_NAME), connection);
   }
@@ -250,7 +344,7 @@ public class PostgresqlExportTest extends ExportJobTestCase {
       SCHEMA_SPECIAL,
     };
 
-    runExport(getArgv(TABLE_NAME, extra));
+    runExport(getArgv(true, extra));
 
     assertRowCount(2,
       escapeTableOrSchemaName(SCHEMA_SPECIAL)
@@ -272,7 +366,7 @@ public class PostgresqlExportTest extends ExportJobTestCase {
       SCHEMA_SPECIAL,
     };
 
-    runExport(getArgv(TABLE_NAME, extra));
+    runExport(getArgv(true, extra));
 
     assertRowCount(2,
       escapeTableOrSchemaName(SCHEMA_SPECIAL)
@@ -296,7 +390,7 @@ public class PostgresqlExportTest extends ExportJobTestCase {
       SCHEMA_SPECIAL,
     };
 
-    runExport(getArgv(TABLE_NAME, extra));
+    runExport(getArgv(true, extra));
 
     assertRowCount(2,
       escapeTableOrSchemaName(SCHEMA_SPECIAL)
@@ -317,7 +411,7 @@ public class PostgresqlExportTest extends ExportJobTestCase {
       SCHEMA_SPECIAL,
     };
 
-    runExport(getArgv(TABLE_NAME, extra));
+    runExport(getArgv(true, extra));
 
     assertRowCount(2,
       escapeTableOrSchemaName(SCHEMA_SPECIAL)

http://git-wip-us.apache.org/repos/asf/sqoop/blob/20af67ef/src/test/com/cloudera/sqoop/testutil/ExportJobTestCase.java
----------------------------------------------------------------------
diff --git a/src/test/com/cloudera/sqoop/testutil/ExportJobTestCase.java b/src/test/com/cloudera/sqoop/testutil/ExportJobTestCase.java
index 4f6fd37..e13f3df 100644
--- a/src/test/com/cloudera/sqoop/testutil/ExportJobTestCase.java
+++ b/src/test/com/cloudera/sqoop/testutil/ExportJobTestCase.java
@@ -131,8 +131,10 @@ public abstract class ExportJobTestCase extends BaseSqoopTestCase {
       }
     }
 
-    args.add("--table");
-    args.add(getTableName());
+    if (usesSQLtable()) {
+      args.add("--table");
+      args.add(getTableName());
+    }
     args.add("--export-dir");
     args.add(getTablePath().toString());
     args.add("--connect");
@@ -152,6 +154,10 @@ public abstract class ExportJobTestCase extends BaseSqoopTestCase {
     return args.toArray(new String[0]);
   }
 
+  protected boolean usesSQLtable() {
+    return true;
+  }
+
   /** When exporting text columns, what should the text contain? */
   protected String getMsgPrefix() {
     return "textfield";

http://git-wip-us.apache.org/repos/asf/sqoop/blob/20af67ef/src/test/org/apache/sqoop/TestExportUsingProcedure.java
----------------------------------------------------------------------
diff --git a/src/test/org/apache/sqoop/TestExportUsingProcedure.java b/src/test/org/apache/sqoop/TestExportUsingProcedure.java
new file mode 100644
index 0000000..6414ef7
--- /dev/null
+++ b/src/test/org/apache/sqoop/TestExportUsingProcedure.java
@@ -0,0 +1,326 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.sql.Connection;
+import java.sql.Date;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.sql.Time;
+import java.sql.Types;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.sqoop.manager.GenericJdbcManager;
+import org.apache.sqoop.tool.ExportTool;
+import org.h2.Driver;
+import org.junit.After;
+import org.junit.Before;
+
+import com.cloudera.sqoop.SqoopOptions;
+import com.cloudera.sqoop.TestExport;
+
+/**
+ * We'll use H2 as the database, as the version of HSQLDB we currently
+ * depend on (1.8) doesn't include support for stored procedures.
+ */
+public class TestExportUsingProcedure extends TestExport {
+  private static final String PROCEDURE_NAME = "INSERT_PROCEDURE";
+  /**
+   * Stored procedures are static; we'll need an instance to get a connection.
+   */
+  private static TestExportUsingProcedure instanceForProcedure;
+  private int functionCalls = 0;
+  private String[] names;
+  private String[] types;
+  private Connection connection;
+
+  @Override
+  @Before
+  public void setUp() {
+    super.setUp();
+    instanceForProcedure = this;
+  }
+
+  @Override
+  public void createTable(ColumnGenerator... extraColumns) throws SQLException {
+    super.createTable(extraColumns);
+    names = new String[extraColumns.length];
+    types = new String[extraColumns.length];
+    for (int i = 0; i < extraColumns.length; ++i) {
+      names[i] = forIdx(i);
+      types[i] = extraColumns[i].getType();
+    }
+    createProcedure(names, types);
+  }
+
+  private void createProcedure(String[] extraNames, String[] extraTypes)
+      throws SQLException {
+    StringBuilder drop = new StringBuilder("DROP ALIAS IF EXISTS ");
+    drop.append(PROCEDURE_NAME);
+
+    StringBuilder create = new StringBuilder("CREATE ALIAS ");
+    create.append(PROCEDURE_NAME);
+    create.append(" FOR \"");
+    create.append(getClass().getName());
+    create.append(".insertFunction");
+    if (extraNames.length > 0) {
+      create.append(getName());
+    }
+    create.append('"');
+
+    Connection conn = getConnection();
+    Statement statement = conn.createStatement();
+    try {
+      statement.execute(drop.toString());
+      statement.execute(create.toString());
+      conn.commit();
+    } finally {
+      statement.close();
+    }
+  }
+
+  @Override
+  protected String getConnectString() {
+    return "jdbc:h2:mem:" + getName();
+  }
+
+  @Override
+  protected void verifyExport(int expectedNumRecords, Connection conn)
+      throws IOException, SQLException {
+    assertEquals("stored procedure must be called for each row",
+        expectedNumRecords, functionCalls);
+    super.verifyExport(expectedNumRecords, conn);
+  }
+
+  @Override
+  protected String[] getArgv(boolean includeHadoopFlags, int rowsPerStmt,
+      int statementsPerTx, String... additionalArgv) {
+    // we need different class names per test, or the classloader will
+    // just use the old class definition even though we've compiled a
+    // new one!
+    String[] args = newStrArray(additionalArgv, "--" + ExportTool.CALL_ARG,
+        PROCEDURE_NAME, "--" + ExportTool.CLASS_NAME_ARG, getName(), "--"
+            + ExportTool.CONN_MANAGER_CLASS_NAME,
+        GenericJdbcManager.class.getName(), "--" + ExportTool.DRIVER_ARG,
+        Driver.class.getName());
+    return super
+        .getArgv(includeHadoopFlags, rowsPerStmt, statementsPerTx, args);
+  }
+
+  @Override
+  protected String[] getCodeGenArgv(String... extraArgs) {
+    String[] myExtraArgs = newStrArray(extraArgs, "--"
+        + ExportTool.CONN_MANAGER_CLASS_NAME,
+        GenericJdbcManager.class.getName(), "--" + ExportTool.DRIVER_ARG,
+        Driver.class.getName());
+    return super.getCodeGenArgv(myExtraArgs);
+  }
+
+  @Override
+  protected Connection getConnection() {
+    if (connection != null) {
+      return connection;
+    }
+    try {
+      connection = DriverManager.getConnection(getConnectString());
+      connection.setAutoCommit(false);
+      return connection;
+    } catch (SQLException e) {
+      throw new AssertionError(e.getMessage());
+    }
+  }
+
+  /**
+   * This gets called during {@link #setUp()} to check the non-HSQLDB database
+   * is valid. We'll therefore set the connection manager here...
+   */
+  @Override
+  protected SqoopOptions getSqoopOptions(Configuration conf) {
+    SqoopOptions ret = new SqoopOptions(conf);
+    if (ret.getConnManagerClassName() == null) {
+      ret.setConnManagerClassName(GenericJdbcManager.class.getName());
+    }
+    if (ret.getDriverClassName() == null) {
+      ret.setDriverClassName(Driver.class.getName());
+    }
+    return ret;
+  }
+
+  @Override
+  protected boolean useHsqldbTestServer() {
+    return false;
+  }
+
+  @Override
+  protected boolean usesSQLtable() {
+    return false;
+  }
+
+  @Override
+  @After
+  public void tearDown() {
+    super.tearDown();
+    if (connection != null) {
+      try {
+        connection.close();
+      } catch (SQLException e) {
+        // don't really care, it's only in memory
+      }
+    }
+  }
+
+  // TEST OVERRIDES
+
+  @Override
+  public void testMultiMapTextExportWithStaging() throws IOException,
+      SQLException {
+    try {
+      super.testMultiMapTextExportWithStaging();
+      fail("staging tables not compatible with --call");
+    } catch (IOException e) {
+      // expected
+    }
+  }
+
+  @Override
+  public void testMultiTransactionWithStaging() throws IOException,
+      SQLException {
+    try {
+      super.testMultiTransactionWithStaging();
+      fail("staging tables not compatible with --call");
+    } catch (IOException e) {
+      // expected
+    }
+  }
+
+  /**
+   * H2 renames the stored procedure arguments P1, P2, ..., Pn.
+   */
+  @Override
+  public void testColumnsExport() throws IOException, SQLException {
+    super.testColumnsExport("P1,P2,P3,P4");
+  }
+
+  // STORED PROCEDURES
+
+  private interface SetExtraArgs {
+    void set(PreparedStatement on) throws SQLException;
+  }
+
+  private static void insertFunction(int id, String msg,
+      SetExtraArgs setExtraArgs) throws SQLException {
+    instanceForProcedure.functionCalls += 1;
+    Connection con = instanceForProcedure.getConnection();
+
+    StringBuilder sql = new StringBuilder("insert into ");
+    sql.append(instanceForProcedure.getTableName());
+    sql.append("(id, msg");
+    for (int i = 0; i < instanceForProcedure.names.length; ++i) {
+      sql.append(", ");
+      sql.append(instanceForProcedure.names[i]);
+    }
+    sql.append(") values (");
+    sql.append(StringUtils.repeat("?", ",  ",
+        instanceForProcedure.names.length + 2));
+    sql.append(")");
+
+    PreparedStatement statement = con.prepareStatement(sql.toString());
+    try {
+      statement.setInt(1, id);
+      statement.setString(2, msg);
+      setExtraArgs.set(statement);
+      statement.execute();
+      con.commit();
+    } finally {
+      statement.close();
+    }
+  }
+
+  public static void insertFunction(int id, String msg) throws SQLException {
+    insertFunction(id, msg, new SetExtraArgs() {
+      @Override
+      public void set(PreparedStatement on) throws SQLException {
+      }
+    });
+  }
+
+  public static void insertFunctiontestIntCol(int id, String msg,
+      final int testIntCol) throws SQLException {
+    insertFunction(id, msg, new SetExtraArgs() {
+      @Override
+      public void set(PreparedStatement on) throws SQLException {
+        on.setInt(3, testIntCol);
+      }
+    });
+  }
+
+  public static void insertFunctiontestBigIntCol(int id, String msg,
+      final long testBigIntCol) throws SQLException {
+    insertFunction(id, msg, new SetExtraArgs() {
+      @Override
+      public void set(PreparedStatement on) throws SQLException {
+        on.setLong(3, testBigIntCol);
+      }
+    });
+  }
+
+  public static void insertFunctiontestDatesAndTimes(int id, String msg,
+      final Date date, final Time time) throws SQLException {
+    insertFunction(id, msg, new SetExtraArgs() {
+      @Override
+      public void set(PreparedStatement on) throws SQLException {
+        on.setDate(3, date);
+        on.setTime(4, time);
+      }
+    });
+  }
+
+  public static void insertFunctiontestNumericTypes(int id, String msg,
+      final BigDecimal f, final BigDecimal d) throws SQLException {
+    insertFunction(id, msg, new SetExtraArgs() {
+      @Override
+      public void set(PreparedStatement on) throws SQLException {
+        on.setBigDecimal(3, f);
+        on.setBigDecimal(4, d);
+      }
+    });
+  }
+
+  /**
+   * This test case is special - we're only inserting into a subset of the
+   * columns in the table.
+   */
+  public static void insertFunctiontestColumnsExport(int id, String msg,
+      final int int1, final int int2) throws SQLException {
+    insertFunction(id, msg, new SetExtraArgs() {
+      @Override
+      public void set(PreparedStatement on) throws SQLException {
+        on.setInt(3, int1);
+        on.setNull(4, Types.INTEGER);
+        on.setInt(5, int2);
+      }
+    });
+  }
+
+}
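The test's insertFunction* methods rely on H2's CREATE ALIAS, which exposes
any public static Java method as a callable routine. In isolation the
mechanism looks like this (a self-contained sketch with illustrative names,
assuming the H2 driver is on the classpath):

    import java.sql.CallableStatement;
    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.Statement;

    public class AliasDemo {
      // Any public static method can back an H2 alias.
      public static void record(int id, String msg) {
        System.out.println("called with " + id + ", " + msg);
      }

      public static void main(String[] args) throws Exception {
        Class.forName("org.h2.Driver");
        Connection conn = DriverManager.getConnection("jdbc:h2:mem:demo");
        Statement st = conn.createStatement();
        st.execute("CREATE ALIAS RECORD_PROC FOR \"AliasDemo.record\"");
        CallableStatement call = conn.prepareCall("{call RECORD_PROC (?, ?)}");
        call.setInt(1, 1);
        call.setString(2, "hello");
        call.execute();
        conn.close();
      }
    }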

