using System.Buffers; using System.Data; using System.Diagnostics; using System.Net; using System.Net.Security; using System.Net.Sockets; using System.Runtime.CompilerServices; using System.Runtime.ExceptionServices; using System.Runtime.InteropServices; using System.Security.Authentication; using System.Security.Cryptography.X509Certificates; using System.Text; using System.Threading.Channels; using System.Transactions; using Npgsql.BackendMessages; using Npgsql.Logging; using Npgsql.TypeMapping; using Npgsql.Util; using static Npgsql.Util.Statics; namespace Npgsql.Internal; /// /// Represents a connection to a PostgreSQL backend. Unlike NpgsqlConnection objects, which are /// exposed to users, connectors are internal to Npgsql and are recycled by the connection pool. /// public sealed partial class NpgsqlConnector : IDisposable { #region Fields and Properties /// /// The physical connection socket to the backend. /// Socket _socket = default!; /// /// The physical connection stream to the backend, without anything on top. /// NetworkStream _baseStream = default!; /// /// The physical connection stream to the backend, layered with an SSL/TLS stream if in secure mode. /// Stream _stream = default!; /// /// The parsed connection string. /// public NpgsqlConnectionStringBuilder Settings { get; } ProvideClientCertificatesCallback? ProvideClientCertificatesCallback { get; } RemoteCertificateValidationCallback? UserCertificateValidationCallback { get; } ProvidePasswordCallback? ProvidePasswordCallback { get; } #pragma warning disable CA2252 // Experimental API PhysicalOpenCallback? PhysicalOpenCallback { get; } PhysicalOpenAsyncCallback? PhysicalOpenAsyncCallback { get; } #pragma warning restore CA2252 public Encoding TextEncoding { get; private set; } = default!; /// /// Same as , except that it does not throw an exception if an invalid char is /// encountered (exception fallback), but rather replaces it with a question mark character (replacement /// fallback). /// internal Encoding RelaxedTextEncoding { get; private set; } = default!; /// /// Buffer used for reading data. /// internal NpgsqlReadBuffer ReadBuffer { get; private set; } = default!; /// /// If we read a data row that's bigger than , we allocate an oversize buffer. /// The original (smaller) buffer is stored here, and restored when the connection is reset. /// NpgsqlReadBuffer? _origReadBuffer; /// /// Buffer used for writing data. /// internal NpgsqlWriteBuffer WriteBuffer { get; private set; } = default!; /// /// The secret key of the backend for this connector, used for query cancellation. /// int _backendSecretKey; /// /// The process ID of the backend for this connector. /// internal int BackendProcessId { get; private set; } bool SupportsPostgresCancellation => BackendProcessId != 0; /// /// A unique ID identifying this connector, used for logging. Currently mapped to BackendProcessId /// internal int Id => BackendProcessId; /// /// Information about PostgreSQL and PostgreSQL-like databases (e.g. type definitions, capabilities...). /// public NpgsqlDatabaseInfo DatabaseInfo { get; internal set; } = default!; internal ConnectorTypeMapper TypeMapper { get; set; } = default!; /// /// The current transaction status for this connector. /// internal TransactionStatus TransactionStatus { get; set; } /// /// A transaction object for this connector. Since only one transaction can be in progress at any given time, /// this instance is recycled. To check whether a transaction is currently in progress on this connector, /// see . /// internal NpgsqlTransaction? Transaction { get; set; } internal NpgsqlTransaction? UnboundTransaction { get; set; } /// /// The NpgsqlConnection that (currently) owns this connector. Null if the connector isn't /// owned (i.e. idle in the pool) /// internal NpgsqlConnection? Connection { get; set; } /// /// The number of messages that were prepended to the current message chain, but not yet sent. /// Note that this only tracks messages which produce a ReadyForQuery message /// internal int PendingPrependedResponses { get; set; } /// /// A ManualResetEventSlim used to make sure a cancellation request doesn't run /// while we're reading responses for the prepended query /// as we can't gracefully handle their cancellation. /// readonly ManualResetEventSlim ReadingPrependedMessagesMRE = new(initialState: true); internal NpgsqlDataReader? CurrentReader; internal PreparedStatementManager PreparedStatementManager { get; } internal SqlQueryParser SqlQueryParser { get; } = new(); /// /// If the connector is currently in COPY mode, holds a reference to the importer/exporter object. /// Otherwise null. /// internal ICancelable? CurrentCopyOperation; /// /// Holds all run-time parameters received from the backend (via ParameterStatus messages) /// internal Dictionary PostgresParameters { get; } /// /// Holds all run-time parameters in raw, binary format for efficient handling without allocations. /// readonly List<(byte[] Name, byte[] Value)> _rawParameters = new(); /// /// If this connector was broken, this contains the exception that caused the break. /// volatile Exception? _breakReason; /// /// Semaphore, used to synchronize DatabaseInfo between multiple connections, so it wouldn't be loaded in parallel. /// static readonly SemaphoreSlim DatabaseInfoSemaphore = new(1); /// /// /// Used by the pool to indicate that I/O is currently in progress on this connector, so that another write /// isn't started concurrently. Note that since we have only one write loop, this is only ever usedto /// protect against an over-capacity writes into a connector that's currently *asynchronously* writing. /// /// /// It is guaranteed that the currently-executing /// Specifically, reading may occur - and the connector may even be returned to the pool - before this is /// released. /// /// internal volatile int MultiplexAsyncWritingLock; /// internal void FlagAsNotWritableForMultiplexing() { Debug.Assert(Settings.Multiplexing); Debug.Assert(CommandsInFlightCount > 0 || IsBroken || IsClosed, $"About to mark multiplexing connector as non-writable, but {nameof(CommandsInFlightCount)} is {CommandsInFlightCount}"); Interlocked.Exchange(ref MultiplexAsyncWritingLock, 1); } /// internal void FlagAsWritableForMultiplexing() { Debug.Assert(Settings.Multiplexing); if (Interlocked.CompareExchange(ref MultiplexAsyncWritingLock, 0, 1) != 1) throw new Exception("Multiplexing lock was not taken when releasing. Please report a bug."); } /// /// The timeout for reading messages that are part of the user's command /// (i.e. which aren't internal prepended commands). /// /// Precision is milliseconds internal int UserTimeout { private get; set; } /// /// A lock that's taken while a user action is in progress, e.g. a command being executed. /// Only used when keepalive is enabled, otherwise null. /// SemaphoreSlim? _userLock; /// /// A lock that's taken while a cancellation is being delivered; new queries are blocked until the /// cancellation is delivered. This reduces the chance that a cancellation meant for a previous /// command will accidentally cancel a later one, see #615. /// object CancelLock { get; } = new(); /// /// A lock that's taken to make sure no other concurrent operation is running. /// Break takes it to set the state of the connector. /// Anyone else should immediately check the state and exit /// if the connector is closed. /// object SyncObj { get; } = new(); /// /// A lock that's used to wait for the Cleanup to complete while breaking the connection. /// object CleanupLock { get; } = new(); readonly bool _isKeepAliveEnabled; readonly Timer? _keepAliveTimer; /// /// The command currently being executed by the connector, null otherwise. /// Used only for concurrent use error reporting purposes. /// NpgsqlCommand? _currentCommand; bool _sendResetOnClose; /// /// The connector source (e.g. pool) from where this connector came, and to which it will be returned. /// Note that in multi-host scenarios, this references the host-specific rather than the /// , /// readonly ConnectorSource _connectorSource; internal string UserFacingConnectionString => _connectorSource.UserFacingConnectionString; /// /// Contains the UTC timestamp when this connector was opened, used to implement /// . /// internal DateTime OpenTimestamp { get; private set; } internal int ClearCounter { get; set; } volatile bool _postgresCancellationPerformed; internal bool PostgresCancellationPerformed { get => _postgresCancellationPerformed; private set => _postgresCancellationPerformed = value; } volatile bool _userCancellationRequested; CancellationTokenRegistration _cancellationTokenRegistration; internal bool UserCancellationRequested => _userCancellationRequested; internal CancellationToken UserCancellationToken { get; set; } internal bool AttemptPostgresCancellation { get; private set; } static readonly TimeSpan _cancelImmediatelyTimeout = TimeSpan.FromMilliseconds(-1); X509Certificate2? _certificate; static readonly NpgsqlLogger Log = NpgsqlLogManager.CreateLogger(nameof(NpgsqlConnector)); internal readonly Stopwatch QueryLogStopWatch = new(); internal EndPoint? ConnectedEndPoint { get; private set; } #endregion #region Constants /// /// The minimum timeout that can be set on internal commands such as COMMIT, ROLLBACK. /// /// Precision is seconds internal const int MinimumInternalCommandTimeout = 3; #endregion #region Reusable Message Objects byte[]? _resetWithoutDeallocateMessage; int _resetWithoutDeallocateResponseCount; // Backend readonly CommandCompleteMessage _commandCompleteMessage = new(); readonly ReadyForQueryMessage _readyForQueryMessage = new(); readonly ParameterDescriptionMessage _parameterDescriptionMessage = new(); readonly DataRowMessage _dataRowMessage = new(); readonly RowDescriptionMessage _rowDescriptionMessage = new(); // Since COPY is rarely used, allocate these lazily CopyInResponseMessage? _copyInResponseMessage; CopyOutResponseMessage? _copyOutResponseMessage; CopyDataMessage? _copyDataMessage; CopyBothResponseMessage? _copyBothResponseMessage; #endregion internal NpgsqlDataReader DataReader { get; set; } internal NpgsqlDataReader? UnboundDataReader { get; set; } #region Constructors internal NpgsqlConnector(ConnectorSource connectorSource, NpgsqlConnection conn) : this(connectorSource) { ProvideClientCertificatesCallback = conn.ProvideClientCertificatesCallback; UserCertificateValidationCallback = conn.UserCertificateValidationCallback; ProvidePasswordCallback = conn.ProvidePasswordCallback; #pragma warning disable CA2252 // Experimental API PhysicalOpenCallback = conn.PhysicalOpenCallback; PhysicalOpenAsyncCallback = conn.PhysicalOpenAsyncCallback; #pragma warning restore CA2252 } NpgsqlConnector(NpgsqlConnector connector) : this(connector._connectorSource) { ProvideClientCertificatesCallback = connector.ProvideClientCertificatesCallback; UserCertificateValidationCallback = connector.UserCertificateValidationCallback; ProvidePasswordCallback = connector.ProvidePasswordCallback; } NpgsqlConnector(ConnectorSource connectorSource) { Debug.Assert(connectorSource.OwnsConnectors); _connectorSource = connectorSource; State = ConnectorState.Closed; TransactionStatus = TransactionStatus.Idle; Settings = connectorSource.Settings; PostgresParameters = new Dictionary(); _isKeepAliveEnabled = Settings.KeepAlive > 0; if (_isKeepAliveEnabled) { _userLock = new SemaphoreSlim(1, 1); _keepAliveTimer = new Timer(PerformKeepAlive, null, Timeout.Infinite, Timeout.Infinite); } DataReader = new NpgsqlDataReader(this); // TODO: Not just for automatic preparation anymore... PreparedStatementManager = new PreparedStatementManager(this); if (Settings.Multiplexing) { // Note: It's OK for this channel to be unbounded: each command enqueued to it is accompanied by sending // it to PostgreSQL. If we overload it, a TCP zero window will make us block on the networking side // anyway. // Note: the in-flight channel can probably be single-writer, but that doesn't actually do anything // at this point. And we currently rely on being able to complete the channel at any point (from // Break). We may want to revisit this if an optimized, SingleWriter implementation is introduced. var commandsInFlightChannel = Channel.CreateUnbounded( new UnboundedChannelOptions { SingleReader = true }); CommandsInFlightReader = commandsInFlightChannel.Reader; CommandsInFlightWriter = commandsInFlightChannel.Writer; // TODO: Properly implement this if (_isKeepAliveEnabled) throw new NotImplementedException("Keepalive not yet implemented for multiplexing"); } } #endregion #region Configuration settings internal string Host => Settings.Host!; internal int Port => Settings.Port; internal string Database => Settings.Database!; string KerberosServiceName => Settings.KerberosServiceName; int ConnectionTimeout => Settings.Timeout; bool IntegratedSecurity => Settings.IntegratedSecurity; /// /// The actual command timeout value that gets set on internal commands. /// /// Precision is milliseconds int InternalCommandTimeout { get { var internalTimeout = Settings.InternalCommandTimeout; if (internalTimeout == -1) return Math.Max(Settings.CommandTimeout, MinimumInternalCommandTimeout) * 1000; // Todo: Decide what we really want here // This assertion can easily fail if InternalCommandTimeout is set to 1 or 2 in the connection string // We probably don't want to allow these values but in that case a Debug.Assert is the wrong way to enforce it. Debug.Assert(internalTimeout == 0 || internalTimeout >= MinimumInternalCommandTimeout); return internalTimeout * 1000; } } #endregion Configuration settings #region State management int _state; /// /// Gets the current state of the connector /// internal ConnectorState State { get => (ConnectorState)_state; set { var newState = (int)value; if (newState == _state) return; Interlocked.Exchange(ref _state, newState); } } /// /// Returns whether the connector is open, regardless of any task it is currently performing /// bool IsConnected => State switch { ConnectorState.Ready => true, ConnectorState.Executing => true, ConnectorState.Fetching => true, ConnectorState.Waiting => true, ConnectorState.Copy => true, ConnectorState.Replication => true, ConnectorState.Closed => false, ConnectorState.Connecting => false, ConnectorState.Broken => false, _ => throw new ArgumentOutOfRangeException("Unknown state: " + State) }; internal bool IsReady => State == ConnectorState.Ready; internal bool IsClosed => State == ConnectorState.Closed; internal bool IsBroken => State == ConnectorState.Broken; #endregion #region Open /// /// Opens the physical connection to the server. /// /// Usually called by the RequestConnector /// Method of the connection pool manager. internal async Task Open(NpgsqlTimeout timeout, bool async, CancellationToken cancellationToken) { Debug.Assert(State == ConnectorState.Closed); State = ConnectorState.Connecting; try { await OpenCore(this, Settings.SslMode, timeout, async, cancellationToken); await LoadDatabaseInfo(forceReload: false, timeout, async, cancellationToken); if (Settings.Pooling && !Settings.Multiplexing && !Settings.NoResetOnClose && DatabaseInfo.SupportsDiscard) { _sendResetOnClose = true; GenerateResetMessage(); } OpenTimestamp = DateTime.UtcNow; Log.Trace($"Opened connection to {Host}:{Port}"); #pragma warning disable CA2252 // Experimental API if (async && PhysicalOpenAsyncCallback is not null) await PhysicalOpenAsyncCallback(this); else if (!async && PhysicalOpenCallback is not null) PhysicalOpenCallback(this); #pragma warning restore CA2252 if (Settings.Multiplexing) { // Start an infinite async loop, which processes incoming multiplexing traffic. // It is intentionally not awaited and will run as long as the connector is alive. // The CommandsInFlightWriter channel is completed in Cleanup, which should cause this task // to complete. _ = Task.Run(MultiplexingReadLoop, CancellationToken.None) .ContinueWith(t => { // Note that we *must* observe the exception if the task is faulted. Log.Error("Exception bubbled out of multiplexing read loop", t.Exception!, Id); }, TaskContinuationOptions.OnlyOnFaulted); } if (_isKeepAliveEnabled) { // Start the keep alive mechanism to work by scheduling the timer. // Otherwise, it doesn't work for cases when no query executed during // the connection lifetime in case of a new connector. lock (SyncObj) { var keepAlive = Settings.KeepAlive * 1000; _keepAliveTimer!.Change(keepAlive, keepAlive); } } } catch (Exception e) { Break(e); throw; } static async Task OpenCore( NpgsqlConnector conn, SslMode sslMode, NpgsqlTimeout timeout, bool async, CancellationToken cancellationToken, bool isFirstAttempt = true) { await conn.RawOpen(sslMode, timeout, async, cancellationToken, isFirstAttempt); var username = conn.GetUsername(); if (conn.Settings.Database == null) conn.Settings.Database = username; timeout.CheckAndApply(conn); conn.WriteStartupMessage(username); await conn.Flush(async, cancellationToken); var cancellationRegistration = conn.StartCancellableOperation(cancellationToken, attemptPgCancellation: false); try { await conn.Authenticate(username, timeout, async, cancellationToken); } catch (PostgresException e) when (e.SqlState == PostgresErrorCodes.InvalidAuthorizationSpecification && (sslMode == SslMode.Prefer && conn.IsSecure || sslMode == SslMode.Allow && !conn.IsSecure)) { cancellationRegistration.Dispose(); Debug.Assert(!conn.IsBroken); conn.Cleanup(); // If Prefer was specified and we failed (with SSL), retry without SSL. // If Allow was specified and we failed (without SSL), retry with SSL await OpenCore( conn, sslMode == SslMode.Prefer ? SslMode.Disable : SslMode.Require, timeout, async, cancellationToken, isFirstAttempt: false); return; } using var _ = cancellationRegistration; // We treat BackendKeyData as optional because some PostgreSQL-like database // don't send it (CockroachDB, CrateDB) var msg = await conn.ReadMessage(async); if (msg.Code == BackendMessageCode.BackendKeyData) { var keyDataMsg = (BackendKeyDataMessage)msg; conn.BackendProcessId = keyDataMsg.BackendProcessId; conn._backendSecretKey = keyDataMsg.BackendSecretKey; msg = await conn.ReadMessage(async); } if (msg.Code != BackendMessageCode.ReadyForQuery) throw new NpgsqlException($"Received backend message {msg.Code} while expecting ReadyForQuery. Please file a bug."); conn.State = ConnectorState.Ready; } } internal async ValueTask LoadDatabaseInfo(bool forceReload, NpgsqlTimeout timeout, bool async, CancellationToken cancellationToken = default) { // The type loading below will need to send queries to the database, and that depends on a type mapper being set up (even if its // empty). So we set up here, and then later inject the DatabaseInfo. // For multiplexing connectors, the type mapper is the shared pool-wide one (since when validating/binding parameters on // multiplexing there's no connector yet). However, in the very first multiplexing connection (bootstrap phase) we create // a connector-specific mapper, which will later become shared pool-wide one. TypeMapper = Settings.Multiplexing && ((MultiplexingConnectorPool)_connectorSource).MultiplexingTypeMapper is { } multiplexingTypeMapper ? multiplexingTypeMapper : new ConnectorTypeMapper(this); var key = new NpgsqlDatabaseInfoCacheKey(Settings); if (forceReload || !NpgsqlDatabaseInfo.Cache.TryGetValue(key, out var database)) { var hasSemaphore = async ? await DatabaseInfoSemaphore.WaitAsync(timeout.CheckAndGetTimeLeft(), cancellationToken) : DatabaseInfoSemaphore.Wait(timeout.CheckAndGetTimeLeft(), cancellationToken); // We've timed out - calling Check, to throw the correct exception if (!hasSemaphore) timeout.Check(); try { if (forceReload || !NpgsqlDatabaseInfo.Cache.TryGetValue(key, out database)) { NpgsqlDatabaseInfo.Cache[key] = database = await NpgsqlDatabaseInfo.Load(this, timeout, async); } } finally { DatabaseInfoSemaphore.Release(); } } DatabaseInfo = database; TypeMapper.DatabaseInfo = database; } internal async ValueTask QueryClusterState( NpgsqlTimeout timeout, bool async, CancellationToken cancellationToken = default) { using var batch = CreateBatch(); batch.BatchCommands.Add(new NpgsqlBatchCommand("select pg_is_in_recovery()")); batch.BatchCommands.Add(new NpgsqlBatchCommand("SHOW default_transaction_read_only")); batch.Timeout = (int)timeout.CheckAndGetTimeLeft().TotalSeconds; var reader = async ? await batch.ExecuteReaderAsync(cancellationToken) : batch.ExecuteReader(); try { if (async) { await reader.ReadAsync(cancellationToken); _isHotStandBy = reader.GetBoolean(0); await reader.NextResultAsync(cancellationToken); await reader.ReadAsync(cancellationToken); } else { reader.Read(); _isHotStandBy = reader.GetBoolean(0); reader.NextResult(); reader.Read(); } _isTransactionReadOnly = reader.GetString(0) != "off"; var clusterState = UpdateClusterState(); Debug.Assert(clusterState.HasValue); return clusterState.Value; } finally { if (async) await reader.DisposeAsync(); else reader.Dispose(); } } void WriteStartupMessage(string username) { var startupParams = new Dictionary { ["user"] = username, ["client_encoding"] = Settings.ClientEncoding ?? PostgresEnvironment.ClientEncoding ?? "UTF8", ["database"] = Settings.Database! }; if (Settings.ApplicationName?.Length > 0) startupParams["application_name"] = Settings.ApplicationName; if (Settings.SearchPath?.Length > 0) startupParams["search_path"] = Settings.SearchPath; var timezone = Settings.Timezone ?? PostgresEnvironment.TimeZone; if (timezone != null) startupParams["TimeZone"] = timezone; var options = Settings.Options ?? PostgresEnvironment.Options; if (options?.Length > 0) startupParams["options"] = options; switch (Settings.ReplicationMode) { case ReplicationMode.Logical: startupParams["replication"] = "database"; break; case ReplicationMode.Physical: startupParams["replication"] = "true"; break; } WriteStartup(startupParams); } string GetUsername() { var username = Settings.Username; if (username?.Length > 0) return username; username = PostgresEnvironment.User; if (username?.Length > 0) return username; if (!RuntimeInformation.IsOSPlatform(OSPlatform.Windows)) { username = KerberosUsernameProvider.GetUsername(Settings.IncludeRealm); if (username?.Length > 0) return username; } username = Environment.UserName; if (username?.Length > 0) return username; throw new NpgsqlException("No username could be found, please specify one explicitly"); } async Task RawOpen(SslMode sslMode, NpgsqlTimeout timeout, bool async, CancellationToken cancellationToken, bool isFirstAttempt = true) { try { if (async) await ConnectAsync(timeout, cancellationToken); else Connect(timeout); _baseStream = new NetworkStream(_socket, true); _stream = _baseStream; if (Settings.Encoding == "UTF8") { TextEncoding = PGUtil.UTF8Encoding; RelaxedTextEncoding = PGUtil.RelaxedUTF8Encoding; } else { TextEncoding = Encoding.GetEncoding(Settings.Encoding, EncoderFallback.ExceptionFallback, DecoderFallback.ExceptionFallback); RelaxedTextEncoding = Encoding.GetEncoding(Settings.Encoding, EncoderFallback.ReplacementFallback, DecoderFallback.ReplacementFallback); } ReadBuffer = new NpgsqlReadBuffer(this, _stream, _socket, Settings.ReadBufferSize, TextEncoding, RelaxedTextEncoding); WriteBuffer = new NpgsqlWriteBuffer(this, _stream, _socket, Settings.WriteBufferSize, TextEncoding); timeout.CheckAndApply(this); IsSecure = false; if (sslMode is SslMode.Prefer or SslMode.Require or SslMode.VerifyCA or SslMode.VerifyFull) { WriteSslRequest(); await Flush(async, cancellationToken); await ReadBuffer.Ensure(1, async); var response = (char)ReadBuffer.ReadByte(); timeout.CheckAndApply(this); switch (response) { default: throw new NpgsqlException($"Received unknown response {response} for SSLRequest (expecting S or N)"); case 'N': if (sslMode != SslMode.Prefer) throw new NpgsqlException("SSL connection requested. No SSL enabled connection from this host is configured."); break; case 'S': var clientCertificates = new X509Certificate2Collection(); var certPath = Settings.SslCertificate ?? PostgresEnvironment.SslCert ?? PostgresEnvironment.SslCertDefault; if (certPath != null) { var password = Settings.SslPassword; if (Path.GetExtension(certPath).ToUpperInvariant() != ".PFX") { #if NET5_0_OR_GREATER // It's PEM time var keyPath = Settings.SslKey ?? PostgresEnvironment.SslKey ?? PostgresEnvironment.SslKeyDefault; _certificate = string.IsNullOrEmpty(password) ? X509Certificate2.CreateFromPemFile(certPath, keyPath) : X509Certificate2.CreateFromEncryptedPemFile(certPath, password, keyPath); if (RuntimeInformation.IsOSPlatform(OSPlatform.Windows)) { // Windows crypto API has a bug with pem certs // See #3650 using var previousCert = _certificate; _certificate = new X509Certificate2(_certificate.Export(X509ContentType.Pkcs12)); } #else throw new NotSupportedException("PEM certificates are only supported with .NET 5 and higher"); #endif } if (_certificate is null) _certificate = new X509Certificate2(certPath, password); clientCertificates.Add(_certificate); } ProvideClientCertificatesCallback?.Invoke(clientCertificates); var checkCertificateRevocation = Settings.CheckCertificateRevocation; RemoteCertificateValidationCallback? certificateValidationCallback; if (UserCertificateValidationCallback is not null) { if (sslMode is SslMode.VerifyCA or SslMode.VerifyFull) throw new ArgumentException(string.Format(NpgsqlStrings.CannotUseSslVerifyWithUserCallback, sslMode)); if (Settings.RootCertificate is not null) throw new ArgumentException(NpgsqlStrings.CannotUseSslRootCertificateWithUserCallback); certificateValidationCallback = UserCertificateValidationCallback; } else if (sslMode is SslMode.Prefer or SslMode.Require) { if (isFirstAttempt && sslMode is SslMode.Require && !Settings.TrustServerCertificate) throw new ArgumentException(NpgsqlStrings.CannotUseSslModeRequireWithoutTrustServerCertificate); certificateValidationCallback = SslTrustServerValidation; checkCertificateRevocation = false; } else if ((Settings.RootCertificate ?? PostgresEnvironment.SslCertRoot ?? PostgresEnvironment.SslCertRootDefault) is { } certRootPath) { certificateValidationCallback = SslRootValidation(certRootPath, sslMode == SslMode.VerifyFull); } else if (sslMode == SslMode.VerifyCA) { certificateValidationCallback = SslVerifyCAValidation; } else { Debug.Assert(sslMode == SslMode.VerifyFull); certificateValidationCallback = SslVerifyFullValidation; } timeout.CheckAndApply(this); try { var sslStream = new SslStream(_stream, leaveInnerStreamOpen: false, certificateValidationCallback); var sslProtocols = SslProtocols.None; // On .NET Framework SslProtocols.None can be disabled, see #3718 #if NETSTANDARD2_0 sslProtocols = SslProtocols.Tls | SslProtocols.Tls11 | SslProtocols.Tls12; #endif if (async) await sslStream.AuthenticateAsClientAsync(Host, clientCertificates, sslProtocols, checkCertificateRevocation); else sslStream.AuthenticateAsClient(Host, clientCertificates, sslProtocols, checkCertificateRevocation); _stream = sslStream; } catch (Exception e) { throw new NpgsqlException("Exception while performing SSL handshake", e); } ReadBuffer.Underlying = _stream; WriteBuffer.Underlying = _stream; IsSecure = true; Log.Trace("SSL negotiation successful"); break; } if (ReadBuffer.ReadBytesLeft > 0) throw new NpgsqlException("Additional unencrypted data received after SSL negotiation - this should never happen, and may be an indication of a man-in-the-middle attack."); } Log.Trace($"Socket connected to {Host}:{Port}"); } catch { _certificate?.Dispose(); _certificate = null; _stream?.Dispose(); _stream = null!; _baseStream?.Dispose(); _baseStream = null!; _socket?.Dispose(); _socket = null!; throw; } } void Connect(NpgsqlTimeout timeout) { // Note that there aren't any timeout-able or cancellable DNS methods var endpoints = NpgsqlConnectionStringBuilder.IsUnixSocket(Host, Port, out var socketPath) ? new EndPoint[] { new UnixDomainSocketEndPoint(socketPath) } : Dns.GetHostAddresses(Host).Select(a => new IPEndPoint(a, Port)).ToArray(); timeout.Check(); // Give each endpoint an equal share of the remaining time var perEndpointTimeout = -1; // Default to infinity if (timeout.IsSet) perEndpointTimeout = (int)(timeout.CheckAndGetTimeLeft().Ticks / endpoints.Length / 10); for (var i = 0; i < endpoints.Length; i++) { var endpoint = endpoints[i]; Log.Trace($"Attempting to connect to {endpoint}"); var protocolType = endpoint.AddressFamily == AddressFamily.InterNetwork || endpoint.AddressFamily == AddressFamily.InterNetworkV6 ? ProtocolType.Tcp : ProtocolType.IP; var socket = new Socket(endpoint.AddressFamily, SocketType.Stream, protocolType) { Blocking = false }; try { try { socket.Connect(endpoint); } catch (SocketException e) { if (e.SocketErrorCode != SocketError.WouldBlock) throw; } var write = new List { socket }; var error = new List { socket }; Socket.Select(null, write, error, perEndpointTimeout); var errorCode = (int)socket.GetSocketOption(SocketOptionLevel.Socket, SocketOptionName.Error)!; if (errorCode != 0) throw new SocketException(errorCode); if (!write.Any()) throw new TimeoutException("Timeout during connection attempt"); socket.Blocking = true; SetSocketOptions(socket); _socket = socket; ConnectedEndPoint = endpoint; return; } catch (Exception e) { try { socket.Dispose(); } catch { // ignored } Log.Trace($"Failed to connect to {endpoint}", e); if (i == endpoints.Length - 1) throw new NpgsqlException($"Failed to connect to {endpoint}", e); } } } async Task ConnectAsync(NpgsqlTimeout timeout, CancellationToken cancellationToken) { // Note that there aren't any timeout-able or cancellable DNS methods var endpoints = NpgsqlConnectionStringBuilder.IsUnixSocket(Host, Port, out var socketPath) ? new EndPoint[] { new UnixDomainSocketEndPoint(socketPath) } : (await GetHostAddressesAsync(timeout, cancellationToken)) .Select(a => new IPEndPoint(a, Port)).ToArray(); // Give each IP an equal share of the remaining time var perIpTimespan = default(TimeSpan); var perIpTimeout = timeout; if (timeout.IsSet) { perIpTimespan = new TimeSpan(timeout.CheckAndGetTimeLeft().Ticks / endpoints.Length); perIpTimeout = new NpgsqlTimeout(perIpTimespan); } for (var i = 0; i < endpoints.Length; i++) { var endpoint = endpoints[i]; Log.Trace($"Attempting to connect to {endpoint}"); var protocolType = endpoint.AddressFamily == AddressFamily.InterNetwork || endpoint.AddressFamily == AddressFamily.InterNetworkV6 ? ProtocolType.Tcp : ProtocolType.IP; var socket = new Socket(endpoint.AddressFamily, SocketType.Stream, protocolType); try { await OpenSocketConnectionAsync(socket, endpoint, perIpTimeout, cancellationToken); SetSocketOptions(socket); _socket = socket; ConnectedEndPoint = endpoint; return; } catch (Exception e) { try { socket.Dispose(); } catch { // ignored } cancellationToken.ThrowIfCancellationRequested(); if (e is OperationCanceledException) e = new TimeoutException("Timeout during connection attempt"); Log.Trace($"Failed to connect to {endpoint}", e); if (i == endpoints.Length - 1) throw new NpgsqlException($"Failed to connect to {endpoint}", e); } } Task GetHostAddressesAsync(NpgsqlTimeout timeout, CancellationToken cancellationToken) { // .NET 6.0 added cancellation support to GetHostAddressesAsync, which allows us to implement real // cancellation and timeout. On older TFMs, we fake-cancel the operation, i.e. stop waiting // and raise the exception, but the actual connection task is left running. #if NET6_0_OR_GREATER var task = TaskExtensions.ExecuteWithTimeout( ct => Dns.GetHostAddressesAsync(Host, ct), timeout, cancellationToken); #else var task = Dns.GetHostAddressesAsync(Host); #endif // As the cancellation support of GetHostAddressesAsync is not guaranteed on all platforms // we apply the fake-cancel mechanism in all cases. return task.WithCancellationAndTimeout(timeout, cancellationToken); } static Task OpenSocketConnectionAsync(Socket socket, EndPoint endpoint, NpgsqlTimeout perIpTimeout, CancellationToken cancellationToken) { // .NET 5.0 added cancellation support to ConnectAsync, which allows us to implement real // cancellation and timeout. On older TFMs, we fake-cancel the operation, i.e. stop waiting // and raise the exception, but the actual connection task is left running. #if NET5_0_OR_GREATER return TaskExtensions.ExecuteWithTimeout( ct => socket.ConnectAsync(endpoint, ct).AsTask(), perIpTimeout, cancellationToken); #else return socket.ConnectAsync(endpoint) .WithCancellationAndTimeout(perIpTimeout, cancellationToken); #endif } } void SetSocketOptions(Socket socket) { if (socket.AddressFamily == AddressFamily.InterNetwork || socket.AddressFamily == AddressFamily.InterNetworkV6) socket.NoDelay = true; if (Settings.SocketReceiveBufferSize > 0) socket.ReceiveBufferSize = Settings.SocketReceiveBufferSize; if (Settings.SocketSendBufferSize > 0) socket.SendBufferSize = Settings.SocketSendBufferSize; if (Settings.TcpKeepAlive) socket.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.KeepAlive, true); if (Settings.TcpKeepAliveInterval > 0 && Settings.TcpKeepAliveTime == 0) throw new ArgumentException("If TcpKeepAliveInterval is defined, TcpKeepAliveTime must be defined as well"); if (Settings.TcpKeepAliveTime > 0) { var timeSeconds = Settings.TcpKeepAliveTime; var intervalSeconds = Settings.TcpKeepAliveInterval > 0 ? Settings.TcpKeepAliveInterval : Settings.TcpKeepAliveTime; #if NETSTANDARD2_0 || NETSTANDARD2_1 var timeMilliseconds = timeSeconds * 1000; var intervalMilliseconds = intervalSeconds * 1000; // For the following see https://msdn.microsoft.com/en-us/library/dd877220.aspx var uintSize = Marshal.SizeOf(typeof(uint)); var inOptionValues = new byte[uintSize * 3]; BitConverter.GetBytes((uint)1).CopyTo(inOptionValues, 0); BitConverter.GetBytes((uint)timeMilliseconds).CopyTo(inOptionValues, uintSize); BitConverter.GetBytes((uint)intervalMilliseconds).CopyTo(inOptionValues, uintSize * 2); var result = 0; try { result = socket.IOControl(IOControlCode.KeepAliveValues, inOptionValues, null); } catch (PlatformNotSupportedException) { throw new PlatformNotSupportedException("Setting TCP Keepalive Time and TCP Keepalive Interval is supported only on Windows, Mono and .NET Core 3.1+. " + "TCP keepalives can still be used on other systems but are enabled via the TcpKeepAlive option or configured globally for the machine, see the relevant docs."); } if (result != 0) throw new NpgsqlException($"Got non-zero value when trying to set TCP keepalive: {result}"); #else socket.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.KeepAlive, true); socket.SetSocketOption(SocketOptionLevel.Tcp, SocketOptionName.TcpKeepAliveTime, timeSeconds); socket.SetSocketOption(SocketOptionLevel.Tcp, SocketOptionName.TcpKeepAliveInterval, intervalSeconds); #endif } } #endregion #region I/O readonly ChannelReader? CommandsInFlightReader; internal readonly ChannelWriter? CommandsInFlightWriter; internal volatile int CommandsInFlightCount; internal ManualResetValueTaskSource ReaderCompleted { get; } = new() { RunContinuationsAsynchronously = true }; async Task MultiplexingReadLoop() { Debug.Assert(Settings.Multiplexing); Debug.Assert(CommandsInFlightReader != null); NpgsqlCommand? command = null; var commandsRead = 0; try { while (await CommandsInFlightReader.WaitToReadAsync()) { commandsRead = 0; Debug.Assert(!InTransaction); while (CommandsInFlightReader.TryRead(out command)) { commandsRead++; await ReadBuffer.Ensure(5, true); // We have a resultset for the command - hand back control to the command (which will // return it to the user) command.TraceReceivedFirstResponse(); ReaderCompleted.Reset(); command.ExecutionCompletion.SetResult(this); // Now wait until that command's reader is disposed. Note that RunContinuationsAsynchronously is // true, so that the user code calling NpgsqlDataReader.Dispose will not continue executing // synchronously here. The prevents issues if the code after the next command's execution // completion blocks. await new ValueTask(ReaderCompleted, ReaderCompleted.Version); Debug.Assert(!InTransaction); } // Atomically update the commands in-flight counter, and check if it reached 0. If so, the // connector is idle and can be returned. // Note that this is racing with over-capacity writing, which can select any connector at any // time (see MultiplexingWriteLoop), and we must make absolutely sure that if a connector is // returned to the pool, it is *never* written to unless properly dequeued from the Idle channel. if (Interlocked.Add(ref CommandsInFlightCount, -commandsRead) == 0) { // There's a race condition where the continuation of an asynchronous multiplexing write may not // have executed yet, and the flush may still be in progress. We know all I/O has already // been sent - because the reader has already consumed the entire resultset. So we wait until // the connector's write lock has been released (long waiting will never occur here). SpinWait.SpinUntil(() => MultiplexAsyncWritingLock == 0 || IsBroken); ResetReadBuffer(); _connectorSource.Return(this); } } Log.Trace("Exiting multiplexing read loop", Id); } catch (Exception e) { Debug.Assert(IsBroken); // Decrement the commands already dequeued from the in-flight counter Interlocked.Add(ref CommandsInFlightCount, -commandsRead); // When a connector is broken, the causing exception is stored on it. We fail commands with // that exception - rather than the one thrown here - since the break may have happened during // writing, and we want to bubble that one up. // Drain any pending in-flight commands and fail them. Note that some have only been written // to the buffer, and not sent to the server. command?.ExecutionCompletion.SetException(_breakReason!); try { while (true) { var pendingCommand = await CommandsInFlightReader.ReadAsync(); // TODO: the exception we have here is sometimes just the result of the write loop breaking // the connector, so it doesn't represent the actual root cause. pendingCommand.ExecutionCompletion.SetException(_breakReason!); } } catch (ChannelClosedException) { // All good, drained to the channel and failed all commands } // "Return" the connector to the pool to for cleanup (e.g. update total connector count) _connectorSource.Return(this); Log.Error("Exception in multiplexing read loop", e, Id); } Debug.Assert(CommandsInFlightCount == 0); } #endregion #region Frontend message processing /// /// Prepends a message to be sent at the beginning of the next message chain. /// internal void PrependInternalMessage(byte[] rawMessage, int responseMessageCount) { PendingPrependedResponses += responseMessageCount; var t = WritePregenerated(rawMessage); Debug.Assert(t.IsCompleted, "Could not fully write pregenerated message into the buffer"); } #endregion #region Backend message processing internal IBackendMessage ReadMessage(DataRowLoadingMode dataRowLoadingMode = DataRowLoadingMode.NonSequential) => ReadMessage(async: false, dataRowLoadingMode).GetAwaiter().GetResult(); internal ValueTask ReadMessage(bool async, DataRowLoadingMode dataRowLoadingMode = DataRowLoadingMode.NonSequential) => ReadMessage(async, dataRowLoadingMode, readingNotifications: false)!; internal ValueTask ReadMessageWithNotifications(bool async) => ReadMessage(async, DataRowLoadingMode.NonSequential, readingNotifications: true); internal ValueTask ReadMessage( bool async, DataRowLoadingMode dataRowLoadingMode, bool readingNotifications) { if (PendingPrependedResponses > 0 || dataRowLoadingMode != DataRowLoadingMode.NonSequential || readingNotifications || ReadBuffer.ReadBytesLeft < 5) { return ReadMessageLong(this, async, dataRowLoadingMode, readingNotifications); } var messageCode = (BackendMessageCode)ReadBuffer.ReadByte(); switch (messageCode) { case BackendMessageCode.NoticeResponse: case BackendMessageCode.NotificationResponse: case BackendMessageCode.ParameterStatus: case BackendMessageCode.ErrorResponse: ReadBuffer.ReadPosition--; return ReadMessageLong(this, async, dataRowLoadingMode, readingNotifications: false); case BackendMessageCode.ReadyForQuery: break; } PGUtil.ValidateBackendMessageCode(messageCode); var len = ReadBuffer.ReadInt32() - 4; // Transmitted length includes itself if (len > ReadBuffer.ReadBytesLeft) { ReadBuffer.ReadPosition -= 5; return ReadMessageLong(this, async, dataRowLoadingMode, readingNotifications: false); } return new ValueTask(ParseServerMessage(ReadBuffer, messageCode, len, false)); static async ValueTask ReadMessageLong( NpgsqlConnector connector, bool async, DataRowLoadingMode dataRowLoadingMode, bool readingNotifications, bool isReadingPrependedMessage = false) { // First read the responses of any prepended messages. if (connector.PendingPrependedResponses > 0 && !isReadingPrependedMessage) { try { // TODO: There could be room for optimization here, rather than the async call(s) connector.ReadBuffer.Timeout = TimeSpan.FromMilliseconds(connector.InternalCommandTimeout); for (; connector.PendingPrependedResponses > 0; connector.PendingPrependedResponses--) await ReadMessageLong(connector, async, DataRowLoadingMode.Skip, readingNotifications: false, isReadingPrependedMessage: true); // We've read all the prepended response. // Allow cancellation to proceed. connector.ReadingPrependedMessagesMRE.Set(); } catch (Exception e) { // Prepended queries should never fail. // If they do, we're not even going to attempt to salvage the connector. throw connector.Break(e); } } PostgresException? error = null; try { connector.ReadBuffer.Timeout = TimeSpan.FromMilliseconds(connector.UserTimeout); while (true) { await connector.ReadBuffer.Ensure(5, async, readingNotifications); var messageCode = (BackendMessageCode)connector.ReadBuffer.ReadByte(); PGUtil.ValidateBackendMessageCode(messageCode); var len = connector.ReadBuffer.ReadInt32() - 4; // Transmitted length includes itself if ((messageCode == BackendMessageCode.DataRow && dataRowLoadingMode != DataRowLoadingMode.NonSequential) || messageCode == BackendMessageCode.CopyData) { if (dataRowLoadingMode == DataRowLoadingMode.Skip) { await connector.ReadBuffer.Skip(len, async); continue; } } else if (len > connector.ReadBuffer.ReadBytesLeft) { if (len > connector.ReadBuffer.Size) { var oversizeBuffer = connector.ReadBuffer.AllocateOversize(len); if (connector._origReadBuffer == null) connector._origReadBuffer = connector.ReadBuffer; else connector.ReadBuffer.Dispose(); connector.ReadBuffer = oversizeBuffer; } await connector.ReadBuffer.Ensure(len, async); } var msg = connector.ParseServerMessage(connector.ReadBuffer, messageCode, len, isReadingPrependedMessage); switch (messageCode) { case BackendMessageCode.ErrorResponse: Debug.Assert(msg == null); // An ErrorResponse is (almost) always followed by a ReadyForQuery. Save the error // and throw it as an exception when the ReadyForQuery is received (next). error = PostgresException.Load(connector.ReadBuffer, connector.Settings.IncludeErrorDetail); if (connector.State == ConnectorState.Connecting) { // During the startup/authentication phase, an ErrorResponse isn't followed by // an RFQ. Instead, the server closes the connection immediately throw error; } else if (PostgresErrorCodes.IsCriticalFailure(error, clusterError: false)) { // Consider the connection dead throw connector.Break(error); } continue; case BackendMessageCode.ReadyForQuery: if (error != null) { NpgsqlEventSource.Log.CommandFailed(); throw error; } break; // Asynchronous messages which can come anytime, they have already been handled // in ParseServerMessage. Read the next message. case BackendMessageCode.NoticeResponse: case BackendMessageCode.NotificationResponse: case BackendMessageCode.ParameterStatus: Debug.Assert(msg == null); if (!readingNotifications) continue; return null; } Debug.Assert(msg != null, "Message is null for code: " + messageCode); return msg; } } catch (PostgresException e) { // TODO: move it up the stack, like #3126 did (relevant for non-command-execution scenarios, like COPY) if (connector.CurrentReader is null) connector.EndUserAction(); if (e.SqlState == PostgresErrorCodes.QueryCanceled && connector.PostgresCancellationPerformed) { // The query could be canceled because of a user cancellation or a timeout - raise the proper exception. // If _postgresCancellationPerformed is false, this is an unsolicited cancellation - // just bubble up thePostgresException. throw connector.UserCancellationRequested ? new OperationCanceledException("Query was cancelled", e, connector.UserCancellationToken) : new NpgsqlException("Exception while reading from stream", new TimeoutException("Timeout during reading attempt")); } throw; } catch (NpgsqlException) { // An ErrorResponse isn't followed by ReadyForQuery if (error != null) ExceptionDispatchInfo.Capture(error).Throw(); throw; } } } internal IBackendMessage? ParseServerMessage(NpgsqlReadBuffer buf, BackendMessageCode code, int len, bool isPrependedMessage) { switch (code) { case BackendMessageCode.RowDescription: return _rowDescriptionMessage.Load(buf, TypeMapper); case BackendMessageCode.DataRow: return _dataRowMessage.Load(len); case BackendMessageCode.CommandComplete: return _commandCompleteMessage.Load(buf, len); case BackendMessageCode.ReadyForQuery: var rfq = _readyForQueryMessage.Load(buf); if (!isPrependedMessage) { // Transaction status on prepended messages shouldn't be processed, because there may be prepended messages // before the begin transaction message. In this case, they will contain transaction status Idle, which will // clear our Pending transaction status. Only process transaction status on RFQ's from user-provided, non // prepended messages. ProcessNewTransactionStatus(rfq.TransactionStatusIndicator); } return rfq; case BackendMessageCode.EmptyQueryResponse: return EmptyQueryMessage.Instance; case BackendMessageCode.ParseComplete: return ParseCompleteMessage.Instance; case BackendMessageCode.ParameterDescription: return _parameterDescriptionMessage.Load(buf); case BackendMessageCode.BindComplete: return BindCompleteMessage.Instance; case BackendMessageCode.NoData: return NoDataMessage.Instance; case BackendMessageCode.CloseComplete: return CloseCompletedMessage.Instance; case BackendMessageCode.ParameterStatus: ReadParameterStatus(buf.GetNullTerminatedBytes(), buf.GetNullTerminatedBytes()); return null; case BackendMessageCode.NoticeResponse: var notice = PostgresNotice.Load(buf, Settings.IncludeErrorDetail); Log.Debug($"Received notice: {notice.MessageText}", Id); Connection?.OnNotice(notice); return null; case BackendMessageCode.NotificationResponse: Connection?.OnNotification(new NpgsqlNotificationEventArgs(buf)); return null; case BackendMessageCode.AuthenticationRequest: var authType = (AuthenticationRequestType)buf.ReadInt32(); return authType switch { AuthenticationRequestType.AuthenticationOk => (AuthenticationRequestMessage)AuthenticationOkMessage.Instance, AuthenticationRequestType.AuthenticationCleartextPassword => AuthenticationCleartextPasswordMessage.Instance, AuthenticationRequestType.AuthenticationMD5Password => AuthenticationMD5PasswordMessage.Load(buf), AuthenticationRequestType.AuthenticationGSS => AuthenticationGSSMessage.Instance, AuthenticationRequestType.AuthenticationSSPI => AuthenticationSSPIMessage.Instance, AuthenticationRequestType.AuthenticationGSSContinue => AuthenticationGSSContinueMessage.Load(buf, len), AuthenticationRequestType.AuthenticationSASL => new AuthenticationSASLMessage(buf), AuthenticationRequestType.AuthenticationSASLContinue => new AuthenticationSASLContinueMessage(buf, len - 4), AuthenticationRequestType.AuthenticationSASLFinal => new AuthenticationSASLFinalMessage(buf, len - 4), _ => throw new NotSupportedException($"Authentication method not supported (Received: {authType})") }; case BackendMessageCode.BackendKeyData: return new BackendKeyDataMessage(buf); case BackendMessageCode.CopyInResponse: return (_copyInResponseMessage ??= new CopyInResponseMessage()).Load(ReadBuffer); case BackendMessageCode.CopyOutResponse: return (_copyOutResponseMessage ??= new CopyOutResponseMessage()).Load(ReadBuffer); case BackendMessageCode.CopyData: return (_copyDataMessage ??= new CopyDataMessage()).Load(len); case BackendMessageCode.CopyBothResponse: return (_copyBothResponseMessage ??= new CopyBothResponseMessage()).Load(ReadBuffer); case BackendMessageCode.CopyDone: return CopyDoneMessage.Instance; case BackendMessageCode.PortalSuspended: throw new NpgsqlException("Unimplemented message: " + code); case BackendMessageCode.ErrorResponse: return null; case BackendMessageCode.FunctionCallResponse: // We don't use the obsolete function call protocol throw new NpgsqlException("Unexpected backend message: " + code); default: throw new InvalidOperationException($"Internal Npgsql bug: unexpected value {code} of enum {nameof(BackendMessageCode)}. Please file a bug."); } } /// /// Reads backend messages and discards them, stopping only after a message of the given type has /// been seen. Only a sync I/O version of this method exists - in async flows we inline the loop /// rather than calling an additional async method, in order to avoid the overhead. /// [MethodImpl(MethodImplOptions.AggressiveInlining)] internal IBackendMessage SkipUntil(BackendMessageCode stopAt) { Debug.Assert(stopAt != BackendMessageCode.DataRow, "Shouldn't be used for rows, doesn't know about sequential"); while (true) { var msg = ReadMessage(async: false, DataRowLoadingMode.Skip).GetAwaiter().GetResult()!; Debug.Assert(!(msg is DataRowMessage)); if (msg.Code == stopAt) return msg; } } #endregion Backend message processing #region Transactions internal Task Rollback(bool async, CancellationToken cancellationToken = default) { Log.Debug("Rolling back transaction", Id); return ExecuteInternalCommand(PregeneratedMessages.RollbackTransaction, async, cancellationToken); } internal bool InTransaction => TransactionStatus switch { TransactionStatus.Idle => false, TransactionStatus.Pending => true, TransactionStatus.InTransactionBlock => true, TransactionStatus.InFailedTransactionBlock => true, _ => throw new InvalidOperationException($"Internal Npgsql bug: unexpected value {TransactionStatus} of enum {nameof(TransactionStatus)}. Please file a bug.") }; /// /// Handles a new transaction indicator received on a ReadyForQuery message /// void ProcessNewTransactionStatus(TransactionStatus newStatus) { if (newStatus == TransactionStatus) return; TransactionStatus = newStatus; switch (newStatus) { case TransactionStatus.Idle: break; case TransactionStatus.InTransactionBlock: case TransactionStatus.InFailedTransactionBlock: // In multiplexing mode, we can't support transaction in SQL: the connector must be removed from the // writable connectors list, otherwise other commands may get written to it. So the user must tell us // about the transaction via BeginTransaction. if (Connection is null) { Debug.Assert(Settings.Multiplexing); throw new NotSupportedException("In multiplexing mode, transactions must be started with BeginTransaction"); } break; case TransactionStatus.Pending: throw new Exception($"Internal Npgsql bug: invalid TransactionStatus {nameof(TransactionStatus.Pending)} received, should be frontend-only"); default: throw new InvalidOperationException( $"Internal Npgsql bug: unexpected value {newStatus} of enum {nameof(TransactionStatus)}. Please file a bug."); } } internal void ClearTransaction(Exception? disposeReason = null) { Transaction?.DisposeImmediately(disposeReason); TransactionStatus = TransactionStatus.Idle; } #endregion #region SSL /// /// Returns whether SSL is being used for the connection /// internal bool IsSecure { get; private set; } /// /// Returns whether SCRAM-SHA256 is being user for the connection /// internal bool IsScram { get; private set; } /// /// Returns whether SCRAM-SHA256-PLUS is being user for the connection /// internal bool IsScramPlus { get; private set; } static readonly RemoteCertificateValidationCallback SslVerifyFullValidation = (sender, certificate, chain, sslPolicyErrors) => sslPolicyErrors == SslPolicyErrors.None; static readonly RemoteCertificateValidationCallback SslVerifyCAValidation = (sender, certificate, chain, sslPolicyErrors) => sslPolicyErrors == SslPolicyErrors.None || sslPolicyErrors == SslPolicyErrors.RemoteCertificateNameMismatch; static readonly RemoteCertificateValidationCallback SslTrustServerValidation = (sender, certificate, chain, sslPolicyErrors) => true; static RemoteCertificateValidationCallback SslRootValidation(string certRootPath, bool verifyFull) => (sender, certificate, chain, sslPolicyErrors) => { if (certificate is null || chain is null) return false; // No errors here - no reason to check further if (sslPolicyErrors == SslPolicyErrors.None) return true; // That's VerifyCA check and the only error is name mismatch - no reason to check further if (!verifyFull && sslPolicyErrors == SslPolicyErrors.RemoteCertificateNameMismatch) return true; // That's VerifyFull check and we have name mismatch - no reason to check further if (verifyFull && sslPolicyErrors.HasFlag(SslPolicyErrors.RemoteCertificateNameMismatch)) return false; var certs = new X509Certificate2Collection(); #if NET5_0_OR_GREATER if (Path.GetExtension(certRootPath).ToUpperInvariant() != ".PFX") certs.ImportFromPemFile(certRootPath); #endif if (certs.Count == 0) certs.Add(new X509Certificate2(certRootPath)); #if NET5_0_OR_GREATER chain.ChainPolicy.CustomTrustStore.AddRange(certs); chain.ChainPolicy.TrustMode = X509ChainTrustMode.CustomRootTrust; #endif chain.ChainPolicy.ExtraStore.AddRange(certs); return chain.Build(certificate as X509Certificate2 ?? new X509Certificate2(certificate)); }; #endregion SSL #region Cancel internal void ResetCancellation() { // If a cancellation is in progress, wait for it to "complete" before proceeding (#615) lock (CancelLock) { if (PendingPrependedResponses > 0) ReadingPrependedMessagesMRE.Reset(); Debug.Assert(ReadingPrependedMessagesMRE.IsSet || PendingPrependedResponses > 0); } } internal void PerformUserCancellation() { var connection = Connection; if (connection is null || connection.ConnectorBindingScope == ConnectorBindingScope.Reader || UserCancellationRequested) return; // Take the lock first to make sure there is no concurrent Break. // We should be safe to take it as Break only take it to set the state. lock (SyncObj) { // The connector is dead, exit gracefully. if (!IsConnected) return; // The connector is still alive, take the CancelLock before exiting SingleUseLock. // If a break will happen after, it's going to wait for the cancellation to complete. Monitor.Enter(CancelLock); } try { // Wait before we've read all responses for the prepended queries // as we can't gracefully handle their cancellation. // Break makes sure that it's going to be set even if we fail while reading them. // We don't wait indefinitely to avoid deadlocks from synchronous CancellationToken.Register // See #5032 if (!ReadingPrependedMessagesMRE.Wait(0)) return; _userCancellationRequested = true; if (AttemptPostgresCancellation && SupportsPostgresCancellation) { var cancellationTimeout = Settings.CancellationTimeout; if (PerformPostgresCancellation() && cancellationTimeout >= 0) { if (cancellationTimeout > 0) { UserTimeout = cancellationTimeout; ReadBuffer.Timeout = TimeSpan.FromMilliseconds(cancellationTimeout); ReadBuffer.Cts.CancelAfter(cancellationTimeout); } return; } } UserTimeout = -1; ReadBuffer.Timeout = _cancelImmediatelyTimeout; ReadBuffer.Cts.Cancel(); } finally { Monitor.Exit(CancelLock); } } /// /// Creates another connector and sends a cancel request through it for this connector. This method never throws, but returns /// whether the cancellation attempt failed. /// /// /// /// if the cancellation request was successfully delivered, or if it was skipped because a previous /// request was already sent. if the cancellation request could not be delivered because of an exception /// (the method logs internally). /// /// /// This does not indicate whether the cancellation attempt was successful on the PostgreSQL side - only if the request was /// delivered. /// /// internal bool PerformPostgresCancellation() { Debug.Assert(BackendProcessId != 0, "PostgreSQL cancellation requested by the backend doesn't support it"); lock (CancelLock) { if (PostgresCancellationPerformed) return true; Log.Debug("Sending cancellation...", Id); PostgresCancellationPerformed = true; try { var cancelConnector = new NpgsqlConnector(this); cancelConnector.DoCancelRequest(BackendProcessId, _backendSecretKey); } catch (Exception e) { var socketException = e.InnerException as SocketException; if (socketException == null || socketException.SocketErrorCode != SocketError.ConnectionReset) { Log.Debug("Exception caught while attempting to cancel command", e, Id); return false; } } return true; } } void DoCancelRequest(int backendProcessId, int backendSecretKey) { Debug.Assert(State == ConnectorState.Closed); try { RawOpen(Settings.SslMode, new NpgsqlTimeout(TimeSpan.FromSeconds(ConnectionTimeout)), false, CancellationToken.None) .GetAwaiter().GetResult(); WriteCancelRequest(backendProcessId, backendSecretKey); Flush(); Debug.Assert(ReadBuffer.ReadBytesLeft == 0); // Now wait for the server to close the connection, better chance of the cancellation // actually being delivered before we continue with the user's logic. var count = _stream.Read(ReadBuffer.Buffer, 0, 1); if (count > 0) Log.Error("Received response after sending cancel request, shouldn't happen! First byte: " + ReadBuffer.Buffer[0]); } finally { FullCleanup(); } } [MethodImpl(MethodImplOptions.AggressiveInlining)] internal CancellationTokenRegistration StartCancellableOperation( CancellationToken cancellationToken = default, bool attemptPgCancellation = true) { _userCancellationRequested = PostgresCancellationPerformed = false; UserCancellationToken = cancellationToken; ReadBuffer.Cts.ResetCts(); AttemptPostgresCancellation = attemptPgCancellation; return _cancellationTokenRegistration = cancellationToken.Register(static c => ((NpgsqlConnector)c!).PerformUserCancellation(), this); } /// /// Starts a new cancellable operation within an ongoing user action. This should only be used if a single user /// action spans several different actions which each has its own cancellation tokens. For example, a command /// execution is a single user action, but spans ExecuteReaderQuery, NextResult, Read and so forth. /// /// /// Only one level of nested operations is supported. It is an error to call this method if it has previously /// been called, and the returned was not disposed. /// /// /// The cancellation token provided by the user. Callbacks will be registered on this token for executing the /// cancellation, and the token will be included in any thrown . /// /// /// If , PostgreSQL cancellation will be attempted when the user requests cancellation or /// a timeout occurs, followed by a client-side socket cancellation once /// has elapsed. If , /// PostgreSQL cancellation will be skipped and client-socket cancellation will occur immediately. /// [MethodImpl(MethodImplOptions.AggressiveInlining)] internal CancellationTokenRegistration StartNestedCancellableOperation( CancellationToken cancellationToken = default, bool attemptPgCancellation = true) { UserCancellationToken = cancellationToken; AttemptPostgresCancellation = attemptPgCancellation; return _cancellationTokenRegistration = cancellationToken.Register(static c => ((NpgsqlConnector)c!).PerformUserCancellation(), this); } #endregion Cancel #region Close / Reset /// /// Closes ongoing operations, i.e. an open reader exists or a COPY operation still in progress, as /// part of a connection close. /// internal async Task CloseOngoingOperations(bool async) { var reader = CurrentReader; var copyOperation = CurrentCopyOperation; if (reader != null) await reader.Close(connectionClosing: true, async, isDisposing: false); else if (copyOperation != null) { // TODO: There's probably a race condition as the COPY operation may finish on its own during the next few lines // Note: we only want to cancel import operations, since in these cases cancel is safe. // Export cancellations go through the PostgreSQL "asynchronous" cancel mechanism and are // therefore vulnerable to the race condition in #615. if (copyOperation is NpgsqlBinaryImporter || copyOperation is NpgsqlCopyTextWriter || copyOperation is NpgsqlRawCopyStream rawCopyStream && rawCopyStream.CanWrite) { try { if (async) await copyOperation.CancelAsync(); else copyOperation.Cancel(); } catch (Exception e) { Log.Warn("Error while cancelling COPY on connector close", e, Id); } } try { if (async) await copyOperation.DisposeAsync(); else copyOperation.Dispose(); } catch (Exception e) { Log.Warn("Error while disposing cancelled COPY on connector close", e, Id); } } } // TODO in theory this should be async-optional, but the only I/O done here is the Terminate Flush, which is // very unlikely to block (plus locking would need to be worked out) internal void Close() { lock (SyncObj) { Log.Trace("Closing connector", Id); if (IsReady) { try { // At this point, there could be some prepended commands (like DISCARD ALL) // which make no sense to send on connection close // see https://github.com/npgsql/npgsql/issues/3592 WriteBuffer.Clear(); WriteTerminate(); Flush(); } catch (Exception e) { Log.Error("Exception while closing connector", e, Id); Debug.Assert(IsBroken); } } switch (State) { case ConnectorState.Broken: case ConnectorState.Closed: return; } State = ConnectorState.Closed; } FullCleanup(); } internal bool TryRemovePendingEnlistedConnector(Transaction transaction) => _connectorSource.TryRemovePendingEnlistedConnector(this, transaction); internal void Return() => _connectorSource.Return(this); /// public void Dispose() => Close(); /// /// Called when an unexpected message has been received during an action. Breaks the /// connector and returns the appropriate message. /// internal Exception UnexpectedMessageReceived(BackendMessageCode received) => throw Break(new Exception($"Received unexpected backend message {received}. Please file a bug.")); /// /// Called when a connector becomes completely unusable, e.g. when an unexpected I/O exception is raised or when /// we lose protocol sync. /// Note that fatal errors during the Open phase do *not* pass through here. /// /// The exception that caused the break. /// The exception given in for chaining calls. internal Exception Break(Exception reason) { Debug.Assert(!IsClosed); Monitor.Enter(SyncObj); if (State == ConnectorState.Broken) { // We're already broken. // Exit SingleUseLock to unblock other threads (like cancellation). Monitor.Exit(SyncObj); // Wait for the break to complete before going forward. lock (CleanupLock) { } return reason; } try { // If we're broken while reading prepended messages // the cancellation request might still be waiting on the MRE. // Unblock it. ReadingPrependedMessagesMRE.Set(); Log.Error("Breaking connector", reason, Id); // Note that we may be reading and writing from the same connector concurrently, so safely set // the original reason for the break before actually closing the socket etc. Interlocked.CompareExchange(ref _breakReason, reason, null); State = ConnectorState.Broken; // Take the CleanupLock while in SingleUseLock to make sure concurrent Break doesn't take it first. Monitor.Enter(CleanupLock); } finally { // Unblock other threads (like cancellation) to proceed and exit gracefully. Monitor.Exit(SyncObj); } try { lock (CancelLock) { // Note we only set the cluster to offline and clear the pool if the connection is being broken (we're in this method), // *and* the exception indicates that the PG cluster really is down; the latter includes any IO/timeout issue, // but does not include e.g. authentication failure or timeouts with disabled cancellation. if (reason is NpgsqlException { IsTransient: true } ne && (ne.InnerException is not TimeoutException || Settings.CancellationTimeout != -1) || reason is PostgresException pe && PostgresErrorCodes.IsCriticalFailure(pe)) { ClusterStateCache.UpdateClusterState(Host, Port, ClusterState.Offline, DateTime.UtcNow, Settings.HostRecheckSecondsTranslated); _connectorSource.Clear(); } var connection = Connection; FullCleanup(); if (connection is not null) { var closeLockTaken = connection.TakeCloseLock(); Debug.Assert(closeLockTaken); if (Settings.ReplicationMode == ReplicationMode.Off) { Connection = null; if (connection.ConnectorBindingScope != ConnectorBindingScope.None) Return(); connection.EnlistedTransaction = null; connection.Connector = null; connection.ConnectorBindingScope = ConnectorBindingScope.None; } connection.FullState = ConnectionState.Broken; connection.ReleaseCloseLock(); } } return reason; } finally { Monitor.Exit(CleanupLock); } } void FullCleanup() { lock (CleanupLock) { if (Settings.Multiplexing) { FlagAsNotWritableForMultiplexing(); // Note that in multiplexing, this could be called from the read loop, while the write loop is // writing into the channel. To make sure this race condition isn't a problem, the channel currently // isn't set up with SingleWriter (since at this point it doesn't do anything). CommandsInFlightWriter!.Complete(); // The connector's read loop has a continuation to observe and log any exception coming out // (see Open) } Log.Trace("Cleaning up connector", Id); Cleanup(); if (_isKeepAliveEnabled) { _userLock!.Dispose(); _userLock = null; _keepAliveTimer!.Change(Timeout.Infinite, Timeout.Infinite); _keepAliveTimer.Dispose(); } } } /// /// Closes the socket and cleans up client-side resources associated with this connector. /// /// /// This method doesn't actually perform any meaningful I/O, and therefore is sync-only. /// void Cleanup() { try { _stream?.Dispose(); } catch { // ignored } if (CurrentReader != null) { CurrentReader.Command.State = CommandState.Idle; try { // Note that this never actually blocks on I/O, since the stream is also closed // (which is why we don't need to call CloseAsync) CurrentReader.Close(); } catch { // ignored } CurrentReader = null; } if (CurrentCopyOperation != null) { try { // Note that this never actually blocks on I/O, since the stream is also closed // (which is why we don't need to call DisposeAsync) CurrentCopyOperation.Dispose(); } catch { // ignored } CurrentCopyOperation = null; } ClearTransaction(_breakReason); _stream = null!; _baseStream = null!; _origReadBuffer?.Dispose(); _origReadBuffer = null; ReadBuffer?.Dispose(); ReadBuffer = null!; WriteBuffer?.Dispose(); WriteBuffer = null!; Connection = null; PostgresParameters.Clear(); _currentCommand = null; if (_certificate is not null) { _certificate.Dispose(); _certificate = null; } } void GenerateResetMessage() { var sb = new StringBuilder("SET SESSION AUTHORIZATION DEFAULT;RESET ALL;"); _resetWithoutDeallocateResponseCount = 2; if (DatabaseInfo.SupportsCloseAll) { sb.Append("CLOSE ALL;"); _resetWithoutDeallocateResponseCount++; } if (DatabaseInfo.SupportsUnlisten) { sb.Append("UNLISTEN *;"); _resetWithoutDeallocateResponseCount++; } if (DatabaseInfo.SupportsAdvisoryLocks) { sb.Append("SELECT pg_advisory_unlock_all();"); _resetWithoutDeallocateResponseCount += 2; } if (DatabaseInfo.SupportsDiscardSequences) { sb.Append("DISCARD SEQUENCES;"); _resetWithoutDeallocateResponseCount++; } if (DatabaseInfo.SupportsDiscardTemp) { sb.Append("DISCARD TEMP"); _resetWithoutDeallocateResponseCount++; } _resetWithoutDeallocateResponseCount++; // One ReadyForQuery at the end _resetWithoutDeallocateMessage = PregeneratedMessages.Generate(WriteBuffer, sb.ToString()); } /// /// Called when a pooled connection is closed, and its connector is returned to the pool. /// Resets the connector back to its initial state, releasing server-side sources /// (e.g. prepared statements), resetting parameters to their defaults, and resetting client-side /// state /// internal async Task Reset(bool async) { bool endBindingScope; // We start user action in case a keeplive happens concurrently, or a concurrent user command (bug) using (StartUserAction(attemptPgCancellation: false)) { // Our buffer may contain unsent prepended messages, so clear it out. // In practice, this is (currently) only done when beginning a transaction or a transaction savepoint. WriteBuffer.Clear(); PendingPrependedResponses = 0; ResetReadBuffer(); Transaction?.UnbindIfNecessary(); // Must rollback transaction before sending DISCARD ALL switch (TransactionStatus) { case TransactionStatus.Idle: // There is an undisposed transaction on multiplexing connection endBindingScope = Connection?.ConnectorBindingScope == ConnectorBindingScope.Transaction; break; case TransactionStatus.Pending: // BeginTransaction() was called, but was left in the write buffer and not yet sent to server. // Just clear the transaction state. ProcessNewTransactionStatus(TransactionStatus.Idle); ClearTransaction(); endBindingScope = true; break; case TransactionStatus.InTransactionBlock: case TransactionStatus.InFailedTransactionBlock: await Rollback(async); ClearTransaction(); endBindingScope = true; break; default: throw new InvalidOperationException($"Internal Npgsql bug: unexpected value {TransactionStatus} of enum {nameof(TransactionStatus)}. Please file a bug."); } if (_sendResetOnClose) { if (PreparedStatementManager.NumPrepared > 0) { // We have prepared statements, so we can't reset the connection state with DISCARD ALL // Note: the send buffer has been cleared above, and we assume all this will fit in it. PrependInternalMessage(_resetWithoutDeallocateMessage!, _resetWithoutDeallocateResponseCount); } else { // There are no prepared statements. // We simply send DISCARD ALL which is more efficient than sending the above messages separately PrependInternalMessage(PregeneratedMessages.DiscardAll, 2); } } DataReader.UnbindIfNecessary(); } if (endBindingScope) { // Connection is null if a connection enlisted in a TransactionScope was closed before the // TransactionScope completed - the connector is still enlisted, but has no connection. Connection?.EndBindingScope(ConnectorBindingScope.Transaction); } } /// /// The connector may have allocated an oversize read buffer, to hold big rows in non-sequential reading. /// This switches us back to the original one and returns the buffer to . /// [MethodImpl(MethodImplOptions.AggressiveInlining)] void ResetReadBuffer() { if (_origReadBuffer != null) { Debug.Assert(_origReadBuffer.ReadBytesLeft == 0); Debug.Assert(_origReadBuffer.ReadPosition == 0); if (ReadBuffer.ReadBytesLeft > 0) { // There is still something in the buffer which we haven't read yet // In most cases it's ParameterStatus which can be sent asynchronously // If in some extreme case we have too much data left in the buffer to store in the original buffer // we just leave the oversize buffer as is and will try again on next reset if (ReadBuffer.ReadBytesLeft > _origReadBuffer.Size) return; ReadBuffer.CopyTo(_origReadBuffer); } ReadBuffer.Dispose(); ReadBuffer = _origReadBuffer; _origReadBuffer = null; } } internal void UnprepareAll() { ExecuteInternalCommand("DEALLOCATE ALL"); PreparedStatementManager.ClearAll(); } #endregion Close / Reset #region Locking internal UserAction StartUserAction(CancellationToken cancellationToken = default, bool attemptPgCancellation = true) => StartUserAction(ConnectorState.Executing, command: null, cancellationToken, attemptPgCancellation); internal UserAction StartUserAction( ConnectorState newState, CancellationToken cancellationToken = default, bool attemptPgCancellation = true) => StartUserAction(newState, command: null, cancellationToken, attemptPgCancellation); /// /// Starts a user action. This makes sure that another action isn't already in progress, handles synchronization with keepalive, /// and sets up cancellation. /// /// The new state to be set when entering this user action. /// /// The that is starting execution - if an is /// thrown, it will reference this. /// /// /// The cancellation token provided by the user. Callbacks will be registered on this token for executing the cancellation, /// and the token will be included in any thrown . /// /// /// If , PostgreSQL cancellation will be attempted when the user requests cancellation or a timeout /// occurs, followed by a client-side socket cancellation once has /// elapsed. If , PostgreSQL cancellation will be skipped and client-socket cancellation will occur /// immediately. /// internal UserAction StartUserAction( ConnectorState newState, NpgsqlCommand? command, CancellationToken cancellationToken = default, bool attemptPgCancellation = true) { // If keepalive is enabled, we must protect state transitions with a SemaphoreSlim // (which itself must be protected by a lock, since its dispose isn't thread-safe). // This will make the keepalive abort safely if a user query is in progress, and make // the user query wait if a keepalive is in progress. // If keepalive isn't enabled, we don't use the semaphore and rely only on the connector's // state (updated via Interlocked.Exchange) to detect concurrent use, on a best-effort basis. if (!_isKeepAliveEnabled) return DoStartUserAction(); lock (SyncObj) { if (!IsConnected) { throw IsBroken ? new NpgsqlException("The connection was previously broken because of the following exception", _breakReason) : new NpgsqlException("The connection is closed"); } if (!_userLock!.Wait(0)) { var currentCommand = _currentCommand; throw currentCommand == null ? new NpgsqlOperationInProgressException(State) : new NpgsqlOperationInProgressException(currentCommand); } try { // Disable keepalive, it will be restarted at the end of the user action _keepAliveTimer!.Change(Timeout.Infinite, Timeout.Infinite); // We now have both locks and are sure nothing else is running. // Check that the connector is ready. return DoStartUserAction(); } catch { _userLock.Release(); throw; } } UserAction DoStartUserAction() { switch (State) { case ConnectorState.Ready: break; case ConnectorState.Closed: case ConnectorState.Broken: throw new InvalidOperationException("Connection is not open"); case ConnectorState.Executing: case ConnectorState.Fetching: case ConnectorState.Waiting: case ConnectorState.Replication: case ConnectorState.Connecting: case ConnectorState.Copy: var currentCommand = _currentCommand; throw currentCommand == null ? new NpgsqlOperationInProgressException(State) : new NpgsqlOperationInProgressException(currentCommand); default: throw new ArgumentOutOfRangeException(nameof(State), State, "Invalid connector state: " + State); } Debug.Assert(IsReady); cancellationToken.ThrowIfCancellationRequested(); Log.Trace("Start user action", Id); State = newState; _currentCommand = command; StartCancellableOperation(cancellationToken, attemptPgCancellation); // We reset the UserTimeout for every user action, so it wouldn't leak from the previous query or action // For example, we might have successfully cancelled the previous query (so the connection is not broken) // But the next time, we call the Prepare, which doesn't set it's own timeout UserTimeout = (command?.CommandTimeout ?? Settings.CommandTimeout) * 1000; return new UserAction(this); } } internal void EndUserAction() { Debug.Assert(CurrentReader == null); _cancellationTokenRegistration.Dispose(); if (_isKeepAliveEnabled) { lock (SyncObj) { if (IsReady || !IsConnected) return; var keepAlive = Settings.KeepAlive * 1000; _keepAliveTimer!.Change(keepAlive, keepAlive); Log.Trace("End user action", Id); _currentCommand = null; _userLock!.Release(); State = ConnectorState.Ready; } } else { if (IsReady || !IsConnected) return; Log.Trace("End user action", Id); _currentCommand = null; State = ConnectorState.Ready; } } /// /// An IDisposable wrapper around . /// internal readonly struct UserAction : IDisposable { readonly NpgsqlConnector _connector; internal UserAction(NpgsqlConnector connector) => _connector = connector; public void Dispose() => _connector.EndUserAction(); } #endregion #region Keepalive #pragma warning disable CA1801 // Review unused parameters void PerformKeepAlive(object? state) { Debug.Assert(_isKeepAliveEnabled); // SemaphoreSlim.Dispose() isn't thread-safe - it may be in progress so we shouldn't try to wait on it; // we need a standard lock to protect it. if (!Monitor.TryEnter(SyncObj)) return; try { // There may already be a user action, or the connector may be closed etc. if (!IsReady) return; Log.Trace("Performing keepalive", Id); AttemptPostgresCancellation = false; var timeout = InternalCommandTimeout; WriteBuffer.Timeout = TimeSpan.FromSeconds(timeout); UserTimeout = timeout; WriteSync(async: false); Flush(); SkipUntil(BackendMessageCode.ReadyForQuery); Log.Trace("Performed keepalive", Id); } catch (Exception e) { Log.Error("Keepalive failure", e, Id); try { Break(new NpgsqlException("Exception while sending a keepalive", e)); } catch (Exception e2) { Log.Error("Further exception while breaking connector on keepalive failure", e2, Id); } } finally { Monitor.Exit(SyncObj); } } #pragma warning restore CA1801 // Review unused parameters #endregion #region Wait internal async Task Wait(bool async, int timeout, CancellationToken cancellationToken = default) { using var _ = StartUserAction(ConnectorState.Waiting, cancellationToken: cancellationToken, attemptPgCancellation: false); // We may have prepended messages in the connection's write buffer - these need to be flushed now. await Flush(async, cancellationToken); var keepaliveMs = Settings.KeepAlive * 1000; while (true) { cancellationToken.ThrowIfCancellationRequested(); var timeoutForKeepalive = _isKeepAliveEnabled && (timeout <= 0 || keepaliveMs < timeout); UserTimeout = timeoutForKeepalive ? keepaliveMs : timeout; try { var msg = await ReadMessageWithNotifications(async); if (msg != null) { throw Break( new NpgsqlException($"Received unexpected message of type {msg.Code} while waiting")); } return true; } catch (NpgsqlException e) when (e.InnerException is TimeoutException) { if (!timeoutForKeepalive) // We really timed out return false; } // Time for a keepalive var keepaliveTime = Stopwatch.StartNew(); await WriteSync(async, cancellationToken); await Flush(async, cancellationToken); var receivedNotification = false; var expectedMessageCode = BackendMessageCode.RowDescription; while (true) { IBackendMessage? msg; try { msg = await ReadMessageWithNotifications(async); } catch (Exception e) when (e is OperationCanceledException || e is NpgsqlException npgEx && npgEx.InnerException is TimeoutException) { // We're somewhere in the middle of a reading keepalive messages // Breaking the connection, as we've lost protocol sync throw Break(e); } if (msg == null) { receivedNotification = true; continue; } if (msg.Code != BackendMessageCode.ReadyForQuery) throw new NpgsqlException($"Received unexpected message of type {msg.Code} while expecting {expectedMessageCode} as part of keepalive"); Log.Trace("Performed keepalive", Id); if (receivedNotification) return true; // Notification was received during the keepalive cancellationToken.ThrowIfCancellationRequested(); break; } if (timeout > 0) timeout -= (keepaliveMs + (int)keepaliveTime.ElapsedMilliseconds); } } #endregion #region Supported features and PostgreSQL settings internal bool UseConformingStrings { get; private set; } /// /// The connection's timezone as reported by PostgreSQL, in the IANA/Olson database format. /// internal string Timezone { get; private set; } = default!; bool? _isTransactionReadOnly; bool? _isHotStandBy; #endregion Supported features and PostgreSQL settings #region Execute internal command internal void ExecuteInternalCommand(string query) => ExecuteInternalCommand(query, false).GetAwaiter().GetResult(); internal async Task ExecuteInternalCommand(string query, bool async, CancellationToken cancellationToken = default) { Log.Trace($"Executing internal command: {query}", Id); await WriteQuery(query, async, cancellationToken); await Flush(async, cancellationToken); Expect(await ReadMessage(async), this); Expect(await ReadMessage(async), this); } internal async Task ExecuteInternalCommand(byte[] data, bool async, CancellationToken cancellationToken = default) { Debug.Assert(State != ConnectorState.Ready, "Forgot to start a user action..."); Log.Trace("Executing internal pregenerated command", Id); await WritePregenerated(data, async, cancellationToken); await Flush(async, cancellationToken); Expect(await ReadMessage(async), this); Expect(await ReadMessage(async), this); } #endregion #region Misc /// /// Creates and returns a object associated with the . /// /// The text of the query. /// A object. public NpgsqlCommand CreateCommand(string? cmdText = null) => new(cmdText, this); /// /// Creates and returns a object associated with the . /// /// A object. public NpgsqlBatch CreateBatch() => new NpgsqlBatch(this); void ReadParameterStatus(ReadOnlySpan incomingName, ReadOnlySpan incomingValue) { byte[] rawName; byte[] rawValue; for (var i = 0; i < _rawParameters.Count; i++) { (var currentName, var currentValue) = _rawParameters[i]; if (incomingName.SequenceEqual(currentName)) { if (incomingValue.SequenceEqual(currentValue)) return; rawName = currentName; rawValue = incomingValue.ToArray(); _rawParameters[i] = (rawName, rawValue); goto ProcessParameter; } } rawName = incomingName.ToArray(); rawValue = incomingValue.ToArray(); _rawParameters.Add((rawName, rawValue)); ProcessParameter: var name = TextEncoding.GetString(rawName); var value = TextEncoding.GetString(rawValue); PostgresParameters[name] = value; switch (name) { case "standard_conforming_strings": if (value != "on" && Settings.Multiplexing) throw Break(new NotSupportedException("standard_conforming_strings must be on with multiplexing")); UseConformingStrings = value == "on"; return; case "TimeZone": Timezone = value; return; case "default_transaction_read_only": _isTransactionReadOnly = value == "on"; UpdateClusterState(); return; case "in_hot_standby": _isHotStandBy = value == "on"; UpdateClusterState(); return; } } ClusterState? UpdateClusterState() { if (_isTransactionReadOnly.HasValue && _isHotStandBy.HasValue) { var state = _isHotStandBy.Value ? ClusterState.Standby : _isTransactionReadOnly.Value ? ClusterState.PrimaryReadOnly : ClusterState.PrimaryReadWrite; return ClusterStateCache.UpdateClusterState(Settings.Host!, Settings.Port, state, DateTime.UtcNow, Settings.HostRecheckSecondsTranslated); } return null; } #endregion Misc } #region Enums /// /// Expresses the exact state of a connector. /// enum ConnectorState { /// /// The connector has either not yet been opened or has been closed. /// Closed, /// /// The connector is currently connecting to a PostgreSQL server. /// Connecting, /// /// The connector is connected and may be used to send a new query. /// Ready, /// /// The connector is waiting for a response to a query which has been sent to the server. /// Executing, /// /// The connector is currently fetching and processing query results. /// Fetching, /// /// The connector is currently waiting for asynchronous notifications to arrive. /// Waiting, /// /// The connection was broken because an unexpected error occurred which left it in an unknown state. /// This state isn't implemented yet. /// Broken, /// /// The connector is engaged in a COPY operation. /// Copy, /// /// The connector is engaged in streaming replication. /// Replication, } #pragma warning disable CA1717 enum TransactionStatus : byte #pragma warning restore CA1717 { /// /// Currently not in a transaction block /// Idle = (byte)'I', /// /// Currently in a transaction block /// InTransactionBlock = (byte)'T', /// /// Currently in a failed transaction block (queries will be rejected until block is ended) /// InFailedTransactionBlock = (byte)'E', /// /// A new transaction has been requested but not yet transmitted to the backend. It will be transmitted /// prepended to the next query. /// This is a client-side state option only, and is never transmitted from the backend. /// Pending = byte.MaxValue, } /// /// Specifies how to load/parse DataRow messages as they're received from the backend. /// internal enum DataRowLoadingMode { /// /// Load DataRows in non-sequential mode /// NonSequential, /// /// Load DataRows in sequential mode /// Sequential, /// /// Skip DataRow messages altogether /// Skip } #endregion