using System;
using System.Collections.Generic;
using System.ComponentModel;
using System.Data;
using System.Data.Common;
using System.Diagnostics;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Globalization;
using Npgsql.BackendMessages;
using Npgsql.Logging;
using Npgsql.Util;
using NpgsqlTypes;
using static Npgsql.Util.Statics;
using System.Collections;
using System.Diagnostics.CodeAnalysis;
using Npgsql.Internal;
namespace Npgsql;
/// <summary>
/// Represents a SQL statement or function (stored procedure) to execute
/// against a PostgreSQL database. This class cannot be inherited.
/// </summary>
// ReSharper disable once RedundantNameQualifier
[System.ComponentModel.DesignerCategory("")]
public sealed class NpgsqlCommand : DbCommand, ICloneable, IComponent
{
#region Fields
// The connection this command is associated with (backing field for Connection/DbConnection); may be null.
NpgsqlConnection? _connection;
// Connector supplied through the internal constructors that accept an NpgsqlConnector; null otherwise.
readonly NpgsqlConnector? _connector;
/// <summary>
/// If this command is (explicitly) prepared, references the connector on which the preparation happened.
/// Used to detect when the connector was changed (i.e. connection open/close), meaning that the command
/// is no longer prepared.
/// </summary>
NpgsqlConnector? _connectorPreparedOn;
// Backing field for CommandText. Set to null! when the command is wrapped by a batch.
string _commandText;
// Command behavior flags; Write() checks CommandBehavior.SchemaOnly on this field.
CommandBehavior _behavior;
// Backing field for CommandTimeout. Null means no explicit timeout was set on this command.
int? _timeout;
// Backing field for Parameters. Set to null! when the command is wrapped by a batch.
readonly NpgsqlParameterCollection _parameters;
/// <summary>
/// Whether this command is wrapped by a batch.
/// </summary>
internal bool IsWrappedByBatch { get; }
/// <summary>
/// The batch commands (individual statements) that make up this command.
/// </summary>
internal List InternalBatchCommands { get; }
// NOTE(review): presumably the tracing activity for the execution in progress - not used in the visible code, confirm.
Activity? CurrentActivity;
/// <summary>
/// Returns details about each statement that this command has executed.
/// Is only populated when an Execute* method is called.
/// </summary>
[Obsolete("Use the new DbBatch API")]
public IReadOnlyList Statements => InternalBatchCommands.AsReadOnly();
// Backing field for UpdatedRowSource.
UpdateRowSource _updateRowSource = UpdateRowSource.Both;
// True while a connector is recorded in _connectorPreparedOn, i.e. after Prepare and until the preparation is reset.
bool IsExplicitlyPrepared => _connectorPreparedOn != null;
/// <summary>
/// Whether this command instance is a cached one. Set to true by CreateCachedCommand.
/// </summary>
internal bool IsCached { get; set; }
// Assigned once by the static constructor from the "Npgsql.EnableSqlRewriting" AppContext switch.
// Writable in DEBUG builds only - presumably so tests can toggle it; confirm.
#if DEBUG
internal static bool EnableSqlRewriting;
#else
internal static readonly bool EnableSqlRewriting;
#endif
// Shared empty parameter list, passed to WriteParse when deriving parameters.
static readonly List EmptyParameters = new();
// Synchronization context installed by ForceAsyncIfNecessary when a synchronous batch write switches to async.
static readonly SingleThreadSynchronizationContext SingleThreadSynchronizationContext = new("NpgsqlRemainingAsyncSendWorker");
// Logger for this class.
static readonly NpgsqlLogger Log = NpgsqlLogManager.CreateLogger(nameof(NpgsqlCommand));
#endregion Fields
#region Constants
/// <summary>
/// Command timeout, in seconds, used by CommandTimeout when neither the command nor its connection
/// provides one.
/// </summary>
internal const int DefaultTimeout = 30;
#endregion
#region Constructors
// Reads the "Npgsql.EnableSqlRewriting" AppContext switch once. SQL rewriting stays enabled unless
// the switch has been explicitly set to false.
static NpgsqlCommand()
{
    var switchIsSet = AppContext.TryGetSwitch("Npgsql.EnableSqlRewriting", out var switchValue);
    EnableSqlRewriting = switchIsSet ? switchValue : true;
}
/// <summary>
/// Initializes a new instance of the <see cref="NpgsqlCommand"/> class with no command text,
/// connection or transaction.
/// </summary>
public NpgsqlCommand()
    : this(cmdText: null, connection: null, transaction: null)
{
}
/// <summary>
/// Initializes a new instance of the <see cref="NpgsqlCommand"/> class with the text of the query.
/// </summary>
/// <param name="cmdText">The text of the query.</param>
// ReSharper disable once IntroduceOptionalParameters.Global
public NpgsqlCommand(string? cmdText)
    : this(cmdText, connection: null, transaction: null)
{
}
/// <summary>
/// Initializes a new instance of the <see cref="NpgsqlCommand"/> class with the text of the query
/// and a <see cref="NpgsqlConnection"/>.
/// </summary>
/// <param name="cmdText">The text of the query.</param>
/// <param name="connection">A <see cref="NpgsqlConnection"/> that represents the connection to a PostgreSQL server.</param>
// ReSharper disable once IntroduceOptionalParameters.Global
public NpgsqlCommand(string? cmdText, NpgsqlConnection? connection)
    : this(cmdText, connection, transaction: null)
{
}
/// <summary>
/// Initializes a new instance of the <see cref="NpgsqlCommand"/> class with the text of the query, a
/// <see cref="NpgsqlConnection"/>, and the <see cref="NpgsqlTransaction"/>.
/// </summary>
/// <param name="cmdText">The text of the query. A null value is stored as an empty string.</param>
/// <param name="connection">A <see cref="NpgsqlConnection"/> that represents the connection to a PostgreSQL server.</param>
/// <param name="transaction">The <see cref="NpgsqlTransaction"/> in which the <see cref="NpgsqlCommand"/> executes.</param>
public NpgsqlCommand(string? cmdText, NpgsqlConnection? connection, NpgsqlTransaction? transaction)
{
// Opt this instance out of finalization.
GC.SuppressFinalize(this);
// Start with room for a single statement.
InternalBatchCommands = new List(1);
_parameters = new NpgsqlParameterCollection();
_commandText = cmdText ?? string.Empty;
_connection = connection;
Transaction = transaction;
CommandType = CommandType.Text;
}
/// <summary>
/// Used when this <see cref="NpgsqlCommand"/> instance is wrapped inside a batch.
/// The command operates directly on the supplied batch command list and marks itself as
/// wrapped (<see cref="IsWrappedByBatch"/>).
/// </summary>
/// <param name="batchCommands">The list of batch commands this command will operate on.</param>
internal NpgsqlCommand(List batchCommands)
{
// Opt this instance out of finalization.
GC.SuppressFinalize(this);
InternalBatchCommands = batchCommands;
CommandType = CommandType.Text;
IsWrappedByBatch = true;
// These can/should never be used in this mode
_commandText = null!;
_parameters = null!;
}
/// <summary>
/// Initializes a command with the given text and records the given connector on it.
/// </summary>
/// <param name="cmdText">The text of the query.</param>
/// <param name="connector">The connector to store on this command.</param>
internal NpgsqlCommand(string? cmdText, NpgsqlConnector connector)
    : this(cmdText)
{
    _connector = connector;
}
/// <summary>
/// Used when this <see cref="NpgsqlCommand"/> instance is wrapped inside a batch; additionally
/// records the given connector on the command.
/// </summary>
/// <param name="connector">The connector to store on this command.</param>
/// <param name="batchCommands">The list of batch commands this command will operate on.</param>
internal NpgsqlCommand(NpgsqlConnector connector, List batchCommands)
    : this(batchCommands)
{
    _connector = connector;
}
/// <summary>
/// Creates a command bound to <paramref name="connection"/>, with no command text, and flags it
/// as cached (<see cref="IsCached"/>).
/// </summary>
/// <param name="connection">The connection to associate with the new command.</param>
/// <returns>The newly created command.</returns>
internal static NpgsqlCommand CreateCachedCommand(NpgsqlConnection connection)
{
    var cachedCommand = new NpgsqlCommand(null, connection);
    cachedCommand.IsCached = true;
    return cachedCommand;
}
#endregion Constructors
#region Public properties
/// <summary>
/// Gets or sets the SQL statement or function (stored procedure) to execute at the data source.
/// </summary>
/// <value>The SQL statement or stored procedure to execute. The default is an empty string.</value>
/// <exception cref="InvalidOperationException">The value is set while the command is not idle.</exception>
[AllowNull, DefaultValue("")]
[Category("Data")]
public override string CommandText
{
    get
    {
        return _commandText;
    }
    set
    {
        Debug.Assert(!IsWrappedByBatch);

        if (State != CommandState.Idle)
            throw new InvalidOperationException("An open data reader exists for this command.");

        // A null assignment is normalized to the empty string.
        _commandText = value ?? string.Empty;

        // Changing the text invalidates any explicit preparation.
        // TODO: Technically should do this also if the parameter list (or type) changes
        ResetExplicitPreparation();
    }
}
/// <summary>
/// Gets or sets the wait time (in seconds) before terminating the attempt to execute a command and generating an error.
/// </summary>
/// <value>
/// The time (in seconds) to wait for the command to execute. When no value was set on the command,
/// the connection's command timeout is used, or 30 seconds if there is no connection.
/// </value>
/// <exception cref="ArgumentOutOfRangeException">The value being set is negative.</exception>
[DefaultValue(DefaultTimeout)]
public override int CommandTimeout
{
    get
    {
        // An explicitly set timeout wins over the connection's one.
        if (_timeout is { } explicitTimeout)
            return explicitTimeout;

        return _connection?.CommandTimeout ?? DefaultTimeout;
    }
    set
    {
        if (value < 0)
            throw new ArgumentOutOfRangeException(nameof(value), value, "CommandTimeout can't be less than zero.");

        _timeout = value;
    }
}
/// <summary>
/// Gets or sets a value indicating how the <see cref="CommandText"/> property is to be interpreted.
/// </summary>
/// <value>
/// One of the <see cref="System.Data.CommandType"/> values. The constructors set it to
/// <see cref="System.Data.CommandType.Text"/>.
/// </value>
[DefaultValue(CommandType.Text)]
[Category("Data")]
public override CommandType CommandType { get; set; }
/// <summary>
/// DB connection. This is the member the base <see cref="DbCommand"/> connection property maps to.
/// </summary>
/// <remarks>
/// The setter delegates to <see cref="Connection"/>, so assigning through the base class applies the
/// same rules as assigning through <see cref="Connection"/>: it is rejected while the command
/// is not idle, and a changed connection clears the transaction.
/// </remarks>
/// <exception cref="InvalidCastException">The value is not an <see cref="NpgsqlConnection"/>.</exception>
/// <exception cref="InvalidOperationException">The connection is changed while the command is not idle.</exception>
protected override DbConnection? DbConnection
{
    get => _connection;
    set => Connection = (NpgsqlConnection?)value;
}
/// <summary>
/// Gets or sets the <see cref="NpgsqlConnection"/> used by this instance of the <see cref="NpgsqlCommand"/>.
/// </summary>
/// <value>The connection to a data source. The default value is <see langword="null"/>.</value>
/// <exception cref="InvalidOperationException">The connection is changed while the command is not idle.</exception>
[DefaultValue(null)]
[Category("Behavior")]
public new NpgsqlConnection? Connection
{
    get
    {
        return _connection;
    }
    set
    {
        // Re-assigning the same connection is a no-op.
        if (_connection == value)
            return;

        if (State != CommandState.Idle)
            throw new InvalidOperationException("An open data reader exists for this command.");

        _connection = value;

        // Switching connections clears the transaction.
        Transaction = null;
    }
}
/// <summary>
/// Design time visible.
/// </summary>
/// <value>A plain stored flag; nothing in the visible code reads it.</value>
public override bool DesignTimeVisible { get; set; }
/// <summary>
/// Gets or sets how command results are applied to the DataRow when used by the
/// DbDataAdapter.Update(DataSet) method.
/// </summary>
/// <value>One of the <see cref="UpdateRowSource"/> values. The default is <see cref="UpdateRowSource.Both"/>.</value>
/// <exception cref="ArgumentOutOfRangeException">The value being set is not a defined <see cref="UpdateRowSource"/> member.</exception>
[Category("Behavior"), DefaultValue(UpdateRowSource.Both)]
public override UpdateRowSource UpdatedRowSource
{
    get => _updateRowSource;
    set
    {
        switch (value)
        {
        // validate value (required based on base type contract)
        case UpdateRowSource.None:
        case UpdateRowSource.OutputParameters:
        case UpdateRowSource.FirstReturnedRecord:
        case UpdateRowSource.Both:
            _updateRowSource = value;
            break;
        default:
            // Report which argument was rejected and what it held, as the CommandTimeout setter does.
            throw new ArgumentOutOfRangeException(
                nameof(value), value, $"The value is not a valid {nameof(UpdateRowSource)}.");
        }
    }
}
/// <summary>
/// Returns whether this query will execute as a prepared (compiled) query.
/// </summary>
/// <value>
/// True only when the command was prepared on the connector currently in use, it has at least one
/// statement, and every statement has a prepared statement that reports itself as prepared.
/// </value>
public bool IsPrepared
{
    get
    {
        // The preparation only counts if it happened on the connector currently in use.
        var currentConnector = _connection?.Connector ?? _connector;
        if (_connectorPreparedOn != currentConnector)
            return false;

        if (InternalBatchCommands.Count == 0)
            return false;

        foreach (var batchCommand in InternalBatchCommands)
        {
            if (batchCommand.PreparedStatement?.IsPrepared != true)
                return false;
        }

        return true;
    }
}
#endregion Public properties
#region Known/unknown Result Types Management
/// <summary>
/// Marks all of the query's result columns as either known or unknown.
/// Unknown result columns are requested from PostgreSQL in text format, and Npgsql makes no
/// attempt to parse them. They will be accessible as strings only.
/// </summary>
/// <remarks>Setting this property clears <see cref="UnknownResultTypeList"/>.</remarks>
public bool AllResultTypesAreUnknown
{
    get
    {
        return _allResultTypesAreUnknown;
    }
    set
    {
        // TODO: Check that this isn't modified after calling prepare
        _allResultTypesAreUnknown = value;
        // The per-column list and the all-columns flag are mutually exclusive.
        _unknownResultTypeList = null;
    }
}
bool _allResultTypesAreUnknown;
/// <summary>
/// Marks the query's result columns as known or unknown, on a column-by-column basis.
/// Unknown result columns are requested from PostgreSQL in text format, and Npgsql makes no
/// attempt to parse them. They will be accessible as strings only.
/// </summary>
/// <remarks>
/// If the query includes several queries (e.g. SELECT 1; SELECT 2), this will only apply to the first
/// one. The rest of the queries will be fetched and parsed as usual.
///
/// The array size must correspond exactly to the number of result columns the query returns, or an
/// error will be raised.
///
/// Setting this property resets <see cref="AllResultTypesAreUnknown"/> to false.
/// </remarks>
public bool[]? UnknownResultTypeList
{
    get
    {
        return _unknownResultTypeList;
    }
    set
    {
        // TODO: Check that this isn't modified after calling prepare
        _unknownResultTypeList = value;
        // The per-column list and the all-columns flag are mutually exclusive.
        _allResultTypesAreUnknown = false;
    }
}
bool[]? _unknownResultTypeList;
#endregion
#region Result Types Management
/// <summary>
/// Marks result types to be used when using GetValue on a data reader, on a column-by-column basis.
/// Used for Entity Framework 5-6 compatibility.
/// Only primitive numerical types and DateTimeOffset are supported.
/// Set the whole array or just a value to null to use default type.
/// </summary>
internal Type[]? ObjectResultTypes { get; set; }
#endregion
#region State management
// Backing storage for State, kept as a volatile int.
volatile int _state;
/// <summary>
/// The current state of the command
/// </summary>
internal CommandState State
{
    get
    {
        return (CommandState)_state;
    }
    set
    {
        var requestedState = (int)value;

        // Only write when the state actually changes.
        if (_state != requestedState)
            _state = requestedState;
    }
}
// Forgets the connector on which this command was explicitly prepared, so the command no longer
// counts as explicitly prepared.
void ResetExplicitPreparation()
{
    _connectorPreparedOn = null;
}
#endregion State management
#region Parameters
/// <summary>
/// Creates a new instance of a <see cref="DbParameter"/> object.
/// </summary>
/// <returns>A <see cref="DbParameter"/> object, produced by <see cref="CreateParameter"/>.</returns>
protected override DbParameter CreateDbParameter()
{
    return CreateParameter();
}
/// <summary>
/// Creates a new instance of a <see cref="NpgsqlParameter"/> object.
/// </summary>
/// <returns>A new <see cref="NpgsqlParameter"/> object.</returns>
public new NpgsqlParameter CreateParameter()
{
    return new NpgsqlParameter();
}
/// <summary>
/// DB parameter collection. Returns the same collection as <see cref="Parameters"/>.
/// </summary>
protected override DbParameterCollection DbParameterCollection
{
    get { return Parameters; }
}
/// <summary>
/// Gets the <see cref="NpgsqlParameterCollection"/>.
/// </summary>
/// <value>The parameters of the SQL statement or function (stored procedure). The default is an empty collection.</value>
public new NpgsqlParameterCollection Parameters
{
    get { return _parameters; }
}
#endregion
#region DeriveParameters
// Catalog query used by DeriveParametersForFunction. For the function identified by the :proname
// parameter (cast to regproc) it returns one row with four columns:
//   0: proargnames    - argument names
//   1: proargtypes    - input argument types
//   2: proallargtypes - types of all arguments
//   3: proargmodes    - argument modes
// pg_proc is joined through the return type to pg_attribute. When pg_proc has no explicit
// names/all-types/modes but the return type has attributes, those attributes are appended to the
// arguments (with mode 'o').
const string DeriveParametersForFunctionQuery = @"
SELECT
CASE
WHEN pg_proc.proargnames IS NULL THEN array_cat(array_fill(''::name,ARRAY[pg_proc.pronargs]),array_agg(pg_attribute.attname ORDER BY pg_attribute.attnum))
ELSE pg_proc.proargnames
END AS proargnames,
pg_proc.proargtypes,
CASE
WHEN pg_proc.proallargtypes IS NULL AND (array_agg(pg_attribute.atttypid))[1] IS NOT NULL THEN array_cat(string_to_array(pg_proc.proargtypes::text,' ')::oid[],array_agg(pg_attribute.atttypid ORDER BY pg_attribute.attnum))
ELSE pg_proc.proallargtypes
END AS proallargtypes,
CASE
WHEN pg_proc.proargmodes IS NULL AND (array_agg(pg_attribute.atttypid))[1] IS NOT NULL THEN array_cat(array_fill('i'::""char"",ARRAY[pg_proc.pronargs]),array_fill('o'::""char"",ARRAY[array_length(array_agg(pg_attribute.atttypid), 1)]))
ELSE pg_proc.proargmodes
END AS proargmodes
FROM pg_proc
LEFT JOIN pg_type ON pg_proc.prorettype = pg_type.oid
LEFT JOIN pg_attribute ON pg_type.typrelid = pg_attribute.attrelid AND pg_attribute.attnum >= 1 AND NOT pg_attribute.attisdropped
WHERE pg_proc.oid = :proname::regproc
GROUP BY pg_proc.proargnames, pg_proc.proargtypes, pg_proc.proallargtypes, pg_proc.proargmodes, pg_proc.pronargs;
";
/// <summary>
/// Replaces the contents of <see cref="Parameters"/> with parameters derived from the database for
/// the current command text. Text commands and stored procedures are supported.
/// </summary>
/// <exception cref="InvalidOperationException">The command text is null or empty.</exception>
/// <exception cref="NpgsqlException">One of the statements is explicitly prepared.</exception>
/// <exception cref="NotSupportedException">The command type is neither Text nor StoredProcedure.</exception>
internal void DeriveParameters()
{
    var connection = CheckAndGetConnection();
    Debug.Assert(connection is not null);

    if (string.IsNullOrEmpty(CommandText))
        throw new InvalidOperationException("CommandText property has not been initialized");

    using var bindingScope = connection.StartTemporaryBindingScope(out var connector);

    foreach (var batchCommand in InternalBatchCommands)
    {
        if (batchCommand.PreparedStatement?.IsExplicit == true)
            throw new NpgsqlException("Deriving parameters isn't supported for commands that are already prepared.");
    }

    // Here we unprepare statements that possibly are auto-prepared
    Unprepare();
    Parameters.Clear();

    var commandKind = CommandType;
    if (commandKind == CommandType.Text)
        DeriveParametersForQuery(connector);
    else if (commandKind == CommandType.StoredProcedure)
        DeriveParametersForFunction();
    else
        throw new NotSupportedException("Cannot derive parameters for CommandType " + CommandType);
}
/// <summary>
/// Derives the parameters of the function named by <see cref="CommandText"/> by querying the
/// PostgreSQL catalog, and adds one parameter per function argument to <see cref="Parameters"/>.
/// </summary>
/// <exception cref="InvalidOperationException">The catalog query returned no row for the function.</exception>
/// <exception cref="NotSupportedException">The function has a VARIADIC argument.</exception>
/// <exception cref="ArgumentOutOfRangeException">The catalog returned an unrecognized argument mode.</exception>
void DeriveParametersForFunction()
{
    using var c = new NpgsqlCommand(DeriveParametersForFunctionQuery, _connection);
    c.Parameters.Add(new NpgsqlParameter("proname", NpgsqlDbType.Text));
    c.Parameters[0].Value = CommandText;

    string[]? names = null;
    uint[]? types = null;
    char[]? modes = null;

    using (var rdr = c.ExecuteReader(CommandBehavior.SingleRow | CommandBehavior.SingleResult))
    {
        if (rdr.Read())
        {
            // Columns: 0 = proargnames, 1 = proargtypes, 2 = proallargtypes, 3 = proargmodes.
            if (!rdr.IsDBNull(0))
                names = rdr.GetFieldValue<string[]>(0);
            if (!rdr.IsDBNull(2))
                types = rdr.GetFieldValue<uint[]>(2);
            if (!rdr.IsDBNull(3))
                modes = rdr.GetFieldValue<char[]>(3);

            // No proallargtypes: fall back to the input-only proargtypes column.
            if (types == null)
            {
                if (rdr.IsDBNull(1) || rdr.GetFieldValue<uint[]>(1).Length == 0)
                    return; // Parameter-less function
                types = rdr.GetFieldValue<uint[]>(1);
            }
        }
        else
            throw new InvalidOperationException($"{CommandText} does not exist in pg_proc");
    }

    var typeMapper = c._connection!.Connector!.TypeMapper;

    for (var i = 0; i < types.Length; i++)
    {
        var param = new NpgsqlParameter();

        var (npgsqlDbType, postgresType) = typeMapper.GetTypeInfoByOid(types[i]);

        param.DataTypeName = postgresType.DisplayName;
        param.PostgresType = postgresType;
        if (npgsqlDbType.HasValue)
            param.NpgsqlDbType = npgsqlDbType.Value;

        // Arguments without a name from the catalog get a generated one.
        if (names != null && i < names.Length)
            param.ParameterName = names[i];
        else
            param.ParameterName = "parameter" + (i + 1);

        if (modes == null) // All params are IN, or server < 8.1.0 (and only IN is supported)
            param.Direction = ParameterDirection.Input;
        else
        {
            param.Direction = modes[i] switch
            {
                'i' => ParameterDirection.Input,
                'o' => ParameterDirection.Output,
                't' => ParameterDirection.Output,
                'b' => ParameterDirection.InputOutput,
                'v' => throw new NotSupportedException("Cannot derive function parameter of type VARIADIC"),
                // The single-string constructor takes a parameter name, so the message goes in the
                // three-argument overload.
                _ => throw new ArgumentOutOfRangeException(
                    "proargmodes", modes[i], "Unknown code in proargmodes while deriving: " + modes[i])
            };
        }

        Parameters.Add(param);
    }
}
// Derives parameter types for a text command: the SQL is parsed locally with deriveParameters: true,
// a Parse + Describe pair is sent for each statement, and the parameter type OIDs the server reports
// are applied to the parameters found by the parser. All reads are synchronous.
// Throws NpgsqlException when the parser and the server disagree on the number of parameters, or
// when a parameter that already has a type is reported with a different one. In both cases the
// connector is drained up to ReadyForQuery and Parameters is cleared first.
void DeriveParametersForQuery(NpgsqlConnector connector)
{
using (connector.StartUserAction())
{
Log.Debug($"Deriving Parameters for query: {CommandText}", connector.Id);
// Parse locally so each batch command gets its final SQL text and positional parameter list.
if (IsWrappedByBatch)
foreach (var batchCommand in InternalBatchCommands)
connector.SqlQueryParser.ParseRawQuery(batchCommand, connector.UseConformingStrings, deriveParameters: true);
else
connector.SqlQueryParser.ParseRawQuery(this, connector.UseConformingStrings, deriveParameters: true);
var sendTask = SendDeriveParameters(connector, false);
// Surface a send failure immediately instead of waiting on responses.
if (sendTask.IsFaulted)
sendTask.GetAwaiter().GetResult();
foreach (var batchCommand in InternalBatchCommands)
{
// First response for the statement - presumably the reply to Parse; the expected message type
// is not visible here, confirm.
Expect(
connector.ReadMessage(async: false).GetAwaiter().GetResult(), connector);
// Second response carries the parameter type OIDs as seen by the server.
var paramTypeOIDs = Expect(
connector.ReadMessage(async: false).GetAwaiter().GetResult(), connector).TypeOIDs;
if (batchCommand.PositionalParameters.Count != paramTypeOIDs.Count)
{
connector.SkipUntil(BackendMessageCode.ReadyForQuery);
Parameters.Clear();
throw new NpgsqlException("There was a mismatch in the number of derived parameters between the Npgsql SQL parser and the PostgreSQL parser. Please report this as bug to the Npgsql developers (https://github.com/npgsql/npgsql/issues).");
}
for (var i = 0; i < paramTypeOIDs.Count; i++)
{
try
{
var param = batchCommand.PositionalParameters[i];
var paramOid = paramTypeOIDs[i];
var (npgsqlDbType, postgresType) = connector.TypeMapper.GetTypeInfoByOid(paramOid);
// A parameter that already has a type was seen before; the new type must agree with it.
if (param.NpgsqlDbType != NpgsqlDbType.Unknown && param.NpgsqlDbType != npgsqlDbType)
throw new NpgsqlException("The backend parser inferred different types for parameters with the same name. Please try explicit casting within your SQL statement or batch or use different placeholder names.");
param.DataTypeName = postgresType.DisplayName;
param.PostgresType = postgresType;
if (npgsqlDbType.HasValue)
param.NpgsqlDbType = npgsqlDbType.Value;
}
catch
{
// Drain the remaining responses and discard partial results before rethrowing.
connector.SkipUntil(BackendMessageCode.ReadyForQuery);
Parameters.Clear();
throw;
}
}
// Third response: either a row description or "no data"; its content is not used here.
var msg = connector.ReadMessage(async: false).GetAwaiter().GetResult();
switch (msg.Code)
{
case BackendMessageCode.RowDescription:
case BackendMessageCode.NoData:
break;
default:
throw connector.UnexpectedMessageReceived(msg.Code);
}
}
// Final response after all statements - presumably the reply to Sync; confirm.
Expect(connector.ReadMessage(async: false).GetAwaiter().GetResult(), connector);
// Observe the completion (or failure) of the send.
sendTask.GetAwaiter().GetResult();
}
}
#endregion
#region Prepare
/// <summary>
/// Creates a server-side prepared statement on the PostgreSQL server.
/// This will make repeated future executions of this command much faster.
/// </summary>
public override void Prepare()
{
    var preparation = Prepare(async: false);
    preparation.GetAwaiter().GetResult();
}
/// <summary>
/// Creates a server-side prepared statement on the PostgreSQL server.
/// This will make repeated future executions of this command much faster.
/// </summary>
/// <param name="cancellationToken">
/// An optional token to cancel the asynchronous operation. The default value is <see cref="CancellationToken.None"/>.
/// </param>
/// <returns>A task representing the asynchronous operation.</returns>
#if NETSTANDARD2_0
public Task PrepareAsync(CancellationToken cancellationToken = default)
#else
public override Task PrepareAsync(CancellationToken cancellationToken = default)
#endif
{
    // The scope is disposed when the method returns.
    using var noSyncContextScope = NoSynchronizationContextScope.Enter();
    return Prepare(async: true, cancellationToken);
}
// Shared implementation behind Prepare() and PrepareAsync().
// Binds the parameters, processes the raw query, and asks each batch command to prepare itself
// explicitly on the current connector. If any of them actually needs preparing, the protocol
// exchange happens in PrepareLong; otherwise a completed task is returned.
// Throws NotSupportedException when the connection uses multiplexing.
Task Prepare(bool async, CancellationToken cancellationToken = default)
{
var connection = CheckAndGetConnection();
Debug.Assert(connection is not null);
if (connection.Settings.Multiplexing)
throw new NotSupportedException("Explicit preparation not supported with multiplexing");
var connector = connection.Connector!;
var needToPrepare = false;
if (IsWrappedByBatch)
{
// Batch mode: every batch command carries its own parameter collection and command text.
foreach (var batchCommand in InternalBatchCommands)
{
// Reset the derived parameter state, then recompute it while binding each parameter.
batchCommand.Parameters.HasOutputParameters = false;
batchCommand.Parameters.PlaceholderType = PlaceholderType.NoParameters;
foreach (var p in batchCommand.Parameters.InternalList)
{
batchCommand.Parameters.CalculatePlaceholderType(p);
p.Bind(connector.TypeMapper);
}
ProcessRawQuery(connector.SqlQueryParser, connector.UseConformingStrings, batchCommand);
// ExplicitPrepare is always called; the flag accumulates whether any statement needs preparing.
needToPrepare = batchCommand.ExplicitPrepare(connector) || needToPrepare;
}
if (Log.IsEnabled(NpgsqlLogLevel.Debug))
Log.Debug(string.Join("; ", InternalBatchCommands.Select(c => c.CommandText)), connector.Id);
}
else
{
// Single-command mode: the command's own parameters and text are processed, which
// populates InternalBatchCommands.
Parameters.HasOutputParameters = false;
Parameters.PlaceholderType = PlaceholderType.NoParameters;
foreach (var p in Parameters.InternalList)
{
Parameters.CalculatePlaceholderType(p);
p.Bind(connector.TypeMapper);
}
ProcessRawQuery(connector.SqlQueryParser, connector.UseConformingStrings, batchCommand: null);
foreach (var batchCommand in InternalBatchCommands)
needToPrepare = batchCommand.ExplicitPrepare(connector) || needToPrepare;
if (Log.IsEnabled(NpgsqlLogLevel.Debug))
Log.Debug($"Preparing: {CommandText}", connector.Id);
}
// Record the connector the preparation belongs to (see IsPrepared / IsExplicitlyPrepared).
_connectorPreparedOn = connector;
// It's possible the command was already prepared, or that persistent prepared statements were found for
// all statements. Nothing to do here, move along.
return needToPrepare
? PrepareLong(this, async, connector, cancellationToken)
: Task.CompletedTask;
// Sends the prepare messages and consumes the server's responses for every statement that is
// being prepared. On any failure the bookkeeping of the statements still being prepared is rolled back.
static async Task PrepareLong(NpgsqlCommand command, bool async, NpgsqlConnector connector, CancellationToken cancellationToken)
{
try
{
using (connector.StartUserAction(cancellationToken))
{
// The token is handed to StartUserAction above; the send itself gets CancellationToken.None.
var sendTask = command.SendPrepare(connector, async, CancellationToken.None);
// Surface a send failure immediately instead of waiting on responses.
if (sendTask.IsFaulted)
sendTask.GetAwaiter().GetResult();
// Loop over statements, skipping those that are already prepared (because they were persisted)
var isFirst = true;
foreach (var batchCommand in command.InternalBatchCommands)
{
if (!batchCommand.IsPreparing)
continue;
var pStatement = batchCommand.PreparedStatement!;
if (pStatement.StatementBeingReplaced != null)
{
// SendPrepare wrote a Close for the replaced statement first; consume its response.
Expect(await connector.ReadMessage(async), connector);
pStatement.StatementBeingReplaced.CompleteUnprepare();
pStatement.StatementBeingReplaced = null;
}
// Two responses precede the description - presumably the replies to Parse and to the
// parameter part of Describe; the expected message types are not visible here, confirm.
Expect(await connector.ReadMessage(async), connector);
Expect(await connector.ReadMessage(async), connector);
var msg = await connector.ReadMessage(async);
switch (msg.Code)
{
case BackendMessageCode.RowDescription:
// Clone the RowDescription for use with the prepared statement (the one we have is reused
// by the connection)
var description = ((RowDescriptionMessage)msg).Clone();
command.FixupRowDescription(description, isFirst);
batchCommand.Description = description;
break;
case BackendMessageCode.NoData:
batchCommand.Description = null;
break;
default:
throw connector.UnexpectedMessageReceived(msg.Code);
}
// The statement is now prepared on the server; update the bookkeeping.
pStatement.State = PreparedState.Prepared;
connector.PreparedStatementManager.NumPrepared++;
batchCommand.IsPreparing = false;
isFirst = false;
}
// Final response after all statements - presumably the reply to Sync; confirm.
Expect(await connector.ReadMessage(async), connector);
// Observe the completion (or failure) of the send.
if (async)
await sendTask;
else
sendTask.GetAwaiter().GetResult();
}
}
catch
{
// The statements weren't prepared successfully, update the bookkeeping for them
foreach (var batchCommand in command.InternalBatchCommands)
{
if (batchCommand.IsPreparing)
{
batchCommand.IsPreparing = false;
batchCommand.PreparedStatement!.AbortPrepare();
}
}
throw;
}
}
}
/// <summary>
/// Unprepares a command, closing server-side statements associated with it.
/// Note that this only affects commands explicitly prepared with <see cref="Prepare()"/>, not
/// automatically prepared statements.
/// </summary>
public void Unprepare()
{
    var unpreparation = Unprepare(async: false);
    unpreparation.GetAwaiter().GetResult();
}
/// <summary>
/// Unprepares a command, closing server-side statements associated with it.
/// Note that this only affects commands explicitly prepared with <see cref="Prepare()"/>, not
/// automatically prepared statements.
/// </summary>
/// <param name="cancellationToken">
/// An optional token to cancel the asynchronous operation. The default value is <see cref="CancellationToken.None"/>.
/// </param>
/// <returns>A task representing the asynchronous operation.</returns>
public Task UnprepareAsync(CancellationToken cancellationToken = default)
{
    // The scope is disposed when the method returns.
    using var noSyncContextScope = NoSynchronizationContextScope.Enter();
    return Unprepare(async: true, cancellationToken);
}
// Shared implementation behind Unprepare() and UnprepareAsync().
// Sends a Close for every prepared statement of this command and processes the responses.
// Returns immediately when none of the statements is prepared.
// Throws NotSupportedException when the connection uses multiplexing.
async Task Unprepare(bool async, CancellationToken cancellationToken = default)
{
var connection = CheckAndGetConnection();
Debug.Assert(connection is not null);
if (connection.Settings.Multiplexing)
throw new NotSupportedException("Explicit preparation not supported with multiplexing");
// Nothing to close.
if (InternalBatchCommands.All(s => !s.IsPrepared))
return;
var connector = connection.Connector!;
Log.Debug("Closing command's prepared statements", connector.Id);
using (connector.StartUserAction(cancellationToken))
{
var sendTask = SendClose(connector, async, cancellationToken);
// Surface a send failure immediately instead of waiting on responses.
if (sendTask.IsFaulted)
sendTask.GetAwaiter().GetResult();
foreach (var batchCommand in InternalBatchCommands)
{
// SendClose marks every statement it closes as BeingUnprepared; only those have a response pending.
if (batchCommand.PreparedStatement?.State == PreparedState.BeingUnprepared)
{
Expect(await connector.ReadMessage(async), connector);
var pStatement = batchCommand.PreparedStatement;
pStatement.CompleteUnprepare();
// A non-explicit statement also frees its slot in the auto-prepared table.
if (!pStatement.IsExplicit)
connector.PreparedStatementManager.AutoPrepared[pStatement.AutoPreparedSlotIndex] = null;
batchCommand.PreparedStatement = null;
}
}
// Final response after all statements - presumably the reply to Sync; confirm.
Expect(await connector.ReadMessage(async), connector);
// Observe the completion (or failure) of the send.
if (async)
await sendTask;
else
sendTask.GetAwaiter().GetResult();
}
}
#endregion Prepare
#region Query analysis
/// <summary>
/// Turns the user-provided command text and parameters into the final SQL text and positional
/// parameter list, storing the result on the relevant batch command(s).
/// </summary>
/// <param name="parser">SQL parser used for rewriting; when null, a new one is created on demand.</param>
/// <param name="standardConformingStrings">Passed through to the SQL parser.</param>
/// <param name="batchCommand">
/// The batch command to process, or null to process this command's own <see cref="CommandText"/>,
/// <see cref="CommandType"/> and <see cref="Parameters"/>.
/// </param>
/// <exception cref="InvalidOperationException">The command text is null or empty, or the command type is unexpected.</exception>
/// <exception cref="NotSupportedException">
/// Named parameters are used while SQL rewriting is disabled, named and positional parameters are
/// mixed, or output parameters are used with multiple statements or in a batch.
/// </exception>
/// <exception cref="NpgsqlException">A statement has more than <see cref="ushort.MaxValue"/> parameters.</exception>
internal void ProcessRawQuery(SqlQueryParser? parser, bool standardConformingStrings, NpgsqlBatchCommand? batchCommand)
{
    // Work on the values of whichever command is being processed. When this instance wraps a batch,
    // its own CommandText and Parameters are null, so only these locals may be used below.
    var (commandText, commandType, parameters) = batchCommand is null
        ? (CommandText, CommandType, Parameters)
        : (batchCommand.CommandText, batchCommand.CommandType, batchCommand.Parameters);

    if (string.IsNullOrEmpty(commandText))
        throw new InvalidOperationException("CommandText property has not been initialized");

    switch (commandType)
    {
    case CommandType.Text:
        switch (parameters.PlaceholderType)
        {
        case PlaceholderType.Positional:
            // In positional parameter mode, we don't need to parse/rewrite the CommandText or reorder the parameters - just use
            // them as is. If the SQL contains a semicolon (legacy batching) when positional parameters are in use, we just send
            // that and PostgreSQL will error (this behavior is by-design - use the new batching API).
            if (batchCommand is null)
            {
                batchCommand = TruncateStatementsToOne();
                batchCommand.FinalCommandText = CommandText;
                batchCommand.PositionalParameters = Parameters.InternalList;
            }
            else
            {
                batchCommand.FinalCommandText = batchCommand.CommandText;
                batchCommand.PositionalParameters = batchCommand.Parameters.InternalList;
            }

            ValidateParameterCount(batchCommand);
            break;

        case PlaceholderType.NoParameters:
            // Unless the EnableSqlRewriting AppContext switch is explicitly disabled, queries with no parameters are parsed just
            // like queries with named parameters, since they may contain a semicolon (legacy batching).
            if (EnableSqlRewriting)
                goto case PlaceholderType.Named;
            else
                goto case PlaceholderType.Positional;

        case PlaceholderType.Named:
            if (!EnableSqlRewriting)
                throw new NotSupportedException($"Named parameters are not supported when Npgsql.{nameof(EnableSqlRewriting)} is disabled");

            // The parser is cached on NpgsqlConnector - unless we're in multiplexing mode.
            parser ??= new SqlQueryParser();

            if (batchCommand is null)
            {
                parser.ParseRawQuery(this, standardConformingStrings);

                if (InternalBatchCommands.Count > 1 && _parameters.HasOutputParameters)
                    throw new NotSupportedException("Commands with multiple queries cannot have out parameters");

                for (var i = 0; i < InternalBatchCommands.Count; i++)
                    ValidateParameterCount(InternalBatchCommands[i]);
            }
            else
            {
                parser.ParseRawQuery(batchCommand, standardConformingStrings);

                if (batchCommand.Parameters.HasOutputParameters)
                    throw new NotSupportedException("Batches cannot have out parameters");

                ValidateParameterCount(batchCommand);
            }

            break;

        case PlaceholderType.Mixed:
            throw new NotSupportedException("Mixing named and positional parameters isn't supported");

        default:
            // Report the value that was actually switched on, not this command's own collection.
            throw new ArgumentOutOfRangeException(
                nameof(PlaceholderType), $"Unknown {nameof(PlaceholderType)} value: {parameters.PlaceholderType}");
        }

        break;

    case CommandType.TableDirect:
        batchCommand ??= TruncateStatementsToOne();
        // Use the text of the command being processed (the table name).
        batchCommand.FinalCommandText = "SELECT * FROM " + commandText;
        break;

    case CommandType.StoredProcedure:
        var inputList = parameters.Where(p => p.IsInputDirection).ToList();
        var numInput = inputList.Count;
        var sb = new StringBuilder();
        sb.Append("SELECT * FROM ");
        // Use the text of the command being processed (the function name).
        sb.Append(commandText);
        sb.Append('(');
        var hasWrittenFirst = false;

        // Positional arguments go first: "$1,$2,...". The placeholder number is the parameter's
        // 1-based position in the input list.
        for (var i = 1; i <= numInput; i++)
        {
            var param = inputList[i - 1];
            if (param.IsPositional)
            {
                if (hasWrittenFirst)
                    sb.Append(',');
                sb.Append('$');
                sb.Append(i);
                hasWrittenFirst = true;
            }
        }

        // Named arguments follow, as "name" := $n, with embedded double quotes doubled.
        for (var i = 1; i <= numInput; i++)
        {
            var param = inputList[i - 1];
            if (!param.IsPositional)
            {
                if (hasWrittenFirst)
                    sb.Append(',');
                sb.Append('"');
                sb.Append(param.TrimmedName.Replace("\"", "\"\""));
                sb.Append("\" := ");
                sb.Append('$');
                sb.Append(i);
                hasWrittenFirst = true;
            }
        }

        sb.Append(')');

        batchCommand ??= TruncateStatementsToOne();
        batchCommand.FinalCommandText = sb.ToString();
        // NOTE(review): this appends to whatever PositionalParameters already holds; confirm that
        // a caller-supplied batch command has had it reset before getting here.
        batchCommand.PositionalParameters.AddRange(inputList);
        ValidateParameterCount(batchCommand);
        break;

    default:
        throw new InvalidOperationException($"Internal Npgsql bug: unexpected value {commandType} of enum {nameof(CommandType)}. Please file a bug.");
    }

    // Rejects statements with more parameters than fit in a ushort.
    [MethodImpl(MethodImplOptions.AggressiveInlining)]
    static void ValidateParameterCount(NpgsqlBatchCommand batchCommand)
    {
        if (batchCommand.PositionalParameters.Count > ushort.MaxValue)
            throw new NpgsqlException($"A statement cannot have more than {ushort.MaxValue} parameters");
    }
}
#endregion
#region Message Creation / Population
// Applies this command's timeout (in seconds) to the connector's write buffer before sending.
void BeginSend(NpgsqlConnector connector)
{
    connector.WriteBuffer.Timeout = TimeSpan.FromSeconds(CommandTimeout);
}
/// <summary>
/// Writes the protocol messages for this command's statements to the connector. Depending on
/// whether <see cref="CommandBehavior.SchemaOnly"/> is set, either the full execution messages or
/// only the messages needed to describe the statements are written.
/// </summary>
/// <param name="connector">The connector to write to.</param>
/// <param name="async">Whether to write asynchronously.</param>
/// <param name="flush">Whether to flush the connector after writing.</param>
/// <param name="cancellationToken">Token passed to every write.</param>
/// <returns>A task representing the write operation.</returns>
internal Task Write(NpgsqlConnector connector, bool async, bool flush, CancellationToken cancellationToken = default)
{
    var schemaOnly = (_behavior & CommandBehavior.SchemaOnly) != 0;
    if (schemaOnly)
        return WriteExecuteSchemaOnly(connector, async, flush, cancellationToken);

    return WriteExecute(connector, async, flush, cancellationToken);

    async Task WriteExecute(NpgsqlConnector connector, bool async, bool flush, CancellationToken cancellationToken)
    {
        for (var i = 0; i < InternalBatchCommands.Count; i++)
        {
            // The following is only for deadlock avoidance when doing sync I/O (so never in multiplexing)
            ForceAsyncIfNecessary(ref async, i);

            var batchCommand = InternalBatchCommands[i];
            var pStatement = batchCommand.PreparedStatement;

            Debug.Assert(batchCommand.FinalCommandText is not null);

            // The statement either executes unprepared or is being auto-prepared: in both cases
            // Parse and Describe are needed around the Bind. An already-prepared statement only
            // needs the Bind.
            var needsParseAndDescribe = pStatement == null || batchCommand.IsPreparing;

            if (needsParseAndDescribe)
            {
                // We may have a prepared statement that replaces an existing statement - close the latter first.
                if (pStatement?.StatementBeingReplaced != null)
                    await connector.WriteClose(StatementOrPortal.Statement, pStatement.StatementBeingReplaced.Name!, async, cancellationToken);

                await connector.WriteParse(batchCommand.FinalCommandText, batchCommand.StatementName, batchCommand.PositionalParameters, async, cancellationToken);
            }

            // The per-column unknown-result list only applies to the first statement.
            await connector.WriteBind(
                batchCommand.PositionalParameters, string.Empty, batchCommand.StatementName, AllResultTypesAreUnknown,
                i == 0 ? UnknownResultTypeList : null,
                async, cancellationToken);

            if (needsParseAndDescribe)
                await connector.WriteDescribe(StatementOrPortal.Portal, string.Empty, async, cancellationToken);

            await connector.WriteExecute(0, async, cancellationToken);

            if (pStatement != null)
                pStatement.LastUsed = DateTime.UtcNow;
        }

        await connector.WriteSync(async, cancellationToken);

        if (flush)
            await connector.Flush(async, cancellationToken);
    }

    async Task WriteExecuteSchemaOnly(NpgsqlConnector connector, bool async, bool flush, CancellationToken cancellationToken)
    {
        var wroteSomething = false;

        for (var i = 0; i < InternalBatchCommands.Count; i++)
        {
            ForceAsyncIfNecessary(ref async, i);

            var batchCommand = InternalBatchCommands[i];

            // Prepared statements already have their RowDescription; only the others are described.
            if (batchCommand.PreparedStatement?.State != PreparedState.Prepared)
            {
                await connector.WriteParse(batchCommand.FinalCommandText!, batchCommand.StatementName, batchCommand.PositionalParameters, async, cancellationToken);
                await connector.WriteDescribe(StatementOrPortal.Statement, batchCommand.StatementName, async, cancellationToken);
                wroteSomething = true;
            }
        }

        // Nothing written means nothing to sync or flush.
        if (!wroteSomething)
            return;

        await connector.WriteSync(async, cancellationToken);

        if (flush)
            await connector.Flush(async, cancellationToken);
    }
}
// Sends, for every statement, a Parse with an empty parameter list followed by a Describe of the
// unnamed statement, then Sync and flush. Used when deriving parameters.
async Task SendDeriveParameters(NpgsqlConnector connector, bool async, CancellationToken cancellationToken = default)
{
    BeginSend(connector);

    var statementCount = 0;
    foreach (var statement in InternalBatchCommands)
    {
        // Statements after the first are always written asynchronously.
        ForceAsyncIfNecessary(ref async, statementCount);
        statementCount++;

        await connector.WriteParse(statement.FinalCommandText!, string.Empty, EmptyParameters, async, cancellationToken);
        await connector.WriteDescribe(StatementOrPortal.Statement, string.Empty, async, cancellationToken);
    }

    await connector.WriteSync(async, cancellationToken);
    await connector.Flush(async, cancellationToken);
}
// Sends the messages that prepare every statement currently flagged as being prepared:
// an optional Close for a statement being replaced, then Parse and Describe under the prepared
// statement's name. Finishes with Sync and a flush.
async Task SendPrepare(NpgsqlConnector connector, bool async, CancellationToken cancellationToken = default)
{
    BeginSend(connector);

    for (var index = 0; index < InternalBatchCommands.Count; index++)
    {
        ForceAsyncIfNecessary(ref async, index);

        var statement = InternalBatchCommands[index];
        var prepared = statement.PreparedStatement;

        // A statement may be already prepared, already in preparation (i.e. same statement twice
        // in the same command), or we can't prepare (overloaded SQL)
        if (statement.IsPreparing)
        {
            // We may have a prepared statement that replaces an existing statement - close the latter first.
            var replaced = prepared!.StatementBeingReplaced;
            if (replaced != null)
                await connector.WriteClose(StatementOrPortal.Statement, replaced.Name!, async, cancellationToken);

            await connector.WriteParse(statement.FinalCommandText!, prepared.Name!, statement.PositionalParameters, async, cancellationToken);
            await connector.WriteDescribe(StatementOrPortal.Statement, prepared.Name!, async, cancellationToken);
        }
    }

    await connector.WriteSync(async, cancellationToken);
    await connector.Flush(async, cancellationToken);
}
// Switches a synchronous write to asynchronous for every statement after the first one in a batch,
// and installs the single-thread synchronization context when doing so.
[MethodImpl(MethodImplOptions.AggressiveInlining)]
void ForceAsyncIfNecessary(ref bool async, int numberOfStatementInBatch)
{
    // Already async, or this is the first statement: nothing to change.
    if (async || numberOfStatementInBatch <= 0)
        return;

    // We're synchronously sending the non-first statement in a batch - switch to async writing.
    // See long comment in Execute() above.

    // TODO: we can simply do all batch writing asynchronously, instead of starting with the 2nd statement.
    // For now, writing the first statement synchronously gives us a better chance of handle and bubbling up errors correctly
    // (see sendTask.IsFaulted in Execute()). Once #1323 is done, that shouldn't be needed any more and entire batches should
    // be written asynchronously.
    async = true;
    SynchronizationContext.SetSynchronizationContext(SingleThreadSynchronizationContext);
}
// Sends a Close for every statement that is currently prepared and marks its prepared statement
// as being unprepared. Finishes with Sync and a flush.
async Task SendClose(NpgsqlConnector connector, bool async, CancellationToken cancellationToken = default)
{
    BeginSend(connector);

    var closedSoFar = 0;
    foreach (var statement in InternalBatchCommands)
    {
        // Checked per iteration, so a state change made below is seen by later statements.
        if (!statement.IsPrepared)
            continue;

        ForceAsyncIfNecessary(ref async, closedSoFar);
        await connector.WriteClose(StatementOrPortal.Statement, statement.StatementName, async, cancellationToken);
        statement.PreparedStatement!.State = PreparedState.BeingUnprepared;
        closedSoFar++;
    }

    await connector.WriteSync(async, cancellationToken);
    await connector.Flush(async, cancellationToken);
}
#endregion
#region Execute Non Query
/// <summary>
/// Executes a SQL statement against the connection and returns the number of rows affected.
/// </summary>
/// <returns>The number of rows affected if known; -1 otherwise.</returns>
public override int ExecuteNonQuery()
{
    var execution = ExecuteNonQuery(false, CancellationToken.None);
    return execution.GetAwaiter().GetResult();
}
/// <summary>
/// Asynchronous version of <see cref="ExecuteNonQuery()"/>
/// </summary>
/// <param name="cancellationToken">
/// A token to cancel the asynchronous operation.
/// </param>
/// <returns>A task representing the asynchronous operation, with the number of rows affected if known; -1 otherwise.</returns>
public override Task ExecuteNonQueryAsync(CancellationToken cancellationToken)
{
    // The scope is disposed when the method returns.
    using var noSyncContextScope = NoSynchronizationContextScope.Enter();
    return ExecuteNonQuery(true, cancellationToken);
}
// Shared implementation behind ExecuteNonQuery() and ExecuteNonQueryAsync(): executes the command
// with a reader, walks through all result sets, and returns the reader's RecordsAffected.
// The reader is always disposed, asynchronously when running in async mode.
[MethodImpl(MethodImplOptions.AggressiveInlining)]
async Task ExecuteNonQuery(bool async, CancellationToken cancellationToken)
{
    var reader = await ExecuteReader(CommandBehavior.Default, async, cancellationToken);
    try
    {
        // Consume every result set so that RecordsAffected is complete.
        bool hasMoreResults;
        do
        {
            hasMoreResults = async
                ? await reader.NextResultAsync(cancellationToken)
                : reader.NextResult();
        }
        while (hasMoreResults);

        return reader.RecordsAffected;
    }
    finally
    {
        if (async)
            await reader.DisposeAsync();
        else
            reader.Dispose();
    }
}
#endregion Execute Non Query
#region Execute Scalar
/// <summary>
/// Executes the query, and returns the first column of the first row
/// in the result set returned by the query. Extra columns or rows are ignored.
/// </summary>
/// <returns>The first column of the first row in the result set,
/// or a null reference if the result set is empty.</returns>
public override object? ExecuteScalar()
{
    var execution = ExecuteScalar(false, CancellationToken.None);
    return execution.GetAwaiter().GetResult();
}
///
/// Asynchronous version of
///
///
/// An optional token to cancel the asynchronous operation. The default value is .
///
/// A task representing the asynchronous operation, with the first column of the
/// first row in the result set, or a null reference if the result set is empty.
public override Task