sqoop-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jar...@apache.org
Subject git commit: SQOOP-1013: Sqoop2: Provide splitters for additional data types to Generic JDBC Connector
Date Thu, 13 Jun 2013 20:40:23 GMT
Updated Branches:
  refs/heads/sqoop2 6d1779a05 -> 84127409a


SQOOP-1013: Sqoop2: Provide splitters for additional data types to Generic JDBC Connector

(Venkat Ranganathan via Jarek Jarcec Cecho)


Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/84127409
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/84127409
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/84127409

Branch: refs/heads/sqoop2
Commit: 84127409a64aeacc220f21566267cf775b60958f
Parents: 6d1779a
Author: Jarek Jarcec Cecho <jarcec@apache.org>
Authored: Thu Jun 13 13:39:43 2013 -0700
Committer: Jarek Jarcec Cecho <jarcec@apache.org>
Committed: Thu Jun 13 13:39:43 2013 -0700

----------------------------------------------------------------------
 .../jdbc/GenericJdbcImportPartitioner.java      | 338 +++++++++++++++++--
 .../connector/jdbc/TestImportPartitioner.java   | 184 ++++++++++
 2 files changed, 499 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/sqoop/blob/84127409/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportPartitioner.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportPartitioner.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportPartitioner.java
index f80f30d..eef18f2 100644
--- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportPartitioner.java
+++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportPartitioner.java
@@ -18,6 +18,9 @@
 package org.apache.sqoop.connector.jdbc;
 
 import java.math.BigDecimal;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
 import java.sql.Types;
 import java.util.LinkedList;
 import java.util.List;
@@ -33,6 +36,7 @@ public class GenericJdbcImportPartitioner extends Partitioner<ConnectionConfigur
 
   private static final BigDecimal NUMERIC_MIN_INCREMENT = new BigDecimal(10000 * Double.MIN_VALUE);
 
+
   private long numberPartitions;
   private String partitionColumnName;
   private int partitionColumnType;
@@ -47,6 +51,14 @@ public class GenericJdbcImportPartitioner extends Partitioner<ConnectionConfigur
     partitionMinValue = context.getString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MINVALUE);
     partitionMaxValue = context.getString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MAXVALUE);
 
+    if (partitionMinValue == null && partitionMaxValue == null) {
+      List<Partition> partitions = new LinkedList<Partition>();
+      GenericJdbcImportPartition partition = new GenericJdbcImportPartition();
+      partition.setConditions(partitionColumnName + "IS NULL");
+      partitions.add(partition);
+      return partitions;
+    }
+
     switch (partitionColumnType) {
     case Types.TINYINT:
     case Types.SMALLINT:
@@ -69,19 +81,19 @@ public class GenericJdbcImportPartitioner extends Partitioner<ConnectionConfigur
     case Types.BIT:
     case Types.BOOLEAN:
       // Boolean column
-      // TODO: Add partition function
+      return partitionBooleanColumn();
 
     case Types.DATE:
     case Types.TIME:
     case Types.TIMESTAMP:
       // Date time column
-      // TODO: Add partition function
+      return partitionDateTimeColumn();
 
     case Types.CHAR:
     case Types.VARCHAR:
     case Types.LONGVARCHAR:
       // Text column
-      // TODO: Add partition function
+      return partitionTextColumn();
 
     default:
       throw new SqoopException(
@@ -89,18 +101,168 @@ public class GenericJdbcImportPartitioner extends Partitioner<ConnectionConfigur
           String.valueOf(partitionColumnType));
     }
   }
+  protected List<Partition> partitionDateTimeColumn() {
+    List<Partition> partitions = new LinkedList<Partition>();
 
-  protected List<Partition> partitionIntegerColumn() {
+    long minDateValue = 0;
+    long maxDateValue = 0;
+
+    switch(partitionColumnType) {
+      case Types.DATE:
+        minDateValue = Date.valueOf(partitionMinValue).getTime();
+        maxDateValue = Date.valueOf(partitionMaxValue).getTime();
+        break;
+      case Types.TIME:
+        minDateValue = Time.valueOf(partitionMinValue).getTime();
+        maxDateValue = Time.valueOf(partitionMaxValue).getTime();
+        break;
+      case Types.TIMESTAMP:
+        minDateValue = Timestamp.valueOf(partitionMinValue).getTime();
+        maxDateValue = Timestamp.valueOf(partitionMaxValue).getTime();
+        break;
+    }
+    long interval =  (maxDateValue - minDateValue) / numberPartitions;
+    long remainder = (maxDateValue - minDateValue) % numberPartitions;
+
+    if (interval == 0) {
+      numberPartitions = (int)remainder;
+    }
+    long lowerBound;
+    long upperBound = minDateValue;
+
+    Object objLB = null;
+    Object objUB = null;
+
+    for (int i = 1; i < numberPartitions; i++) {
+      lowerBound = upperBound;
+      upperBound = lowerBound + interval;
+      upperBound += (i <= remainder) ? 1 : 0;
+
+      switch(partitionColumnType) {
+        case Types.DATE:
+          objLB = new Date(lowerBound);
+          objUB = new Date(upperBound);
+          break;
+        case Types.TIME:
+          objLB = new Time(lowerBound);
+          objUB = new Time(upperBound);
+          break;
+        case Types.TIMESTAMP:
+          objLB = new Timestamp(lowerBound);
+          objUB = new Timestamp(upperBound);
+          break;
+      }
+
+      GenericJdbcImportPartition partition = new GenericJdbcImportPartition();
+      partition.setConditions(
+          constructDateConditions(objLB, objUB, false));
+      partitions.add(partition);
+    }
+    switch(partitionColumnType) {
+      case Types.DATE:
+        objLB = new Date(upperBound);
+        objUB = new Date(maxDateValue);
+        break;
+      case Types.TIME:
+        objLB = new Time(upperBound);
+        objUB = new Time(maxDateValue);
+        break;
+      case Types.TIMESTAMP:
+        objLB = new Timestamp(upperBound);
+        objUB = new Timestamp(maxDateValue);
+        break;
+    }
+    GenericJdbcImportPartition partition = new GenericJdbcImportPartition();
+    partition.setConditions(
+        constructDateConditions(objLB, objUB, true));
+    partitions.add(partition);
+    return partitions;
+  }
+
+  protected List<Partition> partitionTextColumn() {
     List<Partition> partitions = new LinkedList<Partition>();
 
-    if (partitionMinValue == null && partitionMaxValue == null) {
+    String minStringValue = null;
+    String maxStringValue = null;
+
+    // Remove common prefix if any as it does not affect outcome.
+    int maxPrefixLen = Math.min(partitionMinValue.length(),
+        partitionMaxValue.length());
+    // Calculate common prefix length
+    int cpLen = 0;
+
+    for (cpLen = 0; cpLen < maxPrefixLen; cpLen++) {
+      char c1 = partitionMinValue.charAt(cpLen);
+      char c2 = partitionMaxValue.charAt(cpLen);
+      if (c1 != c2) {
+        break;
+      }
+    }
+
+    // The common prefix has length 'cpLen'. Extract it from both.
+    String prefix = partitionMinValue.substring(0, cpLen);
+    minStringValue = partitionMinValue.substring(cpLen);
+    maxStringValue = partitionMaxValue.substring(cpLen);
+
+    BigDecimal minStringBD = textToBigDecimal(minStringValue);
+    BigDecimal maxStringBD = textToBigDecimal(maxStringValue);
+
+    // Having one single value means that we can create only one single split
+    if(minStringBD.equals(maxStringBD)) {
       GenericJdbcImportPartition partition = new GenericJdbcImportPartition();
-      partition.setConditions(partitionColumnName + "IS NULL");
+      partition.setConditions(constructTextConditions(prefix, maxStringBD));
       partitions.add(partition);
       return partitions;
     }
 
-    long minValue = Long.parseLong(partitionMinValue);
+    // Get all the split points together.
+    List<BigDecimal> splitPoints = new LinkedList<BigDecimal>();
+
+    BigDecimal splitSize = divide(maxStringBD.subtract(minStringBD),
+        new BigDecimal(numberPartitions));
+    if (splitSize.compareTo(NUMERIC_MIN_INCREMENT) < 0) {
+      splitSize = NUMERIC_MIN_INCREMENT;
+    }
+
+    BigDecimal curVal = minStringBD;
+
+    while (curVal.compareTo(maxStringBD) <= 0) {
+      splitPoints.add(curVal);
+      curVal = curVal.add(splitSize);
+    }
+
+    if (splitPoints.size() == 0
+        || splitPoints.get(0).compareTo(minStringBD) != 0) {
+      splitPoints.add(0, minStringBD);
+    }
+
+    if (splitPoints.get(splitPoints.size() - 1).compareTo(maxStringBD) != 0
+        || splitPoints.size() == 1) {
+      splitPoints.add(maxStringBD);
+    }
+
+    // Turn the split points into a set of string intervals.
+    BigDecimal start = splitPoints.get(0);
+    for (int i = 1; i < splitPoints.size(); i++) {
+      BigDecimal end = splitPoints.get(i);
+
+      GenericJdbcImportPartition partition = new GenericJdbcImportPartition();
+      partition.setConditions(constructTextConditions(prefix, start,
+          end, i == splitPoints.size() - 1));
+      partitions.add(partition);
+
+      start = end;
+    }
+
+    return partitions;
+  }
+
+
+  protected List<Partition> partitionIntegerColumn() {
+    List<Partition> partitions = new LinkedList<Partition>();
+
+    long minValue = partitionMinValue == null ? Long.MIN_VALUE
+      : Long.parseLong(partitionMinValue);
     long maxValue = Long.parseLong(partitionMaxValue);
 
     long interval =  (maxValue - minValue) / numberPartitions;
@@ -134,14 +296,9 @@ public class GenericJdbcImportPartitioner extends Partitioner<ConnectionConfigur
   protected List<Partition> partitionFloatingPointColumn() {
     List<Partition> partitions = new LinkedList<Partition>();
 
-    if (partitionMinValue == null && partitionMaxValue == null) {
-      GenericJdbcImportPartition partition = new GenericJdbcImportPartition();
-      partition.setConditions(partitionColumnName + "IS NULL");
-      partitions.add(partition);
-      return partitions;
-    }
 
-    double minValue = Double.parseDouble(partitionMinValue);
+    double minValue = partitionMinValue == null ? Double.MIN_VALUE
+      : Double.parseDouble(partitionMinValue);
     double maxValue = Double.parseDouble(partitionMaxValue);
 
     double interval =  (maxValue - minValue) / numberPartitions;
@@ -168,15 +325,6 @@ public class GenericJdbcImportPartitioner extends Partitioner<ConnectionConfigur
 
   protected List<Partition> partitionNumericColumn() {
     List<Partition> partitions = new LinkedList<Partition>();
-
-    // All null values will result in single partition
-    if (partitionMinValue == null && partitionMaxValue == null) {
-      GenericJdbcImportPartition partition = new GenericJdbcImportPartition();
-      partition.setConditions(partitionColumnName + "IS NULL");
-      partitions.add(partition);
-      return partitions;
-    }
-
     // Having one end in null is not supported
     if (partitionMinValue == null || partitionMaxValue == null) {
       throw new SqoopException(GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0015);
@@ -190,12 +338,14 @@ public class GenericJdbcImportPartitioner extends Partitioner<ConnectionConfigur
       GenericJdbcImportPartition partition = new GenericJdbcImportPartition();
       partition.setConditions(constructConditions(minValue));
       partitions.add(partition);
+      return partitions;
     }
 
     // Get all the split points together.
     List<BigDecimal> splitPoints = new LinkedList<BigDecimal>();
 
     BigDecimal splitSize = divide(maxValue.subtract(minValue), new BigDecimal(numberPartitions));
+
     if (splitSize.compareTo(NUMERIC_MIN_INCREMENT) < 0) {
       splitSize = NUMERIC_MIN_INCREMENT;
     }
@@ -227,6 +377,60 @@ public class GenericJdbcImportPartitioner extends Partitioner<ConnectionConfigur
     return partitions;
   }
 
+  protected  List<Partition> partitionBooleanColumn() {
+    List<Partition> partitions = new LinkedList<Partition>();
+
+
+    Boolean minValue = parseBooleanValue(partitionMinValue);
+    Boolean maxValue = parseBooleanValue(partitionMaxValue);
+
+    StringBuilder conditions = new StringBuilder();
+
+    // Having one single value means that we can create only one single split
+    if(minValue.equals(maxValue)) {
+      GenericJdbcImportPartition partition = new GenericJdbcImportPartition();
+
+      conditions.append(partitionColumnName).append(" = ")
+          .append(maxValue);
+      partition.setConditions(conditions.toString());
+      partitions.add(partition);
+      return partitions;
+    }
+
+    GenericJdbcImportPartition partition = new GenericJdbcImportPartition();
+
+    if (partitionMinValue == null) {
+      conditions = new StringBuilder();
+      conditions.append(partitionColumnName).append(" IS NULL");
+      partition.setConditions(conditions.toString());
+      partitions.add(partition);
+    }
+    partition = new GenericJdbcImportPartition();
+    conditions = new StringBuilder();
+    conditions.append(partitionColumnName).append(" = TRUE");
+    partition.setConditions(conditions.toString());
+    partitions.add(partition);
+    partition = new GenericJdbcImportPartition();
+    conditions = new StringBuilder();
+    conditions.append(partitionColumnName).append(" = FALSE");
+    partition.setConditions(conditions.toString());
+    partitions.add(partition);
+    return partitions;
+  }
+
+  private Boolean parseBooleanValue(String value) {
+    if (value == null) {
+      return null;
+    }
+    if (value.equals("1")) {
+      return Boolean.TRUE;
+    } else if (value.equals("0")) {
+      return Boolean.FALSE;
+    } else {
+      return Boolean.parseBoolean(value);
+    }
+  }
+
   protected BigDecimal divide(BigDecimal numerator, BigDecimal denominator) {
     try {
       return numerator.divide(denominator);
@@ -256,4 +460,92 @@ public class GenericJdbcImportPartitioner extends Partitioner<ConnectionConfigur
       .toString()
      ;
   }
+
+  protected String constructDateConditions(
+      Object lowerBound, Object upperBound, boolean lastOne) {
+    StringBuilder conditions = new StringBuilder();
+    conditions.append('\'').append(lowerBound.toString()).append('\'');
+    conditions.append(" <= ");
+    conditions.append(partitionColumnName);
+    conditions.append(" AND ");
+    conditions.append(partitionColumnName);
+    conditions.append(lastOne ? " <= " : " < ");
+    conditions.append('\'').append(upperBound.toString()).append('\'');
+    return conditions.toString();
+  }
+
+  protected String constructTextConditions(String prefix,
+      Object lowerBound, Object upperBound, boolean lastOne) {
+    StringBuilder conditions = new StringBuilder();
+    String lbString = prefix + bigDecimalToText((BigDecimal)lowerBound);
+    String ubString = prefix + bigDecimalToText((BigDecimal)upperBound);
+    conditions.append('\'').append(lbString).append('\'');
+    conditions.append(" <= ");
+    conditions.append(partitionColumnName);
+    conditions.append(" AND ");
+    conditions.append(partitionColumnName);
+    conditions.append(lastOne ? " <= " : " < ");
+    conditions.append('\'').append(ubString).append('\'');
+    return conditions.toString();
+  }
+
+  protected String constructTextConditions(String prefix, Object value) {
+    return new StringBuilder()
+      .append(partitionColumnName)
+      .append(" = ").append('\'')
+      .append(prefix + bigDecimalToText((BigDecimal)value))
+      .append('\'').toString()
+     ;
+  }
+
+
+  /**
+   *  Converts a string to a BigDecimal representation in Base 2^21 format.
+   *  The maximum Unicode code point value defined is 0x10FFFF.  Although
+   *  not all database systems support UTF-16 and we mostly expect UCS-2
+   *  characters only, for completeness, we assume that all Unicode
+   *  characters are supported.
+   *  Given a string 's' containing characters s_0, s_1, .. s_n,
+   *  the string is interpreted as the number 0.s_0 s_1 s_2 .. s_n in base 2^21.
+   *  This can be split and each split point can be converted back to
+   *  a string value for comparison purposes.   The number of characters
+   *  is restricted to prevent repeating fractions and rounding errors
+   *  towards the higher fraction positions.
+   */
+  private static final BigDecimal UNITS_BASE = new BigDecimal(2097152);
+  private static final int MAX_CHARS_TO_CONVERT = 4;
+
+  private BigDecimal textToBigDecimal(String str) {
+    BigDecimal result = BigDecimal.ZERO;
+    BigDecimal divisor = UNITS_BASE;
+
+    int len = Math.min(str.length(), MAX_CHARS_TO_CONVERT);
+
+    for (int n = 0; n < len; ) {
+      int codePoint = str.codePointAt(n);
+      n += Character.charCount(codePoint);
+      BigDecimal val = divide(new BigDecimal(codePoint), divisor);
+      result = result.add(val);
+      divisor = divisor.multiply(UNITS_BASE);
+    }
+
+    return result;
+  }
+
+  private String bigDecimalToText(BigDecimal bd) {
+    BigDecimal curVal = bd.stripTrailingZeros();
+    StringBuilder sb = new StringBuilder();
+
+    for (int n = 0; n < MAX_CHARS_TO_CONVERT; ++n) {
+      curVal = curVal.multiply(UNITS_BASE);
+      int cp = curVal.intValue();
+      if (0 == cp) {
+        break;
+      }
+      curVal = curVal.subtract(new BigDecimal(cp));
+      sb.append(Character.toChars(cp));
+    }
+    return sb.toString();
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/84127409/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportPartitioner.java
----------------------------------------------------------------------
diff --git a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportPartitioner.java b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportPartitioner.java
index ee314d0..522a515 100644
--- a/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportPartitioner.java
+++ b/connector/connector-generic-jdbc/src/test/java/org/apache/sqoop/connector/jdbc/TestImportPartitioner.java
@@ -18,6 +18,9 @@
 package org.apache.sqoop.connector.jdbc;
 
 import java.math.BigDecimal;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
 import java.sql.Types;
 import java.util.Iterator;
 import java.util.List;
@@ -26,6 +29,7 @@ import junit.framework.TestCase;
 
 import org.apache.sqoop.common.MutableContext;
 import org.apache.sqoop.common.MutableMapContext;
+import org.apache.sqoop.common.SqoopException;
 import org.apache.sqoop.connector.jdbc.configuration.ConnectionConfiguration;
 import org.apache.sqoop.connector.jdbc.configuration.ImportJobConfiguration;
 import org.apache.sqoop.job.Constants;
@@ -257,6 +261,186 @@ public class TestImportPartitioner extends TestCase {
     });
   }
 
+
+  public void testDatePartition() throws Exception {
+    MutableContext context = new MutableMapContext();
+    context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNNAME, "DCOL");
+    context.setString(GenericJdbcConnectorConstants
+        .CONNECTOR_JDBC_PARTITION_COLUMNTYPE, String.valueOf(Types.DATE));
+    context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MINVALUE,
+        Date.valueOf("2013-01-01").toString());
+    context.setString(GenericJdbcConnectorConstants
+        .CONNECTOR_JDBC_PARTITION_MAXVALUE, Date.valueOf("2013-12-31").toString());
+
+
+    ConnectionConfiguration connConf = new ConnectionConfiguration();
+    ImportJobConfiguration jobConf = new ImportJobConfiguration();
+
+    Partitioner partitioner = new GenericJdbcImportPartitioner();
+    PartitionerContext partitionerContext = new PartitionerContext(context,
+        3);
+    List<Partition> partitions = partitioner.getPartitions(partitionerContext, connConf, jobConf);
+    verifyResult(partitions, new String[]{
+        "'2013-01-01' <= DCOL AND DCOL < '2013-05-02'",
+        "'2013-05-02' <= DCOL AND DCOL < '2013-08-31'",
+        "'2013-08-31' <= DCOL AND DCOL <= '2013-12-31'",
+    });
+
+  }
+
+  public void testTimePartition() throws Exception {
+    MutableContext context = new MutableMapContext();
+    context.setString(GenericJdbcConnectorConstants
+        .CONNECTOR_JDBC_PARTITION_COLUMNNAME, "TCOL");
+    context.setString(GenericJdbcConnectorConstants
+        .CONNECTOR_JDBC_PARTITION_COLUMNTYPE, String.valueOf(Types.TIME));
+    context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MINVALUE,
+        Time.valueOf("01:01:01").toString());
+    context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MAXVALUE,
+        Time.valueOf("10:40:50").toString());
+
+
+    ConnectionConfiguration connConf = new ConnectionConfiguration();
+    ImportJobConfiguration jobConf = new ImportJobConfiguration();
+
+    Partitioner partitioner = new GenericJdbcImportPartitioner();
+    PartitionerContext partitionerContext = new PartitionerContext(context,
+        3);
+    List<Partition> partitions = partitioner.getPartitions(partitionerContext, connConf, jobConf);
+    verifyResult(partitions, new String[]{
+        "'01:01:01' <= TCOL AND TCOL < '04:14:17'",
+        "'04:14:17' <= TCOL AND TCOL < '07:27:33'",
+        "'07:27:33' <= TCOL AND TCOL <= '10:40:50'",
+    });
+  }
+
+  public void testTimestampPartition() throws Exception {
+    MutableContext context = new MutableMapContext();
+    context.setString(GenericJdbcConnectorConstants
+        .CONNECTOR_JDBC_PARTITION_COLUMNNAME, "TSCOL");
+    context.setString(GenericJdbcConnectorConstants
+        .CONNECTOR_JDBC_PARTITION_COLUMNTYPE, String.valueOf(Types.TIMESTAMP));
+    context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MINVALUE,
+        Timestamp.valueOf("2013-01-01 01:01:01.123").toString());
+    context.setString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MAXVALUE,
+        Timestamp.valueOf("2013-12-31 10:40:50.654").toString());
+
+    ConnectionConfiguration connConf = new ConnectionConfiguration();
+    ImportJobConfiguration jobConf = new ImportJobConfiguration();
+
+    Partitioner partitioner = new GenericJdbcImportPartitioner();
+    PartitionerContext partitionerContext = new PartitionerContext(context,
+        3);
+    List<Partition> partitions = partitioner.getPartitions(partitionerContext, connConf, jobConf);
+    verifyResult(partitions, new String[]{
+        "'2013-01-01 01:01:01.123' <= TSCOL AND TSCOL < '2013-05-02 13:14:17.634'",
+        "'2013-05-02 13:14:17.634' <= TSCOL AND TSCOL < '2013-09-01 00:27:34.144'",
+        "'2013-09-01 00:27:34.144' <= TSCOL AND TSCOL <= '2013-12-31 10:40:50.654'",
+    });
+  }
+
+  public void testBooleanPartition() throws Exception {
+    MutableContext context = new MutableMapContext();
+    context.setString(GenericJdbcConnectorConstants
+        .CONNECTOR_JDBC_PARTITION_COLUMNNAME, "BCOL");
+    context.setString(GenericJdbcConnectorConstants
+        .CONNECTOR_JDBC_PARTITION_COLUMNTYPE, String.valueOf(Types.BOOLEAN));
+    context.setString(GenericJdbcConnectorConstants
+        .CONNECTOR_JDBC_PARTITION_MINVALUE, "0");
+    context.setString(GenericJdbcConnectorConstants
+        .CONNECTOR_JDBC_PARTITION_MAXVALUE, "1");
+
+    ConnectionConfiguration connConf = new ConnectionConfiguration();
+    ImportJobConfiguration jobConf = new ImportJobConfiguration();
+
+    Partitioner partitioner = new GenericJdbcImportPartitioner();
+    PartitionerContext partitionerContext = new PartitionerContext(context,
+        3);
+    List<Partition> partitions = partitioner.getPartitions(partitionerContext, connConf, jobConf);
+    verifyResult(partitions, new String[]{
+      "BCOL = TRUE",
+      "BCOL = FALSE",
+    });
+  }
+
+  public void testVarcharPartition() throws Exception {
+    MutableContext context = new MutableMapContext();
+    context.setString(GenericJdbcConnectorConstants
+        .CONNECTOR_JDBC_PARTITION_COLUMNNAME, "VCCOL");
+    context.setString(GenericJdbcConnectorConstants
+        .CONNECTOR_JDBC_PARTITION_COLUMNTYPE, String.valueOf(Types.VARCHAR));
+    context.setString(GenericJdbcConnectorConstants
+        .CONNECTOR_JDBC_PARTITION_MINVALUE, "A");
+    context.setString(GenericJdbcConnectorConstants
+        .CONNECTOR_JDBC_PARTITION_MAXVALUE, "Z");
+
+    ConnectionConfiguration connConf = new ConnectionConfiguration();
+    ImportJobConfiguration jobConf = new ImportJobConfiguration();
+
+    Partitioner partitioner = new GenericJdbcImportPartitioner();
+    PartitionerContext partitionerContext = new PartitionerContext(context,
+        25);
+    List<Partition> partitions = partitioner.getPartitions(partitionerContext, connConf, jobConf);
+
+    verifyResult(partitions, new String[] {
+        "'A' <= VCCOL AND VCCOL < 'B'",
+        "'B' <= VCCOL AND VCCOL < 'C'",
+        "'C' <= VCCOL AND VCCOL < 'D'",
+        "'D' <= VCCOL AND VCCOL < 'E'",
+        "'E' <= VCCOL AND VCCOL < 'F'",
+        "'F' <= VCCOL AND VCCOL < 'G'",
+        "'G' <= VCCOL AND VCCOL < 'H'",
+        "'H' <= VCCOL AND VCCOL < 'I'",
+        "'I' <= VCCOL AND VCCOL < 'J'",
+        "'J' <= VCCOL AND VCCOL < 'K'",
+        "'K' <= VCCOL AND VCCOL < 'L'",
+        "'L' <= VCCOL AND VCCOL < 'M'",
+        "'M' <= VCCOL AND VCCOL < 'N'",
+        "'N' <= VCCOL AND VCCOL < 'O'",
+        "'O' <= VCCOL AND VCCOL < 'P'",
+        "'P' <= VCCOL AND VCCOL < 'Q'",
+        "'Q' <= VCCOL AND VCCOL < 'R'",
+        "'R' <= VCCOL AND VCCOL < 'S'",
+        "'S' <= VCCOL AND VCCOL < 'T'",
+        "'T' <= VCCOL AND VCCOL < 'U'",
+        "'U' <= VCCOL AND VCCOL < 'V'",
+        "'V' <= VCCOL AND VCCOL < 'W'",
+        "'W' <= VCCOL AND VCCOL < 'X'",
+        "'X' <= VCCOL AND VCCOL < 'Y'",
+        "'Y' <= VCCOL AND VCCOL <= 'Z'",
+    });
+  }
+
+  public void testVarcharPartitionWithCommonPrefix() throws Exception {
+    MutableContext context = new MutableMapContext();
+    context.setString(GenericJdbcConnectorConstants
+        .CONNECTOR_JDBC_PARTITION_COLUMNNAME, "VCCOL");
+    context.setString(GenericJdbcConnectorConstants
+        .CONNECTOR_JDBC_PARTITION_COLUMNTYPE, String.valueOf(Types.VARCHAR));
+    context.setString(GenericJdbcConnectorConstants
+        .CONNECTOR_JDBC_PARTITION_MINVALUE, "AAA");
+    context.setString(GenericJdbcConnectorConstants
+        .CONNECTOR_JDBC_PARTITION_MAXVALUE, "AAF");
+
+    ConnectionConfiguration connConf = new ConnectionConfiguration();
+    ImportJobConfiguration jobConf = new ImportJobConfiguration();
+
+    Partitioner partitioner = new GenericJdbcImportPartitioner();
+    PartitionerContext partitionerContext = new PartitionerContext(context,
+        5);
+
+    List<Partition> partitions = partitioner.getPartitions(partitionerContext, connConf, jobConf);
+
+    verifyResult(partitions, new String[] {
+        "'AAA' <= VCCOL AND VCCOL < 'AAB'",
+        "'AAB' <= VCCOL AND VCCOL < 'AAC'",
+        "'AAC' <= VCCOL AND VCCOL < 'AAD'",
+        "'AAD' <= VCCOL AND VCCOL < 'AAE'",
+        "'AAE' <= VCCOL AND VCCOL <= 'AAF'",
+    });
+
+  }
+
   private void verifyResult(List<Partition> partitions,
       String[] expected) {
     assertEquals(expected.length, partitions.size());


Mime
View raw message