sqoop-commits mailing list archives

From: b...@apache.org
Subject: [3/5] git commit: SQOOP-677 Destroyer needs to be called from OutputCommitter (Jarek Jarcec Cecho)
Date: Mon, 26 Nov 2012 17:13:26 GMT
SQOOP-677 Destroyer needs to be called from OutputCommitter
(Jarek Jarcec Cecho)


Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/ef12bf50
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/ef12bf50
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/ef12bf50

Branch: refs/heads/sqoop2
Commit: ef12bf508faf524f64ed52fb048e92b8a5b34398
Parents: 13c5c06
Author: Bilung Lee <blee@apache.org>
Authored: Mon Nov 26 08:45:02 2012 -0800
Committer: Bilung Lee <blee@apache.org>
Committed: Mon Nov 26 08:45:02 2012 -0800

----------------------------------------------------------------------
 .../connector/jdbc/GenericJdbcExportDestroyer.java |    8 +-
 .../connector/jdbc/GenericJdbcImportDestroyer.java |    4 +-
 .../apache/sqoop/framework/FrameworkManager.java   |    3 +-
 .../sqoop/job/mr/SqoopDestroyerExecutor.java       |   64 +++++++++++++++
 .../apache/sqoop/job/mr/SqoopFileOutputFormat.java |   31 +++++++
 .../apache/sqoop/job/mr/SqoopNullOutputFormat.java |   26 +++++-
 .../java/org/apache/sqoop/job/etl/Destroyer.java   |   13 +++-
 7 files changed, 139 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
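
In outline, this change routes both successful and failed job endings from Hadoop's OutputCommitter into the connector-side Destroyer. The sketch below condenses the committers added later in this message into a single illustrative class; it is not literal code from the commit, and the class name DestroyerFlowSketch is made up.

import java.io.IOException;

import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.JobStatus;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.sqoop.job.JobConstants;
import org.apache.sqoop.job.mr.SqoopDestroyerExecutor;

public class DestroyerFlowSketch extends OutputCommitter {

  @Override
  public void commitJob(JobContext jobContext) throws IOException {
    super.commitJob(jobContext);
    // Job finished cleanly: run the connector destroyer with success = true.
    SqoopDestroyerExecutor.executeDestroyer(true, jobContext.getConfiguration(),
      JobConstants.JOB_ETL_DESTROYER);
  }

  @Override
  public void abortJob(JobContext jobContext, JobStatus.State state) throws IOException {
    super.abortJob(jobContext, state);
    // Job failed or was killed: run the same destroyer with success = false.
    SqoopDestroyerExecutor.executeDestroyer(false, jobContext.getConfiguration(),
      JobConstants.JOB_ETL_DESTROYER);
  }

  // Remaining OutputCommitter callbacks are no-ops in this sketch.
  @Override public void setupJob(JobContext jobContext) { }
  @Override public void setupTask(TaskAttemptContext taskContext) { }
  @Override public boolean needsTaskCommit(TaskAttemptContext taskContext) { return false; }
  @Override public void commitTask(TaskAttemptContext taskContext) { }
  @Override public void abortTask(TaskAttemptContext taskContext) { }
}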


http://git-wip-us.apache.org/repos/asf/sqoop/blob/ef12bf50/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExportDestroyer.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExportDestroyer.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExportDestroyer.java
index 7f952ac..37149de 100644
--- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExportDestroyer.java
+++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExportDestroyer.java
@@ -17,14 +17,16 @@
  */
 package org.apache.sqoop.connector.jdbc;
 
+import org.apache.log4j.Logger;
 import org.apache.sqoop.common.ImmutableContext;
 import org.apache.sqoop.job.etl.Destroyer;
 
 public class GenericJdbcExportDestroyer extends Destroyer {
 
+  private static final Logger LOG = Logger.getLogger(GenericJdbcExportDestroyer.class);
+
   @Override
-  public void run(ImmutableContext context) {
-    // TODO Auto-generated method stub
+  public void destroy(boolean success, ImmutableContext context, Object connectionConfig, Object jobConfig) {
+    LOG.info("Running generic JDBC connector destroyer");
   }
-
 }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/ef12bf50/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportDestroyer.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportDestroyer.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportDestroyer.java
index a53fa59..e09b0c3 100644
--- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportDestroyer.java
+++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportDestroyer.java
@@ -23,8 +23,8 @@ import org.apache.sqoop.job.etl.Destroyer;
 public class GenericJdbcImportDestroyer extends Destroyer {
 
   @Override
-  public void run(ImmutableContext context) {
-    // TODO Auto-generated method stub
+  public void destroy(boolean success, ImmutableContext context, Object connectionConfig, Object jobConfig) {
+    // No explicit action at the moment
   }
 
 }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/ef12bf50/core/src/main/java/org/apache/sqoop/framework/FrameworkManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/framework/FrameworkManager.java b/core/src/main/java/org/apache/sqoop/framework/FrameworkManager.java
index 79c9acc..b012d23 100644
--- a/core/src/main/java/org/apache/sqoop/framework/FrameworkManager.java
+++ b/core/src/main/java/org/apache/sqoop/framework/FrameworkManager.java
@@ -433,7 +433,8 @@ public final class FrameworkManager {
     }
 
     // Initialize submission from connector perspective
-    destroyer.run(request.getConnectorContext());
+    destroyer.destroy(false, request.getConnectorContext(),
+      request.getConfigConnectorConnection(), request.getConfigConnectorJob());
   }
 
   public static MSubmission stop(long jobId) {

http://git-wip-us.apache.org/repos/asf/sqoop/blob/ef12bf50/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopDestroyerExecutor.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopDestroyerExecutor.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopDestroyerExecutor.java
new file mode 100644
index 0000000..36eb65d
--- /dev/null
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopDestroyerExecutor.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.job.mr;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.sqoop.job.JobConstants;
+import org.apache.sqoop.job.PrefixContext;
+import org.apache.sqoop.job.etl.Destroyer;
+import org.apache.sqoop.utils.ClassUtils;
+
+/**
+ *  Helper class to execute destroyers on the MapReduce job side.
+ */
+public class SqoopDestroyerExecutor {
+
+  public static final Log LOG =
+    LogFactory.getLog(SqoopDestroyerExecutor.class.getName());
+
+  /**
+   * Execute destroyer.
+   *
+   * @param success True if the job execution was successful
+   * @param configuration Configuration object from which the destroyer class, context
+   *                      and configuration objects are read.
+   * @param propertyName Name of property that holds destroyer class.
+   */
+  public static void executeDestroyer(boolean success, Configuration configuration, String propertyName) {
+    Destroyer destroyer = (Destroyer) ClassUtils.instantiate(configuration.get(propertyName));
+
+    if(destroyer == null) {
+      LOG.info("Skipping running destroyer as non was defined.");
+      return;
+    }
+
+    // Objects that should be passed to the Destroyer execution
+    PrefixContext subContext = new PrefixContext(configuration, JobConstants.PREFIX_CONNECTOR_CONTEXT);
+    Object configConnection = ConfigurationUtils.getConnectorConnection(configuration);
+    Object configJob = ConfigurationUtils.getConnectorJob(configuration);
+
+    destroyer.destroy(success, subContext, configConnection, configJob);
+  }
+
+  private SqoopDestroyerExecutor() {
+    // Instantiation is prohibited
+  }
+
+}
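
The new helper can also be driven directly, for example from a test. The snippet below is a minimal sketch, assuming the connector connection and job configuration objects have already been placed in the Configuration the way the framework does for a real submission; the wrapper class ExecuteDestroyerSketch is made up for illustration.

import org.apache.hadoop.conf.Configuration;
import org.apache.sqoop.connector.jdbc.GenericJdbcImportDestroyer;
import org.apache.sqoop.job.JobConstants;
import org.apache.sqoop.job.mr.SqoopDestroyerExecutor;

public class ExecuteDestroyerSketch {
  public static void main(String[] args) {
    // Hypothetical driver: register a destroyer class under the property the committers read.
    Configuration configuration = new Configuration();
    configuration.set(JobConstants.JOB_ETL_DESTROYER, GenericJdbcImportDestroyer.class.getName());

    // Pretend the job ended successfully; the helper instantiates the class and calls
    // destroy(true, context, connectionConfig, jobConfig) on it.
    SqoopDestroyerExecutor.executeDestroyer(true, configuration, JobConstants.JOB_ETL_DESTROYER);
  }
}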

http://git-wip-us.apache.org/repos/asf/sqoop/blob/ef12bf50/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopFileOutputFormat.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopFileOutputFormat.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopFileOutputFormat.java
index c221cbf..813f370 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopFileOutputFormat.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopFileOutputFormat.java
@@ -27,8 +27,12 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.DefaultCodec;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobStatus;
+import org.apache.hadoop.mapreduce.OutputCommitter;
 import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.sqoop.job.JobConstants;
 import org.apache.sqoop.job.io.Data;
@@ -66,4 +70,31 @@ public class SqoopFileOutputFormat
     return executor.getRecordWriter();
   }
 
+  public synchronized OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException {
+    Path output = getOutputPath(context);
+    return new DestroyerFileOutputCommitter(output, context);
+  }
+
+  public class DestroyerFileOutputCommitter extends FileOutputCommitter {
+
+    public DestroyerFileOutputCommitter(Path outputPath, TaskAttemptContext context) throws IOException {
+      super(outputPath, context);
+    }
+
+    @Override
+    public void commitJob(JobContext context) throws IOException {
+      super.commitJob(context);
+
+      Configuration config = context.getConfiguration();
+      SqoopDestroyerExecutor.executeDestroyer(true, config, JobConstants.JOB_ETL_DESTROYER);
+    }
+
+    @Override
+    public void abortJob(JobContext context, JobStatus.State state) throws IOException {
+      super.abortJob(context, state);
+
+      Configuration config = context.getConfiguration();
+      SqoopDestroyerExecutor.executeDestroyer(false, config, JobConstants.JOB_ETL_DESTROYER);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/ef12bf50/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopNullOutputFormat.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopNullOutputFormat.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopNullOutputFormat.java
index 1242f90..54604a7 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopNullOutputFormat.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopNullOutputFormat.java
@@ -20,14 +20,19 @@ package org.apache.sqoop.job.mr;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobStatus;
 import org.apache.hadoop.mapreduce.OutputCommitter;
 import org.apache.hadoop.mapreduce.OutputFormat;
 import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.sqoop.job.JobConstants;
 import org.apache.sqoop.job.io.Data;
 
+import java.io.IOException;
+
 /**
  * An output format for MapReduce job.
  */
@@ -51,15 +56,30 @@ public class SqoopNullOutputFormat extends OutputFormat<Data, NullWritable> {
 
   @Override
   public OutputCommitter getOutputCommitter(TaskAttemptContext context) {
-    // return an output committer that does nothing
-    return new NullOutputCommitter();
+    return new DestroyerOutputCommitter();
   }
 
-  class NullOutputCommitter extends OutputCommitter {
+  class DestroyerOutputCommitter extends OutputCommitter {
     @Override
     public void setupJob(JobContext jobContext) { }
 
     @Override
+    public void commitJob(JobContext jobContext) throws IOException {
+      super.commitJob(jobContext);
+
+      Configuration config = jobContext.getConfiguration();
+      SqoopDestroyerExecutor.executeDestroyer(true, config, JobConstants.JOB_ETL_DESTROYER);
+    }
+
+    @Override
+    public void abortJob(JobContext jobContext, JobStatus.State state) throws IOException {
+      super.abortJob(jobContext, state);
+
+      Configuration config = jobContext.getConfiguration();
+      SqoopDestroyerExecutor.executeDestroyer(false, config, JobConstants.JOB_ETL_DESTROYER);
+    }
+
+    @Override
     public void setupTask(TaskAttemptContext taskContext) { }
 
     @Override

http://git-wip-us.apache.org/repos/asf/sqoop/blob/ef12bf50/spi/src/main/java/org/apache/sqoop/job/etl/Destroyer.java
----------------------------------------------------------------------
diff --git a/spi/src/main/java/org/apache/sqoop/job/etl/Destroyer.java b/spi/src/main/java/org/apache/sqoop/job/etl/Destroyer.java
index c8dc7c3..528d550 100644
--- a/spi/src/main/java/org/apache/sqoop/job/etl/Destroyer.java
+++ b/spi/src/main/java/org/apache/sqoop/job/etl/Destroyer.java
@@ -25,6 +25,17 @@ import org.apache.sqoop.common.ImmutableContext;
  */
 public abstract class Destroyer {
 
-  public abstract void run(ImmutableContext context);
+  /**
+   * Callback to clean up after job execution.
+   *
+   * @param success True if the execution was successful
+   * @param context Connector context object
+   * @param connectionConfiguration Connection configuration object
+   * @param jobConfiguration Job configuration object
+   */
+  public abstract void destroy(boolean success,
+                               ImmutableContext context,
+                               Object connectionConfiguration,
+                               Object jobConfiguration);
 
 }
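
For connector authors, a destroyer written against the new callback might look like the sketch below. The class name ExampleAuditDestroyer and its log messages are hypothetical, illustrating only the contract introduced by this commit; the success flag is what distinguishes the commitJob and abortJob paths above, and the stop path in FrameworkManager always passes false.

import org.apache.log4j.Logger;
import org.apache.sqoop.common.ImmutableContext;
import org.apache.sqoop.job.etl.Destroyer;

public class ExampleAuditDestroyer extends Destroyer {

  private static final Logger LOG = Logger.getLogger(ExampleAuditDestroyer.class);

  @Override
  public void destroy(boolean success, ImmutableContext context,
                      Object connectionConfiguration, Object jobConfiguration) {
    // The framework now reports the job outcome, so clean-up can branch on it.
    if (success) {
      LOG.info("Job succeeded, finalizing connector side effects");
    } else {
      LOG.info("Job failed or was stopped, cleaning up partial connector state");
    }
  }
}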

