sis-commits mailing list archives

From ama...@apache.org
Subject [sis] 18/45: fix(SQLStore): better handling of parallelization flag. Add a benchmark to test query spliterator flavors.
Date Tue, 12 Nov 2019 16:44:45 GMT
This is an automated email from the ASF dual-hosted git repository.

amanin pushed a commit to branch refactor/sql-store
in repository https://gitbox.apache.org/repos/asf/sis.git

commit ef0f925a62fbfe0d3936ef2c9e321126db875181
Author: Alexis Manin <amanin@apache.org>
AuthorDate: Thu Sep 26 19:19:33 2019 +0200

    fix(SQLStore): better handling of parallelization flag. Add a benchmark to test query spliterator flavors.
---
 storage/sis-sqlstore/pom.xml                       |  14 +-
 .../apache/sis/internal/sql/feature/Connector.java |  23 +++-
 .../sis/internal/sql/feature/FeatureAdapter.java   |   2 +-
 .../sis/internal/sql/feature/QueryBuilder.java     |   1 +
 .../sis/internal/sql/feature/QueryFeatureSet.java  | 111 +++++++++++++---
 .../sql/feature/QuerySpliteratorsBench.java        | 142 +++++++++++++++++++++
 .../apache/sis/internal/sql/feature/StreamSQL.java |  19 ++-
 .../org/apache/sis/internal/sql/feature/Table.java |   2 +-
 8 files changed, 286 insertions(+), 28 deletions(-)

diff --git a/storage/sis-sqlstore/pom.xml b/storage/sis-sqlstore/pom.xml
index 45231fe..1e905f2 100644
--- a/storage/sis-sqlstore/pom.xml
+++ b/storage/sis-sqlstore/pom.xml
@@ -120,7 +120,7 @@
     <dependency>
       <groupId>org.apache.derby</groupId>
       <artifactId>derby</artifactId>
-      <scope>test</scope>
+      <scope>compile</scope>
     </dependency>
     <dependency>
       <groupId>org.hsqldb</groupId>
@@ -132,6 +132,18 @@
       <artifactId>postgresql</artifactId>
       <scope>test</scope>
     </dependency>
+
+    <dependency>
+      <groupId>org.openjdk.jmh</groupId>
+      <artifactId>jmh-core</artifactId>
+      <version>1.21</version>
+    </dependency>
+    <dependency>
+      <groupId>org.openjdk.jmh</groupId>
+      <artifactId>jmh-generator-annprocess</artifactId>
+      <version>1.21</version>
+    </dependency>
+
   </dependencies>
 
 </project>
diff --git a/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/Connector.java b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/Connector.java
index c99bd7f..9ce296c 100644
--- a/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/Connector.java
+++ b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/Connector.java
@@ -8,8 +8,29 @@ import org.opengis.feature.Feature;
 
 import org.apache.sis.storage.DataStoreException;
 
-public interface Connector {
+/**
+ * Simple abstraction to describe a component capable of loading data from an SQL connection. Used
+ */
+interface Connector {
+    /**
+     * Triggers loading of data through an existing connection.
+     *
+     * @param connection The database connection to use for data loading. Note that it is the caller's
+     *                   responsibility to close the connection, and it should not be done before the
+     *                   stream terminal operation is over.
+     * @return Features loaded from the input connection. Lazy implementations are recommended, but this is an
+     * implementation-dependent choice.
+     * @throws SQLException If an error occurs while exchanging information with the database.
+     * @throws DataStoreException If a data-model dependent error occurs.
+     */
     Stream<Feature> connect(Connection connection) throws SQLException, DataStoreException;
 
+    /**
+     * Provides an approximate query summarizing the data loaded by this component.
+     *
+     * @param count True if the query estimation is needed for a count operation, in which case the returned
+     *              query should be a count query.
+     * @return SQL query describing the way this component loads data. Never null. However, implementations are
+     * free to throw {@link UnsupportedOperationException} if they do not support such an operation.
+     */
     String estimateStatement(final boolean count);
 }
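
For illustration, a minimal sketch (not part of this patch) of how a caller is expected to drive the Connector contract above: the caller opens the connection, hands it to connect(Connection), and closes it only once the returned stream's terminal operation has completed. The helper name and error handling below are assumptions; the types are the same ones imported by Connector.java.

    static Stream<Feature> open(DataSource source, Connector connector)
            throws SQLException, DataStoreException {
        final Connection connection = source.getConnection();
        try {
            // Close the connection when the stream is closed, i.e. after the terminal operation.
            return connector.connect(connection).onClose(() -> {
                try {
                    connection.close();
                } catch (SQLException e) {
                    throw new RuntimeException(e);
                }
            });
        } catch (SQLException | DataStoreException | RuntimeException e) {
            connection.close();     // Release the connection if no stream could be created.
            throw e;
        }
    }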
diff --git a/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/FeatureAdapter.java b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/FeatureAdapter.java
index 0162e67..17a345a 100644
--- a/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/FeatureAdapter.java
+++ b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/FeatureAdapter.java
@@ -57,7 +57,7 @@ class FeatureAdapter {
         return features;
     }
 
-    static class PropertyMapper {
+    static final class PropertyMapper {
         // TODO: by using an indexed implementation of Feature, we could avoid the name mapping. However, a JMH
         // benchmark would be required in order to be sure it impacts performance positively.
         final String propertyName;
diff --git a/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/QueryBuilder.java b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/QueryBuilder.java
index 68d798e..a78ed2a 100644
--- a/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/QueryBuilder.java
+++ b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/QueryBuilder.java
@@ -2,6 +2,7 @@ package org.apache.sis.internal.sql.feature;
 
 /**
  * API to allow an overriding SQL Stream to delegate a set of intermediate operations to the native driver.
+ * TODO: move as inner interface of {@link StreamSQL} ?
  */
 interface QueryBuilder {
 
diff --git a/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/QueryFeatureSet.java b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/QueryFeatureSet.java
index 0565036..fdf518d 100644
--- a/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/QueryFeatureSet.java
+++ b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/QueryFeatureSet.java
@@ -4,6 +4,7 @@ import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
+import java.util.List;
 import java.util.Spliterator;
 import java.util.function.Consumer;
 import java.util.regex.Matcher;
@@ -78,6 +79,13 @@ public class QueryFeatureSet extends AbstractFeatureSet {
     private final boolean distinct;
 
     /**
+     * Debug flag to activate (use {@link PrefetchSpliterator}) or de-activate (use {@link ResultSpliterator})
+     * batch loading of results.
+     */
+    boolean allowBatchLoading = true;
+    float fetchRatio = 0.5f;
+
+    /**
+     * Same as {@link #QueryFeatureSet(SQLBuilder, DataSource, Connection)}, except that the query is provided as
+     * a fixed text instead of a builder.
      */
@@ -186,19 +194,21 @@ public class QueryFeatureSet extends AbstractFeatureSet {
 
     @Override
     public Stream<Feature> features(boolean parallel) {
-        return new StreamSQL(new QueryAdapter(queryBuilder), source);
+        return new StreamSQL(new QueryAdapter(queryBuilder, parallel), source, parallel);
     }
 
     private final class QueryAdapter implements QueryBuilder {
 
         private final SQLBuilder source;
+        private final boolean parallel;
 
         private long additionalOffset, additionalLimit;
 
-        QueryAdapter(SQLBuilder source) {
+        QueryAdapter(SQLBuilder source, boolean parallel) {
             // defensive copy
             this.source = new SQLBuilder(source);
             this.source.append(source.toString());
+            this.parallel = parallel;
         }
 
         @Override
@@ -236,7 +246,7 @@ public class QueryFeatureSet extends AbstractFeatureSet {
                 }
 
                 Features.addOffsetLimit(source, nativeOffset, nativeLimit);
-                return new PreparedQueryConnector(source.toString(), javaOffset, javaLimit);
+                return new PreparedQueryConnector(source.toString(), javaOffset, javaLimit, parallel);
             }
             throw new UnsupportedOperationException("Not supported yet: modifying user query"); // "Alexis Manin (Geomatys)" on 24/09/2019
         }
@@ -246,19 +256,25 @@ public class QueryFeatureSet extends AbstractFeatureSet {
 
         final String sql;
         private long additionalOffset, additionalLimit;
+        private final boolean parallel;
 
-        private PreparedQueryConnector(String sql, long additionalOffset, long additionalLimit) {
+        private PreparedQueryConnector(String sql, long additionalOffset, long additionalLimit, boolean parallel) {
             this.sql = sql;
             this.additionalOffset = additionalOffset;
             this.additionalLimit = additionalLimit;
+            this.parallel = parallel;
         }
 
         @Override
         public Stream<Feature> connect(Connection connection) throws SQLException, DataStoreException {
             final PreparedStatement statement = connection.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
+            statement.setFetchSize(100);
             final ResultSet result = statement.executeQuery();
-
-            Stream<Feature> stream = StreamSupport.stream(new ResultSpliterator(result), false);
+            final int fetchSize = result.getFetchSize();
+            final boolean withPrefetch = !allowBatchLoading || (fetchSize < 1 || fetchSize >= Integer.MAX_VALUE);
+            final Spliterator<Feature> spliterator = withPrefetch ?
+                    new ResultSpliterator(result) : new PrefetchSpliterator(result, fetchRatio);
+            Stream<Feature> stream = StreamSupport.stream(spliterator, parallel && withPrefetch);
             if (additionalLimit > 0) stream = stream.limit(additionalLimit);
             if (additionalOffset > 0) stream = stream.skip(additionalOffset);
 
@@ -276,19 +292,38 @@ public class QueryFeatureSet extends AbstractFeatureSet {
 
         @Override
         public String estimateStatement(boolean count) {
+            // Requires analysis. Things could get complicated if the original user query is already a count.
             throw new UnsupportedOperationException("Not supported yet"); // "Alexis Manin (Geomatys)" on 24/09/2019
         }
     }
 
-    private final class ResultSpliterator implements Spliterator<Feature> {
+    private abstract class QuerySpliterator implements java.util.Spliterator<Feature> {
 
         final ResultSet result;
 
-        private ResultSpliterator(ResultSet result) {
+        private QuerySpliterator(ResultSet result) {
             this.result = result;
         }
 
         @Override
+        public long estimateSize() {
+            return originLimit > 0 ? originLimit : Long.MAX_VALUE;
+        }
+
+        @Override
+        public int characteristics() {
+            // TODO: determine if it's ordered by analysing the user query. SIZED is not possible, as limit is an upper threshold.
+            return Spliterator.IMMUTABLE | Spliterator.NONNULL | (distinct ? Spliterator.DISTINCT : 0);
+        }
+    }
+
+    private final class ResultSpliterator extends QuerySpliterator {
+
+        private ResultSpliterator(ResultSet result) {
+            super(result);
+        }
+
+        @Override
         public boolean tryAdvance(Consumer<? super Feature> action) {
             try {
                 if (result.next()) {
@@ -305,23 +340,63 @@ public class QueryFeatureSet extends AbstractFeatureSet {
         public Spliterator<Feature> trySplit() {
             return null;
         }
+    }
+
+    private static SQLBuilder fromQuery(final String query, final Connection conn) throws SQLException {
+        return new SQLBuilder(conn.getMetaData(), true)
+                .append(query);
+    }
+
+    private final class PrefetchSpliterator extends QuerySpliterator {
+
+        final int fetchSize;
+
+        int idx;
+        List<Feature> chunk;
+
+        private PrefetchSpliterator(ResultSet result) throws SQLException {
+            this(result, 0.5f);
+        }
+
+        private PrefetchSpliterator(ResultSet result, float fetchRatio) throws SQLException {
+            super(result);
+            this.fetchSize = Math.max((int) (result.getFetchSize()*fetchRatio), 1);
+        }
 
         @Override
-        public long estimateSize() {
-            // TODO: economic size estimation ? A count query seems overkill for the aim of this API. Howver, we could
-            // analyze user query in search for a limit value.
-            return originLimit > 0 ? originLimit : Long.MAX_VALUE;
+        public boolean tryAdvance(Consumer<? super Feature> action) {
+            if (ensureChunkAvailable()) {
+                action.accept(chunk.get(idx++));
+                return true;
+            }
+            return false;
+        }
+
+        public Spliterator<Feature> trySplit() {
+            if (!ensureChunkAvailable()) return null;
+            final Spliterator<Feature> chunkSpliterator = idx == 0 ?
+                    chunk.spliterator() : chunk.subList(idx, chunk.size()).spliterator();
+            chunk = null;
+            idx = 0;
+            return chunkSpliterator;
         }
 
         @Override
         public int characteristics() {
-            // TODO: determine if it's sorted by analysing user query. SIZED is not possible, as limit is an upper threshold.
-            return Spliterator.IMMUTABLE | Spliterator.NONNULL;
+            return super.characteristics() | Spliterator.CONCURRENT;
         }
-    }
 
-    private static SQLBuilder fromQuery(final String query, final Connection conn) throws SQLException {
-        return new SQLBuilder(conn.getMetaData(), true)
-                .append(query);
+        private boolean ensureChunkAvailable() {
+            if (chunk == null || idx >= chunk.size()) {
+                idx = 0;
+                try {
+                    chunk = adapter.prefetch(fetchSize, result);
+                } catch (SQLException e) {
+                    throw new BackingStoreException(e);
+                }
+                return chunk != null && !chunk.isEmpty();
+            }
+            return true;
+        }
     }
 }
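
For context, a hypothetical usage of the two fields added above, written from code in the same package (the data source, connection and query text are placeholders): allowBatchLoading selects the chunked PrefetchSpliterator when the JDBC fetch size is usable, and fetchRatio sizes each prefetched chunk relative to that fetch size.

    QueryFeatureSet fs = new QueryFeatureSet("SELECT * FROM TEST", dataSource, connection);
    fs.allowBatchLoading = true;     // chunked reads through PrefetchSpliterator when possible
    fs.fetchRatio = 0.5f;            // each chunk holds half of the JDBC fetch size
    try (Stream<Feature> features = fs.features(true)) {      // request a parallel stream
        long rows = features.count();
    }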
diff --git a/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/QuerySpliteratorsBench.java b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/QuerySpliteratorsBench.java
new file mode 100644
index 0000000..f2c62ef
--- /dev/null
+++ b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/QuerySpliteratorsBench.java
@@ -0,0 +1,142 @@
+package org.apache.sis.internal.sql.feature;
+
+import java.nio.charset.StandardCharsets;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.sis.internal.metadata.sql.Initializer;
+
+import org.apache.derby.jdbc.EmbeddedDataSource;
+
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Warmup;
+
+import static org.apache.sis.util.ArgumentChecks.ensureStrictlyPositive;
+
+@Fork(value = 2, jvmArgs = {"-server", "-Xmx2g"} )
+@Warmup(iterations = 2, time = 5, timeUnit = TimeUnit.SECONDS)
+@Measurement(iterations = 5, time = 5, timeUnit = TimeUnit.SECONDS)
+public class QuerySpliteratorsBench {
+
+    @State(Scope.Benchmark)
+    public static class DatabaseInput {
+
+        @Param({"1000", "100000"})
+        int numRows;
+
+        @Param({"true", "false"})
+        boolean parallel;
+
+        @Param({"true", "false"})
+        boolean prefetch;
+
+        EmbeddedDataSource db;
+        QueryFeatureSet fs;
+
+        public DatabaseInput() {}
+
+        @Setup(Level.Trial)
+        public void setup() throws SQLException {
+            ensureStrictlyPositive("Number of rows", numRows);
+
+            db = new EmbeddedDataSource();
+            db.setDatabaseName("memory:spliterators");
+            db.setDataSourceName("Apache SIS test database");
+            db.setCreateDatabase("create");
+
+            try (Connection c = db.getConnection()) {
+                c.createStatement().execute(
+                        "CREATE TABLE TEST (str CHARACTER VARYING(20), myInt INTEGER, myDouble DOUBLE)"
+                );
+                final PreparedStatement st = c.prepareStatement("INSERT INTO TEST values (?, ?, ?)");
+
+                final Random rand = new Random();
+                int rows = 1;
+                final byte[] txt = new byte[20];
+                do {
+                    for (int i = 0; i < 500 ; i++, rows++) {
+                        rand.nextBytes(txt);
+                        st.setString(1, new String(txt, StandardCharsets.US_ASCII));
+                        st.setInt(2, rand.nextInt());
+                        st.setDouble(3, rand.nextDouble());
+                        st.addBatch();
+                    }
+                    st.executeBatch();
+                    st.clearBatch();
+                } while (rows < numRows);
+
+                fs = new QueryFeatureSet("SELECT * FROM TEST", db, c);
+                fs.allowBatchLoading = prefetch;
+            }
+        }
+
+        @TearDown
+        public void dropDatabase() throws SQLException {
+            db.setCreateDatabase("no");
+            db.setConnectionAttributes("drop=true");
+            try {
+                db.getConnection().close();
+            } catch (SQLException e) {                          // This is the expected exception.
+                if (!Initializer.isSuccessfulShutdown(e)) {
+                    throw e;
+                }
+            }
+        }
+    }
+
+    @Benchmark
+    public void test(DatabaseInput input) throws SQLException {
+        final int sum = input.fs.features(input.parallel).mapToInt(f -> 1).sum();
+        if (sum != input.numRows) throw new AssertionError("..." + sum + "..." + "WTF ?!");
+    }
+/*
+    @Test
+    public void benchmark() throws Exception {
+        System.out.println("COMMON POOL: "+ ForkJoinPool.getCommonPoolParallelism());
+        final DatabaseInput db = new DatabaseInput();
+        db.numRows = 100000;
+        db.parallel = true;
+
+        long start = System.nanoTime();
+        db.setup();
+        System.out.println("Insertion time: "+((System.nanoTime()-start)/1e6)+" ms");
+
+        // warmup
+        for (int i = 0 ;  i < 5 ; i++) {
+            test(db);
+            test(db);
+        }
+
+        // go
+        long prefetch = 0, noprefetch = 0;
+        for (int i = 0 ; i < 100 ; i++) {
+            start = System.nanoTime();
+            test(db);
+            prefetch += System.nanoTime()-start;
+
+            start = System.nanoTime();
+            test(db);
+            noprefetch += System.nanoTime()-start;
+        }
+
+        System.out.println(String.format(
+                "Performances:%nP: %d%nI: %d",
+                (long) (prefetch / 1e7), (long) (noprefetch / 1e8)
+        ));
+    }
+*/
+    public static void main(String... args) throws Exception {
+        org.openjdk.jmh.Main.main(args);
+    }
+}
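
As an alternative to launching org.openjdk.jmh.Main as in the main method above, the benchmark could also be run programmatically through the JMH Runner API; the class name and the parameter narrowing below are only an illustrative sketch.

    import org.openjdk.jmh.runner.Runner;
    import org.openjdk.jmh.runner.RunnerException;
    import org.openjdk.jmh.runner.options.OptionsBuilder;

    public final class RunQuerySpliteratorsBench {
        public static void main(String[] args) throws RunnerException {
            new Runner(new OptionsBuilder()
                    .include(QuerySpliteratorsBench.class.getSimpleName())
                    .param("numRows", "1000")      // restrict the @Param matrix for a quick run
                    .param("prefetch", "true")
                    .forks(1)
                    .build()).run();
        }
    }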
diff --git a/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/StreamSQL.java b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/StreamSQL.java
index 890a9dd..a0e9888 100644
--- a/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/StreamSQL.java
+++ b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/StreamSQL.java
@@ -78,13 +78,14 @@ class StreamSQL extends StreamDecoration<Feature> {
 
     private Consumer<SQLException> warningConsumer = e -> Logging.getLogger("sis.sql").log(Level.FINE, "Cannot properly close a connection", e);
 
-    StreamSQL(final Table source) {
-        this(new Features.Builder(source), source.source);
+    StreamSQL(final Table source, boolean parallel) {
+        this(new Features.Builder(source), source.source, parallel);
     }
 
-    StreamSQL(QueryBuilder queryAdapter, DataSource source) {
+    StreamSQL(QueryBuilder queryAdapter, DataSource source, boolean parallel) {
         this.queryAdapter = queryAdapter;
         this.source = source;
+        this.parallel = parallel;
     }
 
     public void setWarningConsumer(Consumer<SQLException> warningConsumer) {
@@ -171,8 +172,14 @@ class StreamSQL extends StreamDecoration<Feature> {
     @Override
     public long count() {
         // Avoid opening a connection if sql text cannot be evaluated.
-        final String sql = select().estimateStatement(true);
-        try (Connection conn = source.getConnection()) {
+        final String sql;
+        try {
+            sql = select().estimateStatement(true);
+        } catch (UnsupportedOperationException e) {
+            // If the underlying connector does not support query estimation, we fall back on brute-force counting.
+            return super.count();
+        }
+        try (Connection conn = QueryFeatureSet.connectReadOnly(source)) {
             try (Statement st = conn.createStatement();
                  ResultSet rs = st.executeQuery(sql)) {
                 if (rs.next()) {
@@ -199,7 +206,7 @@ class StreamSQL extends StreamDecoration<Feature> {
                 })
                 .onClose(() -> closeRef(connectionRef, true));
         if (peekAction != null) featureStream = featureStream.peek(peekAction);
-        return parallel ? featureStream : featureStream.parallel();
+        return parallel ? featureStream.parallel() : featureStream;
     }
 
     /**
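
The count() change above follows a try-fast-path-then-fall-back pattern. A minimal sketch of that pattern (assumed helper signature and plain getConnection(), not from this patch): ask the connector for a COUNT statement, and fall back to counting streamed features when the connector throws UnsupportedOperationException.

    static long countFeatures(Connector connector, DataSource source, Supplier<Stream<Feature>> features)
            throws SQLException {
        final String sql;
        try {
            sql = connector.estimateStatement(true);          // fast path: let the database count
        } catch (UnsupportedOperationException e) {
            try (Stream<Feature> s = features.get()) {
                return s.count();                             // brute-force fallback, as in super.count()
            }
        }
        try (Connection c = source.getConnection();
             Statement st = c.createStatement();
             ResultSet rs = st.executeQuery(sql)) {
            return rs.next() ? rs.getLong(1) : 0L;
        }
    }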
diff --git a/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/Table.java b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/Table.java
index 80fc52b..81b8c75 100644
--- a/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/Table.java
+++ b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/Table.java
@@ -418,7 +418,7 @@ final class Table extends AbstractFeatureSet {
      */
     @Override
     public Stream<Feature> features(final boolean parallel) throws DataStoreException {
-        return new StreamSQL(this);
+        return new StreamSQL(this, parallel);
     }
 
     /**

