// Licensed to the .NET Foundation under one or more agreements. // The .NET Foundation licenses this file to you under the MIT License. // See the LICENSE file in the project root for more information. using System.Reactive; using System.Reactive.Concurrency; using System.Reactive.Disposables; using System.Reactive.Subjects; using System.Security; namespace System.Runtime.CompilerServices { /// /// Represents a builder for asynchronous methods that return a task-like . /// /// The type of the elements in the sequence. #pragma warning disable CA1815 // (Override equals on value types.) Method only meant to be used by await/async generated code, so equality is not required. public struct TaskObservableMethodBuilder #pragma warning restore CA1815 { /// /// The compiler-generated asynchronous state machine representing the execution flow of the asynchronous /// method whose return type is a task-like . /// private IAsyncStateMachine _stateMachine; /// /// The underlying observable sequence representing the result produced by the asynchronous method. /// private TaskObservable _inner; /// /// Creates an instance of the struct. /// /// A new instance of the struct. #pragma warning disable CA1000 // (Do not declare static members on generic types.) Async method builders are required to define a static Create method, and are require to be generic when the async type produces a result. public static TaskObservableMethodBuilder Create() => default; #pragma warning restore CA1000 // Do not declare static members on generic types /// /// Begins running the builder with the associated state machine. /// /// The type of the state machine. /// The state machine instance, passed by reference. /// is null. #pragma warning disable CA1045 // (Avoid ref.) Required because this is an async method builder public void Start(ref TStateMachine stateMachine) #pragma warning restore CA1045 where TStateMachine : IAsyncStateMachine { if (stateMachine == null) { throw new ArgumentNullException(nameof(stateMachine)); } stateMachine.MoveNext(); } /// /// Associates the builder with the specified state machine. /// /// The state machine instance to associate with the builder. /// is null. /// The state machine was previously set. public void SetStateMachine(IAsyncStateMachine stateMachine) { if (_stateMachine != null) { throw new InvalidOperationException(); } _stateMachine = stateMachine ?? throw new ArgumentNullException(nameof(stateMachine)); } /// /// Marks the observable as successfully completed. /// /// The result to use to complete the observable sequence. /// The observable has already completed. public void SetResult(T result) { if (_inner == null) { _inner = new TaskObservable(result); } else { _inner.SetResult(result); } } /// /// Marks the observable as failed and binds the specified exception to the observable sequence. /// /// The exception to bind to the observable sequence. /// is null. /// The observable has already completed. public void SetException(Exception exception) { if (exception == null) { throw new ArgumentNullException(nameof(exception)); } if (_inner == null) { _inner = new TaskObservable(exception); } else { _inner.SetException(exception); } } /// /// Gets the observable sequence for this builder. /// public ITaskObservable Task => _inner ??= new TaskObservable(); /// /// Schedules the state machine to proceed to the next action when the specified awaiter completes. /// /// The type of the awaiter. /// The type of the state machine. /// The awaiter. /// The state machine. #pragma warning disable CA1045 // (Avoid ref.) Required because this is an async method builder public void AwaitOnCompleted(ref TAwaiter awaiter, ref TStateMachine stateMachine) #pragma warning restore CA1045 where TAwaiter : INotifyCompletion where TStateMachine : IAsyncStateMachine { try { if (_stateMachine == null) { var ignored = Task; // NB: Ensure we have the observable backed by an async subject ready. _stateMachine = stateMachine; _stateMachine.SetStateMachine(_stateMachine); } // NB: Rx has historically not bothered with execution contexts, so we don't do it here either. awaiter.OnCompleted(_stateMachine.MoveNext); } catch (Exception ex) { // NB: Prevent reentrancy into the async state machine when an exception would be observed // by the caller. This could cause concurrent execution of the async method. Instead, // rethrow the exception elsewhere. Rethrow(ex); } } /// /// Schedules the state machine to proceed to the next action when the specified awaiter completes. /// /// The type of the awaiter. /// The type of the state machine. /// The awaiter. /// The state machine. [SecuritySafeCritical] #pragma warning disable CA1045 // (Avoid ref.) Required because this is an async method builder public void AwaitUnsafeOnCompleted(ref TAwaiter awaiter, ref TStateMachine stateMachine) #pragma warning restore CA1045 where TAwaiter : ICriticalNotifyCompletion where TStateMachine : IAsyncStateMachine { try { if (_stateMachine == null) { var ignored = Task; // NB: Ensure we have the observable backed by an async subject ready. _stateMachine = stateMachine; _stateMachine.SetStateMachine(_stateMachine); } // NB: Rx has historically not bothered with execution contexts, so we don't do it here either. awaiter.UnsafeOnCompleted(_stateMachine.MoveNext); } catch (Exception ex) { // NB: Prevent reentrancy into the async state machine when an exception would be observed // by the caller. This could cause concurrent execution of the async method. Instead, // rethrow the exception elsewhere. Rethrow(ex); } } /// /// Rethrows an exception that was thrown from an awaiter's OnCompleted methods. /// /// The exception to rethrow. private static void Rethrow(Exception exception) { Scheduler.Default.Schedule(exception, (ex, recurse) => ex.Throw()); } /// /// Implementation of the IObservable<T> interface compatible with async method return types. /// /// /// This class implements a "task-like" type that can be used as the return type of an asynchronous /// method in C# 7.0 and beyond. For example: /// /// async Observable<int> RxAsync() /// { /// var res = await Observable.Return(21).Delay(TimeSpan.FromSeconds(1)); /// return res * 2; /// } /// /// internal sealed class TaskObservable : ITaskObservable, ITaskObservableAwaiter { /// /// The underlying observable sequence to subscribe to in case the asynchronous method did not /// finish synchronously. /// private readonly AsyncSubject? _subject; /// /// The result returned by the asynchronous method in case the method finished synchronously. /// private readonly T? _result; /// /// The exception thrown by the asynchronous method in case the method finished synchronously. /// private readonly Exception? _exception; /// /// Creates a new for an asynchronous method that has not finished yet. /// public TaskObservable() { _subject = new AsyncSubject(); } /// /// Creates a new for an asynchronous method that synchronously returned /// the specified value. /// /// The result returned by the asynchronous method. public TaskObservable(T result) { _result = result; } /// /// Creates a new for an asynchronous method that synchronously threw /// the specified . /// /// The exception thrown by the asynchronous method. public TaskObservable(Exception exception) { _exception = exception; } /// /// Marks the observable as successfully completed. /// /// The result to use to complete the observable sequence. /// The observable has already completed. public void SetResult(T result) { if (IsCompleted) { throw new InvalidOperationException(); } _subject!.OnNext(result); _subject.OnCompleted(); } /// /// Marks the observable as failed and binds the specified exception to the observable sequence. /// /// The exception to bind to the observable sequence. /// is null. /// The observable has already completed. public void SetException(Exception exception) { if (IsCompleted) { throw new InvalidOperationException(); } _subject!.OnError(exception); } /// /// Subscribes the given observer to the observable sequence. /// /// Observer that will receive notifications from the observable sequence. /// Disposable object representing an observer's subscription to the observable sequence. /// is null. public IDisposable Subscribe(IObserver observer) { if (_subject != null) { return _subject.Subscribe(observer); } if (_exception != null) { observer.OnError(_exception); return Disposable.Empty; } observer.OnNext(_result!); return Disposable.Empty; } /// /// Gets an awaiter that can be used to await the eventual completion of the observable sequence. /// /// An awaiter that can be used to await the eventual completion of the observable sequence. public ITaskObservableAwaiter GetAwaiter() => this; /// /// Gets a Boolean indicating whether the observable sequence has completed. /// public bool IsCompleted => _subject?.IsCompleted ?? true; /// /// Gets the result produced by the observable sequence. /// /// The result produced by the observable sequence. public T GetResult() { if (_subject != null) { return _subject.GetResult(); } _exception?.Throw(); return _result!; } /// /// Attaches the specified to the observable sequence. /// /// The continuation to attach. public void OnCompleted(Action continuation) { if (_subject != null) { _subject.OnCompleted(continuation); } else { continuation(); } } } } }