// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT License.
// See the LICENSE file in the project root for more information.

using System.Diagnostics.CodeAnalysis;
using System.Reactive.Disposables;
using System.Threading;

namespace System.Reactive.Subjects
{
    /// <summary>
    /// Represents a value that changes over time.
    /// Observers can subscribe to the subject to receive the last (or initial) value and all subsequent notifications.
    /// </summary>
    /// <typeparam name="T">The type of the elements processed by the subject.</typeparam>
    public sealed class BehaviorSubject<T> : SubjectBase<T>
    {
        #region Fields

        // Guards every read and write of the mutable state below, except the lock-free read in HasObservers.
        private readonly object _gate = new();

        // Current set of subscribed observers; replaced (never mutated in place) on every change and
        // set to null by Dispose.
        private ImmutableList<IObserver<T>> _observers;

        // Set once OnCompleted or OnError has been accepted; later notifications are ignored.
        private bool _isStopped;

        // Last value passed to OnNext, or the constructor argument if OnNext has not been called yet.
        private T _value;

        // Error passed to OnError, if the subject terminated with an error.
        private Exception? _exception;

        private bool _isDisposed;

        #endregion

        #region Constructors

        /// <summary>
        /// Initializes a new instance of the <see cref="BehaviorSubject{T}"/> class which creates a subject that caches its last value and starts with the specified value.
        /// </summary>
        /// <param name="value">Initial value sent to observers when no other value has been received by the subject yet.</param>
        public BehaviorSubject(T value)
        {
            _value = value;
            _observers = ImmutableList<IObserver<T>>.Empty;
        }

        #endregion

        #region Properties

        /// <summary>
        /// Indicates whether the subject has observers subscribed to it.
        /// </summary>
        // Read without taking the gate. Dispose sets _observers to null, in which case the
        // null-conditional comparison evaluates to false.
        public override bool HasObservers => _observers?.Data.Length > 0;

        /// <summary>
        /// Indicates whether the subject has been disposed.
        /// </summary>
        public override bool IsDisposed
        {
            get
            {
                lock (_gate)
                {
                    return _isDisposed;
                }
            }
        }

        /// <summary>
        /// Gets the current value or throws an exception.
        /// </summary>
        /// <value>The initial value passed to the constructor until <see cref="OnNext(T)"/> is called; after which, the last value passed to <see cref="OnNext(T)"/>.</value>
        /// <remarks>
        /// <para><see cref="Value"/> is frozen after <see cref="OnCompleted()"/> is called.</para>
        /// <para>After <see cref="OnError(Exception)"/> is called, <see cref="Value"/> always throws the specified exception.</para>
        /// <para>An exception is always thrown after <see cref="Dispose()"/> is called.</para>
        /// <para>
        /// Reading <see cref="Value"/> is a thread-safe operation, though there's a potential race condition when <see cref="OnNext(T)"/> or <see cref="OnError(Exception)"/> are being invoked concurrently.
        /// In some cases, it may be necessary for a caller to use external synchronization to avoid race conditions.
        /// </para>
        /// </remarks>
        /// <exception cref="ObjectDisposedException">Dispose was called.</exception>
        public T Value
        {
            get
            {
                lock (_gate)
                {
                    CheckDisposed();

                    _exception?.Throw();

                    return _value;
                }
            }
        }

        #endregion

        #region Methods

        /// <summary>
        /// Tries to get the current value or throws an exception.
        /// </summary>
        /// <param name="value">The initial value passed to the constructor until <see cref="OnNext(T)"/> is called; after which, the last value passed to <see cref="OnNext(T)"/>.</param>
        /// <returns>true if a value is available; false if the subject was disposed.</returns>
        /// <remarks>
        /// <para>The value returned from <see cref="TryGetValue"/> is frozen after <see cref="OnCompleted()"/> is called.</para>
        /// <para>After <see cref="OnError(Exception)"/> is called, <see cref="TryGetValue"/> always throws the specified exception.</para>
        /// <para>
        /// Calling <see cref="TryGetValue"/> is a thread-safe operation, though there's a potential race condition when <see cref="OnNext(T)"/> or <see cref="OnError(Exception)"/> are being invoked concurrently.
        /// In some cases, it may be necessary for a caller to use external synchronization to avoid race conditions.
        /// </para>
        /// </remarks>
        public bool TryGetValue([MaybeNullWhen(false)] out T value)
        {
            lock (_gate)
            {
                // Unlike the Value property, disposal is reported through the return value rather than an exception.
                if (_isDisposed)
                {
                    value = default;
                    return false;
                }

                _exception?.Throw();

                value = _value;
                return true;
            }
        }

        #region IObserver<T> implementation

        /// <summary>
        /// Notifies all subscribed observers about the end of the sequence.
        /// </summary>
        /// <exception cref="ObjectDisposedException">The subject was disposed.</exception>
        public override void OnCompleted()
        {
            IObserver<T>[]? os = null;

            lock (_gate)
            {
                CheckDisposed();

                if (!_isStopped)
                {
                    // Snapshot the observers and clear the list so the terminal notification is sent at most once.
                    os = _observers.Data;
                    _observers = ImmutableList<IObserver<T>>.Empty;
                    _isStopped = true;
                }
            }

            // Observers are invoked outside the gate.
            if (os != null)
            {
                foreach (var o in os)
                {
                    o.OnCompleted();
                }
            }
        }

        /// <summary>
        /// Notifies all subscribed observers about the exception.
        /// </summary>
        /// <param name="error">The exception to send to all observers.</param>
        /// <exception cref="ArgumentNullException"><paramref name="error"/> is null.</exception>
        /// <exception cref="ObjectDisposedException">The subject was disposed.</exception>
        public override void OnError(Exception error)
        {
            if (error == null)
            {
                throw new ArgumentNullException(nameof(error));
            }

            IObserver<T>[]? os = null;

            lock (_gate)
            {
                CheckDisposed();

                if (!_isStopped)
                {
                    // Snapshot the observers, clear the list and remember the error for late subscribers
                    // and for Value / TryGetValue.
                    os = _observers.Data;
                    _observers = ImmutableList<IObserver<T>>.Empty;
                    _isStopped = true;
                    _exception = error;
                }
            }

            // Observers are invoked outside the gate.
            if (os != null)
            {
                foreach (var o in os)
                {
                    o.OnError(error);
                }
            }
        }

        /// <summary>
        /// Notifies all subscribed observers about the arrival of the specified element in the sequence.
        /// </summary>
        /// <param name="value">The value to send to all observers.</param>
        /// <exception cref="ObjectDisposedException">The subject was disposed.</exception>
        public override void OnNext(T value)
        {
            IObserver<T>[]? os = null;

            lock (_gate)
            {
                CheckDisposed();

                if (!_isStopped)
                {
                    // Cache the value for future subscribers and snapshot the current observers.
                    _value = value;
                    os = _observers.Data;
                }
            }

            // Observers are invoked outside the gate.
            if (os != null)
            {
                foreach (var o in os)
                {
                    o.OnNext(value);
                }
            }
        }

        #endregion

        #region IObservable<T> implementation

        /// <summary>
        /// Subscribes an observer to the subject.
        /// </summary>
        /// <param name="observer">Observer to subscribe to the subject.</param>
        /// <returns>Disposable object that can be used to unsubscribe the observer from the subject.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="observer"/> is null.</exception>
        /// <exception cref="ObjectDisposedException">The subject was disposed.</exception>
        public override IDisposable Subscribe(IObserver<T> observer)
        {
            if (observer == null)
            {
                throw new ArgumentNullException(nameof(observer));
            }

            Exception? ex;

            lock (_gate)
            {
                CheckDisposed();

                if (!_isStopped)
                {
                    _observers = _observers.Add(observer);

                    // Delivered while holding the gate, so the observer receives the current value
                    // before any later OnNext call can capture it in its snapshot.
                    observer.OnNext(_value);

                    return new Subscription(this, observer);
                }

                ex = _exception;
            }

            // The subject already terminated: replay the terminal notification outside the gate.
            if (ex != null)
            {
                observer.OnError(ex);
            }
            else
            {
                observer.OnCompleted();
            }

            return Disposable.Empty;
        }

        /// <summary>
        /// Removes the specified observer from the subject; does nothing once the subject has been disposed.
        /// </summary>
        /// <param name="observer">Observer to remove.</param>
        private void Unsubscribe(IObserver<T> observer)
        {
            lock (_gate)
            {
                // _observers is null after Dispose, so it must not be touched in that case.
                if (!_isDisposed)
                {
                    _observers = _observers.Remove(observer);
                }
            }
        }

        #endregion

        #region IDisposable implementation

        /// <summary>
        /// Unsubscribe all observers and release resources.
        /// </summary>
        public override void Dispose()
        {
            lock (_gate)
            {
                _isDisposed = true;
                _observers = null!; // NB: Disposed checks happen prior to accessing _observers.
                _value = default!;
                _exception = null;
            }
        }

        /// <summary>
        /// Throws if the subject has been disposed. Callers in this class invoke it while holding the gate.
        /// </summary>
        /// <exception cref="ObjectDisposedException">The subject was disposed.</exception>
        private void CheckDisposed()
        {
            if (_isDisposed)
            {
                throw new ObjectDisposedException(string.Empty);
            }
        }

        #endregion

        /// <summary>
        /// Handle returned from <see cref="Subscribe(IObserver{T})"/>; disposing it removes the observer from the subject.
        /// </summary>
        private sealed class Subscription : IDisposable
        {
            private BehaviorSubject<T> _subject;
            private IObserver<T>? _observer;

            public Subscription(BehaviorSubject<T> subject, IObserver<T> observer)
            {
                _subject = subject;
                _observer = observer;
            }

            /// <summary>
            /// Unsubscribes the observer. Only the first call has any effect.
            /// </summary>
            public void Dispose()
            {
                // The atomic exchange lets exactly one caller observe the non-null observer.
                var observer = Interlocked.Exchange(ref _observer, null);
                if (observer == null)
                {
                    return;
                }

                _subject.Unsubscribe(observer);
                _subject = null!;
            }
        }

        #endregion
    }
}