sis-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ama...@apache.org
Subject [sis] 02/45: refactor(SQL-Store): try to improve count operation by overriding returned stream.
Date Tue, 12 Nov 2019 16:44:29 GMT
This is an automated email from the ASF dual-hosted git repository.

amanin pushed a commit to branch refactor/sql-store
in repository https://gitbox.apache.org/repos/asf/sis.git

commit 569f505ba48682022ad445a6be4194430c27a787
Author: Alexis Manin <amanin@apache.org>
AuthorDate: Wed Aug 14 16:30:31 2019 +0200

    refactor(SQL-Store): try to improve count operation by overriding returned stream.
---
 .../apache/sis/internal/util/DecoratedStream.java  | 243 +++++++++++++++++++++
 .../org/apache/sis/internal/sql/feature/Table.java | 124 ++++++++---
 2 files changed, 333 insertions(+), 34 deletions(-)

diff --git a/core/sis-utility/src/main/java/org/apache/sis/internal/util/DecoratedStream.java
b/core/sis-utility/src/main/java/org/apache/sis/internal/util/DecoratedStream.java
new file mode 100644
index 0000000..eb398d9
--- /dev/null
+++ b/core/sis-utility/src/main/java/org/apache/sis/internal/util/DecoratedStream.java
@@ -0,0 +1,243 @@
+package org.apache.sis.internal.util;
+
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.Optional;
+import java.util.Spliterator;
+import java.util.function.BiConsumer;
+import java.util.function.BiFunction;
+import java.util.function.BinaryOperator;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.IntFunction;
+import java.util.function.Predicate;
+import java.util.function.Supplier;
+import java.util.function.ToDoubleFunction;
+import java.util.function.ToIntFunction;
+import java.util.function.ToLongFunction;
+import java.util.stream.Collector;
+import java.util.stream.DoubleStream;
+import java.util.stream.IntStream;
+import java.util.stream.LongStream;
+import java.util.stream.Stream;
+
+public class DecoratedStream<T> implements Stream<T> {
+
+    final Stream<T> source;
+
+    protected DecoratedStream(Stream<T> source) {
+        this.source = source;
+    }
+
+    @Override
+    public Stream<T> filter(Predicate<? super T> predicate) {
+        return source.filter(predicate);
+    }
+
+    @Override
+    public <R> Stream<R> map(Function<? super T, ? extends R> mapper) {
+        return source.map(mapper);
+    }
+
+    @Override
+    public IntStream mapToInt(ToIntFunction<? super T> mapper) {
+        return source.mapToInt(mapper);
+    }
+
+    @Override
+    public LongStream mapToLong(ToLongFunction<? super T> mapper) {
+        return source.mapToLong(mapper);
+    }
+
+    @Override
+    public DoubleStream mapToDouble(ToDoubleFunction<? super T> mapper) {
+        return source.mapToDouble(mapper);
+    }
+
+    @Override
+    public <R> Stream<R> flatMap(Function<? super T, ? extends Stream<?
extends R>> mapper) {
+        return source.flatMap(mapper);
+    }
+
+    @Override
+    public IntStream flatMapToInt(Function<? super T, ? extends IntStream> mapper)
{
+        return source.flatMapToInt(mapper);
+    }
+
+    @Override
+    public LongStream flatMapToLong(Function<? super T, ? extends LongStream> mapper)
{
+        return source.flatMapToLong(mapper);
+    }
+
+    @Override
+    public DoubleStream flatMapToDouble(Function<? super T, ? extends DoubleStream>
mapper) {
+        return source.flatMapToDouble(mapper);
+    }
+
+    @Override
+    public Stream<T> distinct() {
+        return source.distinct();
+    }
+
+    @Override
+    public Stream<T> sorted() {
+        return source.sorted();
+    }
+
+    @Override
+    public Stream<T> sorted(Comparator<? super T> comparator) {
+        return source.sorted(comparator);
+    }
+
+    @Override
+    public Stream<T> peek(Consumer<? super T> action) {
+        return source.peek(action);
+    }
+
+    @Override
+    public Stream<T> limit(long maxSize) {
+        return source.limit(maxSize);
+    }
+
+    @Override
+    public Stream<T> skip(long n) {
+        return source.skip(n);
+    }
+
+/*
+    @Override
+    public Stream<T> takeWhile(Predicate<? super T> predicate) {
+        return source.takeWhile(predicate);
+    }
+
+    @Override
+    public Stream<T> dropWhile(Predicate<? super T> predicate) {
+        return source.dropWhile(predicate);
+    }
+*/
+
+    @Override
+    public void forEach(Consumer<? super T> action) {
+        source.forEach(action);
+    }
+
+    @Override
+    public void forEachOrdered(Consumer<? super T> action) {
+        source.forEachOrdered(action);
+    }
+
+    @Override
+    public Object[] toArray() {
+        return source.toArray();
+    }
+
+    @Override
+    public <A> A[] toArray(IntFunction<A[]> generator) {
+        return source.toArray(generator);
+    }
+
+    @Override
+    public T reduce(T identity, BinaryOperator<T> accumulator) {
+        return source.reduce(identity, accumulator);
+    }
+
+    @Override
+    public Optional<T> reduce(BinaryOperator<T> accumulator) {
+        return source.reduce(accumulator);
+    }
+
+    @Override
+    public <U> U reduce(U identity, BiFunction<U, ? super T, U> accumulator,
BinaryOperator<U> combiner) {
+        return source.reduce(identity, accumulator, combiner);
+    }
+
+    @Override
+    public <R> R collect(Supplier<R> supplier, BiConsumer<R, ? super T>
accumulator, BiConsumer<R, R> combiner) {
+        return source.collect(supplier, accumulator, combiner);
+    }
+
+    @Override
+    public <R, A> R collect(Collector<? super T, A, R> collector) {
+        return source.collect(collector);
+    }
+
+    @Override
+    public Optional<T> min(Comparator<? super T> comparator) {
+        return source.min(comparator);
+    }
+
+    @Override
+    public Optional<T> max(Comparator<? super T> comparator) {
+        return source.max(comparator);
+    }
+
+    @Override
+    public long count() {
+        return source.count();
+    }
+
+    @Override
+    public boolean anyMatch(Predicate<? super T> predicate) {
+        return source.anyMatch(predicate);
+    }
+
+    @Override
+    public boolean allMatch(Predicate<? super T> predicate) {
+        return source.allMatch(predicate);
+    }
+
+    @Override
+    public boolean noneMatch(Predicate<? super T> predicate) {
+        return source.noneMatch(predicate);
+    }
+
+    @Override
+    public Optional<T> findFirst() {
+        return source.findFirst();
+    }
+
+    @Override
+    public Optional<T> findAny() {
+        return source.findAny();
+    }
+
+    @Override
+    public Iterator<T> iterator() {
+        return source.iterator();
+    }
+
+    @Override
+    public Spliterator<T> spliterator() {
+        return source.spliterator();
+    }
+
+    @Override
+    public boolean isParallel() {
+        return source.isParallel();
+    }
+
+    @Override
+    public Stream<T> sequential() {
+        return source.sequential();
+    }
+
+    @Override
+    public Stream<T> parallel() {
+        return source.parallel();
+    }
+
+    @Override
+    public Stream<T> unordered() {
+        return source.unordered();
+    }
+
+    @Override
+    public Stream<T> onClose(Runnable closeHandler) {
+        return source.onClose(closeHandler);
+    }
+
+    @Override
+    public void close() {
+        source.close();
+    }
+}
diff --git a/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/Table.java
b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/Table.java
index 8c2f35a..380669b 100644
--- a/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/Table.java
+++ b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/Table.java
@@ -16,47 +16,55 @@
  */
 package org.apache.sis.internal.sql.feature;
 
-import java.util.Map;
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
-import java.util.ArrayList;
-import java.util.Collection;
+import java.util.Map;
 import java.util.Optional;
+import java.util.concurrent.Callable;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Supplier;
 import java.util.stream.Stream;
 import java.util.stream.StreamSupport;
-import java.sql.DatabaseMetaData;
-import java.sql.Connection;
-import java.sql.ResultSet;
-import java.sql.SQLException;
 import javax.sql.DataSource;
-import org.opengis.util.GenericName;
+
+import org.opengis.feature.AttributeType;
+import org.opengis.feature.Feature;
+import org.opengis.feature.FeatureAssociationRole;
+import org.opengis.feature.FeatureType;
 import org.opengis.referencing.crs.CoordinateReferenceSystem;
+import org.opengis.util.GenericName;
+
+import org.apache.sis.feature.builder.AssociationRoleBuilder;
 import org.apache.sis.feature.builder.AttributeRole;
 import org.apache.sis.feature.builder.AttributeTypeBuilder;
-import org.apache.sis.feature.builder.AssociationRoleBuilder;
 import org.apache.sis.feature.builder.FeatureTypeBuilder;
 import org.apache.sis.internal.feature.Geometries;
-import org.apache.sis.storage.DataStoreException;
-import org.apache.sis.storage.DataStoreContentException;
-import org.apache.sis.storage.InternalDataStoreException;
 import org.apache.sis.internal.metadata.sql.Reflection;
+import org.apache.sis.internal.metadata.sql.SQLBuilder;
 import org.apache.sis.internal.metadata.sql.SQLUtilities;
 import org.apache.sis.internal.storage.AbstractFeatureSet;
 import org.apache.sis.internal.util.CollectionsExt;
-import org.apache.sis.util.collection.WeakValueHashMap;
-import org.apache.sis.util.collection.TreeTable;
+import org.apache.sis.internal.util.DecoratedStream;
+import org.apache.sis.storage.DataStoreContentException;
+import org.apache.sis.storage.DataStoreException;
+import org.apache.sis.storage.InternalDataStoreException;
 import org.apache.sis.util.CharSequences;
-import org.apache.sis.util.Exceptions;
 import org.apache.sis.util.Classes;
-import org.apache.sis.util.Numbers;
 import org.apache.sis.util.Debug;
+import org.apache.sis.util.Numbers;
+import org.apache.sis.util.collection.BackingStoreException;
+import org.apache.sis.util.collection.TreeTable;
+import org.apache.sis.util.collection.WeakValueHashMap;
 
 // Branch-dependent imports
-import org.opengis.feature.Feature;
-import org.opengis.feature.FeatureType;
-import org.opengis.feature.AttributeType;
-import org.opengis.feature.FeatureAssociationRole;
 
 
 /**
@@ -602,21 +610,30 @@ final class Table extends AbstractFeatureSet {
      */
     @Override
     public Stream<Feature> features(final boolean parallel) throws DataStoreException
{
-        DataStoreException ex;
-        Connection connection = null;
-        try {
-            connection = source.getConnection();
-            final Features iter = features(connection, new ArrayList<>(), null);
-            return StreamSupport.stream(iter, parallel).onClose(iter);
-        } catch (SQLException cause) {
-            ex = new DataStoreException(Exceptions.unwrap(cause));
-        }
-        if (connection != null) try {
-            connection.close();
-        } catch (SQLException e) {
-            ex.addSuppressed(e);
+        final AtomicReference<Connection> connectionRef = new AtomicReference<>();
+        final Stream<Feature> featureStream = Stream.generate(uncheck(() -> source.getConnection()))
+                .peek(connectionRef::set)
+                .flatMap(conn -> {
+                    try {
+                        final Features iter = features(conn, new ArrayList<>(), null);
+                        return StreamSupport.stream(iter, parallel).onClose(iter);
+                    } catch (SQLException | InternalDataStoreException e) {
+                        throw new BackingStoreException(e);
+                    }
+                })
+                .onClose(() -> closeRef(connectionRef));
+        return new CountOverload<>(featureStream);
+    }
+
+    private void closeRef(final AtomicReference<Connection> ref) {
+        final Connection conn = ref.get();
+        if (conn != null) {
+            try {
+                conn.close();
+            } catch (SQLException e) {
+                warning(e);
+            }
         }
-        throw ex;
     }
 
     /**
@@ -631,4 +648,43 @@ final class Table extends AbstractFeatureSet {
     {
         return new Features(this, connection, attributeNames, attributeColumns, importedKeys,
exportedKeys, following, noFollow);
     }
+
+    private class CountOverload<T> extends DecoratedStream<T> {
+
+        CountOverload(Stream<T> source) {
+            super(source);
+        }
+
+        @Override
+        public long count() {
+            try (Connection conn = Table.this.source.getConnection()) {
+                final String query = new SQLBuilder(conn.getMetaData(), true)
+                        .append("SELECT COUNT(")
+                        .appendIdentifier(attributeColumns[0])
+                        .append(')')
+                        .append(" FROM ")
+                        .appendIdentifier(name.catalog, name.schema, name.table)
+                        .toString();
+                try (final Statement st = conn.createStatement(); final ResultSet rs = st.executeQuery(query))
{
+                    if (rs.next()) {
+                        return rs.getLong(1);
+                    } else return 0;
+                }
+            } catch (SQLException e) {
+                throw new BackingStoreException("Cannot estimate feature set size using SQL
COUNT query", e);
+            }
+        }
+    }
+
+    private static <T> Supplier<T> uncheck(final Callable<T> generator)
{
+        return () -> {
+            try {
+                return generator.call();
+            } catch (RuntimeException e) {
+                throw e;
+            } catch (Exception e) {
+                throw new BackingStoreException(e);
+            }
+        };
+    }
 }


Mime
View raw message