sqoop-commits mailing list archives

From: arv...@apache.org
Subject: svn commit: r1190489 [1/6] - in /incubator/sqoop/trunk/src/java: com/cloudera/sqoop/mapreduce/ org/apache/sqoop/mapreduce/
Date: Fri, 28 Oct 2011 18:22:19 GMT
Author: arvind
Date: Fri Oct 28 18:22:16 2011
New Revision: 1190489

URL: http://svn.apache.org/viewvc?rev=1190489&view=rev
Log:
SQOOP-376. Migrate mapreduce package to new namespace.

(Bilung Lee via Arvind Prabhakar)
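
Every file in this change follows the same migration pattern: the implementation moves to a class of the same name under org.apache.sqoop.mapreduce, and the old com.cloudera.sqoop.mapreduce class is cut down to a deprecated shim that extends the relocated class, re-declares its public constants, and delegates its constructors. A minimal sketch of such a shim, using a hypothetical class name (Widget) and constant (WIDGET_KEY) purely for illustration:

    package com.cloudera.sqoop.mapreduce;

    /**
     * @deprecated Moving to use org.apache.sqoop namespace.
     */
    public class Widget extends org.apache.sqoop.mapreduce.Widget {

      // Re-declare public constants so code compiled against the old class
      // still resolves them; the values come from the relocated class.
      public static final String WIDGET_KEY =
          org.apache.sqoop.mapreduce.Widget.WIDGET_KEY;

      // Constructors simply delegate to the relocated implementation.
      public Widget(int size) {
        super(size);
      }
    }

The diffs below show this transformation applied to each of the classes listed under "Added" and "Modified".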

Added:
    incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/AsyncSqlOutputFormat.java
    incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/AsyncSqlRecordWriter.java
    incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/AutoProgressMapper.java
    incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/AvroExportMapper.java
    incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/AvroImportMapper.java
    incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/AvroInputFormat.java
    incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/AvroJob.java
    incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/AvroOutputFormat.java
    incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/AvroRecordReader.java
    incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/CombineShimRecordReader.java
    incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/DataDrivenImportJob.java
    incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/DelegatingOutputFormat.java
    incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/ExportBatchOutputFormat.java
    incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/ExportInputFormat.java
    incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/ExportJobBase.java
    incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/ExportOutputFormat.java
    incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/HBaseImportJob.java
    incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/HBaseImportMapper.java
    incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/ImportJobBase.java
    incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/JdbcExportJob.java
    incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/JdbcUpdateExportJob.java
    incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/JdbcUpsertExportJob.java
    incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/JobBase.java
    incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/MergeJob.java
    incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/MergeMapperBase.java
    incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/MergeRecord.java
    incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/MergeRecordMapper.java
    incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/MergeReducer.java
    incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/MergeTextMapper.java
    incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/MySQLDumpImportJob.java
    incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/MySQLDumpInputFormat.java
    incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/MySQLDumpMapper.java
    incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/MySQLExportJob.java
    incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/MySQLExportMapper.java
    incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/MySQLRecordExportMapper.java
    incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/MySQLTextExportMapper.java
    incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/NullOutputCommitter.java
    incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/OracleExportOutputFormat.java
    incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/OracleUpsertOutputFormat.java
    incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/RawKeyTextOutputFormat.java
    incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/SQLServerExportOutputFormat.java
    incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/SequenceFileExportMapper.java
    incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/SequenceFileImportMapper.java
    incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/TextExportMapper.java
    incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/TextImportMapper.java
    incubator/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/UpdateOutputFormat.java
Modified:
    incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/AsyncSqlOutputFormat.java
    incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/AsyncSqlRecordWriter.java
    incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/AutoProgressMapper.java
    incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/AvroExportMapper.java
    incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/AvroImportMapper.java
    incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/AvroInputFormat.java
    incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/AvroJob.java
    incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/AvroOutputFormat.java
    incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/AvroRecordReader.java
    incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/CombineShimRecordReader.java
    incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/DataDrivenImportJob.java
    incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/DelegatingOutputFormat.java
    incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/ExportBatchOutputFormat.java
    incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/ExportInputFormat.java
    incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/ExportJobBase.java
    incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/ExportOutputFormat.java
    incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/HBaseImportJob.java
    incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/HBaseImportMapper.java
    incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/ImportJobBase.java
    incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/JdbcExportJob.java
    incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/JdbcUpdateExportJob.java
    incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/JdbcUpsertExportJob.java
    incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/JobBase.java
    incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/MergeJob.java
    incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/MergeMapperBase.java
    incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/MergeRecord.java
    incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/MergeRecordMapper.java
    incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/MergeReducer.java
    incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/MergeTextMapper.java
    incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/MySQLDumpImportJob.java
    incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/MySQLDumpInputFormat.java
    incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/MySQLDumpMapper.java
    incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/MySQLExportJob.java
    incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/MySQLExportMapper.java
    incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/MySQLRecordExportMapper.java
    incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/MySQLTextExportMapper.java
    incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/NullOutputCommitter.java
    incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/OracleExportOutputFormat.java
    incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/OracleUpsertOutputFormat.java
    incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/RawKeyTextOutputFormat.java
    incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/SQLServerExportOutputFormat.java
    incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/SequenceFileExportMapper.java
    incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/SequenceFileImportMapper.java
    incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/TextExportMapper.java
    incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/TextImportMapper.java
    incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/UpdateOutputFormat.java

Modified: incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/AsyncSqlOutputFormat.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/AsyncSqlOutputFormat.java?rev=1190489&r1=1190488&r2=1190489&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/AsyncSqlOutputFormat.java (original)
+++ incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/AsyncSqlOutputFormat.java Fri Oct 28 18:22:16 2011
@@ -1,6 +1,4 @@
 /**
- * Copyright 2011 The Apache Software Foundation
- *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -20,289 +18,65 @@
 
 package com.cloudera.sqoop.mapreduce;
 
-import java.io.IOException;
 import java.sql.Connection;
 import java.sql.PreparedStatement;
-import java.sql.SQLException;
-import java.util.concurrent.SynchronousQueue;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.OutputCommitter;
-import org.apache.hadoop.mapreduce.OutputFormat;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.util.StringUtils;
-
 import com.cloudera.sqoop.lib.SqoopRecord;
 
 /**
- * Abstract OutputFormat class that allows the RecordWriter to buffer
- * up SQL commands which should be executed in a separate thread after
- * enough commands are created.
- *
- * This supports a configurable "spill threshold" at which
- * point intermediate transactions are committed.
- *
- * Uses DBOutputFormat/DBConfiguration for configuring the output.
- * This is used in conjunction with the abstract AsyncSqlRecordWriter
- * class.
- *
- * Clients of this OutputFormat must implement getRecordWriter(); the
- * returned RecordWriter is intended to subclass AsyncSqlRecordWriter.
+ * @deprecated Moving to use org.apache.sqoop namespace.
  */
 public abstract class AsyncSqlOutputFormat<K extends SqoopRecord, V>
-    extends OutputFormat<K, V> {
+    extends org.apache.sqoop.mapreduce.AsyncSqlOutputFormat<K, V> {
 
-  /** conf key: number of rows to export per INSERT statement. */
   public static final String RECORDS_PER_STATEMENT_KEY =
-      "sqoop.export.records.per.statement";
+      org.apache.sqoop.mapreduce.AsyncSqlOutputFormat.
+      RECORDS_PER_STATEMENT_KEY;
 
-  /** conf key: number of INSERT statements to bundle per tx.
-   * If this is set to -1, then a single transaction will be used
-   * per task. Note that each statement may encompass multiple
-   * rows, depending on the value of sqoop.export.records.per.statement.
-   */
   public static final String STATEMENTS_PER_TRANSACTION_KEY =
-      "sqoop.export.statements.per.transaction";
-
-  /**
-   * Default number of records to put in an INSERT statement or
-   * other batched update statement.
-   */
-  public static final int DEFAULT_RECORDS_PER_STATEMENT = 100;
+      org.apache.sqoop.mapreduce.AsyncSqlOutputFormat.
+      STATEMENTS_PER_TRANSACTION_KEY;
 
-  /**
-   * Default number of statements to execute before committing the
-   * current transaction.
-   */
-  public static final int DEFAULT_STATEMENTS_PER_TRANSACTION = 100;
+  public static final int DEFAULT_RECORDS_PER_STATEMENT =
+      org.apache.sqoop.mapreduce.AsyncSqlOutputFormat.
+      DEFAULT_RECORDS_PER_STATEMENT;
+
+  public static final int DEFAULT_STATEMENTS_PER_TRANSACTION =
+      org.apache.sqoop.mapreduce.AsyncSqlOutputFormat.
+      DEFAULT_STATEMENTS_PER_TRANSACTION;
+
+  public static final int UNLIMITED_STATEMENTS_PER_TRANSACTION =
+      org.apache.sqoop.mapreduce.AsyncSqlOutputFormat.
+      UNLIMITED_STATEMENTS_PER_TRANSACTION;
 
   /**
-   * Value for STATEMENTS_PER_TRANSACTION_KEY signifying that we should
-   * not commit until the RecordWriter is being closed, regardless of
-   * the number of statements we execute.
+   * @deprecated Moving to use org.apache.sqoop namespace.
    */
-  public static final int UNLIMITED_STATEMENTS_PER_TRANSACTION = -1;
+  public static class AsyncDBOperation
+      extends org.apache.sqoop.mapreduce.AsyncSqlOutputFormat.
+      AsyncDBOperation {
 
-  private static final Log LOG = LogFactory.getLog(AsyncSqlOutputFormat.class);
-
-  @Override
-  /** {@inheritDoc} */
-  public void checkOutputSpecs(JobContext context)
-      throws IOException, InterruptedException {
-  }
-
-  @Override
-  /** {@inheritDoc} */
-  public OutputCommitter getOutputCommitter(TaskAttemptContext context)
-      throws IOException, InterruptedException {
-    return new NullOutputCommitter();
-  }
-
-  /**
-   * Represents a database update operation that should be performed
-   * by an asynchronous background thread.
-   * AsyncDBOperation objects are immutable.
-   * They MAY contain a statement which should be executed. The
-   * statement may also be null.
-   *
-   * They may also set 'commitAndClose' to true. If true, then the
-   * executor of this operation should commit the current
-   * transaction, even if stmt is null, and then stop the executor
-   * thread.
-   */
-  public static class AsyncDBOperation {
-    private final PreparedStatement stmt;
-    private final boolean isBatch;
-    private final boolean commit;
-    private final boolean stopThread;
-
-    @Deprecated
-    /** Do not use AsyncDBOperation(PreparedStatement s, boolean
-     * commitAndClose, boolean batch). Use AsyncDBOperation(PreparedStatement
-     *  s, boolean batch, boolean commit, boolean stopThread) instead.
-     */
     public AsyncDBOperation(PreparedStatement s, boolean commitAndClose,
         boolean batch) {
-        this(s, batch, commitAndClose, commitAndClose);
+        super(s, commitAndClose, batch);
     }
 
-    /**
-     * Create an asynchronous database operation.
-     * @param s the statement, if any, to execute.
-     * @param batch is true if this is a batch PreparedStatement, or false
-     * if it's a normal singleton statement.
-     * @param commit is true if this statement should be committed to the
-     * database.
-     * @param stopThread if true, the executor thread should stop after this
-     * operation.
-     */
     public AsyncDBOperation(PreparedStatement s, boolean batch,
         boolean commit, boolean stopThread) {
-      this.stmt = s;
-      this.isBatch = batch;
-      this.commit = commit;
-      this.stopThread = stopThread;
-    }
-
-    /**
-     * @return a statement to run as an update.
-     */
-    public PreparedStatement getStatement() {
-      return stmt;
-    }
-
-    /**
-     * @return true if the executor should commit the current transaction.
-     * If getStatement() is non-null, the statement is run first.
-     */
-    public boolean requiresCommit() {
-      return this.commit;
+      super(s, batch, commit, stopThread);
     }
 
-    /**
-     * @return true if the executor should stop after this command.
-     */
-    public boolean stop() {
-      return this.stopThread;
-    }
-
-    /**
-     * @return true if this is a batch SQL statement.
-     */
-    public boolean execAsBatch() {
-      return this.isBatch;
-    }
   }
 
   /**
-   * A thread that runs the database interactions asynchronously
-   * from the OutputCollector.
+   * @deprecated Moving to use org.apache.sqoop namespace.
    */
-  public static class AsyncSqlExecThread extends Thread {
-
-    private final Connection conn; // The connection to the database.
-    private SQLException err; // Error from a previously-run statement.
+  public static class AsyncSqlExecThread
+      extends org.apache.sqoop.mapreduce.AsyncSqlOutputFormat.
+      AsyncSqlExecThread{
 
-    // How we receive database operations from the RecordWriter.
-    private SynchronousQueue<AsyncDBOperation> opsQueue;
-
-    protected int curNumStatements; // statements executed thus far in the tx.
-    protected final int stmtsPerTx;  // statements per transaction.
-
-    /**
-     * Create a new update thread that interacts with the database.
-     * @param conn the connection to use. This must only be used by this
-     * thread.
-     * @param stmtsPerTx the number of statements to execute before committing
-     * the current transaction.
-     */
     public AsyncSqlExecThread(Connection conn, int stmtsPerTx) {
-      this.conn = conn;
-      this.err = null;
-      this.opsQueue = new SynchronousQueue<AsyncDBOperation>();
-      this.stmtsPerTx = stmtsPerTx;
-    }
-
-    public void run() {
-      while (true) {
-        AsyncDBOperation op = null;
-        try {
-          op = opsQueue.take();
-        } catch (InterruptedException ie) {
-          LOG.warn("Interrupted retrieving from operation queue: "
-              + StringUtils.stringifyException(ie));
-          continue;
-        }
-
-        if (null == op) {
-          // This shouldn't be allowed to happen.
-          LOG.warn("Null operation in queue; illegal state.");
-          continue;
-        }
-
-        PreparedStatement stmt = op.getStatement();
-        // Synchronize on the connection to ensure it does not conflict
-        // with the prepareStatement() call in the main thread.
-        synchronized (conn) {
-          try {
-            if (null != stmt) {
-              if (op.execAsBatch()) {
-                stmt.executeBatch();
-              } else {
-                stmt.execute();
-              }
-              stmt.close();
-              stmt = null;
-              this.curNumStatements++;
-            }
-
-            if (op.requiresCommit() || (curNumStatements >= stmtsPerTx
-                && stmtsPerTx != UNLIMITED_STATEMENTS_PER_TRANSACTION)) {
-              LOG.debug("Committing transaction of " + curNumStatements
-                  + " statements");
-              this.conn.commit();
-              this.curNumStatements = 0;
-            }
-          } catch (SQLException sqlE) {
-            setLastError(sqlE);
-          } finally {
-            // Close the statement on our way out if that didn't happen
-            // via the normal execution path.
-            if (null != stmt) {
-              try {
-                stmt.close();
-              } catch (SQLException sqlE) {
-                setLastError(sqlE);
-              }
-            }
-
-            // Always check whether we should end the loop, regardless
-            // of the presence of an exception.
-            if (op.stop()) {
-              return;
-            }
-          } // try .. catch .. finally.
-        } // synchronized (conn)
-      }
-    }
-
-    /**
-     * Allows a user to enqueue the next database operation to run.
-     * Since the connection can only execute a single operation at a time,
-     * the put() method may block if another operation is already underway.
-     * @param op the database operation to perform.
-     */
-    public void put(AsyncDBOperation op) throws InterruptedException {
-      opsQueue.put(op);
-    }
-
-    /**
-     * If a previously-executed statement resulted in an error, post it here.
-     * If the error slot was already filled, then subsequent errors are
-     * squashed until the user calls this method (which clears the error
-     * slot).
-     * @return any SQLException that occurred due to a previously-run
-     * statement.
-     */
-    public synchronized SQLException getLastError() {
-      SQLException e = this.err;
-      this.err = null;
-      return e;
+      super(conn, stmtsPerTx);
     }
 
-    private synchronized void setLastError(SQLException e) {
-      if (this.err == null) {
-        // Just set it.
-        LOG.error("Got exception in update thread: "
-            + StringUtils.stringifyException(e));
-        this.err = e;
-      } else {
-        // Slot is full. Log it and discard.
-        LOG.error("SQLException in update thread but error slot full: "
-            + StringUtils.stringifyException(e));
-      }
-    }
   }
 }
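
The javadoc removed above documents the two export tuning keys this OutputFormat honors. A minimal sketch of setting them, assuming a Hadoop Configuration object named conf (the values shown are just the defaults quoted above):

    // Rows bundled into each INSERT (or other batched update) statement.
    conf.setInt("sqoop.export.records.per.statement", 100);
    // Statements executed before each intermediate commit; -1
    // (UNLIMITED_STATEMENTS_PER_TRANSACTION) defers the commit until the
    // RecordWriter is closed.
    conf.setInt("sqoop.export.statements.per.transaction", 100);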

Modified: incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/AsyncSqlRecordWriter.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/AsyncSqlRecordWriter.java?rev=1190489&r1=1190488&r2=1190489&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/AsyncSqlRecordWriter.java (original)
+++ incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/AsyncSqlRecordWriter.java Fri Oct 28 18:22:16 2011
@@ -1,6 +1,4 @@
 /**
- * Copyright 2011 The Apache Software Foundation
- *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -20,207 +18,19 @@
 
 package com.cloudera.sqoop.mapreduce;
 
-import java.io.IOException;
-import java.sql.Connection;
-import java.sql.PreparedStatement;
 import java.sql.SQLException;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import com.cloudera.sqoop.mapreduce.db.DBConfiguration;
-import com.cloudera.sqoop.util.LoggingUtils;
 import com.cloudera.sqoop.lib.SqoopRecord;
 
 /**
- * Abstract RecordWriter base class that buffers SqoopRecords to be injected
- * into JDBC SQL PreparedStatements to be executed by the
- * AsyncSqlOutputFormat's background thread.
- *
- * Record objects are buffered before actually performing the INSERT
- * statements; this requires that the key implement the SqoopRecord interface.
- *
- * Uses DBOutputFormat/DBConfiguration for configuring the output.
+ * @deprecated Moving to use org.apache.sqoop namespace.
  */
 public abstract class AsyncSqlRecordWriter<K extends SqoopRecord, V>
-    extends RecordWriter<K, V> {
-
-  private static final Log LOG = LogFactory.getLog(AsyncSqlRecordWriter.class);
-
-  private Connection connection;
-
-  private Configuration conf;
-
-  protected final int rowsPerStmt; // rows to insert per statement.
-
-  // Buffer for records to be put into export SQL statements.
-  private List<SqoopRecord> records;
-
-  // Background thread to actually perform the updates.
-  private AsyncSqlOutputFormat.AsyncSqlExecThread execThread;
-  private boolean startedExecThread;
+    extends org.apache.sqoop.mapreduce.AsyncSqlRecordWriter<K, V> {
 
   public AsyncSqlRecordWriter(TaskAttemptContext context)
       throws ClassNotFoundException, SQLException {
-    this.conf = context.getConfiguration();
-
-    this.rowsPerStmt = conf.getInt(
-        AsyncSqlOutputFormat.RECORDS_PER_STATEMENT_KEY,
-        AsyncSqlOutputFormat.DEFAULT_RECORDS_PER_STATEMENT);
-    int stmtsPerTx = conf.getInt(
-        AsyncSqlOutputFormat.STATEMENTS_PER_TRANSACTION_KEY,
-        AsyncSqlOutputFormat.DEFAULT_STATEMENTS_PER_TRANSACTION);
-
-    DBConfiguration dbConf = new DBConfiguration(conf);
-    this.connection = dbConf.getConnection();
-    this.connection.setAutoCommit(false);
-
-    this.records = new ArrayList<SqoopRecord>(this.rowsPerStmt);
-
-    this.execThread = new AsyncSqlOutputFormat.AsyncSqlExecThread(
-        connection, stmtsPerTx);
-    this.execThread.setDaemon(true);
-    this.startedExecThread = false;
-  }
-
-  /**
-   * Allow subclasses access to the Connection instance we hold.
-   * This Connection is shared with the asynchronous SQL exec thread.
-   * Any uses of the Connection must be synchronized on it.
-   * @return the Connection object used for this SQL transaction.
-   */
-  protected final Connection getConnection() {
-    return this.connection;
+    super(context);
   }
 
-  /**
-   * Allow subclasses access to the Configuration.
-   * @return the Configuration for this MapReduc task.
-   */
-  protected final Configuration getConf() {
-    return this.conf;
-  }
-
-  /**
-   * Should return 'true' if the PreparedStatements generated by the
-   * RecordWriter are intended to be executed in "batch" mode, or false
-   * if it's just one big statement.
-   */
-  protected boolean isBatchExec() {
-    return false;
-  }
-
-  /**
-   * Generate the PreparedStatement object that will be fed into the execution
-   * thread. All parameterized fields of the PreparedStatement must be set in
-   * this method as well; this is usually based on the records collected from
-   * the user in the userRecords list.
-   *
-   * Note that any uses of the Connection object here must be synchronized on
-   * the Connection.
-   *
-   * @param userRecords a list of records that should be injected into SQL
-   * statements.
-   * @return a PreparedStatement to be populated with rows
-   * from the collected record list.
-   */
-  protected abstract PreparedStatement getPreparedStatement(
-      List<SqoopRecord> userRecords) throws SQLException;
-
-  /**
-   * Takes the current contents of 'records' and formats and executes the
-   * INSERT statement.
-   * @param closeConn if true, commits the transaction and closes the
-   * connection.
-   */
-  private void execUpdate(boolean commit, boolean stopThread)
-      throws InterruptedException, SQLException {
-
-    if (!startedExecThread) {
-      this.execThread.start();
-      this.startedExecThread = true;
-    }
-
-    PreparedStatement stmt = null;
-    boolean successfulPut = false;
-    try {
-      if (records.size() > 0) {
-        stmt = getPreparedStatement(records);
-        this.records.clear();
-      }
-
-      // Pass this operation off to the update thread. This will block if
-      // the update thread is already performing an update.
-      AsyncSqlOutputFormat.AsyncDBOperation op =
-          new AsyncSqlOutputFormat.AsyncDBOperation(stmt, isBatchExec(),
-                  commit, stopThread);
-      execThread.put(op);
-      successfulPut = true; // op has been posted to the other thread.
-    } finally {
-      if (!successfulPut && null != stmt) {
-        // We created a statement but failed to enqueue it. Close it.
-        stmt.close();
-      }
-    }
-
-    // Check for any previous SQLException. If one happened, rethrow it here.
-    SQLException lastException = execThread.getLastError();
-    if (null != lastException) {
-      LoggingUtils.logAll(LOG, lastException);
-      throw lastException;
-    }
-  }
-
-  @Override
-  /** {@inheritDoc} */
-  public void close(TaskAttemptContext context)
-      throws IOException, InterruptedException {
-    try {
-      try {
-        execUpdate(true, true);
-        execThread.join();
-      } catch (SQLException sqle) {
-        throw new IOException(sqle);
-      }
-
-      // If we're not leaving on an error return path already,
-      // now that execThread is definitely stopped, check that the
-      // error slot remains empty.
-      SQLException lastErr = execThread.getLastError();
-      if (null != lastErr) {
-        throw new IOException(lastErr);
-      }
-    } finally {
-      try {
-        closeConnection(context);
-      } catch (SQLException sqle) {
-        throw new IOException(sqle);
-      }
-    }
-  }
-
-  public void closeConnection(TaskAttemptContext context)
-      throws SQLException {
-    this.connection.close();
-  }
-
-  @Override
-  /** {@inheritDoc} */
-  public void write(K key, V value)
-      throws InterruptedException, IOException {
-    try {
-      records.add((SqoopRecord) key.clone());
-      if (records.size() >= this.rowsPerStmt) {
-        execUpdate(false, false);
-      }
-    } catch (CloneNotSupportedException cnse) {
-      throw new IOException("Could not buffer record", cnse);
-    } catch (SQLException sqlException) {
-      throw new IOException(sqlException);
-    }
-  }
 }

Modified: incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/AutoProgressMapper.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/AutoProgressMapper.java?rev=1190489&r1=1190488&r2=1190489&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/AutoProgressMapper.java (original)
+++ incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/AutoProgressMapper.java Fri Oct 28 18:22:16 2011
@@ -1,6 +1,4 @@
 /**
- * Copyright 2011 The Apache Software Foundation
- *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -20,186 +18,25 @@
 
 package com.cloudera.sqoop.mapreduce;
 
-import java.io.IOException;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import org.apache.hadoop.conf.Configuration;
-
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.Mapper.Context;
-
 /**
- * Identity mapper that continuously reports progress via a background thread.
+ * @deprecated Moving to use org.apache.sqoop namespace.
  */
 public class AutoProgressMapper<KEYIN, VALIN, KEYOUT, VALOUT>
-    extends Mapper<KEYIN, VALIN, KEYOUT, VALOUT> {
-
-  public static final Log LOG = LogFactory.getLog(
-      AutoProgressMapper.class.getName());
-
-  /**
-   * Total number of millis for which progress will be reported by the
-   * auto-progress thread. If this is zero, then the auto-progress thread will
-   * never voluntarily exit.
-   */
-  private int maxProgressPeriod;
-
-  /**
-   * Number of milliseconds to sleep for between loop iterations. Must be less
-   * than report interval.
-   */
-  private int sleepInterval;
-
-  /**
-   * Number of milliseconds between calls to Reporter.progress().
-   * Should be a multiple of the sleepInterval.
-   */
-  private int reportInterval;
+    extends org.apache.sqoop.mapreduce.AutoProgressMapper
+    <KEYIN, VALIN, KEYOUT, VALOUT> {
 
   public static final String MAX_PROGRESS_PERIOD_KEY =
-      "sqoop.mapred.auto.progress.max";
+      org.apache.sqoop.mapreduce.AutoProgressMapper.MAX_PROGRESS_PERIOD_KEY;
   public static final String SLEEP_INTERVAL_KEY =
-      "sqoop.mapred.auto.progress.sleep";
+      org.apache.sqoop.mapreduce.AutoProgressMapper.SLEEP_INTERVAL_KEY;
   public static final String REPORT_INTERVAL_KEY =
-      "sqoop.mapred.auto.progress.report";
-
-  // Sleep for 10 seconds at a time.
-  static final int DEFAULT_SLEEP_INTERVAL = 10000;
-
-  // Report progress every 30 seconds.
-  static final int DEFAULT_REPORT_INTERVAL = 30000;
-
-  // Disable max progress, by default.
-  static final int DEFAULT_MAX_PROGRESS = 0;
-
-  private class ProgressThread extends Thread {
-
-    private volatile boolean keepGoing; // While this is true, thread runs.
-
-    private Context context;
-    private long startTimeMillis;
-    private long lastReportMillis;
-
-    public ProgressThread(final Context ctxt) {
-      this.context = ctxt;
-      this.keepGoing = true;
-    }
-
-    public void signalShutdown() {
-      this.keepGoing = false; // volatile update.
-      this.interrupt();
-    }
-
-    public void run() {
-      this.lastReportMillis = System.currentTimeMillis();
-      this.startTimeMillis = this.lastReportMillis;
-
-      final long MAX_PROGRESS = AutoProgressMapper.this.maxProgressPeriod;
-      final long REPORT_INTERVAL = AutoProgressMapper.this.reportInterval;
-      final long SLEEP_INTERVAL = AutoProgressMapper.this.sleepInterval;
-
-      // In a loop:
-      //   * Check that we haven't run for too long (maxProgressPeriod).
-      //   * If it's been a report interval since we last made progress,
-      //     make more.
-      //   * Sleep for a bit.
-      //   * If the parent thread has signaled for exit, do so.
-      while (this.keepGoing) {
-        long curTimeMillis = System.currentTimeMillis();
-
-        if (MAX_PROGRESS != 0
-            && curTimeMillis - this.startTimeMillis > MAX_PROGRESS) {
-          this.keepGoing = false;
-          LOG.info("Auto-progress thread exiting after " + MAX_PROGRESS
-              + " ms.");
-          break;
-        }
-
-        if (curTimeMillis - this.lastReportMillis > REPORT_INTERVAL) {
-          // It's been a full report interval -- claim progress.
-          LOG.debug("Auto-progress thread reporting progress");
-          this.context.progress();
-          this.lastReportMillis = curTimeMillis;
-        }
-
-        // Unless we got an interrupt while we were working,
-        // sleep a bit before doing more work.
-        if (!this.interrupted()) {
-          try {
-            Thread.sleep(SLEEP_INTERVAL);
-          } catch (InterruptedException ie) {
-            // we were notified on something; not necessarily an error.
-          }
-        }
-      }
-
-      LOG.info("Auto-progress thread is finished. keepGoing=" + this.keepGoing);
-    }
-  }
-
-  /**
-   * Set configuration parameters for the auto-progress thread.
-   */
-  private void configureAutoProgress(Configuration job) {
-    this.maxProgressPeriod = job.getInt(MAX_PROGRESS_PERIOD_KEY,
-        DEFAULT_MAX_PROGRESS);
-    this.sleepInterval = job.getInt(SLEEP_INTERVAL_KEY,
-        DEFAULT_SLEEP_INTERVAL);
-    this.reportInterval = job.getInt(REPORT_INTERVAL_KEY,
-        DEFAULT_REPORT_INTERVAL);
-
-    if (this.reportInterval < 1) {
-      LOG.warn("Invalid " + REPORT_INTERVAL_KEY + "; setting to "
-          + DEFAULT_REPORT_INTERVAL);
-      this.reportInterval = DEFAULT_REPORT_INTERVAL;
-    }
-
-    if (this.sleepInterval > this.reportInterval || this.sleepInterval < 1) {
-      LOG.warn("Invalid " + SLEEP_INTERVAL_KEY + "; setting to "
-          + DEFAULT_SLEEP_INTERVAL);
-      this.sleepInterval = DEFAULT_SLEEP_INTERVAL;
-    }
-
-    if (this.maxProgressPeriod < 0) {
-      LOG.warn("Invalid " + MAX_PROGRESS_PERIOD_KEY + "; setting to "
-          + DEFAULT_MAX_PROGRESS);
-      this.maxProgressPeriod = DEFAULT_MAX_PROGRESS;
-    }
-  }
-
-
-  // map() method intentionally omitted; Mapper.map() is the identity mapper.
-
-
-  /**
-   * Run the mapping process for this task, wrapped in an auto-progress system.
-   */
-  @Override
-  public void run(Context context) throws IOException, InterruptedException {
-    configureAutoProgress(context.getConfiguration());
-    ProgressThread thread = this.new ProgressThread(context);
+      org.apache.sqoop.mapreduce.AutoProgressMapper.REPORT_INTERVAL_KEY;
 
-    try {
-      thread.setDaemon(true);
-      thread.start();
+  public static final int DEFAULT_SLEEP_INTERVAL =
+      org.apache.sqoop.mapreduce.AutoProgressMapper.DEFAULT_SLEEP_INTERVAL;
+  public static final int DEFAULT_REPORT_INTERVAL =
+      org.apache.sqoop.mapreduce.AutoProgressMapper.DEFAULT_REPORT_INTERVAL;
+  public static final int DEFAULT_MAX_PROGRESS =
+      org.apache.sqoop.mapreduce.AutoProgressMapper.DEFAULT_MAX_PROGRESS;
 
-      // use default run() method to actually drive the mapping.
-      super.run(context);
-    } finally {
-      // Tell the progress thread to exit..
-      LOG.debug("Instructing auto-progress thread to quit.");
-      thread.signalShutdown();
-      try {
-        // And wait for that to happen.
-        LOG.debug("Waiting for progress thread shutdown...");
-        thread.join();
-        LOG.debug("Progress thread shutdown detected.");
-      } catch (InterruptedException ie) {
-        LOG.warn("Interrupted when waiting on auto-progress thread: "
-            + ie.toString());
-      }
-    }
-  }
 }

Modified: incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/AvroExportMapper.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/AvroExportMapper.java?rev=1190489&r1=1190488&r2=1190489&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/AvroExportMapper.java (original)
+++ incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/AvroExportMapper.java Fri Oct 28 18:22:16 2011
@@ -1,6 +1,4 @@
 /**
- * Copyright 2011 The Apache Software Foundation
- *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -20,188 +18,13 @@
 
 package com.cloudera.sqoop.mapreduce;
 
-import com.cloudera.sqoop.lib.SqoopRecord;
-import com.cloudera.sqoop.orm.ClassWriter;
-
-import java.io.IOException;
-import java.math.BigDecimal;
-import java.nio.ByteBuffer;
-import java.sql.Date;
-import java.sql.Time;
-import java.sql.Timestamp;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.avro.Schema;
-import org.apache.avro.Schema.Field;
-import org.apache.avro.generic.GenericEnumSymbol;
-import org.apache.avro.generic.GenericFixed;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.mapred.AvroWrapper;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.DefaultStringifier;
-import org.apache.hadoop.io.MapWritable;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.util.ReflectionUtils;
-
 /**
- * Exports records from an Avro data file.
+ * @deprecated Moving to use org.apache.sqoop namespace.
  */
 public class AvroExportMapper
-    extends AutoProgressMapper<AvroWrapper<GenericRecord>, NullWritable,
-              SqoopRecord, NullWritable> {
-
-  private static final String TIMESTAMP_TYPE = "java.sql.Timestamp";
-
-  private static final String TIME_TYPE = "java.sql.Time";
-
-  private static final String DATE_TYPE = "java.sql.Date";
-
-  private static final String BIG_DECIMAL_TYPE = "java.math.BigDecimal";
-
-  static final String AVRO_COLUMN_TYPES_MAP = "sqoop.avro.column.types.map";
-
-  private MapWritable columnTypes;
-  private SqoopRecord recordImpl;
-
-  @Override
-  protected void setup(Context context)
-      throws IOException, InterruptedException {
-
-    super.setup(context);
-
-    Configuration conf = context.getConfiguration();
-
-    // Instantiate a copy of the user's class to hold and parse the record.
-    String recordClassName = conf.get(
-        ExportJobBase.SQOOP_EXPORT_TABLE_CLASS_KEY);
-    if (null == recordClassName) {
-      throw new IOException("Export table class name ("
-          + ExportJobBase.SQOOP_EXPORT_TABLE_CLASS_KEY
-          + ") is not set!");
-    }
-
-    try {
-      Class cls = Class.forName(recordClassName, true,
-          Thread.currentThread().getContextClassLoader());
-      recordImpl = (SqoopRecord) ReflectionUtils.newInstance(cls, conf);
-    } catch (ClassNotFoundException cnfe) {
-      throw new IOException(cnfe);
-    }
-
-    if (null == recordImpl) {
-      throw new IOException("Could not instantiate object of type "
-          + recordClassName);
-    }
-
-    columnTypes = DefaultStringifier.load(conf, AVRO_COLUMN_TYPES_MAP,
-        MapWritable.class);
-  }
-
-  @Override
-  protected void map(AvroWrapper<GenericRecord> key, NullWritable value,
-      Context context) throws IOException, InterruptedException {
-    context.write(toSqoopRecord(key.datum()), NullWritable.get());
-  }
-
-  private SqoopRecord toSqoopRecord(GenericRecord record) throws IOException {
-    Schema avroSchema = record.getSchema();
-    for (Map.Entry<Writable, Writable> e : columnTypes.entrySet()) {
-      String columnName = e.getKey().toString();
-      String columnType = e.getValue().toString();
-      String cleanedCol = ClassWriter.toIdentifier(columnName);
-      Field field = getField(avroSchema, cleanedCol, record);
-      if (field == null) {
-        throw new IOException("Cannot find field " + cleanedCol
-          + " in Avro schema " + avroSchema);
-      } else {
-        Object avroObject = record.get(field.name());
-        Object fieldVal = fromAvro(avroObject, field.schema(), columnType);
-        recordImpl.setField(cleanedCol, fieldVal);
-      }
-    }
-    return recordImpl;
-  }
-
-  private Field getField(Schema avroSchema, String fieldName,
-      GenericRecord record) {
-    for (Field field : avroSchema.getFields()) {
-      if (field.name().equalsIgnoreCase(fieldName)) {
-        return field;
-      }
-    }
-    return null;
-  }
-
-  private Object fromAvro(Object avroObject, Schema fieldSchema,
-      String columnType) {
-    // map from Avro type to Sqoop's Java representation of the SQL type
-    // see SqlManager#toJavaType
-
-    if (avroObject == null) {
-      return null;
-    }
+    extends org.apache.sqoop.mapreduce.AvroExportMapper {
 
-    switch (fieldSchema.getType()) {
-      case NULL:
-        return null;
-      case BOOLEAN:
-      case INT:
-      case FLOAT:
-      case DOUBLE:
-        return avroObject;
-      case LONG:
-        if (columnType.equals(DATE_TYPE)) {
-          return new Date((Long) avroObject);
-        } else if (columnType.equals(TIME_TYPE)) {
-          return new Time((Long) avroObject);
-        } else if (columnType.equals(TIMESTAMP_TYPE)) {
-          return new Timestamp((Long) avroObject);
-        }
-        return avroObject;
-      case BYTES:
-        ByteBuffer bb = (ByteBuffer) avroObject;
-        BytesWritable bw = new BytesWritable();
-        bw.set(bb.array(), bb.arrayOffset() + bb.position(), bb.remaining());
-        return bw;
-      case STRING:
-        if (columnType.equals(BIG_DECIMAL_TYPE)) {
-          return new BigDecimal(avroObject.toString());
-        } else if (columnType.equals(DATE_TYPE)) {
-          return Date.valueOf(avroObject.toString());
-        } else if (columnType.equals(TIME_TYPE)) {
-          return Time.valueOf(avroObject.toString());
-        } else if (columnType.equals(TIMESTAMP_TYPE)) {
-          return Timestamp.valueOf(avroObject.toString());
-        }
-        return avroObject.toString();
-      case ENUM:
-        return ((GenericEnumSymbol) avroObject).toString();
-      case UNION:
-        List<Schema> types = fieldSchema.getTypes();
-        if (types.size() != 2) {
-          throw new IllegalArgumentException("Only support union with null");
-        }
-        Schema s1 = types.get(0);
-        Schema s2 = types.get(1);
-        if (s1.getType() == Schema.Type.NULL) {
-          return fromAvro(avroObject, s2, columnType);
-        } else if (s2.getType() == Schema.Type.NULL) {
-          return fromAvro(avroObject, s1, columnType);
-        } else {
-          throw new IllegalArgumentException("Only support union with null");
-        }
-      case FIXED:
-        return new BytesWritable(((GenericFixed) avroObject).bytes());
-      case RECORD:
-      case ARRAY:
-      case MAP:
-      default:
-        throw new IllegalArgumentException("Cannot convert Avro type "
-            + fieldSchema.getType());
-    }
-  }
+  public static final String AVRO_COLUMN_TYPES_MAP =
+      org.apache.sqoop.mapreduce.AvroExportMapper.AVRO_COLUMN_TYPES_MAP;
 
 }

Modified: incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/AvroImportMapper.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/AvroImportMapper.java?rev=1190489&r1=1190488&r2=1190489&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/AvroImportMapper.java (original)
+++ incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/AvroImportMapper.java Fri Oct 28 18:22:16 2011
@@ -1,6 +1,4 @@
 /**
- * Copyright 2011 The Apache Software Foundation
- *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -20,85 +18,9 @@
 
 package com.cloudera.sqoop.mapreduce;
 
-import com.cloudera.sqoop.lib.BlobRef;
-import com.cloudera.sqoop.lib.ClobRef;
-import com.cloudera.sqoop.lib.SqoopRecord;
-
-import java.io.IOException;
-import java.math.BigDecimal;
-import java.nio.ByteBuffer;
-import java.sql.Date;
-import java.sql.Time;
-import java.sql.Timestamp;
-import java.util.Map;
-
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericData;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.mapred.AvroWrapper;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.NullWritable;
-
 /**
- * Imports records by transforming them to Avro records in an Avro data file.
+ * @deprecated Moving to use org.apache.sqoop namespace.
  */
 public class AvroImportMapper
-    extends AutoProgressMapper<LongWritable, SqoopRecord,
-              AvroWrapper<GenericRecord>, NullWritable> {
-
-  private final AvroWrapper<GenericRecord> wrapper =
-    new AvroWrapper<GenericRecord>();
-  private Schema schema;
-
-  @Override
-  protected void setup(Context context) {
-    schema = AvroJob.getMapOutputSchema(context.getConfiguration());
-  }
-
-  @Override
-  protected void map(LongWritable key, SqoopRecord val, Context context)
-      throws IOException, InterruptedException {
-    wrapper.datum(toGenericRecord(val));
-    context.write(wrapper, NullWritable.get());
-  }
-
-
-  private GenericRecord toGenericRecord(SqoopRecord val) {
-    Map<String, Object> fieldMap = val.getFieldMap();
-    GenericRecord record = new GenericData.Record(schema);
-    for (Map.Entry<String, Object> entry : fieldMap.entrySet()) {
-      record.put(entry.getKey(), toAvro(entry.getValue()));
-    }
-    return record;
-  }
-
-  /**
-   * Convert the Avro representation of a Java type (that has already been
-   * converted from the SQL equivalent).
-   * @param o
-   * @return
-   */
-  private Object toAvro(Object o) {
-    if (o instanceof BigDecimal) {
-      return o.toString();
-    } else if (o instanceof Date) {
-      return ((Date) o).getTime();
-    } else if (o instanceof Time) {
-      return ((Time) o).getTime();
-    } else if (o instanceof Timestamp) {
-      return ((Timestamp) o).getTime();
-    } else if (o instanceof BytesWritable) {
-      BytesWritable bw = (BytesWritable) o;
-      return ByteBuffer.wrap(bw.getBytes(), 0, bw.getLength());
-    } else if (o instanceof ClobRef) {
-      throw new UnsupportedOperationException("ClobRef not suported");
-    } else if (o instanceof BlobRef) {
-      throw new UnsupportedOperationException("BlobRef not suported");
-    }
-    // primitive types (Integer, etc) are left unchanged
-    return o;
-  }
-
-
+    extends org.apache.sqoop.mapreduce.AvroImportMapper {
 }

Modified: incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/AvroInputFormat.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/AvroInputFormat.java?rev=1190489&r1=1190488&r2=1190489&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/AvroInputFormat.java (original)
+++ incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/AvroInputFormat.java Fri Oct 28 18:22:16 2011
@@ -1,6 +1,4 @@
 /**
- * Copyright 2011 The Apache Software Foundation
- *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -20,42 +18,10 @@
 
 package com.cloudera.sqoop.mapreduce;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.avro.mapred.AvroWrapper;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-
-/** An {@link org.apache.hadoop.mapred.InputFormat} for Avro data files. */
+/**
+ * @deprecated Moving to use org.apache.sqoop namespace.
+ */
 public class AvroInputFormat<T>
-  extends FileInputFormat<AvroWrapper<T>, NullWritable> {
-
-  @Override
-  protected List<FileStatus> listStatus(JobContext job) throws IOException {
-    List<FileStatus> result = new ArrayList<FileStatus>();
-    for (FileStatus file : super.listStatus(job)) {
-      if (file.getPath().getName().endsWith(
-          org.apache.avro.mapred.AvroOutputFormat.EXT)) {
-        result.add(file);
-      }
-    }
-    return result;
-  }
-
-  @Override
-  public RecordReader<AvroWrapper<T>, NullWritable> createRecordReader(
-      InputSplit split, TaskAttemptContext context) throws IOException,
-      InterruptedException {
-    context.setStatus(split.toString());
-    return new AvroRecordReader<T>();
-  }
-
+    extends org.apache.sqoop.mapreduce.AvroInputFormat<T> {
 }
 

Modified: incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/AvroJob.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/AvroJob.java?rev=1190489&r1=1190488&r2=1190489&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/AvroJob.java (original)
+++ incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/AvroJob.java Fri Oct 28 18:22:16 2011
@@ -1,6 +1,4 @@
 /**
- * Copyright 2011 The Apache Software Foundation
- *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -24,20 +22,21 @@ import org.apache.avro.Schema;
 import org.apache.hadoop.conf.Configuration;
 
 /**
- * Helper class for setting up an Avro MapReduce job.
+ * @deprecated Moving to use org.apache.sqoop namespace.
  */
 public final class AvroJob {
-  public static final String MAP_OUTPUT_SCHEMA = "avro.map.output.schema";
 
-  private AvroJob() {
-  }
+  public static final String MAP_OUTPUT_SCHEMA =
+      org.apache.sqoop.mapreduce.AvroJob.MAP_OUTPUT_SCHEMA;
+
+  private AvroJob() { }
 
   public static void setMapOutputSchema(Configuration job, Schema s) {
-    job.set(MAP_OUTPUT_SCHEMA, s.toString());
+    org.apache.sqoop.mapreduce.AvroJob.setMapOutputSchema(job, s);
   }
 
-  /** Return a job's map output key schema. */
   public static Schema getMapOutputSchema(Configuration job) {
-    return Schema.parse(job.get(MAP_OUTPUT_SCHEMA));
+    return org.apache.sqoop.mapreduce.AvroJob.getMapOutputSchema(job);
   }
+
 }

Modified: incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/AvroOutputFormat.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/AvroOutputFormat.java?rev=1190489&r1=1190488&r2=1190489&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/AvroOutputFormat.java (original)
+++ incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/AvroOutputFormat.java Fri Oct 28 18:22:16 2011
@@ -1,6 +1,4 @@
 /**
- * Copyright 2011 The Apache Software Foundation
- *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -20,48 +18,9 @@
 
 package com.cloudera.sqoop.mapreduce;
 
-import java.io.IOException;
-
-import org.apache.avro.Schema;
-import org.apache.avro.file.DataFileWriter;
-import org.apache.avro.generic.GenericDatumWriter;
-import org.apache.avro.mapred.AvroWrapper;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapreduce.RecordWriter;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-
-/** An {@link org.apache.hadoop.mapred.OutputFormat} for Avro data files. */
+/**
+ * @deprecated Moving to use org.apache.sqoop namespace.
+ */
 public class AvroOutputFormat<T>
-  extends FileOutputFormat<AvroWrapper<T>, NullWritable> {
-
-  @Override
-  public RecordWriter<AvroWrapper<T>, NullWritable> getRecordWriter(
-      TaskAttemptContext context) throws IOException, InterruptedException {
-
-    Schema schema = AvroJob.getMapOutputSchema(context.getConfiguration());
-
-    final DataFileWriter<T> WRITER =
-      new DataFileWriter<T>(new GenericDatumWriter<T>());
-
-    Path path = getDefaultWorkFile(context,
-        org.apache.avro.mapred.AvroOutputFormat.EXT);
-    WRITER.create(schema,
-        path.getFileSystem(context.getConfiguration()).create(path));
-
-    return new RecordWriter<AvroWrapper<T>, NullWritable>() {
-      @Override
-      public void write(AvroWrapper<T> wrapper, NullWritable ignore)
-        throws IOException {
-        WRITER.append(wrapper.datum());
-      }
-      @Override
-      public void close(TaskAttemptContext context) throws IOException,
-          InterruptedException {
-        WRITER.close();
-      }
-    };
-  }
-
+    extends org.apache.sqoop.mapreduce.AvroOutputFormat<T> {
 }

Modified: incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/AvroRecordReader.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/AvroRecordReader.java?rev=1190489&r1=1190488&r2=1190489&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/AvroRecordReader.java (original)
+++ incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/AvroRecordReader.java Fri Oct 28 18:22:16 2011
@@ -1,6 +1,4 @@
 /**
- * Copyright 2011 The Apache Software Foundation
- *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -20,88 +18,10 @@
 
 package com.cloudera.sqoop.mapreduce;
 
-import java.io.IOException;
-
-import org.apache.avro.file.DataFileReader;
-import org.apache.avro.file.FileReader;
-import org.apache.avro.file.SeekableInput;
-import org.apache.avro.generic.GenericDatumReader;
-import org.apache.avro.io.DatumReader;
-import org.apache.avro.mapred.AvroWrapper;
-import org.apache.avro.mapred.FsInput;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.lib.input.FileSplit;
-
-/** An {@link RecordReader} for Avro data files. */
+/**
+ * @deprecated Moving to use org.apache.sqoop namespace.
+ */
 public class AvroRecordReader<T>
-  extends RecordReader<AvroWrapper<T>, NullWritable> {
-
-  private FileReader<T> reader;
-  private long start;
-  private long end;
-  private AvroWrapper<T> key;
-  private NullWritable value;
-
-  @Override
-  public void initialize(InputSplit genericSplit, TaskAttemptContext context)
-      throws IOException, InterruptedException {
-    FileSplit split = (FileSplit) genericSplit;
-    Configuration conf = context.getConfiguration();
-    SeekableInput in = new FsInput(split.getPath(), conf);
-    DatumReader<T> datumReader = new GenericDatumReader<T>();
-    this.reader = DataFileReader.openReader(in, datumReader);
-    reader.sync(split.getStart());                    // sync to start
-    this.start = reader.tell();
-    this.end = split.getStart() + split.getLength();
-  }
-
-  @Override
-  public boolean nextKeyValue() throws IOException, InterruptedException {
-    if (!reader.hasNext() || reader.pastSync(end)) {
-      key = null;
-      value = null;
-      return false;
-    }
-    if (key == null) {
-      key = new AvroWrapper<T>();
-    }
-    if (value == null) {
-      value = NullWritable.get();
-    }
-    key.datum(reader.next(key.datum()));
-    return true;
-  }
-
-  @Override
-  public AvroWrapper<T> getCurrentKey() throws IOException,
-      InterruptedException {
-    return key;
-  }
-
-  @Override
-  public NullWritable getCurrentValue()
-      throws IOException, InterruptedException {
-    return value;
-  }
-
-  @Override
-  public float getProgress() throws IOException {
-    if (end == start) {
-      return 0.0f;
-    } else {
-      return Math.min(1.0f, (getPos() - start) / (float)(end - start));
-    }
-  }
-
-  public long getPos() throws IOException {
-    return reader.tell();
-  }
-
-  @Override
-  public void close() throws IOException { reader.close(); }
+    extends org.apache.sqoop.mapreduce.AvroRecordReader<T> {
 }
 

Modified: incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/CombineShimRecordReader.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/CombineShimRecordReader.java?rev=1190489&r1=1190488&r2=1190489&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/CombineShimRecordReader.java (original)
+++ incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/CombineShimRecordReader.java Fri Oct 28 18:22:16 2011
@@ -1,6 +1,4 @@
 /**
- * Copyright 2011 The Apache Software Foundation
- *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -21,115 +19,19 @@
 package com.cloudera.sqoop.mapreduce;
 
 import java.io.IOException;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
-import org.apache.hadoop.mapreduce.lib.input.FileSplit;
-import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;
-import org.apache.hadoop.mapreduce.lib.input.SequenceFileRecordReader;
-import org.apache.hadoop.util.ReflectionUtils;
 
 /**
- * RecordReader that CombineFileRecordReader can instantiate, which itself
- * translates a CombineFileSplit into a FileSplit.
+ * @deprecated Moving to use org.apache.sqoop namespace.
  */
 public class CombineShimRecordReader
-   extends RecordReader<LongWritable, Object> {
-
-  public static final Log LOG =
-     LogFactory.getLog(CombineShimRecordReader.class.getName());
+    extends org.apache.sqoop.mapreduce.CombineShimRecordReader {
 
-  private CombineFileSplit split;
-  private TaskAttemptContext context;
-  private int index;
-  private RecordReader<LongWritable, Object> rr;
-
-  /**
-   * Constructor invoked by CombineFileRecordReader that identifies part of a
-   * CombineFileSplit to use.
-   */
   public CombineShimRecordReader(CombineFileSplit split,
       TaskAttemptContext context, Integer index)
       throws IOException, InterruptedException {
-    this.index = index;
-    this.split = (CombineFileSplit) split;
-    this.context = context;
-
-    createChildReader();
-  }
-
-  @Override
-  public void initialize(InputSplit curSplit, TaskAttemptContext curContext)
-      throws IOException, InterruptedException {
-    this.split = (CombineFileSplit) curSplit;
-    this.context = curContext;
-
-    if (null == rr) {
-      createChildReader();
-    }
-
-    FileSplit fileSplit = new FileSplit(this.split.getPath(index),
-        this.split.getOffset(index), this.split.getLength(index),
-        this.split.getLocations());
-    this.rr.initialize(fileSplit, this.context);
+    super(split, context, index);
   }
 
-  @Override
-  public float getProgress() throws IOException, InterruptedException {
-    return rr.getProgress();
-  }
-
-  @Override
-  public void close() throws IOException {
-    if (null != rr) {
-      rr.close();
-      rr = null;
-    }
-  }
-
-  @Override
-  public LongWritable getCurrentKey()
-      throws IOException, InterruptedException {
-    return rr.getCurrentKey();
-  }
-
-  @Override
-  public Object getCurrentValue()
-      throws IOException, InterruptedException {
-    return rr.getCurrentValue();
-  }
-
-  @Override
-  public boolean nextKeyValue() throws IOException, InterruptedException {
-    return rr.nextKeyValue();
-  }
-
-  /**
-   * Actually instantiate the user's chosen RecordReader implementation.
-   */
-  @SuppressWarnings("unchecked")
-  private void createChildReader() throws IOException, InterruptedException {
-    LOG.debug("ChildSplit operates on: " + split.getPath(index));
-
-    Configuration conf = context.getConfiguration();
-
-    // Determine the file format we're reading.
-    Class rrClass;
-    if (ExportJobBase.isSequenceFiles(conf, split.getPath(index))) {
-      rrClass = SequenceFileRecordReader.class;
-    } else {
-      rrClass = LineRecordReader.class;
-    }
-
-    // Create the appropriate record reader.
-    this.rr = (RecordReader<LongWritable, Object>)
-        ReflectionUtils.newInstance(rrClass, conf);
-  }
 }

Modified: incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/DataDrivenImportJob.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/DataDrivenImportJob.java?rev=1190489&r1=1190488&r2=1190489&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/DataDrivenImportJob.java (original)
+++ incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/DataDrivenImportJob.java Fri Oct 28 18:22:16 2011
@@ -21,189 +21,24 @@
 package com.cloudera.sqoop.mapreduce;
 
 import com.cloudera.sqoop.SqoopOptions;
-import com.cloudera.sqoop.config.ConfigurationHelper;
-import com.cloudera.sqoop.lib.LargeObjectLoader;
-import com.cloudera.sqoop.manager.ConnManager;
 import com.cloudera.sqoop.manager.ImportJobContext;
-import com.cloudera.sqoop.mapreduce.db.DBConfiguration;
-import com.cloudera.sqoop.mapreduce.db.DataDrivenDBInputFormat;
-import com.cloudera.sqoop.orm.AvroSchemaGenerator;
-
-import java.io.IOException;
-import java.sql.SQLException;
-
-import org.apache.avro.Schema;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.InputFormat;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.OutputFormat;
-import org.apache.hadoop.mapreduce.lib.db.DBWritable;
-import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
 
 /**
- * Actually runs a jdbc import job using the ORM files generated by the
- * sqoop.orm package. Uses DataDrivenDBInputFormat.
+ * @deprecated Moving to use org.apache.sqoop namespace.
  */
-public class DataDrivenImportJob extends ImportJobBase {
+public class DataDrivenImportJob
+    extends org.apache.sqoop.mapreduce.DataDrivenImportJob {
 
-  public static final Log LOG = LogFactory.getLog(
-      DataDrivenImportJob.class.getName());
-
-  @SuppressWarnings("unchecked")
   public DataDrivenImportJob(final SqoopOptions opts) {
-    super(opts, null, DataDrivenDBInputFormat.class, null, null);
+    super(opts);
   }
 
   public DataDrivenImportJob(final SqoopOptions opts,
       final Class<? extends InputFormat> inputFormatClass,
       ImportJobContext context) {
-    super(opts, null, inputFormatClass, null, context);
-  }
-
-  @Override
-  protected void configureMapper(Job job, String tableName,
-      String tableClassName) throws IOException {
-    if (options.getFileLayout() == SqoopOptions.FileLayout.TextFile) {
-      // For text files, specify these as the output types; for
-      // other types, we just use the defaults.
-      job.setOutputKeyClass(Text.class);
-      job.setOutputValueClass(NullWritable.class);
-    } else if (options.getFileLayout()
-        == SqoopOptions.FileLayout.AvroDataFile) {
-      ConnManager connManager = getContext().getConnManager();
-      AvroSchemaGenerator generator = new AvroSchemaGenerator(options,
-          connManager, tableName);
-      Schema schema = generator.generate();
-      AvroJob.setMapOutputSchema(job.getConfiguration(), schema);
-    }
-
-    job.setMapperClass(getMapperClass());
+    super(opts, inputFormatClass, context);
   }
 
-  @Override
-  protected Class<? extends Mapper> getMapperClass() {
-    if (options.getFileLayout() == SqoopOptions.FileLayout.TextFile) {
-      return TextImportMapper.class;
-    } else if (options.getFileLayout()
-        == SqoopOptions.FileLayout.SequenceFile) {
-      return SequenceFileImportMapper.class;
-    } else if (options.getFileLayout()
-        == SqoopOptions.FileLayout.AvroDataFile) {
-      return AvroImportMapper.class;
-    }
-
-    return null;
-  }
-
-  @Override
-  protected Class<? extends OutputFormat> getOutputFormatClass()
-      throws ClassNotFoundException {
-    if (options.getFileLayout() == SqoopOptions.FileLayout.TextFile) {
-      return RawKeyTextOutputFormat.class;
-    } else if (options.getFileLayout()
-        == SqoopOptions.FileLayout.SequenceFile) {
-      return SequenceFileOutputFormat.class;
-    } else if (options.getFileLayout()
-        == SqoopOptions.FileLayout.AvroDataFile) {
-      return AvroOutputFormat.class;
-    }
-
-    return null;
-  }
-
-  @Override
-  protected void configureInputFormat(Job job, String tableName,
-      String tableClassName, String splitByCol) throws IOException {
-    ConnManager mgr = getContext().getConnManager();
-    try {
-      String username = options.getUsername();
-      if (null == username || username.length() == 0) {
-        DBConfiguration.configureDB(job.getConfiguration(),
-            mgr.getDriverClass(), options.getConnectString(),
-            options.getFetchSize());
-      } else {
-        DBConfiguration.configureDB(job.getConfiguration(),
-            mgr.getDriverClass(), options.getConnectString(),
-            username, options.getPassword(), options.getFetchSize());
-      }
-
-      if (null != tableName) {
-        // Import a table.
-        String [] colNames = options.getColumns();
-        if (null == colNames) {
-          colNames = mgr.getColumnNames(tableName);
-        }
-
-        String [] sqlColNames = null;
-        if (null != colNames) {
-          sqlColNames = new String[colNames.length];
-          for (int i = 0; i < colNames.length; i++) {
-            sqlColNames[i] = mgr.escapeColName(colNames[i]);
-          }
-        }
-
-        // It's ok if the where clause is null in DBInputFormat.setInput.
-        String whereClause = options.getWhereClause();
-
-        // We can't set the class properly in here, because we may not have the
-        // jar loaded in this JVM. So we start by calling setInput() with
-        // DBWritable and then overriding the string manually.
-        DataDrivenDBInputFormat.setInput(job, DBWritable.class,
-            mgr.escapeTableName(tableName), whereClause,
-            mgr.escapeColName(splitByCol), sqlColNames);
-
-        // If user specified boundary query on the command line propagate it to
-        // the job
-        if(options.getBoundaryQuery() != null) {
-          DataDrivenDBInputFormat.setBoundingQuery(job.getConfiguration(),
-                  options.getBoundaryQuery());
-        }
-      } else {
-        // Import a free-form query.
-        String inputQuery = options.getSqlQuery();
-        String sanitizedQuery = inputQuery.replace(
-            DataDrivenDBInputFormat.SUBSTITUTE_TOKEN, " (1 = 1) ");
-
-        String inputBoundingQuery = options.getBoundaryQuery();
-
-        if(inputBoundingQuery == null) {
-          inputBoundingQuery =
-            mgr.getInputBoundsQuery(splitByCol, sanitizedQuery);
-          if (inputBoundingQuery == null) {
-            if (splitByCol != null) {
-              inputBoundingQuery = "SELECT MIN(" + splitByCol + "), MAX("
-                      + splitByCol + ") FROM (" + sanitizedQuery + ") AS t1";
-            } else {
-              inputBoundingQuery = "";
-            }
-          }
-        }
-        DataDrivenDBInputFormat.setInput(job, DBWritable.class,
-            inputQuery, inputBoundingQuery);
-        new DBConfiguration(job.getConfiguration()).setInputOrderBy(
-            splitByCol);
-      }
-
-      LOG.debug("Using table class: " + tableClassName);
-      job.getConfiguration().set(ConfigurationHelper.getDbInputClassProperty(),
-          tableClassName);
-
-      job.getConfiguration().setLong(LargeObjectLoader.MAX_INLINE_LOB_LEN_KEY,
-          options.getInlineLobLimit());
-
-      LOG.debug("Using InputFormat: " + inputFormatClass);
-      job.setInputFormatClass(inputFormatClass);
-    } finally {
-      try {
-        mgr.close();
-      } catch (SQLException sqlE) {
-        LOG.warn("Error closing connection: " + sqlE);
-      }
-    }
-  }
 }
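
The import-side logic removed above (now in org.apache.sqoop.mapreduce.DataDrivenImportJob) is unchanged in substance; the constructors here simply delegate via super(...). One detail worth spelling out is how a default bounding query is derived for free-form imports when neither the user nor the ConnManager supplies one. A rough sketch of that derivation, with a hypothetical query and split column for illustration:

    // Hypothetical inputs; at runtime these come from SqoopOptions.
    String splitByCol = "id";
    String inputQuery = "SELECT * FROM orders WHERE $CONDITIONS";

    // Mirrors the removed configureInputFormat() path: the
    // DataDrivenDBInputFormat substitution token is replaced first,
    // then the query is wrapped in a MIN/MAX select on the split column.
    String sanitizedQuery = inputQuery.replace("$CONDITIONS", " (1 = 1) ");
    String inputBoundingQuery = "SELECT MIN(" + splitByCol + "), MAX("
        + splitByCol + ") FROM (" + sanitizedQuery + ") AS t1";
    // => roughly: SELECT MIN(id), MAX(id)
    //             FROM (SELECT * FROM orders WHERE (1 = 1) ) AS t1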
 

Modified: incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/DelegatingOutputFormat.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/DelegatingOutputFormat.java?rev=1190489&r1=1190488&r2=1190489&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/DelegatingOutputFormat.java (original)
+++ incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/DelegatingOutputFormat.java Fri Oct 28 18:22:16 2011
@@ -1,6 +1,4 @@
 /**
- * Copyright 2011 The Apache Software Foundation
- *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -20,119 +18,15 @@
 
 package com.cloudera.sqoop.mapreduce;
 
-import java.io.Closeable;
-import java.io.IOException;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.OutputCommitter;
-import org.apache.hadoop.mapreduce.OutputFormat;
-import org.apache.hadoop.mapreduce.RecordWriter;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.util.ReflectionUtils;
-
 import com.cloudera.sqoop.lib.FieldMappable;
-import com.cloudera.sqoop.lib.FieldMapProcessor;
-import com.cloudera.sqoop.lib.ProcessingException;
 
 /**
- * OutputFormat that produces a RecordReader which instantiates
- * a FieldMapProcessor which will process FieldMappable
- * output keys.
- *
- * <p>The output value is ignored.</p>
- *
- * <p>The FieldMapProcessor implementation may do any arbitrary
- * processing on the object. For example, it may write an object
- * to HBase, etc.</p>
- *
- * <p>If the FieldMapProcessor implementation also implements
- * Closeable, it will be close()'d in the RecordReader's close()
- * method.</p>
- *
- * <p>If the FMP implements Configurable, it will be configured
- * correctly via ReflectionUtils.</p>
+ * @deprecated Moving to use org.apache.sqoop namespace.
  */
 public class DelegatingOutputFormat<K extends FieldMappable, V>
-    extends OutputFormat<K, V> {
+    extends org.apache.sqoop.mapreduce.DelegatingOutputFormat <K, V> {
 
-  /** conf key: the FieldMapProcessor class to instantiate. */
   public static final String DELEGATE_CLASS_KEY =
-      "sqoop.output.delegate.field.map.processor.class";
-
-  @Override
-  /** {@inheritDoc} */
-  public void checkOutputSpecs(JobContext context)
-      throws IOException, InterruptedException {
-    Configuration conf = context.getConfiguration();
-
-    if (null == conf.get(DELEGATE_CLASS_KEY)) {
-      throw new IOException("Delegate FieldMapProcessor class is not set.");
-    }
-  }
-
-  @Override
-  /** {@inheritDoc} */
-  public OutputCommitter getOutputCommitter(TaskAttemptContext context)
-      throws IOException, InterruptedException {
-    return new NullOutputCommitter();
-  }
-
-  @Override
-  /** {@inheritDoc} */
-  public RecordWriter<K, V> getRecordWriter(TaskAttemptContext context)
-      throws IOException {
-    try {
-      return new DelegatingRecordWriter(context);
-    } catch (ClassNotFoundException cnfe) {
-      throw new IOException(cnfe);
-    }
-  }
-
-  /**
-   * RecordWriter to write the output to a row in a database table.
-   * The actual database updates are executed in a second thread.
-   */
-  public class DelegatingRecordWriter extends RecordWriter<K, V> {
-
-    private Configuration conf;
-
-    private FieldMapProcessor mapProcessor;
-
-    public DelegatingRecordWriter(TaskAttemptContext context)
-        throws ClassNotFoundException {
-
-      this.conf = context.getConfiguration();
-
-      @SuppressWarnings("unchecked")
-      Class<? extends FieldMapProcessor> procClass =
-          (Class<? extends FieldMapProcessor>)
-          conf.getClass(DELEGATE_CLASS_KEY, null);
-      this.mapProcessor = ReflectionUtils.newInstance(procClass, this.conf);
-    }
-
-    protected Configuration getConf() {
-      return this.conf;
-    }
-
-    @Override
-    /** {@inheritDoc} */
-    public void close(TaskAttemptContext context)
-        throws IOException, InterruptedException {
-      if (mapProcessor instanceof Closeable) {
-        ((Closeable) mapProcessor).close();
-      }
-    }
+      org.apache.sqoop.mapreduce.DelegatingOutputFormat.DELEGATE_CLASS_KEY;
 
-    @Override
-    /** {@inheritDoc} */
-    public void write(K key, V value)
-        throws InterruptedException, IOException {
-      try {
-        mapProcessor.accept(key);
-      } catch (ProcessingException pe) {
-        throw new IOException(pe);
-      }
-    }
-  }
 }

Modified: incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/ExportBatchOutputFormat.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/ExportBatchOutputFormat.java?rev=1190489&r1=1190488&r2=1190489&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/ExportBatchOutputFormat.java (original)
+++ incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/ExportBatchOutputFormat.java Fri Oct 28 18:22:16 2011
@@ -1,6 +1,4 @@
 /**
- * Copyright 2011 The Apache Software Foundation
- *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -20,123 +18,12 @@
 
 package com.cloudera.sqoop.mapreduce;
 
-import java.io.IOException;
-import java.sql.Connection;
-import java.sql.PreparedStatement;
-import java.sql.SQLException;
-import java.util.List;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.mapreduce.RecordWriter;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-
 import com.cloudera.sqoop.lib.SqoopRecord;
 
 /**
- * This class uses batch mode to execute underlying statements instead of
- * using a single multirow insert statement as its superclass.
+ * @deprecated Moving to use org.apache.sqoop namespace.
  */
 public class ExportBatchOutputFormat<K extends SqoopRecord, V>
-    extends ExportOutputFormat<K, V> {
-
-  private static final Log LOG =
-      LogFactory.getLog(ExportBatchOutputFormat.class);
-
-  @Override
-  /** {@inheritDoc} */
-  public RecordWriter<K, V> getRecordWriter(TaskAttemptContext context)
-      throws IOException {
-    try {
-      return new ExportBatchRecordWriter(context);
-    } catch (Exception e) {
-      throw new IOException(e);
-    }
-  }
-
-  /**
-   * RecordWriter to write the output to a row in a database table.
-   * The actual database updates are executed in a second thread.
-   */
-  public class ExportBatchRecordWriter extends ExportRecordWriter {
-
-    public ExportBatchRecordWriter(TaskAttemptContext context)
-        throws ClassNotFoundException, SQLException {
-      super(context);
-    }
-
-    @Override
-    /** {@inheritDoc} */
-    protected boolean isBatchExec() {
-      // We use batches here.
-      return true;
-    }
-
-    @Override
-    /** {@inheritDoc} */
-    protected PreparedStatement getPreparedStatement(
-        List<SqoopRecord> userRecords) throws SQLException {
-
-      PreparedStatement stmt = null;
-
-      // Synchronize on connection to ensure this does not conflict
-      // with the operations in the update thread.
-      Connection conn = getConnection();
-      synchronized (conn) {
-        stmt = conn.prepareStatement(getInsertStatement(userRecords.size()));
-      }
-
-      // Inject the record parameters into the VALUES clauses.
-      for (SqoopRecord record : userRecords) {
-        record.write(stmt, 0);
-        stmt.addBatch();
-      }
-
-      return stmt;
-    }
-
-    /**
-     * @return an INSERT statement.
-     */
-    protected String getInsertStatement(int numRows) {
-      StringBuilder sb = new StringBuilder();
-
-      sb.append("INSERT INTO " + tableName + " ");
-
-      int numSlots;
-      if (this.columnNames != null) {
-        numSlots = this.columnNames.length;
-
-        sb.append("(");
-        boolean first = true;
-        for (String col : columnNames) {
-          if (!first) {
-            sb.append(", ");
-          }
-
-          sb.append(col);
-          first = false;
-        }
-
-        sb.append(") ");
-      } else {
-        numSlots = this.columnCount; // set if columnNames is null.
-      }
-
-      sb.append("VALUES ");
-
-      // generates the (?, ?, ?...).
-      sb.append("(");
-      for (int i = 0; i < numSlots; i++) {
-        if (i != 0) {
-          sb.append(", ");
-        }
-
-        sb.append("?");
-      }
-      sb.append(")");
-
-      return sb.toString();
-    }
-  }
+    extends org.apache.sqoop.mapreduce.ExportBatchOutputFormat
+    <SqoopRecord,V> {
 }
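
The batch export writer removed above (now in org.apache.sqoop.mapreduce.ExportBatchOutputFormat) prepares one parameterized single-row INSERT and adds each record as a JDBC batch entry, rather than building a multirow VALUES list as its superclass does. A rough illustration of the statement the removed getInsertStatement() produced, using a hypothetical table and column set:

    // Hypothetical metadata; at runtime these come from the export job.
    String tableName = "employees";
    String[] columnNames = {"id", "name"};

    // Mirrors the removed getInsertStatement() for two columns:
    // one "?" slot per column, one row per statement.
    String sql = "INSERT INTO " + tableName + " (id, name) VALUES (?, ?)";
    // Each SqoopRecord binds its fields into the slots and is queued
    // with stmt.addBatch(), so the driver can ship the whole batch at once.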

Modified: incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/ExportInputFormat.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/ExportInputFormat.java?rev=1190489&r1=1190488&r2=1190489&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/ExportInputFormat.java (original)
+++ incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/ExportInputFormat.java Fri Oct 28 18:22:16 2011
@@ -1,6 +1,4 @@
 /**
- * Copyright 2011 The Apache Software Foundation
- *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -20,104 +18,23 @@
 
 package com.cloudera.sqoop.mapreduce;
 
-import java.io.IOException;
-import java.util.List;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat;
-import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader;
-import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
 
 /**
- * InputFormat that generates a user-defined number of splits to inject data
- * into the database.
+ * @deprecated Moving to use org.apache.sqoop namespace.
  */
 public class ExportInputFormat
-   extends CombineFileInputFormat<LongWritable, Object> {
-
-  public static final Log LOG =
-     LogFactory.getLog(ExportInputFormat.class.getName());
-
-  public static final int DEFAULT_NUM_MAP_TASKS = 4;
-
-  public ExportInputFormat() {
-  }
+   extends org.apache.sqoop.mapreduce.ExportInputFormat {
 
-  /**
-   * @return the number of bytes across all files in the job.
-   */
-  private long getJobSize(JobContext job) throws IOException {
-    List<FileStatus> stats = listStatus(job);
-    long count = 0;
-    for (FileStatus stat : stats) {
-      count += stat.getLen();
-    }
-
-    return count;
-  }
-
-  @Override
-  public List<InputSplit> getSplits(JobContext job) throws IOException {
-    // Set the max split size based on the number of map tasks we want.
-    long numTasks = getNumMapTasks(job);
-    long numFileBytes = getJobSize(job);
-    long maxSplitSize = numFileBytes / numTasks;
-
-    setMaxSplitSize(maxSplitSize);
-
-    LOG.debug("Target numMapTasks=" + numTasks);
-    LOG.debug("Total input bytes=" + numFileBytes);
-    LOG.debug("maxSplitSize=" + maxSplitSize);
-
-    List<InputSplit> splits =  super.getSplits(job);
-
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Generated splits:");
-      for (InputSplit split : splits) {
-        LOG.debug("  " + split);
-      }
-    }
-    return splits;
-  }
-
-  @Override
-  @SuppressWarnings("unchecked")
-  public RecordReader createRecordReader(
-      InputSplit split, TaskAttemptContext context) throws IOException {
-
-    CombineFileSplit combineSplit = (CombineFileSplit) split;
-
-    // Use CombineFileRecordReader since this can handle CombineFileSplits
-    // and instantiate another RecordReader in a loop; do this with the
-    // CombineShimRecordReader.
-    RecordReader rr = new CombineFileRecordReader(combineSplit, context,
-        CombineShimRecordReader.class);
-
-    return rr;
-  }
+  public static final int DEFAULT_NUM_MAP_TASKS =
+      org.apache.sqoop.mapreduce.ExportInputFormat.DEFAULT_NUM_MAP_TASKS;
 
-  /**
-   * Allows the user to control the number of map tasks used for this
-   * export job.
-   */
   public static void setNumMapTasks(JobContext job, int numTasks) {
-    job.getConfiguration().setInt(ExportJobBase.EXPORT_MAP_TASKS_KEY, numTasks);
+    org.apache.sqoop.mapreduce.ExportInputFormat.setNumMapTasks(job, numTasks);
   }
 
-  /**
-   * @return the number of map tasks to use in this export job.
-   */
   public static int getNumMapTasks(JobContext job) {
-    return job.getConfiguration().getInt(ExportJobBase.EXPORT_MAP_TASKS_KEY,
-        DEFAULT_NUM_MAP_TASKS);
+    return org.apache.sqoop.mapreduce.ExportInputFormat.getNumMapTasks(job);
   }
 
 }
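
Only the DEFAULT_NUM_MAP_TASKS constant and the setNumMapTasks/getNumMapTasks helpers are forwarded here; the split-sizing logic removed above now lives in org.apache.sqoop.mapreduce.ExportInputFormat. As a rough worked example of that sizing (numbers are hypothetical):

    // Hypothetical totals for illustration.
    long numTasks = 4;                         // DEFAULT_NUM_MAP_TASKS
    long numFileBytes = 1024L * 1024 * 1024;   // 1 GiB of export input
    long maxSplitSize = numFileBytes / numTasks;
    // => 268435456 bytes (~256 MiB) per combined split, i.e. roughly
    //    one split per requested map task.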


