// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT License.
// See the LICENSE file in the project root for more information.

using System.Reactive.Disposables;
using System.Threading;

namespace System.Reactive
{
    /// <summary>
    /// Minimal contract used to push notifications to the downstream observer of a sink.
    /// </summary>
    /// <typeparam name="TTarget">Type of the elements forwarded downstream.</typeparam>
    internal interface ISink<in TTarget>
    {
        /// <summary>
        /// Forwards an element to the downstream observer.
        /// </summary>
        /// <param name="value">The element to forward.</param>
        void ForwardOnNext(TTarget value);

        /// <summary>
        /// Forwards the completion signal to the downstream observer.
        /// </summary>
        void ForwardOnCompleted();

        /// <summary>
        /// Forwards an error to the downstream observer.
        /// </summary>
        /// <param name="error">The error to forward.</param>
        void ForwardOnError(Exception error);
    }

    /// <summary>
    /// Base class for sinks that forward notifications to a downstream observer and that
    /// can be disposed to mute that observer and dispose the upstream resource.
    /// </summary>
    /// <typeparam name="TTarget">Type of the elements forwarded downstream.</typeparam>
    internal abstract class Sink<TTarget> : ISink<TTarget>, IDisposable
    {
        // Holds the upstream resource assigned through SetUpstream.
        // Intentionally not readonly: its Disposable setter and Dispose are invoked on the field itself.
        private SingleAssignmentDisposableValue _upstream;

        // The downstream observer; replaced by NopObserver<TTarget>.Instance once the sink is disposed.
        private volatile IObserver<TTarget> _observer;

        /// <summary>
        /// Creates a sink that forwards notifications to the given observer.
        /// </summary>
        /// <param name="observer">The downstream observer.</param>
        protected Sink(IObserver<TTarget> observer)
        {
            _observer = observer;
        }

        /// <summary>
        /// Mutes the downstream observer and, on the first call only, runs <see cref="Dispose(bool)"/>.
        /// </summary>
        public void Dispose()
        {
            // The atomic swap lets exactly one caller observe the original (non-Nop) observer,
            // so Dispose(true) is reached at most once through this method.
            if (Interlocked.Exchange(ref _observer, NopObserver<TTarget>.Instance) != NopObserver<TTarget>.Instance)
            {
                Dispose(true);
            }
        }

        /// <summary>
        /// Override this method to dispose additional resources.
        /// The method is guaranteed to be called at most once.
        /// </summary>
        /// <param name="disposing">If true, the method was called from <see cref="Dispose()"/>.</param>
        protected virtual void Dispose(bool disposing)
        {
            // Calling base.Dispose(true) directly is not a proper disposal, so the observer is
            // deliberately not reassigned to NopObserver<TTarget>.Instance here; that is done in Dispose().
            // Sink is internal, so this usage can pretty much be enforced.
            _upstream.Dispose();
        }

        /// <summary>
        /// Forwards an element to the current downstream observer.
        /// </summary>
        /// <param name="value">The element to forward.</param>
        public void ForwardOnNext(TTarget value)
        {
            _observer.OnNext(value);
        }

        /// <summary>
        /// Forwards completion to the current downstream observer, then disposes the sink.
        /// </summary>
        public void ForwardOnCompleted()
        {
            _observer.OnCompleted();
            Dispose();
        }

        /// <summary>
        /// Forwards an error to the current downstream observer, then disposes the sink.
        /// </summary>
        /// <param name="error">The error to forward.</param>
        public void ForwardOnError(Exception error)
        {
            _observer.OnError(error);
            Dispose();
        }

        /// <summary>
        /// Stores the upstream resource so that it is disposed together with the sink.
        /// </summary>
        /// <param name="upstream">The upstream resource, typically the source subscription.</param>
        protected void SetUpstream(IDisposable upstream)
        {
            _upstream.Disposable = upstream;
        }

        /// <summary>
        /// Disposes only the upstream resource; the downstream observer is left untouched.
        /// </summary>
        protected void DisposeUpstream()
        {
            _upstream.Dispose();
        }
    }

    /// <summary>
    /// Base class for implementation of query operators, providing a lightweight sink that can be disposed to mute the outgoing observer.
    /// </summary>
    /// <typeparam name="TSource">Type of the elements received from the source sequence.</typeparam>
    /// <typeparam name="TTarget">Type of the resulting sequence's elements.</typeparam>
    /// <remarks>
    /// Implementations of sinks are responsible to enforce the message grammar on the associated observer.
    /// Upon sending a terminal message, a pairing Dispose call should be made to trigger cancellation of
    /// related resources and to mute the outgoing observer.
    /// </remarks>
    internal abstract class Sink<TSource, TTarget> : Sink<TTarget>, IObserver<TSource>
    {
        /// <summary>
        /// Creates a sink that forwards notifications to the given observer.
        /// </summary>
        /// <param name="observer">The downstream observer.</param>
        protected Sink(IObserver<TTarget> observer)
            : base(observer)
        {
        }

        /// <summary>
        /// Subscribes this sink to <paramref name="source"/> and registers the resulting
        /// subscription as the upstream resource.
        /// </summary>
        /// <param name="source">The source sequence to observe.</param>
        public virtual void Run(IObservable<TSource> source)
        {
            SetUpstream(source.SubscribeSafe(this));
        }

        /// <summary>
        /// Handles an element produced by the source sequence.
        /// </summary>
        /// <param name="value">The source element.</param>
        public abstract void OnNext(TSource value);

        /// <summary>
        /// Handles a source error; by default it is forwarded downstream unchanged.
        /// </summary>
        /// <param name="error">The error raised by the source.</param>
        public virtual void OnError(Exception error) => ForwardOnError(error);

        /// <summary>
        /// Handles source completion; by default it is forwarded downstream.
        /// </summary>
        public virtual void OnCompleted() => ForwardOnCompleted();

        /// <summary>
        /// Creates an observer of <typeparamref name="TTarget"/> whose notifications are
        /// relayed to this sink's Forward* methods.
        /// </summary>
        /// <returns>A new forwarding observer bound to this sink.</returns>
        public IObserver<TTarget> GetForwarder() => new _(this);

        /// <summary>
        /// Observer that relays every notification to the Forward* methods of the owning sink.
        /// </summary>
        private sealed class _ : IObserver<TTarget>
        {
            private readonly Sink<TSource, TTarget> _forward;

            public _(Sink<TSource, TTarget> forward)
            {
                _forward = forward;
            }

            public void OnNext(TTarget value) => _forward.ForwardOnNext(value);

            public void OnError(Exception error) => _forward.ForwardOnError(error);

            public void OnCompleted() => _forward.ForwardOnCompleted();
        }
    }
}