// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT License.
// See the LICENSE file in the project root for more information.

using System.Reactive.Subjects;
using System.Threading;

namespace System.Reactive.Linq.ObservableImpl
{
    /// <summary>
    /// Automatically connect the upstream <see cref="IConnectableObservable{T}"/> once the
    /// specified number of <see cref="IObserver{T}"/>s have subscribed to this <see cref="IObservable{T}"/>.
    /// </summary>
    /// <typeparam name="T">The upstream value type.</typeparam>
    internal sealed class AutoConnect<T> : IObservable<T>
    {
        private readonly IConnectableObservable<T> _source;
        private readonly int _minObservers;
        private readonly Action<IDisposable>? _onConnect;

        // Number of subscriptions counted so far; it is only incremented while
        // the value read is still below _minObservers.
        private int _count;

        /// <summary>
        /// Creates the auto-connecting wrapper around <paramref name="source"/>.
        /// </summary>
        /// <param name="source">The connectable source that observers are subscribed to and that is eventually connected.</param>
        /// <param name="minObservers">The subscription count at which <paramref name="source"/> gets connected.</param>
        /// <param name="onConnect">Optional callback that receives the disposable returned by the source's <c>Connect</c> call.</param>
        internal AutoConnect(IConnectableObservable<T> source, int minObservers, Action<IDisposable>? onConnect)
        {
            _source = source;
            _minObservers = minObservers;
            _onConnect = onConnect;
        }

        /// <summary>
        /// Subscribes <paramref name="observer"/> to the source and connects the source when this
        /// subscription brings the counter to exactly the configured number of observers.
        /// </summary>
        /// <param name="observer">The observer to subscribe to the source.</param>
        /// <returns>The disposable returned by the source's <c>Subscribe</c> call.</returns>
        public IDisposable Subscribe(IObserver<T> observer)
        {
            // Subscribe before the connect check so this observer is already
            // attached to the source when Connect is called below.
            var d = _source.Subscribe(observer);

            // Skip the atomic increment entirely once the threshold has been reached.
            if (Volatile.Read(ref _count) < _minObservers)
            {
                // Only the increment that lands exactly on the threshold connects,
                // so Connect is triggered from here at most once.
                if (Interlocked.Increment(ref _count) == _minObservers)
                {
                    var c = _source.Connect();

                    // The connection is not stored here; it is only handed to the callback, if any.
                    _onConnect?.Invoke(c);
                }
            }

            return d;
        }
    }
}