// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT License.
// See the LICENSE file in the project root for more information.

using System.Reactive.Disposables;
using System.Threading;

namespace System.Reactive.Subjects
{
    /// <summary>
    /// Represents an object that is both an observable sequence as well as an observer.
    /// Each notification is broadcasted to all subscribed observers.
    /// </summary>
    /// <typeparam name="T">The type of the elements processed by the subject.</typeparam>
    public sealed class Subject<T> : SubjectBase<T>
    {
        #region Fields

        // Current set of subscriptions. The array is never modified in place: every change
        // builds a new array and publishes it with Interlocked.CompareExchange, so a reader
        // that obtained the reference can iterate it without further synchronization.
        // The reference itself also encodes the state of the subject via the two sentinels below.
        private SubjectDisposable[] _observers;

        // Error handed to OnError; read by Subscribe once the subject is terminated to decide
        // whether a late subscriber gets OnError or OnCompleted. Null means "no error".
        private Exception? _exception;

#pragma warning disable CA1825 // (Avoid zero-length array allocations.) The identity of these arrays matters, so we can't use the shared Array.Empty<T>() instance
        // Sentinel: OnCompleted/OnError has been signaled. Compared by reference.
        private static readonly SubjectDisposable[] Terminated = new SubjectDisposable[0];

        // Sentinel: Dispose has been called. Compared by reference.
        private static readonly SubjectDisposable[] Disposed = new SubjectDisposable[0];
#pragma warning restore CA1825

        #endregion

        #region Constructors

        /// <summary>
        /// Creates a subject.
        /// </summary>
        public Subject() => _observers = Array.Empty<SubjectDisposable>();

        #endregion

        #region Properties

        /// <summary>
        /// Indicates whether the subject has observers subscribed to it.
        /// </summary>
        /// <remarks>
        /// Both sentinel arrays are zero-length, so this is <c>false</c> after termination or disposal.
        /// </remarks>
        public override bool HasObservers => Volatile.Read(ref _observers).Length != 0;

        /// <summary>
        /// Indicates whether the subject has been disposed.
        /// </summary>
        public override bool IsDisposed => Volatile.Read(ref _observers) == Disposed;

        #endregion

        #region Methods

        #region IObserver<T> implementation

        // Kept as a separate helper so the throw statement is not inlined into every caller's body.
        // Callers still need a break/return after the call because the compiler cannot tell
        // that this method never returns.
        private static void ThrowDisposed() => throw new ObjectDisposedException(string.Empty);

        /// <summary>
        /// Notifies all subscribed observers about the end of the sequence.
        /// </summary>
        /// <exception cref="ObjectDisposedException">The subject has been disposed.</exception>
        public override void OnCompleted()
        {
            for (; ; )
            {
                var observers = Volatile.Read(ref _observers);

                if (observers == Disposed)
                {
                    _exception = null;
                    ThrowDisposed();
                    break;
                }

                // Already completed or failed: further terminal signals are ignored.
                if (observers == Terminated)
                {
                    break;
                }

                // Swap in the Terminated sentinel. Only the caller whose swap succeeds notifies
                // the observers it captured; if the array changed in the meantime, retry.
                if (Interlocked.CompareExchange(ref _observers, Terminated, observers) == observers)
                {
                    foreach (var observer in observers)
                    {
                        // Observer is null once that subscription has been disposed.
                        observer.Observer?.OnCompleted();
                    }

                    break;
                }
            }
        }

        /// <summary>
        /// Notifies all subscribed observers about the specified exception.
        /// </summary>
        /// <param name="error">The exception to send to all currently subscribed observers.</param>
        /// <exception cref="ArgumentNullException"><paramref name="error"/> is <c>null</c>.</exception>
        /// <exception cref="ObjectDisposedException">The subject has been disposed.</exception>
        public override void OnError(Exception error)
        {
            if (error == null)
            {
                throw new ArgumentNullException(nameof(error));
            }

            for (; ; )
            {
                var observers = Volatile.Read(ref _observers);

                if (observers == Disposed)
                {
                    _exception = null;
                    ThrowDisposed();
                    break;
                }

                // Already completed or failed: further terminal signals are ignored.
                if (observers == Terminated)
                {
                    break;
                }

                // Store the error before publishing the Terminated sentinel, so a Subscribe call
                // that observes Terminated afterwards finds the error already in place.
                _exception = error;

                if (Interlocked.CompareExchange(ref _observers, Terminated, observers) == observers)
                {
                    foreach (var observer in observers)
                    {
                        // Observer is null once that subscription has been disposed.
                        observer.Observer?.OnError(error);
                    }

                    break;
                }
            }
        }

        /// <summary>
        /// Notifies all subscribed observers about the arrival of the specified element in the sequence.
        /// </summary>
        /// <param name="value">The value to send to all currently subscribed observers.</param>
        /// <exception cref="ObjectDisposedException">The subject has been disposed.</exception>
        public override void OnNext(T value)
        {
            var observers = Volatile.Read(ref _observers);

            if (observers == Disposed)
            {
                _exception = null;
                ThrowDisposed();
                return;
            }

            // When the subject is terminated the array is the zero-length sentinel,
            // so this loop simply does nothing.
            foreach (var observer in observers)
            {
                observer.Observer?.OnNext(value);
            }
        }

        #endregion

        #region IObservable<T> implementation

        /// <summary>
        /// Subscribes an observer to the subject.
        /// </summary>
        /// <param name="observer">Observer to subscribe to the subject.</param>
        /// <returns>Disposable object that can be used to unsubscribe the observer from the subject.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="observer"/> is <c>null</c>.</exception>
        /// <exception cref="ObjectDisposedException">The subject has been disposed.</exception>
        public override IDisposable Subscribe(IObserver<T> observer)
        {
            if (observer == null)
            {
                throw new ArgumentNullException(nameof(observer));
            }

            // Allocated lazily and reused across retries of the loop below.
            var disposable = default(SubjectDisposable);

            for (; ; )
            {
                var observers = Volatile.Read(ref _observers);

                if (observers == Disposed)
                {
                    _exception = null;
                    ThrowDisposed();
                    break;
                }

                // The subject has already terminated: replay the terminal signal to the
                // newcomer immediately instead of registering it.
                if (observers == Terminated)
                {
                    var ex = _exception;

                    if (ex != null)
                    {
                        observer.OnError(ex);
                    }
                    else
                    {
                        observer.OnCompleted();
                    }

                    break;
                }

                disposable ??= new SubjectDisposable(this, observer);

                // Build a new array with the subscription appended and try to publish it;
                // if another thread changed the array first, loop and rebuild from the fresh one.
                var n = observers.Length;
                var b = new SubjectDisposable[n + 1];

                Array.Copy(observers, 0, b, 0, n);
                b[n] = disposable;

                if (Interlocked.CompareExchange(ref _observers, b, observers) == observers)
                {
                    return disposable;
                }
            }

            // Reached only via the Terminated branch: there is nothing to unsubscribe from.
            return Disposable.Empty;
        }

        /// <summary>
        /// Removes the given subscription from the current observer array, if it is present.
        /// </summary>
        /// <param name="observer">The subscription to remove.</param>
        private void Unsubscribe(SubjectDisposable observer)
        {
            for (; ; )
            {
                var a = Volatile.Read(ref _observers);
                var n = a.Length;

                // Zero length covers the empty array as well as the Terminated and Disposed
                // sentinels; in all of these cases there is nothing to remove.
                if (n == 0)
                {
                    break;
                }

                var j = Array.IndexOf(a, observer);

                if (j < 0)
                {
                    break;
                }

                SubjectDisposable[] b;

                if (n == 1)
                {
                    b = Array.Empty<SubjectDisposable>();
                }
                else
                {
                    // Copy everything except index j into a new, shorter array.
                    b = new SubjectDisposable[n - 1];

                    Array.Copy(a, 0, b, 0, j);
                    Array.Copy(a, j + 1, b, j, n - j - 1);
                }

                // Publish the new array; retry from the top if another thread got there first.
                if (Interlocked.CompareExchange(ref _observers, b, a) == a)
                {
                    break;
                }
            }
        }

        /// <summary>
        /// Subscription handle returned by <see cref="Subscribe"/>; pairs an observer with the
        /// subject it is registered on.
        /// </summary>
        private sealed class SubjectDisposable : IDisposable
        {
            private Subject<T> _subject;
            private volatile IObserver<T>? _observer;

            public SubjectDisposable(Subject<T> subject, IObserver<T> observer)
            {
                _subject = subject;
                _observer = observer;
            }

            /// <summary>
            /// The subscribed observer, or <c>null</c> once this subscription has been disposed.
            /// </summary>
            public IObserver<T>? Observer => _observer;

            /// <summary>
            /// Unsubscribes the observer from the subject. Only the first call has an effect.
            /// </summary>
            public void Dispose()
            {
                // The atomic exchange ensures only one caller sees the non-null observer
                // and goes on to unsubscribe.
                var observer = Interlocked.Exchange(ref _observer, null);

                if (observer == null)
                {
                    return;
                }

                _subject.Unsubscribe(this);
                _subject = null!;
            }
        }

        #endregion

        #region IDisposable implementation

        /// <summary>
        /// Releases all resources used by the current instance of the <see cref="Subject{T}"/> class and unsubscribes all observers.
        /// </summary>
        /// <remarks>
        /// Observers are dropped without being sent a notification.
        /// </remarks>
        public override void Dispose()
        {
            Interlocked.Exchange(ref _observers, Disposed);
            _exception = null;
        }

        #endregion

        #endregion
    }
}