sqoop-commits mailing list archives

From: arv...@apache.org
Subject: svn commit: r1190489 [2/6] - in /incubator/sqoop/trunk/src/java: com/cloudera/sqoop/mapreduce/ org/apache/sqoop/mapreduce/
Date: Fri, 28 Oct 2011 18:22:19 GMT
Modified: incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/ExportJobBase.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/ExportJobBase.java?rev=1190489&r1=1190488&r2=1190489&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/ExportJobBase.java (original)
+++ incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/ExportJobBase.java Fri Oct 28 18:22:16 2011
@@ -1,6 +1,4 @@
 /**
- * Copyright 2011 The Apache Software Foundation
- *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -20,394 +18,48 @@
 
 package com.cloudera.sqoop.mapreduce;
 
-import java.io.FileNotFoundException;
 import java.io.IOException;
-import java.sql.SQLException;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapreduce.Counters;
 import org.apache.hadoop.mapreduce.InputFormat;
-import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.mapreduce.OutputFormat;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-
-import com.cloudera.sqoop.SqoopOptions;
-import com.cloudera.sqoop.config.ConfigurationHelper;
-import com.cloudera.sqoop.lib.SqoopRecord;
-import com.cloudera.sqoop.manager.ConnManager;
 import com.cloudera.sqoop.manager.ExportJobContext;
-import com.cloudera.sqoop.orm.TableClassName;
-import com.cloudera.sqoop.util.ExportException;
-import com.cloudera.sqoop.util.PerfCounters;
 
 /**
- * Base class for running an export MapReduce job.
+ * @deprecated Moving to use org.apache.sqoop namespace.
  */
-public class ExportJobBase extends JobBase {
-
-  /**
-   * The (inferred) type of a file or group of files.
-   */
-  public enum FileType {
-    SEQUENCE_FILE, AVRO_DATA_FILE, UNKNOWN
-  }
+public class ExportJobBase
+    extends org.apache.sqoop.mapreduce.ExportJobBase {
 
-  public static final Log LOG = LogFactory.getLog(
-      ExportJobBase.class.getName());
-
-  /** What SqoopRecord class to use to read a record for export. */
   public static final String SQOOP_EXPORT_TABLE_CLASS_KEY =
-      "sqoop.mapreduce.export.table.class";
+      org.apache.sqoop.mapreduce.ExportJobBase.SQOOP_EXPORT_TABLE_CLASS_KEY;
 
-  /**
-   * What column of the table to use for the WHERE clause of
-   * an updating export.
-   */
   public static final String SQOOP_EXPORT_UPDATE_COL_KEY =
-      "sqoop.mapreduce.export.update.col";
+      org.apache.sqoop.mapreduce.ExportJobBase.SQOOP_EXPORT_UPDATE_COL_KEY;
 
-  /** Number of map tasks to use for an export. */
   public static final String EXPORT_MAP_TASKS_KEY =
-      "sqoop.mapreduce.export.map.tasks";
-
-  protected ExportJobContext context;
+      org.apache.sqoop.mapreduce.ExportJobBase.EXPORT_MAP_TASKS_KEY;
 
   public ExportJobBase(final ExportJobContext ctxt) {
-    this(ctxt, null, null, null);
+    super(ctxt);
   }
 
   public ExportJobBase(final ExportJobContext ctxt,
       final Class<? extends Mapper> mapperClass,
       final Class<? extends InputFormat> inputFormatClass,
       final Class<? extends OutputFormat> outputFormatClass) {
-    super(ctxt.getOptions(), mapperClass, inputFormatClass, outputFormatClass);
-    this.context = ctxt;
+    super(ctxt, mapperClass, inputFormatClass, outputFormatClass);
   }
 
-  /**
-   * @return true if p is a SequenceFile, or a directory containing
-   * SequenceFiles.
-   */
   public static boolean isSequenceFiles(Configuration conf, Path p)
       throws IOException {
-    return getFileType(conf, p) == FileType.SEQUENCE_FILE;
+    return org.apache.sqoop.mapreduce.ExportJobBase.isSequenceFiles(conf, p);
   }
 
-  /**
-   * @return the type of the file represented by p (or the files in p, if a
-   * directory)
-   */
   public static FileType getFileType(Configuration conf, Path p)
       throws IOException {
-    FileSystem fs = p.getFileSystem(conf);
-
-    try {
-      FileStatus stat = fs.getFileStatus(p);
-
-      if (null == stat) {
-        // Couldn't get the item.
-        LOG.warn("Input path " + p + " does not exist");
-        return FileType.UNKNOWN;
-      }
-
-      if (stat.isDir()) {
-        FileStatus [] subitems = fs.listStatus(p);
-        if (subitems == null || subitems.length == 0) {
-          LOG.warn("Input path " + p + " contains no files");
-          return FileType.UNKNOWN; // empty dir.
-        }
-
-        // Pick a child entry to examine instead.
-        boolean foundChild = false;
-        for (int i = 0; i < subitems.length; i++) {
-          stat = subitems[i];
-          if (!stat.isDir() && !stat.getPath().getName().startsWith("_")) {
-            foundChild = true;
-            break; // This item is a visible file. Check it.
-          }
-        }
-
-        if (!foundChild) {
-          stat = null; // Couldn't find a reasonable candidate.
-        }
-      }
-
-      if (null == stat) {
-        LOG.warn("null FileStatus object in isSequenceFiles(); "
-            + "assuming false.");
-        return FileType.UNKNOWN;
-      }
-
-      Path target = stat.getPath();
-      return fromMagicNumber(target, conf);
-    } catch (FileNotFoundException fnfe) {
-      LOG.warn("Input path " + p + " does not exist");
-      return FileType.UNKNOWN; // doesn't exist!
-    }
-  }
-
-  /**
-   * @param file a file to test.
-   * @return true if 'file' refers to a SequenceFile.
-   */
-  private static FileType fromMagicNumber(Path file, Configuration conf) {
-    // Test target's header to see if it contains magic numbers indicating its
-    // file type
-    byte [] header = new byte[3];
-    FSDataInputStream is = null;
-    try {
-      FileSystem fs = file.getFileSystem(conf);
-      is = fs.open(file);
-      is.readFully(header);
-    } catch (IOException ioe) {
-      // Error reading header or EOF; assume unknown
-      LOG.warn("IOException checking input file header: " + ioe);
-      return FileType.UNKNOWN;
-    } finally {
-      try {
-        if (null != is) {
-          is.close();
-        }
-      } catch (IOException ioe) {
-        // ignore; closing.
-        LOG.warn("IOException closing input stream: " + ioe + "; ignoring.");
-      }
-    }
-
-    if (header[0] == 'S' && header[1] == 'E' && header[2] == 'Q') {
-      return FileType.SEQUENCE_FILE;
-    }
-    if (header[0] == 'O' && header[1] == 'b' && header[2] == 'j') {
-      return FileType.AVRO_DATA_FILE;
-    }
-    return FileType.UNKNOWN;
-  }
-
-  /**
-   * @return the Path to the files we are going to export to the db.
-   */
-  protected Path getInputPath() throws IOException {
-    Path inputPath = new Path(context.getOptions().getExportDir());
-    Configuration conf = options.getConf();
-    inputPath = inputPath.makeQualified(FileSystem.get(conf));
-    return inputPath;
-  }
-
-  @Override
-  protected void configureInputFormat(Job job, String tableName,
-      String tableClassName, String splitByCol)
-      throws ClassNotFoundException, IOException {
-
-    super.configureInputFormat(job, tableName, tableClassName, splitByCol);
-    FileInputFormat.addInputPath(job, getInputPath());
-  }
-
-  @Override
-  protected Class<? extends InputFormat> getInputFormatClass()
-      throws ClassNotFoundException {
-    Class<? extends InputFormat> configuredIF = super.getInputFormatClass();
-    if (null == configuredIF) {
-      return ExportInputFormat.class;
-    } else {
-      return configuredIF;
-    }
+    return org.apache.sqoop.mapreduce.ExportJobBase.getFileType(conf, p);
   }
 
-  @Override
-  protected Class<? extends OutputFormat> getOutputFormatClass()
-      throws ClassNotFoundException {
-    Class<? extends OutputFormat> configuredOF = super.getOutputFormatClass();
-    if (null == configuredOF) {
-      if (!options.isBatchMode()) {
-        return ExportOutputFormat.class;
-      } else {
-        return ExportBatchOutputFormat.class;
-      }
-    } else {
-      return configuredOF;
-    }
-  }
-
-  @Override
-  protected void configureMapper(Job job, String tableName,
-      String tableClassName) throws ClassNotFoundException, IOException {
-
-    job.setMapperClass(getMapperClass());
-
-    // Concurrent writes of the same records would be problematic.
-    ConfigurationHelper.setJobMapSpeculativeExecution(job, false);
-
-    job.setMapOutputKeyClass(SqoopRecord.class);
-    job.setMapOutputValueClass(NullWritable.class);
-  }
-
-  @Override
-  protected int configureNumTasks(Job job) throws IOException {
-    int numMaps = super.configureNumTasks(job);
-    job.getConfiguration().setInt(EXPORT_MAP_TASKS_KEY, numMaps);
-    return numMaps;
-  }
-
-  @Override
-  protected boolean runJob(Job job) throws ClassNotFoundException, IOException,
-      InterruptedException {
-
-    PerfCounters perfCounters = new PerfCounters();
-    perfCounters.startClock();
-
-    boolean success = job.waitForCompletion(true);
-    perfCounters.stopClock();
-
-    Counters jobCounters = job.getCounters();
-    // If the job has been retired, these may be unavailable.
-    if (null == jobCounters) {
-      displayRetiredJobNotice(LOG);
-    } else {
-      perfCounters.addBytes(jobCounters.getGroup("FileSystemCounters")
-        .findCounter("HDFS_BYTES_READ").getValue());
-      LOG.info("Transferred " + perfCounters.toString());
-      long numRecords =  ConfigurationHelper.getNumMapInputRecords(job);
-      LOG.info("Exported " + numRecords + " records.");
-    }
-
-    return success;
-  }
-
-  /**
-   * Run an export job to dump a table from HDFS to a database. If a staging
-   * table is specified and the connection manager supports staging of data,
-   * the export will first populate the staging table and then migrate the
-   * data to the target table.
-   * @throws IOException if the export job encounters an IO error
-   * @throws ExportException if the job fails unexpectedly or is misconfigured.
-   */
-  public void runExport() throws ExportException, IOException {
-
-    ConnManager cmgr = context.getConnManager();
-    SqoopOptions options = context.getOptions();
-    Configuration conf = options.getConf();
-
-    String outputTableName = context.getTableName();
-    String stagingTableName = context.getOptions().getStagingTableName();
-
-    String tableName = outputTableName;
-    boolean stagingEnabled = false;
-    if (stagingTableName != null) { // user has specified the staging table
-      if (cmgr.supportsStagingForExport()) {
-        LOG.info("Data will be staged in the table: " + stagingTableName);
-        tableName = stagingTableName;
-        stagingEnabled = true;
-      } else {
-        throw new ExportException("The active connection manager ("
-            + cmgr.getClass().getCanonicalName()
-            + ") does not support staging of data for export. "
-            + "Please retry without specifying the --staging-table option.");
-      }
-    }
-
-    String tableClassName =
-        new TableClassName(options).getClassForTable(outputTableName);
-    String ormJarFile = context.getJarFile();
-
-    LOG.info("Beginning export of " + outputTableName);
-    loadJars(conf, ormJarFile, tableClassName);
-
-    if (stagingEnabled) {
-      // Prepare the staging table
-      if (options.doClearStagingTable()) {
-        try {
-          // Delete all records from staging table
-          cmgr.deleteAllRecords(stagingTableName);
-        } catch (SQLException ex) {
-          throw new ExportException(
-              "Failed to empty staging table before export run", ex);
-        }
-      } else {
-        // User has not explicitly specified the clear staging table option.
-        // Assert that the staging table is empty.
-        try {
-          long rowCount = cmgr.getTableRowCount(stagingTableName);
-          if (rowCount != 0L) {
-            throw new ExportException("The specified staging table ("
-                + stagingTableName + ") is not empty. To force deletion of "
-                + "its data, please retry with --clear-staging-table option.");
-          }
-        } catch (SQLException ex) {
-          throw new ExportException(
-              "Failed to count data rows in staging table: "
-                  + stagingTableName, ex);
-        }
-      }
-    }
-
-    try {
-      Job job = new Job(conf);
-
-      // Set the external jar to use for the job.
-      job.getConfiguration().set("mapred.jar", ormJarFile);
-
-      configureInputFormat(job, tableName, tableClassName, null);
-      configureOutputFormat(job, tableName, tableClassName);
-      configureMapper(job, tableName, tableClassName);
-      configureNumTasks(job);
-      cacheJars(job, context.getConnManager());
-      setJob(job);
-      boolean success = runJob(job);
-      if (!success) {
-        throw new ExportException("Export job failed!");
-      }
-    } catch (InterruptedException ie) {
-      throw new IOException(ie);
-    } catch (ClassNotFoundException cnfe) {
-      throw new IOException(cnfe);
-    } finally {
-      unloadJars();
-    }
-
-    // Unstage the data if needed
-    if (stagingEnabled) {
-      // Migrate data from staging table to the output table
-      try {
-        LOG.info("Starting to migrate data from staging table to destination.");
-        cmgr.migrateData(stagingTableName, outputTableName);
-      } catch (SQLException ex) {
-        LOG.error("Failed to move data from staging table ("
-            + stagingTableName + ") to target table ("
-            + outputTableName + ")", ex);
-        throw new ExportException(
-            "Failed to move data from staging table", ex);
-      }
-    }
-  }
-
-  /**
-   * @return true if the input directory contains SequenceFiles.
-   * @deprecated use {@link #getInputFileType()} instead
-   */
-  @Deprecated
-  protected boolean inputIsSequenceFiles() {
-    try {
-      return isSequenceFiles(
-          context.getOptions().getConf(), getInputPath());
-    } catch (IOException ioe) {
-      LOG.warn("Could not check file format for export; assuming text");
-      return false;
-    }
-  }
-
-  protected FileType getInputFileType() {
-    try {
-      return getFileType(context.getOptions().getConf(), getInputPath());
-    } catch (IOException ioe) {
-      return FileType.UNKNOWN;
-    }
-  }
 }
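
The same pattern repeats for every class touched by this commit: the com.cloudera.sqoop class is stripped down to a deprecated bridge that extends and delegates to its org.apache.sqoop counterpart, so existing callers keep compiling while new code moves to the new namespace. A minimal sketch of what that means for callers, assuming FileType is the nested enum inherited from the relocated class, and using a hypothetical export directory:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;

    public class NamespaceBridgeSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Path exportDir = new Path("/user/example/export");  // hypothetical input path

        // Callers compiled against the old package still work, because the
        // deprecated class now simply extends the relocated implementation.
        boolean isSeq =
            com.cloudera.sqoop.mapreduce.ExportJobBase.isSequenceFiles(conf, exportDir);

        // New code should reference the org.apache.sqoop namespace directly.
        // (FileType is assumed to be the nested enum on the relocated class.)
        org.apache.sqoop.mapreduce.ExportJobBase.FileType type =
            org.apache.sqoop.mapreduce.ExportJobBase.getFileType(conf, exportDir);

        System.out.println("SequenceFile? " + isSeq + ", detected type: " + type);
      }
    }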

Modified: incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/ExportOutputFormat.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/ExportOutputFormat.java?rev=1190489&r1=1190488&r2=1190489&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/ExportOutputFormat.java (original)
+++ incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/ExportOutputFormat.java Fri Oct 28 18:22:16 2011
@@ -1,6 +1,4 @@
 /**
- * Copyright 2011 The Apache Software Foundation
- *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -20,191 +18,11 @@
 
 package com.cloudera.sqoop.mapreduce;
 
-import java.io.IOException;
-import java.sql.Connection;
-import java.sql.PreparedStatement;
-import java.sql.SQLException;
-import java.util.Arrays;
-import java.util.List;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.RecordWriter;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import com.cloudera.sqoop.mapreduce.db.DBConfiguration;
-
 import com.cloudera.sqoop.lib.SqoopRecord;
 
 /**
- * Insert the emitted keys as records into a database table.
- * This supports a configurable "spill threshold" at which
- * point intermediate transactions are committed.
- *
- * Record objects are buffered before actually performing the INSERT
- * statements; this requires that the key implement the
- * SqoopRecord interface.
- *
- * Uses DBOutputFormat/DBConfiguration for configuring the output.
+ * @deprecated Moving to use org.apache.sqoop namespace.
  */
 public class ExportOutputFormat<K extends SqoopRecord, V>
-    extends AsyncSqlOutputFormat<K, V> {
-
-  private static final Log LOG = LogFactory.getLog(ExportOutputFormat.class);
-
-  @Override
-  /** {@inheritDoc} */
-  public void checkOutputSpecs(JobContext context)
-      throws IOException, InterruptedException {
-    Configuration conf = context.getConfiguration();
-    DBConfiguration dbConf = new DBConfiguration(conf);
-
-    // Sanity check all the configuration values we need.
-    if (null == conf.get(DBConfiguration.URL_PROPERTY)) {
-      throw new IOException("Database connection URL is not set.");
-    } else if (null == dbConf.getOutputTableName()) {
-      throw new IOException("Table name is not set for export");
-    } else if (null == dbConf.getOutputFieldNames()
-        && 0 == dbConf.getOutputFieldCount()) {
-      throw new IOException(
-          "Output field names are null and zero output field count set.");
-    }
-  }
-
-  @Override
-  /** {@inheritDoc} */
-  public RecordWriter<K, V> getRecordWriter(TaskAttemptContext context)
-      throws IOException {
-    try {
-      return new ExportRecordWriter(context);
-    } catch (Exception e) {
-      throw new IOException(e);
-    }
-  }
-
-  /**
-   * RecordWriter to write the output to a row in a database table.
-   * The actual database updates are executed in a second thread.
-   */
-  public class ExportRecordWriter extends AsyncSqlRecordWriter<K, V> {
-
-    protected String tableName;
-    protected String [] columnNames; // The columns to insert into.
-    protected int columnCount; // If columnNames is null, tells ## of cols.
-
-    public ExportRecordWriter(TaskAttemptContext context)
-        throws ClassNotFoundException, SQLException {
-      super(context);
-
-      Configuration conf = getConf();
-
-      DBConfiguration dbConf = new DBConfiguration(conf);
-      tableName = dbConf.getOutputTableName();
-      columnNames = dbConf.getOutputFieldNames();
-      columnCount = dbConf.getOutputFieldCount();
-    }
-
-    /**
-     * @return the name of the table we are inserting into.
-     */
-    protected final String getTableName() {
-      return tableName;
-    }
-
-    /**
-     * @return the list of columns we are updating.
-     */
-    protected final String [] getColumnNames() {
-      if (null == columnNames) {
-        return null;
-      } else {
-        return Arrays.copyOf(columnNames, columnNames.length);
-      }
-    }
-
-    /**
-     * @return the number of columns we are updating.
-     */
-    protected final int getColumnCount() {
-      return columnCount;
-    }
-
-    @Override
-    /** {@inheritDoc} */
-    protected PreparedStatement getPreparedStatement(
-        List<SqoopRecord> userRecords) throws SQLException {
-
-      PreparedStatement stmt = null;
-
-      // Synchronize on connection to ensure this does not conflict
-      // with the operations in the update thread.
-      Connection conn = getConnection();
-      synchronized (conn) {
-        stmt = conn.prepareStatement(getInsertStatement(userRecords.size()));
-      }
-
-      // Inject the record parameters into the VALUES clauses.
-      int position = 0;
-      for (SqoopRecord record : userRecords) {
-        position += record.write(stmt, position);
-      }
-
-      return stmt;
-    }
-
-    /**
-     * @return an INSERT statement suitable for inserting 'numRows' rows.
-     */
-    protected String getInsertStatement(int numRows) {
-      StringBuilder sb = new StringBuilder();
-
-      sb.append("INSERT INTO " + tableName + " ");
-
-      int numSlots;
-      if (this.columnNames != null) {
-        numSlots = this.columnNames.length;
-
-        sb.append("(");
-        boolean first = true;
-        for (String col : columnNames) {
-          if (!first) {
-            sb.append(", ");
-          }
-
-          sb.append(col);
-          first = false;
-        }
-
-        sb.append(") ");
-      } else {
-        numSlots = this.columnCount; // set if columnNames is null.
-      }
-
-      sb.append("VALUES ");
-
-      // generates the (?, ?, ?...) used for each row.
-      StringBuilder sbRow = new StringBuilder();
-      sbRow.append("(");
-      for (int i = 0; i < numSlots; i++) {
-        if (i != 0) {
-          sbRow.append(", ");
-        }
-
-        sbRow.append("?");
-      }
-      sbRow.append(")");
-
-      // Now append that numRows times.
-      for (int i = 0; i < numRows; i++) {
-        if (i != 0) {
-          sb.append(", ");
-        }
-
-        sb.append(sbRow);
-      }
-
-      return sb.toString();
-    }
-  }
+    extends org.apache.sqoop.mapreduce.ExportOutputFormat<K, V> {
 }
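
For reference, the removed getInsertStatement() built a single multi-row INSERT per buffered batch of records, one parenthesized "(?, ?, ...)" slot group per row. A self-contained sketch of the same SQL shape, with made-up table and column names:

    public class InsertStatementSketch {
      // Builds "INSERT INTO <table> (c1, c2, ...) VALUES (?, ...), (?, ...), ..."
      static String buildInsert(String table, String[] cols, int numRows) {
        StringBuilder sb = new StringBuilder("INSERT INTO " + table + " (");
        for (int i = 0; i < cols.length; i++) {
          if (i != 0) {
            sb.append(", ");
          }
          sb.append(cols[i]);
        }
        sb.append(") VALUES ");

        // One group of bind-variable slots per row.
        StringBuilder row = new StringBuilder("(");
        for (int i = 0; i < cols.length; i++) {
          row.append(i == 0 ? "?" : ", ?");
        }
        row.append(")");

        for (int i = 0; i < numRows; i++) {
          if (i != 0) {
            sb.append(", ");
          }
          sb.append(row);
        }
        return sb.toString();
      }

      public static void main(String[] args) {
        // Prints: INSERT INTO employees (id, name) VALUES (?, ?), (?, ?), (?, ?)
        System.out.println(buildInsert("employees", new String[] {"id", "name"}, 3));
      }
    }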

Modified: incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/HBaseImportJob.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/HBaseImportJob.java?rev=1190489&r1=1190488&r2=1190489&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/HBaseImportJob.java (original)
+++ incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/HBaseImportJob.java Fri Oct 28 18:22:16 2011
@@ -1,6 +1,4 @@
 /**
- * Copyright 2011 The Apache Software Foundation
- *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -20,163 +18,24 @@
 
 package com.cloudera.sqoop.mapreduce;
 
-import java.io.IOException;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.OutputFormat;
-
 import com.cloudera.sqoop.SqoopOptions;
-import com.cloudera.sqoop.hbase.HBasePutProcessor;
-import com.cloudera.sqoop.lib.FieldMapProcessor;
-import com.cloudera.sqoop.lib.SqoopRecord;
-import com.cloudera.sqoop.manager.ConnManager;
 import com.cloudera.sqoop.manager.ImportJobContext;
-import com.cloudera.sqoop.util.ImportException;
 
 /**
- * Runs an HBase import via DataDrivenDBInputFormat to the HBasePutProcessor
- * in the DelegatingOutputFormat.
+ * @deprecated Moving to use org.apache.sqoop namespace.
  */
-public class HBaseImportJob extends DataDrivenImportJob {
+public class HBaseImportJob
+    extends org.apache.sqoop.mapreduce.HBaseImportJob {
 
   public static final Log LOG = LogFactory.getLog(
       HBaseImportJob.class.getName());
 
   public HBaseImportJob(final SqoopOptions opts,
       final ImportJobContext importContext) {
-    super(opts, importContext.getInputFormat(), importContext);
-  }
-
-  @Override
-  protected void configureMapper(Job job, String tableName,
-      String tableClassName) throws IOException {
-    job.setOutputKeyClass(SqoopRecord.class);
-    job.setOutputValueClass(NullWritable.class);
-    job.setMapperClass(getMapperClass());
-  }
-
-  @Override
-  protected Class<? extends Mapper> getMapperClass() {
-    return HBaseImportMapper.class;
+    super(opts, importContext);
   }
 
-  @Override
-  protected Class<? extends OutputFormat> getOutputFormatClass()
-      throws ClassNotFoundException {
-    return DelegatingOutputFormat.class;
-  }
-
-  @Override
-  protected void configureOutputFormat(Job job, String tableName,
-      String tableClassName) throws ClassNotFoundException, IOException {
-
-    // Use the DelegatingOutputFormat with the HBasePutProcessor.
-    job.setOutputFormatClass(getOutputFormatClass());
-
-    Configuration conf = job.getConfiguration();
-    conf.setClass("sqoop.output.delegate.field.map.processor.class",
-        HBasePutProcessor.class,
-        FieldMapProcessor.class);
-
-    // Set the HBase parameters (table, column family, row key):
-    conf.set(HBasePutProcessor.TABLE_NAME_KEY, options.getHBaseTable());
-    conf.set(HBasePutProcessor.COL_FAMILY_KEY, options.getHBaseColFamily());
-
-    // What column of the input becomes the row key?
-    String rowKeyCol = options.getHBaseRowKeyColumn();
-    if (null == rowKeyCol) {
-      // User didn't explicitly set one. If there's a split-by column set,
-      // use that.
-      rowKeyCol = options.getSplitByCol();
-    }
-
-    if (null == rowKeyCol) {
-      // No split-by column is explicitly set.
-      // If the table has a primary key, use that.
-      ConnManager manager = getContext().getConnManager();
-      rowKeyCol = manager.getPrimaryKey(tableName);
-    }
-
-    if (null == rowKeyCol) {
-      // Give up here if this is still unset.
-      throw new IOException("Could not determine the row-key column. "
-          + "Use --hbase-row-key to specify the input column that "
-          + "names each row.");
-    }
-
-    conf.set(HBasePutProcessor.ROW_KEY_COLUMN_KEY, rowKeyCol);
-  }
-
-  @Override
-  /** Create the target HBase table before running the job. */
-  protected void jobSetup(Job job) throws IOException, ImportException {
-    Configuration conf = job.getConfiguration();
-    String tableName = conf.get(HBasePutProcessor.TABLE_NAME_KEY);
-    String familyName = conf.get(HBasePutProcessor.COL_FAMILY_KEY);
-
-    if (null == tableName) {
-      throw new ImportException(
-          "Import to HBase error: Table name not specified");
-    }
-
-    if (null == familyName) {
-      throw new ImportException(
-          "Import to HBase error: Column family not specified");
-    }
-
-    // Add HBase configuration files to this conf object.
-    HBaseConfiguration.addHbaseResources(conf);
-
-    HBaseAdmin admin = new HBaseAdmin(conf);
-
-    // Check to see if the table exists.
-    HTableDescriptor tableDesc = new HTableDescriptor(tableName);
-    byte [] familyBytes = Bytes.toBytes(familyName);
-    HColumnDescriptor colDesc = new HColumnDescriptor(familyBytes);
-    if (!admin.tableExists(tableName)) {
-      if (options.getCreateHBaseTable()) {
-        // Create the table.
-        LOG.info("Creating missing HBase table " + tableName);
-        tableDesc.addFamily(colDesc);
-        admin.createTable(tableDesc);
-      } else {
-        LOG.warn("Could not find HBase table " + tableName);
-        LOG.warn("This job may fail. Either explicitly create the table,");
-        LOG.warn("or re-run with --hbase-create-table.");
-      }
-    } else if (!tableDesc.hasFamily(familyBytes)) {
-      if (options.getCreateHBaseTable()) {
-        // Create the column family.
-        LOG.info("Creating missing column family " + familyName);
-        admin.disableTable(tableName);
-        admin.addColumn(tableName, colDesc);
-        admin.enableTable(tableName);
-      } else {
-        LOG.warn("Could not find column family " + familyName + " in table "
-            + tableName);
-        LOG.warn("This job may fail. Either create the column family,");
-        LOG.warn("or re-run with --hbase-create-table.");
-      }
-    }
-
-    // Make sure HBase libraries are shipped as part of the job.
-    TableMapReduceUtil.addDependencyJars(job);
-    TableMapReduceUtil.addDependencyJars(conf, HTable.class);
-
-    super.jobSetup(job);
-  }
 }
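
The removed configureOutputFormat() resolved the HBase row key in a fixed order: the --hbase-row-key column if set, else the --split-by column, else the table's primary key, and failed if all three were missing. A stripped-down sketch of that fallback chain; the argument values stand in for the SqoopOptions getters and ConnManager.getPrimaryKey() and are hypothetical:

    import java.io.IOException;

    public class RowKeyResolutionSketch {
      static String resolveRowKey(String hbaseRowKeyCol, String splitByCol,
          String primaryKey) throws IOException {
        String rowKeyCol = hbaseRowKeyCol;   // 1. explicit --hbase-row-key
        if (rowKeyCol == null) {
          rowKeyCol = splitByCol;            // 2. fall back to --split-by
        }
        if (rowKeyCol == null) {
          rowKeyCol = primaryKey;            // 3. fall back to the table's primary key
        }
        if (rowKeyCol == null) {
          throw new IOException("Could not determine the row-key column. "
              + "Use --hbase-row-key to specify the input column that names each row.");
        }
        return rowKeyCol;
      }

      public static void main(String[] args) throws IOException {
        System.out.println(resolveRowKey(null, null, "ID"));  // prints ID
      }
    }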
 

Modified: incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/HBaseImportMapper.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/HBaseImportMapper.java?rev=1190489&r1=1190488&r2=1190489&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/HBaseImportMapper.java (original)
+++ incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/HBaseImportMapper.java Fri Oct 28 18:22:16 2011
@@ -1,6 +1,4 @@
 /**
- * Copyright 2011 The Apache Software Foundation
- *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -20,24 +18,10 @@
 
 package com.cloudera.sqoop.mapreduce;
 
-import java.io.IOException;
-
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.NullWritable;
-import com.cloudera.sqoop.lib.SqoopRecord;
-
 /**
- * Imports records by writing them to HBase via the DelegatingOutputFormat
- * and the HBasePutProcessor.
+ * @deprecated Moving to use org.apache.sqoop namespace.
  */
 public class HBaseImportMapper
-    extends AutoProgressMapper<LongWritable, SqoopRecord, SqoopRecord,
-    NullWritable> {
-
-  @Override
-  public void map(LongWritable key, SqoopRecord val, Context context)
-      throws IOException, InterruptedException {
-    context.write(val, NullWritable.get());
-  }
+    extends org.apache.sqoop.mapreduce.HBaseImportMapper {
 }
 

Modified: incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/ImportJobBase.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/ImportJobBase.java?rev=1190489&r1=1190488&r2=1190489&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/ImportJobBase.java (original)
+++ incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/ImportJobBase.java Fri Oct 28 18:22:16 2011
@@ -1,6 +1,4 @@
 /**
- * Copyright 2011 The Apache Software Foundation
- *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -20,49 +18,24 @@
 
 package com.cloudera.sqoop.mapreduce;
 
-import java.io.IOException;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.SequenceFile.CompressionType;
-import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.io.compress.GzipCodec;
-import org.apache.hadoop.mapreduce.Counters;
 import org.apache.hadoop.mapreduce.InputFormat;
-import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.mapreduce.OutputFormat;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
-
 import com.cloudera.sqoop.SqoopOptions;
-import com.cloudera.sqoop.orm.TableClassName;
-import com.cloudera.sqoop.util.ImportException;
-import com.cloudera.sqoop.util.PerfCounters;
-import com.cloudera.sqoop.config.ConfigurationHelper;
-import com.cloudera.sqoop.io.CodecMap;
 import com.cloudera.sqoop.manager.ImportJobContext;
 
 /**
- * Base class for running an import MapReduce job.
- * Allows dependency injection, etc, for easy customization of import job types.
+ * @deprecated Moving to use org.apache.sqoop namespace.
  */
-public class ImportJobBase extends JobBase {
-
-  private ImportJobContext context;
-
-  public static final Log LOG = LogFactory.getLog(
-      ImportJobBase.class.getName());
+public class ImportJobBase
+    extends org.apache.sqoop.mapreduce.ImportJobBase {
 
   public ImportJobBase() {
-    this(null);
+    super();
   }
 
   public ImportJobBase(final SqoopOptions opts) {
-    this(opts, null, null, null, null);
+    super(opts);
   }
 
   public ImportJobBase(final SqoopOptions opts,
@@ -70,138 +43,7 @@ public class ImportJobBase extends JobBa
       final Class<? extends InputFormat> inputFormatClass,
       final Class<? extends OutputFormat> outputFormatClass,
       final ImportJobContext context) {
-    super(opts, mapperClass, inputFormatClass, outputFormatClass);
-    this.context = context;
-  }
-
-  /**
-   * Configure the output format to use for the job.
-   */
-  @Override
-  protected void configureOutputFormat(Job job, String tableName,
-      String tableClassName) throws ClassNotFoundException, IOException {
-
-    job.setOutputFormatClass(getOutputFormatClass());
-
-    if (options.getFileLayout() == SqoopOptions.FileLayout.SequenceFile) {
-      job.getConfiguration().set("mapred.output.value.class", tableClassName);
-    }
-
-    if (options.shouldUseCompression()) {
-      FileOutputFormat.setCompressOutput(job, true);
-
-      String codecName = options.getCompressionCodec();
-      Class<? extends CompressionCodec> codecClass;
-      if (codecName == null) {
-        codecClass = GzipCodec.class;
-      } else {
-        Configuration conf = job.getConfiguration();
-        codecClass = CodecMap.getCodec(codecName, conf).getClass();
-      }
-      FileOutputFormat.setOutputCompressorClass(job, codecClass);
-
-      if (options.getFileLayout() == SqoopOptions.FileLayout.SequenceFile) {
-        SequenceFileOutputFormat.setOutputCompressionType(job,
-            CompressionType.BLOCK);
-      }
-    }
-
-    Path outputPath = context.getDestination();
-    FileOutputFormat.setOutputPath(job, outputPath);
+    super(opts, mapperClass, inputFormatClass, outputFormatClass, context);
   }
 
-  /**
-   * Actually run the MapReduce job.
-   */
-  @Override
-  protected boolean runJob(Job job) throws ClassNotFoundException, IOException,
-      InterruptedException {
-
-    PerfCounters perfCounters = new PerfCounters();
-    perfCounters.startClock();
-
-    boolean success = job.waitForCompletion(true);
-    perfCounters.stopClock();
-
-    Counters jobCounters = job.getCounters();
-    // If the job has been retired, these may be unavailable.
-    if (null == jobCounters) {
-      displayRetiredJobNotice(LOG);
-    } else {
-      perfCounters.addBytes(jobCounters.getGroup("FileSystemCounters")
-        .findCounter("HDFS_BYTES_WRITTEN").getValue());
-      LOG.info("Transferred " + perfCounters.toString());
-      long numRecords = ConfigurationHelper.getNumMapOutputRecords(job);
-      LOG.info("Retrieved " + numRecords + " records.");
-    }
-    return success;
-  }
-
-
-  /**
-   * Run an import job to read a table in to HDFS.
-   *
-   * @param tableName  the database table to read; may be null if a free-form
-   * query is specified in the SqoopOptions, and the ImportJobBase subclass
-   * supports free-form queries.
-   * @param ormJarFile the Jar file to insert into the dcache classpath.
-   * (may be null)
-   * @param splitByCol the column of the database table to use to split
-   * the import
-   * @param conf A fresh Hadoop Configuration to use to build an MR job.
-   * @throws IOException if the job encountered an IO problem
-   * @throws ImportException if the job failed unexpectedly or was
-   * misconfigured.
-   */
-  public void runImport(String tableName, String ormJarFile, String splitByCol,
-      Configuration conf) throws IOException, ImportException {
-
-    if (null != tableName) {
-      LOG.info("Beginning import of " + tableName);
-    } else {
-      LOG.info("Beginning query import.");
-    }
-
-    String tableClassName =
-        new TableClassName(options).getClassForTable(tableName);
-    loadJars(conf, ormJarFile, tableClassName);
-
-    try {
-      Job job = new Job(conf);
-
-      // Set the external jar to use for the job.
-      job.getConfiguration().set("mapred.jar", ormJarFile);
-
-      configureInputFormat(job, tableName, tableClassName, splitByCol);
-      configureOutputFormat(job, tableName, tableClassName);
-      configureMapper(job, tableName, tableClassName);
-      configureNumTasks(job);
-      cacheJars(job, getContext().getConnManager());
-
-      jobSetup(job);
-      setJob(job);
-      boolean success = runJob(job);
-      if (!success) {
-        throw new ImportException("Import job failed!");
-      }
-    } catch (InterruptedException ie) {
-      throw new IOException(ie);
-    } catch (ClassNotFoundException cnfe) {
-      throw new IOException(cnfe);
-    } finally {
-      unloadJars();
-    }
-  }
-
-  /**
-   * Open-ended "setup" routine that is called after the job is configured
-   * but just before it is submitted to MapReduce. Subclasses may override
-   * if necessary.
-   */
-  protected void jobSetup(Job job) throws IOException, ImportException {
-  }
-
-  protected ImportJobContext getContext() {
-    return context;
-  }
 }
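
The removed configureOutputFormat() defaulted to gzip when compression is requested without an explicit codec, and switched SequenceFile output to block compression. A minimal sketch of that portion of the configuration, assuming the Job has already been created and skipping the CodecMap lookup used for a user-supplied codec:

    import org.apache.hadoop.io.SequenceFile.CompressionType;
    import org.apache.hadoop.io.compress.GzipCodec;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;

    public class ImportCompressionSketch {
      static void configureCompression(Job job, boolean sequenceFileLayout) {
        FileOutputFormat.setCompressOutput(job, true);
        // Gzip is the default codec when none is named on the command line.
        FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
        if (sequenceFileLayout) {
          // SequenceFiles compress whole blocks of records rather than single values.
          SequenceFileOutputFormat.setOutputCompressionType(job, CompressionType.BLOCK);
        }
      }
    }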

Modified: incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/JdbcExportJob.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/JdbcExportJob.java?rev=1190489&r1=1190488&r2=1190489&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/JdbcExportJob.java (original)
+++ incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/JdbcExportJob.java Fri Oct 28 18:22:16 2011
@@ -1,6 +1,4 @@
 /**
- * Copyright 2011 The Apache Software Foundation
- *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -20,33 +18,16 @@
 
 package com.cloudera.sqoop.mapreduce;
 
-import java.io.IOException;
-import java.util.Map;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.io.DefaultStringifier;
-import org.apache.hadoop.io.MapWritable;
-import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.InputFormat;
-import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.mapreduce.OutputFormat;
-
-import com.cloudera.sqoop.manager.ConnManager;
 import com.cloudera.sqoop.manager.ExportJobContext;
-import com.cloudera.sqoop.mapreduce.db.DBConfiguration;
-import com.cloudera.sqoop.mapreduce.db.DBOutputFormat;
 
 /**
- * Run an export using JDBC (JDBC-based ExportOutputFormat).
+ * @deprecated Moving to use org.apache.sqoop namespace.
  */
-public class JdbcExportJob extends ExportJobBase {
-
-  private FileType fileType;
-
-  public static final Log LOG = LogFactory.getLog(
-      JdbcExportJob.class.getName());
+public class JdbcExportJob
+    extends org.apache.sqoop.mapreduce.JdbcExportJob {
 
   public JdbcExportJob(final ExportJobContext context) {
     super(context);
@@ -59,84 +40,5 @@ public class JdbcExportJob extends Expor
     super(ctxt, mapperClass, inputFormatClass, outputFormatClass);
   }
 
-  @Override
-  protected void configureInputFormat(Job job, String tableName,
-      String tableClassName, String splitByCol)
-      throws ClassNotFoundException, IOException {
-
-    fileType = getInputFileType();
-
-    super.configureInputFormat(job, tableName, tableClassName, splitByCol);
-
-    if (fileType == FileType.AVRO_DATA_FILE) {
-      LOG.debug("Configuring for Avro export");
-      ConnManager connManager = context.getConnManager();
-      Map<String, Integer> columnTypeInts =
-        connManager.getColumnTypes(tableName, options.getSqlQuery());
-      MapWritable columnTypes = new MapWritable();
-      for (Map.Entry<String, Integer> e : columnTypeInts.entrySet()) {
-        Text columnName = new Text(e.getKey());
-        Text columnText = new Text(connManager.toJavaType(e.getValue()));
-        columnTypes.put(columnName, columnText);
-      }
-      DefaultStringifier.store(job.getConfiguration(), columnTypes,
-          AvroExportMapper.AVRO_COLUMN_TYPES_MAP);
-    }
-
-  }
-
-  @Override
-  protected Class<? extends InputFormat> getInputFormatClass()
-      throws ClassNotFoundException {
-    if (fileType == FileType.AVRO_DATA_FILE) {
-      return AvroInputFormat.class;
-    }
-    return super.getInputFormatClass();
-  }
-
-  @Override
-  protected Class<? extends Mapper> getMapperClass() {
-    switch (fileType) {
-      case SEQUENCE_FILE:
-        return SequenceFileExportMapper.class;
-      case AVRO_DATA_FILE:
-        return AvroExportMapper.class;
-      case UNKNOWN:
-      default:
-        return TextExportMapper.class;
-    }
-  }
-
-  @Override
-  protected void configureOutputFormat(Job job, String tableName,
-      String tableClassName) throws IOException {
-
-    ConnManager mgr = context.getConnManager();
-    try {
-      String username = options.getUsername();
-      if (null == username || username.length() == 0) {
-        DBConfiguration.configureDB(job.getConfiguration(),
-            mgr.getDriverClass(),
-            options.getConnectString());
-      } else {
-        DBConfiguration.configureDB(job.getConfiguration(),
-            mgr.getDriverClass(),
-            options.getConnectString(),
-            username, options.getPassword());
-      }
-
-      String [] colNames = options.getColumns();
-      if (null == colNames) {
-        colNames = mgr.getColumnNames(tableName);
-      }
-      DBOutputFormat.setOutput(job, tableName, colNames);
-
-      job.setOutputFormatClass(getOutputFormatClass());
-      job.getConfiguration().set(SQOOP_EXPORT_TABLE_CLASS_KEY, tableClassName);
-    } catch (ClassNotFoundException cnfe) {
-      throw new IOException("Could not load OutputFormat", cnfe);
-    }
-  }
-
 }
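
The removed getMapperClass() dispatched on the detected input file type: SequenceFiles (magic bytes "SEQ") get SequenceFileExportMapper, Avro data files (magic bytes "Obj") get AvroExportMapper, and anything else falls back to TextExportMapper. A small self-contained sketch of that dispatch, using a local copy of the FileType enum and plain strings in place of the real mapper classes:

    public class ExportMapperSelectionSketch {
      // Mirrors the FileType enum defined on ExportJobBase.
      enum FileType { SEQUENCE_FILE, AVRO_DATA_FILE, UNKNOWN }

      static String mapperFor(FileType fileType) {
        switch (fileType) {
          case SEQUENCE_FILE:
            return "SequenceFileExportMapper";  // records already stored as SqoopRecords
          case AVRO_DATA_FILE:
            return "AvroExportMapper";          // Avro data files
          case UNKNOWN:
          default:
            return "TextExportMapper";          // plain text is the fallback
        }
      }

      public static void main(String[] args) {
        System.out.println(mapperFor(FileType.AVRO_DATA_FILE));  // AvroExportMapper
      }
    }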
 

Modified: incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/JdbcUpdateExportJob.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/JdbcUpdateExportJob.java?rev=1190489&r1=1190488&r2=1190489&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/JdbcUpdateExportJob.java (original)
+++ incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/JdbcUpdateExportJob.java Fri Oct 28 18:22:16 2011
@@ -1,6 +1,4 @@
 /**
- * Copyright 2011 The Apache Software Foundation
- *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -21,43 +19,20 @@
 package com.cloudera.sqoop.mapreduce;
 
 import java.io.IOException;
-import java.util.HashSet;
-import java.util.LinkedHashSet;
-import java.util.Set;
-import java.util.StringTokenizer;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.mapreduce.InputFormat;
-import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.mapreduce.OutputFormat;
-
-import com.cloudera.sqoop.manager.ConnManager;
 import com.cloudera.sqoop.manager.ExportJobContext;
-import com.cloudera.sqoop.mapreduce.db.DBConfiguration;
-import com.cloudera.sqoop.mapreduce.db.DBOutputFormat;
 
 /**
- * Run an update-based export using JDBC (JDBC-based UpdateOutputFormat).
+ * @deprecated Moving to use org.apache.sqoop namespace.
  */
-public class JdbcUpdateExportJob extends ExportJobBase {
-
-  public static final Log LOG = LogFactory.getLog(
-      JdbcUpdateExportJob.class.getName());
-
-  /**
-   * Return an instance of the UpdateOutputFormat class object loaded
-   * from the shim jar.
-   */
-  private static Class<? extends OutputFormat> getUpdateOutputFormat()
-       throws IOException {
-    return UpdateOutputFormat.class;
-  }
+public class JdbcUpdateExportJob
+    extends org.apache.sqoop.mapreduce.JdbcUpdateExportJob {
 
   public JdbcUpdateExportJob(final ExportJobContext context)
       throws IOException {
-    super(context, null, null, getUpdateOutputFormat());
+    super(context);
   }
 
   public JdbcUpdateExportJob(final ExportJobContext ctxt,
@@ -67,82 +42,5 @@ public class JdbcUpdateExportJob extends
     super(ctxt, mapperClass, inputFormatClass, outputFormatClass);
   }
 
-  @Override
-  protected Class<? extends Mapper> getMapperClass() {
-    if (inputIsSequenceFiles()) {
-      return SequenceFileExportMapper.class;
-    } else {
-      return TextExportMapper.class;
-    }
-  }
-
-  @Override
-  protected void configureOutputFormat(Job job, String tableName,
-      String tableClassName) throws IOException {
-
-    ConnManager mgr = context.getConnManager();
-    try {
-      String username = options.getUsername();
-      if (null == username || username.length() == 0) {
-        DBConfiguration.configureDB(job.getConfiguration(),
-            mgr.getDriverClass(),
-            options.getConnectString());
-      } else {
-        DBConfiguration.configureDB(job.getConfiguration(),
-            mgr.getDriverClass(),
-            options.getConnectString(),
-            username, options.getPassword());
-      }
-
-      String [] colNames = options.getColumns();
-      if (null == colNames) {
-        colNames = mgr.getColumnNames(tableName);
-      }
-
-      if (null == colNames) {
-        throw new IOException(
-            "Export column names could not be determined for " + tableName);
-      }
-
-      String updateKeyColumns = options.getUpdateKeyCol();
-      if (null == updateKeyColumns) {
-        throw new IOException("Update key column not set in export job");
-      }
-      // Update key columns lookup and removal
-      Set<String> updateKeys = new LinkedHashSet<String>();
-      Set<String> updateKeysUppercase = new HashSet<String>();
-      StringTokenizer stok = new StringTokenizer(updateKeyColumns, ",");
-      while (stok.hasMoreTokens()) {
-        String nextUpdateKey = stok.nextToken().trim();
-        if (nextUpdateKey.length() > 0) {
-          updateKeys.add(nextUpdateKey);
-          updateKeysUppercase.add(nextUpdateKey.toUpperCase());
-        }  else {
-          throw new RuntimeException("Invalid update key column value specified"
-              + ": '" + updateKeyColumns + "'");
-        }
-      }
-
-      if (updateKeys.size() == 0) {
-        throw new IOException("Unpdate key columns not valid in export job");
-      }
-
-      // Make sure we strip out the key column from this list.
-      String [] outColNames = new String[colNames.length - updateKeys.size()];
-      int j = 0;
-      for (int i = 0; i < colNames.length; i++) {
-        if (!updateKeysUppercase.contains(colNames[i].toUpperCase())) {
-          outColNames[j++] = colNames[i];
-        }
-      }
-      DBOutputFormat.setOutput(job, tableName, outColNames);
-
-      job.setOutputFormatClass(getOutputFormatClass());
-      job.getConfiguration().set(SQOOP_EXPORT_TABLE_CLASS_KEY, tableClassName);
-      job.getConfiguration().set(SQOOP_EXPORT_UPDATE_COL_KEY, updateKeyColumns);
-    } catch (ClassNotFoundException cnfe) {
-      throw new IOException("Could not load OutputFormat", cnfe);
-    }
-  }
 }
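
The removed configureOutputFormat() split the --update-key value on commas and removed those key columns, case-insensitively, from the column list handed to DBOutputFormat, since an UPDATE sets only the non-key columns. A minimal standalone sketch of that filtering, with hypothetical column names:

    import java.util.ArrayList;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;
    import java.util.StringTokenizer;

    public class UpdateKeyFilterSketch {
      static String[] nonKeyColumns(String updateKeyColumns, String[] colNames) {
        Set<String> updateKeysUppercase = new HashSet<String>();
        StringTokenizer stok = new StringTokenizer(updateKeyColumns, ",");
        while (stok.hasMoreTokens()) {
          String nextUpdateKey = stok.nextToken().trim();
          if (nextUpdateKey.length() > 0) {
            updateKeysUppercase.add(nextUpdateKey.toUpperCase());
          }
        }
        // Keep every column that is not one of the update keys.
        List<String> out = new ArrayList<String>();
        for (String col : colNames) {
          if (!updateKeysUppercase.contains(col.toUpperCase())) {
            out.add(col);
          }
        }
        return out.toArray(new String[0]);
      }

      public static void main(String[] args) {
        String[] cols = {"id", "name", "salary"};
        // With --update-key id, only name and salary are left for the SET clause.
        for (String col : nonKeyColumns("id", cols)) {
          System.out.println(col);
        }
      }
    }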
 

Modified: incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/JdbcUpsertExportJob.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/JdbcUpsertExportJob.java?rev=1190489&r1=1190488&r2=1190489&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/JdbcUpsertExportJob.java (original)
+++ incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/JdbcUpsertExportJob.java Fri Oct 28 18:22:16 2011
@@ -1,6 +1,4 @@
 /**
- * Copyright 2011 The Apache Software Foundation
- *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -21,89 +19,20 @@
 package com.cloudera.sqoop.mapreduce;
 
 import java.io.IOException;
-import java.util.LinkedHashSet;
-import java.util.Set;
-import java.util.StringTokenizer;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.OutputFormat;
-
-import com.cloudera.sqoop.manager.ConnManager;
 import com.cloudera.sqoop.manager.ExportJobContext;
-import com.cloudera.sqoop.mapreduce.db.DBConfiguration;
-import com.cloudera.sqoop.mapreduce.db.DBOutputFormat;
 
 /**
- * Run an update/insert export using JDBC (JDBC-based UpsertOutputFormat).
+ * @deprecated Moving to use org.apache.sqoop namespace.
  */
-public class JdbcUpsertExportJob extends JdbcUpdateExportJob {
-
-  public static final Log LOG = LogFactory.getLog(
-      JdbcUpsertExportJob.class.getName());
+public class JdbcUpsertExportJob
+    extends org.apache.sqoop.mapreduce.JdbcUpsertExportJob {
 
   public JdbcUpsertExportJob(final ExportJobContext context,
       final Class<? extends OutputFormat> outputFormatClass)
       throws IOException {
-    super(context, null, null, outputFormatClass);
+    super(context, outputFormatClass);
   }
 
-  @Override
-  protected void configureOutputFormat(Job job, String tableName,
-      String tableClassName) throws IOException {
-
-    ConnManager mgr = context.getConnManager();
-    try {
-      String username = options.getUsername();
-      if (null == username || username.length() == 0) {
-        DBConfiguration.configureDB(job.getConfiguration(),
-            mgr.getDriverClass(),
-            options.getConnectString());
-      } else {
-        DBConfiguration.configureDB(job.getConfiguration(),
-            mgr.getDriverClass(),
-            options.getConnectString(),
-            username, options.getPassword());
-      }
-
-      String [] colNames = options.getColumns();
-      if (null == colNames) {
-        colNames = mgr.getColumnNames(tableName);
-      }
-      if (null == colNames) {
-        throw new IOException(
-            "Export column names could not be determined for " + tableName);
-      }
-      DBOutputFormat.setOutput(job, tableName, colNames);
-
-      String updateKeyColumns = options.getUpdateKeyCol();
-      if (null == updateKeyColumns) {
-        throw new IOException("Update key column not set in export job");
-      }
-      // Update key columns lookup and removal
-      Set<String> updateKeys = new LinkedHashSet<String>();
-      StringTokenizer stok = new StringTokenizer(updateKeyColumns, ",");
-      while (stok.hasMoreTokens()) {
-        String nextUpdateKey = stok.nextToken().trim();
-        if (nextUpdateKey.length() > 0) {
-          updateKeys.add(nextUpdateKey);
-        } else {
-          throw new RuntimeException("Invalid update key column value specified"
-              + ": '" + updateKeyColumns + "'");
-        }
-      }
-
-      if (updateKeys.size() == 0) {
-        throw new IOException("Unpdate key columns not valid in export job");
-      }
-
-      job.setOutputFormatClass(getOutputFormatClass());
-      job.getConfiguration().set(SQOOP_EXPORT_TABLE_CLASS_KEY, tableClassName);
-      job.getConfiguration().set(SQOOP_EXPORT_UPDATE_COL_KEY, updateKeyColumns);
-    } catch (ClassNotFoundException cnfe) {
-      throw new IOException("Could not load OutputFormat", cnfe);
-    }
-  }
 }
 

Modified: incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/JobBase.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/JobBase.java?rev=1190489&r1=1190488&r2=1190489&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/JobBase.java (original)
+++ incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/JobBase.java Fri Oct 28 18:22:16 2011
@@ -1,6 +1,4 @@
 /**
- * Copyright 2011 The Apache Software Foundation
- *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -20,316 +18,30 @@
 
 package com.cloudera.sqoop.mapreduce;
 
-import java.io.File;
-import java.io.IOException;
-
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import org.apache.hadoop.conf.Configuration;
-
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.InputFormat;
-import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.mapreduce.OutputFormat;
-
-import org.apache.hadoop.util.StringUtils;
-
 import com.cloudera.sqoop.SqoopOptions;
 
-import com.cloudera.sqoop.config.ConfigurationHelper;
-import com.cloudera.sqoop.manager.ConnManager;
-
-import com.cloudera.sqoop.tool.SqoopTool;
-import com.cloudera.sqoop.util.ClassLoaderStack;
-import com.cloudera.sqoop.util.Jars;
-
 /**
- * Base class for configuring and running a MapReduce job.
- * Allows dependency injection, etc, for easy customization of import job types.
+ * @deprecated Moving to use org.apache.sqoop namespace.
  */
-public class JobBase {
-
-  public static final Log LOG = LogFactory.getLog(JobBase.class.getName());
-
-  protected SqoopOptions options;
-  protected Class<? extends Mapper> mapperClass;
-  protected Class<? extends InputFormat> inputFormatClass;
-  protected Class<? extends OutputFormat> outputFormatClass;
-
-  private Job mrJob;
-
-  private ClassLoader prevClassLoader = null;
+public class JobBase
+    extends org.apache.sqoop.mapreduce.JobBase {
 
   public JobBase() {
-    this(null);
+    super();
   }
 
   public JobBase(final SqoopOptions opts) {
-    this(opts, null, null, null);
+    super(opts);
   }
 
   public JobBase(final SqoopOptions opts,
       final Class<? extends Mapper> mapperClass,
       final Class<? extends InputFormat> inputFormatClass,
       final Class<? extends OutputFormat> outputFormatClass) {
-
-    this.options = opts;
-    this.mapperClass = mapperClass;
-    this.inputFormatClass = inputFormatClass;
-    this.outputFormatClass = outputFormatClass;
-  }
-
-  /**
-   * @return the mapper class to use for the job.
-   */
-  protected Class<? extends Mapper> getMapperClass()
-      throws ClassNotFoundException {
-    return this.mapperClass;
+    super(opts, mapperClass, inputFormatClass, outputFormatClass);
   }
 
-  /**
-   * @return the inputformat class to use for the job.
-   */
-  protected Class<? extends InputFormat> getInputFormatClass()
-      throws ClassNotFoundException {
-    return this.inputFormatClass;
-  }
-
-  /**
-   * @return the outputformat class to use for the job.
-   */
-  protected Class<? extends OutputFormat> getOutputFormatClass()
-      throws ClassNotFoundException {
-    return this.outputFormatClass;
-  }
-
-  /** Set the OutputFormat class to use for this job. */
-  public void setOutputFormatClass(Class<? extends OutputFormat> cls) {
-    this.outputFormatClass = cls;
-  }
-
-  /** Set the InputFormat class to use for this job. */
-  public void setInputFormatClass(Class<? extends InputFormat> cls) {
-    this.inputFormatClass = cls;
-  }
-
-  /** Set the Mapper class to use for this job. */
-  public void setMapperClass(Class<? extends Mapper> cls) {
-    this.mapperClass = cls;
-  }
-
-  /**
-   * Set the SqoopOptions configuring this job.
-   */
-  public void setOptions(SqoopOptions opts) {
-    this.options = opts;
-  }
-
-  /**
-   * Put jar files required by Sqoop into the DistributedCache.
-   * @param job the Job being submitted.
-   * @param mgr the ConnManager to use.
-   */
-  protected void cacheJars(Job job, ConnManager mgr)
-      throws IOException {
-
-    Configuration conf = job.getConfiguration();
-    FileSystem fs = FileSystem.getLocal(conf);
-    Set<String> localUrls = new HashSet<String>();
-
-    addToCache(Jars.getSqoopJarPath(), fs, localUrls);
-    if (null != mgr) {
-      addToCache(Jars.getDriverClassJar(mgr), fs, localUrls);
-      addToCache(Jars.getJarPathForClass(mgr.getClass()), fs, localUrls);
-    }
-
-    SqoopTool tool = this.options.getActiveSqoopTool();
-    if (null != tool) {
-      // Make sure the jar for the tool itself is on the classpath. (In case
-      // this is a third-party plugin tool.)
-      addToCache(Jars.getJarPathForClass(tool.getClass()), fs, localUrls);
-      List<String> toolDeps = tool.getDependencyJars();
-      if (null != toolDeps) {
-        for (String depFile : toolDeps) {
-          addToCache(depFile, fs, localUrls);
-        }
-      }
-    }
-
-    // If the user specified a particular jar file name,
-
-    // Add anything in $SQOOP_HOME/lib, if this is set.
-    String sqoopHome = System.getenv("SQOOP_HOME");
-    if (null != sqoopHome) {
-      File sqoopHomeFile = new File(sqoopHome);
-      File sqoopLibFile = new File(sqoopHomeFile, "lib");
-      if (sqoopLibFile.exists()) {
-        addDirToCache(sqoopLibFile, fs, localUrls);
-      }
-    } else {
-      LOG.warn("SQOOP_HOME is unset. May not be able to find "
-          + "all job dependencies.");
-    }
-
-    // If we didn't put anything in our set, then there's nothing to cache.
-    if (localUrls.isEmpty()) {
-      return;
-    }
-
-    // Add these to the 'tmpjars' array, which the MR JobSubmitter
-    // will upload to HDFS and put in the DistributedCache libjars.
-    String tmpjars = conf.get("tmpjars");
-    StringBuilder sb = new StringBuilder();
-    if (null != tmpjars) {
-      sb.append(tmpjars);
-      sb.append(",");
-    }
-    sb.append(StringUtils.arrayToString(localUrls.toArray(new String[0])));
-    conf.set("tmpjars", sb.toString());
-  }
-
-  private void addToCache(String file, FileSystem fs, Set<String> localUrls) {
-    if (null == file) {
-      return;
-    }
-
-    Path p = new Path(file);
-    String qualified = p.makeQualified(fs).toString();
-    LOG.debug("Adding to job classpath: " + qualified);
-    localUrls.add(qualified);
-  }
-
-  /**
-   * Add the .jar elements of a directory to the DCache classpath,
-   * nonrecursively.
-   */
-  private void addDirToCache(File dir, FileSystem fs, Set<String> localUrls) {
-    if (null == dir) {
-      return;
-    }
-
-    for (File libfile : dir.listFiles()) {
-      if (libfile.exists() && !libfile.isDirectory()
-          && libfile.getName().endsWith("jar")) {
-        addToCache(libfile.toString(), fs, localUrls);
-      }
-    }
-  }
-
-  /**
-   * If jars must be loaded into the local environment, do so here.
-   */
-  protected void loadJars(Configuration conf, String ormJarFile,
-      String tableClassName) throws IOException {
-    boolean isLocal = "local".equals(conf.get("mapreduce.jobtracker.address"))
-        || "local".equals(conf.get("mapred.job.tracker"));
-    if (isLocal) {
-      // If we're using the LocalJobRunner, then instead of using the compiled
-      // jar file as the job source, we're running in the current thread. Push
-      // on another classloader that loads from that jar in addition to
-      // everything currently on the classpath.
-      this.prevClassLoader = ClassLoaderStack.addJarFile(ormJarFile,
-          tableClassName);
-    }
-  }
-
-  /**
-   * If any classloader was invoked by loadJars, free it here.
-   */
-  protected void unloadJars() {
-    if (null != this.prevClassLoader) {
-      // unload the special classloader for this jar.
-      ClassLoaderStack.setCurrentClassLoader(this.prevClassLoader);
-    }
-  }
-
-  /**
-   * Configure the inputformat to use for the job.
-   */
-  protected void configureInputFormat(Job job, String tableName,
-      String tableClassName, String splitByCol)
-      throws ClassNotFoundException, IOException {
-    //TODO: 'splitByCol' is import-job specific; lift it out of this API.
-    Class<? extends InputFormat> ifClass = getInputFormatClass();
-    LOG.debug("Using InputFormat: " + ifClass);
-    job.setInputFormatClass(ifClass);
-  }
-
-  /**
-   * Configure the output format to use for the job.
-   */
-  protected void configureOutputFormat(Job job, String tableName,
-      String tableClassName) throws ClassNotFoundException, IOException {
-    Class<? extends OutputFormat> ofClass = getOutputFormatClass();
-    LOG.debug("Using OutputFormat: " + ofClass);
-    job.setOutputFormatClass(ofClass);
-  }
-
-  /**
-   * Set the mapper class implementation to use in the job,
-   * as well as any related configuration (e.g., map output types).
-   */
-  protected void configureMapper(Job job, String tableName,
-      String tableClassName) throws ClassNotFoundException, IOException {
-    job.setMapperClass(getMapperClass());
-  }
-
-  /**
-   * Configure the number of map/reduce tasks to use in the job.
-   */
-  protected int configureNumTasks(Job job) throws IOException {
-    int numMapTasks = options.getNumMappers();
-    if (numMapTasks < 1) {
-      numMapTasks = SqoopOptions.DEFAULT_NUM_MAPPERS;
-      LOG.warn("Invalid mapper count; using " + numMapTasks + " mappers.");
-    }
-
-    ConfigurationHelper.setJobNumMaps(job, numMapTasks);
-    job.setNumReduceTasks(0);
-    return numMapTasks;
-  }
-
-  /** Set the main job that will be run. */
-  protected void setJob(Job job) {
-    mrJob = job;
-  }
-
-  /**
-   * @return the main MapReduce job that is being run, or null if no
-   * job has started.
-   */
-  public Job getJob() {
-    return mrJob;
-  }
-
-  /**
-   * Actually run the MapReduce job.
-   */
-  protected boolean runJob(Job job) throws ClassNotFoundException, IOException,
-      InterruptedException {
-    return job.waitForCompletion(true);
-  }
-
-  /**
-   * Display a notice on the log that the current MapReduce job has
-   * been retired, and thus Counters are unavailable.
-   * @param log the Log to display the info to.
-   */
-  protected void displayRetiredJobNotice(Log log) {
-    log.info("The MapReduce job has already been retired. Performance");
-    log.info("counters are unavailable. To get this information, ");
-    log.info("you will need to enable the completed job store on ");
-    log.info("the jobtracker with:");
-    log.info("mapreduce.jobtracker.persist.jobstatus.active = true");
-    log.info("mapreduce.jobtracker.persist.jobstatus.hours = 1");
-    log.info("A jobtracker restart is required for these settings");
-    log.info("to take effect.");
-  }
 }
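
For context, a minimal sketch of what this shim pattern preserves, assuming only what the diff above shows: the deprecated com.cloudera.sqoop.mapreduce.JobBase now forwards its constructors to org.apache.sqoop.mapreduce.JobBase, so downstream code that subclasses the old class should keep compiling unchanged. The CustomImportJob name below is hypothetical and not part of this commit.

    // Hypothetical downstream subclass, unaffected by this commit: the old
    // package still resolves, and super(opts) forwards to the relocated
    // org.apache.sqoop.mapreduce.JobBase implementation.
    import com.cloudera.sqoop.SqoopOptions;
    import com.cloudera.sqoop.mapreduce.JobBase;

    public class CustomImportJob extends JobBase {
      public CustomImportJob(SqoopOptions opts) {
        super(opts);
      }
    }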

Modified: incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/MergeJob.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/MergeJob.java?rev=1190489&r1=1190488&r2=1190489&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/MergeJob.java (original)
+++ incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/MergeJob.java Fri Oct 28 18:22:16 2011
@@ -1,6 +1,4 @@
 /**
- * Copyright 2011 The Apache Software Foundation
- *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -20,128 +18,27 @@
 
 package com.cloudera.sqoop.mapreduce;
 
-import java.io.IOException;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
-
 import com.cloudera.sqoop.SqoopOptions;
-import com.cloudera.sqoop.util.Jars;
 
 /**
- * Run a MapReduce job that merges two datasets.
+ * @deprecated Moving to use org.apache.sqoop namespace.
  */
-public class MergeJob extends JobBase {
-
-  /** Configuration key specifying the path to the "old" dataset. */
-  public static final String MERGE_OLD_PATH_KEY = "sqoop.merge.old.path";
+public class MergeJob
+    extends org.apache.sqoop.mapreduce.MergeJob {
 
-  /** Configuration key specifying the path to the "new" dataset. */
-  public static final String MERGE_NEW_PATH_KEY = "sqoop.merge.new.path";
-
-  /** Configuration key specifying the name of the key column for joins. */
-  public static final String MERGE_KEY_COL_KEY = "sqoop.merge.key.col";
-
-  /** Configuration key specifying the SqoopRecord class name for
-   * the records we are merging.
-   */
-  public static final String MERGE_SQOOP_RECORD_KEY = "sqoop.merge.class";
+  public static final String MERGE_OLD_PATH_KEY =
+      org.apache.sqoop.mapreduce.MergeJob.MERGE_OLD_PATH_KEY;
+  public static final String MERGE_NEW_PATH_KEY =
+      org.apache.sqoop.mapreduce.MergeJob.MERGE_NEW_PATH_KEY;
+  public static final String MERGE_KEY_COL_KEY =
+      org.apache.sqoop.mapreduce.MergeJob.MERGE_KEY_COL_KEY;
+  public static final String MERGE_SQOOP_RECORD_KEY =
+      org.apache.sqoop.mapreduce.MergeJob.MERGE_SQOOP_RECORD_KEY;
 
   public MergeJob(final SqoopOptions opts) {
-    super(opts, null, null, null);
+    super(opts);
   }
 
-  public boolean runMergeJob() throws IOException {
-    Configuration conf = options.getConf();
-    Job job = new Job(conf);
-
-    String userClassName = options.getClassName();
-    if (null == userClassName) {
-      // Shouldn't get here.
-      throw new IOException("Record class name not specified with "
-          + "--class-name.");
-    }
-
-    // Set the external jar to use for the job.
-    String existingJar = options.getExistingJarName();
-    if (existingJar != null) {
-      // User explicitly identified a jar path.
-      LOG.debug("Setting job jar to user-specified jar: " + existingJar);
-      job.getConfiguration().set("mapred.jar", existingJar);
-    } else {
-      // Infer it from the location of the specified class, if it's on the
-      // classpath.
-      try {
-        Class<? extends Object> userClass = conf.getClassByName(userClassName);
-        if (null != userClass) {
-          String userJar = Jars.getJarPathForClass(userClass);
-          LOG.debug("Setting job jar based on user class " + userClassName
-              + ": " + userJar);
-          job.getConfiguration().set("mapred.jar", userJar);
-        } else {
-          LOG.warn("Specified class " + userClassName + " is not in a jar. "
-              + "MapReduce may not find the class");
-        }
-      } catch (ClassNotFoundException cnfe) {
-        throw new IOException(cnfe);
-      }
-    }
-
-    try {
-      Path oldPath = new Path(options.getMergeOldPath());
-      Path newPath = new Path(options.getMergeNewPath());
-
-      Configuration jobConf = job.getConfiguration();
-      FileSystem fs = FileSystem.get(jobConf);
-      oldPath = oldPath.makeQualified(fs);
-      newPath = newPath.makeQualified(fs);
-
-      FileInputFormat.addInputPath(job, oldPath);
-      FileInputFormat.addInputPath(job, newPath);
-
-      jobConf.set(MERGE_OLD_PATH_KEY, oldPath.toString());
-      jobConf.set(MERGE_NEW_PATH_KEY, newPath.toString());
-      jobConf.set(MERGE_KEY_COL_KEY, options.getMergeKeyCol());
-      jobConf.set(MERGE_SQOOP_RECORD_KEY, userClassName);
-
-      FileOutputFormat.setOutputPath(job, new Path(options.getTargetDir()));
-
-      if (ExportJobBase.isSequenceFiles(jobConf, newPath)) {
-        job.setInputFormatClass(SequenceFileInputFormat.class);
-        job.setOutputFormatClass(SequenceFileOutputFormat.class);
-        job.setMapperClass(MergeRecordMapper.class);
-      } else {
-        job.setMapperClass(MergeTextMapper.class);
-        job.setOutputFormatClass(RawKeyTextOutputFormat.class);
-      }
-
-      jobConf.set("mapred.output.key.class", userClassName);
-      job.setOutputValueClass(NullWritable.class);
-
-      job.setReducerClass(MergeReducer.class);
-
-      // Set the intermediate data types.
-      job.setMapOutputKeyClass(Text.class);
-      job.setMapOutputValueClass(MergeRecord.class);
-
-      // Make sure Sqoop and anything else we need is on the classpath.
-      cacheJars(job, null);
-      setJob(job);
-      return this.runJob(job);
-    } catch (InterruptedException ie) {
-      throw new IOException(ie);
-    } catch (ClassNotFoundException cnfe) {
-      throw new IOException(cnfe);
-    }
-  }
 }
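
A similar compatibility point applies to the configuration keys: the deprecated MergeJob re-declares each constant as an alias of the org.apache.sqoop value, so lookups written against the old names should still resolve to the same strings (e.g. "sqoop.merge.key.col", as shown in the removed lines above). A brief sketch under that assumption; the class and variable names are illustrative only.

    // Illustrative only: setting the merge key through the old constant and
    // reading it through the new one returns the same value, because the
    // deprecated constant is assigned from the org.apache.sqoop one.
    import org.apache.hadoop.conf.Configuration;
    import com.cloudera.sqoop.mapreduce.MergeJob;

    public class MergeKeyExample {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.set(MergeJob.MERGE_KEY_COL_KEY, "id");
        System.out.println(conf.get(
            org.apache.sqoop.mapreduce.MergeJob.MERGE_KEY_COL_KEY));  // prints "id"
      }
    }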
 
 

Modified: incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/MergeMapperBase.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/MergeMapperBase.java?rev=1190489&r1=1190488&r2=1190489&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/MergeMapperBase.java (original)
+++ incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/MergeMapperBase.java Fri Oct 28 18:22:16 2011
@@ -1,6 +1,4 @@
 /**
- * Copyright 2011 The Apache Software Foundation
- *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -20,72 +18,9 @@
 
 package com.cloudera.sqoop.mapreduce;
 
-import java.io.IOException;
-
-import java.util.Map;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.lib.input.FileSplit;
-
-import com.cloudera.sqoop.lib.SqoopRecord;
-
 /**
- * Given a set of SqoopRecord instances which are from a "new" dataset
- * or an "old" dataset, extract a key column from the record and tag
- * each record with a bit specifying whether it is a new or old record.
+ * @deprecated Moving to use org.apache.sqoop namespace.
  */
 public class MergeMapperBase<INKEY, INVAL>
-    extends Mapper<INKEY, INVAL, Text, MergeRecord> {
-
-  public static final Log LOG = LogFactory.getLog(
-      MergeMapperBase.class.getName());
-
-  private String keyColName; // name of the key column.
-  private boolean isNew; // true if this split is from the new dataset.
-
-  @Override
-  protected void setup(Context context)
-      throws IOException, InterruptedException {
-    Configuration conf = context.getConfiguration();
-    keyColName = conf.get(MergeJob.MERGE_KEY_COL_KEY);
-
-    InputSplit is = context.getInputSplit();
-    FileSplit fs = (FileSplit) is;
-    Path splitPath = fs.getPath();
-
-    if (splitPath.toString().startsWith(
-        conf.get(MergeJob.MERGE_NEW_PATH_KEY))) {
-      this.isNew = true;
-    } else if (splitPath.toString().startsWith(
-        conf.get(MergeJob.MERGE_OLD_PATH_KEY))) {
-      this.isNew = false;
-    } else {
-      throw new IOException("File " + splitPath + " is not under new path "
-          + conf.get(MergeJob.MERGE_NEW_PATH_KEY) + " or old path "
-          + conf.get(MergeJob.MERGE_OLD_PATH_KEY));
-    }
-  }
-
-  protected void processRecord(SqoopRecord r, Context c)
-      throws IOException, InterruptedException {
-    MergeRecord mr = new MergeRecord(r, isNew);
-    Map<String, Object> fieldMap = r.getFieldMap();
-    if (null == fieldMap) {
-      throw new IOException("No field map in record " + r);
-    }
-    Object keyObj = fieldMap.get(keyColName);
-    if (null == keyObj) {
-      throw new IOException("Cannot join values on null key. "
-          + "Did you specify a key column that exists?");
-    } else {
-      c.write(new Text(keyObj.toString()), mr);
-    }
-  }
+    extends org.apache.sqoop.mapreduce.MergeMapperBase<INKEY, INVAL> {
 }
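
Note that the shim keeps the <INKEY, INVAL> type parameters, so a mapper written against the old package should still type-check. A minimal sketch, assuming the relocated org.apache.sqoop.mapreduce.MergeMapperBase still extends Hadoop's Mapper and keeps processRecord(...) accessible to subclasses; MyMergeMapper is a hypothetical name, not part of this commit.

    // Hypothetical subclass written against the old package; it mirrors the
    // removed MergeRecordMapper body and compiles unchanged because the
    // generics and processRecord(...) are inherited through the shim.
    import java.io.IOException;
    import org.apache.hadoop.io.LongWritable;
    import com.cloudera.sqoop.lib.SqoopRecord;
    import com.cloudera.sqoop.mapreduce.MergeMapperBase;

    public class MyMergeMapper extends MergeMapperBase<LongWritable, SqoopRecord> {
      @Override
      public void map(LongWritable key, SqoopRecord val, Context c)
          throws IOException, InterruptedException {
        processRecord(val, c);
      }
    }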

Modified: incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/MergeRecord.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/MergeRecord.java?rev=1190489&r1=1190488&r2=1190489&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/MergeRecord.java (original)
+++ incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/MergeRecord.java Fri Oct 28 18:22:16 2011
@@ -1,6 +1,4 @@
 /**
- * Copyright 2011 The Apache Software Foundation
- *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -20,117 +18,20 @@
 
 package com.cloudera.sqoop.mapreduce;
 
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.conf.Configuration;
-
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-
 import com.cloudera.sqoop.lib.SqoopRecord;
 
 /**
- * Class that holds a record to be merged. This contains a SqoopRecord which
- * is the "guts" of the item, and a boolean value indicating whether it is a
- * "new" record or an "old" record. In the Reducer, we prefer to emit a new
- * record rather than an old one, if a new one is available.
+ * @deprecated Moving to use org.apache.sqoop namespace.
  */
-public class MergeRecord implements Configurable, Writable {
-  private SqoopRecord sqoopRecord;
-  private boolean isNew;
-  private Configuration config;
+public class MergeRecord
+    extends org.apache.sqoop.mapreduce.MergeRecord {
 
-  /** Construct an empty MergeRecord. */
   public MergeRecord() {
-    this.sqoopRecord = null;
-    this.isNew = false;
-    this.config = new Configuration();
+    super();
   }
 
-  /**
-   * Construct a MergeRecord with all fields initialized.
-   */
   public MergeRecord(SqoopRecord sr, boolean recordIsNew) {
-    this.sqoopRecord = sr;
-    this.isNew = recordIsNew;
-    this.config = new Configuration();
-  }
-
-  @Override
-  /** {@inheritDoc} */
-  public void setConf(Configuration conf) {
-    this.config = conf;
-  }
-
-  @Override
-  /** {@inheritDoc} */
-  public Configuration getConf() {
-    return this.config;
-  }
-
-  /** @return true if this record came from the "new" dataset. */
-  public boolean isNewRecord() {
-    return isNew;
-  }
-
-  /**
-   * Set the isNew field to 'newVal'.
-   */
-  public void setNewRecord(boolean newVal) {
-    this.isNew = newVal;
-  }
-
-  /**
-   * @return the underlying SqoopRecord we're shipping.
-   */
-  public SqoopRecord getSqoopRecord() {
-    return this.sqoopRecord;
+    super(sr, recordIsNew);
   }
 
-  /**
-   * Set the SqoopRecord instance we should pass from the mapper to the
-   * reducer.
-   */
-  public void setSqoopRecord(SqoopRecord record) {
-    this.sqoopRecord = record;
-  }
-
-  @Override
-  /**
-   * {@inheritDoc}
-   */
-  public void readFields(DataInput in) throws IOException {
-    this.isNew = in.readBoolean();
-    String className = Text.readString(in);
-    if (null == this.sqoopRecord) {
-      // If we haven't already instantiated an inner SqoopRecord, do so here.
-      try {
-        Class<? extends SqoopRecord> recordClass =
-            (Class<? extends SqoopRecord>) config.getClassByName(className);
-        this.sqoopRecord = recordClass.newInstance();
-      } catch (Exception e) {
-        throw new IOException(e);
-      }
-    }
-
-    this.sqoopRecord.readFields(in);
-  }
-
-  @Override
-  /**
-   * {@inheritDoc}
-   */
-  public void write(DataOutput out) throws IOException {
-    out.writeBoolean(this.isNew);
-    Text.writeString(out, this.sqoopRecord.getClass().getName());
-    this.sqoopRecord.write(out);
-  }
-
-  @Override
-  public String toString() {
-    return "" + this.sqoopRecord;
-  }
 }

Modified: incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/MergeRecordMapper.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/MergeRecordMapper.java?rev=1190489&r1=1190488&r2=1190489&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/MergeRecordMapper.java (original)
+++ incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/MergeRecordMapper.java Fri Oct 28 18:22:16 2011
@@ -1,6 +1,4 @@
 /**
- * Copyright 2011 The Apache Software Foundation
- *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -20,20 +18,9 @@
 
 package com.cloudera.sqoop.mapreduce;
 
-import java.io.IOException;
-
-import org.apache.hadoop.io.LongWritable;
-
-import com.cloudera.sqoop.lib.SqoopRecord;
-
 /**
- * Mapper for the merge program which operates on SequenceFiles.
+ * @deprecated Moving to use org.apache.sqoop namespace.
  */
 public class MergeRecordMapper
-    extends MergeMapperBase<LongWritable, SqoopRecord> {
-
-  public void map(LongWritable key, SqoopRecord val, Context c)
-      throws IOException, InterruptedException {
-    processRecord(val, c);
-  }
+    extends org.apache.sqoop.mapreduce.MergeRecordMapper {
 }

Modified: incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/MergeReducer.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/MergeReducer.java?rev=1190489&r1=1190488&r2=1190489&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/MergeReducer.java (original)
+++ incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/MergeReducer.java Fri Oct 28 18:22:16 2011
@@ -1,6 +1,4 @@
 /**
- * Copyright 2011 The Apache Software Foundation
- *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -20,41 +18,10 @@
 
 package com.cloudera.sqoop.mapreduce;
 
-import java.io.IOException;
-
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Reducer;
-
-import com.cloudera.sqoop.lib.SqoopRecord;
-
 /**
- * Reducer for merge tool. Given records tagged as 'old' or 'new', emit
- * a new one if possible; otherwise, an old one.
+ * @deprecated Moving to use org.apache.sqoop namespace.
  */
 public class MergeReducer
-    extends Reducer<Text, MergeRecord, SqoopRecord, NullWritable> {
-
-  @Override
-  public void reduce(Text key, Iterable<MergeRecord> vals, Context c)
-      throws IOException, InterruptedException {
-    SqoopRecord bestRecord = null;
-    try {
-      for (MergeRecord val : vals) {
-        if (null == bestRecord && !val.isNewRecord()) {
-          // Use an old record if we don't have a new record.
-          bestRecord = (SqoopRecord) val.getSqoopRecord().clone();
-        } else if (val.isNewRecord()) {
-          bestRecord = (SqoopRecord) val.getSqoopRecord().clone();
-        }
-      }
-    } catch (CloneNotSupportedException cnse) {
-      throw new IOException(cnse);
-    }
-
-    if (null != bestRecord) {
-      c.write(bestRecord, NullWritable.get());
-    }
-  }
+    extends org.apache.sqoop.mapreduce.MergeReducer {
 }
 

Modified: incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/MergeTextMapper.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/MergeTextMapper.java?rev=1190489&r1=1190488&r2=1190489&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/MergeTextMapper.java (original)
+++ incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/MergeTextMapper.java Fri Oct 28 18:22:16 2011
@@ -1,6 +1,4 @@
 /**
- * Copyright 2011 The Apache Software Foundation
- *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -20,44 +18,9 @@
 
 package com.cloudera.sqoop.mapreduce;
 
-import java.io.IOException;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.util.ReflectionUtils;
-
-import com.cloudera.sqoop.lib.RecordParser;
-import com.cloudera.sqoop.lib.SqoopRecord;
-
 /**
- * Mapper for the merge program which operates on text files that we need to
- * parse into SqoopRecord instances.
+ * @deprecated Moving to use org.apache.sqoop namespace.
  */
-public class MergeTextMapper extends MergeMapperBase<LongWritable, Text> {
-
-  private SqoopRecord record;
-
-  @Override
-  protected void setup(Context c) throws IOException, InterruptedException {
-    Configuration conf = c.getConfiguration();
-
-    Class<? extends SqoopRecord> recordClass =
-        (Class<? extends SqoopRecord>) conf.getClass(
-        MergeJob.MERGE_SQOOP_RECORD_KEY, SqoopRecord.class);
-    this.record = ReflectionUtils.newInstance(recordClass, conf);
-
-    super.setup(c);
-  }
-
-  public void map(LongWritable key, Text val, Context c)
-      throws IOException, InterruptedException {
-    try {
-      this.record.parse(val);
-    } catch (RecordParser.ParseError pe) {
-      throw new IOException(pe);
-    }
-
-    processRecord(this.record, c);
-  }
+public class MergeTextMapper
+    extends org.apache.sqoop.mapreduce.MergeTextMapper {
 }

Modified: incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/MySQLDumpImportJob.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/MySQLDumpImportJob.java?rev=1190489&r1=1190488&r2=1190489&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/MySQLDumpImportJob.java (original)
+++ incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/MySQLDumpImportJob.java Fri Oct 28 18:22:16 2011
@@ -1,6 +1,4 @@
 /**
- * Copyright 2011 The Apache Software Foundation
- *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -20,115 +18,18 @@
 
 package com.cloudera.sqoop.mapreduce;
 
-import java.io.IOException;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.lib.db.DBWritable;
-
 import com.cloudera.sqoop.SqoopOptions;
-import com.cloudera.sqoop.manager.ConnManager;
 import com.cloudera.sqoop.manager.ImportJobContext;
-import com.cloudera.sqoop.manager.MySQLUtils;
-import com.cloudera.sqoop.mapreduce.db.DBConfiguration;
-import com.cloudera.sqoop.mapreduce.db.DataDrivenDBInputFormat;
 
 /**
- * Class that runs an import job using mysqldump in the mapper.
+ * @deprecated Moving to use org.apache.sqoop namespace.
  */
-public class MySQLDumpImportJob extends ImportJobBase {
-
-  public static final Log LOG =
-      LogFactory.getLog(MySQLDumpImportJob.class.getName());
+public class MySQLDumpImportJob
+    extends org.apache.sqoop.mapreduce.MySQLDumpImportJob {
 
   public MySQLDumpImportJob(final SqoopOptions opts, ImportJobContext context)
       throws ClassNotFoundException {
-    super(opts, MySQLDumpMapper.class, MySQLDumpInputFormat.class,
-        RawKeyTextOutputFormat.class, context);
-  }
-
-  /**
-   * Configure the inputformat to use for the job.
-   */
-  protected void configureInputFormat(Job job, String tableName,
-      String tableClassName, String splitByCol)
-      throws ClassNotFoundException, IOException {
-
-    if (null == tableName) {
-        LOG.error(
-            "mysqldump-based import cannot support free-form query imports.");
-        LOG.error("Do not use --direct and --query together for MySQL.");
-        throw new IOException("null tableName for MySQLDumpImportJob.");
-    }
-
-    ConnManager mgr = getContext().getConnManager();
-    String username = options.getUsername();
-    if (null == username || username.length() == 0) {
-      DBConfiguration.configureDB(job.getConfiguration(),
-          mgr.getDriverClass(), options.getConnectString());
-    } else {
-      DBConfiguration.configureDB(job.getConfiguration(),
-          mgr.getDriverClass(), options.getConnectString(), username,
-          options.getPassword());
-    }
-
-    String [] colNames = options.getColumns();
-    if (null == colNames) {
-      colNames = mgr.getColumnNames(tableName);
-    }
-
-    String [] sqlColNames = null;
-    if (null != colNames) {
-      sqlColNames = new String[colNames.length];
-      for (int i = 0; i < colNames.length; i++) {
-        sqlColNames[i] = mgr.escapeColName(colNames[i]);
-      }
-    }
-
-    // It's ok if the where clause is null in DBInputFormat.setInput.
-    String whereClause = options.getWhereClause();
-
-    // We can't set the class properly in here, because we may not have the
-    // jar loaded in this JVM. So we start by calling setInput() with
-    // DBWritable and then overriding the string manually.
-
-    // Note that mysqldump also does *not* want a quoted table name.
-    DataDrivenDBInputFormat.setInput(job, DBWritable.class,
-        tableName, whereClause,
-        mgr.escapeColName(splitByCol), sqlColNames);
-
-    Configuration conf = job.getConfiguration();
-    conf.setInt(MySQLUtils.OUTPUT_FIELD_DELIM_KEY,
-        options.getOutputFieldDelim());
-    conf.setInt(MySQLUtils.OUTPUT_RECORD_DELIM_KEY,
-        options.getOutputRecordDelim());
-    conf.setInt(MySQLUtils.OUTPUT_ENCLOSED_BY_KEY,
-        options.getOutputEnclosedBy());
-    conf.setInt(MySQLUtils.OUTPUT_ESCAPED_BY_KEY,
-        options.getOutputEscapedBy());
-    conf.setBoolean(MySQLUtils.OUTPUT_ENCLOSE_REQUIRED_KEY,
-        options.isOutputEncloseRequired());
-    String [] extraArgs = options.getExtraArgs();
-    if (null != extraArgs) {
-      conf.setStrings(MySQLUtils.EXTRA_ARGS_KEY, extraArgs);
-    }
-
-    LOG.debug("Using InputFormat: " + inputFormatClass);
-    job.setInputFormatClass(getInputFormatClass());
-  }
-
-  /**
-   * Set the mapper class implementation to use in the job,
-   * as well as any related configuration (e.g., map output types).
-   */
-  protected void configureMapper(Job job, String tableName,
-      String tableClassName) throws ClassNotFoundException, IOException {
-    job.setMapperClass(getMapperClass());
-    job.setOutputKeyClass(String.class);
-    job.setOutputValueClass(NullWritable.class);
+    super(opts, context);
   }
 
 }


