sis-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ama...@apache.org
Subject [sis] 08/45: fix(SQLStore): fix peek operation management on overridden SQL stream.
Date Tue, 12 Nov 2019 16:44:35 GMT
This is an automated email from the ASF dual-hosted git repository.

amanin pushed a commit to branch refactor/sql-store
in repository https://gitbox.apache.org/repos/asf/sis.git

commit 3f36c855cff67f19fc7dce84d9e7e4a507b5ae6e
Author: Alexis Manin <amanin@apache.org>
AuthorDate: Tue Aug 27 14:32:31 2019 +0200

    fix(SQLStore): fix peek operation management on overridden SQL stream.
---
 .../apache/sis/internal/sql/feature/StreamSQL.java | 71 ++++++++--------
 .../org/apache/sis/storage/sql/SQLStoreTest.java   | 97 ++++++++++++++++++----
 2 files changed, 121 insertions(+), 47 deletions(-)

diff --git a/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/StreamSQL.java
b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/StreamSQL.java
index 1a59655..f6d8c3d 100644
--- a/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/StreamSQL.java
+++ b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/StreamSQL.java
@@ -120,7 +120,7 @@ class StreamSQL extends StreamDecoration<Feature> {
             peekAction = action;
         } else {
             // Safe cast, because Stream values are strongly typed to O.
-            peekAction = peekAction.andThen((Consumer)action);
+            peekAction = peekAction.andThen((Consumer) action);
         }
 
         return this;
@@ -149,7 +149,8 @@ class StreamSQL extends StreamDecoration<Feature> {
             throw new BackingStoreException("Cannot create SQL COUNT query", e);
         }
         try (Connection conn = queryBuilder.parent.source.getConnection()) {
-            try (final Statement st = conn.createStatement(); final ResultSet rs = st.executeQuery(sql))
{
+            try (Statement st = conn.createStatement();
+                 ResultSet rs = st.executeQuery(sql)) {
                 if (rs.next()) {
                     return rs.getLong(1);
                 } else return 0;
@@ -161,19 +162,21 @@ class StreamSQL extends StreamDecoration<Feature> {
 
     @Override
     protected synchronized Stream<Feature> createDecoratedStream() {
-            final AtomicReference<Connection> connectionRef = new AtomicReference<>();
-            return Stream.of(uncheck(() -> queryBuilder.parent.source.getConnection()))
-                    .map(Supplier::get)
-                    .peek(connectionRef::set)
-                    .flatMap(conn -> {
-                        try {
-                            final Features iter = queryBuilder.build(conn);
-                            return StreamSupport.stream(iter, parallel).onClose(iter);
-                        } catch (SQLException | DataStoreException e) {
-                            throw new BackingStoreException(e);
-                        }
-                    })
-                    .onClose(() -> queryBuilder.parent.closeRef(connectionRef));
+        final AtomicReference<Connection> connectionRef = new AtomicReference<>();
+        Stream<Feature> featureStream = Stream.of(uncheck(() -> queryBuilder.parent.source.getConnection()))
+                .map(Supplier::get)
+                .peek(connectionRef::set)
+                .flatMap(conn -> {
+                    try {
+                        final Features iter = queryBuilder.build(conn);
+                        return StreamSupport.stream(iter, parallel).onClose(iter);
+                    } catch (SQLException | DataStoreException e) {
+                        throw new BackingStoreException(e);
+                    }
+                })
+                .onClose(() -> queryBuilder.parent.closeRef(connectionRef));
+        if (peekAction != null) featureStream = featureStream.peek(peekAction);
+        return featureStream;
     }
 
     /**
@@ -202,10 +205,9 @@ class StreamSQL extends StreamDecoration<Feature> {
      * @param <I> Type of object received as input of mapping operation.
      * @param <O> Return type of mapping operation.
      */
-    private static class MappedStream<I, O> extends StreamDecoration<O> {
-        private final Function<? super I, ? extends O> mapper;
+    private static final class MappedStream<I, O> extends StreamDecoration<O>
{
+        private Function<? super I, ? extends O> mapper;
         private Stream<I> source;
-        private Consumer<? super O> peekAction;
 
         private MappedStream(Function<? super I, ? extends O> mapper, Stream<I>
source) {
             this.mapper = mapper;
@@ -213,15 +215,8 @@ class StreamSQL extends StreamDecoration<Feature> {
         }
 
         @Override
-        @SuppressWarnings("unchecked")
         public Stream<O> peek(Consumer<? super O> action) {
-            if (peekAction == null) {
-                peekAction = action;
-            } else {
-                // Safe cast, because Stream values are strongly typed to O.
-                peekAction = peekAction.andThen((Consumer)action);
-            }
-
+            mapper = concatenate(mapper, action);
             return this;
         }
 
@@ -280,8 +275,7 @@ class StreamSQL extends StreamDecoration<Feature> {
             // Break possible infinite loop by sinking source content through its spliterator
(terminal op).
             final Stream<I> sink = StreamSupport.stream(source.spliterator(), source.isParallel());
             sink.onClose(source::close);
-            final Stream<O> result = sink.map(mapper);
-            return peekAction == null? result : result.peek(peekAction);
+            return sink.map(mapper);
         }
     }
 
@@ -291,12 +285,10 @@ class StreamSQL extends StreamDecoration<Feature> {
      *
      * @param <T> Type of objects contained in source stream (before double mapping).
      */
-    private static class ToDoubleStream<T> extends DoubleStreamDecoration {
+    private static final class ToDoubleStream<T> extends DoubleStreamDecoration {
 
         Stream<T> source;
-        final ToDoubleFunction<T> toDouble;
-
-        private DoubleConsumer peekAction;
+        ToDoubleFunction<T> toDouble;
 
         private ToDoubleStream(Stream<T> source, ToDoubleFunction<T> toDouble)
{
             this.source = source;
@@ -305,7 +297,12 @@ class StreamSQL extends StreamDecoration<Feature> {
 
         @Override
         public DoubleStream peek(DoubleConsumer action) {
-            peekAction = peekAction == null? action : peekAction.andThen(action);
+            final ToDoubleFunction<T> toDoubleFixedRef = toDouble;
+            toDouble = t -> {
+                final double value = toDoubleFixedRef.applyAsDouble(t);
+                action.accept(value);
+                return value;
+            };
             return this;
         }
 
@@ -372,4 +369,12 @@ class StreamSQL extends StreamDecoration<Feature> {
             return sink.mapToDouble(toDouble);
         }
     }
+
+    private static <I, O> Function<? super I, ? extends O> concatenate(final
Function<? super I, ? extends O> function, final Consumer<? super O> consumer)
{
+        return i -> {
+            final O o = function.apply(i);
+            consumer.accept(o);
+            return o;
+        };
+    }
 }
diff --git a/storage/sis-sqlstore/src/test/java/org/apache/sis/storage/sql/SQLStoreTest.java
b/storage/sis-sqlstore/src/test/java/org/apache/sis/storage/sql/SQLStoreTest.java
index 7269f20..271bdb7 100644
--- a/storage/sis-sqlstore/src/test/java/org/apache/sis/storage/sql/SQLStoreTest.java
+++ b/storage/sis-sqlstore/src/test/java/org/apache/sis/storage/sql/SQLStoreTest.java
@@ -16,26 +16,37 @@
  */
 package org.apache.sis.storage.sql;
 
-import java.util.Map;
-import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Stream;
+
+import org.opengis.feature.AttributeType;
+import org.opengis.feature.Feature;
+import org.opengis.feature.FeatureAssociationRole;
+import org.opengis.feature.FeatureType;
+import org.opengis.feature.PropertyType;
+
+import org.apache.sis.storage.DataStoreException;
 import org.apache.sis.storage.FeatureSet;
 import org.apache.sis.storage.StorageConnector;
-import org.apache.sis.test.sql.TestDatabase;
 import org.apache.sis.test.TestCase;
+import org.apache.sis.test.sql.TestDatabase;
+
 import org.junit.Test;
 
-import static org.apache.sis.test.Assert.*;
+import static org.apache.sis.test.Assert.assertEquals;
+import static org.apache.sis.test.Assert.assertInstanceOf;
+import static org.apache.sis.test.Assert.assertNotEquals;
+import static org.apache.sis.test.Assert.assertNotNull;
+import static org.apache.sis.test.Assert.assertSame;
+import static org.apache.sis.test.Assert.assertTrue;
+import static org.apache.sis.test.Assert.fail;
 
 // Branch-dependent imports
-import org.opengis.feature.Feature;
-import org.opengis.feature.FeatureType;
-import org.opengis.feature.PropertyType;
-import org.opengis.feature.AttributeType;
-import org.opengis.feature.FeatureAssociationRole;
 
 
 /**
@@ -52,6 +63,13 @@ public final strictfp class SQLStoreTest extends TestCase {
      */
     private static final String SCHEMA = "features";
 
+    private static final int[] POPULATIONS = {
+            13622267,  // Tokyo,    2016.
+            2206488,   // Paris,    2017.
+            1704694,   // Montréal, 2016.
+            531902     // Québec,   2016.
+    };
+
     /**
      * Number of time that the each country has been seen while iterating over the cities.
      */
@@ -130,6 +148,10 @@ public final strictfp class SQLStoreTest extends TestCase {
                 try (Stream<Feature> features = cities.features(false)) {
                     features.forEach((f) -> verifyContent(f));
                 }
+
+                // Now, we'll check that overloaded stream operations are functionally stable,
even stacked.
+                verifyStreamOperations(cities);
+
             }
         }
         assertEquals(Integer.valueOf(2), countryCount.remove("CAN"));
@@ -139,6 +161,53 @@ public final strictfp class SQLStoreTest extends TestCase {
     }
 
     /**
+     * Checks that operations stacked on feature stream are well executed. This test focuses
on mapping and peeking
+     * actions overloaded by sql streams. We'd like to test skip and limit operations too,
but ignore them for now,
+     * because ordering of results matters for such a test.
+     *
+     * @implNote Most of stream operations used here are meaningless. We just want to ensure
that the pipeline does not
+     * skip any operation.
+     *
+     * @param cities The feature set to read from. We expect a feature set containing all
cities defined for the test
+     *               class.
+     * @throws DataStoreException Let's propagate any error raised by input feature set.
+     */
+    private static void verifyStreamOperations(final FeatureSet cities) throws DataStoreException
{
+        try (Stream<Feature> features = cities.features(false)) {
+            final AtomicInteger peekCount = new AtomicInteger();
+            final AtomicInteger mapCount = new AtomicInteger();
+            final long populations = features.peek(f -> peekCount.incrementAndGet())
+                    .peek(f -> peekCount.incrementAndGet())
+                    .map(f -> {
+                        mapCount.incrementAndGet();
+                        return f;
+                    })
+                    .peek(f -> peekCount.incrementAndGet())
+                    .map(f -> {
+                        mapCount.incrementAndGet();
+                        return f;
+                    })
+                    .map(f -> f.getPropertyValue("population"))
+                    .mapToDouble(obj -> ((Number) obj).doubleValue())
+                    .peek(f -> peekCount.incrementAndGet())
+                    .peek(f -> peekCount.incrementAndGet())
+                    .boxed()
+                    .mapToDouble(d -> {mapCount.incrementAndGet(); return d;})
+                    .mapToObj(d -> {mapCount.incrementAndGet(); return d;})
+                    .mapToDouble(d -> {mapCount.incrementAndGet(); return d;})
+                    .map(d -> {mapCount.incrementAndGet(); return d;})
+                    .mapToLong(d -> (long) d)
+                    .sum();
+
+            long expectedPopulations = 0;
+            for (long pop : POPULATIONS) expectedPopulations += pop;
+            assertEquals("Overall population count via Stream pipeline", expectedPopulations,
populations);
+            assertEquals("Number of mapping (by element in the stream)", 24, mapCount.get());
+            assertEquals("Number of peeking (by element in the stream)", 20, peekCount.get());
+        }
+    }
+
+    /**
      * Verifies the result of analyzing the structure of the {@code "Cities"} table.
      */
     private static void verifyFeatureType(final FeatureType type, final String[] expectedNames,
final Object[] expectedTypes) {
@@ -178,7 +247,7 @@ public final strictfp class SQLStoreTest extends TestCase {
                 englishName = "Tōkyō";
                 country     = "JPN";
                 countryName = "日本";
-                population  = 13622267;         // In 2016.
+                population  = POPULATIONS[0];
                 parks       = new String[] {"Yoyogi-kōen", "Shinjuku Gyoen"};
                 break;
             }
@@ -186,7 +255,7 @@ public final strictfp class SQLStoreTest extends TestCase {
                 englishName = "Paris";
                 country     = "FRA";
                 countryName = "France";
-                population  = 2206488;          // In 2017.
+                population  = POPULATIONS[1];
                 parks       = new String[] {"Tuileries Garden", "Luxembourg Garden"};
                 break;
             }
@@ -194,7 +263,7 @@ public final strictfp class SQLStoreTest extends TestCase {
                 englishName = "Montreal";
                 country     = "CAN";
                 countryName = "Canada";
-                population  = 1704694;          // In 2016.
+                population  = POPULATIONS[2];
                 isCanada    = true;
                 parks       = new String[] {"Mount Royal"};
                 break;
@@ -203,7 +272,7 @@ public final strictfp class SQLStoreTest extends TestCase {
                 englishName = "Quebec";
                 country     = "CAN";
                 countryName = "Canada";
-                population  = 531902;           // In 2016.
+                population  = POPULATIONS[3];
                 isCanada    = true;
                 parks = new String[] {};
                 break;


Mime
View raw message