sqoop-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From a..@apache.org
Subject sqoop git commit: SQOOP-2443: Sqoop2: Generic JDBC: Properly detect compound primary keys in GenericJdbcExecutor
Date Wed, 12 Aug 2015 21:55:57 GMT
Repository: sqoop
Updated Branches:
  refs/heads/sqoop2 927c72d15 -> ec0544c6f


SQOOP-2443: Sqoop2: Generic JDBC: Properly detect compound primary keys in GenericJdbcExecutor

(Jarek Jarcec Cecho via Abraham Elmahrek)


Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/ec0544c6
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/ec0544c6
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/ec0544c6

Branch: refs/heads/sqoop2
Commit: ec0544c6f5436abeb6880ec2929eb4190dd94069
Parents: 927c72d
Author: Abraham Elmahrek <abe@apache.org>
Authored: Wed Aug 12 14:54:54 2015 -0700
Committer: Abraham Elmahrek <abe@apache.org>
Committed: Wed Aug 12 14:54:54 2015 -0700

----------------------------------------------------------------------
 .../error/code/GenericJdbcConnectorError.java   |  4 ++
 .../connector/jdbc/GenericJdbcExecutor.java     | 56 +++++++++++++++++---
 .../jdbc/GenericJdbcFromInitializer.java        | 10 +++-
 .../connector/jdbc/GenericJdbcExecutorTest.java | 13 ++++-
 docs/src/site/sphinx/Connectors.rst             |  2 +-
 5 files changed, 74 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/sqoop/blob/ec0544c6/common/src/main/java/org/apache/sqoop/error/code/GenericJdbcConnectorError.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/sqoop/error/code/GenericJdbcConnectorError.java
b/common/src/main/java/org/apache/sqoop/error/code/GenericJdbcConnectorError.java
index f18acbd..9a9bb66 100644
--- a/common/src/main/java/org/apache/sqoop/error/code/GenericJdbcConnectorError.java
+++ b/common/src/main/java/org/apache/sqoop/error/code/GenericJdbcConnectorError.java
@@ -89,6 +89,10 @@ public enum GenericJdbcConnectorError implements ErrorCode {
 
   GENERIC_JDBC_CONNECTOR_0023("Received error from the database"),
 
+  GENERIC_JDBC_CONNECTOR_0024("Multiple tables of the same name in different schema/catalog"),
+
+  GENERIC_JDBC_CONNECTOR_0025("No primary key"),
+
   ;
 
   private final String message;

http://git-wip-us.apache.org/repos/asf/sqoop/blob/ec0544c6/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExecutor.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExecutor.java
b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExecutor.java
index 1aeca7e..3770e07 100644
--- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExecutor.java
+++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExecutor.java
@@ -38,9 +38,12 @@ import java.sql.ResultSetMetaData;
 import java.sql.SQLException;
 import java.sql.Statement;
 import java.sql.Timestamp;
+import java.util.AbstractMap;
+import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Properties;
+import java.util.Set;
 
 /**
  * Database executor that is based on top of JDBC spec.
@@ -366,24 +369,63 @@ public class GenericJdbcExecutor {
    *                    * (schema, table)
    *                    * (table)
    *                    Return value of any combination is "undefined".
-   * @return Primary key's name
+   * @return All columns that make up the table's primary key (ordered by KEY_SEQ), or null if none were found
    */
-  public String getPrimaryKey(String ...identifiers) {
+  public String[] getPrimaryKey(String ...identifiers) {
     int index = 0;
     String catalog = identifiers.length >= 3 ? identifiers[index++] : null;
     String schema = identifiers.length >= 2 ? identifiers[index++] : null;
     String table = identifiers[index];
 
+    /* Using the getPrimaryKeys() call poses a few challenges that we protect ourselves against here:
+     *
+     * 1) getPrimaryKeys() returns columns ordered by COLUMN_NAME and not by KEY_SEQ. Therefore
+     * we have to manually re-order them into the order that makes sense to us (i.e. by KEY_SEQ).
+     *
+     * 2) If we run the search with the catalog and schema arguments set to NULL (i.e. we're searching only
+     * by table name), we'll get all tables with the given name. This is a problem when users have
+     * the same table name in multiple schemas (or catalogs). As we don't want to force users to remember
+     * the default catalog and schema name for their tables, we've chosen a more defensive approach -
+     * we search only by table name and detect whether we found two different tables; only in this case do we
+     * error out, requesting that the user specify which schema we need to use.
+     */
+    List<AbstractMap.SimpleEntry<String, Short>> primaryKeyColumns = new LinkedList<>();
+    Set<String> catalogNames = new HashSet<>();
+    Set<String> schemaNames = new HashSet<>();
+
     try {
-      DatabaseMetaData dbmd = connection.getMetaData();
-      ResultSet rs = dbmd.getPrimaryKeys(catalog, schema, table);
+      ResultSet rs = connection.getMetaData().getPrimaryKeys(catalog, schema, table);
+      assert rs != null;
+
+      // Load data from the getPrimaryKeys() call
+      while(rs.next()) {
+        primaryKeyColumns.add(new AbstractMap.SimpleEntry<>(
+          rs.getString("COLUMN_NAME"),
+          rs.getShort("KEY_SEQ")
+        ));
+        catalogNames.add(rs.getString("TABLE_CAT"));
+        schemaNames.add(rs.getString("TABLE_SCHEM"));
+      }
 
-      if (rs != null && rs.next()) {
-        return rs.getString("COLUMN_NAME");
+      // Verification
+      if(catalogNames.size() > 1 || schemaNames.size() > 1) {
+        throw new SqoopException(GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0024,
+          "For search (" + catalog + ", " + schema + ", " + table + ") we found the table
in catalogs [" + StringUtils.join(catalogNames, ", ") + "] and schemas [" + StringUtils.join(schemaNames,
", ") + "]");
+      }
 
-      } else {
+      // A few shortcuts so that we don't have to run the full loop
+      if(primaryKeyColumns.isEmpty()) {
         return null;
+      } else if(primaryKeyColumns.size() == 1){
+        return new String[] {primaryKeyColumns.get(0).getKey()};
+      }
+
+      // Properly sort the columns by KEY_SEQ and return result
+      String [] ret = new String[primaryKeyColumns.size()];
+      for(AbstractMap.SimpleEntry<String, Short> entry : primaryKeyColumns) {
+        ret[entry.getValue() - 1] = entry.getKey();
       }
+      return ret;
 
     } catch (SQLException e) {
       logSQLException(e);

http://git-wip-us.apache.org/repos/asf/sqoop/blob/ec0544c6/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcFromInitializer.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcFromInitializer.java
b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcFromInitializer.java
index 9d8e4e7..8bf7b6e 100644
--- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcFromInitializer.java
+++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcFromInitializer.java
@@ -135,7 +135,15 @@ public class GenericJdbcFromInitializer extends Initializer<LinkConfiguration,
F
     String partitionColumnName = jobConf.fromJobConfig.partitionColumn;
     // If it's not specified, we can use primary key of given table (if it's table based
import)
     if (StringUtils.isBlank(partitionColumnName) && tableImport) {
-        partitionColumnName = executor.getPrimaryKey(jobConf.fromJobConfig.schemaName, jobConf.fromJobConfig.tableName);
+      String [] primaryKeyColumns = executor.getPrimaryKey(jobConf.fromJobConfig.schemaName,
jobConf.fromJobConfig.tableName);
+      LOG.info("Found primary key columns [" + StringUtils.join(primaryKeyColumns, ", ")
+ "]");
+      if(primaryKeyColumns == null) {
+        throw new SqoopException(GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0025, "Please
specify partition column.");
+      } else if (primaryKeyColumns.length > 1) {
+        LOG.warn("Table have compound primary key, for partitioner we're using only first
column of the key: " + primaryKeyColumns[0]);
+      }
+
+      partitionColumnName = primaryKeyColumns[0];
     }
     // If we don't have partition column name, we will error out
     if (partitionColumnName != null) {

http://git-wip-us.apache.org/repos/asf/sqoop/blob/ec0544c6/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/GenericJdbcExecutorTest.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/GenericJdbcExecutorTest.java
b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/GenericJdbcExecutorTest.java
index a482ac4..59c12f3 100644
--- a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/GenericJdbcExecutorTest.java
+++ b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/GenericJdbcExecutorTest.java
@@ -35,6 +35,7 @@ public class GenericJdbcExecutorTest {
   private final String table;
   private final String emptyTable;
   private final String schema;
+  private final String compoundPrimaryKeyTable;
   private GenericJdbcExecutor executor;
 
   private static final int START = -10;
@@ -44,6 +45,7 @@ public class GenericJdbcExecutorTest {
     table = getClass().getSimpleName().toUpperCase();
     emptyTable = table + "_EMPTY";
     schema = table + "_SCHEMA";
+    compoundPrimaryKeyTable = table + "_COMPOUND";
   }
 
   @BeforeMethod(alwaysRun = true)
@@ -53,6 +55,7 @@ public class GenericJdbcExecutorTest {
     executor.executeUpdate("CREATE TABLE " + executor.encloseIdentifier(emptyTable )+ "(ICOL
INTEGER PRIMARY KEY, VCOL VARCHAR(20))");
     executor.executeUpdate("CREATE TABLE " + executor.encloseIdentifier(table) + "(ICOL INTEGER
PRIMARY KEY, VCOL VARCHAR(20))");
     executor.executeUpdate("CREATE TABLE " + executor.encloseIdentifiers(schema, table) +
"(ICOL INTEGER PRIMARY KEY, VCOL VARCHAR(20))");
+    executor.executeUpdate("CREATE TABLE " + executor.encloseIdentifier(compoundPrimaryKeyTable)
+ "(ICOL INTEGER, VCOL VARCHAR(20), PRIMARY KEY(VCOL, ICOL))");
 
     for (int i = 0; i < NUMBER_OF_ROWS; i++) {
       int value = START + i;
@@ -90,8 +93,14 @@ public class GenericJdbcExecutorTest {
     assertNull(executor.getPrimaryKey("non-existing-schema", "non-existing-table"));
     assertNull(executor.getPrimaryKey("non-existing-catalog", "non-existing-schema", "non-existing-table"));
 
-    assertEquals(executor.getPrimaryKey(table), "ICOL");
-    assertEquals(executor.getPrimaryKey(schema, table), "ICOL");
+    assertEquals(executor.getPrimaryKey(schema, table), new String[] {"ICOL"});
+    assertEquals(executor.getPrimaryKey(compoundPrimaryKeyTable), new String[] {"VCOL", "ICOL"});
+  }
+
+  @Test(expectedExceptions = SqoopException.class)
+  public void TestGetPrimaryKeySameTableInMultipleSchemas() {
+    // Same table name exists in two schemas and therefore we should fail here
+    executor.getPrimaryKey(table);
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/sqoop/blob/ec0544c6/docs/src/site/sphinx/Connectors.rst
----------------------------------------------------------------------
diff --git a/docs/src/site/sphinx/Connectors.rst b/docs/src/site/sphinx/Connectors.rst
index af54467..41571ba 100644
--- a/docs/src/site/sphinx/Connectors.rst
+++ b/docs/src/site/sphinx/Connectors.rst
@@ -80,7 +80,7 @@ Inputs associated with the Job configuration for the FROM direction include:
 |                             |         | *Optional* Comma separated list of columns.   
                         |                                             |
 +-----------------------------+---------+-------------------------------------------------------------------------+---------------------------------------------+
 | Partition column name       | Map     | The column name used to partition the data transfer
process.            | col1                                        |
-|                             |         | *Optional*.  Defaults to primary key of table.
                         |                                             |
+|                             |         | *Optional*.  Defaults to first column of table's
primary key.           |                                             |
 +-----------------------------+---------+-------------------------------------------------------------------------+---------------------------------------------+
 | Null value allowed for      | Boolean | True or false depending on whether NULL values
are allowed in data      | true                                        |
 | the partition column        |         | of the Partition column. *Optional*.          
                         |                                             |


Mime
View raw message