sis-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ama...@apache.org
Subject [sis] 03/45: feat(SQL-Store): add support for limit, offset and distinct operations via SQL calls through java.util.Stream API
Date Tue, 12 Nov 2019 16:44:30 GMT
This is an automated email from the ASF dual-hosted git repository.

amanin pushed a commit to branch refactor/sql-store
in repository https://gitbox.apache.org/repos/asf/sis.git

commit 2e4946ccb239188cca7b3012588bac3e22a82ef5
Author: Alexis Manin <amanin@apache.org>
AuthorDate: Tue Aug 20 17:55:56 2019 +0200

    feat(SQL-Store): add support for limit, offset and distinct operations via SQL calls through
java.util.Stream API
    
    Still need to allow mixing map operations though
---
 .../sis/internal/metadata/sql/SQLBuilder.java      |  11 ++
 ...{DecoratedStream.java => StreamDecoration.java} |  94 ++++++-----
 .../java/org/apache/sis/util/ArgumentChecks.java   |  30 +++-
 .../apache/sis/internal/sql/feature/ColumnRef.java |  60 +++++++
 .../apache/sis/internal/sql/feature/Features.java  | 176 +++++++++++++++++----
 .../sis/internal/sql/feature/SpatialFunctions.java |  22 +--
 .../apache/sis/internal/sql/feature/StreamSQL.java | 144 +++++++++++++++++
 .../org/apache/sis/internal/sql/feature/Table.java | 110 +++----------
 8 files changed, 471 insertions(+), 176 deletions(-)

diff --git a/core/sis-metadata/src/main/java/org/apache/sis/internal/metadata/sql/SQLBuilder.java
b/core/sis-metadata/src/main/java/org/apache/sis/internal/metadata/sql/SQLBuilder.java
index c4ab4c4..1cb7e8a 100644
--- a/core/sis-metadata/src/main/java/org/apache/sis/internal/metadata/sql/SQLBuilder.java
+++ b/core/sis-metadata/src/main/java/org/apache/sis/internal/metadata/sql/SQLBuilder.java
@@ -121,6 +121,17 @@ public class SQLBuilder {
     }
 
     /**
+     * Appends the given long.
+     *
+     * @param  n  the long to append.
+     * @return this builder, for method call chaining.
+     */
+    public final SQLBuilder append(final long n) {
+        buffer.append(n);
+        return this;
+    }
+
+    /**
      * Appends the given character.
      *
      * @param  c  the character to append.
diff --git a/core/sis-utility/src/main/java/org/apache/sis/internal/util/DecoratedStream.java
b/core/sis-utility/src/main/java/org/apache/sis/internal/util/StreamDecoration.java
similarity index 63%
rename from core/sis-utility/src/main/java/org/apache/sis/internal/util/DecoratedStream.java
rename to core/sis-utility/src/main/java/org/apache/sis/internal/util/StreamDecoration.java
index eb398d9..53637b2 100644
--- a/core/sis-utility/src/main/java/org/apache/sis/internal/util/DecoratedStream.java
+++ b/core/sis-utility/src/main/java/org/apache/sis/internal/util/StreamDecoration.java
@@ -21,223 +21,219 @@ import java.util.stream.IntStream;
 import java.util.stream.LongStream;
 import java.util.stream.Stream;
 
-public class DecoratedStream<T> implements Stream<T> {
-
-    final Stream<T> source;
-
-    protected DecoratedStream(Stream<T> source) {
-        this.source = source;
-    }
+public abstract class StreamDecoration<T> implements Stream<T> {
 
     @Override
     public Stream<T> filter(Predicate<? super T> predicate) {
-        return source.filter(predicate);
+        return getDecoratedStream().filter(predicate);
     }
 
     @Override
     public <R> Stream<R> map(Function<? super T, ? extends R> mapper) {
-        return source.map(mapper);
+        return getDecoratedStream().map(mapper);
     }
 
     @Override
     public IntStream mapToInt(ToIntFunction<? super T> mapper) {
-        return source.mapToInt(mapper);
+        return getDecoratedStream().mapToInt(mapper);
     }
 
     @Override
     public LongStream mapToLong(ToLongFunction<? super T> mapper) {
-        return source.mapToLong(mapper);
+        return getDecoratedStream().mapToLong(mapper);
     }
 
     @Override
     public DoubleStream mapToDouble(ToDoubleFunction<? super T> mapper) {
-        return source.mapToDouble(mapper);
+        return getDecoratedStream().mapToDouble(mapper);
     }
 
     @Override
     public <R> Stream<R> flatMap(Function<? super T, ? extends Stream<?
extends R>> mapper) {
-        return source.flatMap(mapper);
+        return getDecoratedStream().flatMap(mapper);
     }
 
     @Override
     public IntStream flatMapToInt(Function<? super T, ? extends IntStream> mapper)
{
-        return source.flatMapToInt(mapper);
+        return getDecoratedStream().flatMapToInt(mapper);
     }
 
     @Override
     public LongStream flatMapToLong(Function<? super T, ? extends LongStream> mapper)
{
-        return source.flatMapToLong(mapper);
+        return getDecoratedStream().flatMapToLong(mapper);
     }
 
     @Override
     public DoubleStream flatMapToDouble(Function<? super T, ? extends DoubleStream>
mapper) {
-        return source.flatMapToDouble(mapper);
+        return getDecoratedStream().flatMapToDouble(mapper);
     }
 
     @Override
     public Stream<T> distinct() {
-        return source.distinct();
+        return getDecoratedStream().distinct();
     }
 
     @Override
     public Stream<T> sorted() {
-        return source.sorted();
+        return getDecoratedStream().sorted();
     }
 
     @Override
     public Stream<T> sorted(Comparator<? super T> comparator) {
-        return source.sorted(comparator);
+        return getDecoratedStream().sorted(comparator);
     }
 
     @Override
     public Stream<T> peek(Consumer<? super T> action) {
-        return source.peek(action);
+        return getDecoratedStream().peek(action);
     }
 
     @Override
     public Stream<T> limit(long maxSize) {
-        return source.limit(maxSize);
+        return getDecoratedStream().limit(maxSize);
     }
 
     @Override
     public Stream<T> skip(long n) {
-        return source.skip(n);
+        return getDecoratedStream().skip(n);
     }
 
 /*
     @Override
     public Stream<T> takeWhile(Predicate<? super T> predicate) {
-        return source.takeWhile(predicate);
+        return getDecoratedStream().takeWhile(predicate);
     }
 
     @Override
     public Stream<T> dropWhile(Predicate<? super T> predicate) {
-        return source.dropWhile(predicate);
+        return getDecoratedStream().dropWhile(predicate);
     }
 */
 
     @Override
     public void forEach(Consumer<? super T> action) {
-        source.forEach(action);
+        getDecoratedStream().forEach(action);
     }
 
     @Override
     public void forEachOrdered(Consumer<? super T> action) {
-        source.forEachOrdered(action);
+        getDecoratedStream().forEachOrdered(action);
     }
 
     @Override
     public Object[] toArray() {
-        return source.toArray();
+        return getDecoratedStream().toArray();
     }
 
     @Override
     public <A> A[] toArray(IntFunction<A[]> generator) {
-        return source.toArray(generator);
+        return getDecoratedStream().toArray(generator);
     }
 
     @Override
     public T reduce(T identity, BinaryOperator<T> accumulator) {
-        return source.reduce(identity, accumulator);
+        return getDecoratedStream().reduce(identity, accumulator);
     }
 
     @Override
     public Optional<T> reduce(BinaryOperator<T> accumulator) {
-        return source.reduce(accumulator);
+        return getDecoratedStream().reduce(accumulator);
     }
 
     @Override
     public <U> U reduce(U identity, BiFunction<U, ? super T, U> accumulator,
BinaryOperator<U> combiner) {
-        return source.reduce(identity, accumulator, combiner);
+        return getDecoratedStream().reduce(identity, accumulator, combiner);
     }
 
     @Override
     public <R> R collect(Supplier<R> supplier, BiConsumer<R, ? super T>
accumulator, BiConsumer<R, R> combiner) {
-        return source.collect(supplier, accumulator, combiner);
+        return getDecoratedStream().collect(supplier, accumulator, combiner);
     }
 
     @Override
     public <R, A> R collect(Collector<? super T, A, R> collector) {
-        return source.collect(collector);
+        return getDecoratedStream().collect(collector);
     }
 
     @Override
     public Optional<T> min(Comparator<? super T> comparator) {
-        return source.min(comparator);
+        return getDecoratedStream().min(comparator);
     }
 
     @Override
     public Optional<T> max(Comparator<? super T> comparator) {
-        return source.max(comparator);
+        return getDecoratedStream().max(comparator);
     }
 
     @Override
     public long count() {
-        return source.count();
+        return getDecoratedStream().count();
     }
 
     @Override
     public boolean anyMatch(Predicate<? super T> predicate) {
-        return source.anyMatch(predicate);
+        return getDecoratedStream().anyMatch(predicate);
     }
 
     @Override
     public boolean allMatch(Predicate<? super T> predicate) {
-        return source.allMatch(predicate);
+        return getDecoratedStream().allMatch(predicate);
     }
 
     @Override
     public boolean noneMatch(Predicate<? super T> predicate) {
-        return source.noneMatch(predicate);
+        return getDecoratedStream().noneMatch(predicate);
     }
 
     @Override
     public Optional<T> findFirst() {
-        return source.findFirst();
+        return getDecoratedStream().findFirst();
     }
 
     @Override
     public Optional<T> findAny() {
-        return source.findAny();
+        return getDecoratedStream().findAny();
     }
 
     @Override
     public Iterator<T> iterator() {
-        return source.iterator();
+        return getDecoratedStream().iterator();
     }
 
     @Override
     public Spliterator<T> spliterator() {
-        return source.spliterator();
+        return getDecoratedStream().spliterator();
     }
 
     @Override
     public boolean isParallel() {
-        return source.isParallel();
+        return getDecoratedStream().isParallel();
     }
 
     @Override
     public Stream<T> sequential() {
-        return source.sequential();
+        return getDecoratedStream().sequential();
     }
 
     @Override
     public Stream<T> parallel() {
-        return source.parallel();
+        return getDecoratedStream().parallel();
     }
 
     @Override
     public Stream<T> unordered() {
-        return source.unordered();
+        return getDecoratedStream().unordered();
     }
 
     @Override
     public Stream<T> onClose(Runnable closeHandler) {
-        return source.onClose(closeHandler);
+        return getDecoratedStream().onClose(closeHandler);
     }
 
     @Override
     public void close() {
-        source.close();
+        getDecoratedStream().close();
     }
+
+    protected abstract Stream<T> getDecoratedStream();
 }
diff --git a/core/sis-utility/src/main/java/org/apache/sis/util/ArgumentChecks.java b/core/sis-utility/src/main/java/org/apache/sis/util/ArgumentChecks.java
index 0c22f0f..a4e3146 100644
--- a/core/sis-utility/src/main/java/org/apache/sis/util/ArgumentChecks.java
+++ b/core/sis-utility/src/main/java/org/apache/sis/util/ArgumentChecks.java
@@ -16,15 +16,17 @@
  */
 package org.apache.sis.util;
 
-import java.util.Map;                                               // For javadoc
 import java.util.BitSet;
-import org.opengis.referencing.cs.CoordinateSystem;
-import org.opengis.referencing.crs.CoordinateReferenceSystem;
-import org.opengis.geometry.Envelope;
+import java.util.Collection;
+import java.util.Map;
+
 import org.opengis.geometry.DirectPosition;
+import org.opengis.geometry.Envelope;
 import org.opengis.geometry.MismatchedDimensionException;
-import org.apache.sis.internal.util.Strings;
+import org.opengis.referencing.crs.CoordinateReferenceSystem;
+import org.opengis.referencing.cs.CoordinateSystem;
 
+import org.apache.sis.internal.util.Strings;
 import org.apache.sis.util.resources.Errors;
 
 
@@ -190,6 +192,24 @@ public final class ArgumentChecks extends Static {
     }
 
     /**
+     * Makes sure that given collection is non-null and non-empty. If it is null, then a
{@link NullArgumentException}
+     * is thrown. Otherwise if it {@link Collection#isEmpty() is empty}, then an {@link IllegalArgumentException}
is thrown.
+     *
+     * @param name the name of the argument to be checked. Used only if an exception is thrown.
+     * @param toCheck the user argument to check against null value and empty collection.
+     * @throws NullArgumentException if {@code toCheck} is null.
+     * @throws IllegalArgumentException if {@code toCheck} is empty.
+     */
+    public static void ensureNonEmpty(final String name, final Collection<?> toCheck)
{
+        if (toCheck == null) {
+            throw new NullArgumentException(Errors.format(Errors.Keys.NullArgument_1, name));
+        }
+        if (toCheck.isEmpty()) {
+            throw new IllegalArgumentException(Errors.format(Errors.Keys.EmptyArgument_1,
name));
+        }
+    }
+
+    /**
      * Ensures that the given {@code values} array is non-null and non-empty. This method
can also ensures that all values
      * are between the given bounds (inclusive) and are distinct. The distinct values requirement
is useful for validating
      * arrays of spatiotemporal dimension indices, where dimensions can not be repeated.
diff --git a/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/ColumnRef.java
b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/ColumnRef.java
new file mode 100644
index 0000000..f2d229d
--- /dev/null
+++ b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/ColumnRef.java
@@ -0,0 +1,60 @@
+package org.apache.sis.internal.sql.feature;
+
+import java.util.Objects;
+
+import org.apache.sis.internal.metadata.sql.SQLBuilder;
+
+import static org.apache.sis.util.ArgumentChecks.ensureNonNull;
+
+/**
+ * A column reference. Specify name of the column, and optionally an alias to use for public
visibility.
+ * By default, column has no alias. To create a column with an alias, use {@code ColumnRef
myCol = new ColumnRef("colName").as("myAlias");}
+ */
+public final class ColumnRef {
+    final String name;
+    final String alias;
+    final String attrName;
+
+    public ColumnRef(String name) {
+        ensureNonNull("Column name", name);
+        this.name = this.attrName = name;
+        alias = null;
+    }
+
+    private ColumnRef(final String name, final String alias) {
+        ensureNonNull("Column alias", alias);
+        this.name = name;
+        this.alias = this.attrName = alias;
+    }
+
+    ColumnRef as(final String alias) {
+        return new ColumnRef(name, alias);
+    }
+
+    SQLBuilder append(final SQLBuilder target) {
+        target.appendIdentifier(name);
+        if (alias != null) {
+            target.append(" AS ").appendIdentifier(alias);
+        }
+
+        return target;
+    }
+
+    public String getAttributeName() {
+        return attrName;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (!(o instanceof ColumnRef)) return false;
+        ColumnRef columnRef = (ColumnRef) o;
+        return name.equals(columnRef.name) &&
+                Objects.equals(alias, columnRef.alias);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(name, alias);
+    }
+}
diff --git a/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/Features.java
b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/Features.java
index 57e9c1f..0934bf7 100644
--- a/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/Features.java
+++ b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/Features.java
@@ -16,29 +16,37 @@
  */
 package org.apache.sis.internal.sql.feature;
 
-import java.util.List;
-import java.util.ArrayList;
-import java.util.Map;
-import java.util.HashMap;
-import java.util.Collection;
-import java.util.Spliterator;
-import java.util.function.Consumer;
+import java.lang.reflect.Array;
 import java.sql.Connection;
 import java.sql.DatabaseMetaData;
-import java.sql.Statement;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
-import java.lang.reflect.Array;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Spliterator;
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+import org.opengis.feature.Feature;
+import org.opengis.feature.FeatureType;
+
 import org.apache.sis.internal.metadata.sql.SQLBuilder;
+import org.apache.sis.storage.DataStoreException;
 import org.apache.sis.storage.InternalDataStoreException;
+import org.apache.sis.util.ArraysExt;
 import org.apache.sis.util.collection.BackingStoreException;
 import org.apache.sis.util.collection.WeakValueHashMap;
-import org.apache.sis.util.ArraysExt;
+
+import static org.apache.sis.util.ArgumentChecks.ensureNonEmpty;
+import static org.apache.sis.util.ArgumentChecks.ensureNonNull;
 
 // Branch-dependent imports
-import org.opengis.feature.Feature;
-import org.opengis.feature.FeatureType;
 
 
 /**
@@ -141,25 +149,45 @@ final class Features implements Spliterator<Feature>, Runnable
{
 
     /**
      * Creates a new iterator over the feature instances.
+     * TODO: This object is far too complicated. A builder of some sort should be used. We
should even consider a
+     * third-party tool like JOOQ, which is a great abstraction for SQL query building.
      *
      * @param table             the table for which we are creating an iterator.
      * @param connection        connection to the database.
-     * @param attributeNames    value of {@link Table#attributeNames}:   where to store simple
values.
-     * @param attributeColumns  value of {@link Table#attributeColumns}: often the same as
attribute names.
-     * @param importedKeys      value of {@link Table#importedKeys}:     targets of this
table foreign keys.
-     * @param exportedKeys      value of {@link Table#exportedKeys}:     foreigner keys of
other tables.
+     * @param columns           Names of columns to read, along with an optional alias. The
alias (or name if no alias
+     *                          is provided) must match a property name in output feature
type.
      * @param following         the relations that we are following. Used for avoiding never
ending loop.
      * @param noFollow          relation to not follow, or {@code null} if none.
+     * @param distinct          True if we should return only distinct results, false otherwise.
+     * @param offset            An offset (number of rows to skip) in underlying SQL query.
A negative or zero value
+     *                          means no offset will be set.
+     * @param limit             Maximum number of rows to return. Corresponds to a LIMIT
statement in underlying SQL
+     *                          query. A negative or 0 value means no limit will be set.
      */
-    Features(final Table table, final Connection connection, final String[] attributeNames,
final String[] attributeColumns,
-             final Relation[] importedKeys, final Relation[] exportedKeys, final List<Relation>
following, final Relation noFollow)
+    Features(final Table table, final Connection connection, final Collection<ColumnRef>
columns,
+             final List<Relation> following, final Relation noFollow,
+             boolean distinct, final long offset, final long limit)
              throws SQLException, InternalDataStoreException
     {
+        ensureNonEmpty("Columns to fetch", columns);
+        String[] attributeColumns = new String[columns.size()];
+        attributeNames = new String[attributeColumns.length];
+        int i = 0;
+        for (ColumnRef column : columns) {
+            attributeColumns[i] = column.name;
+            attributeNames[i++] = column.getAttributeName();
+        }
         this.featureType = table.featureType;
-        this.attributeNames = attributeNames;
         final DatabaseMetaData metadata = connection.getMetaData();
         estimatedSize = following.isEmpty() ? table.countRows(metadata, true) : 0;
+        /*
+         * Create a SELECT clause with all columns that are ordinary attributes. Order matters,
since 'Features'
+         * iterator will map the columns to the attributes listed in the 'attributeNames'
array in that order.
+         * Moreover, we optionally add a "distinct" clause on user request.
+         */
         final SQLBuilder sql = new SQLBuilder(metadata, true).append("SELECT");
+        if (distinct) sql.append(" DISTINCT");
+
         final Map<String,Integer> columnIndices = new HashMap<>();
         /*
          * Create a SELECT clause with all columns that are ordinary attributes.
@@ -169,12 +197,13 @@ final class Features implements Spliterator<Feature>, Runnable
{
         for (String column : attributeColumns) {
             appendColumn(sql, column, columnIndices);
         }
+
         /*
          * Collect information about associations in local arrays before to assign
          * them to the final fields, because some array lengths may be adjusted.
          */
-        int importCount = (importedKeys != null) ? importedKeys.length : 0;
-        int exportCount = (exportedKeys != null) ? exportedKeys.length : 0;
+        int importCount = (table.importedKeys != null) ? table.importedKeys.length : 0;
+        int exportCount = (table.exportedKeys != null) ? table.exportedKeys.length : 0;
         int totalCount  = importCount + exportCount;
         if (totalCount == 0) {
             dependencies        = EMPTY;
@@ -192,7 +221,7 @@ final class Features implements Spliterator<Feature>, Runnable {
              */
             if (importCount != 0) {
                 importCount = 0;                                                    // We
will recount.
-                for (final Relation dependency : importedKeys) {
+                for (final Relation dependency : table.importedKeys) {
                     if (dependency != noFollow) {
                         dependency.startFollowing(following);                       // Safety
against never-ending recursivity.
                         associationNames   [importCount] = dependency.propertyName;
@@ -212,8 +241,8 @@ final class Features implements Spliterator<Feature>, Runnable {
              * associations we need to iterate over all "Parks" rows referencing the city.
              */
             if (exportCount != 0) {
-                int i = importCount;
-                for (final Relation dependency : exportedKeys) {
+                i = importCount;
+                for (final Relation dependency : table.exportedKeys) {
                     dependency.startFollowing(following);                   // Safety against
never-ending recursivity.
                     final Table foreigner  = dependency.getSearchTable();
                     final Relation inverse = foreigner.getInverseOf(dependency, table.name);
@@ -241,7 +270,13 @@ final class Features implements Spliterator<Feature>, Runnable
{
             statement = null;
             instances = null;       // A future SIS version could use the map opportunistically
if it exists.
             keyComponentClass = null;
-            result = connection.createStatement().executeQuery(sql.toString());
+            addOffsetLimit(sql, offset, limit);
+            final Statement statement = connection.createStatement();
+            /* Why this parameter ? See: https://gitlab.geomatys.com/geomatys-group/knowledge-base/wikis/cookbook/jdbc
+             * TODO : allow parameterization ?
+             */
+            statement.setFetchSize(100);
+            result = statement.executeQuery(sql.toString());
         } else {
             final Relation componentOf = following.get(following.size() - 1);
             String separator = " WHERE ";
@@ -249,6 +284,7 @@ final class Features implements Spliterator<Feature>, Runnable {
                 sql.append(separator).appendIdentifier(primaryKey).append("=?");
                 separator = " AND ";
             }
+            addOffsetLimit(sql, offset, limit);
             statement = connection.prepareStatement(sql.toString());
             /*
              * Following assumes that the foreigner key references the primary key of this
table,
@@ -266,11 +302,22 @@ final class Features implements Spliterator<Feature>, Runnable
{
     }
 
     /**
+     * If a limit or an offset is appended, a space will be added beforehand to the given
builder.
+     * @param toEdit The builder to add offset and limit to.
+     * @param offset The offset to use. If zero or negative, it will be ignored.
+     * @param limit the value for limit parameter. If zero or negative, it will be ignored.
+     */
+    private static void addOffsetLimit(final SQLBuilder toEdit, final long offset, final
long limit) {
+        if (limit > 0) toEdit.append(" LIMIT ").append(limit);
+        if (offset > 0) toEdit.append(" OFFSET ").append(offset);
+    }
+
+    /**
      * Appends a columns in the given builder and remember the column indices.
      * An exception is thrown if the column has already been added (should never happen).
      */
     private static int appendColumn(final SQLBuilder sql, final String column,
-            final Map<String,Integer> columnIndices) throws InternalDataStoreException
+                                    final Map<String,Integer> columnIndices) throws
InternalDataStoreException
     {
         int columnCount = columnIndices.size();
         if (columnCount != 0) sql.append(',');
@@ -283,8 +330,7 @@ final class Features implements Spliterator<Feature>, Runnable {
      * Computes the 1-based indices of given columns, adding the columns in the given builder
if necessary.
      */
     private static int[] getColumnIndices(final SQLBuilder sql, final Collection<String>
columns,
-            final Map<String,Integer> columnIndices) throws InternalDataStoreException
-    {
+                                          final Map<String,Integer> columnIndices)
throws InternalDataStoreException {
         int i = 0;
         final int[] indices = new int[columns.size()];
         for (final String column : columns) {
@@ -496,4 +542,78 @@ final class Features implements Spliterator<Feature>, Runnable
{
             throw new BackingStoreException(e);
         }
     }
+
+    /**
+     * Useful to customize value retrieval on result sets. Example:
+     * {@code
+     * SQLBiFunction<ResultSet, Integer, Integer> get = ResultSet::getInt;
+     * }
+     * @param <T>
+     * @param <U>
+     * @param <R>
+     */
+    @FunctionalInterface
+    interface SQLBiFunction<T, U, R> {
+        R apply(T t, U u) throws SQLException;
+
+        /**
+         * Returns a composed function that first applies this function to
+         * its input, and then applies the {@code after} function to the result.
+         * If evaluation of either function throws an exception, it is relayed to
+         * the caller of the composed function.
+         *
+         * @param <V> the type of output of the {@code after} function, and of the
+         *           composed function
+         * @param after the function to apply after this function is applied
+         * @return a composed function that first applies this function and then
+         * applies the {@code after} function
+         * @throws NullPointerException if after is null
+         */
+        default <V> SQLBiFunction<T, U, V> andThen(Function<? super R, ? extends
V> after) {
+            ensureNonNull("After function", after);
+            return (T t, U u) -> after.apply(apply(t, u));
+        }
+    }
+
+    static class Builder {
+
+        final Table parent;
+        long limit, offset;
+        boolean distinct;
+
+        Builder(Table parent) {
+            this.parent = parent;
+        }
+
+        /**
+         * Warning: This does not work with relations. It is only a rough estimation of
the parameterized query.
+         * @param count True if a count query must be generated. False for a simple selection.
+         * @return A text representing (roughly) the SQL query which will be posted.
+         * @throws SQLException If we cannot initialize an sql statement builder.
+         */
+        String getSnapshot(final boolean count) throws SQLException {
+            final SQLBuilder sql = new SQLBuilder(parent.dbMeta, true).append("SELECT ");
+            if (count) sql.append("COUNT(");
+            if (distinct) sql.append("DISTINCT ");
+            // If we want a count and no distinct clause is specified, we can query it for
a single column.
+            if (count && !distinct) sql.appendIdentifier(parent.attributes.get(0).name);
+            else {
+                final Iterator<ColumnRef> it = parent.attributes.iterator();
+                sql.appendIdentifier(it.next().name);
+                while (it.hasNext()) {
+                    sql.append(',').appendIdentifier(it.next().name);
+                }
+            }
+
+            if (count) sql.append(')');
+            sql.append(" FROM ").appendIdentifier(parent.name.catalog, parent.name.schema,
parent.name.table);
+            addOffsetLimit(sql, offset, limit);
+
+            return sql.toString();
+        }
+
+        Features build(final Connection conn) throws SQLException, DataStoreException {
+            return new Features(parent, conn, parent.attributes, new ArrayList<>(),
null, distinct, offset, limit);
+        }
+    }
 }
diff --git a/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/SpatialFunctions.java
b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/SpatialFunctions.java
index 7ef4e90..d01dbdf 100644
--- a/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/SpatialFunctions.java
+++ b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/SpatialFunctions.java
@@ -17,16 +17,18 @@
 package org.apache.sis.internal.sql.feature;
 
 import java.math.BigDecimal;
-import java.time.LocalDate;
-import java.time.LocalTime;
-import java.time.LocalDateTime;
-import java.time.OffsetTime;
-import java.time.OffsetDateTime;
-import java.sql.Types;
+import java.sql.DatabaseMetaData;
+import java.sql.Date;
 import java.sql.ResultSet;
 import java.sql.SQLException;
-import java.sql.DatabaseMetaData;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.sql.Types;
+import java.time.OffsetDateTime;
+import java.time.OffsetTime;
+
 import org.opengis.referencing.crs.CoordinateReferenceSystem;
+
 import org.apache.sis.internal.metadata.sql.Reflection;
 import org.apache.sis.setup.GeometryLibrary;
 
@@ -112,9 +114,9 @@ class SpatialFunctions {
             case Types.CHAR:
             case Types.VARCHAR:
             case Types.LONGVARCHAR:             return String.class;
-            case Types.DATE:                    return LocalDate.class;
-            case Types.TIME:                    return LocalTime.class;
-            case Types.TIMESTAMP:               return LocalDateTime.class;
+            case Types.DATE:                    return Date.class;
+            case Types.TIME:                    return Time.class;
+            case Types.TIMESTAMP:               return Timestamp.class;
             case Types.TIME_WITH_TIMEZONE:      return OffsetTime.class;
             case Types.TIMESTAMP_WITH_TIMEZONE: return OffsetDateTime.class;
             case Types.BINARY:
diff --git a/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/StreamSQL.java
b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/StreamSQL.java
new file mode 100644
index 0000000..7e948a3
--- /dev/null
+++ b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/StreamSQL.java
@@ -0,0 +1,144 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.sis.internal.sql.feature;

import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

import org.opengis.feature.Feature;

import org.apache.sis.internal.util.StreamDecoration;
import org.apache.sis.storage.DataStoreException;
import org.apache.sis.util.collection.BackingStoreException;

/**
 * A stream of {@link Feature} instances read from a {@link Table}, which delays any database
 * access until a terminal operation is invoked. The {@code distinct()}, {@code limit(long)} and
 * {@code skip(long)} intermediate operations are not executed in memory: they are accumulated in
 * a {@link Features.Builder} and translated into {@code DISTINCT}, {@code LIMIT} and {@code OFFSET}
 * clauses of the SQL query. The {@code count()} terminal operation is likewise delegated to an
 * SQL {@code COUNT} query instead of iterating over all features.
 *
 * <p>TODO: {@code map(…)} operations are not yet intercepted; once a caller applies one, the
 * returned stream is a plain JDK stream and subsequent {@code limit}/{@code skip}/{@code distinct}
 * calls no longer reach this class.</p>
 *
 * <p>This class is not thread-safe: like most streams, instances are expected to be used by a
 * single thread (the {@code parallel} flag only affects how the terminal iteration splits work).</p>
 */
class StreamSQL extends StreamDecoration<Feature> {

    /**
     * Accumulates the SQL modifiers (distinct, limit, offset) to apply when the query is built.
     */
    final Features.Builder queryBuilder;

    /**
     * Whether the decorated stream should attempt parallel iteration.
     */
    boolean parallel;

    /**
     * Set to {@code true} when accumulated {@code limit}/{@code skip} calls leave no possible
     * element, in which case terminal operations short-circuit without querying the database.
     * This flag is needed because {@link Features.Builder#limit} values below 1 mean "no limit",
     * so a limit of zero cannot be expressed through the builder alone.
     */
    private boolean empty;

    /**
     * Creates a stream over all features of the given table.
     *
     * @param source  the table to read features from.
     */
    StreamSQL(final Table source) {
        this(new Features.Builder(source));
    }

    /**
     * Creates a stream backed by the given query builder.
     *
     * @param builder  accumulator of SQL modifiers for the query to execute.
     */
    StreamSQL(Features.Builder builder) {
        this.queryBuilder = builder;
    }

    /**
     * Marks the underlying query for parallel iteration. No SQL change is involved;
     * the flag is forwarded to {@link StreamSupport#stream(java.util.Spliterator, boolean)}.
     */
    @Override
    public Stream<Feature> parallel() {
        parallel = true;
        return this;
    }

    /**
     * Marks the underlying query for sequential iteration (the default).
     */
    @Override
    public Stream<Feature> sequential() {
        parallel = false;
        return this;
    }

    /**
     * Requests removal of duplicated features, translated to a SQL {@code DISTINCT} clause.
     */
    @Override
    public Stream<Feature> distinct() {
        queryBuilder.distinct = true;
        return this;
    }

    /**
     * Truncates this stream to the given number of elements, translated to a SQL {@code LIMIT}
     * clause. Repeated calls keep the most restrictive value.
     *
     * @throws IllegalArgumentException if {@code maxSize} is negative, as mandated by
     *         the {@link Stream#limit(long)} contract.
     */
    @Override
    public Stream<Feature> limit(long maxSize) {
        if (maxSize < 0) {
            throw new IllegalArgumentException("Limit must be positive, but was " + maxSize);
        }
        if (maxSize == 0) {
            empty = true;                       // Cannot be expressed through the builder: values < 1 mean "no limit".
        } else if (queryBuilder.limit < 1 || maxSize < queryBuilder.limit) {
            queryBuilder.limit = maxSize;
        }
        return this;
    }

    /**
     * Discards the first {@code n} elements of this stream, translated to a SQL {@code OFFSET}
     * clause. Note that {@code java.util.stream} semantics apply a previously set limit to the
     * stream <em>before</em> the skip, whereas SQL applies {@code OFFSET} before {@code LIMIT}.
     * We therefore shrink any pending limit by {@code n} to preserve stream semantics
     * (e.g. {@code limit(10).skip(5)} must yield at most 5 elements, not 10).
     *
     * @throws IllegalArgumentException if {@code n} is negative, as mandated by
     *         the {@link Stream#skip(long)} contract.
     */
    @Override
    public Stream<Feature> skip(long n) {
        if (n < 0) {
            throw new IllegalArgumentException("Number of elements to skip must be positive, but was " + n);
        }
        queryBuilder.offset += n;
        if (queryBuilder.limit > 0) {
            final long remaining = queryBuilder.limit - n;
            if (remaining <= 0) {
                empty = true;                   // Everything the limit allowed has been skipped.
            } else {
                queryBuilder.limit = remaining;
            }
        }
        return this;
    }

    /**
     * Counts features by delegating to a SQL {@code COUNT} query instead of iterating.
     *
     * NOTE(review): verify that {@code getSnapshot(true)} takes the accumulated
     * limit/offset/distinct modifiers into account, otherwise {@code count()} after
     * {@code limit(…)} would over-report.
     *
     * @throws BackingStoreException if the query cannot be built or executed. This unchecked
     *         exception is used because {@link Stream#count()} cannot throw checked exceptions.
     */
    @Override
    public long count() {
        if (empty) return 0;
        // Build the SQL text before opening a connection, so we do not acquire one for nothing.
        final String sql;
        try {
            sql = queryBuilder.getSnapshot(true);
        } catch (SQLException e) {
            throw new BackingStoreException("Cannot create SQL COUNT query", e);
        }
        try (Connection conn = queryBuilder.parent.source.getConnection();
             Statement st = conn.createStatement();
             ResultSet rs = st.executeQuery(sql))
        {
            return rs.next() ? rs.getLong(1) : 0;
        } catch (SQLException e) {
            throw new BackingStoreException("Cannot estimate feature set size using SQL COUNT query", e);
        }
    }

    /**
     * Opens a connection and executes the accumulated query, producing the stream which all
     * remaining (non-intercepted) operations are delegated to. The connection is opened lazily
     * (only when the returned stream is consumed) and released by the stream's close handler.
     */
    @Override
    protected synchronized Stream<Feature> getDecoratedStream() {
        if (empty) return Stream.empty();
        final AtomicReference<Connection> connectionRef = new AtomicReference<>();
        return Stream.of(uncheck(() -> queryBuilder.parent.source.getConnection()))
                .map(Supplier::get)             // Defers connection creation until a terminal operation runs.
                .peek(connectionRef::set)       // Remembered so the close handler can release it.
                .flatMap(conn -> {
                    try {
                        final Features iter = queryBuilder.build(conn);
                        return StreamSupport.stream(iter, parallel).onClose(iter);
                    } catch (SQLException | DataStoreException e) {
                        throw new BackingStoreException(e);
                    }
                })
                .onClose(() -> queryBuilder.parent.closeRef(connectionRef));
    }

    /**
     * Adapts a {@link Callable} into a {@link Supplier}, wrapping any checked exception
     * in a {@link BackingStoreException} so it can cross the {@code Stream} API boundary.
     *
     * @param generator  the operation to defer; its checked exceptions become unchecked.
     */
    private static <T> Supplier<T> uncheck(final Callable<T> generator) {
        return () -> {
            try {
                return generator.call();
            } catch (RuntimeException e) {
                throw e;
            } catch (Exception e) {
                throw new BackingStoreException(e);
            }
        };
    }
}
diff --git a/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/Table.java
b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/Table.java
index 380669b..321561c 100644
--- a/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/Table.java
+++ b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/Table.java
@@ -20,19 +20,16 @@ import java.sql.Connection;
 import java.sql.DatabaseMetaData;
 import java.sql.ResultSet;
 import java.sql.SQLException;
-import java.sql.Statement;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
-import java.util.concurrent.Callable;
 import java.util.concurrent.atomic.AtomicReference;
-import java.util.function.Supplier;
 import java.util.stream.Stream;
-import java.util.stream.StreamSupport;
 import javax.sql.DataSource;
 
 import org.opengis.feature.AttributeType;
@@ -52,7 +49,6 @@ import org.apache.sis.internal.metadata.sql.SQLBuilder;
 import org.apache.sis.internal.metadata.sql.SQLUtilities;
 import org.apache.sis.internal.storage.AbstractFeatureSet;
 import org.apache.sis.internal.util.CollectionsExt;
-import org.apache.sis.internal.util.DecoratedStream;
 import org.apache.sis.storage.DataStoreContentException;
 import org.apache.sis.storage.DataStoreException;
 import org.apache.sis.storage.InternalDataStoreException;
@@ -60,7 +56,6 @@ import org.apache.sis.util.CharSequences;
 import org.apache.sis.util.Classes;
 import org.apache.sis.util.Debug;
 import org.apache.sis.util.Numbers;
-import org.apache.sis.util.collection.BackingStoreException;
 import org.apache.sis.util.collection.TreeTable;
 import org.apache.sis.util.collection.WeakValueHashMap;
 
@@ -83,7 +78,7 @@ final class Table extends AbstractFeatureSet {
     /**
      * Provider of (pooled) connections to the database.
      */
-    private final DataSource source;
+    final DataSource source;
 
     /**
      * The structure of this table represented as a feature. Each feature attribute is a
table column,
@@ -98,18 +93,11 @@ final class Table extends AbstractFeatureSet {
     final TableReference name;
 
     /**
-     * Name of attributes in feature instances, excluding operations and associations to
other tables.
-     * Those names are in the order of columns declared in the {@code SELECT <columns}
statement.
-     * This array shall not be modified after construction.
+     * Name of all columns to fetch from database, optionally amended with an alias. Alias
is used for feature type
+     * attributes which have been renamed to avoid name collisions. In any case, a call to {@link ColumnRef#getAttributeName()}
+     * will return the name available in target feature type.
      */
-    private final String[] attributeNames;
-
-    /**
-     * Name of columns corresponding to each {@link #attributeNames}. This is often a reference
to the
-     * same array than {@link #attributeNames}, but may be different if some attributes have
been renamed
-     * for avoiding name collisions.
-     */
-    private final String[] attributeColumns;
+    final List<ColumnRef> attributes;
 
     /**
      * The columns that constitute the primary key, or {@code null} if there is no primary
key.
@@ -120,13 +108,13 @@ final class Table extends AbstractFeatureSet {
      * The primary keys of other tables that are referenced by this table foreign key columns.
      * They are 0:1 relations. May be {@code null} if there is no imported keys.
      */
-    private final Relation[] importedKeys;
+    final Relation[] importedKeys;
 
     /**
      * The foreign keys of other tables that reference this table primary key columns.
      * They are 0:N relations. May be {@code null} if there is no exported keys.
      */
-    private final Relation[] exportedKeys;
+    final Relation[] exportedKeys;
 
     /**
      * The class of primary key values, or {@code null} if there is no primary keys.
@@ -152,6 +140,11 @@ final class Table extends AbstractFeatureSet {
     final boolean hasGeometry;
 
     /**
+     * Keeps a reference to the target database metadata, to ease creation of {@link SQLBuilder} instances.
+     */
+    final DatabaseMetaData dbMeta;
+
+    /**
      * Creates a description of the table of the given name.
      * The table is identified by {@code id}, which contains a (catalog, schema, name) tuple.
      * The catalog and schema parts are optional and can be null, but the table is mandatory.
@@ -165,6 +158,7 @@ final class Table extends AbstractFeatureSet {
             throws SQLException, DataStoreException
     {
         super(analyzer.listeners);
+        this.dbMeta = analyzer.metadata;
         this.source = analyzer.source;
         this.name   = id;
         final String tableEsc  = analyzer.escape(id.table);
@@ -230,8 +224,7 @@ final class Table extends AbstractFeatureSet {
         boolean  primaryKeyNonNull = true;
         boolean  hasGeometry       = false;
         int startWithLowerCase     = 0;
-        final List<String> attributeNames = new ArrayList<>();
-        final List<String> attributeColumns = new ArrayList<>();
+        final List<ColumnRef> attributes = new ArrayList<>();
         final FeatureTypeBuilder feature = new FeatureTypeBuilder(analyzer.nameFactory, analyzer.functions.library,
analyzer.locale);
         try (ResultSet reflect = analyzer.metadata.getColumns(id.catalog, schemaEsc, tableEsc,
null)) {
             while (reflect.next()) {
@@ -251,6 +244,8 @@ final class Table extends AbstractFeatureSet {
                         startWithLowerCase--;
                     }
                 }
+
+                ColumnRef colRef = new ColumnRef(column);
                 /*
                  * Add the column as an attribute. Foreign keys are excluded (they will be
replaced by associations),
                  * except if the column is also a primary key. In the later case we need
to keep that column because
@@ -258,8 +253,6 @@ final class Table extends AbstractFeatureSet {
                  */
                 AttributeTypeBuilder<?> attribute = null;
                 if (isPrimaryKey || dependencies == null) {
-                    attributeNames.add(column);
-                    attributeColumns.add(column);
                     final String typeName = reflect.getString(Reflection.TYPE_NAME);
                     Class<?> type = analyzer.functions.toJavaType(reflect.getInt(Reflection.DATA_TYPE),
typeName);
                     if (type == null) {
@@ -339,12 +332,14 @@ final class Table extends AbstractFeatureSet {
                              */
                             if (attribute != null) {
                                 attribute.setName(analyzer.nameFactory.createGenericName(null,
"pk", column));
-                                attributeNames.set(attributeNames.size() - 1, attribute.getName().toString());
+                                colRef = colRef.as(attribute.getName().toString());
                                 attribute = null;
                             }
                         }
                     }
                 }
+
+                attributes.add(colRef);
             }
         }
         /*
@@ -420,9 +415,7 @@ final class Table extends AbstractFeatureSet {
         this.exportedKeys     = toArray(exportedKeys);
         this.primaryKeyClass  = primaryKeyClass;
         this.hasGeometry      = hasGeometry;
-        this.attributeNames   = attributeNames.toArray(new String[attributeNames.size()]);
-        this.attributeColumns = attributeColumns.equals(attributeNames) ? this.attributeNames
-                              : attributeColumns.toArray(new String[attributeColumns.size()]);
+        this.attributes = Collections.unmodifiableList(attributes);
     }
 
     /**
@@ -501,8 +494,8 @@ final class Table extends AbstractFeatureSet {
     @Debug
     final void appendTo(TreeTable.Node parent) {
         parent = Relation.newChild(parent, featureType.getName().toString());
-        for (final String attribute : attributeNames) {
-            TableReference.newChild(parent, attribute);
+        for (final ColumnRef attribute : attributes) {
+            TableReference.newChild(parent, attribute.getAttributeName());
         }
         appendAll(parent, importedKeys, " → ");
         appendAll(parent, exportedKeys, " ← ");
@@ -610,22 +603,10 @@ final class Table extends AbstractFeatureSet {
      */
     @Override
     public Stream<Feature> features(final boolean parallel) throws DataStoreException
{
-        final AtomicReference<Connection> connectionRef = new AtomicReference<>();
-        final Stream<Feature> featureStream = Stream.generate(uncheck(() -> source.getConnection()))
-                .peek(connectionRef::set)
-                .flatMap(conn -> {
-                    try {
-                        final Features iter = features(conn, new ArrayList<>(), null);
-                        return StreamSupport.stream(iter, parallel).onClose(iter);
-                    } catch (SQLException | InternalDataStoreException e) {
-                        throw new BackingStoreException(e);
-                    }
-                })
-                .onClose(() -> closeRef(connectionRef));
-        return new CountOverload<>(featureStream);
+        return new StreamSQL(this);
     }
 
-    private void closeRef(final AtomicReference<Connection> ref) {
+    void closeRef(final AtomicReference<Connection> ref) {
         final Connection conn = ref.get();
         if (conn != null) {
             try {
@@ -646,45 +627,6 @@ final class Table extends AbstractFeatureSet {
     final Features features(final Connection connection, final List<Relation> following,
final Relation noFollow)
             throws SQLException, InternalDataStoreException
     {
-        return new Features(this, connection, attributeNames, attributeColumns, importedKeys,
exportedKeys, following, noFollow);
-    }
-
-    private class CountOverload<T> extends DecoratedStream<T> {
-
-        CountOverload(Stream<T> source) {
-            super(source);
-        }
-
-        @Override
-        public long count() {
-            try (Connection conn = Table.this.source.getConnection()) {
-                final String query = new SQLBuilder(conn.getMetaData(), true)
-                        .append("SELECT COUNT(")
-                        .appendIdentifier(attributeColumns[0])
-                        .append(')')
-                        .append(" FROM ")
-                        .appendIdentifier(name.catalog, name.schema, name.table)
-                        .toString();
-                try (final Statement st = conn.createStatement(); final ResultSet rs = st.executeQuery(query))
{
-                    if (rs.next()) {
-                        return rs.getLong(1);
-                    } else return 0;
-                }
-            } catch (SQLException e) {
-                throw new BackingStoreException("Cannot estimate feature set size using SQL
COUNT query", e);
-            }
-        }
-    }
-
-    private static <T> Supplier<T> uncheck(final Callable<T> generator)
{
-        return () -> {
-            try {
-                return generator.call();
-            } catch (RuntimeException e) {
-                throw e;
-            } catch (Exception e) {
-                throw new BackingStoreException(e);
-            }
-        };
+        return new Features(this, connection, attributes, following, noFollow, false, -1,
-1);
     }
 }


Mime
View raw message