sqoop-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From arv...@apache.org
Subject svn commit: r1159773 - in /incubator/sqoop/trunk/src: docs/man/ docs/user/ java/com/cloudera/sqoop/ java/com/cloudera/sqoop/manager/ java/com/cloudera/sqoop/mapreduce/ java/com/cloudera/sqoop/tool/ test/com/cloudera/sqoop/manager/
Date Fri, 19 Aug 2011 20:27:12 GMT
Author: arvind
Date: Fri Aug 19 20:27:11 2011
New Revision: 1159773

URL: http://svn.apache.org/viewvc?rev=1159773&view=rev
Log:
SQOOP-314. Support for batch insert.

(Bilung Lee via Arvind Prabhakar)

Added:
    incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/ExportBatchOutputFormat.java
Modified:
    incubator/sqoop/trunk/src/docs/man/sqoop-export.txt
    incubator/sqoop/trunk/src/docs/user/export.txt
    incubator/sqoop/trunk/src/java/com/cloudera/sqoop/SqoopOptions.java
    incubator/sqoop/trunk/src/java/com/cloudera/sqoop/manager/OracleManager.java
    incubator/sqoop/trunk/src/java/com/cloudera/sqoop/manager/SQLServerManager.java
    incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/AsyncSqlRecordWriter.java
    incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/ExportJobBase.java
    incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/ExportOutputFormat.java
    incubator/sqoop/trunk/src/java/com/cloudera/sqoop/tool/BaseSqoopTool.java
    incubator/sqoop/trunk/src/java/com/cloudera/sqoop/tool/ExportTool.java
    incubator/sqoop/trunk/src/test/com/cloudera/sqoop/manager/JdbcMySQLExportTest.java

Modified: incubator/sqoop/trunk/src/docs/man/sqoop-export.txt
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/docs/man/sqoop-export.txt?rev=1159773&r1=1159772&r2=1159773&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/docs/man/sqoop-export.txt (original)
+++ incubator/sqoop/trunk/src/docs/man/sqoop-export.txt Fri Aug 19 20:27:11 2011
@@ -55,6 +55,10 @@ Export control options
 --clear-staging-table::
   Will result in deletion of any data that exists in the staging table.
 
+--batch::
+  Use batch mode for underlying statement execution. This is useful, for example,
+  for those databases that do not support multirow insert in a single statement yet.
+
 include::input-args.txt[]
 
 include::output-args.txt[]

Modified: incubator/sqoop/trunk/src/docs/user/export.txt
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/docs/user/export.txt?rev=1159773&r1=1159772&r2=1159773&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/docs/user/export.txt (original)
+++ incubator/sqoop/trunk/src/docs/user/export.txt Fri Aug 19 20:27:11 2011
@@ -61,6 +61,8 @@ Argument                                
                                          the destination table.
 +\--clear-staging-table+                 Indicates that any data present in\
                                          the staging table can be deleted.
++\--batch+                               Use batch mode for underlying\
+                                         statement execution.
 ------------------------------------------------------------------------
 
 The +\--table+ and +\--export-dir+ arguments are required. These

Modified: incubator/sqoop/trunk/src/java/com/cloudera/sqoop/SqoopOptions.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/com/cloudera/sqoop/SqoopOptions.java?rev=1159773&r1=1159772&r2=1159773&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/com/cloudera/sqoop/SqoopOptions.java (original)
+++ incubator/sqoop/trunk/src/java/com/cloudera/sqoop/SqoopOptions.java Fri Aug 19 20:27:11 2011
@@ -143,6 +143,7 @@ public class SqoopOptions implements Clo
   @StoredAsProperty("hdfs.append.dir") private boolean append;
   @StoredAsProperty("hdfs.file.format") private FileLayout layout;
   @StoredAsProperty("direct.import") private boolean direct; // "direct mode."
+  @StoredAsProperty("db.batch") private boolean batchMode;
   private String tmpDir; // where temp data goes; usually /tmp; not serialized.
   private String hiveHome; // not serialized to metastore.
   @StoredAsProperty("hive.import") private boolean hiveImport;
@@ -1015,6 +1016,18 @@ public class SqoopOptions implements Clo
   }
 
   /**
+   * @return true if underlying statements to be executed in batch mode,
+   * or false if to be executed in a single multirow statement.
+   */
+  public boolean isBatchMode() {
+    return batchMode;
+  }
+
+  public void setBatchMode(boolean mode) {
+    this.batchMode = mode;
+  }
+
+  /**
    * @return the number of map tasks to use for import.
    */
   public int getNumMappers() {

Modified: incubator/sqoop/trunk/src/java/com/cloudera/sqoop/manager/OracleManager.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/com/cloudera/sqoop/manager/OracleManager.java?rev=1159773&r1=1159772&r2=1159773&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/com/cloudera/sqoop/manager/OracleManager.java (original)
+++ incubator/sqoop/trunk/src/java/com/cloudera/sqoop/manager/OracleManager.java Fri Aug 19 20:27:11 2011
@@ -37,8 +37,8 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
 import com.cloudera.sqoop.SqoopOptions;
+import com.cloudera.sqoop.mapreduce.ExportBatchOutputFormat;
 import com.cloudera.sqoop.mapreduce.JdbcExportJob;
-import com.cloudera.sqoop.mapreduce.OracleExportOutputFormat;
 import com.cloudera.sqoop.mapreduce.db.OracleDataDrivenDBInputFormat;
 import com.cloudera.sqoop.util.ExportException;
 import com.cloudera.sqoop.util.ImportException;
@@ -377,7 +377,7 @@ public class OracleManager extends Gener
       throws IOException, ExportException {
     context.setConnManager(this);
     JdbcExportJob exportJob = new JdbcExportJob(context, null, null,
-                                  OracleExportOutputFormat.class);
+                                  ExportBatchOutputFormat.class);
     exportJob.runExport();
   }
 

Modified: incubator/sqoop/trunk/src/java/com/cloudera/sqoop/manager/SQLServerManager.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/com/cloudera/sqoop/manager/SQLServerManager.java?rev=1159773&r1=1159772&r2=1159773&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/com/cloudera/sqoop/manager/SQLServerManager.java (original)
+++ incubator/sqoop/trunk/src/java/com/cloudera/sqoop/manager/SQLServerManager.java Fri Aug 19 20:27:11 2011
@@ -24,8 +24,8 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
 import com.cloudera.sqoop.SqoopOptions;
+import com.cloudera.sqoop.mapreduce.ExportBatchOutputFormat;
 import com.cloudera.sqoop.mapreduce.JdbcExportJob;
-import com.cloudera.sqoop.mapreduce.SQLServerExportOutputFormat;
 import com.cloudera.sqoop.util.ExportException;
 
 /**
@@ -53,7 +53,7 @@ public class SQLServerManager extends Ge
       throws IOException, ExportException {
     context.setConnManager(this);
     JdbcExportJob exportJob = new JdbcExportJob(context, null, null,
-      SQLServerExportOutputFormat.class);
+      ExportBatchOutputFormat.class);
     exportJob.runExport();
   }
 

Modified: incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/AsyncSqlRecordWriter.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/AsyncSqlRecordWriter.java?rev=1159773&r1=1159772&r2=1159773&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/AsyncSqlRecordWriter.java (original)
+++ incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/AsyncSqlRecordWriter.java Fri Aug 19 20:27:11 2011
@@ -175,10 +175,9 @@ public abstract class AsyncSqlRecordWrit
     try {
       try {
         execUpdate(true, true);
+        execThread.join();
       } catch (SQLException sqle) {
         throw new IOException(sqle);
-      } finally {
-        execThread.join();
       }
 
       // If we're not leaving on an error return path already,

Added: incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/ExportBatchOutputFormat.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/ExportBatchOutputFormat.java?rev=1159773&view=auto
==============================================================================
--- incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/ExportBatchOutputFormat.java (added)
+++ incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/ExportBatchOutputFormat.java Fri Aug 19 20:27:11 2011
@@ -0,0 +1,140 @@
+/**
+ * Licensed to Cloudera, Inc. under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  Cloudera, Inc. licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.cloudera.sqoop.mapreduce;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import com.cloudera.sqoop.lib.SqoopRecord;
+
+/**
+ * This class uses batch mode to execute underlying statements instead of
+ * using a single multirow insert statement as its superclass.
+ */
+public class ExportBatchOutputFormat<K extends SqoopRecord, V>
+    extends ExportOutputFormat<K, V> {
+
+  private static final Log LOG =
+      LogFactory.getLog(ExportBatchOutputFormat.class);
+
+  @Override
+  /** {@inheritDoc} */
+  public RecordWriter<K, V> getRecordWriter(TaskAttemptContext context)
+      throws IOException {
+    try {
+      return new ExportBatchRecordWriter(context);
+    } catch (Exception e) {
+      throw new IOException(e);
+    }
+  }
+
+  /**
+   * RecordWriter to write the output to a row in a database table.
+   * The actual database updates are executed in a second thread.
+   */
+  public class ExportBatchRecordWriter extends ExportRecordWriter {
+
+    public ExportBatchRecordWriter(TaskAttemptContext context)
+        throws ClassNotFoundException, SQLException {
+      super(context);
+    }
+
+    @Override
+    /** {@inheritDoc} */
+    protected boolean isBatchExec() {
+      // We use batches here.
+      return true;
+    }
+
+    @Override
+    /** {@inheritDoc} */
+    protected PreparedStatement getPreparedStatement(
+        List<SqoopRecord> userRecords) throws SQLException {
+
+      PreparedStatement stmt = null;
+
+      // Synchronize on connection to ensure this does not conflict
+      // with the operations in the update thread.
+      Connection conn = getConnection();
+      synchronized (conn) {
+        stmt = conn.prepareStatement(getInsertStatement(userRecords.size()));
+      }
+
+      // Inject the record parameters into the VALUES clauses.
+      for (SqoopRecord record : userRecords) {
+        record.write(stmt, 0);
+        stmt.addBatch();
+      }
+
+      return stmt;
+    }
+
+    /**
+     * @return an INSERT statement.
+     */
+    protected String getInsertStatement(int numRows) {
+      StringBuilder sb = new StringBuilder();
+
+      sb.append("INSERT INTO " + tableName + " ");
+
+      int numSlots;
+      if (this.columnNames != null) {
+        numSlots = this.columnNames.length;
+
+        sb.append("(");
+        boolean first = true;
+        for (String col : columnNames) {
+          if (!first) {
+            sb.append(", ");
+          }
+
+          sb.append(col);
+          first = false;
+        }
+
+        sb.append(") ");
+      } else {
+        numSlots = this.columnCount; // set if columnNames is null.
+      }
+
+      sb.append("VALUES ");
+
+      // generates the (?, ?, ?...).
+      sb.append("(");
+      for (int i = 0; i < numSlots; i++) {
+        if (i != 0) {
+          sb.append(", ");
+        }
+
+        sb.append("?");
+      }
+      sb.append(")");
+
+      return sb.toString();
+    }
+  }
+}

Modified: incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/ExportJobBase.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/ExportJobBase.java?rev=1159773&r1=1159772&r2=1159773&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/ExportJobBase.java (original)
+++ incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/ExportJobBase.java Fri Aug 19 20:27:11 2011
@@ -224,7 +224,11 @@ public class ExportJobBase extends JobBa
       throws ClassNotFoundException {
     Class<? extends OutputFormat> configuredOF = super.getOutputFormatClass();
     if (null == configuredOF) {
-      return ExportOutputFormat.class;
+      if (!options.isBatchMode()) {
+        return ExportOutputFormat.class;
+      } else {
+        return ExportBatchOutputFormat.class;
+      }
     } else {
       return configuredOF;
     }

Modified: incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/ExportOutputFormat.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/ExportOutputFormat.java?rev=1159773&r1=1159772&r2=1159773&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/ExportOutputFormat.java (original)
+++ incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/ExportOutputFormat.java Fri Aug 19 20:27:11 2011
@@ -87,9 +87,9 @@ public class ExportOutputFormat<K extend
    */
   public class ExportRecordWriter extends AsyncSqlRecordWriter<K, V> {
 
-    private String tableName;
-    private String [] columnNames; // The columns to insert into.
-    private int columnCount; // If columnNames is null, tells ## of cols.
+    protected String tableName;
+    protected String [] columnNames; // The columns to insert into.
+    protected int columnCount; // If columnNames is null, tells ## of cols.
 
     public ExportRecordWriter(TaskAttemptContext context)
         throws ClassNotFoundException, SQLException {

Modified: incubator/sqoop/trunk/src/java/com/cloudera/sqoop/tool/BaseSqoopTool.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/com/cloudera/sqoop/tool/BaseSqoopTool.java?rev=1159773&r1=1159772&r2=1159773&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/com/cloudera/sqoop/tool/BaseSqoopTool.java (original)
+++ incubator/sqoop/trunk/src/java/com/cloudera/sqoop/tool/BaseSqoopTool.java Fri Aug 19 20:27:11 2011
@@ -74,6 +74,7 @@ public abstract class BaseSqoopTool exte
   public static final String PASSWORD_ARG = "password";
   public static final String PASSWORD_PROMPT_ARG = "P";
   public static final String DIRECT_ARG = "direct";
+  public static final String BATCH_ARG = "batch";
   public static final String TABLE_ARG = "table";
   public static final String STAGING_TABLE_ARG = "staging-table";
   public static final String CLEAR_STAGING_TABLE_ARG = "clear-staging-table";

Modified: incubator/sqoop/trunk/src/java/com/cloudera/sqoop/tool/ExportTool.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/com/cloudera/sqoop/tool/ExportTool.java?rev=1159773&r1=1159772&r2=1159773&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/com/cloudera/sqoop/tool/ExportTool.java (original)
+++ incubator/sqoop/trunk/src/java/com/cloudera/sqoop/tool/ExportTool.java Fri Aug 19 20:27:11 2011
@@ -169,6 +169,11 @@ public class ExportTool extends BaseSqoo
         + "staging table can be deleted")
         .withLongOpt(CLEAR_STAGING_TABLE_ARG)
         .create());
+    exportOpts.addOption(OptionBuilder
+        .withDescription("Indicates underlying statements "
+        + "to be executed in batch mode")
+        .withLongOpt(BATCH_ARG)
+        .create());
 
     return exportOpts;
   }
@@ -220,6 +225,10 @@ public class ExportTool extends BaseSqoo
         out.setDirectMode(true);
       }
 
+      if (in.hasOption(BATCH_ARG)) {
+        out.setBatchMode(true);
+      }
+
       if (in.hasOption(TABLE_ARG)) {
         out.setTableName(in.getOptionValue(TABLE_ARG));
       }

Modified: incubator/sqoop/trunk/src/test/com/cloudera/sqoop/manager/JdbcMySQLExportTest.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/test/com/cloudera/sqoop/manager/JdbcMySQLExportTest.java?rev=1159773&r1=1159772&r2=1159773&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/test/com/cloudera/sqoop/manager/JdbcMySQLExportTest.java (original)
+++ incubator/sqoop/trunk/src/test/com/cloudera/sqoop/manager/JdbcMySQLExportTest.java Fri Aug 19 20:27:11 2011
@@ -18,6 +18,7 @@
 
 package com.cloudera.sqoop.manager;
 
+import java.io.IOException;
 import java.sql.Connection;
 import java.sql.SQLException;
 
@@ -142,4 +143,27 @@ public class JdbcMySQLExportTest extends
     return super.getArgv(includeHadoopFlags, rowsPerStatement,
         statementsPerTx, subArgv);
   }
+
+  public void testIntColInBatchMode() throws IOException, SQLException {
+    final int TOTAL_RECORDS = 10;
+
+    // generate a column equivalent to rownum.
+    ColumnGenerator gen = new ColumnGenerator() {
+      public String getExportText(int rowNum) {
+        return "" + rowNum;
+      }
+      public String getVerifyText(int rowNum) {
+        return "" + rowNum;
+      }
+      public String getType() {
+        return "INTEGER";
+      }
+    };
+
+    createTextFile(0, TOTAL_RECORDS, false, gen);
+    createTable(gen);
+    runExport(getArgv(true, 10, 10, "--batch"));
+    verifyExport(TOTAL_RECORDS);
+    assertColMinAndMax(forIdx(0), gen);
+  }
 }



Mime
View raw message