sqoop-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jar...@apache.org
Subject sqoop git commit: SQOOP-2352: Sqoop2: Generic JDBC Connector support for fetch size
Date Wed, 14 Oct 2015 21:11:38 GMT
Repository: sqoop
Updated Branches:
  refs/heads/sqoop2 d69bd34e0 -> 72a9d4383


SQOOP-2352: Sqoop2: Generic JDBC Connector support for fetch size

(Abraham Fine via Jarek Jarcec Cecho)


Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/72a9d438
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/72a9d438
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/72a9d438

Branch: refs/heads/sqoop2
Commit: 72a9d4383156517f2d335f23e23e327d10e01dd1
Parents: d69bd34
Author: Jarek Jarcec Cecho <jarcec@apache.org>
Authored: Wed Oct 14 14:11:20 2015 -0700
Committer: Jarek Jarcec Cecho <jarcec@apache.org>
Committed: Wed Oct 14 14:11:20 2015 -0700

----------------------------------------------------------------------
 .../connector/jdbc/GenericJdbcExecutor.java     | 36 ++++++++++++++------
 .../jdbc/GenericJdbcFromInitializer.java        |  8 ++---
 .../jdbc/GenericJdbcToInitializer.java          |  3 +-
 .../jdbc/configuration/LinkConfig.java          |  4 +++
 .../generic-jdbc-connector-config.properties    |  5 +++
 .../connector/jdbc/GenericJdbcExecutorTest.java |  9 ++++-
 .../jdbc/GenericJdbcTestConstants.java          |  1 +
 .../apache/sqoop/connector/jdbc/TestLoader.java |  3 +-
 8 files changed, 49 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/sqoop/blob/72a9d438/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExecutor.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExecutor.java
b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExecutor.java
index ad6f649..ff33a4b 100644
--- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExecutor.java
+++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExecutor.java
@@ -130,15 +130,34 @@ public class GenericJdbcExecutor {
     return connection;
   }
 
-  public PreparedStatement createStatement(String sql) {
-     try {
-      return connection.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
+  public Statement createStatement() {
+    try {
+      Statement statement = connection.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
+      setFetchSize(statement);
+      return statement;
     } catch (SQLException e) {
       logSQLException(e);
       throw new SqoopException(GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0002, e);
     }
   }
 
+  public PreparedStatement prepareStatement(String sql) {
+    try {
+      PreparedStatement preparedStatement = connection.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY,
ResultSet.CONCUR_READ_ONLY);
+      setFetchSize(preparedStatement);
+      return preparedStatement;
+    } catch (SQLException e) {
+      logSQLException(e);
+      throw new SqoopException(GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0002, e);
+    }
+  }
+
+  private void setFetchSize(Statement statement) throws SQLException {
+    if(link.linkConfig.fetchSize != null) {
+      statement.setFetchSize(link.linkConfig.fetchSize);
+    }
+  }
+
   public void setAutoCommit(boolean autoCommit) {
     try {
       connection.setAutoCommit(autoCommit);
@@ -217,7 +236,7 @@ public class GenericJdbcExecutor {
       final long expectedInsertCount = getTableRowCount(fromTable);
       oldAutoCommit = connection.getAutoCommit();
       connection.setAutoCommit(false);
-      stmt = connection.createStatement();
+      stmt = createStatement();
       final int actualInsertCount = stmt.executeUpdate(insertQuery);
       if(expectedInsertCount == actualInsertCount) {
         LOG.info("Transferred " + actualInsertCount + " rows of staged data " +
@@ -255,8 +274,7 @@ public class GenericJdbcExecutor {
   }
 
   public long getTableRowCount(String tableName) {
-    try (Statement statement = connection.createStatement(
-            ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
+    try (Statement statement = createStatement();
          ResultSet resultSet = statement.executeQuery("SELECT COUNT(1) FROM " + encloseIdentifier(tableName));)
{
       resultSet.next();
       return resultSet.getLong(1);
@@ -267,8 +285,7 @@ public class GenericJdbcExecutor {
   }
 
   public void executeUpdate(String sql) {
-    try (Statement statement = connection.createStatement(
-            ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)) {
+    try (Statement statement = createStatement()) {
       statement.executeUpdate(sql);
     } catch (SQLException e) {
       logSQLException(e);
@@ -425,8 +442,7 @@ public class GenericJdbcExecutor {
   }
 
   public String[] getQueryColumns(String query) {
-    try (Statement statement = connection.createStatement(
-            ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
+    try (Statement statement = createStatement();
          ResultSet rs = statement.executeQuery(query);) {
       ResultSetMetaData rsmd = rs.getMetaData();
       int count = rsmd.getColumnCount();

http://git-wip-us.apache.org/repos/asf/sqoop/blob/72a9d438/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcFromInitializer.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcFromInitializer.java
b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcFromInitializer.java
index 5a357bd..c2d22f7 100644
--- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcFromInitializer.java
+++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcFromInitializer.java
@@ -82,8 +82,7 @@ public class GenericJdbcFromInitializer extends Initializer<LinkConfiguration,
F
 
     Schema schema = new Schema(schemaName);
     ResultSetMetaData rsmt = null;
-    try (Statement statement = executor.getConnection().createStatement(
-            ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
+    try (Statement statement = executor.createStatement();
          ResultSet rs = statement.executeQuery(context.getString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_FROM_DATA_SQL)
                  .replace(GenericJdbcConnectorConstants.SQL_CONDITIONS_TOKEN, "1 = 0"));)
{
 
@@ -172,8 +171,7 @@ public class GenericJdbcFromInitializer extends Initializer<LinkConfiguration,
F
       String incrementalNewMaxValueQuery = sb.toString();
       LOG.info("Incremental new max value query:  " + incrementalNewMaxValueQuery);
 
-      try (Statement statement = executor.getConnection().createStatement(
-              ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
+      try (Statement statement = executor.createStatement();
            ResultSet rs = statement.executeQuery(incrementalNewMaxValueQuery);) {
         if (!rs.next()) {
           throw new SqoopException(GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0022);
@@ -208,7 +206,7 @@ public class GenericJdbcFromInitializer extends Initializer<LinkConfiguration,
F
     PreparedStatement ps = null;
     ResultSet rs = null;
     try {
-      ps = executor.createStatement(minMaxQuery);
+      ps = executor.prepareStatement(minMaxQuery);
       if (incrementalImport) {
         ps.setString(1, jobConf.incrementalRead.lastValue);
         ps.setString(2, incrementalMaxValue);

http://git-wip-us.apache.org/repos/asf/sqoop/blob/72a9d438/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcToInitializer.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcToInitializer.java
b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcToInitializer.java
index ed215ea..fc49061 100644
--- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcToInitializer.java
+++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcToInitializer.java
@@ -69,8 +69,7 @@ public class GenericJdbcToInitializer extends Initializer<LinkConfiguration,
ToJ
     assert schemaName != null;
 
     Schema schema = new Schema(schemaName);
-    try (Statement statement = executor.getConnection().createStatement(
-            ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
+    try (Statement statement = executor.createStatement();
          ResultSet rs = statement.executeQuery("SELECT * FROM " + schemaName + " WHERE 1
= 0");) {
 
       ResultSetMetaData rsmt = rs.getMetaData();

http://git-wip-us.apache.org/repos/asf/sqoop/blob/72a9d438/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/configuration/LinkConfig.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/configuration/LinkConfig.java
b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/configuration/LinkConfig.java
index 885c6f5..ea6b85e 100644
--- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/configuration/LinkConfig.java
+++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/configuration/LinkConfig.java
@@ -22,6 +22,7 @@ import org.apache.sqoop.model.Input;
 import org.apache.sqoop.model.Validator;
 import org.apache.sqoop.validation.Status;
 import org.apache.sqoop.validation.validators.AbstractValidator;
+import org.apache.sqoop.validation.validators.InRange;
 import org.apache.sqoop.validation.validators.NotEmpty;
 import org.apache.sqoop.validation.validators.ClassAvailable;
 import org.apache.sqoop.validation.validators.StartsWith;
@@ -48,6 +49,9 @@ public class LinkConfig {
   @Input(size = 40, sensitive = true)
   public String password;
 
+  @Input(validators = {@Validator(value = InRange.class, strArg = "0," + Integer.MAX_VALUE)})
+  public Integer fetchSize;
+
   @Input
   public Map<String, String> jdbcProperties;
 

http://git-wip-us.apache.org/repos/asf/sqoop/blob/72a9d438/connector/connector-generic-jdbc/src/main/resources/generic-jdbc-connector-config.properties
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/main/resources/generic-jdbc-connector-config.properties
b/connector/connector-generic-jdbc/src/main/resources/generic-jdbc-connector-config.properties
index 73fa308..6defb26 100644
--- a/connector/connector-generic-jdbc/src/main/resources/generic-jdbc-connector-config.properties
+++ b/connector/connector-generic-jdbc/src/main/resources/generic-jdbc-connector-config.properties
@@ -43,6 +43,11 @@ linkConfig.password.label = Password
 linkConfig.password.help = Enter the password to be used for connecting to the \
                    database.
 
+# fetch size int
+linkConfig.fetchSize.label = Fetch Size
+linkConfig.fetchSize.help = Optional hint for JDBC fetch size. See  \
+                   http://docs.oracle.com/javase/7/docs/api/java/sql/Statement.html#setFetchSize(int)
+
 # jdbc properties
 linkConfig.jdbcProperties.label = JDBC Connection Properties
 linkConfig.jdbcProperties.help = Enter any JDBC properties that should be \

http://git-wip-us.apache.org/repos/asf/sqoop/blob/72a9d438/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/GenericJdbcExecutorTest.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/GenericJdbcExecutorTest.java
b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/GenericJdbcExecutorTest.java
index 5587840..3e756a1 100644
--- a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/GenericJdbcExecutorTest.java
+++ b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/GenericJdbcExecutorTest.java
@@ -28,7 +28,6 @@ import java.sql.SQLException;
 
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
-import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
 
 public class GenericJdbcExecutorTest {
@@ -158,4 +157,12 @@ public class GenericJdbcExecutorTest {
     assertEquals(NUMBER_OF_ROWS, executor.getTableRowCount(table),
             "Table " + table + " is expected to be empty.");
   }
+
+  @Test
+  public void testFetchSize() throws Exception {
+    assertEquals((int) GenericJdbcTestConstants.LINK_CONFIGURATION.linkConfig.fetchSize,
+      executor.createStatement().getFetchSize());
+    assertEquals((int) GenericJdbcTestConstants.LINK_CONFIGURATION.linkConfig.fetchSize,
+      executor.prepareStatement("SELECT * FROM " + table).getFetchSize());
+  }
 }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/72a9d438/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/GenericJdbcTestConstants.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/GenericJdbcTestConstants.java
b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/GenericJdbcTestConstants.java
index e16c631..4c313a6 100644
--- a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/GenericJdbcTestConstants.java
+++ b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/GenericJdbcTestConstants.java
@@ -43,5 +43,6 @@ public class GenericJdbcTestConstants {
   static {
     LINK_CONFIGURATION.linkConfig.jdbcDriver = DRIVER;
     LINK_CONFIGURATION.linkConfig.connectionString = URL;
+    LINK_CONFIGURATION.linkConfig.fetchSize = 25;
   }
 }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/72a9d438/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestLoader.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestLoader.java
b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestLoader.java
index dfacc20..83411fb 100644
--- a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestLoader.java
+++ b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestLoader.java
@@ -111,8 +111,7 @@ public class TestLoader {
     loader.load(loaderContext, linkConfig, jobConfig);
 
     int index = START;
-    try (Statement statement = executor.getConnection().createStatement(
-            ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
+    try (Statement statement = executor.createStatement();
          ResultSet rs = statement.executeQuery("SELECT * FROM "
                  + executor.encloseIdentifier(tableName) + " ORDER BY ICOL");) {
       while (rs.next()) {


Mime
View raw message