sqoop-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From b...@apache.org
Subject svn commit: r1177481 - in /incubator/sqoop/trunk/src: docs/man/ docs/user/ java/com/cloudera/sqoop/manager/ java/com/cloudera/sqoop/mapreduce/ test/com/cloudera/sqoop/ test/com/cloudera/sqoop/testutil/
Date Fri, 30 Sep 2011 01:57:59 GMT
Author: blee
Date: Fri Sep 30 01:57:59 2011
New Revision: 1177481

URL: http://svn.apache.org/viewvc?rev=1177481&view=rev
Log:
SQOOP-313: Support for multiple column names for update keys

(Arvind Prabhakar via Bilung Lee)

Modified:
    incubator/sqoop/trunk/src/docs/man/sqoop-export.txt
    incubator/sqoop/trunk/src/docs/user/export.txt
    incubator/sqoop/trunk/src/java/com/cloudera/sqoop/manager/ConnManager.java
    incubator/sqoop/trunk/src/java/com/cloudera/sqoop/manager/OracleManager.java
    incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/AvroExportMapper.java
    incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/JdbcExportJob.java
    incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/JdbcUpdateExportJob.java
    incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/JdbcUpsertExportJob.java
    incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/OracleUpsertOutputFormat.java
    incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/UpdateOutputFormat.java
    incubator/sqoop/trunk/src/test/com/cloudera/sqoop/TestAvroImportExportRoundtrip.java
    incubator/sqoop/trunk/src/test/com/cloudera/sqoop/TestExportUpdate.java
    incubator/sqoop/trunk/src/test/com/cloudera/sqoop/testutil/BaseSqoopTestCase.java
    incubator/sqoop/trunk/src/test/com/cloudera/sqoop/testutil/ExportJobTestCase.java

Modified: incubator/sqoop/trunk/src/docs/man/sqoop-export.txt
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/docs/man/sqoop-export.txt?rev=1177481&r1=1177480&r2=1177481&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/docs/man/sqoop-export.txt (original)
+++ incubator/sqoop/trunk/src/docs/man/sqoop-export.txt Fri Sep 30 01:57:59 2011
@@ -40,8 +40,9 @@ Export control options
 --table (table-name)::
   The table to read (required)
 
---update-key (col-name)::
-  Anchor column to use for updates
+--update-key (col-names)::
+  Anchor column to use for updates. More than one column name may be
+  specified as a comma separated list of column names.
 
 --update-mode (mode)::
   Specify how updates are performed when new rows are found with non-matching keys
@@ -92,7 +93,7 @@ See 'sqoop(1)'
 
 ////
   Copyright 2011 The Apache Software Foundation
- 
+
   Licensed to the Apache Software Foundation (ASF) under one
   or more contributor license agreements.  See the NOTICE file
   distributed with this work for additional information
@@ -100,9 +101,9 @@ See 'sqoop(1)'
   to you under the Apache License, Version 2.0 (the
   "License"); you may not use this file except in compliance
   with the License.  You may obtain a copy of the License at
- 
+
       http://www.apache.org/licenses/LICENSE-2.0
- 
+
   Unless required by applicable law or agreed to in writing, software
   distributed under the License is distributed on an "AS IS" BASIS,
   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

Modified: incubator/sqoop/trunk/src/docs/user/export.txt
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/docs/user/export.txt?rev=1177481&r1=1177480&r2=1177481&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/docs/user/export.txt (original)
+++ incubator/sqoop/trunk/src/docs/user/export.txt Fri Sep 30 01:57:59 2011
@@ -1,7 +1,7 @@
 
 ////
   Copyright 2011 The Apache Software Foundation
- 
+
   Licensed to the Apache Software Foundation (ASF) under one
   or more contributor license agreements.  See the NOTICE file
   distributed with this work for additional information
@@ -9,9 +9,9 @@
   to you under the Apache License, Version 2.0 (the
   "License"); you may not use this file except in compliance
   with the License.  You may obtain a copy of the License at
- 
+
       http://www.apache.org/licenses/LICENSE-2.0
- 
+
   Unless required by applicable law or agreed to in writing, software
   distributed under the License is distributed on an "AS IS" BASIS,
   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -54,7 +54,9 @@ Argument                                
 +-m,\--num-mappers <n>+                  Use 'n' map tasks to export in\
                                          parallel
 +\--table <table-name>+                  Table to populate
-+\--update-key <col-name>+               Anchor column to use for updates
++\--update-key <col-name>+               Anchor column to use for updates.\
+                                         Use a comma separated list of columns\
+                                         if there is more than one column.
 +\--update-mode <mode>+                  Specify how updates are performed\
                                          when new rows are found with\
                                          non-matching keys in database.
@@ -142,7 +144,7 @@ empty table intended to receive these re
 If you specify the +\--update-key+ argument, Sqoop will instead modify
 an existing dataset in the database. Each input record is treated as
 an +UPDATE+ statement that modifies an existing row. The row a
-statement modifies is determined by the column name specified with
+statement modifies is determined by the column name(s) specified with
 +\--update-key+. For example, consider the following table
 definition:
 
@@ -178,6 +180,10 @@ Likewise, if the column specified with +
 uniquely identify rows and multiple rows are updated by a single
 statement, this condition is also undetected.
 
+The argument +\--update-key+ can also be given a comma separated list of
+column names, in which case Sqoop will match all keys from this list before
+updating any existing record.
+
 Depending on the target database, you may also specify the +\--update-mode+
 argument with +allowinsert+ mode if you want to update rows if they exist
 in the database already or insert rows if they do not exist yet.

Modified: incubator/sqoop/trunk/src/java/com/cloudera/sqoop/manager/ConnManager.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/com/cloudera/sqoop/manager/ConnManager.java?rev=1177481&r1=1177480&r2=1177481&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/com/cloudera/sqoop/manager/ConnManager.java (original)
+++ incubator/sqoop/trunk/src/java/com/cloudera/sqoop/manager/ConnManager.java Fri Sep 30
01:57:59 2011
@@ -26,8 +26,12 @@ import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Timestamp;
 import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
+import java.util.StringTokenizer;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -300,18 +304,30 @@ public abstract class ConnManager {
     // last, because the UPDATE-based OutputFormat will generate the SET
     // clause followed by the WHERE clause, and the SqoopRecord needs to
     // serialize to this layout.
-    String updateKeyCol = options.getUpdateKeyCol();
+    Set<String> updateKeys = new LinkedHashSet<String>();
+    Set<String> updateKeysUppercase = new HashSet<String>();
+    String updateKeyValue = options.getUpdateKeyCol();
+    StringTokenizer stok = new StringTokenizer(updateKeyValue, ",");
+    while (stok.hasMoreTokens()) {
+      String nextUpdateColumn = stok.nextToken().trim();
+      if (nextUpdateColumn.length() > 0) {
+        updateKeys.add(nextUpdateColumn);
+        updateKeysUppercase.add(nextUpdateColumn.toUpperCase());
+      } else {
+        throw new RuntimeException("Invalid update key column value specified"
+                    + ": '" + updateKeyValue + "'");
+      }
+    }
     String [] allColNames = getColumnNames(options.getTableName());
     List<String> dbOutCols = new ArrayList<String>();
-    String upperCaseKeyCol = updateKeyCol.toUpperCase();
     for (String col : allColNames) {
-      if (!upperCaseKeyCol.equals(col.toUpperCase())) {
+      if (!updateKeysUppercase.contains(col.toUpperCase())) {
         dbOutCols.add(col); // add non-key columns to the output order list.
       }
     }
 
     // Then add the update key column last.
-    dbOutCols.add(updateKeyCol);
+    dbOutCols.addAll(updateKeys);
     options.setDbOutputColumns(dbOutCols.toArray(
         new String[dbOutCols.size()]));
   }

Modified: incubator/sqoop/trunk/src/java/com/cloudera/sqoop/manager/OracleManager.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/com/cloudera/sqoop/manager/OracleManager.java?rev=1177481&r1=1177480&r2=1177481&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/com/cloudera/sqoop/manager/OracleManager.java (original)
+++ incubator/sqoop/trunk/src/java/com/cloudera/sqoop/manager/OracleManager.java Fri Sep 30
01:57:59 2011
@@ -32,9 +32,13 @@ import java.sql.Timestamp;
 import java.sql.Types;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.Set;
+import java.util.StringTokenizer;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -409,13 +413,26 @@ public class OracleManager extends Gener
     } else {
       // We're in upsert mode. We need to explicitly set
       // the database output column ordering in the codeGenerator.
-      String updateKeyCol = options.getUpdateKeyCol();
+      Set<String> updateKeys = new LinkedHashSet<String>();
+      Set<String> updateKeysUppercase = new HashSet<String>();
+      String updateKeyValue = options.getUpdateKeyCol();
+      StringTokenizer stok = new StringTokenizer(updateKeyValue, ",");
+      while (stok.hasMoreTokens()) {
+        String nextUpdateColumn = stok.nextToken().trim();
+        if (nextUpdateColumn.length() > 0) {
+          updateKeys.add(nextUpdateColumn);
+          updateKeysUppercase.add(nextUpdateColumn.toUpperCase());
+        }  else {
+          throw new RuntimeException("Invalid update key column value specified"
+              + ": '" + updateKeyValue + "'");
+        }
+      }
+
       String [] allColNames = getColumnNames(options.getTableName());
       List<String> dbOutCols = new ArrayList<String>();
-      dbOutCols.add(updateKeyCol);
-      String upperCaseKeyCol = updateKeyCol.toUpperCase();
+      dbOutCols.addAll(updateKeys);
       for (String col : allColNames) {
-        if (!upperCaseKeyCol.equals(col.toUpperCase())) {
+        if (!updateKeysUppercase.contains(col.toUpperCase())) {
           dbOutCols.add(col); // add update columns to the output order list.
         }
       }

Modified: incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/AvroExportMapper.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/AvroExportMapper.java?rev=1177481&r1=1177480&r2=1177481&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/AvroExportMapper.java (original)
+++ incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/AvroExportMapper.java Fri
Sep 30 01:57:59 2011
@@ -1,4 +1,6 @@
 /**
+ * Copyright 2011 The Apache Software Foundation
+ *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information

Modified: incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/JdbcExportJob.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/JdbcExportJob.java?rev=1177481&r1=1177480&r2=1177481&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/JdbcExportJob.java (original)
+++ incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/JdbcExportJob.java Fri Sep
30 01:57:59 2011
@@ -20,12 +20,6 @@
 
 package com.cloudera.sqoop.mapreduce;
 
-import com.cloudera.sqoop.manager.ConnManager;
-import com.cloudera.sqoop.manager.ExportJobContext;
-import com.cloudera.sqoop.mapreduce.db.DBConfiguration;
-import com.cloudera.sqoop.mapreduce.db.DBOutputFormat;
-import com.cloudera.sqoop.orm.ClassWriter;
-
 import java.io.IOException;
 import java.util.Map;
 
@@ -39,6 +33,11 @@ import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.mapreduce.OutputFormat;
 
+import com.cloudera.sqoop.manager.ConnManager;
+import com.cloudera.sqoop.manager.ExportJobContext;
+import com.cloudera.sqoop.mapreduce.db.DBConfiguration;
+import com.cloudera.sqoop.mapreduce.db.DBOutputFormat;
+
 /**
  * Run an export using JDBC (JDBC-based ExportOutputFormat).
  */

Modified: incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/JdbcUpdateExportJob.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/JdbcUpdateExportJob.java?rev=1177481&r1=1177480&r2=1177481&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/JdbcUpdateExportJob.java (original)
+++ incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/JdbcUpdateExportJob.java Fri
Sep 30 01:57:59 2011
@@ -21,19 +21,22 @@
 package com.cloudera.sqoop.mapreduce;
 
 import java.io.IOException;
+import java.util.HashSet;
+import java.util.LinkedHashSet;
+import java.util.Set;
+import java.util.StringTokenizer;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.mapreduce.OutputFormat;
-import com.cloudera.sqoop.mapreduce.db.DBConfiguration;
-import com.cloudera.sqoop.mapreduce.db.DBOutputFormat;
 
 import com.cloudera.sqoop.manager.ConnManager;
 import com.cloudera.sqoop.manager.ExportJobContext;
+import com.cloudera.sqoop.mapreduce.db.DBConfiguration;
+import com.cloudera.sqoop.mapreduce.db.DBOutputFormat;
 
 /**
  * Run an update-based export using JDBC (JDBC-based UpdateOutputFormat).
@@ -101,17 +104,34 @@ public class JdbcUpdateExportJob extends
             "Export column names could not be determined for " + tableName);
       }
 
-      String updateKeyCol = options.getUpdateKeyCol();
-      if (null == updateKeyCol) {
+      String updateKeyColumns = options.getUpdateKeyCol();
+      if (null == updateKeyColumns) {
         throw new IOException("Update key column not set in export job");
       }
+      // Update key columns lookup and removal
+      Set<String> updateKeys = new LinkedHashSet<String>();
+      Set<String> updateKeysUppercase = new HashSet<String>();
+      StringTokenizer stok = new StringTokenizer(updateKeyColumns, ",");
+      while (stok.hasMoreTokens()) {
+        String nextUpdateKey = stok.nextToken().trim();
+        if (nextUpdateKey.length() > 0) {
+          updateKeys.add(nextUpdateKey);
+          updateKeysUppercase.add(nextUpdateKey.toUpperCase());
+        }  else {
+          throw new RuntimeException("Invalid update key column value specified"
+              + ": '" + updateKeyColumns + "'");
+        }
+      }
+
+      if (updateKeys.size() == 0) {
+        throw new IOException("Update key columns not valid in export job");
+      }
 
       // Make sure we strip out the key column from this list.
-      String [] outColNames = new String[colNames.length - 1];
+      String [] outColNames = new String[colNames.length - updateKeys.size()];
       int j = 0;
-      String upperCaseKeyCol = updateKeyCol.toUpperCase();
       for (int i = 0; i < colNames.length; i++) {
-        if (!colNames[i].toUpperCase().equals(upperCaseKeyCol)) {
+        if (!updateKeysUppercase.contains(colNames[i].toUpperCase())) {
           outColNames[j++] = colNames[i];
         }
       }
@@ -119,7 +139,7 @@ public class JdbcUpdateExportJob extends
 
       job.setOutputFormatClass(getOutputFormatClass());
       job.getConfiguration().set(SQOOP_EXPORT_TABLE_CLASS_KEY, tableClassName);
-      job.getConfiguration().set(SQOOP_EXPORT_UPDATE_COL_KEY, updateKeyCol);
+      job.getConfiguration().set(SQOOP_EXPORT_UPDATE_COL_KEY, updateKeyColumns);
     } catch (ClassNotFoundException cnfe) {
       throw new IOException("Could not load OutputFormat", cnfe);
     }

Modified: incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/JdbcUpsertExportJob.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/JdbcUpsertExportJob.java?rev=1177481&r1=1177480&r2=1177481&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/JdbcUpsertExportJob.java (original)
+++ incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/JdbcUpsertExportJob.java Fri
Sep 30 01:57:59 2011
@@ -21,17 +21,19 @@
 package com.cloudera.sqoop.mapreduce;
 
 import java.io.IOException;
+import java.util.LinkedHashSet;
+import java.util.Set;
+import java.util.StringTokenizer;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.OutputFormat;
-import com.cloudera.sqoop.mapreduce.db.DBConfiguration;
-import com.cloudera.sqoop.mapreduce.db.DBOutputFormat;
 
 import com.cloudera.sqoop.manager.ConnManager;
 import com.cloudera.sqoop.manager.ExportJobContext;
+import com.cloudera.sqoop.mapreduce.db.DBConfiguration;
+import com.cloudera.sqoop.mapreduce.db.DBOutputFormat;
 
 /**
  * Run an update/insert export using JDBC (JDBC-based UpsertOutputFormat).
@@ -75,14 +77,30 @@ public class JdbcUpsertExportJob extends
       }
       DBOutputFormat.setOutput(job, tableName, colNames);
 
-      String updateKeyCol = options.getUpdateKeyCol();
-      if (null == updateKeyCol) {
+      String updateKeyColumns = options.getUpdateKeyCol();
+      if (null == updateKeyColumns) {
         throw new IOException("Update key column not set in export job");
       }
+      // Update key columns lookup and removal
+      Set<String> updateKeys = new LinkedHashSet<String>();
+      StringTokenizer stok = new StringTokenizer(updateKeyColumns, ",");
+      while (stok.hasMoreTokens()) {
+        String nextUpdateKey = stok.nextToken().trim();
+        if (nextUpdateKey.length() > 0) {
+          updateKeys.add(nextUpdateKey);
+        } else {
+          throw new RuntimeException("Invalid update key column value specified"
+              + ": '" + updateKeyColumns + "'");
+        }
+      }
+
+      if (updateKeys.size() == 0) {
+        throw new IOException("Update key columns not valid in export job");
+      }
 
       job.setOutputFormatClass(getOutputFormatClass());
       job.getConfiguration().set(SQOOP_EXPORT_TABLE_CLASS_KEY, tableClassName);
-      job.getConfiguration().set(SQOOP_EXPORT_UPDATE_COL_KEY, updateKeyCol);
+      job.getConfiguration().set(SQOOP_EXPORT_UPDATE_COL_KEY, updateKeyColumns);
     } catch (ClassNotFoundException cnfe) {
       throw new IOException("Could not load OutputFormat", cnfe);
     }

Modified: incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/OracleUpsertOutputFormat.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/OracleUpsertOutputFormat.java?rev=1177481&r1=1177480&r2=1177481&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/OracleUpsertOutputFormat.java
(original)
+++ incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/OracleUpsertOutputFormat.java
Fri Sep 30 01:57:59 2011
@@ -22,6 +22,8 @@ package com.cloudera.sqoop.mapreduce;
 
 import java.io.IOException;
 import java.sql.SQLException;
+import java.util.LinkedHashSet;
+import java.util.Set;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -69,17 +71,31 @@ public class OracleUpsertOutputFormat<K 
     protected String getUpdateStatement() {
       boolean first;
 
+      // lookup table for update columns
+      Set<String> updateKeyLookup = new LinkedHashSet<String>();
+      for (String updateKey : updateCols) {
+        updateKeyLookup.add(updateKey);
+      }
+
       StringBuilder sb = new StringBuilder();
       sb.append("MERGE INTO ");
       sb.append(tableName);
       sb.append(" USING dual ON ( ");
-      sb.append(updateCol);
-      sb.append(" = ? )");
+      first = true;
+      for (int i = 0; i < updateCols.length; i++) {
+        if (first) {
+          first = false;
+        } else {
+          sb.append(" AND ");
+        }
+        sb.append(updateCols[i]).append(" = ?");
+      }
+      sb.append(" )");
 
       sb.append("  WHEN MATCHED THEN UPDATE SET ");
       first = true;
       for (String col : columnNames) {
-        if (!col.equals(updateCol)) {
+        if (!updateKeyLookup.contains(col)) {
           if (first) {
             first = false;
           } else {

Modified: incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/UpdateOutputFormat.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/UpdateOutputFormat.java?rev=1177481&r1=1177480&r2=1177481&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/UpdateOutputFormat.java (original)
+++ incubator/sqoop/trunk/src/java/com/cloudera/sqoop/mapreduce/UpdateOutputFormat.java Fri
Sep 30 01:57:59 2011
@@ -25,7 +25,10 @@ import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.SQLException;
 import java.util.Arrays;
+import java.util.LinkedHashSet;
 import java.util.List;
+import java.util.Set;
+import java.util.StringTokenizer;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -33,9 +36,9 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import com.cloudera.sqoop.mapreduce.db.DBConfiguration;
 
 import com.cloudera.sqoop.lib.SqoopRecord;
+import com.cloudera.sqoop.mapreduce.db.DBConfiguration;
 
 /**
  * Update an existing table of data with new value data.
@@ -90,7 +93,7 @@ public class UpdateOutputFormat<K extend
 
     protected String tableName;
     protected String [] columnNames; // The columns to update.
-    protected String updateCol; // The column containing the fixed key.
+    protected String [] updateCols; // The columns containing the fixed key.
 
     public UpdateRecordWriter(TaskAttemptContext context)
         throws ClassNotFoundException, SQLException {
@@ -101,7 +104,22 @@ public class UpdateOutputFormat<K extend
       DBConfiguration dbConf = new DBConfiguration(conf);
       this.tableName = dbConf.getOutputTableName();
       this.columnNames = dbConf.getOutputFieldNames();
-      this.updateCol = conf.get(ExportJobBase.SQOOP_EXPORT_UPDATE_COL_KEY);
+      String updateKeyColumns =
+          conf.get(ExportJobBase.SQOOP_EXPORT_UPDATE_COL_KEY);
+
+      Set<String> updateKeys = new LinkedHashSet<String>();
+      StringTokenizer stok = new StringTokenizer(updateKeyColumns, ",");
+      while (stok.hasMoreTokens()) {
+        String nextUpdateKey = stok.nextToken().trim();
+        if (nextUpdateKey.length() > 0) {
+          updateKeys.add(nextUpdateKey);
+        } else {
+          throw new RuntimeException("Invalid update key column value specified"
+              + ": '" + updateKeyColumns + "'");
+        }
+      }
+
+      updateCols = updateKeys.toArray(new String[updateKeys.size()]);
     }
 
     @Override
@@ -132,8 +150,8 @@ public class UpdateOutputFormat<K extend
     /**
      * @return the column we are using to determine the row to update.
      */
-    protected final String getUpdateCol() {
-      return updateCol;
+    protected final String[] getUpdateColumns() {
+      return updateCols;
     }
 
     @Override
@@ -182,8 +200,15 @@ public class UpdateOutputFormat<K extend
       }
 
       sb.append(" WHERE ");
-      sb.append(this.updateCol);
-      sb.append("=?");
+      first = true;
+      for (int i = 0; i < updateCols.length; i++) {
+        if (first) {
+          first = false;
+        } else {
+          sb.append(" AND ");
+        }
+        sb.append(updateCols[i]).append("=?");
+      }
       return sb.toString();
     }
   }

Modified: incubator/sqoop/trunk/src/test/com/cloudera/sqoop/TestAvroImportExportRoundtrip.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/test/com/cloudera/sqoop/TestAvroImportExportRoundtrip.java?rev=1177481&r1=1177480&r2=1177481&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/test/com/cloudera/sqoop/TestAvroImportExportRoundtrip.java (original)
+++ incubator/sqoop/trunk/src/test/com/cloudera/sqoop/TestAvroImportExportRoundtrip.java Fri
Sep 30 01:57:59 2011
@@ -51,7 +51,8 @@ public class TestAvroImportExportRoundtr
 
     runImport(getOutputArgvForQuery(true));
     deleteTableData();
-    runExport(getExportArgvForQuery(true, 10, 10, newStrArray(argv, "-m", "" + 1)));
+    runExport(getExportArgvForQuery(true, 10, 10, newStrArray(argv, "-m",
+        "" + 1)));
 
     checkFirstColumnSum();
   }
@@ -104,7 +105,8 @@ public class TestAvroImportExportRoundtr
     }
 
     args.add("--query");
-    args.add("select * from " + HsqldbTestServer.getTableName() + " where $CONDITIONS");
+    args.add("select * from " + HsqldbTestServer.getTableName()
+        + " where $CONDITIONS");
     args.add("--connect");
     args.add(HsqldbTestServer.getUrl());
     args.add("--target-dir");

Modified: incubator/sqoop/trunk/src/test/com/cloudera/sqoop/TestExportUpdate.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/test/com/cloudera/sqoop/TestExportUpdate.java?rev=1177481&r1=1177480&r2=1177481&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/test/com/cloudera/sqoop/TestExportUpdate.java (original)
+++ incubator/sqoop/trunk/src/test/com/cloudera/sqoop/TestExportUpdate.java Fri Sep 30 01:57:59
2011
@@ -29,16 +29,14 @@ import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 
-import com.cloudera.sqoop.testutil.CommonArgs;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.util.StringUtils;
 
+import com.cloudera.sqoop.testutil.CommonArgs;
 import com.cloudera.sqoop.testutil.ExportJobTestCase;
 
-import org.junit.Before;
-
 /**
  * Test that we can update a copy of data in the database,
  * based on newer data in HDFS.
@@ -85,6 +83,115 @@ public class TestExportUpdate extends Ex
   }
 
   /**
+   * <p>Creates a table with three columns - A INT, B INT and C VARCHAR(32).
+   * This table is populated with records in a set of three with total records
+   * with the total number of unique values of A equal to the specified aMax
+   * value. For each value of A, there will be three records with value of
+   * B ranging from 0-2, and a corresponding value of C.</p>
+   * <p>For example if <tt>aMax = 2</tt>, the table will contain the
+   * following records:
+   * <pre>
+   *    A   |   B   |  C
+   * ----------------------
+   *    0   |   0   | 0foo0
+   *    0   |   1   | 0foo1
+   *    0   |   2   | 0foo2
+   *    1   |   0   | 1foo0
+   *    1   |   1   | 1foo1
+   *    1   |   2   | 1foo2
+   * </pre></p>
+   * @param aMax the number of unique values of column A to generate
+   * @throws SQLException
+   */
+  private void createMultiKeyTable(int aMax) throws SQLException {
+    Connection conn = getConnection();
+
+    PreparedStatement statement = conn.prepareStatement(
+        "CREATE TABLE " + getTableName()
+        + " (A INT NOT NULL, B INT NOT NULL, C VARCHAR(32))");
+    try {
+      statement.executeUpdate();
+      conn.commit();
+    } finally {
+      statement.close();
+      statement = null;
+    }
+
+    try {
+      for (int i = 0; i< aMax; i++) {
+        for (int j = 0; j < 3; j++) {
+          statement = conn.prepareStatement("INSERT INTO " + getTableName()
+              + " VALUES (" + i + ", " + j + ", '"
+              + i + "foo" + j + "')");
+          statement.executeUpdate();
+          statement.close();
+          statement = null;
+        }
+      }
+    } finally {
+      if (null != statement) {
+        statement.close();
+      }
+    }
+
+    conn.commit();
+  }
+
+  /**
+   * <p>Creates update files for multi-key update test. The total number of
+   * update records will be number of files times the number of aKeysPerFile
+   * times 3. Column A value will start with the specified <tt>startAtValue</tt>
+   * and for each value there will be three records corresponding to Column
+   * B values [0-2].</p>
+   * @param numFiles number of files to create
+   * @param aKeysPerFile number of records sets with different column A values
+   * @param startAtValue the starting value of column A
+   * @param bKeyValues the list of values for the column B
+   * @throws IOException
+   */
+  private void createMultiKeyUpdateFiles(int numFiles, int aKeysPerFile,
+      int startAtValue, int[] bKeyValues)
+      throws IOException {
+    Configuration conf = getConf();
+    if (!isOnPhysicalCluster()) {
+      conf.set(CommonArgs.FS_DEFAULT_NAME, CommonArgs.LOCAL_FS);
+    }
+    FileSystem fs = FileSystem.get(conf);
+
+    int aValue = startAtValue;
+    for (int i = 0; i < numFiles; i++) {
+      OutputStream os = fs.create(new Path(getTablePath(), "" + i + ".txt"));
+      BufferedWriter w = new BufferedWriter(new OutputStreamWriter(os));
+
+      for (int j = 0; j < aKeysPerFile; j++) {
+        for (int k = 0; k < bKeyValues.length; k++) {
+          w.write(getUpdateStringForMultiKeyRow(aValue, bKeyValues[k]));
+        }
+        aValue++;
+      }
+
+      w.close();
+      os.close();
+    }
+  }
+
+  /**
+   * Generate a string of text representing an update for one row
+   * of the multi-key table. The values of columns A and B are given
+   * and the value of column C is generated as <em>a</em>bar<em>b</em>.
+   * @param a the value of column a
+   * @param b the value of column b
+   */
+  private String getUpdateStringForMultiKeyRow(int a, int b) {
+    StringBuilder sb = new StringBuilder();
+    sb.append(a).append("\t").append(b).append("\t").append(a);
+    sb.append("bar").append(b).append("\n");
+
+    return sb.toString();
+  }
+
+
+  /**
    * Create a set of files that will be used as the input to the update
    * process.
    * @param numFiles the number of files to generate
@@ -196,6 +303,60 @@ public class TestExportUpdate extends Ex
     }
   }
 
+  private void verifyMultiKeyRow(String[] keyColumnNames, int[] keyValues,
+      Object ...expectedVals) throws SQLException {
+    StringBuilder querySb = new StringBuilder("SELECT A, B, C FROM ");
+    querySb.append(getTableName()).append(" WHERE ");
+    boolean first = true;
+    for (int i = 0; i< keyColumnNames.length; i++) {
+      if (first) {
+        first = false;
+      } else {
+        querySb.append(" AND ");
+      }
+      querySb.append(keyColumnNames[i]).append(" = ");
+      querySb.append(keyValues[i]);
+    }
+
+    String query = querySb.toString();
+    PreparedStatement statement = null;
+    ResultSet rs = null;
+
+    try {
+      Connection conn = getConnection();
+      statement = conn.prepareStatement(query);
+      rs = statement.executeQuery();
+
+      boolean success = rs.next();
+      assertTrue("Expected at least one output record", success);
+
+      // Assert that all three columns have the correct values.
+      for (int i = 0; i < expectedVals.length; i++) {
+        String expected = expectedVals[i].toString();
+        String result = rs.getString(i + 1);
+        assertEquals("Invalid response for column " + i + "; got " + result
+            + " when expected " + expected, expected, result);
+      }
+
+      // This query should have returned exactly one row.
+      success = rs.next();
+      assertFalse("Expected no more than one output record", success);
+    } finally {
+      if (null != rs) {
+        try {
+          rs.close();
+        } catch (SQLException sqle) {
+          LOG.error("Error closing result set: "
+              + StringUtils.stringifyException(sqle));
+        }
+      }
+
+      if (null != statement) {
+        statement.close();
+      }
+    }
+  }
+
   /**
    * Verify that a particular row has the expected values.
    */
@@ -260,6 +421,130 @@ public class TestExportUpdate extends Ex
     verifyRow("A", "9", "9", "foo18", "18");
   }
 
+  /**
+   * Creates a table with two columns that together act as unique keys
+   * and then modifies a subset of the rows via update.
+   * @throws Exception
+   */
+  public void testMultiKeyUpdate() throws Exception {
+    createMultiKeyTable(3);
+
+    createMultiKeyUpdateFiles(1, 1, 1, new int[] {0, 1, 3});
+
+    runExport(getArgv(true, 2, 2, "-m", "1",
+        "--update-key", "A,B"));
+    verifyRowCount(9);
+    // Check a few rows...
+    verifyMultiKeyRow(new String[] { "A", "B"},
+        new int[] { 0, 0 }, 0, 0, "0foo0");
+    verifyMultiKeyRow(new String[] { "A", "B"},
+        new int[] { 0, 1 }, 0, 1, "0foo1");
+    verifyMultiKeyRow(new String[] { "A", "B"},
+        new int[] { 0, 2 }, 0, 2, "0foo2");
+
+    verifyMultiKeyRow(new String[] { "A", "B"},
+        new int[] { 1, 0 }, 1, 0, "1bar0");
+    verifyMultiKeyRow(new String[] { "A", "B"},
+        new int[] { 1, 1 }, 1, 1, "1bar1");
+    verifyMultiKeyRow(new String[] { "A", "B"},
+        new int[] { 1, 2 }, 1, 2, "1foo2");
+
+    verifyMultiKeyRow(new String[] { "A", "B"},
+        new int[] { 2, 0 }, 2, 0, "2foo0");
+    verifyMultiKeyRow(new String[] { "A", "B"},
+        new int[] { 2, 1 }, 2, 1, "2foo1");
+    verifyMultiKeyRow(new String[] { "A", "B"},
+        new int[] { 2, 2 }, 2, 2, "2foo2");
+
+  }
+
+  /**
+   * Creates a table with two columns that together act as unique keys
+   * and then modifies a subset of the rows via update.
+   * @throws Exception
+   */
+  public void testMultiKeyUpdateMultipleFilesNoUpdate() throws Exception {
+    createMultiKeyTable(4);
+
+    createMultiKeyUpdateFiles(2, 1, 1, new int[] {3, 4, 5});
+
+    runExport(getArgv(true, 2, 2, "-m", "1",
+        "--update-key", "A,B"));
+    verifyRowCount(12);
+    // Check a few rows...
+    verifyMultiKeyRow(new String[] { "A", "B"},
+        new int[] { 0, 0 }, 0, 0, "0foo0");
+    verifyMultiKeyRow(new String[] { "A", "B"},
+        new int[] { 0, 1 }, 0, 1, "0foo1");
+    verifyMultiKeyRow(new String[] { "A", "B"},
+        new int[] { 0, 2 }, 0, 2, "0foo2");
+
+    verifyMultiKeyRow(new String[] { "A", "B"},
+        new int[] { 1, 0 }, 1, 0, "1foo0");
+    verifyMultiKeyRow(new String[] { "A", "B"},
+        new int[] { 1, 1 }, 1, 1, "1foo1");
+    verifyMultiKeyRow(new String[] { "A", "B"},
+        new int[] { 1, 2 }, 1, 2, "1foo2");
+
+    verifyMultiKeyRow(new String[] { "A", "B"},
+        new int[] { 2, 0 }, 2, 0, "2foo0");
+    verifyMultiKeyRow(new String[] { "A", "B"},
+        new int[] { 2, 1 }, 2, 1, "2foo1");
+    verifyMultiKeyRow(new String[] { "A", "B"},
+        new int[] { 2, 2 }, 2, 2, "2foo2");
+
+    verifyMultiKeyRow(new String[] { "A", "B"},
+        new int[] { 3, 0 }, 3, 0, "3foo0");
+    verifyMultiKeyRow(new String[] { "A", "B"},
+        new int[] { 3, 1 }, 3, 1, "3foo1");
+    verifyMultiKeyRow(new String[] { "A", "B"},
+        new int[] { 3, 2 }, 3, 2, "3foo2");
+  }
+
+  /**
+   * Creates a table with two columns that together act as unique keys
+   * and then modifies a subset of the rows via update.
+   * @throws Exception
+   */
+  public void testMultiKeyUpdateMultipleFilesFullUpdate() throws Exception {
+    createMultiKeyTable(4);
+
+    createMultiKeyUpdateFiles(2, 2, 0, new int[] {0, 1, 2});
+
+    runExport(getArgv(true, 2, 2, "-m", "1",
+        "--update-key", "A,B"));
+    verifyRowCount(12);
+    // Check a few rows...
+    verifyMultiKeyRow(new String[] { "A", "B"},
+        new int[] { 0, 0 }, 0, 0, "0bar0");
+    verifyMultiKeyRow(new String[] { "A", "B"},
+        new int[] { 0, 1 }, 0, 1, "0bar1");
+    verifyMultiKeyRow(new String[] { "A", "B"},
+        new int[] { 0, 2 }, 0, 2, "0bar2");
+
+    verifyMultiKeyRow(new String[] { "A", "B"},
+        new int[] { 1, 0 }, 1, 0, "1bar0");
+    verifyMultiKeyRow(new String[] { "A", "B"},
+        new int[] { 1, 1 }, 1, 1, "1bar1");
+    verifyMultiKeyRow(new String[] { "A", "B"},
+        new int[] { 1, 2 }, 1, 2, "1bar2");
+
+    verifyMultiKeyRow(new String[] { "A", "B"},
+        new int[] { 2, 0 }, 2, 0, "2bar0");
+    verifyMultiKeyRow(new String[] { "A", "B"},
+        new int[] { 2, 1 }, 2, 1, "2bar1");
+    verifyMultiKeyRow(new String[] { "A", "B"},
+        new int[] { 2, 2 }, 2, 2, "2bar2");
+
+    verifyMultiKeyRow(new String[] { "A", "B"},
+        new int[] { 3, 0 }, 3, 0, "3bar0");
+    verifyMultiKeyRow(new String[] { "A", "B"},
+        new int[] { 3, 1 }, 3, 1, "3bar1");
+    verifyMultiKeyRow(new String[] { "A", "B"},
+        new int[] { 3, 2 }, 3, 2, "3bar2");
+  }
+
+
   public void testEmptyTable() throws Exception {
     // Test that an empty table will "accept" updates that modify
     // no rows; no new data is injected into the database.

Modified: incubator/sqoop/trunk/src/test/com/cloudera/sqoop/testutil/BaseSqoopTestCase.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/test/com/cloudera/sqoop/testutil/BaseSqoopTestCase.java?rev=1177481&r1=1177480&r2=1177481&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/test/com/cloudera/sqoop/testutil/BaseSqoopTestCase.java (original)
+++ incubator/sqoop/trunk/src/test/com/cloudera/sqoop/testutil/BaseSqoopTestCase.java Fri
Sep 30 01:57:59 2011
@@ -135,7 +135,7 @@ public abstract class BaseSqoopTestCase 
  /**
   * @return the ConnManager instance used by the current test.
   */
  protected ConnManager getManager() {
    return manager;
  }
-  
+
 
   /**
    * @return a connection to the database under test.

Modified: incubator/sqoop/trunk/src/test/com/cloudera/sqoop/testutil/ExportJobTestCase.java
URL: http://svn.apache.org/viewvc/incubator/sqoop/trunk/src/test/com/cloudera/sqoop/testutil/ExportJobTestCase.java?rev=1177481&r1=1177480&r2=1177481&view=diff
==============================================================================
--- incubator/sqoop/trunk/src/test/com/cloudera/sqoop/testutil/ExportJobTestCase.java (original)
+++ incubator/sqoop/trunk/src/test/com/cloudera/sqoop/testutil/ExportJobTestCase.java Fri
Sep 30 01:57:59 2011
@@ -45,7 +45,7 @@ public abstract class ExportJobTestCase 
 
   public static final Log LOG = LogFactory.getLog(
       ExportJobTestCase.class.getName());
-  
+
   @Before
   public void setUp() {
     // start the server



Mime
View raw message