// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT License.
// See the LICENSE file in the project root for more information.

using System.Collections.Generic;
using System.Reactive.Disposables;
using System.Threading;

namespace System.Reactive.Linq.ObservableImpl
{
    // NOTE(review): the generic parameter lists in this file were reconstructed after
    // having been stripped (e.g. "Producer._>"). TException is taken from the visible
    // constraint; the element type name TSource is inferred - confirm against the
    // Producer/TailRecursiveSink/IdentitySink declarations.

    /// <summary>
    /// Producer that hands an enumerable of source sequences to its sink, which
    /// remembers the most recent error and moves on via <c>Recurse</c>.
    /// </summary>
    /// <typeparam name="TSource">Element type of the source sequences.</typeparam>
    internal sealed class Catch<TSource> : Producer<TSource, Catch<TSource>._>
    {
        private readonly IEnumerable<IObservable<TSource>> _sources;

        /// <summary>Stores the sequences that the sink will be run against.</summary>
        /// <param name="sources">The source sequences.</param>
        public Catch(IEnumerable<IObservable<TSource>> sources)
        {
            _sources = sources;
        }

        /// <summary>Creates the sink that forwards to <paramref name="observer"/>.</summary>
        protected override _ CreateSink(IObserver<TSource> observer) => new(observer);

        /// <summary>Starts the sink on the stored sources.</summary>
        protected override void Run(_ sink) => sink.Run(_sources);

        /// <summary>
        /// Sink that records the last error seen and only forwards it once the
        /// base class reports that it is done.
        /// </summary>
        internal sealed class _ : TailRecursiveSink<TSource>
        {
            public _(IObserver<TSource> observer)
                : base(observer)
            {
            }

            /// <summary>
            /// Exposes the inner sources of a nested <see cref="Catch{TSource}"/> so the
            /// base class can work on them directly instead of on the wrapper.
            /// </summary>
            /// <returns>
            /// The nested operator's sources, or <c>null</c> when
            /// <paramref name="source"/> is not a <see cref="Catch{TSource}"/>.
            /// </returns>
            protected override IEnumerable<IObservable<TSource>>? Extract(IObservable<TSource> source)
            {
                if (source is Catch<TSource> @catch)
                {
                    return @catch._sources;
                }

                return null;
            }

            // Most recent error reported through OnError; null if none was seen.
            private Exception? _lastException;

            /// <summary>
            /// Remembers <paramref name="error"/> instead of forwarding it, then
            /// asks the base class to continue via <c>Recurse</c>.
            /// </summary>
            public override void OnError(Exception error)
            {
                _lastException = error;
                Recurse();
            }

            /// <summary>
            /// Terminates the downstream observer: with the last recorded error if
            /// there is one, otherwise with a completion signal.
            /// </summary>
            protected override void Done()
            {
                if (_lastException != null)
                {
                    ForwardOnError(_lastException);
                }
                else
                {
                    ForwardOnCompleted();
                }
            }

            /// <summary>
            /// Routes <paramref name="error"/> through <see cref="OnError"/> and returns
            /// <c>true</c>. The meaning of the return value is defined by the base
            /// class, which is not visible in this file.
            /// </summary>
            protected override bool Fail(Exception error)
            {
                //
                // Note that the invocation of Recurse in OnError will
                // cause the next MoveNext operation to be enqueued, so
                // we will still return to the caller immediately.
                //
                OnError(error);
                return true;
            }
        }
    }

    /// <summary>
    /// Producer that subscribes to a single source and, on the first error of type
    /// <typeparamref name="TException"/>, switches to the sequence returned by a handler.
    /// </summary>
    /// <typeparam name="TSource">Element type of the source and handler sequences.</typeparam>
    /// <typeparam name="TException">Exception type the handler is invoked for.</typeparam>
    internal sealed class Catch<TSource, TException> : Producer<TSource, Catch<TSource, TException>._>
        where TException : Exception
    {
        private readonly IObservable<TSource> _source;
        private readonly Func<TException, IObservable<TSource>> _handler;

        /// <summary>Stores the source sequence and the error handler.</summary>
        /// <param name="source">The sequence to subscribe to first.</param>
        /// <param name="handler">Produces the continuation sequence for a matching error.</param>
        public Catch(IObservable<TSource> source, Func<TException, IObservable<TSource>> handler)
        {
            _source = source;
            _handler = handler;
        }

        /// <summary>Creates the sink that forwards to <paramref name="observer"/>.</summary>
        protected override _ CreateSink(IObserver<TSource> observer) => new(_handler, observer);

        /// <summary>Starts the sink on the stored source.</summary>
        protected override void Run(_ sink) => sink.Run(_source);

        /// <summary>
        /// Sink that intercepts the first matching error and resubscribes itself to
        /// the handler's sequence; every later error is forwarded unchanged.
        /// </summary>
        internal sealed class _ : IdentitySink<TSource>
        {
            private readonly Func<TException, IObservable<TSource>> _handler;

            public _(Func<TException, IObservable<TSource>> handler, IObserver<TSource> observer)
                : base(observer)
            {
                _handler = handler;
            }

            // Set to true once the handler has produced a continuation sequence.
            private bool _once;

            // Holds the current subscription: first the source, later the handler's sequence.
            private SerialDisposableValue _subscription;

            /// <summary>Subscribes this sink to <paramref name="source"/>.</summary>
            public override void Run(IObservable<TSource> source)
            {
                _subscription.TrySetFirst(source.SubscribeSafe(this));
            }

            /// <summary>Disposes the held subscription before the base class cleans up.</summary>
            protected override void Dispose(bool disposing)
            {
                if (disposing)
                {
                    _subscription.Dispose();
                }

                base.Dispose(disposing);
            }

            /// <summary>
            /// Invokes the handler for the first error of type
            /// <typeparamref name="TException"/> and subscribes to its result.
            /// Non-matching errors, and any error after the switch, are forwarded.
            /// </summary>
            public override void OnError(Exception error)
            {
                if (!Volatile.Read(ref _once) && error is TException e)
                {
                    IObservable<TSource> result;
                    try
                    {
                        result = _handler(e);
                    }
                    catch (Exception ex)
                    {
                        // The handler itself failed: surface its exception; _once stays false.
                        ForwardOnError(ex);
                        return;
                    }

                    // Mark before subscribing so an error raised by the continuation
                    // sequence takes the forwarding branch below.
                    Volatile.Write(ref _once, true);
                    _subscription.Disposable = result.SubscribeSafe(this);
                }
                else
                {
                    ForwardOnError(error);
                }
            }
        }
    }
}