lucenenet-dev mailing list archives

From GitBox <...@apache.org>
Subject [GitHub] [lucenenet] NightOwl888 opened a new issue #354: TaskMergeScheduler: Salvage or deprecate?
Date Fri, 25 Sep 2020 14:14:04 GMT

NightOwl888 opened a new issue #354:
URL: https://github.com/apache/lucenenet/issues/354


   [`TaskMergeScheduler`](https://github.com/apache/lucenenet/blob/ece6bea0a3c98961a77d7060b5615fccefabe725/src/Lucene.Net/Support/Index/TaskMergeScheduler.cs) was originally created to sidestep the limited multithreading support in .NET Standard 1.x. Since we have dropped support for .NET Standard 1.x, it is no longer technically required. While a merge scheduler based on the TPL sounds like a great idea, in its current form it falls well short of what is needed for inclusion in the release.
   
   1. It is several orders of magnitude slower than `ConcurrentMergeScheduler`.
   2. The `Lucene.Net.Index.TestTaskMergeScheduler::TestSubclassTaskMergeScheduler()` test
occasionally fails (see #269).
   
   `TaskMergeScheduler` is no longer randomly injected into tests by the test framework because of its negative performance impact on test runs.
   
   I have fixed a few bugs in it, including its failure to rethrow a background merge exception, which `IndexWriter` requires in order to retry.
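
   For context on why the rethrow matters: with the TPL, an exception thrown inside a `Task` is not rethrown on any other thread automatically. It is captured by the task and resurfaces, wrapped in an `AggregateException`, only when the task is waited on with `Wait()`, which is what the `catch (AggregateException ae)` block in `Sync()` handles. Here is a minimal standalone sketch of the unwrap-and-rethrow pattern; the `BackgroundFailureDemo` class is hypothetical and not part of Lucene.NET.

   ```c#
   using System;
   using System.Runtime.ExceptionServices;
   using System.Threading.Tasks;

   // Hypothetical helper, not Lucene.NET API: runs work on the thread pool and
   // makes its failure visible to the waiting caller.
   public static class BackgroundFailureDemo
   {
       public static void RunAndRethrow(Action work)
       {
           Task task = Task.Run(work);
           try
           {
               // Wait() reports a faulted task as an AggregateException.
               task.Wait();
           }
           catch (AggregateException ae)
           {
               // Unwrap and rethrow the original exception, preserving its
               // stack trace, so callers can catch the real exception type.
               Exception inner = ae.Flatten().InnerExceptions[0];
               ExceptionDispatchInfo.Capture(inner).Throw();
           }
       }
   }
   ```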
   
   The main performance issue stems from its use of `ToArray()` to copy the `_mergeThreads` collection every time it loops over it. An attempt was made to replace the collection with [`ConcurrentHashSet<T>`](https://github.com/apache/lucenenet/blob/ece6bea0a3c98961a77d7060b5615fccefabe725/src/Lucene.Net/Support/ConcurrentHashSet.cs) and remove the `ToArray()` call from each loop, but at least one of the tests starts failing with that change.
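
   As a possible starting point for the `ToArray()` hot spot, here is a sketch that keeps the number of in-flight merges in an `int` updated with `Interlocked`, so a `MergeThreadCount`-style check is O(1) instead of copying or walking the collection, and that enumerates a `ConcurrentDictionary` directly where iteration is still needed. `ActiveMergeTracker<TMerge>` is a hypothetical type, and whether this approach avoids the test failure seen with `ConcurrentHashSet<T>` is untested.

   ```c#
   using System.Collections.Concurrent;
   using System.Collections.Generic;
   using System.Threading;

   // Hypothetical type, not Lucene.NET API: tracks in-flight merges without
   // copying a collection on every loop.
   public sealed class ActiveMergeTracker<TMerge> where TMerge : class
   {
       private readonly ConcurrentDictionary<TMerge, byte> _active =
           new ConcurrentDictionary<TMerge, byte>();
       private int _count;

       // O(1) and allocation-free; may briefly lag the dictionary while another
       // thread is between its TryAdd/TryRemove and the counter update.
       public int Count => Volatile.Read(ref _count);

       // Call when a merge is launched. Returns false if it is already tracked.
       public bool Add(TMerge merge)
       {
           if (!_active.TryAdd(merge, 0))
               return false;
           Interlocked.Increment(ref _count);
           return true;
       }

       // Call from the merge's completion path. Returns false if not tracked.
       public bool Remove(TMerge merge)
       {
           if (!_active.TryRemove(merge, out _))
               return false;
           Interlocked.Decrement(ref _count);
           return true;
       }

       // Enumerating a ConcurrentDictionary is safe while other threads add or
       // remove entries, and it does not copy the whole collection first.
       public IEnumerable<TMerge> Items
       {
           get
           {
               foreach (KeyValuePair<TMerge, byte> pair in _active)
                   yield return pair.Key;
           }
       }
   }
   ```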
   
   So, my question is: should we attempt to salvage it, or deprecate it? By deprecate, I mean it will be marked obsolete in the next beta and deleted from the project in the first release candidate.
   
   If someone wants to take a stab at either fixing it or redesigning it, it is fair game. This could be an opportunity to give the project a more performant alternative to `ConcurrentMergeScheduler`. If it performs adequately, we will consider making it the default.
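
   For anyone who takes a stab at a redesign, one option worth considering is to throttle the producer with a counting `SemaphoreSlim` rather than the `Reset()`/`Wait()` pair on `ManualResetEventSlim` used in `Merge()`. With that pair, a merge task may call `Set()` after the stall condition is checked but before `Reset()`; that signal is then lost and the producer may block until another merge task calls `Set()`. A semaphore count cannot lose a `Release()` that way. The generic sketch below uses no Lucene types and is not a drop-in `MergeScheduler`; `ThrottledTaskRunner` and its members are hypothetical.

   ```c#
   using System;
   using System.Collections.Generic;
   using System.Threading;
   using System.Threading.Tasks;

   // Hypothetical sketch, not Lucene.NET API: launches work as Tasks, but blocks
   // the calling (producer) thread while maxInFlight items are already running,
   // similar in spirit to stalling an indexing thread when merges fall behind.
   public sealed class ThrottledTaskRunner : IDisposable
   {
       private readonly SemaphoreSlim _slots;
       private readonly List<Task> _tasks = new List<Task>();
       private readonly object _syncRoot = new object();

       public ThrottledTaskRunner(int maxInFlight)
       {
           if (maxInFlight < 1)
               throw new ArgumentOutOfRangeException(nameof(maxInFlight));
           _slots = new SemaphoreSlim(maxInFlight, maxInFlight);
       }

       public void Run(Action work)
       {
           // Blocks until a slot is free. Each Release() adds to the count, so a
           // release that happens before this call is never lost.
           _slots.Wait();
           Task task;
           try
           {
               task = Task.Run(() =>
               {
                   try { work(); }
                   finally { _slots.Release(); } // free the slot even if work throws
               });
           }
           catch
           {
               _slots.Release(); // the task was never scheduled; return its slot
               throw;
           }
           lock (_syncRoot) _tasks.Add(task);
       }

       // Waits for everything launched so far; failures surface as an AggregateException.
       public void WaitAll()
       {
           Task[] pending;
           lock (_syncRoot) pending = _tasks.ToArray();
           Task.WaitAll(pending);
       }

       public void Dispose() => _slots.Dispose();
   }
   ```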
   
   ```c#
   using J2N.Collections.Generic.Extensions;
   using Lucene.Net.Support.Threading;
   using Lucene.Net.Util;
   using System;
   using System.Collections.Generic;
   using System.Linq;
   using System.Runtime.CompilerServices;
   using System.Text;
   using System.Threading;
   using System.Threading.Tasks;
   using Directory = Lucene.Net.Store.Directory;
   
   namespace Lucene.Net.Index
   {
       /*
        * Licensed to the Apache Software Foundation (ASF) under one or more
        * contributor license agreements.  See the NOTICE file distributed with
        * this work for additional information regarding copyright ownership.
        * The ASF licenses this file to You under the Apache License, Version 2.0
        * (the "License"); you may not use this file except in compliance with
        * the License.  You may obtain a copy of the License at
        *
        *     http://www.apache.org/licenses/LICENSE-2.0
        *
        * Unless required by applicable law or agreed to in writing, software
        * distributed under the License is distributed on an "AS IS" BASIS,
        * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
        * See the License for the specific language governing permissions and
        * limitations under the License.
        */
   
       /// <summary>
       /// A <see cref="MergeScheduler"/> that runs each merge using
       /// <see cref="Task"/>s on the default <see cref="TaskScheduler"/>.
       /// 
       /// <para>If more than <see cref="MaxMergeCount"/> merges are
       /// requested then this class will forcefully throttle the
       /// incoming threads by pausing until one or more merges
       /// complete.</para>
       ///  
       /// LUCENENET specific
       /// </summary>
       public class TaskMergeScheduler : MergeScheduler, IConcurrentMergeScheduler
       {
           public const string COMPONENT_NAME = "CMS";
   
           private readonly TaskScheduler _taskScheduler = TaskScheduler.Default;
           private readonly ReaderWriterLockSlim _lock = new ReaderWriterLockSlim();
           private readonly ManualResetEventSlim _manualResetEvent = new ManualResetEventSlim();
           /// <summary>
           /// List of currently active <see cref="MergeThread"/>s.</summary>
           private readonly IList<MergeThread> _mergeThreads = new List<MergeThread>();
   
           /// <summary>
           /// How many <see cref="MergeThread"/>s have kicked off (this is used
           /// to name them).
           /// </summary>
           private int _mergeThreadCount;
   
           /// <summary>
           /// <see cref="Directory"/> that holds the index. </summary>
           private Directory _directory;
   
           /// <summary>
           /// <see cref="IndexWriter"/> that owns this instance.
           /// </summary>
           private IndexWriter _writer;
   
           /// <summary>
           /// Sole constructor, with all settings set to default
           /// values.
           /// </summary>
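           public TaskMergeScheduler() : base()
           {
               // NOTE: TaskScheduler.Default reports int.MaxValue here, so with the
               // default scheduler these limits are effectively unbounded and
               // incoming threads are never stalled.
               MaxThreadCount = _taskScheduler.MaximumConcurrencyLevel;
               MaxMergeCount = _taskScheduler.MaximumConcurrencyLevel;
           }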
   
           /// <summary>
           /// Sets the maximum number of merge threads and simultaneous merges allowed.
           /// In <see cref="TaskMergeScheduler"/> this method has no effect; both limits
           /// come from <see cref="TaskScheduler.MaximumConcurrencyLevel"/> of
           /// <see cref="TaskScheduler.Default"/>.
           /// </summary>
           /// <param name="maxMergeCount"> The max # simultaneous merges that are allowed.
           ///       If a merge is necessary yet we already have this many
           ///       threads running, the incoming thread (that is calling
           ///       add/updateDocument) will block until a merge thread
           ///       has completed.  Note that we will only run the
           ///       smallest <paramref name="maxThreadCount"/> merges at a time. </param>
           /// <param name="maxThreadCount"> The max # simultaneous merge threads that should
           ///       be running at once.  This must be &lt;= <paramref name="maxMergeCount"/>
           ///       </param>
           public void SetMaxMergesAndThreads(int maxMergeCount, int maxThreadCount)
           {
               // This is handled by TaskScheduler.Default.MaximumConcurrencyLevel
           }
   
           /// <summary>
           /// Max number of merge threads allowed to be running at
           /// once, taken from the task scheduler's
           /// <see cref="TaskScheduler.MaximumConcurrencyLevel"/>. Unlike
           /// <see cref="ConcurrentMergeScheduler"/>, this class never pauses
           /// larger merges to let smaller ones run; it only stalls incoming
           /// threads once <see cref="MaxMergeCount"/> merges are running.
           /// </summary>
           /// <seealso cref="SetMaxMergesAndThreads(int, int)"/>
           public int MaxThreadCount { get; private set; }
   
           /// <summary>
           /// Max number of merges we accept before forcefully
           /// throttling the incoming threads
           /// </summary>
           public int MaxMergeCount { get; private set; }
   
           /// <summary>
           /// Return the priority that merge threads run at. This is always the same.
           /// </summary>
           public int MergeThreadPriority
           {
               get
               {
   #if !FEATURE_THREAD_PRIORITY
                   return 2;
   #else
                   return (int)ThreadPriority.Normal;
   #endif 
               }
           }
   
           /// <summary>
           /// This method has no effect in <see cref="TaskMergeScheduler"/> because
           /// <see cref="MergeThreadPriority"/> returns a constant value.
           /// </summary>
           public void SetMergeThreadPriority(int priority)
           {
           }
   
           /// <summary>
           /// Called whenever the running merges have changed. Unlike
           /// <see cref="ConcurrentMergeScheduler"/>, this implementation does not
           /// sort, pause, or unpause merge threads; it only removes and disposes
           /// merge threads that are no longer alive.
           /// </summary>
           private void UpdateMergeThreads()
           {
               foreach (var merge in _mergeThreads.ToArray())
               {
                   // Prune any dead threads
                   if (!merge.IsAlive)
                   {
                       _mergeThreads.Remove(merge);
                       merge.Dispose();
                   }
               }
           }
   
           /// <summary>
           /// Returns <c>true</c> if verbose output is enabled. This property is usually used in
           /// conjunction with <see cref="Message(string)"/>, like this:
           ///
           /// <code>
           /// if (Verbose) {
           ///     Message(&quot;your message&quot;);
           /// }
           /// </code>
           /// </summary>
           protected bool Verbose => _writer != null && _writer.infoStream.IsEnabled(COMPONENT_NAME);
   
           /// <summary>
           /// Outputs the given message - this method assumes <see cref="Verbose"/> was
           /// checked and returned <c>true</c>.
           /// </summary>
           protected virtual void Message(string message)
           {
               _writer.infoStream.Message(COMPONENT_NAME, message);
           }
   
           protected override void Dispose(bool disposing)
           {
               Sync();
               _manualResetEvent.Dispose();
           }
   
           /// <summary>
           /// Wait for any running merge threads to finish. 
           /// This call is not interruptible as used by <see cref="MergeScheduler.Dispose()"/>.
           /// </summary>
           public virtual void Sync()
           {
               foreach (var merge in _mergeThreads.ToArray())
               {
                   if (merge == null || !merge.IsAlive)
                   {
                       continue;
                   }
   
                   try
                   {
                       merge.Wait();
                   }
                   catch (OperationCanceledException)
                   {
                       // expected when we cancel.
                   }
                   catch (AggregateException ae)
                   {
                       ae.Handle(ex =>
                       {
                           if (!(ex is OperationCanceledException))
                           {
                               HandleMergeException(ex);
                               return true;
                           }
   
                           return false;
                       });
                   }
               }
           }
   
           /// <summary>
           /// Returns the number of merge threads that are alive. Note that this number
           /// is &lt;= <see cref="_mergeThreads"/> size.
           /// </summary>
           private int MergeThreadCount
           {
               get { return _mergeThreads.Count(x => x.IsAlive && x.CurrentMerge != null); }
           }
   
           [MethodImpl(MethodImplOptions.NoInlining)]
           public override void Merge(IndexWriter writer, MergeTrigger trigger, bool newMergesFound)
           {
               using (_lock.Write())
               {
                   _writer = writer;
                   _directory = writer.Directory;
   
                   if (Verbose)
                   {
                       Message("now merge");
                       Message("  index: " + writer.SegString());
                   }
   
                   // First, quickly run through the newly proposed merges
                   // and add any orthogonal merges (ie a merge not
                   // involving segments already pending to be merged) to
                   // the queue.  If we are way behind on merging, many of
                   // these newly proposed merges will likely already be
                   // registered.
   
                   // Iterate, pulling from the IndexWriter's queue of
                   // pending merges, until it's empty:
                   while (true)
                   {
                       long startStallTime = 0;
                       while (writer.HasPendingMerges() && MergeThreadCount >= MaxMergeCount)
                       {
                           // this means merging has fallen too far behind: we
                           // have already created maxMergeCount threads, and
                           // now there's at least one more merge pending.
                           // Unlike ConcurrentMergeScheduler, no merge tasks are
                           // paused here; how many actually run at once is up
                           // to the TaskScheduler.  We stall this producer
                           // thread to prevent creation of new segments,
                           // until merging has caught up:
                           startStallTime = Environment.TickCount;
                           if (Verbose)
                           {
                               Message("    too many merges; stalling...");
                           }
   
                           _manualResetEvent.Reset();
                           _manualResetEvent.Wait();
                       }
   
                       if (Verbose)
                       {
                           if (startStallTime != 0)
                           {
                               Message("  stalled for " + (Environment.TickCount - startStallTime) + " msec");
                           }
                       }
   
                       MergePolicy.OneMerge merge = writer.NextMerge();
                       if (merge == null)
                       {
                           if (Verbose)
                           {
                               Message("  no more merges pending; now return");
                           }
                           return;
                       }
   
                       bool success = false;
                       try
                       {
                           if (Verbose)
                           {
                               Message("  consider merge " + writer.SegString(merge.Segments));
                           }
   
                           // OK to spawn a new merge thread to handle this
                           // merge:
                           var merger = CreateTask(writer, merge);
   
                           merger.MergeThreadCompleted += OnMergeThreadCompleted;
   
                           _mergeThreads.Add(merger);
   
                           if (Verbose)
                           {
                               Message("    launch new thread [" + merger.Name + "]");
                           }
   
                           merger.Start(_taskScheduler);
   
                           // Must call this after starting the thread else
                           // the new thread is removed from mergeThreads
                           // (since it's not alive yet):
                           UpdateMergeThreads();
   
                           success = true;
                       }
                       finally
                       {
                           if (!success)
                           {
                               writer.MergeFinish(merge);
                           }
                       }
                   }
               }
           }
   
           /// <summary>
           /// Does the actual merge, by calling <see cref="IndexWriter.Merge(MergePolicy.OneMerge)"/> </summary>
           [MethodImpl(MethodImplOptions.NoInlining)]
           protected virtual void DoMerge(MergePolicy.OneMerge merge)
           {
               _writer.Merge(merge);
           }
   
           private void OnMergeThreadCompleted(object sender, EventArgs e)
           {
               var mergeThread = sender as MergeThread;
   
               if (mergeThread == null)
               {
                   return;
               }
   
               mergeThread.MergeThreadCompleted -= OnMergeThreadCompleted;
   
               using (_lock.Write())
               {
                   UpdateMergeThreads();
               }
           }
   
           /// <summary>
           /// Create and return a new <see cref="MergeThread"/> </summary>
           private MergeThread CreateTask(IndexWriter writer, MergePolicy.OneMerge merge)
           {
               var count = Interlocked.Increment(ref _mergeThreadCount);
               var name = string.Format("Lucene Merge Task #{0}", count);
   
               return new MergeThread(name, writer, merge, writer.infoStream, Verbose, _manualResetEvent,
                   HandleMergeException, DoMerge);
           }
   
           /// <summary>
           /// Called when an exception is hit in a background merge
           /// thread
           /// </summary>
           protected virtual void HandleMergeException(Exception exc)
           {
               // suppressExceptions is normally only set during testing
               if (suppressExceptions)
               {
                   return;
               }
   
   //#if FEATURE_THREAD_INTERRUPT
   //            try
   //            {
   //#endif
                   // When an exception is hit during merge, IndexWriter
                   // removes any partial files and then allows another
                   // merge to run.  If whatever caused the error is not
                   // transient then the exception will keep happening,
                   // so, we sleep here to avoid saturating CPU in such
                   // cases:
                   Thread.Sleep(250);
   //#if FEATURE_THREAD_INTERRUPT // LUCENENET NOTE: Senseless to catch and rethrow the same exception type
   //            }
   //            catch (ThreadInterruptedException ie)
   //            {
   //                throw new ThreadInterruptedException("Thread Interrupted Exception", ie);
   //            }
   //#endif
               throw new MergePolicy.MergeException(exc, _directory);
           }
   
           private bool suppressExceptions;
   
           /// <summary>
           /// Used for testing </summary>
           public virtual void SetSuppressExceptions()
           {
               suppressExceptions = true;
           }
   
           /// <summary>
           /// Used for testing </summary>
           public virtual void ClearSuppressExceptions()
           {
               suppressExceptions = false;
           }
   
           public override string ToString()
           {
               StringBuilder sb = new StringBuilder(this.GetType().Name + ": ");
               sb.AppendFormat("maxThreadCount={0}, ", MaxThreadCount);
               sb.AppendFormat("maxMergeCount={0}", MaxMergeCount);
               return sb.ToString();
           }
   
           public override object Clone()
           {
               TaskMergeScheduler clone = (TaskMergeScheduler)base.Clone();
               clone._writer = null;
               clone._directory = null;
               clone._mergeThreads.Clear();
               return clone;
           }
   
           /// <summary>
           /// Runs a merge thread, which may run one or more merges
           /// in sequence.
           /// </summary>
           internal class MergeThread : IDisposable
           {
               public event EventHandler MergeThreadCompleted;
   
               private readonly CancellationTokenSource _cancellationTokenSource;
               private readonly ReaderWriterLockSlim _lock = new ReaderWriterLockSlim();
               private readonly ManualResetEventSlim _resetEvent;
               private readonly Action<Exception> _exceptionHandler;
               private readonly Action<MergePolicy.OneMerge> _doMerge;
               private readonly InfoStream _logger;
               private readonly IndexWriter _writer;
               private readonly MergePolicy.OneMerge _startingMerge;
               private readonly bool _isLoggingEnabled;
   
               private Task _task;
               private MergePolicy.OneMerge _runningMerge;
               private volatile bool _isDisposed = false;
               private volatile bool _isDone;
   
               /// <summary>
               /// Sole constructor. </summary>
               public MergeThread(string name, IndexWriter writer, MergePolicy.OneMerge startMerge,
                   InfoStream logger, bool isLoggingEnabled,
                   ManualResetEventSlim resetEvent, Action<Exception> exceptionHandler,
                   Action<MergePolicy.OneMerge> doMerge)
               {
                   Name = name;
                   _cancellationTokenSource = new CancellationTokenSource();
                   _writer = writer;
                   _startingMerge = startMerge;
                   _logger = logger;
                   _isLoggingEnabled = isLoggingEnabled;
                   _resetEvent = resetEvent;
                   _exceptionHandler = exceptionHandler;
                   _doMerge = doMerge;
               }
   
               public string Name { get; private set; }
   
               public Task Instance
               {
                   get
                   {
                       using (_lock.Read())
                       {
                           return _task;
                       }
                   }
               }
   
               /// <summary>
               /// Record the currently running merge. </summary>
               public virtual MergePolicy.OneMerge RunningMerge
               {
                   get
                   {
                       using (_lock.Read())
                       {
                           return _runningMerge;
                       }
                   }
                   set => Interlocked.Exchange(ref _runningMerge, value);
               }
   
               /// <summary>
               /// Return the current merge, or <c>null</c> if this 
               /// <see cref="MergeThread"/> is done.
               /// </summary>
               public virtual MergePolicy.OneMerge CurrentMerge
               {
                   get
                   {
                       using (_lock.Read())
                       {
                           if (_isDone)
                           {
                               return null;
                           }
   
                           return _runningMerge ?? _startingMerge;
                       }
                   }
               }
   
               public bool IsAlive
               {
                   get
                   {
                       if (_isDisposed || _isDone)
                       {
                           return false;
                       }
   
                       using (_lock.Read())
                       {
                           // Task.IsCompleted is true for RanToCompletion, Faulted and Canceled,
                           // so the task is alive only while it has not yet completed.
                           return _task != null && !_task.IsCompleted;
                       }
                   }
               }
   
               public void Start(TaskScheduler taskScheduler)
               {
                   using (_lock.Write())
                   {
                       if (_task == null && !_cancellationTokenSource.IsCancellationRequested)
                       {
                           _task = Task.Factory.StartNew(() => Run(_cancellationTokenSource.Token),
                               _cancellationTokenSource.Token, TaskCreationOptions.None, taskScheduler);
                       }
                   }
               }
   
               public void Wait()
               {
                   if (!IsAlive)
                   {
                       return;
                   }
   
                   _task.Wait(_cancellationTokenSource.Token);
               }
   
               public void Cancel()
               {
                   if (!IsAlive)
                   {
                       return;
                   }
   
                   using (_lock.Write())
                   {
                       if (!_cancellationTokenSource.IsCancellationRequested)
                       {
                           _cancellationTokenSource.Cancel();
                       }
                   }
               }
   
               private void Run(CancellationToken cancellationToken)
               {
                   // First time through the while loop we do the merge
                   // that we were started with:
                   MergePolicy.OneMerge merge = _startingMerge;
   
                   try
                   {
                       if (_isLoggingEnabled)
                       {
                           _logger.Message(COMPONENT_NAME, "  merge thread: start");
                       }
   
                       while (!cancellationToken.IsCancellationRequested)
                       {
                           RunningMerge = merge;
                           // LUCENENET NOTE: We MUST call DoMerge(merge) instead of 
                           // _writer.Merge(merge) because the tests specifically look
                           // for the method name DoMerge in the stack trace. 
                           _doMerge(merge);
   
                           // Subsequent times through the loop we do any new
                           // merge that writer says is necessary:
                           merge = _writer.NextMerge();
   
                           // Notify here in case any threads were stalled;
                           // they will notice that the pending merge has
                           // been pulled and possibly resume:
                           _resetEvent.Set();
   
                           if (merge != null)
                           {
                               if (_isLoggingEnabled)
                               {
                                   _logger.Message(COMPONENT_NAME, "  merge thread: do another merge " + _writer.SegString(merge.Segments));
                               }
                           }
                           else
                           {
                               break;
                           }
                       }
   
                       if (_isLoggingEnabled)
                       {
                           _logger.Message(COMPONENT_NAME, "  merge thread: done");
                       }
                   }
                   catch (Exception exc)
                   {
                       // Ignore the exception if it was due to abort:
                       if (!(exc is MergePolicy.MergeAbortedException))
                       {
                           //System.out.println(Thread.currentThread().getName() + ": CMS: exc");
                           //exc.printStackTrace(System.out)
                           _exceptionHandler(exc);
                       }
                   }
                   finally
                   {
                       _isDone = true;
   
                       if (MergeThreadCompleted != null)
                       {
                           MergeThreadCompleted(this, EventArgs.Empty);
                       }
                   }
               }
   
               public void Dispose()
               {
                   if (_isDisposed)
                   {
                       return;
                   }
   
                   _isDisposed = true;
                   _lock.Dispose();
                   _cancellationTokenSource.Dispose();
               }
   
               public override string ToString()
               {
                   return _task == null
                       ? string.Format("Task[{0}], Task has not been started yet.", Name)
                       : string.Format("Task[{0}], Id[{1}], Status[{2}]", Name, _task.Id, _task.Status);
               }
   
               public override bool Equals(object obj)
               {
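                   var compared = obj as MergeThread;

                   if (compared == null)
                   {
                       return false;
                   }

                   Task task = Instance;
                   Task comparedTask = compared.Instance;

                   // Tasks that have not been started yet have no Id to compare; fall back
                   // to reference equality, matching GetHashCode() in that case.
                   if (task == null || comparedTask == null)
                   {
                       return ReferenceEquals(this, compared);
                   }

                   return task.Id == comparedTask.Id;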
               }
   
               public override int GetHashCode()
               {
                   return Instance == null
                       ? base.GetHashCode()
                       : Instance.GetHashCode();
               }
           }
       }
   }
   ```
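
   A note on the liveness check in `MergeThread.IsAlive`: a task is finished when its status is any one of `RanToCompletion`, `Faulted`, or `Canceled`, and `Task.IsCompleted` is `true` for all three. The standalone sketch below (the `TaskLivenessDemo` name is hypothetical) shows that `!task.IsCompleted` covers every terminal state. That is easy to get wrong when chaining `!=` comparisons, since `status != A || status != B` is true for every status.

   ```c#
   using System;
   using System.Threading;
   using System.Threading.Tasks;

   // Hypothetical standalone demo, not Lucene.NET API.
   public static class TaskLivenessDemo
   {
       // A task is "alive" until it reaches any terminal status.
       public static bool IsAlive(Task task) => task != null && !task.IsCompleted;

       // One task in each terminal TaskStatus.
       public static Task[] TerminalTasks() => new[]
       {
           Task.CompletedTask,                                  // RanToCompletion
           Task.FromException(new InvalidOperationException()), // Faulted
           Task.FromCanceled(new CancellationToken(true)),      // Canceled
       };
   }
   ```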


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


