sqoop-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jar...@apache.org
Subject svn commit: r1309268 - in /sqoop/trunk/src: java/org/apache/sqoop/mapreduce/ test/com/cloudera/sqoop/ test/com/cloudera/sqoop/manager/
Date Wed, 04 Apr 2012 06:58:53 GMT
Author: jarcec
Date: Wed Apr  4 06:58:52 2012
New Revision: 1309268

URL: http://svn.apache.org/viewvc?rev=1309268&view=rev
Log:
SQOOP-468. Oracle free form queries fail.

(Cheolsoo Park via Jarek Jarcec Cecho)

Added:
    sqoop/trunk/src/test/com/cloudera/sqoop/TestFreeFormQueryImport.java
    sqoop/trunk/src/test/com/cloudera/sqoop/manager/MySQLFreeFormQueryTest.java
    sqoop/trunk/src/test/com/cloudera/sqoop/manager/OracleFreeFormQueryTest.java
Modified:
    sqoop/trunk/src/java/org/apache/sqoop/mapreduce/DataDrivenImportJob.java

Modified: sqoop/trunk/src/java/org/apache/sqoop/mapreduce/DataDrivenImportJob.java
URL: http://svn.apache.org/viewvc/sqoop/trunk/src/java/org/apache/sqoop/mapreduce/DataDrivenImportJob.java?rev=1309268&r1=1309267&r2=1309268&view=diff
==============================================================================
--- sqoop/trunk/src/java/org/apache/sqoop/mapreduce/DataDrivenImportJob.java (original)
+++ sqoop/trunk/src/java/org/apache/sqoop/mapreduce/DataDrivenImportJob.java Wed Apr  4 06:58:52 2012
@@ -111,6 +111,36 @@ public class DataDrivenImportJob extends
     return null;
   }
 
+  /**
+   * Build the boundary query for the column of the result set created by
+   * the given query.
+   * @param col column name whose boundaries we're interested in.
+   * @param query sub-query used to create the result set.
+   * @return input boundary query as a string
+   */
+  private String buildBoundaryQuery(String col, String query) {
+    if (col == null) {
+      return "";
+    }
+
+    // Replace table name with alias 't1' if column name is a fully
+    // qualified name.  This is needed because "tableName"."columnName"
+    // in the input boundary query causes a SQL syntax error in most dbs
+    // including Oracle and MySQL.
+    String alias = "t1";
+    int dot = col.lastIndexOf('.');
+    String qualifiedName = (dot == -1) ? col : alias + col.substring(dot);
+
+    ConnManager mgr = getContext().getConnManager();
+    String ret = mgr.getInputBoundsQuery(qualifiedName, query);
+    if (ret != null) {
+      return ret;
+    }
+
+    return "SELECT MIN(" + qualifiedName + "), MAX(" + qualifiedName + ") "
+        + "FROM (" + query + ") AS " + alias;
+  }
+
   @Override
   protected void configureInputFormat(Job job, String tableName,
       String tableClassName, String splitByCol) throws IOException {
@@ -165,18 +195,8 @@ public class DataDrivenImportJob extends
             DataDrivenDBInputFormat.SUBSTITUTE_TOKEN, " (1 = 1) ");
 
         String inputBoundingQuery = options.getBoundaryQuery();
-
         if (inputBoundingQuery == null) {
-          inputBoundingQuery =
-            mgr.getInputBoundsQuery(splitByCol, sanitizedQuery);
-          if (inputBoundingQuery == null) {
-            if (splitByCol != null) {
-              inputBoundingQuery = "SELECT MIN(" + splitByCol + "), MAX("
-                      + splitByCol + ") FROM (" + sanitizedQuery + ") AS t1";
-            } else {
-              inputBoundingQuery = "";
-            }
-          }
+          inputBoundingQuery = buildBoundaryQuery(splitByCol, sanitizedQuery);
         }
         DataDrivenDBInputFormat.setInput(job, DBWritable.class,
             inputQuery, inputBoundingQuery);

Added: sqoop/trunk/src/test/com/cloudera/sqoop/TestFreeFormQueryImport.java
URL: http://svn.apache.org/viewvc/sqoop/trunk/src/test/com/cloudera/sqoop/TestFreeFormQueryImport.java?rev=1309268&view=auto
==============================================================================
--- sqoop/trunk/src/test/com/cloudera/sqoop/TestFreeFormQueryImport.java (added)
+++ sqoop/trunk/src/test/com/cloudera/sqoop/TestFreeFormQueryImport.java Wed Apr  4 06:58:52 2012
@@ -0,0 +1,154 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.cloudera.sqoop;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
+
+import com.cloudera.sqoop.testutil.CommonArgs;
+import com.cloudera.sqoop.testutil.ImportJobTestCase;
+
+/**
+ * Test free form query import.
+ */
+public class TestFreeFormQueryImport extends ImportJobTestCase {
+
+  private Log log;
+
+  public TestFreeFormQueryImport() {
+    this.log = LogFactory.getLog(TestFreeFormQueryImport.class.getName());
+  }
+
+  /**
+   * @return the Log object to use for reporting during this test
+   */
+  protected Log getLogger() {
+    return log;
+  }
+
+  /** the names of the tables we're creating. */
+  private List<String> tableNames;
+
+  @Override
+  public void tearDown() {
+    // Clean up the database on our way out.
+    for (String tableName : tableNames) {
+      try {
+        dropTableIfExists(tableName);
+      } catch (SQLException e) {
+        log.warn("Error trying to drop table '" + tableName
+            + "' on tearDown: " + e);
+      }
+    }
+    super.tearDown();
+  }
+
+  /**
+   * Create the argv to pass to Sqoop.
+   * @param splitByCol column of the table used to split work.
+   * @param query free form query to be used.
+   * @return the argv as an array of strings.
+   */
+  protected String [] getArgv(String splitByCol, String query) {
+    ArrayList<String> args = new ArrayList<String>();
+
+    CommonArgs.addHadoopFlags(args);
+
+    args.add("--connect");
+    args.add(getConnectString());
+    args.add("--target-dir");
+    args.add(getWarehouseDir());
+    args.add("--split-by");
+    args.add(splitByCol);
+    args.add("--num-mappers");
+    args.add("2");
+    args.add("--query");
+    args.add(query);
+
+    return args.toArray(new String[0]);
+  }
+
+  /**
+   * Create two tables that share the common id column.  Run free-form query
+   * import on the result table that is created by joining the two tables on
+   * the id column.
+   */
+  public void testSimpleJoin() throws IOException {
+    tableNames = new ArrayList<String>();
+
+    String [] types1 = { "SMALLINT", };
+    String [] vals1 = { "1", };
+    String tableName1 = getTableName();
+    createTableWithColTypes(types1, vals1);
+    tableNames.add(tableName1);
+
+    incrementTableNum();
+
+    String [] types2 = { "SMALLINT", "VARCHAR(32)", };
+    String [] vals2 = { "1", "'foo'", };
+    String tableName2 = getTableName();
+    createTableWithColTypes(types2, vals2);
+    tableNames.add(tableName2);
+
+    String query = "SELECT "
+        + tableName1 + "." + getColName(0) + ", "
+        + tableName2 + "." + getColName(1) + " "
+        + "FROM " + tableName1 + " JOIN " + tableName2 + " ON ("
+        + tableName1 + "." + getColName(0) + " = "
+        + tableName2 + "." + getColName(0) + ") WHERE "
+        + tableName1 + "." + getColName(0) + " < 3 AND $CONDITIONS";
+
+    runImport(getArgv(tableName1 + "." + getColName(0), query));
+
+    Path warehousePath = new Path(this.getWarehouseDir());
+    Path filePath = new Path(warehousePath, "part-m-00000");
+    String expectedVal = "1,foo";
+
+    BufferedReader reader = null;
+    if (!isOnPhysicalCluster()) {
+      reader = new BufferedReader(
+          new InputStreamReader(new FileInputStream(
+              new File(filePath.toString()))));
+    } else {
+      FileSystem dfs = FileSystem.get(getConf());
+      FSDataInputStream dis = dfs.open(filePath);
+      reader = new BufferedReader(new InputStreamReader(dis));
+    }
+    try {
+      String line = reader.readLine();
+      assertEquals("QueryResult expected a different string",
+          expectedVal, line);
+    } finally {
+      IOUtils.closeStream(reader);
+    }
+  }
+}

Added: sqoop/trunk/src/test/com/cloudera/sqoop/manager/MySQLFreeFormQueryTest.java
URL: http://svn.apache.org/viewvc/sqoop/trunk/src/test/com/cloudera/sqoop/manager/MySQLFreeFormQueryTest.java?rev=1309268&view=auto
==============================================================================
--- sqoop/trunk/src/test/com/cloudera/sqoop/manager/MySQLFreeFormQueryTest.java (added)
+++ sqoop/trunk/src/test/com/cloudera/sqoop/manager/MySQLFreeFormQueryTest.java Wed Apr  4 06:58:52 2012
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.cloudera.sqoop.manager;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+
+import com.cloudera.sqoop.SqoopOptions;
+import com.cloudera.sqoop.TestFreeFormQueryImport;
+
+/**
+ * Test free form query import with the MySQL db.
+ */
+public class MySQLFreeFormQueryTest extends TestFreeFormQueryImport {
+
+  public static final Log LOG = LogFactory.getLog(
+      MySQLFreeFormQueryTest.class.getName());
+
+  @Override
+  protected Log getLogger() {
+    return LOG;
+  }
+
+  @Override
+  protected boolean useHsqldbTestServer() {
+    return false;
+  }
+
+  @Override
+  protected String getConnectString() {
+    return MySQLTestUtils.CONNECT_STRING;
+  }
+
+  @Override
+  protected SqoopOptions getSqoopOptions(Configuration conf) {
+    SqoopOptions opts = new SqoopOptions(conf);
+    opts.setUsername(MySQLTestUtils.getCurrentUser());
+    return opts;
+  }
+
+  @Override
+  protected void dropTableIfExists(String table) throws SQLException {
+    Connection conn = getManager().getConnection();
+    PreparedStatement statement = conn.prepareStatement(
+        "DROP TABLE IF EXISTS " + table,
+        ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
+    try {
+      statement.executeUpdate();
+      conn.commit();
+    } finally {
+      statement.close();
+    }
+  }
+}

Added: sqoop/trunk/src/test/com/cloudera/sqoop/manager/OracleFreeFormQueryTest.java
URL: http://svn.apache.org/viewvc/sqoop/trunk/src/test/com/cloudera/sqoop/manager/OracleFreeFormQueryTest.java?rev=1309268&view=auto
==============================================================================
--- sqoop/trunk/src/test/com/cloudera/sqoop/manager/OracleFreeFormQueryTest.java (added)
+++ sqoop/trunk/src/test/com/cloudera/sqoop/manager/OracleFreeFormQueryTest.java Wed Apr  4 06:58:52 2012
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.cloudera.sqoop.manager;
+
+import java.sql.SQLException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+
+import com.cloudera.sqoop.SqoopOptions;
+import com.cloudera.sqoop.TestFreeFormQueryImport;
+
+/**
+ * Test free form query import with the Oracle db.
+ */
+public class OracleFreeFormQueryTest extends TestFreeFormQueryImport {
+
+  public static final Log LOG = LogFactory.getLog(
+      OracleFreeFormQueryTest.class.getName());
+
+  @Override
+  protected boolean useHsqldbTestServer() {
+    return false;
+  }
+
+  @Override
+  protected String getConnectString() {
+    return OracleUtils.CONNECT_STRING;
+  }
+
+  @Override
+  protected SqoopOptions getSqoopOptions(Configuration conf) {
+    SqoopOptions opts = new SqoopOptions(conf);
+    OracleUtils.setOracleAuth(opts);
+    return opts;
+  }
+
+  @Override
+  protected void dropTableIfExists(String table) throws SQLException {
+    OracleUtils.dropTable(table, getManager());
+  }
+}
+



Mime
View raw message