sqoop-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From a..@apache.org
Subject sqoop git commit: SQOOP-1805: Sqoop2: GenericJdbcConnector: Delta read support
Date Mon, 16 Mar 2015 17:14:54 GMT
Repository: sqoop
Updated Branches:
  refs/heads/sqoop2 3f618c917 -> e6519c76c


SQOOP-1805: Sqoop2: GenericJdbcConnector: Delta read support

(jarek Jarcec Cecho via Abraham Elmahrek)


Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/e6519c76
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/e6519c76
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/e6519c76

Branch: refs/heads/sqoop2
Commit: e6519c76c673caaf2816b5e68183d1723d777e01
Parents: 3f618c9
Author: Abraham Elmahrek <abe@apache.org>
Authored: Mon Mar 16 10:13:05 2015 -0700
Committer: Abraham Elmahrek <abe@apache.org>
Committed: Mon Mar 16 10:13:05 2015 -0700

----------------------------------------------------------------------
 .../error/code/GenericJdbcConnectorError.java   |   4 +
 .../jdbc/GenericJdbcConnectorConstants.java     |   2 +
 .../connector/jdbc/GenericJdbcExecutor.java     |   9 +
 .../jdbc/GenericJdbcFromInitializer.java        | 186 +++++++++++--------
 .../configuration/FromJobConfiguration.java     |   3 +
 .../jdbc/configuration/IncrementalRead.java     |  49 +++++
 .../generic-jdbc-connector-config.properties    |  14 +-
 .../connector/jdbc/TestFromInitializer.java     | 120 ++++++++++++
 .../jdbc/TestGenericJdbcConnector.java          |  77 ++++++++
 .../jdbc/generic/IncrementalReadTest.java       | 176 ++++++++++++++++++
 10 files changed, 560 insertions(+), 80 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/sqoop/blob/e6519c76/common/src/main/java/org/apache/sqoop/error/code/GenericJdbcConnectorError.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/sqoop/error/code/GenericJdbcConnectorError.java b/common/src/main/java/org/apache/sqoop/error/code/GenericJdbcConnectorError.java
index 03bc104..f18acbd 100644
--- a/common/src/main/java/org/apache/sqoop/error/code/GenericJdbcConnectorError.java
+++ b/common/src/main/java/org/apache/sqoop/error/code/GenericJdbcConnectorError.java
@@ -85,6 +85,10 @@ public enum GenericJdbcConnectorError implements ErrorCode {
 
   GENERIC_JDBC_CONNECTOR_0021("Schema column size do not match the result set column size"),
 
+  GENERIC_JDBC_CONNECTOR_0022("Can't find maximal value of column"),
+
+  GENERIC_JDBC_CONNECTOR_0023("Received error from the database"),
+
   ;
 
   private final String message;

http://git-wip-us.apache.org/repos/asf/sqoop/blob/e6519c76/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcConnectorConstants.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcConnectorConstants.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcConnectorConstants.java
index 4369071..dc86821 100644
--- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcConnectorConstants.java
+++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcConnectorConstants.java
@@ -41,6 +41,8 @@ public final class GenericJdbcConnectorConstants {
       PREFIX_CONNECTOR_JDBC_CONFIG + "partition.minvalue";
   public static final String CONNECTOR_JDBC_PARTITION_MAXVALUE =
       PREFIX_CONNECTOR_JDBC_CONFIG + "partition.maxvalue";
+  public static final String CONNECTOR_JDBC_LAST_INCREMENTAL_VALUE =
+    PREFIX_CONNECTOR_JDBC_CONFIG + "incremental.last_value";
 
   public static final String CONNECTOR_JDBC_FROM_DATA_SQL =
       PREFIX_CONNECTOR_JDBC_CONFIG + "from.data.sql";

http://git-wip-us.apache.org/repos/asf/sqoop/blob/e6519c76/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExecutor.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExecutor.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExecutor.java
index 7a01992..5af34a5 100644
--- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExecutor.java
+++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExecutor.java
@@ -72,6 +72,15 @@ public class GenericJdbcExecutor {
     }
   }
 
+  public PreparedStatement createStatement(String sql) {
+     try {
+      return connection.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
+    } catch (SQLException e) {
+      logSQLException(e);
+      throw new SqoopException(GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0002, e);
+    }
+  }
+
   public void setAutoCommit(boolean autoCommit) {
     try {
       connection.setAutoCommit(autoCommit);

http://git-wip-us.apache.org/repos/asf/sqoop/blob/e6519c76/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcFromInitializer.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcFromInitializer.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcFromInitializer.java
index 1ecd152..6ad2cab 100644
--- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcFromInitializer.java
+++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcFromInitializer.java
@@ -17,6 +17,7 @@
  */
 package org.apache.sqoop.connector.jdbc;
 
+import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.ResultSetMetaData;
 import java.sql.SQLException;
@@ -50,6 +51,8 @@ public class GenericJdbcFromInitializer extends Initializer<LinkConfiguration, F
     try {
       configurePartitionProperties(context.getContext(), linkConfig, fromJobConfig);
       configureTableProperties(context.getContext(), linkConfig, fromJobConfig);
+    } catch(SQLException e) {
+      throw new SqoopException(GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0016, e);
     } finally {
       executor.close();
     }
@@ -124,108 +127,141 @@ public class GenericJdbcFromInitializer extends Initializer<LinkConfiguration, F
     executor = new GenericJdbcExecutor(driver, url, username, password);
   }
 
-  private void configurePartitionProperties(MutableContext context, LinkConfiguration linkConfig, FromJobConfiguration fromJobConfig) {
-    // ----- configure column name -----
-
-    String partitionColumnName = fromJobConfig.fromJobConfig.partitionColumn;
-
-    if (partitionColumnName == null) {
-      // if column is not specified by the user,
-      // find the primary key of the fromTable (when there is a fromTable).
-      String tableName = fromJobConfig.fromJobConfig.tableName;
-      if (tableName != null) {
-        partitionColumnName = executor.getPrimaryKey(tableName);
-      }
+  private void configurePartitionProperties(MutableContext context, LinkConfiguration linkConfig, FromJobConfiguration jobConf) throws SQLException {
+    // Assertions that should be valid (verified via validator)
+    assert (jobConf.fromJobConfig.tableName != null && jobConf.fromJobConfig.sql == null) ||
+           (jobConf.fromJobConfig.tableName == null && jobConf.fromJobConfig.sql != null);
+    assert (jobConf.fromJobConfig.boundaryQuery == null && jobConf.incrementalRead.checkColumn == null) ||
+           (jobConf.fromJobConfig.boundaryQuery != null && jobConf.incrementalRead.checkColumn == null) ||
+           (jobConf.fromJobConfig.boundaryQuery == null && jobConf.incrementalRead.checkColumn != null);
+
+    // We have few if/else conditions based on import type
+    boolean tableImport = jobConf.fromJobConfig.tableName != null;
+    boolean incrementalImport = jobConf.incrementalRead.checkColumn != null;
+
+    // For generating queries
+    StringBuilder sb = new StringBuilder();
+
+    // Partition column name
+    String partitionColumnName = jobConf.fromJobConfig.partitionColumn;
+    // If it's not specified, we can use primary key of given table (if it's table based import)
+    if (StringUtils.isBlank(partitionColumnName) && tableImport) {
+        partitionColumnName = executor.getPrimaryKey(jobConf.fromJobConfig.tableName);
     }
-
+    // If we don't have partition column name, we will error out
     if (partitionColumnName != null) {
-      context.setString(
-          GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNNAME,
-          partitionColumnName);
-
+      context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNNAME, partitionColumnName);
     } else {
-      throw new SqoopException(
-          GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0005);
+      throw new SqoopException(GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0005);
     }
+    LOG.info("Using partition column: " + partitionColumnName);
 
-    // ----- configure column type, min value, and max value -----
-
-    String minMaxQuery = fromJobConfig.fromJobConfig.boundaryQuery;
+    // From fragment for subsequent queries
+    String fromFragment;
+    if(tableImport) {
+      String tableName = jobConf.fromJobConfig.tableName;
+      String schemaName = jobConf.fromJobConfig.schemaName;
 
-    if (minMaxQuery == null) {
-      StringBuilder builder = new StringBuilder();
-
-      String schemaName = fromJobConfig.fromJobConfig.schemaName;
-      String tableName = fromJobConfig.fromJobConfig.tableName;
-      String tableSql = fromJobConfig.fromJobConfig.sql;
+      fromFragment = executor.delimitIdentifier(tableName);
+      if(schemaName != null) {
+        fromFragment = executor.delimitIdentifier(schemaName) + "." + fromFragment;
+      }
+    } else {
+      sb.setLength(0);
+      sb.append("(");
+      sb.append(jobConf.fromJobConfig.sql.replace(GenericJdbcConnectorConstants.SQL_CONDITIONS_TOKEN, "1 = 1"));
+      sb.append(") ");
+      sb.append(GenericJdbcConnectorConstants.SUBQUERY_ALIAS);
+      fromFragment = sb.toString();
+    }
 
-      if (tableName != null && tableSql != null) {
-        // when both fromTable name and fromTable sql are specified:
-        throw new SqoopException(
-            GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0007);
+    // If this is incremental, then we need to get new maximal value and persist is a constant
+    String incrementalMaxValue = null;
+    if(incrementalImport) {
+      sb.setLength(0);
+      sb.append("SELECT ");
+      sb.append("MAX(").append(jobConf.incrementalRead.checkColumn).append(") ");
+      sb.append("FROM ");
+      sb.append(fromFragment);
 
-      } else if (tableName != null) {
-        // when fromTable name is specified:
+      String incrementalNewMaxValueQuery = sb.toString();
+      LOG.info("Incremental new max value query:  " + incrementalNewMaxValueQuery);
 
-        // For databases that support schemas (IE: postgresql).
-        String fullTableName = (schemaName == null) ? executor.delimitIdentifier(tableName) : executor.delimitIdentifier(schemaName) + "." + executor.delimitIdentifier(tableName);
+      ResultSet rs = null;
+      try {
+        rs = executor.executeQuery(incrementalNewMaxValueQuery);
 
-        String column = partitionColumnName;
-        builder.append("SELECT MIN(");
-        builder.append(column);
-        builder.append("), MAX(");
-        builder.append(column);
-        builder.append(") FROM ");
-        builder.append(fullTableName);
+        if (!rs.next()) {
+          throw new SqoopException(GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0022);
+        }
 
-      } else if (tableSql != null) {
-        String column = executor.qualify(
-            partitionColumnName, GenericJdbcConnectorConstants.SUBQUERY_ALIAS);
-        builder.append("SELECT MIN(");
-        builder.append(column);
-        builder.append("), MAX(");
-        builder.append(column);
-        builder.append(") FROM ");
-        builder.append("(");
-        builder.append(tableSql.replace(
-            GenericJdbcConnectorConstants.SQL_CONDITIONS_TOKEN, "1 = 1"));
-        builder.append(") ");
-        builder.append(GenericJdbcConnectorConstants.SUBQUERY_ALIAS);
+        incrementalMaxValue = rs.getString(1);
+        context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_LAST_INCREMENTAL_VALUE, incrementalMaxValue);
+        LOG.info("New maximal value for incremental import is " + incrementalMaxValue);
+      } finally {
+        if(rs != null) {
+          rs.close();
+        }
+      }
+    }
 
-      } else {
-        // when neither are specified:
-        throw new SqoopException(
-            GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0008);
+    // Retrieving min and max values for partition column
+    String minMaxQuery = jobConf.fromJobConfig.boundaryQuery;
+    if (minMaxQuery == null) {
+      sb.setLength(0);
+      sb.append("SELECT ");
+      sb.append("MIN(").append(partitionColumnName).append("), ");
+      sb.append("MAX(").append(partitionColumnName).append(") ");
+      sb.append("FROM ").append(fromFragment).append(" ");
+
+      if(incrementalImport) {
+        sb.append("WHERE ");
+        sb.append(jobConf.incrementalRead.checkColumn).append(" > ?");
+        sb.append(" AND ");
+        sb.append(jobConf.incrementalRead.checkColumn).append(" <= ?");
       }
 
-      minMaxQuery = builder.toString();
+      minMaxQuery = sb.toString();
     }
+    LOG.info("Using min/max query: " + minMaxQuery);
 
-
-    LOG.debug("Using minMaxQuery: " + minMaxQuery);
-    ResultSet rs = executor.executeQuery(minMaxQuery);
+    PreparedStatement ps = null;
+    ResultSet rs = null;
     try {
-      ResultSetMetaData rsmd = rs.getMetaData();
-      if (rsmd.getColumnCount() != 2) {
-        throw new SqoopException(
-            GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0006);
+      ps = executor.createStatement(minMaxQuery);
+      if (incrementalImport) {
+        ps.setString(1, jobConf.incrementalRead.lastValue);
+        ps.setString(2, incrementalMaxValue);
       }
 
-      rs.next();
+      rs = ps.executeQuery();
+      if(!rs.next()) {
+        throw new SqoopException(GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0006);
+      }
 
-      int columnType = rsmd.getColumnType(1);
+      // Boundaries for the job
       String min = rs.getString(1);
       String max = rs.getString(2);
 
-      LOG.info("Boundaries: min=" + min + ", max=" + max + ", columnType=" + columnType);
+      // Type of the partition column
+      ResultSetMetaData rsmd = rs.getMetaData();
+      if (rsmd.getColumnCount() != 2) {
+        throw new SqoopException(GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0006);
+      }
+      int columnType = rsmd.getColumnType(1);
+
+      LOG.info("Boundaries for the job: min=" + min + ", max=" + max + ", columnType=" + columnType);
 
       context.setInteger(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNTYPE, columnType);
       context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MINVALUE, min);
       context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MAXVALUE, max);
-
-    } catch (SQLException e) {
-      throw new SqoopException(
-          GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0006, e);
+    } finally {
+      if(ps != null) {
+        ps.close();
+      }
+      if(rs != null) {
+        rs.close();
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/sqoop/blob/e6519c76/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/configuration/FromJobConfiguration.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/configuration/FromJobConfiguration.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/configuration/FromJobConfiguration.java
index 39e8edd..d11b3b1 100644
--- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/configuration/FromJobConfiguration.java
+++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/configuration/FromJobConfiguration.java
@@ -27,7 +27,10 @@ import org.apache.sqoop.model.Config;
 public class FromJobConfiguration {
   @Config public FromJobConfig fromJobConfig;
 
+  @Config public IncrementalRead incrementalRead;
+
   public FromJobConfiguration() {
     fromJobConfig = new FromJobConfig();
+    incrementalRead = new IncrementalRead();
   }
 }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/e6519c76/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/configuration/IncrementalRead.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/configuration/IncrementalRead.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/configuration/IncrementalRead.java
new file mode 100644
index 0000000..f226532
--- /dev/null
+++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/configuration/IncrementalRead.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.jdbc.configuration;
+
+import org.apache.sqoop.model.ConfigClass;
+import org.apache.sqoop.model.Input;
+import org.apache.sqoop.model.InputEditable;
+import org.apache.sqoop.model.Validator;
+import org.apache.sqoop.validation.Status;
+import org.apache.sqoop.validation.validators.AbstractValidator;
+
+/**
+ */
+@ConfigClass(validators = {@Validator(IncrementalRead.ConfigValidator.class)})
+public class IncrementalRead {
+  @Input(size = 50)
+  public String checkColumn;
+
+  @Input(editable = InputEditable.ANY)
+  public String lastValue;
+
+  public static class ConfigValidator extends AbstractValidator<IncrementalRead> {
+    @Override
+    public void validate(IncrementalRead conf) {
+      if(conf.checkColumn != null && conf.lastValue == null) {
+        addMessage(Status.ERROR, "Last value is required during incremental read");
+      }
+
+      if(conf.checkColumn == null && conf.lastValue != null) {
+        addMessage(Status.ERROR, "Last value can't be filled without check column.");
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/e6519c76/connector/connector-generic-jdbc/src/main/resources/generic-jdbc-connector-config.properties
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/main/resources/generic-jdbc-connector-config.properties b/connector/connector-generic-jdbc/src/main/resources/generic-jdbc-connector-config.properties
index 6a2159b..52bf631 100644
--- a/connector/connector-generic-jdbc/src/main/resources/generic-jdbc-connector-config.properties
+++ b/connector/connector-generic-jdbc/src/main/resources/generic-jdbc-connector-config.properties
@@ -112,9 +112,13 @@ toJobConfig.stageTableName.help = Name of the staging table to use (Optional)
 toJobConfig.shouldClearStageTable.label = Should clear stage table
 toJobConfig.shouldClearStageTable.help = Indicate if the stage table should be cleared (Defaults to false)
 
-# Placeholders to have some entities created
-ignored.label = Ignored
-ignored.help = This is completely ignored
+# Incremental related configuration
+incrementalRead.label = Incremental read
+incrementalRead.help = Configuration related to incremental read
+
+incrementalRead.checkColumn.label = Check column
+incrementalRead.checkColumn.help = Column that is checked during incremental read for new values
+
+incrementalRead.lastValue.label = Last value
+incrementalRead.lastValue.help = Last read value, fetch will resume with higher values
 
-ignored.ignored.label = Ignored
-ignored.ignored.help = This is completely ignored
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/sqoop/blob/e6519c76/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestFromInitializer.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestFromInitializer.java b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestFromInitializer.java
index 52003ab..e9c8d41 100644
--- a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestFromInitializer.java
+++ b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestFromInitializer.java
@@ -142,6 +142,66 @@ public class TestFromInitializer {
 
   @Test
   @SuppressWarnings("unchecked")
+  public void testIncrementalTableNameFullRange() throws Exception {
+    LinkConfiguration linkConfig = new LinkConfiguration();
+    FromJobConfiguration jobConfig = new FromJobConfiguration();
+
+    linkConfig.linkConfig.jdbcDriver = GenericJdbcTestConstants.DRIVER;
+    linkConfig.linkConfig.connectionString = GenericJdbcTestConstants.URL;
+    jobConfig.fromJobConfig.tableName = schemalessTableName;
+    jobConfig.incrementalRead.checkColumn = "ICOL";
+    jobConfig.incrementalRead.lastValue = "-51";
+
+    MutableContext context = new MutableMapContext();
+    InitializerContext initializerContext = new InitializerContext(context);
+
+    @SuppressWarnings("rawtypes")
+    Initializer initializer = new GenericJdbcFromInitializer();
+    initializer.initialize(initializerContext, linkConfig, jobConfig);
+
+    verifyResult(context,
+        "SELECT * FROM " + executor.delimitIdentifier(schemalessTableName) + " WHERE ${CONDITIONS}",
+        "ICOL,DCOL,VCOL",
+        "ICOL",
+        String.valueOf(Types.INTEGER),
+        String.valueOf(START),
+        String.valueOf(START+NUMBER_OF_ROWS-1));
+
+    assertEquals(String.valueOf(START+NUMBER_OF_ROWS-1), context.getString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_LAST_INCREMENTAL_VALUE));
+  }
+
+  @Test
+  @SuppressWarnings("unchecked")
+  public void testIncrementalTableNameFromZero() throws Exception {
+    LinkConfiguration linkConfig = new LinkConfiguration();
+    FromJobConfiguration jobConfig = new FromJobConfiguration();
+
+    linkConfig.linkConfig.jdbcDriver = GenericJdbcTestConstants.DRIVER;
+    linkConfig.linkConfig.connectionString = GenericJdbcTestConstants.URL;
+    jobConfig.fromJobConfig.tableName = schemalessTableName;
+    jobConfig.incrementalRead.checkColumn = "ICOL";
+    jobConfig.incrementalRead.lastValue = "0";
+
+    MutableContext context = new MutableMapContext();
+    InitializerContext initializerContext = new InitializerContext(context);
+
+    @SuppressWarnings("rawtypes")
+    Initializer initializer = new GenericJdbcFromInitializer();
+    initializer.initialize(initializerContext, linkConfig, jobConfig);
+
+    verifyResult(context,
+        "SELECT * FROM " + executor.delimitIdentifier(schemalessTableName) + " WHERE ${CONDITIONS}",
+        "ICOL,DCOL,VCOL",
+        "ICOL",
+        String.valueOf(Types.INTEGER),
+        String.valueOf(1),
+        String.valueOf(START+NUMBER_OF_ROWS-1));
+
+    assertEquals(String.valueOf(START+NUMBER_OF_ROWS-1), context.getString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_LAST_INCREMENTAL_VALUE));
+  }
+
+  @Test
+  @SuppressWarnings("unchecked")
   public void testTableNameWithTableColumns() throws Exception {
     LinkConfiguration linkConfig = new LinkConfiguration();
     FromJobConfiguration jobConfig = new FromJobConfiguration();
@@ -198,6 +258,66 @@ public class TestFromInitializer {
 
   @Test
   @SuppressWarnings("unchecked")
+  public void testIncrementalTableSqlFullRange() throws Exception {
+    LinkConfiguration linkConfig = new LinkConfiguration();
+    FromJobConfiguration jobConfig = new FromJobConfiguration();
+
+    linkConfig.linkConfig.jdbcDriver = GenericJdbcTestConstants.DRIVER;
+    linkConfig.linkConfig.connectionString = GenericJdbcTestConstants.URL;
+    jobConfig.fromJobConfig.sql = schemalessTableSql;
+    jobConfig.fromJobConfig.partitionColumn = "ICOL";
+    jobConfig.incrementalRead.checkColumn = "ICOL";
+    jobConfig.incrementalRead.lastValue = "-51";
+
+    MutableContext context = new MutableMapContext();
+    InitializerContext initializerContext = new InitializerContext(context);
+
+    @SuppressWarnings("rawtypes")
+    Initializer initializer = new GenericJdbcFromInitializer();
+    initializer.initialize(initializerContext, linkConfig, jobConfig);
+
+    verifyResult(context,
+        "SELECT * FROM " + executor.delimitIdentifier(schemalessTableName) + " WHERE ${CONDITIONS}",
+        "ICOL,DCOL,VCOL",
+        "ICOL",
+        String.valueOf(Types.INTEGER),
+        String.valueOf(START),
+        String.valueOf((START+NUMBER_OF_ROWS-1)));
+    assertEquals(String.valueOf(START+NUMBER_OF_ROWS-1), context.getString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_LAST_INCREMENTAL_VALUE));
+  }
+
+  @Test
+  @SuppressWarnings("unchecked")
+  public void testIncrementalTableSqlFromZero() throws Exception {
+    LinkConfiguration linkConfig = new LinkConfiguration();
+    FromJobConfiguration jobConfig = new FromJobConfiguration();
+
+    linkConfig.linkConfig.jdbcDriver = GenericJdbcTestConstants.DRIVER;
+    linkConfig.linkConfig.connectionString = GenericJdbcTestConstants.URL;
+    jobConfig.fromJobConfig.sql = schemalessTableSql;
+    jobConfig.fromJobConfig.partitionColumn = "ICOL";
+    jobConfig.incrementalRead.checkColumn = "ICOL";
+    jobConfig.incrementalRead.lastValue = "0";
+
+    MutableContext context = new MutableMapContext();
+    InitializerContext initializerContext = new InitializerContext(context);
+
+    @SuppressWarnings("rawtypes")
+    Initializer initializer = new GenericJdbcFromInitializer();
+    initializer.initialize(initializerContext, linkConfig, jobConfig);
+
+    verifyResult(context,
+        "SELECT * FROM " + executor.delimitIdentifier(schemalessTableName) + " WHERE ${CONDITIONS}",
+        "ICOL,DCOL,VCOL",
+        "ICOL",
+        String.valueOf(Types.INTEGER),
+        String.valueOf(1),
+        String.valueOf((START+NUMBER_OF_ROWS-1)));
+    assertEquals(String.valueOf(START+NUMBER_OF_ROWS-1), context.getString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_LAST_INCREMENTAL_VALUE));
+  }
+
+  @Test
+  @SuppressWarnings("unchecked")
   public void testTableSqlWithTableColumns() throws Exception {
     LinkConfiguration linkConfig = new LinkConfiguration();
     FromJobConfiguration jobConfig = new FromJobConfiguration();

http://git-wip-us.apache.org/repos/asf/sqoop/blob/e6519c76/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestGenericJdbcConnector.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestGenericJdbcConnector.java b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestGenericJdbcConnector.java
new file mode 100644
index 0000000..cc1c58f
--- /dev/null
+++ b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestGenericJdbcConnector.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.jdbc;
+
+import org.apache.sqoop.common.Direction;
+import org.apache.sqoop.model.ConfigUtils;
+import org.apache.sqoop.model.MConfig;
+import org.apache.sqoop.model.MInput;
+import org.testng.annotations.Test;
+
+import java.util.List;
+import java.util.Locale;
+import java.util.ResourceBundle;
+
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+
+/**
+ */
+public class TestGenericJdbcConnector {
+
+  @Test
+  public void testBundleForLink() {
+    GenericJdbcConnector connector = new GenericJdbcConnector();
+    verifyBundleForConfigClass(connector.getBundle(Locale.getDefault()), connector.getLinkConfigurationClass());
+  }
+
+  @Test
+  void testBundleForJobToDirection() {
+    GenericJdbcConnector connector = new GenericJdbcConnector();
+    verifyBundleForConfigClass(connector.getBundle(Locale.getDefault()), connector.getJobConfigurationClass(Direction.TO));
+  }
+
+  @Test
+  void testBundleForJobFromDirection() {
+    GenericJdbcConnector connector = new GenericJdbcConnector();
+    verifyBundleForConfigClass(connector.getBundle(Locale.getDefault()), connector.getJobConfigurationClass(Direction.FROM));
+  }
+
+  void verifyBundleForConfigClass(ResourceBundle bundle, Class klass) {
+    assertNotNull(bundle);
+    assertNotNull(klass);
+
+    List<MConfig> configs = ConfigUtils.toConfigs(klass);
+
+    for(MConfig config : configs) {
+      assertNotNull(config.getHelpKey());
+      assertNotNull(config.getLabelKey());
+
+      assertTrue(bundle.containsKey(config.getHelpKey()), "Can't find help for " + config.getName());
+      assertTrue(bundle.containsKey(config.getLabelKey()), "Can't find label for " + config.getName());
+
+      for(MInput input : config.getInputs()) {
+        assertNotNull(input.getHelpKey());
+        assertNotNull(input.getLabelKey());
+
+        assertTrue(bundle.containsKey(input.getHelpKey()), "Can't find help for " + input.getName());
+        assertTrue(bundle.containsKey(input.getLabelKey()), "Can't find label for " + input.getName());
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/e6519c76/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/IncrementalReadTest.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/IncrementalReadTest.java b/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/IncrementalReadTest.java
new file mode 100644
index 0000000..716de30
--- /dev/null
+++ b/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/IncrementalReadTest.java
@@ -0,0 +1,176 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.integration.connector.jdbc.generic;
+
+import com.google.common.collect.Iterables;
+import org.apache.sqoop.common.Direction;
+import org.apache.sqoop.connector.hdfs.configuration.ToFormat;
+import org.apache.sqoop.model.MConfigList;
+import org.apache.sqoop.model.MJob;
+import org.apache.sqoop.model.MLink;
+import org.apache.sqoop.test.testcases.ConnectorTestCase;
+import org.apache.sqoop.test.utils.ParametrizedUtils;
+import org.testng.ITest;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Factory;
+import org.testng.annotations.Test;
+
+import java.lang.reflect.Method;
+
+/**
+ */
+public class IncrementalReadTest extends ConnectorTestCase implements ITest {
+
+  public static Object[] COLUMNS = new Object [][] {
+    //       column -   last value - new max value
+    {          "id",          "9",         "19"},
+    {     "version",       "8.10",      "13.10"},
+    {"release_date", "2008-10-18", "2013-10-17"},
+  };
+
+  private String checkColumn;
+  private String lastValue;
+  private String newMaxValue;
+
+  @Factory(dataProvider="incremental-integration-test")
+  public IncrementalReadTest(String checkColumn, String lastValue, String newMaxValue) {
+    this.checkColumn = checkColumn;
+    this.lastValue = lastValue;
+    this.newMaxValue = newMaxValue;
+  }
+
+  @DataProvider(name="incremental-integration-test", parallel=true)
+  public static Object[][] data() {
+    return Iterables.toArray(ParametrizedUtils.toArrayOfArrays(COLUMNS), Object[].class);
+  }
+
+  @Test
+  public void testTable() throws Exception {
+    createAndLoadTableUbuntuReleases();
+
+    // RDBMS link
+    MLink rdbmsLink = getClient().createLink("generic-jdbc-connector");
+    fillRdbmsLinkConfig(rdbmsLink);
+    saveLink(rdbmsLink);
+
+    // HDFS link
+    MLink hdfsLink = getClient().createLink("hdfs-connector");
+    saveLink(hdfsLink);
+
+    // Job creation
+    MJob job = getClient().createJob(rdbmsLink.getPersistenceId(), hdfsLink.getPersistenceId());
+
+    // Set the rdbms "FROM" config
+    MConfigList fromConfig = job.getJobConfig(Direction.FROM);
+    fromConfig.getStringInput("fromJobConfig.tableName").setValue(provider.escapeTableName(getTableName()));
+    fromConfig.getStringInput("fromJobConfig.partitionColumn").setValue(provider.escapeColumnName("id"));
+    fromConfig.getStringInput("incrementalRead.checkColumn").setValue(provider.escapeColumnName(checkColumn));
+    fromConfig.getStringInput("incrementalRead.lastValue").setValue(lastValue);
+
+    // Fill hdfs "TO" config
+    fillHdfsToConfig(job, ToFormat.TEXT_FILE);
+
+    saveJob(job);
+
+    executeJob(job);
+
+    // Assert correct output
+    assertTo(
+        "10,'Jaunty Jackalope',9.04,'2009-04-23',false",
+        "11,'Karmic Koala',9.10,'2009-10-29',false",
+        "12,'Lucid Lynx',10.04,'2010-04-29',true",
+        "13,'Maverick Meerkat',10.10,'2010-10-10',false",
+        "14,'Natty Narwhal',11.04,'2011-04-28',false",
+        "15,'Oneiric Ocelot',11.10,'2011-10-10',false",
+        "16,'Precise Pangolin',12.04,'2012-04-26',true",
+        "17,'Quantal Quetzal',12.10,'2012-10-18',false",
+        "18,'Raring Ringtail',13.04,'2013-04-25',false",
+        "19,'Saucy Salamander',13.10,'2013-10-17',false"
+      );
+
+    // TODO: After Sqoop will be properly updating configuration objects we need to verify new max value
+
+    // Clean up testing table
+    dropTable();
+  }
+
+  @Test
+  public void testQuery() throws Exception {
+    createAndLoadTableUbuntuReleases();
+
+    // RDBMS link
+    MLink rdbmsLink = getClient().createLink("generic-jdbc-connector");
+    fillRdbmsLinkConfig(rdbmsLink);
+    saveLink(rdbmsLink);
+
+    // HDFS link
+    MLink hdfsLink = getClient().createLink("hdfs-connector");
+    saveLink(hdfsLink);
+
+    // Job creation
+    MJob job = getClient().createJob(rdbmsLink.getPersistenceId(), hdfsLink.getPersistenceId());
+
+    String query = "SELECT * FROM " + provider.escapeTableName(getTableName()) + " WHERE ${CONDITIONS}";
+
+    // Set the rdbms "FROM" config
+    MConfigList fromConfig = job.getJobConfig(Direction.FROM);
+    fromConfig.getStringInput("fromJobConfig.sql").setValue(query);
+    fromConfig.getStringInput("fromJobConfig.partitionColumn").setValue(provider.escapeColumnName("id"));
+    fromConfig.getStringInput("incrementalRead.checkColumn").setValue(provider.escapeColumnName(checkColumn));
+    fromConfig.getStringInput("incrementalRead.lastValue").setValue(lastValue);
+
+    // Fill hdfs "TO" config
+    fillHdfsToConfig(job, ToFormat.TEXT_FILE);
+
+    saveJob(job);
+
+    executeJob(job);
+
+    // Assert correct output
+    assertTo(
+        "10,'Jaunty Jackalope',9.04,'2009-04-23',false",
+        "11,'Karmic Koala',9.10,'2009-10-29',false",
+        "12,'Lucid Lynx',10.04,'2010-04-29',true",
+        "13,'Maverick Meerkat',10.10,'2010-10-10',false",
+        "14,'Natty Narwhal',11.04,'2011-04-28',false",
+        "15,'Oneiric Ocelot',11.10,'2011-10-10',false",
+        "16,'Precise Pangolin',12.04,'2012-04-26',true",
+        "17,'Quantal Quetzal',12.10,'2012-10-18',false",
+        "18,'Raring Ringtail',13.04,'2013-04-25',false",
+        "19,'Saucy Salamander',13.10,'2013-10-17',false"
+      );
+
+    // TODO: After Sqoop will be properly updating configuration objects we need to verify new max value
+
+    // Clean up testing table
+    dropTable();
+  }
+
+  private String testName;
+
+  @BeforeMethod(alwaysRun = true)
+  public void beforeMethod(Method aMethod) {
+    this.testName = aMethod.getName();
+  }
+
+  @Override
+  public String getTestName() {
+    return testName + "[" + checkColumn + "]";
+  }
+}


Mime
View raw message