// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT License.
// See the LICENSE file in the project root for more information.

using System.Reactive.Disposables;
using System.Threading;

namespace System.Reactive.Concurrency
{
    /// <summary>
    /// Represents an object that schedules each unit of work on a separate thread.
    /// </summary>
    public sealed class NewThreadScheduler : LocalScheduler, ISchedulerLongRunning, ISchedulerPeriodic
    {
        // Lazily created shared instance backing the Default property.
        private static readonly Lazy<NewThreadScheduler> Instance = new(static () => new NewThreadScheduler());

        // Factory used to create the thread for every piece of scheduled work.
        private readonly Func<ThreadStart, Thread> _threadFactory;

        /// <summary>
        /// Creates an object that schedules each unit of work on a separate thread.
        /// </summary>
        public NewThreadScheduler()
            : this(action => new Thread(action))
        {
        }

        /// <summary>
        /// Gets an instance of this scheduler that uses the default Thread constructor.
        /// </summary>
        public static NewThreadScheduler Default => Instance.Value;

        /// <summary>
        /// Creates an object that schedules each unit of work on a separate thread.
        /// </summary>
        /// <param name="threadFactory">Factory function for thread creation.</param>
        /// <exception cref="ArgumentNullException"><paramref name="threadFactory"/> is <c>null</c>.</exception>
        public NewThreadScheduler(Func<ThreadStart, Thread> threadFactory)
        {
            _threadFactory = threadFactory ?? throw new ArgumentNullException(nameof(threadFactory));
        }

        /// <summary>
        /// Schedules an action to be executed after dueTime.
        /// </summary>
        /// <typeparam name="TState">The type of the state passed to the scheduled action.</typeparam>
        /// <param name="state">State passed to the action to be executed.</param>
        /// <param name="dueTime">Relative time after which to execute the action.</param>
        /// <param name="action">Action to be executed.</param>
        /// <returns>The disposable object used to cancel the scheduled action (best effort).</returns>
        /// <exception cref="ArgumentNullException"><paramref name="action"/> is <c>null</c>.</exception>
        public override IDisposable Schedule<TState>(TState state, TimeSpan dueTime, Func<IScheduler, TState, IDisposable> action)
        {
            if (action == null)
            {
                throw new ArgumentNullException(nameof(action));
            }

            // Each call gets its own event loop built on the configured thread factory;
            // ExitIfEmpty is set so the loop is allowed to end once it has no more work.
            var scheduler = new EventLoopScheduler(_threadFactory)
            {
                ExitIfEmpty = true
            };

            return scheduler.Schedule(state, dueTime, action);
        }

        /// <summary>
        /// Schedules a long-running task by creating a new thread. Cancellation happens through polling.
        /// </summary>
        /// <typeparam name="TState">The type of the state passed to the scheduled action.</typeparam>
        /// <param name="state">State passed to the action to be executed.</param>
        /// <param name="action">Action to be executed.</param>
        /// <returns>The disposable object used to cancel the scheduled action (best effort).</returns>
        /// <exception cref="ArgumentNullException"><paramref name="action"/> is <c>null</c>.</exception>
        public IDisposable ScheduleLongRunning<TState>(TState state, Action<TState, ICancelable> action)
        {
            if (action == null)
            {
                throw new ArgumentNullException(nameof(action));
            }

            var d = new BooleanDisposable();

            var thread = _threadFactory(() =>
            {
                //
                // Notice we don't check d.IsDisposed. The contract for ISchedulerLongRunning
                // requires us to ensure the scheduled work gets an opportunity to observe
                // the cancellation request.
                //
                action(state, d);
            });

            thread.Start();

            return d;
        }

        /// <summary>
        /// Schedules a periodic piece of work by creating a new thread that goes to sleep when work has been
        /// dispatched and wakes up again at the next periodic due time.
        /// </summary>
        /// <typeparam name="TState">The type of the state passed to the scheduled action.</typeparam>
        /// <param name="state">Initial state passed to the action upon the first iteration.</param>
        /// <param name="period">Period for running the work periodically.</param>
        /// <param name="action">Action to be executed, potentially updating the state.</param>
        /// <returns>The disposable object used to cancel the scheduled recurring action (best effort).</returns>
        /// <exception cref="ArgumentNullException"><paramref name="action"/> is <c>null</c>.</exception>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="period"/> is less than <see cref="TimeSpan.Zero"/>.</exception>
        public IDisposable SchedulePeriodic<TState>(TState state, TimeSpan period, Func<TState, TState> action)
        {
            if (period < TimeSpan.Zero)
            {
                throw new ArgumentOutOfRangeException(nameof(period));
            }

            if (action == null)
            {
                throw new ArgumentNullException(nameof(action));
            }

            var periodic = new Periodic<TState>(state, period, action);

            var thread = _threadFactory(periodic.Run);
            thread.Start();

            return periodic;
        }

        /// <summary>
        /// Runs a recurring action on the thread that executes <see cref="Run"/>; disposing the object
        /// requests the loop to stop.
        /// </summary>
        /// <typeparam name="TState">The type of the state threaded through successive invocations.</typeparam>
        private sealed class Periodic<TState> : IDisposable
        {
            private readonly IStopwatch _stopwatch;
            private readonly TimeSpan _period;
            private readonly Func<TState, TState> _action;

            // Monitor object: Run waits on it between iterations, Dispose pulses it.
            private readonly object _cancel = new();

            // Set by Dispose; volatile because it is written and read on different threads.
            private volatile bool _done;

            private TState _state;

            // Due time of the next invocation, measured on _stopwatch.
            private TimeSpan _next;

            public Periodic(TState state, TimeSpan period, Func<TState, TState> action)
            {
                _stopwatch = ConcurrencyAbstractionLayer.Current.StartStopwatch();

                _period = period;
                _action = action;

                _state = state;
                _next = period;
            }

            /// <summary>
            /// Thread entry point: waits until the next due time, invokes the action with the current state,
            /// stores the returned state, and repeats until disposal is observed.
            /// </summary>
            public void Run()
            {
                while (!_done)
                {
                    // NOTE(review): Scheduler.Normalize is defined elsewhere; presumably it maps a
                    // negative remaining time to zero so a late iteration runs immediately - confirm.
                    var timeout = Scheduler.Normalize(_next - _stopwatch.Elapsed);

                    lock (_cancel)
                    {
                        // Re-check under the lock. Dispose may have set _done and pulsed after the loop
                        // condition was evaluated but before this thread started waiting; such a pulse
                        // has no waiter and is lost, which would otherwise cost a full timeout plus one
                        // extra invocation of the action after disposal.
                        if (_done)
                        {
                            return;
                        }

                        // Wait returns true when the lock was reacquired before the timeout elapsed,
                        // i.e. Dispose pulsed us.
                        if (Monitor.Wait(_cancel, timeout))
                        {
                            return;
                        }

                        // The wait timed out, but Dispose may have run right at the deadline.
                        if (_done)
                        {
                            return;
                        }
                    }

                    _state = _action(_state);

                    // Advance by a fixed period from the previous due time (not from "now").
                    _next += _period;
                }
            }

            /// <summary>
            /// Requests the periodic loop to stop and wakes it up if it is currently waiting.
            /// </summary>
            public void Dispose()
            {
                _done = true;

                lock (_cancel)
                {
                    Monitor.Pulse(_cancel);
                }
            }
        }

        /// <summary>
        /// Starts a new stopwatch object.
        /// </summary>
        /// <returns>New stopwatch object; started at the time of the request.</returns>
        public override IStopwatch StartStopwatch()
        {
            //
            // Strictly speaking, this explicit override is not necessary because the base implementation calls into
            // the enlightenment module to obtain the CAL, which would circle back to System.Reactive.PlatformServices
            // where we're currently running. This is merely a short-circuit to avoid the additional roundtrip.
            //
            return new StopwatchImpl();
        }
    }
}