sis-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From desruisse...@apache.org
Subject [sis] 02/04: Implement parallel (multi-threaded) computation of isolines.
Date Fri, 22 Jan 2021 23:43:04 GMT
This is an automated email from the ASF dual-hosted git repository.

desruisseaux pushed a commit to branch geoapi-4.0
in repository https://gitbox.apache.org/repos/asf/sis.git

commit e11eee17700132903d380cc71bd58aa5598a877e
Author: Martin Desruisseaux <martin.desruisseaux@geomatys.com>
AuthorDate: Fri Jan 22 22:55:02 2021 +0100

    Implement parallel (multi-threaded) computation of isolines.
---
 .../sis/internal/feature/j2d/PathBuilder.java      |  22 +-
 .../internal/processing/image/CompoundFuture.java  | 202 +++++++++++
 .../internal/processing/image/IsolineTracer.java   | 102 +++++-
 .../sis/internal/processing/image/Isolines.java    | 191 ++++++++--
 .../internal/processing/image/TiledProcess.java    | 394 +++++++++++++++++++++
 5 files changed, 869 insertions(+), 42 deletions(-)

diff --git a/core/sis-feature/src/main/java/org/apache/sis/internal/feature/j2d/PathBuilder.java b/core/sis-feature/src/main/java/org/apache/sis/internal/feature/j2d/PathBuilder.java
index faf54a8..a971f92 100644
--- a/core/sis-feature/src/main/java/org/apache/sis/internal/feature/j2d/PathBuilder.java
+++ b/core/sis-feature/src/main/java/org/apache/sis/internal/feature/j2d/PathBuilder.java
@@ -65,11 +65,25 @@ public class PathBuilder {
      * Verifies that {@link #size} is even, positive and smaller than the given limit.
      * This method is used for assertions.
      */
-    private boolean validSize(final int limit) {
+    private boolean isValidSize(final int limit) {
         return size >= 0 && size <= limit && (size & 1) == 0;
     }
 
     /**
+     * Adds all polylines defined in the other builder. The other builder shall have no polylines under
+     * construction, i.e. {@link #append(double[], int, boolean)} shall not have been invoked since last
+     * {@link #createPolyline(boolean)} invocation.
+     *
+     * @param  other  the other builder for which to add polylines, or {@code null} if none.
+     */
+    public final void append(final PathBuilder other) {
+        if (other != null) {
+            assert other.size == 0;
+            polylines.addAll(other.polylines);
+        }
+    }
+
+    /**
      * Appends the given coordinates to current polyline, omitting repetitive points.
      * Coordinates are added to the same polyline than the one updated by previous calls
      * to this method, unless {@link #createPolyline(boolean)} has been invoked before.
@@ -115,7 +129,7 @@ public class PathBuilder {
                 if (Double.isNaN(x) || Double.isNaN(y)) {
                     if (offset != 0) {
                         size = filterChunk(coordinates, size, offset);
-                        assert validSize(offset) : size;
+                        assert isValidSize(offset) : size;
                         createPolyline(false);
                         offset = 0;
                     }
@@ -128,7 +142,7 @@ public class PathBuilder {
             }
         }
         size = filterChunk(coordinates, size, offset);
-        assert validSize(offset) : size;
+        assert isValidSize(offset) : size;
     }
 
     /**
@@ -184,7 +198,7 @@ public class PathBuilder {
      */
     public final void createPolyline(boolean close) throws TransformException {
         size = filterFull(coordinates, size);
-        assert validSize(coordinates.length) : size;
+        assert isValidSize(coordinates.length) : size;
         /*
          * If the point would be alone, discard the lonely point because it would be invisible
          * (a "move to" operation without "line to"). If there is two points, they should not
diff --git a/core/sis-feature/src/main/java/org/apache/sis/internal/processing/image/CompoundFuture.java b/core/sis-feature/src/main/java/org/apache/sis/internal/processing/image/CompoundFuture.java
new file mode 100644
index 0000000..c757108
--- /dev/null
+++ b/core/sis-feature/src/main/java/org/apache/sis/internal/processing/image/CompoundFuture.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sis.internal.processing.image;
+
+import java.util.Set;
+import java.util.HashSet;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.CancellationException;
+import org.apache.sis.internal.util.CollectionsExt;
+import org.apache.sis.internal.feature.Resources;
+
+
+/**
+ * The result of multiple asynchronous computations.
+ * This {@code Future} is considered completed when all components are completed.
+ *
+ * @author  Martin Desruisseaux (Geomatys)
+ * @version 1.1
+ * @since   1.1
+ * @module
+ */
+final class CompoundFuture<R> implements Future<R> {
+    /**
+     * The elements making this computation.
+     */
+    private final Future<R>[] components;
+
+    /**
+     * Creates a new future with the given components.
+     */
+    private CompoundFuture(final Future<R>[] components) {
+        this.components = components;
+    }
+
+    /**
+     * Returns a future waiting for all given tasks to complete.
+     * If the array length is 1, then this method returns directly its singleton element.
+     *
+     * @param  <R>         type if result computed by tasks.
+     * @param  components  the sub-tasks to execute. This array is not cloned; do not modify.
+     * @return a future containing all given tasks.
+     */
+    public static <R> Future<R> create(final Future<R>[] components) {
+        switch (components.length) {
+            case 0: return null;
+            case 1: return components[0];
+        }
+        return new CompoundFuture<>(components);
+    }
+
+    /**
+     * Attempts to cancel execution of this task. After this method return, subsequent calls
+     * to {@link #isCancelled()} return {@code true} if this method returned {@code true}.
+     *
+     * <h4>Departure from specification</h4>
+     * {@code Future} specification requires that after this method returns, subsequent calls
+     * to {@link #isDone()} return {@code true}. This is not guaranteed in this implementation.
+     *
+     * @param  mayInterruptIfRunning  whether the thread executing tasks should be interrupted.
+     * @return {@code true} if at least one component task could be interrupted.
+     */
+    @Override
+    public boolean cancel(final boolean mayInterruptIfRunning) {
+        boolean canceled = false;
+        for (final Future<R> c : components) {
+            canceled |= c.cancel(mayInterruptIfRunning);
+        }
+        return canceled;
+    }
+
+    /**
+     * Returns {@code true} if this task was cancelled before it completed normally.
+     * This task is considered cancelled if at least one component has been cancelled.
+     *
+     * @return {@code true} if at least one component task was cancelled before it completed.
+     */
+    @Override
+    public boolean isCancelled() {
+        for (final Future<R> c : components) {
+            if (c.isCancelled()) return true;
+        }
+        return false;
+    }
+
+    /**
+     * Returns {@code true} if this task completed.
+     * Completion may be due to normal termination, an exception, or cancellation.
+     *
+     * @return {@code true} if all component tasks completed.
+     */
+    @Override
+    public boolean isDone() {
+        for (final Future<R> c : components) {
+            if (!c.isDone()) return false;
+        }
+        return true;
+    }
+
+    /**
+     * Waits if necessary for all computations to complete, and then retrieves the result.
+     * If all task components return either {@code null} or the same {@code <R>} value,
+     * then that result is returned. Otherwise the various {@code <R>} values are given
+     * to {@link #merge(Collection)} for obtaining a single result.
+     *
+     * @return the computed result.
+     * @throws CancellationException if at least one computation was cancelled.
+     * @throws ExecutionException if at least one computation threw an exception.
+     * @throws InterruptedException if the current thread was interrupted while waiting.
+     */
+    @Override
+    public R get() throws InterruptedException, ExecutionException {
+        try {
+            return get(0, true);
+        } catch (TimeoutException e) {
+            // Should never happen because we specified `noTimeOut = true`
+            throw new AssertionError(e);
+        }
+    }
+
+    /**
+     * Same as {@link #get()} but with a timeout. The given timeout is the total timeout;
+     * each component task may have a smaller timeout for keeping the total equal to the
+     * given value.
+     *
+     * @param  timeout  the maximum time to wait.
+     * @param  unit     the time unit of the timeout argument.
+     * @throws CancellationException if at least one computation was cancelled.
+     * @throws ExecutionException if at least one computation threw an exception.
+     * @throws InterruptedException if the current thread was interrupted while waiting.
+     * @throws TimeoutException if the wait timed out.
+     */
+    @Override
+    public R get(final long timeout, final TimeUnit unit)
+            throws InterruptedException, ExecutionException, TimeoutException
+    {
+        return get(System.nanoTime() + TimeUnit.NANOSECONDS.convert(timeout, unit), false);
+    }
+
+    /**
+     * Implementation of public {@code get(…)} methods.
+     * The timeout given to this method, if not ignored, is an absolute timeout.
+     *
+     * @param  timeout    {@link System#nanoTime()} value when to stop waiting.
+     * @param  noTimeOut  {@code true} if {@code timeout} should be ignored.
+     */
+    private R get(final long timeout, final boolean noTimeOut)
+            throws InterruptedException, ExecutionException, TimeoutException
+    {
+        R singleton = null;
+        Set<R> results = null;
+        for (final Future<R> c : components) {
+            final R r = noTimeOut ? c.get() : c.get(Math.max(0, timeout - System.nanoTime()), TimeUnit.NANOSECONDS);
+            if (r != null) {
+                if (singleton == null) {
+                    singleton = r;
+                } else if (r != singleton) {
+                    if (results == null) {
+                        results = new HashSet<>();
+                        results.add(singleton);
+                    }
+                    results.add(r);
+                }
+            }
+        }
+        if (results != null) {
+            singleton = merge(results);
+        }
+        return singleton;
+    }
+
+    /**
+     * Invoked by {@code get(…)} if there is more than one non-null instance.
+     * The default implementation throws an exception.
+     *
+     * @param  results  all non-null instances found.
+     * @return the unique instance to return.
+     */
+    protected R merge(final Set<R> results) {
+        final R singleton = CollectionsExt.singletonOrNull(results);
+        if (singleton != null) {
+            return singleton;
+        }
+        throw new IllegalStateException(Resources.format(Resources.Keys.NotASingleton_1, "get()"));
+    }
+}
diff --git a/core/sis-feature/src/main/java/org/apache/sis/internal/processing/image/IsolineTracer.java b/core/sis-feature/src/main/java/org/apache/sis/internal/processing/image/IsolineTracer.java
index 5545965..246d8a5 100644
--- a/core/sis-feature/src/main/java/org/apache/sis/internal/processing/image/IsolineTracer.java
+++ b/core/sis-feature/src/main/java/org/apache/sis/internal/processing/image/IsolineTracer.java
@@ -17,11 +17,13 @@
 package org.apache.sis.internal.processing.image;
 
 import java.util.Arrays;
+import java.util.ArrayList;
 import java.util.Map;
 import java.util.HashMap;
-import java.util.ArrayList;
+import java.util.IdentityHashMap;
 import java.util.Collections;
 import java.awt.Point;
+import java.awt.Rectangle;
 import java.awt.Shape;
 import org.opengis.referencing.operation.MathTransform;
 import org.opengis.referencing.operation.TransformException;
@@ -77,7 +79,15 @@ final class IsolineTracer {
     int y;
 
     /**
+     * Translation to apply on coordinates. For isolines computed sequentially, this is the image origin
+     * (often 0,0 but not necessarily). For isolines computed in parallel, the translations are different
+     * for each computation tile.
+     */
+    private final double translateX, translateY;
+
+    /**
      * Final transform to apply on coordinates (integer source coordinates at pixel centers).
+     * Can be {@code null} if none.
      */
     private final MathTransform gridToCRS;
 
@@ -86,11 +96,14 @@ final class IsolineTracer {
      *
      * @param  window       the 2×2 window containing pixel values in the 4 corners of current contouring grid cell.
      * @param  pixelStride  increment to the position in {@code window} for reading next sample value.
+     * @param  domain       pixel coordinates where iteration will happen.
      * @param  gridToCRS    final transform to apply on coordinates (integer source coordinates at pixel centers).
      */
-    IsolineTracer(final double[] window, final int pixelStride, final MathTransform gridToCRS) {
+    IsolineTracer(final double[] window, final int pixelStride, final Rectangle domain, final MathTransform gridToCRS) {
         this.window      = window;
         this.pixelStride = pixelStride;
+        this.translateX  = domain.x;
+        this.translateY  = domain.y;
         this.gridToCRS   = gridToCRS;
     }
 
@@ -428,7 +441,8 @@ final class IsolineTracer {
          */
         private void interpolateMissingLeftSide() {
             if (polylineOnLeft.size == 0) {
-                polylineOnLeft.append(x, y + interpolate(0, 2*pixelStride));
+                polylineOnLeft.append(translateX + (x),
+                                      translateY + (y + interpolate(0, 2*pixelStride)));
             }
         }
 
@@ -446,7 +460,8 @@ final class IsolineTracer {
          * Appends to the given polyline a point interpolated on the top side.
          */
         private void interpolateOnTopSide(final Polyline appendTo) {
-            appendTo.append(x + interpolate(0, pixelStride), y);
+            appendTo.append(translateX + (x + interpolate(0, pixelStride)),
+                            translateY + (y));
         }
 
         /**
@@ -454,7 +469,8 @@ final class IsolineTracer {
          * The polyline on right side will become {@code polylineOnLeft} in next column.
          */
         private void interpolateOnRightSide() {
-            polylineOnLeft.append(x + 1, y + interpolate(pixelStride, 3*pixelStride));
+            polylineOnLeft.append(translateX + (x + 1),
+                                  translateY + (y + interpolate(pixelStride, 3*pixelStride)));
         }
 
         /**
@@ -462,7 +478,8 @@ final class IsolineTracer {
          * The polyline on top side will become a {@code polylineOnBottoù} in next row.
          */
         private void interpolateOnBottomSide(final Polyline polylineOnTop) {
-            polylineOnTop.append(x + interpolate(2*pixelStride, 3*pixelStride), y + 1);
+            polylineOnTop.append(translateX + (x + interpolate(2*pixelStride, 3*pixelStride)),
+                                 translateY + (y + 1));
         }
 
         /**
@@ -546,14 +563,19 @@ final class IsolineTracer {
         private void writeUnclosed(final Polyline polyline) throws TransformException {
             final Unclosed fragment = new Unclosed(polyline, null);
             final Polyline[] polylines;
+            final boolean close;
             if (fragment.isEmpty()) {
+                close = false;
                 polylines = new Polyline[] {polyline.opposite, polyline};       // (reverse, forward) point order.
-            } else if (fragment.addOrMerge(partialPaths)) {
-                polylines = fragment.toPolylines();
             } else {
-                return;
+                close = fragment.addOrMerge(partialPaths);
+                if (!close) {
+                    // Keep in `partialPaths`. Maybe it can be closed later.
+                    return;
+                }
+                polylines = fragment.toPolylines();
             }
-            path = writeTo(path, polylines, false);
+            path = writeTo(path, polylines, close);
         }
 
         /**
@@ -570,7 +592,7 @@ final class IsolineTracer {
 
         /**
          * Invoked after the iteration has been completed on the full area of interest.
-         * This method writes all remaining polylines to {@link #path} or {@link #partialPaths}.
+         * This method writes all remaining polylines to {@link #partialPaths}.
          * It assumes that {@link #finishedRow()} has already been invoked.
          * This {@code Isoline} can not be used anymore after this call.
          */
@@ -586,6 +608,58 @@ final class IsolineTracer {
                 writeUnclosed(polylinesOnTop[i]);
                 polylinesOnTop[i] = null;
             }
+            assert isConsistent();
+        }
+
+        /**
+         * Verifies that {@link #partialPaths} consistency. Used for assertions only.
+         */
+        private boolean isConsistent() {
+            for (final Map.Entry<Point,Unclosed> entry : partialPaths.entrySet()) {
+                if (!entry.getValue().isExtremity(entry.getKey())) return false;
+            }
+            return true;
+        }
+
+        /**
+         * Transfers all {@code other} polylines into this instance. The {@code other} instance should be a neighbor,
+         * i.e. an instance sharing a border with this instance. The {@code other} instance will become empty after
+         * this method call.
+         *
+         * @param  other  a neighbor level (on top, left, right or bottom) to merge with this level.
+         * @throws TransformException if an error occurred during polylines creation.
+         */
+        final void merge(final Level other) throws TransformException {
+            assert other != this && other.value == value;
+            if (path == null) {
+                path = other.path;
+            } else {
+                path.append(other.path);
+            }
+            other.path = null;
+            assert  this.isConsistent();
+            assert other.isConsistent();
+            final IdentityHashMap<Unclosed,Boolean> done = new IdentityHashMap<>(other.partialPaths.size() / 2);
+            for (final Map.Entry<Point,Unclosed> entry : other.partialPaths.entrySet()) {
+                final Unclosed fragment = entry.getValue();
+                if (done.put(fragment, Boolean.TRUE) == null) {
+                    assert fragment.isExtremity(entry.getKey());
+                    if (fragment.addOrMerge(partialPaths)) {
+                        path = writeTo(path, fragment.toPolylines(), true);
+                        fragment.clear();
+                    }
+                }
+            }
+        }
+
+        /**
+         * Flushes any pending {@link #partialPaths} to {@link #path}. This method is invoked after
+         * {@link #finish()} has been invoked for all sub-regions (many sub-regions may exist if
+         * isoline generation has been splitted for parallel computation).
+         *
+         * @throws TransformException if an error occurred during polylines creation.
+         */
+        final void flush() throws TransformException {
             for (final Map.Entry<Point,Unclosed> entry : partialPaths.entrySet()) {
                 final Unclosed fragment = entry.getValue();
                 assert fragment.isExtremity(entry.getKey());
@@ -1051,7 +1125,7 @@ final class IsolineTracer {
      */
     private static final class Joiner extends PathBuilder {
         /**
-         * Final transform to apply on coordinates.
+         * Final transform to apply on coordinates, or {@code null} if none.
          */
         private final MathTransform gridToCRS;
 
@@ -1142,7 +1216,9 @@ final class IsolineTracer {
          */
         @Override
         protected int filterFull(final double[] coordinates, final int upper) throws TransformException {
-            gridToCRS.transform(coordinates, 0, coordinates, 0, upper / Polyline.DIMENSION);
+            if (gridToCRS != null) {
+                gridToCRS.transform(coordinates, 0, coordinates, 0, upper / Polyline.DIMENSION);
+            }
             return upper;
         }
     }
diff --git a/core/sis-feature/src/main/java/org/apache/sis/internal/processing/image/Isolines.java b/core/sis-feature/src/main/java/org/apache/sis/internal/processing/image/Isolines.java
index 3be55e0..4f0a060 100644
--- a/core/sis-feature/src/main/java/org/apache/sis/internal/processing/image/Isolines.java
+++ b/core/sis-feature/src/main/java/org/apache/sis/internal/processing/image/Isolines.java
@@ -19,13 +19,13 @@ package org.apache.sis.internal.processing.image;
 import java.util.Arrays;
 import java.util.TreeMap;
 import java.util.NavigableMap;
+import java.util.concurrent.Future;
 import java.awt.Shape;
 import java.awt.geom.Path2D;
 import java.awt.image.RenderedImage;
 import org.opengis.coverage.grid.SequenceType;
 import org.opengis.referencing.operation.MathTransform;
 import org.opengis.referencing.operation.TransformException;
-import org.apache.sis.referencing.operation.transform.MathTransforms;
 import org.apache.sis.image.PixelIterator;
 import org.apache.sis.util.ArgumentChecks;
 import org.apache.sis.util.ArraysExt;
@@ -55,27 +55,40 @@ public final class Isolines {
     private final IsolineTracer.Level[] levels;
 
     /**
-     * Creates an initially empty set of isolines for the given levels.
-     * The given array should be a clone of user-provided array because
-     * this constructor may modify it in-place.
+     * Creates an initially empty set of isolines for the given levels. The given {@code values}
+     * array should be one of the arrays validated by {@link #cloneAndSort(double[][])}.
      */
     private Isolines(final IsolineTracer tracer, final int band, final double[] values, final int width) {
-        Arrays.sort(values);
-        int n = values.length;
-        while (n > 0 && Double.isNaN(values[n-1])) n--;
-        for (int i=n; --i>0;) {
-            if (values[i] == values[i-1]) {
-                // Remove duplicated elements. May replace -0 by +0.
-                System.arraycopy(values, i, values, i-1, n-- - i);
-            }
-        }
-        levels = new IsolineTracer.Level[n];
-        for (int i=0; i<n; i++) {
+        levels = new IsolineTracer.Level[values.length];
+        for (int i=0; i<values.length; i++) {
             levels[i] = tracer.new Level(band, values[i], width);
         }
     }
 
     /**
+     * Ensures that the given arrays are sorted and contains no NaN value.
+     */
+    private static double[][] cloneAndSort(double[][] levels) {
+        levels.clone();
+        for (int j=0; j<levels.length; j++) {
+            double[] values = levels[j];
+            ArgumentChecks.ensureNonNullElement("levels", j, values);
+            values = values.clone();
+            Arrays.sort(values);
+            int n = values.length;
+            while (n > 0 && Double.isNaN(values[n-1])) n--;
+            for (int i=n; --i>0;) {
+                if (values[i] == values[i-1]) {
+                    // Remove duplicated elements. May replace -0 by +0.
+                    System.arraycopy(values, i, values, i-1, n-- - i);
+                }
+            }
+            levels[j] = ArraysExt.resize(values, n);
+        }
+        return levels;
+    }
+
+    /**
      * Sets the specified bit on {@link IsolineTracer.Level#isDataAbove} for all levels lower than given value.
      *
      * <h4>How strict equalities are handled</h4>
@@ -106,6 +119,7 @@ public final class Isolines {
     /**
      * Generates isolines at the specified levels computed from data provided by the given image.
      * Isolines will be created for every bands in the given image.
+     * This method performs all computation sequentially in current thread.
      *
      * @param  data       image providing source values.
      * @param  levels     values for which to compute isolines. An array should be provided for each band.
@@ -117,20 +131,147 @@ public final class Isolines {
      * @throws TransformException if an interpolated point can not be transformed using the given transform.
      */
     public static Isolines[] generate(final RenderedImage data, final double[][] levels,
-                                      MathTransform gridToCRS) throws TransformException
+                                      final MathTransform gridToCRS) throws TransformException
     {
         ArgumentChecks.ensureNonNull("data",   data);
         ArgumentChecks.ensureNonNull("levels", levels);
-        {   // For keeping variable locale.
-            final MathTransform gridToImage = MathTransforms.translation(data.getMinX(), data.getMinY());
-            if (gridToCRS == null) {
-                gridToCRS = gridToImage;
-            } else {
-                ArgumentChecks.ensureDimensionsMatch("gridToCRS", 2, 2, gridToCRS);
-                gridToCRS = MathTransforms.concatenate(gridToImage, gridToCRS);
+        return flush(generate(iterators().create(data), cloneAndSort(levels), gridToCRS));
+    }
+
+    /**
+     * Generates isolines in background using an arbitrary amount of processors.
+     * This method returns immediately (i.e. the current thread is not used for isoline computation).
+     * The result will become available at a later time in the {@link Future} object.
+     *
+     * @param  data       image providing source values.
+     * @param  levels     values for which to compute isolines. An array should be provided for each band.
+     *                    If there is more bands than {@code levels.length}, the last array is reused for
+     *                    all remaining bands.
+     * @param  gridToCRS  transform from pixel coordinates to geometry coordinates, or {@code null} if none.
+     *                    Integer source coordinates are located at pixel centers.
+     * @return a {@code Future} representing pending completion of isoline computation.
+     */
+    public static Future<Isolines[]> parallelGenerate(final RenderedImage data, final double[][] levels,
+                                                      final MathTransform gridToCRS)
+    {
+        ArgumentChecks.ensureNonNull("data",   data);
+        ArgumentChecks.ensureNonNull("levels", levels);
+        return new Process(data, cloneAndSort(levels), gridToCRS).execute();
+    }
+
+    /**
+     * Returns a provider of {@link PixelIterator} suitable to isoline computations.
+     * It is critical that iterators use {@link SequenceType#LINEAR} iterator order.
+     */
+    private static PixelIterator.Builder iterators() {
+        return new PixelIterator.Builder().setIteratorOrder(SequenceType.LINEAR);
+    }
+
+    /**
+     * Creates all polylines that were still pending. This method should be the very last step,
+     * when all other computations finished. Pending polylines are sequences of points not yet
+     * stored in geometry objects because they were waiting to see if additional points would
+     * close them as polygons. This {@code flush()} method is invoked when we know that it will
+     * not be the case because there is no more points to add.
+     *
+     * @param  isolines  the isolines for which to write pending polylines.
+     * @return the {@code isolines} array, returned for convenience.
+     * @throws TransformException if an error occurred during polylines creation.
+     */
+    private static Isolines[] flush(final Isolines[] isolines) throws TransformException {
+        for (final Isolines isoline : isolines) {
+            for (final IsolineTracer.Level level : isoline.levels) {
+                level.flush();
             }
         }
-        final PixelIterator iterator = new PixelIterator.Builder().setIteratorOrder(SequenceType.LINEAR).create(data);
+        return isolines;
+    }
+
+    /**
+     * Wraps {@code Isolines.generate(…)} calculation in a process for parallel execution.
+     * The source image is divided in sub-region and the isolines in each sub-region will
+     * be computed in a different thread.
+     */
+    private static final class Process extends TiledProcess<Isolines[]> {
+        /**
+         * Values for which to compute isolines. An array should be provided for each band.
+         * If there is more bands than {@code levels.length}, the last array is reused for
+         * all remaining bands.
+         *
+         * @see #cloneAndSort(double[][])
+         */
+        private final double[][] levels;
+
+        /**
+         * Transform from image upper left corner (in pixel coordinates) to geometry coordinates.
+         */
+        private final MathTransform gridToCRS;
+
+        /**
+         * Creates a process for parallel isoline computation.
+         *
+         * @param  data       image providing source values.
+         * @param  levels     values for which to compute isolines.
+         * @param  gridToCRS  transform from pixel coordinates to geometry coordinates, or {@code null} if none.
+         */
+        Process(final RenderedImage data, final double[][] levels, final MathTransform gridToCRS) {
+            super(data, 1, 1, iterators());
+            this.levels = levels;
+            this.gridToCRS = gridToCRS;
+        }
+
+        /**
+         * Invoked by {@link TiledProcess} for creating a sub-task
+         * doing isoline computation on a sub-region of the image.
+         */
+        @Override
+        protected Task createSubTask() {
+            return new Tile();
+        }
+
+        /**
+         * A sub-task doing isoline computation on a sub-region of the image.
+         * The region is determined by the {@link #iterator}.
+         */
+        private final class Tile extends Task {
+            /** Isolines computed in the sub-region of this sub-task. */
+            private Isolines[] isolines;
+
+            /** Creates a new sub-task. */
+            Tile() {
+            }
+
+            /** Invoked in a background thread for performing isoline computation. */
+            @Override protected void execute() throws TransformException {
+                isolines = generate(iterator, levels, gridToCRS);
+            }
+
+            /** Invoked in a background thread for merging results of two sub-tasks. */
+            @Override protected void merge(final Task neighbor) throws TransformException {
+                for (int i=0; i<isolines.length; i++) {
+                    final Isolines target = isolines[i];
+                    final Isolines source = ((Tile) neighbor).isolines[i];
+                    for (int j=0; j<target.levels.length; j++) {
+                        target.levels[j].merge(source.levels[j]);
+                    }
+                }
+            }
+
+            /** Invoked on the last sub-task (after all merges) for getting final result. */
+            @Override protected Isolines[] result() throws TransformException {
+                return flush(isolines);
+            }
+        }
+    }
+
+    /**
+     * Generates isolines for the specified levels in the region traversed by the given iterator.
+     * This is the implementation of {@link #generate(RenderedImage, double[][], MathTransform)}
+     * but potentially on a sub-region for parallel "fork-join" execution.
+     */
+    private static Isolines[] generate(final PixelIterator iterator, final double[][] levels,
+                                       final MathTransform gridToCRS) throws TransformException
+    {
         /*
          * Prepares a window of size 2×2 pixels over pixel values. Window elements are traversed
          * by incrementing indices in following order: band, column, row. The window content will
@@ -138,7 +279,7 @@ public final class Isolines {
          */
         final int numBands = iterator.getNumBands();
         final double[] window = new double[numBands * 4];
-        final IsolineTracer tracer = new IsolineTracer(window, numBands, gridToCRS);
+        final IsolineTracer tracer = new IsolineTracer(window, numBands, iterator.getDomain(), gridToCRS);
         /*
          * Prepare the set of isolines for each band in the image.
          * The number of cells on the horizontal axis is one less
diff --git a/core/sis-feature/src/main/java/org/apache/sis/internal/processing/image/TiledProcess.java b/core/sis-feature/src/main/java/org/apache/sis/internal/processing/image/TiledProcess.java
new file mode 100644
index 0000000..350cdd8
--- /dev/null
+++ b/core/sis-feature/src/main/java/org/apache/sis/internal/processing/image/TiledProcess.java
@@ -0,0 +1,394 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sis.internal.processing.image;
+
+import java.awt.Rectangle;
+import java.awt.image.RenderedImage;
+import java.util.concurrent.Future;
+import java.util.concurrent.Callable;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.sis.internal.system.CommonExecutor;
+import org.apache.sis.internal.util.Numerics;
+import org.apache.sis.internal.jdk9.JDK9;
+import org.apache.sis.image.PixelIterator;
+import org.apache.sis.util.ArgumentChecks;
+
+
+/**
+ * Calculation in a two-dimensional space that can be subdivided in smaller calculations in sub-regions.
+ * This class manages a kind of fork-join task but differs from {@link java.util.concurrent.ForkJoinPool}
+ * in the following aspects:
+ *
+ * <ul class="verbose">
+ *   <li>The fork and join processes are presumed relatively costly (i.e. they may need to reconstruct
+ *       geometries splitted in two consecutive tiles). So instead of having many medium tasks waiting
+ *       for a thread to take them, it may be more efficient to have fewer tasks processing larger areas.
+ *       {@code TiledProcess} tries to create a number of sub-tasks close to the number of processors.
+ *       This is a different approach than "work stealing" algorithm applied by JDK {@code ForkJoinPool},
+ *       which is designed for smaller (and more easily separable) non-blocking tasks.</li>
+ *   <li>The main task is splitted in sub-tasks with a single fork step, with two division factors along
+ *       <var>x</var> and <var>y</var> axes which can be any integer (not necessarily powers of 2). This
+ *       is a different approach than JDK {@code ForkJoinPool} where tasks are forked recursively in two
+ *       sub-tasks at each step.</li>
+ *   <li>The join operation tries to operate on tiles that are neighbors in the two dimensional space.
+ *       It allows the join operation to merge geometries that are splited between two tiles.</li>
+ *   <li>Tasks may block on I/O operations. We want to avoid blocking the JDK common fork/join pool,
+ *       so we use a separated pool.</li>
+ * </ul>
+ *
+ * The tiling applied by {@code TiledProcess} is independent of {@link RenderedImage} tiling.
+ * This class assumes that the objects to be calculated are geometries or other non-raster data.
+ * Consequently tile size will be determined by other considerations such as the number of processors.
+ *
+ * @author  Martin Desruisseaux (Geomatys)
+ * @version 1.1
+ *
+ * @param  <R>  the type of value computed as a result of this process.
+ *
+ * @since 1.1
+ * @module
+ */
+abstract class TiledProcess<R> {
+    /**
+     * Minimal "tile" size for sub-tasks computation. That size should not be too small because the
+     * fork/join processes have some extra cost compared to processing the whole image as one single tile.
+     */
+    private static final int MIN_TILE_SIZE = 400;
+
+    /**
+     * Number of threads that are running. This is used for knowing when a thread is the last one.
+     */
+    private final AtomicInteger runningThreads;
+
+    /**
+     * All tasks executed in parallel threads. The array length should be less than the number of processors.
+     * All read and write accesses to array elements must be done inside a {@code synchronized(tasks)} block.
+     *
+     * <p>This array initially contains only {@code null} elements. Non-null elements are assigned after
+     * {@link Task#execute()} completion but before {@link Task#merge(Task)}.</p>
+     */
+    private final Task[] tasks;
+
+    /**
+     * Number of tiles (or tasks) on the <var>x</var> axis. Used for computing (x,y) coordinates of elements
+     * in the {@link #tasks} array, considering that the array is a matrix encoded in row-major fashion.
+     * This is the increment to apply on array index for incrementing the <var>y</var> coordinate by one.
+     */
+    private final int yStride;
+
+    /**
+     * Index of this {@code TiledProcess} instance in the {@link #tasks} array.
+     * This is a temporary variable for {@link Task#index} initialization only.
+     */
+    private int taskIndex;
+
+    /**
+     * Iterator over the pixel for each element in the {@link #tasks} array.
+     * This is a temporary variable for {@link Task#iterator} initialization only.
+     */
+    private PixelIterator[] iterators;
+
+    /**
+     * Prepares {@link TiledProcess} for execution of a task splitted in different regions.
+     * This constructor splits the given image in sub-regions ("tiles" but not in the sense
+     * of {@link RenderedImage} tiles), then creates a pixel iterator for each sub-region.
+     * Iterators are created with the given {@link org.apache.sis.image.PixelIterator.Builder},
+     * which should be configured by the caller in all desired aspects (e.g. iteration order) except the
+     * {@linkplain org.apache.sis.image.PixelIterator.Builder#setRegionOfInterest(Rectangle) region of interest},
+     * which will be overwritten by this method.
+     *
+     * <p>Usage example:</p>
+     * {@preformat java
+     *     TiledProcess process = new TiledProcess<MyResultType>(image, 0, 0,
+     *             new PixelIterator.Builder().setIteratorOrder(SequenceType.LINEAR))
+     *     {
+     *         &#64;Override
+     *         protected Task createSubTask() {
+     *             return new SubTask();
+     *         }
+     *
+     *         private final class SubTask extends Task {
+     *             private MyResultType result;
+     *
+     *             &#64;Override protected void execute() {result = ...}    // Do calculation in background thread.
+     *             &#64;Override protected void merge(Task neighbor) {...}  // Merge this.result with neighbor.result().
+     *             &#64;Override protected MyResultType result() {return result;}
+     *         }
+     *     };
+     *     process.execute();
+     * }
+     *
+     * @param  data             the image on which to iterate over the pixels.
+     * @param  overlapX         the number of overlapping pixels between tiles on the <var>x</var> axis.
+     * @param  overlapY         the number of overlapping pixels between tiles on the <var>y</var> axis.
+     * @param  iteratorFactory  a pre-configured (except for sub-region aspect) supplier of pixel iterators.
+     * @throws ArithmeticException if the image size is too large.
+     */
+    @SuppressWarnings({"rawtypes", "unchecked"})                // Generic array creation.
+    protected TiledProcess(final RenderedImage data, final int overlapX, final int overlapY,
+                           final PixelIterator.Builder iteratorFactory)
+    {
+        ArgumentChecks.ensurePositive("overlapX", overlapX);
+        ArgumentChecks.ensurePositive("overlapY", overlapY);
+        /*
+         * Get a first estimation of the number of tiles. If the total number is greater than `PARALLELISM`
+         * (number of processors - 1), scale the number of tiles for having a total less than `PARALLELISM`.
+         */
+        final int width  = data.getWidth();
+        final int height = data.getHeight();
+        int  numTileX = Math.max(width  / MIN_TILE_SIZE, 1);
+        int  numTileY = Math.max(height / MIN_TILE_SIZE, 1);
+        long numTiles = JDK9.multiplyFull(numTileX, numTileY);
+        if (numTiles > CommonExecutor.PARALLELISM) {
+            final double r = Math.sqrt(CommonExecutor.PARALLELISM / (double) numTiles);     // Always < 1.
+            if (numTileX >= numTileY) {
+                numTileX = Math.max((int) Math.round(numTileX * r), 1);
+                numTileY = Math.max(Math.min(CommonExecutor.PARALLELISM / numTileX, numTileY), 1);
+            } else {
+                numTileY = Math.max((int) Math.round(numTileY * r), 1);
+                numTileX = Math.max(Math.min(CommonExecutor.PARALLELISM / numTileY, numTileX), 1);
+            }
+        }
+        yStride        = numTileX;
+        tasks          = new TiledProcess.Task[numTileX * numTileY];    // length ≤ CommonExecutor.PARALLELISM.
+        runningThreads = new AtomicInteger(tasks.length);
+        /*
+         * Prepare the pixel iterators for all sub-tasks, without starting those sub-tasks yet.
+         * The sub-tasks will be created and started by `execute()` after everything is ready.
+         */
+        final int xmin = data.getMinX();
+        final int ymin = data.getMinY();
+        final int xmax = Math.addExact(xmin, width);
+        final int ymax = Math.addExact(ymin, height);
+        final int xinc = Numerics.ceilDiv(width,  numTileX);
+        final int yinc = Numerics.ceilDiv(height, numTileY);
+        final Rectangle subArea = new Rectangle(Math.addExact(xinc, overlapX),
+                                                Math.addExact(yinc, overlapY));
+        int count = 0;
+        iterators = new PixelIterator[tasks.length];
+        for (subArea.y = ymin; subArea.y < ymax; subArea.y += yinc) {
+            for (subArea.x = xmin; subArea.x < xmax; subArea.x += xinc) {
+                iterators[count++] = iteratorFactory.setRegionOfInterest(subArea).create(data);
+            }
+        }
+        assert count == tasks.length;
+    }
+
+    /**
+     * Starts execution of each sub-task in its own thread.
+     *
+     * @return a {@code Future} representing pending completion of the task.
+     */
+    public final Future<R> execute() {
+        @SuppressWarnings({"rawtypes", "unchecked"})
+        final Future<R>[] components = new Future[tasks.length];
+        while (taskIndex < components.length) {
+            final Task task = createSubTask();
+            components[taskIndex++] = CommonExecutor.instance().submit(task);
+        }
+        iterators = null;       // Clear only after all tasks have been created.
+        return CompoundFuture.create(components);
+    }
+
+    /**
+     * Creates a sub-task doing the computation on a sub-region of the image.
+     * This method is invoked by {@link #execute()} for each "tile" where the
+     * sub-task will be executed. Each sub-tasks will have its
+     * {@linkplain Task#iterator own pixel iterator}.
+     *
+     * @return a sub-task over a sub-region of the image to process.
+     */
+    protected abstract Task createSubTask();
+
+    /**
+     * A task to be executed in {@link TiledProcess} for a sub-region of the image to process.
+     *
+     * <p>This class implements {@link Callable} for {@code TiledProcess} convenience.
+     * This implementation details should be ignored; it may change in any future version.</p>
+     */
+    abstract class Task implements Callable<R> {
+        /**
+         * Index of this {@code Task} instance in the {@link TiledProcess#tasks} array.
+         */
+        private final int index;
+
+        /**
+         * Iterator over the pixels in the sub-region explored by this task.
+         */
+        protected final PixelIterator iterator;
+
+        /**
+         * Synchronization lock during the merge phase. Created only before merge attempts.
+         * This reference shall be considered final after initialization.
+         */
+        private ReentrantLock merging;
+
+        /**
+         * Creates a new sub-task to be executed in a sub-area of a two-dimensional plane.
+         */
+        protected Task() {
+            index = taskIndex;
+            iterator = iterators[index];
+        }
+
+        /**
+         * Executes this sub-task. This method is invoked by {@link TiledProcess#execute()}
+         * in a background thread. Implementation should store the result in this {@code Task}
+         * instance for future {@linkplain #merge(Task) merge}.
+         *
+         * @throws Exception if an error occurred during sub-task execution.
+         */
+        protected abstract void execute() throws Exception;
+
+        /**
+         * Merges the result of given sub-task into this one. The given {@code Task} will be a neighbor tile
+         * (located on top, left, right or bottom of this tile) on a <em>best-effort</em> basis (no guarantee).
+         * After this method call, all data should be in {@code this} and the {@code neighbor} sub-task will be
+         * discarded.
+         *
+         * @param  neighbor  the other sub-task to merge with this one.
+         * @throws Exception if an error occurred during merge operation.
+         */
+        protected abstract void merge(Task neighbor) throws Exception;
+
+        /**
+         * Returns the computation result. This method is invoked by {@link TiledProcess} only once,
+         * on the last {@code Task} instance after all other instances have been {@linkplain #merge
+         * merged} on {@code this}.
+         *
+         * @return the computation result.
+         * @throws Exception if final result can not be computed.
+         */
+        protected abstract R result() throws Exception;
+
+        /**
+         * Executes this sub-task, then tries to merge it with all neighbor sub-tasks that are completed.
+         * This method is public as an implementation side-effect and should not be invoked directly;
+         * it will typically be invoked by {@link java.util.concurrent.Executor} instead.
+         *
+         * @return value computed by this sub-task, or {@code null} if this is not the last task.
+         * @throws Exception if an error occurred during sub-task execution or merge operation.
+         *
+         * @see java.util.concurrent.Executor#execute(Runnable)
+         */
+        @Override
+        @SuppressWarnings("fallthrough")
+        public final R call() throws Exception {
+            execute();
+            final Task[] tasks = TiledProcess.this.tasks;
+            final int yStride  = TiledProcess.this.yStride;
+            final int rowStart = index - (index % yStride);
+            /*
+             * No lock can exist for this `Task` before this line because they were no reference to
+             * `this` in the `tasks` array. This reference will be added (potentially many times)
+             * after the lock for making sure that no other thread uses it before we finished.
+             */
+            assert merging == null;
+            merging = new ReentrantLock();
+            merging.lock();
+            try {
+                synchronized (tasks) {
+                    assert tasks[index] == null;
+                    tasks[index] = this;
+                }
+                /*
+                 * We will check the 4 neighbors (top, left, right, bottom) for another `Task` instance
+                 * that we can merge with `this`. We may recheck those 4 neighbors many times depending
+                 * how many we found in previous iteration:
+                 *
+                 *   - If `count == 4` we are done and the loop is stopped.
+                 *   - If `count == 0` we found no neighbor and abandon. Note that neighbors may appear later
+                 *     if they were still under computation at the time this method is invoked. A final check
+                 *     for late additions will be done sequentially in the last thread.
+                 *   - For all other counts, we merged some but not all neighbors. Maybe new neighbors became
+                 *     available during the time we did the merge, so the 4 neighbors will be checked again.
+                 */
+                int count = 0;
+retry:          for (int side=0;; side++) {         // Break condition is inside the loop.
+                    final boolean valid;            // Whether `i` index is inside bounds.
+                    final int i;                    // Neighbor index in the `tasks` array.
+                    switch (side) {
+                        default: {
+                            if ((count & 3) == 0) break retry;       // `count` == 0 or 4.
+                            count = 0;
+                            side  = 0;              // Continue (fallthrough) with case 0.
+                        }
+                        case 0: i = index - yStride; valid = (i >= 0);                  break;    // Neighbor on top.
+                        case 1: i = index - 1;       valid = (i >= rowStart);           break;    // Neighbor on left.
+                        case 2: i = index + 1;       valid = (i <  rowStart + yStride); break;    // Neighbor on right.
+                        case 3: i = index + yStride; valid = (i <  tasks.length);       break;    // Neighbor on bottom.
+                    }
+                    if (valid) {
+                        /*
+                         * Index was computed without synchronization, but access to `tasks` elements
+                         * need to be synchronized. If we find a neighbor that we can merge, replace
+                         * immediately all instances by `this` so that no other thread will merge it.
+                         * The `this` instance will be ignored by other threads until this method exits
+                         * because we hold the `this.merging` lock.
+                         */
+                        final Task neighbor;
+                        synchronized (tasks) {
+                            neighbor = tasks[i];
+                            if (neighbor == null || neighbor == this || !neighbor.merging.tryLock()) {
+                                continue;       // Ignore (for now) this neighbor, check next neighbor.
+                            }
+                            for (int j=0; j<tasks.length; j++) {
+                                if (tasks[j] == neighbor) {
+                                    tasks[j] = this;
+                                }
+                            }
+                        }
+                        /*
+                         * Do the actual merge between sub-tasks while we hold the lock on the two
+                         * `Task` instances. The `neighbor` reference is discarded after this block.
+                         */
+                        try {
+                            merge(neighbor);
+                        } finally {
+                            neighbor.merging.unlock();
+                        }
+                        count++;
+                    }
+                }
+            } finally {
+                merging.unlock();
+            }
+            /*
+             * If this thread is the last one, check if there is any unmerged `Task` instances.
+             * There is no more synchronization at this point because no other thread is using
+             * those objects; this final check is monothread and sequential.
+             */
+            if (runningThreads.decrementAndGet() != 0) {
+                return null;
+            }
+            for (int i=0; i<tasks.length; i++) {
+                final Task other = tasks[i];
+                if (other != null && other != this) {
+                    assert !other.merging.isLocked();
+                    for (int j=i; j<tasks.length; j++) {
+                        if (tasks[j] == other) {
+                            tasks[j] = this;
+                        }
+                    }
+                    merge(other);
+                }
+            }
+            return result();
+        }
+    }
+}


Mime
View raw message