sis-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ama...@apache.org
Subject [sis] 04/45: feat(SQL-Store): improve stream decoration to handle mappings to/from DoubleStream.
Date Tue, 12 Nov 2019 16:44:31 GMT
This is an automated email from the ASF dual-hosted git repository.

amanin pushed a commit to branch refactor/sql-store
in repository https://gitbox.apache.org/repos/asf/sis.git

commit cd98b745530bb91b268a01ebce248eb859dd30f4
Author: Alexis Manin <amanin@apache.org>
AuthorDate: Wed Aug 21 13:17:34 2019 +0200

    feat(SQL-Store): improve stream decoration to handle mappings to/from DoubleStream.
---
 .../sis/internal/util/BaseStreamDecoration.java    |  68 ++++++++
 .../sis/internal/util/DoubleStreamDecoration.java  | 178 +++++++++++++++++++++
 .../apache/sis/internal/util/StreamDecoration.java | 110 ++++---------
 .../apache/sis/internal/sql/feature/StreamSQL.java | 149 ++++++++++++++++-
 4 files changed, 426 insertions(+), 79 deletions(-)

diff --git a/core/sis-utility/src/main/java/org/apache/sis/internal/util/BaseStreamDecoration.java
b/core/sis-utility/src/main/java/org/apache/sis/internal/util/BaseStreamDecoration.java
new file mode 100644
index 0000000..f20e32d
--- /dev/null
+++ b/core/sis-utility/src/main/java/org/apache/sis/internal/util/BaseStreamDecoration.java
@@ -0,0 +1,68 @@
+package org.apache.sis.internal.util;
+
+import java.util.Iterator;
+import java.util.Spliterator;
+import java.util.stream.BaseStream;
+
+public abstract class BaseStreamDecoration<T, S extends BaseStream<T, S>> implements
BaseStream<T, S> {
+
+    private S decorated;
+
+    private boolean closed;
+
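+    /**
+     * Gets the previously created wrapped stream, or creates it through {@link #createDecoratedStream()} on first call.
+     * @return the wrapped stream; an {@link IllegalStateException} is thrown instead if this decoration has been closed.
+     */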
+    protected final S getOrCreate() {
+        if (closed) throw new IllegalStateException("Stream has already been closed.");
+        if (decorated == null) {
+            decorated = createDecoratedStream();
+        }
+
+        return decorated;
+    }
+
+    protected abstract S createDecoratedStream();
+
+    @Override
+    public void close() {
+        closed = true;
+        if (decorated != null) decorated.close();
+    }
+
+    @Override
+    public Iterator<T> iterator() {
+        return getOrCreate().iterator();
+    }
+
+    @Override
+    public Spliterator<T> spliterator() {
+        return getOrCreate().spliterator();
+    }
+
+    @Override
+    public boolean isParallel() {
+        return getOrCreate().isParallel();
+    }
+
+    @Override
+    public S sequential() {
+        return getOrCreate().sequential();
+    }
+
+    @Override
+    public S parallel() {
+        return getOrCreate().parallel();
+    }
+
+    @Override
+    public S unordered() {
+        return getOrCreate().unordered();
+    }
+
+    @Override
+    public S onClose(Runnable closeHandler) {
+        return getOrCreate().onClose(closeHandler);
+    }
+}
diff --git a/core/sis-utility/src/main/java/org/apache/sis/internal/util/DoubleStreamDecoration.java
b/core/sis-utility/src/main/java/org/apache/sis/internal/util/DoubleStreamDecoration.java
new file mode 100644
index 0000000..1215169
--- /dev/null
+++ b/core/sis-utility/src/main/java/org/apache/sis/internal/util/DoubleStreamDecoration.java
@@ -0,0 +1,178 @@
+package org.apache.sis.internal.util;
+
+import java.util.DoubleSummaryStatistics;
+import java.util.OptionalDouble;
+import java.util.PrimitiveIterator;
+import java.util.Spliterator;
+import java.util.function.BiConsumer;
+import java.util.function.DoubleBinaryOperator;
+import java.util.function.DoubleConsumer;
+import java.util.function.DoubleFunction;
+import java.util.function.DoublePredicate;
+import java.util.function.DoubleToIntFunction;
+import java.util.function.DoubleToLongFunction;
+import java.util.function.DoubleUnaryOperator;
+import java.util.function.ObjDoubleConsumer;
+import java.util.function.Supplier;
+import java.util.stream.DoubleStream;
+import java.util.stream.IntStream;
+import java.util.stream.LongStream;
+import java.util.stream.Stream;
+
+public abstract class DoubleStreamDecoration extends BaseStreamDecoration<Double, DoubleStream>
implements DoubleStream {
+
+    @Override
+    public DoubleStream filter(DoublePredicate predicate) {
+        return getOrCreate().filter(predicate);
+    }
+
+    @Override
+    public DoubleStream map(DoubleUnaryOperator mapper) {
+        return getOrCreate().map(mapper);
+    }
+
+    @Override
+    public <U> Stream<U> mapToObj(DoubleFunction<? extends U> mapper) {
+        return getOrCreate().mapToObj(mapper);
+    }
+
+    @Override
+    public IntStream mapToInt(DoubleToIntFunction mapper) {
+        return getOrCreate().mapToInt(mapper);
+    }
+
+    @Override
+    public LongStream mapToLong(DoubleToLongFunction mapper) {
+        return getOrCreate().mapToLong(mapper);
+    }
+
+    @Override
+    public DoubleStream flatMap(DoubleFunction<? extends DoubleStream> mapper) {
+        return getOrCreate().flatMap(mapper);
+    }
+
+    @Override
+    public DoubleStream distinct() {
+        return getOrCreate().distinct();
+    }
+
+    @Override
+    public DoubleStream sorted() {
+        return getOrCreate().sorted();
+    }
+
+    @Override
+    public DoubleStream peek(DoubleConsumer action) {
+        return getOrCreate().peek(action);
+    }
+
+    @Override
+    public DoubleStream limit(long maxSize) {
+        return getOrCreate().limit(maxSize);
+    }
+
+    @Override
+    public DoubleStream skip(long n) {
+        return getOrCreate().skip(n);
+    }
+
+    @Override
+    public void forEach(DoubleConsumer action) {
+        getOrCreate().forEach(action);
+    }
+
+    @Override
+    public void forEachOrdered(DoubleConsumer action) {
+        getOrCreate().forEachOrdered(action);
+    }
+
+    @Override
+    public double[] toArray() {
+        return getOrCreate().toArray();
+    }
+
+    @Override
+    public double reduce(double identity, DoubleBinaryOperator op) {
+        return getOrCreate().reduce(identity, op);
+    }
+
+    @Override
+    public OptionalDouble reduce(DoubleBinaryOperator op) {
+        return getOrCreate().reduce(op);
+    }
+
+    @Override
+    public <R> R collect(Supplier<R> supplier, ObjDoubleConsumer<R> accumulator,
BiConsumer<R, R> combiner) {
+        return getOrCreate().collect(supplier, accumulator, combiner);
+    }
+
+    @Override
+    public double sum() {
+        return getOrCreate().sum();
+    }
+
+    @Override
+    public OptionalDouble min() {
+        return getOrCreate().min();
+    }
+
+    @Override
+    public OptionalDouble max() {
+        return getOrCreate().max();
+    }
+
+    @Override
+    public long count() {
+        return getOrCreate().count();
+    }
+
+    @Override
+    public OptionalDouble average() {
+        return getOrCreate().average();
+    }
+
+    @Override
+    public DoubleSummaryStatistics summaryStatistics() {
+        return getOrCreate().summaryStatistics();
+    }
+
+    @Override
+    public boolean anyMatch(DoublePredicate predicate) {
+        return getOrCreate().anyMatch(predicate);
+    }
+
+    @Override
+    public boolean allMatch(DoublePredicate predicate) {
+        return getOrCreate().allMatch(predicate);
+    }
+
+    @Override
+    public boolean noneMatch(DoublePredicate predicate) {
+        return getOrCreate().noneMatch(predicate);
+    }
+
+    @Override
+    public OptionalDouble findFirst() {
+        return getOrCreate().findFirst();
+    }
+
+    @Override
+    public OptionalDouble findAny() {
+        return getOrCreate().findAny();
+    }
+
+    @Override
+    public Stream<Double> boxed() {
+        return getOrCreate().boxed();
+    }
+
+    @Override
+    public PrimitiveIterator.OfDouble iterator() {
+        return getOrCreate().iterator();
+    }
+
+    @Override
+    public Spliterator.OfDouble spliterator() {
+        return getOrCreate().spliterator();
+    }
+}
diff --git a/core/sis-utility/src/main/java/org/apache/sis/internal/util/StreamDecoration.java
b/core/sis-utility/src/main/java/org/apache/sis/internal/util/StreamDecoration.java
index 53637b2..45cd0f8 100644
--- a/core/sis-utility/src/main/java/org/apache/sis/internal/util/StreamDecoration.java
+++ b/core/sis-utility/src/main/java/org/apache/sis/internal/util/StreamDecoration.java
@@ -1,9 +1,7 @@
 package org.apache.sis.internal.util;
 
 import java.util.Comparator;
-import java.util.Iterator;
 import java.util.Optional;
-import java.util.Spliterator;
 import java.util.function.BiConsumer;
 import java.util.function.BiFunction;
 import java.util.function.BinaryOperator;
@@ -21,81 +19,81 @@ import java.util.stream.IntStream;
 import java.util.stream.LongStream;
 import java.util.stream.Stream;
 
-public abstract class StreamDecoration<T> implements Stream<T> {
+public abstract class StreamDecoration<T> extends BaseStreamDecoration<T, Stream<T>>
implements Stream<T> {
 
     @Override
     public Stream<T> filter(Predicate<? super T> predicate) {
-        return getDecoratedStream().filter(predicate);
+        return getOrCreate().filter(predicate);
     }
 
     @Override
     public <R> Stream<R> map(Function<? super T, ? extends R> mapper) {
-        return getDecoratedStream().map(mapper);
+        return getOrCreate().map(mapper);
     }
 
     @Override
     public IntStream mapToInt(ToIntFunction<? super T> mapper) {
-        return getDecoratedStream().mapToInt(mapper);
+        return getOrCreate().mapToInt(mapper);
     }
 
     @Override
     public LongStream mapToLong(ToLongFunction<? super T> mapper) {
-        return getDecoratedStream().mapToLong(mapper);
+        return getOrCreate().mapToLong(mapper);
     }
 
     @Override
     public DoubleStream mapToDouble(ToDoubleFunction<? super T> mapper) {
-        return getDecoratedStream().mapToDouble(mapper);
+        return getOrCreate().mapToDouble(mapper);
     }
 
     @Override
     public <R> Stream<R> flatMap(Function<? super T, ? extends Stream<?
extends R>> mapper) {
-        return getDecoratedStream().flatMap(mapper);
+        return getOrCreate().flatMap(mapper);
     }
 
     @Override
     public IntStream flatMapToInt(Function<? super T, ? extends IntStream> mapper)
{
-        return getDecoratedStream().flatMapToInt(mapper);
+        return getOrCreate().flatMapToInt(mapper);
     }
 
     @Override
     public LongStream flatMapToLong(Function<? super T, ? extends LongStream> mapper)
{
-        return getDecoratedStream().flatMapToLong(mapper);
+        return getOrCreate().flatMapToLong(mapper);
     }
 
     @Override
     public DoubleStream flatMapToDouble(Function<? super T, ? extends DoubleStream>
mapper) {
-        return getDecoratedStream().flatMapToDouble(mapper);
+        return getOrCreate().flatMapToDouble(mapper);
     }
 
     @Override
     public Stream<T> distinct() {
-        return getDecoratedStream().distinct();
+        return getOrCreate().distinct();
     }
 
     @Override
     public Stream<T> sorted() {
-        return getDecoratedStream().sorted();
+        return getOrCreate().sorted();
     }
 
     @Override
     public Stream<T> sorted(Comparator<? super T> comparator) {
-        return getDecoratedStream().sorted(comparator);
+        return getOrCreate().sorted(comparator);
     }
 
     @Override
     public Stream<T> peek(Consumer<? super T> action) {
-        return getDecoratedStream().peek(action);
+        return getOrCreate().peek(action);
     }
 
     @Override
     public Stream<T> limit(long maxSize) {
-        return getDecoratedStream().limit(maxSize);
+        return getOrCreate().limit(maxSize);
     }
 
     @Override
     public Stream<T> skip(long n) {
-        return getDecoratedStream().skip(n);
+        return getOrCreate().skip(n);
     }
 
 /*
@@ -112,128 +110,86 @@ public abstract class StreamDecoration<T> implements Stream<T>
{
 
     @Override
     public void forEach(Consumer<? super T> action) {
-        getDecoratedStream().forEach(action);
+        getOrCreate().forEach(action);
     }
 
     @Override
     public void forEachOrdered(Consumer<? super T> action) {
-        getDecoratedStream().forEachOrdered(action);
+        getOrCreate().forEachOrdered(action);
     }
 
     @Override
     public Object[] toArray() {
-        return getDecoratedStream().toArray();
+        return getOrCreate().toArray();
     }
 
     @Override
     public <A> A[] toArray(IntFunction<A[]> generator) {
-        return getDecoratedStream().toArray(generator);
+        return getOrCreate().toArray(generator);
     }
 
     @Override
     public T reduce(T identity, BinaryOperator<T> accumulator) {
-        return getDecoratedStream().reduce(identity, accumulator);
+        return getOrCreate().reduce(identity, accumulator);
     }
 
     @Override
     public Optional<T> reduce(BinaryOperator<T> accumulator) {
-        return getDecoratedStream().reduce(accumulator);
+        return getOrCreate().reduce(accumulator);
     }
 
     @Override
     public <U> U reduce(U identity, BiFunction<U, ? super T, U> accumulator,
BinaryOperator<U> combiner) {
-        return getDecoratedStream().reduce(identity, accumulator, combiner);
+        return getOrCreate().reduce(identity, accumulator, combiner);
     }
 
     @Override
     public <R> R collect(Supplier<R> supplier, BiConsumer<R, ? super T>
accumulator, BiConsumer<R, R> combiner) {
-        return getDecoratedStream().collect(supplier, accumulator, combiner);
+        return getOrCreate().collect(supplier, accumulator, combiner);
     }
 
     @Override
     public <R, A> R collect(Collector<? super T, A, R> collector) {
-        return getDecoratedStream().collect(collector);
+        return getOrCreate().collect(collector);
     }
 
     @Override
     public Optional<T> min(Comparator<? super T> comparator) {
-        return getDecoratedStream().min(comparator);
+        return getOrCreate().min(comparator);
     }
 
     @Override
     public Optional<T> max(Comparator<? super T> comparator) {
-        return getDecoratedStream().max(comparator);
+        return getOrCreate().max(comparator);
     }
 
     @Override
     public long count() {
-        return getDecoratedStream().count();
+        return getOrCreate().count();
     }
 
     @Override
     public boolean anyMatch(Predicate<? super T> predicate) {
-        return getDecoratedStream().anyMatch(predicate);
+        return getOrCreate().anyMatch(predicate);
     }
 
     @Override
     public boolean allMatch(Predicate<? super T> predicate) {
-        return getDecoratedStream().allMatch(predicate);
+        return getOrCreate().allMatch(predicate);
     }
 
     @Override
     public boolean noneMatch(Predicate<? super T> predicate) {
-        return getDecoratedStream().noneMatch(predicate);
+        return getOrCreate().noneMatch(predicate);
     }
 
     @Override
     public Optional<T> findFirst() {
-        return getDecoratedStream().findFirst();
+        return getOrCreate().findFirst();
     }
 
     @Override
     public Optional<T> findAny() {
-        return getDecoratedStream().findAny();
+        return getOrCreate().findAny();
     }
-
-    @Override
-    public Iterator<T> iterator() {
-        return getDecoratedStream().iterator();
-    }
-
-    @Override
-    public Spliterator<T> spliterator() {
-        return getDecoratedStream().spliterator();
-    }
-
-    @Override
-    public boolean isParallel() {
-        return getDecoratedStream().isParallel();
-    }
-
-    @Override
-    public Stream<T> sequential() {
-        return getDecoratedStream().sequential();
-    }
-
-    @Override
-    public Stream<T> parallel() {
-        return getDecoratedStream().parallel();
-    }
-
-    @Override
-    public Stream<T> unordered() {
-        return getDecoratedStream().unordered();
-    }
-
-    @Override
-    public Stream<T> onClose(Runnable closeHandler) {
-        return getDecoratedStream().onClose(closeHandler);
-    }
-
-    @Override
-    public void close() {
-        getDecoratedStream().close();
-    }
-
-    protected abstract Stream<T> getDecoratedStream();
 }
diff --git a/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/StreamSQL.java
b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/StreamSQL.java
index 7e948a3..88c59dd 100644
--- a/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/StreamSQL.java
+++ b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/StreamSQL.java
@@ -7,6 +7,8 @@ import java.sql.Statement;
 import java.util.concurrent.Callable;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Consumer;
+import java.util.function.DoubleFunction;
+import java.util.function.DoubleUnaryOperator;
 import java.util.function.Function;
 import java.util.function.Supplier;
 import java.util.function.ToDoubleFunction;
@@ -20,6 +22,7 @@ import java.util.stream.StreamSupport;
 
 import org.opengis.feature.Feature;
 
+import org.apache.sis.internal.util.DoubleStreamDecoration;
 import org.apache.sis.internal.util.StreamDecoration;
 import org.apache.sis.storage.DataStoreException;
 import org.apache.sis.util.collection.BackingStoreException;
@@ -39,7 +42,7 @@ class StreamSQL extends StreamDecoration<Feature> {
 
     @Override
     public <R> Stream<R> map(Function<? super Feature, ? extends R> mapper)
{
-        return super.map(mapper);
+        return new MappedStream<>(mapper, this);
     }
 
     @Override
@@ -114,7 +117,7 @@ class StreamSQL extends StreamDecoration<Feature> {
     }
 
     @Override
-    protected synchronized Stream<Feature> getDecoratedStream() {
+    protected synchronized Stream<Feature> createDecoratedStream() {
             final AtomicReference<Connection> connectionRef = new AtomicReference<>();
             return Stream.of(uncheck(() -> queryBuilder.parent.source.getConnection()))
                     .map(Supplier::get)
@@ -141,4 +144,146 @@ class StreamSQL extends StreamDecoration<Feature> {
             }
         };
     }
+
+    private static class MappedStream<I, O> extends StreamDecoration<O> {
+         private final Function<? super I, ? extends O> mapper;
+         Stream<I> source;
+
+        private MappedStream(Function<? super I, ? extends O> mapper, Stream<I>
source) {
+            this.mapper = mapper;
+            this.source = source;
+        }
+
+        @Override
+        public Stream<O> distinct() {
+            source = source.distinct();
+            return this;
+        }
+
+        @Override
+        public Stream<O> limit(long maxSize) {
+            source = source.limit(maxSize);
+            return this;
+        }
+
+        @Override
+        public Stream<O> skip(long n) {
+            source = source.skip(n);
+            return this;
+        }
+
+        @Override
+        public long count() {
+            return source.count();
+        }
+
+        @Override
+        public boolean isParallel() {
+            return source.isParallel();
+        }
+
+        @Override
+        public Stream<O> sequential() {
+            source = source.sequential();
+            return this;
+        }
+
+        @Override
+        public Stream<O> parallel() {
+            source = source.parallel();
+            return this;
+        }
+
+        @Override
+        public <R> Stream<R> map(Function<? super O, ? extends R> mapper)
{
+            return new MappedStream<>(this.mapper.andThen(mapper), source);
+        }
+
+        @Override
+        public DoubleStream mapToDouble(ToDoubleFunction<? super O> mapper) {
+            return new ToDoubleStream<I>(source, i -> mapper.applyAsDouble(this.mapper.apply(i)));
+        }
+
+        @Override
+        protected Stream<O> createDecoratedStream() {
+            // Break possible infinite loop by sinking source content through its spliterator (terminal op).
+            final Stream<I> sink = StreamSupport.stream(source.spliterator(), source.isParallel());
+            sink.onClose(source::close);
+            return sink.map(mapper);
+        }
+    }
+
+    private static class ToDoubleStream<T> extends DoubleStreamDecoration {
+
+        Stream<T> source;
+        final ToDoubleFunction<T> toDouble;
+
+        private ToDoubleStream(Stream<T> source, ToDoubleFunction<T> toDouble)
{
+            this.source = source;
+            this.toDouble = toDouble;
+        }
+
+        @Override
+        public DoubleStream map(DoubleUnaryOperator mapper) {
+            return new ToDoubleStream<T>(source, t -> mapper.applyAsDouble(toDouble.applyAsDouble(t)));
+        }
+
+        @Override
+        public <U> Stream<U> mapToObj(DoubleFunction<? extends U> mapper)
{
+            return new MappedStream<T, U>(t -> mapper.apply(toDouble.applyAsDouble(t)),
source);
+        }
+
+        @Override
+        public DoubleStream distinct() {
+            source = source.distinct();
+            return this;
+        }
+
+        @Override
+        public DoubleStream limit(long maxSize) {
+            source = source.limit(maxSize);
+            return this;
+        }
+
+        @Override
+        public DoubleStream skip(long n) {
+            source = source.skip(n);
+            return this;
+        }
+
+        @Override
+        public long count() {
+            return source.count();
+        }
+
+        @Override
+        public Stream<Double> boxed() {
+            return new MappedStream<>(t -> Double.valueOf(toDouble.applyAsDouble(t)),
source);
+        }
+
+        @Override
+        public boolean isParallel() {
+            return source.isParallel();
+        }
+
+        @Override
+        public DoubleStream sequential() {
+            source = source.sequential();
+            return this;
+        }
+
+        @Override
+        public DoubleStream parallel() {
+            source = source.parallel();
+            return this;
+        }
+
+        @Override
+        protected DoubleStream createDecoratedStream() {
+            // Break possible cycle by sinking source content through its spliterator (terminal op).
+            final Stream<T> sink = StreamSupport.stream(source.spliterator(), source.isParallel());
+            sink.onClose(source::close);
+            return sink.mapToDouble(toDouble);
+        }
+    }
 }


Mime
View raw message