using System;
using System.Data;
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
using System.Transactions;
using Npgsql.Internal;
using Npgsql.Logging;
using Npgsql.Util;
namespace Npgsql;
class ConnectorPool : ConnectorSource
{
#region Fields and properties
static readonly NpgsqlLogger Log = NpgsqlLogManager.CreateLogger(nameof(ConnectorPool));
readonly int _max;
readonly int _min;
readonly TimeSpan _connectionLifetime;
volatile int _numConnectors;
volatile int _idleCount;
/// <summary>
/// Tracks all connectors currently managed by this pool, whether idle or busy.
/// Only updated rarely - when physical connections are opened/closed - but is read in perf-sensitive contexts.
/// </summary>
private protected readonly NpgsqlConnector?[] Connectors;
readonly MultiHostConnectorPool? _parentPool;
/// <summary>
/// Reader side for the idle connector channel. Contains nulls in order to release waiting attempts after
/// a connector has been physically closed/broken.
/// </summary>
readonly ChannelReader<NpgsqlConnector?> _idleConnectorReader;
internal ChannelWriter<NpgsqlConnector?> IdleConnectorWriter { get; }
/// <summary>
/// Incremented every time this pool is cleared via <see cref="NpgsqlConnection.ClearPool"/> or
/// <see cref="NpgsqlConnection.ClearAllPools"/>. Allows us to identify connections which were
/// created before the clear.
/// </summary>
volatile int _clearCounter;
static readonly TimerCallback PruningTimerCallback = PruneIdleConnectors;
readonly Timer _pruningTimer;
readonly TimeSpan _pruningSamplingInterval;
readonly int _pruningSampleSize;
readonly int[] _pruningSamples;
readonly int _pruningMedianIndex;
volatile bool _pruningTimerEnabled;
int _pruningSampleIndex;
volatile int _isClearing;
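// Dedicated single-threaded synchronization context; used in the synchronous Rent path below so that channel
// wait continuations run on that dedicated thread rather than on the thread pool (avoiding sync-over-async deadlocks).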
static readonly SingleThreadSynchronizationContext SingleThreadSynchronizationContext = new("NpgsqlRemainingAsyncSendWorker");
#endregion
internal sealed override (int Total, int Idle, int Busy) Statistics
{
get
{
var numConnectors = _numConnectors;
var idleCount = _idleCount;
return (numConnectors, idleCount, numConnectors - idleCount);
}
}
internal sealed override bool OwnsConnectors => true;
internal ConnectorPool(NpgsqlConnectionStringBuilder settings, string connString, MultiHostConnectorPool? parentPool = null)
: base(settings, connString)
{
if (settings.MaxPoolSize < settings.MinPoolSize)
throw new ArgumentException($"Connection can't have 'Max Pool Size' {settings.MaxPoolSize} under 'Min Pool Size' {settings.MinPoolSize}");
_parentPool = parentPool;
// We enforce Max Pool Size, so no need to create a bounded channel (which is less efficient).
// On the consuming side, we have the multiplexing write loop but also non-multiplexing Rents
// On the producing side, we have connections being released back into the pool (both multiplexing and not)
var idleChannel = Channel.CreateUnbounded<NpgsqlConnector?>();
_idleConnectorReader = idleChannel.Reader;
IdleConnectorWriter = idleChannel.Writer;
_max = settings.MaxPoolSize;
_min = settings.MinPoolSize;
if (settings.ConnectionPruningInterval == 0)
throw new ArgumentException("ConnectionPruningInterval can't be 0.");
var connectionIdleLifetime = TimeSpan.FromSeconds(settings.ConnectionIdleLifetime);
var pruningSamplingInterval = TimeSpan.FromSeconds(settings.ConnectionPruningInterval);
if (connectionIdleLifetime < pruningSamplingInterval)
throw new ArgumentException($"Connection can't have ConnectionIdleLifetime {connectionIdleLifetime} under ConnectionPruningInterval {_pruningSamplingInterval}");
_pruningTimer = new Timer(PruningTimerCallback, this, Timeout.Infinite, Timeout.Infinite);
_pruningSampleSize = DivideRoundingUp(settings.ConnectionIdleLifetime, settings.ConnectionPruningInterval);
_pruningMedianIndex = DivideRoundingUp(_pruningSampleSize, 2) - 1; // - 1 to go from length to index
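// Worked example (assuming the documented defaults of ConnectionIdleLifetime = 300s and ConnectionPruningInterval = 10s):
// _pruningSampleSize = DivideRoundingUp(300, 10) = 30 samples, _pruningMedianIndex = DivideRoundingUp(30, 2) - 1 = 14.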
_pruningSamplingInterval = pruningSamplingInterval;
_pruningSamples = new int[_pruningSampleSize];
_pruningTimerEnabled = false;
_connectionLifetime = TimeSpan.FromSeconds(settings.ConnectionLifetime);
Connectors = new NpgsqlConnector[_max];
}
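// Illustrative call shape only (the real caller is NpgsqlConnection's open path): a connection rents its physical
// connector roughly as `var connector = await pool.Get(conn, timeout, async: true, cancellationToken);` and keeps
// it until the connector is handed back via Return() below.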
internal sealed override ValueTask<NpgsqlConnector> Get(
NpgsqlConnection conn, NpgsqlTimeout timeout, bool async, CancellationToken cancellationToken)
{
return TryGetIdleConnector(out var connector)
? new ValueTask<NpgsqlConnector>(connector)
: RentAsync(conn, timeout, async, cancellationToken);
async ValueTask<NpgsqlConnector> RentAsync(
NpgsqlConnection conn, NpgsqlTimeout timeout, bool async, CancellationToken cancellationToken)
{
// First, try to open a new physical connector. This will fail if we're at max capacity.
var connector = await OpenNewConnector(conn, timeout, async, cancellationToken);
if (connector != null)
return connector;
// We're at max capacity. Block on the idle channel with a timeout.
// Note that Channels guarantee fair FIFO behavior to callers of ReadAsync (first-come first-
// served), which is crucial to us.
using var linkedSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
var finalToken = linkedSource.Token;
linkedSource.CancelAfter(timeout.CheckAndGetTimeLeft());
while (true)
{
try
{
if (async)
{
connector = await _idleConnectorReader.ReadAsync(finalToken);
if (CheckIdleConnector(connector))
return connector;
}
else
{
// Channels don't have a sync API. To avoid sync-over-async issues, we use a special single-
// thread synchronization context which ensures that callbacks are executed on a dedicated
// thread.
// Note that AsTask isn't safe here for getting the result, since it still causes some continuation code
// to get executed on the TP (which can cause deadlocks).
using (SingleThreadSynchronizationContext.Enter())
using (var mre = new ManualResetEventSlim())
{
_idleConnectorReader.WaitToReadAsync(finalToken).GetAwaiter().OnCompleted(() => mre.Set());
mre.Wait(finalToken);
}
}
}
catch (OperationCanceledException)
{
cancellationToken.ThrowIfCancellationRequested();
Debug.Assert(finalToken.IsCancellationRequested);
throw new NpgsqlException(
$"The connection pool has been exhausted, either raise 'Max Pool Size' (currently {_max}) " +
$"or 'Timeout' (currently {Settings.Timeout} seconds) in your connection string.",
new TimeoutException());
}
catch (ChannelClosedException)
{
throw new NpgsqlException("The connection pool has been shut down.");
}
// If we're here, our waiting attempt on the idle connector channel was released with a null
// (or bad connector), or we're in sync mode. Check again if a new idle connector has appeared since we last checked.
if (TryGetIdleConnector(out connector))
return connector;
// We might have closed a connector in the meantime and no longer be at max capacity
// so try to open a new connector and if that fails, loop again.
connector = await OpenNewConnector(conn, timeout, async, cancellationToken);
if (connector != null)
return connector;
}
}
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
internal sealed override bool TryGetIdleConnector([NotNullWhen(true)] out NpgsqlConnector? connector)
{
while (_idleConnectorReader.TryRead(out var nullableConnector))
{
if (CheckIdleConnector(nullableConnector))
{
connector = nullableConnector;
return true;
}
}
connector = null;
return false;
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
bool CheckIdleConnector([NotNullWhen(true)] NpgsqlConnector? connector)
{
if (connector is null)
return false;
// Only decrement when the connector has a value.
Interlocked.Decrement(ref _idleCount);
// A connector could be broken because of a keepalive that occurred while it was
// idling in the pool.
// TODO: Consider removing the pool from the keepalive code. The following branch is simply irrelevant
// if keepalive isn't turned on.
if (connector.IsBroken)
{
CloseConnector(connector);
return false;
}
if (_connectionLifetime != TimeSpan.Zero && DateTime.UtcNow > connector.OpenTimestamp + _connectionLifetime)
{
Log.Debug("Connection has exceeded its maximum lifetime and will be closed.", connector.Id);
CloseConnector(connector);
return false;
}
Debug.Assert(connector.State == ConnectorState.Ready,
$"Got idle connector but {nameof(connector.State)} is {connector.State}");
Debug.Assert(connector.CommandsInFlightCount == 0,
$"Got idle connector but {nameof(connector.CommandsInFlightCount)} is {connector.CommandsInFlightCount}");
Debug.Assert(connector.MultiplexAsyncWritingLock == 0,
$"Got idle connector but {nameof(connector.MultiplexAsyncWritingLock)} is 1");
return true;
}
internal sealed override async ValueTask<NpgsqlConnector?> OpenNewConnector(
NpgsqlConnection conn, NpgsqlTimeout timeout, bool async, CancellationToken cancellationToken)
{
// As long as we're under max capacity, attempt to increase the connector count and open a new connection.
for (var numConnectors = _numConnectors; numConnectors < _max; numConnectors = _numConnectors)
{
// Note that we purposefully don't use SpinWait for this: https://github.com/dotnet/coreclr/pull/21437
if (Interlocked.CompareExchange(ref _numConnectors, numConnectors + 1, numConnectors) != numConnectors)
continue;
try
{
// We've managed to increase the open counter, so open a physical connection.
var connector = new NpgsqlConnector(this, conn) { ClearCounter = _clearCounter };
await connector.Open(timeout, async, cancellationToken);
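// Publish the new connector into the first free slot of the Connectors array.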
var i = 0;
for (; i < _max; i++)
if (Interlocked.CompareExchange(ref Connectors[i], connector, null) == null)
break;
Debug.Assert(i < _max, $"Could not find free slot in {Connectors} when opening.");
if (i == _max)
throw new NpgsqlException($"Could not find free slot in {Connectors} when opening. Please report a bug.");
// Only start pruning if we've incremented open count past _min.
// Note that we don't do this only once, on equality, because the thread which incremented the open count
// past _min might get an exception in NpgsqlConnector.Open due to a timeout, cancellation or other reasons.
if (numConnectors >= _min)
UpdatePruningTimer();
return connector;
}
catch
{
// Physical open failed, decrement the open and busy counter back down.
Interlocked.Decrement(ref _numConnectors);
// In case there's a waiting attempt on the channel, we write a null to the idle connector channel
// to wake it up, so it will try opening (and probably throw immediately)
// Statement order is important since we have synchronous completions on the channel.
IdleConnectorWriter.TryWrite(null);
// Just in case, we always call UpdatePruningTimer when a physical open fails.
UpdatePruningTimer();
throw;
}
}
return null;
}
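// Called when a pooled connection closes and hands its physical connector back to this pool.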
internal sealed override void Return(NpgsqlConnector connector)
{
Debug.Assert(!connector.InTransaction);
Debug.Assert(connector.MultiplexAsyncWritingLock == 0 || connector.IsBroken || connector.IsClosed,
$"About to return multiplexing connector to the pool, but {nameof(connector.MultiplexAsyncWritingLock)} is {connector.MultiplexAsyncWritingLock}");
// If Clear/ClearAll has been called since this connector was first opened,
// throw it away. The same if it's broken (in which case CloseConnector is only
// used to update state/perf counter).
if (connector.ClearCounter != _clearCounter || connector.IsBroken)
{
CloseConnector(connector);
return;
}
// Statement order is important since we have synchronous completions on the channel.
Interlocked.Increment(ref _idleCount);
var written = IdleConnectorWriter.TryWrite(connector);
Debug.Assert(written);
}
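// Closes all currently idle connectors. Busy connectors are not touched here; they are closed lazily when
// returned, since their ClearCounter no longer matches the incremented _clearCounter.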
internal override void Clear()
{
Interlocked.Increment(ref _clearCounter);
if (Interlocked.CompareExchange(ref _isClearing, 1, 0) == 1)
return;
try
{
var count = _idleCount;
while (count > 0 && _idleConnectorReader.TryRead(out var connector))
{
if (CheckIdleConnector(connector))
{
CloseConnector(connector);
count--;
}
}
}
finally
{
_isClearing = 0;
}
}
void CloseConnector(NpgsqlConnector connector)
{
try
{
connector.Close();
}
catch (Exception e)
{
Log.Warn("Exception while closing connector", e, connector.Id);
}
var i = 0;
for (; i < _max; i++)
if (Interlocked.CompareExchange(ref Connectors[i], null, connector) == connector)
break;
Debug.Assert(i < _max, $"Could not find free slot in {Connectors} when closing.");
if (i == _max)
throw new NpgsqlException($"Could not find free slot in {Connectors} when closing. Please report a bug.");
var numConnectors = Interlocked.Decrement(ref _numConnectors);
Debug.Assert(numConnectors >= 0);
// If a connector has been closed for any reason, we write a null to the idle connector channel to wake up
// a waiter, who will open a new physical connection
// Statement order is important since we have synchronous completions on the channel.
IdleConnectorWriter.TryWrite(null);
// Only turn off the timer once, when this particular close brought the open connector count back down to _min.
if (numConnectors == _min)
UpdatePruningTimer();
}
internal override bool TryRemovePendingEnlistedConnector(NpgsqlConnector connector, Transaction transaction)
=> _parentPool is null
? base.TryRemovePendingEnlistedConnector(connector, transaction)
: _parentPool.TryRemovePendingEnlistedConnector(connector, transaction);
#region Pruning
void UpdatePruningTimer()
{
lock (_pruningTimer)
{
var numConnectors = _numConnectors;
if (numConnectors > _min && !_pruningTimerEnabled)
{
_pruningTimerEnabled = true;
_pruningTimer.Change(_pruningSamplingInterval, Timeout.InfiniteTimeSpan);
}
else if (numConnectors <= _min && _pruningTimerEnabled)
{
_pruningTimer.Change(Timeout.Infinite, Timeout.Infinite);
_pruningSampleIndex = 0;
_pruningTimerEnabled = false;
}
}
}
static void PruneIdleConnectors(object? state)
{
var pool = (ConnectorPool)state!;
var samples = pool._pruningSamples;
int toPrune;
lock (pool._pruningTimer)
{
// Check whether we might have been contending with UpdatePruningTimer disabling the timer.
if (!pool._pruningTimerEnabled)
return;
var sampleIndex = pool._pruningSampleIndex;
samples[sampleIndex] = pool._idleCount;
if (sampleIndex != pool._pruningSampleSize - 1)
{
pool._pruningSampleIndex = sampleIndex + 1;
pool._pruningTimer.Change(pool._pruningSamplingInterval, Timeout.InfiniteTimeSpan);
return;
}
// Calculate median value for pruning, reset index and timer, and release the lock.
Array.Sort(samples);
toPrune = samples[pool._pruningMedianIndex];
pool._pruningSampleIndex = 0;
pool._pruningTimer.Change(pool._pruningSamplingInterval, Timeout.InfiniteTimeSpan);
}
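// toPrune is the median idle count observed over the sampling window: close at most that many idle connectors,
// but never shrink the pool below _min.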
while (toPrune > 0 &&
pool._numConnectors > pool._min &&
pool._idleConnectorReader.TryRead(out var connector) &&
connector != null)
{
if (pool.CheckIdleConnector(connector))
{
pool.CloseConnector(connector);
toPrune--;
}
}
}
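// Integer ceiling division for positive inputs, e.g. DivideRoundingUp(300, 10) == 30 and DivideRoundingUp(25, 10) == 3.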
static int DivideRoundingUp(int value, int divisor) => 1 + (value - 1) / divisor;
#endregion
}