sqoop-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jar...@apache.org
Subject [2/2] git commit: SQOOP-846: Provide a connector for Netezza appliances
Date Tue, 05 Mar 2013 06:23:19 GMT
Updated Branches:
  refs/heads/trunk 34bdf07bc -> 0d5f73ad8


SQOOP-846: Provide a connector for Netezza appliances

(Venkat Ranganathan via Jarek Jarcec Cecho)


Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/0d5f73ad
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/0d5f73ad
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/0d5f73ad

Branch: refs/heads/trunk
Commit: 0d5f73ad8dccd4630e4762361513b1583b2b1a2f
Parents: 34bdf07
Author: Jarek Jarcec Cecho <jarcec@apache.org>
Authored: Mon Mar 4 22:22:46 2013 -0800
Committer: Jarek Jarcec Cecho <jarcec@apache.org>
Committed: Mon Mar 4 22:22:46 2013 -0800

----------------------------------------------------------------------
 src/docs/user/connectors.txt                       |   67 ++++
 src/java/org/apache/sqoop/lib/DelimiterSet.java    |   11 +
 .../sqoop/manager/DefaultManagerFactory.java       |    6 +
 .../apache/sqoop/manager/DirectNetezzaManager.java |  249 +++++++++++++
 src/java/org/apache/sqoop/manager/MySQLUtils.java  |   14 +-
 .../org/apache/sqoop/manager/NetezzaManager.java   |  222 +++++++++++
 .../db/netezza/NetezzaDBDataSliceSplitter.java     |   60 +++
 .../netezza/NetezzaExternalTableExportMapper.java  |  221 +++++++++++
 .../netezza/NetezzaExternalTableImportMapper.java  |  224 +++++++++++
 .../NetezzaExternalTableRecordExportMapper.java    |   38 ++
 .../NetezzaExternalTableTextExportMapper.java      |   38 ++
 .../db/netezza/NetezzaJDBCStatementRunner.java     |   95 +++++
 .../netezza/NetezzaDataDrivenDBInputFormat.java    |   69 ++++
 .../netezza/NetezzaExternalTableExportJob.java     |  117 ++++++
 .../netezza/NetezzaExternalTableImportJob.java     |  123 ++++++
 .../netezza/NetezzaExternalTableInputFormat.java   |  112 ++++++
 .../netezza/NetezzaExternalTableInputSplit.java    |   74 ++++
 .../manager/DirectNetezzaExportManualTest.java     |  286 +++++++++++++++
 .../sqoop/manager/NetezzaImportManualTest.java     |  225 ++++++++++++
 .../cloudera/sqoop/manager/NetezzaTestUtils.java   |   93 +++++
 20 files changed, 2338 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/sqoop/blob/0d5f73ad/src/docs/user/connectors.txt
----------------------------------------------------------------------
diff --git a/src/docs/user/connectors.txt b/src/docs/user/connectors.txt
index 7dd2a2e..c172c4b 100644
--- a/src/docs/user/connectors.txt
+++ b/src/docs/user/connectors.txt
@@ -243,3 +243,70 @@ or map tasks fail.
 When reduce task fails,
 staging table for the task are left for manual retry and
 users must take care of it.
+
+Netezza Connector
+~~~~~~~~~~~~~~~~~
+
+Extra arguments
+^^^^^^^^^^^^^^^
+
+The extra arguments supported by the Netezza connector are listed below:
+
+.Supported Netezza extra arguments:
+[grid="all"]
+`-------------------------------------`----------------------------------------
+Argument                              Description
+-------------------------------------------------------------------------------
++--partitioned-access+                Whether each mapper acts on a subset\
+                                      of data slices of a table or on all of them.\
+                                      Default is "false" for standard mode\
+                                      and "true" for direct mode.
++--max-errors+                        Applicable only in direct mode.\
+                                      This option specifies the error threshold\
+                                      per mapper while transferring data. If\
+                                      the number of errors encountered exceeds\
+                                      this threshold then the job will fail.\
+                                      Default value is 1.
++--log-dir+                           Applicable only in direct mode.\
+                                      Specifies the directory where Netezza\
+                                      external table operation logs are stored.\
+                                      Default value is /tmp.
+--------------------------------------------------------------------------------
+
+
+Direct Mode
+^^^^^^^^^^^
+The Netezza connector supports an optimized data transfer facility using the
+Netezza external tables feature.  Each map task of the Netezza connector's
+import job works on a subset of the Netezza partitions and transparently
+creates and uses an external table to transport data.  Similarly, export jobs
+use an external table to push data quickly onto the Netezza system.  Direct
+mode does not support staging tables, upsert options, or similar features.
+
+Here is an example of a complete command line for an import using the Netezza
+external table feature.
+
+----
+$ sqoop import \
+    --direct \
+    --connect jdbc:netezza://nzhost:5480/sqoop \
+    --table nztable \
+    --username nzuser \
+    --password nzpass \
+    --target-dir hdfsdir
+
+----
+
+Here is an example of a complete command line for an export that uses tab as
+the field terminator character.
+
+----
+$ sqoop export \
+    --direct \
+    --connect jdbc:netezza://nzhost:5480/sqoop \
+    --table nztable \
+    --username nzuser \
+    --password nzpass \
+    --export-dir hdfsdir \
+    --input-fields-terminated-by "\t"
+----

http://git-wip-us.apache.org/repos/asf/sqoop/blob/0d5f73ad/src/java/org/apache/sqoop/lib/DelimiterSet.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/lib/DelimiterSet.java b/src/java/org/apache/sqoop/lib/DelimiterSet.java
index 4e9bcab..ef62ba0 100644
--- a/src/java/org/apache/sqoop/lib/DelimiterSet.java
+++ b/src/java/org/apache/sqoop/lib/DelimiterSet.java
@@ -33,8 +33,19 @@ public class DelimiterSet implements Cloneable {
 
   // If true, then the enclosed-by character is applied to every
   // field, not just ones containing embedded delimiters.
   private boolean encloseRequired;
 
+  // Property keys for the output delimiter settings. These were previously
+  // defined in MySQLUtils, which now refers to these constants instead.
+  public static final String OUTPUT_FIELD_DELIM_KEY =
+      "sqoop.output.field.delim";
+  public static final String OUTPUT_RECORD_DELIM_KEY =
+      "sqoop.output.record.delim";
+  public static final String OUTPUT_ENCLOSED_BY_KEY =
+      "sqoop.output.enclosed.by";
+  public static final String OUTPUT_ESCAPED_BY_KEY =
+      "sqoop.output.escaped.by";
+  public static final String OUTPUT_ENCLOSE_REQUIRED_KEY =
+      "sqoop.output.enclose.required";
+
   /**
    * Create a delimiter set with the default delimiters
    * (comma for fields, newline for records).

http://git-wip-us.apache.org/repos/asf/sqoop/blob/0d5f73ad/src/java/org/apache/sqoop/manager/DefaultManagerFactory.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/manager/DefaultManagerFactory.java b/src/java/org/apache/sqoop/manager/DefaultManagerFactory.java
index 54eb258..72a955c 100644
--- a/src/java/org/apache/sqoop/manager/DefaultManagerFactory.java
+++ b/src/java/org/apache/sqoop/manager/DefaultManagerFactory.java
@@ -68,6 +68,12 @@ public class DefaultManagerFactory
       return new SQLServerManager(options);
     } else if (scheme.startsWith("jdbc:db2:")) {
       return new Db2Manager(options);
+    } else if (scheme.startsWith("jdbc:netezza:")) {
+      if (options.isDirect()) {
+        return new DirectNetezzaManager(options);
+      } else {
+        return new NetezzaManager(options);
+      }
     } else {
       return null;
     }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/0d5f73ad/src/java/org/apache/sqoop/manager/DirectNetezzaManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/manager/DirectNetezzaManager.java b/src/java/org/apache/sqoop/manager/DirectNetezzaManager.java
new file mode 100644
index 0000000..0a1e605
--- /dev/null
+++ b/src/java/org/apache/sqoop/manager/DirectNetezzaManager.java
@@ -0,0 +1,249 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.manager;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.ParseException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.sqoop.mapreduce.netezza.NetezzaExternalTableExportJob;
+import org.apache.sqoop.mapreduce.netezza.NetezzaExternalTableImportJob;
+
+import com.cloudera.sqoop.SqoopOptions;
+import com.cloudera.sqoop.cli.RelatedOptions;
+import com.cloudera.sqoop.util.ExportException;
+import com.cloudera.sqoop.util.ImportException;
+
+/**
+ * Manages direct mode transfers from Netezza databases using the external table
+ * options.
+ */
+public class DirectNetezzaManager extends NetezzaManager {
+
+  public static final Log LOG = LogFactory.getLog(DirectNetezzaManager.class
+      .getName());
+
+  public static final String NETEZZA_LOG_DIR_OPT = "netezza.log.dir";
+  public static final String NETEZZA_LOG_DIR_LONG_ARG = "log-dir";
+
+  public static final String NETEZZA_ERROR_THRESHOLD_OPT =
+      "netezza.error.threshold";
+  public static final String NETEZZA_ERROR_THRESHOLD_LONG_ARG =
+      "max-errors";
+
+  private static final String QUERY_CHECK_DICTIONARY_FOR_TABLE =
+      "SELECT 1 FROM _V_OBJECTS WHERE OWNER= ? "
+      + " AND OBJNAME = ? and OBJTYPE = 'TABLE'";
+
+  public DirectNetezzaManager(SqoopOptions opts) {
+    super(opts);
+    try {
+      handleNetezzaExtraArgs(options);
+    } catch (ParseException ioe) {
+      throw  new RuntimeException(ioe.getMessage(), ioe);
+    }
+  }
+
+  /**
+   * Check Table if it is valid for export. Parse the table like what we do in
+   * Oracle manager
+   *
+   * @throws IOException
+   * @throws ExportException
+   */
+  private void checkTable() throws IOException, ExportException {
+    String tableOwner = this.options.getUsername();
+    String tableName = this.options.getTableName();
+    String shortTableName = tableName;
+    int qualifierIndex = tableName.indexOf('.');
+    if (qualifierIndex != -1) {
+      tableOwner = tableName.substring(0, qualifierIndex);
+      shortTableName = tableName.substring(qualifierIndex + 1);
+    }
+    Connection conn = null;
+    PreparedStatement ps = null;
+    ResultSet rs = null;
+
+    try {
+      try {
+        conn = getConnection();
+        ps = conn.prepareStatement(QUERY_CHECK_DICTIONARY_FOR_TABLE,
+            ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
+        ps.setString(1, tableOwner);
+        ps.setString(2, shortTableName);
+        rs = ps.executeQuery();
+        if (!rs.next()) {
+          String message = tableName
+              + " is not a valid Netezza table.  "
+              + "Please make sure that you have connected to the Netezza DB "
+              + "and the table name is right.   The current values are\n\t"
+              + "  connection string : " + options.getConnectString()
+              + "\n\t  table owner : " + tableOwner + "\n\t  table name : "
+              + shortTableName;
+          LOG.error(message);
+          throw new IOException(message);
+        }
+      } finally {
+        if (rs != null) {
+          rs.close();
+        }
+        if (ps != null) {
+          ps.close();
+        }
+        close();
+      }
+    } catch (SQLException sqle) {
+      throw new IOException("SQL exception checking table "
+          + sqle.getMessage(), sqle);
+    }
+  }
+
+  /**
+   * Export data stored in HDFS into a table in a database.
+   */
+  public void exportTable(com.cloudera.sqoop.manager.ExportJobContext context)
+      throws IOException, ExportException {
+    options = context.getOptions();
+    context.setConnManager(this);
+
+
+    checkTable(); // Throws excpetion as necessary
+    NetezzaExternalTableExportJob exporter = null;
+
+    char qc = (char) options.getInputEnclosedBy();
+    char ec = (char) options.getInputEscapedBy();
+
+    if (qc > 0 && !(qc == '"' || qc == '\'')) {
+      throw new ExportException("Input enclosed-by character must be '\"' "
+         + "or ''' for netezza direct mode exports");
+    }
+    if (ec > 0 && ec != '\\') {
+      throw new ExportException("Input escaped-by character must be '\\' "
+          + "for netezza direct mode exports");
+    }
+    exporter = new NetezzaExternalTableExportJob(context);
+    exporter.runExport();
+  }
+
+  /**
+   * Import the table into HDFS by using Netezza external tables to pull out the
+   * data from the database and upload the files directly to HDFS.
+   */
+  @Override
+  public void importTable(com.cloudera.sqoop.manager.ImportJobContext context)
+      throws IOException, ImportException {
+
+    context.setConnManager(this);
+
+    String tableName = context.getTableName();
+    String jarFile = context.getJarFile();
+    SqoopOptions options = context.getOptions();
+
+    if (null == tableName) {
+      LOG.
+        error("Netezza external table import does not support query imports.");
+      LOG.
+        error("Do not use --direct and --query together for Netezza.");
+      throw
+        new IOException("Null tableName for Netezza external table import.");
+    }
+
+    char qc = options.getOutputEnclosedBy();
+    char ec = options.getOutputEscapedBy();
+
+    if (qc > 0 && !(qc == '"' || qc == '\'')) {
+      throw new ImportException("Output enclosed-by character must be '\"' "
+         + "or ''' for netezza direct mode imports");
+    }
+    if (ec > 0 && ec != '\\') {
+      throw new ImportException("Output escaped-by character must be '\\' "
+          + "for netezza direct mode exports");
+    }
+
+    NetezzaExternalTableImportJob importer = null;
+
+    importer = new NetezzaExternalTableImportJob(options, context);
+
+    // Direct Netezza Manager will use the datasliceid so no split columns
+    // will be used.
+
+    LOG.info("Beginning netezza fast path import");
+
+    if (options.getFileLayout() != SqoopOptions.FileLayout.TextFile) {
+      LOG.warn("File import layout " + options.getFileLayout()
+          + " is not supported by");
+      LOG.warn("Netezza direct import; import will proceed as text files.");
+    }
+
+    importer.runImport(tableName, jarFile, null, options.getConf());
+  }
+
+  protected  RelatedOptions getNetezzaExtraOpts() {
+    // Just add the options from NetezzaManager and ignore the setting
+    // for direct mode access
+    RelatedOptions netezzaOpts =
+        new RelatedOptions("Netezza Connector Direct mode options");
+
+    netezzaOpts.addOption(OptionBuilder
+        .withArgName(NETEZZA_ERROR_THRESHOLD_OPT).hasArg()
+        .withDescription("Error threshold for the job")
+        .withLongOpt(NETEZZA_ERROR_THRESHOLD_LONG_ARG).create());
+    netezzaOpts.addOption(OptionBuilder.withArgName(NETEZZA_LOG_DIR_OPT)
+        .hasArg().withDescription("Netezza log directory")
+        .withLongOpt(NETEZZA_LOG_DIR_LONG_ARG).create());
+    return netezzaOpts;
+  }
+
+  private void handleNetezzaExtraArgs(SqoopOptions opts)
+      throws ParseException {
+
+    Configuration conf = opts.getConf();
+
+    String[] extraArgs = opts.getExtraArgs();
+
+    RelatedOptions netezzaOpts = getNetezzaExtraOpts();
+    CommandLine cmdLine = new GnuParser().parse(netezzaOpts, extraArgs, true);
+    if (cmdLine.hasOption(NETEZZA_ERROR_THRESHOLD_LONG_ARG)) {
+      int threshold = Integer.parseInt(cmdLine
+          .getOptionValue(NETEZZA_ERROR_THRESHOLD_LONG_ARG));
+      conf.setInt(NETEZZA_ERROR_THRESHOLD_OPT, threshold);
+    }
+    if (cmdLine.hasOption(NETEZZA_LOG_DIR_LONG_ARG)) {
+      String dir = cmdLine.getOptionValue(NETEZZA_LOG_DIR_LONG_ARG);
+      conf.set(NETEZZA_LOG_DIR_OPT, dir);
+    }
+
+    // Always true for Netezza direct mode access
+    conf.setBoolean(NETEZZA_DATASLICE_ALIGNED_ACCESS_OPT, true);
+  }
+
+  @Override
+  public boolean supportsStagingForExport() {
+    return false;
+  }
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/0d5f73ad/src/java/org/apache/sqoop/manager/MySQLUtils.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/manager/MySQLUtils.java b/src/java/org/apache/sqoop/manager/MySQLUtils.java
index ef18818..c86cf1a 100644
--- a/src/java/org/apache/sqoop/manager/MySQLUtils.java
+++ b/src/java/org/apache/sqoop/manager/MySQLUtils.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.conf.Configuration;
 import com.cloudera.sqoop.config.ConfigurationConstants;
 import com.cloudera.sqoop.config.ConfigurationHelper;
 import com.cloudera.sqoop.util.DirectImportUtils;
+import org.apache.sqoop.lib.DelimiterSet;
 
 /**
  * Helper methods and constants for MySQL imports/exports.
@@ -48,15 +49,16 @@ public final class MySQLUtils {
   public static final String MYSQL_IMPORT_CMD = "mysqlimport";
 
   public static final String OUTPUT_FIELD_DELIM_KEY =
-      "sqoop.output.field.delim";
+      DelimiterSet.OUTPUT_FIELD_DELIM_KEY;
   public static final String OUTPUT_RECORD_DELIM_KEY =
-      "sqoop.output.record.delim";
+      DelimiterSet.OUTPUT_RECORD_DELIM_KEY;
   public static final String OUTPUT_ENCLOSED_BY_KEY =
-      "sqoop.output.enclosed.by";
+      DelimiterSet.OUTPUT_ENCLOSED_BY_KEY;
   public static final String OUTPUT_ESCAPED_BY_KEY =
-      "sqoop.output.escaped.by";
+      DelimiterSet.OUTPUT_ESCAPED_BY_KEY;
   public static final String OUTPUT_ENCLOSE_REQUIRED_KEY =
-      "sqoop.output.enclose.required";
+      DelimiterSet.OUTPUT_ENCLOSE_REQUIRED_KEY;
+
   public static final String TABLE_NAME_KEY =
       ConfigurationHelper.getDbInputTableNameProperty();
   public static final String CONNECT_STRING_KEY =
@@ -67,6 +69,7 @@ public final class MySQLUtils {
       ConfigurationHelper.getDbPasswordProperty();
   public static final String WHERE_CLAUSE_KEY =
       ConfigurationHelper.getDbInputConditionsProperty();
+
   public static final String EXTRA_ARGS_KEY =
       "sqoop.mysql.extra.args";
 
@@ -117,4 +120,3 @@ public final class MySQLUtils {
     return tempFile.toString();
   }
 }
-

http://git-wip-us.apache.org/repos/asf/sqoop/blob/0d5f73ad/src/java/org/apache/sqoop/manager/NetezzaManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/manager/NetezzaManager.java b/src/java/org/apache/sqoop/manager/NetezzaManager.java
new file mode 100644
index 0000000..0ac7717
--- /dev/null
+++ b/src/java/org/apache/sqoop/manager/NetezzaManager.java
@@ -0,0 +1,222 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.manager;
+
+import java.io.IOException;
+import java.sql.SQLException;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.ParseException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.sqoop.mapreduce.AsyncSqlOutputFormat;
+import org.apache.sqoop.mapreduce.netezza.NetezzaDataDrivenDBInputFormat;
+
+import com.cloudera.sqoop.SqoopOptions;
+import com.cloudera.sqoop.cli.RelatedOptions;
+import com.cloudera.sqoop.config.ConfigurationHelper;
+import com.cloudera.sqoop.util.ExportException;
+import com.cloudera.sqoop.util.ImportException;
+
+/**
+ * Manages connections to Netezza databases through the generic JDBC code
+ * paths. Identifiers are double-quoted, and non-batch exports are limited
+ * to one record per statement.
+ */
+public class NetezzaManager extends GenericJdbcManager {
+
+  public static final Log LOG = LogFactory.getLog(NetezzaManager.class
+      .getName());
+
+  // driver class to ensure is loaded when making db connection.
+  private static final String DRIVER_CLASS = "org.netezza.Driver";
+
+  // set to true after we warn the user that we can use direct fastpath.
+  protected static boolean directModeWarningPrinted = false;
+
+  // set to true after we warn the user that they should consider using
+  // batching.
+  protected static boolean batchModeWarningPrinted = false;
+
+  /** Configuration key that enables data-slice-aligned access. */
+  public static final String NETEZZA_DATASLICE_ALIGNED_ACCESS_OPT =
+      "netezza.dataslice.aligned.access";
+
+  /** Extra-argument flag (--partitioned-access); it takes no value. */
+  public static final String NETEZZA_DATASLICE_ALIGNED_ACCESS_LONG_ARG =
+      "partitioned-access";
+
+  /**
+   * Creates a manager that uses the Netezza JDBC driver.
+   *
+   * @param opts the Sqoop options for this job
+   */
+  public NetezzaManager(final SqoopOptions opts) {
+    super(DRIVER_CLASS, opts);
+  }
+
+  @Override
+  public String escapeColName(String colName) {
+    return escapeIdentifier(colName);
+  }
+
+  @Override
+  public String escapeTableName(String tableName) {
+    return escapeIdentifier(tableName);
+  }
+
+  /**
+   * Wraps an identifier in double quotes, doubling any embedded double
+   * quote characters.
+   *
+   * @param identifier the raw identifier; may be null
+   * @return the quoted identifier, or null if {@code identifier} is null
+   */
+  protected String escapeIdentifier(String identifier) {
+    if (identifier == null) {
+      return null;
+    }
+    return "\"" + identifier.replace("\"", "\"\"") + "\"";
+  }
+
+  /**
+   * Rolls back any uncommitted work on the open connection and then closes
+   * it. The connection is closed even if the rollback fails.
+   */
+  @Override
+  public void close() throws SQLException {
+    try {
+      if (this.hasOpenConnection()) {
+        this.getConnection().rollback(); // Rollback any changes
+      }
+    } finally {
+      super.close();
+    }
+  }
+
+  @Override
+  public void importTable(com.cloudera.sqoop.manager.ImportJobContext context)
+      throws IOException, ImportException {
+    context.setConnManager(this);
+    // The user probably should have requested --direct to invoke the
+    // external table option. Display a warning (once) informing them of this.
+    if (!NetezzaManager.directModeWarningPrinted) {
+      LOG.warn("It looks like you are importing from Netezza.");
+      LOG.warn("This transfer can be faster! Use the --direct");
+      LOG.warn("option to exercise a Netezza-specific fast path.");
+
+      NetezzaManager.directModeWarningPrinted = true;
+    }
+    try {
+      handleNetezzaImportExtraArgs(context);
+    } catch (ParseException pe) {
+      throw new ImportException(pe.getMessage(), pe);
+    }
+    // Then run the normal importTable() method.
+    super.importTable(context);
+  }
+
+  @Override
+  public void exportTable(com.cloudera.sqoop.manager.ExportJobContext context)
+      throws IOException, ExportException {
+    // The user probably should have requested --direct to invoke the
+    // external table option. Display a warning (once) informing them of this.
+    context.setConnManager(this);
+    if (!NetezzaManager.directModeWarningPrinted) {
+      LOG.warn("It looks like you are exporting to Netezza.");
+      LOG.warn("This transfer can be faster! Use the --direct");
+      LOG.warn("option to exercise a Netezza-specific fast path.");
+
+      NetezzaManager.directModeWarningPrinted = true;
+    }
+
+    // Netezza does not have multi row inserts
+    if (!options.isBatchMode()) {
+      forceSingleRecordPerStatement(context);
+    }
+    // TODO: consider forcing batch mode for Netezza exports.
+    super.exportTable(context);
+  }
+
+  /**
+   * Updates rows in a Netezza table. Only a single mapper is allowed.
+   *
+   * @throws ExportException if more than one mapper is configured
+   */
+  @Override
+  public void updateTable(com.cloudera.sqoop.manager.ExportJobContext context)
+      throws IOException, ExportException {
+    if (options.getNumMappers() > 1) {
+      String msg = "Netezza update with multiple mappers can lead to "
+          + "inconsistencies - Please set num-mappers option to 1 in the SQOOP "
+          + "command line for update jobs with Netezza and SQOOP";
+      throw new ExportException(msg);
+    }
+
+    if (!options.isBatchMode()) {
+      forceSingleRecordPerStatement(context);
+    }
+    super.updateTable(context);
+  }
+
+  /**
+   * Warns (once per JVM) that non-batch exports could use --batch, and sets
+   * the job's records-per-statement to 1.
+   *
+   * @param context the export job context whose configuration is updated
+   */
+  private void forceSingleRecordPerStatement(
+      com.cloudera.sqoop.manager.ExportJobContext context) {
+    if (!NetezzaManager.batchModeWarningPrinted) {
+      LOG.warn("It looks like you are exporting to Netezza in non-batch ");
+      LOG.warn("mode.  Still this transfer can be made faster! Use the ");
+      LOG.warn("--batch option to exercise a Netezza-specific fast path.");
+      LOG.warn("Forcing records per statement to 1 in non batch mode");
+
+      NetezzaManager.batchModeWarningPrinted = true;
+    }
+    context.getOptions().getConf()
+        .setInt(AsyncSqlOutputFormat.RECORDS_PER_STATEMENT_KEY, 1);
+  }
+
+  @Override
+  public boolean supportsStagingForExport() {
+    return true;
+  }
+
+  @Override
+  protected String getCurTimestampQuery() {
+    return "SELECT CURRENT_TIMESTAMP";
+  }
+
+  /**
+   * Builds the standard-mode extra options. --partitioned-access is a flag:
+   * only its presence is checked, so it must not require an argument.
+   */
+  protected RelatedOptions getNetezzaExtraOpts() {
+    RelatedOptions netezzaOpts = new RelatedOptions("Netezza options");
+    netezzaOpts.addOption(OptionBuilder
+        .withArgName(NETEZZA_DATASLICE_ALIGNED_ACCESS_OPT)
+        .withDescription("Data slice aligned import")
+        .withLongOpt(NETEZZA_DATASLICE_ALIGNED_ACCESS_LONG_ARG).create());
+    return netezzaOpts;
+  }
+
+  /**
+   * Applies --partitioned-access to a standard-mode import. Aligned access
+   * is disabled unless the flag is present and more than one map task is
+   * configured, in which case NetezzaDataDrivenDBInputFormat is used.
+   *
+   * @param context the import job context to configure
+   * @throws ParseException if the extra arguments are malformed
+   */
+  private void handleNetezzaImportExtraArgs(ImportJobContext context)
+      throws ParseException {
+
+    SqoopOptions opts = context.getOptions();
+    Configuration conf = opts.getConf();
+
+    String[] extraArgs = opts.getExtraArgs();
+
+    conf.setBoolean(NETEZZA_DATASLICE_ALIGNED_ACCESS_OPT, false);
+
+    if (extraArgs != null && extraArgs.length > 0
+        && ConfigurationHelper.getConfNumMaps(conf) > 1) {
+      RelatedOptions netezzaOpts = getNetezzaExtraOpts();
+      CommandLine cmdLine = new GnuParser().parse(netezzaOpts, extraArgs, true);
+      if (cmdLine.hasOption(NETEZZA_DATASLICE_ALIGNED_ACCESS_LONG_ARG)) {
+        conf.setBoolean(NETEZZA_DATASLICE_ALIGNED_ACCESS_OPT, true);
+        context.setInputFormat(NetezzaDataDrivenDBInputFormat.class);
+      }
+    }
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/0d5f73ad/src/java/org/apache/sqoop/mapreduce/db/netezza/NetezzaDBDataSliceSplitter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/db/netezza/NetezzaDBDataSliceSplitter.java b/src/java/org/apache/sqoop/mapreduce/db/netezza/NetezzaDBDataSliceSplitter.java
new file mode 100644
index 0000000..368a349
--- /dev/null
+++ b/src/java/org/apache/sqoop/mapreduce/db/netezza/NetezzaDBDataSliceSplitter.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.mapreduce.db.netezza;
+
+import java.sql.ResultSet;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.sqoop.mapreduce.db.DBSplitter;
+import
+  org.apache.sqoop.mapreduce.db.DataDrivenDBInputFormat.DataDrivenDBInputSplit;
+
+import com.cloudera.sqoop.config.ConfigurationHelper;
+
+/**
+ * Netezza specific splitter based on data slice id. Instead of dividing a
+ * column's value range, it assigns rows to splits by
+ * {@code datasliceid % numSplits}.
+ */
+public class NetezzaDBDataSliceSplitter implements DBSplitter {
+
+  /**
+   * Creates one split per configured map task. Split {@code i} selects the
+   * rows whose {@code datasliceid % numSplits} equals {@code i}. That
+   * condition is placed in the lower-bound clause, and the upper-bound clause
+   * is the always-true {@code 1 = 1}. No SQL work is done here, so unlike the
+   * interface method this does not declare SQLException.
+   *
+   * @param conf job configuration; only the number of map tasks is read
+   * @param results not used
+   * @param colName not used
+   * @return one split per map task
+   */
+  @Override
+  public List<InputSplit> split(Configuration conf, ResultSet results,
+      String colName) {
+    int numSplits = ConfigurationHelper.getConfNumMaps(conf);
+    List<InputSplit> splitList = new ArrayList<InputSplit>(numSplits);
+    for (int i = 0; i < numSplits; ++i) {
+      StringBuilder lowerBoundClause = new StringBuilder(128);
+      lowerBoundClause.append(" datasliceid % ").append(numSplits)
+          .append(" = ").append(i);
+      splitList.add(new DataDrivenDBInputSplit(lowerBoundClause.toString(),
+          "1 = 1"));
+    }
+    return splitList;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/0d5f73ad/src/java/org/apache/sqoop/mapreduce/db/netezza/NetezzaExternalTableExportMapper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/db/netezza/NetezzaExternalTableExportMapper.java b/src/java/org/apache/sqoop/mapreduce/db/netezza/NetezzaExternalTableExportMapper.java
new file mode 100644
index 0000000..410a569
--- /dev/null
+++ b/src/java/org/apache/sqoop/mapreduce/db/netezza/NetezzaExternalTableExportMapper.java
@@ -0,0 +1,221 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce.db.netezza;
+
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.sql.Connection;
+import java.sql.SQLException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.sqoop.io.NamedFifo;
+import org.apache.sqoop.lib.SqoopRecord;
+import org.apache.sqoop.manager.DirectNetezzaManager;
+import org.apache.sqoop.mapreduce.SqoopMapper;
+import org.apache.sqoop.mapreduce.db.DBConfiguration;
+import org.apache.sqoop.util.PerfCounters;
+import org.apache.sqoop.util.TaskId;
+
+import com.cloudera.sqoop.lib.DelimiterSet;
+
+/**
+ * Netezza export mapper using external tables.
+ *
+ * <p>Each task creates a named FIFO in its local work directory and starts a
+ * {@link NetezzaJDBCStatementRunner} thread executing
+ * {@code INSERT INTO <table> SELECT * FROM EXTERNAL '<fifo>' ...}. Records
+ * passed to {@link #writeTextRecord} / {@link #writeSqoopRecord} by the
+ * subclasses are written into the FIFO, which that statement reads from.</p>
+ *
+ * @param <K> input key type
+ * @param <V> input value type
+ */
+public abstract class NetezzaExternalTableExportMapper<K, V> extends
+    SqoopMapper<K, V, NullWritable, NullWritable> {
+
+  public static final Log LOG = LogFactory
+      .getLog(NetezzaExternalTableExportMapper.class.getName());
+
+  private Configuration conf;
+  private DBConfiguration dbc;
+  // Named FIFO the external table statement reads from.
+  private File fifoFile;
+  private Connection con;
+  // Write end of the FIFO.
+  private OutputStream recordWriter;
+  // Thread executing the external table INSERT statement.
+  private NetezzaJDBCStatementRunner extTableThread;
+  private PerfCounters counter;
+  // Delimiters used to render SqoopRecords; derived from the same settings
+  // that getSqlStatement() puts into the external table options.
+  private DelimiterSet outputDelimiters;
+
+  /**
+   * Builds the external table INSERT statement that reads from the FIFO.
+   *
+   * @return the SQL statement text.
+   * @throws IOException if the configured log directory cannot be created
+   *     or is not a writable directory.
+   */
+  private String getSqlStatement() throws IOException {
+
+    char fd = (char) conf.getInt(DelimiterSet.OUTPUT_FIELD_DELIM_KEY, ',');
+    char qc = (char) conf.getInt(DelimiterSet.OUTPUT_ENCLOSED_BY_KEY, 0);
+    char ec = (char) conf.getInt(DelimiterSet.OUTPUT_ESCAPED_BY_KEY, 0);
+
+    int errorThreshold = conf.getInt(
+        DirectNetezzaManager.NETEZZA_ERROR_THRESHOLD_OPT, 1);
+    String logDir = conf.get(DirectNetezzaManager.NETEZZA_LOG_DIR_OPT);
+
+    StringBuilder sqlStmt = new StringBuilder(2048);
+
+    sqlStmt.append("INSERT INTO ");
+    sqlStmt.append(dbc.getInputTableName());
+    sqlStmt.append(" SELECT * FROM EXTERNAL '");
+    sqlStmt.append(fifoFile.getAbsolutePath());
+    sqlStmt.append("' USING (REMOTESOURCE 'JDBC' ");
+    sqlStmt.append(" BOOLSTYLE 'TRUE_FALSE' ");
+    sqlStmt.append(" CRINSTRING FALSE ");
+    sqlStmt.append(" DELIMITER ");
+    sqlStmt.append(Integer.toString(fd));
+    sqlStmt.append(" ENCODING 'internal' ");
+    if (ec > 0) {
+      sqlStmt.append(" ESCAPECHAR '\\' ");
+    }
+    sqlStmt.append(" FORMAT 'Text' ");
+    sqlStmt.append(" INCLUDEZEROSECONDS TRUE ");
+    sqlStmt.append(" NULLVALUE 'NULL' ");
+    if (qc > 0) {
+      switch (qc) {
+        case '\'':
+          sqlStmt.append(" QUOTEDVALUE SINGLE ");
+          break;
+        case '\"':
+          sqlStmt.append(" QUOTEDVALUE DOUBLE ");
+          break;
+        default:
+          LOG.warn("Unsupported enclosed by character: " + qc + " - ignoring.");
+      }
+    }
+    sqlStmt.append(" MAXERRORS ").append(errorThreshold);
+
+    if (logDir != null) {
+      logDir = logDir.trim();
+      if (logDir.length() > 0) {
+        File logDirPath = new File(logDir);
+        logDirPath.mkdirs();
+        if (logDirPath.canWrite() && logDirPath.isDirectory()) {
+          sqlStmt.append(" LOGDIR ").append(logDir).append(' ');
+        } else {
+          throw new IOException("Unable to create log directory specified");
+        }
+      }
+    }
+    sqlStmt.append(")");
+
+    String stmt = sqlStmt.toString();
+    LOG.debug("SQL generated for external table export: " + stmt);
+
+    return stmt;
+  }
+
+  /**
+   * Creates the FIFO, starts the JDBC thread executing the external table
+   * statement and opens the write end of the FIFO.
+   *
+   * @param context the task context.
+   * @throws IOException if the FIFO, the connection or the statement could
+   *     not be created.
+   */
+  private void initNetezzaExternalTableExport(Context context)
+      throws IOException {
+    this.conf = context.getConfiguration();
+    dbc = new DBConfiguration(conf);
+    File taskAttemptDir = TaskId.getLocalWorkPath(conf);
+
+    // Render SqoopRecords with exactly the delimiters the external table
+    // statement is told to expect (see getSqlStatement()).
+    char fd = (char) conf.getInt(DelimiterSet.OUTPUT_FIELD_DELIM_KEY, ',');
+    char qc = (char) conf.getInt(DelimiterSet.OUTPUT_ENCLOSED_BY_KEY, 0);
+    char ec = (char) conf.getInt(DelimiterSet.OUTPUT_ESCAPED_BY_KEY, 0);
+    if (qc != '\'' && qc != '\"') {
+      // getSqlStatement() only declares QUOTEDVALUE for ' and ", so no other
+      // enclosing character may appear in the records.
+      qc = '\000';
+    }
+    if (ec > 0) {
+      // getSqlStatement() always declares ESCAPECHAR '\'.
+      ec = '\\';
+    }
+    this.outputDelimiters = new DelimiterSet(fd, '\n', qc, ec, false);
+
+    this.fifoFile = new File(taskAttemptDir, ("nzexttable-export.txt"));
+    String filename = fifoFile.toString();
+    NamedFifo nf;
+    // Create the FIFO itself.
+    try {
+      nf = new NamedFifo(this.fifoFile);
+      nf.create();
+    } catch (IOException ioe) {
+      // Command failed.
+      LOG.error("Could not create FIFO file " + filename);
+      this.fifoFile = null;
+      throw new IOException(
+          "Could not create FIFO for netezza external table export", ioe);
+    }
+    String sqlStmt = getSqlStatement();
+    boolean cleanup = false;
+    try {
+      con = dbc.getConnection();
+      extTableThread = new NetezzaJDBCStatementRunner(Thread.currentThread(),
+          con, sqlStmt);
+    } catch (SQLException sqle) {
+      // No runner took ownership of the connection, so close it here.
+      cleanup = true;
+      throw new IOException(sqle);
+    } catch (ClassNotFoundException cnfe) {
+      throw new IOException(cnfe);
+    } finally {
+      if (con != null && cleanup) {
+        try {
+          con.close();
+        } catch (Exception e) {
+          LOG.debug("Exception closing connection " + e.getMessage());
+        }
+      }
+      // On success the runner thread owns (and eventually closes) it.
+      con = null;
+    }
+
+    counter = new PerfCounters();
+    // Start the JDBC thread first so that the FIFO reader is running:
+    // opening a FIFO for writing blocks until a reader opens it.
+    extTableThread.start();
+    recordWriter = new BufferedOutputStream(new FileOutputStream(nf.getFile()));
+    counter.startClock();
+  }
+
+  /**
+   * Runs the task: feeds every input record through {@link #map} into the
+   * FIFO, then always closes the FIFO, waits for the external table
+   * statement and reports its failure, if any.
+   *
+   * @param context the task context.
+   * @throws IOException if setup fails or the external table statement
+   *     failed.
+   * @throws InterruptedException if interrupted while waiting for the
+   *     statement thread.
+   */
+  @Override
+  public void run(Context context) throws IOException, InterruptedException {
+    setup(context);
+    initNetezzaExternalTableExport(context);
+    try {
+      // If the statement thread already terminated nobody reads the FIFO;
+      // skip the records but still run the shutdown and error checks below.
+      if (extTableThread.isAlive()) {
+        while (context.nextKeyValue()) {
+          // The statement thread interrupts us when it fails; stop feeding
+          // the FIFO once it is gone.
+          if (Thread.interrupted() && !extTableThread.isAlive()) {
+            break;
+          }
+          map(context.getCurrentKey(), context.getCurrentValue(), context);
+        }
+        cleanup(context);
+      }
+    } finally {
+      // Closing the write end signals end-of-data to the FIFO reader.
+      recordWriter.close();
+      extTableThread.join();
+      counter.stopClock();
+      LOG.info("Transferred " + counter.toString());
+      if (extTableThread.hasExceptions()) {
+        extTableThread.printException();
+        throw new IOException(extTableThread.getExcepton());
+      }
+    }
+  }
+
+  /**
+   * Writes a text record, followed by a newline, to the FIFO as UTF-8.
+   *
+   * @param record the already delimited record text.
+   * @throws IOException if writing to the FIFO fails.
+   */
+  protected void writeTextRecord(Text record) throws IOException,
+      InterruptedException {
+    String outputStr = record.toString() + "\n";
+    byte[] outputBytes = outputStr.getBytes("UTF-8");
+    counter.addBytes(outputBytes.length);
+    recordWriter.write(outputBytes, 0, outputBytes.length);
+  }
+
+  /**
+   * Renders a SqoopRecord with the configured delimiters and writes it to
+   * the FIFO as UTF-8.
+   *
+   * @param sqr the record to write.
+   * @throws IOException if writing to the FIFO fails.
+   */
+  protected void writeSqoopRecord(SqoopRecord sqr) throws IOException,
+      InterruptedException {
+    String outputStr = sqr.toString(this.outputDelimiters);
+    byte[] outputBytes = outputStr.getBytes("UTF-8");
+    counter.addBytes(outputBytes.length);
+    recordWriter.write(outputBytes, 0, outputBytes.length);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/0d5f73ad/src/java/org/apache/sqoop/mapreduce/db/netezza/NetezzaExternalTableImportMapper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/db/netezza/NetezzaExternalTableImportMapper.java b/src/java/org/apache/sqoop/mapreduce/db/netezza/NetezzaExternalTableImportMapper.java
new file mode 100644
index 0000000..9e6cab6
--- /dev/null
+++ b/src/java/org/apache/sqoop/mapreduce/db/netezza/NetezzaExternalTableImportMapper.java
@@ -0,0 +1,224 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce.db.netezza;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.sql.Connection;
+import java.sql.SQLException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.sqoop.config.ConfigurationHelper;
+import org.apache.sqoop.io.NamedFifo;
+import org.apache.sqoop.lib.DelimiterSet;
+import org.apache.sqoop.manager.DirectNetezzaManager;
+import org.apache.sqoop.mapreduce.SqoopMapper;
+import org.apache.sqoop.mapreduce.db.DBConfiguration;
+import org.apache.sqoop.util.PerfCounters;
+import org.apache.sqoop.util.TaskId;
+
+/**
+ * Netezza import mapper using external tables.
+ *
+ * <p>For each input record (a data slice id) the mapper creates a named FIFO
+ * in the task's local work directory and starts a
+ * {@link NetezzaJDBCStatementRunner} thread executing
+ * {@code CREATE EXTERNAL TABLE '<fifo>' ... AS SELECT ...}, restricted to the
+ * rows where {@code DATASLICEID % numMappers} equals that id. Lines read from
+ * the FIFO are emitted as {@link Text} keys.</p>
+ */
+public class NetezzaExternalTableImportMapper extends
+    SqoopMapper<Integer, NullWritable, Text, NullWritable> {
+
+  public static final Log LOG = LogFactory
+      .getLog(NetezzaExternalTableImportMapper.class.getName());
+
+  private Configuration conf;
+  private DBConfiguration dbc;
+  // Named FIFO the external table statement writes into.
+  private File fifoFile;
+  private int numMappers;
+  private Connection con;
+  // Read end of the FIFO.
+  private BufferedReader recordReader;
+  // Thread executing the CREATE EXTERNAL TABLE ... AS SELECT statement.
+  private NetezzaJDBCStatementRunner extTableThread;
+  private PerfCounters counter;
+
+  /**
+   * Builds the external table statement that unloads this mapper's rows
+   * into the FIFO.
+   *
+   * @param myId the id selecting which rows this mapper unloads.
+   * @return the SQL statement text.
+   * @throws IOException if the configured log directory cannot be created
+   *     or is not a writable directory.
+   */
+  private String getSqlStatement(int myId) throws IOException {
+
+    char fd = (char) conf.getInt(DelimiterSet.OUTPUT_FIELD_DELIM_KEY, ',');
+    char qc = (char) conf.getInt(DelimiterSet.OUTPUT_ENCLOSED_BY_KEY, 0);
+    char ec = (char) conf.getInt(DelimiterSet.OUTPUT_ESCAPED_BY_KEY, 0);
+    int errorThreshold = conf.getInt(
+        DirectNetezzaManager.NETEZZA_ERROR_THRESHOLD_OPT, 1);
+    String logDir = conf.get(DirectNetezzaManager.NETEZZA_LOG_DIR_OPT);
+    String[] cols = dbc.getOutputFieldNames();
+    String inputConds = dbc.getInputConditions();
+    StringBuilder sqlStmt = new StringBuilder(2048);
+
+    sqlStmt.append("CREATE EXTERNAL TABLE '");
+    sqlStmt.append(fifoFile.getAbsolutePath());
+    sqlStmt.append("' USING (REMOTESOURCE 'JDBC' ");
+    sqlStmt.append(" BOOLSTYLE 'T_F' ");
+    sqlStmt.append(" CRINSTRING FALSE ");
+    sqlStmt.append(" DELIMITER ");
+    sqlStmt.append(Integer.toString(fd));
+    sqlStmt.append(" ENCODING 'internal' ");
+    if (ec > 0) {
+      sqlStmt.append(" ESCAPECHAR '\\' ");
+    }
+    sqlStmt.append(" FORMAT 'Text' ");
+    sqlStmt.append(" INCLUDEZEROSECONDS TRUE ");
+    sqlStmt.append(" NULLVALUE 'null' ");
+    if (qc > 0) {
+      switch (qc) {
+        case '\'':
+          sqlStmt.append(" QUOTEDVALUE SINGLE ");
+          break;
+        case '\"':
+          sqlStmt.append(" QUOTEDVALUE DOUBLE ");
+          break;
+        default:
+          LOG.warn("Unsupported enclosed by character: " + qc + " - ignoring.");
+      }
+    }
+
+    sqlStmt.append(" MAXERRORS ").append(errorThreshold);
+
+    if (logDir != null) {
+      logDir = logDir.trim();
+      if (logDir.length() > 0) {
+        File logDirPath = new File(logDir);
+        logDirPath.mkdirs();
+        if (logDirPath.canWrite() && logDirPath.isDirectory()) {
+          sqlStmt.append(" LOGDIR ").append(logDir).append(' ');
+        } else {
+          throw new IOException("Unable to create log directory specified");
+        }
+      }
+    }
+
+    sqlStmt.append(") AS SELECT ");
+    if (cols == null || cols.length == 0) {
+      sqlStmt.append('*');
+    } else {
+      // First column without a leading comma, then the remaining ones.
+      sqlStmt.append(cols[0]);
+      for (int i = 1; i < cols.length; ++i) {
+        sqlStmt.append(',').append(cols[i]);
+      }
+    }
+    sqlStmt.append(" FROM ").append(dbc.getInputTableName()).append(' ');
+    sqlStmt.append("WHERE (DATASLICEID % ");
+    sqlStmt.append(numMappers).append(") = ").append(myId);
+    if (inputConds != null && inputConds.length() > 0) {
+      sqlStmt.append(" AND ( ").append(inputConds).append(')');
+    }
+
+    String stmt = sqlStmt.toString();
+    LOG.debug("SQL generated for external table import for data slice " + myId
+        + "=" + stmt);
+    return stmt;
+  }
+
+  /**
+   * Creates the FIFO, starts the JDBC thread executing the external table
+   * statement and opens the read end of the FIFO.
+   *
+   * @param myId the id selecting which rows this mapper unloads.
+   * @throws IOException if the FIFO, the connection or the statement could
+   *     not be created.
+   */
+  private void initNetezzaExternalTableImport(int myId) throws IOException {
+
+    File taskAttemptDir = TaskId.getLocalWorkPath(conf);
+
+    this.fifoFile = new File(taskAttemptDir, ("nzexttable-" + myId + ".txt"));
+    String filename = fifoFile.toString();
+    NamedFifo nf;
+    // Create the FIFO itself.
+    try {
+      nf = new NamedFifo(this.fifoFile);
+      nf.create();
+    } catch (IOException ioe) {
+      // Command failed.
+      LOG.error("Could not create FIFO file " + filename);
+      this.fifoFile = null;
+      throw new IOException(
+          "Could not create FIFO for netezza external table import", ioe);
+    }
+    String sqlStmt = getSqlStatement(myId);
+    boolean cleanup = false;
+    try {
+      con = dbc.getConnection();
+      extTableThread = new NetezzaJDBCStatementRunner(Thread.currentThread(),
+          con, sqlStmt);
+    } catch (SQLException sqle) {
+      // No runner took ownership of the connection, so close it here.
+      cleanup = true;
+      throw new IOException(sqle);
+    } catch (ClassNotFoundException cnfe) {
+      throw new IOException(cnfe);
+    } finally {
+      if (con != null && cleanup) {
+        try {
+          con.close();
+        } catch (Exception e) {
+          LOG.debug("Exception closing connection " + e.getMessage());
+        }
+      }
+      // On success the runner thread owns (and eventually closes) it.
+      con = null;
+    }
+    // Start the JDBC thread (the FIFO's writer) before opening the read
+    // end: opening a FIFO for reading blocks until a writer opens it.
+    extTableThread.start();
+    recordReader = new BufferedReader(new InputStreamReader(
+        new FileInputStream(nf.getFile())));
+  }
+
+  /**
+   * Unloads the rows selected by {@code dataSliceId} and emits each line
+   * read from the FIFO, terminated by the configured record delimiter.
+   *
+   * @param dataSliceId the id selecting which rows this mapper unloads.
+   * @param val unused.
+   * @param context the task context records are written to.
+   * @throws IOException if setup fails or the external table statement
+   *     failed.
+   * @throws InterruptedException if interrupted while writing or waiting
+   *     for the statement thread.
+   */
+  @Override
+  public void map(Integer dataSliceId, NullWritable val, Context context)
+      throws IOException, InterruptedException {
+    conf = context.getConfiguration();
+    dbc = new DBConfiguration(conf);
+    numMappers = ConfigurationHelper.getConfNumMaps(conf);
+    char rd = (char) conf.getInt(DelimiterSet.OUTPUT_RECORD_DELIM_KEY, '\n');
+    initNetezzaExternalTableImport(dataSliceId);
+    counter = new PerfCounters();
+    counter.startClock();
+    Text outputRecord = new Text();
+    try {
+      // If the statement thread already terminated, skip reading but still
+      // run the shutdown and error checks below.
+      if (extTableThread.isAlive()) {
+        String inputRecord = recordReader.readLine();
+        while (inputRecord != null) {
+          // The statement thread interrupts us when it fails; stop reading
+          // once it is gone.
+          if (Thread.interrupted() && !extTableThread.isAlive()) {
+            break;
+          }
+          outputRecord.set(inputRecord + rd);
+          // May be we should set the output to be String for faster
+          // performance. There is no real benefit in changing it to Text and
+          // then converting it back in our case.
+          context.write(outputRecord, NullWritable.get());
+          counter.addBytes(1 + inputRecord.length());
+          inputRecord = recordReader.readLine();
+        }
+      }
+    } finally {
+      recordReader.close();
+      extTableThread.join();
+      counter.stopClock();
+      LOG.info("Transferred " + counter.toString());
+      if (extTableThread.hasExceptions()) {
+        extTableThread.printException();
+        throw new IOException(extTableThread.getExcepton());
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/0d5f73ad/src/java/org/apache/sqoop/mapreduce/db/netezza/NetezzaExternalTableRecordExportMapper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/db/netezza/NetezzaExternalTableRecordExportMapper.java b/src/java/org/apache/sqoop/mapreduce/db/netezza/NetezzaExternalTableRecordExportMapper.java
new file mode 100644
index 0000000..d3024e5
--- /dev/null
+++ b/src/java/org/apache/sqoop/mapreduce/db/netezza/NetezzaExternalTableRecordExportMapper.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce.db.netezza;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.sqoop.lib.SqoopRecord;
+
+/**
+ * Netezza external table export mapper for jobs whose input values are
+ * {@link SqoopRecord}s (sequence file input).
+ */
+public class NetezzaExternalTableRecordExportMapper extends
+    NetezzaExternalTableExportMapper<LongWritable, SqoopRecord> {
+
+  /**
+   * Hands one record to the export FIFO, then reports task progress.
+   *
+   * @param key input key; not used.
+   * @param sqr the record to export.
+   * @param context task context, used to report progress.
+   */
+  @Override
+  public void map(LongWritable key, SqoopRecord sqr, Context context)
+      throws IOException, InterruptedException {
+    this.writeSqoopRecord(sqr);
+    context.progress();
+  }
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/0d5f73ad/src/java/org/apache/sqoop/mapreduce/db/netezza/NetezzaExternalTableTextExportMapper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/db/netezza/NetezzaExternalTableTextExportMapper.java b/src/java/org/apache/sqoop/mapreduce/db/netezza/NetezzaExternalTableTextExportMapper.java
new file mode 100644
index 0000000..c703b97
--- /dev/null
+++ b/src/java/org/apache/sqoop/mapreduce/db/netezza/NetezzaExternalTableTextExportMapper.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce.db.netezza;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+
+/**
+ * Netezza external table export mapper for jobs whose input values are
+ * delimited {@link Text} lines (text file input).
+ */
+public class NetezzaExternalTableTextExportMapper extends
+    NetezzaExternalTableExportMapper<LongWritable, Text> {
+
+  /**
+   * Hands one text line to the export FIFO, then reports task progress.
+   *
+   * @param key input key; not used.
+   * @param text the already delimited line to export.
+   * @param context task context, used to report progress.
+   */
+  @Override
+  public void map(LongWritable key, Text text, Context context)
+      throws IOException, InterruptedException {
+    this.writeTextRecord(text);
+    context.progress();
+  }
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/0d5f73ad/src/java/org/apache/sqoop/mapreduce/db/netezza/NetezzaJDBCStatementRunner.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/db/netezza/NetezzaJDBCStatementRunner.java b/src/java/org/apache/sqoop/mapreduce/db/netezza/NetezzaJDBCStatementRunner.java
new file mode 100644
index 0000000..3a5df40
--- /dev/null
+++ b/src/java/org/apache/sqoop/mapreduce/db/netezza/NetezzaJDBCStatementRunner.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce.db.netezza;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * A simple class for JDBC External table statement execution for Netezza. Even
+ * though the statements are execute only (no support for bind variables or
+ * resultsets, we use a two step process so that sql statement errors are caught
+ * during construction itself.
+ */
+public class NetezzaJDBCStatementRunner extends Thread {
+  public static final Log LOG = LogFactory
+      .getLog(NetezzaJDBCStatementRunner.class.getName());
+
+  private Connection con;
+  private Exception exception;
+  private PreparedStatement ps;
+  private Thread parent;
+
+  public boolean hasExceptions() {
+    return exception != null;
+  }
+
+  public void printException() {
+    if (exception != null) {
+      LOG.error("Errors encountered during external table JDBC processing");
+      LOG.error("Exception " + exception.getMessage(), exception);
+    }
+  }
+
+  public Throwable getExcepton() {
+    if (!hasExceptions()) {
+      return null;
+    }
+    return exception;
+  }
+
+  public NetezzaJDBCStatementRunner(Thread parent, Connection con,
+      String sqlStatement) throws SQLException {
+    this.parent = parent;
+    this.con = con;
+    this.ps = con.prepareStatement(sqlStatement);
+    this.exception = null;
+  }
+
+  public void run() {
+    boolean interruptParent = false;
+    try {
+
+      // Excecute the statement - this will make data to flow in the
+      // named pipes
+      ps.execute();
+
+    } catch (SQLException sqle) {
+      interruptParent = true;
+      LOG.error("Unable to execute external table export", sqle);
+      this.exception = sqle;
+    } finally {
+      if (con != null) {
+        try {
+          con.close();
+        } catch (Exception e) {
+          LOG.debug("Exception closing connection " + e.getMessage());
+        }
+      }
+      con = null;
+    }
+    if (interruptParent) {
+      this.parent.interrupt();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/0d5f73ad/src/java/org/apache/sqoop/mapreduce/netezza/NetezzaDataDrivenDBInputFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/netezza/NetezzaDataDrivenDBInputFormat.java b/src/java/org/apache/sqoop/mapreduce/netezza/NetezzaDataDrivenDBInputFormat.java
new file mode 100644
index 0000000..c4e0062
--- /dev/null
+++ b/src/java/org/apache/sqoop/mapreduce/netezza/NetezzaDataDrivenDBInputFormat.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.mapreduce.netezza;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.sqoop.manager.NetezzaManager;
+import org.apache.sqoop.mapreduce.DBWritable;
+import org.apache.sqoop.mapreduce.db.DataDrivenDBInputFormat;
+import org.apache.sqoop.mapreduce.db.netezza.NetezzaDBDataSliceSplitter;
+
+import com.cloudera.sqoop.config.ConfigurationHelper;
+
+/**
+ * Netezza specific DB input format.
+ *
+ * <p>When data slice aligned access is requested for a table import that
+ * uses more than one mapper and no boundary query, the splits come from
+ * {@link NetezzaDBDataSliceSplitter}. In every other case split generation
+ * is delegated to {@link DataDrivenDBInputFormat}.</p>
+ *
+ * @param <T> record type.
+ */
+public class NetezzaDataDrivenDBInputFormat<T extends DBWritable> extends
+    DataDrivenDBInputFormat<T> implements Configurable {
+  private static final Log LOG = LogFactory
+      .getLog(NetezzaDataDrivenDBInputFormat.class);
+
+  @Override
+  public List<InputSplit> getSplits(JobContext job) throws IOException {
+    int mapperCount = ConfigurationHelper.getJobNumMaps(job);
+
+    String boundingQuery = getDBConf().getInputBoundingQuery();
+    boolean sliceAligned = getConf().getBoolean(
+        NetezzaManager.NETEZZA_DATASLICE_ALIGNED_ACCESS_OPT, false);
+    boolean tableImport = getDBConf().getInputTableName() != null;
+    boolean hasBoundingQuery =
+        boundingQuery != null && !boundingQuery.isEmpty();
+
+    if (sliceAligned && tableImport && mapperCount != 1 && !hasBoundingQuery) {
+      // Split purely on data slice id: the lower bound clause of each split
+      // carries the data slice restriction, the upper bound is a constant.
+      return new NetezzaDBDataSliceSplitter().split(getConf(), null, null);
+    }
+    return super.getSplits(job);
+  }
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/0d5f73ad/src/java/org/apache/sqoop/mapreduce/netezza/NetezzaExternalTableExportJob.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/netezza/NetezzaExternalTableExportJob.java b/src/java/org/apache/sqoop/mapreduce/netezza/NetezzaExternalTableExportJob.java
new file mode 100644
index 0000000..2a702d9
--- /dev/null
+++ b/src/java/org/apache/sqoop/mapreduce/netezza/NetezzaExternalTableExportJob.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce.netezza;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
+import org.apache.sqoop.lib.DelimiterSet;
+import org.apache.sqoop.manager.ConnManager;
+import org.apache.sqoop.mapreduce.DBWritable;
+import
+  org.apache.sqoop.mapreduce.db.netezza.NetezzaExternalTableRecordExportMapper;
+import
+  org.apache.sqoop.mapreduce.db.netezza.NetezzaExternalTableTextExportMapper;
+
+import com.cloudera.sqoop.manager.ExportJobContext;
+import com.cloudera.sqoop.mapreduce.ExportJobBase;
+import com.cloudera.sqoop.mapreduce.db.DBConfiguration;
+import com.cloudera.sqoop.mapreduce.db.DataDrivenDBInputFormat;
+
+/**
+ * Class that runs an export job using netezza external tables in the mapper.
+ */
+public class NetezzaExternalTableExportJob extends ExportJobBase {
+
+  public static final Log LOG = LogFactory
+      .getLog(NetezzaExternalTableExportJob.class.getName());
+
+  public NetezzaExternalTableExportJob(final ExportJobContext context) {
+    super(context, null, null, NullOutputFormat.class);
+  }
+
+  /**
+   * Configure the inputformat to use for the job.
+   *
+   * <p>Besides the regular input format setup, this stores the delimiter
+   * settings read by the Netezza export mappers and the JDBC connection
+   * parameters they connect with.</p>
+   */
+  @Override
+  protected void configureInputFormat(Job job, String tableName,
+      String tableClassName, String splitByCol) throws ClassNotFoundException,
+      IOException {
+
+    // Configure the delimiters, etc. They describe the files being exported,
+    // so they are taken from the input delimiter options.
+    Configuration conf = job.getConfiguration();
+    conf.setInt(DelimiterSet.OUTPUT_FIELD_DELIM_KEY,
+        options.getInputFieldDelim());
+    conf.setInt(DelimiterSet.OUTPUT_RECORD_DELIM_KEY,
+        options.getInputRecordDelim());
+    conf.setInt(DelimiterSet.OUTPUT_ENCLOSED_BY_KEY,
+        options.getInputEnclosedBy());
+    // Netezza uses \ as the escape character. Force the use of it whenever
+    // an escape character is requested, including when it already is \.
+    int escapeChar = options.getInputEscapedBy();
+    if (escapeChar > 0) {
+      if (escapeChar != '\\') {
+        LOG.info("Setting escaped char to \\ for Netezza external table "
+            + "export");
+      }
+      conf.setInt(DelimiterSet.OUTPUT_ESCAPED_BY_KEY, '\\');
+    }
+    conf.setBoolean(DelimiterSet.OUTPUT_ENCLOSE_REQUIRED_KEY,
+        options.isOutputEncloseRequired());
+
+    ConnManager mgr = context.getConnManager();
+    String username = options.getUsername();
+    if (null == username || username.length() == 0) {
+      DBConfiguration.configureDB(job.getConfiguration(), mgr.getDriverClass(),
+          options.getConnectString());
+    } else {
+      DBConfiguration.configureDB(job.getConfiguration(), mgr.getDriverClass(),
+          options.getConnectString(), username, options.getPassword());
+    }
+
+    String[] colNames = options.getColumns();
+    if (null == colNames) {
+      colNames = mgr.getColumnNames(tableName);
+    }
+
+    String[] sqlColNames = null;
+    if (null != colNames) {
+      sqlColNames = new String[colNames.length];
+      for (int i = 0; i < colNames.length; i++) {
+        sqlColNames[i] = mgr.escapeColName(colNames[i]);
+      }
+    }
+
+    DataDrivenDBInputFormat.setInput(job, DBWritable.class, tableName, null,
+        null, sqlColNames);
+
+    // Configure the actual InputFormat to use.
+    super.configureInputFormat(job, tableName, tableClassName, splitByCol);
+  }
+
+  /**
+   * Picks the mapper matching the input layout.
+   *
+   * @return the SqoopRecord mapper for sequence file input, otherwise the
+   *     text mapper.
+   */
+  @Override
+  protected Class<? extends Mapper> getMapperClass() {
+    if (inputIsSequenceFiles()) {
+      return NetezzaExternalTableRecordExportMapper.class;
+    } else {
+      return NetezzaExternalTableTextExportMapper.class;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/0d5f73ad/src/java/org/apache/sqoop/mapreduce/netezza/NetezzaExternalTableImportJob.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/netezza/NetezzaExternalTableImportJob.java b/src/java/org/apache/sqoop/mapreduce/netezza/NetezzaExternalTableImportJob.java
new file mode 100644
index 0000000..7ee6f70
--- /dev/null
+++ b/src/java/org/apache/sqoop/mapreduce/netezza/NetezzaExternalTableImportJob.java
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce.netezza;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.sqoop.lib.DelimiterSet;
+import org.apache.sqoop.mapreduce.DBWritable;
+import org.apache.sqoop.mapreduce.ImportJobBase;
+import org.apache.sqoop.mapreduce.RawKeyTextOutputFormat;
+import org.apache.sqoop.mapreduce.db.netezza.NetezzaExternalTableImportMapper;
+
+import com.cloudera.sqoop.SqoopOptions;
+import com.cloudera.sqoop.manager.ConnManager;
+import com.cloudera.sqoop.manager.ImportJobContext;
+import com.cloudera.sqoop.mapreduce.db.DBConfiguration;
+import com.cloudera.sqoop.mapreduce.db.DataDrivenDBInputFormat;
+
+/**
+ * Class that runs an import job using Netezza external tables in the mapper.
+ *
+ * The job is wired with {@link NetezzaExternalTableImportMapper} as the
+ * mapper, {@link NetezzaExternalTableInputFormat} as the input format and
+ * {@link RawKeyTextOutputFormat} as the output format.
+ */
+public class NetezzaExternalTableImportJob extends ImportJobBase {
+
+  public NetezzaExternalTableImportJob(final SqoopOptions opts,
+      ImportJobContext context) {
+    super(opts, NetezzaExternalTableImportMapper.class,
+        NetezzaExternalTableInputFormat.class, RawKeyTextOutputFormat.class,
+        context);
+  }
+
+  /**
+   * Configure the InputFormat to use for the job, together with the JDBC
+   * connection, column list and output delimiter settings stored in the job
+   * configuration.
+   *
+   * @param job the job being configured
+   * @param tableName the table to import; passed through unquoted
+   * @param tableClassName the generated record class name (not used here)
+   * @param splitByCol the split-by column; escaped via the connection manager
+   *     before being handed to DataDrivenDBInputFormat.setInput
+   * @throws ClassNotFoundException propagated from resolving the InputFormat
+   *     class
+   * @throws IOException propagated from the job setup calls
+   */
+  @Override
+  protected void configureInputFormat(Job job, String tableName,
+      String tableClassName, String splitByCol) throws ClassNotFoundException,
+      IOException {
+
+    ConnManager mgr = getContext().getConnManager();
+    String username = options.getUsername();
+    if (null == username || username.length() == 0) {
+      DBConfiguration.configureDB(job.getConfiguration(), mgr.getDriverClass(),
+          options.getConnectString());
+    } else {
+      DBConfiguration.configureDB(job.getConfiguration(), mgr.getDriverClass(),
+          options.getConnectString(), username, options.getPassword());
+    }
+
+    // Explicit --columns win; otherwise ask the database for all columns.
+    String[] colNames = options.getColumns();
+    if (null == colNames) {
+      colNames = mgr.getColumnNames(tableName);
+    }
+
+    String[] sqlColNames = null;
+    if (null != colNames) {
+      sqlColNames = new String[colNames.length];
+      for (int i = 0; i < colNames.length; i++) {
+        sqlColNames[i] = mgr.escapeColName(colNames[i]);
+      }
+    }
+
+    // It's ok if the where clause is null in DBInputFormat.setInput.
+    String whereClause = options.getWhereClause();
+
+    // DBWritable is only a placeholder input class here: the generated
+    // table class (tableClassName) is not registered with the input format.
+    // The table name itself is passed through unquoted.
+    DataDrivenDBInputFormat.setInput(job, DBWritable.class, tableName,
+        whereClause, mgr.escapeColName(splitByCol), sqlColNames);
+
+    Configuration conf = job.getConfiguration();
+    conf.setInt(DelimiterSet.OUTPUT_FIELD_DELIM_KEY,
+        options.getOutputFieldDelim());
+    conf.setInt(DelimiterSet.OUTPUT_RECORD_DELIM_KEY,
+        options.getOutputRecordDelim());
+    conf.setInt(DelimiterSet.OUTPUT_ENCLOSED_BY_KEY,
+        options.getOutputEnclosedBy());
+    // Netezza uses \ as the escape character, so force the use of it whenever
+    // escaping was requested. The key is set even when the user already
+    // chose \ so that the escape character always reaches the job
+    // configuration; the log line only reports an actual override.
+    int escapeChar = options.getOutputEscapedBy();
+    if (escapeChar > 0) {
+      if (escapeChar != '\\') {
+        LOG.info(
+            "Setting escaped char to \\ for Netezza external table import");
+      }
+      conf.setInt(DelimiterSet.OUTPUT_ESCAPED_BY_KEY, '\\');
+    }
+    conf.setBoolean(DelimiterSet.OUTPUT_ENCLOSE_REQUIRED_KEY,
+        options.isOutputEncloseRequired());
+
+    LOG.debug("Using InputFormat: " + inputFormatClass);
+    job.setInputFormatClass(getInputFormatClass());
+  }
+
+  /**
+   * Set the mapper class implementation to use in the job, as well as the
+   * map output key/value types (String keys, NullWritable values).
+   */
+  @Override
+  protected void configureMapper(Job job, String tableName,
+      String tableClassName) throws ClassNotFoundException, IOException {
+    job.setMapperClass(getMapperClass());
+    job.setOutputKeyClass(String.class);
+    job.setOutputValueClass(NullWritable.class);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/0d5f73ad/src/java/org/apache/sqoop/mapreduce/netezza/NetezzaExternalTableInputFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/netezza/NetezzaExternalTableInputFormat.java b/src/java/org/apache/sqoop/mapreduce/netezza/NetezzaExternalTableInputFormat.java
new file mode 100644
index 0000000..631c664
--- /dev/null
+++ b/src/java/org/apache/sqoop/mapreduce/netezza/NetezzaExternalTableInputFormat.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce.netezza;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import com.cloudera.sqoop.config.ConfigurationHelper;
+
+/**
+ * InputFormat for Netezza external table imports that gives every map task
+ * exactly one input record: the data slice id the mapper uses for its
+ * external table query.
+ *
+ * One {@link NetezzaExternalTableInputSplit} is created per requested map
+ * task, numbered consecutively from 0.
+ */
+public class NetezzaExternalTableInputFormat extends
+    InputFormat<Integer, NullWritable> {
+
+  public static final Log LOG = LogFactory
+      .getLog(NetezzaExternalTableInputFormat.class.getName());
+
+  /**
+   * A RecordReader that delivers exactly one record per split: the split's
+   * data slice id as the key and {@link NullWritable} as the value.
+   *
+   * The split is expected to be a {@link NetezzaExternalTableInputSplit};
+   * {@link #getCurrentKey()} casts it to that type.
+   */
+  public static class NetezzaExternalTableRecordReader extends
+      RecordReader<Integer, NullWritable> {
+
+    /** Whether the single record of this split has been handed out. */
+    private boolean delivered;
+    /** The split whose data slice id is reported as the key. */
+    private InputSplit split;
+
+    public NetezzaExternalTableRecordReader(InputSplit split) {
+      // Assign the state directly instead of calling the overridable
+      // initialize() from the constructor; a later initialize() call resets
+      // the same two fields.
+      this.split = split;
+      this.delivered = false;
+    }
+
+    @Override
+    public boolean nextKeyValue() {
+      boolean hasNext = !delivered;
+      delivered = true;
+      return hasNext;
+    }
+
+    @Override
+    public Integer getCurrentKey() {
+      return ((NetezzaExternalTableInputSplit) this.split).getDataSliceId();
+    }
+
+    @Override
+    public NullWritable getCurrentValue() {
+      return NullWritable.get();
+    }
+
+    @Override
+    public void close() {
+    }
+
+    @Override
+    public float getProgress() {
+      return delivered ? 1.0f : 0.0f;
+    }
+
+    @Override
+    public void initialize(InputSplit s, TaskAttemptContext context) {
+      this.split = s;
+      this.delivered = false;
+    }
+  }
+
+  @Override
+  public RecordReader<Integer, NullWritable> createRecordReader(
+      InputSplit split, TaskAttemptContext context) {
+    return new NetezzaExternalTableRecordReader(split);
+  }
+
+  /**
+   * Create one split per configured map task; split i carries data slice
+   * id i.
+   */
+  @Override
+  public List<InputSplit> getSplits(JobContext context) throws IOException,
+      InterruptedException {
+    int targetNumTasks = ConfigurationHelper.getJobNumMaps(context);
+    List<InputSplit> splits = new ArrayList<InputSplit>(targetNumTasks);
+    for (int i = 0; i < targetNumTasks; ++i) {
+      splits.add(new NetezzaExternalTableInputSplit(i));
+    }
+    return splits;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/0d5f73ad/src/java/org/apache/sqoop/mapreduce/netezza/NetezzaExternalTableInputSplit.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/netezza/NetezzaExternalTableInputSplit.java b/src/java/org/apache/sqoop/mapreduce/netezza/NetezzaExternalTableInputSplit.java
new file mode 100644
index 0000000..95cdcba
--- /dev/null
+++ b/src/java/org/apache/sqoop/mapreduce/netezza/NetezzaExternalTableInputSplit.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce.netezza;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.InputSplit;
+
+/**
+ * Input split for Netezza external table imports. Its only state is the
+ * data slice id assigned to one map task, serialized as a single int.
+ */
+public class NetezzaExternalTableInputSplit extends InputSplit implements
+    Writable {
+
+  public static final Log LOG = LogFactory
+      .getLog(NetezzaExternalTableInputSplit.class.getName());
+
+  /** Data slice id handled by this split. */
+  private int dataSliceId;
+
+  /** Creates a split for data slice 0. */
+  public NetezzaExternalTableInputSplit() {
+    this(0);
+  }
+
+  /** Creates a split for the given data slice id. */
+  public NetezzaExternalTableInputSplit(int dataSliceId) {
+    this.dataSliceId = dataSliceId;
+  }
+
+  /** @return the data slice id carried by this split */
+  public Integer getDataSliceId() {
+    return Integer.valueOf(dataSliceId);
+  }
+
+  /** The split carries no data of its own, so its length is always 0. */
+  @Override
+  public long getLength() throws IOException, InterruptedException {
+    return 0L;
+  }
+
+  /** No host preference: always an empty location list. */
+  @Override
+  public String[] getLocations() throws IOException, InterruptedException {
+    return new String[0];
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeInt(dataSliceId);
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    dataSliceId = in.readInt();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/0d5f73ad/src/test/com/cloudera/sqoop/manager/DirectNetezzaExportManualTest.java
----------------------------------------------------------------------
diff --git a/src/test/com/cloudera/sqoop/manager/DirectNetezzaExportManualTest.java b/src/test/com/cloudera/sqoop/manager/DirectNetezzaExportManualTest.java
new file mode 100644
index 0000000..bbcd138
--- /dev/null
+++ b/src/test/com/cloudera/sqoop/manager/DirectNetezzaExportManualTest.java
@@ -0,0 +1,286 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.cloudera.sqoop.manager;
+
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.Statement;
+import java.sql.SQLException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.sqoop.manager.DirectNetezzaManager;
+import org.junit.After;
+import org.junit.Before;
+
+import com.cloudera.sqoop.SqoopOptions;
+import com.cloudera.sqoop.TestExport;
+
+/**
+ * Test the DirectNetezzaManager implementation's exportJob() functionality.
+ */
+public class DirectNetezzaExportManualTest extends TestExport {
+
+  public static final Log LOG = LogFactory.getLog(
+      DirectNetezzaExportManualTest.class.getName());
+
+  static final String TABLE_PREFIX = "EMPNZ";
+
+  // instance variables populated during setUp, used during tests.
+  private DirectNetezzaManager manager;
+  private Connection conn;
+
+  @Override
+  protected Connection getConnection() {
+    return conn;
+  }
+
+  @Override
+  protected boolean useHsqldbTestServer() {
+    return false;
+  }
+
+  @Override
+  protected String getConnectString() {
+    return NetezzaTestUtils.getNZConnectString();
+  }
+
+  @Override
+  protected String getTablePrefix() {
+    return TABLE_PREFIX;
+  }
+
+  @Override
+  protected String getDropTableStatement(String tableName) {
+    return "DROP TABLE " + tableName;
+  }
+
+  @Before
+  public void setUp() {
+    super.setUp();
+    conn = getConnection();
+    SqoopOptions options = new SqoopOptions(
+        NetezzaTestUtils.getNZConnectString(), getTableName());
+    options.setUsername(NetezzaTestUtils.getNZUser());
+    options.setPassword(NetezzaTestUtils.getNZPassword());
+    this.manager = new DirectNetezzaManager(options);
+
+    try {
+      this.conn = manager.getConnection();
+      this.conn.setAutoCommit(false);
+    } catch (SQLException sqlE) {
+      LOG.error("Encountered SQL Exception: " + sqlE);
+      sqlE.printStackTrace();
+      fail("SQLException when running test setUp(): " + sqlE);
+    }
+  }
+
+  @After
+  public void tearDown() {
+    super.tearDown();
+    if (null != manager) {
+      try {
+        manager.close();
+      } catch (SQLException sqlE) {
+        LOG.error("Got SQLException: " + sqlE.toString());
+        fail("Got SQLException: " + sqlE.toString());
+      }
+    }
+    this.conn = null;
+    this.manager = null;
+
+  }
+
+  @Override
+  protected String [] getCodeGenArgv(String... extraArgs) {
+
+    String [] moreArgs = new String[extraArgs.length + 4];
+    int i = 0;
+    for (i = 0; i < extraArgs.length; i++) {
+      moreArgs[i] = extraArgs[i];
+    }
+
+    // Add username argument for netezza.
+    moreArgs[i++] = "--username";
+    moreArgs[i++] = NetezzaTestUtils.getNZUser();
+    moreArgs[i++] = "--password";
+    moreArgs[i++] = NetezzaTestUtils.getNZPassword();
+
+    return super.getCodeGenArgv(moreArgs);
+  }
+
+  @Override
+  protected String [] getArgv(boolean includeHadoopFlags,
+      int rowsPerStatement, int statementsPerTx, String... additionalArgv) {
+
+    String [] subArgv = newStrArray(additionalArgv, "--direct",
+        "--username", NetezzaTestUtils.getNZUser(), "--password",
+        NetezzaTestUtils.getNZPassword());
+    return super.getArgv(includeHadoopFlags, rowsPerStatement,
+        statementsPerTx, subArgv);
+  }
+
+
+
+  /**
+   * Create the table definition to export to, removing any prior table. By
+   * specifying ColumnGenerator arguments, you can add extra columns to the
+   * table of arbitrary type.
+   */
+  @Override
+  public void createTable(ColumnGenerator... extraColumns) throws SQLException {
+    PreparedStatement statement = conn.prepareStatement(
+        getDropTableStatement(getTableName()), ResultSet.TYPE_FORWARD_ONLY,
+        ResultSet.CONCUR_READ_ONLY);
+    try {
+      statement.executeUpdate();
+      conn.commit();
+    } catch (SQLException sqle) {
+      conn.rollback();
+    } finally {
+      statement.close();
+    }
+
+    StringBuilder sb = new StringBuilder();
+    sb.append("CREATE TABLE ");
+    sb.append(getTableName());
+    sb.append(" (id INT NOT NULL PRIMARY KEY, msg VARCHAR(64)");
+    int colNum = 0;
+    for (ColumnGenerator gen : extraColumns) {
+      sb.append(", " + forIdx(colNum++) + " " + gen.getType());
+    }
+    sb.append(")");
+
+    statement = conn.prepareStatement(sb.toString(),
+        ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
+    try {
+      statement.executeUpdate();
+      conn.commit();
+    } finally {
+      statement.close();
+    }
+  }
+  /**
+   * Test an authenticated export using netezza external table import.
+   */
+  public void testAuthExport() throws IOException, SQLException {
+    SqoopOptions options = new SqoopOptions(
+        NetezzaTestUtils.getNZConnectString(),
+        getTableName());
+    options.setUsername(NetezzaTestUtils.getNZUser());
+    options.setPassword(NetezzaTestUtils.getNZPassword());
+
+    manager = new DirectNetezzaManager(options);
+
+    Connection connection = null;
+    Statement st = null;
+
+    String tableName = getTableName();
+
+    try {
+      connection = manager.getConnection();
+      connection.setAutoCommit(false);
+      st = connection.createStatement();
+
+      // create a target database table.
+      try {
+        st.executeUpdate("DROP TABLE " + tableName);
+      } catch(SQLException sqle) {
+        LOG.info("Ignoring exception from DROP TABLE : " + sqle.getMessage());
+        connection.rollback();
+      }
+
+      LOG.info("Creating table " + tableName);
+
+      st.executeUpdate("CREATE TABLE " + tableName + " ("
+          + "id INT NOT NULL PRIMARY KEY, "
+          + "msg VARCHAR(24) NOT NULL)");
+
+      connection.commit();
+      LOG.info("Created table " + tableName);
+
+      // Write a file containing a record to export.
+      Path tablePath = getTablePath();
+      Path filePath = new Path(tablePath, "datafile");
+      Configuration conf = new Configuration();
+
+      FileSystem fs = FileSystem.get(conf);
+      fs.mkdirs(tablePath);
+      OutputStream os = fs.create(filePath);
+      BufferedWriter w = new BufferedWriter(new OutputStreamWriter(os));
+      w.write(getRecordLine(0));
+      w.write(getRecordLine(1));
+      w.write(getRecordLine(2));
+      w.close();
+      os.close();
+
+      // run the export and verify that the results are good.
+      runExport(getArgv(true, 10, 10,
+          "--username", NetezzaTestUtils.getNZUser(),
+          "--password", NetezzaTestUtils.getNZPassword(),
+          "--connect", NetezzaTestUtils.getNZConnectString()));
+      verifyExport(3, connection);
+    } catch (SQLException sqlE) {
+      LOG.error("Encountered SQL Exception: " + sqlE);
+      sqlE.printStackTrace();
+      fail("SQLException when accessing target table. " + sqlE);
+    } finally {
+      try {
+        if (null != st) {
+          st.close();
+        }
+      } catch (SQLException sqlE) {
+        LOG.warn("Got SQLException when closing connection: " + sqlE);
+      }
+    }
+  }
+
+
+  @Override
+  public void testMultiMapTextExportWithStaging()
+    throws IOException, SQLException {
+    // disable this test as staging is not supported in direct mode
+  }
+
+  @Override
+  public void testMultiTransactionWithStaging()
+    throws IOException, SQLException {
+    // disable this test as staging is not supported in direct mode
+  }
+
+  @Override
+  public void testColumnsExport()
+    throws IOException, SQLException {
+    // disable this test as it is not supported in direct mode
+  }
+
+  @Override
+  public void testSequenceFileExport()
+    throws IOException, SQLException {
+    // disable this test as it is not supported in direct mode
+  }
+}


Mime
View raw message