sqoop-commits mailing list archives

From cheol...@apache.org
Subject [2/2] git commit: SQOOP-864: Introduce ETL context objects
Date Sun, 10 Feb 2013 20:56:36 GMT
Updated Branches:
  refs/heads/sqoop2 addc87ee4 -> ed9c51436


SQOOP-864: Introduce ETL context objects

(Jarcec Cecho via Cheolsoo Park)


Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/ed9c5143
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/ed9c5143
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/ed9c5143

Branch: refs/heads/sqoop2
Commit: ed9c514361b508b739c76df31d7719660f095bd3
Parents: addc87e
Author: Cheolsoo Park <cheolsoo@apache.org>
Authored: Sun Feb 10 12:55:31 2013 -0800
Committer: Cheolsoo Park <cheolsoo@apache.org>
Committed: Sun Feb 10 12:55:31 2013 -0800

----------------------------------------------------------------------
 .../java/org/apache/sqoop/etl/io/DataReader.java   |   34 +++++++
 .../java/org/apache/sqoop/etl/io/DataWriter.java   |   34 +++++++
 .../org/apache/sqoop/job/etl/ActorContext.java     |   77 +++++++++++++++
 .../org/apache/sqoop/job/etl/DestroyerContext.java |   44 ++++++++
 .../org/apache/sqoop/job/etl/ExtractorContext.java |   45 +++++++++
 .../apache/sqoop/job/etl/InitializerContext.java   |   46 +++++++++
 .../org/apache/sqoop/job/etl/LoaderContext.java    |   46 +++++++++
 .../apache/sqoop/job/etl/PartitionerContext.java   |   47 +++++++++
 .../connector/jdbc/GenericJdbcExportDestroyer.java |    4 +-
 .../jdbc/GenericJdbcExportInitializer.java         |   10 +-
 .../connector/jdbc/GenericJdbcExportLoader.java    |    7 +-
 .../connector/jdbc/GenericJdbcImportDestroyer.java |    9 +-
 .../connector/jdbc/GenericJdbcImportExtractor.java |    8 +-
 .../jdbc/GenericJdbcImportInitializer.java         |   12 +-
 .../jdbc/GenericJdbcImportPartitioner.java         |    6 +-
 .../connector/jdbc/TestExportInitializer.java      |   11 ++-
 .../sqoop/connector/jdbc/TestExportLoader.java     |    7 +-
 .../sqoop/connector/jdbc/TestImportExtractor.java  |   17 ++--
 .../connector/jdbc/TestImportInitializer.java      |   13 ++-
 .../connector/jdbc/TestImportPartitioner.java      |   16 ++-
 .../apache/sqoop/framework/FrameworkManager.java   |   14 ++-
 .../sqoop/job/etl/HdfsExportPartitioner.java       |   11 +-
 .../sqoop/job/etl/HdfsSequenceExportExtractor.java |   17 ++--
 .../sqoop/job/etl/HdfsSequenceImportLoader.java    |    9 +-
 .../sqoop/job/etl/HdfsTextExportExtractor.java     |   20 ++--
 .../apache/sqoop/job/etl/HdfsTextImportLoader.java |    6 +-
 .../sqoop/job/mr/SqoopDestroyerExecutor.java       |    5 +-
 .../org/apache/sqoop/job/mr/SqoopInputFormat.java  |    4 +-
 .../java/org/apache/sqoop/job/mr/SqoopMapper.java  |    7 +-
 .../job/mr/SqoopOutputFormatLoadExecutor.java      |    8 +-
 .../java/org/apache/sqoop/job/TestHdfsExtract.java |    9 +-
 .../java/org/apache/sqoop/job/TestHdfsLoad.java    |   10 +-
 .../java/org/apache/sqoop/job/TestMapReduce.java   |   16 ++--
 .../job/mr/TestSqoopOutputFormatLoadExecutor.java  |   23 ++---
 .../java/org/apache/sqoop/job/etl/Destroyer.java   |    8 +-
 .../java/org/apache/sqoop/job/etl/Extractor.java   |   24 +++--
 .../java/org/apache/sqoop/job/etl/Initializer.java |    9 +-
 .../main/java/org/apache/sqoop/job/etl/Loader.java |   11 +--
 .../java/org/apache/sqoop/job/etl/Partitioner.java |   15 ++-
 .../java/org/apache/sqoop/job/io/DataReader.java   |   34 -------
 .../java/org/apache/sqoop/job/io/DataWriter.java   |   34 -------
 41 files changed, 556 insertions(+), 231 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/sqoop/blob/ed9c5143/common/src/main/java/org/apache/sqoop/etl/io/DataReader.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/sqoop/etl/io/DataReader.java b/common/src/main/java/org/apache/sqoop/etl/io/DataReader.java
new file mode 100644
index 0000000..3e1adc7
--- /dev/null
+++ b/common/src/main/java/org/apache/sqoop/etl/io/DataReader.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.etl.io;
+
+/**
+ * An intermediate layer for passing data from the MR framework
+ * to the ETL framework.
+ */
+public abstract class DataReader {
+
+  public abstract Object[] readArrayRecord() throws Exception;
+
+  public abstract String readCsvRecord() throws Exception;
+
+  public abstract Object readContent(int type) throws Exception;
+
+  public abstract void setFieldDelimiter(char fieldDelimiter);
+
+}
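
For illustration, a minimal in-memory implementation of this contract might look like the sketch below; ListBackedReader and its list backing are hypothetical and not part of this patch:

import java.util.Iterator;
import java.util.List;
import org.apache.sqoop.etl.io.DataReader;

// Hypothetical reader that serves pre-parsed records from a list.
public class ListBackedReader extends DataReader {

  private final Iterator<Object[]> records;
  private char fieldDelimiter = ',';

  public ListBackedReader(List<Object[]> records) {
    this.records = records.iterator();
  }

  @Override
  public Object[] readArrayRecord() {
    // Returning null signals end of input, matching the convention the
    // loaders in this patch rely on.
    return records.hasNext() ? records.next() : null;
  }

  @Override
  public String readCsvRecord() {
    Object[] array = readArrayRecord();
    if (array == null) {
      return null;
    }
    StringBuilder csv = new StringBuilder();
    for (int i = 0; i < array.length; i++) {
      if (i > 0) {
        csv.append(fieldDelimiter);
      }
      csv.append(array[i]);
    }
    return csv.toString();
  }

  @Override
  public Object readContent(int type) {
    // This sketch supports only array and CSV records.
    throw new UnsupportedOperationException("Unsupported content type: " + type);
  }

  @Override
  public void setFieldDelimiter(char fieldDelimiter) {
    this.fieldDelimiter = fieldDelimiter;
  }
}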

http://git-wip-us.apache.org/repos/asf/sqoop/blob/ed9c5143/common/src/main/java/org/apache/sqoop/etl/io/DataWriter.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/sqoop/etl/io/DataWriter.java b/common/src/main/java/org/apache/sqoop/etl/io/DataWriter.java
new file mode 100644
index 0000000..d81364e
--- /dev/null
+++ b/common/src/main/java/org/apache/sqoop/etl/io/DataWriter.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.etl.io;
+
+/**
+ * An intermediate layer for passing data from the ETL framework
+ * to the MR framework.
+ */
+public abstract class DataWriter {
+
+  public abstract void writeArrayRecord(Object[] array);
+
+  public abstract void writeCsvRecord(String csv);
+
+  public abstract void writeContent(Object content, int type);
+
+  public abstract void setFieldDelimiter(char fieldDelimiter);
+
+}
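
The writer side admits an equally small sketch; CollectingWriter below is hypothetical, the same kind of stand-in as the DummyWriter test classes later in this patch:

import java.util.ArrayList;
import java.util.List;
import org.apache.sqoop.etl.io.DataWriter;

// Hypothetical writer that buffers everything in memory, handy for
// exercising Extractor implementations in tests.
public class CollectingWriter extends DataWriter {

  private final List<Object> records = new ArrayList<Object>();
  private char fieldDelimiter = ',';  // stored for completeness; unused here

  @Override
  public void writeArrayRecord(Object[] array) {
    records.add(array);
  }

  @Override
  public void writeCsvRecord(String csv) {
    records.add(csv);
  }

  @Override
  public void writeContent(Object content, int type) {
    records.add(content);
  }

  @Override
  public void setFieldDelimiter(char fieldDelimiter) {
    this.fieldDelimiter = fieldDelimiter;
  }

  public List<Object> getRecords() {
    return records;
  }
}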

http://git-wip-us.apache.org/repos/asf/sqoop/blob/ed9c5143/common/src/main/java/org/apache/sqoop/job/etl/ActorContext.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/sqoop/job/etl/ActorContext.java b/common/src/main/java/org/apache/sqoop/job/etl/ActorContext.java
new file mode 100644
index 0000000..98b2f5e
--- /dev/null
+++ b/common/src/main/java/org/apache/sqoop/job/etl/ActorContext.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.job.etl;
+
+import org.apache.sqoop.common.ImmutableContext;
+
+/**
+ * Basic context class for each actor containing only the connector/framework
+ * context object.
+ */
+public abstract class ActorContext {
+
+  ImmutableContext context;
+
+  public ActorContext(ImmutableContext context) {
+    this.context = context;
+  }
+
+  /**
+   * Return the context object associated with this particular actor.
+   *
+   * @return Context object associated with this actor
+   */
+  public ImmutableContext getContext() {
+    return context;
+  }
+
+  /**
+   * Convenience method that returns a value from the wrapped context.
+   */
+  public String getString(String key) {
+    return context.getString(key);
+  }
+
+  /**
+   * Convenience method that returns a value from the wrapped context.
+   */
+  public String getString(String key, String defaultValue) {
+    return context.getString(key, defaultValue);
+  }
+
+  /**
+   * Convenience method that returns a value from the wrapped context.
+   */
+  public long getLong(String key, long defaultValue) {
+    return context.getLong(key, defaultValue);
+  }
+
+  /**
+   * Convenience method that returns a value from the wrapped context.
+   */
+  public int getInt(String key, int defaultValue) {
+    return context.getInt(key, defaultValue);
+  }
+
+  /**
+   * Convenience method that returns a value from the wrapped context.
+   */
+  public boolean getBoolean(String key, boolean defaultValue) {
+    return context.getBoolean(key, defaultValue);
+  }
+}
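
The convenience getters let actor code read configuration without touching the wrapped context directly; a brief usage sketch, with made-up key names:

import org.apache.sqoop.job.etl.ActorContext;

// Illustrative only: the keys below do not exist in Sqoop.
public final class ActorContextUsage {
  static void readSettings(ActorContext context) {
    String table = context.getString("example.table.name", "unknown");
    long batchSize = context.getLong("example.batch.size", 100L);
    int retries = context.getInt("example.retries", 3);
    boolean verbose = context.getBoolean("example.verbose", false);
    System.out.println(table + " " + batchSize + " " + retries + " " + verbose);
  }
}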

http://git-wip-us.apache.org/repos/asf/sqoop/blob/ed9c5143/common/src/main/java/org/apache/sqoop/job/etl/DestroyerContext.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/sqoop/job/etl/DestroyerContext.java b/common/src/main/java/org/apache/sqoop/job/etl/DestroyerContext.java
new file mode 100644
index 0000000..10cfb10
--- /dev/null
+++ b/common/src/main/java/org/apache/sqoop/job/etl/DestroyerContext.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.job.etl;
+
+import org.apache.sqoop.common.ImmutableContext;
+
+/**
+ * Context implementation for Destroyer.
+ *
+ * This class wraps information about whether the run was successful.
+ */
+public class DestroyerContext extends ActorContext {
+
+  private boolean success;
+
+  public DestroyerContext(ImmutableContext context, boolean success) {
+    super(context);
+    this.success = success;
+  }
+
+  /**
+   * Return true if the job was successful.
+   *
+   * @return True if the job was successful
+   */
+  public boolean isSuccess() {
+    return success;
+  }
+}
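
A destroyer can branch on isSuccess() where it previously received a boolean parameter; a hypothetical staging-cleanup sketch built on the JDBC configuration classes touched by this patch:

import org.apache.sqoop.connector.jdbc.configuration.ConnectionConfiguration;
import org.apache.sqoop.connector.jdbc.configuration.ExportJobConfiguration;
import org.apache.sqoop.job.etl.Destroyer;
import org.apache.sqoop.job.etl.DestroyerContext;

// Hypothetical destroyer; the cleanup bodies are placeholders.
public class StagingCleanupDestroyer extends Destroyer<ConnectionConfiguration, ExportJobConfiguration> {

  @Override
  public void destroy(DestroyerContext context, ConnectionConfiguration connection, ExportJobConfiguration job) {
    if (context.isSuccess()) {
      // e.g. promote staged rows into the final table
    } else {
      // e.g. drop the staging table so a failed run leaves nothing behind
    }
  }
}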

http://git-wip-us.apache.org/repos/asf/sqoop/blob/ed9c5143/common/src/main/java/org/apache/sqoop/job/etl/ExtractorContext.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/sqoop/job/etl/ExtractorContext.java b/common/src/main/java/org/apache/sqoop/job/etl/ExtractorContext.java
new file mode 100644
index 0000000..f9d7a8b
--- /dev/null
+++ b/common/src/main/java/org/apache/sqoop/job/etl/ExtractorContext.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.job.etl;
+
+import org.apache.sqoop.common.ImmutableContext;
+import org.apache.sqoop.etl.io.DataWriter;
+
+/**
+ * Context implementation for Extractor.
+ *
+ * This class wraps the data writer object.
+ */
+public class ExtractorContext extends ActorContext {
+
+  private DataWriter writer;
+
+  public ExtractorContext(ImmutableContext context, DataWriter writer) {
+    super(context);
+    this.writer = writer;
+  }
+
+  /**
+   * Return associated data writer object.
+   *
+   * @return Data writer object for extract output
+   */
+  public DataWriter getDataWriter() {
+    return writer;
+  }
+}
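
With the writer folded into the context, the extract-side pattern becomes the following; the record source here is hypothetical:

import org.apache.sqoop.etl.io.DataWriter;
import org.apache.sqoop.job.etl.ExtractorContext;

// Illustrative helper: push a batch of records through the context's writer.
public final class ExtractPattern {
  static void emitRecords(ExtractorContext context, Object[][] records) {
    DataWriter writer = context.getDataWriter();
    for (Object[] record : records) {
      writer.writeArrayRecord(record);
    }
  }
}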

http://git-wip-us.apache.org/repos/asf/sqoop/blob/ed9c5143/common/src/main/java/org/apache/sqoop/job/etl/InitializerContext.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/sqoop/job/etl/InitializerContext.java b/common/src/main/java/org/apache/sqoop/job/etl/InitializerContext.java
new file mode 100644
index 0000000..d2e2dfc
--- /dev/null
+++ b/common/src/main/java/org/apache/sqoop/job/etl/InitializerContext.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.job.etl;
+
+import org.apache.sqoop.common.MutableContext;
+
+/**
+ * Context implementation for Initializer.
+ *
+ * This class returns a mutable context instead of an immutable one.
+ */
+public class InitializerContext extends ActorContext {
+
+  public InitializerContext(MutableContext context) {
+    super(context);
+  }
+
+  /**
+   * Return mutable context.
+   *
+   * Initializer can set up multiple properties that will be passed to each
+   * extractor and loader.
+   *
+   * @return Mutable context object
+   */
+  @Override
+  public MutableContext getContext() {
+    return (MutableContext)super.getContext();
+  }
+}
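
Anything an initializer stores through the mutable context later reaches the extractors and loaders read-only; a sketch using the setString setter and the constant the JDBC initializers in this tree already use (the "ICOL" value is illustrative, taken from the tests):

import org.apache.sqoop.common.MutableContext;
import org.apache.sqoop.connector.jdbc.GenericJdbcConnectorConstants;
import org.apache.sqoop.job.etl.InitializerContext;

// Illustrative only: publish a partition column for downstream actors.
public final class InitializePattern {
  static void publishPartitionColumn(InitializerContext context) {
    MutableContext mutable = context.getContext();
    mutable.setString(
        GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNNAME,
        "ICOL");
  }
}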

http://git-wip-us.apache.org/repos/asf/sqoop/blob/ed9c5143/common/src/main/java/org/apache/sqoop/job/etl/LoaderContext.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/sqoop/job/etl/LoaderContext.java b/common/src/main/java/org/apache/sqoop/job/etl/LoaderContext.java
new file mode 100644
index 0000000..dad19f1
--- /dev/null
+++ b/common/src/main/java/org/apache/sqoop/job/etl/LoaderContext.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.job.etl;
+
+import org.apache.sqoop.common.ImmutableContext;
+import org.apache.sqoop.etl.io.DataReader;
+
+/**
+ * Context implementation for Loader.
+ *
+ * This class also wraps the data reader object.
+ */
+public class LoaderContext extends ActorContext {
+
+  DataReader reader;
+
+  public LoaderContext(ImmutableContext context, DataReader reader) {
+    super(context);
+    this.reader = reader;
+  }
+
+  /**
+   * Return associated data reader object.
+   *
+   * @return Data reader object for loader input
+   */
+  public DataReader getDataReader() {
+    return reader;
+  }
+
+}
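
The loader mirror of that pattern, matching the read loop GenericJdbcExportLoader switches to in this patch; the stdout destination is a placeholder:

import org.apache.sqoop.etl.io.DataReader;
import org.apache.sqoop.job.etl.Loader;
import org.apache.sqoop.job.etl.LoaderContext;

// Illustrative loader: drain the context's reader until it returns null.
public class EchoLoader extends Loader<Object, Object> {

  @Override
  public void load(LoaderContext context, Object connection, Object job) throws Exception {
    DataReader reader = context.getDataReader();
    Object[] array;
    while ((array = reader.readArrayRecord()) != null) {
      System.out.println(java.util.Arrays.toString(array));
    }
  }
}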

http://git-wip-us.apache.org/repos/asf/sqoop/blob/ed9c5143/common/src/main/java/org/apache/sqoop/job/etl/PartitionerContext.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/sqoop/job/etl/PartitionerContext.java b/common/src/main/java/org/apache/sqoop/job/etl/PartitionerContext.java
new file mode 100644
index 0000000..5e7cea7
--- /dev/null
+++ b/common/src/main/java/org/apache/sqoop/job/etl/PartitionerContext.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.job.etl;
+
+import org.apache.sqoop.common.ImmutableContext;
+
+/**
+ * Context implementation for Partitioner.
+ *
+ * This class also wraps the maximal number of allowed partitions.
+ */
+public class PartitionerContext extends ActorContext {
+
+  private long maxPartitions;
+
+  public PartitionerContext(ImmutableContext context, long maxPartitions) {
+    super(context);
+    this.maxPartitions = maxPartitions;
+  }
+
+  /**
+   * Return the maximal number of partitions.
+   *
+   * The framework will ensure that the number of returned partitions does
+   * not exceed this number.
+   *
+   * @return Maximal number of partitions
+   */
+  public long getMaxPartitions() {
+    return maxPartitions;
+  }
+}
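
A partitioner now takes its split budget from the context instead of a numTasks argument; a sketch of the shape, with the partition construction left abstract since Partition subclasses are connector-specific:

import java.util.LinkedList;
import java.util.List;
import org.apache.sqoop.job.etl.Partition;
import org.apache.sqoop.job.etl.Partitioner;
import org.apache.sqoop.job.etl.PartitionerContext;

// Illustrative partitioner skeleton.
public class BudgetedPartitioner extends Partitioner<Object, Object> {

  @Override
  public List<Partition> getPartitions(PartitionerContext context, Object connection, Object job) {
    long budget = context.getMaxPartitions();
    List<Partition> partitions = new LinkedList<Partition>();
    // Build at most 'budget' partitions here; a real connector adds its
    // own Partition subclass instances (see GenericJdbcImportPartition).
    return partitions;
  }
}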

http://git-wip-us.apache.org/repos/asf/sqoop/blob/ed9c5143/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExportDestroyer.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExportDestroyer.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExportDestroyer.java
index 2d53bdd..588e236 100644
--- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExportDestroyer.java
+++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExportDestroyer.java
@@ -18,17 +18,17 @@
 package org.apache.sqoop.connector.jdbc;
 
 import org.apache.log4j.Logger;
-import org.apache.sqoop.common.ImmutableContext;
 import org.apache.sqoop.connector.jdbc.configuration.ConnectionConfiguration;
 import org.apache.sqoop.connector.jdbc.configuration.ExportJobConfiguration;
 import org.apache.sqoop.job.etl.Destroyer;
+import org.apache.sqoop.job.etl.DestroyerContext;
 
 public class GenericJdbcExportDestroyer extends Destroyer<ConnectionConfiguration, ExportJobConfiguration> {
 
   private static final Logger LOG = Logger.getLogger(GenericJdbcExportDestroyer.class);
 
   @Override
-  public void destroy(boolean success, ImmutableContext context, ConnectionConfiguration connection, ExportJobConfiguration job) {
+  public void destroy(DestroyerContext context, ConnectionConfiguration connection, ExportJobConfiguration job) {
     LOG.info("Running generic JDBC connector destroyer");
   }
 }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/ed9c5143/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExportInitializer.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExportInitializer.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExportInitializer.java
index 06fbc51..520b0bb 100644
--- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExportInitializer.java
+++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExportInitializer.java
@@ -21,12 +21,12 @@ import java.util.LinkedList;
 import java.util.List;
 
 import org.apache.commons.lang.StringUtils;
-import org.apache.sqoop.common.ImmutableContext;
 import org.apache.sqoop.common.MutableContext;
 import org.apache.sqoop.common.SqoopException;
 import org.apache.sqoop.connector.jdbc.configuration.ConnectionConfiguration;
 import org.apache.sqoop.connector.jdbc.configuration.ExportJobConfiguration;
 import org.apache.sqoop.job.etl.Initializer;
+import org.apache.sqoop.job.etl.InitializerContext;
 import org.apache.sqoop.utils.ClassUtils;
 
 public class GenericJdbcExportInitializer extends Initializer<ConnectionConfiguration, ExportJobConfiguration> {
@@ -34,17 +34,17 @@ public class GenericJdbcExportInitializer extends Initializer<ConnectionConfigur
   private GenericJdbcExecutor executor;
 
   @Override
-  public void initialize(MutableContext context, ConnectionConfiguration connection, ExportJobConfiguration job) {
-    configureJdbcProperties(context, connection, job);
+  public void initialize(InitializerContext context, ConnectionConfiguration connection, ExportJobConfiguration job) {
+    configureJdbcProperties(context.getContext(), connection, job);
     try {
-      configureTableProperties(context, connection, job);
+      configureTableProperties(context.getContext(), connection, job);
     } finally {
       executor.close();
     }
   }
 
   @Override
-  public List<String> getJars(ImmutableContext context, ConnectionConfiguration connection, ExportJobConfiguration job) {
+  public List<String> getJars(InitializerContext context, ConnectionConfiguration connection, ExportJobConfiguration job) {
     List<String> jars = new LinkedList<String>();
 
     jars.add(ClassUtils.jarForClass(connection.connection.jdbcDriver));

http://git-wip-us.apache.org/repos/asf/sqoop/blob/ed9c5143/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExportLoader.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExportLoader.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExportLoader.java
index 5f8e129..f62339f 100644
--- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExportLoader.java
+++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExportLoader.java
@@ -17,11 +17,10 @@
  */
 package org.apache.sqoop.connector.jdbc;
 
-import org.apache.sqoop.common.ImmutableContext;
 import org.apache.sqoop.connector.jdbc.configuration.ConnectionConfiguration;
 import org.apache.sqoop.connector.jdbc.configuration.ExportJobConfiguration;
 import org.apache.sqoop.job.etl.Loader;
-import org.apache.sqoop.job.io.DataReader;
+import org.apache.sqoop.job.etl.LoaderContext;
 
 public class GenericJdbcExportLoader extends Loader<ConnectionConfiguration, ExportJobConfiguration> {
 
@@ -31,7 +30,7 @@ public class GenericJdbcExportLoader extends Loader<ConnectionConfiguration, Exp
   private int batchesPerTransaction = DEFAULT_BATCHES_PER_TRANSACTION;
 
   @Override
-  public void load(ImmutableContext context, ConnectionConfiguration connection, ExportJobConfiguration job, DataReader reader) throws Exception{
+  public void load(LoaderContext context, ConnectionConfiguration connection, ExportJobConfiguration job) throws Exception {
     String driver = connection.connection.jdbcDriver;
     String url = connection.connection.connectionString;
     String username = connection.connection.username;
@@ -46,7 +45,7 @@ public class GenericJdbcExportLoader extends Loader<ConnectionConfiguration, Exp
       int numberOfBatches = 0;
       Object[] array;
 
-      while ((array = reader.readArrayRecord()) != null) {
+      while ((array = context.getDataReader().readArrayRecord()) != null) {
         numberOfRows++;
         executor.addBatch(array);
 

http://git-wip-us.apache.org/repos/asf/sqoop/blob/ed9c5143/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportDestroyer.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportDestroyer.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportDestroyer.java
index f7043ea..60a69e0 100644
--- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportDestroyer.java
+++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportDestroyer.java
@@ -17,16 +17,19 @@
  */
 package org.apache.sqoop.connector.jdbc;
 
-import org.apache.sqoop.common.ImmutableContext;
+import org.apache.log4j.Logger;
 import org.apache.sqoop.connector.jdbc.configuration.ConnectionConfiguration;
 import org.apache.sqoop.connector.jdbc.configuration.ImportJobConfiguration;
 import org.apache.sqoop.job.etl.Destroyer;
+import org.apache.sqoop.job.etl.DestroyerContext;
 
 public class GenericJdbcImportDestroyer extends Destroyer<ConnectionConfiguration, ImportJobConfiguration> {
 
+  private static final Logger LOG = Logger.getLogger(GenericJdbcImportDestroyer.class);
+
   @Override
-  public void destroy(boolean success, ImmutableContext context, ConnectionConfiguration connection, ImportJobConfiguration job) {
-    // No explicit action at the moment
+  public void destroy(DestroyerContext context, ConnectionConfiguration connection, ImportJobConfiguration job) {
+    LOG.info("Running generic JDBC connector destroyer");
   }
 
 }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/ed9c5143/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportExtractor.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportExtractor.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportExtractor.java
index 9db3328..d54b430 100644
--- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportExtractor.java
+++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportExtractor.java
@@ -22,13 +22,11 @@ import java.sql.ResultSetMetaData;
 import java.sql.SQLException;
 
 import org.apache.log4j.Logger;
-import org.apache.sqoop.common.ImmutableContext;
 import org.apache.sqoop.common.SqoopException;
 import org.apache.sqoop.connector.jdbc.configuration.ConnectionConfiguration;
 import org.apache.sqoop.connector.jdbc.configuration.ImportJobConfiguration;
-import org.apache.sqoop.job.etl.Partition;
+import org.apache.sqoop.job.etl.ExtractorContext;
 import org.apache.sqoop.job.etl.Extractor;
-import org.apache.sqoop.job.io.DataWriter;
 
 public class GenericJdbcImportExtractor extends Extractor<ConnectionConfiguration, ImportJobConfiguration, GenericJdbcImportPartition> {
 
@@ -36,7 +34,7 @@ public class GenericJdbcImportExtractor extends Extractor<ConnectionConfiguratio
 
  private long rowsRead = 0;
   @Override
-  public void run(ImmutableContext context, ConnectionConfiguration connection, ImportJobConfiguration job, GenericJdbcImportPartition partition, DataWriter writer) {
+  public void extract(ExtractorContext context, ConnectionConfiguration connection, ImportJobConfiguration job, GenericJdbcImportPartition partition) {
     String driver = connection.connection.jdbcDriver;
     String url = connection.connection.connectionString;
     String username = connection.connection.username;
@@ -59,7 +57,7 @@ public class GenericJdbcImportExtractor extends Extractor<ConnectionConfiguratio
         for (int i = 0; i< column; i++) {
           array[i] = resultSet.getObject(i+1);
         }
-        writer.writeArrayRecord(array);
+        context.getDataWriter().writeArrayRecord(array);
         rowsRead++;
       }
     } catch (SQLException e) {

http://git-wip-us.apache.org/repos/asf/sqoop/blob/ed9c5143/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportInitializer.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportInitializer.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportInitializer.java
index a509e2b..ad95e65 100644
--- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportInitializer.java
+++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportInitializer.java
@@ -25,13 +25,13 @@ import java.util.List;
 
 import org.apache.commons.lang.StringUtils;
 import org.apache.log4j.Logger;
-import org.apache.sqoop.common.ImmutableContext;
 import org.apache.sqoop.common.MutableContext;
 import org.apache.sqoop.common.SqoopException;
 import org.apache.sqoop.connector.jdbc.configuration.ConnectionConfiguration;
 import org.apache.sqoop.connector.jdbc.configuration.ImportJobConfiguration;
 import org.apache.sqoop.job.Constants;
 import org.apache.sqoop.job.etl.Initializer;
+import org.apache.sqoop.job.etl.InitializerContext;
 import org.apache.sqoop.utils.ClassUtils;
 
 public class GenericJdbcImportInitializer extends Initializer<ConnectionConfiguration, ImportJobConfiguration> {
@@ -42,18 +42,18 @@ public class GenericJdbcImportInitializer extends Initializer<ConnectionConfigur
   private GenericJdbcExecutor executor;
 
   @Override
-  public void initialize(MutableContext context, ConnectionConfiguration connection, ImportJobConfiguration job) {
-    configureJdbcProperties(context, connection, job);
+  public void initialize(InitializerContext context, ConnectionConfiguration connection, ImportJobConfiguration job) {
+    configureJdbcProperties(context.getContext(), connection, job);
     try {
-      configurePartitionProperties(context, connection, job);
-      configureTableProperties(context, connection, job);
+      configurePartitionProperties(context.getContext(), connection, job);
+      configureTableProperties(context.getContext(), connection, job);
     } finally {
       executor.close();
     }
   }
 
   @Override
-  public List<String> getJars(ImmutableContext context, ConnectionConfiguration connection, ImportJobConfiguration job) {
+  public List<String> getJars(InitializerContext context, ConnectionConfiguration connection, ImportJobConfiguration job) {
     List<String> jars = new LinkedList<String>();
 
     jars.add(ClassUtils.jarForClass(connection.connection.jdbcDriver));

http://git-wip-us.apache.org/repos/asf/sqoop/blob/ed9c5143/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportPartitioner.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportPartitioner.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportPartitioner.java
index b5dbc61..d276c57 100644
--- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportPartitioner.java
+++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportPartitioner.java
@@ -21,12 +21,12 @@ import java.sql.Types;
 import java.util.LinkedList;
 import java.util.List;
 
-import org.apache.sqoop.common.ImmutableContext;
 import org.apache.sqoop.common.SqoopException;
 import org.apache.sqoop.connector.jdbc.configuration.ConnectionConfiguration;
 import org.apache.sqoop.connector.jdbc.configuration.ImportJobConfiguration;
 import org.apache.sqoop.job.etl.Partition;
 import org.apache.sqoop.job.etl.Partitioner;
+import org.apache.sqoop.job.etl.PartitionerContext;
 
 public class GenericJdbcImportPartitioner extends Partitioner<ConnectionConfiguration, ImportJobConfiguration> {
 
@@ -37,8 +37,8 @@ public class GenericJdbcImportPartitioner extends Partitioner<ConnectionConfigur
   private String partitionMaxValue;
 
   @Override
-  public List<Partition> getPartitions(ImmutableContext context, long maxPartitions, ConnectionConfiguration connection, ImportJobConfiguration job) {
-    numberPartitions = maxPartitions;
+  public List<Partition> getPartitions(PartitionerContext context, ConnectionConfiguration connection, ImportJobConfiguration job) {
+    numberPartitions = context.getMaxPartitions();
     partitionColumnName = context.getString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNNAME);
     partitionColumnType = context.getInt(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNTYPE, -1);
     partitionMinValue = context.getString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MINVALUE);

http://git-wip-us.apache.org/repos/asf/sqoop/blob/ed9c5143/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestExportInitializer.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestExportInitializer.java b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestExportInitializer.java
index c876780..bb0c23b 100644
--- a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestExportInitializer.java
+++ b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestExportInitializer.java
@@ -23,8 +23,8 @@ import org.apache.sqoop.common.MutableContext;
 import org.apache.sqoop.common.MutableMapContext;
 import org.apache.sqoop.connector.jdbc.configuration.ConnectionConfiguration;
 import org.apache.sqoop.connector.jdbc.configuration.ExportJobConfiguration;
-import org.apache.sqoop.job.Constants;
 import org.apache.sqoop.job.etl.Initializer;
+import org.apache.sqoop.job.etl.InitializerContext;
 
 public class TestExportInitializer extends TestCase {
 
@@ -66,9 +66,10 @@ public class TestExportInitializer extends TestCase {
     jobConf.table.tableName = tableName;
 
     MutableContext context = new MutableMapContext();
+    InitializerContext initializerContext = new InitializerContext(context);
 
     Initializer initializer = new GenericJdbcExportInitializer();
-    initializer.initialize(context, connConf, jobConf);
+    initializer.initialize(initializerContext, connConf, jobConf);
 
     verifyResult(context,
         "INSERT INTO " + executor.delimitIdentifier(tableName)
@@ -85,9 +86,10 @@ public class TestExportInitializer extends TestCase {
     jobConf.table.columns = tableColumns;
 
     MutableContext context = new MutableMapContext();
+    InitializerContext initializerContext = new InitializerContext(context);
 
     Initializer initializer = new GenericJdbcExportInitializer();
-    initializer.initialize(context, connConf, jobConf);
+    initializer.initialize(initializerContext, connConf, jobConf);
 
     verifyResult(context,
         "INSERT INTO " + executor.delimitIdentifier(tableName)
@@ -103,9 +105,10 @@ public class TestExportInitializer extends TestCase {
     jobConf.table.sql = tableSql;
 
     MutableContext context = new MutableMapContext();
+    InitializerContext initializerContext = new InitializerContext(context);
 
     Initializer initializer = new GenericJdbcExportInitializer();
-    initializer.initialize(context, connConf, jobConf);
+    initializer.initialize(initializerContext, connConf, jobConf);
 
     verifyResult(context,
         "INSERT INTO " + executor.delimitIdentifier(tableName)

http://git-wip-us.apache.org/repos/asf/sqoop/blob/ed9c5143/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestExportLoader.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestExportLoader.java b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestExportLoader.java
index 5892cbb..50a32d9 100644
--- a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestExportLoader.java
+++ b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestExportLoader.java
@@ -26,7 +26,8 @@ import org.apache.sqoop.common.MutableMapContext;
 import org.apache.sqoop.connector.jdbc.configuration.ConnectionConfiguration;
 import org.apache.sqoop.connector.jdbc.configuration.ExportJobConfiguration;
 import org.apache.sqoop.job.etl.Loader;
-import org.apache.sqoop.job.io.DataReader;
+import org.apache.sqoop.job.etl.LoaderContext;
+import org.apache.sqoop.etl.io.DataReader;
 
 public class TestExportLoader extends TestCase {
 
@@ -73,8 +74,8 @@ public class TestExportLoader extends TestCase {
 
     Loader loader = new GenericJdbcExportLoader();
     DummyReader reader = new DummyReader();
-
-    loader.load(context, connectionConfig, jobConfig, reader);
+    LoaderContext loaderContext = new LoaderContext(context, reader);
+    loader.load(loaderContext, connectionConfig, jobConfig);
 
     int index = START;
     ResultSet rs = executor.executeQuery("SELECT * FROM "

http://git-wip-us.apache.org/repos/asf/sqoop/blob/ed9c5143/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportExtractor.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportExtractor.java b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportExtractor.java
index 0be713e..54ffe5b 100644
--- a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportExtractor.java
+++ b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportExtractor.java
@@ -24,7 +24,8 @@ import org.apache.sqoop.common.MutableMapContext;
 import org.apache.sqoop.connector.jdbc.configuration.ConnectionConfiguration;
 import org.apache.sqoop.connector.jdbc.configuration.ImportJobConfiguration;
 import org.apache.sqoop.job.etl.Extractor;
-import org.apache.sqoop.job.io.DataWriter;
+import org.apache.sqoop.job.etl.ExtractorContext;
+import org.apache.sqoop.etl.io.DataWriter;
 
 public class TestImportExtractor extends TestCase {
 
@@ -80,18 +81,19 @@ public class TestImportExtractor extends TestCase {
 
     Extractor extractor = new GenericJdbcImportExtractor();
     DummyWriter writer = new DummyWriter();
+    ExtractorContext extractorContext = new ExtractorContext(context, writer);
 
     partition = new GenericJdbcImportPartition();
     partition.setConditions("-50.0 <= DCOL AND DCOL < -16.6666666666666665");
-    extractor.run(context, connectionConfig, jobConfig, partition, writer);
+    extractor.extract(extractorContext, connectionConfig, jobConfig, partition);
 
     partition = new GenericJdbcImportPartition();
     partition.setConditions("-16.6666666666666665 <= DCOL AND DCOL < 16.666666666666667");
-    extractor.run(context, connectionConfig, jobConfig, partition, writer);
+    extractor.extract(extractorContext, connectionConfig, jobConfig, partition);
 
     partition = new GenericJdbcImportPartition();
     partition.setConditions("16.666666666666667 <= DCOL AND DCOL <= 50.0");
-    extractor.run(context, connectionConfig, jobConfig, partition, writer);
+    extractor.extract(extractorContext, connectionConfig, jobConfig, partition);
   }
 
   public void testSubquery() throws Exception {
@@ -113,18 +115,19 @@ public class TestImportExtractor extends TestCase {
 
     Extractor extractor = new GenericJdbcImportExtractor();
     DummyWriter writer = new DummyWriter();
+    ExtractorContext extractorContext = new ExtractorContext(context, writer);
 
     partition = new GenericJdbcImportPartition();
     partition.setConditions("-50 <= ICOL AND ICOL < -16");
-    extractor.run(context, connectionConfig, jobConfig, partition, writer);
+    extractor.extract(extractorContext, connectionConfig, jobConfig, partition);
 
     partition = new GenericJdbcImportPartition();
     partition.setConditions("-16 <= ICOL AND ICOL < 17");
-    extractor.run(context, connectionConfig, jobConfig, partition, writer);
+    extractor.extract(extractorContext, connectionConfig, jobConfig, partition);
 
     partition = new GenericJdbcImportPartition();
     partition.setConditions("17 <= ICOL AND ICOL < 50");
-    extractor.run(context, connectionConfig, jobConfig, partition, writer);
+    extractor.extract(extractorContext, connectionConfig, jobConfig, partition);
   }
 
   public class DummyWriter extends DataWriter {

http://git-wip-us.apache.org/repos/asf/sqoop/blob/ed9c5143/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportInitializer.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportInitializer.java b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportInitializer.java
index f3c1d90..45835bd 100644
--- a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportInitializer.java
+++ b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportInitializer.java
@@ -27,6 +27,7 @@ import org.apache.sqoop.connector.jdbc.configuration.ConnectionConfiguration;
 import org.apache.sqoop.connector.jdbc.configuration.ImportJobConfiguration;
 import org.apache.sqoop.job.Constants;
 import org.apache.sqoop.job.etl.Initializer;
+import org.apache.sqoop.job.etl.InitializerContext;
 
 public class TestImportInitializer extends TestCase {
 
@@ -78,9 +79,10 @@ public class TestImportInitializer extends TestCase {
     jobConf.table.tableName = tableName;
 
     MutableContext context = new MutableMapContext();
+    InitializerContext initializerContext = new InitializerContext(context);
 
     Initializer initializer = new GenericJdbcImportInitializer();
-    initializer.initialize(context, connConf, jobConf);
+    initializer.initialize(initializerContext, connConf, jobConf);
 
     verifyResult(context,
         "SELECT * FROM " + executor.delimitIdentifier(tableName)
@@ -102,9 +104,10 @@ public class TestImportInitializer extends TestCase {
     jobConf.table.columns = tableColumns;
 
     MutableContext context = new MutableMapContext();
+    InitializerContext initializerContext = new InitializerContext(context);
 
     Initializer initializer = new GenericJdbcImportInitializer();
-    initializer.initialize(context, connConf, jobConf);
+    initializer.initialize(initializerContext, connConf, jobConf);
 
     verifyResult(context,
         "SELECT ICOL,VCOL FROM " + executor.delimitIdentifier(tableName)
@@ -126,9 +129,10 @@ public class TestImportInitializer extends TestCase {
     jobConf.table.partitionColumn = "DCOL";
 
     MutableContext context = new MutableMapContext();
+    InitializerContext initializerContext = new InitializerContext(context);
 
     Initializer initializer = new GenericJdbcImportInitializer();
-    initializer.initialize(context, connConf, jobConf);
+    initializer.initialize(initializerContext, connConf, jobConf);
 
     verifyResult(context,
         "SELECT * FROM " + executor.delimitIdentifier(tableName)
@@ -151,9 +155,10 @@ public class TestImportInitializer extends TestCase {
     jobConf.table.partitionColumn = "DCOL";
 
     MutableContext context = new MutableMapContext();
+    InitializerContext initializerContext = new InitializerContext(context);
 
     Initializer initializer = new GenericJdbcImportInitializer();
-    initializer.initialize(context, connConf, jobConf);
+    initializer.initialize(initializerContext, connConf, jobConf);
 
     verifyResult(context,
         "SELECT SQOOP_SUBQUERY_ALIAS.ICOL,SQOOP_SUBQUERY_ALIAS.VCOL FROM "

http://git-wip-us.apache.org/repos/asf/sqoop/blob/ed9c5143/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportPartitioner.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportPartitioner.java b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportPartitioner.java
index 77c4739..43eb1c2 100644
--- a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportPartitioner.java
+++ b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportPartitioner.java
@@ -30,6 +30,7 @@ import org.apache.sqoop.connector.jdbc.configuration.ImportJobConfiguration;
 import org.apache.sqoop.job.Constants;
 import org.apache.sqoop.job.etl.Partition;
 import org.apache.sqoop.job.etl.Partitioner;
+import org.apache.sqoop.job.etl.PartitionerContext;
 
 public class TestImportPartitioner extends TestCase {
 
@@ -55,7 +56,8 @@ public class TestImportPartitioner extends TestCase {
     ImportJobConfiguration jobConf = new ImportJobConfiguration();
 
     Partitioner partitioner = new GenericJdbcImportPartitioner();
-    List<Partition> partitions = partitioner.getPartitions(context, 5, connConf, jobConf);
+    PartitionerContext partitionerContext = new PartitionerContext(context, 5);
+    List<Partition> partitions = partitioner.getPartitions(partitionerContext, connConf, jobConf);
 
     verifyResult(partitions, new String[] {
         "-5 <= ICOL AND ICOL < -3",
@@ -85,7 +87,8 @@ public class TestImportPartitioner extends TestCase {
     ImportJobConfiguration jobConf = new ImportJobConfiguration();
 
     Partitioner partitioner = new GenericJdbcImportPartitioner();
-    List<Partition> partitions = partitioner.getPartitions(context, 3, connConf, jobConf);
+    PartitionerContext partitionerContext = new PartitionerContext(context, 3);
+    List<Partition> partitions = partitioner.getPartitions(partitionerContext, connConf, jobConf);
 
     verifyResult(partitions, new String[] {
         "-5 <= ICOL AND ICOL < -1",
@@ -113,7 +116,8 @@ public class TestImportPartitioner extends TestCase {
     ImportJobConfiguration jobConf = new ImportJobConfiguration();
 
     Partitioner partitioner = new GenericJdbcImportPartitioner();
-    List<Partition> partitions = partitioner.getPartitions(context, 13, connConf, jobConf);
+    PartitionerContext partitionerContext = new PartitionerContext(context, 13);
+    List<Partition> partitions = partitioner.getPartitions(partitionerContext, connConf, jobConf);
 
     verifyResult(partitions, new String[] {
         "-5 <= ICOL AND ICOL < -4",
@@ -148,7 +152,8 @@ public class TestImportPartitioner extends TestCase {
     ImportJobConfiguration jobConf = new ImportJobConfiguration();
 
     Partitioner partitioner = new GenericJdbcImportPartitioner();
-    List<Partition> partitions = partitioner.getPartitions(context, 5, connConf, jobConf);
+    PartitionerContext partitionerContext = new PartitionerContext(context, 5);
+    List<Partition> partitions = partitioner.getPartitions(partitionerContext, connConf, jobConf);
 
     verifyResult(partitions, new String[] {
         "-5.0 <= DCOL AND DCOL < -3.0",
@@ -179,7 +184,8 @@ public class TestImportPartitioner extends TestCase {
     ImportJobConfiguration jobConf = new ImportJobConfiguration();
 
     Partitioner partitioner = new GenericJdbcImportPartitioner();
-    List<Partition> partitions = partitioner.getPartitions(context, 3, connConf, jobConf);
+    PartitionerContext partitionerContext = new PartitionerContext(context, 3);
+    List<Partition> partitions = partitioner.getPartitions(partitionerContext, connConf, jobConf);
 
     verifyResult(partitions, new String[] {
         "-5.0 <= DCOL AND DCOL < -1.6666666666666665",

http://git-wip-us.apache.org/repos/asf/sqoop/blob/ed9c5143/core/src/main/java/org/apache/sqoop/framework/FrameworkManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/framework/FrameworkManager.java b/core/src/main/java/org/apache/sqoop/framework/FrameworkManager.java
index 6aab2db..e67ed20 100644
--- a/core/src/main/java/org/apache/sqoop/framework/FrameworkManager.java
+++ b/core/src/main/java/org/apache/sqoop/framework/FrameworkManager.java
@@ -28,7 +28,9 @@ import org.apache.sqoop.framework.configuration.ExportJobConfiguration;
 import org.apache.sqoop.framework.configuration.ImportJobConfiguration;
 import org.apache.sqoop.job.etl.CallbackBase;
 import org.apache.sqoop.job.etl.Destroyer;
+import org.apache.sqoop.job.etl.DestroyerContext;
 import org.apache.sqoop.job.etl.Initializer;
+import org.apache.sqoop.job.etl.InitializerContext;
 import org.apache.sqoop.model.FormUtils;
 import org.apache.sqoop.model.MConnection;
 import org.apache.sqoop.model.MConnectionForms;
@@ -425,13 +427,16 @@ public class FrameworkManager {
         "Can't create initializer instance: " + initializerClass.getName());
     }
 
+    // Initializer context
+    InitializerContext initializerContext = new InitializerContext(request.getConnectorContext());
+
     // Initialize submission from connector perspective
-    initializer.initialize(request.getConnectorContext(),
+    initializer.initialize(initializerContext,
       request.getConfigConnectorConnection(),
       request.getConfigConnectorJob());
 
     // Add job specific jars to
-    request.addJars(initializer.getJars(request.getConnectorContext(),
+    request.addJars(initializer.getJars(initializerContext,
       request.getConfigConnectorConnection(),
       request.getConfigConnectorJob()));
 
@@ -516,9 +521,10 @@ public class FrameworkManager {
         "Can't create destroyer instance: " + destroyerClass.getName());
     }
 
+    DestroyerContext destroyerContext = new DestroyerContext(request.getConnectorContext(), false);
+
     // Destroy submission from connector perspective
-    destroyer.destroy(false, request.getConnectorContext(),
-      request.getConfigConnectorConnection(), request.getConfigConnectorJob());
+    destroyer.destroy(destroyerContext, request.getConfigConnectorConnection(), request.getConfigConnectorJob());
   }
 
   public MSubmission stop(long jobId) {
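
The construction order above is the same one the updated tests use: build the raw context, wrap it in the role-specific context object, then hand the wrapper to the connector callback. A condensed sketch (raw types, placeholder configs):

import org.apache.sqoop.common.MutableContext;
import org.apache.sqoop.common.MutableMapContext;
import org.apache.sqoop.job.etl.Destroyer;
import org.apache.sqoop.job.etl.DestroyerContext;
import org.apache.sqoop.job.etl.Initializer;
import org.apache.sqoop.job.etl.InitializerContext;

// Illustrative wiring; the boolean marks whether the job succeeded
// (FrameworkManager currently passes false here).
public final class ContextWiring {
  static void wire(Initializer initializer, Destroyer destroyer, Object connConf, Object jobConf) {
    MutableContext context = new MutableMapContext();

    InitializerContext initializerContext = new InitializerContext(context);
    initializer.initialize(initializerContext, connConf, jobConf);

    DestroyerContext destroyerContext = new DestroyerContext(context, false);
    destroyer.destroy(destroyerContext, connConf, jobConf);
  }
}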

http://git-wip-us.apache.org/repos/asf/sqoop/blob/ed9c5143/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsExportPartitioner.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsExportPartitioner.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsExportPartitioner.java
index 71e0060..115ca54 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsExportPartitioner.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsExportPartitioner.java
@@ -36,10 +36,8 @@ import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.CompressionCodecFactory;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.net.NodeBase;
 import org.apache.hadoop.net.NetworkTopology;
-import org.apache.sqoop.common.ImmutableContext;
 import org.apache.sqoop.common.SqoopException;
 import org.apache.sqoop.job.JobConstants;
 import org.apache.sqoop.job.MapreduceExecutionError;
@@ -66,13 +64,14 @@ public class HdfsExportPartitioner extends Partitioner {
       new HashMap<String, Set<String>>();
 
   @Override
-  public List<Partition> getPartitions(ImmutableContext context,
-      long numTasks, Object connectionConfiguration, Object jobConfiguration) {
-    Configuration conf = ((PrefixContext)context).getConfiguration();
+  public List<Partition> getPartitions(PartitionerContext context,
+      Object connectionConfiguration, Object jobConfiguration) {
+
+    Configuration conf = ((PrefixContext)context.getContext()).getConfiguration();
 
     try {
       long numInputBytes = getInputSize(conf);
-      maxSplitSize = numInputBytes / numTasks;
+      maxSplitSize = numInputBytes / context.getMaxPartitions();
 
       long minSizeNode = 0;
       long minSizeRack = 0;

http://git-wip-us.apache.org/repos/asf/sqoop/blob/ed9c5143/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsSequenceExportExtractor.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsSequenceExportExtractor.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsSequenceExportExtractor.java
index 45b6166..2280828 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsSequenceExportExtractor.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsSequenceExportExtractor.java
@@ -25,14 +25,13 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
-import org.apache.sqoop.common.ImmutableContext;
 import org.apache.sqoop.common.SqoopException;
 import org.apache.sqoop.framework.configuration.ConnectionConfiguration;
 import org.apache.sqoop.framework.configuration.ExportJobConfiguration;
 import org.apache.sqoop.job.MapreduceExecutionError;
 import org.apache.sqoop.job.PrefixContext;
 import org.apache.sqoop.job.io.Data;
-import org.apache.sqoop.job.io.DataWriter;
+import org.apache.sqoop.etl.io.DataWriter;
 
 public class HdfsSequenceExportExtractor extends Extractor<ConnectionConfiguration, ExportJobConfiguration, HdfsExportPartition> {
 
@@ -40,7 +39,7 @@ public class HdfsSequenceExportExtractor extends Extractor<ConnectionConfigurati
     LogFactory.getLog(HdfsSequenceExportExtractor.class.getName());
 
   private Configuration conf;
-  private DataWriter datawriter;
+  private DataWriter dataWriter;
 
   private final char fieldDelimiter;
 
@@ -49,12 +48,12 @@ public class HdfsSequenceExportExtractor extends Extractor<ConnectionConfigurati
   }
 
   @Override
-  public void run(ImmutableContext context, ConnectionConfiguration connectionConfiguration,
-      ExportJobConfiguration jobConfiguration, HdfsExportPartition partition, DataWriter writer) {
-    writer.setFieldDelimiter(fieldDelimiter);
+  public void extract(ExtractorContext context, ConnectionConfiguration connectionConfiguration,
+      ExportJobConfiguration jobConfiguration, HdfsExportPartition partition) {
 
-    conf = ((PrefixContext)context).getConfiguration();
-    datawriter = writer;
+    conf = ((PrefixContext)context.getContext()).getConfiguration();
+    dataWriter = context.getDataWriter();
+    dataWriter.setFieldDelimiter(fieldDelimiter);
 
     try {
       LOG.info("Working on partition: " + partition);
@@ -84,7 +83,7 @@ public class HdfsSequenceExportExtractor extends Extractor<ConnectionConfigurati
     Text line = new Text();
     boolean hasNext = filereader.next(line);
     while (hasNext) {
-      datawriter.writeCsvRecord(line.toString());
+      dataWriter.writeCsvRecord(line.toString());
       line = new Text();
       hasNext = filereader.next(line);
       if(filereader.getPosition() >= end && filereader.syncSeen()) {
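
The extractor side changes the same way: run() becomes extract(), and the DataWriter is fetched from ExtractorContext instead of being passed as a fifth argument. A minimal sketch under those assumptions (SingleRecordExtractor is a hypothetical class; getDataWriter() and writeArrayRecord() appear in this commit's hunks):

    import org.apache.sqoop.etl.io.DataWriter;
    import org.apache.sqoop.job.etl.Extractor;
    import org.apache.sqoop.job.etl.ExtractorContext;

    public class SingleRecordExtractor extends Extractor {
      private long rowsRead = 0;

      @Override
      public void extract(ExtractorContext context, Object connectionConfiguration,
          Object jobConfiguration, Object partition) {
        // The DataWriter is no longer a method argument; it travels with the context.
        DataWriter writer = context.getDataWriter();
        writer.writeArrayRecord(new Object[]{1, "hello"});
        rowsRead = 1;
      }

      @Override
      public long getRowsRead() {
        return rowsRead;
      }
    }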

http://git-wip-us.apache.org/repos/asf/sqoop/blob/ed9c5143/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsSequenceImportLoader.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsSequenceImportLoader.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsSequenceImportLoader.java
index a5d6b9c..a07c511 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsSequenceImportLoader.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsSequenceImportLoader.java
@@ -27,12 +27,11 @@ import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.SequenceFile.CompressionType;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.sqoop.common.ImmutableContext;
 import org.apache.sqoop.common.SqoopException;
 import org.apache.sqoop.job.JobConstants;
 import org.apache.sqoop.job.MapreduceExecutionError;
 import org.apache.sqoop.job.io.Data;
-import org.apache.sqoop.job.io.DataReader;
+import org.apache.sqoop.etl.io.DataReader;
 import org.apache.sqoop.utils.ClassUtils;
 
 public class HdfsSequenceImportLoader extends Loader {
@@ -46,13 +45,13 @@ public class HdfsSequenceImportLoader extends Loader {
   }
 
   @Override
-  public void load(ImmutableContext context, Object oc, Object oj, DataReader reader) throws Exception{
+  public void load(LoaderContext context, Object oc, Object oj) throws Exception {
+    DataReader reader = context.getDataReader();
     reader.setFieldDelimiter(fieldDelimiter);
 
     Configuration conf = new Configuration();
 //    Configuration conf = ((EtlContext)context).getConfiguration();
-    String filename =
-        context.getString(JobConstants.JOB_MR_OUTPUT_FILE);
+    String filename = context.getString(JobConstants.JOB_MR_OUTPUT_FILE);
     String codecname = context.getString(JobConstants.JOB_MR_OUTPUT_CODEC);
 
     CompressionCodec codec = null;
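
A loader now receives its DataReader through LoaderContext rather than as a trailing parameter. A minimal sketch (LoggingLoader is hypothetical; getDataReader() and readArrayRecord() are taken from this commit's hunks):

    import java.util.Arrays;

    import org.apache.sqoop.etl.io.DataReader;
    import org.apache.sqoop.job.etl.Loader;
    import org.apache.sqoop.job.etl.LoaderContext;

    public class LoggingLoader extends Loader {
      @Override
      public void load(LoaderContext context, Object connectionConfiguration,
          Object jobConfiguration) throws Exception {
        // The DataReader moved off the argument list and onto the context.
        DataReader reader = context.getDataReader();
        Object[] record;
        while ((record = reader.readArrayRecord()) != null) {
          System.out.println(Arrays.toString(record));
        }
      }
    }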

http://git-wip-us.apache.org/repos/asf/sqoop/blob/ed9c5143/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsTextExportExtractor.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsTextExportExtractor.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsTextExportExtractor.java
index ed30c91..ae419ff 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsTextExportExtractor.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsTextExportExtractor.java
@@ -18,7 +18,6 @@
 package org.apache.sqoop.job.etl;
 
 import java.io.IOException;
-import java.nio.charset.Charset;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -31,14 +30,13 @@ import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.CompressionCodecFactory;
 import org.apache.hadoop.util.LineReader;
-import org.apache.sqoop.common.ImmutableContext;
 import org.apache.sqoop.common.SqoopException;
 import org.apache.sqoop.framework.configuration.ConnectionConfiguration;
 import org.apache.sqoop.framework.configuration.ExportJobConfiguration;
 import org.apache.sqoop.job.MapreduceExecutionError;
 import org.apache.sqoop.job.PrefixContext;
 import org.apache.sqoop.job.io.Data;
-import org.apache.sqoop.job.io.DataWriter;
+import org.apache.sqoop.etl.io.DataWriter;
 
 public class HdfsTextExportExtractor extends Extractor<ConnectionConfiguration, ExportJobConfiguration, HdfsExportPartition>  {
 
@@ -46,7 +44,7 @@ public class HdfsTextExportExtractor extends Extractor<ConnectionConfiguration,
     LogFactory.getLog(HdfsTextExportExtractor.class.getName());
 
   private Configuration conf;
-  private DataWriter datawriter;
+  private DataWriter dataWriter;
 
   private final char fieldDelimiter;
 
@@ -55,15 +53,15 @@ public class HdfsTextExportExtractor extends Extractor<ConnectionConfiguration,
   }
 
   @Override
-  public void run(ImmutableContext context, ConnectionConfiguration connectionConfiguration,
-      ExportJobConfiguration jobConfiguration, HdfsExportPartition partition, DataWriter writer) {
-    writer.setFieldDelimiter(fieldDelimiter);
+  public void extract(ExtractorContext context, ConnectionConfiguration connectionConfiguration,
+      ExportJobConfiguration jobConfiguration, HdfsExportPartition partition) {
 
-    conf = ((PrefixContext)context).getConfiguration();
-    datawriter = writer;
+    conf = ((PrefixContext)context.getContext()).getConfiguration();
+    dataWriter = context.getDataWriter();
+    dataWriter.setFieldDelimiter(fieldDelimiter);
 
     try {
-      HdfsExportPartition p = (HdfsExportPartition)partition;
+      HdfsExportPartition p = partition;
       LOG.info("Working on partition: " + p);
       int numFiles = p.getNumberOfFiles();
       for (int i=0; i<numFiles; i++) {
@@ -120,7 +118,7 @@ public class HdfsTextExportExtractor extends Extractor<ConnectionConfiguration,
       } else {
         next = fileseeker.getPos();
       }
-      datawriter.writeCsvRecord(line.toString());
+      dataWriter.writeCsvRecord(line.toString());
     }
     LOG.info("Extracting ended on position: " + fileseeker.getPos());
   }
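
A side effect worth noting in this hunk: because HdfsTextExportExtractor is parameterized as Extractor<ConnectionConfiguration, ExportJobConfiguration, HdfsExportPartition>, the partition argument arrives already typed and the explicit cast disappears. A hypothetical extractor written the same way (TypedExtractor is not part of this commit):

    import org.apache.sqoop.framework.configuration.ConnectionConfiguration;
    import org.apache.sqoop.framework.configuration.ExportJobConfiguration;
    import org.apache.sqoop.job.etl.Extractor;
    import org.apache.sqoop.job.etl.ExtractorContext;
    import org.apache.sqoop.job.etl.HdfsExportPartition;

    public class TypedExtractor extends
        Extractor<ConnectionConfiguration, ExportJobConfiguration, HdfsExportPartition> {

      @Override
      public void extract(ExtractorContext context,
          ConnectionConfiguration connectionConfiguration,
          ExportJobConfiguration jobConfiguration, HdfsExportPartition partition) {
        // partition arrives already typed, so the "(HdfsExportPartition) partition"
        // cast deleted in the hunk above is unnecessary here as well.
        context.getDataWriter().writeCsvRecord(partition.toString());
      }

      @Override
      public long getRowsRead() {
        return 1;
      }
    }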

http://git-wip-us.apache.org/repos/asf/sqoop/blob/ed9c5143/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsTextImportLoader.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsTextImportLoader.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsTextImportLoader.java
index 490b1c2..4621942 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsTextImportLoader.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsTextImportLoader.java
@@ -27,12 +27,11 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.sqoop.common.ImmutableContext;
 import org.apache.sqoop.common.SqoopException;
 import org.apache.sqoop.job.JobConstants;
 import org.apache.sqoop.job.MapreduceExecutionError;
 import org.apache.sqoop.job.io.Data;
-import org.apache.sqoop.job.io.DataReader;
+import org.apache.sqoop.etl.io.DataReader;
 import org.apache.sqoop.utils.ClassUtils;
 
 public class HdfsTextImportLoader extends Loader {
@@ -46,7 +45,8 @@ public class HdfsTextImportLoader extends Loader {
   }
 
   @Override
-  public void load(ImmutableContext context, Object oc, Object oj, DataReader reader) throws Exception{
+  public void load(LoaderContext context, Object oc, Object oj) throws Exception {
+    DataReader reader = context.getDataReader();
     reader.setFieldDelimiter(fieldDelimiter);
 
     Configuration conf = new Configuration();

http://git-wip-us.apache.org/repos/asf/sqoop/blob/ed9c5143/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopDestroyerExecutor.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopDestroyerExecutor.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopDestroyerExecutor.java
index f4ef95a..4493a45 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopDestroyerExecutor.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopDestroyerExecutor.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.sqoop.job.JobConstants;
 import org.apache.sqoop.job.PrefixContext;
 import org.apache.sqoop.job.etl.Destroyer;
+import org.apache.sqoop.job.etl.DestroyerContext;
 import org.apache.sqoop.utils.ClassUtils;
 
 /**
@@ -54,8 +55,10 @@ public class SqoopDestroyerExecutor {
     Object configConnection = ConfigurationUtils.getConnectorConnection(configuration);
     Object configJob = ConfigurationUtils.getConnectorJob(configuration);
 
+    DestroyerContext destroyerContext = new DestroyerContext(subContext, success);
+
     LOG.info("Executing destroyer class " + destroyer.getClass());
-    destroyer.destroy(success, subContext, configConnection, configJob);
+    destroyer.destroy(destroyerContext, configConnection, configJob);
   }
 
   private SqoopDestroyerExecutor() {
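
Framework-side, the success flag that used to lead Destroyer#destroy() is now carried by DestroyerContext. A connector-side sketch under that assumption (CleanupDestroyer and the isSuccess() accessor name are assumptions, inferred only from the constructor call "new DestroyerContext(subContext, success)" above):

    import org.apache.sqoop.job.etl.Destroyer;
    import org.apache.sqoop.job.etl.DestroyerContext;

    public class CleanupDestroyer extends Destroyer<Object, Object> {
      @Override
      public void destroy(DestroyerContext context, Object connectionConfiguration,
          Object jobConfiguration) {
        // The boolean that used to be the first argument now rides in the context.
        if (!context.isSuccess()) {
          // e.g. roll back partially written output or drop staging tables.
        }
      }
    }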

http://git-wip-us.apache.org/repos/asf/sqoop/blob/ed9c5143/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopInputFormat.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopInputFormat.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopInputFormat.java
index d191e03..0721b7e 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopInputFormat.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopInputFormat.java
@@ -36,6 +36,7 @@ import org.apache.sqoop.job.MapreduceExecutionError;
 import org.apache.sqoop.job.PrefixContext;
 import org.apache.sqoop.job.etl.Partition;
 import org.apache.sqoop.job.etl.Partitioner;
+import org.apache.sqoop.job.etl.PartitionerContext;
 import org.apache.sqoop.utils.ClassUtils;
 
 /**
@@ -65,8 +66,9 @@ public class SqoopInputFormat extends InputFormat<SqoopSplit, NullWritable> {
     Object connectorJob = ConfigurationUtils.getConnectorJob(conf);
 
     long maxPartitions = conf.getLong(JobConstants.JOB_ETL_EXTRACTOR_NUM, 10);
+    PartitionerContext partitionerContext = new PartitionerContext(connectorContext, maxPartitions);
 
-    List<Partition> partitions = partitioner.getPartitions(connectorContext, maxPartitions, connectorConnection, connectorJob);
+    List<Partition> partitions = partitioner.getPartitions(partitionerContext, connectorConnection, connectorJob);
     List<InputSplit> splits = new LinkedList<InputSplit>();
     for (Partition partition : partitions) {
       LOG.debug("Partition: " + partition);

http://git-wip-us.apache.org/repos/asf/sqoop/blob/ed9c5143/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java
index ce847f4..2a82303 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java
@@ -29,8 +29,9 @@ import org.apache.sqoop.job.JobConstants;
 import org.apache.sqoop.job.MapreduceExecutionError;
 import org.apache.sqoop.job.PrefixContext;
 import org.apache.sqoop.job.etl.Extractor;
+import org.apache.sqoop.job.etl.ExtractorContext;
 import org.apache.sqoop.job.io.Data;
-import org.apache.sqoop.job.io.DataWriter;
+import org.apache.sqoop.etl.io.DataWriter;
 import org.apache.sqoop.submission.counter.SqoopCounters;
 import org.apache.sqoop.utils.ClassUtils;
 
@@ -72,11 +73,11 @@ public class SqoopMapper
     }
 
     SqoopSplit split = context.getCurrentKey();
+    ExtractorContext extractorContext = new ExtractorContext(subContext, new MapDataWriter(context));
 
     try {
       LOG.info("Running extractor class " + extractorName);
-      extractor.run(subContext, configConnection, configJob, split.getPartition(),
-        new MapDataWriter(context));
+      extractor.extract(extractorContext, configConnection, configJob, split.getPartition());
       LOG.info("Extractor has finished");
       context.getCounter(SqoopCounters.ROWS_READ)
               .increment(extractor.getRowsRead());

http://git-wip-us.apache.org/repos/asf/sqoop/blob/ed9c5143/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java
index c5f3abd..d47f861 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java
@@ -36,8 +36,9 @@ import org.apache.sqoop.job.JobConstants;
 import org.apache.sqoop.job.MapreduceExecutionError;
 import org.apache.sqoop.job.PrefixContext;
 import org.apache.sqoop.job.etl.Loader;
+import org.apache.sqoop.job.etl.LoaderContext;
 import org.apache.sqoop.job.io.Data;
-import org.apache.sqoop.job.io.DataReader;
+import org.apache.sqoop.etl.io.DataReader;
 import org.apache.sqoop.utils.ClassUtils;
 
 public class SqoopOutputFormatLoadExecutor {
@@ -208,8 +209,11 @@ public class SqoopOutputFormatLoadExecutor {
           }
         }
 
+        // Create loader context
+        LoaderContext loaderContext = new LoaderContext(subContext, reader);
+
         LOG.info("Running loader class " + loaderName);
-        loader.load(subContext, configConnection, configJob, reader);
+        loader.load(loaderContext, configConnection, configJob);
         LOG.info("Loader has finished");
       } catch (Throwable t) {
         readerFinished = true;

http://git-wip-us.apache.org/repos/asf/sqoop/blob/ed9c5143/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsExtract.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsExtract.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsExtract.java
index dbd1bbf..fae6573 100644
--- a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsExtract.java
+++ b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsExtract.java
@@ -33,15 +33,13 @@ import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.SequenceFile.CompressionType;
 import org.apache.hadoop.io.compress.BZip2Codec;
 import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.sqoop.common.ImmutableContext;
 import org.apache.sqoop.job.etl.HdfsExportPartitioner;
 import org.apache.sqoop.job.etl.HdfsSequenceExportExtractor;
 import org.apache.sqoop.job.etl.HdfsSequenceImportLoader;
 import org.apache.sqoop.job.etl.HdfsTextExportExtractor;
 import org.apache.sqoop.job.etl.Loader;
+import org.apache.sqoop.job.etl.LoaderContext;
 import org.apache.sqoop.job.io.Data;
-import org.apache.sqoop.job.io.DataReader;
 import org.apache.sqoop.job.mr.SqoopFileOutputFormat;
 import org.junit.Test;
 
@@ -228,12 +226,11 @@ public class TestHdfsExtract extends TestCase {
 
   public static class DummyLoader extends Loader {
     @Override
-    public void load(ImmutableContext context, Object oc, Object oj, DataReader reader)
-        throws Exception {
+    public void load(LoaderContext context, Object oc, Object oj) throws Exception {
       int index = 1;
       int sum = 0;
       Object[] array;
-      while ((array = reader.readArrayRecord()) != null) {
+      while ((array = context.getDataReader().readArrayRecord()) != null) {
         sum += Integer.valueOf(array[0].toString());
         index++;
       };

http://git-wip-us.apache.org/repos/asf/sqoop/blob/ed9c5143/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsLoad.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsLoad.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsLoad.java
index 6e1c958..14591b4 100644
--- a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsLoad.java
+++ b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsLoad.java
@@ -34,14 +34,14 @@ import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.sqoop.common.ImmutableContext;
 import org.apache.sqoop.job.etl.Extractor;
+import org.apache.sqoop.job.etl.ExtractorContext;
 import org.apache.sqoop.job.etl.HdfsSequenceImportLoader;
 import org.apache.sqoop.job.etl.HdfsTextImportLoader;
 import org.apache.sqoop.job.etl.Partition;
 import org.apache.sqoop.job.etl.Partitioner;
+import org.apache.sqoop.job.etl.PartitionerContext;
 import org.apache.sqoop.job.io.Data;
-import org.apache.sqoop.job.io.DataWriter;
 import org.apache.sqoop.job.mr.SqoopFileOutputFormat;
 
 public class TestHdfsLoad extends TestCase {
@@ -204,7 +204,7 @@ public class TestHdfsLoad extends TestCase {
 
   public static class DummyPartitioner extends Partitioner {
     @Override
-    public List<Partition> getPartitions(ImmutableContext context, long maxPartitions, Object oc, Object oj) {
+    public List<Partition> getPartitions(PartitionerContext context, Object oc, Object oj) {
       List<Partition> partitions = new LinkedList<Partition>();
       for (int id = START_ID; id <= NUMBER_OF_IDS; id++) {
         DummyPartition partition = new DummyPartition();
@@ -217,7 +217,7 @@ public class TestHdfsLoad extends TestCase {
 
   public static class DummyExtractor extends Extractor {
     @Override
-    public void run(ImmutableContext context, Object oc, Object oj, Object partition, DataWriter writer) {
+    public void extract(ExtractorContext context, Object oc, Object oj, Object partition) {
       int id = ((DummyPartition)partition).getId();
       for (int row = 0; row < NUMBER_OF_ROWS_PER_ID; row++) {
         Object[] array = new Object[] {
@@ -225,7 +225,7 @@ public class TestHdfsLoad extends TestCase {
           (double) (id * NUMBER_OF_ROWS_PER_ID + row),
           String.valueOf(id*NUMBER_OF_ROWS_PER_ID+row)
         };
-        writer.writeArrayRecord(array);
+        context.getDataWriter().writeArrayRecord(array);
       }
     }
 

http://git-wip-us.apache.org/repos/asf/sqoop/blob/ed9c5143/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java
index 427132e..ee03427 100644
--- a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java
+++ b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java
@@ -34,14 +34,14 @@ import org.apache.hadoop.mapreduce.OutputCommitter;
 import org.apache.hadoop.mapreduce.OutputFormat;
 import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.sqoop.common.ImmutableContext;
 import org.apache.sqoop.job.etl.Extractor;
+import org.apache.sqoop.job.etl.ExtractorContext;
 import org.apache.sqoop.job.etl.Loader;
+import org.apache.sqoop.job.etl.LoaderContext;
 import org.apache.sqoop.job.etl.Partition;
 import org.apache.sqoop.job.etl.Partitioner;
+import org.apache.sqoop.job.etl.PartitionerContext;
 import org.apache.sqoop.job.io.Data;
-import org.apache.sqoop.job.io.DataReader;
-import org.apache.sqoop.job.io.DataWriter;
 import org.apache.sqoop.job.mr.SqoopInputFormat;
 import org.apache.sqoop.job.mr.SqoopMapper;
 import org.apache.sqoop.job.mr.SqoopNullOutputFormat;
@@ -120,7 +120,7 @@ public class TestMapReduce extends TestCase {
 
   public static class DummyPartitioner extends Partitioner {
     @Override
-    public List<Partition> getPartitions(ImmutableContext context, long maxPartitions, Object oc, Object oj) {
+    public List<Partition> getPartitions(PartitionerContext context, Object oc, Object oj) {
       List<Partition> partitions = new LinkedList<Partition>();
       for (int id = START_PARTITION; id <= NUMBER_OF_PARTITIONS; id++) {
         DummyPartition partition = new DummyPartition();
@@ -133,10 +133,10 @@ public class TestMapReduce extends TestCase {
 
   public static class DummyExtractor extends Extractor {
     @Override
-    public void run(ImmutableContext context, Object oc, Object oj, Object partition, DataWriter writer) {
+    public void extract(ExtractorContext context, Object oc, Object oj, Object partition) {
       int id = ((DummyPartition)partition).getId();
       for (int row = 0; row < NUMBER_OF_ROWS_PER_PARTITION; row++) {
-        writer.writeArrayRecord(new Object[] {
+        context.getDataWriter().writeArrayRecord(new Object[] {
             id * NUMBER_OF_ROWS_PER_PARTITION + row,
             (double) (id * NUMBER_OF_ROWS_PER_PARTITION + row),
             String.valueOf(id*NUMBER_OF_ROWS_PER_PARTITION+row)});
@@ -216,9 +216,9 @@ public class TestMapReduce extends TestCase {
     private Data actual = new Data();
 
     @Override
-    public void load(ImmutableContext context, Object oc, Object oj, DataReader reader) throws Exception{
+    public void load(LoaderContext context, Object oc, Object oj) throws Exception {
       Object[] array;
-      while ((array = reader.readArrayRecord()) != null) {
+      while ((array = context.getDataReader().readArrayRecord()) != null) {
         actual.setContent(array, Data.ARRAY_RECORD);
 
         expected.setContent(new Object[] {

http://git-wip-us.apache.org/repos/asf/sqoop/blob/ed9c5143/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestSqoopOutputFormatLoadExecutor.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestSqoopOutputFormatLoadExecutor.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestSqoopOutputFormatLoadExecutor.java
index 1a300ae..d2b501e 100644
--- a/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestSqoopOutputFormatLoadExecutor.java
+++ b/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestSqoopOutputFormatLoadExecutor.java
@@ -22,12 +22,11 @@ import junit.framework.Assert;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapreduce.RecordWriter;
-import org.apache.sqoop.common.ImmutableContext;
 import org.apache.sqoop.common.SqoopException;
 import org.apache.sqoop.job.JobConstants;
 import org.apache.sqoop.job.etl.Loader;
+import org.apache.sqoop.job.etl.LoaderContext;
 import org.apache.sqoop.job.io.Data;
-import org.apache.sqoop.job.io.DataReader;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -46,9 +45,8 @@ public class TestSqoopOutputFormatLoadExecutor {
     }
 
     @Override
-    public void load(ImmutableContext context, Object connectionConfiguration,
-                     Object jobConfiguration, DataReader reader) throws Exception {
-      reader.readContent(Data.CSV_RECORD);
+    public void load(LoaderContext context, Object cc, Object jc) throws Exception {
+      context.getDataReader().readContent(Data.CSV_RECORD);
       throw new BrokenBarrierException();
     }
   }
@@ -59,12 +57,11 @@ public class TestSqoopOutputFormatLoadExecutor {
     }
 
     @Override
-    public void load(ImmutableContext context, Object connectionConfiguration,
-                     Object jobConfiguration, DataReader reader) throws Exception {
+    public void load(LoaderContext context, Object cc, Object jc) throws Exception {
       int runCount = 0;
       Object o;
       String[] arr;
-      while ((o = reader.readContent(Data.CSV_RECORD)) != null) {
+      while ((o = context.getDataReader().readContent(Data.CSV_RECORD)) != null) {
         arr = o.toString().split(",");
         Assert.assertEquals(100, arr.length);
         for (int i = 0; i < arr.length; i++) {
@@ -85,9 +82,8 @@ public class TestSqoopOutputFormatLoadExecutor {
     }
 
     @Override
-    public void load(ImmutableContext context, Object connectionConfiguration,
-                     Object jobConfiguration, DataReader reader) throws Exception {
-      String[] arr = reader.readContent(Data.CSV_RECORD).toString().split(",");
+    public void load(LoaderContext context, Object cc, Object jc) throws Exception {
+      String[] arr = context.getDataReader().readContent(Data.CSV_RECORD).toString().split(",");
       Assert.assertEquals(100, arr.length);
       for (int i = 0; i < arr.length; i++) {
         Assert.assertEquals(i, Integer.parseInt(arr[i]));
@@ -102,12 +98,11 @@ public class TestSqoopOutputFormatLoadExecutor {
     }
 
     @Override
-    public void load(ImmutableContext context, Object connectionConfiguration,
-                     Object jobConfiguration, DataReader reader) throws Exception {
+    public void load(LoaderContext context, Object cc, Object jc) throws Exception {
       int runCount = 0;
       Object o;
       String[] arr;
-      while ((o = reader.readContent(Data.CSV_RECORD)) != null) {
+      while ((o = context.getDataReader().readContent(Data.CSV_RECORD)) != null) {
         arr = o.toString().split(",");
         Assert.assertEquals(100, arr.length);
         for (int i = 0; i < arr.length; i++) {

http://git-wip-us.apache.org/repos/asf/sqoop/blob/ed9c5143/spi/src/main/java/org/apache/sqoop/job/etl/Destroyer.java
----------------------------------------------------------------------
diff --git a/spi/src/main/java/org/apache/sqoop/job/etl/Destroyer.java b/spi/src/main/java/org/apache/sqoop/job/etl/Destroyer.java
index cf2ed9a..149ad2c 100644
--- a/spi/src/main/java/org/apache/sqoop/job/etl/Destroyer.java
+++ b/spi/src/main/java/org/apache/sqoop/job/etl/Destroyer.java
@@ -17,8 +17,6 @@
  */
 package org.apache.sqoop.job.etl;
 
-import org.apache.sqoop.common.ImmutableContext;
-
 /**
  * This allows connector to define work to complete execution, for example,
  * resource cleaning.
@@ -28,13 +26,11 @@ public abstract class Destroyer<ConnectionConfiguration, JobConfiguration> {
   /**
    * Callback to clean up after job execution.
    *
-   * @param success True if the execution was successfull
-   * @param context Connector context object
+   * @param context Destroyer context
    * @param connectionConfiguration Connection configuration object
    * @param jobConfiguration Job configuration object
    */
-  public abstract void destroy(boolean success,
-                               ImmutableContext context,
+  public abstract void destroy(DestroyerContext context,
                                ConnectionConfiguration connectionConfiguration,
                                JobConfiguration jobConfiguration);
 

http://git-wip-us.apache.org/repos/asf/sqoop/blob/ed9c5143/spi/src/main/java/org/apache/sqoop/job/etl/Extractor.java
----------------------------------------------------------------------
diff --git a/spi/src/main/java/org/apache/sqoop/job/etl/Extractor.java b/spi/src/main/java/org/apache/sqoop/job/etl/Extractor.java
index 300cf4e..93b3643 100644
--- a/spi/src/main/java/org/apache/sqoop/job/etl/Extractor.java
+++ b/spi/src/main/java/org/apache/sqoop/job/etl/Extractor.java
@@ -17,31 +17,35 @@
  */
 package org.apache.sqoop.job.etl;
 
-import org.apache.sqoop.common.ImmutableContext;
-import org.apache.sqoop.job.io.DataWriter;
-
 /**
  * This allows connector to extract data from a source system
  * based on each partition.
  */
 public abstract class Extractor<ConnectionConfiguration, JobConfiguration, Partition> {
 
-  public abstract void run(ImmutableContext context,
-                           ConnectionConfiguration connectionConfiguration,
-                           JobConfiguration jobConfiguration,
-                           Partition partition,
-                           DataWriter writer);
+  /**
+   * Extract data from the source and pass it to the framework.
+   *
+   * @param context Extractor context object
+   * @param connectionConfiguration Connection configuration
+   * @param jobConfiguration Job configuration
+   * @param partition Partition that this extract should work on
+   */
+  public abstract void extract(ExtractorContext context,
+                               ConnectionConfiguration connectionConfiguration,
+                               JobConfiguration jobConfiguration,
+                               Partition partition);
 
   /**
    * Return the number of rows read by the last call to
-   * {@linkplain Extractor#run(org.apache.sqoop.common.ImmutableContext, java.lang.Object, java.lang.Object, Partition, org.apache.sqoop.job.io.DataWriter) }
+   * {@linkplain Extractor#extract(org.apache.sqoop.job.etl.ExtractorContext, java.lang.Object, java.lang.Object, Partition) }
    * method. This method returns only the number of rows read in the last call,
    * and not a cumulative total of the number of rows read by this Extractor
   * since its creation. If no calls were made to the extract method, this method's
    * behavior is undefined.
    *
    * @return the number of rows read by the last call to
-   * {@linkplain Extractor#run(org.apache.sqoop.common.ImmutableContext, java.lang.Object, java.lang.Object, Partition, org.apache.sqoop.job.io.DataWriter) }
+   * {@linkplain Extractor#extract(org.apache.sqoop.job.etl.ExtractorContext, java.lang.Object, java.lang.Object, Partition) }
    */
   public abstract long getRowsRead();
 

http://git-wip-us.apache.org/repos/asf/sqoop/blob/ed9c5143/spi/src/main/java/org/apache/sqoop/job/etl/Initializer.java
----------------------------------------------------------------------
diff --git a/spi/src/main/java/org/apache/sqoop/job/etl/Initializer.java b/spi/src/main/java/org/apache/sqoop/job/etl/Initializer.java
index 3fb6be0..346b84c 100644
--- a/spi/src/main/java/org/apache/sqoop/job/etl/Initializer.java
+++ b/spi/src/main/java/org/apache/sqoop/job/etl/Initializer.java
@@ -17,9 +17,6 @@
  */
 package org.apache.sqoop.job.etl;
 
-import org.apache.sqoop.common.ImmutableContext;
-import org.apache.sqoop.common.MutableContext;
-
 import java.util.LinkedList;
 import java.util.List;
 
@@ -34,11 +31,11 @@ public abstract class Initializer<ConnectionConfiguration, JobConfiguration> {
    * needed temporary values might be saved to context object and they will be
    * promoted to all other part of the workflow automatically.
    *
-   * @param context Changeable context object, purely for connector usage
+   * @param context Initializer context object
    * @param connectionConfiguration Connector's connection configuration object
    * @param jobConfiguration Connector's job configuration object
    */
-  public abstract void initialize(MutableContext context,
+  public abstract void initialize(InitializerContext context,
                                   ConnectionConfiguration connectionConfiguration,
                                   JobConfiguration jobConfiguration);
 
@@ -49,7 +46,7 @@ public abstract class Initializer<ConnectionConfiguration, JobConfiguration> {
    *
    * @return
    */
-  public List<String> getJars(ImmutableContext context,
+  public List<String> getJars(InitializerContext context,
                               ConnectionConfiguration connectionConfiguration,
                               JobConfiguration jobConfiguration) {
     return new LinkedList<String>();
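
Initializers follow suit: MutableContext is replaced by InitializerContext in both initialize() and getJars(). A minimal sketch (DriverInitializer, the setString() call, and the jar path are illustrative assumptions, not part of this commit):

    import java.util.LinkedList;
    import java.util.List;

    import org.apache.sqoop.job.etl.Initializer;
    import org.apache.sqoop.job.etl.InitializerContext;

    public class DriverInitializer extends Initializer<Object, Object> {
      @Override
      public void initialize(InitializerContext context,
          Object connectionConfiguration, Object jobConfiguration) {
        // Values saved here are promoted to the rest of the workflow. That
        // getContext() exposes a mutable context with setString() is an
        // assumption patterned on the other context classes in this commit.
        context.getContext().setString("connector.table.name", "EXAMPLE");
      }

      @Override
      public List<String> getJars(InitializerContext context,
          Object connectionConfiguration, Object jobConfiguration) {
        List<String> jars = new LinkedList<String>();
        jars.add("/path/to/driver.jar"); // hypothetical jar location
        return jars;
      }
    }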

