sqoop-commits mailing list archives

From: jar...@apache.org
Subject: git commit: SQOOP-1138: incremental lastmodified should re-use output directory
Date: Tue, 15 Jul 2014 05:24:47 GMT
Repository: sqoop
Updated Branches:
  refs/heads/trunk d4ff097ea -> 34e4efd0d


SQOOP-1138: incremental lastmodified should re-use output directory

(Abraham Elmahrek via Jarek Jarcec Cecho)
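
For context: with this change, re-running an incremental "lastmodified" import
whose output directory already exists requires telling Sqoop how to reconcile
old and new records, either by appending (--append) or by merging on a key
column via the newly added --merge-key import option. A minimal sketch of an
invocation, with the connect string, table, and column names as placeholder
assumptions:

  sqoop import \
    --connect jdbc:hsqldb:hsql://db.example.com/sqoop \
    --table SOMETABLE \
    --incremental lastmodified \
    --check-column last_modified \
    --last-value "2014-07-14 22:24:20.0" \
    --merge-key id

Without one of those two options, the import now fails fast with an
ImportException rather than colliding with the existing directory.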


Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/34e4efd0
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/34e4efd0
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/34e4efd0

Branch: refs/heads/trunk
Commit: 34e4efd0d7a6d34b3b89a7d43271ebb5aa8193a9
Parents: d4ff097
Author: Jarek Jarcec Cecho <jarcec@apache.org>
Authored: Mon Jul 14 22:24:20 2014 -0700
Committer: Jarek Jarcec Cecho <jarcec@apache.org>
Committed: Mon Jul 14 22:24:20 2014 -0700

----------------------------------------------------------------------
 src/java/org/apache/sqoop/tool/ImportTool.java  | 123 ++++++++-
 .../cloudera/sqoop/TestIncrementalImport.java   | 253 ++++++++++++++++++-
 2 files changed, 369 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/sqoop/blob/34e4efd0/src/java/org/apache/sqoop/tool/ImportTool.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/tool/ImportTool.java b/src/java/org/apache/sqoop/tool/ImportTool.java
index 81904ac..a3a2d0d 100644
--- a/src/java/org/apache/sqoop/tool/ImportTool.java
+++ b/src/java/org/apache/sqoop/tool/ImportTool.java
@@ -28,12 +28,17 @@ import java.sql.Types;
 import java.util.List;
 import java.util.Map;
 
+import com.cloudera.sqoop.mapreduce.MergeJob;
+import com.cloudera.sqoop.orm.TableClassName;
+import com.cloudera.sqoop.util.ClassLoaderStack;
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.OptionBuilder;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.util.StringUtils;
 
 import com.cloudera.sqoop.Sqoop;
@@ -66,6 +71,9 @@ public class ImportTool extends com.cloudera.sqoop.tool.BaseSqoopTool {
   // store check column type for incremental option
   private int checkColumnType;
 
+  // Set classloader for local job runner
+  private ClassLoader prevClassLoader = null;
+
   public ImportTool() {
     this("import", false);
   }
@@ -91,6 +99,34 @@ public class ImportTool extends com.cloudera.sqoop.tool.BaseSqoopTool {
   }
 
   /**
+   * If jars must be loaded into the local environment, do so here.
+   */
+  private void loadJars(Configuration conf, String ormJarFile,
+                        String tableClassName) throws IOException {
+
+    boolean isLocal = "local".equals(conf.get("mapreduce.jobtracker.address"))
+        || "local".equals(conf.get("mapred.job.tracker"));
+    if (isLocal) {
+      // If we're using the LocalJobRunner, then instead of using the compiled
+      // jar file as the job source, we're running in the current thread. Push
+      // on another classloader that loads from that jar in addition to
+      // everything currently on the classpath.
+      this.prevClassLoader = ClassLoaderStack.addJarFile(ormJarFile,
+          tableClassName);
+    }
+  }
+
+  /**
+   * If a classloader was pushed by loadJars, restore the previous one here.
+   */
+  private void unloadJars() {
+    if (null != this.prevClassLoader) {
+      // unload the special classloader for this jar.
+      ClassLoaderStack.setCurrentClassLoader(this.prevClassLoader);
+    }
+  }
+
+  /**
    * @return true if the supplied options specify an incremental import.
    */
   private boolean isIncremental(SqoopOptions options) {
@@ -256,6 +292,7 @@ public class ImportTool extends com.cloudera.sqoop.tool.BaseSqoopTool {
       return true;
     }
 
+    FileSystem fs = FileSystem.get(options.getConf());
     SqoopOptions.IncrementalMode incrementalMode = options.getIncrementalMode();
     String nextIncrementalValue = null;
 
@@ -280,6 +317,12 @@ public class ImportTool extends com.cloudera.sqoop.tool.BaseSqoopTool {
       }
       break;
     case DateLastModified:
+      if (options.getMergeKeyCol() == null && !options.isAppendMode()
+          && fs.exists(getOutputPath(options, context.getTableName(), false))) {
+        throw new ImportException("--" + MERGE_KEY_ARG + " or " + "--" + APPEND_ARG
+          + " is required when using --" + this.INCREMENT_TYPE_ARG
+          + " lastmodified and the output directory exists.");
+      }
       checkColumnType = manager.getColumnTypes(options.getTableName(),
         options.getSqlQuery()).get(options.getIncrementalTestColumn());
       nextVal = manager.getCurrentDbTimestamp();
@@ -382,6 +425,48 @@ public class ImportTool extends com.cloudera.sqoop.tool.BaseSqoopTool {
   }
 
   /**
+   * Merge the newly imported HDFS output with the existing output directory.
+   */
+  protected void lastModifiedMerge(SqoopOptions options, ImportJobContext context) throws IOException {
+    FileSystem fs = FileSystem.get(options.getConf());
+    if (context.getDestination() != null && fs.exists(context.getDestination())) {
+      Path userDestDir = getOutputPath(options, context.getTableName(), false);
+      if (fs.exists(userDestDir)) {
+        String tableClassName = null;
+        if (!context.getConnManager().isORMFacilitySelfManaged()) {
+          tableClassName =
+              new TableClassName(options).getClassForTable(context.getTableName());
+        }
+        Path destDir = getOutputPath(options, context.getTableName());
+        options.setExistingJarName(context.getJarFile());
+        options.setClassName(tableClassName);
+        options.setMergeOldPath(userDestDir.toString());
+        options.setMergeNewPath(context.getDestination().toString());
+        // Merge to temporary directory so that original directory remains intact.
+        options.setTargetDir(destDir.toString());
+
+        // Local job tracker needs jars in the classpath.
+        loadJars(options.getConf(), context.getJarFile(), context.getTableName());
+
+        MergeJob mergeJob = new MergeJob(options);
+        if (mergeJob.runMergeJob()) {
+          // Rename destination directory to proper location.
+          Path tmpDir = getOutputPath(options, context.getTableName());
+          fs.rename(userDestDir, tmpDir);
+          fs.rename(destDir, userDestDir);
+          fs.delete(tmpDir, true);
+        } else {
+          LOG.error("Merge MapReduce job failed!");
+        }
+
+        unloadJars();
+      } else {
+        fs.rename(context.getDestination(), userDestDir);
+      }
+    }
+  }
+
+  /**
    * Import a table or query.
    * @return true if an import was performed, false otherwise.
    */
@@ -392,9 +477,11 @@ public class ImportTool extends com.cloudera.sqoop.tool.BaseSqoopTool {
     // Generate the ORM code for the tables.
     jarFile = codeGenerator.generateORM(options, tableName);
 
+    Path outputPath = getOutputPath(options, tableName);
+
     // Do the actual import.
     ImportJobContext context = new ImportJobContext(tableName, jarFile,
-        options, getOutputPath(options, tableName));
+        options, outputPath);
 
     // If we're doing an incremental import, set up the
     // filtering conditions used to get the latest records.
@@ -415,6 +502,8 @@ public class ImportTool extends com.cloudera.sqoop.tool.BaseSqoopTool {
     if (options.isAppendMode()) {
       AppendUtils app = new AppendUtils(context);
       app.append();
+    } else if (options.getIncrementalMode() == SqoopOptions.IncrementalMode.DateLastModified) {
+      lastModifiedMerge(options, context);
     }
 
     // If the user wants this table to be in Hive, perform that post-load.
@@ -449,11 +538,20 @@ public class ImportTool extends com.cloudera.sqoop.tool.BaseSqoopTool {
    * if importing to hbase, this may return null.
    */
   private Path getOutputPath(SqoopOptions options, String tableName) {
+    return getOutputPath(options, tableName, options.isAppendMode()
+        || options.getIncrementalMode().equals(SqoopOptions.IncrementalMode.DateLastModified));
+  }
+
+  /**
+   * @return the output path for the imported files;
+   * if importing to hbase, this may return null.
+   */
+  private Path getOutputPath(SqoopOptions options, String tableName, boolean temp) {
     // Get output directory
     String hdfsWarehouseDir = options.getWarehouseDir();
     String hdfsTargetDir = options.getTargetDir();
     Path outputPath = null;
-    if (options.isAppendMode()) {
+    if (temp) {
       // Use temporary path, later removed when appending
       String salt = tableName;
       if(salt == null && options.getSqlQuery() != null) {
@@ -586,6 +684,10 @@ public class ImportTool extends com.cloudera.sqoop.tool.BaseSqoopTool {
               + " value of the primary key")
           .withLongOpt(SQL_QUERY_BOUNDARY)
           .create());
+      importOpts.addOption(OptionBuilder.withArgName("column")
+          .hasArg().withDescription("Key column to use to join results")
+          .withLongOpt(MERGE_KEY_ARG)
+          .create());
 
       addValidationOpts(importOpts);
     }
@@ -798,6 +900,10 @@ public class ImportTool extends com.cloudera.sqoop.tool.BaseSqoopTool {
           out.setBoundaryQuery(in.getOptionValue(SQL_QUERY_BOUNDARY));
         }
 
+        if (in.hasOption(MERGE_KEY_ARG)) {
+          out.setMergeKeyCol(in.getOptionValue(MERGE_KEY_ARG));
+        }
+
         applyValidationOptions(in, out);
       }
 
@@ -941,14 +1047,14 @@ public class ImportTool extends com.cloudera.sqoop.tool.BaseSqoopTool {
       && options.getHCatTableName() != null) {
       throw new InvalidOptionsException("--hcatalog-table cannot be used "
         + " --warehouse-dir or --target-dir options");
-     } else if (options.isDeleteMode() && options.isAppendMode()) {
+    } else if (options.isDeleteMode() && options.isAppendMode()) {
        throw new InvalidOptionsException("--append and --delete-target-dir can"
          + " not be used together.");
-     } else if (options.isDeleteMode() && options.getIncrementalMode()
+    } else if (options.isDeleteMode() && options.getIncrementalMode()
          != SqoopOptions.IncrementalMode.None) {
        throw new InvalidOptionsException("--delete-target-dir can not be used"
          + " with incremental imports.");
-     }
+    }
   }
 
   /**
@@ -969,6 +1075,13 @@ public class ImportTool extends com.cloudera.sqoop.tool.BaseSqoopTool {
           "You must specify an incremental import mode with --"
           + INCREMENT_TYPE_ARG + ". " + HELP_STR);
     }
+
+    if (options.getIncrementalMode() == SqoopOptions.IncrementalMode.DateLastModified
+        && options.getFileLayout() == SqoopOptions.FileLayout.AvroDataFile) {
+      throw new InvalidOptionsException("--"
+          + INCREMENT_TYPE_ARG + " lastmodified cannot be used in conjunction with --"
+          + FMT_AVRODATAFILE_ARG + "." + HELP_STR);
+    }
   }
 
   @Override

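Aside: the heart of lastModifiedMerge above is a three-step directory swap
performed once the MergeJob succeeds. A minimal standalone sketch of that swap
against the Hadoop FileSystem API, with all paths hypothetical and error
handling elided:

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.FileSystem;
  import org.apache.hadoop.fs.Path;

  public class DirectorySwapSketch {
    public static void main(String[] args) throws Exception {
      FileSystem fs = FileSystem.get(new Configuration());

      // Roles mirror lastModifiedMerge; the paths themselves are made up.
      Path userDestDir = new Path("/user/example/sometable");      // pre-existing output
      Path mergedDir = new Path("/user/example/_merge/sometable"); // MergeJob result
      Path tmpDir = new Path("/user/example/_tmp/sometable");      // scratch location

      fs.rename(userDestDir, tmpDir);    // 1. move the old output aside
      fs.rename(mergedDir, userDestDir); // 2. promote the merged result
      fs.delete(tmpDir, true);           // 3. discard the old output
    }
  }
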
http://git-wip-us.apache.org/repos/asf/sqoop/blob/34e4efd0/src/test/com/cloudera/sqoop/TestIncrementalImport.java
----------------------------------------------------------------------
diff --git a/src/test/com/cloudera/sqoop/TestIncrementalImport.java b/src/test/com/cloudera/sqoop/TestIncrementalImport.java
index 8eadcdd..fd94552 100644
--- a/src/test/com/cloudera/sqoop/TestIncrementalImport.java
+++ b/src/test/com/cloudera/sqoop/TestIncrementalImport.java
@@ -323,10 +323,62 @@ public class TestIncrementalImport extends TestCase {
   }
 
   /**
+   * Look at a directory that should contain files full of an imported 'id'
+   * column and 'last_modified' column. Assert that all numbers in [0, expectedNums) are present
+   * in order.
+   */
+  public void assertDirOfNumbersAndTimestamps(String tableName, int expectedNums) {
+    try {
+      FileSystem fs = FileSystem.getLocal(new Configuration());
+      Path warehouse = new Path(BaseSqoopTestCase.LOCAL_WAREHOUSE_DIR);
+      Path tableDir = new Path(warehouse, tableName);
+      FileStatus [] stats = fs.listStatus(tableDir);
+      String [] fileNames = new String[stats.length];
+      for (int i = 0; i < stats.length; i++) {
+        fileNames[i] = stats[i].getPath().toString();
+      }
+
+      Arrays.sort(fileNames);
+
+      // Read all the files in sorted order, adding the value lines to the list.
+      List<String> receivedNums = new ArrayList<String>();
+      for (String fileName : fileNames) {
+        if (fileName.startsWith("_") || fileName.startsWith(".")) {
+          continue;
+        }
+
+        BufferedReader r = new BufferedReader(
+            new InputStreamReader(fs.open(new Path(fileName))));
+        try {
+          while (true) {
+            String s = r.readLine();
+            if (null == s) {
+              break;
+            }
+
+            receivedNums.add(s.trim());
+          }
+        } finally {
+          r.close();
+        }
+      }
+
+      assertEquals(expectedNums, receivedNums.size());
+
+      // Compare the received values with the expected set.
+      for (int i = 0; i < expectedNums; i++) {
+        assertEquals((int) i, (int) Integer.valueOf(receivedNums.get(i).split(",")[0]));
+      }
+    } catch (Exception e) {
+      fail("Got unexpected exception: " + StringUtils.stringifyException(e));
+    }
+  }
+
+  /**
    * Assert that a directory contains a file with exactly one line
    * in it, containing the prescribed number 'val'.
    */
-  public void assertSpecificNumber(String tableName, int val) {
+  public void assertFirstSpecificNumber(String tableName, int val) {
     try {
       FileSystem fs = FileSystem.getLocal(new Configuration());
       Path warehouse = new Path(BaseSqoopTestCase.LOCAL_WAREHOUSE_DIR);
@@ -375,6 +427,53 @@ public class TestIncrementalImport extends TestCase {
     }
   }
 
+  /**
+   * Assert that the directory contains exactly one data file, and that
+   * its first line carries the prescribed number 'val' in the first column.
+   */
+  public void assertSpecificNumber(String tableName, int val) {
+    try {
+      FileSystem fs = FileSystem.getLocal(new Configuration());
+      Path warehouse = new Path(BaseSqoopTestCase.LOCAL_WAREHOUSE_DIR);
+      Path tableDir = new Path(warehouse, tableName);
+      FileStatus [] stats = fs.listStatus(tableDir);
+      String [] filePaths = new String[stats.length];
+      for (int i = 0; i < stats.length; i++) {
+        filePaths[i] = stats[i].getPath().toString();
+      }
+
+      // Read the first file that is not a hidden file.
+      boolean foundVal = false;
+      for (String filePath : filePaths) {
+        String fileName = new Path(filePath).getName();
+        if (fileName.startsWith("_") || fileName.startsWith(".")) {
+          continue;
+        }
+
+        if (foundVal) {
+          // Make sure we don't have two or more "real" files in the dir.
+          fail("Got an extra data-containing file in this directory.");
+        }
+
+        BufferedReader r = new BufferedReader(
+            new InputStreamReader(fs.open(new Path(filePath))));
+        try {
+          String s = r.readLine();
+          if (val == (int) Integer.valueOf(s.trim().split(",")[0])) {
+            if (foundVal) {
+              fail("Expected only one result, but got another line: " + s);
+            }
+            foundVal = true;
+          }
+        } finally {
+          r.close();
+        }
+      }
+
+      assertTrue("Failed to find a data file containing " + val, foundVal);
+    } catch (IOException e) {
+      fail("Got unexpected exception: " + StringUtils.stringifyException(e));
+    }
+  }
+
   public void runImport(SqoopOptions options, List<String> args) {
     try {
       Sqoop importer = new Sqoop(new ImportTool(), options.getConf(), options);
@@ -465,7 +564,7 @@ public class TestIncrementalImport extends TestCase {
       args.add("--incremental");
       args.add("lastmodified");
       args.add("--check-column");
-      args.add("last_modified");
+      args.add("LAST_MODIFIED");
     }
     args.add("-m");
     args.add("1");
@@ -858,6 +957,156 @@ public class TestIncrementalImport extends TestCase {
     // Import only the new row.
     clearDir(TABLE_NAME);
     runJob(TABLE_NAME);
+    assertFirstSpecificNumber(TABLE_NAME, 4000);
+  }
+
+  public void testUpdateModifyWithTimestamp() throws Exception {
+    // Create a table with data in it; import it.
+    // Then modify some existing rows, and verify that we only grab
+    // those rows.
+
+    final String TABLE_NAME = "updateModifyTimestamp";
+    Timestamp thePast = new Timestamp(System.currentTimeMillis() - 100);
+    createTimestampTable(TABLE_NAME, 10, thePast);
+
+    List<String> args = getArgListForTable(TABLE_NAME, false, false);
+
+    Configuration conf = newConf();
+    SqoopOptions options = new SqoopOptions();
+    options.setConf(conf);
+    runImport(options, args);
+    assertDirOfNumbers(TABLE_NAME, 10);
+
+    // Modify a row.
+    long importWasBefore = System.currentTimeMillis();
+    Thread.sleep(50);
+    long rowsAddedTime = System.currentTimeMillis() - 5;
+    assertTrue(rowsAddedTime > importWasBefore);
+    assertTrue(rowsAddedTime < System.currentTimeMillis());
+    SqoopOptions options2 = new SqoopOptions();
+    options2.setConnectString(SOURCE_DB_URL);
+    HsqldbManager manager = new HsqldbManager(options2);
+    Connection c = manager.getConnection();
+    PreparedStatement s = null;
+    try {
+      s = c.prepareStatement("UPDATE " + TABLE_NAME
+          + " SET id=?, last_modified=? WHERE id=?");
+      s.setInt(1, 4000); // the first row should have '4000' in it now.
+      s.setTimestamp(2, new Timestamp(rowsAddedTime));
+      s.setInt(3, 0);
+      s.executeUpdate();
+      c.commit();
+    } finally {
+      s.close();
+    }
+
+    // Re-run the import; only the modified row should be pulled and merged.
+    args.add("--last-value");
+    args.add(new Timestamp(importWasBefore).toString());
+    args.add("--merge-key");
+    args.add("id");
+    conf = newConf();
+    options = new SqoopOptions();
+    options.setConf(conf);
+    runImport(options, args);
+    assertSpecificNumber(TABLE_NAME, 4000);
+  }
+
+  public void testUpdateModifyWithTimestampWithQuery() throws Exception {
+    // Create a table with data in it; import it via a free-form query.
+    // Then modify some existing rows, and verify that we only grab
+    // those rows.
+
+    final String TABLE_NAME = "UpdateModifyWithTimestampWithQuery";
+    Timestamp thePast = new Timestamp(System.currentTimeMillis() - 100);
+    createTimestampTable(TABLE_NAME, 10, thePast);
+
+    final String QUERY = "SELECT id, last_modified FROM UpdateModifyWithTimestampWithQuery WHERE $CONDITIONS";
+
+    List<String> args = getArgListForQuery(QUERY, TABLE_NAME,
+        true, false, false);
+
+    Configuration conf = newConf();
+    SqoopOptions options = new SqoopOptions();
+    options.setConf(conf);
+    runImport(options, args);
+    assertDirOfNumbersAndTimestamps(TABLE_NAME, 10);
+
+    // Modify a row.
+    long importWasBefore = System.currentTimeMillis();
+    Thread.sleep(50);
+    long rowsAddedTime = System.currentTimeMillis() - 5;
+    assertTrue(rowsAddedTime > importWasBefore);
+    assertTrue(rowsAddedTime < System.currentTimeMillis());
+    SqoopOptions options2 = new SqoopOptions();
+    options2.setConnectString(SOURCE_DB_URL);
+    HsqldbManager manager = new HsqldbManager(options2);
+    Connection c = manager.getConnection();
+    PreparedStatement s = null;
+    try {
+      s = c.prepareStatement("UPDATE " + TABLE_NAME
+          + " SET id=?, last_modified=? WHERE id=?");
+      s.setInt(1, 4000); // the first row should have '4000' in it now.
+      s.setTimestamp(2, new Timestamp(rowsAddedTime));
+      s.setInt(3, 0);
+      s.executeUpdate();
+      c.commit();
+    } finally {
+      s.close();
+    }
+
+    // Re-run the import; only the modified row should be pulled and merged.
+    args.add("--last-value");
+    args.add(new Timestamp(importWasBefore).toString());
+    args.add("--merge-key");
+    args.add("id");
+    conf = newConf();
+    options = new SqoopOptions();
+    options.setConf(conf);
+    runImport(options, args);
+    assertSpecificNumber(TABLE_NAME, 4000);
+  }
+
+  public void testUpdateModifyWithTimestampJob() throws Exception {
+    // Create a table with data in it; import it.
+    // Then modify some existing rows, and verify that we only grab
+    // those rows.
+
+    final String TABLE_NAME = "updateModifyTimestampJob";
+    Timestamp thePast = new Timestamp(System.currentTimeMillis() - 100);
+    createTimestampTable(TABLE_NAME, 10, thePast);
+
+    List<String> args = getArgListForTable(TABLE_NAME, false, false);
+    args.add("--merge-key");
+    args.add("id");
+    createJob(TABLE_NAME, args);
+    runJob(TABLE_NAME);
+    assertDirOfNumbers(TABLE_NAME, 10);
+
+    // Modify a row.
+    long importWasBefore = System.currentTimeMillis();
+    Thread.sleep(50);
+    long rowsAddedTime = System.currentTimeMillis() - 5;
+    assertTrue(rowsAddedTime > importWasBefore);
+    assertTrue(rowsAddedTime < System.currentTimeMillis());
+    SqoopOptions options2 = new SqoopOptions();
+    options2.setConnectString(SOURCE_DB_URL);
+    HsqldbManager manager = new HsqldbManager(options2);
+    Connection c = manager.getConnection();
+    PreparedStatement s = null;
+    try {
+      s = c.prepareStatement("UPDATE " + TABLE_NAME
+          + " SET id=?, last_modified=? WHERE id=?");
+      s.setInt(1, 4000); // the first row should have '4000' in it now.
+      s.setTimestamp(2, new Timestamp(rowsAddedTime));
+      s.setInt(3, 0);
+      s.executeUpdate();
+      c.commit();
+    } finally {
+      s.close();
+    }
+
+    // Re-run the saved job; only the modified row should be pulled and merged.
+    runJob(TABLE_NAME);
     assertSpecificNumber(TABLE_NAME, 4000);
   }
 

