sqoop-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From b...@apache.org
Subject [4/4] SQOOP-656 End to end submission engine (Jarek Jarcec Cecho)
Date Fri, 02 Nov 2012 22:30:33 GMT
http://git-wip-us.apache.org/repos/asf/sqoop/blob/2481b7f8/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/TestSubmissionHandling.java
----------------------------------------------------------------------
diff --git a/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/TestSubmissionHandling.java b/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/TestSubmissionHandling.java
new file mode 100644
index 0000000..3433b20
--- /dev/null
+++ b/repository/repository-derby/src/test/java/org/apache/sqoop/repository/derby/TestSubmissionHandling.java
@@ -0,0 +1,166 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.repository.derby;
+
+import org.apache.sqoop.model.MSubmission;
+import org.apache.sqoop.submission.SubmissionStatus;
+
+import java.util.Calendar;
+import java.util.Date;
+import java.util.List;
+
+/**
+ * Test cases for submission handling in the Derby repository handler.
+ */
+public class TestSubmissionHandling extends  DerbyTestCase {
+
+  DerbyRepositoryHandler handler;
+
+  @Override
+  public void setUp() throws Exception {
+    super.setUp();
+
+    handler = new DerbyRepositoryHandler();
+
+    // We always need the schema for this test case
+    createSchema();
+
+    // We always need connector and framework structures in place
+    loadConnectorAndFramework();
+
+    // We also always need connection metadata in place
+    loadConnections();
+
+    // And finally we always need job metadata in place
+    loadJobs();
+  }
+
+  public void testFindSubmissionsUnfinished() throws Exception {
+    List<MSubmission> submissions;
+
+    submissions = handler.findSubmissionsUnfinished(getDerbyConnection());
+    assertNotNull(submissions);
+    assertEquals(0, submissions.size());
+
+    loadSubmissions();
+
+    submissions = handler.findSubmissionsUnfinished(getDerbyConnection());
+    assertNotNull(submissions);
+    assertEquals(2, submissions.size());
+  }
+
+  public void testExistsSubmission() throws Exception {
+    // There shouldn't be anything in an empty repository
+    assertFalse(handler.existsSubmission(1, getDerbyConnection()));
+    assertFalse(handler.existsSubmission(2, getDerbyConnection()));
+    assertFalse(handler.existsSubmission(3, getDerbyConnection()));
+    assertFalse(handler.existsSubmission(4, getDerbyConnection()));
+    assertFalse(handler.existsSubmission(5, getDerbyConnection()));
+    assertFalse(handler.existsSubmission(6, getDerbyConnection()));
+
+    loadSubmissions();
+
+    assertTrue(handler.existsSubmission(1, getDerbyConnection()));
+    assertTrue(handler.existsSubmission(2, getDerbyConnection()));
+    assertTrue(handler.existsSubmission(3, getDerbyConnection()));
+    assertTrue(handler.existsSubmission(4, getDerbyConnection()));
+    assertTrue(handler.existsSubmission(5, getDerbyConnection()));
+    assertFalse(handler.existsSubmission(6, getDerbyConnection()));
+  }
+
+  public void testCreateSubmission() throws Exception {
+    MSubmission submission =
+      new MSubmission(1, new Date(), SubmissionStatus.RUNNING, "job-x");
+
+    handler.createSubmission(submission, getDerbyConnection());
+
+    assertEquals(1, submission.getPersistenceId());
+    assertCountForTable("SQOOP.SQ_SUBMISSION", 1);
+
+    List<MSubmission> submissions =
+      handler.findSubmissionsUnfinished(getDerbyConnection());
+    assertNotNull(submissions);
+    assertEquals(1, submissions.size());
+
+    submission = submissions.get(0);
+
+    assertEquals(1, submission.getJobId());
+    assertEquals(SubmissionStatus.RUNNING, submission.getStatus());
+    assertEquals("job-x", submission.getExternalId());
+
+    // Let's create a second submission
+    submission =
+      new MSubmission(1, new Date(), SubmissionStatus.SUCCEEDED, "job-x");
+    handler.createSubmission(submission, getDerbyConnection());
+
+    assertEquals(2, submission.getPersistenceId());
+    assertCountForTable("SQOOP.SQ_SUBMISSION", 2);
+  }
+
+  public void testUpdateConnection() throws Exception {
+    loadSubmissions();
+
+    List<MSubmission> submissions =
+      handler.findSubmissionsUnfinished(getDerbyConnection());
+    assertNotNull(submissions);
+    assertEquals(2, submissions.size());
+
+    MSubmission submission = submissions.get(0);
+    submission.setStatus(SubmissionStatus.SUCCEEDED);
+
+    handler.updateSubmission(submission, getDerbyConnection());
+
+    submissions = handler.findSubmissionsUnfinished(getDerbyConnection());
+    assertNotNull(submissions);
+    assertEquals(1, submissions.size());
+  }
+
+  public void testPurgeSubmissions() throws Exception {
+    loadSubmissions();
+    List<MSubmission> submissions;
+
+    submissions = handler.findSubmissionsUnfinished(getDerbyConnection());
+    assertNotNull(submissions);
+    assertEquals(2, submissions.size());
+    assertCountForTable("SQOOP.SQ_SUBMISSION", 5);
+
+    Calendar calendar = Calendar.getInstance();
+    // 2012-01-03 05:05:05
+    calendar.set(2012, Calendar.JANUARY, 3, 5, 5, 5);
+    handler.purgeSubmissions(calendar.getTime(), getDerbyConnection());
+
+    submissions = handler.findSubmissionsUnfinished(getDerbyConnection());
+    assertNotNull(submissions);
+    assertEquals(1, submissions.size());
+    assertCountForTable("SQOOP.SQ_SUBMISSION", 2);
+
+    handler.purgeSubmissions(new Date(), getDerbyConnection());
+
+    submissions = handler.findSubmissionsUnfinished(getDerbyConnection());
+    assertNotNull(submissions);
+    assertEquals(0, submissions.size());
+    assertCountForTable("SQOOP.SQ_SUBMISSION", 0);
+
+    handler.purgeSubmissions(new Date(), getDerbyConnection());
+
+    submissions = handler.findSubmissionsUnfinished(getDerbyConnection());
+    assertNotNull(submissions);
+    assertEquals(0, submissions.size());
+    assertCountForTable("SQOOP.SQ_SUBMISSION", 0);
+  }
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/2481b7f8/server/pom.xml
----------------------------------------------------------------------
diff --git a/server/pom.xml b/server/pom.xml
index 78ad8ee..71aa6c9 100644
--- a/server/pom.xml
+++ b/server/pom.xml
@@ -46,6 +46,12 @@ limitations under the License.
     </dependency>
 
     <dependency>
+      <groupId>org.apache.sqoop.submission</groupId>
+      <artifactId>sqoop-submission-mapreduce</artifactId>
+      <version>2.0.0-SNAPSHOT</version>
+    </dependency>
+
+    <dependency>
       <groupId>org.apache.sqoop.repository</groupId>
       <artifactId>sqoop-repository-derby</artifactId>
       <version>2.0.0-SNAPSHOT</version>

http://git-wip-us.apache.org/repos/asf/sqoop/blob/2481b7f8/server/src/main/java/org/apache/sqoop/handler/ConnectionRequestHandler.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/sqoop/handler/ConnectionRequestHandler.java b/server/src/main/java/org/apache/sqoop/handler/ConnectionRequestHandler.java
index eba334e..64ef84a 100644
--- a/server/src/main/java/org/apache/sqoop/handler/ConnectionRequestHandler.java
+++ b/server/src/main/java/org/apache/sqoop/handler/ConnectionRequestHandler.java
@@ -33,7 +33,7 @@ import org.apache.sqoop.repository.RepositoryManager;
 import org.apache.sqoop.server.RequestContext;
 import org.apache.sqoop.server.RequestHandler;
 import org.apache.sqoop.server.common.ServerError;
-import org.apache.sqoop.utils.ClassLoadingUtils;
+import org.apache.sqoop.utils.ClassUtils;
 import org.apache.sqoop.validation.Status;
 import org.apache.sqoop.validation.Validation;
 import org.apache.sqoop.validation.Validator;
@@ -158,9 +158,9 @@ public class ConnectionRequestHandler implements RequestHandler {
     Validator frameworkValidator = FrameworkManager.getValidator();
 
     // We need translate forms to configuration objects
-    Object connectorConfig = ClassLoadingUtils.instantiate(
+    Object connectorConfig = ClassUtils.instantiate(
       connector.getConnectionConfigurationClass());
-    Object frameworkConfig = ClassLoadingUtils.instantiate(
+    Object frameworkConfig = ClassUtils.instantiate(
       FrameworkManager.getConnectionConfigurationClass());
 
     FormUtils.fillValues(

http://git-wip-us.apache.org/repos/asf/sqoop/blob/2481b7f8/server/src/main/java/org/apache/sqoop/handler/ConnectorRequestHandler.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/sqoop/handler/ConnectorRequestHandler.java b/server/src/main/java/org/apache/sqoop/handler/ConnectorRequestHandler.java
index fda91fd..8a52243 100644
--- a/server/src/main/java/org/apache/sqoop/handler/ConnectorRequestHandler.java
+++ b/server/src/main/java/org/apache/sqoop/handler/ConnectorRequestHandler.java
@@ -60,7 +60,7 @@ public class ConnectorRequestHandler implements RequestHandler {
       Long id = Long.parseLong(cid);
 
       // Check that user is not asking for non existing connector id
-      if(!ConnectorManager.getConnectoIds().contains(id)) {
+      if(!ConnectorManager.getConnectorIds().contains(id)) {
         throw new SqoopException(ServerError.SERVER_0004, "Invalid id " + id);
       }
 

http://git-wip-us.apache.org/repos/asf/sqoop/blob/2481b7f8/server/src/main/java/org/apache/sqoop/handler/JobRequestHandler.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/sqoop/handler/JobRequestHandler.java b/server/src/main/java/org/apache/sqoop/handler/JobRequestHandler.java
index 0589e30..070b290 100644
--- a/server/src/main/java/org/apache/sqoop/handler/JobRequestHandler.java
+++ b/server/src/main/java/org/apache/sqoop/handler/JobRequestHandler.java
@@ -33,7 +33,7 @@ import org.apache.sqoop.repository.RepositoryManager;
 import org.apache.sqoop.server.RequestContext;
 import org.apache.sqoop.server.RequestHandler;
 import org.apache.sqoop.server.common.ServerError;
-import org.apache.sqoop.utils.ClassLoadingUtils;
+import org.apache.sqoop.utils.ClassUtils;
 import org.apache.sqoop.validation.Status;
 import org.apache.sqoop.validation.Validation;
 import org.apache.sqoop.validation.Validator;
@@ -159,10 +159,10 @@ public class JobRequestHandler implements RequestHandler {
     Validator frameworkValidator = FrameworkManager.getValidator();
 
     // We need translate forms to configuration objects
-    Object connectorConfig = ClassLoadingUtils.instantiate(
-      connector.getConnectionConfigurationClass());
-    Object frameworkConfig = ClassLoadingUtils.instantiate(
-      FrameworkManager.getConnectionConfigurationClass());
+    Object connectorConfig = ClassUtils.instantiate(
+      connector.getJobConfigurationClass(job.getType()));
+    Object frameworkConfig = ClassUtils.instantiate(
+      FrameworkManager.getJobConfigurationClass(job.getType()));
 
     FormUtils.fillValues(job.getConnectorPart().getForms(), connectorConfig);
     FormUtils.fillValues(job.getFrameworkPart().getForms(), frameworkConfig);

http://git-wip-us.apache.org/repos/asf/sqoop/blob/2481b7f8/server/src/main/java/org/apache/sqoop/handler/SubmissionRequestHandler.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/sqoop/handler/SubmissionRequestHandler.java b/server/src/main/java/org/apache/sqoop/handler/SubmissionRequestHandler.java
new file mode 100644
index 0000000..e9e6551
--- /dev/null
+++ b/server/src/main/java/org/apache/sqoop/handler/SubmissionRequestHandler.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.handler;
+
+import org.apache.log4j.Logger;
+import org.apache.sqoop.common.SqoopException;
+import org.apache.sqoop.framework.FrameworkManager;
+import org.apache.sqoop.json.JsonBean;
+import org.apache.sqoop.json.SubmissionBean;
+import org.apache.sqoop.model.MSubmission;
+import org.apache.sqoop.server.RequestContext;
+import org.apache.sqoop.server.RequestHandler;
+import org.apache.sqoop.server.common.ServerError;
+
+/**
+ * Submission request handler supports the following resources:
+ *
+ * GET /v1/submission/action/:jid
+ * Get status of last submission for job with id :jid
+ *
+ * POST /v1/submission/action/:jid
+ * Create new submission for job with id :jid
+ *
+ * DELETE /v1/submission/action/:jid
+ * Stop last submission for job with id :jid
+ *
+ * Possible additions in the future: /v1/submission/history/* for history.
+ */
+public class SubmissionRequestHandler implements RequestHandler {
+
+  private final Logger logger = Logger.getLogger(getClass());
+
+  public SubmissionRequestHandler() {
+    logger.info("SubmissionRequestHandler initialized");
+  }
+
+  @Override
+  public JsonBean handleEvent(RequestContext ctx) {
+    String[] urlElements = ctx.getUrlElements();
+    if (urlElements.length < 2) {
+      throw new SqoopException(ServerError.SERVER_0003,
+        "Invalid URL, too few arguments for this servlet.");
+    }
+
+    // Let's check which action the second-to-last URL element requests
+    int length = urlElements.length;
+    String action = urlElements[length - 2];
+
+    if(action.equals("action")) {
+      return handleActionEvent(ctx, urlElements[length - 1]);
+    }
+
+    throw new SqoopException(ServerError.SERVER_0003,
+      "Do not know what to do.");
+  }
+
+  private JsonBean handleActionEvent(RequestContext ctx, String sjid) {
+    long jid = Long.parseLong(sjid);
+
+    switch (ctx.getMethod()) {
+      case GET:
+        return submissionStatus(jid);
+      case POST:
+        return submissionSubmit(jid);
+      case DELETE:
+        return submissionStop(jid);
+    }
+
+    return null;
+  }
+
+  private JsonBean submissionStop(long jid) {
+    MSubmission submission = FrameworkManager.stop(jid);
+    return new SubmissionBean(submission);
+  }
+
+  private JsonBean submissionSubmit(long jid) {
+    MSubmission submission = FrameworkManager.submit(jid);
+    return new SubmissionBean(submission);
+  }
+
+  private JsonBean submissionStatus(long jid) {
+    MSubmission submission = FrameworkManager.status(jid);
+    return new SubmissionBean(submission);
+  }
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/2481b7f8/server/src/main/java/org/apache/sqoop/server/RequestContext.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/sqoop/server/RequestContext.java b/server/src/main/java/org/apache/sqoop/server/RequestContext.java
index 78950f6..c6b6569 100644
--- a/server/src/main/java/org/apache/sqoop/server/RequestContext.java
+++ b/server/src/main/java/org/apache/sqoop/server/RequestContext.java
@@ -85,6 +85,13 @@ public class RequestContext {
   }
 
   /**
+   * Return all elements in the url as an array
+   */
+  public String[] getUrlElements() {
+    return getRequest().getRequestURI().split("/");
+  }
+
+  /**
    * Get locale specified in accept-language HTTP header.
    *
    * @return First specified locale

http://git-wip-us.apache.org/repos/asf/sqoop/blob/2481b7f8/server/src/main/java/org/apache/sqoop/server/ServerInitializer.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/sqoop/server/ServerInitializer.java b/server/src/main/java/org/apache/sqoop/server/ServerInitializer.java
index 993c153..ae0735b 100644
--- a/server/src/main/java/org/apache/sqoop/server/ServerInitializer.java
+++ b/server/src/main/java/org/apache/sqoop/server/ServerInitializer.java
@@ -37,18 +37,22 @@ public class ServerInitializer implements ServletContextListener {
       Logger.getLogger(ServerInitializer.class);
 
   public void contextDestroyed(ServletContextEvent arg0) {
+    LOG.info("Shutting down Sqoop server");
     FrameworkManager.destroy();
     ConnectorManager.destroy();
     RepositoryManager.destroy();
     SqoopConfiguration.destroy();
+    LOG.info("Sqoop server has been correctly terminated");
   }
 
   public void contextInitialized(ServletContextEvent arg0) {
     try {
+      LOG.info("Booting up Sqoop server");
       SqoopConfiguration.initialize();
       RepositoryManager.initialize();
       ConnectorManager.initialize();
       FrameworkManager.initialize();
+      LOG.info("Sqoop server has successfully boot up");
     } catch (RuntimeException ex) {
       LOG.error("Server startup failure", ex);
       throw ex;

http://git-wip-us.apache.org/repos/asf/sqoop/blob/2481b7f8/server/src/main/java/org/apache/sqoop/server/v1/SubmissionServlet.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/sqoop/server/v1/SubmissionServlet.java b/server/src/main/java/org/apache/sqoop/server/v1/SubmissionServlet.java
new file mode 100644
index 0000000..7252e11
--- /dev/null
+++ b/server/src/main/java/org/apache/sqoop/server/v1/SubmissionServlet.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.server.v1;
+
+import org.apache.sqoop.handler.SubmissionRequestHandler;
+import org.apache.sqoop.json.JsonBean;
+import org.apache.sqoop.server.RequestContext;
+import org.apache.sqoop.server.RequestHandler;
+import org.apache.sqoop.server.SqoopProtocolServlet;
+
+/**
+ * Servlet that delegates GET, POST and DELETE requests to SubmissionRequestHandler.
+ */
+public class SubmissionServlet extends SqoopProtocolServlet {
+
+  private RequestHandler requestHandler;
+
+  public SubmissionServlet() {
+    requestHandler = new SubmissionRequestHandler();
+  }
+
+  @Override
+  protected JsonBean handleGetRequest(RequestContext ctx) throws Exception {
+    return requestHandler.handleEvent(ctx);
+  }
+
+  @Override
+  protected JsonBean handlePostRequest(RequestContext ctx) throws Exception {
+    return requestHandler.handleEvent(ctx);
+  }
+
+  @Override
+  protected JsonBean handleDeleteRequest(RequestContext ctx) throws Exception {
+    return requestHandler.handleEvent(ctx);
+  }
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/2481b7f8/server/src/main/webapp/WEB-INF/web.xml
----------------------------------------------------------------------
diff --git a/server/src/main/webapp/WEB-INF/web.xml b/server/src/main/webapp/WEB-INF/web.xml
index 69229bf..f053062 100644
--- a/server/src/main/webapp/WEB-INF/web.xml
+++ b/server/src/main/webapp/WEB-INF/web.xml
@@ -87,5 +87,18 @@ limitations under the License.
     <url-pattern>/v1/job/*</url-pattern>
   </servlet-mapping>
 
+  <!-- Submission servlet -->
+  <servlet>
+    <servlet-name>v1.SubmissionServlet</servlet-name>
+    <servlet-class>org.apache.sqoop.server.v1.SubmissionServlet</servlet-class>
+    <load-on-startup>1</load-on-startup>
+  </servlet>
+
+  <servlet-mapping>
+    <servlet-name>v1.SubmissionServlet</servlet-name>
+    <url-pattern>/v1/submission/*</url-pattern>
+  </servlet-mapping>
+
+
 </web-app>
 

http://git-wip-us.apache.org/repos/asf/sqoop/blob/2481b7f8/spi/src/main/java/org/apache/sqoop/job/etl/CallbackBase.java
----------------------------------------------------------------------
diff --git a/spi/src/main/java/org/apache/sqoop/job/etl/CallbackBase.java b/spi/src/main/java/org/apache/sqoop/job/etl/CallbackBase.java
new file mode 100644
index 0000000..59a9457
--- /dev/null
+++ b/spi/src/main/java/org/apache/sqoop/job/etl/CallbackBase.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.job.etl;
+
+/**
+ * Set of default callbacks that must be implemented by each job type.
+ */
+public abstract class CallbackBase {
+
+  private Class<? extends Initializer> initializer;
+  private Class<? extends Destroyer> destroyer;
+
+  public CallbackBase(
+    Class<? extends Initializer> initializer,
+    Class<? extends Destroyer> destroyer
+  ) {
+    this.initializer = initializer;
+    this.destroyer = destroyer;
+  }
+
+  public Class<? extends Destroyer> getDestroyer() {
+    return destroyer;
+  }
+
+  public Class<? extends Initializer> getInitializer() {
+    return initializer;
+  }
+
+  @Override
+  public String toString() {
+    return "initializer=" + initializer.getName() +
+            ", destroyer=" + destroyer.getName();
+  }
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/2481b7f8/spi/src/main/java/org/apache/sqoop/job/etl/Context.java
----------------------------------------------------------------------
diff --git a/spi/src/main/java/org/apache/sqoop/job/etl/Context.java b/spi/src/main/java/org/apache/sqoop/job/etl/Context.java
deleted file mode 100644
index fc01c96..0000000
--- a/spi/src/main/java/org/apache/sqoop/job/etl/Context.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sqoop.job.etl;
-
-/**
- * The context for getting configuration values.
- */
-public interface Context {
-
-  String getString(String key);
-
-}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/2481b7f8/spi/src/main/java/org/apache/sqoop/job/etl/Destroyer.java
----------------------------------------------------------------------
diff --git a/spi/src/main/java/org/apache/sqoop/job/etl/Destroyer.java b/spi/src/main/java/org/apache/sqoop/job/etl/Destroyer.java
index af766f3..37b9f1b 100644
--- a/spi/src/main/java/org/apache/sqoop/job/etl/Destroyer.java
+++ b/spi/src/main/java/org/apache/sqoop/job/etl/Destroyer.java
@@ -17,12 +17,15 @@
  */
 package org.apache.sqoop.job.etl;
 
+import org.apache.sqoop.common.MapContext;
+
 /**
  * This allows connector to define work to complete execution, for example,
  * resource cleaning.
  */
 public abstract class Destroyer {
 
-  public abstract void run(Context context);
+  // TODO(Jarcec): This should be called with ImmutableContext
+  public abstract void run(MapContext context);
 
 }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/2481b7f8/spi/src/main/java/org/apache/sqoop/job/etl/Exporter.java
----------------------------------------------------------------------
diff --git a/spi/src/main/java/org/apache/sqoop/job/etl/Exporter.java b/spi/src/main/java/org/apache/sqoop/job/etl/Exporter.java
index ef690bf..cdaa623 100644
--- a/spi/src/main/java/org/apache/sqoop/job/etl/Exporter.java
+++ b/spi/src/main/java/org/apache/sqoop/job/etl/Exporter.java
@@ -25,32 +25,27 @@ package org.apache.sqoop.job.etl;
  * -> Loader
  * -> Destroyer
  */
-public class Exporter {
+public class Exporter extends CallbackBase {
 
-  private Class<? extends Initializer> initializer;
   private Class<? extends Loader> loader;
-  private Class<? extends Destroyer> destroyer;
 
   public Exporter(
       Class<? extends Initializer> initializer,
       Class<? extends Loader> loader,
       Class<? extends Destroyer> destroyer
       ) {
-    this.initializer = initializer;
+    super(initializer, destroyer);
     this.loader = loader;
-    this.destroyer = destroyer;
-  }
-
-  public Class<? extends Initializer> getInitializer() {
-    return initializer;
   }
 
   public Class<? extends Loader> getLoader() {
     return loader;
   }
 
-  public Class<? extends Destroyer> getDestroyer() {
-    return destroyer;
+  @Override
+  public String toString() {
+    return "Exporter{" + super.toString() +
+      ", loader=" + loader +
+      '}';
   }
-
 }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/2481b7f8/spi/src/main/java/org/apache/sqoop/job/etl/Extractor.java
----------------------------------------------------------------------
diff --git a/spi/src/main/java/org/apache/sqoop/job/etl/Extractor.java b/spi/src/main/java/org/apache/sqoop/job/etl/Extractor.java
index 20bdeda..ba04be9 100644
--- a/spi/src/main/java/org/apache/sqoop/job/etl/Extractor.java
+++ b/spi/src/main/java/org/apache/sqoop/job/etl/Extractor.java
@@ -17,6 +17,7 @@
  */
 package org.apache.sqoop.job.etl;
 
+import org.apache.sqoop.common.ImmutableContext;
 import org.apache.sqoop.job.io.DataWriter;
 
 /**
@@ -25,7 +26,10 @@ import org.apache.sqoop.job.io.DataWriter;
  */
 public abstract class Extractor {
 
-  public abstract void run(Context context,
-      Partition partition, DataWriter writer);
+  public abstract void run(ImmutableContext context,
+                           Object connectionConfiguration,
+                           Object jobConfiguration,
+                           Partition partition,
+                           DataWriter writer);
 
 }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/2481b7f8/spi/src/main/java/org/apache/sqoop/job/etl/Importer.java
----------------------------------------------------------------------
diff --git a/spi/src/main/java/org/apache/sqoop/job/etl/Importer.java b/spi/src/main/java/org/apache/sqoop/job/etl/Importer.java
index f0a8d1a..d4c9e70 100644
--- a/spi/src/main/java/org/apache/sqoop/job/etl/Importer.java
+++ b/spi/src/main/java/org/apache/sqoop/job/etl/Importer.java
@@ -26,25 +26,18 @@ package org.apache.sqoop.job.etl;
  * -> (framework-defined steps)
  * -> Destroyer
  */
-public class Importer {
+public class Importer extends CallbackBase {
 
-  private Class<? extends Initializer> initializer;
   private Class<? extends Partitioner> partitioner;
   private Class<? extends Extractor> extractor;
-  private Class<? extends Destroyer> destroyer;
 
   public Importer(Class<? extends Initializer> initializer,
       Class<? extends Partitioner> partitioner,
       Class<? extends Extractor> extractor,
       Class<? extends Destroyer> destroyer) {
-    this.initializer = initializer;
+    super(initializer, destroyer);
     this.partitioner = partitioner;
     this.extractor = extractor;
-    this.destroyer = destroyer;
-  }
-
-  public Class<? extends Initializer> getInitializer() {
-    return initializer;
   }
 
   public Class<? extends Partitioner> getPartitioner() {
@@ -55,8 +48,11 @@ public class Importer {
     return extractor;
   }
 
-  public Class<? extends Destroyer> getDestroyer() {
-    return destroyer;
+  @Override
+  public String toString() {
+    return "Importer{" + super.toString() +
+      ", partitioner=" + partitioner.getName() +
+      ", extractor=" + extractor.getName() +
+      '}';
   }
-
 }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/2481b7f8/spi/src/main/java/org/apache/sqoop/job/etl/Initializer.java
----------------------------------------------------------------------
diff --git a/spi/src/main/java/org/apache/sqoop/job/etl/Initializer.java b/spi/src/main/java/org/apache/sqoop/job/etl/Initializer.java
index 75bd42e..2092815 100644
--- a/spi/src/main/java/org/apache/sqoop/job/etl/Initializer.java
+++ b/spi/src/main/java/org/apache/sqoop/job/etl/Initializer.java
@@ -17,12 +17,42 @@
  */
 package org.apache.sqoop.job.etl;
 
+import org.apache.sqoop.common.MapContext;
+import org.apache.sqoop.common.MutableMapContext;
+
+import java.util.LinkedList;
+import java.util.List;
+
 /**
  * This allows connector to define initialization work for execution,
  * for example, context configuration.
  */
 public abstract class Initializer {
 
-  public abstract void run(MutableContext context, Options options);
+  /**
+   * Initialize a new submission based on the given configuration properties.
+   * Any needed temporary values may be saved to the context object and they
+   * will be propagated to all other parts of the workflow automatically.
+   *
+   * @param context Changeable context object, purely for connector usage
+   * @param connectionConfiguration Connector's connection configuration object
+   * @param jobConfiguration Connector's job configuration object
+   */
+  public abstract void initialize(MutableMapContext context,
+                                  Object connectionConfiguration,
+                                  Object jobConfiguration);
+
+  /**
+   * Return the list of all jars that this particular connector needs in order
+   * to operate on the following job. This method will be called after running
+   * the initialize method.
+   *
+   * @return List of jars needed by the connector; empty by default
+   */
+  public List<String> getJars(MapContext context,
+                              Object connectionConfiguration,
+                              Object jobConfiguration) {
+    return new LinkedList<String>();
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/2481b7f8/spi/src/main/java/org/apache/sqoop/job/etl/Loader.java
----------------------------------------------------------------------
diff --git a/spi/src/main/java/org/apache/sqoop/job/etl/Loader.java b/spi/src/main/java/org/apache/sqoop/job/etl/Loader.java
index 5474927..3a708df 100644
--- a/spi/src/main/java/org/apache/sqoop/job/etl/Loader.java
+++ b/spi/src/main/java/org/apache/sqoop/job/etl/Loader.java
@@ -17,6 +17,7 @@
  */
 package org.apache.sqoop.job.etl;
 
+import org.apache.sqoop.common.ImmutableContext;
 import org.apache.sqoop.job.io.DataReader;
 
 /**
@@ -24,6 +25,6 @@ import org.apache.sqoop.job.io.DataReader;
  */
 public abstract class Loader {
 
-  public abstract void run(Context context, DataReader reader);
+  public abstract void run(ImmutableContext context, DataReader reader);
 
 }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/2481b7f8/spi/src/main/java/org/apache/sqoop/job/etl/MutableContext.java
----------------------------------------------------------------------
diff --git a/spi/src/main/java/org/apache/sqoop/job/etl/MutableContext.java b/spi/src/main/java/org/apache/sqoop/job/etl/MutableContext.java
deleted file mode 100644
index 03678c5..0000000
--- a/spi/src/main/java/org/apache/sqoop/job/etl/MutableContext.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sqoop.job.etl;
-
-/**
- * The context for getting and setting configuration values.
- */
-public interface MutableContext extends Context {
-
-  void setString(String key, String value);
-
-}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/2481b7f8/spi/src/main/java/org/apache/sqoop/job/etl/Options.java
----------------------------------------------------------------------
diff --git a/spi/src/main/java/org/apache/sqoop/job/etl/Options.java b/spi/src/main/java/org/apache/sqoop/job/etl/Options.java
deleted file mode 100644
index 2dc4671..0000000
--- a/spi/src/main/java/org/apache/sqoop/job/etl/Options.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sqoop.job.etl;
-
-/**
- * The options provided from user input.
- */
-public interface Options {
-
-  public String getOption(String key);
-
-}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/2481b7f8/spi/src/main/java/org/apache/sqoop/job/etl/Partition.java
----------------------------------------------------------------------
diff --git a/spi/src/main/java/org/apache/sqoop/job/etl/Partition.java b/spi/src/main/java/org/apache/sqoop/job/etl/Partition.java
index 8834c80..db07844 100644
--- a/spi/src/main/java/org/apache/sqoop/job/etl/Partition.java
+++ b/spi/src/main/java/org/apache/sqoop/job/etl/Partition.java
@@ -36,4 +36,11 @@ public abstract class Partition {
    */
   public abstract void write(DataOutput out) throws IOException;
 
+  /**
+   * Each partition must be easily serializable to human readable form so that
+   * it can be logged for debugging purpose.
+   *
+   * @return Human readable representation
+   */
+  public abstract String toString();
 }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/2481b7f8/spi/src/main/java/org/apache/sqoop/job/etl/Partitioner.java
----------------------------------------------------------------------
diff --git a/spi/src/main/java/org/apache/sqoop/job/etl/Partitioner.java b/spi/src/main/java/org/apache/sqoop/job/etl/Partitioner.java
index 21310be..3a525c4 100644
--- a/spi/src/main/java/org/apache/sqoop/job/etl/Partitioner.java
+++ b/spi/src/main/java/org/apache/sqoop/job/etl/Partitioner.java
@@ -17,6 +17,8 @@
  */
 package org.apache.sqoop.job.etl;
 
+import org.apache.sqoop.common.ImmutableContext;
+
 import java.util.List;
 
 /**
@@ -25,6 +27,8 @@ import java.util.List;
  */
 public abstract class Partitioner {
 
-  public abstract List<Partition> run(Context context);
+  public abstract List<Partition> getPartitions(ImmutableContext context,
+                                                Object connectionConfiguration,
+                                                Object jobConfiguration);
 
 }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/2481b7f8/submission/mapreduce/pom.xml
----------------------------------------------------------------------
diff --git a/submission/mapreduce/pom.xml b/submission/mapreduce/pom.xml
new file mode 100644
index 0000000..03c06c0
--- /dev/null
+++ b/submission/mapreduce/pom.xml
@@ -0,0 +1,67 @@
+<?xml version="1.0"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" xmlns="http://maven.apache.org/POM/4.0.0"
+    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.sqoop</groupId>
+    <artifactId>submission</artifactId>
+    <version>2.0.0-SNAPSHOT</version>
+  </parent>
+
+  <groupId>org.apache.sqoop.submission</groupId>
+  <artifactId>sqoop-submission-mapreduce</artifactId>
+  <version>2.0.0-SNAPSHOT</version>
+  <name>Sqoop Mapreduce Submission Engine</name>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.sqoop</groupId>
+      <artifactId>sqoop-core</artifactId>
+      <version>2.0.0-SNAPSHOT</version>
+    </dependency>
+
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.sqoop</groupId>
+      <artifactId>sqoop-core</artifactId>
+      <version>2.0.0-SNAPSHOT</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-common</artifactId>
+      <scope>provided</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
+      <scope>provided</scope>
+    </dependency>
+
+  </dependencies>
+
+</project>

http://git-wip-us.apache.org/repos/asf/sqoop/blob/2481b7f8/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/Constants.java
----------------------------------------------------------------------
diff --git a/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/Constants.java b/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/Constants.java
new file mode 100644
index 0000000..e562701
--- /dev/null
+++ b/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/Constants.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.submission.mapreduce;
+
+/**
+ * Configuration constants for Mapreduce submission engine
+ */
+public class Constants {
+
+  /** Prefix shared by the configuration keys of the mapreduce submission engine. */
+  public static final String PREFIX_MAPREDUCE = "mapreduce.";
+
+  /**
+   * Key ("mapreduce.configuration.directory") of the hadoop configuration
+   * directory. MapreduceSubmissionEngine.initialize() looks it up with the
+   * prefix it was given prepended to this key.
+   */
+  public static final String CONF_CONFIG_DIR =
+    PREFIX_MAPREDUCE + "configuration.directory";
+
+  private Constants() {
+    // Instantiation is prohibited
+  }
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/2481b7f8/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java
----------------------------------------------------------------------
diff --git a/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java b/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java
new file mode 100644
index 0000000..7049924
--- /dev/null
+++ b/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java
@@ -0,0 +1,311 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.submission.mapreduce;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobID;
+import org.apache.hadoop.mapred.JobStatus;
+import org.apache.hadoop.mapred.RunningJob;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.log4j.Logger;
+import org.apache.sqoop.common.MapContext;
+import org.apache.sqoop.common.SqoopException;
+import org.apache.sqoop.framework.SubmissionRequest;
+import org.apache.sqoop.framework.SubmissionEngine;
+import org.apache.sqoop.job.JobConstants;
+import org.apache.sqoop.model.FormUtils;
+import org.apache.sqoop.submission.counter.Counters;
+import org.apache.sqoop.submission.SubmissionStatus;
+
+import java.io.File;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.util.Map;
+
+
+/**
+ * This is a very simple and straightforward implementation of map-reduce based
+ * submission engine.
+ */
+public class MapreduceSubmissionEngine extends SubmissionEngine {
+
+
+  /**
+   * Logger shared by all instances; final so it can't be reassigned.
+   */
+  private static final Logger LOG = Logger.getLogger(MapreduceSubmissionEngine.class);
+
+  /**
+   * Global configuration object that is built from hadoop configuration files
+   * on engine initialization and cloned during each new submission creation.
+   */
+  private Configuration globalConfiguration;
+
+  /**
+   * Job client that is configured to talk to one specific Job tracker.
+   */
+  private JobClient jobClient;
+
+
+  /**
+   * {@inheritDoc}
+   *
+   * Builds the global hadoop configuration from all "-site.xml" files found
+   * in the directory configured under prefix + Constants.CONF_CONFIG_DIR and
+   * creates the job client from a copy of that configuration.
+   *
+   * @param context Context holding the engine configuration properties
+   * @param prefix Prefix prepended to the configuration keys of this engine
+   * @throws SqoopException with code MAPREDUCE_0002 when the configuration
+   *   directory is not configured or can't be listed, or when the job client
+   *   can't be created
+   */
+  @Override
+  public void initialize(MapContext context, String prefix) {
+    LOG.info("Initializing Map-reduce Submission Engine");
+
+    // Build global configuration, start with empty configuration object
+    globalConfiguration = new Configuration();
+    globalConfiguration.clear();
+
+    // Load configured hadoop configuration directory
+    String configKey = prefix + Constants.CONF_CONFIG_DIR;
+    String configDirectory = context.getString(configKey);
+    if (configDirectory == null) {
+      // Fail with a descriptive error rather than with a bare
+      // NullPointerException from new File(null)
+      throw new SqoopException(MapreduceSubmissionError.MAPREDUCE_0002,
+        "Missing configuration property " + configKey);
+    }
+
+    // Get list of files ending with "-site.xml" (configuration files)
+    File dir = new File(configDirectory);
+    String [] files = dir.list(new FilenameFilter() {
+      @Override
+      public boolean accept(File dir, String name) {
+        return name.endsWith("-site.xml");
+      }
+    });
+
+    // File.list() returns null when the path is not a directory or when an
+    // I/O error occurs; iterating over it would throw NullPointerException
+    if (files == null) {
+      throw new SqoopException(MapreduceSubmissionError.MAPREDUCE_0002,
+        "Can't list hadoop configuration directory " + configDirectory);
+    }
+
+    // Add each such file to our global configuration object
+    for (String file : files) {
+      LOG.info("Found hadoop configuration file " + file);
+      try {
+        globalConfiguration.addResource(new File(configDirectory, file).toURI().toURL());
+      } catch (MalformedURLException e) {
+        // Best effort: a single unloadable file does not abort initialization
+        LOG.error("Can't load configuration file: " + file, e);
+      }
+    }
+
+    // Create job client
+    try {
+      jobClient = new JobClient(new Configuration(globalConfiguration));
+    } catch (IOException e) {
+      throw new SqoopException(MapreduceSubmissionError.MAPREDUCE_0002, e);
+    }
+  }
+
+  /**
+   * {@inheritDoc}
+   *
+   * Closes the job client that was created in initialize(). A failure to
+   * close it is logged and otherwise ignored.
+   */
+  @Override
+  public void destroy() {
+    LOG.info("Destroying Mapreduce Submission Engine");
+
+    // jobClient stays null when initialize() failed or was never called
+    if (jobClient != null) {
+      try {
+        jobClient.close();
+      } catch (IOException e) {
+        LOG.error("Can't close job client", e);
+      } finally {
+        jobClient = null;
+      }
+    }
+  }
+
+  /**
+   * {@inheritDoc}
+   *
+   * Clones the global configuration, serializes the framework context, the
+   * connector context and all four configuration objects of the request into
+   * it and submits the resulting map-reduce job. The external job id and the
+   * tracking URL are stored in the request summary.
+   *
+   * @param request Submission request describing the job
+   * @return true when the job was submitted, false when any exception was
+   *   thrown while creating or submitting the job (the exception is logged)
+   */
+  @Override
+  @SuppressWarnings("unchecked")
+  public boolean submit(SubmissionRequest request) {
+    // Clone global configuration
+    Configuration configuration = new Configuration(globalConfiguration);
+
+    // Serialize framework context into job configuration
+    for (Map.Entry<String, String> entry : request.getFrameworkContext()) {
+      configuration.set(entry.getKey(), entry.getValue());
+    }
+
+    // Serialize connector context as a sub namespace
+    for (Map.Entry<String, String> entry : request.getConnectorContext()) {
+      configuration.set(
+        JobConstants.PREFIX_CONNECTOR_CONTEXT + entry.getKey(),
+        entry.getValue());
+    }
+
+    // Serialize configuration objects - Firstly configuration classes
+    configuration.set(JobConstants.JOB_CONFIG_CLASS_CONNECTOR_CONNECTION,
+      request.getConfigConnectorConnection().getClass().getName());
+    configuration.set(JobConstants.JOB_CONFIG_CLASS_CONNECTOR_JOB,
+      request.getConfigConnectorJob().getClass().getName());
+    configuration.set(JobConstants.JOB_CONFIG_CLASS_FRAMEWORK_CONNECTION,
+      request.getConfigFrameworkConnection().getClass().getName());
+    configuration.set(JobConstants.JOB_CONFIG_CLASS_FRAMEWORK_JOB,
+      request.getConfigFrameworkJob().getClass().getName());
+
+    // And finally configuration data
+    configuration.set(JobConstants.JOB_CONFIG_CONNECTOR_CONNECTION,
+      FormUtils.toJson(request.getConfigConnectorConnection()));
+    configuration.set(JobConstants.JOB_CONFIG_CONNECTOR_JOB,
+      FormUtils.toJson(request.getConfigConnectorJob()));
+    configuration.set(JobConstants.JOB_CONFIG_FRAMEWORK_CONNECTION,
+      FormUtils.toJson(request.getConfigFrameworkConnection()));
+    // The framework job data must come from the framework job object, the
+    // same one whose class name is stored above
+    configuration.set(JobConstants.JOB_CONFIG_FRAMEWORK_JOB,
+      FormUtils.toJson(request.getConfigFrameworkJob()));
+
+    // Promote all required jars to the job
+    StringBuilder sb = new StringBuilder();
+    boolean first = true;
+    for (String jar : request.getJars()) {
+      if (first) {
+        first = false;
+      } else {
+        sb.append(",");
+      }
+      LOG.debug("Adding jar to the job: " + jar);
+      sb.append(jar);
+    }
+    configuration.set("tmpjars", sb.toString());
+
+    try {
+      Job job = Job.getInstance(configuration);
+      job.setJobName(request.getJobName());
+
+      job.setInputFormatClass(request.getInputFormatClass());
+
+      job.setMapperClass(request.getMapperClass());
+      job.setMapOutputKeyClass(request.getMapOutputKeyClass());
+      job.setMapOutputValueClass(request.getMapOutputValueClass());
+
+      String outputDirectory = request.getOutputDirectory();
+      if (outputDirectory != null) {
+        FileOutputFormat.setOutputPath(job, new Path(outputDirectory));
+      }
+
+      // TODO(jarcec): Hardcoded no reducers
+      job.setNumReduceTasks(0);
+
+      job.setOutputFormatClass(request.getOutputFormatClass());
+      job.setOutputKeyClass(request.getOutputKeyClass());
+      job.setOutputValueClass(request.getOutputValueClass());
+
+      job.submit();
+
+      String jobId = job.getJobID().toString();
+      request.getSummary().setExternalId(jobId);
+      request.getSummary().setExternalLink(job.getTrackingURL());
+
+      LOG.debug("Executed new map-reduce job with id " + jobId);
+    } catch (Exception e) {
+      LOG.error("Error in submitting job", e);
+      return false;
+    }
+    return true;
+  }
+
+  /**
+   * {@inheritDoc}
+   *
+   * Looks the job up through the job client and kills it. Nothing happens
+   * when the job client does not return a job for the given id.
+   *
+   * @param submissionId Map-reduce job id in its string form
+   * @throws SqoopException with code MAPREDUCE_0003 on any IOException
+   */
+  @Override
+  public void stop(String submissionId) {
+    try {
+      RunningJob runningJob = jobClient.getJob(JobID.forName(submissionId));
+      if(runningJob == null) {
+        // No such job, nothing to stop
+        return;
+      }
+
+      runningJob.killJob();
+    } catch (IOException e) {
+      throw new SqoopException(MapreduceSubmissionError.MAPREDUCE_0003, e);
+    }
+  }
+
+  /**
+   * {@inheritDoc}
+   *
+   * Looks the job up through the job client and converts its map-reduce state
+   * with convertMapreduceState().
+   *
+   * @param submissionId Map-reduce job id in its string form
+   * @return Converted job status, or SubmissionStatus.UNKNOWN when the job
+   *   client does not return a job for the given id
+   * @throws SqoopException with code MAPREDUCE_0003 on any IOException
+   */
+  @Override
+  public SubmissionStatus status(String submissionId) {
+    try {
+      RunningJob runningJob = jobClient.getJob(JobID.forName(submissionId));
+      if(runningJob == null) {
+        return SubmissionStatus.UNKNOWN;
+      }
+
+      int status = runningJob.getJobState();
+      return convertMapreduceState(status);
+
+    } catch (IOException e) {
+      throw new SqoopException(MapreduceSubmissionError.MAPREDUCE_0003, e);
+    }
+  }
+
+  /**
+   * {@inheritDoc}
+   *
+   * The progress is computed as the average of the map and reduce progress
+   * reported by the running job.
+   *
+   * NOTE(review): submit() sets the number of reduce tasks to 0 - confirm
+   * what reduceProgress() reports for such jobs, otherwise the average may
+   * not reflect the real progress (see the TODO below).
+   *
+   * @param submissionId Map-reduce job id in its string form
+   * @return Average of map and reduce progress, or the parent class default
+   *   when the job client does not return a job for the given id
+   * @throws SqoopException with code MAPREDUCE_0003 on any IOException
+   */
+  @Override
+  public double progress(String submissionId) {
+    try {
+      // Get some reasonable approximation of map-reduce job progress
+      // TODO(jarcec): What if we're running without reducers?
+      RunningJob runningJob = jobClient.getJob(JobID.forName(submissionId));
+      if(runningJob == null) {
+        // Return default value
+        return super.progress(submissionId);
+      }
+
+      return (runningJob.mapProgress() + runningJob.reduceProgress()) / 2;
+    } catch (IOException e) {
+      throw new SqoopException(MapreduceSubmissionError.MAPREDUCE_0003, e);
+    }
+  }
+
+  /**
+   * {@inheritDoc}
+   *
+   * Counters are not supported by this engine yet, the call is delegated to
+   * the parent class implementation.
+   *
+   * @param submissionId Map-reduce job id in its string form
+   * @return Whatever the parent class implementation returns
+   */
+  @Override
+  public Counters stats(String submissionId) {
+    //TODO(jarcec): Not supported yet
+    return super.stats(submissionId);
+  }
+
+  /**
+   * {@inheritDoc}
+   *
+   * Looks the job up through the job client and returns its tracking URL.
+   *
+   * @param submissionId Map-reduce job id in its string form
+   * @return Tracking URL of the job, or null when the job client does not
+   *   return a job for the given id
+   * @throws SqoopException with code MAPREDUCE_0003 on any IOException
+   */
+  @Override
+  public String externalLink(String submissionId) {
+    try {
+      RunningJob runningJob = jobClient.getJob(JobID.forName(submissionId));
+      if(runningJob == null) {
+        return null;
+      }
+
+      return runningJob.getTrackingURL();
+    } catch (IOException e) {
+      throw new SqoopException(MapreduceSubmissionError.MAPREDUCE_0003, e);
+    }
+  }
+
+  /**
+   * Translate a map-reduce job state constant into the corresponding Sqoop
+   * submission status.
+   *
+   * @param status Map-reduce job constant
+   * @return Equivalent submission status
+   * @throws SqoopException with code MAPREDUCE_0004 when the constant is not
+   *   one of the handled job states
+   */
+  protected SubmissionStatus convertMapreduceState(int status) {
+    if (status == JobStatus.PREP) {
+      return SubmissionStatus.BOOTING;
+    }
+    if (status == JobStatus.RUNNING) {
+      return SubmissionStatus.RUNNING;
+    }
+    // Killed jobs are reported as failed, the same as jobs that failed
+    if (status == JobStatus.FAILED || status == JobStatus.KILLED) {
+      return SubmissionStatus.FAILED;
+    }
+    if (status == JobStatus.SUCCEEDED) {
+      return SubmissionStatus.SUCCEEDED;
+    }
+
+    throw new SqoopException(MapreduceSubmissionError.MAPREDUCE_0004,
+      "Unknown status " + status);
+  }
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/2481b7f8/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionError.java
----------------------------------------------------------------------
diff --git a/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionError.java b/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionError.java
new file mode 100644
index 0000000..9296717
--- /dev/null
+++ b/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionError.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.submission.mapreduce;
+
+import org.apache.sqoop.common.ErrorCode;
+
+/**
+ * Error codes of the map-reduce submission engine. The code of each error is
+ * the name of its enum constant.
+ */
+public enum MapreduceSubmissionError implements ErrorCode {
+
+  MAPREDUCE_0001("Unknown error"),
+
+  // Thrown by MapreduceSubmissionEngine.initialize()
+  MAPREDUCE_0002("Failure on submission engine initialization"),
+
+  // Thrown when talking to the job client fails with an IOException
+  MAPREDUCE_0003("Can't get RunningJob instance"),
+
+  // Thrown by MapreduceSubmissionEngine.convertMapreduceState()
+  MAPREDUCE_0004("Unknown map reduce job status"),
+
+  ;
+
+  // Human readable description of the error
+  private final String message;
+
+  private MapreduceSubmissionError(String message) {
+    this.message = message;
+  }
+
+  /**
+   * @return Name of the enum constant, for example MAPREDUCE_0001
+   */
+  public String getCode() {
+    return name();
+  }
+
+  /**
+   * @return Message that was given to the constant on its declaration
+   */
+  public String getMessage() {
+    return message;
+  }
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/2481b7f8/submission/pom.xml
----------------------------------------------------------------------
diff --git a/submission/pom.xml b/submission/pom.xml
new file mode 100644
index 0000000..16550d9
--- /dev/null
+++ b/submission/pom.xml
@@ -0,0 +1,36 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+--><project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache</groupId>
+    <artifactId>sqoop</artifactId>
+    <version>2.0.0-SNAPSHOT</version>
+  </parent>
+
+  <groupId>org.apache.sqoop</groupId>
+  <artifactId>submission</artifactId>
+  <name>Sqoop Submission Engines</name>
+  <packaging>pom</packaging>
+
+  <modules>
+    <module>mapreduce</module>
+  </modules>
+
+</project>


Mime
View raw message