sqoop-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From venkatran...@apache.org
Subject [1/2] SQOOP-1073: Sqoop2: Introduce schema for transferred data
Date Mon, 01 Jul 2013 04:06:59 GMT
Updated Branches:
  refs/heads/sqoop2 344c6309c -> aa8e1e779


http://git-wip-us.apache.org/repos/asf/sqoop/blob/aa8e1e77/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportInitializer.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportInitializer.java
b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportInitializer.java
index 3e9789c..96818ba 100644
--- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportInitializer.java
+++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportInitializer.java
@@ -29,9 +29,12 @@ import org.apache.sqoop.common.MutableContext;
 import org.apache.sqoop.common.SqoopException;
 import org.apache.sqoop.connector.jdbc.configuration.ConnectionConfiguration;
 import org.apache.sqoop.connector.jdbc.configuration.ImportJobConfiguration;
+import org.apache.sqoop.connector.jdbc.util.SqlTypesUtils;
 import org.apache.sqoop.job.Constants;
 import org.apache.sqoop.job.etl.Initializer;
 import org.apache.sqoop.job.etl.InitializerContext;
+import org.apache.sqoop.schema.Schema;
+import org.apache.sqoop.schema.type.Column;
 import org.apache.sqoop.utils.ClassUtils;
 
 public class GenericJdbcImportInitializer extends Initializer<ConnectionConfiguration,
ImportJobConfiguration> {
@@ -61,6 +64,55 @@ public class GenericJdbcImportInitializer extends Initializer<ConnectionConfigur
     return jars;
   }
 
+  @Override
+  public Schema getSchema(InitializerContext context, ConnectionConfiguration connectionConfiguration, ImportJobConfiguration importJobConfiguration) {
+    configureJdbcProperties(context.getContext(), connectionConfiguration, importJobConfiguration);
+    // The schema is named after the table; with no table name set, fall back to "Query".
+    String schemaName = importJobConfiguration.table.tableName;
+    if(schemaName == null) {
+      schemaName = "Query";
+    }
+
+    Schema schema = new Schema(schemaName);
+    // "1 = 0" replaces the conditions token; only the result set metadata is read below.
+    ResultSet rs = null;
+    ResultSetMetaData rsmt = null;
+    try {
+      rs = executor.executeQuery(
+        context.getString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_DATA_SQL)
+          .replace(GenericJdbcConnectorConstants.SQL_CONDITIONS_TOKEN, "1 = 0")
+      );
+
+      rsmt = rs.getMetaData();
+      for (int i = 1 ; i <= rsmt.getColumnCount(); i++) { // JDBC column indexes start at 1
+        Column column = SqlTypesUtils.sqlTypeToAbstractType(rsmt.getColumnType(i));
+        // Use the column name; if null or empty, the label; if that is also null or empty, "Column <i>".
+        String columnName = rsmt.getColumnName(i);
+        if (columnName == null || columnName.equals("")) {
+          columnName = rsmt.getColumnLabel(i);
+          if (null == columnName || columnName.equals("")) {
+            columnName = "Column " + i;
+          }
+        }
+
+        column.setName(columnName);
+        schema.addColumn(column);
+      }
+
+      return schema;
+    } catch (SQLException e) {
+      throw new SqoopException(GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0016, e);
+    } finally {
+      if(rs != null) {
+        try {
+          rs.close();
+        } catch (SQLException e) {
+          LOG.info("Ignoring exception while closing ResultSet", e); // best-effort close
+        }
+      }
+    }
+  }
+
   private void configureJdbcProperties(MutableContext context, ConnectionConfiguration connectionConfig,
ImportJobConfiguration jobConfig) {
     String driver = connectionConfig.connection.jdbcDriver;
     String url = connectionConfig.connection.connectionString;

http://git-wip-us.apache.org/repos/asf/sqoop/blob/aa8e1e77/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/util/SqlTypesUtils.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/util/SqlTypesUtils.java
b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/util/SqlTypesUtils.java
new file mode 100644
index 0000000..c18f165
--- /dev/null
+++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/util/SqlTypesUtils.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.jdbc.util;
+
+import org.apache.sqoop.schema.type.Column;
+import org.apache.sqoop.schema.type.Binary;
+import org.apache.sqoop.schema.type.Bit;
+import org.apache.sqoop.schema.type.Date;
+import org.apache.sqoop.schema.type.DateTime;
+import org.apache.sqoop.schema.type.Decimal;
+import org.apache.sqoop.schema.type.FixedPoint;
+import org.apache.sqoop.schema.type.FloatingPoint;
+import org.apache.sqoop.schema.type.Text;
+import org.apache.sqoop.schema.type.Time;
+import org.apache.sqoop.schema.type.Unsupported;
+
+import java.sql.Types;
+
+/**
+ * Utility class to work with SQL types.
+ */
+public class SqlTypesUtils {
+
+  /**
+   * Convert given java.sql.Types number into internal data type.
+   *
+   * @param sqlType java.sql.Types constant
+   * @return Concrete Column implementation; Unsupported (carrying sqlType) when no mapping exists
+   */
+  public static Column sqlTypeToAbstractType(int sqlType) {
+    switch (sqlType) {
+      case Types.SMALLINT:
+      case Types.TINYINT:
+      case Types.INTEGER:
+        return new FixedPoint();
+
+      case Types.VARCHAR:
+      case Types.CHAR:
+      case Types.LONGVARCHAR:
+      case Types.NVARCHAR:
+      case Types.NCHAR:
+      case Types.LONGNVARCHAR:
+      case Types.CLOB: // character large object, i.e. text data
+        return new Text();
+
+      case Types.DATE:
+        return new Date();
+
+      case Types.TIME:
+        return new Time();
+
+      case Types.TIMESTAMP:
+        return new DateTime();
+
+      case Types.FLOAT:
+      case Types.REAL:
+      case Types.DOUBLE:
+        return new FloatingPoint();
+
+      case Types.NUMERIC:
+      case Types.DECIMAL:
+      case Types.BIGINT: // NOTE(review): integral type mapped to Decimal - confirm this is intended
+        return new Decimal();
+
+      case Types.BIT:
+      case Types.BOOLEAN:
+        return new Bit();
+
+      case Types.BINARY:
+      case Types.VARBINARY:
+      case Types.BLOB:
+      case Types.LONGVARBINARY:
+        return new Binary();
+
+      default:
+        return new Unsupported((long)sqlType);
+    }
+  }
+
+  private SqlTypesUtils() {
+    // Instantiation is prohibited
+  }
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/aa8e1e77/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportInitializer.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportInitializer.java
b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportInitializer.java
index 9f4269a..a33fa36 100644
--- a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportInitializer.java
+++ b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportInitializer.java
@@ -28,6 +28,10 @@ import org.apache.sqoop.connector.jdbc.configuration.ImportJobConfiguration;
 import org.apache.sqoop.job.Constants;
 import org.apache.sqoop.job.etl.Initializer;
 import org.apache.sqoop.job.etl.InitializerContext;
+import org.apache.sqoop.schema.Schema;
+import org.apache.sqoop.schema.type.FixedPoint;
+import org.apache.sqoop.schema.type.FloatingPoint;
+import org.apache.sqoop.schema.type.Text;
 
 public class TestImportInitializer extends TestCase {
 
@@ -87,6 +91,20 @@ public class TestImportInitializer extends TestCase {
     }
   }
 
+  /**
+   * Return Schema representation for the testing table.
+   *
+   * @param name Name that should be used for the generated schema.
+   * @return Schema with FixedPoint ICOL, FloatingPoint DCOL and Text VCOL columns.
+   */
+  public Schema getSchema(String name) {
+    return new Schema(name)
+      .addColumn(new FixedPoint("ICOL"))
+      .addColumn(new FloatingPoint("DCOL"))
+      .addColumn(new Text("VCOL"))
+    ;
+  }
+
   @Override
   public void tearDown() {
     executor.close();
@@ -290,6 +308,49 @@ public class TestImportInitializer extends TestCase {
         String.valueOf((double)(START+NUMBER_OF_ROWS-1)));
   }
 
+
+  @SuppressWarnings("unchecked")
+  public void testGetSchemaForTable() throws Exception {
+    ConnectionConfiguration connConf = new ConnectionConfiguration();
+    ImportJobConfiguration jobConf = new ImportJobConfiguration();
+    // Table-based import: the returned schema is expected to carry the table's name.
+    connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER;
+    connConf.connection.connectionString = GenericJdbcTestConstants.URL;
+    jobConf.table.schemaName = schemaName;
+    jobConf.table.tableName = tableName;
+    jobConf.table.partitionColumn = "DCOL";
+
+    MutableContext context = new MutableMapContext();
+    InitializerContext initializerContext = new InitializerContext(context);
+
+    @SuppressWarnings("rawtypes")
+    Initializer initializer = new GenericJdbcImportInitializer();
+    initializer.initialize(initializerContext, connConf, jobConf);
+    Schema schema = initializer.getSchema(initializerContext, connConf, jobConf);
+    assertEquals(getSchema(tableName), schema);
+  }
+
+  @SuppressWarnings("unchecked")
+  public void testGetSchemaForSql() throws Exception {
+    ConnectionConfiguration connConf = new ConnectionConfiguration();
+    ImportJobConfiguration jobConf = new ImportJobConfiguration();
+    // SQL-based import (no table name set): the returned schema is expected to be named "Query".
+    connConf.connection.jdbcDriver = GenericJdbcTestConstants.DRIVER;
+    connConf.connection.connectionString = GenericJdbcTestConstants.URL;
+    jobConf.table.schemaName = schemaName;
+    jobConf.table.sql = tableSql;
+    jobConf.table.partitionColumn = "DCOL";
+
+    MutableContext context = new MutableMapContext();
+    InitializerContext initializerContext = new InitializerContext(context);
+
+    @SuppressWarnings("rawtypes")
+    Initializer initializer = new GenericJdbcImportInitializer();
+    initializer.initialize(initializerContext, connConf, jobConf);
+    Schema schema = initializer.getSchema(initializerContext, connConf, jobConf);
+    assertEquals(getSchema("Query"), schema);
+  }
+
   @SuppressWarnings("unchecked")
   public void testTableSqlWithTableColumnsWithSchema() throws Exception {
     ConnectionConfiguration connConf = new ConnectionConfiguration();

http://git-wip-us.apache.org/repos/asf/sqoop/blob/aa8e1e77/core/src/main/java/org/apache/sqoop/framework/JobManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/framework/JobManager.java b/core/src/main/java/org/apache/sqoop/framework/JobManager.java
index 5a2f490..58d6c10 100644
--- a/core/src/main/java/org/apache/sqoop/framework/JobManager.java
+++ b/core/src/main/java/org/apache/sqoop/framework/JobManager.java
@@ -355,6 +355,13 @@ public class JobManager implements Reconfigurable {
                request.getConfigConnectorConnection(),
                request.getConfigConnectorJob()));
 
+       // Retrieve and persist the schema
+       request.getSummary().setConnectorSchema(initializer.getSchema(
+           initializerContext,
+           request.getConfigConnectorConnection(),
+           request.getConfigConnectorJob()
+       ));
+
        // Bootstrap job from framework perspective
        switch (job.getType()) {
            case IMPORT:

http://git-wip-us.apache.org/repos/asf/sqoop/blob/aa8e1e77/spi/src/main/java/org/apache/sqoop/job/etl/Initializer.java
----------------------------------------------------------------------
diff --git a/spi/src/main/java/org/apache/sqoop/job/etl/Initializer.java b/spi/src/main/java/org/apache/sqoop/job/etl/Initializer.java
index 346b84c..88744ea 100644
--- a/spi/src/main/java/org/apache/sqoop/job/etl/Initializer.java
+++ b/spi/src/main/java/org/apache/sqoop/job/etl/Initializer.java
@@ -17,6 +17,8 @@
  */
 package org.apache.sqoop.job.etl;
 
+import org.apache.sqoop.schema.Schema;
+
 import java.util.LinkedList;
 import java.util.List;
 
@@ -52,4 +54,8 @@ public abstract class Initializer<ConnectionConfiguration, JobConfiguration>
{
     return new LinkedList<String>();
   }
 
+  public abstract Schema getSchema(InitializerContext context,
+                                   ConnectionConfiguration connectionConfiguration,
+                                   JobConfiguration jobConfiguration); // schema describing the data transferred by the job
+
 }


Mime
View raw message