sis-commits mailing list archives

From: ama...@apache.org
Subject: [sis] 13/45: WIP(SQLStore): working on user SQL query execution.
Date: Tue, 12 Nov 2019 16:44:40 GMT
This is an automated email from the ASF dual-hosted git repository.

amanin pushed a commit to branch refactor/sql-store
in repository https://gitbox.apache.org/repos/asf/sis.git

commit 2e5f6d7c423def2162a261338ba11277eac33589
Author: Alexis Manin <amanin@apache.org>
AuthorDate: Tue Sep 24 17:32:33 2019 +0200

    WIP(SQLStore): working on user SQL query execution.
---
 .../apache/sis/internal/sql/feature/Analyzer.java  |  11 +-
 .../apache/sis/internal/sql/feature/Connector.java |  15 +++
 .../sis/internal/sql/feature/FeatureAdapter.java   |  22 ++++
 .../apache/sis/internal/sql/feature/Features.java  |  65 ++++++++++--
 .../apache/sis/internal/sql/feature/Import.java    |  33 ++++++
 .../sis/internal/sql/feature/PrimaryKey.java       |   5 +
 .../sis/internal/sql/feature/QueryBuilder.java     |  14 +++
 .../sis/internal/sql/feature/QueryFeatureSet.java  | 118 ++++++++++++++++++++-
 .../sis/internal/sql/feature/ResultContext.java    |  25 +++++
 .../apache/sis/internal/sql/feature/StreamSQL.java |  60 ++++++++---
 .../org/apache/sis/internal/sql/feature/Table.java |  12 ---
 .../org/apache/sis/storage/sql/SQLStoreTest.java   |   4 +-
 12 files changed, 342 insertions(+), 42 deletions(-)

diff --git a/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/Analyzer.java b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/Analyzer.java
index 157a823..0fce8d2 100644
--- a/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/Analyzer.java
+++ b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/Analyzer.java
@@ -444,6 +444,7 @@ final class Analyzer {
 
         private TableMetadata(TableReference source, TableReference importedBy) throws SQLException {
             this.id = source;
+            this.importedBy = importedBy;
             tableEsc = escape(source.table);
             schemaEsc = escape(source.schema);
 
@@ -457,7 +458,6 @@ final class Analyzer {
             }
 
             try (ResultSet reflect = metadata.getColumns(source.catalog, schemaEsc, tableEsc, null)) {
-                this.importedBy = importedBy;
 
                 final ArrayList<SQLColumn> tmpList = new ArrayList<>();
                 while (reflect.next()) {
@@ -520,9 +520,14 @@ final class Analyzer {
                     Relation r = new Relation(Analyzer.this, Relation.Direction.IMPORT, reflect);
                     final GenericName foreignTypeName = r.getName(Analyzer.this);
                     final Collection<String> fks = r.getForeignerKeys();
-                    // If the link is composed of a single foreign key, we'll name it after that name. Otherwise, we'll use
-                    // referenced table name, as it will surely be more explicit than a concatenation of column names.
+                    /* If the link is composed of a single foreign key, we name it after that column. Otherwise,
+                     * we use the constraint name if present. As a fallback, we take the referenced table name, as
+                     * it will surely be more explicit than a concatenation of column names.
+                     * In all cases, we use the "sis" namespace, as we are making arbitrary choices specific to this
+                     * framework.
+                     */
                     if (fks.size() == 1) r.propertyName = Names.createGenericName(null, ":", "sis", fks.iterator().next());
+                    else if (r.freeText != null && !r.freeText.isEmpty()) r.propertyName = Names.createGenericName(null, ":", "sis", r.freeText);
                     else r.propertyName = Names.createGenericName(null, ":", "sis", foreignTypeName.tip().toString());
                     imports.add(r);
                 } while (!reflect.isClosed());
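
A side note on the naming logic above: the fallback chain (single foreign-key column, then constraint name, then referenced table name) boils down to the standalone sketch below. The helper method and its parameter names are illustrative only and are not part of the commit; "constraintName" stands for Relation.freeText.

    static String relationPropertyName(java.util.Collection<String> fkColumns,
                                       String constraintName, String referencedTable) {
        final String tip;
        if (fkColumns.size() == 1) {
            tip = fkColumns.iterator().next();            // single column: its name is explicit enough
        } else if (constraintName != null && !constraintName.isEmpty()) {
            tip = constraintName;                         // composite key: prefer the constraint name
        } else {
            tip = referencedTable;                        // last resort: the referenced table name
        }
        return "sis:" + tip;                              // arbitrary choice, hence the "sis" namespace
    }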
diff --git a/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/Connector.java b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/Connector.java
new file mode 100644
index 0000000..c99bd7f
--- /dev/null
+++ b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/Connector.java
@@ -0,0 +1,15 @@
+package org.apache.sis.internal.sql.feature;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.stream.Stream;
+
+import org.opengis.feature.Feature;
+
+import org.apache.sis.storage.DataStoreException;
+
+public interface Connector {
+    Stream<Feature> connect(Connection connection) throws SQLException, DataStoreException;
+
+    String estimateStatement(final boolean count);
+}
diff --git a/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/FeatureAdapter.java b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/FeatureAdapter.java
new file mode 100644
index 0000000..4fde073
--- /dev/null
+++ b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/FeatureAdapter.java
@@ -0,0 +1,22 @@
+package org.apache.sis.internal.sql.feature;
+
+import java.util.List;
+
+import org.opengis.feature.FeatureType;
+
+public class FeatureAdapter {
+
+    private final FeatureType type;
+
+    public FeatureAdapter(FeatureType type) {
+        this.type = type;
+    }
+
+    public FeatureType getType() {
+        return type;
+    }
+
+    public List<Features.SQLFunction<ResultContext.Cell, ?>> getProperties() {
+        throw new UnsupportedOperationException("Not implemented yet");
+    }
+}
diff --git a/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/Features.java b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/Features.java
index 204cadd..d2d623b 100644
--- a/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/Features.java
+++ b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/Features.java
@@ -33,6 +33,8 @@ import java.util.Map;
 import java.util.Spliterator;
 import java.util.function.Consumer;
 import java.util.function.Function;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
 
 import org.opengis.feature.Feature;
 import org.opengis.feature.FeatureType;
@@ -563,12 +565,37 @@ final class Features implements Spliterator<Feature> {
         }
     }
 
-    static class Builder {
+    @FunctionalInterface
+    interface SQLFunction<T, R> {
+        R apply(T t) throws SQLException;
+
+        /**
+         * Returns a composed function that first applies this function to
+         * its input, and then applies the {@code after} function to the result.
+         * If evaluation of either function throws an exception, it is relayed to
+         * the caller of the composed function.
+         *
+         * @param <V> the type of output of the {@code after} function, and of the
+         *           composed function
+         * @param after the function to apply after this function is applied
+         * @return a composed function that first applies this function and then
+         * applies the {@code after} function
+         * @throws NullPointerException if after is null
+         */
+        default <V> SQLFunction<T, V> andThen(Function<? super R, ? extends V> after) {
+            ensureNonNull("After function", after);
+            return t -> after.apply(apply(t));
+        }
+    }
+
+    static class Builder implements QueryBuilder {
 
         final Table parent;
         long limit, offset;
         SortBy[] sort;
 
+        boolean distinct;
+
         Builder(Table parent) {
             this.parent = parent;
         }
@@ -578,7 +605,7 @@ final class Features implements Spliterator<Feature> {
         }
 
         Connector select(boolean distinct, ColumnRef... columns) {
-            return new Connector(this, distinct, columns);
+            return new TableConnector(this, distinct, columns);
         }
 
         Builder where(final Filter filter) {
@@ -590,9 +617,32 @@ final class Features implements Spliterator<Feature> {
             else this.sort = Arrays.copyOf(sorting, sorting.length);
             return this;
         }
+
+        @Override
+        public QueryBuilder limit(long limit) {
+            this.limit = limit;
+            return this;
+        }
+
+        @Override
+        public QueryBuilder offset(long offset) {
+            this.offset = offset;
+            return this;
+        }
+
+        @Override
+        public QueryBuilder distinct(boolean activate) {
+            this.distinct = activate;
+            return this;
+        }
+
+        @Override
+        public Connector select(ColumnRef... columns) {
+            throw new UnsupportedOperationException("Not supported yet"); // "Alexis Manin (Geomatys)" on 24/09/2019
+        }
     }
 
-    public static final class Connector {
+    static final class TableConnector implements Connector {
         final Builder source;
 
         final boolean distinct;
@@ -600,19 +650,20 @@ final class Features implements Spliterator<Feature> {
 
         final SortBy[] sort;
 
-        public Connector(Builder source, boolean distinct, ColumnRef[] columns) {
+        TableConnector(Builder source, boolean distinct, ColumnRef[] columns) {
             this.source = source;
             this.distinct = distinct;
             this.columns = columns;
             this.sort = source.sort == null ? null : Arrays.copyOf(source.sort, source.sort.length);
         }
 
-        public Features connect(final Connection conn) throws SQLException, DataStoreException {
-            return new Features(
+        public Stream<Feature> connect(final Connection conn) throws SQLException, DataStoreException {
+            final Features features = new Features(
                     source.parent, conn,
                     columns == null || columns.length < 1 ? source.parent.attributes : Arrays.asList(columns),
                     new ArrayList<>(), null, distinct, source.offset, source.limit
             );
+            return StreamSupport.stream(features, false);
         }
 
         /**
@@ -621,7 +672,7 @@ final class Features implements Spliterator<Feature> {
          * @return A text representing (roughly) the SQL query which will be posted.
          * @throws SQLException If we cannot initialize an sql statement builder.
          */
-        String getSnapshot(final boolean count) {
+        public String estimateStatement(final boolean count) {
             final SQLBuilder sql = source.parent.createStatement().append("SELECT ");
             if (count) sql.append("COUNT(");
             if (distinct) sql.append("DISTINCT ");
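
For context, a minimal sketch of how the new SQLFunction.andThen is meant to be used (same-package code; the column name is hypothetical): the first step may throw SQLException, while the composed step is a plain java.util.function.Function.

    Features.SQLFunction<java.sql.ResultSet, String> readName = rs -> rs.getString("name");
    Features.SQLFunction<java.sql.ResultSet, String> trimmed  = readName.andThen(String::trim);
    // trimmed.apply(resultSet) still declares SQLException; only the JDBC access can throw it.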
diff --git a/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/Import.java b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/Import.java
new file mode 100644
index 0000000..f5614e1
--- /dev/null
+++ b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/Import.java
@@ -0,0 +1,33 @@
+package org.apache.sis.internal.sql.feature;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+class Import {
+
+    final String propertyName;
+
+    final List<String> fkColumns;
+
+    final TableReference target;
+
+    public Import(String propertyName, Collection<String> fkColumns, TableReference target) {
+        this.propertyName = propertyName;
+        this.fkColumns = Collections.unmodifiableList(new ArrayList<>(fkColumns));
+        this.target = target;
+    }
+
+    public String getPropertyName() {
+        return propertyName;
+    }
+
+    public List<String> getFkColumns() {
+        return fkColumns;
+    }
+
+    public TableReference getTarget() {
+        return target;
+    }
+}
diff --git a/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/PrimaryKey.java b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/PrimaryKey.java
index 25ee889..1e68fd3 100644
--- a/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/PrimaryKey.java
+++ b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/PrimaryKey.java
@@ -7,6 +7,11 @@ import java.util.Optional;
 
 import org.apache.sis.util.ArgumentChecks;
 
+/**
+ * Represents a SQL primary key constraint. The main information carried is the list of columns composing the key.
+ *
+ * @author "Alexis Manin (Geomatys)"
+ */
 interface PrimaryKey {
 
     static Optional<PrimaryKey> create(List<String> cols) {
diff --git a/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/QueryBuilder.java b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/QueryBuilder.java
new file mode 100644
index 0000000..24ac4f3
--- /dev/null
+++ b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/QueryBuilder.java
@@ -0,0 +1,14 @@
+package org.apache.sis.internal.sql.feature;
+
+interface QueryBuilder {
+
+    QueryBuilder limit(long limit);
+
+    QueryBuilder offset(long offset);
+
+    default QueryBuilder distinct() { return distinct(true); }
+
+    QueryBuilder distinct(boolean activate);
+
+    Connector select(ColumnRef... columns);
+}
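
The intended fluent usage of this contract, as a sketch (the "builder" and "connection" variables are assumed; exception handling omitted). Note that as of this WIP commit, Features.Builder.select(ColumnRef...) still throws UnsupportedOperationException, so only the two-argument select(boolean, ColumnRef...) path is wired up for tables.

    Connector connector = builder.distinct()
                                 .offset(20)     // becomes the SQL offset
                                 .limit(100)     // becomes the SQL limit
                                 .select();      // no columns given: select them all
    try (Stream<Feature> features = connector.connect(connection)) {
        features.forEach(System.out::println);
    }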
diff --git a/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/QueryFeatureSet.java b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/QueryFeatureSet.java
index b45c2dc..0607d3a 100644
--- a/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/QueryFeatureSet.java
+++ b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/QueryFeatureSet.java
@@ -2,8 +2,13 @@ package org.apache.sis.internal.sql.feature;
 
 import java.sql.Connection;
 import java.sql.PreparedStatement;
+import java.sql.ResultSet;
 import java.sql.SQLException;
+import java.util.List;
+import java.util.Spliterator;
+import java.util.function.Consumer;
 import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
 import javax.sql.DataSource;
 
 import org.opengis.feature.Feature;
@@ -12,6 +17,7 @@ import org.opengis.feature.FeatureType;
 import org.apache.sis.internal.metadata.sql.SQLBuilder;
 import org.apache.sis.internal.storage.AbstractFeatureSet;
 import org.apache.sis.storage.DataStoreException;
+import org.apache.sis.util.collection.BackingStoreException;
 
 public class QueryFeatureSet extends AbstractFeatureSet {
 
@@ -25,6 +31,8 @@ public class QueryFeatureSet extends AbstractFeatureSet {
     private final DataSource source;
     private final FeatureType resultType;
 
+    private final FeatureAdapter adapter = null;
+
     public QueryFeatureSet(SQLBuilder queryBuilder, Analyzer analyzer, DataSource source) throws DataStoreException {
         super(analyzer.listeners);
         this.queryBuilder = queryBuilder;
@@ -76,6 +84,114 @@ public class QueryFeatureSet extends AbstractFeatureSet {
 
     @Override
     public Stream<Feature> features(boolean parallel) throws DataStoreException {
-        throw new UnsupportedOperationException("Not supported yet"); // "Alexis Manin (Geomatys)" on 19/09/2019
+        return new StreamSQL(new QueryAdapter(queryBuilder), source);
+    }
+
+    private class QueryAdapter implements QueryBuilder {
+
+        SQLBuilder source;
+
+        QueryAdapter(SQLBuilder source) {
+            this.source = source;
+        }
+
+        @Override
+        public QueryBuilder limit(long limit) {
+            throw new UnsupportedOperationException("Not supported yet: modifying user query"); // "Alexis Manin (Geomatys)" on 24/09/2019
+        }
+
+        @Override
+        public QueryBuilder offset(long offset) {
+            throw new UnsupportedOperationException("Not supported yet: modifying user query"); // "Alexis Manin (Geomatys)" on 24/09/2019
+        }
+
+        @Override
+        public QueryBuilder distinct(boolean activate) {
+            throw new UnsupportedOperationException("Not supported yet: modifying user query"); // "Alexis Manin (Geomatys)" on 24/09/2019
+        }
+
+        @Override
+        public Connector select(ColumnRef... columns) {
+            if (columns == null || columns.length < 1)
+                return new PreparedQueryConnector(source.toString());
+            throw new UnsupportedOperationException("Not supported yet: modifying user query"); // "Alexis Manin (Geomatys)" on 24/09/2019
+        }
+    }
+
+    private class PreparedQueryConnector implements Connector {
+
+        final String sql;
+
+        private PreparedQueryConnector(String sql) {
+            this.sql = sql;
+        }
+
+        @Override
+        public Stream<Feature> connect(Connection connection) throws SQLException, DataStoreException {
+            final PreparedStatement statement = connection.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
+            final ResultSet result = statement.executeQuery();
+
+            final Stream<Feature> stream = StreamSupport.stream(new ResultSpliterator(result), false);
+            return stream.onClose(() -> {
+                try (
+                        final AutoCloseable rc = result::close;
+                        final AutoCloseable sc = statement::close;
+                ) {
+                    // No-op. Using try-with-resources ensures the second resource is closed even if closing the first one fails.
+                } catch (Exception e) {
+                    QueryFeatureSet.this.warning(e);
+                }
+            });
+        }
+
+        @Override
+        public String estimateStatement(boolean count) {
+            throw new UnsupportedOperationException("Not supported yet"); // "Alexis Manin (Geomatys)" on 24/09/2019
+        }
+    }
+
+    private class ResultSpliterator implements Spliterator<Feature> {
+
+        final ResultContext result;
+        FeatureAdapter adapter;
+
+        private ResultSpliterator(ResultSet result) {
+            this.result = new ResultContext(result);
+        }
+
+        @Override
+        public boolean tryAdvance(Consumer<? super Feature> action) {
+            try {
+                if (result.source.next()) {
+                    final Feature f = adapter.getType().newInstance();
+                    final List<Features.SQLFunction<ResultContext.Cell, ?>> properties = adapter.getProperties();
+                    for (int i = 0; i < properties.size() ; i++) {
+                        final Object value = properties.get(i).apply(result.cell(i, null));
+                        if (value != null) f.setPropertyValue(null, value);
+                    }
+                    return true;
+                } else return false;
+            } catch (SQLException e) {
+                throw new BackingStoreException("Cannot advance in SQL query result", e);
+            }
+        }
+
+        @Override
+        public Spliterator<Feature> trySplit() {
+            return null;
+        }
+
+        @Override
+        public long estimateSize() {
+            // TODO: inexpensive size estimation? A count query seems overkill for the aim of this API. However, we could
+            // analyze the user query in search of a limit value.
+            return Long.MAX_VALUE;
+        }
+
+        @Override
+        public int characteristics() {
+            // TODO: determine if it's sorted by analysing the user query. SIZED is not possible, as limit is an upper threshold.
+            return Spliterator.IMMUTABLE | Spliterator.NONNULL;
+        }
     }
 }
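
The close pattern used in PreparedQueryConnector.connect(...) above, isolated as a standalone sketch (a hypothetical helper, not part of the commit): two AutoCloseable method references in a single try-with-resources guarantee that each resource is closed even if closing the other one throws.

    static void closeQuietly(java.sql.ResultSet result, java.sql.Statement statement,
                             java.util.function.Consumer<Exception> warningHandler) {
        try (AutoCloseable rc = result::close;
             AutoCloseable sc = statement::close) {
            // No-op: resources are closed in reverse declaration order (statement, then result set).
        } catch (Exception e) {
            warningHandler.accept(e);             // report the problem instead of failing the stream close
        }
    }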
diff --git a/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/ResultContext.java b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/ResultContext.java
new file mode 100644
index 0000000..c7ef3df
--- /dev/null
+++ b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/ResultContext.java
@@ -0,0 +1,25 @@
+package org.apache.sis.internal.sql.feature;
+
+import java.sql.ResultSet;
+
+class ResultContext {
+    final ResultSet source;
+
+    ResultContext(ResultSet source) {
+        this.source = source;
+    }
+
+    Cell cell(int columnIndex, String propertyName) {
+        return new Cell(columnIndex, propertyName);
+    }
+
+    class Cell {
+        final int colIdx;
+        final String propertyName;
+
+        private Cell(int colIdx, String propertyName) {
+            this.colIdx = colIdx;
+            this.propertyName = propertyName;
+        }
+    }
+}
diff --git a/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/StreamSQL.java b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/StreamSQL.java
index 1768c63..f80cc0d 100644
--- a/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/StreamSQL.java
+++ b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/StreamSQL.java
@@ -32,11 +32,13 @@ import java.util.function.Supplier;
 import java.util.function.ToDoubleFunction;
 import java.util.function.ToIntFunction;
 import java.util.function.ToLongFunction;
+import java.util.logging.Level;
 import java.util.stream.DoubleStream;
 import java.util.stream.IntStream;
 import java.util.stream.LongStream;
 import java.util.stream.Stream;
 import java.util.stream.StreamSupport;
+import javax.sql.DataSource;
 
 import org.opengis.feature.Feature;
 
@@ -44,6 +46,7 @@ import org.apache.sis.internal.util.DoubleStreamDecoration;
 import org.apache.sis.internal.util.StreamDecoration;
 import org.apache.sis.storage.DataStoreException;
 import org.apache.sis.util.collection.BackingStoreException;
+import org.apache.sis.util.logging.Logging;
 
 import static org.apache.sis.util.ArgumentChecks.ensureNonNull;
 
@@ -62,19 +65,29 @@ import static org.apache.sis.util.ArgumentChecks.ensureNonNull;
  */
 class StreamSQL extends StreamDecoration<Feature> {
 
-    final Features.Builder queryBuilder;
-    boolean parallel;
+    private final QueryBuilder queryAdapter;
+
+    private final DataSource source;
 
-    private boolean distinct;
+    boolean parallel;
 
     private Consumer<? super Feature> peekAction;
 
+    private long limit = 0, offset = 0;
+
+    private Consumer<SQLException> warningConsumer = e -> Logging.getLogger("sis.sql").log(Level.FINE, "Cannot properly close a connection", e);
+
     StreamSQL(final Table source) {
-        this(new Features.Builder(source));
+        this(new Features.Builder(source), source.source);
+    }
+
+    StreamSQL(QueryBuilder queryAdapter, DataSource source) {
+        this.queryAdapter = queryAdapter;
+        this.source = source;
     }
 
-    StreamSQL(Features.Builder builder) {
-        this.queryBuilder = builder;
+    public void setWarningConsumer(Consumer<SQLException> warningConsumer) {
+        this.warningConsumer = warningConsumer;
     }
 
     @Override
@@ -111,7 +124,7 @@ class StreamSQL extends StreamDecoration<Feature> {
 
     @Override
     public Stream<Feature> distinct() {
-        distinct = true;
+        queryAdapter.distinct();
         return this;
     }
 
@@ -130,22 +143,24 @@ class StreamSQL extends StreamDecoration<Feature> {
 
     @Override
     public Stream<Feature> limit(long maxSize) {
-        if (queryBuilder.limit < 1) queryBuilder.limit = maxSize;
-        else queryBuilder.limit = Math.min(queryBuilder.limit, maxSize);
+        if (limit < 1) limit = maxSize;
+        else limit = Math.min(limit, maxSize);
+        queryAdapter.limit(limit);
         return this;
     }
 
     @Override
     public Stream<Feature> skip(long n) {
-        queryBuilder.offset += n;
+        offset += n;
+        queryAdapter.offset(offset);
         return this;
     }
 
     @Override
     public long count() {
         // Avoid opening a connection if sql text cannot be evaluated.
-        final String sql = queryBuilder.select(distinct).getSnapshot(true);
-        try (Connection conn = queryBuilder.parent.source.getConnection()) {
+        final String sql = queryAdapter.select().estimateStatement(true);
+        try (Connection conn = source.getConnection()) {
             try (Statement st = conn.createStatement();
                  ResultSet rs = st.executeQuery(sql)) {
                 if (rs.next()) {
@@ -160,20 +175,19 @@ class StreamSQL extends StreamDecoration<Feature> {
     @Override
     protected synchronized Stream<Feature> createDecoratedStream() {
         final AtomicReference<Connection> connectionRef = new AtomicReference<>();
-        Stream<Feature> featureStream = Stream.of(uncheck(() -> QueryFeatureSet.connectReadOnly(queryBuilder.parent.source)))
+        Stream<Feature> featureStream = Stream.of(uncheck(() -> QueryFeatureSet.connectReadOnly(source)))
                 .map(Supplier::get)
                 .peek(connectionRef::set)
                 .flatMap(conn -> {
                     try {
-                        final Features iter = queryBuilder.select(distinct).connect(conn);
-                        return StreamSupport.stream(iter, parallel);
+                        return queryAdapter.select().connect(conn);
                     } catch (SQLException | DataStoreException e) {
                         throw new BackingStoreException(e);
                     }
                 })
-                .onClose(() -> queryBuilder.parent.closeRef(connectionRef, true));
+                .onClose(() -> closeRef(connectionRef, true));
         if (peekAction != null) featureStream = featureStream.peek(peekAction);
-        return featureStream;
+        return parallel ? featureStream.parallel() : featureStream;
     }
 
     /**
@@ -374,4 +388,16 @@ class StreamSQL extends StreamDecoration<Feature> {
             return o;
         };
     }
+
+    void closeRef(final AtomicReference<Connection> ref, boolean forceRollback) {
+        final Connection conn = ref.get();
+        if (conn != null) {
+            try {
+                if (forceRollback) conn.rollback();
+                conn.close();
+            } catch (SQLException e) {
+                if (warningConsumer != null) warningConsumer.accept(e);
+            }
+        }
+    }
 }
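
A short usage sketch of the refactored StreamSQL (package-private, so same-package code; the "table" variable is assumed): skip/limit/distinct are accumulated on the QueryBuilder instead of being applied in memory, and count() is answered by a COUNT query built through estimateStatement(true). As noted above, the no-argument select() used by count() is still unimplemented for table-backed builders in this WIP commit.

    try (Stream<Feature> features = new StreamSQL(table)) {
        long matching = features.distinct()      // recorded as SELECT DISTINCT
                                .skip(10)        // accumulated into the SQL offset
                                .limit(50)       // merged with any previous limit (minimum wins)
                                .count();        // runs a COUNT query rather than iterating features
    }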
diff --git a/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/Table.java b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/Table.java
index 54df4f9..aacf9c9 100644
--- a/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/Table.java
+++ b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/Table.java
@@ -419,18 +419,6 @@ final class Table extends AbstractFeatureSet {
         return new StreamSQL(this);
     }
 
-    void closeRef(final AtomicReference<Connection> ref, boolean forceRollback) {
-        final Connection conn = ref.get();
-        if (conn != null) {
-            try {
-                if (forceRollback) conn.rollback();
-                conn.close();
-            } catch (SQLException e) {
-                warning(e);
-            }
-        }
-    }
-
     /**
      * Returns an iterator over the features.
      *
diff --git a/storage/sis-sqlstore/src/test/java/org/apache/sis/storage/sql/SQLStoreTest.java b/storage/sis-sqlstore/src/test/java/org/apache/sis/storage/sql/SQLStoreTest.java
index 0ab5ccc..ff83c46 100644
--- a/storage/sis-sqlstore/src/test/java/org/apache/sis/storage/sql/SQLStoreTest.java
+++ b/storage/sis-sqlstore/src/test/java/org/apache/sis/storage/sql/SQLStoreTest.java
@@ -142,7 +142,7 @@ public final strictfp class SQLStoreTest extends TestCase {
                         new Object[] {null,             String.class, String.class, "Cities"});
 
                 verifyFeatureType(((FeatureSet) store.findResource("Parks")).getType(),
-                        new String[] {"sis:identifier", "country", "city",       "native_name", "english_name", "sis:Cities"},
+                        new String[] {"sis:identifier", "country", "city",       "native_name", "english_name", "sis:FK_City"},
                         new Object[] {null,             String.class,  String.class, String.class, String.class, "Cities"});
 
                 try (Stream<Feature> features = cities.features(false)) {
@@ -324,7 +324,7 @@ public final strictfp class SQLStoreTest extends TestCase {
              * Verify the reverse association from Parks to Cities.
              * This creates a cyclic graph, but SQLStore is capable of handling it.
              */
-            assertSame("City → Park → City", feature, pf.getPropertyValue("sis:Cities"));
+            assertSame("City → Park → City", feature, pf.getPropertyValue("sis:FK_City"));
         }
     }
 

