sis-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ama...@apache.org
Subject [sis] 01/01: feat(Storage): Add an extension of storage connector with fail-fast behaviors
Date Mon, 28 Sep 2020 14:25:55 GMT
This is an automated email from the ASF dual-hosted git repository.

amanin pushed a commit to branch refactor/strict_storage_connector
in repository https://gitbox.apache.org/repos/asf/sis.git

commit dbeef9069be6fe6209969857c0fbc756b11a101e
Author: Alexis Manin <amanin@apache.org>
AuthorDate: Wed Apr 29 20:44:09 2020 +0200

    feat(Storage): Add an extension of storage connector with fail-fast behaviors
---
 .../apache/sis/storage/StrictStorageConnector.java | 238 +++++++++++++++++++++
 .../apache/sis/storage/StorageConnectorTest.java   |  68 +++++-
 .../sis/storage/StrictStorageConnectorTest.java    | 177 +++++++++++++++
 .../apache/sis/test/suite/StorageTestSuite.java    |   1 +
 4 files changed, 475 insertions(+), 9 deletions(-)

diff --git a/storage/sis-storage/src/main/java/org/apache/sis/storage/StrictStorageConnector.java
b/storage/sis-storage/src/main/java/org/apache/sis/storage/StrictStorageConnector.java
new file mode 100644
index 0000000..ea9975c
--- /dev/null
+++ b/storage/sis-storage/src/main/java/org/apache/sis/storage/StrictStorageConnector.java
@@ -0,0 +1,238 @@
+package org.apache.sis.storage;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.nio.ByteBuffer;
+import java.nio.file.Path;
+import java.util.Optional;
+import java.util.concurrent.Callable;
+import javax.imageio.stream.ImageInputStream;
+import javax.sql.DataSource;
+import org.apache.sis.util.UnconvertibleObjectException;
+import org.apache.sis.util.collection.BackingStoreException;
+
+/**
+ * Extension of a storage connector providing strong encapsulation of "views". This allows
to:
+ * <ul>
+ *     <li>Adopt a <em>fail-fast</em> behavior in case storage view is
corrupted by a user</li>
+ *     <li>
+ *         Provide easier usage:
+ *         <ul>
+ *             <li>Initial mark/final rewind is performed internally; users do not need to care about it.</li>
+ *             <li>Provide strongly typed operators, to guide user on how to use this
object.</li>
+ *         </ul>
+ *     </li>
+ * </ul>
+ * The purpose of this class is to be merged in StorageConnector once its principle has been
validated.
+ *
+ * <em>Guarantees</em>:
+ * <ul>
+ *     <li>This object is <em>not</em> thread-safe, and ensures a <em>fail-fast</em> behavior when it is used concurrently.</li>
+ *     <li>
+ *         useAs* methods will enforce following behavior:
+ *         <ul>
+ *             <li>If possible, rewind properly consumed storage view to its initial
state</li>
+ *             <li>If above statement is not possible, an error will be immediately
propagated, and the connector will be marked as closed.</li>
+ *         </ul>
+ *     </li>
+ * </ul>
+ *
+ * Typical usage:
+ * <ol>
+ *     <li>Check storage compatibility through `useAs*` methods</li>
+ *     <li>If the storage is compatible, commit our choice by locking a storage view,
closing the connector in the process.</li>
+ * </ol>
+ *
+ * Example:
+ * <pre>
+ * final Path file = Paths.get("path/to/file");
+ * try (var c = new StrictStorageConnector(new StorageConnector(file))) {
+ *
+ *   // The connector rewinds the buffer by itself after use, so it can safely be used to check support
+ *   Boolean isSupported = c.useAsBuffer((buffer) -&gt; buffer.get() == SEARCHED_KEY);
+ *
+ *   // Once support is validated, acquire real storage connection. At this point,
+ *   // storage life cycle becomes the responsibility of the caller, allowing it
+ *   // to survive beyond the connector scope.
+ *   if (isSupported) {
+ *     try ( InputStream stream = c.commit( InputStream.class ) ) {
+ *         // read all needed data from acquired stream
+ *     }
+ *   }
+ * }
+ * </pre>
+ */
+public class StrictStorageConnector implements AutoCloseable {
+
+    private final StorageConnector storage;
+
+    private Object committedStorage;
+    private volatile int concurrentFlag;
+
+    /**
+     * Wraps the given storage connector, to expose it through controlled operations.
+     *
+     * @param storage The connector to delegate to. It is stored as is, without any validation.
+     */
+    public StrictStorageConnector(final StorageConnector storage) {
+        this.storage = storage;
+    }
+
+    /**
+     * Closes all views of the wrapped storage except the given one, and flags this connector as closed.
+     * Calling this method with a null view on an already closed connector is a no-op.
+     *
+     * @param view The view to keep opened, or null to close everything.
+     * @throws DataStoreException If closing fails. An {@link IOException} raised in the process is wrapped in it.
+     * @throws IllegalStateException If this connector is already closed and given view is not null.
+     */
+    public void closeAllExcept(final Object view) throws DataStoreException {
+        // Closing multiple times is OK. However, if the view is not null, we will let control raise an error.
+        final boolean alreadyClosed = concurrentFlag < 0;
+        if (alreadyClosed && view == null) {
+            return;
+        }
+        final StorageCallable<Void> closing = () -> {
+            concurrentFlag = -1;
+            storage.closeAllExcept(view);
+            committedStorage = view;
+            return null;
+        };
+        try {
+            doUnderControl(closing);
+        } catch (IOException cause) {
+            throw new DataStoreException(cause);
+        }
+    }
+
+    /**
+     * Provides an in-memory byte buffer containing first bytes of the source storage.
+     * To know how many bytes are available, refer to the buffer {@link ByteBuffer#remaining() remaining byte count}.
+     * Users <em>do not</em> need to rewind the buffer after use: this connector rewinds it once the operator
+     * has returned or failed.
+     *
+     * @param operator User operation to perform against provided buffer.
+     * @param <T> Type of the result computed by the operator.
+     * @return The value computed by the operator.
+     * @throws UnsupportedStorageException If the storage cannot be viewed as a byte buffer.
+     * @throws DataStoreException If fetching the buffer fails, or if the operator throws it.
+     * @throws IOException If the operator throws it.
+     * @throws IllegalStateException If this connector is already closed.
+     */
+    public <T> T useAsBuffer(final StorageOperatingFunction<ByteBuffer, T> operator) throws DataStoreException, IOException {
+        return doUnderControl(() -> {
+            final ByteBuffer view = getOrFail(ByteBuffer.class);
+            try {
+                return operator.apply(view);
+            } finally {
+                // Whatever the operator did, hand the buffer back at position 0.
+                view.rewind();
+            }
+        });
+    }
+
+    /**
+     * Exposes the storage as an image input stream to the given operator. The stream is marked before the
+     * operator runs and reset afterwards. If the stream position after reset differs from the position
+     * before the operator ran, this connector is flagged as closed and an error is raised.
+     *
+     * @param operator User operation to perform against the stream.
+     * @param <T> Type of the result computed by the operator.
+     * @return The value computed by the operator.
+     * @throws IOException If the operator or a stream operation throws it.
+     * @throws DataStoreException If the stream cannot be acquired, or if its position has not been restored.
+     * @throws IllegalStateException If this connector is already closed.
+     */
+    public <T> T useAsImageInputStream(final StorageOperatingFunction<ImageInputStream, T> operator) throws IOException, DataStoreException {
+        return doUnderControl(() -> {
+            final ImageInputStream input = getOrFail(ImageInputStream.class);
+            final long expectedPosition = input.getStreamPosition();
+            input.mark();
+            T outcome;
+            try (Closeable resetOnExit = input::reset) {
+                outcome = operator.apply(input);
+            }
+            if (input.getStreamPosition() == expectedPosition) {
+                return outcome;
+            }
+            concurrentFlag = -1; // mark this connector as closed/not valid anymore
+            throw new DataStoreException("Operator has messed with stream marks");
+        });
+    }
+
+    /**
+     * Temporarily expose storage through queried interface/class to be used by a user defined operator.
+     * Byte buffer and image input stream views are delegated to {@link #useAsBuffer} and
+     * {@link #useAsImageInputStream}, which restore the view state after use. URI, path and file views
+     * are handed directly to the operator.
+     *
+     * @param storageType Storage access interface to provide to the operator.
+     * @param operator The operator that will access storage to compute a result.
+     * @param <S> Storage class
+     * @param <T> Type of result computed by user operator.
+     * @return The value computed by user operator.
+     * @throws IOException If given operator throws IOException on execution.
+     * @throws UnsupportedStorageException If queried storage type cannot be accessed in current context.
+     * @throws DataStoreException If an error occurs while fetching queried storage.
+     */
+    // The storage type token is typed by <S> (the view given to the operator), not by the result type <T>.
+    // NOTE(review): casts below are only safe when S is exactly the matched type; a strict subtype of it
+    // would make the operator receive an instance of the parent type.
+    @SuppressWarnings("unchecked")
+    public <S, T> T useAs(Class<S> storageType, StorageOperatingFunction<S, T> operator) throws IOException, DataStoreException {
+        if (ByteBuffer.class.isAssignableFrom(storageType)) {
+            return useAsBuffer((StorageOperatingFunction<ByteBuffer, T>) operator);
+        } else if (ImageInputStream.class.isAssignableFrom(storageType)) {
+            return useAsImageInputStream((StorageOperatingFunction<ImageInputStream, T>) operator);
+        } else if (URI.class.isAssignableFrom(storageType)) {
+            final URI uri = getURI().orElseThrow(() -> new UnsupportedStorageException("Cannot acquire an URI"));
+            return ((StorageOperatingFunction<URI, T>) operator).apply(uri);
+        } else if (Path.class.isAssignableFrom(storageType)) {
+            final Path path = getPath().orElseThrow(() -> new UnsupportedStorageException("Cannot acquire a path"));
+            return ((StorageOperatingFunction<Path, T>) operator).apply(path);
+        } else if (File.class.isAssignableFrom(storageType)) {
+            final File file = getPath().map(p -> p.toFile()).orElseThrow(() -> new UnsupportedStorageException("Cannot acquire a file"));
+            return ((StorageOperatingFunction<File, T>) operator).apply(file);
+        } else {
+            throw new UnsupportedStorageException("Queried storage type is not supported yet: "+storageType);
+        }
+    }
+
+    /**
+     * Ensure only one storage operation is running at any time against this storage connector. It allows fail-fast
+     * behavior if this connector is used in concurrent context.
+     *
+     * @param operator The operation to perform once we checked no other operation is running.
+     *
+     * @param <V> Type of result value produced by given operator.
+     * @return The result produced by given operator.
+     * @throws IOException If given operator throws it.
+     * @throws ConcurrentReadException If another operation is already running on this connector.
+     * @throws DataStoreException If given operator throws it.
+     * @throws IllegalStateException If this connector is already closed.
+     */
+    protected <V> V doUnderControl(StorageCallable<V> operator) throws IOException, DataStoreException {
+        /*
+         * State check and acquisition must be a single atomic step. A volatile read followed by an
+         * increment is not atomic, and would let two threads get past the check at the same time.
+         */
+        synchronized (this) {
+            if (concurrentFlag < 0) {
+                throw new IllegalStateException("This storage connector is closed.");
+            }
+            if (concurrentFlag != 0) {
+                throw new ConcurrentReadException("Another operation is already running on this storage connector.");
+            }
+            concurrentFlag = 1;
+        }
+        try {
+            return operator.call();
+        } finally {
+            synchronized (this) {
+                // Release only if the connector has not been flagged as closed (negative value) in the meantime.
+                if (concurrentFlag > 0) {
+                    concurrentFlag = 0;
+                }
+            }
+        }
+    }
+
+    /**
+     * Returns the storage as a path, if available.
+     *
+     * @return The storage path, or an empty optional if the storage cannot be viewed as a {@link Path}.
+     */
+    public Optional<Path> getPath() {
+        return getSilently(Path.class);
+    }
+
+    /**
+     * Returns the storage as a URI, if available.
+     *
+     * @return The storage URI, or an empty optional if the storage cannot be viewed as a {@link URI}.
+     */
+    public Optional<URI> getURI() {
+        return getSilently(URI.class);
+    }
+
+    /**
+     * Returns the storage as an SQL data source, if available.
+     *
+     * @return The data source, or an empty optional if the storage cannot be viewed as a {@link DataSource}.
+     */
+    public Optional<DataSource> getSQLDatasource() {
+        return getSilently(DataSource.class);
+    }
+
+    /**
+     * Returns the storage as a character string, if available.
+     *
+     * @return The storage as text, or an empty optional if the storage cannot be viewed as a {@link String}.
+     */
+    public Optional<String> getPathAsString() {
+        return getSilently(String.class);
+    }
+
+    /**
+     * Retrieve storage in the queried form, closing all other opened view in the same time.
+     * <em>Warning</em>: This method also closes this storage connector, making invalid any more calls on it.
+     * The returned view is recorded as the committed one, so that {@link #close()} preserves it.
+     *
+     * @param target Type of the view to get back / keep opened.
+     * @param <T> Type of the wanted storage connection.
+     * @return Underlying storage in the requested form. Never null.
+     *
+     * @throws IOException If anything goes wrong while initializing storage access.
+     * @throws DataStoreException If this connector is used concurrently, or if any problem occurs while initializing view.
+     * @throws IllegalStateException If this connector is already closed.
+     */
+    public <T> T commit(Class<T> target) throws IOException, DataStoreException {
+        return doUnderControl(() -> {
+            final T result = getOrFail(target);
+            concurrentFlag = -1; //close flag
+            storage.closeAllExcept(result);
+            // Remember the committed view: close() must not release what the caller now owns.
+            committedStorage = result;
+            return result;
+        });
+    }
+
+    /**
+     * Fetches the storage view of given type, refusing a null result.
+     *
+     * @param target Type of the wanted view.
+     * @param <T> Type of the wanted view.
+     * @return The requested view, never null.
+     * @throws UnsupportedStorageException If the wrapped connector returns null for the requested type.
+     * @throws DataStoreException If the wrapped connector fails to provide the view.
+     */
+    private <T> T getOrFail(final Class<T> target) throws DataStoreException {
+        final T candidate = storage.getStorageAs(target);
+        if (candidate != null) {
+            return candidate;
+        }
+        throw new UnsupportedStorageException();
+    }
+
+    /**
+     * Fetches the storage view of given type without propagating any checked exception.
+     *
+     * @param target Type of the wanted view.
+     * @param <T> Type of the wanted view.
+     * @return The requested view, or an empty optional if it is null or if the storage cannot be converted to it.
+     * @throws BackingStoreException If the wrapped connector raises a {@link DataStoreException}.
+     */
+    private <T> Optional<T> getSilently(final Class<T> target) {
+        final T view;
+        try {
+            view = storage.getStorageAs(target);
+        } catch (UnconvertibleObjectException conversionFailure) {
+            // TODO: log fine
+            return Optional.empty();
+        } catch (DataStoreException unexpected) {
+            // According to current implementation, that should never happen.
+            // Moreover, it is not really logic to propagate DataStoreException, as this operation should not involve
+            // any "storage" logic (only in-memory path/uri conversion if needed).
+            throw new BackingStoreException(unexpected);
+        }
+        return Optional.ofNullable(view);
+    }
+
+    /**
+     * Flags this connector as closed, then closes all views of the wrapped storage except the committed one (if any).
+     *
+     * @throws IOException Declared for API compatibility.
+     * @throws DataStoreException If the wrapped connector fails to close its views.
+     */
+    @Override
+    public void close() throws IOException, DataStoreException {
+        // Close flag: any operation attempted from now on is rejected instead of reaching released views.
+        concurrentFlag = -1;
+        storage.closeAllExcept(committedStorage);
+    }
+
+    /**
+     * A {@link Callable} whose checked exceptions are narrowed to {@link IOException} and {@link DataStoreException}.
+     *
+     * @param <V> Type of the computed value.
+     */
+    @FunctionalInterface
+    private interface StorageCallable<V> extends Callable<V> {
+        @Override
+        V call() throws IOException, DataStoreException;
+    }
+
+    /**
+     * An operation computing a result from a storage view. It is allowed to fail with storage related exceptions.
+     *
+     * @param <I> Type of the storage view given as input.
+     * @param <O> Type of the computed result.
+     */
+    @FunctionalInterface
+    public interface StorageOperatingFunction<I, O> {
+        /**
+         * Computes a result from the given storage view.
+         *
+         * @param storage The storage view to use.
+         * @return The computed result.
+         * @throws IOException If an I/O operation fails.
+         * @throws DataStoreException If a storage related error occurs.
+         */
+        O apply(I storage) throws IOException, DataStoreException;
+    }
+}
diff --git a/storage/sis-storage/src/test/java/org/apache/sis/storage/StorageConnectorTest.java
b/storage/sis-storage/src/test/java/org/apache/sis/storage/StorageConnectorTest.java
index a7d33ea..381bec8 100644
--- a/storage/sis-storage/src/test/java/org/apache/sis/storage/StorageConnectorTest.java
+++ b/storage/sis-storage/src/test/java/org/apache/sis/storage/StorageConnectorTest.java
@@ -16,29 +16,40 @@
  */
 package org.apache.sis.storage;
 
-import java.net.URI;
+import java.io.ByteArrayInputStream;
 import java.io.DataInput;
+import java.io.IOException;
 import java.io.InputStream;
 import java.io.Reader;
-import java.io.IOException;
+import java.net.URI;
 import java.nio.ByteBuffer;
-import java.nio.charset.StandardCharsets;
 import java.nio.channels.ReadableByteChannel;
-import javax.imageio.stream.ImageInputStream;
-import javax.imageio.ImageIO;
+import java.nio.charset.StandardCharsets;
 import java.sql.Connection;
-import org.apache.sis.setup.OptionKey;
-import org.apache.sis.util.UnconvertibleObjectException;
+import java.util.Random;
+import javax.imageio.ImageIO;
+import javax.imageio.stream.ImageInputStream;
 import org.apache.sis.internal.storage.io.ChannelDataInput;
 import org.apache.sis.internal.storage.io.ChannelImageInputStream;
 import org.apache.sis.internal.storage.io.InputStreamAdapter;
-import org.apache.sis.test.DependsOnMethod;
+import org.apache.sis.setup.OptionKey;
 import org.apache.sis.test.DependsOn;
+import org.apache.sis.test.DependsOnMethod;
 import org.apache.sis.test.TestCase;
+import org.apache.sis.util.UnconvertibleObjectException;
 import org.junit.Test;
 
 import static org.junit.Assume.assumeTrue;
-import static org.opengis.test.Assert.*;
+import static org.opengis.test.Assert.assertArrayEquals;
+import static org.opengis.test.Assert.assertEquals;
+import static org.opengis.test.Assert.assertFalse;
+import static org.opengis.test.Assert.assertInstanceOf;
+import static org.opengis.test.Assert.assertNotNull;
+import static org.opengis.test.Assert.assertNotSame;
+import static org.opengis.test.Assert.assertNull;
+import static org.opengis.test.Assert.assertSame;
+import static org.opengis.test.Assert.assertTrue;
+import static org.opengis.test.Assert.fail;
 
 
 /**
@@ -390,4 +401,43 @@ public final strictfp class StorageConnectorTest extends TestCase {
         assertTrue("channel.isOpen()", channel.isOpen());
         channel.close();
     }
+
+    /**
+     * Verifies that acquiring and reading the byte buffer view leaves the image input stream position untouched.
+     *
+     * @throws Exception if an error occurred while using the connector.
+     */
+    @Test
+    public void getting_buffer_should_not_change_underlying_stream_position() throws Exception {
+        final StorageConnector con = create(false);
+        try {
+            final ImageInputStream stream = con.getStorageAs(ImageInputStream.class);
+
+            final ByteBuffer buffer = con.getStorageAs(ByteBuffer.class);
+            assertEquals(0, stream.getStreamPosition());
+
+            buffer.get();
+            assertEquals(0, stream.getStreamPosition());
+        } finally {
+            // Release the views even when an assertion above fails.
+            con.closeAllExcept(null);
+        }
+    }
+
+    /**
+     * Verifies that moving the input stream view forward, then resetting it, leaves the content
+     * of the byte buffer view unchanged.
+     *
+     * @throws Exception if an error occurred while using the connector.
+     */
+    @Test
+    public void moving_stream_should_not_impact_buffer() throws Exception {
+        final byte[] data = new byte[1 << 16];
+        new Random().nextBytes(data);
+        final StorageConnector connector = new StorageConnector(new ByteArrayInputStream(data));
+        try {
+            final ByteBuffer buffer = connector.getStorageAs(ByteBuffer.class);
+            final int blockSize = buffer.remaining();
+            final byte[] ctrl = new byte[blockSize];
+            buffer.get(ctrl).rewind();
+
+            final InputStream stream = connector.getStorageAs(InputStream.class);
+            stream.mark(blockSize * 2);
+            final long skipped = stream.skip(blockSize);
+            final int read = stream.read(new byte[blockSize]);
+            // If the stream did not move at all, the comparison below would prove nothing.
+            assertTrue("Stream should have been moved forward", skipped + read > 0);
+            stream.reset();
+
+            final byte[] afterValue = new byte[blockSize];
+            buffer.get(afterValue).rewind();
+
+            assertArrayEquals(ctrl, afterValue);
+        } finally {
+            // Release the views even when an assertion above fails.
+            connector.closeAllExcept(null);
+        }
+    }
 }
diff --git a/storage/sis-storage/src/test/java/org/apache/sis/storage/StrictStorageConnectorTest.java
b/storage/sis-storage/src/test/java/org/apache/sis/storage/StrictStorageConnectorTest.java
new file mode 100644
index 0000000..40110a4
--- /dev/null
+++ b/storage/sis-storage/src/test/java/org/apache/sis/storage/StrictStorageConnectorTest.java
@@ -0,0 +1,177 @@
+package org.apache.sis.storage;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URISyntaxException;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.concurrent.CountDownLatch;
+import org.apache.sis.setup.OptionKey;
+import org.apache.sis.test.DependsOn;
+import org.junit.Test;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+@DependsOn(org.apache.sis.storage.StorageConnectorTest.class)
+public class StrictStorageConnectorTest {
+    /**
+     * Name of the test file, in the same directory as the {@code StorageConnectorTest} file.
+     */
+    private static final String FILENAME = "Any.txt";
+
+    /**
+     * Creates the instance to test. This method uses the {@code "Any.txt"} file located next to
+     * {@code StorageConnectorTest} as the resource to test. The resource is provided either as
+     * a stream or as a URL.
+     *
+     * @param asStream {@code true} for wrapping the resource stream, {@code false} for wrapping its URL.
+     * @return A strict connector wrapping a new storage connector over the test resource.
+     */
+    private static StrictStorageConnector create(final boolean asStream) {
+        final Class<?> anchor = StorageConnectorTest.class;
+        final Object storage;
+        if (asStream) {
+            storage = anchor.getResourceAsStream(FILENAME);
+        } else {
+            storage = anchor.getResource(FILENAME);
+        }
+        assertNotNull(storage);
+        final StorageConnector unsafeConnector = new StorageConnector(storage);
+        unsafeConnector.setOption(OptionKey.ENCODING, StandardCharsets.US_ASCII);
+        unsafeConnector.setOption(OptionKey.URL_ENCODING, "UTF-8");
+        return new StrictStorageConnector(unsafeConnector);
+    }
+
+    /**
+     * Reads the whole content of the test file, to serve as a control value.
+     *
+     * @return All bytes of the test file.
+     * @throws URISyntaxException If the resource URL cannot be converted to a URI.
+     * @throws IOException If the file cannot be read.
+     */
+    private static byte[] getFileBytes() throws URISyntaxException, IOException {
+        return Files.readAllBytes(Paths.get(StorageConnector.class.getResource(FILENAME).toURI()));
+    }
+
+    /**
+     * Verifies that a connector created from a URL exposes path, URI and text representations,
+     * but no SQL data source.
+     *
+     * @throws Exception if closing the connector fails.
+     */
+    @Test
+    public void acquiring_path_works() throws Exception {
+        // try-with-resources: the connector must be released whatever the outcome of the assertions.
+        try (StrictStorageConnector connector = create(false)) {
+            assertTrue(connector.getPath().isPresent());
+            assertTrue(connector.getURI().isPresent());
+            assertTrue(connector.getPathAsString().isPresent());
+            assertFalse(connector.getSQLDatasource().isPresent());
+        }
+    }
+
+    /**
+     * Verifies that a connector created from an input stream exposes neither path, URI, text
+     * representation nor SQL data source.
+     *
+     * @throws Exception if closing the connector fails.
+     */
+    @Test
+    public void stream_based_connector_return_empty_path() throws Exception {
+        // try-with-resources: the connector wraps an opened stream, which must not be leaked.
+        try (StrictStorageConnector connector = create(true)) {
+            assertFalse(connector.getPath().isPresent());
+            assertFalse(connector.getURI().isPresent());
+            assertFalse(connector.getPathAsString().isPresent());
+            assertFalse(connector.getSQLDatasource().isPresent());
+        }
+    }
+
+    /**
+     * Verifies that the byte buffer is given back at position 0 on each use, even if a previous
+     * operator has moved its position.
+     *
+     * @throws Exception if an error occurred while using the connector.
+     */
+    @Test
+    public void byte_buffer_is_rewind_after_use() throws Exception {
+        final byte[] expected = getFileBytes();
+        try (final StrictStorageConnector connector = create(false)) {
+            // First pass: move the buffer position away from its start.
+            connector.useAsBuffer(buffer -> buffer.get(new byte[10]));
+            // Second pass: the buffer must be back at its start, with the whole content available.
+            connector.useAsBuffer(buffer -> {
+                assertEquals(0, buffer.position());
+                final byte[] actual = new byte[buffer.remaining()];
+                buffer.get(actual);
+                assertArrayEquals(expected, actual);
+                return null;
+            });
+        }
+    }
+
+    /**
+     * Verifies that an operator which moves the stream then adds its own mark is detected:
+     * the connector cannot bring the stream back to its initial position, and must report it.
+     *
+     * @throws IOException if an I/O error occurred while using the connector.
+     * @throws DataStoreException if an unexpected storage error occurred.
+     */
+    @Test
+    public void fail_fast_when_user_corrupts_stream_mark() throws IOException, DataStoreException {
+        try (final StrictStorageConnector connector = create(false)) {
+            try {
+                connector.useAsImageInputStream(input -> {
+                    input.skipBytes(1);
+                    input.mark();
+                    return 0;
+                });
+                fail("We should have detected something has gone wrong");
+            } catch (DataStoreException expected) {
+                // Expected behavior: connector has detected that rewind did not work properly.
+            }
+        }
+    }
+
+    /**
+     * Verifies that starting an operation while another one is still running on the same connector
+     * fails with a {@link ConcurrentReadException}.
+     *
+     * <p>Threads are coordinated with latches rather than timed waits, so the outcome does not
+     * depend on scheduling: the main thread only proceeds once the background operation is running
+     * (or has ended), and the background operation only ends once the main thread allows it.</p>
+     *
+     * @throws Exception if an error occurred while using the connector.
+     */
+    @Test
+    public void no_concurrency_allowed() throws Exception {
+        try (final StrictStorageConnector c = create(false)) {
+            final CountDownLatch operationStarted = new CountDownLatch(1);
+            final CountDownLatch releaseOperation = new CountDownLatch(1);
+            final Thread worker = new Thread(() -> {
+                try {
+                    c.useAsBuffer(buffer -> {
+                        operationStarted.countDown();
+                        try {
+                            releaseOperation.await();
+                        } catch (InterruptedException e) {
+                            // Preserve interruption status and let the operation end.
+                            Thread.currentThread().interrupt();
+                        }
+                        return null;
+                    });
+                } catch (IOException | DataStoreException ignored) {
+                    // Do not matter here: only the outcome of the operation in main thread is verified.
+                } finally {
+                    // Never leave the main thread blocked, even if the operation could not start.
+                    operationStarted.countDown();
+                }
+            });
+            worker.start();
+            try {
+                // Ensure above operation is running before attempting a second one.
+                operationStarted.await();
+                try {
+                    c.useAsBuffer(buffer -> null);
+                    fail("Concurrency error should have been raised.");
+                } catch (ConcurrentReadException e) {
+                    // Expected behavior: fail-fast to prevent concurrency.
+                }
+            } finally {
+                releaseOperation.countDown();
+                worker.join();
+            }
+        }
+    }
+
+    /**
+     * Verifies that committing to an input stream closes the connector (any further operation is
+     * rejected), while the committed stream stays readable, including after the connector itself
+     * has been closed.
+     *
+     * @throws Exception if an error occurred while using the connector.
+     */
+    @Test
+    public void commit_close_all_resources_but_chosen() throws Exception {
+        final InputStream committed;
+        try (final StrictStorageConnector connector = create(false)) {
+
+            committed = connector.commit(InputStream.class);
+
+            try {
+                connector.useAsBuffer(buffer -> null);
+                fail("connector should be closed");
+            } catch (IllegalStateException expected) {
+                // Expected behavior. The committed stream must still be usable at this point.
+                try {
+                    committed.read();
+                } catch (IOException readFailure) {
+                    fail("We queried for the input stream to stay open.");
+                }
+            }
+        }
+
+        // The connector is now closed: the committed stream must have survived it.
+        try ( final InputStream closing = committed ) {
+            committed.read();
+        } catch (IOException readFailure) {
+            fail("Committed storage view should still be opened.");
+        }
+    }
+
+    /**
+     * Verifies that closing an already closed connector raises no error: the connector is closed
+     * by the commit, then explicitly, then once more when leaving the try-with-resources block.
+     *
+     * @throws Exception if any of the close operations fails.
+     */
+    @Test
+    public void closing_multiple_times_causes_no_error() throws Exception {
+        try ( StrictStorageConnector connector = create(true) ) {
+            connector.commit(InputStream.class);
+            connector.closeAllExcept(null);
+        }
+    }
+}
diff --git a/storage/sis-storage/src/test/java/org/apache/sis/test/suite/StorageTestSuite.java
b/storage/sis-storage/src/test/java/org/apache/sis/test/suite/StorageTestSuite.java
index b89e232..3d50486 100644
--- a/storage/sis-storage/src/test/java/org/apache/sis/test/suite/StorageTestSuite.java
+++ b/storage/sis-storage/src/test/java/org/apache/sis/test/suite/StorageTestSuite.java
@@ -44,6 +44,7 @@ import org.junit.BeforeClass;
     org.apache.sis.storage.FeatureNamingTest.class,
     org.apache.sis.storage.ProbeResultTest.class,
     org.apache.sis.storage.StorageConnectorTest.class,
+    org.apache.sis.storage.StrictStorageConnectorTest.class,
     org.apache.sis.storage.event.StoreListenersTest.class,
     org.apache.sis.internal.storage.query.CoverageQueryTest.class,
     org.apache.sis.internal.storage.query.SimpleQueryTest.class,


Mime
View raw message