sqoop-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From a..@apache.org
Subject [14/52] [abbrv] SQOOP-1497: Sqoop2: Entity Nomenclature Revisited
Date Fri, 10 Oct 2014 02:51:43 GMT
http://git-wip-us.apache.org/repos/asf/sqoop/blob/049994a0/core/src/main/java/org/apache/sqoop/framework/JobManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/framework/JobManager.java b/core/src/main/java/org/apache/sqoop/framework/JobManager.java
deleted file mode 100644
index 8149d1c..0000000
--- a/core/src/main/java/org/apache/sqoop/framework/JobManager.java
+++ /dev/null
@@ -1,710 +0,0 @@
-
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sqoop.framework;
-
-import org.apache.log4j.Logger;
-import org.apache.sqoop.common.Direction;
-import org.apache.sqoop.common.MapContext;
-import org.apache.sqoop.common.SqoopException;
-import org.apache.sqoop.connector.ConnectorManager;
-import org.apache.sqoop.framework.configuration.JobConfiguration;
-import org.apache.sqoop.request.HttpEventContext;
-import org.apache.sqoop.connector.idf.IntermediateDataFormat;
-import org.apache.sqoop.connector.spi.SqoopConnector;
-import org.apache.sqoop.core.Reconfigurable;
-import org.apache.sqoop.core.SqoopConfiguration;
-import org.apache.sqoop.core.SqoopConfiguration.CoreConfigurationListener;
-import org.apache.sqoop.job.etl.*;
-import org.apache.sqoop.model.FormUtils;
-import org.apache.sqoop.model.MConnection;
-import org.apache.sqoop.model.MJob;
-import org.apache.sqoop.model.MSubmission;
-import org.apache.sqoop.repository.Repository;
-import org.apache.sqoop.repository.RepositoryManager;
-import org.apache.sqoop.schema.Schema;
-import org.apache.sqoop.submission.SubmissionStatus;
-import org.apache.sqoop.submission.counter.Counters;
-import org.apache.sqoop.utils.ClassUtils;
-import org.json.simple.JSONValue;
-
-import java.util.Date;
-import java.util.List;
-
-public class JobManager implements Reconfigurable {
-  /**
-   * Logger object.
-   */
-  private static final Logger LOG = Logger.getLogger(JobManager.class);
-
-  /**
-   * Private instance to singleton of this class.
-   */
-  private static JobManager instance;
-  /**
-   * Create default object by default.
-   *
-   * Every Sqoop server application needs one so this should not be performance
-   * issue.
-   */
-  static {
-    instance = new JobManager();
-  }
-
-  /**
-   * Return current instance.
-   *
-   * @return Current instance
-   */
-  public static JobManager getInstance() {
-    return instance;
-  }
-
-  /**
-   * Allows to set instance in case that it's need.
-   *
-   * This method should not be normally used as the default instance should be
-   * sufficient. One target user use case for this method are unit tests.
-   *
-   * @param newInstance
-   *          New instance
-   */
-  public static void setInstance(JobManager newInstance) {
-    instance = newInstance;
-  }
-
-  /**
-   * Default interval for purging old submissions from repository.
-   */
-  private static final long DEFAULT_PURGE_THRESHOLD = 24 * 60 * 60 * 1000;
-
-  /**
-   * Default sleep interval for purge thread.
-   */
-  private static final long DEFAULT_PURGE_SLEEP = 24 * 60 * 60 * 1000;
-
-  /**
-   * Default interval for update thread.
-   */
-  private static final long DEFAULT_UPDATE_SLEEP = 60 * 5 * 1000;
-
-  /**
-   * Configured submission engine instance
-   */
-  private SubmissionEngine submissionEngine;
-
-  /**
-   * Configured execution engine instance
-   */
-  private ExecutionEngine executionEngine;
-
-  /**
-   * Purge thread that will periodically remove old submissions from repository.
-   */
-  private PurgeThread purgeThread = null;
-
-  /**
-   * Update thread that will periodically check status of running submissions.
-   */
-  private UpdateThread updateThread = null;
-
-  /**
-   * Synchronization variable between threads.
-   */
-  private boolean running = true;
-
-  /**
-   * Specifies how old submissions should be removed from repository.
-   */
-  private long purgeThreshold;
-
-  /**
-   * Number of milliseconds for purge thread to sleep.
-   */
-  private long purgeSleep;
-
-  /**
-   * Number of milliseconds for update thread to slepp.
-   */
-  private long updateSleep;
-
-  /**
-   * Base notification URL.
-   *
-   * Framework manager will always add job id.
-   */
-  private String notificationBaseUrl;
-
-  /**
-   * Set notification base URL.
-   *
-   * @param url
-   *          Base URL
-   */
-  public void setNotificationBaseUrl(String url) {
-    LOG.debug("Setting notification base URL to " + url);
-    notificationBaseUrl = url;
-  }
-
-  /**
-   * Get base notification url.
-   *
-   * @return String representation of the URL
-   */
-  public String getNotificationBaseUrl() {
-    return notificationBaseUrl;
-  }
-
-  public synchronized void destroy() {
-    LOG.trace("Begin submission engine manager destroy");
-
-    running = false;
-
-    try {
-      purgeThread.interrupt();
-      purgeThread.join();
-    } catch (InterruptedException e) {
-      // TODO(jarcec): Do I want to wait until it actually finish here?
-      LOG.error("Interrupted joining purgeThread");
-    }
-
-    try {
-      updateThread.interrupt();
-      updateThread.join();
-    } catch (InterruptedException e) {
-      // TODO(jarcec): Do I want to wait until it actually finish here?
-      LOG.error("Interrupted joining updateThread");
-    }
-
-    if (submissionEngine != null) {
-      submissionEngine.destroy();
-    }
-
-    if (executionEngine != null) {
-      executionEngine.destroy();
-    }
-  }
-
-  public synchronized void initialize() {
-    LOG.trace("Begin submission engine manager initialization");
-    MapContext context = SqoopConfiguration.getInstance().getContext();
-
-    // Let's load configured submission engine
-    String submissionEngineClassName =
-      context.getString(FrameworkConstants.SYSCFG_SUBMISSION_ENGINE);
-
-    submissionEngine = (SubmissionEngine) ClassUtils
-      .instantiate(submissionEngineClassName);
-    if (submissionEngine == null) {
-      throw new SqoopException(FrameworkError.FRAMEWORK_0001,
-        submissionEngineClassName);
-    }
-
-    submissionEngine.initialize(context,
-      FrameworkConstants.PREFIX_SUBMISSION_ENGINE_CONFIG);
-
-    // Execution engine
-    String executionEngineClassName =
-      context.getString(FrameworkConstants.SYSCFG_EXECUTION_ENGINE);
-
-    executionEngine = (ExecutionEngine) ClassUtils
-      .instantiate(executionEngineClassName);
-    if (executionEngine == null) {
-      throw new SqoopException(FrameworkError.FRAMEWORK_0007,
-        executionEngineClassName);
-    }
-
-    // We need to make sure that user has configured compatible combination of
-    // submission engine and execution engine
-    if (!submissionEngine
-      .isExecutionEngineSupported(executionEngine.getClass())) {
-      throw new SqoopException(FrameworkError.FRAMEWORK_0008);
-    }
-
-    executionEngine.initialize(context,
-      FrameworkConstants.PREFIX_EXECUTION_ENGINE_CONFIG);
-
-    // Set up worker threads
-    purgeThreshold = context.getLong(
-      FrameworkConstants.SYSCFG_SUBMISSION_PURGE_THRESHOLD,
-      DEFAULT_PURGE_THRESHOLD
-      );
-    purgeSleep = context.getLong(
-      FrameworkConstants.SYSCFG_SUBMISSION_PURGE_SLEEP,
-      DEFAULT_PURGE_SLEEP
-      );
-
-    purgeThread = new PurgeThread();
-    purgeThread.start();
-
-    updateSleep = context.getLong(
-      FrameworkConstants.SYSCFG_SUBMISSION_UPDATE_SLEEP,
-      DEFAULT_UPDATE_SLEEP
-      );
-
-    updateThread = new UpdateThread();
-    updateThread.start();
-
-    SqoopConfiguration.getInstance().getProvider()
-      .registerListener(new CoreConfigurationListener(this));
-
-    LOG.info("Submission manager initialized: OK");
-  }
-
-  public MSubmission submit(long jobId, HttpEventContext ctx) {
-
-    MSubmission mSubmission = createJobSubmission(ctx, jobId);
-    JobRequest jobRequest = createJobRequest(jobId, mSubmission);
-    // Bootstrap job to execute
-    prepareJob(jobRequest);
-    // Make sure that this job id is not currently running and submit the job
-    // only if it's not.
-    synchronized (getClass()) {
-      MSubmission lastSubmission = RepositoryManager.getInstance().getRepository()
-          .findSubmissionLastForJob(jobId);
-      if (lastSubmission != null && lastSubmission.getStatus().isRunning()) {
-        throw new SqoopException(FrameworkError.FRAMEWORK_0002, "Job with id " + jobId);
-      }
-      // TODO(Abe): Call multiple destroyers.
-      // TODO(jarcec): We might need to catch all exceptions here to ensure
-      // that Destroyer will be executed in all cases.
-      // NOTE: the following is a blocking call
-      boolean success = submissionEngine.submit(jobRequest);
-      if (!success) {
-        destroySubmission(jobRequest);
-        mSubmission.setStatus(SubmissionStatus.FAILURE_ON_SUBMIT);
-      }
-      RepositoryManager.getInstance().getRepository().createSubmission(mSubmission);
-    }
-    return mSubmission;
-  }
-
-  private JobRequest createJobRequest(long jobId, MSubmission submission) {
-    // get job
-    MJob job = getJob(jobId);
-
-    // get from/to connections for the job
-    MConnection fromConnection = getConnection(job.getConnectionId(Direction.FROM));
-    MConnection toConnection = getConnection(job.getConnectionId(Direction.TO));
-
-    // get from/to connectors for the connection
-    SqoopConnector fromConnector = getConnector(fromConnection.getConnectorId());
-    validateSupportedDirection(fromConnector, Direction.FROM);
-    SqoopConnector toConnector = getConnector(toConnection.getConnectorId());
-    validateSupportedDirection(toConnector, Direction.TO);
-
-    // Transform config to fromConnector specific classes
-    Object fromConnectionConfig = ClassUtils.instantiate(fromConnector
-        .getConnectionConfigurationClass());
-    FormUtils.fromForms(fromConnection.getConnectorPart().getForms(), fromConnectionConfig);
-
-    // Transform config to toConnector specific classes
-    Object toConnectorConfig = ClassUtils
-        .instantiate(toConnector.getConnectionConfigurationClass());
-    FormUtils.fromForms(toConnection.getConnectorPart().getForms(), toConnectorConfig);
-
-    Object fromJob = ClassUtils.instantiate(fromConnector.getJobConfigurationClass(Direction.FROM));
-    FormUtils.fromForms(job.getConnectorPart(Direction.FROM).getForms(), fromJob);
-
-    Object toJob = ClassUtils.instantiate(toConnector.getJobConfigurationClass(Direction.TO));
-    FormUtils.fromForms(job.getConnectorPart(Direction.TO).getForms(), toJob);
-
-    // Transform framework specific configs
-    // Q(VB) : Aren't the following 2 exactly the same?
-    Object fromFrameworkConnection = ClassUtils.instantiate(FrameworkManager.getInstance()
-        .getConnectionConfigurationClass());
-    FormUtils.fromForms(fromConnection.getFrameworkPart().getForms(), fromFrameworkConnection);
-
-    Object toFrameworkConnection = ClassUtils.instantiate(FrameworkManager.getInstance()
-        .getConnectionConfigurationClass());
-    FormUtils.fromForms(toConnection.getFrameworkPart().getForms(), toFrameworkConnection);
-
-    Object frameworkJob = ClassUtils.instantiate(FrameworkManager.getInstance()
-        .getJobConfigurationClass());
-    FormUtils.fromForms(job.getFrameworkPart().getForms(), frameworkJob);
-
-    // Create a job request for submit/execution
-    JobRequest jobRequest = executionEngine.createJobRequest();
-    // Save important variables to the job request
-    jobRequest.setSummary(submission);
-    jobRequest.setConnector(Direction.FROM, fromConnector);
-    jobRequest.setConnector(Direction.TO, toConnector);
-    jobRequest.setConnectorConnectionConfig(Direction.FROM, fromConnectionConfig);
-    jobRequest.setConnectorConnectionConfig(Direction.TO, toConnectorConfig);
-    jobRequest.setConnectorJobConfig(Direction.FROM, fromJob);
-    jobRequest.setConnectorJobConfig(Direction.TO, toJob);
-    // TODO(Abe): Should we actually have 2 different Framework Connection config objects?
-    jobRequest.setFrameworkConnectionConfig(Direction.FROM, fromFrameworkConnection);
-    jobRequest.setFrameworkConnectionConfig(Direction.TO, toFrameworkConnection);
-    jobRequest.setConfigFrameworkJob(frameworkJob);
-    jobRequest.setJobName(job.getName());
-    jobRequest.setJobId(job.getPersistenceId());
-    jobRequest.setNotificationUrl(notificationBaseUrl + jobId);
-    Class<? extends IntermediateDataFormat<?>> dataFormatClass =
-      fromConnector.getIntermediateDataFormat();
-    jobRequest.setIntermediateDataFormat(fromConnector.getIntermediateDataFormat());
-
-
-    jobRequest.setFrom(fromConnector.getFrom());
-    jobRequest.setTo(toConnector.getTo());
-
-    addStandardJars(jobRequest);
-    addConnectorJars(jobRequest, fromConnector, toConnector, dataFormatClass);
-    addConnectorInitializerJars(jobRequest, Direction.FROM);
-    addConnectorInitializerJars(jobRequest, Direction.TO);
-
-    Schema fromSchema = getSchemaFromConnector(jobRequest, Direction.FROM);
-    Schema toSchema = getSchemaFromConnector(jobRequest, Direction.TO);
-
-    // TODO(Gwen): Need better logic here once the Schema refactor: SQOOP-1378
-    if (fromSchema != null) {
-      jobRequest.getSummary().setFromSchema(fromSchema);
-    }
-    else {
-      jobRequest.getSummary().setFromSchema(toSchema);
-    }
-    LOG.debug("Using entities: " + jobRequest.getFrom() + ", " + jobRequest.getTo());
-    return jobRequest;
-  }
-
-  private void addConnectorJars(JobRequest jobRequest, SqoopConnector fromConnector,
-      SqoopConnector toConnector, Class<? extends IntermediateDataFormat<?>> dataFormatClass) {
-    jobRequest.addJarForClass(fromConnector.getClass());
-    jobRequest.addJarForClass(toConnector.getClass());
-    jobRequest.addJarForClass(dataFormatClass);
-  }
-
-  private void addStandardJars(JobRequest jobRequest) {
-    // Let's register all important jars
-    // sqoop-common
-    jobRequest.addJarForClass(MapContext.class);
-    // sqoop-core
-    jobRequest.addJarForClass(FrameworkManager.class);
-    // sqoop-spi
-    jobRequest.addJarForClass(SqoopConnector.class);
-    // Execution engine jar
-    jobRequest.addJarForClass(executionEngine.getClass());
-    // Extra libraries that Sqoop code requires
-    jobRequest.addJarForClass(JSONValue.class);
-  }
-
-  MSubmission createJobSubmission(HttpEventContext ctx, long jobId) {
-    MSubmission summary = new MSubmission(jobId);
-    summary.setCreationUser(ctx.getUsername());
-    summary.setLastUpdateUser(ctx.getUsername());
-    return summary;
-  }
-
-  SqoopConnector getConnector(long connnectorId) {
-    return ConnectorManager.getInstance().getConnector(connnectorId);
-  }
-
-  void validateSupportedDirection(SqoopConnector connector, Direction direction) {
-    // Make sure that connector supports the given direction
-    if (!connector.getSupportedDirections().contains(direction)) {
-      throw new SqoopException(FrameworkError.FRAMEWORK_0011, "Connector: "
-          + connector.getClass().getCanonicalName());
-    }
-  }
-
-  MConnection getConnection(long connectionId) {
-    MConnection connection = RepositoryManager.getInstance().getRepository()
-        .findConnection(connectionId);
-    if (!connection.getEnabled()) {
-      throw new SqoopException(FrameworkError.FRAMEWORK_0010, "Connection id: "
-          + connection.getPersistenceId());
-    }
-    return connection;
-  }
-
-  MJob getJob(long jobId) {
-    MJob job = RepositoryManager.getInstance().getRepository().findJob(jobId);
-    if (job == null) {
-      throw new SqoopException(FrameworkError.FRAMEWORK_0004, "Unknown job id: " + jobId);
-    }
-
-    if (!job.getEnabled()) {
-      throw new SqoopException(FrameworkError.FRAMEWORK_0009, "Job id: " + job.getPersistenceId());
-    }
-    return job;
-  }
-  
-  private Schema getSchemaFromConnector(JobRequest jobRequest, Direction direction) {
-
-    Initializer initializer = getConnectorInitializer(jobRequest, direction);
-
-    // Initializer context
-    InitializerContext initializerContext = getInitializerContext(jobRequest, direction);
-
-    // Initialize submission from the connector perspective
-    initializer.initialize(initializerContext, jobRequest.getConnectorConnectionConfig(direction),
-        jobRequest.getConnectorJobConfig(direction));
-
-    // TODO(Abe): Alter behavior of Schema here.
-    return initializer.getSchema(initializerContext,
-        jobRequest.getConnectorConnectionConfig(direction),
-        jobRequest.getConnectorJobConfig(direction));
-  }
-
-  private void addConnectorInitializerJars(JobRequest jobRequest, Direction direction) {
-
-    Initializer initializer = getConnectorInitializer(jobRequest, direction);
-    InitializerContext initializerContext = getInitializerContext(jobRequest, direction);
-    // Add job specific jars to
-    jobRequest.addJars(initializer.getJars(initializerContext,
-        jobRequest.getConnectorConnectionConfig(direction),
-        jobRequest.getConnectorJobConfig(direction)));
-  }
-
-  private Initializer getConnectorInitializer(JobRequest jobRequest, Direction direction) {
-    Transferable transferable = direction.equals(Direction.FROM) ? jobRequest.getFrom() : jobRequest.getTo();
-    Class<? extends Initializer> initializerClass = transferable.getInitializer();
-    Initializer initializer = (Initializer) ClassUtils.instantiate(initializerClass);
-
-    if (initializer == null) {
-      throw new SqoopException(FrameworkError.FRAMEWORK_0006,
-          "Can't create connector initializer instance: " + initializerClass.getName());
-    }
-    return initializer;
-  }
-
-  private InitializerContext getInitializerContext(JobRequest jobRequest, Direction direction) {
-    return new InitializerContext(jobRequest.getConnectorContext(direction));
-  }
-
-  void prepareJob(JobRequest request) {
-    JobConfiguration jobConfiguration = (JobConfiguration) request.getConfigFrameworkJob();
-    // We're directly moving configured number of extractors and loaders to
-    // underlying request object. In the future we might need to throttle this
-    // count based on other running jobs to meet our SLAs.
-    request.setExtractors(jobConfiguration.throttling.extractors);
-    request.setLoaders(jobConfiguration.throttling.loaders);
-
-    // Delegate rest of the job to execution engine
-    executionEngine.prepareJob(request);
-  }
-
-  /**
-   * Callback that will be called only if we failed to submit the job to the
-   * remote cluster.
-   */
-  void destroySubmission(JobRequest request) {
-    Transferable from = request.getFrom();
-    Transferable to = request.getTo();
-
-    Class<? extends Destroyer> fromDestroyerClass = from.getDestroyer();
-    Class<? extends Destroyer> toDestroyerClass = to.getDestroyer();
-    Destroyer fromDestroyer = (Destroyer) ClassUtils.instantiate(fromDestroyerClass);
-    Destroyer toDestroyer = (Destroyer) ClassUtils.instantiate(toDestroyerClass);
-
-    if (fromDestroyer == null) {
-      throw new SqoopException(FrameworkError.FRAMEWORK_0006,
-        "Can't create toDestroyer instance: " + fromDestroyerClass.getName());
-    }
-
-    if (toDestroyer == null) {
-      throw new SqoopException(FrameworkError.FRAMEWORK_0006,
-          "Can't create toDestroyer instance: " + toDestroyerClass.getName());
-    }
-
-    // TODO(Abe): Update context to manage multiple connectors. As well as summary.
-    DestroyerContext fromDestroyerContext = new DestroyerContext(
-      request.getConnectorContext(Direction.FROM), false, request.getSummary()
-        .getFromSchema());
-    DestroyerContext toDestroyerContext = new DestroyerContext(
-        request.getConnectorContext(Direction.TO), false, request.getSummary()
-        .getToSchema());
-
-    // destroy submission from connector perspective
-    fromDestroyer.destroy(fromDestroyerContext, request.getConnectorConnectionConfig(Direction.FROM),
-        request.getConnectorJobConfig(Direction.FROM));
-    toDestroyer.destroy(toDestroyerContext, request.getConnectorConnectionConfig(Direction.TO),
-        request.getConnectorJobConfig(Direction.TO));
-  }
-
-  public MSubmission stop(long jobId, HttpEventContext ctx) {
-
-    Repository repository = RepositoryManager.getInstance().getRepository();
-    MSubmission mSubmission = repository.findSubmissionLastForJob(jobId);
-
-    if (mSubmission == null || !mSubmission.getStatus().isRunning()) {
-      throw new SqoopException(FrameworkError.FRAMEWORK_0003, "Job with id " + jobId
-          + " is not running");
-    }
-    submissionEngine.stop(mSubmission.getExternalId());
-
-    mSubmission.setLastUpdateUser(ctx.getUsername());
-
-    // Fetch new information to verify that the stop command has actually worked
-    update(mSubmission);
-
-    // Return updated structure
-    return mSubmission;
-  }
-
-  public MSubmission status(long jobId) {
-    Repository repository = RepositoryManager.getInstance().getRepository();
-    MSubmission mSubmission = repository.findSubmissionLastForJob(jobId);
-
-    if (mSubmission == null) {
-      return new MSubmission(jobId, new Date(), SubmissionStatus.NEVER_EXECUTED);
-    }
-
-    // If the submission is in running state, let's update it
-    if (mSubmission.getStatus().isRunning()) {
-      update(mSubmission);
-    }
-
-    return mSubmission;
-  }
-
-  private void update(MSubmission submission) {
-    double progress = -1;
-    Counters counters = null;
-    String externalId = submission.getExternalId();
-    SubmissionStatus newStatus = submissionEngine.status(externalId);
-    String externalLink = submissionEngine.externalLink(externalId);
-
-    if (newStatus.isRunning()) {
-      progress = submissionEngine.progress(externalId);
-    } else {
-      counters = submissionEngine.counters(externalId);
-    }
-
-    submission.setStatus(newStatus);
-    submission.setProgress(progress);
-    submission.setCounters(counters);
-    submission.setExternalLink(externalLink);
-    submission.setLastUpdateDate(new Date());
-
-    RepositoryManager.getInstance().getRepository()
-      .updateSubmission(submission);
-  }
-
-  @Override
-  public synchronized void configurationChanged() {
-    LOG.info("Begin submission engine manager reconfiguring");
-    MapContext newContext = SqoopConfiguration.getInstance().getContext();
-    MapContext oldContext = SqoopConfiguration.getInstance().getOldContext();
-
-    String newSubmissionEngineClassName = newContext
-      .getString(FrameworkConstants.SYSCFG_SUBMISSION_ENGINE);
-    if (newSubmissionEngineClassName == null
-      || newSubmissionEngineClassName.trim().length() == 0) {
-      throw new SqoopException(FrameworkError.FRAMEWORK_0001,
-        newSubmissionEngineClassName);
-    }
-
-    String oldSubmissionEngineClassName = oldContext
-      .getString(FrameworkConstants.SYSCFG_SUBMISSION_ENGINE);
-    if (!newSubmissionEngineClassName.equals(oldSubmissionEngineClassName)) {
-      LOG.warn("Submission engine cannot be replaced at the runtime. " +
-        "You might need to restart the server.");
-    }
-
-    String newExecutionEngineClassName = newContext
-      .getString(FrameworkConstants.SYSCFG_EXECUTION_ENGINE);
-    if (newExecutionEngineClassName == null
-      || newExecutionEngineClassName.trim().length() == 0) {
-      throw new SqoopException(FrameworkError.FRAMEWORK_0007,
-        newExecutionEngineClassName);
-    }
-
-    String oldExecutionEngineClassName = oldContext
-      .getString(FrameworkConstants.SYSCFG_EXECUTION_ENGINE);
-    if (!newExecutionEngineClassName.equals(oldExecutionEngineClassName)) {
-      LOG.warn("Execution engine cannot be replaced at the runtime. " +
-        "You might need to restart the server.");
-    }
-
-    // Set up worker threads
-    purgeThreshold = newContext.getLong(
-      FrameworkConstants.SYSCFG_SUBMISSION_PURGE_THRESHOLD,
-      DEFAULT_PURGE_THRESHOLD
-      );
-    purgeSleep = newContext.getLong(
-      FrameworkConstants.SYSCFG_SUBMISSION_PURGE_SLEEP,
-      DEFAULT_PURGE_SLEEP
-      );
-    purgeThread.interrupt();
-
-    updateSleep = newContext.getLong(
-      FrameworkConstants.SYSCFG_SUBMISSION_UPDATE_SLEEP,
-      DEFAULT_UPDATE_SLEEP
-      );
-    updateThread.interrupt();
-
-    LOG.info("Submission engine manager reconfigured.");
-  }
-
-  private class PurgeThread extends Thread {
-    public PurgeThread() {
-      super("PurgeThread");
-    }
-
-    public void run() {
-      LOG.info("Starting submission manager purge thread");
-
-      while (running) {
-        try {
-          LOG.info("Purging old submissions");
-          Date threshold = new Date((new Date()).getTime() - purgeThreshold);
-          RepositoryManager.getInstance().getRepository()
-            .purgeSubmissions(threshold);
-          Thread.sleep(purgeSleep);
-        } catch (InterruptedException e) {
-          LOG.debug("Purge thread interrupted", e);
-        }
-      }
-
-      LOG.info("Ending submission manager purge thread");
-    }
-  }
-
-  private class UpdateThread extends Thread {
-    public UpdateThread() {
-      super("UpdateThread");
-    }
-
-    public void run() {
-      LOG.info("Starting submission manager update thread");
-
-      while (running) {
-        try {
-          LOG.debug("Updating running submissions");
-
-          // Let's get all running submissions from repository to check them out
-          List<MSubmission> unfinishedSubmissions =
-            RepositoryManager.getInstance().getRepository()
-              .findSubmissionsUnfinished();
-
-          for (MSubmission submission : unfinishedSubmissions) {
-            update(submission);
-          }
-
-          Thread.sleep(updateSleep);
-        } catch (InterruptedException e) {
-          LOG.debug("Purge thread interrupted", e);
-        }
-      }
-
-      LOG.info("Ending submission manager update thread");
-    }
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/sqoop/blob/049994a0/core/src/main/java/org/apache/sqoop/framework/JobRequest.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/framework/JobRequest.java b/core/src/main/java/org/apache/sqoop/framework/JobRequest.java
deleted file mode 100644
index 1f77693..0000000
--- a/core/src/main/java/org/apache/sqoop/framework/JobRequest.java
+++ /dev/null
@@ -1,356 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sqoop.framework;
-
-import org.apache.sqoop.common.Direction;
-import org.apache.sqoop.common.DirectionError;
-import org.apache.sqoop.common.MutableMapContext;
-import org.apache.sqoop.common.SqoopException;
-import org.apache.sqoop.connector.idf.IntermediateDataFormat;
-import org.apache.sqoop.connector.spi.SqoopConnector;
-import org.apache.sqoop.job.etl.Transferable;
-import org.apache.sqoop.model.MSubmission;
-import org.apache.sqoop.utils.ClassUtils;
-
-import java.util.LinkedList;
-import java.util.List;
-
-/**
- * Submission details class is used when creating new submission and contains
- * all information that we need to create a new submission (including mappers,
- * reducers, ...).
- */
-public class JobRequest {
-
-  /**
-   * Submission summary
-   */
-  MSubmission summary;
-
-  /**
-   * Original job name
-   */
-  String jobName;
-
-  /**
-   * Associated job (from metadata perspective) id
-   */
-  long jobId;
-
-  /**
-   * Connector instances associated with this submission request
-   */
-  SqoopConnector fromConnector;
-  SqoopConnector toConnector;
-
-  /**
-   * List of required local jars for the job
-   */
-  List<String> jars;
-
-  /**
-   * From entity
-   */
-  Transferable from;
-
-  /**
-   * To entity
-   */
-  Transferable to;
-
-  /**
-   * All configuration objects
-   */
-  Object fromConnectorConnectionConfig;
-  Object toConnectorConnectionConfig;
-  Object fromConnectorJobConfig;
-  Object toConnectorJobConfig;
-  Object fromFrameworkConnectionConfig;
-  Object toFrameworkConnectionConfig;
-  Object configFrameworkJob;
-
-  /**
-   * Connector context (submission specific configuration)
-   */
-  MutableMapContext fromConnectorContext;
-  MutableMapContext toConnectorContext;
-
-  /**
-   * Framework context (submission specific configuration)
-   */
-  MutableMapContext frameworkContext;
-
-  /**
-   * Optional notification URL for job progress
-   */
-  String notificationUrl;
-
-  /**
-   * Number of extractors
-   */
-  Integer extractors;
-
-  /**
-   * Number of loaders
-   */
-  Integer loaders;
-
-  /**
-   * The intermediate data format this submission should use.
-   */
-  Class<? extends IntermediateDataFormat> intermediateDataFormat;
-
-  public JobRequest() {
-    this.jars = new LinkedList<String>();
-    this.fromConnectorContext = new MutableMapContext();
-    this.toConnectorContext = new MutableMapContext();
-    this.frameworkContext = new MutableMapContext();
-    this.fromConnector = null;
-    this.toConnector = null;
-    this.fromConnectorConnectionConfig = null;
-    this.toConnectorConnectionConfig = null;
-    this.fromConnectorJobConfig = null;
-    this.toConnectorJobConfig = null;
-    this.fromFrameworkConnectionConfig = null;
-    this.toFrameworkConnectionConfig = null;
-  }
-
-  public MSubmission getSummary() {
-    return summary;
-  }
-
-  public void setSummary(MSubmission summary) {
-    this.summary = summary;
-  }
-
-  public String getJobName() {
-    return jobName;
-  }
-
-  public void setJobName(String jobName) {
-    this.jobName = jobName;
-  }
-
-  public long getJobId() {
-    return jobId;
-  }
-
-  public void setJobId(long jobId) {
-    this.jobId = jobId;
-  }
-
-  public SqoopConnector getConnector(Direction type) {
-    switch(type) {
-      case FROM:
-        return fromConnector;
-
-      case TO:
-        return toConnector;
-
-      default:
-        throw new SqoopException(DirectionError.DIRECTION_0000, "Direction: " + type);
-    }
-  }
-
-  public void setConnector(Direction type, SqoopConnector connector) {
-    switch(type) {
-      case FROM:
-        fromConnector = connector;
-        break;
-
-      case TO:
-        toConnector = connector;
-        break;
-
-      default:
-        throw new SqoopException(DirectionError.DIRECTION_0000, "Direction: " + type);
-    }
-  }
-
-  public List<String> getJars() {
-    return jars;
-  }
-
-  public void addJar(String jar) {
-    if(!jars.contains(jar)) {
-      jars.add(jar);
-    }
-  }
-
-  public void addJarForClass(Class klass) {
-    addJar(ClassUtils.jarForClass(klass));
-  }
-
-  public void addJars(List<String> jars) {
-    for(String j : jars) {
-      addJar(j);
-    }
-  }
-
-  public Transferable getFrom() {
-    return from;
-  }
-
-  public void setFrom(Transferable from) {
-    this.from = from;
-  }
-
-  public Transferable getTo() {
-    return to;
-  }
-
-  public void setTo(Transferable to) {
-    this.to = to;
-  }
-
-  public Object getConnectorConnectionConfig(Direction type) {
-    switch(type) {
-      case FROM:
-        return fromConnectorConnectionConfig;
-
-      case TO:
-        return toConnectorConnectionConfig;
-
-      default:
-        throw new SqoopException(DirectionError.DIRECTION_0000, "Direction: " + type);
-    }
-  }
-
-  public void setConnectorConnectionConfig(Direction type, Object config) {
-    switch(type) {
-      case FROM:
-        fromConnectorConnectionConfig = config;
-        break;
-      case TO:
-        toConnectorConnectionConfig = config;
-        break;
-      default:
-        throw new SqoopException(DirectionError.DIRECTION_0000, "Direction: " + type);
-    }
-  }
-
-  public Object getConnectorJobConfig(Direction type) {
-    switch(type) {
-      case FROM:
-        return fromConnectorJobConfig;
-
-      case TO:
-        return toConnectorJobConfig;
-
-      default:
-        throw new SqoopException(DirectionError.DIRECTION_0000, "Direction: " + type);
-    }
-  }
-
-  public void setConnectorJobConfig(Direction type, Object config) {
-    switch(type) {
-      case FROM:
-        fromConnectorJobConfig = config;
-        break;
-      case TO:
-        toConnectorJobConfig = config;
-        break;
-      default:
-        throw new SqoopException(DirectionError.DIRECTION_0000, "Direction: " + type);
-    }
-  }
-
-  public Object getFrameworkConnectionConfig(Direction type) {
-    switch(type) {
-      case FROM:
-        return fromFrameworkConnectionConfig;
-
-      case TO:
-        return toFrameworkConnectionConfig;
-
-      default:
-        throw new SqoopException(DirectionError.DIRECTION_0000, "Direction: " + type);
-    }
-  }
-
-  public void setFrameworkConnectionConfig(Direction type, Object config) {
-    switch(type) {
-      case FROM:
-        fromFrameworkConnectionConfig = config;
-        break;
-      case TO:
-        toFrameworkConnectionConfig = config;
-        break;
-      default:
-        throw new SqoopException(DirectionError.DIRECTION_0000, "Direction: " + type);
-    }
-  }
-
-  public Object getConfigFrameworkJob() {
-    return configFrameworkJob;
-  }
-
-  public void setConfigFrameworkJob(Object config) {
-    configFrameworkJob = config;
-  }
-
-  public MutableMapContext getConnectorContext(Direction type) {
-    switch(type) {
-      case FROM:
-        return fromConnectorContext;
-
-      case TO:
-        return toConnectorContext;
-
-      default:
-        throw new SqoopException(DirectionError.DIRECTION_0000, "Direction: " + type);
-    }
-  }
-
-  public MutableMapContext getFrameworkContext() {
-    return frameworkContext;
-  }
-
-  public String getNotificationUrl() {
-    return notificationUrl;
-  }
-
-  public void setNotificationUrl(String url) {
-    this.notificationUrl = url;
-  }
-
-  public Integer getExtractors() {
-    return extractors;
-  }
-
-  public void setExtractors(Integer extractors) {
-    this.extractors = extractors;
-  }
-
-  public Integer getLoaders() {
-    return loaders;
-  }
-
-  public void setLoaders(Integer loaders) {
-    this.loaders = loaders;
-  }
-
-  public Class<? extends IntermediateDataFormat> getIntermediateDataFormat() {
-    return intermediateDataFormat;
-  }
-
-  public void setIntermediateDataFormat(Class<? extends IntermediateDataFormat> intermediateDataFormat) {
-    this.intermediateDataFormat = intermediateDataFormat;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/049994a0/core/src/main/java/org/apache/sqoop/framework/SubmissionEngine.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/framework/SubmissionEngine.java b/core/src/main/java/org/apache/sqoop/framework/SubmissionEngine.java
deleted file mode 100644
index 732be3b..0000000
--- a/core/src/main/java/org/apache/sqoop/framework/SubmissionEngine.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sqoop.framework;
-
-import org.apache.sqoop.common.MapContext;
-import org.apache.sqoop.submission.counter.Counters;
-import org.apache.sqoop.submission.SubmissionStatus;
-
-/**
- * Submission engine is responsible in conveying the information about the
- * job instances (submissions) to remote (hadoop) cluster.
- */
-public abstract class SubmissionEngine {
-
-  /**
-   * Initialize submission engine
-   *
-   * @param context Configuration context
-   * @param prefix Submission engine prefix
-   */
-  public void initialize(MapContext context, String prefix) {
-  }
-
-  /**
-   * Destroy submission engine when stopping server
-   */
-  public void destroy() {
-  }
-
-  /**
-   * Callback to verify that configured submission engine and execution engine
-   * are compatible.
-   *
-   * @param executionEngineClass Configured execution class.
-   * @return True if such execution engine is supported
-   */
-  public abstract boolean isExecutionEngineSupported(Class executionEngineClass);
-
-  /**
-   * Submit new job to remote (hadoop) cluster. This method *must* fill
-   * submission.getSummary.setExternalId(), otherwise Sqoop framework won't
-   * be able to track progress on this job!
-   *
-   * @return Return true if we were able to submit job to remote cluster.
-   */
-  public abstract boolean submit(JobRequest submission);
-
-  /**
-   * Hard stop for given submission.
-   *
-   * @param submissionId Submission internal id.
-   */
-  public abstract void stop(String submissionId);
-
-  /**
-   * Return status of given submission.
-   *
-   * @param submissionId Submission internal id.
-   * @return Current submission status.
-   */
-  public abstract SubmissionStatus status(String submissionId);
-
-  /**
-   * Return submission progress.
-   *
-   * Expected is number from interval <0, 1> denoting how far the processing
-   * has gone or -1 in case that this submission engine do not supports
-   * progress reporting.
-   *
-   * @param submissionId Submission internal id.
-   * @return {-1} union <0, 1>
-   */
-  public double progress(String submissionId) {
-    return -1;
-  }
-
-  /**
-   * Return statistics for given submission id.
-   *
-   * Sqoop framework will call counters only for submission in state SUCCEEDED,
-   * it's consider exceptional state to call this method for other states.
-   *
-   * @param submissionId Submission internal id.
-   * @return Submission statistics
-   */
-  public Counters counters(String submissionId) {
-    return null;
-  }
-
-  /**
-   * Return link to external web page with given submission.
-   *
-   * @param submissionId Submission internal id.
-   * @return Null in case that external page is not supported or available or
-   *  HTTP link to given submission.
-   */
-  public String externalLink(String submissionId) {
-    return null;
-  }
-}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/049994a0/core/src/main/java/org/apache/sqoop/framework/configuration/ConnectionConfiguration.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/framework/configuration/ConnectionConfiguration.java b/core/src/main/java/org/apache/sqoop/framework/configuration/ConnectionConfiguration.java
deleted file mode 100644
index 897d3c7..0000000
--- a/core/src/main/java/org/apache/sqoop/framework/configuration/ConnectionConfiguration.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sqoop.framework.configuration;
-
-import org.apache.sqoop.model.ConfigurationClass;
-import org.apache.sqoop.model.Form;
-
-/**
- * Framework class representing connection configuration
- */
-@ConfigurationClass
-public class ConnectionConfiguration {
-
-  @Form SecurityForm security = new SecurityForm();
-}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/049994a0/core/src/main/java/org/apache/sqoop/framework/configuration/JobConfiguration.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/framework/configuration/JobConfiguration.java b/core/src/main/java/org/apache/sqoop/framework/configuration/JobConfiguration.java
deleted file mode 100644
index 0abc611..0000000
--- a/core/src/main/java/org/apache/sqoop/framework/configuration/JobConfiguration.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sqoop.framework.configuration;
-
-import org.apache.sqoop.model.ConfigurationClass;
-import org.apache.sqoop.model.Form;
-
-@ConfigurationClass
-public class JobConfiguration {
-  @Form
-  public ThrottlingForm throttling;
-
-  public JobConfiguration() {
-    throttling = new ThrottlingForm();
-  }
-}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/049994a0/core/src/main/java/org/apache/sqoop/framework/configuration/SecurityForm.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/framework/configuration/SecurityForm.java b/core/src/main/java/org/apache/sqoop/framework/configuration/SecurityForm.java
deleted file mode 100644
index 8ab50ed..0000000
--- a/core/src/main/java/org/apache/sqoop/framework/configuration/SecurityForm.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sqoop.framework.configuration;
-
-import org.apache.sqoop.model.FormClass;
-import org.apache.sqoop.model.Input;
-
-/**
- * Security form
- */
-@FormClass
-public class SecurityForm {
-  @Input public Integer maxConnections;
-}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/049994a0/core/src/main/java/org/apache/sqoop/framework/configuration/ThrottlingForm.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/framework/configuration/ThrottlingForm.java b/core/src/main/java/org/apache/sqoop/framework/configuration/ThrottlingForm.java
deleted file mode 100644
index c435f6b..0000000
--- a/core/src/main/java/org/apache/sqoop/framework/configuration/ThrottlingForm.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sqoop.framework.configuration;
-
-import org.apache.sqoop.model.FormClass;
-import org.apache.sqoop.model.Input;
-
-/**
- * Form to set up number of loaders and extractors
- */
-@FormClass
-public class ThrottlingForm {
-
-  @Input public Integer extractors;
-
-  @Input public Integer loaders;
-}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/049994a0/core/src/main/java/org/apache/sqoop/repository/JdbcRepository.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/repository/JdbcRepository.java b/core/src/main/java/org/apache/sqoop/repository/JdbcRepository.java
index fa119a5..3466116 100644
--- a/core/src/main/java/org/apache/sqoop/repository/JdbcRepository.java
+++ b/core/src/main/java/org/apache/sqoop/repository/JdbcRepository.java
@@ -23,9 +23,9 @@ import java.util.List;
 
 import org.apache.log4j.Logger;
 import org.apache.sqoop.common.SqoopException;
-import org.apache.sqoop.model.MConnection;
+import org.apache.sqoop.model.MLink;
 import org.apache.sqoop.model.MConnector;
-import org.apache.sqoop.model.MFramework;
+import org.apache.sqoop.model.MDriverConfig;
 import org.apache.sqoop.model.MJob;
 import org.apache.sqoop.model.MSubmission;
 
@@ -43,13 +43,13 @@ public class JdbcRepository extends Repository {
   }
 
   /**
-   * Private interface to wrap specific code that requires fresh connection to
-   * repository with general code that will get the connection and handle
+   * Private interface to wrap specific code that requires fresh link to
+   * repository with general code that will get the link and handle
    * exceptions.
    */
   private interface DoWithConnection {
     /**
-     * Do what is needed to be done with given connection object.
+     * Do what is needed to be done with given link object.
      *
      * @param conn Connection to metadata repository.
      * @return Arbitrary value
@@ -62,7 +62,7 @@ public class JdbcRepository extends Repository {
   }
 
   /**
-   * Handle transaction and connection functionality and delegate action to
+   * Handle transaction and link functionality and delegate action to
    * given delegator.
    *
    * @param delegator Code for specific action
@@ -77,7 +77,7 @@ public class JdbcRepository extends Repository {
     boolean shouldCloseTxn = false;
 
     try {
-      // Get transaction and connection
+      // Get transaction and link
       Connection conn;
       if (tx == null) {
         tx = getTransaction();
@@ -205,6 +205,7 @@ public class JdbcRepository extends Repository {
   /**
    * {@inheritDoc}
    */
+    @SuppressWarnings("unchecked")
     @Override
     public List<MConnector> findConnectors() {
       return (List<MConnector>) doWithConnection(new DoWithConnection() {
@@ -219,24 +220,24 @@ public class JdbcRepository extends Repository {
    * {@inheritDoc}
    */
   @Override
-  public MFramework registerFramework(final MFramework mFramework, final boolean autoUpgrade) {
-    return (MFramework) doWithConnection(new DoWithConnection() {
+  public MDriverConfig registerDriverConfig(final MDriverConfig mDriverConfig, final boolean autoUpgrade) {
+    return (MDriverConfig) doWithConnection(new DoWithConnection() {
       @Override
       public Object doIt(Connection conn) {
-        MFramework result = handler.findFramework(conn);
+        MDriverConfig result = handler.findDriverConfig(conn);
         if (result == null) {
-          handler.registerFramework(mFramework, conn);
-          return mFramework;
+          handler.registerDriverConfig(mDriverConfig, conn);
+          return mDriverConfig;
         } else {
-          // We're currently not serializing framework version into repository
+          // We're currently not serializing version into repository
           // so let's just compare the structure to see if we need upgrade.
-          if(!mFramework.equals(result)) {
+          if(!mDriverConfig.equals(result)) {
             if (autoUpgrade) {
-              upgradeFramework(mFramework);
-              return mFramework;
+              upgradeDriverConfig(mDriverConfig);
+              return mDriverConfig;
             } else {
               throw new SqoopException(RepositoryError.JDBCREPO_0026,
-                "Framework: " + mFramework.getPersistenceId());
+                "DriverConfig: " + mDriverConfig.getPersistenceId());
             }
           }
           return result;
@@ -249,15 +250,15 @@ public class JdbcRepository extends Repository {
    * {@inheritDoc}
    */
   @Override
-  public void createConnection(final MConnection connection) {
+  public void createLink(final MLink link) {
     doWithConnection(new DoWithConnection() {
       @Override
       public Object doIt(Connection conn) {
-        if(connection.hasPersistenceId()) {
+        if(link.hasPersistenceId()) {
           throw new SqoopException(RepositoryError.JDBCREPO_0015);
         }
 
-        handler.createConnection(connection, conn);
+        handler.createLink(link, conn);
         return null;
       }
     });
@@ -267,28 +268,27 @@ public class JdbcRepository extends Repository {
    * {@inheritDoc}
    */
   @Override
-  public void updateConnection(final MConnection connection) {
-    updateConnection(connection, null);
+  public void updateLink(final MLink link) {
+    updateLink(link, null);
   }
 
   /**
    * {@inheritDoc}
    */
   @Override
-  public void updateConnection(final MConnection connection,
-    RepositoryTransaction tx) {
+  public void updateLink(final MLink link, RepositoryTransaction tx) {
     doWithConnection(new DoWithConnection() {
       @Override
       public Object doIt(Connection conn) {
-       if(!connection.hasPersistenceId()) {
+        if (!link.hasPersistenceId()) {
           throw new SqoopException(RepositoryError.JDBCREPO_0016);
         }
-        if(!handler.existsConnection(connection.getPersistenceId(), conn)) {
-          throw new SqoopException(RepositoryError.JDBCREPO_0017,
-            "Invalid id: " + connection.getPersistenceId());
+        if (!handler.existsLink(link.getPersistenceId(), conn)) {
+          throw new SqoopException(RepositoryError.JDBCREPO_0017, "Invalid id: "
+              + link.getPersistenceId());
         }
 
-        handler.updateConnection(connection, conn);
+        handler.updateLink(link, conn);
         return null;
       }
     }, (JdbcRepositoryTransaction) tx);
@@ -298,16 +298,16 @@ public class JdbcRepository extends Repository {
    * {@inheritDoc}
    */
   @Override
-  public void enableConnection(final long connectionId, final boolean enabled) {
+  public void enableLink(final long linkId, final boolean enabled) {
     doWithConnection(new DoWithConnection() {
       @Override
       public Object doIt(Connection conn) {
-        if(!handler.existsConnection(connectionId, conn)) {
+        if(!handler.existsLink(linkId, conn)) {
           throw new SqoopException(RepositoryError.JDBCREPO_0017,
-            "Invalid id: " + connectionId);
+            "Invalid id: " + linkId);
         }
 
-        handler.enableConnection(connectionId, enabled, conn);
+        handler.enableLink(linkId, enabled, conn);
         return null;
       }
     });
@@ -317,20 +317,20 @@ public class JdbcRepository extends Repository {
    * {@inheritDoc}
    */
   @Override
-  public void deleteConnection(final long connectionId) {
+  public void deleteLink(final long linkId) {
     doWithConnection(new DoWithConnection() {
       @Override
       public Object doIt(Connection conn) {
-        if(!handler.existsConnection(connectionId, conn)) {
+        if(!handler.existsLink(linkId, conn)) {
           throw new SqoopException(RepositoryError.JDBCREPO_0017,
-            "Invalid id: " + connectionId);
+            "Invalid id: " + linkId);
         }
-        if(handler.inUseConnection(connectionId, conn)) {
+        if(handler.inUseLink(linkId, conn)) {
           throw new SqoopException(RepositoryError.JDBCREPO_0021,
-            "Id in use: " + connectionId);
+            "Id in use: " + linkId);
         }
 
-        handler.deleteConnection(connectionId, conn);
+        handler.deleteLink(linkId, conn);
         return null;
       }
     });
@@ -340,11 +340,11 @@ public class JdbcRepository extends Repository {
    * {@inheritDoc}
    */
   @Override
-  public MConnection findConnection(final long connectionId) {
-    return (MConnection) doWithConnection(new DoWithConnection() {
+  public MLink findLink(final long connectionId) {
+    return (MLink) doWithConnection(new DoWithConnection() {
       @Override
       public Object doIt(Connection conn) {
-        return handler.findConnection(connectionId, conn);
+        return handler.findLink(connectionId, conn);
       }
     });
   }
@@ -354,11 +354,11 @@ public class JdbcRepository extends Repository {
    */
   @SuppressWarnings("unchecked")
   @Override
-  public List<MConnection> findConnections() {
-    return (List<MConnection>) doWithConnection(new DoWithConnection() {
+  public List<MLink> findLinks() {
+    return (List<MLink>) doWithConnection(new DoWithConnection() {
       @Override
       public Object doIt(Connection conn) {
-        return handler.findConnections(conn);
+        return handler.findLinks(conn);
       }
     });
   }
@@ -601,12 +601,12 @@ public class JdbcRepository extends Repository {
    * {@inheritDoc}
    */
   @Override
-  public List<MConnection> findConnectionsForConnector(final long
+  public List<MLink> findLinksForConnector(final long
     connectorID) {
-    return (List<MConnection>) doWithConnection(new DoWithConnection() {
+    return (List<MLink>) doWithConnection(new DoWithConnection() {
       @Override
       public Object doIt(Connection conn) throws Exception {
-        return handler.findConnectionsForConnector(connectorID, conn);
+        return handler.findLinksForConnector(connectorID, conn);
       }
     });
   }
@@ -637,12 +637,11 @@ public class JdbcRepository extends Repository {
   }
 
   @Override
-  protected void deleteConnectionInputs(final long connectionID,
-    RepositoryTransaction tx) {
+  protected void deleteLinkInputs(final long linkId, RepositoryTransaction tx) {
     doWithConnection(new DoWithConnection() {
       @Override
       public Object doIt(Connection conn) throws Exception {
-        handler.deleteConnectionInputs(connectionID, conn);
+        handler.deleteLinkInputs(linkId, conn);
         return null;
       }
     }, (JdbcRepositoryTransaction) tx);
@@ -665,12 +664,11 @@ public class JdbcRepository extends Repository {
   }
 
 
-  protected void updateFramework(final MFramework mFramework,
-    RepositoryTransaction tx) {
+  protected void updateDriverConfig(final MDriverConfig mDriverConfig, RepositoryTransaction tx) {
     doWithConnection(new DoWithConnection() {
       @Override
       public Object doIt(Connection conn) throws Exception {
-        handler.updateFramework(mFramework, conn);
+        handler.updateDriverConfig(mDriverConfig, conn);
         return null;
       }
     }, (JdbcRepositoryTransaction) tx);

http://git-wip-us.apache.org/repos/asf/sqoop/blob/049994a0/core/src/main/java/org/apache/sqoop/repository/JdbcRepositoryHandler.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/repository/JdbcRepositoryHandler.java b/core/src/main/java/org/apache/sqoop/repository/JdbcRepositoryHandler.java
index 4de3134..a743491 100644
--- a/core/src/main/java/org/apache/sqoop/repository/JdbcRepositoryHandler.java
+++ b/core/src/main/java/org/apache/sqoop/repository/JdbcRepositoryHandler.java
@@ -21,9 +21,9 @@ import java.sql.Connection;
 import java.util.Date;
 import java.util.List;
 
-import org.apache.sqoop.model.MConnection;
+import org.apache.sqoop.model.MLink;
 import org.apache.sqoop.model.MConnector;
-import org.apache.sqoop.model.MFramework;
+import org.apache.sqoop.model.MDriverConfig;
 import org.apache.sqoop.model.MJob;
 import org.apache.sqoop.model.MSubmission;
 
@@ -41,11 +41,10 @@ public abstract class JdbcRepositoryHandler {
 
   /**
    * Search for connector with given name in repository.
-   *
-   * And return corresponding metadata structure.
+   * And return corresponding connector structure.
    *
    * @param shortName Connector unique name
-   * @param conn JDBC connection for querying repository.
+   * @param conn JDBC link for querying repository.
    * @return null if connector is not yet registered in repository or
    *   loaded representation.
    */
@@ -65,26 +64,26 @@ public abstract class JdbcRepositoryHandler {
    * already registered or present in the repository.
    *
    * @param mc Connector that should be registered.
-   * @param conn JDBC connection for querying repository.
+   * @param conn JDBC link for querying repository.
    */
   public abstract void registerConnector(MConnector mc, Connection conn);
 
 
   /**
-   * Retrieve connections which use the given connector.
-   * @param connectorID Connector ID whose connections should be fetched
-   * @param conn JDBC connection for querying repository
-   * @return List of MConnections that use <code>connectorID</code>.
+   * Retrieve links which use the given connector.
+   * @param connectorID Connector ID whose links should be fetched
+   * @param conn JDBC link for querying repository
+   * @return List of MLinks that use <code>connectorID</code>.
    */
-  public abstract List<MConnection> findConnectionsForConnector(long
+  public abstract List<MLink> findLinksForConnector(long
     connectorID, Connection conn);
 
   /**
-   * Retrieve jobs which use the given connection.
+   * Retrieve jobs which use the given link.
    *
    * @param connectorID Connector ID whose jobs should be fetched
-   * @param conn JDBC connection for querying repository
-   * @return List of MJobs that use <code>connectionID</code>.
+   * @param conn JDBC link for querying repository
+   * @return List of MJobs that use <code>linkID</code>.
    */
   public abstract List<MJob> findJobsForConnector(long connectorID,
     Connection conn);
@@ -99,47 +98,47 @@ public abstract class JdbcRepositoryHandler {
    *
    * @param mConnector The new data to be inserted into the repository for
    *                     this connector.
-   * @param conn JDBC connection for querying repository
+   * @param conn JDBC link for querying repository
    */
 
   public abstract void updateConnector(MConnector mConnector, Connection conn);
 
 
   /**
-   * Update the framework with the new data supplied in the
-   * <tt>mFramework</tt>.
+   * Update the driverConfig with the new data supplied in the
+   * <tt>mDriverConfig</tt>.
    * Also Update all forms in the repository
-   * with the forms specified in <tt>mFramework</tt>. <tt>mFramework </tt> must
+   * with the forms specified in <tt>mDriverConfig</tt>. <tt>mDriverConfig </tt> must
    * minimally have the connectorID and all required forms (including ones
    * which may not have changed). After this operation the repository is
    * guaranteed to only have the new forms specified in this object.
    *
-   * @param mFramework The new data to be inserted into the repository for
-   *                     the framework.
-   * @param conn JDBC connection for querying repository
+   * @param mDriverConfig The new data to be inserted into the repository for
+   *                     the driverConfig.
+   * @param conn JDBC link for querying repository
    */
-  public abstract void updateFramework(MFramework mFramework, Connection conn);
+  public abstract void updateDriverConfig(MDriverConfig mDriverConfig, Connection conn);
 
 
   /**
-   * Search for framework metadata in the repository.
+   * Search for driverConfigin the repository.
    *
-   * @param conn JDBC connection for querying repository.
-   * @return null if framework metadata are not yet present in repository or
+   * @param conn JDBC link for querying repository.
+   * @return null if driverConfig are not yet present in repository or
    *  loaded representation.
    */
-  public abstract MFramework findFramework(Connection conn);
+  public abstract MDriverConfig findDriverConfig(Connection conn);
 
   /**
-   * Register framework metadata in repository.
+   * Register driver config in repository.
    *
-   * Save framework metadata into repository. Metadata should not be already
+   * Save driver config into repository. Driver config  should not be already
    * registered or present in the repository.
    *
-   * @param mf Framework metadata that should be registered.
-   * @param conn JDBC connection for querying repository.
+   * @param driverConfig Driver config that should be registered.
+   * @param conn JDBC link for querying repository.
    */
-  public abstract void registerFramework(MFramework mf, Connection conn);
+  public abstract void registerDriverConfig(MDriverConfig driverConfig, Connection conn);
 
   /**
    * Return true if repository tables exists and are suitable for use.
@@ -169,95 +168,92 @@ public abstract class JdbcRepositoryHandler {
   public abstract void shutdown();
 
   /**
-   * Specify query that Sqoop framework can use to validate connection to
+   * Specify query that Sqoop can use to validate link to
    * repository. This query should return at least one row.
    *
    * @return Query or NULL in case that this repository do not support or do not
-   *   want to validate live connections.
+   *   want to validate live links.
    */
   public abstract String validationQuery();
 
   /**
-   * Save given connection to repository. This connection must not be already
+   * Save given link to repository. This link must not be already
    * present in the repository otherwise exception will be thrown.
    *
-   * @param connection Connection object to serialize into repository.
-   * @param conn Connection to metadata repository
+   * @param link Link object to serialize into repository.
+   * @param conn Connection to the repository
    */
-  public abstract void createConnection(MConnection connection,
-    Connection conn);
+  public abstract void createLink(MLink link, Connection conn);
 
   /**
-   * Update given connection representation in repository. This connection
+   * Update given link representation in repository. This link
    * object must already exists in the repository otherwise exception will be
    * thrown.
    *
-   * @param connection Connection object that should be updated in repository.
-   * @param conn Connection to metadata repository
+   * @param link Link object that should be updated in repository.
+   * @param conn Connection to the repository
    */
-  public abstract void updateConnection(MConnection connection,
-    Connection conn);
+  public abstract void updateLink(MLink link, Connection conn);
 
   /**
-   * Check if given connection exists in metastore.
+   * Check if given link exists in repository.
    *
-   * @param connetionId Connection id
-   * @param conn Connection to metadata repository
-   * @return True if the connection exists
+   * @param linkId Link id
+   * @param conn Connection to the repository
+   * @return True if the link exists
    */
-  public abstract boolean existsConnection(long connetionId, Connection conn);
+  public abstract boolean existsLink(long linkId, Connection conn);
 
   /**
    * Check if given Connection id is referenced somewhere and thus can't
    * be removed.
    *
-   * @param connectionId Connection id
-   * @param conn Connection to metadata repository
+   * @param linkId Link id
+   * @param conn Connection to the repository
    * @return
    */
-  public abstract boolean inUseConnection(long connectionId, Connection conn);
+  public abstract boolean inUseLink(long linkId, Connection conn);
 
   /**
-   * Enable or disable connection with given id from metadata repository
+   * Enable or disable link with given id from the repository
    *
-   * @param connectionId Connection object that is going to be enabled or disabled
+   * @param linkId Link object that is going to be enabled or disabled
    * @param enabled Enable or disable
-   * @param conn Connection to metadata repository
+   * @param conn Connection to the repository
    */
-  public abstract void enableConnection(long connectionId, boolean enabled, Connection conn);
+  public abstract void enableLink(long linkId, boolean enabled, Connection conn);
 
   /**
-   * Delete connection with given id from metadata repository.
+   * Delete link with given id from the repository.
    *
-   * @param connectionId Connection object that should be removed from repository
-   * @param conn Connection to metadata repository
+   * @param linkId Link object that should be removed from repository
+   * @param conn Connection to the repository
    */
-  public abstract void deleteConnection(long connectionId, Connection conn);
+  public abstract void deleteLink(long linkId, Connection conn);
 
   /**
-   * Delete the input values for the connection with given id from the
+   * Delete the input values for the link with given id from the
    * repository.
-   * @param id Connection object whose inputs should be removed from repository
-   * @param conn Connection to metadata repository
+   * @param id Link object whose inputs should be removed from repository
+   * @param conn Connection to the repository
    */
-  public abstract void deleteConnectionInputs(long id, Connection conn);
+  public abstract void deleteLinkInputs(long id, Connection conn);
   /**
-   * Find connection with given id in repository.
+   * Find link with given id in repository.
    *
-   * @param connectionId Connection id
-   * @param conn Connection to metadata repository
-   * @return Deserialized form of the connection that is saved in repository
+   * @param linkId Link id
+   * @param conn Connection to the repository
+   * @return Deserialized form of the link that is saved in repository
    */
-  public abstract MConnection findConnection(long connectionId,
-    Connection conn);
+  public abstract MLink findLink(long linkId, Connection conn);
 
   /**
-   * Get all connection objects.
+   * Get all link objects.
    *
-   * @param conn Connection to metadata repository
-   * @return List will all saved connection objects
+   * @param conn Connection to the repository
+   * @return List will all saved link objects
    */
-  public abstract List<MConnection> findConnections(Connection conn);
+  public abstract List<MLink> findLinks(Connection conn);
 
 
   /**
@@ -265,7 +261,7 @@ public abstract class JdbcRepositoryHandler {
    * present in the repository otherwise exception will be thrown.
    *
    * @param job Job object to serialize into repository.
-   * @param conn Connection to metadata repository
+   * @param conn Connection to the repository
    */
   public abstract void createJob(MJob job, Connection conn);
 
@@ -275,15 +271,15 @@ public abstract class JdbcRepositoryHandler {
    * thrown.
    *
    * @param job Job object that should be updated in repository.
-   * @param conn Connection to metadata repository
+   * @param conn Connection to the repository
    */
   public abstract void updateJob(MJob job, Connection conn);
 
   /**
-   * Check if given job exists in metastore.
+   * Check if given job exists in the repository.
    *
    * @param jobId Job id
-   * @param conn Connection to metadata repository
+   * @param conn Connection to the repository
    * @return True if the job exists
    */
   public abstract boolean existsJob(long jobId, Connection conn);
@@ -293,7 +289,7 @@ public abstract class JdbcRepositoryHandler {
    * be removed.
    *
    * @param jobId Job id
-   * @param conn Connection to metadata repository
+   * @param conn Connection to the repository
    * @return
    */
   public abstract boolean inUseJob(long jobId, Connection conn);
@@ -303,22 +299,22 @@ public abstract class JdbcRepositoryHandler {
    *
    * @param jobId Job id
    * @param enabled Enable or disable
-   * @param conn Connection to metadata repository
+   * @param conn Connection to the repository
    */
   public abstract void enableJob(long jobId, boolean enabled, Connection conn);
 
   /**
    * Delete the input values for the job with given id from the repository.
    * @param id Job object whose inputs should be removed from repository
-   * @param conn Connection to metadata repository
+   * @param conn Connection to the repository
    */
   public abstract void deleteJobInputs(long id, Connection conn);
   /**
-   * Delete job with given id from metadata repository. This method will
+   * Delete job with given id from the repository. This method will
    * delete all inputs for this job also.
    *
    * @param jobId Job object that should be removed from repository
-   * @param conn Connection to metadata repository
+   * @param conn Connection to the repository
    */
   public abstract void deleteJob(long jobId, Connection conn);
 
@@ -326,7 +322,7 @@ public abstract class JdbcRepositoryHandler {
    * Find job with given id in repository.
    *
    * @param jobId Job id
-   * @param conn Connection to metadata repository
+   * @param conn Connection to the repository
    * @return Deserialized form of the job that is present in the repository
    */
   public abstract MJob findJob(long jobId, Connection conn);
@@ -334,7 +330,7 @@ public abstract class JdbcRepositoryHandler {
   /**
    * Get all job objects.
    *
-   * @param conn Connection to metadata repository
+   * @param conn Connection to the repository
    * @return List will all saved job objects
    */
   public abstract List<MJob> findJobs(Connection conn);
@@ -343,16 +339,15 @@ public abstract class JdbcRepositoryHandler {
    * Save given submission in repository.
    *
    * @param submission Submission object
-   * @param conn Connection to metadata repository
+   * @param conn Connection to the repository
    */
-  public abstract void createSubmission(MSubmission submission,
-    Connection conn);
+  public abstract void createSubmission(MSubmission submission, Connection conn);
 
   /**
    * Check if submission with given id already exists in repository.
    *
    * @param submissionId Submission internal id
-   * @param conn Connection to metadata repository
+   * @param conn Connection to the repository
    */
   public abstract boolean existsSubmission(long submissionId, Connection conn);
 
@@ -360,39 +355,38 @@ public abstract class JdbcRepositoryHandler {
    * Update given submission in repository.
    *
    * @param submission Submission object
-   * @param conn Connection to metadata repository
+   * @param conn Connection to the repository
    */
-  public abstract void updateSubmission(MSubmission submission,
-    Connection conn);
+  public abstract void updateSubmission(MSubmission submission, Connection conn);
 
   /**
    * Remove submissions older then threshold from repository.
    *
    * @param threshold Threshold date
-   * @param conn Connection to metadata repository
+   * @param conn Connection to the repository
    */
   public abstract void purgeSubmissions(Date threshold, Connection conn);
 
   /**
    * Return list of unfinished submissions (as far as repository is concerned).
    *
-   * @param conn Connection to metadata repository
+   * @param conn Connection to the repository
    * @return List of unfinished submissions.
    */
   public abstract List<MSubmission> findSubmissionsUnfinished(Connection conn);
 
   /**
-   * Return list of all submissions from metadata repository.
+   * Return list of all submissions from the repository.
    *
-   * @param conn Connection to metadata repository
+   * @param conn Connection to the repository
    * @return List of all submissions.
    */
   public abstract List<MSubmission> findSubmissions(Connection conn);
 
   /**
-   * Return list of submissions from metadata repository for given jobId.
+   * Return list of submissions from the repository for given jobId.
    * @param jobId Job id
-   * @param conn Connection to metadata repository
+   * @param conn Connection to the repository
    * @return List of submissions
    */
   public abstract List<MSubmission> findSubmissionsForJob(long jobId, Connection conn);
@@ -401,7 +395,7 @@ public abstract class JdbcRepositoryHandler {
    * Find last submission for given jobId.
    *
    * @param jobId Job id
-   * @param conn Connection to metadata repository
+   * @param conn Connection to the repository
    * @return Most recent submission
    */
   public abstract MSubmission findSubmissionLastForJob(long jobId,


Mime
View raw message