sis-commits mailing list archives

From ama...@apache.org
Subject [sis] 05/22: fix(SQLStore): better handling of parallelization flag. Add a benchmark to test query spliterator flavors.
Date Thu, 14 Nov 2019 11:46:39 GMT
This is an automated email from the ASF dual-hosted git repository.

amanin pushed a commit to branch refactor/sql-store
in repository https://gitbox.apache.org/repos/asf/sis.git

commit 708faa26882f98eb743c94ff37d6950a3c03e3f5
Author: Alexis Manin <amanin@apache.org>
AuthorDate: Thu Sep 26 19:19:33 2019 +0200

    fix(SQLStore): better handling of parallelization flag. Add a benchmark to test query spliterator flavors.
---
 storage/sis-sqlstore/pom.xml                       |  14 +-
 .../apache/sis/internal/sql/feature/Analyzer.java  |  26 ++--
 .../apache/sis/internal/sql/feature/Connector.java |  23 +++-
 .../sis/internal/sql/feature/FeatureAdapter.java   |   2 +-
 .../sis/internal/sql/feature/PrimaryKey.java       |  12 +-
 .../sis/internal/sql/feature/QueryBuilder.java     |   1 +
 .../sis/internal/sql/feature/QueryFeatureSet.java  | 149 +++++++++++++++++---
 .../sql/feature/QuerySpliteratorsBench.java        | 150 +++++++++++++++++++++
 .../internal/sql/feature/SQLTypeSpecification.java |  62 ++++++++-
 .../apache/sis/internal/sql/feature/StreamSQL.java |  19 ++-
 .../org/apache/sis/internal/sql/feature/Table.java |   2 +-
 .../sis/internal/sql/feature/package-info.java     |   5 +
 12 files changed, 418 insertions(+), 47 deletions(-)

diff --git a/storage/sis-sqlstore/pom.xml b/storage/sis-sqlstore/pom.xml
index 45231fe..1e905f2 100644
--- a/storage/sis-sqlstore/pom.xml
+++ b/storage/sis-sqlstore/pom.xml
@@ -120,7 +120,7 @@
     <dependency>
       <groupId>org.apache.derby</groupId>
       <artifactId>derby</artifactId>
-      <scope>test</scope>
+      <scope>compile</scope>
     </dependency>
     <dependency>
       <groupId>org.hsqldb</groupId>
@@ -132,6 +132,18 @@
       <artifactId>postgresql</artifactId>
       <scope>test</scope>
     </dependency>
+
+    <dependency>
+      <groupId>org.openjdk.jmh</groupId>
+      <artifactId>jmh-core</artifactId>
+      <version>1.21</version>
+    </dependency>
+    <dependency>
+      <groupId>org.openjdk.jmh</groupId>
+      <artifactId>jmh-generator-annprocess</artifactId>
+      <version>1.21</version>
+    </dependency>
+
   </dependencies>
 
 </project>
diff --git a/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/Analyzer.java b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/Analyzer.java
index 6ac8fd7..9348d01 100644
--- a/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/Analyzer.java
+++ b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/Analyzer.java
@@ -22,6 +22,7 @@ import java.sql.ResultSet;
 import java.sql.ResultSetMetaData;
 import java.sql.SQLException;
 import java.util.*;
+import java.util.function.Supplier;
 import java.util.logging.Level;
 import java.util.logging.LogRecord;
 import javax.sql.DataSource;
@@ -140,6 +141,7 @@ final class Analyzer {
      * The namespace created with {@link #catalog} and {@link #schema}.
      */
     private transient NameSpace namespace;
+    public static final Supplier<GenericName> RANDOME_NAME = () -> Names.createGenericName("sis", ":", UUID.randomUUID().toString());
 
     /**
      * Creates a new analyzer for the database described by given metadata.
@@ -334,9 +336,9 @@ final class Analyzer {
 
     public FeatureAdapter buildAdapter(final SQLTypeSpecification spec) throws SQLException {
         final FeatureTypeBuilder builder = new FeatureTypeBuilder(nameFactory, functions.library, locale);
-        builder.setName(spec.getName() == null ? Names.createGenericName("sis", ":", UUID.randomUUID().toString()) : spec.getName());
-        builder.setDefinition(spec.getDefinition());
-        final String geomCol = spec.getPrimaryGeometryColumn().orElse("");
+        builder.setName(spec.getName().orElseGet(RANDOME_NAME));
+        spec.getDefinition().ifPresent(builder::setDefinition);
+        final String geomCol = spec.getPrimaryGeometryColumn().map(ColumnRef::getAttributeName).orElse("");
         final List pkCols = spec.getPK().map(PrimaryKey::getColumns).orElse(Collections.EMPTY_LIST);
         List<PropertyMapper> attributes = new ArrayList<>();
         // JDBC column indices are 1 based.
@@ -470,15 +472,15 @@ final class Analyzer {
         }
 
         @Override
-        public GenericName getName() {
-            return id.getName(Analyzer.this);
+        public Optional<GenericName> getName() {
+            return Optional.of(id.getName(Analyzer.this));
         }
 
         /**
          * The remarks are opportunistically stored in id.freeText if known by the caller.
          */
         @Override
-        public String getDefinition() throws SQLException {
+        public Optional<String> getDefinition() throws SQLException {
             String remarks = id.freeText;
             if (id instanceof Relation) {
                try (ResultSet reflect = metadata.getTables(id.catalog, schemaEsc, tableEsc, null)) {
@@ -493,7 +495,7 @@ final class Analyzer {
                     }
                 }
             }
-            return remarks;
+            return Optional.ofNullable(remarks);
         }
 
         @Override
@@ -551,7 +553,7 @@ final class Analyzer {
         }
 
         @Override
-        public Optional<String> getPrimaryGeometryColumn() {
+        public Optional<ColumnRef> getPrimaryGeometryColumn() {
             return Optional.empty();
             //throw new UnsupportedOperationException("Not supported yet"); // "Alexis Manin (Geomatys)" on 20/09/2019
         }
@@ -589,13 +591,13 @@ final class Analyzer {
         }
 
         @Override
-        public GenericName getName() throws SQLException {
-            return name;
+        public Optional<GenericName> getName() throws SQLException {
+            return Optional.of(name);
         }
 
         @Override
-        public String getDefinition() throws SQLException {
-            return query;
+        public Optional<String> getDefinition() throws SQLException {
+            return Optional.of(query);
         }
 
         @Override
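
The Optional-based rewrite above replaces the old null check with a lazy fallback: orElseGet evaluates the supplier only when the specification carries no name, so no UUID is generated on the common path. A minimal standalone sketch of that pattern, not part of the patch, assuming the org.apache.sis.util.iso.Names factory already used by Analyzer (class and method names below are illustrative):

    import java.util.Optional;
    import java.util.UUID;
    import java.util.function.Supplier;

    import org.apache.sis.util.iso.Names;

    import org.opengis.util.GenericName;

    final class NameFallbackSketch {
        /** Mirrors Analyzer.RANDOME_NAME: the supplier is evaluated only when no name is available. */
        static final Supplier<GenericName> RANDOM_NAME =
                () -> Names.createGenericName("sis", ":", UUID.randomUUID().toString());

        static GenericName nameOrRandom(final Optional<GenericName> specName) {
            return specName.orElseGet(RANDOM_NAME);   // lazy: no UUID is generated when a name is present
        }
    }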
diff --git a/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/Connector.java b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/Connector.java
index c99bd7f..9ce296c 100644
--- a/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/Connector.java
+++ b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/Connector.java
@@ -8,8 +8,29 @@ import org.opengis.feature.Feature;
 
 import org.apache.sis.storage.DataStoreException;
 
-public interface Connector {
+/**
+ * Simple abstraction describing a component capable of loading data from an SQL connection.
+ */
+interface Connector {
+    /**
+     * Triggers loading of data through an existing connection.
+     *
+     * @param connection The database connection to use for data loading. Note that it is the caller's
+     *                   responsibility to close the connection, and that it should not be closed before the
+     *                   stream's terminal operation is over.
+     * @return Features loaded from the input connection. Lazy implementations are recommended, but this is an
+     *                   implementation-dependent choice.
+     * @throws SQLException If an error occurs while exchanging information with the database.
+     * @throws DataStoreException If a data-model-dependent error occurs.
+     */
     Stream<Feature> connect(Connection connection) throws SQLException, DataStoreException;
 
+    /**
+     * Provides an approximate query summarizing the data loaded by this component.
+     *
+     * @param count True if the query estimation is needed for a count operation, in which case the returned
+     *              query should be a count query.
+     * @return SQL query describing the way this component loads data. Never null. However, implementations are
+     *         free to throw {@link UnsupportedOperationException} if they do not support such an operation.
+     */
     String estimateStatement(final boolean count);
 }
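
The contract documented above leaves connection lifecycle to the caller: the connection must stay open until the stream's terminal operation completes. A hypothetical caller, placed in the same package since the interface is package-private (names below are illustrative, not part of the patch):

    import java.sql.Connection;
    import java.util.stream.Stream;
    import javax.sql.DataSource;

    import org.opengis.feature.Feature;

    final class ConnectorUsageSketch {
        static long countFeatures(final DataSource source, final Connector connector) throws Exception {
            // The caller owns the connection: it must stay open until the stream's terminal operation completes.
            try (Connection connection = source.getConnection();
                 Stream<Feature> features = connector.connect(connection)) {
                return features.count();
            }
        }
    }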
diff --git a/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/FeatureAdapter.java b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/FeatureAdapter.java
index 0162e67..17a345a 100644
--- a/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/FeatureAdapter.java
+++ b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/FeatureAdapter.java
@@ -57,7 +57,7 @@ class FeatureAdapter {
         return features;
     }
 
-    static class PropertyMapper {
+    static final class PropertyMapper {
         // TODO: by using an indexed implementation of Feature, we could avoid the name mapping. However, a JMH
         // benchmark would be required in order to be sure it's impacting performance positively.
         final String propertyName;
diff --git a/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/PrimaryKey.java b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/PrimaryKey.java
index 1e68fd3..e220abc 100644
--- a/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/PrimaryKey.java
+++ b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/PrimaryKey.java
@@ -10,6 +10,9 @@ import org.apache.sis.util.ArgumentChecks;
 /**
  * Represents SQL primary key constraint. Main information is columns composing the key.
  *
+ * @implNote For now, only the list of columns composing the key is returned. However, in the future it would be
+ * possible to add other information, such as a value type describing how to expose the primary key value.
+ *
  * @author "Alexis Manin (Geomatys)"
  */
 interface PrimaryKey {
@@ -20,13 +23,16 @@ interface PrimaryKey {
         return Optional.of(new Composite(cols));
     }
 
-    //Class<T> getViewType();
+    /**
+     *
+     * @return List of column names composing the key. Should neither be null nor empty.
+     */
     List<String> getColumns();
 
     class Simple implements PrimaryKey {
         final String column;
 
-        public Simple(String column) {
+        Simple(String column) {
             this.column = column;
         }
 
@@ -40,7 +46,7 @@ interface PrimaryKey {
          */
         private final List<String> columns;
 
-        public Composite(List<String> columns) {
+        Composite(List<String> columns) {
             ArgumentChecks.ensureNonEmpty("Primary key column names", columns);
             this.columns = Collections.unmodifiableList(new ArrayList<>(columns));
         }
diff --git a/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/QueryBuilder.java b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/QueryBuilder.java
index 68d798e..a78ed2a 100644
--- a/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/QueryBuilder.java
+++ b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/QueryBuilder.java
@@ -2,6 +2,7 @@ package org.apache.sis.internal.sql.feature;
 
 /**
  * API to allow an overridden SQL Stream to delegate a set of intermediate operations to the native driver.
+ * TODO: move as inner interface of {@link StreamSQL} ?
  */
 interface QueryBuilder {
 
diff --git a/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/QueryFeatureSet.java b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/QueryFeatureSet.java
index 0565036..ffbdc8b 100644
--- a/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/QueryFeatureSet.java
+++ b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/QueryFeatureSet.java
@@ -4,6 +4,7 @@ import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
+import java.util.List;
 import java.util.Spliterator;
 import java.util.function.Consumer;
 import java.util.regex.Matcher;
@@ -78,6 +79,21 @@ public class QueryFeatureSet extends AbstractFeatureSet {
     private final boolean distinct;
 
     /**
+     * Debug flag to activate (use {@link PrefetchSpliterator}) or de-activate (use {@link ResultSpliterator})
+     * batch loading of results.
+     */
+    boolean allowBatchLoading = true;
+    /**
+     * Profiling variable. Defines the fraction (0 none, 1 all) of a single fetch (as defined by
+     * {@link ResultSet#getFetchSize()}) that {@link PrefetchSpliterator} will load in one go.
+     */
+    float fetchRatio = 0.5f;
+    /**
+     * Profiling variable, serves to define the {@link PreparedStatement#setFetchSize(int) SQL result fetch size}.
+     */
+    int fetchSize = 100;
+
+    /**
      * Same as {@link #QueryFeatureSet(SQLBuilder, DataSource, Connection)}, except query is provided by a fixed
      * text instead of a builder.
      */
@@ -186,19 +202,21 @@ public class QueryFeatureSet extends AbstractFeatureSet {
 
     @Override
     public Stream<Feature> features(boolean parallel) {
-        return new StreamSQL(new QueryAdapter(queryBuilder), source);
+        return new StreamSQL(new QueryAdapter(queryBuilder, parallel), source, parallel);
     }
 
     private final class QueryAdapter implements QueryBuilder {
 
         private final SQLBuilder source;
+        private final boolean parallel;
 
         private long additionalOffset, additionalLimit;
 
-        QueryAdapter(SQLBuilder source) {
+        QueryAdapter(SQLBuilder source, boolean parallel) {
             // defensive copy
             this.source = new SQLBuilder(source);
             this.source.append(source.toString());
+            this.parallel = parallel;
         }
 
         @Override
@@ -236,7 +254,7 @@ public class QueryFeatureSet extends AbstractFeatureSet {
                 }
 
                 Features.addOffsetLimit(source, nativeOffset, nativeLimit);
-                return new PreparedQueryConnector(source.toString(), javaOffset, javaLimit);
+                return new PreparedQueryConnector(source.toString(), javaOffset, javaLimit, parallel);
             }
             throw new UnsupportedOperationException("Not supported yet: modifying user query"); // "Alexis Manin (Geomatys)" on 24/09/2019
         }
@@ -246,19 +264,25 @@ public class QueryFeatureSet extends AbstractFeatureSet {
 
         final String sql;
         private long additionalOffset, additionalLimit;
+        private final boolean parallel;
 
-        private PreparedQueryConnector(String sql, long additionalOffset, long additionalLimit) {
+        private PreparedQueryConnector(String sql, long additionalOffset, long additionalLimit, boolean parallel) {
             this.sql = sql;
             this.additionalOffset = additionalOffset;
             this.additionalLimit = additionalLimit;
+            this.parallel = parallel;
         }
 
         @Override
         public Stream<Feature> connect(Connection connection) throws SQLException, DataStoreException {
             final PreparedStatement statement = connection.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
+            statement.setFetchSize(fetchSize);
             final ResultSet result = statement.executeQuery();
-
-            Stream<Feature> stream = StreamSupport.stream(new ResultSpliterator(result), false);
+            final int fetchSize = result.getFetchSize();
+            final boolean withPrefetch = !allowBatchLoading || (fetchSize < 1 || fetchSize >= Integer.MAX_VALUE);
+            final Spliterator<Feature> spliterator = withPrefetch ?
+                    new ResultSpliterator(result) : new PrefetchSpliterator(result, fetchRatio);
+            Stream<Feature> stream = StreamSupport.stream(spliterator, parallel && withPrefetch);
             if (additionalLimit > 0) stream = stream.limit(additionalLimit);
             if (additionalOffset > 0) stream = stream.skip(additionalOffset);
 
@@ -276,19 +300,49 @@ public class QueryFeatureSet extends AbstractFeatureSet {
 
         @Override
         public String estimateStatement(boolean count) {
+            // Requires analysis. Things could get complicated if the original user query is already a count.
             throw new UnsupportedOperationException("Not supported yet"); // "Alexis Manin (Geomatys)" on 24/09/2019
         }
     }
 
-    private final class ResultSpliterator implements Spliterator<Feature> {
+    /**
+     * Base class for loading SQL query results through the {@link Spliterator} API. Concrete implementations come
+     * in two experimental flavors:
+     * <ul>
+     *     <li>Sequential streaming: {@link ResultSpliterator}</li>
+     *     <li>Parallelizable batch  streaming: {@link PrefetchSpliterator}</li>
+     * </ul>
+     *
+     * {@link QuerySpliteratorsBench A benchmark is available} to compare both implementations, which could be
+     * useful in the future to determine which implementation to prioritize. For now, results do not show much
+     * difference.
+     */
+    private abstract class QuerySpliterator implements java.util.Spliterator<Feature> {
 
         final ResultSet result;
 
-        private ResultSpliterator(ResultSet result) {
+        private QuerySpliterator(ResultSet result) {
             this.result = result;
         }
 
         @Override
+        public long estimateSize() {
+            return originLimit > 0 ? originLimit : Long.MAX_VALUE;
+        }
+
+        @Override
+        public int characteristics() {
+            // TODO: determine if it's ordered by analysing the user query. SIZED is not possible, as limit is an upper threshold.
+            return Spliterator.IMMUTABLE | Spliterator.NONNULL | (distinct ? Spliterator.DISTINCT : 0);
+        }
+    }
+
+    private final class ResultSpliterator extends QuerySpliterator {
+
+        private ResultSpliterator(ResultSet result) {
+            super(result);
+        }
+
+        @Override
         public boolean tryAdvance(Consumer<? super Feature> action) {
             try {
                 if (result.next()) {
@@ -305,23 +359,84 @@ public class QueryFeatureSet extends AbstractFeatureSet {
         public Spliterator<Feature> trySplit() {
             return null;
         }
+    }
+
+    private static SQLBuilder fromQuery(final String query, final Connection conn) throws SQLException {
+        return new SQLBuilder(conn.getMetaData(), true)
+                .append(query);
+    }
+
+    /**
+     * An attempt to optimize feature loading through batching and potential parallelization. For now, it looks like
+     * there's not much improvement compared to the naive streaming approach. IMHO, two improvements would really
+     * impact performance positively if done:
+     * <ul>
+     *     <li>Optimisation of batch loading through {@link FeatureAdapter#prefetch(int, ResultSet)}</li>
+     *     <li>Better splitting balance, as stated by {@link Spliterator#trySplit()}</li>
+     * </ul>
+     */
+    private final class PrefetchSpliterator extends QuerySpliterator {
+
+        final int fetchSize;
+
+        int idx;
+        List<Feature> chunk;
+        /**
+         * According to {@link Spliterator#trySplit()} documentation, the original size estimation must be reduced after
+         * split to remain consistent.
+         */
+        long splittedAmount = 0;
+
+        private PrefetchSpliterator(ResultSet result) throws SQLException {
+            this(result, 0.5f);
+        }
+
+        private PrefetchSpliterator(ResultSet result, float fetchRatio) throws SQLException {
+            super(result);
+            this.fetchSize = Math.max((int) (result.getFetchSize()*fetchRatio), 1);
+        }
+
+        @Override
+        public boolean tryAdvance(Consumer<? super Feature> action) {
+            if (ensureChunkAvailable()) {
+                action.accept(chunk.get(idx++));
+                return true;
+            }
+            return false;
+        }
+
+        public Spliterator<Feature> trySplit() {
+            if (!ensureChunkAvailable()) return null;
+            final List<Feature> remainingChunk = chunk.subList(idx, chunk.size());
+            splittedAmount += remainingChunk.size();
+            final Spliterator<Feature> chunkSpliterator = idx == 0 ?
+                    chunk.spliterator() : remainingChunk.spliterator();
+            chunk = null;
+            idx = 0;
+            return chunkSpliterator;
+        }
 
         @Override
         public long estimateSize() {
-            // TODO: economic size estimation? A count query seems overkill for the aim of this API. However, we could
-            // analyze user query in search for a limit value.
-            return originLimit > 0 ? originLimit : Long.MAX_VALUE;
+            return super.estimateSize() - splittedAmount;
         }
 
         @Override
         public int characteristics() {
-            // TODO: determine if it's sorted by analysing user query. SIZED is not possible, as limit is an upper threshold.
-            return Spliterator.IMMUTABLE | Spliterator.NONNULL;
+            return super.characteristics() | Spliterator.CONCURRENT;
         }
-    }
 
-    private static SQLBuilder fromQuery(final String query, final Connection conn) throws SQLException {
-        return new SQLBuilder(conn.getMetaData(), true)
-                .append(query);
+        private boolean ensureChunkAvailable() {
+            if (chunk == null || idx >= chunk.size()) {
+                idx = 0;
+                try {
+                    chunk = adapter.prefetch(fetchSize, result);
+                } catch (SQLException e) {
+                    throw new BackingStoreException(e);
+                }
+                return chunk != null && !chunk.isEmpty();
+            }
+            return true;
+        }
     }
 }
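
The new package-visible fields above are profiling knobs wired into PreparedQueryConnector. A same-package sketch of how a caller might exercise them, reusing the TEST table created by the benchmark below (the helper class is illustrative, not part of the patch):

    import java.sql.Connection;
    import java.util.stream.Stream;
    import javax.sql.DataSource;

    import org.opengis.feature.Feature;

    final class QueryFeatureSetTuningSketch {
        static long countWithBatching(final DataSource source) throws Exception {
            try (Connection c = source.getConnection()) {
                final QueryFeatureSet fs = new QueryFeatureSet("SELECT * FROM TEST", source, c);
                fs.fetchSize = 200;           // rows requested from the JDBC driver per round trip
                fs.fetchRatio = 0.5f;         // PrefetchSpliterator loads half a fetch per chunk
                fs.allowBatchLoading = true;  // false would force the sequential ResultSpliterator
                try (Stream<Feature> features = fs.features(true)) {   // the parallel flag now reaches the spliterator
                    return features.count();
                }
            }
        }
    }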
diff --git a/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/QuerySpliteratorsBench.java b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/QuerySpliteratorsBench.java
new file mode 100644
index 0000000..bb519d9
--- /dev/null
+++ b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/QuerySpliteratorsBench.java
@@ -0,0 +1,150 @@
+package org.apache.sis.internal.sql.feature;
+
+import java.nio.charset.StandardCharsets;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.sis.internal.metadata.sql.Initializer;
+
+import org.apache.derby.jdbc.EmbeddedDataSource;
+
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Warmup;
+
+import static org.apache.sis.util.ArgumentChecks.ensureStrictlyPositive;
+
+@Fork(value = 2, jvmArgs = {"-server", "-Xmx2g"} )
+@Warmup(iterations = 2, time = 4, timeUnit = TimeUnit.SECONDS)
+@Measurement(iterations = 2, time = 4, timeUnit = TimeUnit.SECONDS)
+public class QuerySpliteratorsBench {
+
+    @State(Scope.Benchmark)
+    public static class DatabaseInput {
+
+        @Param({"1000", "100000"})
+        int numRows;
+
+        @Param({"true", "false"})
+        boolean parallel;
+
+        @Param({"true", "false"})
+        boolean prefetch;
+
+        @Param({"10", "100", "1000"})
+        int fetchSize;
+
+        @Param({"0.5", "1", "2"})
+        float fetchRatio;
+
+        EmbeddedDataSource db;
+        QueryFeatureSet fs;
+
+        public DatabaseInput() {}
+
+        @Setup(Level.Trial)
+        public void setup() throws SQLException {
+            ensureStrictlyPositive("Number of rows", numRows);
+
+            db = new EmbeddedDataSource();
+            db.setDatabaseName("memory:spliterators");
+            db.setDataSourceName("Apache SIS test database");
+            db.setCreateDatabase("create");
+
+            try (Connection c = db.getConnection()) {
+                c.createStatement().execute(
+                        "CREATE TABLE TEST (str CHARACTER VARYING(20), myInt INTEGER, myDouble DOUBLE)"
+                );
+                final PreparedStatement st = c.prepareStatement("INSERT INTO TEST values (?, ?, ?)");
+
+                final Random rand = new Random();
+                int rows = 1;
+                final byte[] txt = new byte[20];
+                do {
+                    for (int i = 0; i < 500 ; i++, rows++) {
+                        rand.nextBytes(txt);
+                        st.setString(1, new String(txt, StandardCharsets.US_ASCII));
+                        st.setInt(2, rand.nextInt());
+                        st.setDouble(3, rand.nextDouble());
+                        st.addBatch();
+                    }
+                    st.executeBatch();
+                    st.clearBatch();
+                } while (rows < numRows);
+
+                fs = new QueryFeatureSet("SELECT * FROM TEST", db, c);
+                fs.allowBatchLoading = prefetch;
+                fs.fetchSize = fetchSize;
+                fs.fetchRatio = fetchRatio;
+            }
+        }
+
+        @TearDown
+        public void dropDatabase() throws SQLException {
+            db.setCreateDatabase("no");
+            db.setConnectionAttributes("drop=true");
+            try {
+                db.getConnection().close();
+            } catch (SQLException e) {                          // This is the expected exception.
+                if (!Initializer.isSuccessfulShutdown(e)) {
+                    throw e;
+                }
+            }
+        }
+    }
+
+    @Benchmark
+    public void test(DatabaseInput input) throws SQLException {
+        final int sum = input.fs.features(input.parallel).mapToInt(f -> 1).sum();
+        if (sum != input.numRows) throw new AssertionError("..." + sum + "..." + "WTF ?!");
+    }
+/*
+    @Test
+    public void benchmark() throws Exception {
+        System.out.println("COMMON POOL: "+ ForkJoinPool.getCommonPoolParallelism());
+        final DatabaseInput db = new DatabaseInput();
+        db.numRows = 100000;
+        db.parallel = true;
+
+        long start = System.nanoTime();
+        db.setup();
+        System.out.println("Insertion time: "+((System.nanoTime()-start)/1e6)+" ms");
+
+        // warmup
+        for (int i = 0 ;  i < 5 ; i++) {
+            test(db);
+            test(db);
+        }
+
+        // go
+        long prefetch = 0, noprefetch = 0;
+        for (int i = 0 ; i < 100 ; i++) {
+            start = System.nanoTime();
+            test(db);
+            prefetch += System.nanoTime()-start;
+
+            start = System.nanoTime();
+            test(db);
+            noprefetch += System.nanoTime()-start;
+        }
+
+        System.out.println(String.format(
+                "Performances:%nP: %d%nI: %d",
+                (long) (prefetch / 1e7), (long) (noprefetch / 1e8)
+        ));
+    }
+*/
+    public static void main(String... args) throws Exception {
+        org.openjdk.jmh.Main.main(args);
+    }
+}
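
The main method above delegates to org.openjdk.jmh.Main, which selects benchmarks through command-line regular expressions. A programmatic launcher is another option, assuming the standard JMH Runner API (the launcher class and parameter choices below are illustrative, not part of the patch):

    import org.openjdk.jmh.runner.Runner;
    import org.openjdk.jmh.runner.RunnerException;
    import org.openjdk.jmh.runner.options.Options;
    import org.openjdk.jmh.runner.options.OptionsBuilder;

    public final class RunQuerySpliteratorsBench {
        public static void main(final String... args) throws RunnerException {
            final Options options = new OptionsBuilder()
                    .include(QuerySpliteratorsBench.class.getSimpleName())   // select the benchmark by class name
                    .param("numRows", "1000")                                // narrow the parameter space for a quick run
                    .param("prefetch", "true", "false")
                    .build();
            new Runner(options).run();
        }
    }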
diff --git a/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/SQLTypeSpecification.java b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/SQLTypeSpecification.java
index 0e35edf..e54823d 100644
--- a/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/SQLTypeSpecification.java
+++ b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/SQLTypeSpecification.java
@@ -1,35 +1,87 @@
 package org.apache.sis.internal.sql.feature;
 
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.util.List;
 import java.util.Optional;
 
 import org.opengis.util.GenericName;
 
+import org.apache.sis.feature.builder.FeatureTypeBuilder;
+import org.apache.sis.internal.feature.AttributeConvention;
 import org.apache.sis.storage.DataStoreContentException;
 
+/**
+ * Defines an application schema inferred from an SQL database (query, table, etc.). Implementations will be used by
+ * {@link Analyzer} to create an {@link FeatureAdapter adaptation layer to the feature model}. Default implementations
+ * can be retrieved for tables and queries respectively through {@link Analyzer#create(TableReference, TableReference)}
+ * and {@link Analyzer#create(PreparedStatement, String, GenericName)} methods.
+ */
 interface SQLTypeSpecification {
     /**
+     * Identifying name for the application schema. It is strongly recommended to provide one, so that the SIS
+     * engine can create insightful models. However, in corner cases where no proper name can be provided, an
+     * empty value is allowed.
      *
-     * @return Name for the feature type to build. Nullable.
+     * @implNote The SIS {@link FeatureTypeBuilder feature type builder} <em>requires</em> a name, and the current
+     * {@link Analyzer#buildAdapter(SQLTypeSpecification) analysis implementation} will create a random UUID if
+     * necessary.
+     *
+     * @return Name for the feature type to build.
      * @throws SQLException If an error occurs while retrieving information from database.
      */
-    GenericName getName() throws SQLException;
+    Optional<GenericName> getName() throws SQLException;
 
     /**
+     * Gives an optional description of the application schema. This information is not necessary for any kind of
+     * computation, but allows giving the end user global information about the schema being manipulated.
      *
-     * @return A succint description of the data source. Nullable.
+     * @return A brief description of the data source.
      * @throws SQLException If an error occurs while retrieving information from database.
      */
-    String getDefinition() throws SQLException;
+    Optional<String> getDefinition() throws SQLException;
 
+    /**
+     * Primary key definition of the source schema. Can be empty if no primary key is defined (example: a query
+     * definition).
+     *
+     * @return Primary key definition if any, otherwise an empty shell.
+     * @throws SQLException If an error occurs while exchanging information with the underlying database.
+     */
     Optional<PrimaryKey> getPK() throws SQLException;
 
+    /**
+     *
+     * @return Ordered list of columns in the application schema. Order is important, and will be relied upon to
+     * retrieve {@link ResultSet#getObject(int) result values by index}.
+     */
     List<SQLColumn> getColumns();
 
+    /**
+     *
+     * @return All identified relations based on a foreign key in the <em>current</em> application schema
+     * (1..1 or n..1). Corresponds to {@link Relation.Direction#IMPORT}. Can be empty but not null.
+     *
+     * @throws SQLException If an error occurs while exchanging information with the underlying database.
+     */
     List<Relation> getImports() throws SQLException;
 
+    /**
+     *
+     * @return All identified relations based on a foreign key located in <em>another</em> application schema (1..n).
+     * Corresponds to {@link Relation.Direction#EXPORT}. Can be empty but not null.
+     * @throws SQLException If an error occurs while exchanging information with the underlying database.
+     * @throws DataStoreContentException If a schema problem is encountered.
+     */
     List<Relation> getExports() throws SQLException, DataStoreContentException;
 
-    default Optional<String> getPrimaryGeometryColumn() {return Optional.empty();}
+    /**
+     * In case the target schema contains geographic information, this serves to identify without ambiguity which
+     * column contains what could be considered the main geolocation (as stated by
+     * {@link AttributeConvention#GEOMETRY_PROPERTY}). This is very important information in case the application
+     * schema contains multiple geometric fields.
+     *
+     * @return The name of the column/attribute to be considered as the main geometry information, or an empty
+     * shell if unknown.
+     */
+    default Optional<ColumnRef> getPrimaryGeometryColumn() {return Optional.empty();}
 }
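
A caller-side sketch of the Optional-based contract documented above, placed in the same package since the interface is package-private (the describe helper is illustrative, not part of the patch):

    import java.sql.SQLException;

    final class SpecificationSummarySketch {
        static String describe(final SQLTypeSpecification spec) throws SQLException {
            final String name       = spec.getName().map(Object::toString).orElse("<anonymous>");
            final String definition = spec.getDefinition().orElse("(no definition)");
            final String pk         = spec.getPK()
                    .map(key -> String.join(", ", key.getColumns()))
                    .orElse("(no primary key)");
            return name + ": " + definition + " [" + pk + "]";
        }
    }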
diff --git a/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/StreamSQL.java b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/StreamSQL.java
index 890a9dd..a0e9888 100644
--- a/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/StreamSQL.java
+++ b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/StreamSQL.java
@@ -78,13 +78,14 @@ class StreamSQL extends StreamDecoration<Feature> {
 
     private Consumer<SQLException> warningConsumer = e -> Logging.getLogger("sis.sql").log(Level.FINE, "Cannot properly close a connection", e);
 
-    StreamSQL(final Table source) {
-        this(new Features.Builder(source), source.source);
+    StreamSQL(final Table source, boolean parallel) {
+        this(new Features.Builder(source), source.source, parallel);
     }
 
-    StreamSQL(QueryBuilder queryAdapter, DataSource source) {
+    StreamSQL(QueryBuilder queryAdapter, DataSource source, boolean parallel) {
         this.queryAdapter = queryAdapter;
         this.source = source;
+        this.parallel = parallel;
     }
 
     public void setWarningConsumer(Consumer<SQLException> warningConsumer) {
@@ -171,8 +172,14 @@ class StreamSQL extends StreamDecoration<Feature> {
     @Override
     public long count() {
         // Avoid opening a connection if sql text cannot be evaluated.
-        final String sql = select().estimateStatement(true);
-        try (Connection conn = source.getConnection()) {
+        final String sql;
+        try {
+            sql = select().estimateStatement(true);
+        } catch (UnsupportedOperationException e) {
+            // If the underlying connector does not support query estimation, we fall back on brute-force counting.
+            return super.count();
+        }
+        try (Connection conn = QueryFeatureSet.connectReadOnly(source)) {
             try (Statement st = conn.createStatement();
                  ResultSet rs = st.executeQuery(sql)) {
                 if (rs.next()) {
@@ -199,7 +206,7 @@ class StreamSQL extends StreamDecoration<Feature> {
                 })
                 .onClose(() -> closeRef(connectionRef, true));
         if (peekAction != null) featureStream = featureStream.peek(peekAction);
-        return parallel ? featureStream : featureStream.parallel();
+        return parallel ? featureStream.parallel() : featureStream;
     }
 
     /**
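
The count() change above asks the connector for a count statement first and only falls back to streaming when the connector throws UnsupportedOperationException. The same pattern sketched in isolation (helper names are illustrative; the fallback branch mirrors what super.count() does):

    import java.sql.Connection;
    import java.sql.ResultSet;
    import java.sql.SQLException;
    import java.sql.Statement;
    import java.util.stream.Stream;
    import javax.sql.DataSource;

    import org.apache.sis.storage.DataStoreException;

    import org.opengis.feature.Feature;

    final class CountFallbackSketch {
        static long count(final Connector connector, final DataSource source) throws SQLException, DataStoreException {
            final String sql;
            try {
                sql = connector.estimateStatement(true);    // connectors may legitimately not support estimation
            } catch (UnsupportedOperationException e) {
                // Brute-force fallback: stream the features and count them.
                try (Connection c = source.getConnection();
                     Stream<Feature> features = connector.connect(c)) {
                    return features.count();
                }
            }
            try (Connection c = source.getConnection();
                 Statement st = c.createStatement();
                 ResultSet rs = st.executeQuery(sql)) {
                return rs.next() ? rs.getLong(1) : 0;
            }
        }
    }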
diff --git a/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/Table.java b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/Table.java
index 80fc52b..81b8c75 100644
--- a/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/Table.java
+++ b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/Table.java
@@ -418,7 +418,7 @@ final class Table extends AbstractFeatureSet {
      */
     @Override
     public Stream<Feature> features(final boolean parallel) throws DataStoreException {
-        return new StreamSQL(this);
+        return new StreamSQL(this, parallel);
     }
 
     /**
diff --git a/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/package-info.java b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/package-info.java
index 2c198a8..19c3151 100644
--- a/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/package-info.java
+++ b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/package-info.java
@@ -25,8 +25,13 @@
  * This package is for internal use by SIS only. Classes in this package
  * may change in incompatible ways in any future version without notice.
  *
+ * @implNote Feature type analysis is done through the {@link org.apache.sis.internal.sql.feature.Analyzer} class.
+ * It relies on the internal {@link org.apache.sis.internal.sql.feature.SQLTypeSpecification} API to fetch SQL schema
+ * information, and to build {@link org.apache.sis.internal.sql.feature.FeatureAdapter an adapter to the feature model}
+ * from it.
+ *
  * @author  Johann Sorel (Geomatys)
  * @author  Martin Desruisseaux (Geomatys)
+ * @author  Alexis Manin (Geomatys)
  * @version 1.0
  * @since   1.0
  * @module

