// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT License.
// See the LICENSE file in the project root for more information.
using System.Reactive.Disposables;
using System.Threading;
namespace System.Reactive.Subjects
{
/// <summary>
/// Represents an object that is both an observable sequence and an observer.
/// Each notification is broadcast to all subscribed observers.
/// </summary>
/// <typeparam name="T">The type of the elements processed by the subject.</typeparam>
public sealed class Subject : SubjectBase
{
#region Fields
// Current subscriptions. After the constructor's initial assignment the array is only
// ever swapped for another array through Interlocked operations (Subscribe and
// Unsubscribe build a new array instead of editing this one), and it is read through
// Volatile.Read. It may also be one of the two sentinel arrays declared below.
private SubjectDisposable[] _observers;
// Error recorded by OnError; Subscribe replays it to observers that arrive after
// termination. Null when no error has been recorded, and reset to null on disposal.
private Exception? _exception;
#pragma warning disable CA1825 // (Avoid zero-length array allocations.) The identity of these arrays matters, so we can't use the shared Array.Empty() instance
// Sentinel stored in _observers once OnCompleted or OnError has won the swap. Compared by reference.
private static readonly SubjectDisposable[] Terminated = new SubjectDisposable[0];
// Sentinel stored in _observers by Dispose. Compared by reference.
private static readonly SubjectDisposable[] Disposed = new SubjectDisposable[0];
#pragma warning restore CA1825
#endregion
#region Constructors
/// <summary>
/// Creates a subject with no subscribed observers.
/// </summary>
// The shared empty array is a different instance from the Terminated and Disposed
// sentinels, so a new subject is neither terminated nor disposed.
public Subject() => _observers = Array.Empty();
#endregion
#region Properties
/// <summary>
/// Gets a value indicating whether at least one observer is currently subscribed to the subject.
/// </summary>
public override bool HasObservers
{
    get
    {
        // The Terminated and Disposed sentinels are zero-length arrays, so a
        // terminated or disposed subject reports false here.
        var snapshot = Volatile.Read(ref _observers);
        return snapshot.Length > 0;
    }
}
/// <summary>
/// Gets a value indicating whether the subject has been disposed.
/// </summary>
public override bool IsDisposed
{
    get
    {
        // Disposal is encoded as the identity of the Disposed sentinel array,
        // so this is a reference comparison rather than a content comparison.
        var snapshot = Volatile.Read(ref _observers);
        return ReferenceEquals(snapshot, Disposed);
    }
}
#endregion
#region Methods
#region IObserver implementation
// Shared throw helper for every member that finds the subject in the disposed state.
// Always throws ObjectDisposedException, constructed with an empty object name.
private static void ThrowDisposed()
{
    throw new ObjectDisposedException(string.Empty);
}
/// <summary>
/// Signals the end of the sequence to every observer that is subscribed at the moment of completion.
/// Calls made after the subject has already terminated are ignored.
/// </summary>
/// <exception cref="ObjectDisposedException">The subject has been disposed.</exception>
public override void OnCompleted()
{
    while (true)
    {
        var snapshot = Volatile.Read(ref _observers);

        if (snapshot == Disposed)
        {
            _exception = null;
            ThrowDisposed();
            return;
        }

        if (snapshot == Terminated)
        {
            // A terminal notification was already delivered; nothing left to signal.
            return;
        }

        // Only the caller whose swap installs the Terminated sentinel notifies the
        // observers. If the array changed since it was read, start over.
        if (Interlocked.CompareExchange(ref _observers, Terminated, snapshot) != snapshot)
        {
            continue;
        }

        for (var i = 0; i < snapshot.Length; i++)
        {
            // Observer is null for a subscription that has already been disposed.
            snapshot[i].Observer?.OnCompleted();
        }

        return;
    }
}
/// <summary>
/// Signals the given exception to every observer that is subscribed at the moment of failure.
/// Calls made after the subject has already terminated are ignored.
/// </summary>
/// <param name="error">The exception to send to all currently subscribed observers.</param>
/// <exception cref="ArgumentNullException"><paramref name="error"/> is <c>null</c>.</exception>
/// <exception cref="ObjectDisposedException">The subject has been disposed.</exception>
public override void OnError(Exception error)
{
    if (error is null)
    {
        throw new ArgumentNullException(nameof(error));
    }

    while (true)
    {
        var snapshot = Volatile.Read(ref _observers);

        if (snapshot == Disposed)
        {
            _exception = null;
            ThrowDisposed();
            return;
        }

        if (snapshot == Terminated)
        {
            // A terminal notification was already delivered; this error is dropped.
            return;
        }

        // Stored before the swap: Subscribe reads _exception after it has seen the
        // Terminated sentinel in _observers.
        _exception = error;

        // Only the caller whose swap installs the Terminated sentinel notifies the
        // observers. If the array changed since it was read, start over.
        if (Interlocked.CompareExchange(ref _observers, Terminated, snapshot) != snapshot)
        {
            continue;
        }

        for (var i = 0; i < snapshot.Length; i++)
        {
            // Observer is null for a subscription that has already been disposed.
            snapshot[i].Observer?.OnError(error);
        }

        return;
    }
}
/// <summary>
/// Forwards the given element to every observer that is currently subscribed.
/// </summary>
/// <param name="value">The value to send to all currently subscribed observers.</param>
/// <exception cref="ObjectDisposedException">The subject has been disposed.</exception>
public override void OnNext(T value)
{
    var snapshot = Volatile.Read(ref _observers);

    if (snapshot == Disposed)
    {
        _exception = null;
        ThrowDisposed();
        return;
    }

    // Once the subject has terminated, the snapshot is the zero-length Terminated
    // sentinel and this loop does nothing.
    for (var i = 0; i < snapshot.Length; i++)
    {
        // Observer is null for a subscription that has already been disposed.
        snapshot[i].Observer?.OnNext(value);
    }
}
#endregion
#region IObservable implementation
/// <summary>
/// Subscribes an observer to the subject.
/// </summary>
/// <remarks>
/// If the subject has already terminated, the observer immediately receives the recorded
/// error (or a completion signal when no error was recorded) and is not added to the subject.
/// </remarks>
/// <param name="observer">Observer to subscribe to the subject.</param>
/// <returns>Disposable object that can be used to unsubscribe the observer from the subject.</returns>
/// <exception cref="ArgumentNullException"><paramref name="observer"/> is <c>null</c>.</exception>
/// <exception cref="ObjectDisposedException">The subject has been disposed.</exception>
public override IDisposable Subscribe(IObserver observer)
{
if (observer == null)
{
throw new ArgumentNullException(nameof(observer));
}
// Created on the first loop iteration that needs it and reused if the
// compare-exchange below has to be retried.
var disposable = default(SubjectDisposable);
for (; ; )
{
var observers = Volatile.Read(ref _observers);
if (observers == Disposed)
{
_exception = null;
ThrowDisposed();
break;
}
if (observers == Terminated)
{
// Already terminated: replay the terminal notification to this observer only.
var ex = _exception;
if (ex != null)
{
observer.OnError(ex);
}
else
{
observer.OnCompleted();
}
break;
}
disposable ??= new SubjectDisposable(this, observer);
// Build a new array with the subscription appended; the array that was read is left untouched.
var n = observers.Length;
var b = new SubjectDisposable[n + 1];
Array.Copy(observers, 0, b, 0, n);
b[n] = disposable;
// The new array is installed only if _observers is still the array read above;
// otherwise loop and try again against the latest array.
if (Interlocked.CompareExchange(ref _observers, b, observers) == observers)
{
return disposable;
}
}
// Reached from the Terminated branch (ThrowDisposed throws before its break runs):
// the observer was never added, so there is nothing to unsubscribe.
return Disposable.Empty;
}
// Removes the given subscription from the observer array, if it is still present.
// Called from SubjectDisposable.Dispose.
private void Unsubscribe(SubjectDisposable observer)
{
for (; ; )
{
var a = Volatile.Read(ref _observers);
var n = a.Length;
// Zero length covers the empty array as well as the Terminated and Disposed
// sentinels (both are zero-length): there is nothing to remove.
if (n == 0)
{
break;
}
var j = Array.IndexOf(a, observer);
// Not in the current array, so there is nothing to remove.
if (j < 0)
{
break;
}
SubjectDisposable[] b;
if (n == 1)
{
// Removing the only entry: use the shared empty array instead of allocating.
b = Array.Empty();
}
else
{
// Copy everything except index j into a new, shorter array.
b = new SubjectDisposable[n - 1];
Array.Copy(a, 0, b, 0, j);
Array.Copy(a, j + 1, b, j, n - j - 1);
}
// The new array is installed only if _observers is still the array read above;
// otherwise loop and redo the removal against the latest array.
if (Interlocked.CompareExchange(ref _observers, b, a) == a)
{
break;
}
}
}
// Subscription handle returned by Subscribe. Pairs an observer with the subject it
// was subscribed to; disposing it removes the observer from that subject.
private sealed class SubjectDisposable : IDisposable
{
// Subject this handle belongs to; cleared at the end of the first Dispose call.
private Subject _subject;
// Subscribed observer; swapped to null by Dispose.
private volatile IObserver? _observer;
public SubjectDisposable(Subject subject, IObserver observer)
{
_subject = subject;
_observer = observer;
}
// The subscribed observer, or null once Dispose has taken it. The subject's
// notification loops read this and skip null values.
public IObserver? Observer => _observer;
// Detaches this subscription from the subject. Only the first call does any work.
public void Dispose()
{
// Atomically take the observer; a null result means an earlier call already took it.
var observer = Interlocked.Exchange(ref _observer, null);
if (observer == null)
{
return;
}
_subject.Unsubscribe(this);
// Drop the reference to the subject now that this handle has been removed from it.
_subject = null!;
}
}
#endregion
#region IDisposable implementation
/// <summary>
/// Puts the subject into the disposed state, dropping every current subscription and any recorded error.
/// </summary>
public override void Dispose()
{
    // Unconditionally replace the observer array with the Disposed sentinel;
    // the previous array is not needed, so the result is discarded.
    _ = Interlocked.Exchange(ref _observers, Disposed);

    // Forget any error recorded by OnError.
    _exception = null;
}
#endregion
#endregion
}
}