// Licensed to the .NET Foundation under one or more agreements. // The .NET Foundation licenses this file to you under the MIT License. // See the LICENSE file in the project root for more information. using System.Reactive.Disposables; namespace System.Reactive.Concurrency { /// /// Represents an object that schedules units of work to run immediately on the current thread. /// /// Singleton instance of this type exposed through this static property. public sealed class ImmediateScheduler : LocalScheduler { private static readonly Lazy StaticInstance = new(static () => new ImmediateScheduler()); private ImmediateScheduler() { } /// /// Gets the singleton instance of the immediate scheduler. /// public static ImmediateScheduler Instance => StaticInstance.Value; /// /// Schedules an action to be executed. /// /// The type of the state passed to the scheduled action. /// State passed to the action to be executed. /// Action to be executed. /// The disposable object used to cancel the scheduled action (best effort). /// is null. public override IDisposable Schedule(TState state, Func action) { if (action == null) { throw new ArgumentNullException(nameof(action)); } return action(new AsyncLockScheduler(), state); } /// /// Schedules an action to be executed after dueTime. /// /// The type of the state passed to the scheduled action. /// State passed to the action to be executed. /// Action to be executed. /// Relative time after which to execute the action. /// The disposable object used to cancel the scheduled action (best effort). /// is null. public override IDisposable Schedule(TState state, TimeSpan dueTime, Func action) { if (action == null) { throw new ArgumentNullException(nameof(action)); } var dt = Scheduler.Normalize(dueTime); if (dt.Ticks > 0) { ConcurrencyAbstractionLayer.Current.Sleep(dt); } return action(new AsyncLockScheduler(), state); } private sealed class AsyncLockScheduler : LocalScheduler { private AsyncLock? _asyncLock; public override IDisposable Schedule(TState state, Func action) { if (action == null) { throw new ArgumentNullException(nameof(action)); } var m = new SingleAssignmentDisposable(); _asyncLock ??= new AsyncLock(); _asyncLock.Wait( (@this: this, m, action, state), tuple => { if (!tuple.m.IsDisposed) { tuple.m.Disposable = tuple.action(tuple.@this, tuple.state); } }); return m; } public override IDisposable Schedule(TState state, TimeSpan dueTime, Func action) { if (action == null) { throw new ArgumentNullException(nameof(action)); } if (dueTime.Ticks <= 0) { return Schedule(state, action); } return ScheduleSlow(state, dueTime, action); } private IDisposable ScheduleSlow(TState state, TimeSpan dueTime, Func action) { var timer = ConcurrencyAbstractionLayer.Current.StartStopwatch(); var m = new SingleAssignmentDisposable(); _asyncLock ??= new AsyncLock(); _asyncLock.Wait( (@this: this, m, state, action, timer, dueTime), tuple => { if (!tuple.m.IsDisposed) { var sleep = tuple.dueTime - tuple.timer.Elapsed; if (sleep.Ticks > 0) { ConcurrencyAbstractionLayer.Current.Sleep(sleep); } if (!tuple.m.IsDisposed) { tuple.m.Disposable = tuple.action(tuple.@this, tuple.state); } } }); return m; } } } }