sqoop-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jar...@apache.org
Subject git commit: SQOOP-1078: incremental import from database in direct mode
Date Wed, 19 Jun 2013 22:39:29 GMT
Updated Branches:
  refs/heads/trunk 64878c643 -> 92e94b911


SQOOP-1078: incremental import from database in direct mode

(Tim Howe via Jarek Jarcec Cecho)


Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/92e94b91
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/92e94b91
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/92e94b91

Branch: refs/heads/trunk
Commit: 92e94b911d203fafbd4f3784badd29431aa5bf78
Parents: 64878c6
Author: Jarek Jarcec Cecho <jarcec@apache.org>
Authored: Wed Jun 19 15:38:40 2013 -0700
Committer: Jarek Jarcec Cecho <jarcec@apache.org>
Committed: Wed Jun 19 15:38:40 2013 -0700

----------------------------------------------------------------------
 src/java/org/apache/sqoop/util/AppendUtils.java | 117 +++++++++++++------
 .../apache/sqoop/util/DirectImportUtils.java    |   2 +-
 2 files changed, 81 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/sqoop/blob/92e94b91/src/java/org/apache/sqoop/util/AppendUtils.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/util/AppendUtils.java b/src/java/org/apache/sqoop/util/AppendUtils.java
index 873c718..d7cd6c5 100644
--- a/src/java/org/apache/sqoop/util/AppendUtils.java
+++ b/src/java/org/apache/sqoop/util/AppendUtils.java
@@ -49,6 +49,8 @@ public class AppendUtils {
   private static final String FILEPART_SEPARATOR = "-";
   private static final String FILEEXT_SEPARATOR = ".";
 
+  private static final Pattern DATA_PART_PATTERN = Pattern.compile("part.*-([0-9]{" + PARTITION_DIGITS + "}+).*");
+
   private ImportJobContext context = null;
 
   public AppendUtils(ImportJobContext context) {
@@ -116,11 +118,10 @@ public class AppendUtils {
     int nextPartition = 0;
     FileStatus[] existingFiles = fs.listStatus(targetDir);
     if (existingFiles != null && existingFiles.length > 0) {
-      Pattern patt = Pattern.compile("part.*-([0-9][0-9][0-9][0-9][0-9]).*");
       for (FileStatus fileStat : existingFiles) {
         if (!fileStat.isDir()) {
           String filename = fileStat.getPath().getName();
-          Matcher mat = patt.matcher(filename);
+          Matcher mat = DATA_PART_PATTERN.matcher(filename);
           if (mat.matches()) {
             int thisPart = Integer.parseInt(mat.group(1));
             if (thisPart >= nextPartition) {
@@ -140,52 +141,94 @@ public class AppendUtils {
   }
 
   /**
-   * Move files from source to target using a specified starting partition.
+   * Move selected files from source to target using a specified starting partition.
+   *
+   * Directories are moved without restriction.  Note that the serial
+   * number of directories bears no relation to the file partition
+   * numbering.
    */
   private void moveFiles(FileSystem fs, Path sourceDir, Path targetDir,
       int partitionStart) throws IOException {
 
-    NumberFormat numpart = NumberFormat.getInstance();
-    numpart.setMinimumIntegerDigits(PARTITION_DIGITS);
-    numpart.setGroupingUsed(false);
-    Pattern patt = Pattern.compile("part.*-([0-9][0-9][0-9][0-9][0-9]).*");
-    FileStatus[] tempFiles = fs.listStatus(sourceDir);
+    /* list files in the source dir and check for errors */
+
+    FileStatus[] sourceFiles = fs.listStatus(sourceDir);
 
-    if (null == tempFiles) {
+    if (null == sourceFiles) {
       // If we've already checked that the dir exists, and now it can't be
       // listed, this is a genuine error (permissions, fs integrity, or other).
       throw new IOException("Could not list files from " + sourceDir);
     }
 
-    // Move and rename files & directories from temporary to target-dir thus
-    // appending file's next partition
-    for (FileStatus fileStat : tempFiles) {
-      if (!fileStat.isDir()) {
-        // Move imported data files
-        String filename = fileStat.getPath().getName();
-        Matcher mat = patt.matcher(filename);
-        if (mat.matches()) {
-          String name = getFilename(filename);
-          String fileToMove = name.concat(numpart.format(partitionStart++));
-          String extension = getFileExtension(filename);
-          if (extension != null) {
-            fileToMove = fileToMove.concat(extension);
-          }
-          LOG.debug("Filename: " + filename + " repartitioned to: "
-              + fileToMove);
-          fs.rename(fileStat.getPath(), new Path(targetDir, fileToMove));
-        }
-      } else {
-        // Move directories (_logs & any other)
-        String dirName = fileStat.getPath().getName();
-        Path path = new Path(targetDir, dirName);
-        int dirNumber = 0;
-        while (fs.exists(path)) {
-          path = new Path(targetDir, dirName.concat("-").concat(
-              numpart.format(dirNumber++)));
+
+    /* state used throughout the entire move operation */
+
+    // pad the data partition number thusly
+    NumberFormat partFormat = NumberFormat.getInstance();
+    partFormat.setMinimumIntegerDigits(PARTITION_DIGITS);
+    partFormat.setGroupingUsed(false);
+
+    // where the data partitioning is currently at
+    int dataPart = partitionStart;
+
+
+    /* loop through all top-level files and copy matching ones */
+
+    for (FileStatus fileStatus : sourceFiles) {
+      String        sourceFilename = fileStatus.getPath().getName();
+      StringBuilder destFilename   = new StringBuilder();
+
+      if (fileStatus.isDir()) {    // move all subdirectories
+        // pass target dir as initial dest to prevent nesting inside preexisting dir
+        if (fs.rename(fileStatus.getPath(), targetDir)) {
+          LOG.debug("Directory: " + sourceFilename + " renamed to: " + sourceFilename);
+        } else {
+          int dirNumber = 0;
+          Path destPath;
+          do {
+            // clear the builder in case this isn't the first iteration
+            destFilename.setLength(0);
+
+            // name-nnnnn?
+            destFilename
+              .append(sourceFilename)
+              .append("-")
+              .append(partFormat.format(dirNumber++));
+
+            destPath = new Path(targetDir, destFilename.toString());
+            if (fs.exists(destPath))
+              continue;
+
+            /*
+             * There's still a race condition right here if an
+             * identically-named directory is created concurrently.
+             * It can be avoided by creating a parent dir for all
+             * migrated dirs, or by an intermediate rename.
+             */
+
+          } while (!fs.rename(fileStatus.getPath(), destPath));
+
+          LOG.debug("Directory: " + sourceFilename + " renamed to: " + destPath.getName());
         }
-        LOG.debug("Directory: " + dirName + " renamed to: " + path.getName());
-        fs.rename(fileStat.getPath(), path);
+      } else if (DATA_PART_PATTERN.matcher(sourceFilename).matches()) {    // move only matching top-level files
+        do {
+          // clear the builder in case this isn't the first iteration
+          destFilename.setLength(0);
+
+          // name-nnnnn
+          destFilename
+            .append(getFilename(sourceFilename))
+            .append(partFormat.format(dataPart++));
+
+          // .ext?
+          String extension = getFileExtension(sourceFilename);
+          if (extension != null)
+            destFilename.append(getFileExtension(sourceFilename));
+        } while (!fs.rename(fileStatus.getPath(), new Path(targetDir, destFilename.toString())));
+
+        LOG.debug("Filename: " + sourceFilename + " repartitioned to: " + destFilename.toString());
+      } else {    // ignore everything else
+        LOG.debug("Filename: " + sourceFilename + " ignored");
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/92e94b91/src/java/org/apache/sqoop/util/DirectImportUtils.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/util/DirectImportUtils.java b/src/java/org/apache/sqoop/util/DirectImportUtils.java
index e1bf22f..d801c8f 100644
--- a/src/java/org/apache/sqoop/util/DirectImportUtils.java
+++ b/src/java/org/apache/sqoop/util/DirectImportUtils.java
@@ -86,7 +86,7 @@ public final class DirectImportUtils {
 
     // This Writer will be closed by the caller.
     return new SplittableBufferedWriter(
-        new SplittingOutputStream(conf, destDir, "data-",
+        new SplittingOutputStream(conf, destDir, "part-m-",
         options.getDirectSplitSize(), getCodec(conf, options)));
   }
 


Mime
View raw message