sqoop-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From abhij...@apache.org
Subject git commit: SQOOP-604: Easy throttling feature for MySQL exports
Date Sat, 03 Nov 2012 05:21:41 GMT
Updated Branches:
  refs/heads/trunk 5eb987a78 -> c499f4909


SQOOP-604: Easy throttling feature for MySQL exports

(Zoltan Toth-Czifra via Abhijeet Gaikwad)


Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/c499f490
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/c499f490
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/c499f490

Branch: refs/heads/trunk
Commit: c499f49097ebf04f9fac34f1df768a319e679cea
Parents: 5eb987a
Author: Abhijeet Gaikwad <abhijeet@apache.org>
Authored: Sat Nov 3 10:48:49 2012 +0530
Committer: Abhijeet Gaikwad <abhijeet@apache.org>
Committed: Sat Nov 3 10:48:49 2012 +0530

----------------------------------------------------------------------
 .../apache/sqoop/mapreduce/MySQLExportMapper.java  |   33 +++++++++++++++
 1 files changed, 33 insertions(+), 0 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/sqoop/blob/c499f490/src/java/org/apache/sqoop/mapreduce/MySQLExportMapper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/MySQLExportMapper.java b/src/java/org/apache/sqoop/mapreduce/MySQLExportMapper.java
index a4e8b88..dc1c126 100644
--- a/src/java/org/apache/sqoop/mapreduce/MySQLExportMapper.java
+++ b/src/java/org/apache/sqoop/mapreduce/MySQLExportMapper.java
@@ -64,6 +64,18 @@ public class MySQLExportMapper<KEYIN, VALIN>
   // Configured value for MSYQL_CHECKPOINT_BYTES_KEY.
   protected long checkpointDistInBytes;
 
+  /** Configuration key that specifies the number of milliseconds
+   * to sleep at the end of each checkpoint commit
+   * Default is 0, no sleep.
+   */
+  public static final String MYSQL_CHECKPOINT_SLEEP_KEY =
+      "sqoop.mysql.export.sleep.ms";
+
+  public static final long DEFAULT_CHECKPOINT_SLEEP_MS = 0;
+
+  // Configured value for MYSQL_CHECKPOINT_SLEEP_KEY.
+  protected long checkpointSleepMs;
+
   protected Configuration conf;
 
   /** The FIFO being used to communicate with mysqlimport. */
@@ -314,6 +326,21 @@ public class MySQLExportMapper<KEYIN, VALIN>
       LOG.warn("Invalid value for " + MYSQL_CHECKPOINT_BYTES_KEY);
       this.checkpointDistInBytes = DEFAULT_CHECKPOINT_BYTES;
     }
+
+    this.checkpointSleepMs = conf.getLong(
+        MYSQL_CHECKPOINT_SLEEP_KEY, DEFAULT_CHECKPOINT_SLEEP_MS);
+
+    if (this.checkpointSleepMs < 0) {
+      LOG.warn("Invalid value for " + MYSQL_CHECKPOINT_SLEEP_KEY);
+      this.checkpointSleepMs = DEFAULT_CHECKPOINT_SLEEP_MS;
+    }
+
+    if (this.checkpointSleepMs >= conf.getLong("mapred.task.timeout", 0)) {
+      LOG.warn("Value for "
+          + MYSQL_CHECKPOINT_SLEEP_KEY
+          + " has to be smaller than mapred.task.timeout");
+      this.checkpointSleepMs = DEFAULT_CHECKPOINT_SLEEP_MS;
+    }
   }
 
   /**
@@ -347,6 +374,12 @@ public class MySQLExportMapper<KEYIN, VALIN>
     if (this.checkpointDistInBytes != 0
         && this.bytesWritten > this.checkpointDistInBytes) {
       LOG.info("Checkpointing current export.");
+
+      if (this.checkpointSleepMs != 0) {
+        LOG.info("Pausing.");
+        Thread.sleep(this.checkpointSleepMs);
+      }
+
       closeExportHandles();
       initMySQLImportProcess();
       this.bytesWritten = 0;


Mime
View raw message