sis-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ama...@apache.org
Subject [sis] 03/22: feat(SQL-Store): Add a feature set to execute custom SQL queries
Date Thu, 14 Nov 2019 11:46:37 GMT
This is an automated email from the ASF dual-hosted git repository.

amanin pushed a commit to branch refactor/sql-store
in repository https://gitbox.apache.org/repos/asf/sis.git

commit 69ed5456dafa3e5a983cf348a90148c419787e48
Author: Alexis Manin <amanin@apache.org>
AuthorDate: Thu Sep 19 18:03:46 2019 +0200

    feat(SQL-Store): Add a feature set to execute custom SQL queries
    
    It can happen that users have specific needs and want to execute their own queries.
    In such cases, we want to provide a feature-compliant structure.
---
 .../apache/sis/internal/sql/feature/Analyzer.java  | 337 ++++++++++++++++++++-
 .../apache/sis/internal/sql/feature/ColumnRef.java |  17 +-
 .../apache/sis/internal/sql/feature/Connector.java |  15 +
 .../sis/internal/sql/feature/FeatureAdapter.java   |  78 +++++
 .../apache/sis/internal/sql/feature/Features.java  | 146 ++++++---
 .../apache/sis/internal/sql/feature/Import.java    |  33 ++
 .../sis/internal/sql/feature/PrimaryKey.java       |  53 ++++
 .../sis/internal/sql/feature/QueryBuilder.java     |  17 ++
 .../sis/internal/sql/feature/QueryFeatureSet.java  | 327 ++++++++++++++++++++
 .../apache/sis/internal/sql/feature/Relation.java  |  12 +-
 .../sis/internal/sql/feature/ResultContext.java    |  25 ++
 .../sis/internal/sql/feature/SQLBiFunction.java    |  38 +++
 .../apache/sis/internal/sql/feature/SQLColumn.java |  58 ++++
 .../sis/internal/sql/feature/SQLQueryAdapter.java  |  56 ++++
 .../sis/internal/sql/feature/SQLQueryBuilder.java  |  28 ++
 .../internal/sql/feature/SQLTypeSpecification.java |  35 +++
 .../sis/internal/sql/feature/SpatialFunctions.java |  97 ++++--
 .../apache/sis/internal/sql/feature/StreamSQL.java | 102 ++++---
 .../org/apache/sis/internal/sql/feature/Table.java | 323 ++++----------------
 .../java/org/apache/sis/storage/sql/SQLStore.java  |  22 +-
 .../org/apache/sis/storage/sql/SQLStoreTest.java   | 212 +++++++++++--
 .../org/apache/sis/storage/sql/Features.sql        |   4 +-
 .../apache/sis/internal/storage/SubsetAdapter.java | 131 ++++++++
 .../sis/internal/storage/query/SimpleQuery.java    |  20 +-
 24 files changed, 1755 insertions(+), 431 deletions(-)

diff --git a/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/Analyzer.java b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/Analyzer.java
index a24d969..ff5f0a7 100644
--- a/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/Analyzer.java
+++ b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/Analyzer.java
@@ -16,31 +16,38 @@
  */
 package org.apache.sis.internal.sql.feature;
 
-import java.util.Set;
-import java.util.Map;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedHashSet;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Locale;
-import java.util.Objects;
+import java.sql.DatabaseMetaData;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.util.*;
 import java.util.logging.Level;
 import java.util.logging.LogRecord;
 import javax.sql.DataSource;
-import java.sql.SQLException;
-import java.sql.DatabaseMetaData;
-import java.sql.ResultSet;
-import org.opengis.util.NameSpace;
-import org.opengis.util.NameFactory;
+
+import org.opengis.feature.Feature;
+import org.opengis.feature.PropertyType;
 import org.opengis.util.GenericName;
+import org.opengis.util.NameFactory;
+import org.opengis.util.NameSpace;
+
+import org.apache.sis.feature.builder.AssociationRoleBuilder;
+import org.apache.sis.feature.builder.AttributeRole;
+import org.apache.sis.feature.builder.AttributeTypeBuilder;
+import org.apache.sis.feature.builder.FeatureTypeBuilder;
 import org.apache.sis.internal.metadata.sql.Dialect;
+import org.apache.sis.internal.metadata.sql.Reflection;
 import org.apache.sis.internal.metadata.sql.SQLUtilities;
+import org.apache.sis.internal.sql.feature.FeatureAdapter.PropertyMapper;
 import org.apache.sis.internal.system.DefaultFactories;
-import org.apache.sis.storage.sql.SQLStore;
 import org.apache.sis.storage.DataStore;
+import org.apache.sis.storage.DataStoreContentException;
 import org.apache.sis.storage.DataStoreException;
 import org.apache.sis.storage.InternalDataStoreException;
+import org.apache.sis.storage.sql.SQLStore;
+import org.apache.sis.util.collection.BackingStoreException;
+import org.apache.sis.util.iso.Names;
 import org.apache.sis.util.logging.WarningListeners;
 import org.apache.sis.util.resources.ResourceInternationalString;
 
@@ -296,6 +303,10 @@ final class Analyzer {
         warnings.add(Resources.formatInternational(key, argument));
     }
 
+    private PropertyAdapter analyze(SQLColumn target) {
+        throw new UnsupportedOperationException();
+    }
+
     /**
      * Invoked after we finished to create all tables. This method flush the warnings
      * (omitting duplicated warnings), then returns all tables including dependencies.
@@ -312,4 +323,300 @@ final class Analyzer {
         }
         return tables.values();
     }
+
+    public SQLTypeSpecification create(final TableReference table, final TableReference importedBy) throws SQLException {
+        return new TableMetadata(table, importedBy);
+    }
+
+    public SQLTypeSpecification create(final PreparedStatement target, final String sourceQuery, final GenericName optName) throws SQLException {
+        return new QuerySpecification(target, sourceQuery, optName);
+    }
+
+    public FeatureAdapter buildAdapter(final SQLTypeSpecification spec) throws SQLException {
+        final FeatureTypeBuilder builder = new FeatureTypeBuilder(nameFactory, functions.library, locale);
+        builder.setName(spec.getName() == null ? Names.createGenericName("sis", ":", UUID.randomUUID().toString()) : spec.getName());
+        builder.setDefinition(spec.getDefinition());
+        final String geomCol = spec.getPrimaryGeometryColumn().orElse("");
+        final List pkCols = spec.getPK().map(PrimaryKey::getColumns).orElse(Collections.EMPTY_LIST);
+        List<PropertyMapper> attributes = new ArrayList<>();
+        // JDBC column indices are 1 based.
+        int i = 0;
+        for (SQLColumn col : spec.getColumns()) {
+            i++;
+            final SpatialFunctions.ColumnAdapter<?> colAdapter = functions.toJavaType(col.getType(), col.getTypeName());
+            Class<?> type = colAdapter.javaType;
+            final String colName = col.getName().getColumnName();
+            final String attrName = col.getName().getAttributeName();
+            if (type == null) {
+                warning(Resources.Keys.UnknownType_1, colName);
+                type = Object.class;
+            }
+
+            final AttributeTypeBuilder<?> attribute = builder
+                    .addAttribute(type)
+                    .setName(attrName);
+            if (col.isNullable()) attribute.setMinimumOccurs(0);
+            final int precision = col.getPrecision();
+            /* TODO: we should check column type. Precision for numbers or blobs is meaningful, but the convention
+             * exposed by SIS does not allow distinguishing such cases.
+             */
+            if (precision > 0) attribute.setMaximalLength(precision);
+
+            col.getCrs().ifPresent(attribute::setCRS);
+            if (geomCol.equals(attrName)) attribute.addRole(AttributeRole.DEFAULT_GEOMETRY);
+
+            if (pkCols.contains(colName)) attribute.addRole(AttributeRole.IDENTIFIER_COMPONENT);
+            attributes.add(new PropertyMapper(attrName, i, colAdapter));
+        }
+
+        addImports(spec, builder);
+
+        addExports(spec, builder);
+
+        return new FeatureAdapter(builder.build(), attributes);
+    }
+
+    private void addExports(SQLTypeSpecification spec, FeatureTypeBuilder builder) throws SQLException {
+        final List<Relation> exports;
+        try {
+            exports = spec.getExports();
+        } catch (DataStoreContentException e) {
+            throw new BackingStoreException(e);
+        }
+
+        for (final Relation r : exports) {
+            try {
+                final GenericName foreignTypeName = r.getName(Analyzer.this);
+                final Table foreignTable = table(r, foreignTypeName, null); // 'null' because exported, not imported.
+                final AssociationRoleBuilder association;
+                if (foreignTable != null) {
+                    r.setSearchTable(Analyzer.this, foreignTable, spec.getPK().map(PrimaryKey::getColumns).map(l -> l.toArray(new String[0])).orElse(null), Relation.Direction.EXPORT);
+                    association = builder.addAssociation(foreignTable.featureType);
+                } else {
+                    association = builder.addAssociation(foreignTypeName);     // May happen in case of cyclic dependency.
+                }
+                association.setName(r.propertyName)
+                        .setMinimumOccurs(0)
+                        .setMaximumOccurs(Integer.MAX_VALUE);
+            } catch (DataStoreException e) {
+                throw new BackingStoreException(e);
+            }
+        }
+    }
+
+    private void addImports(SQLTypeSpecification spec, FeatureTypeBuilder target) throws SQLException {
+        final List<Relation> imports = spec.getImports();
+        // TODO: add an abstraction here, so we can specify source table when origin is one.
+        for (Relation r : imports) {
+            final GenericName foreignTypeName = r.getName(Analyzer.this);
+            final Table foreignTable;
+            try {
+                foreignTable = table(r, foreignTypeName, null);
+            } catch (DataStoreException e) {
+                throw new BackingStoreException(e);
+            }
+            final AssociationRoleBuilder association = foreignTable == null?
+                    target.addAssociation(foreignTypeName) : target.addAssociation(foreignTable.featureType);
+            association.setName(r.propertyName);
+        }
+    }
+
+    private interface PropertyAdapter {
+        PropertyType getType();
+        void fill(ResultSet source, final Feature target);
+    }
+
+    private class TableMetadata implements SQLTypeSpecification {
+        final TableReference id;
+        private final String tableEsc;
+        private final String schemaEsc;
+
+        private final Optional<PrimaryKey> pk;
+
+        private final TableReference importedBy;
+
+        private final List<SQLColumn> columns;
+
+        private TableMetadata(TableReference source, TableReference importedBy) throws SQLException {
+            this.id = source;
+            this.importedBy = importedBy;
+            tableEsc = escape(source.table);
+            schemaEsc = escape(source.schema);
+
+            try (ResultSet reflect = metadata.getPrimaryKeys(id.catalog, id.schema, id.table)) {
+                final List<String> cols = new ArrayList<>();
+                while (reflect.next()) {
+                    cols.add(getUniqueString(reflect, Reflection.COLUMN_NAME));
+                    // The actual Boolean value will be fetched in the loop on columns later.
+                }
+                pk = PrimaryKey.create(cols);
+            }
+
+            try (ResultSet reflect = metadata.getColumns(source.catalog, schemaEsc, tableEsc, null)) {
+
+                final ArrayList<SQLColumn> tmpList = new ArrayList<>();
+                while (reflect.next()) {
+                    final int type = reflect.getInt(Reflection.DATA_TYPE);
+                    final String typeName = reflect.getString(Reflection.TYPE_NAME);
+                    final boolean isNullable = Boolean.TRUE.equals(SQLUtilities.parseBoolean(reflect.getString(Reflection.IS_NULLABLE)));
+                    final ColumnRef name = new ColumnRef(getUniqueString(reflect, Reflection.COLUMN_NAME));
+                    final int precision = reflect.getInt(Reflection.COLUMN_SIZE);
+                    final SQLColumn col = new SQLColumn(type, typeName, isNullable, name, precision);
+                    tmpList.add(col);
+                }
+
+                columns = Collections.unmodifiableList(tmpList);
+            }
+        }
+
+        @Override
+        public GenericName getName() {
+            return id.getName(Analyzer.this);
+        }
+
+        /**
+         * The remarks are opportunistically stored in id.freeText if known by the caller.
+         */
+        @Override
+        public String getDefinition() throws SQLException {
+            String remarks = id.freeText;
+            if (id instanceof Relation) {
+                try (ResultSet reflect = metadata.getTables(id.catalog, schemaEsc, tableEsc, null)) {
+                    while (reflect.next()) {
+                        remarks = getUniqueString(reflect, Reflection.REMARKS);
+                        if (remarks != null) {
+                            remarks = remarks.trim();
+                            if (remarks.isEmpty()) {
+                                remarks = null;
+                            } else break;
+                        }
+                    }
+                }
+            }
+            return remarks;
+        }
+
+        @Override
+        public Optional<PrimaryKey> getPK() throws SQLException {
+            return pk;
+        }
+
+        @Override
+        public List<SQLColumn> getColumns() {
+            return columns;
+        }
+
+        @Override
+        public List<Relation> getImports() throws SQLException {
+            try (ResultSet reflect = metadata.getImportedKeys(id.catalog, id.schema, id.table)) {
+                if (!reflect.next()) return Collections.EMPTY_LIST;
+                final List<Relation> imports = new ArrayList<>(2);
+                do {
+                    Relation r = new Relation(Analyzer.this, Relation.Direction.IMPORT, reflect);
+                    final GenericName foreignTypeName = r.getName(Analyzer.this);
+                    final Collection<String> fks = r.getForeignerKeys();
+                    /* If the link is composed of a single foreign key, we'll name it after that name. Otherwise,
+                     * we'll use constraint title if present. As a fallback, we take referenced table name, as it will
+                     * surely be more explicit than a concatenation of column names.
+                     * In all cases, we set "sis" name space, as we are making arbitrary choices specific to this
+                     * framework.
+                     */
+                    if (fks.size() == 1) r.propertyName = Names.createGenericName(null, ":", "sis", fks.iterator().next());
+                    else if (r.freeText != null && !r.freeText.isEmpty()) r.propertyName = Names.createGenericName(null,":","sis", r.freeText);
+                    else r.propertyName = Names.createGenericName(null, ":", "sis", foreignTypeName.tip().toString());
+                    imports.add(r);
+                } while (!reflect.isClosed());
+                return imports;
+            } catch (DataStoreContentException e) {
+                throw new BackingStoreException(e);
+            }
+        }
+
+        @Override
+        public List<Relation> getExports() throws SQLException, DataStoreContentException {
+            try (ResultSet reflect = metadata.getExportedKeys(id.catalog, id.schema, id.table)) {
+                if (!reflect.next()) return Collections.EMPTY_LIST;
+                final List<Relation> exports = new ArrayList<>(2);
+                do {
+                    final Relation export = new Relation(Analyzer.this, Relation.Direction.EXPORT, reflect);
+                    final GenericName foreignTypeName = export.getName(Analyzer.this);
+                    final String propertyName = foreignTypeName.tip().toString();
+                    export.propertyName = Names.createGenericName(null, ":", "sis", propertyName);
+                    if (!export.equals(importedBy)) {
+                        exports.add(export);
+                    }
+                } while (!reflect.isClosed());
+                return exports;
+            }
+        }
+
+        @Override
+        public Optional<String> getPrimaryGeometryColumn() {
+            return Optional.empty();
+            //throw new UnsupportedOperationException("Not supported yet"); // "Alexis Manin (Geomatys)" on 20/09/2019
+        }
+    }
+
+    private class QuerySpecification implements SQLTypeSpecification {
+
+        final int total;
+        final PreparedStatement source;
+        private final ResultSetMetaData meta;
+        private final String query;
+        private final GenericName name;
+
+        private final List<SQLColumn> columns;
+
+        public QuerySpecification(PreparedStatement source, String sourceQuery, GenericName optName) throws SQLException {
+            this.source = source;
+            meta = source.getMetaData();
+            total = meta.getColumnCount();
+            query = sourceQuery;
+            name = optName;
+
+            final ArrayList<SQLColumn> tmpCols = new ArrayList<>(total);
+            for (int i = 1 ; i <= total ; i++) {
+                tmpCols.add(new SQLColumn(
+                        meta.getColumnType(i),
+                        meta.getColumnTypeName(i),
+                        meta.isNullable(i) == ResultSetMetaData.columnNullable,
+                        new ColumnRef(meta.getColumnName(i)).as(meta.getColumnLabel(i)),
+                        meta.getPrecision(i)
+                ));
+            }
+
+            columns = Collections.unmodifiableList(tmpCols);
+        }
+
+        @Override
+        public GenericName getName() throws SQLException {
+            return name;
+        }
+
+        @Override
+        public String getDefinition() throws SQLException {
+            return query;
+        }
+
+        @Override
+        public Optional<PrimaryKey> getPK() throws SQLException {
+            return Optional.empty();
+        }
+
+        @Override
+        public List<SQLColumn> getColumns() {
+            return columns;
+        }
+
+        @Override
+        public List<Relation> getImports() throws SQLException {
+            return Collections.EMPTY_LIST;
+        }
+
+        @Override
+        public List<Relation> getExports() throws SQLException, DataStoreContentException {
+            return Collections.EMPTY_LIST;
+        }
+    }
+
 }
diff --git a/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/ColumnRef.java b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/ColumnRef.java
index f2d229d..d02fb8f 100644
--- a/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/ColumnRef.java
+++ b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/ColumnRef.java
@@ -10,12 +10,12 @@ import static org.apache.sis.util.ArgumentChecks.ensureNonNull;
  * A column reference. Specify name of the column, and optionally an alias to use for public visibility.
  * By default, column has no alias. To create a column with an alias, use {@code ColumnRef myCol = new ColumnRef("colName).as("myAlias");}
  */
-public final class ColumnRef {
-    final String name;
-    final String alias;
-    final String attrName;
+final class ColumnRef {
+    private final String name;
+    private final String alias;
+    private final String attrName;
 
-    public ColumnRef(String name) {
+    ColumnRef(String name) {
         ensureNonNull("Column name", name);
         this.name = this.attrName = name;
         alias = null;
@@ -27,11 +27,13 @@ public final class ColumnRef {
         this.alias = this.attrName = alias;
     }
 
-    ColumnRef as(final String alias) {
+    public ColumnRef as(final String alias) {
+        if (Objects.equals(alias, this.alias)) return this;
+        else if (alias == null || alias.equals(name)) return new ColumnRef(name);
         return new ColumnRef(name, alias);
     }
 
-    SQLBuilder append(final SQLBuilder target) {
+    public SQLBuilder append(final SQLBuilder target) {
         target.appendIdentifier(name);
         if (alias != null) {
             target.append(" AS ").appendIdentifier(alias);
@@ -40,6 +42,7 @@ public final class ColumnRef {
         return target;
     }
 
+    public String getColumnName() { return name; }
     public String getAttributeName() {
         return attrName;
     }
diff --git a/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/Connector.java b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/Connector.java
new file mode 100644
index 0000000..c99bd7f
--- /dev/null
+++ b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/Connector.java
@@ -0,0 +1,15 @@
+package org.apache.sis.internal.sql.feature;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.stream.Stream;
+
+import org.opengis.feature.Feature;
+
+import org.apache.sis.storage.DataStoreException;
+
+public interface Connector {
+    Stream<Feature> connect(Connection connection) throws SQLException, DataStoreException;
+
+    String estimateStatement(final boolean count);
+}
diff --git a/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/FeatureAdapter.java b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/FeatureAdapter.java
new file mode 100644
index 0000000..0162e67
--- /dev/null
+++ b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/FeatureAdapter.java
@@ -0,0 +1,78 @@
+package org.apache.sis.internal.sql.feature;
+
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.opengis.feature.Feature;
+import org.opengis.feature.FeatureType;
+
+import org.apache.sis.internal.sql.feature.SpatialFunctions.ColumnAdapter;
+
+import static org.apache.sis.util.ArgumentChecks.ensureNonNull;
+
+class FeatureAdapter {
+
+    final FeatureType type;
+
+    private final List<PropertyMapper> attributeMappers;
+
+    FeatureAdapter(FeatureType type, List<PropertyMapper> attributeMappers) {
+        ensureNonNull("Target feature type", type);
+        ensureNonNull("Attribute mappers", attributeMappers);
+        this.type = type;
+        this.attributeMappers = Collections.unmodifiableList(new ArrayList<>(attributeMappers));
+    }
+
+    Feature read(final ResultSet cursor) throws SQLException {
+        final Feature result = readAttributes(cursor);
+        addImports(result, cursor);
+        addExports(result);
+        return result;
+    }
+
+    private void addImports(final Feature target, final ResultSet cursor) {
+        // TODO: see Features class
+    }
+
+    private void addExports(final Feature target) {
+        // TODO: see Features class
+    }
+
+    private Feature readAttributes(final ResultSet cursor) throws SQLException {
+        final Feature result = type.newInstance();
+        for (PropertyMapper mapper : attributeMappers) mapper.read(cursor, result);
+        return result;
+    }
+
+    List<Feature> prefetch(final int size, final ResultSet cursor) throws SQLException {
+        // TODO: optimize by resolving import associations via batch fetching.
+        final ArrayList<Feature> features = new ArrayList<>(size);
+        for (int i = 0 ; i < size && cursor.next() ; i++) {
+            features.add(read(cursor));
+        }
+
+        return features;
+    }
+
+    static class PropertyMapper {
+        // TODO: by using an indexed implementation of Feature, we could avoid the name mapping. However, a JMH benchmark
+        // would be required in order to be sure it's impacting performance positively.
+        final String propertyName;
+        final int columnIndex;
+        final ColumnAdapter fetchValue;
+
+        PropertyMapper(String propertyName, int columnIndex, ColumnAdapter fetchValue) {
+            this.propertyName = propertyName;
+            this.columnIndex = columnIndex;
+            this.fetchValue = fetchValue;
+        }
+
+        private void read(ResultSet cursor, Feature target) throws SQLException {
+            final Object value = fetchValue.apply(cursor, columnIndex);
+            if (value != null) target.setPropertyValue(propertyName, value);
+        }
+    }
+}
diff --git a/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/Features.java b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/Features.java
index a596e7b..2950311 100644
--- a/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/Features.java
+++ b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/Features.java
@@ -24,6 +24,7 @@ import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -32,9 +33,14 @@ import java.util.Map;
 import java.util.Spliterator;
 import java.util.function.Consumer;
 import java.util.function.Function;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
 
 import org.opengis.feature.Feature;
 import org.opengis.feature.FeatureType;
+import org.opengis.filter.Filter;
+import org.opengis.filter.sort.SortBy;
+import org.opengis.util.GenericName;
 
 import org.apache.sis.internal.metadata.sql.SQLBuilder;
 import org.apache.sis.storage.DataStoreException;
@@ -46,9 +52,6 @@ import org.apache.sis.util.collection.WeakValueHashMap;
 import static org.apache.sis.util.ArgumentChecks.ensureNonEmpty;
 import static org.apache.sis.util.ArgumentChecks.ensureNonNull;
 
-// Branch-dependent imports
-
-
 /**
  * Iterator over feature instances.
  *
@@ -81,13 +84,13 @@ final class Features implements Spliterator<Feature> {
      * Imported or exported features read by {@code dependencies[i]} will be stored in
      * the association named {@code associationNames[i]}.
      */
-    private final String[] associationNames;
+    private final GenericName[] associationNames;
 
     /**
      * Name of the property where to store the association that we can not handle with other {@link #dependencies}.
      * This deferred association may exist because of circular dependency.
      */
-    private final String deferredAssociation;
+    private final GenericName deferredAssociation;
 
     /**
      * The feature sets referenced through foreigner keys, or {@link #EMPTY} if none.
@@ -159,7 +162,7 @@ final class Features implements Spliterator<Feature> {
      * @param following         the relations that we are following. Used for avoiding never ending loop.
      * @param noFollow          relation to not follow, or {@code null} if none.
      * @param distinct          True if we should return only distinct result, false otherwise.
-     * @param offset            An offset (nuber of rows to skip) in underlying SQL query. A negative or zeero value
+     * @param offset            An offset (number of rows to skip) in underlying SQL query. A negative or zero value
      *                          means no offset will be set.
      * @param limit             Maximum number of rows to return. Corresponds to a LIMIT statement in underlying SQL
      *                          query. A negative or 0 value means no limit will be set.
@@ -174,7 +177,7 @@ final class Features implements Spliterator<Feature> {
         attributeNames = new String[attributeColumns.length];
         int i = 0;
         for (ColumnRef column : columns) {
-            attributeColumns[i] = column.name;
+            attributeColumns[i] = column.getColumnName();
             attributeNames[i++] = column.getAttributeName();
         }
         this.featureType = table.featureType;
@@ -183,7 +186,7 @@ final class Features implements Spliterator<Feature> {
         /*
          * Create a SELECT clause with all columns that are ordinary attributes. Order matter, since 'Features'
          * iterator will map the columns to the attributes listed in the 'attributeNames' array in that order.
-         * Moreover, we optionaly add a "distinct" clause on user request.
+         * Moreover, we optionally add a "distinct" clause on user request.
          */
         final SQLBuilder sql = new SQLBuilder(metadata, true).append("SELECT");
         if (distinct) sql.append(" DISTINCT");
@@ -211,9 +214,9 @@ final class Features implements Spliterator<Feature> {
             foreignerKeyIndices = null;
             deferredAssociation = null;
         } else {
-            String deferredAssociation = null;
+            GenericName deferredAssociation = null;
             final Features[]     dependencies = new Features[totalCount];
-            final String[]   associationNames = new String  [totalCount];
+            final GenericName[]  associationNames = new GenericName[totalCount];
             final int[][] foreignerKeyIndices = new int     [totalCount][];
             /*
              * For each foreigner key to another table, append all columns of that foreigner key
@@ -303,13 +306,16 @@ final class Features implements Spliterator<Feature> {
 
     /**
      * If a limit or an offset is appended, a space will be added beforehand to the given builder.
+     *
+     * @implNote We use ANSI notation to get best possible compatibility with possible drivers.
+     *
      * @param toEdit The builder to add offset and limit to.
      * @param offset The offset to use. If  zero or negative, it will be ignored.
      * @param limit the value for limit parameter. If  zero or negative, it will be ignored.
      */
-    private static void addOffsetLimit(final SQLBuilder toEdit, final long offset, final long limit) {
-        if (limit > 0) toEdit.append(" LIMIT ").append(limit);
-        if (offset > 0) toEdit.append(" OFFSET ").append(offset);
+    static void addOffsetLimit(final SQLBuilder toEdit, final long offset, final long limit) {
+        if (offset > 0) toEdit.append(" OFFSET ").append(offset).append(" ROWS");
+        if (limit > 0) toEdit.append(" FETCH NEXT ").append(limit).append(" ROWS ONLY");
     }
 
     /**
@@ -452,7 +458,7 @@ final class Features implements Spliterator<Feature> {
                     }
                     value = dependency.fetchReferenced(null, feature);
                 }
-                feature.setPropertyValue(associationNames[i], value);
+                feature.setPropertyValue(associationNames[i].toString(), value);
             }
             action.accept(feature);
             if (!all) return true;
@@ -485,7 +491,7 @@ final class Features implements Spliterator<Feature> {
         }
         if (owner != null && deferredAssociation != null) {
             for (final Feature feature : features) {
-                feature.setPropertyValue(deferredAssociation, owner);
+                feature.setPropertyValue(deferredAssociation.toString(), owner);
             }
         }
         Object feature;
@@ -530,18 +536,9 @@ final class Features implements Spliterator<Feature> {
         }
     }
 
-    /**
-     * Useful to customiez value retrieval on result sets. Example:
-     * {@code
-     * SQLBiFunction<ResultSet, Integer, Integer> get = ResultSet::getInt;
-     * }
-     * @param <T>
-     * @param <U>
-     * @param <R>
-     */
     @FunctionalInterface
-    interface SQLBiFunction<T, U, R> {
-        R apply(T t, U u) throws SQLException;
+    interface SQLFunction<T, R> {
+        R apply(T t) throws SQLException;
 
         /**
          * Returns a composed function that first applies this function to
@@ -556,51 +553,120 @@ final class Features implements Spliterator<Feature> {
          * applies the {@code after} function
          * @throws NullPointerException if after is null
          */
-        default <V> SQLBiFunction<T, U, V> andThen(Function<? super R, ? extends V> after) {
+        default <V> SQLFunction<T, V> andThen(Function<? super R, ? extends V> after) {
             ensureNonNull("After function", after);
-            return (T t, U u) -> after.apply(apply(t, u));
+            return t -> after.apply(apply(t));
         }
     }
 
-    static class Builder {
+    static class Builder implements QueryBuilder {
 
         final Table parent;
         long limit, offset;
+        SortBy[] sort;
+
         boolean distinct;
 
         Builder(Table parent) {
             this.parent = parent;
         }
 
+        Builder where(final Filter filter) {
+            throw new UnsupportedOperationException("TODO");
+        }
+
+        Builder sortBy(final SortBy...sorting) {
+            if (sorting == null || sorting.length < 1) this.sort = null;
+            else this.sort = Arrays.copyOf(sorting, sorting.length);
+            return this;
+        }
+
+        @Override
+        public QueryBuilder limit(long limit) {
+            this.limit = limit;
+            return this;
+        }
+
+        @Override
+        public QueryBuilder offset(long offset) {
+            this.offset = offset;
+            return this;
+        }
+
+        @Override
+        public QueryBuilder distinct(boolean activate) {
+            this.distinct = activate;
+            return this;
+        }
+
+        @Override
+        public Connector select(ColumnRef... columns) {
+            return new TableConnector(this, columns);
+        }
+    }
+
+    static final class TableConnector implements Connector {
+        final Builder source;
+
+        final boolean distinct;
+        final ColumnRef[] columns;
+
+        final SortBy[] sort;
+
+        TableConnector(Builder source, ColumnRef[] columns) {
+            this.source = source;
+            this.distinct = source.distinct;
+            this.columns = columns;
+            this.sort = source.sort == null ? null : Arrays.copyOf(source.sort, source.sort.length);
+        }
+
+        public Stream<Feature> connect(final Connection conn) throws SQLException, DataStoreException {
+            final Features features = new Features(
+                    source.parent, conn,
+                    columns == null || columns.length < 1 ? source.parent.attributes : Arrays.asList(columns),
+                    new ArrayList<>(), null, distinct, source.offset, source.limit
+            );
+            return StreamSupport.stream(features, false);
+        }
+
         /**
          * Warning : This does not work with relations. It is only a rough estimation of the parameterized query.
          * @param count True if a count query must be generated. False for a simple selection.
          * @return A text representing (roughly) the SQL query which will be posted.
          * @throws SQLException If we cannot initialize an sql statement builder.
          */
-        String getSnapshot(final boolean count) throws SQLException {
-            final SQLBuilder sql = new SQLBuilder(parent.dbMeta, true).append("SELECT ");
+        public String estimateStatement(final boolean count) {
+            final SQLBuilder sql = source.parent.createStatement().append("SELECT ");
             if (count) sql.append("COUNT(");
             if (distinct) sql.append("DISTINCT ");
             // If we want a count and no distinct clause is specified, we can query it for a single column.
-            if (count && !distinct) sql.appendIdentifier(parent.attributes.get(0).name);
+            if (count && !distinct) source.parent.attributes.get(0).append(sql);
             else {
-                final Iterator<ColumnRef> it = parent.attributes.iterator();
-                sql.appendIdentifier(it.next().name);
+                final Iterator<ColumnRef> it = source.parent.attributes.iterator();
+                it.next().append(sql);
                 while (it.hasNext()) {
-                    sql.append(',').appendIdentifier(it.next().name);
+                    it.next().append(sql.append(", "));
                 }
             }
 
             if (count) sql.append(')');
-            sql.append(" FROM ").appendIdentifier(parent.name.catalog, parent.name.schema, parent.name.table);
-            addOffsetLimit(sql, offset, limit);
+            sql.append(" FROM ").appendIdentifier(source.parent.name.catalog, source.parent.name.schema, source.parent.name.table);
+
+            if (!count && sort != null && sort.length > 0) {
+                sql.append(" ORDER BY ");
+                append(sql, sort[0]);
+                for (int i = 1 ; i < sort.length ; i++)
+                    append(sql.append(", "), sort[i]);
+            }
+
+            addOffsetLimit(sql, source.offset, source.limit);
 
             return sql.toString();
         }
+    }
 
-        Features build(final Connection conn) throws SQLException, DataStoreException {
-            return new Features(parent, conn, parent.attributes, new ArrayList<>(), null, distinct, offset, limit);
-        }
+    private static void append(SQLBuilder target, SortBy toAppend) {
+        target.appendIdentifier(toAppend.getPropertyName().getPropertyName()).append(" ");
+        if (toAppend.getSortOrder() != null) target.append(toAppend.getSortOrder().toSQL());
     }
 }
diff --git a/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/Import.java b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/Import.java
new file mode 100644
index 0000000..f5614e1
--- /dev/null
+++ b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/Import.java
@@ -0,0 +1,33 @@
+package org.apache.sis.internal.sql.feature;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+class Import {
+
+    final String propertyName;
+
+    final List<String> fkColumns;
+
+    final TableReference target;
+
+    public Import(String propertyName, Collection<String> fkColumns, TableReference target) {
+        this.propertyName = propertyName;
+        this.fkColumns = Collections.unmodifiableList(new ArrayList<>(fkColumns));
+        this.target = target;
+    }
+
+    public String getPropertyName() {
+        return propertyName;
+    }
+
+    public List<String> getFkColumns() {
+        return fkColumns;
+    }
+
+    public TableReference getTarget() {
+        return target;
+    }
+}
diff --git a/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/PrimaryKey.java b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/PrimaryKey.java
new file mode 100644
index 0000000..1e68fd3
--- /dev/null
+++ b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/PrimaryKey.java
@@ -0,0 +1,53 @@
+package org.apache.sis.internal.sql.feature;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+
+import org.apache.sis.util.ArgumentChecks;
+
+/**
+ * Represents a SQL primary key constraint. The main information is the list of columns composing the key.
+ *
+ * @author "Alexis Manin (Geomatys)"
+ */
+interface PrimaryKey {
+
+    static Optional<PrimaryKey> create(List<String> cols) {
+        if (cols == null || cols.isEmpty()) return Optional.empty();
+        if (cols.size() == 1) return Optional.of(new Simple(cols.get(0)));
+        return Optional.of(new Composite(cols));
+    }
+
+    //Class<T> getViewType();
+    List<String> getColumns();
+
+    class Simple implements PrimaryKey {
+        final String column;
+
+        public Simple(String column) {
+            this.column = column;
+        }
+
+        @Override
+        public List<String> getColumns() { return Collections.singletonList(column); }
+    }
+
+    class Composite implements PrimaryKey {
+        /**
+         * Name of columns composing primary keys.
+         */
+        private final List<String> columns;
+
+        public Composite(List<String> columns) {
+            ArgumentChecks.ensureNonEmpty("Primary key column names", columns);
+            this.columns = Collections.unmodifiableList(new ArrayList<>(columns));
+        }
+
+        @Override
+        public List<String> getColumns() {
+            return columns;
+        }
+    }
+}
diff --git a/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/QueryBuilder.java b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/QueryBuilder.java
new file mode 100644
index 0000000..68d798e
--- /dev/null
+++ b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/QueryBuilder.java
@@ -0,0 +1,17 @@
+package org.apache.sis.internal.sql.feature;
+
+/**
+ * API allowing an overridden SQL Stream to delegate a set of intermediate operations to the native driver.
+ */
+interface QueryBuilder {
+
+    QueryBuilder limit(long limit);
+
+    QueryBuilder offset(long offset);
+
+    default QueryBuilder distinct() { return distinct(true); }
+
+    QueryBuilder distinct(boolean activate);
+
+    Connector select(ColumnRef... columns);
+}
diff --git a/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/QueryFeatureSet.java b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/QueryFeatureSet.java
new file mode 100644
index 0000000..9d027a4
--- /dev/null
+++ b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/QueryFeatureSet.java
@@ -0,0 +1,327 @@
+package org.apache.sis.internal.sql.feature;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.Spliterator;
+import java.util.function.Consumer;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
+import javax.sql.DataSource;
+
+import org.opengis.feature.Feature;
+import org.opengis.feature.FeatureType;
+
+import org.apache.sis.internal.metadata.sql.SQLBuilder;
+import org.apache.sis.internal.storage.AbstractFeatureSet;
+import org.apache.sis.storage.DataStoreException;
+import org.apache.sis.util.collection.BackingStoreException;
+
+/**
+ * Stores the SQL query given at build time, and executes it when calling {@link #features(boolean) data stream}. Note that
+ * {@link #getType() data type} is defined by analyzing sql statement metadata. Note that user query can be modified at
+ * before execution to adapt various parameters overridable at fetch time as offset and limit through
+ * {@link Stream#skip(long)} and {@link Stream#limit(long)}.
+ *
+ * Note that this component models query result as close as possible, so built data type will be simple feature type (no
+ * association).
+ */
+public class QueryFeatureSet extends AbstractFeatureSet {
+
+    /**
+     * A regex searching for ANSI or PostgreSQL way of defining max number of rows to return. For details, see
+     * <a href="https://www.postgresql.org/docs/current/sql-select.html#SQL-LIMIT">PostgreSQL LIMIT documentation</a>.
+     * Documentation states that value could be a reference to a variable name, so we do not search for a digit.
+     */
+    private static final Pattern LIMIT_PATTERN = Pattern.compile("(?:FETCH|LIMIT)(?:\\s+(?:FIRST|NEXT))?\\s+([^\\s]+)(?:\\s+ROWS?)?(?:\\s+ONLY)?", Pattern.CASE_INSENSITIVE);
+    /**
+     * Search for ANSI or PostgreSQL way of defining a number of rows to skip when returning results. For details, see
+     * <a href="https://www.postgresql.org/docs/current/sql-select.html#SQL-LIMIT">PostgreSQL LIMIT documentation</a>.
+     */
+    private static final Pattern OFFSET_PATTERN = Pattern.compile("OFFSET\\s+([^\\s]+)(?:\\s+ROWS?)?", Pattern.CASE_INSENSITIVE);
+
+    /**
+     * Check for a selection of distinct rows.
+     */
+    private static final Pattern DISTINCT_PATTERN = Pattern.compile("^\\s*SELECT\\s+DISTINCT", Pattern.CASE_INSENSITIVE);
+
+    /**
+     * Keep builder to allow native limit and offset through stream operation.
+     */
+    private final SQLBuilder queryBuilder;
+
+    /**
+     * SQL database handler. Used to open new connections at query time.
+     */
+    private final DataSource source;
+
+    /**
+     * Component in charge of conversion from SQL entry to Feature entity. Also provides output data type.
+     */
+    private final FeatureAdapter adapter;
+
+    /**
+     * Offset and limit defined in user query, if any. If none is found, or we cannot determine safely their value (not
+     * specified as a literal but as a variable name), values will be set to -1.
+     *
+     * @implNote  BE CAREFUL! We use these fields for optimizations. We remove values from the user query when safely
+     * identified, and add them only at query time, with additional skip/offset defined through stream or java query.
+     */
+    private final long originOffset, originLimit;
+
+    /**
+     * A flag indicating that we've safely identified a {@literal DISTINCT} keyword in the user query.
+     */
+    private final boolean distinct;
+
+    /**
+ * Same as {@link #QueryFeatureSet(SQLBuilder, DataSource, Connection)}, except that the query is provided as a fixed text
+     * instead of a builder.
+     */
+    public QueryFeatureSet(String query, DataSource source, Connection conn) throws SQLException {
+        this(fromQuery(query, conn), source, conn);
+    }
+
+    /**
+     * Create a new feature set whose data is provided by given user query. Note that query will be compiled now, but
+     * only executed when {@link #features(boolean) acquiring a data stream}.
+     *
+     * @param queryBuilder Contains the user-defined SQL query. It must contain a valid and complete SQL statement, because
+     *                     it will be compiled at build time to define the feature type. Note that we will make a copy of
+     *                     it, so any modification done after this feature set is created won't have any effect on it.
+     * @param source A database pointer we'll keep, to create new connections when {@link #features(boolean) fetching data}.
+     * @param conn Serves for compiling user query, thus creating output data type. The connection is not kept nor
+     *             re-used after constructor, so you can close it immediately after. We require it, so we do not force
+     *             opening a new connection if user already has one ready on its end.
+     * @throws SQLException If input query compiling or analysis of its metadata fails.
+     */
+    public QueryFeatureSet(SQLBuilder queryBuilder, DataSource source, Connection conn) throws SQLException {
+        this(queryBuilder, new Analyzer(source, conn.getMetaData(), null, null), source, conn);
+    }
+
+
+    /**
+     * See {@link #QueryFeatureSet(SQLBuilder, DataSource, Connection)} for details.
+     *
+     * @param analyzer SIS sql analyzer, used for query metadata analysis. Not nullable. If you do not have any, you
+     *                 can use {@link #QueryFeatureSet(SQLBuilder, DataSource, Connection) another constructor}.
+     */
+    QueryFeatureSet(SQLBuilder queryBuilder, Analyzer analyzer, DataSource source, Connection conn) throws SQLException {
+        super(analyzer.listeners);
+        this.source = source;
+
+        String sql = queryBuilder.toString();
+        try (PreparedStatement statement = conn.prepareStatement(sql)) {
+            final SQLTypeSpecification spec = analyzer.create(statement, sql, null);
+            adapter = analyzer.buildAdapter(spec);
+        }
+
+        /* We will now try to parse offset and limit from input query. If we encounter unsupported/ambiguous case,
+         * we will fall back to pure Java management of additional limit and offset.
+         * If we successfully retrieve offset and limit, we'll modify user query to take account of additional
+         * parameters given later.
+         */
+        long tmpOffset = 0, tmpLimit = 0;
+        try {
+            Matcher matcher = OFFSET_PATTERN.matcher(sql);
+            if (matcher.find()) tmpOffset = Long.parseLong(matcher.group(1));
+            if (matcher.find()) throw new UnsupportedOperationException("More than one offset in the query.");
+            sql = matcher.replaceFirst("");
+
+            matcher = LIMIT_PATTERN.matcher(sql);
+            if (matcher.find()) tmpLimit = Long.parseLong(matcher.group(1));
+            if (matcher.find()) throw new UnsupportedOperationException("More than one limit in the query.");
+            sql = matcher.replaceFirst("");
+        } catch (RuntimeException e) {
+            sql = source.toString();
+            tmpOffset = -1;
+            tmpLimit = -1;
+        }
+
+        distinct = DISTINCT_PATTERN.matcher(sql).find();
+
+        originOffset = tmpOffset;
+        originLimit = tmpLimit;
+
+        // Defensive copy
+        this.queryBuilder = new SQLBuilder(queryBuilder);
+        this.queryBuilder.append(sql);
+    }
+
+    /**
+     * Acquire a connection over parent database, forcing a few parameters to ensure optimal read performance and
+     * limiting user rights:
+     * <ul>
+     *     <li>{@link Connection#setAutoCommit(boolean) auto-commit} to false</li>
+     *     <li>{@link Connection#setReadOnly(boolean) querying read-only}</li>
+     * </ul>
+     *
+     * @param source Database pointer to create connection from.
+     * @return A new connection to database, with deactivated auto-commit.
+     * @throws SQLException If we cannot create a new connection. See {@link DataSource#getConnection()} for details.
+     */
+    public static Connection connectReadOnly(final DataSource source) throws SQLException {
+        final Connection c = source.getConnection();
+        try {
+            c.setAutoCommit(false);
+            c.setReadOnly(true);
+        } catch (SQLException e) {
+            try {
+                c.close();
+            } catch (RuntimeException|SQLException bis) {
+                e.addSuppressed(bis);
+            }
+            throw e;
+        }
+        return c;
+    }
+
+    @Override
+    public FeatureType getType() {
+        return adapter.type;
+    }
+
+    @Override
+    public Stream<Feature> features(boolean parallel) {
+        return new StreamSQL(new QueryAdapter(queryBuilder), source);
+    }
+
+    private class QueryAdapter implements QueryBuilder {
+
+        private final SQLBuilder source;
+
+        private long additionalOffset, additionalLimit;
+
+        QueryAdapter(SQLBuilder source) {
+            // defensive copy
+            this.source = new SQLBuilder(source);
+            this.source.append(source.toString());
+        }
+
+        @Override
+        public QueryBuilder limit(long limit) {
+            additionalLimit = limit;
+            return this;
+        }
+
+        @Override
+        public QueryBuilder offset(long offset) {
+            additionalOffset = offset;
+            return this;
+        }
+
+        @Override
+        public QueryBuilder distinct(boolean activate) {
+            if (distinct == activate) return this;
+            throw new UnsupportedOperationException("Not supported yet: modifying user query"); // "Alexis Manin (Geomatys)" on 24/09/2019
+        }
+
+        @Override
+        public Connector select(ColumnRef... columns) {
+            if (columns == null || columns.length < 1) {
+                long javaOffset = 0, nativeOffset = 0, javaLimit = 0, nativeLimit = 0;
+                if (originOffset < 0) {
+                    javaOffset = this.additionalOffset;
+                } else if (originOffset > 0 || additionalOffset > 0) {
+                    nativeOffset = originOffset + additionalOffset;
+                }
+
+                if (originLimit < 0) {
+                    javaLimit = this.additionalLimit;
+                } else if (originLimit > 0 || additionalLimit > 0) {
+                    nativeLimit = Math.min(originLimit, additionalLimit);
+                }
+
+                Features.addOffsetLimit(source, nativeOffset, nativeLimit);
+                return new PreparedQueryConnector(source.toString(), javaOffset, javaLimit);
+            }
+            throw new UnsupportedOperationException("Not supported yet: modifying user query"); // "Alexis Manin (Geomatys)" on 24/09/2019
+        }
+    }
+
+    private class PreparedQueryConnector implements Connector {
+
+        final String sql;
+        private long additionalOffset, additionalLimit;
+
+        private PreparedQueryConnector(String sql, long additionalOffset, long additionalLimit) {
+            this.sql = sql;
+            this.additionalOffset = additionalOffset;
+            this.additionalLimit = additionalLimit;
+        }
+
+        @Override
+        public Stream<Feature> connect(Connection connection) throws SQLException, DataStoreException {
+            final PreparedStatement statement = connection.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
+            final ResultSet result = statement.executeQuery();
+
+            Stream<Feature> stream = StreamSupport.stream(new ResultSpliterator(result), false);
+            if (additionalLimit > 0) stream = stream.limit(additionalLimit);
+            if (additionalOffset > 0) stream = stream.skip(additionalOffset);
+
+            return stream.onClose(() -> {
+                try (
+                        final AutoCloseable rc = result::close;
+                        final AutoCloseable sc = statement::close;
+                ) {
+                    // No-op. Using try with resource allows to manage closing of second resource even if first one throws an error.
+                } catch (Exception e) {
+                    QueryFeatureSet.this.warning(e);
+                }
+            });
+        }
+
+        @Override
+        public String estimateStatement(boolean count) {
+            throw new UnsupportedOperationException("Not supported yet"); // "Alexis Manin (Geomatys)" on 24/09/2019
+        }
+    }
+
+    private class ResultSpliterator implements Spliterator<Feature> {
+
+        final ResultContext result;
+
+        private ResultSpliterator(ResultSet result) {
+            this.result = new ResultContext(result);
+        }
+
+        @Override
+        public boolean tryAdvance(Consumer<? super Feature> action) {
+            try {
+                if (result.source.next()) {
+                    final Feature f = adapter.read(result.source);
+                    action.accept(f);
+                    return true;
+                } else return false;
+            } catch (SQLException e) {
+                throw new BackingStoreException("Cannot advance in SQL query result", e);
+            }
+        }
+
+        @Override
+        public Spliterator<Feature> trySplit() {
+            return null;
+        }
+
+        @Override
+        public long estimateSize() {
+            // TODO: economical size estimation? A count query seems overkill for the aim of this API. However, we could
+            // analyze user query in search for a limit value.
+            return originLimit > 0? originLimit : Long.MAX_VALUE;
+        }
+
+        @Override
+        public int characteristics() {
+            // TODO: determine if it's sorted by analysing user query. SIZED is not possible, as limit is an upper threshold.
+            return Spliterator.IMMUTABLE | Spliterator.NONNULL;
+        }
+    }
+
+    private static SQLBuilder fromQuery(final String query, final Connection conn) throws SQLException {
+        return new SQLBuilder(conn.getMetaData(), true)
+                .append(query);
+    }
+}
diff --git a/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/Relation.java b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/Relation.java
index 140326e..3b7180b 100644
--- a/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/Relation.java
+++ b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/Relation.java
@@ -27,12 +27,16 @@ import java.util.Objects;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.DatabaseMetaData;
+
+import org.opengis.util.GenericName;
+
 import org.apache.sis.internal.util.CollectionsExt;
 import org.apache.sis.internal.metadata.sql.Reflection;
 import org.apache.sis.storage.DataStoreException;
 import org.apache.sis.storage.DataStoreContentException;
 import org.apache.sis.storage.InternalDataStoreException;
 import org.apache.sis.util.collection.TreeTable;
+import org.apache.sis.util.iso.Names;
 import org.apache.sis.util.resources.Errors;
 import org.apache.sis.util.Debug;
 
@@ -58,7 +62,7 @@ import org.apache.sis.util.Debug;
  * @since   1.0
  * @module
  */
-final class Relation extends TableReference {
+public final class Relation extends TableReference {
     /**
      * Whether another table is <em>using</em> or is <em>used by</em> the table containing the {@link Relation}.
      */
@@ -152,7 +156,7 @@ final class Relation extends TableReference {
      * The name of the feature property where the association to {@link #searchTable} table will be stored.
      * Shall be set exactly once.
      */
-    String propertyName;
+    GenericName propertyName;
 
     /**
      * Whether the {@link #columns} map include all primary key columns. This field is set to {@code false}
@@ -216,10 +220,10 @@ final class Relation extends TableReference {
      */
     final void setPropertyName(final String column, final int count) {
         if (columns.size() > 1) {
-            propertyName = freeText;        // Foreigner key name (may be null).
+            propertyName = Names.createGenericName(null, ":", "sis", freeText);        // Foreign key name (may be null).
         }
         if (propertyName == null) {
-            propertyName = (count == 0) ? column : column + '-' + count;
+            propertyName = Names.createGenericName(null, ":", "sis", (count == 0) ? column : column + '-' + count);
         }
     }
 
diff --git a/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/ResultContext.java b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/ResultContext.java
new file mode 100644
index 0000000..c7ef3df
--- /dev/null
+++ b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/ResultContext.java
@@ -0,0 +1,25 @@
+package org.apache.sis.internal.sql.feature;
+
+import java.sql.ResultSet;
+
+class ResultContext {
+    final ResultSet source;
+
+    ResultContext(ResultSet source) {
+        this.source = source;
+    }
+
+    Cell cell(int columnIndex, String propertyName) {
+        return new Cell(columnIndex, propertyName);
+    }
+
+    class Cell {
+        final int colIdx;
+        final String propertyName;
+
+        private Cell(int colIdx, String propertyName) {
+            this.colIdx = colIdx;
+            this.propertyName = propertyName;
+        }
+    }
+}
diff --git a/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/SQLBiFunction.java b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/SQLBiFunction.java
new file mode 100644
index 0000000..82414a7
--- /dev/null
+++ b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/SQLBiFunction.java
@@ -0,0 +1,38 @@
+package org.apache.sis.internal.sql.feature;
+
+import java.sql.SQLException;
+import java.util.function.Function;
+
+import static org.apache.sis.util.ArgumentChecks.ensureNonNull;
+
+/**
+ * Useful to customize value retrieval on result sets. Example:
+ * {@code
+ * SQLBiFunction<ResultSet, Integer, Integer> get = ResultSet::getInt;
+ * }
+ * @param <T> Type of the first argument of the function.
+ * @param <U> Type of the second argument of the function.
+ * @param <R> Type of the function result.
+ */
+@FunctionalInterface
+interface SQLBiFunction<T, U, R> {
+    R apply(T t, U u) throws SQLException;
+
+    /**
+     * Returns a composed function that first applies this function to
+     * its input, and then applies the {@code after} function to the result.
+     * If evaluation of either function throws an exception, it is relayed to
+     * the caller of the composed function.
+     *
+     * @param <V> the type of output of the {@code after} function, and of the
+     *           composed function
+     * @param after the function to apply after this function is applied
+     * @return a composed function that first applies this function and then
+     * applies the {@code after} function
+     * @throws NullPointerException if after is null
+     */
+    default <V> SQLBiFunction<T, U, V> andThen(Function<? super R, ? extends V> after) {
+        ensureNonNull("After function", after);
+        return (T t, U u) -> after.apply(apply(t, u));
+    }
+}
diff --git a/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/SQLColumn.java b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/SQLColumn.java
new file mode 100644
index 0000000..d92fe8c
--- /dev/null
+++ b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/SQLColumn.java
@@ -0,0 +1,58 @@
+package org.apache.sis.internal.sql.feature;
+
+import java.sql.ResultSetMetaData;
+import java.util.Optional;
+
+import org.opengis.referencing.crs.CoordinateReferenceSystem;
+
+class SQLColumn {
+    final int type;
+    final String typeName;
+    private final boolean isNullable;
+    private final ColumnRef naming;
+    private final int precision;
+
+    SQLColumn(int type, String typeName, boolean isNullable, ColumnRef naming, int precision) {
+        this.type = type;
+        this.typeName = typeName;
+        this.isNullable = isNullable;
+        this.naming = naming;
+        this.precision = precision;
+    }
+
+    public ColumnRef getName() {
+        return naming;
+    }
+
+    public int getType() {
+        return type;
+    }
+
+    public String getTypeName() {
+        return typeName;
+    }
+
+    public boolean isNullable() {
+        return isNullable;
+    }
+
+    /**
+     * Same as {@link ResultSetMetaData#getPrecision(int)}.
+     * @return 0 if unknown. For texts, maximum number of characters allowed. For numerics, max precision. For blobs,
+     * number of bytes allowed.
+     */
+    public int getPrecision() {
+        return precision;
+    }
+
+    /**
+     * TODO: implement.
+     * Note : This method could be used not only for geometric fields, but also on numeric ones representing 1D
+     * systems.
+     *
+     * @return null for now, implementation needed.
+     */
+    public Optional<CoordinateReferenceSystem> getCrs() {
+        return Optional.empty();
+    }
+}
diff --git a/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/SQLQueryAdapter.java b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/SQLQueryAdapter.java
new file mode 100644
index 0000000..c6b3dc4
--- /dev/null
+++ b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/SQLQueryAdapter.java
@@ -0,0 +1,56 @@
+package org.apache.sis.internal.sql.feature;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+
+import org.opengis.filter.Filter;
+import org.opengis.filter.sort.SortBy;
+
+import org.apache.sis.internal.storage.SubsetAdapter;
+import org.apache.sis.internal.storage.query.SimpleQuery;
+import org.apache.sis.storage.FeatureSet;
+
+/**
+ * Adapts a subset request (offset, limit, sorting, column selection, filtering) into a SQL-backed
+ * feature set built over a parent {@link Table}. Work in progress: filtering and the final build
+ * step are not implemented yet.
+ */
+public class SQLQueryAdapter implements SubsetAdapter.AdapterBuilder {
+
+    // Table the subset is taken from.
+    final Table parent;
+
+    // Columns requested through select(); kept for the future build() implementation.
+    private SimpleQuery.Column[] columns;
+    // Sort criteria requested through sort(); kept for the future build() implementation.
+    private SortBy[] sorting;
+
+    public SQLQueryAdapter(Table parent) {
+        this.parent = parent;
+    }
+
+    /** Returns the offset unchanged: pagination is applied later by the stream overloads. */
+    @Override
+    public long offset(long offset) {
+        return offset; // Done by stream overload
+    }
+
+    /** Returns the limit unchanged: pagination is applied later by the stream overloads. */
+    @Override
+    public long limit(long limit) {
+        return limit; // Done by stream overload
+    }
+
+    @Override
+    public Filter filter(Filter filter) {
+        throw new UnsupportedOperationException("Not supported yet"); // "Alexis Manin (Geomatys)" on 18/09/2019
+    }
+
+    /** Records a defensive copy of the sort criteria; returning true signals they are handled here. */
+    @Override
+    public boolean sort(SortBy[] comparison) {
+        sorting = Arrays.copyOf(comparison, comparison.length);
+        return true;
+    }
+
+    // NOTE(review): returning null presumably tells the caller that all requested columns are
+    // handled by this adapter — confirm against the SubsetAdapter.AdapterBuilder contract.
+    @Override
+    public SimpleQuery.Column[] select(List<SimpleQuery.Column> columns) {
+        this.columns = columns.toArray(new SimpleQuery.Column[columns.size()]);
+        return null;
+    }
+
+    @Override
+    public Optional<FeatureSet> build() {
+        throw new UnsupportedOperationException("Not supported yet"); // "Alexis Manin (Geomatys)" on 18/09/2019
+    }
+}
diff --git a/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/SQLQueryBuilder.java b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/SQLQueryBuilder.java
new file mode 100644
index 0000000..55337d7
--- /dev/null
+++ b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/SQLQueryBuilder.java
@@ -0,0 +1,28 @@
+package org.apache.sis.internal.sql.feature;
+
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.SQLException;
+import javax.sql.DataSource;
+
+import org.apache.sis.internal.metadata.sql.SQLBuilder;
+import org.apache.sis.storage.DataStoreException;
+import org.apache.sis.storage.FeatureSet;
+
+/**
+ * A {@link SQLBuilder} that can materialize the SQL text it accumulates as a {@link QueryFeatureSet}.
+ */
+public class SQLQueryBuilder extends SQLBuilder {
+
+    // Database the resulting feature set will read from.
+    final DataSource source;
+
+    public SQLQueryBuilder(DataSource source, final DatabaseMetaData metadata, final boolean quoteSchema) throws SQLException {
+        super(metadata, quoteSchema);
+        this.source = source;
+    }
+
+    /**
+     * Builds a feature set executing the query currently described by this builder.
+     * NOTE(review): this builder is mutable and not yet copied (see TODO below), so modifications
+     * made after this call would leak into the returned feature set.
+     *
+     * @param connection Connection used to analyze the query metadata.
+     */
+    public FeatureSet build(final Connection connection) throws SQLException, DataStoreException {
+        final Analyzer analyzer = new Analyzer(source, connection.getMetaData(), null, null);
+        // TODO: defensive copy of this builder.
+        return new QueryFeatureSet(this, analyzer, source, connection);
+    }
+
+
+}
diff --git a/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/SQLTypeSpecification.java b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/SQLTypeSpecification.java
new file mode 100644
index 0000000..0e35edf
--- /dev/null
+++ b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/SQLTypeSpecification.java
@@ -0,0 +1,35 @@
+package org.apache.sis.internal.sql.feature;
+
+import java.sql.SQLException;
+import java.util.List;
+import java.util.Optional;
+
+import org.opengis.util.GenericName;
+
+import org.apache.sis.storage.DataStoreContentException;
+
+/**
+ * Gives access to the structural information (name, columns, keys, relations) needed to build a
+ * feature type from a SQL source, whether it is a table or an arbitrary query.
+ */
+interface SQLTypeSpecification {
+    /**
+     *
+     * @return Name for the feature type to build. Nullable.
+     * @throws SQLException If an error occurs while retrieving information from database.
+     */
+    GenericName getName() throws SQLException;
+
+    /**
+     *
+     * @return A succinct description of the data source. Nullable.
+     * @throws SQLException If an error occurs while retrieving information from database.
+     */
+    String getDefinition() throws SQLException;
+
+    /** @return The primary key of the source, or an empty optional if none is defined. */
+    Optional<PrimaryKey> getPK() throws SQLException;
+
+    /** @return Descriptions of all columns exposed by the source, in declaration order. */
+    List<SQLColumn> getColumns();
+
+    /** @return Relations toward tables referenced by this source's foreigner keys. */
+    List<Relation> getImports() throws SQLException;
+
+    /** @return Relations toward tables whose foreigner keys reference this source. */
+    List<Relation> getExports() throws SQLException, DataStoreContentException;
+
+    /** @return Name of the main geometry column, if any; empty by default. */
+    default Optional<String> getPrimaryGeometryColumn() {return Optional.empty();}
+}
diff --git a/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/SpatialFunctions.java b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/SpatialFunctions.java
index d01dbdf..2975d95 100644
--- a/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/SpatialFunctions.java
+++ b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/SpatialFunctions.java
@@ -24,14 +24,20 @@ import java.sql.SQLException;
 import java.sql.Time;
 import java.sql.Timestamp;
 import java.sql.Types;
+import java.time.Instant;
+import java.time.LocalTime;
 import java.time.OffsetDateTime;
 import java.time.OffsetTime;
+import java.time.ZoneOffset;
+import java.util.function.Function;
 
 import org.opengis.referencing.crs.CoordinateReferenceSystem;
 
 import org.apache.sis.internal.metadata.sql.Reflection;
 import org.apache.sis.setup.GeometryLibrary;
 
+import static org.apache.sis.util.ArgumentChecks.ensureNonNull;
+
 
 /**
  * Access to functions provided by geospatial databases.
@@ -98,37 +104,61 @@ class SpatialFunctions {
      * @return corresponding java type, or {@code null} if unknown.
      */
     @SuppressWarnings("fallthrough")
-    protected Class<?> toJavaType(final int sqlType, final String sqlTypeName) {
+    protected ColumnAdapter<?> toJavaType(final int sqlType, final String sqlTypeName) {
         switch (sqlType) {
+            // Each adapter pairs the Java class with the ResultSet accessor used to read values.
             case Types.BIT:
-            case Types.BOOLEAN:                 return Boolean.class;
-            case Types.TINYINT:                 if (!isByteUnsigned) return Byte.class;         // else fallthrough.
-            case Types.SMALLINT:                return Short.class;
-            case Types.INTEGER:                 return Integer.class;
-            case Types.BIGINT:                  return Long.class;
-            case Types.REAL:                    return Float.class;
+            case Types.BOOLEAN:                 return forceCast(Boolean.class);
+            case Types.TINYINT:                 if (!isByteUnsigned) return forceCast(Byte.class);  // else fallthrough.
+            case Types.SMALLINT:                return forceCast(Short.class);
+            case Types.INTEGER:                 return forceCast(Integer.class);
+            case Types.BIGINT:                  return forceCast(Long.class);
+            case Types.REAL:                    return forceCast(Float.class);
             case Types.FLOAT:                   // Despite the name, this is implemented as DOUBLE in major databases.
-            case Types.DOUBLE:                  return Double.class;
+            case Types.DOUBLE:                  return forceCast(Double.class);
             case Types.NUMERIC:                 // Similar to DECIMAL except that it uses exactly the specified precision.
-            case Types.DECIMAL:                 return BigDecimal.class;
+            case Types.DECIMAL:                 return forceCast(BigDecimal.class);
             case Types.CHAR:
             case Types.VARCHAR:
-            case Types.LONGVARCHAR:             return String.class;
-            case Types.DATE:                    return Date.class;
-            case Types.TIME:                    return Time.class;
-            case Types.TIMESTAMP:               return Timestamp.class;
-            case Types.TIME_WITH_TIMEZONE:      return OffsetTime.class;
-            case Types.TIMESTAMP_WITH_TIMEZONE: return OffsetDateTime.class;
+            case Types.LONGVARCHAR:             return new ColumnAdapter<>(String.class, ResultSet::getString);
+            case Types.DATE:                    return new ColumnAdapter<>(Date.class, ResultSet::getDate);
+            case Types.TIME:                    return new ColumnAdapter<>(LocalTime.class, SpatialFunctions::toLocalTime);
+            case Types.TIMESTAMP:               return new ColumnAdapter<>(Instant.class, SpatialFunctions::toInstant);
+            case Types.TIME_WITH_TIMEZONE:      return new ColumnAdapter<>(OffsetTime.class, SpatialFunctions::toOffsetTime);
+            case Types.TIMESTAMP_WITH_TIMEZONE: return new ColumnAdapter<>(OffsetDateTime.class, SpatialFunctions::toODT);
             case Types.BINARY:
             case Types.VARBINARY:
-            case Types.LONGVARBINARY:           return byte[].class;
-            case Types.ARRAY:                   return Object[].class;
+            case Types.LONGVARBINARY:           return new ColumnAdapter<>(byte[].class, ResultSet::getBytes);
+            case Types.ARRAY:                   return forceCast(Object[].class);
             case Types.OTHER:                   // Database-specific accessed via getObject and setObject.
-            case Types.JAVA_OBJECT:             return Object.class;
+            case Types.JAVA_OBJECT:             return new ColumnAdapter<>(Object.class, ResultSet::getObject);
+            // Unknown SQL type: callers are expected to handle the null result.
             default:                            return null;
         }
     }
 
+    /** Maps a SQL TIME column to {@code java.time.LocalTime}, converting SQL NULL to Java null. */
+    private static LocalTime toLocalTime(ResultSet source, int columnIndex) throws SQLException {
+        final Time time = source.getTime(columnIndex);
+        return time == null ? null : time.toLocalTime();
+    }
+
+    /** Maps a SQL TIMESTAMP column to {@code java.time.Instant}, converting SQL NULL to Java null. */
+    private static Instant toInstant(ResultSet source, int columnIndex) throws SQLException {
+        final Timestamp t = source.getTimestamp(columnIndex);
+        return t == null ? null : t.toInstant();
+    }
+
+    /** Maps a SQL TIMESTAMP_WITH_TIMEZONE column to {@code OffsetDateTime}, converting SQL NULL to Java null. */
+    private static OffsetDateTime toODT(ResultSet source, int columnIndex) throws SQLException {
+        final Timestamp t = source.getTimestamp(columnIndex);
+        if (t == null) return null;                 // Must check before dereference: original code NPE'd on SQL NULL.
+        // Date.getTimezoneOffset() returns (UTC - local) in minutes, so the zone offset is its negation.
+        // NOTE(review): this reflects the JVM default time zone, not the database session zone — confirm.
+        final int offsetMinute = t.getTimezoneOffset();
+        return t.toInstant().atOffset(ZoneOffset.ofTotalSeconds(-offsetMinute * 60));
+    }
+
+    /** Maps a SQL TIME_WITH_TIMEZONE column to {@code OffsetTime}, converting SQL NULL to Java null. */
+    private static OffsetTime toOffsetTime(ResultSet source, int columnIndex) throws SQLException {
+        final Time t = source.getTime(columnIndex);
+        if (t == null) return null;                 // Must check before dereference: original code NPE'd on SQL NULL.
+        // Date.getTimezoneOffset() returns (UTC - local) in minutes, so the zone offset is its negation.
+        // NOTE(review): this reflects the JVM default time zone, not the database session zone — confirm.
+        final int offsetMinute = t.getTimezoneOffset();
+        return t.toLocalTime().atOffset(ZoneOffset.ofTotalSeconds(-offsetMinute * 60));
+    }
+
     /**
      * Creates the Coordinate Reference System associated to the the geometry SRID of a given column.
      * The {@code reflect} argument is the result of a call to {@link DatabaseMetaData#getColumns
@@ -143,4 +173,35 @@ class SpatialFunctions {
     protected CoordinateReferenceSystem createGeometryCRS(ResultSet reflect) throws SQLException {
         return null;
     }
+
+    /** Builds an adapter reading the column with {@code getObject} and casting to the requested type. */
+    private static <T> ColumnAdapter<T> forceCast(final Class<T> targetType) {
+        return new ColumnAdapter<>(targetType, (r, i) -> forceCast(targetType, r, i));
+    }
+
+    /** Reads the column as a plain object; SQL NULL maps to null, a type mismatch throws ClassCastException. */
+    private static <T> T forceCast(final Class<T> targetType, ResultSet source, final Integer columnIndex) throws SQLException {
+        final Object value = source.getObject(columnIndex);
+        return value == null ? null : targetType.cast(value);
+    }
+
+    /**
+     * Couples the Java type of a column with the function that reads its values from a result set.
+     */
+    protected static class ColumnAdapter<T> implements SQLBiFunction<ResultSet, Integer, T> {
+        // Java class of the values produced by fetchValue.
+        final Class<T> javaType;
+        private final SQLBiFunction<ResultSet, Integer, T> fetchValue;
+
+        /**
+         * @param javaType   Java class of fetched values; must not be null.
+         * @param fetchValue Function extracting the value of a column (by index) from a row; must not be null.
+         */
+        protected ColumnAdapter(Class<T> javaType, SQLBiFunction<ResultSet, Integer, T> fetchValue) {
+            ensureNonNull("Result java type", javaType);
+            ensureNonNull("Function for value retrieval", fetchValue);
+            this.javaType = javaType;
+            this.fetchValue = fetchValue;
+        }
+
+        @Override
+        public T apply(ResultSet resultSet, Integer integer) throws SQLException {
+            return fetchValue.apply(resultSet, integer);
+        }
+
+        // Note: the composed function is a plain SQLBiFunction, so the javaType information is lost.
+        @Override
+        public <V> SQLBiFunction<ResultSet, Integer, V> andThen(Function<? super T, ? extends V> after) {
+            return fetchValue.andThen(after);
+        }
+    }
 }
diff --git a/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/StreamSQL.java b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/StreamSQL.java
index 7a295b2..925da4e 100644
--- a/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/StreamSQL.java
+++ b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/StreamSQL.java
@@ -32,12 +32,12 @@ import java.util.function.Supplier;
 import java.util.function.ToDoubleFunction;
 import java.util.function.ToIntFunction;
 import java.util.function.ToLongFunction;
+import java.util.logging.Level;
 import java.util.stream.DoubleStream;
 import java.util.stream.IntStream;
 import java.util.stream.LongStream;
 import java.util.stream.Stream;
 import java.util.stream.StreamSupport;
-
 import javax.sql.DataSource;
 
 import org.opengis.feature.Feature;
@@ -46,8 +46,10 @@ import org.apache.sis.internal.util.DoubleStreamDecoration;
 import org.apache.sis.internal.util.StreamDecoration;
 import org.apache.sis.storage.DataStoreException;
 import org.apache.sis.util.collection.BackingStoreException;
+import org.apache.sis.util.logging.Logging;
 
 import static org.apache.sis.util.ArgumentChecks.ensureNonNull;
+import static org.apache.sis.util.ArgumentChecks.ensurePositive;
 
 /**
  * Manages query lifecycle and optimizations. Operations like {@link #count()}, {@link #distinct()}, {@link #skip(long)}
@@ -64,17 +66,29 @@ import static org.apache.sis.util.ArgumentChecks.ensureNonNull;
  */
 class StreamSQL extends StreamDecoration<Feature> {
 
-    final Features.Builder queryBuilder;
+    // Source of the SQL query to execute; accumulated offset/limit are pushed into it by select().
+    private final QueryBuilder queryAdapter;
+
+    // Database to open a connection on when a terminal stream operation runs.
+    private final DataSource source;
+
     boolean parallel;
 
     private Consumer<? super Feature> peekAction;
 
+    // 0 is the "unset" sentinel for both values.
+    private long limit = 0, offset = 0;
+
+    // Default sink for connection-closing errors.
+    // NOTE(review): logger name "sis.sql" — confirm it matches the module's logging convention.
+    private Consumer<SQLException> warningConsumer = e -> Logging.getLogger("sis.sql").log(Level.FINE, "Cannot properly close a connection", e);
+
+    /** Builds a stream over all features of the given table. */
     StreamSQL(final Table source) {
-        this(new Features.Builder(source));
+        this(new Features.Builder(source), source.source);
     }
 
-    StreamSQL(Features.Builder builder) {
-        this.queryBuilder = builder;
+    /** Builds a stream executing the given query adapter against the given database. */
+    StreamSQL(QueryBuilder queryAdapter, DataSource source) {
+        this.queryAdapter = queryAdapter;
+        this.source = source;
+    }
+
+    /** Replaces the handler notified when closing a connection fails; null disables notification. */
+    public void setWarningConsumer(Consumer<SQLException> warningConsumer) {
+        this.warningConsumer = warningConsumer;
     }
 
     @Override
@@ -111,8 +125,13 @@ class StreamSQL extends StreamDecoration<Feature> {
 
+    /**
+     * Tries to push the DISTINCT clause down to the SQL query; if the adapter cannot express it,
+     * falls back to in-memory deduplication through the decorated stream.
+     */
     @Override
     public Stream<Feature> distinct() {
-        queryBuilder.distinct = true;
-        return this;
+        try {
+            queryAdapter.distinct();
+            return this;
+        } catch (UnsupportedOperationException e) {
+            // TODO: emit warning
+            return super.distinct();
+        }
     }
 
     @Override
@@ -130,27 +149,30 @@ class StreamSQL extends StreamDecoration<Feature> {
 
+    /**
+     * Accumulates the most restrictive limit seen so far; it is applied in SQL by select().
+     * NOTE(review): 0 doubles as the "unset" sentinel, so limit(0) is silently ignored although the
+     * Stream contract requires it to produce an empty stream — confirm intended behavior.
+     */
     @Override
     public Stream<Feature> limit(long maxSize) {
-        if (queryBuilder.limit < 1) queryBuilder.limit = maxSize;
-        else queryBuilder.limit = Math.min(queryBuilder.limit, maxSize);
+        ensurePositive("Limit", maxSize);
+        if (limit < 1) limit = maxSize;
+        else limit = Math.min(limit, maxSize);
         return this;
     }
 
+    /** Accumulates offsets so that skip(a).skip(b) behaves as skip(a + b); applied in SQL by select(). */
     @Override
     public Stream<Feature> skip(long n) {
-        queryBuilder.offset += n;
+        ensurePositive("Offset", n);
+        offset += n;
         return this;
     }
 
+    /** Pushes the accumulated offset and limit into the query adapter, then builds the SQL selection. */
+    private Connector select() {
+        queryAdapter.offset(offset);
+        queryAdapter.limit(limit);
+        return queryAdapter.select();
+    }
+
     @Override
     public long count() {
         // Avoid opening a connection if sql text cannot be evaluated.
-        final String sql;
-        try {
-            sql = queryBuilder.getSnapshot(true);
-        } catch (SQLException e) {
-            throw new BackingStoreException("Cannot create SQL COUNT query", e);
-        }
-        try (Connection conn = queryBuilder.parent.source.getConnection()) {
+        final String sql = select().estimateStatement(true);
+        try (Connection conn = source.getConnection()) {
             try (Statement st = conn.createStatement();
                  ResultSet rs = st.executeQuery(sql)) {
                 if (rs.next()) {
@@ -165,33 +187,19 @@ class StreamSQL extends StreamDecoration<Feature> {
     @Override
     protected synchronized Stream<Feature> createDecoratedStream() {
         final AtomicReference<Connection> connectionRef = new AtomicReference<>();
-        Stream<Feature> featureStream = Stream.of(uncheck(this::connectNoAuto))
+        // Connection is opened lazily on the terminal operation, then closed (with rollback) by onClose().
+        Stream<Feature> featureStream = Stream.of(uncheck(() -> QueryFeatureSet.connectReadOnly(source)))
                 .map(Supplier::get)
                 .peek(connectionRef::set)
                 .flatMap(conn -> {
                     try {
-                        final Features iter = queryBuilder.build(conn);
-                        return StreamSupport.stream(iter, parallel);
+                        return select().connect(conn);
                     } catch (SQLException | DataStoreException e) {
                         throw new BackingStoreException(e);
                     }
                 })
-                .onClose(() -> queryBuilder.parent.closeRef(connectionRef, true));
+                .onClose(() -> closeRef(connectionRef, true));
         if (peekAction != null) featureStream = featureStream.peek(peekAction);
-        return featureStream;
-    }
-
-    /**
-     * Acquire a connection over {@link Table parent table} database, forcing
-     * {@link Connection#setAutoCommit(boolean) auto-commit} to false.
-     *
-     * @return A new connection to {@link Table parent table} database, with deactivated auto-commit.
-     * @throws SQLException If we cannot create a new connection. See {@link DataSource#getConnection()} for details.
-     */
-    private Connection connectNoAuto() throws SQLException {
-        final Connection conn = queryBuilder.parent.source.getConnection();
-        conn.setAutoCommit(false);
-        return conn;
+        // Bug fix: the ternary was inverted — it returned the sequential stream when parallelism was
+        // requested and forced .parallel() otherwise. Honor the user's parallel flag instead.
+        return parallel ? featureStream.parallel() : featureStream;
     }
 
     /**
@@ -236,12 +244,6 @@ class StreamSQL extends StreamDecoration<Feature> {
         }
 
         @Override
-        public Stream<O> distinct() {
-            source = source.distinct();
-            return this;
-        }
-
-        @Override
         public Stream<O> limit(long maxSize) {
             source = source.limit(maxSize);
             return this;
@@ -332,12 +334,6 @@ class StreamSQL extends StreamDecoration<Feature> {
         }
 
         @Override
-        public DoubleStream distinct() {
-            source = source.distinct();
-            return this;
-        }
-
-        @Override
         public DoubleStream limit(long maxSize) {
             source = source.limit(maxSize);
             return this;
@@ -392,4 +388,16 @@ class StreamSQL extends StreamDecoration<Feature> {
             return o;
         };
     }
+
+    /**
+     * Closes the connection held by the given reference, if any. When requested, a rollback is
+     * attempted first (streaming reads never commit). Failures are reported to
+     * {@code warningConsumer} instead of being thrown, so closing cannot mask a primary error.
+     */
+    void closeRef(final AtomicReference<Connection> ref, boolean forceRollback) {
+        final Connection conn = ref.get();
+        if (conn != null) {
+            try {
+                if (forceRollback) conn.rollback();
+                conn.close();
+            } catch (SQLException e) {
+                if (warningConsumer != null) warningConsumer.accept(e);
+            }
+        }
+    }
 }
diff --git a/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/Table.java b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/Table.java
index 5a9c555..27c96bc 100644
--- a/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/Table.java
+++ b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/Table.java
@@ -20,15 +20,12 @@ import java.sql.Connection;
 import java.sql.DatabaseMetaData;
 import java.sql.ResultSet;
 import java.sql.SQLException;
-import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.HashMap;
-import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
-import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import javax.sql.DataSource;
 
@@ -36,26 +33,19 @@ import org.opengis.feature.AttributeType;
 import org.opengis.feature.Feature;
 import org.opengis.feature.FeatureAssociationRole;
 import org.opengis.feature.FeatureType;
-import org.opengis.referencing.crs.CoordinateReferenceSystem;
+import org.opengis.feature.PropertyType;
 import org.opengis.util.GenericName;
 
-import org.apache.sis.feature.builder.AssociationRoleBuilder;
-import org.apache.sis.feature.builder.AttributeRole;
-import org.apache.sis.feature.builder.AttributeTypeBuilder;
-import org.apache.sis.feature.builder.FeatureTypeBuilder;
-import org.apache.sis.internal.feature.Geometries;
 import org.apache.sis.internal.metadata.sql.Reflection;
 import org.apache.sis.internal.metadata.sql.SQLBuilder;
-import org.apache.sis.internal.metadata.sql.SQLUtilities;
 import org.apache.sis.internal.storage.AbstractFeatureSet;
-import org.apache.sis.internal.util.CollectionsExt;
-import org.apache.sis.storage.DataStoreContentException;
+import org.apache.sis.internal.storage.query.SimpleQuery;
 import org.apache.sis.storage.DataStoreException;
+import org.apache.sis.storage.FeatureSet;
 import org.apache.sis.storage.InternalDataStoreException;
-import org.apache.sis.util.CharSequences;
-import org.apache.sis.util.Classes;
+import org.apache.sis.storage.Query;
+import org.apache.sis.storage.UnsupportedQueryException;
 import org.apache.sis.util.Debug;
-import org.apache.sis.util.Numbers;
 import org.apache.sis.util.collection.TreeTable;
 import org.apache.sis.util.collection.WeakValueHashMap;
 
@@ -123,6 +113,8 @@ final class Table extends AbstractFeatureSet {
      */
     final Class<?> primaryKeyClass;
 
+    private final SQLTypeSpecification specification;
+
     /**
      * Feature instances already created for given primary keys. This map is used only when requesting feature
      * instances by identifiers (not for iterating over all features) and those identifiers are primary keys.
@@ -145,6 +137,14 @@ final class Table extends AbstractFeatureSet {
     final DatabaseMetaData dbMeta;
 
     /**
+     * An SQL builder whose sole purpose is to allow creation of new builders without metadata analysis. It allows to
+     * reduce error eventuality, and re-use already  computed information.
+     */
+    private final SQLBuilder sqlTemplate;
+
+    private final FeatureAdapter adapter;
+
+    /**
      * Creates a description of the table of the given name.
      * The table is identified by {@code id}, which contains a (catalog, schema, name) tuple.
      * The catalog and schema parts are optional and can be null, but the table is mandatory.
@@ -159,29 +159,9 @@ final class Table extends AbstractFeatureSet {
     {
         super(analyzer.listeners);
         this.dbMeta = analyzer.metadata;
+        this.sqlTemplate = new SQLBuilder(this.dbMeta, true);
         this.source = analyzer.source;
         this.name   = id;
-        final String tableEsc  = analyzer.escape(id.table);
-        final String schemaEsc = analyzer.escape(id.schema);
-        /*
-         * Get a list of primary keys. We need to know them before to create the attributes,
-         * in order to detect which attributes are used as components of Feature identifiers.
-         * In the 'primaryKeys' map, the boolean tells whether the column uses auto-increment,
-         * with null value meaning that we don't know.
-         *
-         * Note: when a table contains no primary keys, we could still look for index columns
-         * with unique constraint using metadata.getIndexInfo(catalog, schema, table, true).
-         * We don't do that for now because of uncertainties (which index to use if there is
-         * many? If they are suitable as identifiers why they are not primary keys?).
-         */
-        final Map<String,Boolean> primaryKeys = new LinkedHashMap<>();
-        try (ResultSet reflect = analyzer.metadata.getPrimaryKeys(id.catalog, id.schema, id.table)) {
-            while (reflect.next()) {
-                primaryKeys.put(analyzer.getUniqueString(reflect, Reflection.COLUMN_NAME), null);
-                // The actual Boolean value will be fetched in the loop on columns later.
-            }
-        }
-        this.primaryKeys = primaryKeys.isEmpty() ? null : primaryKeys.keySet().toArray(new String[primaryKeys.size()]);
         /*
          * Creates a list of associations between the table read by this method and other tables.
          * The associations are defined by the foreigner keys referencing primary keys. Note that
@@ -193,229 +173,53 @@ final class Table extends AbstractFeatureSet {
          * navigability because the database designer's choice may be driven by the need to support
          * multi-occurrences.
          */
-        final List<Relation> importedKeys = new ArrayList<>();
-        final Map<String, List<Relation>> foreignerKeys = new HashMap<>();
-        try (ResultSet reflect = analyzer.metadata.getImportedKeys(id.catalog, id.schema, id.table)) {
-            if (reflect.next()) do {
-                Relation relation = new Relation(analyzer, Relation.Direction.IMPORT, reflect);
-                importedKeys.add(relation);
-                for (final String column : relation.getForeignerKeys()) {
-                    CollectionsExt.addToMultiValuesMap(foreignerKeys, column, relation);
-                    relation = null;     // Only the first column will be associated.
-                }
-            } while (!reflect.isClosed());
-        }
-        final List<Relation> exportedKeys = new ArrayList<>();
-        try (ResultSet reflect = analyzer.metadata.getExportedKeys(id.catalog, id.schema, id.table)) {
-            if (reflect.next()) do {
-                final Relation export = new Relation(analyzer, Relation.Direction.EXPORT, reflect);
-                if (!export.equals(importedBy)) {
-                    exportedKeys.add(export);
-                }
-            } while (!reflect.isClosed());
-        }
         /*
          * For each column in the table that is not a foreigner key, create an AttributeType of the same name.
          * The Java type is inferred from the SQL type, and the attribute multiplicity in inferred from the SQL
          * nullability. Attribute names are added in the 'attributeNames' and 'attributeColumns' list. Those
          * names are usually the same, except when a column is used both as a primary key and as foreigner key.
          */
-        Class<?> primaryKeyClass   = null;
-        boolean  primaryKeyNonNull = true;
-        boolean  hasGeometry       = false;
-        int startWithLowerCase     = 0;
-        final List<ColumnRef> attributes = new ArrayList<>();
-        final FeatureTypeBuilder feature = new FeatureTypeBuilder(analyzer.nameFactory, analyzer.functions.library, analyzer.locale);
-        try (ResultSet reflect = analyzer.metadata.getColumns(id.catalog, schemaEsc, tableEsc, null)) {
-            while (reflect.next()) {
-                final String         column       = analyzer.getUniqueString(reflect, Reflection.COLUMN_NAME);
-                final boolean        mandatory    = Boolean.FALSE.equals(SQLUtilities.parseBoolean(reflect.getString(Reflection.IS_NULLABLE)));
-                final boolean        isPrimaryKey = primaryKeys.containsKey(column);
-                final List<Relation> dependencies = foreignerKeys.get(column);
-                /*
-                 * Heuristic rule for determining if the column names starts with lower case or upper case.
-                 * Words that are all upper-case are ignored on the assumption that they are acronyms.
-                 */
-                if (!column.isEmpty()) {
-                    final int firstLetter = column.codePointAt(0);
-                    if (Character.isLowerCase(firstLetter)) {
-                        startWithLowerCase++;
-                    } else if (Character.isUpperCase(firstLetter) && !CharSequences.isUpperCase(column)) {
-                        startWithLowerCase--;
-                    }
-                }
 
-                ColumnRef colRef = new ColumnRef(column);
-                /*
-                 * Add the column as an attribute. Foreign keys are excluded (they will be replaced by associations),
-                 * except if the column is also a primary key. In the later case we need to keep that column because
-                 * it is needed for building the feature identifier.
-                 */
-                AttributeTypeBuilder<?> attribute = null;
-                if (isPrimaryKey || dependencies == null) {
-                    final String typeName = reflect.getString(Reflection.TYPE_NAME);
-                    Class<?> type = analyzer.functions.toJavaType(reflect.getInt(Reflection.DATA_TYPE), typeName);
-                    if (type == null) {
-                        analyzer.warning(Resources.Keys.UnknownType_1, typeName);
-                        type = Object.class;
-                    }
-                    attribute = feature.addAttribute(type).setName(column);
-                    if (CharSequence.class.isAssignableFrom(type)) {
-                        final int size = reflect.getInt(Reflection.COLUMN_SIZE);
-                        if (!reflect.wasNull()) {
-                            attribute.setMaximalLength(size);
-                        }
-                    }
-                    if (!mandatory) {
-                        attribute.setMinimumOccurs(0);
-                    }
-                    /*
-                     * Some columns have special purposes: components of primary keys will be used for creating
-                     * identifiers, some columns may contain a geometric object. Adding a role on those columns
-                     * may create synthetic columns, for example "sis:identifier".
-                     */
-                    if (isPrimaryKey) {
-                        attribute.addRole(AttributeRole.IDENTIFIER_COMPONENT);
-                        primaryKeyNonNull &= mandatory;
-                        primaryKeyClass = Classes.findCommonClass(primaryKeyClass, type);
-                        if (primaryKeys.put(column, SQLUtilities.parseBoolean(reflect.getString(Reflection.IS_AUTOINCREMENT))) != null) {
-                            throw new DataStoreContentException(Resources.forLocale(analyzer.locale)
-                                    .getString(Resources.Keys.DuplicatedColumn_1, column));
-                        }
-                    }
-                    if (Geometries.isKnownType(type)) {
-                        final CoordinateReferenceSystem crs = analyzer.functions.createGeometryCRS(reflect);
-                        if (crs != null) {
-                            attribute.setCRS(crs);
-                        }
-                        if (!hasGeometry) {
-                            hasGeometry = true;
-                            attribute.addRole(AttributeRole.DEFAULT_GEOMETRY);
-                        }
-                    }
-                }
-                /*
-                 * If the column is a foreigner key, insert an association to another feature instead.
-                 * If the foreigner key uses more than one column, only one of those columns will become
-                 * an association and other columns will be omitted from the FeatureType (but there will
-                 * still be used in SQL queries). Note that columns may be used by more than one relation.
-                 */
-                if (dependencies != null) {
-                    int count = 0;
-                    for (final Relation dependency : dependencies) {
-                        if (dependency != null) {
-                            final GenericName typeName = dependency.getName(analyzer);
-                            final Table table = analyzer.table(dependency, typeName, id);
-                            /*
-                             * Use the column name as the association name, provided that the foreigner key
-                             * use only that column. If the foreigner key use more than one column, then we
-                             * do not know which column describes better the association (often there is none).
-                             * In such case we use the foreigner key name as a fallback.
-                             */
-                            dependency.setPropertyName(column, count++);
-                            final AssociationRoleBuilder association;
-                            if (table != null) {
-                                dependency.setSearchTable(analyzer, table, table.primaryKeys, Relation.Direction.IMPORT);
-                                association = feature.addAssociation(table.featureType);
-                            } else {
-                                association = feature.addAssociation(typeName);     // May happen in case of cyclic dependency.
-                            }
-                            association.setName(dependency.propertyName);
-                            if (!mandatory) {
-                                association.setMinimumOccurs(0);
-                            }
-                            /*
-                             * If the column is also used in the primary key, then we have a name clash.
-                             * Rename the primary key column with the addition of a "pk:" scope. We rename
-                             * the primary key column instead than this association because the primary key
-                             * column should rarely be used directly.
-                             */
-                            if (attribute != null) {
-                                attribute.setName(analyzer.nameFactory.createGenericName(null, "pk", column));
-                                colRef = colRef.as(attribute.getName().toString());
-                                attribute = null;
-                            }
-                        }
-                    }
-                }
-
-                attributes.add(colRef);
-            }
-        }
-        /*
-         * Add the associations created by other tables having foreigner keys to this table.
-         * We infer the column name from the target type. We may have a name clash with other
-         * columns, in which case an arbitrary name change is applied.
-         */
-        int count = 0;
-        for (final Relation dependency : exportedKeys) {
-            if (dependency != null) {
-                final GenericName typeName = dependency.getName(analyzer);
-                String propertyName = typeName.tip().toString();
-                if (startWithLowerCase > 0) {
-                    final CharSequence words = CharSequences.camelCaseToWords(propertyName, true);
-                    final int first = Character.codePointAt(words, 0);
-                    propertyName = new StringBuilder(words.length())
-                            .appendCodePoint(Character.toLowerCase(first))
-                            .append(words, Character.charCount(first), words.length())
-                            .toString();
-                }
-                final String base = propertyName;
-                while (feature.isNameUsed(propertyName)) {
-                    propertyName = base + '-' + ++count;
-                }
-                dependency.propertyName = propertyName;
-                final Table table = analyzer.table(dependency, typeName, null);   // 'null' because exported, not imported.
-                final AssociationRoleBuilder association;
-                if (table != null) {
-                    dependency.setSearchTable(analyzer, table, this.primaryKeys, Relation.Direction.EXPORT);
-                    association = feature.addAssociation(table.featureType);
-                } else {
-                    association = feature.addAssociation(typeName);     // May happen in case of cyclic dependency.
-                }
-                association.setName(propertyName)
-                           .setMinimumOccurs(0)
-                           .setMaximumOccurs(Integer.MAX_VALUE);
-            }
-        }
         /*
          * If the primary keys uses more than one column, we will need an array to store it.
          * If all columns are non-null numbers, use primitive arrays instead than array of wrappers.
          */
-        if (primaryKeys.size() > 1) {
-            if (primaryKeyNonNull) {
-                primaryKeyClass = Numbers.wrapperToPrimitive(primaryKeyClass);
-            }
-            primaryKeyClass = Classes.changeArrayDimension(primaryKeyClass, 1);
-        }
-        /*
-         * Global information on the feature type (name, remarks).
-         * The remarks are opportunistically stored in id.freeText if known by the caller.
+        this.specification = analyzer.create(id, importedBy);
+        primaryKeys = (String[]) specification.getPK()
+                .map(PrimaryKey::getColumns)
+                .orElse(Collections.EMPTY_LIST)
+                .toArray(new String[0]);
+        this.adapter          = analyzer.buildAdapter(specification);
+        this.featureType      = adapter.type;
+        this.importedKeys     = toArray(specification.getImports());
+        this.exportedKeys     = toArray(specification.getExports());
+        this.primaryKeyClass  = primaryKeys.length < 2? Object.class : Object[].class;
+        this.hasGeometry      = specification.getPrimaryGeometryColumn().isPresent();
+        this.attributes       = Collections.unmodifiableList(
+                specification.getColumns().stream()
+                        .map(SQLColumn::getName)
+                        .collect(Collectors.toList())
+        );
+    }
+
+    @Override
+    public FeatureSet subset(Query query) throws UnsupportedQueryException, DataStoreException {
+        if (!(query instanceof SimpleQuery)) return super.subset(query);
+        boolean remainingQuery = true;
+        final SimpleQuery q = (SimpleQuery) query;
+        FeatureSet subset = this;
+        final List<SimpleQuery.Column> cols = q.getColumns();
+
+        /*
+         * Once the filter has been taken care of, we will be able to check which columns to keep. Note that filters
+         * managed by the database engine can use non-returned columns, but this is not the case for the remaining
+         * ones, which are applied after feature creation and can therefore only access the filtered columns.
          */
-        feature.setName(id.getName(analyzer));
-        String remarks = id.freeText;
-        if (id instanceof Relation) {
-            try (ResultSet reflect = analyzer.metadata.getTables(id.catalog, schemaEsc, tableEsc, null)) {
-                while (reflect.next()) {
-                    remarks = analyzer.getUniqueString(reflect, Reflection.REMARKS);
-                    if (remarks != null) {
-                        remarks = remarks.trim();
-                        if (remarks.isEmpty()) {
-                            remarks = null;
-                        } else break;
-                    }
-                }
-            }
-        }
-        if (remarks != null) {
-            feature.setDefinition(remarks);
+        if (cols != null && !cols.isEmpty()) {
+
         }
-        this.featureType      = feature.build();
-        this.importedKeys     = toArray(importedKeys);
-        this.exportedKeys     = toArray(exportedKeys);
-        this.primaryKeyClass  = primaryKeyClass;
-        this.hasGeometry      = hasGeometry;
-        this.attributes = Collections.unmodifiableList(attributes);
+
+        return remainingQuery? subset.subset(q) : subset;
     }
 
     /**
@@ -445,7 +249,14 @@ final class Table extends AbstractFeatureSet {
                 for (final Relation relation : relations) {
                     if (!relation.isSearchTableDefined()) {
                         // A ClassCastException below would be a bug since 'relation.propertyName' shall be for an association.
-                        FeatureAssociationRole association = (FeatureAssociationRole) featureType.getProperty(relation.propertyName);
+                        final PropertyType property = featureType.getProperty(relation.propertyName.toString());
+                        if (!(property instanceof FeatureAssociationRole)) {
+                            throw new IllegalStateException(String.format(
+                                    "We expect a feature association for %s relation %s. Duplicate key ?",
+                                    direction.name(), relation.propertyName
+                            ));
+                        }
+                        FeatureAssociationRole association = (FeatureAssociationRole) property;
                         final Table table = tables.get(association.getValueType().getName());
                         if (table == null) {
                             throw new InternalDataStoreException(association.toString());
@@ -594,6 +405,10 @@ final class Table extends AbstractFeatureSet {
         return count;
     }
 
+    public SQLBuilder createStatement() {
+        return new SQLBuilder(sqlTemplate);
+    }
+
     /**
      * Returns a stream of all features contained in this dataset.
      *
@@ -606,18 +421,6 @@ final class Table extends AbstractFeatureSet {
         return new StreamSQL(this);
     }
 
-    void closeRef(final AtomicReference<Connection> ref, boolean forceRollback) {
-        final Connection conn = ref.get();
-        if (conn != null) {
-            try {
-                if (forceRollback) conn.rollback();
-                conn.close();
-            } catch (SQLException e) {
-                warning(e);
-            }
-        }
-    }
-
     /**
      * Returns an iterator over the features.
      *
diff --git a/storage/sis-sqlstore/src/main/java/org/apache/sis/storage/sql/SQLStore.java b/storage/sis-sqlstore/src/main/java/org/apache/sis/storage/sql/SQLStore.java
index b7b1061..f4c20ca 100644
--- a/storage/sis-sqlstore/src/main/java/org/apache/sis/storage/sql/SQLStore.java
+++ b/storage/sis-sqlstore/src/main/java/org/apache/sis/storage/sql/SQLStore.java
@@ -16,27 +16,29 @@
  */
 package org.apache.sis.storage.sql;
 
-import java.util.Optional;
-import java.util.Collection;
-import javax.sql.DataSource;
+import java.lang.reflect.Method;
 import java.sql.Connection;
 import java.sql.SQLException;
-import java.lang.reflect.Method;
-import org.opengis.util.GenericName;
+import java.util.Collection;
+import java.util.Optional;
+import javax.sql.DataSource;
+
 import org.opengis.metadata.Metadata;
-import org.opengis.parameter.ParameterValueGroup;
 import org.opengis.metadata.spatial.SpatialRepresentationType;
-import org.apache.sis.storage.Resource;
+import org.opengis.parameter.ParameterValueGroup;
+import org.opengis.util.GenericName;
+
+import org.apache.sis.internal.sql.feature.Database;
+import org.apache.sis.internal.sql.feature.Resources;
+import org.apache.sis.internal.storage.MetadataBuilder;
 import org.apache.sis.storage.Aggregate;
 import org.apache.sis.storage.DataStore;
 import org.apache.sis.storage.DataStoreException;
 import org.apache.sis.storage.IllegalNameException;
+import org.apache.sis.storage.Resource;
 import org.apache.sis.storage.StorageConnector;
 import org.apache.sis.storage.event.ChangeEvent;
 import org.apache.sis.storage.event.ChangeListener;
-import org.apache.sis.internal.sql.feature.Database;
-import org.apache.sis.internal.sql.feature.Resources;
-import org.apache.sis.internal.storage.MetadataBuilder;
 import org.apache.sis.util.ArgumentChecks;
 import org.apache.sis.util.Exceptions;
 
diff --git a/storage/sis-sqlstore/src/test/java/org/apache/sis/storage/sql/SQLStoreTest.java b/storage/sis-sqlstore/src/test/java/org/apache/sis/storage/sql/SQLStoreTest.java
index 271bdb7..2929194 100644
--- a/storage/sis-sqlstore/src/test/java/org/apache/sis/storage/sql/SQLStoreTest.java
+++ b/storage/sis-sqlstore/src/test/java/org/apache/sis/storage/sql/SQLStoreTest.java
@@ -16,20 +16,31 @@
  */
 package org.apache.sis.storage.sql;
 
+import java.sql.Connection;
+import java.sql.SQLException;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Function;
+import java.util.stream.Collectors;
 import java.util.stream.Stream;
+import javax.sql.DataSource;
 
 import org.opengis.feature.AttributeType;
 import org.opengis.feature.Feature;
 import org.opengis.feature.FeatureAssociationRole;
 import org.opengis.feature.FeatureType;
 import org.opengis.feature.PropertyType;
+import org.opengis.util.GenericName;
 
+import org.apache.sis.internal.feature.AttributeConvention;
+import org.apache.sis.internal.metadata.sql.SQLBuilder;
+import org.apache.sis.internal.sql.feature.QueryFeatureSet;
 import org.apache.sis.storage.DataStoreException;
 import org.apache.sis.storage.FeatureSet;
 import org.apache.sis.storage.StorageConnector;
@@ -45,6 +56,8 @@ import static org.apache.sis.test.Assert.assertNotNull;
 import static org.apache.sis.test.Assert.assertSame;
 import static org.apache.sis.test.Assert.assertTrue;
 import static org.apache.sis.test.Assert.fail;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertFalse;
 
 // Branch-dependent imports
 
@@ -134,16 +147,16 @@ public final strictfp class SQLStoreTest extends TestCase {
             {
                 final FeatureSet cities = (FeatureSet) store.findResource("Cities");
                 verifyFeatureType(cities.getType(),
-                        new String[] {"sis:identifier", "pk:country", "country",   "native_name", "english_name", "population",  "parks"},
-                        new Object[] {null,             String.class, "Countries", String.class,  String.class,   Integer.class, "Parks"});
+                        new String[] {"sis:identifier", "country",   "native_name", "english_name", "population", "sis:country", "sis:Parks"},
+                        new Object[] {null,             String.class, String.class,  String.class,   Integer.class, "Countries", "Parks"});
 
                 verifyFeatureType(((FeatureSet) store.findResource("Countries")).getType(),
-                        new String[] {"sis:identifier", "code",       "native_name"},
-                        new Object[] {null,             String.class, String.class});
+                        new String[] {"sis:identifier", "code",       "native_name",  "sis:Cities"},
+                        new Object[] {null,             String.class, String.class, "Cities"});
 
                 verifyFeatureType(((FeatureSet) store.findResource("Parks")).getType(),
-                        new String[] {"sis:identifier", "pk:country", "FK_City", "city",       "native_name", "english_name"},
-                        new Object[] {null,             String.class, "Cities",  String.class, String.class,  String.class});
+                        new String[] {"sis:identifier", "country", "city",       "native_name", "english_name", "sis:FK_City"},
+                        new Object[] {null,             String.class,  String.class, String.class,  String.class, "Cities"});
 
                 try (Stream<Feature> features = cities.features(false)) {
                     features.forEach((f) -> verifyContent(f));
@@ -152,6 +165,7 @@ public final strictfp class SQLStoreTest extends TestCase {
                 // Now, we'll check that overloaded stream operations are functionally stable, even stacked.
                 verifyStreamOperations(cities);
 
+                verifyQueries(tmp.source);
             }
         }
         assertEquals(Integer.valueOf(2), countryCount.remove("CAN"));
@@ -160,18 +174,177 @@ public final strictfp class SQLStoreTest extends TestCase {
         assertTrue  (countryCount.isEmpty());
     }
 
+    private void verifyQueries(DataSource source) throws Exception {
+        verifyFetchCityTableAsQuery(source);
+        verifyLimitOffsetAndColumnSelectionFromQuery(source);
+        verifyDistinctQuery(source);
+    }
+
+    private void verifyFetchCityTableAsQuery(DataSource source) throws Exception {
+        final QueryFeatureSet allCities;
+        final QueryFeatureSet canadaCities;
+        try (Connection conn = source.getConnection()) {
+            final SQLBuilder builder = new SQLBuilder(conn.getMetaData(), false)
+                    .append("SELECT * FROM ").appendIdentifier("features", "Cities");
+            allCities = new QueryFeatureSet(builder, source, conn);
+            /* By re-using the same builder, we ensure a defensive copy is done at feature set creation, avoiding
+             * potential concurrency or security issues caused by later modification of the query.
+             */
+            builder.append(" WHERE ").appendIdentifier("country").append("='CAN'");
+            canadaCities = new QueryFeatureSet(builder, source, conn);
+        }
+
+        final HashMap<String, Class> expectedAttrs = new HashMap<>();
+        expectedAttrs.put("country",      String.class);
+        expectedAttrs.put("native_name",  String.class);
+        expectedAttrs.put("english_name", String.class);
+        expectedAttrs.put("population",   Integer.class);
+
+        checkQueryType(expectedAttrs, allCities.getType());
+        checkQueryType(expectedAttrs, canadaCities.getType());
+
+        Set<Map<String, Object>> expectedResults = new HashSet<>();
+        expectedResults.add(city("CAN", "Montréal", "Montreal", 1704694));
+        expectedResults.add(city("CAN", "Québec", "Quebec", 531902));
+
+        Set<Map<String, Object>> result = canadaCities.features(false)
+                .map(SQLStoreTest::asMap)
+                .collect(Collectors.toSet());
+        assertEquals("Query result is not consistent with expected one", expectedResults, result);
+
+        expectedResults.add(city("FRA", "Paris",    "Paris",    2206488));
+        expectedResults.add(city("JPN", "東京",     "Tōkyō",   13622267));
+
+        result = allCities.features(false)
+                .map(SQLStoreTest::asMap)
+                .collect(Collectors.toSet());
+        assertEquals("Query result is not consistent with expected one", expectedResults, result);
+    }
+
+    private static Map<String, Object> city(String country, String nativeName, String enName, int population) {
+        final Map<String, Object> result = new HashMap<>();
+        result.put("country", country);
+        result.put("native_name", nativeName);
+        result.put("english_name", enName);
+        result.put("population", population);
+        return result;
+    }
+    
     /**
-     * Checks that operations stacked on feature stream are well executed. This test focus on mapping and peeking
-     * actions overloaded by sql streams. We'd like to test skip and limit operations too, but ignore it for now,
-     * because ordering of results matters for such a test.
+     * Differs from {@link #verifyFeatureType(FeatureType, String[], Object[])} because expected properties are
+     * matched by name rather than by declaration order.
+     * @param expectedAttrs expected attribute names mapped to their expected value classes.
+     */
+    private static void checkQueryType(final Map<String, Class> expectedAttrs, final FeatureType target) {
+        final Collection<? extends PropertyType> props = target.getProperties(true);
+        assertEquals("Number of attributes", expectedAttrs.size(), props.size());
+        for (PropertyType p : props) {
+            assertTrue("Query type should contain only attributes", p instanceof AttributeType);
+            final String pName = p.getName().toString();
+            final Class expectedClass = expectedAttrs.get(pName);
+            assertNotNull("Unexpected property: "+pName, expectedClass);
+            assertEquals("Unepected type for property: "+pName, expectedClass, ((AttributeType)p).getValueClass());
+        }
+    }
+
+    private static Map<String, Object> asMap(final Feature source) {
+        return source.getType().getProperties(true).stream()
+                .map(PropertyType::getName)
+                .map(GenericName::toString)
+                .collect(Collectors.toMap(n->n, source::getPropertyValue));
+    }
+
+    /**
+     * Test limit and offset. The logic is: if the user provided an offset, the stream {@link Stream#skip(long) skip operator}
+     * does NOT override it, but stacks on it (which is logical: the feature set provides the user-defined result, and the
+     * stream navigates through it).
      *
-     * @implNote Most of stream operations used here are meaningless. We just want to ensure that the pipeline does not
-     * skip any operation.
+     * Moreover, we also check filtering of columns and label usage.
      *
-     * @param cities The feature set to read from. We expect a feature set containing all cities defined for the test
-     *               class.
-     * @throws DataStoreException Let's propagate any error raised by input feature set.
+     * @param source Database connection provider.
+     */
+    private void verifyLimitOffsetAndColumnSelectionFromQuery(final DataSource source) throws Exception {
+        // Ensure multiline text is accepted
+        final String query = "SELECT \"english_name\" as \"title\" \n\r" +
+                "FROM features.\"Parks\" \n" +
+                "ORDER BY \"english_name\" ASC \n" +
+                "OFFSET 2 ROWS FETCH NEXT 3 ROWS ONLY";
+        final QueryFeatureSet qfs;
+        try (Connection conn = source.getConnection()) {
+            qfs = new QueryFeatureSet(query, source, conn);
+        }
+
+        final FeatureType type = qfs.getType();
+        final Iterator<? extends PropertyType> props = type.getProperties(true).iterator();
+        assertTrue("Built feature set has at least one property", props.hasNext());
+        final AttributeType attr = (AttributeType) props.next();
+        assertEquals("Property name should be label defined in query", "title", attr.getName().toString());
+        assertEquals("Attribute should be a string", String.class, attr.getValueClass());
+        assertTrue("Column should be nullable.", attr.getMinimumOccurs() == 0);
+        final Object precision = attr.characteristics().get(AttributeConvention.MAXIMAL_LENGTH_CHARACTERISTIC.toString());
+        assertNotNull("Length constraint should be visible from feature type", precision);
+        assertEquals("Column length constraint should be visible from attribute type.", 20, ((AttributeType)precision).getDefaultValue());
+        assertFalse("Built feature type should have exactly one attribute.", props.hasNext());
+
+        Function<Stream<Feature>, String[]> getNames = in -> in
+                .map(f -> f.getPropertyValue("title").toString())
+                .toArray(size -> new String[size]);
+
+        String[] parkNames = getNames.apply(
+                qfs.features(false)
+                        // Get third row in the table, as query starts on second one, and we want to skip one entry from there
+                        .skip(1)
+                        // Tries to increase limit. The test will ensure it's not possible.
+                        .limit(4)
+        );
+
+        assertArrayEquals(
+                "Should get fourth and fifth park names from ascending order",
+                new String[]{"Tuileries Garden", "Yoyogi-kōen"},
+                parkNames
+        );
+
+        parkNames = getNames.apply(qfs.features(false)
+                .skip(0)
+                .limit(1)
+        );
+
+        assertArrayEquals("Only second third name should be returned", new String[]{"Shinjuku Gyoen"}, parkNames);
+    }
+
+    /**
+     * Check that {@link Stream#distinct()} gives coherent results. For now, no optimisation is done to delegate it to
+     * the database, but this method serves as a non-regression test, so when such an optimisation is done, we'll
+     * immediately verify its validity.
      */
+    private void verifyDistinctQuery(DataSource source) throws SQLException {
+        // Simple single-line query selecting a single ordered column.
+        final String query = "SELECT \"country\" FROM features.\"Parks\" ORDER BY \"country\"";
+        final QueryFeatureSet qfs;
+        try (Connection conn = source.getConnection()) {
+            qfs = new QueryFeatureSet(query, source, conn);
+        }
+
+        final Object[] expected = qfs.features(false)
+                .distinct() 
+                .map(f -> f.getPropertyValue("country"))
+                .toArray();
+
+        assertArrayEquals("Distinct country names, sorted in ascending order", new String[]{"CAN", "FRA", "JPN"}, expected);
+    }
+
+    /**
+    * Checks that operations stacked on feature stream are well executed. This test focus on mapping and peeking
+    * actions overloaded by sql streams. We'd like to test skip and limit operations too, but ignore it for now,
+    * because ordering of results matters for such a test.
+    *
+    * @implNote Most of stream operations used here are meaningless. We just want to ensure that the pipeline does not
+    * skip any operation.
+    *
+    * @param cities The feature set to read from. We expect a feature set containing all cities defined for the test
+    *               class.
+    * @throws DataStoreException Let's propagate any error raised by input feature set.
+    */
     private static void verifyStreamOperations(final FeatureSet cities) throws DataStoreException {
         try (Stream<Feature> features = cities.features(false)) {
             final AtomicInteger peekCount = new AtomicInteger();
@@ -213,6 +386,7 @@ public final strictfp class SQLStoreTest extends TestCase {
     private static void verifyFeatureType(final FeatureType type, final String[] expectedNames, final Object[] expectedTypes) {
         int i = 0;
         for (PropertyType pt : type.getProperties(false)) {
+            if (i >= expectedNames.length) fail("Returned feature-type contains more properties than expected. Example: "+pt.getName());
             assertEquals("name", expectedNames[i], pt.getName().toString());
             final Object expectedType = expectedTypes[i];
             if (expectedType != null) {
@@ -285,7 +459,7 @@ public final strictfp class SQLStoreTest extends TestCase {
         /*
          * Verify attributes. They are the easiest properties to read.
          */
-        assertEquals("pk:country",     country,              feature.getPropertyValue("pk:country"));
+        assertEquals("country",        country,              feature.getPropertyValue("country"));
         assertEquals("sis:identifier", country + ':' + city, feature.getPropertyValue("sis:identifier"));
         assertEquals("english_name",   englishName,          feature.getPropertyValue("english_name"));
         assertEquals("population",     population,           feature.getPropertyValue("population"));
@@ -293,9 +467,9 @@ public final strictfp class SQLStoreTest extends TestCase {
          * Associations using Relation.Direction.IMPORT.
          * Those associations should be cached; we verify with "Canada" case.
          */
-        assertEquals("country", countryName, getIndirectPropertyValue(feature, "country", "native_name"));
+        assertEquals("country", countryName, getIndirectPropertyValue(feature, "sis:country", "native_name"));
         if (isCanada) {
-            final Feature f = (Feature) feature.getPropertyValue("country");
+            final Feature f = (Feature) feature.getPropertyValue("sis:country");
             if (canada == null) {
                 canada = f;
             } else {
@@ -307,7 +481,7 @@ public final strictfp class SQLStoreTest extends TestCase {
          * Associations using Relation.Direction.EXPORT.
          * Contrarily to the IMPORT case, those associations can contain many values.
          */
-        final Collection<?> actualParks = (Collection<?>) feature.getPropertyValue("parks");
+        final Collection<?> actualParks = (Collection<?>) feature.getPropertyValue("sis:Parks");
         assertNotNull("parks", actualParks);
         assertEquals("parks.length", parks.length, actualParks.size());
         final Collection<String> expectedParks = new HashSet<>(Arrays.asList(parks));
@@ -323,7 +497,7 @@ public final strictfp class SQLStoreTest extends TestCase {
              * Verify the reverse association form Parks to Cities.
              * This create a cyclic graph, but SQLStore is capable to handle it.
              */
-            assertSame("City → Park → City", feature, pf.getPropertyValue("FK_City"));
+            assertSame("City → Park → City", feature, pf.getPropertyValue("sis:FK_City"));
         }
     }
 
diff --git a/storage/sis-sqlstore/src/test/resources/org/apache/sis/storage/sql/Features.sql b/storage/sis-sqlstore/src/test/resources/org/apache/sis/storage/sql/Features.sql
index 148c076..27e4d65 100644
--- a/storage/sis-sqlstore/src/test/resources/org/apache/sis/storage/sql/Features.sql
+++ b/storage/sis-sqlstore/src/test/resources/org/apache/sis/storage/sql/Features.sql
@@ -65,5 +65,5 @@ INSERT INTO features."Parks" ("country", "city", "native_name", "english_name")
     ('CAN', 'Montréal', 'Mont Royal',           'Mount Royal'),
     ('FRA', 'Paris',    'Jardin des Tuileries', 'Tuileries Garden'),
     ('FRA', 'Paris',    'Jardin du Luxembourg', 'Luxembourg Garden'),
-    ('JPN', '東京',     '代々木公園',           'Yoyogi-kōen'),
-    ('JPN', '東京',     '新宿御苑',             'Shinjuku Gyoen');
+    ('JPN', '東京',      '代々木公園',            'Yoyogi-kōen'),
+    ('JPN', '東京',      '新宿御苑',              'Shinjuku Gyoen');
diff --git a/storage/sis-storage/src/main/java/org/apache/sis/internal/storage/SubsetAdapter.java b/storage/sis-storage/src/main/java/org/apache/sis/internal/storage/SubsetAdapter.java
new file mode 100644
index 0000000..08e4310
--- /dev/null
+++ b/storage/sis-storage/src/main/java/org/apache/sis/internal/storage/SubsetAdapter.java
@@ -0,0 +1,131 @@
+package org.apache.sis.internal.storage;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.function.Function;
+
+import org.opengis.filter.Filter;
+import org.opengis.filter.sort.SortBy;
+
+import org.apache.sis.internal.storage.query.SimpleQuery;
+import org.apache.sis.storage.FeatureSet;
+
+import static org.apache.sis.internal.storage.query.SimpleQuery.UNLIMITED;
+
+public final class SubsetAdapter {
+
+    final Function<FeatureSet, AdapterBuilder> driverSupplier;
+
+    public SubsetAdapter(Function<FeatureSet, AdapterBuilder> driverSupplier) {
+        this.driverSupplier = driverSupplier;
+    }
+
+    public final FeatureSet subset(final FeatureSet source, SimpleQuery query) {
+        final AdapterBuilder driver = driverSupplier.apply(source);
+
+        final SimpleQuery remaining = new SimpleQuery();
+
+        final long offset = query.getOffset();
+        if (offset > 0) remaining.setOffset(driver.offset(offset));
+
+        final long limit = query.getLimit();
+        if (limit != UNLIMITED) remaining.setLimit(driver.limit(limit));
+
+        if (filteringRequired(query)) remaining.setFilter(driver.filter(query.getFilter()));
+
+        if (sortRequired(query) && !driver.sort(query.getSortBy())) remaining.setSortBy(query.getSortBy());
+
+        if (!allColumnsIncluded(query)) {
+            final SimpleQuery.Column[] remainingCols = driver.select(query.getColumns());
+            if (remainingCols != null && remainingCols.length > 0)
+                remaining.setColumns(remainingCols);
+        }
+
+        final FeatureSet driverSubset = driver.build().orElse(source);
+
+        return isNoOp(remaining)? driverSubset : remaining.execute(driverSubset);
+    }
+
+    protected final static boolean isNoOp(final SimpleQuery in) {
+        return in.getOffset() <= 0
+                && in.getLimit() == UNLIMITED
+                && allColumnsIncluded(in)
+                && !filteringRequired(in)
+                && !sortRequired(in);
+    }
+
+    protected final static boolean sortRequired(final SimpleQuery in) {
+        final SortBy[] sortBy = in.getSortBy();
+        return sortBy != null && sortBy.length > 0 && Arrays.stream(sortBy).anyMatch(Objects::nonNull);
+    }
+
+    protected final static boolean allColumnsIncluded(final SimpleQuery in) {
+        final List<SimpleQuery.Column> cols = in.getColumns();
+        return cols == null || cols.isEmpty();
+    }
+
+    protected final static boolean filteringRequired(SimpleQuery in) {
+        final Filter filter = in.getFilter();
+        return filter != Filter.INCLUDE;
+    }
+
+    public interface AdapterBuilder {
+
+        /**
+         * Specify an offset to use in custom query.
+         *
+         * @param offset The offset to handle.
+         * @return 0 if this builder can handle completely given offset. The input value if underlying driver cannot
+         * manage the offset itself. Note that you can return another value in case the driver and default query system
+         * must be stacked. Imagine the case of a partitioned storage: maybe the driver can handle fixed offsets, and
+         * let the default implementation manage the remaining part of the offset downstream. For example, in a storage where
+         * features are chunked 10 by 10, when querying an offset of 12, the inner driver can configure the second
+         * partition to be loaded (element 10 to 20), and let default query skip 2 elements after that.
+         *
+         * @throws IllegalArgumentException If given value is illegal for the driver.
+         */
+        long offset(long offset);
+
+        /**
+         * Set a maximum number of elements to retrieve from custom query.
+         *
+         * @param limit The count of features to handle.
+         * @return {@link SimpleQuery#UNLIMITED} if this builder can handle completely given limit. The input value if
+         * underlying driver cannot do it itself, or must be stacked with default query system. Imagine the case of a
+         * partitioned storage: maybe the driver can load entire chunks of data, and let the default implementation cut the last
+         * returned chunk. For example, in a storage where features are chunked 10 by 10, when querying a limit of 12,
+         * the inner driver can return two complete partitions (20 elements), and let default query stop processing
+         * after 12 elements have gone through.
+         *
+         * @throws IllegalArgumentException If given value is illegal for the driver.
+         */
+        long limit(long limit);
+
+        Filter filter(final Filter filter);
+
+        /**
+         * Submit a sort subquery to the driver.
+         *
+         * @param comparison The columns to sort, as specified in {@link SimpleQuery#getSortBy()}.
+         * @return True if the driver handles the comparison. If false, it means that the driver won't perform any
+         * sort, and the default implementation (i.e. {@link SimpleQuery}) must handle it.
+         */
+        boolean sort(final SortBy[] comparison);
+
+        /**
+         * Specify a subset of columns for the driver to return.
+         * @param columns The columns requested by the caller; neither null nor empty at this point.
+         * @return The columns the driver cannot provide itself (left for the default implementation to handle), or null/empty if all are handled.
+         */
+        SimpleQuery.Column[] select(List<SimpleQuery.Column> columns);
+
+        /**
+         * Take a snapshot of all parameters given to query adaptation.
+         *
+         * @return A custom driver query. If custom query is a no-op, returns an empty shell.
+         */
+        Optional<FeatureSet> build();
+    }
+}
diff --git a/storage/sis-storage/src/main/java/org/apache/sis/internal/storage/query/SimpleQuery.java b/storage/sis-storage/src/main/java/org/apache/sis/internal/storage/query/SimpleQuery.java
index 17d6912..271e62c 100644
--- a/storage/sis-storage/src/main/java/org/apache/sis/internal/storage/query/SimpleQuery.java
+++ b/storage/sis-storage/src/main/java/org/apache/sis/internal/storage/query/SimpleQuery.java
@@ -20,6 +20,16 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.Objects;
 import java.util.StringJoiner;
+
+import org.opengis.feature.AttributeType;
+import org.opengis.feature.FeatureAssociationRole;
+import org.opengis.feature.FeatureType;
+import org.opengis.feature.PropertyType;
+import org.opengis.filter.Filter;
+import org.opengis.filter.expression.Expression;
+import org.opengis.filter.sort.SortBy;
+import org.opengis.util.GenericName;
+
 import org.apache.sis.feature.builder.FeatureTypeBuilder;
 import org.apache.sis.internal.feature.FeatureExpression;
 import org.apache.sis.internal.util.UnmodifiableArrayList;
@@ -29,14 +39,6 @@ import org.apache.sis.util.ArgumentChecks;
 import org.apache.sis.util.Classes;
 import org.apache.sis.util.iso.Names;
 import org.apache.sis.util.resources.Errors;
-import org.opengis.feature.AttributeType;
-import org.opengis.feature.FeatureAssociationRole;
-import org.opengis.feature.FeatureType;
-import org.opengis.feature.PropertyType;
-import org.opengis.filter.Filter;
-import org.opengis.filter.expression.Expression;
-import org.opengis.filter.sort.SortBy;
-import org.opengis.util.GenericName;
 
 
 /**
@@ -54,7 +56,7 @@ public class SimpleQuery extends Query {
      * Sentinel limit value for queries of unlimited length.
      * This value can be given to {@link #setLimit(long)} or retrieved from {@link #getLimit()}.
      */
-    private static final long UNLIMITED = -1;
+    public static final long UNLIMITED = -1;
 
     /**
      * The columns to retrieve, or {@code null} if all columns shall be included in the query.


Mime
View raw message