sqoop-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jar...@apache.org
Subject git commit: SQOOP-659: Design metadata upgrade procedure
Date Fri, 19 Apr 2013 04:16:01 GMT
Updated Branches:
  refs/heads/sqoop2 a69b1cc66 -> c4467c677


SQOOP-659: Design metadata upgrade procedure

(Hari Shreedharan via Jarek Jarcec Cecho)


Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/c4467c67
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/c4467c67
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/c4467c67

Branch: refs/heads/sqoop2
Commit: c4467c6770c99b4f7e231ee0af162fa825791656
Parents: a69b1cc
Author: Jarek Jarcec Cecho <jarcec@apache.org>
Authored: Thu Apr 18 21:15:28 2013 -0700
Committer: Jarek Jarcec Cecho <jarcec@apache.org>
Committed: Thu Apr 18 21:15:28 2013 -0700

----------------------------------------------------------------------
 .../java/org/apache/sqoop/model/MConnection.java   |   12 +
 .../src/main/java/org/apache/sqoop/model/MJob.java |   17 +
 .../sqoop/connector/jdbc/GenericJdbcConnector.java |    6 +
 .../jdbc/GenericJdbcConnectorMetadataUpgrader.java |   70 ++++
 .../apache/sqoop/repository/JdbcRepository.java    |  149 ++++++++-
 .../sqoop/repository/JdbcRepositoryHandler.java    |  118 +++++--
 .../org/apache/sqoop/repository/Repository.java    |  266 +++++++++++++-
 .../sqoop/repository/derby/DerbyRepoError.java     |    2 +
 .../repository/derby/DerbyRepositoryHandler.java   |  202 +++++++++---
 .../sqoop/repository/derby/DerbySchemaQuery.java   |   52 +++-
 .../sqoop/connector/spi/MetadataUpgrader.java      |   47 +++
 .../apache/sqoop/connector/spi/SqoopConnector.java |    7 +
 12 files changed, 841 insertions(+), 107 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/sqoop/blob/c4467c67/common/src/main/java/org/apache/sqoop/model/MConnection.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/sqoop/model/MConnection.java b/common/src/main/java/org/apache/sqoop/model/MConnection.java
index 36dca42..c31eafd 100644
--- a/common/src/main/java/org/apache/sqoop/model/MConnection.java
+++ b/common/src/main/java/org/apache/sqoop/model/MConnection.java
@@ -57,6 +57,18 @@ public class MConnection extends MAccountableEntity {
     return connectorId;
   }
 
+  public void setConnectorPart(MConnectionForms connectorPart) {
+    this.connectorPart = connectorPart;
+  }
+
+  public void setFrameworkPart(MConnectionForms frameworkPart) {
+    this.frameworkPart = frameworkPart;
+  }
+
+  public void setConnectorId(long connectorId) {
+    this.connectorId = connectorId;
+  }
+
   public MConnectionForms getConnectorPart() {
     return connectorPart;
   }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/c4467c67/common/src/main/java/org/apache/sqoop/model/MJob.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/sqoop/model/MJob.java b/common/src/main/java/org/apache/sqoop/model/MJob.java
index a53f04e..5b50bfd 100644
--- a/common/src/main/java/org/apache/sqoop/model/MJob.java
+++ b/common/src/main/java/org/apache/sqoop/model/MJob.java
@@ -98,10 +98,18 @@ public class MJob extends MAccountableEntity {
     return connectionId;
   }
 
+  public void setConnectionId(long connectionId) {
+    this.connectionId = connectionId;
+  }
+
   public long getConnectorId() {
     return connectorId;
   }
 
+  public void setConnectorId(long connectorId) {
+    this.connectorId = connectorId;
+  }
+
   public MJobForms getConnectorPart() {
     return connectorPart;
   }
@@ -110,6 +118,15 @@ public class MJob extends MAccountableEntity {
     return frameworkPart;
   }
 
+  public void setConnectorPart(MJobForms connectorPart) {
+    this.connectorPart = connectorPart;
+  }
+
+  public void setFrameworkPart(MJobForms frameworkPart) {
+    this.frameworkPart = frameworkPart;
+  }
+
+
   public Type getType() {
     return type;
   }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/c4467c67/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcConnector.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcConnector.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcConnector.java
index c315e48..11c10de 100644
--- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcConnector.java
+++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcConnector.java
@@ -24,6 +24,7 @@ import org.apache.sqoop.common.VersionInfo;
 import org.apache.sqoop.connector.jdbc.configuration.ConnectionConfiguration;
 import org.apache.sqoop.connector.jdbc.configuration.ExportJobConfiguration;
 import org.apache.sqoop.connector.jdbc.configuration.ImportJobConfiguration;
+import org.apache.sqoop.connector.spi.MetadataUpgrader;
 import org.apache.sqoop.job.etl.Exporter;
 import org.apache.sqoop.job.etl.Importer;
 import org.apache.sqoop.connector.spi.SqoopConnector;
@@ -94,4 +95,9 @@ public class GenericJdbcConnector extends SqoopConnector {
     return new GenericJdbcValidator();
   }
 
+  @Override
+  public MetadataUpgrader getMetadataUpgrader() {
+    return new GenericJdbcConnectorMetadataUpgrader();
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/c4467c67/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcConnectorMetadataUpgrader.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcConnectorMetadataUpgrader.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcConnectorMetadataUpgrader.java
new file mode 100644
index 0000000..cd461f4
--- /dev/null
+++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcConnectorMetadataUpgrader.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sqoop.connector.jdbc;
+
+import org.apache.sqoop.connector.spi.MetadataUpgrader;
+import org.apache.sqoop.model.MConnectionForms;
+import org.apache.sqoop.model.MForm;
+import org.apache.sqoop.model.MInput;
+import org.apache.sqoop.model.MJobForms;
+import org.apache.sqoop.validation.Status;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class GenericJdbcConnectorMetadataUpgrader extends MetadataUpgrader {
+  /*
+   * For now, there is no real upgrade. So copy all data over,
+   * set the validation messages and error messages to be the same as for the
+   * inputs in the original one.
+   */
+
+  @Override
+  public void upgrade(MConnectionForms original,
+    MConnectionForms upgradeTarget) {
+    doUpgrade(original.getForms(), upgradeTarget.getForms());
+  }
+
+  @Override
+  public void upgrade(MJobForms original, MJobForms upgradeTarget) {
+    doUpgrade(original.getForms(), upgradeTarget.getForms());
+
+  }
+
+  @SuppressWarnings("unchecked")
+  private void doUpgrade(List<MForm> original, List<MForm> target) {
+    // Easier to find the form in the original forms list if we use a map.
+    // Since the constructor of MJobForms takes a list,
+    // index is not guaranteed to be the same, so we need to look for
+    // equivalence
+    Map<String, MForm> formMap = new HashMap<String, MForm>();
+    for (MForm form : original) {
+      formMap.put(form.getName(), form);
+    }
+    for (MForm form : target) {
+      List<MInput<?>> inputs = form.getInputs();
+      MForm originalForm = formMap.get(form.getName());
+      for (MInput input : inputs) {
+        MInput originalInput = originalForm.getInput(input.getName());
+        input.setValue(originalInput.getValue());
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/c4467c67/core/src/main/java/org/apache/sqoop/repository/JdbcRepository.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/repository/JdbcRepository.java b/core/src/main/java/org/apache/sqoop/repository/JdbcRepository.java
index 32df1e5..b2259ce 100644
--- a/core/src/main/java/org/apache/sqoop/repository/JdbcRepository.java
+++ b/core/src/main/java/org/apache/sqoop/repository/JdbcRepository.java
@@ -29,7 +29,7 @@ import org.apache.sqoop.model.MFramework;
 import org.apache.sqoop.model.MJob;
 import org.apache.sqoop.model.MSubmission;
 
-public class JdbcRepository implements Repository {
+public class JdbcRepository extends Repository {
 
   private static final Logger LOG =
       Logger.getLogger(JdbcRepository.class);
@@ -58,27 +58,42 @@ public class JdbcRepository implements Repository {
     Object doIt(Connection conn) throws Exception;
   }
 
+  private Object doWithConnection(DoWithConnection delegator) {
+    return doWithConnection(delegator, null);
+  }
+
   /**
    * Handle transaction and connection functionality and delegate action to
    * given delegator.
    *
    * @param delegator Code for specific action
+   * @param tx The transaction to use for the operation. If a transaction is
+   *           specified, this method will not commit, rollback or close it.
+   *           If null, a new transaction will be created - which will be
+   *           committed/closed/rolled back.
    * @return Arbitrary value
    */
-  private Object doWithConnection(DoWithConnection delegator) {
-    JdbcRepositoryTransaction tx = null;
+  private Object doWithConnection(DoWithConnection delegator,
+    JdbcRepositoryTransaction tx) {
+    boolean shouldCloseTxn = false;
 
     try {
       // Get transaction and connection
-      tx = getTransaction();
-      tx.begin();
-      Connection conn = tx.getConnection();
+      Connection conn;
+      if (tx == null) {
+        tx = getTransaction();
+        shouldCloseTxn = true;
+        tx.begin();
+      }
+      conn = tx.getConnection();
 
       // Delegate the functionality to our delegator
       Object returnValue = delegator.doIt(conn);
 
-      // Commit transaction
-      tx.commit();
+      if (shouldCloseTxn) {
+        // Commit transaction
+        tx.commit();
+      }
 
       // Return value that the underlying code needs to return
       return returnValue;
@@ -86,12 +101,12 @@ public class JdbcRepository implements Repository {
     } catch (SqoopException ex) {
       throw  ex;
     } catch (Exception ex) {
-      if (tx != null) {
+      if (tx != null && shouldCloseTxn) {
         tx.rollback();
       }
       throw new SqoopException(RepositoryError.JDBCREPO_0012, ex);
     } finally {
-      if (tx != null) {
+      if (tx != null && shouldCloseTxn) {
         tx.close();
       }
     }
@@ -121,11 +136,20 @@ public class JdbcRepository implements Repository {
           handler.registerConnector(mConnector, conn);
           return mConnector;
         } else {
+          // Same connector, check if the version is the same.
+          // For now, use the "string" versions itself - later we should
+          // probably include a build number or something that is
+          // monotonically increasing.
+          if (result.getUniqueName().equals(mConnector.getUniqueName()) &&
+            mConnector.getVersion().compareTo(result.getVersion()) > 0) {
+            upgradeConnector(result, mConnector);
+            return mConnector;
+          }
           if (!result.equals(mConnector)) {
             throw new SqoopException(RepositoryError.JDBCREPO_0013,
               "Connector: " + mConnector.getUniqueName()
-              + " given: " + mConnector
-              + " found: " + result);
+                + " given: " + mConnector
+                + " found: " + result);
           }
           return result;
         }
@@ -137,6 +161,19 @@ public class JdbcRepository implements Repository {
    * {@inheritDoc}
    */
   @Override
+  public MConnector findConnector(final String shortName) {
+    return (MConnector) doWithConnection(new DoWithConnection() {
+      @Override
+      public Object doIt(Connection conn) throws Exception {
+        return handler.findConnector(shortName, conn);
+      }
+    });
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
   public MFramework registerFramework(final MFramework mFramework) {
     return (MFramework) doWithConnection(new DoWithConnection() {
       @Override
@@ -179,6 +216,15 @@ public class JdbcRepository implements Repository {
    */
   @Override
   public void updateConnection(final MConnection connection) {
+    updateConnection(connection, null);
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public void updateConnection(final MConnection connection,
+    RepositoryTransaction tx) {
     doWithConnection(new DoWithConnection() {
       @Override
       public Object doIt(Connection conn) {
@@ -193,7 +239,7 @@ public class JdbcRepository implements Repository {
         handler.updateConnection(connection, conn);
         return null;
       }
-    });
+    }, (JdbcRepositoryTransaction) tx);
   }
 
   /**
@@ -269,6 +315,14 @@ public class JdbcRepository implements Repository {
    */
   @Override
   public void updateJob(final MJob job) {
+    updateJob(job, null);
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public void updateJob(final MJob job, RepositoryTransaction tx) {
     doWithConnection(new DoWithConnection() {
       @Override
       public Object doIt(Connection conn) {
@@ -283,7 +337,7 @@ public class JdbcRepository implements Repository {
         handler.updateJob(job, conn);
         return null;
       }
-    });
+    }, (JdbcRepositoryTransaction) tx);
   }
 
   /**
@@ -420,4 +474,71 @@ public class JdbcRepository implements Repository {
       }
     });
   }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public List<MConnection> findConnectionsForConnector(final long
+    connectorID) {
+    return (List<MConnection>) doWithConnection(new DoWithConnection() {
+      @Override
+      public Object doIt(Connection conn) throws Exception {
+        return handler.findConnectionsForConnector(connectorID, conn);
+      }
+    });
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public List<MJob> findJobsForConnector(final long connectorID) {
+    return (List<MJob>) doWithConnection(new DoWithConnection() {
+      @Override
+      public Object doIt(Connection conn) throws Exception {
+        return handler.findJobsForConnector(connectorID, conn);
+      }
+    });
+  }
+
+  @Override
+  protected void deleteJobInputs(final long jobID, RepositoryTransaction tx) {
+    doWithConnection(new DoWithConnection() {
+      @Override
+      public Object doIt(Connection conn) throws Exception {
+        handler.deleteJobInputs(jobID, conn);
+        return null;
+      }
+    }, (JdbcRepositoryTransaction) tx);
+
+  }
+
+  @Override
+  protected void deleteConnectionInputs(final long connectionID,
+    RepositoryTransaction tx) {
+    doWithConnection(new DoWithConnection() {
+      @Override
+      public Object doIt(Connection conn) throws Exception {
+        handler.deleteConnectionInputs(connectionID, conn);
+        return null;
+      }
+    }, (JdbcRepositoryTransaction) tx);
+
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  protected void updateConnector(final MConnector newConnector,
+    RepositoryTransaction tx) {
+    doWithConnection(new DoWithConnection() {
+      @Override
+      public Object doIt(Connection conn) throws Exception {
+        handler.updateConnector(newConnector, conn);
+        return null;
+      }
+    }, (JdbcRepositoryTransaction) tx);
+  }
 }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/c4467c67/core/src/main/java/org/apache/sqoop/repository/JdbcRepositoryHandler.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/repository/JdbcRepositoryHandler.java b/core/src/main/java/org/apache/sqoop/repository/JdbcRepositoryHandler.java
index ca51313..1f88b6d 100644
--- a/core/src/main/java/org/apache/sqoop/repository/JdbcRepositoryHandler.java
+++ b/core/src/main/java/org/apache/sqoop/repository/JdbcRepositoryHandler.java
@@ -30,14 +30,14 @@ import org.apache.sqoop.model.MSubmission;
 /**
  * Set of methods required from each JDBC based repository.
  */
-public interface JdbcRepositoryHandler {
+public abstract class JdbcRepositoryHandler {
 
   /**
    * Initialize JDBC based repository.
    *
    * @param repoContext Context for this instance
    */
-  void initialize(JdbcRepositoryContext repoContext);
+  public abstract void initialize(JdbcRepositoryContext repoContext);
 
   /**
    * Search for connector with given name in repository.
@@ -49,7 +49,7 @@ public interface JdbcRepositoryHandler {
    * @return null if connector is not yet registered in repository or
    *   loaded representation.
    */
-  MConnector findConnector(String shortName, Connection conn);
+  public abstract MConnector findConnector(String shortName, Connection conn);
 
   /**
    * Register given connector in repository.
@@ -60,7 +60,42 @@ public interface JdbcRepositoryHandler {
    * @param mc Connector that should be registered.
    * @param conn JDBC connection for querying repository.
    */
-  void registerConnector(MConnector mc, Connection conn);
+  public abstract void registerConnector(MConnector mc, Connection conn);
+
+
+  /**
+   * Retrieve connections which use the given connector.
+   * @param connectorID Connector ID whose connections should be fetched
+   * @param conn JDBC connection for querying repository
+   * @return List of MConnections that use <code>connectorID</code>.
+   */
+  public abstract List<MConnection> findConnectionsForConnector(long
+    connectorID, Connection conn);
+
+  /**
+   * Retrieve jobs which use the given connection.
+   *
+   * @param connectorID Connector ID whose jobs should be fetched
+   * @param conn JDBC connection for querying repository
+   * @return List of MJobs that use <code>connectionID</code>.
+   */
+  public abstract List<MJob> findJobsForConnector(long connectorID,
+    Connection conn);
+
+  /**
+   * Update the connector with the new data supplied in the <tt>newConnector</tt>.
+   * Also Update all forms associated with this connector in the repository
+   * with the forms specified in <tt>mConnector</tt>. <tt>mConnector </tt> must
+   * minimally have the connectorID and all required forms (including ones
+   * which may not have changed). After this operation the repository is
+   * guaranteed to only have the new forms specified in this object.
+   *
+   * @param mConnector The new data to be inserted into the repository for
+   *                     this connector.
+   * @param conn JDBC connection for querying repository
+   */
+
+  public abstract void updateConnector(MConnector mConnector, Connection conn);
 
   /**
    * Search for framework metadata in the repository.
@@ -69,7 +104,7 @@ public interface JdbcRepositoryHandler {
    * @return null if framework metadata are not yet present in repository or
    *  loaded representation.
    */
-  MFramework findFramework(Connection conn);
+  public abstract MFramework findFramework(Connection conn);
 
   /**
    * Register framework metadata in repository.
@@ -80,26 +115,26 @@ public interface JdbcRepositoryHandler {
    * @param mf Framework metadata that should be registered.
    * @param conn JDBC connection for querying repository.
    */
-  void registerFramework(MFramework mf, Connection conn);
+  public abstract void registerFramework(MFramework mf, Connection conn);
 
   /**
    * Check if schema is already present in the repository.
    *
    * @return true if schema is already present or false if it's not
    */
-  boolean schemaExists();
+  public abstract boolean schemaExists();
 
   /**
    * Create required schema in repository.
    */
-  void createSchema();
+  public abstract void createSchema();
 
   /**
    * Termination callback for repository.
    *
    * Should clean up all resources and commit all uncommitted data.
    */
-  void shutdown();
+  public abstract void shutdown();
 
   /**
    * Specify query that Sqoop framework can use to validate connection to
@@ -108,7 +143,7 @@ public interface JdbcRepositoryHandler {
    * @return Query or NULL in case that this repository do not support or do not
    *   want to validate live connections.
    */
-  String validationQuery();
+  public abstract String validationQuery();
 
   /**
    * Save given connection to repository. This connection must not be already
@@ -117,7 +152,8 @@ public interface JdbcRepositoryHandler {
    * @param connection Connection object to serialize into repository.
    * @param conn Connection to metadata repository
    */
-  void createConnection(MConnection connection, Connection conn);
+  public abstract void createConnection(MConnection connection,
+    Connection conn);
 
   /**
    * Update given connection representation in repository. This connection
@@ -127,7 +163,8 @@ public interface JdbcRepositoryHandler {
    * @param connection Connection object that should be updated in repository.
    * @param conn Connection to metadata repository
    */
-  void updateConnection(MConnection connection, Connection conn);
+  public abstract void updateConnection(MConnection connection,
+    Connection conn);
 
   /**
    * Check if given connection exists in metastore.
@@ -136,7 +173,7 @@ public interface JdbcRepositoryHandler {
    * @param conn Connection to metadata repository
    * @return True if the connection exists
    */
-  boolean existsConnection(long connetionId, Connection conn);
+  public abstract boolean existsConnection(long connetionId, Connection conn);
 
   /**
    * Check if given Connection id is referenced somewhere and thus can't
@@ -146,7 +183,7 @@ public interface JdbcRepositoryHandler {
    * @param conn Connection to metadata repository
    * @return
    */
-  boolean inUseConnection(long connectionId, Connection conn);
+  public abstract boolean inUseConnection(long connectionId, Connection conn);
 
   /**
    * Delete connection with given id from metadata repository.
@@ -154,16 +191,24 @@ public interface JdbcRepositoryHandler {
    * @param connectionId Connection object that should be removed from repository
    * @param conn Connection to metadata repository
    */
-  void deleteConnection(long connectionId, Connection conn);
+  public abstract void deleteConnection(long connectionId, Connection conn);
 
   /**
+   * Delete the input values for the connection with given id from the
+   * repository.
+   * @param id Connection object whose inputs should be removed from repository
+   * @param conn Connection to metadata repository
+   */
+  public abstract void deleteConnectionInputs(long id, Connection conn);
+  /**
    * Find connection with given id in repository.
    *
    * @param connectionId Connection id
    * @param conn Connection to metadata repository
    * @return Deserialized form of the connection that is saved in repository
    */
-  MConnection findConnection(long connectionId, Connection conn);
+  public abstract MConnection findConnection(long connectionId,
+    Connection conn);
 
   /**
    * Get all connection objects.
@@ -171,7 +216,7 @@ public interface JdbcRepositoryHandler {
    * @param conn Connection to metadata repository
    * @return List will all saved connection objects
    */
-  List<MConnection> findConnections(Connection conn);
+  public abstract List<MConnection> findConnections(Connection conn);
 
 
   /**
@@ -181,7 +226,7 @@ public interface JdbcRepositoryHandler {
    * @param job Job object to serialize into repository.
    * @param conn Connection to metadata repository
    */
-  void createJob(MJob job, Connection conn);
+  public abstract void createJob(MJob job, Connection conn);
 
   /**
    * Update given job representation in repository. This job object must
@@ -191,7 +236,7 @@ public interface JdbcRepositoryHandler {
    * @param job Job object that should be updated in repository.
    * @param conn Connection to metadata repository
    */
-  void updateJob(MJob job, Connection conn);
+  public abstract void updateJob(MJob job, Connection conn);
 
   /**
    * Check if given job exists in metastore.
@@ -200,7 +245,7 @@ public interface JdbcRepositoryHandler {
    * @param conn Connection to metadata repository
    * @return True if the job exists
    */
-  boolean existsJob(long jobId, Connection conn);
+  public abstract boolean existsJob(long jobId, Connection conn);
 
   /**
    * Check if given job id is referenced somewhere and thus can't
@@ -210,15 +255,23 @@ public interface JdbcRepositoryHandler {
    * @param conn Connection to metadata repository
    * @return
    */
-  boolean inUseJob(long jobId, Connection conn);
+  public abstract boolean inUseJob(long jobId, Connection conn);
+
 
   /**
-   * Delete job with given id from metadata repository.
+   * Delete the input values for the job with given id from the repository.
+   * @param id Job object whose inputs should be removed from repository
+   * @param conn Connection to metadata repository
+   */
+  public abstract void deleteJobInputs(long id, Connection conn);
+  /**
+   * Delete job with given id from metadata repository. This method will
+   * delete all inputs for this job also.
    *
    * @param jobId Job object that should be removed from repository
    * @param conn Connection to metadata repository
    */
-  void deleteJob(long jobId, Connection conn);
+  public abstract void deleteJob(long jobId, Connection conn);
 
   /**
    * Find job with given id in repository.
@@ -227,7 +280,7 @@ public interface JdbcRepositoryHandler {
    * @param conn Connection to metadata repository
    * @return Deserialized form of the job that is present in the repository
    */
-  MJob findJob(long jobId, Connection conn);
+  public abstract MJob findJob(long jobId, Connection conn);
 
   /**
    * Get all job objects.
@@ -235,7 +288,7 @@ public interface JdbcRepositoryHandler {
    * @param conn Connection to metadata repository
    * @return List will all saved job objects
    */
-  List<MJob> findJobs(Connection conn);
+  public abstract List<MJob> findJobs(Connection conn);
 
   /**
    * Save given submission in repository.
@@ -243,7 +296,8 @@ public interface JdbcRepositoryHandler {
    * @param submission Submission object
    * @param conn Connection to metadata repository
    */
-  void createSubmission(MSubmission submission, Connection conn);
+  public abstract void createSubmission(MSubmission submission,
+    Connection conn);
 
   /**
    * Check if submission with given id already exists in repository.
@@ -251,7 +305,7 @@ public interface JdbcRepositoryHandler {
    * @param submissionId Submission internal id
    * @param conn Connection to metadata repository
    */
-  boolean existsSubmission(long submissionId, Connection conn);
+  public abstract boolean existsSubmission(long submissionId, Connection conn);
 
   /**
    * Update given submission in repository.
@@ -259,7 +313,8 @@ public interface JdbcRepositoryHandler {
    * @param submission Submission object
    * @param conn Connection to metadata repository
    */
-  void updateSubmission(MSubmission submission, Connection conn);
+  public abstract void updateSubmission(MSubmission submission,
+    Connection conn);
 
   /**
    * Remove submissions older then threshold from repository.
@@ -267,7 +322,7 @@ public interface JdbcRepositoryHandler {
    * @param threshold Threshold date
    * @param conn Connection to metadata repository
    */
-  void purgeSubmissions(Date threshold, Connection conn);
+  public abstract void purgeSubmissions(Date threshold, Connection conn);
 
   /**
    * Return list of unfinished submissions (as far as repository is concerned).
@@ -275,7 +330,7 @@ public interface JdbcRepositoryHandler {
    * @param conn Connection to metadata repository
    * @return List of unfinished submissions.
    */
-  List<MSubmission> findSubmissionsUnfinished(Connection conn);
+  public abstract List<MSubmission> findSubmissionsUnfinished(Connection conn);
 
   /**
    * Find last submission for given jobId.
@@ -284,5 +339,6 @@ public interface JdbcRepositoryHandler {
    * @param conn Connection to metadata repository
    * @return Most recent submission
    */
-  MSubmission findSubmissionLastForJob(long jobId, Connection conn);
+  public abstract MSubmission findSubmissionLastForJob(long jobId,
+    Connection conn);
 }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/c4467c67/core/src/main/java/org/apache/sqoop/repository/Repository.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/repository/Repository.java b/core/src/main/java/org/apache/sqoop/repository/Repository.java
index d6ec303..57c9be4 100644
--- a/core/src/main/java/org/apache/sqoop/repository/Repository.java
+++ b/core/src/main/java/org/apache/sqoop/repository/Repository.java
@@ -17,12 +17,27 @@
  */
 package org.apache.sqoop.repository;
 
+import org.apache.sqoop.common.ErrorCode;
+import org.apache.sqoop.common.SqoopException;
+import org.apache.sqoop.connector.ConnectorManager;
+import org.apache.sqoop.connector.spi.MetadataUpgrader;
+import org.apache.sqoop.connector.spi.SqoopConnector;
 import org.apache.sqoop.model.MConnection;
+import org.apache.sqoop.model.MConnectionForms;
 import org.apache.sqoop.model.MConnector;
+import org.apache.sqoop.model.MEnumInput;
+import org.apache.sqoop.model.MForm;
 import org.apache.sqoop.model.MFramework;
+import org.apache.sqoop.model.MInput;
+import org.apache.sqoop.model.MIntegerInput;
 import org.apache.sqoop.model.MJob;
+import org.apache.sqoop.model.MJobForms;
+import org.apache.sqoop.model.MMapInput;
+import org.apache.sqoop.model.MStringInput;
 import org.apache.sqoop.model.MSubmission;
+import org.apache.sqoop.model.ModelError;
 
+import java.util.ArrayList;
 import java.util.Date;
 import java.util.List;
 
@@ -32,9 +47,9 @@ import java.util.List;
  * Sqoop to store metadata, statistics and other state relevant to Sqoop
  * Jobs in the system.
  */
-public interface Repository {
+public abstract class Repository {
 
-  RepositoryTransaction getTransaction();
+  public abstract RepositoryTransaction getTransaction();
 
   /**
    * Registers given connector in the repository and return registered
@@ -44,7 +59,18 @@ public interface Repository {
    * @param mConnector the connector metadata to be registered
    * @return Registered connector structure
    */
-  MConnector registerConnector(MConnector mConnector);
+  public abstract MConnector registerConnector(MConnector mConnector);
+
+  /**
+   * Search for connector with given name in repository.
+   *
+   * And return corresponding metadata structure.
+   *
+   * @param shortName Connector unique name
+   * @return null if connector is not yet registered in repository or
+   *   loaded representation.
+   */
+  public abstract MConnector findConnector(String shortName);
 
 
   /**
@@ -55,7 +81,7 @@ public interface Repository {
    * @param mFramework framework metadata to be registered
    * @return Registered connector structure
    */
-  MFramework registerFramework(MFramework mFramework);
+  public abstract MFramework registerFramework(MFramework mFramework);
 
   /**
    * Save given connection to repository. This connection must not be already
@@ -63,7 +89,16 @@ public interface Repository {
    *
    * @param connection Connection object to serialize into repository.
    */
-  void createConnection(MConnection connection);
+  public abstract void createConnection(MConnection connection);
+
+  /**
+   * Update given connection representation in repository. This connection
+   * object must already exists in the repository otherwise exception will be
+   * thrown.
+   *
+   * @param connection Connection object that should be updated in repository.
+   */
+  public abstract void updateConnection(MConnection connection);
 
   /**
    * Update given connection representation in repository. This connection
@@ -71,15 +106,20 @@ public interface Repository {
    * thrown.
    *
    * @param connection Connection object that should be updated in repository.
+   * @param tx The repository transaction to use to push the data to the
+   *           repository. If this is null, a new transaction will be created.
+   *           method will not call begin, commit,
+   *           rollback or close on this transaction.
    */
-  void updateConnection(MConnection connection);
+  public abstract void updateConnection(final MConnection connection,
+    RepositoryTransaction tx);
 
   /**
    * Delete connection with given id from metadata repository.
    *
    * @param id Connection object that should be removed from repository
    */
-  void deleteConnection(long id);
+  public abstract void deleteConnection(long id);
 
   /**
    * Find connection with given id in repository.
@@ -87,14 +127,14 @@ public interface Repository {
    * @param id Connection id
    * @return Deserialized form of the connection that is saved in repository
    */
-  MConnection findConnection(long id);
+  public abstract MConnection findConnection(long id);
 
   /**
    * Get all connection objects.
    *
    * @return List will all saved connection objects
    */
-  List<MConnection> findConnections();
+  public abstract List<MConnection> findConnections();
 
   /**
    * Save given job to repository. This job object must not be already present
@@ -102,7 +142,7 @@ public interface Repository {
    *
    * @param job Job object that should be saved to repository
    */
-  void createJob(MJob job);
+  public abstract void createJob(MJob job);
 
   /**
    * Update given job metadata in repository. This object must already be saved
@@ -110,14 +150,26 @@ public interface Repository {
    *
    * @param job Job object that should be updated in the repository
    */
-  void updateJob(MJob job);
+  public abstract void updateJob(MJob job);
+
+  /**
+   * Update given job metadata in repository. This object must already be saved
+   * in repository otherwise exception will be thrown.
+   *
+   * @param job Job object that should be updated in the repository
+   * @param tx The repository transaction to use to push the data to the
+   *           repository. If this is null, a new transaction will be created.
+   *           method will not call begin, commit,
+   *           rollback or close on this transaction.
+   */
+  public abstract void updateJob(MJob job, RepositoryTransaction tx);
 
   /**
    * Delete job with given id from metadata repository.
    *
    * @param id Job id that should be removed
    */
-  void deleteJob(long id);
+  public abstract void deleteJob(long id);
 
   /**
    * Find job object with given id.
@@ -125,42 +177,42 @@ public interface Repository {
    * @param id Job id
    * @return Deserialized form of job loaded from repository
    */
-  MJob findJob(long id);
+  public abstract MJob findJob(long id);
 
   /**
    * Get all job objects.
    *
    * @return List of all jobs in the repository
    */
-  List<MJob> findJobs();
+  public abstract List<MJob> findJobs();
 
   /**
    * Create new submission record in repository.
    *
    * @param submission Submission object that should be serialized to repository
    */
-  void createSubmission(MSubmission submission);
+  public abstract void createSubmission(MSubmission submission);
 
   /**
    * Update already existing submission record in repository.
    *
    * @param submission Submission object that should be updated
    */
-  void updateSubmission(MSubmission submission);
+  public abstract void updateSubmission(MSubmission submission);
 
   /**
    * Remove submissions older then given date from repository.
    *
    * @param threshold Threshold date
    */
-  void purgeSubmissions(Date threshold);
+  public abstract void purgeSubmissions(Date threshold);
 
   /**
    * Return all unfinished submissions as far as repository is concerned.
    *
    * @return List of unfinished submissions
    */
-  List<MSubmission> findSubmissionsUnfinished();
+  public abstract List<MSubmission> findSubmissionsUnfinished();
 
   /**
    * Find last submission for given jobId.
@@ -168,5 +220,181 @@ public interface Repository {
    * @param jobId Job id
    * @return Most recent submission
    */
-  MSubmission findSubmissionLastForJob(long jobId);
+  public abstract MSubmission findSubmissionLastForJob(long jobId);
+
+  /**
+   * Retrieve connections which use the given connector.
+   * @param connectorID Connector ID whose connections should be fetched
+   * @return List of MConnections that use <code>connectorID</code>.
+   */
+  public abstract List<MConnection> findConnectionsForConnector(long
+    connectorID);
+
+  /**
+   * Retrieve jobs which use the given connection.
+   *
+   * @param connectorID Connector ID whose jobs should be fetched
+   * @return List of MJobs that use <code>connectionID</code>.
+   */
+  public abstract List<MJob> findJobsForConnector(long
+    connectorID);
+
+  /**
+   * Update the connector with the new data supplied in the
+   * <tt>newConnector</tt>. Also Update all forms associated with this
+   * connector in the repository with the forms specified in
+   * <tt>mConnector</tt>. <tt>mConnector </tt> must
+   * minimally have the connectorID and all required forms (including ones
+   * which may not have changed). After this operation the repository is
+   * guaranteed to only have the new forms specified in this object.
+   *
+   * @param newConnector The new data to be inserted into the repository for
+   *                     this connector.
+   * @param tx The repository transaction to use to push the data to the
+   *           repository. If this is null, a new transaction will be created.
+   *           method will not call begin, commit,
+   *           rollback or close on this transaction.
+   */
+  protected abstract void updateConnector(MConnector newConnector,
+    RepositoryTransaction tx);
+
+  /**
+   * Delete all inputs for a job
+   * @param jobId The id of the job whose inputs are to be deleted.
+   * @param tx A transaction on the repository. This
+   *           method will not call <code>begin, commit,
+   *           rollback or close on this transaction.</code>
+   */
+  protected abstract void deleteJobInputs(long jobId, RepositoryTransaction tx);
+
+  /**
+   * Delete all inputs for a connection
+   * @param connectionID The id of the connection whose inputs are to be
+   *                     deleted.
+   * @param tx The repository transaction to use to push the data to the
+   *           repository. If this is null, a new transaction will be created.
+   *           method will not call begin, commit,
+   *           rollback or close on this transaction.
+   */
+  protected abstract void deleteConnectionInputs(long connectionID,
+    RepositoryTransaction tx);
+
+  /**
+   * Upgrade the connector with the same {@linkplain MConnector#uniqueName}
+   * in the repository with values from <code>newConnector</code>.
+   * <p/>
+   * All connections and jobs associated with this connector will be upgraded
+   * automatically.
+   *
+   * @param oldConnector The old connector that should be upgraded.
+   * @param newConnector New properties for the Connector that should be
+   *                     upgraded.
+   */
+  public final void upgradeConnector(MConnector oldConnector,
+    MConnector newConnector) {
+    long connectorID = oldConnector.getPersistenceId();
+    newConnector.setPersistenceId(connectorID);
+    /* Algorithms:
+     * 1. Get an upgrader for the connector.
+     * 2. Get all connections associated with the connector.
+     * 3. Get all jobs associated with the connector.
+     * 4. Delete the inputs for all of the jobs and connections (in that order)
+     * 5. Remove all inputs and forms associated with the connector, and
+     *    register the new forms and inputs.
+     * 6. Create new connections and jobs with connector part being the ones
+     *    returned by the upgrader.
+     * 7. Insert the connection inputs followed by job inputs (using
+     *    updateJob and updateConnection)
+     */
+    RepositoryTransaction tx = null;
+    try {
+      SqoopConnector connector = ConnectorManager.getInstance().getConnector(
+        connectorID);
+      MetadataUpgrader upgrader = connector.getMetadataUpgrader();
+      List<MConnection> connections = findConnectionsForConnector(
+        connectorID);
+      List<MJob> jobs = findJobsForConnector(connectorID);
+      // -- BEGIN TXN --
+      tx = getTransaction();
+      tx.begin();
+      for (MJob job : jobs) {
+        deleteJobInputs(job.getPersistenceId(), tx);
+      }
+      for (MConnection connection : connections) {
+        deleteConnectionInputs(connection.getPersistenceId(), tx);
+      }
+      updateConnector(newConnector, tx);
+      for (MConnection connection : connections) {
+        long connectionID = connection.getPersistenceId();
+        // Make a new copy of the forms from the connector,
+        // else the values will get set in the forms in the connector for
+        // each connection.
+        List<MForm> forms = cloneForms(newConnector.getConnectionForms()
+          .getForms());
+        MConnectionForms newConnectionForms = new MConnectionForms(forms);
+        upgrader.upgrade(connection.getConnectorPart(), newConnectionForms);
+        MConnection newConnection = new MConnection(connectorID,
+          newConnectionForms, connection.getFrameworkPart());
+        newConnection.setPersistenceId(connectionID);
+        updateConnection(newConnection, tx);
+      }
+      for (MJob job : jobs) {
+        // Make a new copy of the forms from the connector,
+        // else the values will get set in the forms in the connector for
+        // each connection.
+        List<MForm> forms = cloneForms(newConnector.getJobForms(job.getType())
+          .getForms());
+        MJobForms newJobForms = new MJobForms(job.getType(), forms);
+        upgrader.upgrade(job.getConnectorPart(), newJobForms);
+        MJob newJob = new MJob(connectorID, job.getConnectionId(),
+          job.getType(), newJobForms, job.getFrameworkPart());
+        updateJob(newJob, tx);
+      }
+      tx.commit();
+    } catch (Exception ex) {
+      if(tx != null) {
+        tx.rollback();
+      }
+      throw new SqoopException(RepositoryError.JDBCREPO_0000, ex);
+    } finally {
+      if(tx != null) {
+        tx.close();
+      }
+    }
+  }
+
+  /**
+   * Clones the forms, but does not set the actual data,
+   * validation message etc in the inputs, but only the persistence id of the
+   * inputs.
+   * @param mForms MForms which must be cloned
+   * @return Cloned MForms
+   * @throws Exception
+   */
+  private List<MForm> cloneForms(List<MForm> mForms) throws Exception {
+    List<MForm> forms = new ArrayList<MForm>();
+    for(MForm mForm : mForms) {
+      List<MInput<?>> inputs = new ArrayList<MInput<?>>();
+      for (MInput<?> input : mForm.getInputs()) {
+        MInput newInput;
+        if(input instanceof MEnumInput) {
+          newInput = new MEnumInput(input.getName(), input.isSensitive(),
+            ((MEnumInput) input).getValues());
+        } else if (input instanceof MMapInput) {
+          newInput = new MMapInput(input.getName(), input.isSensitive());
+        } else if(input instanceof MStringInput) {
+          newInput = new MStringInput(input.getName(), input.isSensitive(),
+            ((MStringInput) input).getMaxLength());
+        } else if (input instanceof MIntegerInput) {
+          newInput = new MIntegerInput(input.getName(), input.isSensitive());
+        } else {
+          throw new SqoopException(ModelError.MODEL_003);
+        }
+        newInput.setPersistenceId(input.getPersistenceId());
+        inputs.add(newInput);
+      }
+      forms.add(new MForm(mForm.getName(), inputs));
+    }
+    return forms;
+  }
 }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/c4467c67/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepoError.java
----------------------------------------------------------------------
diff --git a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepoError.java b/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepoError.java
index 95f6570..327896c 100644
--- a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepoError.java
+++ b/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepoError.java
@@ -161,6 +161,8 @@ public enum DerbyRepoError implements ErrorCode {
   /** Can't retrieve unfinished submissions **/
   DERBYREPO_0037("Can't retrieve unfinished submissions"),
 
+  DERBYREPO_0038("Update of connector failed"),
+
   ;
 
   private final String message;

http://git-wip-us.apache.org/repos/asf/sqoop/blob/c4467c67/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepositoryHandler.java
----------------------------------------------------------------------
diff --git a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepositoryHandler.java b/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepositoryHandler.java
index 32cef8a..556241e 100644
--- a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepositoryHandler.java
+++ b/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepositoryHandler.java
@@ -67,7 +67,7 @@ import org.apache.sqoop.utils.StringUtils;
  *
  * Repository implementation for Derby database.
  */
-public class DerbyRepositoryHandler implements JdbcRepositoryHandler {
+public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
 
   private static final Logger LOG =
       Logger.getLogger(DerbyRepositoryHandler.class);
@@ -86,15 +86,54 @@ public class DerbyRepositoryHandler implements JdbcRepositoryHandler {
   public void registerConnector(MConnector mc, Connection conn) {
     if (mc.hasPersistenceId()) {
       throw new SqoopException(DerbyRepoError.DERBYREPO_0011,
-          mc.getUniqueName());
+        mc.getUniqueName());
     }
+    mc.setPersistenceId(getConnectorId(mc, conn));
+    insertFormsForConnector(mc, conn);
+  }
 
-    PreparedStatement baseConnectorStmt = null;
+  /**
+   * Helper method to insert the forms from the MConnector into the
+   * repository. The job and connector forms within <code>mc</code> will get
+   * updated with the id of the forms when this function returns.
+   * @param mc The connector to use for updating forms
+   * @param conn JDBC connection to use for updating the forms
+   */
+  private void insertFormsForConnector (MConnector mc, Connection conn) {
+    long connectorId = mc.getPersistenceId();
     PreparedStatement baseFormStmt = null;
     PreparedStatement baseInputStmt = null;
+    try{
+      baseFormStmt = conn.prepareStatement(STMT_INSERT_FORM_BASE,
+        Statement.RETURN_GENERATED_KEYS);
+
+      baseInputStmt = conn.prepareStatement(STMT_INSERT_INPUT_BASE,
+        Statement.RETURN_GENERATED_KEYS);
+
+      // Register connector forms
+      registerForms(connectorId, null, mc.getConnectionForms().getForms(),
+        MFormType.CONNECTION.name(), baseFormStmt, baseInputStmt);
+
+      // Register all jobs
+      for (MJobForms jobForms : mc.getAllJobsForms().values()) {
+        registerForms(connectorId, jobForms.getType(), jobForms.getForms(),
+          MFormType.JOB.name(), baseFormStmt, baseInputStmt);
+      }
+
+    } catch (SQLException ex) {
+      throw new SqoopException(DerbyRepoError.DERBYREPO_0014,
+        mc.toString(), ex);
+    } finally {
+      closeStatements(baseFormStmt, baseInputStmt);
+    }
+
+  }
+
+  private long getConnectorId(MConnector mc, Connection conn) {
+    PreparedStatement baseConnectorStmt = null;
     try {
       baseConnectorStmt = conn.prepareStatement(STMT_INSERT_CONNECTOR_BASE,
-          Statement.RETURN_GENERATED_KEYS);
+        Statement.RETURN_GENERATED_KEYS);
       baseConnectorStmt.setString(1, mc.getUniqueName());
       baseConnectorStmt.setString(2, mc.getClassName());
       baseConnectorStmt.setString(3, mc.getVersion());
@@ -110,31 +149,12 @@ public class DerbyRepositoryHandler implements JdbcRepositoryHandler {
       if (!rsetConnectorId.next()) {
         throw new SqoopException(DerbyRepoError.DERBYREPO_0013);
       }
-
-      long connectorId = rsetConnectorId.getLong(1);
-      mc.setPersistenceId(connectorId);
-
-      baseFormStmt = conn.prepareStatement(STMT_INSERT_FORM_BASE,
-          Statement.RETURN_GENERATED_KEYS);
-
-      baseInputStmt = conn.prepareStatement(STMT_INSERT_INPUT_BASE,
-          Statement.RETURN_GENERATED_KEYS);
-
-      // Register connector forms
-      registerForms(connectorId, null, mc.getConnectionForms().getForms(),
-        MFormType.CONNECTION.name(), baseFormStmt, baseInputStmt);
-
-      // Register all jobs
-      for (MJobForms jobForms : mc.getAllJobsForms().values()) {
-        registerForms(connectorId, jobForms.getType(), jobForms.getForms(),
-          MFormType.JOB.name(), baseFormStmt, baseInputStmt);
-      }
-
+      return rsetConnectorId.getLong(1);
     } catch (SQLException ex) {
       throw new SqoopException(DerbyRepoError.DERBYREPO_0014,
-          mc.toString(), ex);
+        mc.toString(), ex);
     } finally {
-      closeStatements(baseConnectorStmt, baseFormStmt, baseInputStmt);
+      closeStatements(baseConnectorStmt);
     }
   }
 
@@ -227,7 +247,6 @@ public class DerbyRepositoryHandler implements JdbcRepositoryHandler {
       }
       String sqoopSchemaId = rset.getString(1);
       LOG.debug("SQOOP schema ID: " + sqoopSchemaId);
-
       connection.commit();
     } catch (SQLException ex) {
       if (connection != null) {
@@ -550,21 +569,33 @@ public class DerbyRepositoryHandler implements JdbcRepositoryHandler {
   @Override
   public void deleteConnection(long id, Connection conn) {
     PreparedStatement dltConn = null;
-    PreparedStatement dltConnInput = null;
+
     try {
-      dltConnInput = conn.prepareStatement(STMT_DELETE_CONNECTION_INPUT);
+      deleteConnectionInputs(id, conn);
       dltConn = conn.prepareStatement(STMT_DELETE_CONNECTION);
-
-      dltConnInput.setLong(1, id);
       dltConn.setLong(1, id);
-
-      dltConnInput.executeUpdate();
       dltConn.executeUpdate();
+    } catch (SQLException ex) {
+      throw new SqoopException(DerbyRepoError.DERBYREPO_0022, ex);
+    } finally {
+      closeStatements(dltConn);
+    }
+  }
 
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public void deleteConnectionInputs(long id, Connection conn) {
+    PreparedStatement dltConnInput = null;
+    try {
+      dltConnInput = conn.prepareStatement(STMT_DELETE_CONNECTION_INPUT);
+      dltConnInput.setLong(1, id);
+      dltConnInput.executeUpdate();
     } catch (SQLException ex) {
       throw new SqoopException(DerbyRepoError.DERBYREPO_0022, ex);
     } finally {
-      closeStatements(dltConn, dltConnInput);
+      closeStatements(dltConnInput);
     }
   }
 
@@ -613,6 +644,63 @@ public class DerbyRepositoryHandler implements JdbcRepositoryHandler {
     }
   }
 
+
+  /**
+   *
+   * {@inheritDoc}
+   *
+   */
+  @Override
+  public List<MConnection> findConnectionsForConnector(long
+    connectorID, Connection conn) {
+    PreparedStatement stmt = null;
+    try {
+      stmt = conn.prepareStatement(STMT_SELECT_CONNECTION_FOR_CONNECTOR);
+      stmt.setLong(1, connectorID);
+
+      return loadConnections(stmt, conn);
+
+    } catch (SQLException ex) {
+      throw new SqoopException(DerbyRepoError.DERBYREPO_0023, ex);
+    } finally {
+      closeStatements(stmt);
+    }
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public void updateConnector(MConnector mConnector, Connection conn) {
+    PreparedStatement updateConnectorStatement = null;
+    PreparedStatement deleteForm = null;
+    PreparedStatement deleteInput = null;
+    try {
+      updateConnectorStatement = conn.prepareStatement(STMT_UPDATE_CONNECTOR);
+      deleteInput = conn.prepareStatement(STMT_DELETE_INPUTS_FOR_CONNECTOR);
+      deleteForm = conn.prepareStatement(STMT_DELETE_FORMS_FOR_CONNECTOR);
+      updateConnectorStatement.setString(1, mConnector.getUniqueName());
+      updateConnectorStatement.setString(2, mConnector.getClassName());
+      updateConnectorStatement.setString(3, mConnector.getVersion());
+      updateConnectorStatement.setLong(4, mConnector.getPersistenceId());
+
+      if (updateConnectorStatement.executeUpdate() != 1) {
+        throw new SqoopException(DerbyRepoError.DERBYREPO_0038);
+      }
+      deleteInput.setLong(1, mConnector.getPersistenceId());
+      deleteForm.setLong(1, mConnector.getPersistenceId());
+      deleteInput.executeUpdate();
+      deleteForm.executeUpdate();
+
+    } catch (SQLException e) {
+      throw new SqoopException(DerbyRepoError.DERBYREPO_0038, e);
+    } finally {
+      closeStatements(updateConnectorStatement, deleteForm, deleteInput);
+    }
+    insertFormsForConnector(mConnector, conn);
+
+  }
+
   /**
    * {@inheritDoc}
    */
@@ -746,21 +834,32 @@ public class DerbyRepositoryHandler implements JdbcRepositoryHandler {
   @Override
   public void deleteJob(long id, Connection conn) {
     PreparedStatement dlt = null;
-    PreparedStatement dltInput = null;
     try {
-      dltInput = conn.prepareStatement(STMT_DELETE_JOB_INPUT);
+      deleteJobInputs(id, conn);
       dlt = conn.prepareStatement(STMT_DELETE_JOB);
-
-      dltInput.setLong(1, id);
       dlt.setLong(1, id);
-
-      dltInput.executeUpdate();
       dlt.executeUpdate();
+    } catch (SQLException ex) {
+      throw new SqoopException(DerbyRepoError.DERBYREPO_0028, ex);
+    } finally {
+      closeStatements(dlt);
+    }
+  }
 
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public void deleteJobInputs(long id, Connection conn) {
+    PreparedStatement dltInput = null;
+    try {
+      dltInput = conn.prepareStatement(STMT_DELETE_JOB_INPUT);
+      dltInput.setLong(1, id);
+      dltInput.executeUpdate();
     } catch (SQLException ex) {
       throw new SqoopException(DerbyRepoError.DERBYREPO_0028, ex);
     } finally {
-      closeStatements(dlt, dltInput);
+      closeStatements(dltInput);
     }
   }
 
@@ -813,6 +912,24 @@ public class DerbyRepositoryHandler implements JdbcRepositoryHandler {
    * {@inheritDoc}
    */
   @Override
+  public List<MJob> findJobsForConnector(long connectorId, Connection conn) {
+    PreparedStatement stmt = null;
+    try {
+      stmt = conn.prepareStatement(STMT_SELECT_ALL_JOBS_FOR_CONNECTOR);
+      stmt.setLong(1, connectorId);
+      return loadJobs(stmt, conn);
+
+    } catch (SQLException ex) {
+      throw new SqoopException(DerbyRepoError.DERBYREPO_0031, ex);
+    } finally {
+      closeStatements(stmt);
+    }
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
   public void createSubmission(MSubmission submission, Connection conn) {
     PreparedStatement stmt = null;
     int result;
@@ -1297,7 +1414,8 @@ public class DerbyRepositoryHandler implements JdbcRepositoryHandler {
   }
 
   /**
-   * Register forms in derby database.
+   * Register forms in derby database. This method will insert the ids
+   * generated by the repository into the forms passed in itself.
    *
    * Use given prepared statements to create entire form structure in database.
    *

http://git-wip-us.apache.org/repos/asf/sqoop/blob/c4467c67/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbySchemaQuery.java
----------------------------------------------------------------------
diff --git a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbySchemaQuery.java b/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbySchemaQuery.java
index ea458ac..4968c0d 100644
--- a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbySchemaQuery.java
+++ b/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbySchemaQuery.java
@@ -447,6 +447,30 @@ public final class DerbySchemaQuery {
       + COLUMN_SQI_ENUMVALS
       + ") VALUES (?, ?, ?, ?, ?, ?, ?)";
 
+  // Delete all forms for a given connector
+  public static final String STMT_DELETE_FORMS_FOR_CONNECTOR =
+    "DELETE FROM " + TABLE_SQ_FORM
+    + " WHERE " + COLUMN_SQN_CONNECTOR + " = ?";
+
+  // Delete all inputs for a given connector
+  public static final String STMT_DELETE_INPUTS_FOR_CONNECTOR =
+    "DELETE FROM " + TABLE_SQ_INPUT
+    + " WHERE "
+    + COLUMN_SQI_FORM
+    + " IN (SELECT "
+    + COLUMN_SQF_ID
+    + " FROM " + TABLE_SQ_FORM
+    + " WHERE "
+    + COLUMN_SQF_CONNECTOR + " = ?)";
+
+  // Update the connector
+  public static final String STMT_UPDATE_CONNECTOR =
+    "UPDATE " + TABLE_SQ_CONNECTOR
+    + " SET " + COLUMN_SQC_NAME + " = ?, "
+    + COLUMN_SQC_CLASS + " = ?, "
+    + COLUMN_SQC_VERSION + " = ? "
+    + " WHERE " + COLUMN_SQC_ID + " = ?";
+
   // DML: Insert new connection
   public static final String STMT_INSERT_CONNECTION =
     "INSERT INTO " + TABLE_SQ_CONNECTION + " ("
@@ -502,6 +526,17 @@ public final class DerbySchemaQuery {
     + COLUMN_SQN_UPDATE_DATE
     + " FROM " + TABLE_SQ_CONNECTION;
 
+  // DML: Select all connections for a specific connector.
+  public static final String STMT_SELECT_CONNECTION_FOR_CONNECTOR =
+    "SELECT "
+    + COLUMN_SQN_ID + ", "
+    + COLUMN_SQN_NAME + ", "
+    + COLUMN_SQN_CONNECTOR + ", "
+    + COLUMN_SQN_CREATION_DATE + ", "
+    + COLUMN_SQN_UPDATE_DATE
+    + " FROM " + TABLE_SQ_CONNECTION
+    + " WHERE " + COLUMN_SQN_CONNECTOR + " = ?";
+
   // DML: Check if given connection exists
   public static final String STMT_SELECT_CONNECTION_CHECK =
     "SELECT count(*) FROM " + TABLE_SQ_CONNECTION
@@ -567,7 +602,7 @@ public final class DerbySchemaQuery {
     + COLUMN_SQB_UPDATE_DATE
     + " FROM " + TABLE_SQ_JOB
     + " LEFT JOIN " + TABLE_SQ_CONNECTION
-      + " ON " + COLUMN_SQB_CONNECTION + " = " + COLUMN_SQN_ID
+    + " ON " + COLUMN_SQB_CONNECTION + " = " + COLUMN_SQN_ID
     + " WHERE " + COLUMN_SQB_ID + " = ?";
 
   // DML: Select all jobs
@@ -584,6 +619,21 @@ public final class DerbySchemaQuery {
     + " LEFT JOIN " + TABLE_SQ_CONNECTION
       + " ON " + COLUMN_SQB_CONNECTION + " = " + COLUMN_SQN_ID;
 
+  // DML: Select all jobs for a Connector
+  public static final String STMT_SELECT_ALL_JOBS_FOR_CONNECTOR =
+    "SELECT "
+    + COLUMN_SQN_CONNECTOR + ", "
+    + COLUMN_SQB_ID + ", "
+    + COLUMN_SQB_NAME + ", "
+    + COLUMN_SQB_CONNECTION + ", "
+    + COLUMN_SQB_TYPE + ", "
+    + COLUMN_SQB_CREATION_DATE + ", "
+    + COLUMN_SQB_UPDATE_DATE
+    + " FROM " + TABLE_SQ_JOB
+    + " LEFT JOIN " + TABLE_SQ_CONNECTION
+      + " ON " + COLUMN_SQB_CONNECTION + " = " + COLUMN_SQN_ID
+      + " AND " + COLUMN_SQN_CONNECTOR + " = ? ";
+
   // DML: Insert new submission
   public static final String STMT_INSERT_SUBMISSION =
     "INSERT INTO " + TABLE_SQ_SUBMISSION + "("

http://git-wip-us.apache.org/repos/asf/sqoop/blob/c4467c67/spi/src/main/java/org/apache/sqoop/connector/spi/MetadataUpgrader.java
----------------------------------------------------------------------
diff --git a/spi/src/main/java/org/apache/sqoop/connector/spi/MetadataUpgrader.java b/spi/src/main/java/org/apache/sqoop/connector/spi/MetadataUpgrader.java
new file mode 100644
index 0000000..d840a78
--- /dev/null
+++ b/spi/src/main/java/org/apache/sqoop/connector/spi/MetadataUpgrader.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sqoop.connector.spi;
+
+import org.apache.sqoop.model.MConnection;
+import org.apache.sqoop.model.MConnectionForms;
+import org.apache.sqoop.model.MJob;
+import org.apache.sqoop.model.MJobForms;
+
+public abstract class MetadataUpgrader {
+
+  /**
+   * Upgrade the original connection and fill into the upgradeTarget. Note
+   * that any metadata already in {@code upgradeTarget} maybe overwritten.
+   * @param original - original connection metadata as in the repository
+   * @param upgradeTarget - the instance that will be filled in with the
+   *                      upgraded metadata.
+   */
+  public abstract void upgrade(MConnectionForms original,
+    MConnectionForms upgradeTarget);
+  /**
+   * Upgrade the original job and fill into the upgradeTarget. Note
+   * that any metadata already in {@code upgradeTarget} maybe overwritten.
+   * This method must be called only after the connection metadata has
+   * already been upgraded.
+   * @param original - original connection metadata as in the repository
+   * @param upgradeTarget - the instance that will be filled in with the
+   *                      upgraded metadata.
+   */
+  public abstract void upgrade(MJobForms original, MJobForms upgradeTarget);
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/c4467c67/spi/src/main/java/org/apache/sqoop/connector/spi/SqoopConnector.java
----------------------------------------------------------------------
diff --git a/spi/src/main/java/org/apache/sqoop/connector/spi/SqoopConnector.java b/spi/src/main/java/org/apache/sqoop/connector/spi/SqoopConnector.java
index 540303a..2becc56 100644
--- a/spi/src/main/java/org/apache/sqoop/connector/spi/SqoopConnector.java
+++ b/spi/src/main/java/org/apache/sqoop/connector/spi/SqoopConnector.java
@@ -72,4 +72,11 @@ public abstract class SqoopConnector {
    */
   public abstract Validator getValidator();
 
+  /**
+   * Returns an {@linkplain MetadataUpgrader} object that can upgrade the
+   * connection and job metadata.
+   * @return MetadataUpgrader object
+   */
+  public abstract MetadataUpgrader getMetadataUpgrader();
+
 }


Mime
View raw message