sqoop-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jar...@apache.org
Subject sqoop git commit: SQOOP-2757: Sqoop2: Add module connector-sdk-hadoop to hold hadoop specific SDK classes used by the connectors
Date Sun, 27 Dec 2015 07:21:15 GMT
Repository: sqoop
Updated Branches:
  refs/heads/sqoop2 b1561866e -> b69fb9b68


SQOOP-2757: Sqoop2: Add module connector-sdk-hadoop to hold hadoop specific SDK classes used
by the connectors

(Dian Fu via Jarek Jarcec Cecho)


Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/b69fb9b6
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/b69fb9b6
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/b69fb9b6

Branch: refs/heads/sqoop2
Commit: b69fb9b6801e70cd4b144ff963c08a3679f1b261
Parents: b156186
Author: Jarek Jarcec Cecho <jarcec@apache.org>
Authored: Sun Dec 27 08:20:10 2015 +0100
Committer: Jarek Jarcec Cecho <jarcec@apache.org>
Committed: Sun Dec 27 08:20:10 2015 +0100

----------------------------------------------------------------------
 connector/connector-hdfs/pom.xml                |   6 +
 .../sqoop/connector/hdfs/HdfsConstants.java     |   2 -
 .../sqoop/connector/hdfs/HdfsExtractor.java     |   3 +-
 .../connector/hdfs/HdfsFromInitializer.java     |   4 +-
 .../apache/sqoop/connector/hdfs/HdfsLoader.java |   3 +-
 .../sqoop/connector/hdfs/HdfsPartitioner.java   |   3 +-
 .../sqoop/connector/hdfs/HdfsToDestroyer.java   |   4 +-
 .../sqoop/connector/hdfs/HdfsToInitializer.java |   4 +-
 .../connector/hdfs/security/SecurityUtils.java  | 146 ------------------
 .../hdfs/security/TestSecurityUtils.java        |  49 -------
 connector/connector-sdk-hadoop/pom.xml          |  60 ++++++++
 .../hadoop/security/SecurityUtils.java          | 147 +++++++++++++++++++
 .../hadoop/security/TestSecurityUtils.java      |  50 +++++++
 connector/pom.xml                               |   1 +
 pom.xml                                         |   5 +
 server/pom.xml                                  |   5 +
 16 files changed, 280 insertions(+), 212 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/sqoop/blob/b69fb9b6/connector/connector-hdfs/pom.xml
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/pom.xml b/connector/connector-hdfs/pom.xml
index d695750..5996314 100644
--- a/connector/connector-hdfs/pom.xml
+++ b/connector/connector-hdfs/pom.xml
@@ -44,6 +44,12 @@ limitations under the License.
     </dependency>
 
     <dependency>
+      <groupId>org.apache.sqoop</groupId>
+      <artifactId>connector-sdk-hadoop</artifactId>
+      <scope>provided</scope>
+    </dependency>
+
+    <dependency>
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-common</artifactId>
       <scope>provided</scope>

http://git-wip-us.apache.org/repos/asf/sqoop/blob/b69fb9b6/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsConstants.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsConstants.java
b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsConstants.java
index f06300a..39ee4a3 100644
--- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsConstants.java
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsConstants.java
@@ -35,6 +35,4 @@ public final class HdfsConstants extends Constants {
   public static final String WORK_DIRECTORY = PREFIX + "work_dir";
 
   public static final String MAX_IMPORT_DATE = PREFIX + "max_import_date";
-
-  public static final String DELEGATION_TOKENS = PREFIX + "delegation_tokens";
 }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/b69fb9b6/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsExtractor.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsExtractor.java
b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsExtractor.java
index 441fe30..a813c47 100644
--- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsExtractor.java
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsExtractor.java
@@ -30,14 +30,13 @@ import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.CompressionCodecFactory;
-import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.LineReader;
 import org.apache.log4j.Logger;
 import org.apache.sqoop.common.SqoopException;
 import org.apache.sqoop.connector.common.SqoopIDFUtils;
+import org.apache.sqoop.connector.hadoop.security.SecurityUtils;
 import org.apache.sqoop.connector.hdfs.configuration.FromJobConfiguration;
 import org.apache.sqoop.connector.hdfs.configuration.LinkConfiguration;
-import org.apache.sqoop.connector.hdfs.security.SecurityUtils;
 import org.apache.sqoop.error.code.HdfsConnectorError;
 import org.apache.sqoop.etl.io.DataWriter;
 import org.apache.sqoop.job.etl.Extractor;

http://git-wip-us.apache.org/repos/asf/sqoop/blob/b69fb9b6/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsFromInitializer.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsFromInitializer.java
b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsFromInitializer.java
index 3a0d626..d815d58 100644
--- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsFromInitializer.java
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsFromInitializer.java
@@ -21,19 +21,17 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.sqoop.common.MapContext;
 import org.apache.sqoop.common.SqoopException;
+import org.apache.sqoop.connector.hadoop.security.SecurityUtils;
 import org.apache.sqoop.connector.hdfs.configuration.FromJobConfiguration;
 import org.apache.sqoop.connector.hdfs.configuration.IncrementalType;
 import org.apache.sqoop.connector.hdfs.configuration.LinkConfiguration;
-import org.apache.sqoop.connector.hdfs.security.SecurityUtils;
 import org.apache.sqoop.error.code.HdfsConnectorError;
 import org.apache.sqoop.job.etl.Initializer;
 import org.apache.sqoop.job.etl.InitializerContext;
 import org.apache.log4j.Logger;
 
-import java.io.IOException;
 import java.security.PrivilegedExceptionAction;
 
 

http://git-wip-us.apache.org/repos/asf/sqoop/blob/b69fb9b6/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsLoader.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsLoader.java
b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsLoader.java
index a6551e6..774221a 100644
--- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsLoader.java
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsLoader.java
@@ -25,16 +25,15 @@ import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.sqoop.common.SqoopException;
 import org.apache.sqoop.connector.common.SqoopIDFUtils;
+import org.apache.sqoop.connector.hadoop.security.SecurityUtils;
 import org.apache.sqoop.connector.hdfs.configuration.LinkConfiguration;
 import org.apache.sqoop.connector.hdfs.configuration.ToFormat;
 import org.apache.sqoop.connector.hdfs.configuration.ToJobConfiguration;
 import org.apache.sqoop.connector.hdfs.hdfsWriter.GenericHdfsWriter;
 import org.apache.sqoop.connector.hdfs.hdfsWriter.HdfsSequenceWriter;
 import org.apache.sqoop.connector.hdfs.hdfsWriter.HdfsTextWriter;
-import org.apache.sqoop.connector.hdfs.security.SecurityUtils;
 import org.apache.sqoop.error.code.HdfsConnectorError;
 import org.apache.sqoop.etl.io.DataReader;
 import org.apache.sqoop.job.etl.Loader;

http://git-wip-us.apache.org/repos/asf/sqoop/blob/b69fb9b6/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsPartitioner.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsPartitioner.java
b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsPartitioner.java
index d01e932..f35b8e9 100644
--- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsPartitioner.java
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsPartitioner.java
@@ -39,13 +39,12 @@ import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.CompressionCodecFactory;
 import org.apache.hadoop.net.NetworkTopology;
 import org.apache.hadoop.net.NodeBase;
-import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.log4j.Logger;
 import org.apache.sqoop.common.SqoopException;
+import org.apache.sqoop.connector.hadoop.security.SecurityUtils;
 import org.apache.sqoop.connector.hdfs.configuration.FromJobConfiguration;
 import org.apache.sqoop.connector.hdfs.configuration.IncrementalType;
 import org.apache.sqoop.connector.hdfs.configuration.LinkConfiguration;
-import org.apache.sqoop.connector.hdfs.security.SecurityUtils;
 import org.apache.sqoop.error.code.HdfsConnectorError;
 import org.apache.sqoop.job.etl.Partition;
 import org.apache.sqoop.job.etl.Partitioner;

http://git-wip-us.apache.org/repos/asf/sqoop/blob/b69fb9b6/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsToDestroyer.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsToDestroyer.java
b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsToDestroyer.java
index 858042c..0c62ab1 100644
--- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsToDestroyer.java
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsToDestroyer.java
@@ -21,17 +21,15 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.log4j.Logger;
 import org.apache.sqoop.common.SqoopException;
+import org.apache.sqoop.connector.hadoop.security.SecurityUtils;
 import org.apache.sqoop.connector.hdfs.configuration.LinkConfiguration;
 import org.apache.sqoop.connector.hdfs.configuration.ToJobConfiguration;
-import org.apache.sqoop.connector.hdfs.security.SecurityUtils;
 import org.apache.sqoop.error.code.HdfsConnectorError;
 import org.apache.sqoop.job.etl.Destroyer;
 import org.apache.sqoop.job.etl.DestroyerContext;
 
-import java.io.IOException;
 import java.security.PrivilegedExceptionAction;
 
 public class HdfsToDestroyer extends Destroyer<LinkConfiguration, ToJobConfiguration>
{

http://git-wip-us.apache.org/repos/asf/sqoop/blob/b69fb9b6/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsToInitializer.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsToInitializer.java
b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsToInitializer.java
index 204c978..70e0fde 100644
--- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsToInitializer.java
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsToInitializer.java
@@ -21,18 +21,16 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.log4j.Logger;
 import org.apache.sqoop.common.MapContext;
 import org.apache.sqoop.common.SqoopException;
+import org.apache.sqoop.connector.hadoop.security.SecurityUtils;
 import org.apache.sqoop.connector.hdfs.configuration.LinkConfiguration;
 import org.apache.sqoop.connector.hdfs.configuration.ToJobConfiguration;
-import org.apache.sqoop.connector.hdfs.security.SecurityUtils;
 import org.apache.sqoop.error.code.HdfsConnectorError;
 import org.apache.sqoop.job.etl.Initializer;
 import org.apache.sqoop.job.etl.InitializerContext;
 
-import java.io.IOException;
 import java.security.PrivilegedExceptionAction;
 import java.util.UUID;
 

http://git-wip-us.apache.org/repos/asf/sqoop/blob/b69fb9b6/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/security/SecurityUtils.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/security/SecurityUtils.java
b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/security/SecurityUtils.java
deleted file mode 100644
index 0a42936..0000000
--- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/security/SecurityUtils.java
+++ /dev/null
@@ -1,146 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sqoop.connector.hdfs.security;
-
-import org.apache.commons.codec.binary.Base64;
-import org.apache.commons.lang.StringUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.security.TokenCache;
-import org.apache.hadoop.security.Credentials;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
-import org.apache.log4j.Logger;
-import org.apache.sqoop.common.ImmutableContext;
-import org.apache.sqoop.common.MutableContext;
-import org.apache.sqoop.connector.hdfs.HdfsConstants;
-import org.apache.sqoop.job.etl.TransferableContext;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.util.LinkedList;
-import java.util.List;
-
-/**
- * Sqoop is designed in a way to abstract connectors from execution engine. Hence the security
portion
- * (like generating and distributing delegation tokens) won't happen automatically for us
under the hood
- * and we have to do everything manually.
- */
-public class SecurityUtils {
-
-  private static final Logger LOG = Logger.getLogger(SecurityUtils.class);
-
-  /**
-   * Creates proxy user for user who submitted the Sqoop job (e.g. who has issued the "start
job" commnad)
-   */
-  static public UserGroupInformation createProxyUser(TransferableContext context) throws
IOException {
-    return UserGroupInformation.createProxyUser(context.getUser(), UserGroupInformation.getLoginUser());
-  }
-
-  /**
-   * Creates proxy user and load's it up with all delegation tokens that we have created
ourselves
-   */
-  static public UserGroupInformation createProxyUserAndLoadDelegationTokens(TransferableContext
context) throws IOException {
-    UserGroupInformation proxyUser = createProxyUser(context);
-    loadDelegationTokensToUGI(proxyUser, context.getContext());
-
-    return proxyUser;
-  }
-
-  /**
-   * Generate delegation tokens for current user (this code is suppose to run in doAs) and
store them
-   * serialized in given mutable context.
-   */
-  static public void generateDelegationTokens(MutableContext context, Path path, Configuration
configuration) throws IOException {
-    if(!UserGroupInformation.isSecurityEnabled()) {
-      LOG.info("Running on unsecured cluster, skipping delegation token generation.");
-      return;
-    }
-
-    // String representation of all tokens that we will create (most likely single one)
-    List<String> tokens = new LinkedList<>();
-
-    Credentials credentials = new Credentials();
-    TokenCache.obtainTokensForNamenodes(credentials, new Path[]{path}, configuration);
-    for (Token token : credentials.getAllTokens()) {
-      LOG.info("Generated token: " + token.toString());
-      tokens.add(serializeToken(token));
-    }
-
-    // The context classes are transferred via "Credentials" rather then with jobconf, so
we're not leaking the DT out here
-    if(tokens.size() > 0) {
-      context.setString(HdfsConstants.DELEGATION_TOKENS, StringUtils.join(tokens, " "));
-    }
-  }
-
-  /**
-   * Loads delegation tokens that we created and serialize into the mutable context
-   */
-  static public void loadDelegationTokensToUGI(UserGroupInformation ugi, ImmutableContext
context) throws IOException {
-    String tokenList = context.getString(HdfsConstants.DELEGATION_TOKENS);
-    if(tokenList == null) {
-      LOG.info("No delegation tokens found");
-      return;
-    }
-
-    for(String stringToken: tokenList.split(" ")) {
-      Token token = deserializeToken(stringToken);
-      LOG.info("Loaded delegation token: " + token.toString());
-      ugi.addToken(token);
-    }
-  }
-
-  /**
-   * Serialize given token into String.
-   *
-   * We'll convert token to byte[] using Writable methods fro I/O and then Base64
-   * encode the bytes to a human readable string.
-   */
-  static public String serializeToken(Token token) throws IOException {
-    // Serialize the Token to a byte array
-    ByteArrayOutputStream baos = new ByteArrayOutputStream();
-    DataOutputStream dos = new DataOutputStream(baos);
-    token.write(dos);
-    baos.flush();
-
-    return Base64.encodeBase64String(baos.toByteArray());
-  }
-
-  /**
-   * Deserialize token from given String.
-   *
-   * See serializeToken for details how the token is expected to be serialized.
-   */
-  static public Token deserializeToken(String stringToken) throws IOException {
-    Token token = new Token();
-    byte[] tokenBytes = Base64.decodeBase64(stringToken);
-
-    ByteArrayInputStream bais = new ByteArrayInputStream(tokenBytes);
-    DataInputStream dis = new DataInputStream(bais);
-    token.readFields(dis);
-
-    return token;
-  }
-
-  private SecurityUtils() {
-    // Initialization is prohibited
-  }
-}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/b69fb9b6/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/security/TestSecurityUtils.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/security/TestSecurityUtils.java
b/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/security/TestSecurityUtils.java
deleted file mode 100644
index 713c704..0000000
--- a/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/security/TestSecurityUtils.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sqoop.connector.hdfs.security;
-
-import org.apache.hadoop.io.Text;
-import org.testng.annotations.Test;
-import org.apache.hadoop.security.token.Token;
-
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertNotNull;
-
-public class TestSecurityUtils {
-
-  @Test
-  public void testTokenSerializationDeserialization() throws Exception {
-    byte[] identifier = "identifier".getBytes();
-    byte[] password = "password".getBytes();
-    Text kind = new Text("kind");
-    Text service = new Text("service");
-
-    Token token = new Token(identifier, password, kind, service);
-    String serializedForm = SecurityUtils.serializeToken(token);
-    assertNotNull(serializedForm);
-
-    Token deserializedToken = SecurityUtils.deserializeToken(serializedForm);
-    assertNotNull(deserializedToken);
-
-    assertEquals(identifier, deserializedToken.getIdentifier());
-    assertEquals(password, deserializedToken.getPassword());
-    assertEquals(kind.toString(), deserializedToken.getKind().toString());
-    assertEquals(service.toString(), deserializedToken.getService().toString());
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/b69fb9b6/connector/connector-sdk-hadoop/pom.xml
----------------------------------------------------------------------
diff --git a/connector/connector-sdk-hadoop/pom.xml b/connector/connector-sdk-hadoop/pom.xml
new file mode 100644
index 0000000..2793886
--- /dev/null
+++ b/connector/connector-sdk-hadoop/pom.xml
@@ -0,0 +1,60 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
+  http://maven.apache.org/xsd/maven-4.0.0.xsd"
+  xmlns="http://maven.apache.org/POM/4.0.0"
+  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache.sqoop</groupId>
+    <artifactId>connector</artifactId>
+    <version>2.0.0-SNAPSHOT</version>
+  </parent>
+
+  <groupId>org.apache.sqoop</groupId>
+  <artifactId>connector-sdk-hadoop</artifactId>
+  <name>Sqoop Connector Hadoop Specific SDK</name>
+
+  <dependencies>
+    <dependency>
+      <groupId>commons-codec</groupId>
+      <artifactId>commons-codec</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.testng</groupId>
+      <artifactId>testng</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.sqoop</groupId>
+      <artifactId>sqoop-common</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-common</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-mapreduce-client-core</artifactId>
+      <scope>provided</scope>
+    </dependency>
+  </dependencies>
+
+</project>

http://git-wip-us.apache.org/repos/asf/sqoop/blob/b69fb9b6/connector/connector-sdk-hadoop/src/main/java/org/apache/sqoop/connector/hadoop/security/SecurityUtils.java
----------------------------------------------------------------------
diff --git a/connector/connector-sdk-hadoop/src/main/java/org/apache/sqoop/connector/hadoop/security/SecurityUtils.java
b/connector/connector-sdk-hadoop/src/main/java/org/apache/sqoop/connector/hadoop/security/SecurityUtils.java
new file mode 100644
index 0000000..44f5c03
--- /dev/null
+++ b/connector/connector-sdk-hadoop/src/main/java/org/apache/sqoop/connector/hadoop/security/SecurityUtils.java
@@ -0,0 +1,147 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.hadoop.security;
+
+import org.apache.commons.codec.binary.Base64;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.security.TokenCache;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.log4j.Logger;
+import org.apache.sqoop.common.ImmutableContext;
+import org.apache.sqoop.common.MutableContext;
+import org.apache.sqoop.job.etl.TransferableContext;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * Sqoop is designed in a way to abstract connectors from execution engine. Hence the security
portion
+ * (like generating and distributing delegation tokens) won't happen automatically for us
under the hood
+ * and we have to do everything manually.
+ */
+public class SecurityUtils {
+
+  private static final Logger LOG = Logger.getLogger(SecurityUtils.class);
+
+  private static final String DELEGATION_TOKENS = "org.apache.sqoop.connector.delegation_tokens";
+
+  /**
+   * Creates proxy user for user who submitted the Sqoop job (e.g. who has issued the "start
job" commnad)
+   */
+  static public UserGroupInformation createProxyUser(TransferableContext context) throws
IOException {
+    return UserGroupInformation.createProxyUser(context.getUser(), UserGroupInformation.getLoginUser());
+  }
+
+  /**
+   * Creates proxy user and load's it up with all delegation tokens that we have created
ourselves
+   */
+  static public UserGroupInformation createProxyUserAndLoadDelegationTokens(TransferableContext
context) throws IOException {
+    UserGroupInformation proxyUser = createProxyUser(context);
+    loadDelegationTokensToUGI(proxyUser, context.getContext());
+
+    return proxyUser;
+  }
+
+  /**
+   * Generate delegation tokens for current user (this code is suppose to run in doAs) and
store them
+   * serialized in given mutable context.
+   */
+  static public void generateDelegationTokens(MutableContext context, Path path, Configuration
configuration) throws IOException {
+    if(!UserGroupInformation.isSecurityEnabled()) {
+      LOG.info("Running on unsecured cluster, skipping delegation token generation.");
+      return;
+    }
+
+    // String representation of all tokens that we will create (most likely single one)
+    List<String> tokens = new LinkedList<>();
+
+    Credentials credentials = new Credentials();
+    TokenCache.obtainTokensForNamenodes(credentials, new Path[]{path}, configuration);
+    for (Token token : credentials.getAllTokens()) {
+      LOG.info("Generated token: " + token.toString());
+      tokens.add(serializeToken(token));
+    }
+
+    // The context classes are transferred via "Credentials" rather then with jobconf, so
we're not leaking the DT out here
+    if(tokens.size() > 0) {
+      context.setString(DELEGATION_TOKENS, StringUtils.join(tokens, " "));
+    }
+  }
+
+  /**
+   * Loads delegation tokens that we created and serialize into the mutable context
+   */
+  static public void loadDelegationTokensToUGI(UserGroupInformation ugi, ImmutableContext
context) throws IOException {
+    String tokenList = context.getString(DELEGATION_TOKENS);
+    if(tokenList == null) {
+      LOG.info("No delegation tokens found");
+      return;
+    }
+
+    for(String stringToken: tokenList.split(" ")) {
+      Token token = deserializeToken(stringToken);
+      LOG.info("Loaded delegation token: " + token.toString());
+      ugi.addToken(token);
+    }
+  }
+
+  /**
+   * Serialize given token into String.
+   *
+   * We'll convert token to byte[] using Writable methods fro I/O and then Base64
+   * encode the bytes to a human readable string.
+   */
+  static public String serializeToken(Token token) throws IOException {
+    // Serialize the Token to a byte array
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    DataOutputStream dos = new DataOutputStream(baos);
+    token.write(dos);
+    baos.flush();
+
+    return Base64.encodeBase64String(baos.toByteArray());
+  }
+
+  /**
+   * Deserialize token from given String.
+   *
+   * See serializeToken for details how the token is expected to be serialized.
+   */
+  static public Token deserializeToken(String stringToken) throws IOException {
+    Token token = new Token();
+    byte[] tokenBytes = Base64.decodeBase64(stringToken);
+
+    ByteArrayInputStream bais = new ByteArrayInputStream(tokenBytes);
+    DataInputStream dis = new DataInputStream(bais);
+    token.readFields(dis);
+
+    return token;
+  }
+
+  private SecurityUtils() {
+    // Initialization is prohibited
+  }
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/b69fb9b6/connector/connector-sdk-hadoop/src/test/java/org/apache/sqoop/connector/hadoop/security/TestSecurityUtils.java
----------------------------------------------------------------------
diff --git a/connector/connector-sdk-hadoop/src/test/java/org/apache/sqoop/connector/hadoop/security/TestSecurityUtils.java
b/connector/connector-sdk-hadoop/src/test/java/org/apache/sqoop/connector/hadoop/security/TestSecurityUtils.java
new file mode 100644
index 0000000..59293b2
--- /dev/null
+++ b/connector/connector-sdk-hadoop/src/test/java/org/apache/sqoop/connector/hadoop/security/TestSecurityUtils.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.hadoop.security;
+
+import org.apache.hadoop.io.Text;
+import org.testng.annotations.Test;
+import org.apache.hadoop.security.token.Token;
+import org.apache.sqoop.connector.hadoop.security.SecurityUtils;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+
+public class TestSecurityUtils {
+
+  @Test
+  public void testTokenSerializationDeserialization() throws Exception {
+    byte[] identifier = "identifier".getBytes();
+    byte[] password = "password".getBytes();
+    Text kind = new Text("kind");
+    Text service = new Text("service");
+
+    Token token = new Token(identifier, password, kind, service);
+    String serializedForm = SecurityUtils.serializeToken(token);
+    assertNotNull(serializedForm);
+
+    Token deserializedToken = SecurityUtils.deserializeToken(serializedForm);
+    assertNotNull(deserializedToken);
+
+    assertEquals(identifier, deserializedToken.getIdentifier());
+    assertEquals(password, deserializedToken.getPassword());
+    assertEquals(kind.toString(), deserializedToken.getKind().toString());
+    assertEquals(service.toString(), deserializedToken.getService().toString());
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/b69fb9b6/connector/pom.xml
----------------------------------------------------------------------
diff --git a/connector/pom.xml b/connector/pom.xml
index be8fcb1..7340b37 100644
--- a/connector/pom.xml
+++ b/connector/pom.xml
@@ -34,6 +34,7 @@ limitations under the License.
 
   <modules>
     <module>connector-sdk</module>
+    <module>connector-sdk-hadoop</module>
     <module>connector-generic-jdbc</module>
     <module>connector-hdfs</module>
     <module>connector-kite</module>

http://git-wip-us.apache.org/repos/asf/sqoop/blob/b69fb9b6/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index d7b0dd5..12786ca 100644
--- a/pom.xml
+++ b/pom.xml
@@ -347,6 +347,11 @@ limitations under the License.
         <version>${project.version}</version>
       </dependency>
       <dependency>
+        <groupId>org.apache.sqoop</groupId>
+        <artifactId>connector-sdk-hadoop</artifactId>
+        <version>${project.version}</version>
+      </dependency>
+      <dependency>
         <groupId>org.apache.sqoop.connector</groupId>
         <artifactId>sqoop-connector-generic-jdbc</artifactId>
         <version>${project.version}</version>

http://git-wip-us.apache.org/repos/asf/sqoop/blob/b69fb9b6/server/pom.xml
----------------------------------------------------------------------
diff --git a/server/pom.xml b/server/pom.xml
index c24183c..c0e40d5 100644
--- a/server/pom.xml
+++ b/server/pom.xml
@@ -78,6 +78,11 @@ limitations under the License.
     </dependency>
 
     <dependency>
+      <groupId>org.apache.sqoop</groupId>
+      <artifactId>connector-sdk-hadoop</artifactId>
+    </dependency>
+
+    <dependency>
       <groupId>org.apache.sqoop.connector</groupId>
       <artifactId>sqoop-connector-generic-jdbc</artifactId>
       <exclusions>


Mime
View raw message