sqoop-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From a..@apache.org
Subject sqoop git commit: SQOOP-1862: Sqoop2: JDBC Connector To side needs to handle converting JODA objects to sql date
Date Tue, 03 Feb 2015 21:59:32 GMT
Repository: sqoop
Updated Branches:
  refs/heads/sqoop2 fc32358ab -> 27d87b4f2


SQOOP-1862: Sqoop2: JDBC Connector To side needs to handle converting JODA objects to sql date

(Veena Basavaraj via Abraham Elmahrek)


Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/27d87b4f
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/27d87b4f
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/27d87b4f

Branch: refs/heads/sqoop2
Commit: 27d87b4f2f7fcbcdb52503016446b4de1dd5709a
Parents: fc32358
Author: Abraham Elmahrek <abe@apache.org>
Authored: Tue Feb 3 13:58:36 2015 -0800
Committer: Abraham Elmahrek <abe@apache.org>
Committed: Tue Feb 3 13:58:36 2015 -0800

----------------------------------------------------------------------
 .../common/test/asserts/ProviderAsserts.java    |  2 +-
 .../connector/jdbc/GenericJdbcExecutor.java     | 44 ++++++++++++++++----
 .../sqoop/connector/jdbc/GenericJdbcLoader.java |  3 +-
 .../apache/sqoop/connector/jdbc/TestLoader.java | 36 ++++++++++++++--
 .../idf/TestCSVIntermediateDataFormat.java      |  2 +-
 .../java/org/apache/sqoop/test/data/Cities.java |  9 ++--
 .../jdbc/generic/FromHDFSToRDBMSTest.java       | 16 +++----
 .../jdbc/generic/FromRDBMSToHDFSTest.java       |  8 ++--
 .../jdbc/generic/TableStagedRDBMSTest.java      | 18 ++++----
 .../connector/kafka/FromRDBMSToKafkaTest.java   |  8 ++--
 10 files changed, 102 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/sqoop/blob/27d87b4f/common-test/src/main/java/org/apache/sqoop/common/test/asserts/ProviderAsserts.java
----------------------------------------------------------------------
diff --git a/common-test/src/main/java/org/apache/sqoop/common/test/asserts/ProviderAsserts.java
b/common-test/src/main/java/org/apache/sqoop/common/test/asserts/ProviderAsserts.java
index fb4e7af..d8c3c8e 100644
--- a/common-test/src/main/java/org/apache/sqoop/common/test/asserts/ProviderAsserts.java
+++ b/common-test/src/main/java/org/apache/sqoop/common/test/asserts/ProviderAsserts.java
@@ -53,7 +53,7 @@ public class ProviderAsserts {
       int i = 1;
       for(Object expectedValue : values) {
         Object actualValue = rs.getObject(i);
-        assertEquals("Columns do not match on position: " + i, expectedValue, actualValue);
+        assertEquals("Columns do not match on position: " + i, expectedValue.toString(), actualValue.toString());
         i++;
       }
 

http://git-wip-us.apache.org/repos/asf/sqoop/blob/27d87b4f/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExecutor.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExecutor.java
b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExecutor.java
index 5e7e4e6..7a01992 100644
--- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExecutor.java
+++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExecutor.java
@@ -17,6 +17,15 @@
  */
 package org.apache.sqoop.connector.jdbc;
 
+import org.apache.log4j.Logger;
+import org.apache.sqoop.common.SqoopException;
+import org.apache.sqoop.error.code.GenericJdbcConnectorError;
+import org.apache.sqoop.schema.Schema;
+import org.apache.sqoop.schema.type.Column;
+import org.joda.time.DateTime;
+import org.joda.time.LocalDate;
+import org.joda.time.LocalTime;
+
 import java.sql.Connection;
 import java.sql.DatabaseMetaData;
 import java.sql.DriverManager;
@@ -25,10 +34,7 @@ import java.sql.ResultSet;
 import java.sql.ResultSetMetaData;
 import java.sql.SQLException;
 import java.sql.Statement;
-
-import org.apache.log4j.Logger;
-import org.apache.sqoop.common.SqoopException;
-import org.apache.sqoop.error.code.GenericJdbcConnectorError;
+import java.sql.Timestamp;
 
 public class GenericJdbcExecutor {
 
@@ -167,10 +173,34 @@ public class GenericJdbcExecutor {
     }
   }
 
-  public void addBatch(Object[] array) {
+  public void addBatch(Object[] array, Schema schema) {
     try {
-      for (int i=0; i<array.length; i++) {
-        preparedStatement.setObject(i+1, array[i]);
+      Column[] schemaColumns = schema.getColumnsArray();
+      for (int i = 0; i < array.length; i++) {
+        Column schemaColumn = schemaColumns[i];
+        switch (schemaColumn.getType()) {
+        case DATE:
+          // convert the JODA date to sql date
+          LocalDate date = (LocalDate) array[i];
+          java.sql.Date sqlDate = new java.sql.Date(date.toDateTimeAtCurrentTime().getMillis());
+          preparedStatement.setObject(i + 1, sqlDate);
+          break;
+        case DATE_TIME:
+          // convert the JODA date time to sql date
+          DateTime dateTime = (DateTime) array[i];
+          Timestamp timestamp = new Timestamp(dateTime.getMillis());
+          preparedStatement.setObject(i + 1, timestamp);
+          break;
+        case TIME:
+          // convert the JODA time to sql date
+          LocalTime time = (LocalTime) array[i];
+          java.sql.Time sqlTime = new java.sql.Time(time.toDateTimeToday().getMillis());
+          preparedStatement.setObject(i + 1, sqlTime);
+          break;
+        default:
+          // for anything else
+          preparedStatement.setObject(i + 1, array[i]);
+        }
       }
       preparedStatement.addBatch();
     } catch (SQLException e) {

http://git-wip-us.apache.org/repos/asf/sqoop/blob/27d87b4f/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcLoader.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcLoader.java
b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcLoader.java
index 31fd876..ab1ac86 100644
--- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcLoader.java
+++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcLoader.java
@@ -38,7 +38,6 @@ public class GenericJdbcLoader extends Loader<LinkConfiguration, ToJobConfigurat
     String password = linkConfig.linkConfig.password;
     GenericJdbcExecutor executor = new GenericJdbcExecutor(driver, url, username, password);
     executor.setAutoCommit(false);
-
     String sql = context.getString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_TO_DATA_SQL);
     executor.beginBatch(sql);
     try {
@@ -48,7 +47,7 @@ public class GenericJdbcLoader extends Loader<LinkConfiguration, ToJobConfigurat
 
       while ((array = context.getDataReader().readArrayRecord()) != null) {
         numberOfRowsPerBatch++;
-        executor.addBatch(array);
+        executor.addBatch(array, context.getSchema());
 
         if (numberOfRowsPerBatch == rowsPerBatch) {
           numberOfBatchesPerTransaction++;

http://git-wip-us.apache.org/repos/asf/sqoop/blob/27d87b4f/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestLoader.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestLoader.java
b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestLoader.java
index ba66510..2479f89 100644
--- a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestLoader.java
+++ b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestLoader.java
@@ -26,6 +26,17 @@ import org.apache.sqoop.connector.jdbc.configuration.ToJobConfiguration;
 import org.apache.sqoop.etl.io.DataReader;
 import org.apache.sqoop.job.etl.Loader;
 import org.apache.sqoop.job.etl.LoaderContext;
+import org.apache.sqoop.schema.Schema;
+import org.apache.sqoop.schema.type.Date;
+import org.apache.sqoop.schema.type.DateTime;
+
+import org.apache.sqoop.schema.type.Time;
+
+import org.apache.sqoop.schema.type.Decimal;
+import org.apache.sqoop.schema.type.FixedPoint;
+import org.apache.sqoop.schema.type.Text;
+import org.joda.time.LocalDate;
+import org.joda.time.LocalTime;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.DataProvider;
@@ -64,7 +75,7 @@ public class TestLoader {
     if (!executor.existTable(tableName)) {
       executor.executeUpdate("CREATE TABLE "
           + executor.delimitIdentifier(tableName)
-          + "(ICOL INTEGER PRIMARY KEY, DCOL DOUBLE, VCOL VARCHAR(20))");
+          + "(ICOL INTEGER PRIMARY KEY, DCOL DOUBLE, VCOL VARCHAR(20), DATECOL DATE, DATETIMECOL TIMESTAMP, TIMECOL TIME)");
     } else {
       executor.deleteTableData(tableName);
     }
@@ -75,6 +86,7 @@ public class TestLoader {
     executor.close();
   }
 
+  @SuppressWarnings({ "rawtypes", "unchecked" })
   @Test
   public void testInsert() throws Exception {
     MutableContext context = new MutableMapContext();
@@ -87,11 +99,16 @@ public class TestLoader {
     ToJobConfiguration jobConfig = new ToJobConfiguration();
 
     context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_TO_DATA_SQL,
-        "INSERT INTO " + executor.delimitIdentifier(tableName) + " VALUES (?,?,?)");
+        "INSERT INTO " + executor.delimitIdentifier(tableName) + " VALUES (?,?,?,?,?,?)");
+
 
     Loader loader = new GenericJdbcLoader();
     DummyReader reader = new DummyReader();
-    LoaderContext loaderContext = new LoaderContext(context, reader, null);
+    Schema schema = new Schema("TestLoader");
+    schema.addColumn(new FixedPoint("c1", 2L, true)).addColumn(new Decimal("c2", 5, 2))
+        .addColumn(new Text("c3")).addColumn(new Date("c4"))
+        .addColumn(new DateTime("c5", false, false)).addColumn(new Time("c6", false));
+    LoaderContext loaderContext = new LoaderContext(context, reader, schema);
     loader.load(loaderContext, linkConfig, jobConfig);
 
     int index = START;
@@ -101,6 +118,10 @@ public class TestLoader {
       assertEquals(index, rs.getObject(1));
       assertEquals((double) index, rs.getObject(2));
       assertEquals(String.valueOf(index), rs.getObject(3));
+      assertEquals("2004-10-19", rs.getObject(4).toString());
+      assertEquals("2004-10-19 10:23:34.0", rs.getObject(5).toString());
+      assertEquals("11:33:59", rs.getObject(6).toString());
+
       index++;
     }
     assertEquals(numberOfRows, index-START);
@@ -111,11 +132,18 @@ public class TestLoader {
 
     @Override
     public Object[] readArrayRecord() {
+      LocalDate jodaDate= new LocalDate(2004, 10, 19);
+      org.joda.time.DateTime jodaDateTime= new org.joda.time.DateTime(2004, 10, 19, 10, 23, 34);
+      LocalTime time= new LocalTime(11, 33, 59);
+
       if (index < numberOfRows) {
         Object[] array = new Object[] {
             START + index,
             (double) (START + index),
-            String.valueOf(START+index) };
+            String.valueOf(START+index),
+            jodaDate,
+            jodaDateTime,
+            time};
         index++;
         return array;
       } else {

http://git-wip-us.apache.org/repos/asf/sqoop/blob/27d87b4f/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestCSVIntermediateDataFormat.java
----------------------------------------------------------------------
diff --git a/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestCSVIntermediateDataFormat.java
b/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestCSVIntermediateDataFormat.java
index 2630a9d..9229639 100644
--- a/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestCSVIntermediateDataFormat.java
+++ b/connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestCSVIntermediateDataFormat.java
@@ -458,7 +458,7 @@ public class TestCSVIntermediateDataFormat {
     dataFormat = new CSVIntermediateDataFormat(schema);
     dataFormat.setCSVTextData("'2014-10-01'");
     org.joda.time.LocalDate date = new org.joda.time.LocalDate(2014, 10, 01);
-    assertEquals(date.toString(), dataFormat.getObjectData()[0].toString());
+    assertEquals(date, dataFormat.getObjectData()[0]);
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/sqoop/blob/27d87b4f/test/src/main/java/org/apache/sqoop/test/data/Cities.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/sqoop/test/data/Cities.java b/test/src/main/java/org/apache/sqoop/test/data/Cities.java
index 1e90c59..2589061 100644
--- a/test/src/main/java/org/apache/sqoop/test/data/Cities.java
+++ b/test/src/main/java/org/apache/sqoop/test/data/Cities.java
@@ -35,6 +35,7 @@ public class Cities extends DataSet {
       "id",
       "id", "int",
       "country", "varchar(50)",
+      "some_date", "date",
       "city", "varchar(50)"
     );
 
@@ -43,10 +44,10 @@ public class Cities extends DataSet {
 
   @Override
   public DataSet loadBasicData() {
-    provider.insertRow(tableBaseName, 1, "USA", "San Francisco");
-    provider.insertRow(tableBaseName, 2, "USA", "Sunnyvale");
-    provider.insertRow(tableBaseName, 3, "Czech Republic", "Brno");
-    provider.insertRow(tableBaseName, 4, "USA", "Palo Alto");
+    provider.insertRow(tableBaseName, 1, "USA", "2004-10-23","San Francisco");
+    provider.insertRow(tableBaseName, 2, "USA", "2004-10-24", "Sunnyvale");
+    provider.insertRow(tableBaseName, 3, "Czech Republic", "2004-10-25", "Brno");
+    provider.insertRow(tableBaseName, 4, "USA", "2004-10-26", "Palo Alto");
 
     return this;
   }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/27d87b4f/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/FromHDFSToRDBMSTest.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/FromHDFSToRDBMSTest.java
b/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/FromHDFSToRDBMSTest.java
index f82abc7..0b530b9 100644
--- a/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/FromHDFSToRDBMSTest.java
+++ b/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/FromHDFSToRDBMSTest.java
@@ -36,10 +36,10 @@ public class FromHDFSToRDBMSTest extends ConnectorTestCase {
   public void testBasic() throws Exception {
     createTableCities();
     createFromFile("input-0001",
-      "1,'USA','San Francisco'",
-      "2,'USA','Sunnyvale'",
-      "3,'Czech Republic','Brno'",
-      "4,'USA','Palo Alto'"
+      "1,'USA','2004-10-23','San Francisco'",
+      "2,'USA','2004-10-24','Sunnyvale'",
+      "3,'Czech Republic','2004-10-25','Brno'",
+      "4,'USA','2004-10-26','Palo Alto'"
     );
 
     // RDBMS link
@@ -69,10 +69,10 @@ public class FromHDFSToRDBMSTest extends ConnectorTestCase {
     executeJob(job);
 
     assertEquals(4L, rowCount());
-    assertRowInCities(1, "USA", "San Francisco");
-    assertRowInCities(2, "USA", "Sunnyvale");
-    assertRowInCities(3, "Czech Republic", "Brno");
-    assertRowInCities(4, "USA", "Palo Alto");
+    assertRowInCities(1, "USA", "2004-10-23", "San Francisco");
+    assertRowInCities(2, "USA", "2004-10-24", "Sunnyvale");
+    assertRowInCities(3, "Czech Republic", "2004-10-25", "Brno");
+    assertRowInCities(4, "USA", "2004-10-26", "Palo Alto");
 
     // Clean up testing table
     dropTable();

http://git-wip-us.apache.org/repos/asf/sqoop/blob/27d87b4f/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/FromRDBMSToHDFSTest.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/FromRDBMSToHDFSTest.java
b/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/FromRDBMSToHDFSTest.java
index aa9f212..ced52cc 100644
--- a/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/FromRDBMSToHDFSTest.java
+++ b/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/FromRDBMSToHDFSTest.java
@@ -67,10 +67,10 @@ public class FromRDBMSToHDFSTest extends ConnectorTestCase {
 
     // Assert correct output
     assertTo(
-      "1,'USA','San Francisco'",
-      "2,'USA','Sunnyvale'",
-      "3,'Czech Republic','Brno'",
-      "4,'USA','Palo Alto'"
+      "1,'USA','2004-10-23','San Francisco'",
+      "2,'USA','2004-10-24','Sunnyvale'",
+      "3,'Czech Republic','2004-10-25','Brno'",
+      "4,'USA','2004-10-26','Palo Alto'"
     );
 
     // Clean up testing table

http://git-wip-us.apache.org/repos/asf/sqoop/blob/27d87b4f/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/TableStagedRDBMSTest.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/TableStagedRDBMSTest.java
b/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/TableStagedRDBMSTest.java
index b648870..1d09b82 100644
--- a/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/TableStagedRDBMSTest.java
+++ b/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/TableStagedRDBMSTest.java
@@ -37,11 +37,11 @@ public class TableStagedRDBMSTest extends ConnectorTestCase {
     final String stageTableName = "STAGE_" + getTableName();
     createTableCities();
     createFromFile("input-0001",
-      "1,'USA','San Francisco'",
-      "2,'USA','Sunnyvale'",
-      "3,'Czech Republic','Brno'",
-      "4,'USA','Palo Alto'"
-    );
+        "1,'USA','2004-10-23','San Francisco'",
+        "2,'USA','2004-10-24','Sunnyvale'",
+        "3,'Czech Republic','2004-10-25','Brno'",
+        "4,'USA','2004-10-26','Palo Alto'"
+      );
     new Cities(provider, stageTableName).createTables();
 
     // RDBMS link
@@ -76,10 +76,10 @@ public class TableStagedRDBMSTest extends ConnectorTestCase {
 
     assertEquals(0L, provider.rowCount(stageTableName));
     assertEquals(4L, rowCount());
-    assertRowInCities(1, "USA", "San Francisco");
-    assertRowInCities(2, "USA", "Sunnyvale");
-    assertRowInCities(3, "Czech Republic", "Brno");
-    assertRowInCities(4, "USA", "Palo Alto");
+    assertRowInCities(1, "USA", "2004-10-23", "San Francisco");
+    assertRowInCities(2, "USA", "2004-10-24", "Sunnyvale");
+    assertRowInCities(3, "Czech Republic", "2004-10-25", "Brno");
+    assertRowInCities(4, "USA", "2004-10-26", "Palo Alto");
 
     // Clean up testing table
     provider.dropTable(stageTableName);

http://git-wip-us.apache.org/repos/asf/sqoop/blob/27d87b4f/test/src/test/java/org/apache/sqoop/integration/connector/kafka/FromRDBMSToKafkaTest.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/sqoop/integration/connector/kafka/FromRDBMSToKafkaTest.java
b/test/src/test/java/org/apache/sqoop/integration/connector/kafka/FromRDBMSToKafkaTest.java
index 04d2835..8a09d7e 100644
--- a/test/src/test/java/org/apache/sqoop/integration/connector/kafka/FromRDBMSToKafkaTest.java
+++ b/test/src/test/java/org/apache/sqoop/integration/connector/kafka/FromRDBMSToKafkaTest.java
@@ -29,10 +29,10 @@ import org.testng.annotations.Test;
 public class FromRDBMSToKafkaTest extends KafkaConnectorTestCase {
 
   private static final String[] input = {
-          "1,'USA','San Francisco'",
-          "2,'USA','Sunnyvale'",
-          "3,'Czech Republic','Brno'",
-          "4,'USA','Palo Alto'"
+          "1,'USA','2004-10-23','San Francisco'",
+          "2,'USA','2004-10-24','Sunnyvale'",
+          "3,'Czech Republic','2004-10-25','Brno'",
+          "4,'USA','2004-10-26','Palo Alto'"
   };
 
   @Test


Mime
View raw message