sqoop-commits mailing list archives

From jar...@apache.org
Subject [2/2] sqoop git commit: SQOOP-1690: Implement doAs for Sqoop2
Date Tue, 27 Oct 2015 16:27:27 GMT
SQOOP-1690: Implement doAs for Sqoop2

(Abraham Fine via Jarek Jarcec Cecho)


Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/bc0de7c1
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/bc0de7c1
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/bc0de7c1

Branch: refs/heads/sqoop2
Commit: bc0de7c199a74bdc0b804696159c31e13bdd5c3b
Parents: 0773c10
Author: Jarek Jarcec Cecho <jarcec@apache.org>
Authored: Tue Oct 27 09:26:59 2015 -0700
Committer: Jarek Jarcec Cecho <jarcec@apache.org>
Committed: Tue Oct 27 09:26:59 2015 -0700

----------------------------------------------------------------------
 .../apache/sqoop/job/etl/DestroyerContext.java  |  14 +-
 .../apache/sqoop/job/etl/ExtractorContext.java  |  13 +-
 .../sqoop/job/etl/InitializerContext.java       |  14 +-
 .../org/apache/sqoop/job/etl/LoaderContext.java |  15 +-
 .../sqoop/job/etl/PartitionerContext.java       |  14 +-
 .../sqoop/connector/ftp/TestFtpLoader.java      |   2 +-
 .../sqoop/connector/jdbc/TestExtractor.java     |   8 +-
 .../connector/jdbc/TestFromInitializer.java     |  26 +--
 .../apache/sqoop/connector/jdbc/TestLoader.java |   2 +-
 .../sqoop/connector/jdbc/TestPartitioner.java   |  32 ++--
 .../sqoop/connector/jdbc/TestToInitializer.java |  18 +-
 .../sqoop/connector/hdfs/HdfsExtractor.java     |  36 ++--
 .../connector/hdfs/HdfsFromInitializer.java     |  56 +++---
 .../apache/sqoop/connector/hdfs/HdfsLoader.java |  69 ++++----
 .../sqoop/connector/hdfs/HdfsPartitioner.java   | 170 ++++++++++---------
 .../sqoop/connector/hdfs/HdfsToDestroyer.java   |  44 +++--
 .../sqoop/connector/hdfs/HdfsToInitializer.java |  39 +++--
 .../sqoop/connector/hdfs/TestExtractor.java     |   7 +-
 .../sqoop/connector/hdfs/TestFromDestroyer.java |   4 +-
 .../connector/hdfs/TestFromInitializer.java     |   2 +-
 .../sqoop/connector/hdfs/TestHdfsBase.java      |  11 ++
 .../apache/sqoop/connector/hdfs/TestLoader.java |  10 +-
 .../sqoop/connector/hdfs/TestPartitioner.java   |   2 +-
 .../sqoop/connector/hdfs/TestToDestroyer.java   |   4 +-
 .../sqoop/connector/hdfs/TestToInitializer.java |   8 +-
 .../sqoop/connector/kafka/TestKafkaLoader.java  |   2 +-
 .../sqoop/connector/kite/TestKiteExtractor.java |   2 +-
 .../sqoop/connector/kite/TestKiteLoader.java    |   2 +-
 .../connector/kite/TestKiteToDestroyer.java     |   7 +-
 .../org/apache/sqoop/driver/JobManager.java     |  15 +-
 .../mapreduce/MapreduceExecutionEngine.java     |   1 +
 .../org/apache/sqoop/job/MRJobConstants.java    |   4 +
 .../sqoop/job/mr/SqoopDestroyerExecutor.java    |   4 +-
 .../apache/sqoop/job/mr/SqoopInputFormat.java   |   2 +-
 .../org/apache/sqoop/job/mr/SqoopMapper.java    |   3 +-
 .../sqoop/job/mr/SqoopNullOutputFormat.java     |   5 +-
 .../job/mr/SqoopOutputFormatLoadExecutor.java   |   2 +-
 .../mr/TestSqoopOutputFormatLoadExecutor.java   |   2 +-
 .../sqoop/filter/SqoopAuthenticationFilter.java |   2 +-
 .../test/minicluster/SqoopMiniCluster.java      |  13 +-
 .../connector/hdfs/OutputDirectoryTest.java     |   3 +-
 41 files changed, 423 insertions(+), 266 deletions(-)
----------------------------------------------------------------------
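
The change threads the name of the submitting user through the connector ETL contexts and wraps the HDFS connector's filesystem work in Hadoop's proxy-user mechanism. Before the per-file diffs, a minimal sketch of that pattern, assuming only a plain Hadoop client classpath; the class and method names below are illustrative and not part of the commit:

import java.security.PrivilegedExceptionAction;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation;

public class DoAsSketch {
  // Run a single HDFS call as "user", impersonated on top of the process's
  // login user (the Sqoop server account in this commit's case).
  public static boolean pathExistsAs(final String user, final Configuration conf,
                                     final String path) throws Exception {
    return UserGroupInformation
        .createProxyUser(user, UserGroupInformation.getLoginUser())
        .doAs(new PrivilegedExceptionAction<Boolean>() {
          public Boolean run() throws Exception {
            // The FileSystem is obtained inside doAs(), so it is created with
            // the proxy UGI and all calls on it act as "user".
            FileSystem fs = FileSystem.get(conf);
            return fs.exists(new Path(path));
          }
        });
  }
}

This is the shape each of the HDFS connector classes below adopts: the existing logic moves into run(), and checked exceptions are rethrown as SqoopException outside the doAs() call.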


http://git-wip-us.apache.org/repos/asf/sqoop/blob/bc0de7c1/common/src/main/java/org/apache/sqoop/job/etl/DestroyerContext.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/sqoop/job/etl/DestroyerContext.java b/common/src/main/java/org/apache/sqoop/job/etl/DestroyerContext.java
index f4f6d1d..38d94db 100644
--- a/common/src/main/java/org/apache/sqoop/job/etl/DestroyerContext.java
+++ b/common/src/main/java/org/apache/sqoop/job/etl/DestroyerContext.java
@@ -35,10 +35,13 @@ public class DestroyerContext extends TransferableContext {
 
   private Schema schema;
 
-  public DestroyerContext(ImmutableContext context, boolean success, Schema schema) {
+  private String user;
+
+  public DestroyerContext(ImmutableContext context, boolean success, Schema schema, String user) {
     super(context);
     this.success = success;
     this.schema = schema;
+    this.user = user;
   }
 
   /**
@@ -58,4 +61,13 @@ public class DestroyerContext extends TransferableContext {
   public Schema getSchema() {
     return schema;
   }
+
+  /**
+   * Return user associated with this step.
+   *
+   * @return
+   */
+  public String getUser() {
+    return user;
+  }
 }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/bc0de7c1/common/src/main/java/org/apache/sqoop/job/etl/ExtractorContext.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/sqoop/job/etl/ExtractorContext.java b/common/src/main/java/org/apache/sqoop/job/etl/ExtractorContext.java
index 43fcaa2..748bdfb 100644
--- a/common/src/main/java/org/apache/sqoop/job/etl/ExtractorContext.java
+++ b/common/src/main/java/org/apache/sqoop/job/etl/ExtractorContext.java
@@ -36,10 +36,13 @@ public class ExtractorContext extends TransferableContext {
 
   private final Schema schema;
 
-  public ExtractorContext(ImmutableContext context, DataWriter writer, Schema schema) {
+  private final String user;
+
+  public ExtractorContext(ImmutableContext context, DataWriter writer, Schema schema, String user) {
     super(context);
     this.writer = writer;
     this.schema = schema;
+    this.user = user;
   }
 
   /**
@@ -58,5 +61,13 @@ public class ExtractorContext extends TransferableContext {
   public Schema getSchema() {
     return schema;
   }
+  /**
+   * Return the user
+   *
+   * @return
+   */
+  public String getUser() {
+    return user;
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/bc0de7c1/common/src/main/java/org/apache/sqoop/job/etl/InitializerContext.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/sqoop/job/etl/InitializerContext.java b/common/src/main/java/org/apache/sqoop/job/etl/InitializerContext.java
index 469132b..7ad0d70 100644
--- a/common/src/main/java/org/apache/sqoop/job/etl/InitializerContext.java
+++ b/common/src/main/java/org/apache/sqoop/job/etl/InitializerContext.java
@@ -31,8 +31,11 @@ import org.apache.sqoop.common.MutableContext;
 @InterfaceStability.Unstable
 public class InitializerContext extends TransferableContext {
 
-  public InitializerContext(MutableContext context) {
+  private String user;
+
+  public InitializerContext(MutableContext context, String user) {
     super(context);
+    this.user = user;
   }
 
   /**
@@ -47,4 +50,13 @@ public class InitializerContext extends TransferableContext {
   public MutableContext getContext() {
     return (MutableContext)super.getContext();
   }
+
+  /**
+   * Return the user
+   *
+   * @return
+   */
+  public String getUser() {
+    return user;
+  }
 }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/bc0de7c1/common/src/main/java/org/apache/sqoop/job/etl/LoaderContext.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/sqoop/job/etl/LoaderContext.java b/common/src/main/java/org/apache/sqoop/job/etl/LoaderContext.java
index f9ea9ad..f0f2e38 100644
--- a/common/src/main/java/org/apache/sqoop/job/etl/LoaderContext.java
+++ b/common/src/main/java/org/apache/sqoop/job/etl/LoaderContext.java
@@ -36,10 +36,13 @@ public class LoaderContext extends TransferableContext {
 
   private final Schema schema;
 
-  public LoaderContext(ImmutableContext context, DataReader reader, Schema schema) {
+  private final String user;
+
+  public LoaderContext(ImmutableContext context, DataReader reader, Schema schema, String user) {
     super(context);
     this.reader = reader;
     this.schema = schema;
+    this.user = user;
   }
 
   /**
@@ -59,4 +62,14 @@ public class LoaderContext extends TransferableContext {
   public Schema getSchema() {
     return schema;
   }
+
+  /**
+   * Return the String representing the user.
+   *
+   * @return
+   */
+  public String getUser() {
+    return user;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/bc0de7c1/common/src/main/java/org/apache/sqoop/job/etl/PartitionerContext.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/sqoop/job/etl/PartitionerContext.java b/common/src/main/java/org/apache/sqoop/job/etl/PartitionerContext.java
index bb52bb2..b39497b 100644
--- a/common/src/main/java/org/apache/sqoop/job/etl/PartitionerContext.java
+++ b/common/src/main/java/org/apache/sqoop/job/etl/PartitionerContext.java
@@ -37,10 +37,13 @@ public class PartitionerContext extends TransferableContext {
 
   private boolean skipMaxPartitionCheck = false;
 
-  public PartitionerContext(ImmutableContext context, long maxPartitions, Schema schema) {
+  private String user;
+
+  public PartitionerContext(ImmutableContext context, long maxPartitions, Schema schema, String user) {
     super(context);
     this.maxPartitions = maxPartitions;
     this.schema = schema;
+    this.user = user;
   }
 
   /**
@@ -89,4 +92,13 @@ public class PartitionerContext extends TransferableContext {
   public Schema getSchema() {
     return schema;
   }
+
+  /**
+   * Return user that submitted job
+   *
+   * @return
+   */
+  public String getUser() {
+    return user;
+  }
 }
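
With all five context classes above now taking the user at construction time and exposing it through getUser(), any connector step can read who submitted the job even when it has no need to impersonate. A minimal sketch, assuming log4j on the classpath; the helper name and the logging-only use are illustrative, not from the commit:

import org.apache.log4j.Logger;
import org.apache.sqoop.job.etl.LoaderContext;

public class StepAuditSketch {
  private static final Logger LOG = Logger.getLogger(StepAuditSketch.class);

  // Hypothetical helper: record the submitting user before running a load.
  public static void logSubmittingUser(LoaderContext context) {
    LOG.info("Loading data on behalf of user: " + context.getUser());
  }
}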

http://git-wip-us.apache.org/repos/asf/sqoop/blob/bc0de7c1/connector/connector-ftp/src/test/java/org/apache/sqoop/connector/ftp/TestFtpLoader.java
----------------------------------------------------------------------
diff --git a/connector/connector-ftp/src/test/java/org/apache/sqoop/connector/ftp/TestFtpLoader.java b/connector/connector-ftp/src/test/java/org/apache/sqoop/connector/ftp/TestFtpLoader.java
index 33c808a..e1255ff 100644
--- a/connector/connector-ftp/src/test/java/org/apache/sqoop/connector/ftp/TestFtpLoader.java
+++ b/connector/connector-ftp/src/test/java/org/apache/sqoop/connector/ftp/TestFtpLoader.java
@@ -75,7 +75,7 @@ public class TestFtpLoader {
     };
 
     try {
-      LoaderContext context = new LoaderContext(null, reader, null);
+      LoaderContext context = new LoaderContext(null, reader, null, "test_user");
       LinkConfiguration linkConfig = new LinkConfiguration();
       linkConfig.linkConfig.username = username;
       linkConfig.linkConfig.password = password;

http://git-wip-us.apache.org/repos/asf/sqoop/blob/bc0de7c1/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestExtractor.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestExtractor.java b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestExtractor.java
index 264cadf..3b52128 100644
--- a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestExtractor.java
+++ b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestExtractor.java
@@ -104,7 +104,7 @@ public class TestExtractor {
     // result set
     schema.addColumn(new FixedPoint("c1",2L, true)).addColumn(new Decimal("c2", 5, 2)).addColumn(new Text("c3")).addColumn(new Date("c4"));
 
-    ExtractorContext extractorContext = new ExtractorContext(context, writer, schema);
+    ExtractorContext extractorContext = new ExtractorContext(context, writer, schema, "test_user");
 
     partition = new GenericJdbcPartition();
     partition.setConditions("-50.0 <= DCOL AND DCOL < -16.6666666666666665");
@@ -144,7 +144,7 @@ public class TestExtractor {
     // result set
     schema.addColumn(new FixedPoint("c1", 2L, true)).addColumn(new Text("c2")).addColumn(new Date("c3"));
 
-    ExtractorContext extractorContext = new ExtractorContext(context, writer, schema);
+    ExtractorContext extractorContext = new ExtractorContext(context, writer, schema, "test_user");
 
     partition = new GenericJdbcPartition();
     partition.setConditions("-50 <= ICOL AND ICOL < -16");
@@ -181,7 +181,7 @@ public class TestExtractor {
     Extractor extractor = new GenericJdbcExtractor();
     DummyWriter writer = new DummyWriter();
     Schema schema = new Schema("TestIncorrectColumns");
-    ExtractorContext extractorContext = new ExtractorContext(context, writer, schema);
+    ExtractorContext extractorContext = new ExtractorContext(context, writer, schema, "test_user");
 
     partition.setConditions("-50 <= ICOL AND ICOL < -16");
     extractor.extract(extractorContext, linkConfig, jobConfig, partition);
@@ -217,7 +217,7 @@ public class TestExtractor {
     Schema schema = new Schema("TestExtractor");
     schema.addColumn(new FixedPoint("c1",2L, true)).addColumn(new Decimal("c2", 5, 2)).addColumn(new Text("c3")).addColumn(new Date("c4"));
 
-    ExtractorContext extractorContext = new ExtractorContext(context, writer, schema);
+    ExtractorContext extractorContext = new ExtractorContext(context, writer, schema, "test_user");
 
     GenericJdbcPartition partition = new GenericJdbcPartition();
     partition.setConditions("-50 <= ICOL AND ICOL < -16");

http://git-wip-us.apache.org/repos/asf/sqoop/blob/bc0de7c1/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestFromInitializer.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestFromInitializer.java b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestFromInitializer.java
index ab31932..1c8379d 100644
--- a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestFromInitializer.java
+++ b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestFromInitializer.java
@@ -44,6 +44,7 @@ public class TestFromInitializer {
   private final String tableSql;
   private final String schemalessTableSql;
   private final String tableColumns;
+  private final String testUser;
 
   private GenericJdbcExecutor executor;
 
@@ -57,6 +58,7 @@ public class TestFromInitializer {
     tableSql = "SELECT * FROM " + schemaName + "." + tableName + " WHERE ${CONDITIONS}";
     schemalessTableSql = "SELECT * FROM " + schemalessTableName + " WHERE ${CONDITIONS}";
     tableColumns = "ICOL,VCOL";
+    testUser = "test_user";
   }
 
   @BeforeMethod(alwaysRun = true)
@@ -123,7 +125,7 @@ public class TestFromInitializer {
     jobConfig.fromJobConfig.tableName = schemalessTableName;
 
     MutableContext context = new MutableMapContext();
-    InitializerContext initializerContext = new InitializerContext(context);
+    InitializerContext initializerContext = new InitializerContext(context, testUser);
 
     @SuppressWarnings("rawtypes")
     Initializer initializer = new GenericJdbcFromInitializer();
@@ -151,7 +153,7 @@ public class TestFromInitializer {
     jobConfig.incrementalRead.lastValue = "-51";
 
     MutableContext context = new MutableMapContext();
-    InitializerContext initializerContext = new InitializerContext(context);
+    InitializerContext initializerContext = new InitializerContext(context, testUser);
 
     @SuppressWarnings("rawtypes")
     Initializer initializer = new GenericJdbcFromInitializer();
@@ -181,7 +183,7 @@ public class TestFromInitializer {
     jobConfig.incrementalRead.lastValue = "0";
 
     MutableContext context = new MutableMapContext();
-    InitializerContext initializerContext = new InitializerContext(context);
+    InitializerContext initializerContext = new InitializerContext(context, testUser);
 
     @SuppressWarnings("rawtypes")
     Initializer initializer = new GenericJdbcFromInitializer();
@@ -210,7 +212,7 @@ public class TestFromInitializer {
     jobConfig.fromJobConfig.columns = tableColumns;
 
     MutableContext context = new MutableMapContext();
-    InitializerContext initializerContext = new InitializerContext(context);
+    InitializerContext initializerContext = new InitializerContext(context, testUser);
 
     @SuppressWarnings("rawtypes")
     Initializer initializer = new GenericJdbcFromInitializer();
@@ -237,7 +239,7 @@ public class TestFromInitializer {
     jobConfig.fromJobConfig.partitionColumn = "DCOL";
 
     MutableContext context = new MutableMapContext();
-    InitializerContext initializerContext = new InitializerContext(context);
+    InitializerContext initializerContext = new InitializerContext(context, testUser);
 
     @SuppressWarnings("rawtypes")
     Initializer initializer = new GenericJdbcFromInitializer();
@@ -266,7 +268,7 @@ public class TestFromInitializer {
     jobConfig.incrementalRead.lastValue = "-51";
 
     MutableContext context = new MutableMapContext();
-    InitializerContext initializerContext = new InitializerContext(context);
+    InitializerContext initializerContext = new InitializerContext(context, testUser);
 
     @SuppressWarnings("rawtypes")
     Initializer initializer = new GenericJdbcFromInitializer();
@@ -296,7 +298,7 @@ public class TestFromInitializer {
     jobConfig.incrementalRead.lastValue = "0";
 
     MutableContext context = new MutableMapContext();
-    InitializerContext initializerContext = new InitializerContext(context);
+    InitializerContext initializerContext = new InitializerContext(context, testUser);
 
     @SuppressWarnings("rawtypes")
     Initializer initializer = new GenericJdbcFromInitializer();
@@ -326,7 +328,7 @@ public class TestFromInitializer {
     jobConfig.fromJobConfig.tableName = tableName;
 
     MutableContext context = new MutableMapContext();
-    InitializerContext initializerContext = new InitializerContext(context);
+    InitializerContext initializerContext = new InitializerContext(context, testUser);
 
     @SuppressWarnings("rawtypes")
     Initializer initializer = new GenericJdbcFromInitializer();
@@ -356,7 +358,7 @@ public class TestFromInitializer {
     jobConfig.fromJobConfig.columns = tableColumns;
 
     MutableContext context = new MutableMapContext();
-    InitializerContext initializerContext = new InitializerContext(context);
+    InitializerContext initializerContext = new InitializerContext(context, testUser);
 
     @SuppressWarnings("rawtypes")
     Initializer initializer = new GenericJdbcFromInitializer();
@@ -384,7 +386,7 @@ public class TestFromInitializer {
     jobConfig.fromJobConfig.partitionColumn = "DCOL";
 
     MutableContext context = new MutableMapContext();
-    InitializerContext initializerContext = new InitializerContext(context);
+    InitializerContext initializerContext = new InitializerContext(context, testUser);
 
     @SuppressWarnings("rawtypes")
     Initializer initializer = new GenericJdbcFromInitializer();
@@ -414,7 +416,7 @@ public class TestFromInitializer {
     jobConfig.fromJobConfig.partitionColumn = "DCOL";
 
     MutableContext context = new MutableMapContext();
-    InitializerContext initializerContext = new InitializerContext(context);
+    InitializerContext initializerContext = new InitializerContext(context, testUser);
 
     @SuppressWarnings("rawtypes")
     Initializer initializer = new GenericJdbcFromInitializer();
@@ -436,7 +438,7 @@ public class TestFromInitializer {
     jobConfig.fromJobConfig.partitionColumn = "DCOL";
 
     MutableContext context = new MutableMapContext();
-    InitializerContext initializerContext = new InitializerContext(context);
+    InitializerContext initializerContext = new InitializerContext(context, testUser);
 
     @SuppressWarnings("rawtypes")
     Initializer initializer = new GenericJdbcFromInitializer();

http://git-wip-us.apache.org/repos/asf/sqoop/blob/bc0de7c1/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestLoader.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestLoader.java b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestLoader.java
index 83411fb..6f7612c 100644
--- a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestLoader.java
+++ b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestLoader.java
@@ -107,7 +107,7 @@ public class TestLoader {
     schema.addColumn(new FixedPoint("c1", 2L, true)).addColumn(new Decimal("c2", 5, 2))
         .addColumn(new Text("c3")).addColumn(new Date("c4"))
         .addColumn(new DateTime("c5", false, false)).addColumn(new Time("c6", false)).addColumn(new DateTime("c7", false, false));
-    LoaderContext loaderContext = new LoaderContext(context, reader, schema);
+    LoaderContext loaderContext = new LoaderContext(context, reader, schema, "test_user");
     loader.load(loaderContext, linkConfig, jobConfig);
 
     int index = START;

http://git-wip-us.apache.org/repos/asf/sqoop/blob/bc0de7c1/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestPartitioner.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestPartitioner.java b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestPartitioner.java
index bec6478..3a767ab 100644
--- a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestPartitioner.java
+++ b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestPartitioner.java
@@ -62,7 +62,7 @@ public class TestPartitioner {
     FromJobConfiguration jobConfig = new FromJobConfiguration();
 
     Partitioner partitioner = new GenericJdbcPartitioner();
-    PartitionerContext partitionerContext = new PartitionerContext(context, 5, null);
+    PartitionerContext partitionerContext = new PartitionerContext(context, 5, null, "test_user");
     List<Partition> partitions = partitioner.getPartitions(partitionerContext, linkConfig, jobConfig);
 
     verifyResult(partitions, new String[] {
@@ -94,7 +94,7 @@ public class TestPartitioner {
     FromJobConfiguration jobConfig = new FromJobConfiguration();
 
     Partitioner partitioner = new GenericJdbcPartitioner();
-    PartitionerContext partitionerContext = new PartitionerContext(context, 3, null);
+    PartitionerContext partitionerContext = new PartitionerContext(context, 3, null, "test_user");
     List<Partition> partitions = partitioner.getPartitions(partitionerContext, linkConfig, jobConfig);
 
     verifyResult(partitions, new String[] {
@@ -124,7 +124,7 @@ public class TestPartitioner {
     FromJobConfiguration jobConfig = new FromJobConfiguration();
 
     Partitioner partitioner = new GenericJdbcPartitioner();
-    PartitionerContext partitionerContext = new PartitionerContext(context, 13, null);
+    PartitionerContext partitionerContext = new PartitionerContext(context, 13, null, "test_user");
     List<Partition> partitions = partitioner.getPartitions(partitionerContext, linkConfig, jobConfig);
 
     verifyResult(partitions, new String[] {
@@ -161,7 +161,7 @@ public class TestPartitioner {
     FromJobConfiguration jobConfig = new FromJobConfiguration();
 
     Partitioner partitioner = new GenericJdbcPartitioner();
-    PartitionerContext partitionerContext = new PartitionerContext(context, 5, null);
+    PartitionerContext partitionerContext = new PartitionerContext(context, 5, null, "test_user");
     List<Partition> partitions = partitioner.getPartitions(partitionerContext, linkConfig, jobConfig);
 
     verifyResult(partitions, new String[] {
@@ -193,7 +193,7 @@ public class TestPartitioner {
     FromJobConfiguration jobConfig = new FromJobConfiguration();
 
     Partitioner partitioner = new GenericJdbcPartitioner();
-    PartitionerContext partitionerContext = new PartitionerContext(context, 3, null);
+    PartitionerContext partitionerContext = new PartitionerContext(context, 3, null, "test_user");
     List<Partition> partitions = partitioner.getPartitions(partitionerContext, linkConfig, jobConfig);
 
     verifyResult(partitions, new String[] {
@@ -215,7 +215,7 @@ public class TestPartitioner {
     FromJobConfiguration jobConfig = new FromJobConfiguration();
 
     Partitioner partitioner = new GenericJdbcPartitioner();
-    PartitionerContext partitionerContext = new PartitionerContext(context, 5, null);
+    PartitionerContext partitionerContext = new PartitionerContext(context, 5, null, "test_user");
     List<Partition> partitions = partitioner.getPartitions(partitionerContext, linkConfig, jobConfig);
 
     verifyResult(partitions, new String[] {
@@ -239,7 +239,7 @@ public class TestPartitioner {
     FromJobConfiguration jobConfig = new FromJobConfiguration();
 
     Partitioner partitioner = new GenericJdbcPartitioner();
-    PartitionerContext partitionerContext = new PartitionerContext(context, 3, null);
+    PartitionerContext partitionerContext = new PartitionerContext(context, 3, null, "test_user");
     List<Partition> partitions = partitioner.getPartitions(partitionerContext, linkConfig, jobConfig);
 
     verifyResult(partitions, new String[]{
@@ -261,7 +261,7 @@ public class TestPartitioner {
     FromJobConfiguration jobConfig = new FromJobConfiguration();
 
     Partitioner partitioner = new GenericJdbcPartitioner();
-    PartitionerContext partitionerContext = new PartitionerContext(context, 3, null);
+    PartitionerContext partitionerContext = new PartitionerContext(context, 3, null, "test_user");
     List<Partition> partitions = partitioner.getPartitions(partitionerContext, linkConfig, jobConfig);
 
     verifyResult(partitions, new String[]{
@@ -286,7 +286,7 @@ public class TestPartitioner {
     FromJobConfiguration jobConfig = new FromJobConfiguration();
 
     Partitioner partitioner = new GenericJdbcPartitioner();
-    PartitionerContext partitionerContext = new PartitionerContext(context, 3, null);
+    PartitionerContext partitionerContext = new PartitionerContext(context, 3, null, "test_user");
     List<Partition> partitions = partitioner.getPartitions(partitionerContext, linkConfig, jobConfig);
 
 
@@ -315,7 +315,7 @@ public class TestPartitioner {
     FromJobConfiguration jobConfig = new FromJobConfiguration();
 
     Partitioner partitioner = new GenericJdbcPartitioner();
-    PartitionerContext partitionerContext = new PartitionerContext(context, 3, null);
+    PartitionerContext partitionerContext = new PartitionerContext(context, 3, null, "test_user");
     List<Partition> partitions = partitioner.getPartitions(partitionerContext, linkConfig, jobConfig);
 
     verifyResult(partitions, new String[]{
@@ -341,7 +341,7 @@ public class TestPartitioner {
     FromJobConfiguration jobConfig = new FromJobConfiguration();
 
     Partitioner partitioner = new GenericJdbcPartitioner();
-    PartitionerContext partitionerContext = new PartitionerContext(context, 3, null);
+    PartitionerContext partitionerContext = new PartitionerContext(context, 3, null, "test_user");
     List<Partition> partitions = partitioner.getPartitions(partitionerContext, linkConfig, jobConfig);
     verifyResult(partitions, new String[]{
         "'2013-01-01 01:01:01.123' <= TSCOL AND TSCOL < '2013-05-02 12:14:17.634'",
@@ -366,7 +366,7 @@ public class TestPartitioner {
     FromJobConfiguration jobConfig = new FromJobConfiguration();
 
     Partitioner partitioner = new GenericJdbcPartitioner();
-    PartitionerContext partitionerContext = new PartitionerContext(context, 3, null);
+    PartitionerContext partitionerContext = new PartitionerContext(context, 3, null, "test_user");
     List<Partition> partitions = partitioner.getPartitions(partitionerContext, linkConfig, jobConfig);
     verifyResult(partitions, new String[]{
       "BCOL = TRUE",
@@ -390,7 +390,7 @@ public class TestPartitioner {
     FromJobConfiguration jobConfig = new FromJobConfiguration();
 
     Partitioner partitioner = new GenericJdbcPartitioner();
-    PartitionerContext partitionerContext = new PartitionerContext(context, 25, null);
+    PartitionerContext partitionerContext = new PartitionerContext(context, 25, null, "test_user");
     List<Partition> partitions = partitioner.getPartitions(partitionerContext, linkConfig, jobConfig);
 
     verifyResult(partitions, new String[] {
@@ -437,7 +437,7 @@ public class TestPartitioner {
     LinkConfiguration linkConfig = new LinkConfiguration();
     FromJobConfiguration jobConfig = new FromJobConfiguration();
     Partitioner partitioner = new GenericJdbcPartitioner();
-    PartitionerContext partitionerContext = new PartitionerContext(context, 5, null);
+    PartitionerContext partitionerContext = new PartitionerContext(context, 5, null, "test_user");
     List<Partition> partitions = partitioner.getPartitions(partitionerContext, linkConfig, jobConfig);
     assertEquals(partitions.size(), 5);
     // First partition needs to contain entire upper bound
@@ -462,7 +462,7 @@ public class TestPartitioner {
     FromJobConfiguration jobConfig = new FromJobConfiguration();
 
     Partitioner partitioner = new GenericJdbcPartitioner();
-    PartitionerContext partitionerContext = new PartitionerContext(context, 5, null);
+    PartitionerContext partitionerContext = new PartitionerContext(context, 5, null, "test_user");
 
     List<Partition> partitions = partitioner.getPartitions(partitionerContext, linkConfig, jobConfig);
 
@@ -493,7 +493,7 @@ public class TestPartitioner {
     jobConfig.fromJobConfig.allowNullValueInPartitionColumn = true;
 
     Partitioner partitioner = new GenericJdbcPartitioner();
-    PartitionerContext partitionerContext = new PartitionerContext(context, 5, null);
+    PartitionerContext partitionerContext = new PartitionerContext(context, 5, null, "test_user");
 
     List<Partition> partitions = partitioner.getPartitions(partitionerContext, linkConfig, jobConfig);
 

http://git-wip-us.apache.org/repos/asf/sqoop/blob/bc0de7c1/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestToInitializer.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestToInitializer.java b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestToInitializer.java
index df405c8..40278b6 100644
--- a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestToInitializer.java
+++ b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestToInitializer.java
@@ -41,6 +41,7 @@ public class TestToInitializer {
   private final String schemalessTableName;
   private final String stageTableName;
   private final String tableColumns;
+  private final String testUser;
 
   private GenericJdbcExecutor executor;
 
@@ -50,6 +51,7 @@ public class TestToInitializer {
     schemalessTableName = getClass().getSimpleName().toUpperCase() + "TABLE";
     stageTableName = getClass().getSimpleName().toUpperCase() + "_STAGE_TABLE";
     tableColumns = "ICOL,VCOL";
+    testUser = "test_user";
   }
 
   @BeforeMethod(alwaysRun = true)
@@ -86,7 +88,7 @@ public class TestToInitializer {
     jobConfig.toJobConfig.tableName = schemalessTableName;
 
     MutableContext context = new MutableMapContext();
-    InitializerContext initializerContext = new InitializerContext(context);
+    InitializerContext initializerContext = new InitializerContext(context, testUser);
 
     @SuppressWarnings("rawtypes")
     Initializer initializer = new GenericJdbcToInitializer();
@@ -109,7 +111,7 @@ public class TestToInitializer {
     jobConfig.toJobConfig.columns = tableColumns;
 
     MutableContext context = new MutableMapContext();
-    InitializerContext initializerContext = new InitializerContext(context);
+    InitializerContext initializerContext = new InitializerContext(context, testUser);
 
     @SuppressWarnings("rawtypes")
     Initializer initializer = new GenericJdbcToInitializer();
@@ -132,7 +134,7 @@ public class TestToInitializer {
     jobConfig.toJobConfig.tableName = tableName;
 
     MutableContext context = new MutableMapContext();
-    InitializerContext initializerContext = new InitializerContext(context);
+    InitializerContext initializerContext = new InitializerContext(context, testUser);
 
     @SuppressWarnings("rawtypes")
     Initializer initializer = new GenericJdbcToInitializer();
@@ -156,7 +158,7 @@ public class TestToInitializer {
     jobConfig.toJobConfig.columns = tableColumns;
 
     MutableContext context = new MutableMapContext();
-    InitializerContext initializerContext = new InitializerContext(context);
+    InitializerContext initializerContext = new InitializerContext(context, testUser);
 
     @SuppressWarnings("rawtypes")
     Initializer initializer = new GenericJdbcToInitializer();
@@ -191,7 +193,7 @@ public class TestToInitializer {
     jobConfig.toJobConfig.stageTableName = stageTableName;
 
     MutableContext context = new MutableMapContext();
-    InitializerContext initializerContext = new InitializerContext(context);
+    InitializerContext initializerContext = new InitializerContext(context, testUser);
 
     @SuppressWarnings("rawtypes")
     Initializer initializer = new GenericJdbcToInitializer();
@@ -219,7 +221,7 @@ public class TestToInitializer {
     executor.executeUpdate("INSERT INTO " + fullStageTableName +
       " VALUES(1, 1.1, 'one')");
     MutableContext context = new MutableMapContext();
-    InitializerContext initializerContext = new InitializerContext(context);
+    InitializerContext initializerContext = new InitializerContext(context, testUser);
 
     @SuppressWarnings("rawtypes")
     Initializer initializer = new GenericJdbcToInitializer();
@@ -278,7 +280,7 @@ public class TestToInitializer {
     executor.executeUpdate("INSERT INTO " + fullStageTableName +
       " VALUES(1, 1.1, 'one')");
     MutableContext context = new MutableMapContext();
-    InitializerContext initializerContext = new InitializerContext(context);
+    InitializerContext initializerContext = new InitializerContext(context, testUser);
 
     @SuppressWarnings("rawtypes")
     Initializer initializer = new GenericJdbcToInitializer();
@@ -301,7 +303,7 @@ public class TestToInitializer {
     jobConfig.toJobConfig.stageTableName = stageTableName;
     createTable(fullStageTableName);
     MutableContext context = new MutableMapContext();
-    InitializerContext initializerContext = new InitializerContext(context);
+    InitializerContext initializerContext = new InitializerContext(context, testUser);
 
     @SuppressWarnings("rawtypes")
     Initializer initializer = new GenericJdbcToInitializer();

http://git-wip-us.apache.org/repos/asf/sqoop/blob/bc0de7c1/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsExtractor.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsExtractor.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsExtractor.java
index 23bbcc0..583acdd 100644
--- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsExtractor.java
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsExtractor.java
@@ -19,6 +19,7 @@ package org.apache.sqoop.connector.hdfs;
 
 import java.io.IOException;
 import java.io.UnsupportedEncodingException;
+import java.security.PrivilegedExceptionAction;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
@@ -29,6 +30,7 @@ import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.CompressionCodecFactory;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.LineReader;
 import org.apache.log4j.Logger;
 import org.apache.sqoop.common.SqoopException;
@@ -56,25 +58,29 @@ public class HdfsExtractor extends Extractor<LinkConfiguration, FromJobConfigura
   private long rowsRead = 0;
 
   @Override
-  public void extract(ExtractorContext context, LinkConfiguration linkConfiguration, FromJobConfiguration jobConfiguration, HdfsPartition partition) {
-    HdfsUtils.contextToConfiguration(context.getContext(), conf);
-    dataWriter = context.getDataWriter();
-    schema = context.getSchema();
-
+  public void extract(final ExtractorContext context, final LinkConfiguration linkConfiguration, final FromJobConfiguration jobConfiguration, final HdfsPartition partition) {
     try {
-      HdfsPartition p = partition;
-      LOG.info("Working on partition: " + p);
-      int numFiles = p.getNumberOfFiles();
-      for (int i = 0; i < numFiles; i++) {
-        extractFile(linkConfiguration, jobConfiguration, p.getFile(i), p.getOffset(i), p.getLength(i));
-      }
-    } catch (IOException e) {
+      UserGroupInformation.createProxyUser(context.getUser(), UserGroupInformation.getLoginUser()).doAs(new PrivilegedExceptionAction<Void>() {
+        public Void run() throws Exception {
+          HdfsUtils.contextToConfiguration(context.getContext(), conf);
+          dataWriter = context.getDataWriter();
+          schema = context.getSchema();
+          HdfsPartition p = partition;
+          LOG.info("Working on partition: " + p);
+          int numFiles = p.getNumberOfFiles();
+          for (int i = 0; i < numFiles; i++) {
+            extractFile(linkConfiguration, jobConfiguration, p.getFile(i), p.getOffset(i), p.getLength(i));
+          }
+          return null;
+        }
+      });
+    } catch (Exception e) {
       throw new SqoopException(HdfsConnectorError.GENERIC_HDFS_CONNECTOR_0001, e);
     }
   }
 
   private void extractFile(LinkConfiguration linkConfiguration,
-                           FromJobConfiguration fromJobCOnfiguration,
+                           FromJobConfiguration fromJobConfiguration,
                            Path file, long start, long length)
       throws IOException {
     long end = start + length;
@@ -83,9 +89,9 @@ public class HdfsExtractor extends Extractor<LinkConfiguration, FromJobConfigura
     LOG.info("\t to offset " + end);
     LOG.info("\t of length " + length);
     if(isSequenceFile(file)) {
-      extractSequenceFile(linkConfiguration, fromJobCOnfiguration, file, start, length);
+      extractSequenceFile(linkConfiguration, fromJobConfiguration, file, start, length);
     } else {
-      extractTextFile(linkConfiguration, fromJobCOnfiguration, file, start, length);
+      extractTextFile(linkConfiguration, fromJobConfiguration, file, start, length);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/sqoop/blob/bc0de7c1/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsFromInitializer.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsFromInitializer.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsFromInitializer.java
index e98e02b..be837ca 100644
--- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsFromInitializer.java
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsFromInitializer.java
@@ -21,6 +21,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.sqoop.common.MapContext;
 import org.apache.sqoop.common.SqoopException;
 import org.apache.sqoop.connector.hdfs.configuration.FromJobConfiguration;
@@ -32,6 +33,7 @@ import org.apache.sqoop.job.etl.InitializerContext;
 import org.apache.log4j.Logger;
 
 import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
 
 
 public class HdfsFromInitializer extends Initializer<LinkConfiguration, FromJobConfiguration> {
@@ -48,43 +50,49 @@ public class HdfsFromInitializer extends Initializer<LinkConfiguration, FromJobC
    * @param jobConfig FROM job configuration object
    */
   @Override
-  public void initialize(InitializerContext context, LinkConfiguration linkConfig, FromJobConfiguration jobConfig) {
+  @edu.umd.cs.findbugs.annotations.SuppressWarnings({"SIC_INNER_SHOULD_BE_STATIC_ANON"})
+  public void initialize(final InitializerContext context, final LinkConfiguration linkConfig, final FromJobConfiguration jobConfig) {
     assert jobConfig.incremental != null;
 
-    Configuration configuration = HdfsUtils.createConfiguration(linkConfig);
+    final Configuration configuration = HdfsUtils.createConfiguration(linkConfig);
     HdfsUtils.contextToConfiguration(new MapContext(linkConfig.linkConfig.configOverrides), configuration);
     HdfsUtils.configurationToContext(configuration, context.getContext());
 
-    boolean incremental = jobConfig.incremental.incrementalType != null && jobConfig.incremental.incrementalType == IncrementalType.NEW_FILES;
+    final boolean incremental = jobConfig.incremental.incrementalType != null && jobConfig.incremental.incrementalType == IncrementalType.NEW_FILES;
 
     // In case of incremental import, we need to persist the highest last modified
     try {
-      FileSystem fs = FileSystem.get(configuration);
-      Path path = new Path(jobConfig.fromJobConfig.inputDirectory);
-      LOG.info("Input directory: " + path.toString());
+      UserGroupInformation.createProxyUser(context.getUser(), UserGroupInformation.getLoginUser()).doAs(new PrivilegedExceptionAction<Void>() {
+        public Void run() throws Exception {
+          FileSystem fs = FileSystem.get(configuration);
+          Path path = new Path(jobConfig.fromJobConfig.inputDirectory);
+          LOG.info("Input directory: " + path.toString());
 
-      if(!fs.exists(path)) {
-        throw new SqoopException(HdfsConnectorError.GENERIC_HDFS_CONNECTOR_0007, "Input directory doesn't exists");
-      }
+          if(!fs.exists(path)) {
+            throw new SqoopException(HdfsConnectorError.GENERIC_HDFS_CONNECTOR_0007, "Input directory doesn't exists");
+          }
+
+          if(fs.isFile(path)) {
+            throw new SqoopException(HdfsConnectorError.GENERIC_HDFS_CONNECTOR_0007, "Input directory is a file");
+          }
 
-      if(fs.isFile(path)) {
-        throw new SqoopException(HdfsConnectorError.GENERIC_HDFS_CONNECTOR_0007, "Input directory is a file");
-      }
+          if(incremental) {
+            LOG.info("Detected incremental import");
+            long maxModifiedTime = -1;
+            FileStatus[] fileStatuses = fs.listStatus(path);
+            for(FileStatus status : fileStatuses) {
+              if(maxModifiedTime < status.getModificationTime()) {
+                maxModifiedTime = status.getModificationTime();
+              }
+            }
 
-      if(incremental) {
-        LOG.info("Detected incremental import");
-        long maxModifiedTime = -1;
-        FileStatus[] fileStatuses = fs.listStatus(path);
-        for(FileStatus status : fileStatuses) {
-          if(maxModifiedTime < status.getModificationTime()) {
-            maxModifiedTime = status.getModificationTime();
+            LOG.info("Maximal age of file is: " + maxModifiedTime);
+            context.getContext().setLong(HdfsConstants.MAX_IMPORT_DATE, maxModifiedTime);
           }
+          return null;
         }
-
-        LOG.info("Maximal age of file is: " + maxModifiedTime);
-        context.getContext().setLong(HdfsConstants.MAX_IMPORT_DATE, maxModifiedTime);
-      }
-    } catch (IOException e) {
+      });
+    } catch (Exception e) {
       throw new SqoopException(HdfsConnectorError.GENERIC_HDFS_CONNECTOR_0007, "Unexpected exception", e);
     }
 

http://git-wip-us.apache.org/repos/asf/sqoop/blob/bc0de7c1/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsLoader.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsLoader.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsLoader.java
index 798e552..04acd18 100644
--- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsLoader.java
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsLoader.java
@@ -18,12 +18,14 @@
 package org.apache.sqoop.connector.hdfs;
 
 import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
 import java.util.UUID;
 
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.sqoop.common.SqoopException;
 import org.apache.sqoop.connector.common.SqoopIDFUtils;
 import org.apache.sqoop.connector.hdfs.configuration.LinkConfiguration;
@@ -52,40 +54,43 @@ public class HdfsLoader extends Loader<LinkConfiguration, ToJobConfiguration> {
    * @throws Exception
    */
   @Override
-  public void load(LoaderContext context, LinkConfiguration linkConfiguration,
-                   ToJobConfiguration toJobConfig) throws Exception {
-    Configuration conf = new Configuration();
-    HdfsUtils.contextToConfiguration(context.getContext(), conf);
-
-    DataReader reader = context.getDataReader();
-    String directoryName = context.getString(HdfsConstants.WORK_DIRECTORY);
-    String codecname = getCompressionCodecName(toJobConfig);
-
-    CompressionCodec codec = null;
-    if (codecname != null) {
-      Class<?> clz = ClassUtils.loadClass(codecname);
-      if (clz == null) {
-        throw new SqoopException(HdfsConnectorError.GENERIC_HDFS_CONNECTOR_0003, codecname);
-      }
+  public void load(final LoaderContext context, final LinkConfiguration linkConfiguration,
+                   final ToJobConfiguration toJobConfig) throws Exception {
+    UserGroupInformation.createProxyUser(context.getUser(),
+      UserGroupInformation.getLoginUser()).doAs(new PrivilegedExceptionAction<Void>() {
+      public Void run() throws Exception {
+        Configuration conf = new Configuration();
+        HdfsUtils.contextToConfiguration(context.getContext(), conf);
+
+        DataReader reader = context.getDataReader();
+        String directoryName = context.getString(HdfsConstants.WORK_DIRECTORY);
+        String codecname = getCompressionCodecName(toJobConfig);
+
+        CompressionCodec codec = null;
+        if (codecname != null) {
+          Class<?> clz = ClassUtils.loadClass(codecname);
+          if (clz == null) {
+            throw new SqoopException(HdfsConnectorError.GENERIC_HDFS_CONNECTOR_0003, codecname);
+          }
 
-      try {
-        codec = (CompressionCodec) clz.newInstance();
-        if (codec instanceof Configurable) {
-          ((Configurable) codec).setConf(conf);
+          try {
+            codec = (CompressionCodec) clz.newInstance();
+            if (codec instanceof Configurable) {
+              ((Configurable) codec).setConf(conf);
+            }
+          } catch (RuntimeException|InstantiationException|IllegalAccessException e) {
+            throw new SqoopException(HdfsConnectorError.GENERIC_HDFS_CONNECTOR_0004, codecname, e);
+          }
         }
-      } catch (RuntimeException|InstantiationException|IllegalAccessException e) {
-        throw new SqoopException(HdfsConnectorError.GENERIC_HDFS_CONNECTOR_0004, codecname, e);
-      }
-    }
 
-    String filename = directoryName + "/" + UUID.randomUUID() + getExtension(toJobConfig,codec);
+        String filename = directoryName + "/" + UUID.randomUUID() + getExtension(toJobConfig,codec);
 
-    try {
-      Path filepath = new Path(filename);
+        try {
+          Path filepath = new Path(filename);
 
-      GenericHdfsWriter filewriter = getWriter(toJobConfig);
+          GenericHdfsWriter filewriter = getWriter(toJobConfig);
 
-      filewriter.initialize(filepath, conf, codec);
+          filewriter.initialize(filepath, conf, codec);
 
       if (!HdfsUtils.hasCustomFormat(linkConfiguration, toJobConfig) || (context.getSchema() instanceof ByteArraySchema)) {
         String record;
@@ -110,10 +115,12 @@ public class HdfsLoader extends Loader<LinkConfiguration, ToJobConfiguration> {
       }
       filewriter.destroy();
 
-    } catch (IOException e) {
-      throw new SqoopException(HdfsConnectorError.GENERIC_HDFS_CONNECTOR_0005, e);
+        } catch (IOException e) {
+          throw new SqoopException(HdfsConnectorError.GENERIC_HDFS_CONNECTOR_0005, e);
+        }
+      return null;
     }
-
+  });
   }
 
   private GenericHdfsWriter getWriter(ToJobConfiguration toJobConf) {
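
createProxyUser() only produces a proxy UGI; the impersonation itself is authorized on the Hadoop side through the standard proxy-user properties, which must allow the Sqoop server's login user to act on behalf of others. A hedged sketch of those settings expressed as Configuration overrides — the "sqoop2" account name and the wildcard values are assumptions for illustration and should be narrowed in real deployments:

import org.apache.hadoop.conf.Configuration;

public class ProxyUserConfSketch {
  // Equivalent to adding these properties to core-site.xml on the cluster.
  public static Configuration allowImpersonation(Configuration conf) {
    conf.set("hadoop.proxyuser.sqoop2.hosts", "*");
    conf.set("hadoop.proxyuser.sqoop2.groups", "*");
    return conf;
  }
}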

http://git-wip-us.apache.org/repos/asf/sqoop/blob/bc0de7c1/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsPartitioner.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsPartitioner.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsPartitioner.java
index ff16ad7..998b903 100644
--- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsPartitioner.java
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsPartitioner.java
@@ -19,6 +19,7 @@
 package org.apache.sqoop.connector.hdfs;
 
 import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
@@ -38,6 +39,7 @@ import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.CompressionCodecFactory;
 import org.apache.hadoop.net.NetworkTopology;
 import org.apache.hadoop.net.NodeBase;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.log4j.Logger;
 import org.apache.sqoop.common.SqoopException;
 import org.apache.sqoop.connector.hdfs.configuration.FromJobConfiguration;
@@ -71,104 +73,108 @@ public class HdfsPartitioner extends Partitioner<LinkConfiguration, FromJobConfi
       new HashMap<String, Set<String>>();
 
   @Override
-  public List<Partition> getPartitions(PartitionerContext context,
-                                       LinkConfiguration linkConfiguration,
-                                       FromJobConfiguration fromJobConfig) {
+  public List<Partition> getPartitions(final PartitionerContext context,
+                                       final LinkConfiguration linkConfiguration,
+                                       final FromJobConfiguration fromJobConfig) {
     assert fromJobConfig.incremental != null;
 
-    Configuration conf = new Configuration();
+    final Configuration conf = new Configuration();
     HdfsUtils.contextToConfiguration(context.getContext(), conf);
 
+    final List<Partition> partitions = new ArrayList<>();
     try {
-      long numInputBytes = getInputSize(conf, fromJobConfig.fromJobConfig.inputDirectory);
-      maxSplitSize = numInputBytes / context.getMaxPartitions();
-
-      if(numInputBytes % context.getMaxPartitions() != 0 ) {
-        maxSplitSize += 1;
-       }
-
-      long minSizeNode = 0;
-      long minSizeRack = 0;
-      long maxSize = 0;
+      UserGroupInformation.createProxyUser(context.getUser(),
+        UserGroupInformation.getLoginUser()).doAs(new PrivilegedExceptionAction<Void>() {
+        public Void run() throws Exception {
+          long numInputBytes = getInputSize(conf, fromJobConfig.fromJobConfig.inputDirectory);
+          maxSplitSize = numInputBytes / context.getMaxPartitions();
+
+          if (numInputBytes % context.getMaxPartitions() != 0) {
+            maxSplitSize += 1;
+          }
 
-      // the values specified by setxxxSplitSize() takes precedence over the
-      // values that might have been specified in the config
-      if (minSplitSizeNode != 0) {
-        minSizeNode = minSplitSizeNode;
-      } else {
-        minSizeNode = conf.getLong(SPLIT_MINSIZE_PERNODE, 0);
-      }
-      if (minSplitSizeRack != 0) {
-        minSizeRack = minSplitSizeRack;
-      } else {
-        minSizeRack = conf.getLong(SPLIT_MINSIZE_PERRACK, 0);
-      }
-      if (maxSplitSize != 0) {
-        maxSize = maxSplitSize;
-      } else {
-        maxSize = conf.getLong("mapreduce.input.fileinputformat.split.maxsize", 0);
-      }
-      if (minSizeNode != 0 && maxSize != 0 && minSizeNode > maxSize) {
-        throw new IOException("Minimum split size pernode " + minSizeNode +
-                              " cannot be larger than maximum split size " +
-                              maxSize);
-      }
-      if (minSizeRack != 0 && maxSize != 0 && minSizeRack > maxSize) {
-        throw new IOException("Minimum split size per rack" + minSizeRack +
-                              " cannot be larger than maximum split size " +
-                              maxSize);
-      }
-      if (minSizeRack != 0 && minSizeNode > minSizeRack) {
-        throw new IOException("Minimum split size per node" + minSizeNode +
-                              " cannot be smaller than minimum split " +
-                              "size per rack " + minSizeRack);
-      }
+          long minSizeNode = 0;
+          long minSizeRack = 0;
+          long maxSize = 0;
 
-      // Incremental import related options
-      boolean incremental = fromJobConfig.incremental.incrementalType != null && fromJobConfig.incremental.incrementalType == IncrementalType.NEW_FILES;
-      long lastImportedDate = fromJobConfig.incremental.lastImportedDate != null ? fromJobConfig.incremental.lastImportedDate.getMillis() : -1;
-      long maxImportDate = context.getLong(HdfsConstants.MAX_IMPORT_DATE, -1);
-
-      // all the files in input set
-      String indir = fromJobConfig.fromJobConfig.inputDirectory;
-      FileSystem fs = FileSystem.get(conf);
-
-      List<Path> paths = new LinkedList<Path>();
-      for(FileStatus status : fs.listStatus(new Path(indir))) {
-        if(!status.isDir()) {
-          if(incremental) {
-            long modifiedDate = status.getModificationTime();
-            if(lastImportedDate < modifiedDate && modifiedDate <= maxImportDate) {
-              LOG.info("Will process input file: " + status.getPath() + " with modification date " + modifiedDate);
-              paths.add(status.getPath());
-            } else {
-              LOG.info("Skipping input file: " + status.getPath() + " with modification date " + modifiedDate);
-            }
+          // the values specified by setxxxSplitSize() takes precedence over the
+          // values that might have been specified in the config
+          if (minSplitSizeNode != 0) {
+            minSizeNode = minSplitSizeNode;
           } else {
-            // Without incremental mode, we're processing all files
-            LOG.info("Will process input file: " + status.getPath());
-            paths.add(status.getPath());
+            minSizeNode = conf.getLong(SPLIT_MINSIZE_PERNODE, 0);
+          }
+          if (minSplitSizeRack != 0) {
+            minSizeRack = minSplitSizeRack;
+          } else {
+            minSizeRack = conf.getLong(SPLIT_MINSIZE_PERRACK, 0);
+          }
+          if (maxSplitSize != 0) {
+            maxSize = maxSplitSize;
+          } else {
+            maxSize = conf.getLong("mapreduce.input.fileinputformat.split.maxsize", 0);
+          }
+          if (minSizeNode != 0 && maxSize != 0 && minSizeNode > maxSize) {
+            throw new IOException("Minimum split size pernode " + minSizeNode +
+              " cannot be larger than maximum split size " + maxSize);
+          }
+          if (minSizeRack != 0 && maxSize != 0 && minSizeRack > maxSize) {
+            throw new IOException("Minimum split size per rack" + minSizeRack +
+              " cannot be larger than maximum split size " + maxSize);
+          }
+          if (minSizeRack != 0 && minSizeNode > minSizeRack) {
+            throw new IOException("Minimum split size per node" + minSizeNode +
+              " cannot be smaller than minimum split " + "size per rack " + minSizeRack);
           }
-        }
-      }
 
-      List<Partition> partitions = new ArrayList<Partition>();
-      if (paths.size() == 0) {
-        return partitions;
-      }
+          // Incremental import related options
+          boolean incremental = fromJobConfig.incremental.incrementalType != null
+            && fromJobConfig.incremental.incrementalType == IncrementalType.NEW_FILES;
+          long lastImportedDate = fromJobConfig.incremental.lastImportedDate != null
+            ? fromJobConfig.incremental.lastImportedDate.getMillis() : -1;
+          long maxImportDate = context.getLong(HdfsConstants.MAX_IMPORT_DATE, -1);
+
+          // all the files in input set
+          String indir = fromJobConfig.fromJobConfig.inputDirectory;
+          FileSystem fs = FileSystem.get(conf);
+
+          List<Path> paths = new LinkedList<Path>();
+          for (FileStatus status : fs.listStatus(new Path(indir))) {
+            if (!status.isDir()) {
+              if (incremental) {
+                long modifiedDate = status.getModificationTime();
+                if (lastImportedDate < modifiedDate && modifiedDate <= maxImportDate) {
+                  LOG.info("Will process input file: " + status.getPath() + " with modification date " + modifiedDate);
+                  paths.add(status.getPath());
+                } else {
+                  LOG.info("Skipping input file: " + status.getPath() + " with modification date " + modifiedDate);
+                }
+              } else {
+                // Without incremental mode, we're processing all files
+                LOG.info("Will process input file: " + status.getPath());
+                paths.add(status.getPath());
+              }
+            }
+          }
 
-      // create splits for all files that are not in any pool.
-      getMoreSplits(conf, paths,
-                    maxSize, minSizeNode, minSizeRack, partitions);
+          if (paths.size() == 0) {
+            return null;
+          }
 
-      // free up rackToNodes map
-      rackToNodes.clear();
+          // create splits for all files that are not in any pool.
+          getMoreSplits(conf, paths, maxSize, minSizeNode, minSizeRack, partitions);
 
-      return partitions;
+          // free up rackToNodes map
+          rackToNodes.clear();
 
-    } catch (IOException e) {
+          return null;
+        }
+      });
+    } catch (Exception e) {
       throw new SqoopException(HdfsConnectorError.GENERIC_HDFS_CONNECTOR_0000, e);
     }
+
+    return partitions;
   }
 
   //TODO: Perhaps get the FS from link configuration so we can support remote HDFS
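
The HdfsPartitioner change above is an instance of the pattern this commit applies across the HDFS connector: the file-system work is moved into a PrivilegedExceptionAction and executed through UserGroupInformation.doAs as a proxy for the job's user. The following is a minimal standalone sketch of that pattern, not code from this commit; the class name ProxyUserListing, the method listAsProxyUser and its arguments are placeholders. Impersonation also has to be permitted on the cluster side (the hadoop.proxyuser.* settings for the server's login user), which is a deployment assumption outside this sketch.

import java.security.PrivilegedExceptionAction;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation;

public class ProxyUserListing {

  // List the contents of an HDFS directory while impersonating proxyUser.
  public static void listAsProxyUser(final String proxyUser, final String dir) throws Exception {
    final Configuration conf = new Configuration();

    // createProxyUser() builds a UGI for proxyUser backed by the server's
    // own login user; the HDFS calls inside run() then execute as proxyUser.
    UserGroupInformation ugi = UserGroupInformation.createProxyUser(
        proxyUser, UserGroupInformation.getLoginUser());

    ugi.doAs(new PrivilegedExceptionAction<Void>() {
      @Override
      public Void run() throws Exception {
        FileSystem fs = FileSystem.get(conf);
        for (FileStatus status : fs.listStatus(new Path(dir))) {
          System.out.println(status.getPath());
        }
        return null;
      }
    });
  }
}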

http://git-wip-us.apache.org/repos/asf/sqoop/blob/bc0de7c1/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsToDestroyer.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsToDestroyer.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsToDestroyer.java
index 11b2ae3..2bad23a 100644
--- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsToDestroyer.java
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsToDestroyer.java
@@ -21,6 +21,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.log4j.Logger;
 import org.apache.sqoop.common.SqoopException;
 import org.apache.sqoop.connector.hdfs.configuration.LinkConfiguration;
@@ -30,6 +31,7 @@ import org.apache.sqoop.job.etl.Destroyer;
 import org.apache.sqoop.job.etl.DestroyerContext;
 
 import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
 
 public class HdfsToDestroyer extends Destroyer<LinkConfiguration, ToJobConfiguration> {
 
@@ -39,28 +41,38 @@ public class HdfsToDestroyer extends Destroyer<LinkConfiguration, ToJobConfigura
    * {@inheritDoc}
    */
   @Override
-  public void destroy(DestroyerContext context, LinkConfiguration linkConfig, ToJobConfiguration jobConfig) {
-    Configuration configuration = new Configuration();
+  @edu.umd.cs.findbugs.annotations.SuppressWarnings({"SIC_INNER_SHOULD_BE_STATIC_ANON"})
+  public void destroy(final DestroyerContext context, final LinkConfiguration linkConfig, final ToJobConfiguration jobConfig) {
+    final Configuration configuration = new Configuration();
     HdfsUtils.contextToConfiguration(context.getContext(), configuration);
 
-    String workingDirectory = context.getString(HdfsConstants.WORK_DIRECTORY);
-    Path targetDirectory = new Path(jobConfig.toJobConfig.outputDirectory);
+    final String workingDirectory = context.getString(HdfsConstants.WORK_DIRECTORY);
+    final Path targetDirectory = new Path(jobConfig.toJobConfig.outputDirectory);
 
     try {
-      FileSystem fs = FileSystem.get(configuration);
+      UserGroupInformation.createProxyUser(context.getUser(),
+        UserGroupInformation.getLoginUser()).doAs(new PrivilegedExceptionAction<Void>() {
+        public Void run() throws Exception {
+          FileSystem fs = FileSystem.get(configuration);
 
-      // If we succeeded, we need to move all files from working directory
-      if(context.isSuccess()) {
-        FileStatus[] fileStatuses = fs.listStatus(new Path(workingDirectory));
-        for (FileStatus status : fileStatuses) {
-          LOG.info("Committing file: " + status.getPath().toString() + " of size " + status.getLen());
-          fs.rename(status.getPath(), new Path(targetDirectory, status.getPath().getName()));
-        }
-      }
+          // If we succeeded, we need to move all files from working directory
+          if (context.isSuccess()) {
+            FileStatus[] fileStatuses = fs.listStatus(new Path
+              (workingDirectory));
+            for (FileStatus status : fileStatuses) {
+              LOG.info("Committing file: " + status.getPath().toString() + " " +
+                "of size " + status.getLen());
+              fs.rename(status.getPath(), new Path(targetDirectory, status
+                .getPath().getName()));
+            }
+          }
 
-      // Clean up working directory
-      fs.delete(new Path(workingDirectory), true);
-    } catch (IOException e) {
+          // Clean up working directory
+          fs.delete(new Path(workingDirectory), true);
+          return null;
+        }
+      });
+    } catch (Exception e) {
       throw new SqoopException(HdfsConnectorError.GENERIC_HDFS_CONNECTOR_0008, e);
     }
   }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/bc0de7c1/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsToInitializer.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsToInitializer.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsToInitializer.java
index 29cf3b9..5856371 100644
--- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsToInitializer.java
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsToInitializer.java
@@ -21,6 +21,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.log4j.Logger;
 import org.apache.sqoop.common.MapContext;
 import org.apache.sqoop.common.SqoopException;
@@ -31,6 +32,7 @@ import org.apache.sqoop.job.etl.Initializer;
 import org.apache.sqoop.job.etl.InitializerContext;
 
 import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
 import java.util.UUID;
 
 public class HdfsToInitializer extends Initializer<LinkConfiguration, ToJobConfiguration> {
@@ -41,36 +43,43 @@ public class HdfsToInitializer extends Initializer<LinkConfiguration, ToJobConfi
    * {@inheritDoc}
    */
   @Override
-  public void initialize(InitializerContext context, LinkConfiguration linkConfig, ToJobConfiguration jobConfig) {
+  @edu.umd.cs.findbugs.annotations.SuppressWarnings({"SIC_INNER_SHOULD_BE_STATIC_ANON"})
+  public void initialize(final InitializerContext context, final LinkConfiguration linkConfig, final ToJobConfiguration jobConfig) {
     assert jobConfig != null;
     assert linkConfig != null;
     assert jobConfig.toJobConfig != null;
     assert jobConfig.toJobConfig.outputDirectory != null;
 
-    Configuration configuration = HdfsUtils.createConfiguration(linkConfig);
+    final Configuration configuration = HdfsUtils.createConfiguration(linkConfig);
     HdfsUtils.contextToConfiguration(new MapContext(linkConfig.linkConfig.configOverrides), configuration);
     HdfsUtils.configurationToContext(configuration, context.getContext());
 
-    boolean appendMode = Boolean.TRUE.equals(jobConfig.toJobConfig.appendMode);
+    final boolean appendMode = Boolean.TRUE.equals(jobConfig.toJobConfig.appendMode);
 
     // Verification that given HDFS directory either don't exists or is empty
     try {
-      FileSystem fs = FileSystem.get(configuration);
-      Path path = new Path(jobConfig.toJobConfig.outputDirectory);
+      UserGroupInformation.createProxyUser(context.getUser(),
+        UserGroupInformation.getLoginUser()).doAs(new PrivilegedExceptionAction<Void>() {
+        public Void run() throws Exception {
+          FileSystem fs = FileSystem.get(configuration);
+          Path path = new Path(jobConfig.toJobConfig.outputDirectory);
 
-      if(fs.exists(path)) {
-        if(fs.isFile(path)) {
-          throw new SqoopException(HdfsConnectorError.GENERIC_HDFS_CONNECTOR_0007, "Output directory already exists and is a file");
-        }
+          if (fs.exists(path)) {
+            if (fs.isFile(path)) {
+              throw new SqoopException(HdfsConnectorError.GENERIC_HDFS_CONNECTOR_0007, "Output directory already exists and is a file");
+            }
 
-        if(fs.isDirectory(path) && !appendMode) {
-          FileStatus[] fileStatuses = fs.listStatus(path);
-          if(fileStatuses.length != 0) {
-            throw new SqoopException(HdfsConnectorError.GENERIC_HDFS_CONNECTOR_0007, "Output directory is not empty");
+            if (fs.isDirectory(path) && !appendMode) {
+              FileStatus[] fileStatuses = fs.listStatus(path);
+              if (fileStatuses.length != 0) {
+                throw new SqoopException(HdfsConnectorError.GENERIC_HDFS_CONNECTOR_0007, "Output directory is not empty");
+              }
+            }
           }
+          return null;
         }
-      }
-    } catch (IOException e) {
+      });
+    } catch (Exception e) {
       throw new SqoopException(HdfsConnectorError.GENERIC_HDFS_CONNECTOR_0007, "Unexpected exception", e);
     }
 

http://git-wip-us.apache.org/repos/asf/sqoop/blob/bc0de7c1/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestExtractor.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestExtractor.java b/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestExtractor.java
index 6f9986d..7d2177f 100644
--- a/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestExtractor.java
+++ b/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestExtractor.java
@@ -51,6 +51,7 @@ public class TestExtractor extends TestHdfsBase {
   private static final String INPUT_ROOT = System.getProperty("maven.build.directory", "/tmp") + "/sqoop/warehouse/";
   private static final int NUMBER_OF_FILES = 5;
   private static final int NUMBER_OF_ROWS_PER_FILE = 1000;
+  private static final String TEST_USER = "test_user";
 
   private ToFormat outputFileType;
   private Class<? extends CompressionCodec> compressionClass;
@@ -132,6 +133,8 @@ public class TestExtractor extends TestHdfsBase {
         Assert.assertEquals("'" + index + "'", components[3]);
         Assert.assertEquals("\\\\N", components[4]);
 
+        assertTestUser(TEST_USER);
+
         visited[index - 1] = true;
       }
 
@@ -139,7 +142,7 @@ public class TestExtractor extends TestHdfsBase {
       public void writeRecord(Object obj) {
         throw new AssertionError("Should not be writing object.");
       }
-    }, schema);
+    }, schema, TEST_USER);
 
     LinkConfiguration emptyLinkConfig = new LinkConfiguration();
     FromJobConfiguration emptyJobConfig = new FromJobConfiguration();
@@ -193,7 +196,7 @@ public class TestExtractor extends TestHdfsBase {
       public void writeRecord(Object obj) {
         throw new AssertionError("Should not be writing object.");
       }
-    }, schema);
+    }, schema, TEST_USER);
 
     LinkConfiguration emptyLinkConfig = new LinkConfiguration();
     FromJobConfiguration fromJobConfiguration = new FromJobConfiguration();

http://git-wip-us.apache.org/repos/asf/sqoop/blob/bc0de7c1/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestFromDestroyer.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestFromDestroyer.java b/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestFromDestroyer.java
index 569c60b..f388040 100644
--- a/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestFromDestroyer.java
+++ b/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestFromDestroyer.java
@@ -34,19 +34,21 @@ public class TestFromDestroyer {
   LinkConfiguration linkConfig;
   FromJobConfiguration jobConfig;
   MutableContext context;
+  String user;
 
   public TestFromDestroyer() {
     linkConfig = new LinkConfiguration();
     jobConfig = new FromJobConfiguration();
     context = new MutableMapContext();
     destroyer = new HdfsFromDestroyer();
+    user = "test_user";
   }
 
   @Test
   public void testUpdateConfiguration() {
     DateTime dt = new DateTime();
     context.setLong(HdfsConstants.MAX_IMPORT_DATE, dt.getMillis());
-    destroyer.updateConfiguration(new DestroyerContext(context, true, null), linkConfig, jobConfig);
+    destroyer.updateConfiguration(new DestroyerContext(context, true, null, user), linkConfig, jobConfig);
     assertEquals(jobConfig.incremental.lastImportedDate, dt);
   }
 }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/bc0de7c1/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestFromInitializer.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestFromInitializer.java b/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestFromInitializer.java
index 52c174e..119dbdb 100644
--- a/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestFromInitializer.java
+++ b/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestFromInitializer.java
@@ -48,7 +48,7 @@ public class TestFromInitializer {
     jobConfig = new FromJobConfiguration();
     context = new MutableMapContext();
     initializer = new HdfsFromInitializer();
-    initializerContext = new InitializerContext(context);
+    initializerContext = new InitializerContext(context, "test_user");
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/sqoop/blob/bc0de7c1/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestHdfsBase.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestHdfsBase.java b/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestHdfsBase.java
index ac44595..6bacfa2 100644
--- a/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestHdfsBase.java
+++ b/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestHdfsBase.java
@@ -25,6 +25,8 @@ import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.testng.Assert;
 
 import java.io.BufferedWriter;
 import java.io.IOException;
@@ -150,4 +152,13 @@ public class TestHdfsBase {
       throws IOException, InstantiationException, IllegalAccessException {
     createSequenceInput(indir, clz, numberOfFiles, numberOfRows, "%d,%f,%s");
   }
+
+  protected void assertTestUser(String testUser) {
+    // Ensure that we are impersonating correctly
+    try {
+      Assert.assertEquals(UserGroupInformation.getCurrentUser().getUserName(), testUser);
+    } catch (Exception e) {
+      Assert.fail();
+    }
+  }
 }
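
The assertTestUser() helper added to TestHdfsBase above works because UserGroupInformation.getCurrentUser() reports the proxied name while execution is inside doAs. Below is a small standalone sketch of that check; it is not part of the commit, and the user name is hard-coded purely for illustration.

import java.security.PrivilegedExceptionAction;

import org.apache.hadoop.security.UserGroupInformation;

public class ImpersonationCheck {

  public static void main(String[] args) throws Exception {
    final String testUser = "test_user";

    UserGroupInformation proxy = UserGroupInformation.createProxyUser(
        testUser, UserGroupInformation.getLoginUser());

    proxy.doAs(new PrivilegedExceptionAction<Void>() {
      @Override
      public Void run() throws Exception {
        // Inside doAs the current user is the proxied one; this mirrors
        // what assertTestUser() verifies in the connector tests.
        String current = UserGroupInformation.getCurrentUser().getUserName();
        if (!testUser.equals(current)) {
          throw new AssertionError("expected " + testUser + " but was " + current);
        }
        return null;
      }
    });
  }
}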

http://git-wip-us.apache.org/repos/asf/sqoop/blob/bc0de7c1/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestLoader.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestLoader.java b/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestLoader.java
index 688067b..11fcef2 100644
--- a/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestLoader.java
+++ b/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestLoader.java
@@ -62,6 +62,7 @@ public class TestLoader extends TestHdfsBase {
   private ToCompression compression;
   private final String outputDirectory;
   private Loader loader;
+  private String user = "test_user";
 
   @Factory(dataProvider="test-hdfs-loader")
   public TestLoader(ToFormat outputFormat,
@@ -110,11 +111,13 @@ public class TestLoader extends TestHdfsBase {
 
       @Override
       public Object[] readArrayRecord() {
+        assertTestUser(user);
         return null;
       }
 
       @Override
       public String readTextRecord() {
+        assertTestUser(user);
         if (index++ < NUMBER_OF_ROWS_PER_FILE) {
           return index + "," + (double)index + ",'" + index + "'";
         } else {
@@ -124,9 +127,10 @@ public class TestLoader extends TestHdfsBase {
 
       @Override
       public Object readContent() {
+        assertTestUser(user);
         return null;
       }
-    }, null);
+    }, null, user);
     LinkConfiguration linkConf = new LinkConfiguration();
     ToJobConfiguration jobConf = new ToJobConfiguration();
     jobConf.toJobConfig.compression = compression;
@@ -163,6 +167,8 @@ public class TestLoader extends TestHdfsBase {
 
       @Override
       public Object[] readArrayRecord() {
+        assertTestUser(user);
+
         if (index++ < NUMBER_OF_ROWS_PER_FILE) {
           return new Object[]{
               index,
@@ -184,7 +190,7 @@ public class TestLoader extends TestHdfsBase {
       public Object readContent() {
         throw new AssertionError("should not be at readContent");
       }
-    }, schema);
+    }, schema, "test_user");
     LinkConfiguration linkConf = new LinkConfiguration();
     ToJobConfiguration jobConf = new ToJobConfiguration();
     jobConf.toJobConfig.compression = compression;

http://git-wip-us.apache.org/repos/asf/sqoop/blob/bc0de7c1/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestPartitioner.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestPartitioner.java b/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestPartitioner.java
index 9a6bfff..7627e98 100644
--- a/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestPartitioner.java
+++ b/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestPartitioner.java
@@ -94,7 +94,7 @@ public class TestPartitioner extends TestHdfsBase {
 
   @Test
   public void testPartitioner() {
-    PartitionerContext context = new PartitionerContext(new MapContext(new HashMap<String, String>()), 5, null);
+    PartitionerContext context = new PartitionerContext(new MapContext(new HashMap<String, String>()), 5, null, "test_user");
     LinkConfiguration linkConf = new LinkConfiguration();
     FromJobConfiguration jobConf = new FromJobConfiguration();
 

http://git-wip-us.apache.org/repos/asf/sqoop/blob/bc0de7c1/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestToDestroyer.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestToDestroyer.java b/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestToDestroyer.java
index e1f416e..687a9b3 100644
--- a/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestToDestroyer.java
+++ b/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestToDestroyer.java
@@ -54,7 +54,7 @@ public class TestToDestroyer {
     context.setString(HdfsConstants.WORK_DIRECTORY, workDir.getAbsolutePath());
 
     Destroyer destroyer = new HdfsToDestroyer();
-    destroyer.destroy(new DestroyerContext(context, true, null), linkConfig, jobConfig);
+    destroyer.destroy(new DestroyerContext(context, true, null, "test_user"), linkConfig, jobConfig);
 
     File[] files = targetDir.listFiles();
 
@@ -99,7 +99,7 @@ public class TestToDestroyer {
     context.setString(HdfsConstants.WORK_DIRECTORY, workDir.getAbsolutePath());
 
     Destroyer destroyer = new HdfsToDestroyer();
-    destroyer.destroy(new DestroyerContext(context, false, null), linkConfig, jobConfig);
+    destroyer.destroy(new DestroyerContext(context, false, null, "test_user"), linkConfig, jobConfig);
 
     File[] files = targetDir.listFiles();
 

http://git-wip-us.apache.org/repos/asf/sqoop/blob/bc0de7c1/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestToInitializer.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestToInitializer.java b/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestToInitializer.java
index a98a46a..5441702 100644
--- a/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestToInitializer.java
+++ b/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestToInitializer.java
@@ -46,7 +46,7 @@ public class TestToInitializer extends TestHdfsBase {
     linkConfig.linkConfig.uri = "file:///";
     jobConfig.toJobConfig.outputDirectory = TARGET_DIR;
 
-    InitializerContext initializerContext = new InitializerContext(new MutableMapContext());
+    InitializerContext initializerContext = new InitializerContext(new MutableMapContext(), "test_user");
 
     Initializer initializer = new HdfsToInitializer();
     initializer.initialize(initializerContext, linkConfig, jobConfig);
@@ -66,7 +66,7 @@ public class TestToInitializer extends TestHdfsBase {
     linkConfig.linkConfig.uri = "file:///";
     jobConfig.toJobConfig.outputDirectory = file.getAbsolutePath();
 
-    InitializerContext initializerContext = new InitializerContext(new MutableMapContext());
+    InitializerContext initializerContext = new InitializerContext(new MutableMapContext(), "test_user");
 
     Initializer initializer = new HdfsToInitializer();
     initializer.initialize(initializerContext, linkConfig, jobConfig);
@@ -83,7 +83,7 @@ public class TestToInitializer extends TestHdfsBase {
     linkConfig.linkConfig.uri = "file:///";
     jobConfig.toJobConfig.outputDirectory = dir.getAbsolutePath();
 
-    InitializerContext initializerContext = new InitializerContext(new MutableMapContext());
+    InitializerContext initializerContext = new InitializerContext(new MutableMapContext(), "test_user");
 
     Initializer initializer = new HdfsToInitializer();
     initializer.initialize(initializerContext, linkConfig, jobConfig);
@@ -101,7 +101,7 @@ public class TestToInitializer extends TestHdfsBase {
     jobConfig.toJobConfig.outputDirectory = dir.getAbsolutePath();
     jobConfig.toJobConfig.appendMode = true;
 
-    InitializerContext initializerContext = new InitializerContext(new MutableMapContext());
+    InitializerContext initializerContext = new InitializerContext(new MutableMapContext(), "test_user");
 
     Initializer initializer = new HdfsToInitializer();
     initializer.initialize(initializerContext, linkConfig, jobConfig);

http://git-wip-us.apache.org/repos/asf/sqoop/blob/bc0de7c1/connector/connector-kafka/src/test/java/org/apache/sqoop/connector/kafka/TestKafkaLoader.java
----------------------------------------------------------------------
diff --git a/connector/connector-kafka/src/test/java/org/apache/sqoop/connector/kafka/TestKafkaLoader.java b/connector/connector-kafka/src/test/java/org/apache/sqoop/connector/kafka/TestKafkaLoader.java
index da2a708..0dd00a7 100644
--- a/connector/connector-kafka/src/test/java/org/apache/sqoop/connector/kafka/TestKafkaLoader.java
+++ b/connector/connector-kafka/src/test/java/org/apache/sqoop/connector/kafka/TestKafkaLoader.java
@@ -77,7 +77,7 @@ public class TestKafkaLoader {
       public Object readContent() {
         return null;
       }
-    }, null);
+    }, null, "test_user");
     LinkConfiguration linkConf = new LinkConfiguration();
     ToJobConfiguration jobConf = new ToJobConfiguration();
     linkConf.linkConfig.brokerList = testUtil.getKafkaServerUrl();

http://git-wip-us.apache.org/repos/asf/sqoop/blob/bc0de7c1/connector/connector-kite/src/test/java/org/apache/sqoop/connector/kite/TestKiteExtractor.java
----------------------------------------------------------------------
diff --git a/connector/connector-kite/src/test/java/org/apache/sqoop/connector/kite/TestKiteExtractor.java b/connector/connector-kite/src/test/java/org/apache/sqoop/connector/kite/TestKiteExtractor.java
index 08d2cb3..c49be92 100644
--- a/connector/connector-kite/src/test/java/org/apache/sqoop/connector/kite/TestKiteExtractor.java
+++ b/connector/connector-kite/src/test/java/org/apache/sqoop/connector/kite/TestKiteExtractor.java
@@ -73,7 +73,7 @@ public class TestKiteExtractor {
     // setup
     Schema schema = new Schema("testExtractor");
     schema.addColumn(new Text("TextCol"));
-    ExtractorContext context = new ExtractorContext(null, writerMock, schema);
+    ExtractorContext context = new ExtractorContext(null, writerMock, schema, "test_user");
     LinkConfiguration linkConfig = new LinkConfiguration();
     FromJobConfiguration jobConfig = new FromJobConfiguration();
     KiteDatasetPartition partition = new KiteDatasetPartition();

http://git-wip-us.apache.org/repos/asf/sqoop/blob/bc0de7c1/connector/connector-kite/src/test/java/org/apache/sqoop/connector/kite/TestKiteLoader.java
----------------------------------------------------------------------
diff --git a/connector/connector-kite/src/test/java/org/apache/sqoop/connector/kite/TestKiteLoader.java b/connector/connector-kite/src/test/java/org/apache/sqoop/connector/kite/TestKiteLoader.java
index 533b8c3..c5aa1bd 100644
--- a/connector/connector-kite/src/test/java/org/apache/sqoop/connector/kite/TestKiteLoader.java
+++ b/connector/connector-kite/src/test/java/org/apache/sqoop/connector/kite/TestKiteLoader.java
@@ -81,7 +81,7 @@ public class TestKiteLoader {
         return null;
       }
     };
-    LoaderContext context = new LoaderContext(null, reader, schema);
+    LoaderContext context = new LoaderContext(null, reader, schema, "test_user");
     LinkConfiguration linkConfig = new LinkConfiguration();
     ToJobConfiguration toJobConfig = new ToJobConfiguration();
 

http://git-wip-us.apache.org/repos/asf/sqoop/blob/bc0de7c1/connector/connector-kite/src/test/java/org/apache/sqoop/connector/kite/TestKiteToDestroyer.java
----------------------------------------------------------------------
diff --git a/connector/connector-kite/src/test/java/org/apache/sqoop/connector/kite/TestKiteToDestroyer.java b/connector/connector-kite/src/test/java/org/apache/sqoop/connector/kite/TestKiteToDestroyer.java
index 3fcc339..00b8871 100644
--- a/connector/connector-kite/src/test/java/org/apache/sqoop/connector/kite/TestKiteToDestroyer.java
+++ b/connector/connector-kite/src/test/java/org/apache/sqoop/connector/kite/TestKiteToDestroyer.java
@@ -49,6 +49,8 @@ public class TestKiteToDestroyer extends PowerMockTestCase {
 
   private final String[] expectedUris = new String[]{"a", "b"};
 
+  private String user;
+
   @org.mockito.Mock
   private KiteDatasetExecutor executorMock;
 
@@ -70,12 +72,13 @@ public class TestKiteToDestroyer extends PowerMockTestCase {
     toJobConfig = new ToJobConfiguration();
     toJobConfig.toJobConfig.uri = "dataset:file:/foo/bar";
     toJobConfig.toJobConfig.fileFormat = FileFormat.AVRO;
+    user = "test_user";
   }
 
   @Test
   public void testDestroyForSuccessfulJob() {
     // setup
-    DestroyerContext context = new DestroyerContext(null, true, null);
+    DestroyerContext context = new DestroyerContext(null, true, null, user);
     when(KiteDatasetExecutor.listTemporaryDatasetUris(toJobConfig.toJobConfig.uri))
         .thenReturn(expectedUris);
 
@@ -91,7 +94,7 @@ public class TestKiteToDestroyer extends PowerMockTestCase {
   @Test
   public void testDestroyForFailedJob() {
     // setup
-    DestroyerContext context = new DestroyerContext(null, false, null);
+    DestroyerContext context = new DestroyerContext(null, false, null, user);
     when(KiteDatasetExecutor.listTemporaryDatasetUris(toJobConfig.toJobConfig.uri))
         .thenReturn(expectedUris);
     for (String uri : expectedUris) {

http://git-wip-us.apache.org/repos/asf/sqoop/blob/bc0de7c1/core/src/main/java/org/apache/sqoop/driver/JobManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/driver/JobManager.java b/core/src/main/java/org/apache/sqoop/driver/JobManager.java
index 0d230f9..15ca796 100644
--- a/core/src/main/java/org/apache/sqoop/driver/JobManager.java
+++ b/core/src/main/java/org/apache/sqoop/driver/JobManager.java
@@ -531,7 +531,8 @@ public class JobManager implements Reconfigurable {
   }
 
   private InitializerContext getConnectorInitializerContext(JobRequest jobRequest, Direction direction) {
-    return new InitializerContext(jobRequest.getConnectorContext(direction));
+    return new InitializerContext(jobRequest.getConnectorContext(direction),
+      jobRequest.getJobSubmission().getCreationUser());
   }
 
   void prepareJob(JobRequest request) {
@@ -571,8 +572,8 @@ public class JobManager implements Reconfigurable {
       Destroyer fromDestroyer = (Destroyer) ClassUtils.instantiate(fromConnector.getFrom().getDestroyer());
       Destroyer toDestroyer = (Destroyer) ClassUtils.instantiate(toConnector.getTo().getDestroyer());
 
-      DestroyerContext fromDestroyerContext = new DestroyerContext(submission.getFromConnectorContext(), true, submission.getFromSchema());
-      DestroyerContext toDestroyerContext = new DestroyerContext(submission.getToConnectorContext(), false, submission.getToSchema());
+      DestroyerContext fromDestroyerContext = new DestroyerContext(submission.getFromConnectorContext(), true, submission.getFromSchema(), submission.getCreationUser());
+      DestroyerContext toDestroyerContext = new DestroyerContext(submission.getToConnectorContext(), false, submission.getToSchema(), submission.getCreationUser());
 
       fromDestroyer.updateConfiguration(fromDestroyerContext, fromLinkConfig, fromJob);
       toDestroyer.updateConfiguration(toDestroyerContext, toLinkConfig, toJob);
@@ -626,11 +627,11 @@ public class JobManager implements Reconfigurable {
     }
 
     DestroyerContext fromDestroyerContext = new DestroyerContext(
-      request.getConnectorContext(Direction.FROM), false, request.getJobSubmission()
-        .getFromSchema());
+      request.getConnectorContext(Direction.FROM), false, request.getJobSubmission().getFromSchema(),
+      request.getJobSubmission().getCreationUser());
     DestroyerContext toDestroyerContext = new DestroyerContext(
-        request.getConnectorContext(Direction.TO), false, request.getJobSubmission()
-        .getToSchema());
+      request.getConnectorContext(Direction.TO), false, request.getJobSubmission().getToSchema(),
+      request.getJobSubmission().getCreationUser());
 
     fromDestroyer.destroy(fromDestroyerContext, request.getConnectorLinkConfig(Direction.FROM),
         request.getJobConfig(Direction.FROM));
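
The JobManager changes above thread the submission's creation user into each connector-facing context, which is what the connectors read back via getUser() when building their proxy UGI. A minimal sketch of constructing one of these contexts with a user, assembled only from classes and constructor arguments visible elsewhere in this diff and using a placeholder user name:

import org.apache.sqoop.common.MutableContext;
import org.apache.sqoop.common.MutableMapContext;
import org.apache.sqoop.job.etl.DestroyerContext;

public class DestroyerContextUser {

  public static void main(String[] args) {
    MutableContext context = new MutableMapContext();

    // Arguments: backing context, success flag, schema (none here) and,
    // new in this change, the user the destroyer should run as.
    DestroyerContext destroyerContext =
        new DestroyerContext(context, true, null, "test_user");

    System.out.println("Destroyer will run as: " + destroyerContext.getUser());
  }
}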

