// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT License.
// See the LICENSE file in the project root for more information.

using System.Collections.Generic;
using System.Globalization;

namespace System.Reactive.Concurrency
{
    /// <summary>
    /// Base class for virtual time schedulers.
    /// </summary>
    /// <typeparam name="TAbsolute">Absolute time representation type.</typeparam>
    /// <typeparam name="TRelative">Relative time representation type.</typeparam>
    public abstract class VirtualTimeSchedulerBase<TAbsolute, TRelative> : IScheduler, IServiceProvider, IStopwatchProvider
        where TAbsolute : IComparable<TAbsolute>
    {
        /// <summary>
        /// Creates a new virtual time scheduler with the default value of TAbsolute as the initial clock value.
        /// </summary>
        protected VirtualTimeSchedulerBase()
            : this(default!, Comparer<TAbsolute>.Default)
        {
            //
            // NB: We allow a default value for TAbsolute here, which typically is a struct. For compat reasons, we can't
            //     add a generic constraint (either struct or, better, new()), and maybe a derived class has handled null
            //     in all abstract methods.
            //
        }

        /// <summary>
        /// Creates a new virtual time scheduler with the specified initial clock value and absolute time comparer.
        /// </summary>
        /// <param name="initialClock">Initial value for the clock.</param>
        /// <param name="comparer">Comparer to determine causality of events based on absolute time.</param>
        /// <exception cref="ArgumentNullException"><paramref name="comparer"/> is <c>null</c>.</exception>
        protected VirtualTimeSchedulerBase(TAbsolute initialClock, IComparer<TAbsolute> comparer)
        {
            Clock = initialClock;
            Comparer = comparer ?? throw new ArgumentNullException(nameof(comparer));
        }

        /// <summary>
        /// Adds a relative time value to an absolute time value.
        /// </summary>
        /// <param name="absolute">Absolute time value.</param>
        /// <param name="relative">Relative time value to add.</param>
        /// <returns>The resulting absolute time sum value.</returns>
        protected abstract TAbsolute Add(TAbsolute absolute, TRelative relative);

        /// <summary>
        /// Converts the absolute time value to a DateTimeOffset value.
        /// </summary>
        /// <param name="absolute">Absolute time value to convert.</param>
        /// <returns>The corresponding DateTimeOffset value.</returns>
        protected abstract DateTimeOffset ToDateTimeOffset(TAbsolute absolute);

        /// <summary>
        /// Converts the TimeSpan value to a relative time value.
        /// </summary>
        /// <param name="timeSpan">TimeSpan value to convert.</param>
        /// <returns>The corresponding relative time value.</returns>
        protected abstract TRelative ToRelative(TimeSpan timeSpan);

        /// <summary>
        /// Gets whether the scheduler is enabled to run work.
        /// </summary>
        public bool IsEnabled { get; private set; }

        /// <summary>
        /// Gets the comparer used to compare absolute time values.
        /// </summary>
        protected IComparer<TAbsolute> Comparer { get; }

        /// <summary>
        /// Schedules an action to be executed at dueTime.
        /// </summary>
        /// <typeparam name="TState">The type of the state passed to the scheduled action.</typeparam>
        /// <param name="state">State passed to the action to be executed.</param>
        /// <param name="dueTime">Absolute time at which to execute the action.</param>
        /// <param name="action">Action to be executed.</param>
        /// <returns>The disposable object used to cancel the scheduled action (best effort).</returns>
        public abstract IDisposable ScheduleAbsolute<TState>(TState state, TAbsolute dueTime, Func<IScheduler, TState, IDisposable> action);

        /// <summary>
        /// Schedules an action to be executed at dueTime.
        /// </summary>
        /// <typeparam name="TState">The type of the state passed to the scheduled action.</typeparam>
        /// <param name="state">State passed to the action to be executed.</param>
        /// <param name="dueTime">Relative time after which to execute the action.</param>
        /// <param name="action">Action to be executed.</param>
        /// <returns>The disposable object used to cancel the scheduled action (best effort).</returns>
        /// <exception cref="ArgumentNullException"><paramref name="action"/> is <c>null</c>.</exception>
        public IDisposable ScheduleRelative<TState>(TState state, TRelative dueTime, Func<IScheduler, TState, IDisposable> action)
        {
            if (action == null)
            {
                throw new ArgumentNullException(nameof(action));
            }

            // The relative due time is resolved against the clock value at the time of scheduling.
            var runAt = Add(Clock, dueTime);

            return ScheduleAbsolute(state, runAt, action);
        }

        /// <summary>
        /// Schedules an action to be executed.
        /// </summary>
        /// <typeparam name="TState">The type of the state passed to the scheduled action.</typeparam>
        /// <param name="state">State passed to the action to be executed.</param>
        /// <param name="action">Action to be executed.</param>
        /// <returns>The disposable object used to cancel the scheduled action (best effort).</returns>
        /// <exception cref="ArgumentNullException"><paramref name="action"/> is <c>null</c>.</exception>
        public IDisposable Schedule<TState>(TState state, Func<IScheduler, TState, IDisposable> action)
        {
            if (action == null)
            {
                throw new ArgumentNullException(nameof(action));
            }

            return ScheduleAbsolute(state, Clock, action);
        }

        /// <summary>
        /// Schedules an action to be executed after dueTime.
        /// </summary>
        /// <typeparam name="TState">The type of the state passed to the scheduled action.</typeparam>
        /// <param name="state">State passed to the action to be executed.</param>
        /// <param name="dueTime">Relative time after which to execute the action.</param>
        /// <param name="action">Action to be executed.</param>
        /// <returns>The disposable object used to cancel the scheduled action (best effort).</returns>
        /// <exception cref="ArgumentNullException"><paramref name="action"/> is <c>null</c>.</exception>
        public IDisposable Schedule<TState>(TState state, TimeSpan dueTime, Func<IScheduler, TState, IDisposable> action)
        {
            if (action == null)
            {
                throw new ArgumentNullException(nameof(action));
            }

            return ScheduleRelative(state, ToRelative(dueTime), action);
        }

        /// <summary>
        /// Schedules an action to be executed at dueTime.
        /// </summary>
        /// <typeparam name="TState">The type of the state passed to the scheduled action.</typeparam>
        /// <param name="state">State passed to the action to be executed.</param>
        /// <param name="dueTime">Absolute time at which to execute the action.</param>
        /// <param name="action">Action to be executed.</param>
        /// <returns>The disposable object used to cancel the scheduled action (best effort).</returns>
        /// <exception cref="ArgumentNullException"><paramref name="action"/> is <c>null</c>.</exception>
        public IDisposable Schedule<TState>(TState state, DateTimeOffset dueTime, Func<IScheduler, TState, IDisposable> action)
        {
            if (action == null)
            {
                throw new ArgumentNullException(nameof(action));
            }

            // The absolute DateTimeOffset is converted to an offset from the scheduler's current notion of time.
            return ScheduleRelative(state, ToRelative(dueTime - Now), action);
        }

        /// <summary>
        /// Starts the virtual time scheduler.
        /// </summary>
        /// <remarks>
        /// Does nothing when the scheduler is already enabled. Otherwise, scheduled items are invoked until no
        /// item remains or <see cref="Stop"/> is called. If a scheduled item throws, the exception propagates
        /// to the caller and <see cref="IsEnabled"/> is reset to <c>false</c>.
        /// </remarks>
        public void Start()
        {
            if (IsEnabled)
            {
                return;
            }

            IsEnabled = true;

            try
            {
                do
                {
                    var next = GetNext();
                    if (next != null)
                    {
                        // The clock only ever moves forward; items that are due in the past run at the current clock value.
                        if (Comparer.Compare(next.DueTime, Clock) > 0)
                        {
                            Clock = next.DueTime;
                        }

                        next.Invoke();
                    }
                    else
                    {
                        IsEnabled = false;
                    }
                } while (IsEnabled);
            }
            finally
            {
                // Without this reset, an exception thrown by a scheduled item would leave IsEnabled stuck at true,
                // turning later Start calls into no-ops and making AdvanceTo/AdvanceBy throw InvalidOperationException.
                IsEnabled = false;
            }
        }

        /// <summary>
        /// Stops the virtual time scheduler.
        /// </summary>
        public void Stop()
        {
            IsEnabled = false;
        }

        /// <summary>
        /// Advances the scheduler's clock to the specified time, running all work till that point.
        /// </summary>
        /// <param name="time">Absolute time to advance the scheduler's clock to.</param>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="time"/> is in the past.</exception>
        /// <exception cref="InvalidOperationException">The scheduler is already running. VirtualTimeScheduler doesn't support running nested work dispatch loops. To simulate time slippage while running work on the scheduler, use <see cref="Sleep"/>.</exception>
        /// <remarks>
        /// If a scheduled item throws, the exception propagates to the caller, <see cref="IsEnabled"/> is reset
        /// to <c>false</c>, and the clock is left at the value it had when the item was invoked.
        /// </remarks>
        public void AdvanceTo(TAbsolute time)
        {
            var dueToClock = Comparer.Compare(time, Clock);
            if (dueToClock < 0)
            {
                throw new ArgumentOutOfRangeException(nameof(time));
            }

            if (dueToClock == 0)
            {
                return;
            }

            if (IsEnabled)
            {
                throw new InvalidOperationException(string.Format(CultureInfo.CurrentCulture, Strings_Linq.CANT_ADVANCE_WHILE_RUNNING, nameof(AdvanceTo)));
            }

            IsEnabled = true;

            try
            {
                do
                {
                    var next = GetNext();

                    // Only items due at or before the target time are run; anything later stays queued.
                    if (next != null && Comparer.Compare(next.DueTime, time) <= 0)
                    {
                        if (Comparer.Compare(next.DueTime, Clock) > 0)
                        {
                            Clock = next.DueTime;
                        }

                        next.Invoke();
                    }
                    else
                    {
                        IsEnabled = false;
                    }
                } while (IsEnabled);
            }
            finally
            {
                // Without this reset, an exception thrown by a scheduled item would leave IsEnabled stuck at true,
                // causing every later AdvanceTo/AdvanceBy call to throw InvalidOperationException.
                IsEnabled = false;
            }

            // Reached only when the dispatch loop completed without an exception.
            Clock = time;
        }

        /// <summary>
        /// Advances the scheduler's clock by the specified relative time, running all work scheduled for that timespan.
        /// </summary>
        /// <param name="time">Relative time to advance the scheduler's clock by.</param>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="time"/> is negative.</exception>
        /// <exception cref="InvalidOperationException">The scheduler is already running. VirtualTimeScheduler doesn't support running nested work dispatch loops. To simulate time slippage while running work on the scheduler, use <see cref="Sleep"/>.</exception>
        public void AdvanceBy(TRelative time)
        {
            var dt = Add(Clock, time);

            var dueToClock = Comparer.Compare(dt, Clock);
            if (dueToClock < 0)
            {
                throw new ArgumentOutOfRangeException(nameof(time));
            }

            if (dueToClock == 0)
            {
                return;
            }

            // Checked here (rather than relying on AdvanceTo) so the error message names AdvanceBy.
            if (IsEnabled)
            {
                throw new InvalidOperationException(string.Format(CultureInfo.CurrentCulture, Strings_Linq.CANT_ADVANCE_WHILE_RUNNING, nameof(AdvanceBy)));
            }

            AdvanceTo(dt);
        }

        /// <summary>
        /// Advances the scheduler's clock by the specified relative time.
        /// </summary>
        /// <param name="time">Relative time to advance the scheduler's clock by.</param>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="time"/> is negative.</exception>
        public void Sleep(TRelative time)
        {
            var dt = Add(Clock, time);

            var dueToClock = Comparer.Compare(dt, Clock);
            if (dueToClock < 0)
            {
                throw new ArgumentOutOfRangeException(nameof(time));
            }

            // Unlike AdvanceTo/AdvanceBy, no scheduled work is run here; only the clock value changes.
            Clock = dt;
        }

        /// <summary>
        /// Gets the scheduler's absolute time clock value.
        /// </summary>
        public TAbsolute Clock { get; protected set; }

        /// <summary>
        /// Gets the scheduler's notion of current time.
        /// </summary>
        public DateTimeOffset Now => ToDateTimeOffset(Clock);

        /// <summary>
        /// Gets the next scheduled item to be executed.
        /// </summary>
        /// <returns>The next scheduled item.</returns>
        protected abstract IScheduledItem<TAbsolute>? GetNext();

        object? IServiceProvider.GetService(Type serviceType) => GetService(serviceType);

        /// <summary>
        /// Discovers scheduler services by interface type. The base class implementation supports
        /// only the IStopwatchProvider service. To influence service discovery - such as adding
        /// support for other scheduler services - derived types can override this method.
        /// </summary>
        /// <param name="serviceType">Scheduler service interface type to discover.</param>
        /// <returns>Object implementing the requested service, if available; null otherwise.</returns>
        protected virtual object? GetService(Type serviceType)
        {
            if (serviceType == typeof(IStopwatchProvider))
            {
                return this;
            }

            return null;
        }

        /// <summary>
        /// Starts a new stopwatch object.
        /// </summary>
        /// <returns>New stopwatch object; started at the time of the request.</returns>
        public IStopwatch StartStopwatch()
        {
            var start = ClockToDateTimeOffset();
            return new VirtualTimeStopwatch(this, start);
        }

        private DateTimeOffset ClockToDateTimeOffset() => ToDateTimeOffset(Clock);

        /// <summary>
        /// Stopwatch whose elapsed time is the difference between the parent scheduler's current clock value
        /// (converted to a DateTimeOffset) and the value captured when the stopwatch was created.
        /// </summary>
        private sealed class VirtualTimeStopwatch : IStopwatch
        {
            private readonly VirtualTimeSchedulerBase<TAbsolute, TRelative> _parent;
            private readonly DateTimeOffset _start;

            public VirtualTimeStopwatch(VirtualTimeSchedulerBase<TAbsolute, TRelative> parent, DateTimeOffset start)
            {
                _parent = parent;
                _start = start;
            }

            public TimeSpan Elapsed => _parent.ClockToDateTimeOffset() - _start;
        }
    }

    /// <summary>
    /// Base class for virtual time schedulers using a priority queue for scheduled items.
    /// </summary>
    /// <typeparam name="TAbsolute">Absolute time representation type.</typeparam>
    /// <typeparam name="TRelative">Relative time representation type.</typeparam>
    public abstract class VirtualTimeScheduler<TAbsolute, TRelative> : VirtualTimeSchedulerBase<TAbsolute, TRelative>
        where TAbsolute : IComparable<TAbsolute>
    {
        // All accesses to the queue in this class are made while holding a lock on the queue instance.
        private readonly SchedulerQueue<TAbsolute> _queue = new();

        /// <summary>
        /// Creates a new virtual time scheduler with the default value of TAbsolute as the initial clock value.
        /// </summary>
        protected VirtualTimeScheduler()
        {
        }

        /// <summary>
        /// Creates a new virtual time scheduler.
        /// </summary>
        /// <param name="initialClock">Initial value for the clock.</param>
        /// <param name="comparer">Comparer to determine causality of events based on absolute time.</param>
        /// <exception cref="ArgumentNullException"><paramref name="comparer"/> is <c>null</c>.</exception>
        protected VirtualTimeScheduler(TAbsolute initialClock, IComparer<TAbsolute> comparer)
            : base(initialClock, comparer)
        {
        }

        /// <summary>
        /// Gets the next scheduled item to be executed.
        /// </summary>
        /// <returns>The next scheduled item.</returns>
        protected override IScheduledItem<TAbsolute>? GetNext()
        {
            lock (_queue)
            {
                while (_queue.Count > 0)
                {
                    var next = _queue.Peek();
                    if (next.IsCanceled)
                    {
                        // Canceled items are discarded here, when they reach the head of the queue.
                        _queue.Dequeue();
                    }
                    else
                    {
                        // The item is only peeked; it is removed from the queue by its own action wrapper when invoked.
                        return next;
                    }
                }
            }

            return null;
        }

        /// <summary>
        /// Schedules an action to be executed at dueTime.
        /// </summary>
        /// <typeparam name="TState">The type of the state passed to the scheduled action.</typeparam>
        /// <param name="state">State passed to the action to be executed.</param>
        /// <param name="dueTime">Absolute time at which to execute the action.</param>
        /// <param name="action">Action to be executed.</param>
        /// <returns>The disposable object used to cancel the scheduled action (best effort).</returns>
        /// <exception cref="ArgumentNullException"><paramref name="action"/> is <c>null</c>.</exception>
        public override IDisposable ScheduleAbsolute<TState>(TState state, TAbsolute dueTime, Func<IScheduler, TState, IDisposable> action)
        {
            if (action == null)
            {
                throw new ArgumentNullException(nameof(action));
            }

            ScheduledItem<TAbsolute, TState>? si = null;

            // Wraps the user action so that the item removes itself from the queue right before it runs.
            var run = new Func<IScheduler, TState, IDisposable>((scheduler, state1) =>
            {
                lock (_queue)
                {
                    _queue.Remove(si!); // NB: Assigned before function is invoked.
                }

                return action(scheduler, state1);
            });

            si = new ScheduledItem<TAbsolute, TState>(this, state, run, dueTime, Comparer);

            lock (_queue)
            {
                _queue.Enqueue(si);
            }

            return si;
        }
    }
}