sqoop-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jar...@apache.org
Subject git commit: SQOOP-1139: Sqoop2: JobManager.java indentation is inconsistent with rest of the codebase
Date Wed, 17 Jul 2013 21:32:48 GMT
Updated Branches:
  refs/heads/sqoop2 20bbef04c -> e5f664508


SQOOP-1139: Sqoop2: JobManager.java indentation is inconsistent with rest of the codebase

(Venkat Ranganathan via Jarek Jarcec Cecho)


Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/e5f66450
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/e5f66450
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/e5f66450

Branch: refs/heads/sqoop2
Commit: e5f664508e751afeca4b2462a850ac5436bf439d
Parents: 20bbef0
Author: Jarek Jarcec Cecho <jarcec@apache.org>
Authored: Wed Jul 17 14:31:55 2013 -0700
Committer: Jarek Jarcec Cecho <jarcec@apache.org>
Committed: Wed Jul 17 14:31:55 2013 -0700

----------------------------------------------------------------------
 .../org/apache/sqoop/framework/JobManager.java  | 1189 +++++++++---------
 1 file changed, 606 insertions(+), 583 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/sqoop/blob/e5f66450/core/src/main/java/org/apache/sqoop/framework/JobManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/framework/JobManager.java b/core/src/main/java/org/apache/sqoop/framework/JobManager.java
index 2d37020..9f09982 100644
--- a/core/src/main/java/org/apache/sqoop/framework/JobManager.java
+++ b/core/src/main/java/org/apache/sqoop/framework/JobManager.java
@@ -1,20 +1,20 @@
 /**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.sqoop.framework;
 
 import org.apache.log4j.Logger;
@@ -43,571 +43,594 @@ import java.util.Date;
 import java.util.List;
 
 public class JobManager implements Reconfigurable {
-   /**
-    * Logger object.
-    */
-   private static final Logger LOG = Logger.getLogger(JobManager.class);
-
-   /**
-    * Private instance to singleton of this class.
-    */
-   private static JobManager instance;
-   /**
-    * Create default object by default.
-    *
-    * Every Sqoop server application needs one so this should not be performance issue.
-    */
-   static {
-       instance = new JobManager();
-   }
-
-   /**
-    * Return current instance.
-    *
-    * @return Current instance
-    */
-   public static JobManager getInstance() {
-       return instance;
-   }
-
-   /**
-    * Allows to set instance in case that it's need.
-    *
-    * This method should not be normally used as the default instance should be sufficient.
One target
-    * user use case for this method are unit tests.
-    *
-    * @param newInstance New instance
-    */
-   public static void setInstance(JobManager newInstance) {
-       instance = newInstance;
-   }
-
-   /**
-    * Default interval for purging old submissions from repository.
-    */
-   private static final long DEFAULT_PURGE_THRESHOLD = 24*60*60*1000;
-
-   /**
-    * Default sleep interval for purge thread.
-    */
-   private static final long DEFAULT_PURGE_SLEEP = 24*60*60*1000;
-
-   /**
-    * Default interval for update thread.
-    */
-   private static final long DEFAULT_UPDATE_SLEEP = 60*5*1000;
-
-   /**
-    * Configured submission engine instance
-    */
-   private SubmissionEngine submissionEngine;
-
-   /**
-    * Configured execution engine instance
-    */
-   private ExecutionEngine executionEngine;
-
-   /**
-    * Purge thread that will periodically remove old submissions from repository.
-    */
-   private PurgeThread purgeThread = null;
-
-   /**
-    * Update thread that will periodically check status of running submissions.
-    */
-   private UpdateThread updateThread = null;
-
-   /**
-    * Synchronization variable between threads.
-    */
-   private boolean running = true;
-
-   /**
-    * Specifies how old submissions should be removed from repository.
-    */
-   private long purgeThreshold;
-
-   /**
-    * Number of milliseconds for purge thread to sleep.
-    */
-   private long purgeSleep;
-
-   /**
-    * Number of milliseconds for update thread to slepp.
-    */
-   private long updateSleep;
-
-   /**
-    * Base notification URL.
-    *
-    * Framework manager will always add job id.
-    */
-   private String notificationBaseUrl;
-
-   /**
-    * Set notification base URL.
-    *
-    * @param url Base URL
-    */
-   public void setNotificationBaseUrl(String url) {
-       LOG.debug("Setting notification base URL to " + url);
-       notificationBaseUrl = url;
-   }
-
-   /**
-    * Get base notification url.
-    *
-    * @return String representation of the URL
-    */
-   public String getNotificationBaseUrl() {
-       return notificationBaseUrl;
-   }
-
-   public  synchronized void destroy() {
-       LOG.trace("Begin submission engine manager destroy");
-
-       running = false;
-
-       try {
-           purgeThread.interrupt();
-           purgeThread.join();
-       } catch (InterruptedException e) {
-           //TODO(jarcec): Do I want to wait until it actually finish here?
-           LOG.error("Interrupted joining purgeThread");
-       }
-
-       try {
-           updateThread.interrupt();
-           updateThread.join();
-       } catch (InterruptedException e) {
-           //TODO(jarcec): Do I want to wait until it actually finish here?
-           LOG.error("Interrupted joining updateThread");
-       }
-
-       if(submissionEngine != null) {
-           submissionEngine.destroy();
-       }
-
-       if(executionEngine != null) {
-           executionEngine.destroy();
-       }
-   }
-
-
-   public synchronized void initialize() {
-       LOG.trace("Begin submission engine manager initialization");
-       MapContext context = SqoopConfiguration.getInstance().getContext();
-
-
-       // Let's load configured submission engine
-       String submissionEngineClassName =
-               context.getString(FrameworkConstants.SYSCFG_SUBMISSION_ENGINE);
-
-       submissionEngine = (SubmissionEngine) ClassUtils.instantiate(submissionEngineClassName);
-       if(submissionEngine == null) {
-           throw new SqoopException(FrameworkError.FRAMEWORK_0001,
-                   submissionEngineClassName);
-       }
-
-       submissionEngine.initialize(context, FrameworkConstants.PREFIX_SUBMISSION_ENGINE_CONFIG);
-
-       // Execution engine
-       String executionEngineClassName =
-               context.getString(FrameworkConstants.SYSCFG_EXECUTION_ENGINE);
-
-       executionEngine = (ExecutionEngine) ClassUtils.instantiate(executionEngineClassName);
-       if(executionEngine == null) {
-           throw new SqoopException(FrameworkError.FRAMEWORK_0007,
-                   executionEngineClassName);
-       }
-
-       // We need to make sure that user has configured compatible combination of
-       // submission engine and execution engine
-       if(! submissionEngine.isExecutionEngineSupported(executionEngine.getClass())) {
-           throw new SqoopException(FrameworkError.FRAMEWORK_0008);
-       }
-
-       executionEngine.initialize(context, FrameworkConstants.PREFIX_EXECUTION_ENGINE_CONFIG);
-
-       // Set up worker threads
-       purgeThreshold = context.getLong(
-               FrameworkConstants.SYSCFG_SUBMISSION_PURGE_THRESHOLD,
-               DEFAULT_PURGE_THRESHOLD
-       );
-       purgeSleep = context.getLong(
-               FrameworkConstants.SYSCFG_SUBMISSION_PURGE_SLEEP,
-               DEFAULT_PURGE_SLEEP
-       );
-
-       purgeThread = new PurgeThread();
-       purgeThread.start();
-
-       updateSleep = context.getLong(
-               FrameworkConstants.SYSCFG_SUBMISSION_UPDATE_SLEEP,
-               DEFAULT_UPDATE_SLEEP
-       );
-
-       updateThread = new UpdateThread();
-       updateThread.start();
-
-       SqoopConfiguration.getInstance().getProvider().registerListener(new CoreConfigurationListener(this));
-
-       LOG.info("Submission manager initialized: OK");
-   }
-   public MSubmission submit(long jobId) {
-       Repository repository = RepositoryManager.getInstance().getRepository();
-
-       MJob job = repository.findJob(jobId);
-       if(job == null) {
-           throw new SqoopException(FrameworkError.FRAMEWORK_0004,
-                   "Unknown job id " + jobId);
-       }
-       MConnection connection = repository.findConnection(job.getConnectionId());
-       SqoopConnector connector =
-               ConnectorManager.getInstance().getConnector(job.getConnectorId());
-
-       // Transform forms to connector specific classes
-       Object connectorConnection = ClassUtils.instantiate(
-               connector.getConnectionConfigurationClass());
-       FormUtils.fromForms(connection.getConnectorPart().getForms(),
-               connectorConnection);
-
-       Object connectorJob = ClassUtils.instantiate(
-               connector.getJobConfigurationClass(job.getType()));
-       FormUtils.fromForms(job.getConnectorPart().getForms(), connectorJob);
-
-       // Transform framework specific forms
-       Object frameworkConnection = ClassUtils.instantiate(
-               FrameworkManager.getInstance().getConnectionConfigurationClass());
-       FormUtils.fromForms(connection.getFrameworkPart().getForms(),
-               frameworkConnection);
-
-       Object frameworkJob = ClassUtils.instantiate(
-               FrameworkManager.getInstance().getJobConfigurationClass(job.getType()));
-       FormUtils.fromForms(job.getFrameworkPart().getForms(), frameworkJob);
-
-       // Create request object
-       MSubmission summary = new MSubmission(jobId);
-       SubmissionRequest request = executionEngine.createSubmissionRequest();
-
-       // Save important variables to the submission request
-       request.setSummary(summary);
-       request.setConnector(connector);
-       request.setConfigConnectorConnection(connectorConnection);
-       request.setConfigConnectorJob(connectorJob);
-       request.setConfigFrameworkConnection(frameworkConnection);
-       request.setConfigFrameworkJob(frameworkJob);
-       request.setJobType(job.getType());
-       request.setJobName(job.getName());
-       request.setJobId(job.getPersistenceId());
-       request.setNotificationUrl(notificationBaseUrl + jobId);
-
-       // Let's register all important jars
-       // sqoop-common
-       request.addJarForClass(MapContext.class);
-       // sqoop-core
-       request.addJarForClass(FrameworkManager.class);
-       // sqoop-spi
-       request.addJarForClass(SqoopConnector.class);
-       // Execution engine jar
-       request.addJarForClass(executionEngine.getClass());
-       // Connector in use
-       request.addJarForClass(connector.getClass());
-
-       // Extra libraries that Sqoop code requires
-       request.addJarForClass(JSONValue.class);
-
-       // Get connector callbacks
-       switch (job.getType()) {
-           case IMPORT:
-               request.setConnectorCallbacks(connector.getImporter());
-               break;
-           case EXPORT:
-               request.setConnectorCallbacks(connector.getExporter());
-               break;
-           default:
-               throw  new SqoopException(FrameworkError.FRAMEWORK_0005,
-                       "Unsupported job type " + job.getType().name());
-       }
-       LOG.debug("Using callbacks: " + request.getConnectorCallbacks());
-
-       // Initialize submission from connector perspective
-       CallbackBase baseCallbacks = request.getConnectorCallbacks();
-
-       Class<? extends Initializer> initializerClass = baseCallbacks.getInitializer();
-       Initializer initializer = (Initializer) ClassUtils.instantiate(initializerClass);
-
-       if(initializer == null) {
-           throw  new SqoopException(FrameworkError.FRAMEWORK_0006,
-                   "Can't create initializer instance: " + initializerClass.getName());
-       }
-
-       // Initializer context
-       InitializerContext initializerContext = new InitializerContext(request.getConnectorContext());
-
-       // Initialize submission from connector perspective
-       initializer.initialize(initializerContext,
-               request.getConfigConnectorConnection(),
-               request.getConfigConnectorJob());
-
-       // Add job specific jars to
-       request.addJars(initializer.getJars(initializerContext,
-               request.getConfigConnectorConnection(),
-               request.getConfigConnectorJob()));
-
-       // Retrieve and persist the schema
-       request.getSummary().setConnectorSchema(initializer.getSchema(
-           initializerContext,
-           request.getConfigConnectorConnection(),
-           request.getConfigConnectorJob()
-       ));
-
-       // Bootstrap job from framework perspective
-       switch (job.getType()) {
-           case IMPORT:
-               prepareImportSubmission(request);
-               break;
-           case EXPORT:
-               prepareExportSubmission(request);
-               break;
-           default:
-               throw  new SqoopException(FrameworkError.FRAMEWORK_0005,
-                       "Unsupported job type " + job.getType().name());
-       }
-
-       // Make sure that this job id is not currently running and submit the job
-       // only if it's not.
-       synchronized (getClass()) {
-           MSubmission lastSubmission = repository.findSubmissionLastForJob(jobId);
-           if(lastSubmission != null && lastSubmission.getStatus().isRunning()) {
-               throw new SqoopException(FrameworkError.FRAMEWORK_0002,
-                       "Job with id " + jobId);
-           }
-
-           // TODO(jarcec): We might need to catch all exceptions here to ensure
-           // that Destroyer will be executed in all cases.
-           boolean submitted = submissionEngine.submit(request);
-           if(!submitted) {
-               destroySubmission(request);
-               summary.setStatus(SubmissionStatus.FAILURE_ON_SUBMIT);
-           }
-
-           repository.createSubmission(summary);
-       }
-
-       // Return job status most recent
-       return summary;
-   }
-
-   private void prepareImportSubmission(SubmissionRequest request) {
-       ImportJobConfiguration jobConfiguration = (ImportJobConfiguration) request.getConfigFrameworkJob();
-
-       // Initialize the map-reduce part (all sort of required classes, ...)
-       request.setOutputDirectory(jobConfiguration.output.outputDirectory);
-
-       // We're directly moving configured number of extractors and loaders to
-       // underlying request object. In the future we might need to throttle this
-       // count based on other running jobs to meet our SLAs.
-       request.setExtractors(jobConfiguration.throttling.extractors);
-       request.setLoaders(jobConfiguration.throttling.loaders);
-
-       // Delegate rest of the job to execution engine
-       executionEngine.prepareImportSubmission(request);
-   }
-
-   private void prepareExportSubmission(SubmissionRequest request) {
-       ExportJobConfiguration jobConfiguration = (ExportJobConfiguration) request.getConfigFrameworkJob();
-
-       // We're directly moving configured number of extractors and loaders to
-       // underlying request object. In the future we might need to throttle this
-       // count based on other running jobs to meet our SLAs.
-       request.setExtractors(jobConfiguration.throttling.extractors);
-       request.setLoaders(jobConfiguration.throttling.loaders);
-
-       // Delegate rest of the job to execution engine
-       executionEngine.prepareExportSubmission(request);
-   }
-
-   /**
-    * Callback that will be called only if we failed to submit the job to the
-    * remote cluster.
-    */
-   private void destroySubmission(SubmissionRequest request) {
-       CallbackBase baseCallbacks = request.getConnectorCallbacks();
-
-       Class<? extends Destroyer> destroyerClass = baseCallbacks.getDestroyer();
-       Destroyer destroyer = (Destroyer) ClassUtils.instantiate(destroyerClass);
-
-       if(destroyer == null) {
-           throw  new SqoopException(FrameworkError.FRAMEWORK_0006,
-                   "Can't create destroyer instance: " + destroyerClass.getName());
-       }
-
-       DestroyerContext destroyerContext = new DestroyerContext(request.getConnectorContext(),
false, request.getSummary().getConnectorSchema());
-
-       // Initialize submission from connector perspective
-       destroyer.destroy(destroyerContext, request.getConfigConnectorConnection(), request.getConfigConnectorJob());
-   }
-
-   public MSubmission stop(long jobId) {
-       Repository repository = RepositoryManager.getInstance().getRepository();
-       MSubmission submission = repository.findSubmissionLastForJob(jobId);
-
-       if(submission == null || !submission.getStatus().isRunning()) {
-           throw new SqoopException(FrameworkError.FRAMEWORK_0003,
-                   "Job with id " + jobId + " is not running");
-       }
-
-       String externalId = submission.getExternalId();
-       submissionEngine.stop(externalId);
-
-       // Fetch new information to verify that the stop command has actually worked
-       update(submission);
-
-       // Return updated structure
-       return submission;
-   }
-
-   public MSubmission status(long jobId) {
-       Repository repository = RepositoryManager.getInstance().getRepository();
-       MSubmission submission = repository.findSubmissionLastForJob(jobId);
-
-       if(submission == null) {
-           return new MSubmission(jobId, new Date(), SubmissionStatus.NEVER_EXECUTED);
-       }
-
-       // If the submission is in running state, let's update it
-       if(submission.getStatus().isRunning()) {
-           update(submission);
-       }
-
-       return submission;
-   }
-
-   private void update(MSubmission submission) {
-       double progress  = -1;
-       Counters counters = null;
-       String externalId = submission.getExternalId();
-       SubmissionStatus newStatus = submissionEngine.status(externalId);
-       String externalLink = submissionEngine.externalLink(externalId);
-
-       if(newStatus.isRunning()) {
-           progress = submissionEngine.progress(externalId);
-       } else {
-           counters = submissionEngine.counters(externalId);
-       }
-
-       submission.setStatus(newStatus);
-       submission.setProgress(progress);
-       submission.setCounters(counters);
-       submission.setExternalLink(externalLink);
-       submission.setLastUpdateDate(new Date());
-
-       RepositoryManager.getInstance().getRepository().updateSubmission(submission);
-   }
-
-   @Override
-   public synchronized void configurationChanged() {
-     LOG.info("Begin submission engine manager reconfiguring");
-     MapContext newContext = SqoopConfiguration.getInstance().getContext();
-     MapContext oldContext = SqoopConfiguration.getInstance().getOldContext();
-
-     String newSubmissionEngineClassName = newContext.getString(FrameworkConstants.SYSCFG_SUBMISSION_ENGINE);
-     if (newSubmissionEngineClassName == null
-         || newSubmissionEngineClassName.trim().length() == 0) {
-       throw new SqoopException(FrameworkError.FRAMEWORK_0001,
-           newSubmissionEngineClassName);
-     }
-
-     String oldSubmissionEngineClassName = oldContext.getString(FrameworkConstants.SYSCFG_SUBMISSION_ENGINE);
-     if (!newSubmissionEngineClassName.equals(oldSubmissionEngineClassName)) {
-       LOG.warn("Submission engine cannot be replaced at the runtime. " +
-                "You might need to restart the server.");
-     }
-
-     String newExecutionEngineClassName = newContext.getString(FrameworkConstants.SYSCFG_EXECUTION_ENGINE);
-     if (newExecutionEngineClassName == null
-         || newExecutionEngineClassName.trim().length() == 0) {
-       throw new SqoopException(FrameworkError.FRAMEWORK_0007,
-           newExecutionEngineClassName);
-     }
-
-     String oldExecutionEngineClassName = oldContext.getString(FrameworkConstants.SYSCFG_EXECUTION_ENGINE);
-     if (!newExecutionEngineClassName.equals(oldExecutionEngineClassName)) {
-       LOG.warn("Execution engine cannot be replaced at the runtime. " +
-                "You might need to restart the server.");
-     }
-
-     // Set up worker threads
-     purgeThreshold = newContext.getLong(
-       FrameworkConstants.SYSCFG_SUBMISSION_PURGE_THRESHOLD,
-       DEFAULT_PURGE_THRESHOLD
-     );
-     purgeSleep = newContext.getLong(
-       FrameworkConstants.SYSCFG_SUBMISSION_PURGE_SLEEP,
-       DEFAULT_PURGE_SLEEP
-     );
-     purgeThread.interrupt();
-
-     updateSleep = newContext.getLong(
-       FrameworkConstants.SYSCFG_SUBMISSION_UPDATE_SLEEP,
-       DEFAULT_UPDATE_SLEEP
-     );
-     updateThread.interrupt();
-
-     LOG.info("Submission engine manager reconfigured.");
-   }
-
-   private class PurgeThread extends Thread {
-       public PurgeThread() {
-           super("PurgeThread");
-       }
-
-       public void run() {
-           LOG.info("Starting submission manager purge thread");
-
-           while(running) {
-               try {
-                   LOG.info("Purging old submissions");
-                   Date threshold = new Date((new Date()).getTime() - purgeThreshold);
-                   RepositoryManager.getInstance().getRepository().purgeSubmissions(threshold);
-                   Thread.sleep(purgeSleep);
-               } catch (InterruptedException e) {
-                   LOG.debug("Purge thread interrupted", e);
-               }
-           }
-
-           LOG.info("Ending submission manager purge thread");
-       }
-   }
-
-   private class UpdateThread extends Thread {
-       public UpdateThread() {
-           super("UpdateThread");
-       }
-
-       public void run() {
-           LOG.info("Starting submission manager update thread");
-
-           while(running) {
-               try {
-                   LOG.debug("Updating running submissions");
-
-                   // Let's get all running submissions from repository to check them out
-                   List<MSubmission> unfinishedSubmissions =
-                           RepositoryManager.getInstance().getRepository().findSubmissionsUnfinished();
-
-                   for(MSubmission submission : unfinishedSubmissions) {
-                       update(submission);
-                   }
-
-                   Thread.sleep(updateSleep);
-               } catch (InterruptedException e) {
-                   LOG.debug("Purge thread interrupted", e);
-               }
-           }
-
-           LOG.info("Ending submission manager update thread");
-       }
-   }
+  /**
+   * Logger object.
+   */
+  private static final Logger LOG = Logger.getLogger(JobManager.class);
+
+  /**
+   * Private instance to singleton of this class.
+   */
+  private static JobManager instance;
+  /**
+   * Create default object by default.
+   *
+   * Every Sqoop server application needs one so this should not be performance
+   * issue.
+   */
+  static {
+    instance = new JobManager();
+  }
+
+  /**
+   * Return current instance.
+   *
+   * @return Current instance
+   */
+  public static JobManager getInstance() {
+    return instance;
+  }
+
+  /**
+   * Allows to set instance in case that it's need.
+   *
+   * This method should not be normally used as the default instance should be
+   * sufficient. One target user use case for this method are unit tests.
+   *
+   * @param newInstance
+   *          New instance
+   */
+  public static void setInstance(JobManager newInstance) {
+    instance = newInstance;
+  }
+
+  /**
+   * Default interval for purging old submissions from repository.
+   */
+  private static final long DEFAULT_PURGE_THRESHOLD = 24 * 60 * 60 * 1000;
+
+  /**
+   * Default sleep interval for purge thread.
+   */
+  private static final long DEFAULT_PURGE_SLEEP = 24 * 60 * 60 * 1000;
+
+  /**
+   * Default interval for update thread.
+   */
+  private static final long DEFAULT_UPDATE_SLEEP = 60 * 5 * 1000;
+
+  /**
+   * Configured submission engine instance
+   */
+  private SubmissionEngine submissionEngine;
+
+  /**
+   * Configured execution engine instance
+   */
+  private ExecutionEngine executionEngine;
+
+  /**
+   * Purge thread that will periodically remove old submissions from repository.
+   */
+  private PurgeThread purgeThread = null;
+
+  /**
+   * Update thread that will periodically check status of running submissions.
+   */
+  private UpdateThread updateThread = null;
+
+  /**
+   * Synchronization variable between threads.
+   */
+  private boolean running = true;
+
+  /**
+   * Specifies how old submissions should be removed from repository.
+   */
+  private long purgeThreshold;
+
+  /**
+   * Number of milliseconds for purge thread to sleep.
+   */
+  private long purgeSleep;
+
+  /**
+   * Number of milliseconds for update thread to slepp.
+   */
+  private long updateSleep;
+
+  /**
+   * Base notification URL.
+   *
+   * Framework manager will always add job id.
+   */
+  private String notificationBaseUrl;
+
+  /**
+   * Set notification base URL.
+   *
+   * @param url
+   *          Base URL
+   */
+  public void setNotificationBaseUrl(String url) {
+    LOG.debug("Setting notification base URL to " + url);
+    notificationBaseUrl = url;
+  }
+
+  /**
+   * Get base notification url.
+   *
+   * @return String representation of the URL
+   */
+  public String getNotificationBaseUrl() {
+    return notificationBaseUrl;
+  }
+
+  public synchronized void destroy() {
+    LOG.trace("Begin submission engine manager destroy");
+
+    running = false;
+
+    try {
+      purgeThread.interrupt();
+      purgeThread.join();
+    } catch (InterruptedException e) {
+      // TODO(jarcec): Do I want to wait until it actually finish here?
+      LOG.error("Interrupted joining purgeThread");
+    }
+
+    try {
+      updateThread.interrupt();
+      updateThread.join();
+    } catch (InterruptedException e) {
+      // TODO(jarcec): Do I want to wait until it actually finish here?
+      LOG.error("Interrupted joining updateThread");
+    }
+
+    if (submissionEngine != null) {
+      submissionEngine.destroy();
+    }
+
+    if (executionEngine != null) {
+      executionEngine.destroy();
+    }
+  }
+
+  public synchronized void initialize() {
+    LOG.trace("Begin submission engine manager initialization");
+    MapContext context = SqoopConfiguration.getInstance().getContext();
+
+    // Let's load configured submission engine
+    String submissionEngineClassName =
+      context.getString(FrameworkConstants.SYSCFG_SUBMISSION_ENGINE);
+
+    submissionEngine = (SubmissionEngine) ClassUtils
+      .instantiate(submissionEngineClassName);
+    if (submissionEngine == null) {
+      throw new SqoopException(FrameworkError.FRAMEWORK_0001,
+        submissionEngineClassName);
+    }
+
+    submissionEngine.initialize(context,
+      FrameworkConstants.PREFIX_SUBMISSION_ENGINE_CONFIG);
+
+    // Execution engine
+    String executionEngineClassName =
+      context.getString(FrameworkConstants.SYSCFG_EXECUTION_ENGINE);
+
+    executionEngine = (ExecutionEngine) ClassUtils
+      .instantiate(executionEngineClassName);
+    if (executionEngine == null) {
+      throw new SqoopException(FrameworkError.FRAMEWORK_0007,
+        executionEngineClassName);
+    }
+
+    // We need to make sure that user has configured compatible combination of
+    // submission engine and execution engine
+    if (!submissionEngine
+      .isExecutionEngineSupported(executionEngine.getClass())) {
+      throw new SqoopException(FrameworkError.FRAMEWORK_0008);
+    }
+
+    executionEngine.initialize(context,
+      FrameworkConstants.PREFIX_EXECUTION_ENGINE_CONFIG);
+
+    // Set up worker threads
+    purgeThreshold = context.getLong(
+      FrameworkConstants.SYSCFG_SUBMISSION_PURGE_THRESHOLD,
+      DEFAULT_PURGE_THRESHOLD
+      );
+    purgeSleep = context.getLong(
+      FrameworkConstants.SYSCFG_SUBMISSION_PURGE_SLEEP,
+      DEFAULT_PURGE_SLEEP
+      );
+
+    purgeThread = new PurgeThread();
+    purgeThread.start();
+
+    updateSleep = context.getLong(
+      FrameworkConstants.SYSCFG_SUBMISSION_UPDATE_SLEEP,
+      DEFAULT_UPDATE_SLEEP
+      );
+
+    updateThread = new UpdateThread();
+    updateThread.start();
+
+    SqoopConfiguration.getInstance().getProvider()
+      .registerListener(new CoreConfigurationListener(this));
+
+    LOG.info("Submission manager initialized: OK");
+  }
+
+  public MSubmission submit(long jobId) {
+    Repository repository = RepositoryManager.getInstance().getRepository();
+
+    MJob job = repository.findJob(jobId);
+    if (job == null) {
+      throw new SqoopException(FrameworkError.FRAMEWORK_0004,
+        "Unknown job id " + jobId);
+    }
+    MConnection connection = repository.findConnection(job.getConnectionId());
+    SqoopConnector connector =
+      ConnectorManager.getInstance().getConnector(job.getConnectorId());
+
+    // Transform forms to connector specific classes
+    Object connectorConnection = ClassUtils.instantiate(
+      connector.getConnectionConfigurationClass());
+    FormUtils.fromForms(connection.getConnectorPart().getForms(),
+      connectorConnection);
+
+    Object connectorJob = ClassUtils.instantiate(
+      connector.getJobConfigurationClass(job.getType()));
+    FormUtils.fromForms(job.getConnectorPart().getForms(), connectorJob);
+
+    // Transform framework specific forms
+    Object frameworkConnection = ClassUtils.instantiate(
+      FrameworkManager.getInstance().getConnectionConfigurationClass());
+    FormUtils.fromForms(connection.getFrameworkPart().getForms(),
+      frameworkConnection);
+
+    Object frameworkJob = ClassUtils.instantiate(
+      FrameworkManager.getInstance().getJobConfigurationClass(job.getType()));
+    FormUtils.fromForms(job.getFrameworkPart().getForms(), frameworkJob);
+
+    // Create request object
+    MSubmission summary = new MSubmission(jobId);
+    SubmissionRequest request = executionEngine.createSubmissionRequest();
+
+    // Save important variables to the submission request
+    request.setSummary(summary);
+    request.setConnector(connector);
+    request.setConfigConnectorConnection(connectorConnection);
+    request.setConfigConnectorJob(connectorJob);
+    request.setConfigFrameworkConnection(frameworkConnection);
+    request.setConfigFrameworkJob(frameworkJob);
+    request.setJobType(job.getType());
+    request.setJobName(job.getName());
+    request.setJobId(job.getPersistenceId());
+    request.setNotificationUrl(notificationBaseUrl + jobId);
+
+    // Let's register all important jars
+    // sqoop-common
+    request.addJarForClass(MapContext.class);
+    // sqoop-core
+    request.addJarForClass(FrameworkManager.class);
+    // sqoop-spi
+    request.addJarForClass(SqoopConnector.class);
+    // Execution engine jar
+    request.addJarForClass(executionEngine.getClass());
+    // Connector in use
+    request.addJarForClass(connector.getClass());
+
+    // Extra libraries that Sqoop code requires
+    request.addJarForClass(JSONValue.class);
+
+    // Get connector callbacks
+    switch (job.getType()) {
+      case IMPORT:
+        request.setConnectorCallbacks(connector.getImporter());
+        break;
+      case EXPORT:
+        request.setConnectorCallbacks(connector.getExporter());
+        break;
+      default:
+        throw new SqoopException(FrameworkError.FRAMEWORK_0005,
+          "Unsupported job type " + job.getType().name());
+    }
+    LOG.debug("Using callbacks: " + request.getConnectorCallbacks());
+
+    // Initialize submission from connector perspective
+    CallbackBase baseCallbacks = request.getConnectorCallbacks();
+
+    Class<? extends Initializer> initializerClass = baseCallbacks
+      .getInitializer();
+    Initializer initializer = (Initializer) ClassUtils
+      .instantiate(initializerClass);
+
+    if (initializer == null) {
+      throw new SqoopException(FrameworkError.FRAMEWORK_0006,
+        "Can't create initializer instance: " + initializerClass.getName());
+    }
+
+    // Initializer context
+    InitializerContext initializerContext = new InitializerContext(
+      request.getConnectorContext());
+
+    // Initialize submission from connector perspective
+    initializer.initialize(initializerContext,
+      request.getConfigConnectorConnection(),
+      request.getConfigConnectorJob());
+
+    // Add job specific jars to
+    request.addJars(initializer.getJars(initializerContext,
+      request.getConfigConnectorConnection(),
+      request.getConfigConnectorJob()));
+
+    // Retrieve and persist the schema
+    request.getSummary().setConnectorSchema(initializer.getSchema(
+      initializerContext,
+      request.getConfigConnectorConnection(),
+      request.getConfigConnectorJob()
+      ));
+
+    // Bootstrap job from framework perspective
+    switch (job.getType()) {
+      case IMPORT:
+        prepareImportSubmission(request);
+        break;
+      case EXPORT:
+        prepareExportSubmission(request);
+        break;
+      default:
+        throw new SqoopException(FrameworkError.FRAMEWORK_0005,
+          "Unsupported job type " + job.getType().name());
+    }
+
+    // Make sure that this job id is not currently running and submit the job
+    // only if it's not.
+    synchronized (getClass()) {
+      MSubmission lastSubmission = repository.findSubmissionLastForJob(jobId);
+      if (lastSubmission != null && lastSubmission.getStatus().isRunning()) {
+        throw new SqoopException(FrameworkError.FRAMEWORK_0002,
+          "Job with id " + jobId);
+      }
+
+      // TODO(jarcec): We might need to catch all exceptions here to ensure
+      // that Destroyer will be executed in all cases.
+      boolean submitted = submissionEngine.submit(request);
+      if (!submitted) {
+        destroySubmission(request);
+        summary.setStatus(SubmissionStatus.FAILURE_ON_SUBMIT);
+      }
+
+      repository.createSubmission(summary);
+    }
+
+    // Return job status most recent
+    return summary;
+  }
+
+  private void prepareImportSubmission(SubmissionRequest request) {
+    ImportJobConfiguration jobConfiguration = (ImportJobConfiguration) request
+      .getConfigFrameworkJob();
+
+    // Initialize the map-reduce part (all sort of required classes, ...)
+    request.setOutputDirectory(jobConfiguration.output.outputDirectory);
+
+    // We're directly moving configured number of extractors and loaders to
+    // underlying request object. In the future we might need to throttle this
+    // count based on other running jobs to meet our SLAs.
+    request.setExtractors(jobConfiguration.throttling.extractors);
+    request.setLoaders(jobConfiguration.throttling.loaders);
+
+    // Delegate rest of the job to execution engine
+    executionEngine.prepareImportSubmission(request);
+  }
+
+  private void prepareExportSubmission(SubmissionRequest request) {
+    ExportJobConfiguration jobConfiguration = (ExportJobConfiguration) request
+      .getConfigFrameworkJob();
+
+    // We're directly moving configured number of extractors and loaders to
+    // underlying request object. In the future we might need to throttle this
+    // count based on other running jobs to meet our SLAs.
+    request.setExtractors(jobConfiguration.throttling.extractors);
+    request.setLoaders(jobConfiguration.throttling.loaders);
+
+    // Delegate rest of the job to execution engine
+    executionEngine.prepareExportSubmission(request);
+  }
+
+  /**
+   * Callback that will be called only if we failed to submit the job to the
+   * remote cluster.
+   */
+  private void destroySubmission(SubmissionRequest request) {
+    CallbackBase baseCallbacks = request.getConnectorCallbacks();
+
+    Class<? extends Destroyer> destroyerClass = baseCallbacks.getDestroyer();
+    Destroyer destroyer = (Destroyer) ClassUtils.instantiate(destroyerClass);
+
+    if (destroyer == null) {
+      throw new SqoopException(FrameworkError.FRAMEWORK_0006,
+        "Can't create destroyer instance: " + destroyerClass.getName());
+    }
+
+    DestroyerContext destroyerContext = new DestroyerContext(
+      request.getConnectorContext(), false, request.getSummary()
+        .getConnectorSchema());
+
+    // Initialize submission from connector perspective
+    destroyer.destroy(destroyerContext, request.getConfigConnectorConnection(),
+      request.getConfigConnectorJob());
+  }
+
+  public MSubmission stop(long jobId) {
+    Repository repository = RepositoryManager.getInstance().getRepository();
+    MSubmission submission = repository.findSubmissionLastForJob(jobId);
+
+    if (submission == null || !submission.getStatus().isRunning()) {
+      throw new SqoopException(FrameworkError.FRAMEWORK_0003,
+        "Job with id " + jobId + " is not running");
+    }
+
+    String externalId = submission.getExternalId();
+    submissionEngine.stop(externalId);
+
+    // Fetch new information to verify that the stop command has actually worked
+    update(submission);
+
+    // Return updated structure
+    return submission;
+  }
+
+  public MSubmission status(long jobId) {
+    Repository repository = RepositoryManager.getInstance().getRepository();
+    MSubmission submission = repository.findSubmissionLastForJob(jobId);
+
+    if (submission == null) {
+      return new MSubmission(jobId, new Date(), SubmissionStatus.NEVER_EXECUTED);
+    }
+
+    // If the submission is in running state, let's update it
+    if (submission.getStatus().isRunning()) {
+      update(submission);
+    }
+
+    return submission;
+  }
+
+  private void update(MSubmission submission) {
+    double progress = -1;
+    Counters counters = null;
+    String externalId = submission.getExternalId();
+    SubmissionStatus newStatus = submissionEngine.status(externalId);
+    String externalLink = submissionEngine.externalLink(externalId);
+
+    if (newStatus.isRunning()) {
+      progress = submissionEngine.progress(externalId);
+    } else {
+      counters = submissionEngine.counters(externalId);
+    }
+
+    submission.setStatus(newStatus);
+    submission.setProgress(progress);
+    submission.setCounters(counters);
+    submission.setExternalLink(externalLink);
+    submission.setLastUpdateDate(new Date());
+
+    RepositoryManager.getInstance().getRepository()
+      .updateSubmission(submission);
+  }
+
+  @Override
+  public synchronized void configurationChanged() {
+    LOG.info("Begin submission engine manager reconfiguring");
+    MapContext newContext = SqoopConfiguration.getInstance().getContext();
+    MapContext oldContext = SqoopConfiguration.getInstance().getOldContext();
+
+    String newSubmissionEngineClassName = newContext
+      .getString(FrameworkConstants.SYSCFG_SUBMISSION_ENGINE);
+    if (newSubmissionEngineClassName == null
+      || newSubmissionEngineClassName.trim().length() == 0) {
+      throw new SqoopException(FrameworkError.FRAMEWORK_0001,
+        newSubmissionEngineClassName);
+    }
+
+    String oldSubmissionEngineClassName = oldContext
+      .getString(FrameworkConstants.SYSCFG_SUBMISSION_ENGINE);
+    if (!newSubmissionEngineClassName.equals(oldSubmissionEngineClassName)) {
+      LOG.warn("Submission engine cannot be replaced at the runtime. " +
+        "You might need to restart the server.");
+    }
+
+    String newExecutionEngineClassName = newContext
+      .getString(FrameworkConstants.SYSCFG_EXECUTION_ENGINE);
+    if (newExecutionEngineClassName == null
+      || newExecutionEngineClassName.trim().length() == 0) {
+      throw new SqoopException(FrameworkError.FRAMEWORK_0007,
+        newExecutionEngineClassName);
+    }
+
+    String oldExecutionEngineClassName = oldContext
+      .getString(FrameworkConstants.SYSCFG_EXECUTION_ENGINE);
+    if (!newExecutionEngineClassName.equals(oldExecutionEngineClassName)) {
+      LOG.warn("Execution engine cannot be replaced at the runtime. " +
+        "You might need to restart the server.");
+    }
+
+    // Set up worker threads
+    purgeThreshold = newContext.getLong(
+      FrameworkConstants.SYSCFG_SUBMISSION_PURGE_THRESHOLD,
+      DEFAULT_PURGE_THRESHOLD
+      );
+    purgeSleep = newContext.getLong(
+      FrameworkConstants.SYSCFG_SUBMISSION_PURGE_SLEEP,
+      DEFAULT_PURGE_SLEEP
+      );
+    purgeThread.interrupt();
+
+    updateSleep = newContext.getLong(
+      FrameworkConstants.SYSCFG_SUBMISSION_UPDATE_SLEEP,
+      DEFAULT_UPDATE_SLEEP
+      );
+    updateThread.interrupt();
+
+    LOG.info("Submission engine manager reconfigured.");
+  }
+
+  private class PurgeThread extends Thread {
+    public PurgeThread() {
+      super("PurgeThread");
+    }
+
+    public void run() {
+      LOG.info("Starting submission manager purge thread");
+
+      while (running) {
+        try {
+          LOG.info("Purging old submissions");
+          Date threshold = new Date((new Date()).getTime() - purgeThreshold);
+          RepositoryManager.getInstance().getRepository()
+            .purgeSubmissions(threshold);
+          Thread.sleep(purgeSleep);
+        } catch (InterruptedException e) {
+          LOG.debug("Purge thread interrupted", e);
+        }
+      }
+
+      LOG.info("Ending submission manager purge thread");
+    }
+  }
+
+  private class UpdateThread extends Thread {
+    public UpdateThread() {
+      super("UpdateThread");
+    }
+
+    public void run() {
+      LOG.info("Starting submission manager update thread");
+
+      while (running) {
+        try {
+          LOG.debug("Updating running submissions");
+
+          // Let's get all running submissions from repository to check them out
+          List<MSubmission> unfinishedSubmissions =
+            RepositoryManager.getInstance().getRepository()
+              .findSubmissionsUnfinished();
+
+          for (MSubmission submission : unfinishedSubmissions) {
+            update(submission);
+          }
+
+          Thread.sleep(updateSleep);
+        } catch (InterruptedException e) {
+          LOG.debug("Purge thread interrupted", e);
+        }
+      }
+
+      LOG.info("Ending submission manager update thread");
+    }
+  }
 }


Mime
View raw message