// Licensed to the .NET Foundation under one or more agreements. // The .NET Foundation licenses this file to you under the MIT License. // See the LICENSE file in the project root for more information. using System.Collections; using System.Collections.Generic; using System.Linq; using System.Reactive.Disposables; using System.Threading; namespace System.Reactive.Linq.ObservableImpl { #region Binary internal static class Zip { internal sealed class Observable : Producer { private readonly IObservable _first; private readonly IObservable _second; private readonly Func _resultSelector; public Observable(IObservable first, IObservable second, Func resultSelector) { _first = first; _second = second; _resultSelector = resultSelector; } protected override _ CreateSink(IObserver observer) => new(_resultSelector, observer); protected override void Run(_ sink) => sink.Run(_first, _second); internal sealed class _ : IdentitySink { private readonly Func _resultSelector; private readonly object _gate; private readonly FirstObserver _firstObserver; private SingleAssignmentDisposableValue _firstDisposable; private readonly SecondObserver _secondObserver; private SingleAssignmentDisposableValue _secondDisposable; public _(Func resultSelector, IObserver observer) : base(observer) { _gate = new object(); _firstObserver = new FirstObserver(this); _secondObserver = new SecondObserver(this); _firstObserver.SetOther(_secondObserver); _secondObserver.SetOther(_firstObserver); _resultSelector = resultSelector; } public void Run(IObservable first, IObservable second) { _firstDisposable.Disposable = first.SubscribeSafe(_firstObserver); _secondDisposable.Disposable = second.SubscribeSafe(_secondObserver); } protected override void Dispose(bool disposing) { if (disposing) { _firstDisposable.Dispose(); _secondDisposable.Dispose(); // clearing the queue should happen under the lock // as they are plain Queues, not concurrent queues. lock (_gate) { _firstObserver.Dispose(); _secondObserver.Dispose(); } } base.Dispose(disposing); } private sealed class FirstObserver : IObserver, IDisposable { private readonly _ _parent; private readonly Queue _queue; private SecondObserver _other; public FirstObserver(_ parent) { _parent = parent; _queue = new Queue(); _other = default!; // NB: Will be set by SetOther. } public void SetOther(SecondObserver other) { _other = other; } public Queue Queue => _queue; public bool Done { get; private set; } public void OnNext(TFirst value) { lock (_parent._gate) { if (_other.Queue.Count > 0) { var r = _other.Queue.Dequeue(); TResult res; try { res = _parent._resultSelector(value, r); } catch (Exception ex) { _parent.ForwardOnError(ex); return; } _parent.ForwardOnNext(res); } else { if (_other.Done) { _parent.ForwardOnCompleted(); return; } _queue.Enqueue(value); } } } public void OnError(Exception error) { lock (_parent._gate) { _parent.ForwardOnError(error); } } public void OnCompleted() { lock (_parent._gate) { Done = true; if (_other.Done) { _parent.ForwardOnCompleted(); } else { _parent._firstDisposable.Dispose(); } } } public void Dispose() { _queue.Clear(); } } private sealed class SecondObserver : IObserver, IDisposable { private readonly _ _parent; private readonly Queue _queue; private FirstObserver _other; public SecondObserver(_ parent) { _parent = parent; _queue = new Queue(); _other = default!; // NB: Will be set by SetOther. } public void SetOther(FirstObserver other) { _other = other; } public Queue Queue => _queue; public bool Done { get; private set; } public void OnNext(TSecond value) { lock (_parent._gate) { if (_other.Queue.Count > 0) { var l = _other.Queue.Dequeue(); TResult res; try { res = _parent._resultSelector(l, value); } catch (Exception ex) { _parent.ForwardOnError(ex); return; } _parent.ForwardOnNext(res); } else { if (_other.Done) { _parent.ForwardOnCompleted(); return; } _queue.Enqueue(value); } } } public void OnError(Exception error) { lock (_parent._gate) { _parent.ForwardOnError(error); } } public void OnCompleted() { lock (_parent._gate) { Done = true; if (_other.Done) { _parent.ForwardOnCompleted(); } else { _parent._secondDisposable.Dispose(); } } } public void Dispose() { _queue.Clear(); } } } } internal sealed class Enumerable : Producer { private readonly IObservable _first; private readonly IEnumerable _second; private readonly Func _resultSelector; public Enumerable(IObservable first, IEnumerable second, Func resultSelector) { _first = first; _second = second; _resultSelector = resultSelector; } protected override _ CreateSink(IObserver observer) => new(_resultSelector, observer); protected override void Run(_ sink) => sink.Run(_first, _second); internal sealed class _ : Sink { private readonly Func _resultSelector; public _(Func resultSelector, IObserver observer) : base(observer) { _resultSelector = resultSelector; } private int _enumerationInProgress; private IEnumerator? _rightEnumerator; private static readonly IEnumerator DisposedEnumerator = MakeDisposedEnumerator(); private static IEnumerator MakeDisposedEnumerator() { yield break; } public void Run(IObservable first, IEnumerable second) { // // Notice the evaluation order of obtaining the enumerator and subscribing to the // observable sequence is reversed compared to the operator's signature. This is // required to make sure the enumerator is available as soon as the observer can // be called. Otherwise, we end up having a race for the initialization and use // of the _rightEnumerator field. // try { var enumerator = second.GetEnumerator(); if (Interlocked.CompareExchange(ref _rightEnumerator, enumerator, null) != null) { enumerator.Dispose(); return; } } catch (Exception exception) { ForwardOnError(exception); return; } Run(first); } protected override void Dispose(bool disposing) { if (disposing) { if (Interlocked.Increment(ref _enumerationInProgress) == 1) { Interlocked.Exchange(ref _rightEnumerator, DisposedEnumerator)?.Dispose(); } } base.Dispose(disposing); } public override void OnNext(TFirst value) { var currentEnumerator = Volatile.Read(ref _rightEnumerator); if (currentEnumerator == DisposedEnumerator) { return; } if (Interlocked.Increment(ref _enumerationInProgress) != 1) { return; } bool hasNext; TSecond? right = default; var wasDisposed = false; try { try { hasNext = currentEnumerator!.MoveNext(); if (hasNext) { right = currentEnumerator.Current; } } finally { if (Interlocked.Decrement(ref _enumerationInProgress) != 0) { Interlocked.Exchange(ref _rightEnumerator, DisposedEnumerator)?.Dispose(); wasDisposed = true; } } } catch (Exception ex) { ForwardOnError(ex); return; } if (wasDisposed) { return; } if (hasNext) { TResult result; try { result = _resultSelector(value, right!); // NB: Not null when hasNext is true. } catch (Exception ex) { ForwardOnError(ex); return; } ForwardOnNext(result); } else { ForwardOnCompleted(); } } } } } #endregion #region [3,16]-ary #region Helpers for n-ary overloads internal interface IZip { void Next(int index); void Fail(Exception error); void Done(int index); } internal abstract class ZipSink : IdentitySink, IZip { protected readonly object _gate; private readonly ICollection[] _queues; private readonly bool[] _isDone; protected ZipSink(int arity, IObserver observer) : base(observer) { _gate = new object(); _isDone = new bool[arity]; _queues = new ICollection[arity]; } public ICollection[] Queues => _queues; public void Next(int index) { var hasValueAll = true; foreach (var queue in _queues) { if (queue.Count == 0) { hasValueAll = false; break; } } if (hasValueAll) { TResult res; try { res = GetResult(); } catch (Exception ex) { ForwardOnError(ex); return; } ForwardOnNext(res); } else { var allOthersDone = true; for (var i = 0; i < _isDone.Length; i++) { if (i != index && !_isDone[i]) { allOthersDone = false; break; } } if (allOthersDone) { ForwardOnCompleted(); } } } protected abstract TResult GetResult(); public void Fail(Exception error) { ForwardOnError(error); } public void Done(int index) { _isDone[index] = true; var allDone = true; foreach (var isDone in _isDone) { if (!isDone) { allDone = false; break; } } if (allDone) { ForwardOnCompleted(); } } } internal sealed class ZipObserver : SafeObserver { private readonly object _gate; private readonly IZip _parent; private readonly int _index; private readonly Queue _values; public ZipObserver(object gate, IZip parent, int index) { _gate = gate; _parent = parent; _index = index; _values = new Queue(); } public Queue Values => _values; protected override void Dispose(bool disposing) { base.Dispose(disposing); if (disposing) { lock (_gate) { _values.Clear(); } } } public override void OnNext(T value) { lock (_gate) { _values.Enqueue(value); _parent.Next(_index); } } public override void OnError(Exception error) { Dispose(); lock (_gate) { _parent.Fail(error); } } public override void OnCompleted() { // Calling Dispose() here would clear the queue prematurely. // We only need to dispose the IDisposable of the upstream, // which is done by SafeObserver.Dispose(bool). base.Dispose(true); lock (_gate) { _parent.Done(_index); } } } #endregion #endregion #region N-ary internal sealed class Zip : Producer, Zip._> { private readonly IEnumerable> _sources; public Zip(IEnumerable> sources) { _sources = sources; } protected override _ CreateSink(IObserver> observer) => new(observer); protected override void Run(_ sink) => sink.Run(_sources); internal sealed class _ : IdentitySink> { private readonly object _gate; public _(IObserver> observer) : base(observer) { _gate = new object(); // NB: These will be set in Run before getting used. _queues = null!; _isDone = null!; } private Queue[] _queues; private bool[] _isDone; private SingleAssignmentDisposableValue[]? _subscriptions; public void Run(IEnumerable> sources) { var srcs = sources.ToArray(); var N = srcs.Length; _queues = new Queue[N]; for (var i = 0; i < N; i++) { _queues[i] = new Queue(); } _isDone = new bool[N]; var subscriptions = new SingleAssignmentDisposableValue[N]; if (Interlocked.CompareExchange(ref _subscriptions, subscriptions, null) == null) { for (var i = 0; i < N; i++) { var o = new SourceObserver(this, i); subscriptions[i].Disposable = srcs[i].SubscribeSafe(o); } } } protected override void Dispose(bool disposing) { if (disposing) { var subscriptions = Interlocked.Exchange(ref _subscriptions, Array.Empty()); if (subscriptions != null && subscriptions != Array.Empty()) { for (var i = 0; i < subscriptions.Length; i++) { subscriptions[i].Dispose(); } lock (_gate) { foreach (var q in _queues) { q.Clear(); } } } } base.Dispose(disposing); } private void OnNext(int index, TSource value) { lock (_gate) { _queues[index].Enqueue(value); if (_queues.All(q => q.Count > 0)) { var n = _queues.Length; var res = new List(n); for (var i = 0; i < n; i++) { res.Add(_queues[i].Dequeue()); } ForwardOnNext(res); } else if (_isDone.AllExcept(index)) { ForwardOnCompleted(); } } } private new void OnError(Exception error) { lock (_gate) { ForwardOnError(error); } } private void OnCompleted(int index) { lock (_gate) { _isDone[index] = true; if (_isDone.All()) { ForwardOnCompleted(); } else { var subscriptions = Volatile.Read(ref _subscriptions); if (subscriptions != null && subscriptions != Array.Empty()) { subscriptions[index].Dispose(); } } } } private sealed class SourceObserver : IObserver { private readonly _ _parent; private readonly int _index; public SourceObserver(_ parent, int index) { _parent = parent; _index = index; } public void OnNext(TSource value) { _parent.OnNext(_index, value); } public void OnError(Exception error) { _parent.OnError(error); } public void OnCompleted() { _parent.OnCompleted(_index); } } } } #endregion }