sis-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ama...@apache.org
Subject [sis] 09/45: fix(SQLStore): fix connection auto-commit management for feature streaming.
Date Tue, 12 Nov 2019 16:44:36 GMT
This is an automated email from the ASF dual-hosted git repository.

amanin pushed a commit to branch refactor/sql-store
in repository https://gitbox.apache.org/repos/asf/sis.git

commit 926af1a24482b5255de165f0dd0b491ca7bbbdb9
Author: Alexis Manin <amanin@apache.org>
AuthorDate: Wed Aug 28 18:15:05 2019 +0200

    fix(SQLStore): fix connection auto-commit management for feature streaming.
---
 .../apache/sis/internal/sql/feature/Features.java   | 15 +--------------
 .../apache/sis/internal/sql/feature/StreamSQL.java  | 21 ++++++++++++++++++---
 .../org/apache/sis/internal/sql/feature/Table.java  |  3 ++-
 3 files changed, 21 insertions(+), 18 deletions(-)

diff --git a/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/Features.java
b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/Features.java
index 0934bf7..a596e7b 100644
--- a/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/Features.java
+++ b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/Features.java
@@ -57,7 +57,7 @@ import static org.apache.sis.util.ArgumentChecks.ensureNonNull;
  * @since   1.0
  * @module
  */
-final class Features implements Spliterator<Feature>, Runnable {
+final class Features implements Spliterator<Feature> {
     /**
      * An empty array of iterators, used when there is no dependency.
      */
@@ -531,19 +531,6 @@ final class Features implements Spliterator<Feature>, Runnable
{
     }
 
     /**
-     * Closes the (pooled) connection, including the statements of all dependencies.
-     * This is a handler to be invoked by {@link java.util.stream.Stream#close()}.
-     */
-    @Override
-    public void run() {
-        try {
-            close();
-        } catch (SQLException e) {
-            throw new BackingStoreException(e);
-        }
-    }
-
-    /**
      * Useful to customiez value retrieval on result sets. Example:
      * {@code
      * SQLBiFunction<ResultSet, Integer, Integer> get = ResultSet::getInt;
diff --git a/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/StreamSQL.java
b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/StreamSQL.java
index f6d8c3d..7a295b2 100644
--- a/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/StreamSQL.java
+++ b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/StreamSQL.java
@@ -38,6 +38,8 @@ import java.util.stream.LongStream;
 import java.util.stream.Stream;
 import java.util.stream.StreamSupport;
 
+import javax.sql.DataSource;
+
 import org.opengis.feature.Feature;
 
 import org.apache.sis.internal.util.DoubleStreamDecoration;
@@ -163,23 +165,36 @@ class StreamSQL extends StreamDecoration<Feature> {
     @Override
     protected synchronized Stream<Feature> createDecoratedStream() {
         final AtomicReference<Connection> connectionRef = new AtomicReference<>();
-        Stream<Feature> featureStream = Stream.of(uncheck(() -> queryBuilder.parent.source.getConnection()))
+        Stream<Feature> featureStream = Stream.of(uncheck(this::connectNoAuto))
                 .map(Supplier::get)
                 .peek(connectionRef::set)
                 .flatMap(conn -> {
                     try {
                         final Features iter = queryBuilder.build(conn);
-                        return StreamSupport.stream(iter, parallel).onClose(iter);
+                        return StreamSupport.stream(iter, parallel);
                     } catch (SQLException | DataStoreException e) {
                         throw new BackingStoreException(e);
                     }
                 })
-                .onClose(() -> queryBuilder.parent.closeRef(connectionRef));
+                .onClose(() -> queryBuilder.parent.closeRef(connectionRef, true));
         if (peekAction != null) featureStream = featureStream.peek(peekAction);
         return featureStream;
     }
 
     /**
+     * Acquire a connection over {@link Table parent table} database, forcing
+     * {@link Connection#setAutoCommit(boolean) auto-commit} to false.
+     *
+     * @return A new connection to {@link Table parent table} database, with deactivated
auto-commit.
+     * @throws SQLException If we cannot create a new connection. See {@link DataSource#getConnection()}
for details.
+     */
+    private Connection connectNoAuto() throws SQLException {
+        final Connection conn = queryBuilder.parent.source.getConnection();
+        conn.setAutoCommit(false);
+        return conn;
+    }
+
+    /**
      * Transform a callable into supplier by catching any potential verified exception and
rethrowing it as a {@link BackingStoreException}.
      * @param generator The callable to use in a non-verified error context. Must not be
null.
      * @param <T> The return type of input callable.
diff --git a/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/Table.java
b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/Table.java
index 321561c..5a9c555 100644
--- a/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/Table.java
+++ b/storage/sis-sqlstore/src/main/java/org/apache/sis/internal/sql/feature/Table.java
@@ -606,10 +606,11 @@ final class Table extends AbstractFeatureSet {
         return new StreamSQL(this);
     }
 
-    void closeRef(final AtomicReference<Connection> ref) {
+    void closeRef(final AtomicReference<Connection> ref, boolean forceRollback) {
         final Connection conn = ref.get();
         if (conn != null) {
             try {
+                if (forceRollback) conn.rollback();
                 conn.close();
             } catch (SQLException e) {
                 warning(e);


Mime
View raw message