// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT License.
// See the LICENSE file in the project root for more information.

using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Threading;
using System.Threading.Tasks;

namespace System.Reactive.Linq.ObservableImpl
{
    /// <summary>
    /// Producers that flatten a sequence of inner sources (observables or tasks)
    /// into a single sequence of <typeparamref name="TSource"/> elements.
    /// </summary>
    internal static class Merge<TSource>
    {
        /// <summary>
        /// Merges inner observable sequences while keeping at most a fixed number of
        /// inner subscriptions active; further inner sequences wait in a queue.
        /// </summary>
        internal sealed class ObservablesMaxConcurrency : Producer<TSource, ObservablesMaxConcurrency._>
        {
            private readonly IObservable<IObservable<TSource>> _sources;
            private readonly int _maxConcurrent;

            public ObservablesMaxConcurrency(IObservable<IObservable<TSource>> sources, int maxConcurrent)
            {
                _sources = sources;
                _maxConcurrent = maxConcurrent;
            }

            protected override _ CreateSink(IObserver<TSource> observer) => new(_maxConcurrent, observer);

            protected override void Run(_ sink) => sink.Run(_sources);

            internal sealed class _ : Sink<IObservable<TSource>, TSource>
            {
                // Guards _activeCount, _q and all forwarding to the downstream observer.
                private readonly object _gate = new();
                private readonly int _maxConcurrent;

                // Inner sequences that arrived while _maxConcurrent subscriptions were already active.
                private readonly Queue<IObservable<TSource>> _q = new();

                // Holds the observers of the currently active inner subscriptions.
                private readonly CompositeDisposable _group = new();

                public _(int maxConcurrent, IObserver<TSource> observer)
                    : base(observer)
                {
                    _maxConcurrent = maxConcurrent;
                }

                // Set once the outer sequence has completed.
                private volatile bool _isStopped;

                // Number of inner subscriptions currently occupying a concurrency slot.
                private int _activeCount;

                /// <summary>
                /// Subscribes to the inner sequence right away if a slot is free;
                /// otherwise enqueues it until an active inner sequence completes.
                /// </summary>
                public override void OnNext(IObservable<TSource> value)
                {
                    lock (_gate)
                    {
                        if (_activeCount < _maxConcurrent)
                        {
                            _activeCount++;
                            Subscribe(value);
                        }
                        else
                        {
                            _q.Enqueue(value);
                        }
                    }
                }

                public override void OnError(Exception error)
                {
                    lock (_gate)
                    {
                        ForwardOnError(error);
                    }
                }

                /// <summary>
                /// Marks the outer sequence as stopped. Completes downstream immediately
                /// when no inner sequence is active; otherwise only releases the upstream
                /// subscription and leaves completion to the last inner sequence.
                /// </summary>
                public override void OnCompleted()
                {
                    lock (_gate)
                    {
                        _isStopped = true;

                        if (_activeCount == 0)
                        {
                            ForwardOnCompleted();
                        }
                        else
                        {
                            DisposeUpstream();
                        }
                    }
                }

                protected override void Dispose(bool disposing)
                {
                    base.Dispose(disposing);

                    if (disposing)
                    {
                        _group.Dispose();
                    }
                }

                // Registers the inner observer in _group before subscribing to the inner source.
                private void Subscribe(IObservable<TSource> innerSource)
                {
                    var innerObserver = new InnerObserver(this);
                    _group.Add(innerObserver);
                    innerObserver.SetResource(innerSource.SubscribeSafe(innerObserver));
                }

                private sealed class InnerObserver : SafeObserver<TSource>
                {
                    private readonly _ _parent;

                    public InnerObserver(_ parent)
                    {
                        _parent = parent;
                    }

                    public override void OnNext(TSource value)
                    {
                        lock (_parent._gate)
                        {
                            _parent.ForwardOnNext(value);
                        }
                    }

                    public override void OnError(Exception error)
                    {
                        lock (_parent._gate)
                        {
                            _parent.ForwardOnError(error);
                        }
                    }

                    /// <summary>
                    /// Removes this observer from the group, then either hands its slot to
                    /// the next queued inner sequence or frees the slot and completes
                    /// downstream if the outer sequence has stopped and nothing is active.
                    /// </summary>
                    public override void OnCompleted()
                    {
                        _parent._group.Remove(this);

                        lock (_parent._gate)
                        {
                            if (_parent._q.Count > 0)
                            {
                                // The slot is reused, so _activeCount stays unchanged.
                                var s = _parent._q.Dequeue();
                                _parent.Subscribe(s);
                            }
                            else
                            {
                                _parent._activeCount--;

                                if (_parent._isStopped && _parent._activeCount == 0)
                                {
                                    _parent.ForwardOnCompleted();
                                }
                            }
                        }
                    }
                }
            }
        }

        /// <summary>
        /// Merges inner observable sequences, subscribing to each one as soon as it arrives.
        /// </summary>
        internal sealed class Observables : Producer<TSource, Observables._>
        {
            private readonly IObservable<IObservable<TSource>> _sources;

            public Observables(IObservable<IObservable<TSource>> sources)
            {
                _sources = sources;
            }

            protected override _ CreateSink(IObserver<TSource> observer) => new(observer);

            protected override void Run(_ sink) => sink.Run(_sources);

            internal sealed class _ : Sink<IObservable<TSource>, TSource>
            {
                // Serializes all forwarding to the downstream observer.
                private readonly object _gate = new();

                // Holds the observers of the currently active inner subscriptions.
                private readonly CompositeDisposable _group = new();

                public _(IObserver<TSource> observer)
                    : base(observer)
                {
                }

                // Set once the outer sequence has completed.
                private volatile bool _isStopped;

                public override void OnNext(IObservable<TSource> value)
                {
                    var innerObserver = new InnerObserver(this);
                    _group.Add(innerObserver);
                    innerObserver.SetResource(value.SubscribeSafe(innerObserver));
                }

                public override void OnError(Exception error)
                {
                    lock (_gate)
                    {
                        ForwardOnError(error);
                    }
                }

                public override void OnCompleted()
                {
                    _isStopped = true;

                    if (_group.Count == 0)
                    {
                        //
                        // Notice there can be a race between OnCompleted of the source and any
                        // of the inner sequences, where both see _group.Count == 1, and one is
                        // waiting for the lock. There won't be a double OnCompleted observation
                        // though, because the call to Dispose silences the observer by swapping
                        // in a NopObserver.
                        //
                        lock (_gate)
                        {
                            ForwardOnCompleted();
                        }
                    }
                    else
                    {
                        DisposeUpstream();
                    }
                }

                protected override void Dispose(bool disposing)
                {
                    base.Dispose(disposing);

                    if (disposing)
                    {
                        _group.Dispose();
                    }
                }

                private sealed class InnerObserver : SafeObserver<TSource>
                {
                    private readonly _ _parent;

                    public InnerObserver(_ parent)
                    {
                        _parent = parent;
                    }

                    public override void OnNext(TSource value)
                    {
                        lock (_parent._gate)
                        {
                            _parent.ForwardOnNext(value);
                        }
                    }

                    public override void OnError(Exception error)
                    {
                        lock (_parent._gate)
                        {
                            _parent.ForwardOnError(error);
                        }
                    }

                    public override void OnCompleted()
                    {
                        _parent._group.Remove(this);

                        if (_parent._isStopped && _parent._group.Count == 0)
                        {
                            //
                            // Notice there can be a race between OnCompleted of the source and any
                            // of the inner sequences, where both see _group.Count == 1, and one is
                            // waiting for the lock. There won't be a double OnCompleted observation
                            // though, because the call to Dispose silences the observer by swapping
                            // in a NopObserver.
                            //
                            lock (_parent._gate)
                            {
                                _parent.ForwardOnCompleted();
                            }
                        }
                    }
                }
            }
        }

        /// <summary>
        /// Merges a sequence of tasks, emitting each task's result when the task completes.
        /// </summary>
        internal sealed class Tasks : Producer<TSource, Tasks._>
        {
            private readonly IObservable<Task<TSource>> _sources;

            public Tasks(IObservable<Task<TSource>> sources)
            {
                _sources = sources;
            }

            protected override _ CreateSink(IObserver<TSource> observer) => new(observer);

            protected override void Run(_ sink) => sink.Run(_sources);

            internal sealed class _ : Sink<Task<TSource>, TSource>
            {
                // Serializes all forwarding to the downstream observer.
                private readonly object _gate = new();

                // Its token is passed to the task continuations; cancelled on dispose.
                private readonly CancellationTokenSource _cts = new();

                public _(IObserver<TSource> observer)
                    : base(observer)
                {
                }

                // Starts at 1 for the outer sequence; each received task adds one.
                // Downstream completion is forwarded when the count reaches zero.
                private volatile int _count = 1;

                /// <summary>
                /// Counts the task as pending and handles its outcome either inline
                /// (already completed) or from a continuation.
                /// </summary>
                public override void OnNext(Task<TSource> value)
                {
                    Interlocked.Increment(ref _count);

                    if (value.IsCompleted)
                    {
                        OnCompletedTask(value);
                    }
                    else
                    {
                        // The sink is passed as state so the lambda does not capture 'this'.
                        value.ContinueWith((t, thisObject) => ((_)thisObject!).OnCompletedTask(t), this, _cts.Token);
                    }
                }

                // Translates the final task status into a downstream notification.
                private void OnCompletedTask(Task<TSource> task)
                {
                    switch (task.Status)
                    {
                        case TaskStatus.RanToCompletion:
                            {
                                lock (_gate)
                                {
                                    ForwardOnNext(task.Result);
                                }

                                // Releases this task's share of _count.
                                OnCompleted();
                            }
                            break;
                        case TaskStatus.Faulted:
                            {
                                lock (_gate)
                                {
                                    ForwardOnError(TaskHelpers.GetSingleException(task));
                                }
                            }
                            break;
                        case TaskStatus.Canceled:
                            {
                                lock (_gate)
                                {
                                    ForwardOnError(new TaskCanceledException(task));
                                }
                            }
                            break;
                    }
                }

                public override void OnError(Exception error)
                {
                    lock (_gate)
                    {
                        ForwardOnError(error);
                    }
                }

                /// <summary>
                /// Called for the outer sequence's completion and once per successfully
                /// completed task; forwards completion when the pending count hits zero.
                /// </summary>
                public override void OnCompleted()
                {
                    if (Interlocked.Decrement(ref _count) == 0)
                    {
                        lock (_gate)
                        {
                            ForwardOnCompleted();
                        }
                    }
                }

                protected override void Dispose(bool disposing)
                {
                    if (disposing)
                    {
                        _cts.Cancel();
                    }

                    base.Dispose(disposing);
                }
            }
        }
    }
}