using System;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Npgsql.BackendMessages;
using Npgsql.Internal;
using Npgsql.Internal.TypeHandling;
using Npgsql.Logging;
using Npgsql.TypeMapping;
using NpgsqlTypes;
using static Npgsql.Util.Statics;
namespace Npgsql;
/// <summary>
/// Provides an API for a binary COPY TO operation, a high-performance data export mechanism from
/// a PostgreSQL table.
/// </summary>
/// <remarks>
/// Instances are created internally (the constructor is <c>internal</c>) and initialized with a binary
/// <c>COPY ... TO STDOUT</c> command. Rows are consumed by calling <c>StartRow</c> and then reading or
/// skipping every column in order. Disposing the exporter drains any unread data and returns the
/// connection to the idle state.
/// </remarks>
public sealed class NpgsqlBinaryExporter : ICancelable
{
    #region Fields and Properties

    NpgsqlConnector _connector;
    NpgsqlReadBuffer _buf;
    ConnectorTypeMapper _typeMapper;
    bool _isConsumed, _isDisposed;

    // Bytes of the current CopyData message that have not been consumed yet. Every consumption from the
    // buffer must be subtracted here: DisposeAsync uses it to skip the remainder of the current message.
    int _leftToReadInDataMsg;

    // Length of the current column's value: -1 means SQL NULL, int.MinValue means the length prefix
    // has not been read yet.
    int _columnLen;

    // Index of the current column within the row. -1 when no row has been started (initially, and after
    // the end of the data has been reached); equal to NumColumns once every column of a row was consumed.
    short _column;

    /// <summary>
    /// The number of columns, as returned from the backend in the CopyOutResponse.
    /// </summary>
    internal int NumColumns { get; private set; }

    // Per-column type handler cache. A handler is resolved on the first Read of a column and then reused
    // for that column in every subsequent row, regardless of the type requested later.
    NpgsqlTypeHandler?[] _typeHandlerCache;

    static readonly NpgsqlLogger Log = NpgsqlLogManager.CreateLogger(nameof(NpgsqlBinaryExporter));

    /// <summary>
    /// Current timeout
    /// </summary>
    /// <exception cref="ObjectDisposedException">The export has already ended.</exception>
    public TimeSpan Timeout
    {
        set
        {
            CheckDisposed();
            _buf.Timeout = value;
            // While completing the export (see DisposeAsync) we're using the connector, which overwrites
            // the buffer's timeout with its own, so the connector's timeout must be set as well.
            _connector.UserTimeout = (int)value.TotalMilliseconds;
        }
    }

    #endregion

    #region Construction / Initialization

    internal NpgsqlBinaryExporter(NpgsqlConnector connector)
    {
        _connector = connector;
        _buf = connector.ReadBuffer;
        _typeMapper = connector.TypeMapper;
        _columnLen = int.MinValue; // Mark that the (first) column length hasn't been read yet
        _column = -1;
        _typeHandlerCache = null!; // Allocated in Init, once the column count is known
    }

    /// <summary>
    /// Sends the COPY command, checks that the server started a binary COPY OUT transfer and reads the
    /// binary COPY header.
    /// </summary>
    /// <param name="copyToCommand">The COPY command to execute.</param>
    /// <param name="async">Whether to perform I/O asynchronously.</param>
    /// <param name="cancellationToken">Token used to cancel sending the command and reading the response.</param>
    /// <exception cref="ArgumentException">The command triggered a text (non-binary) transfer.</exception>
    /// <exception cref="InvalidOperationException">The command completed without starting a COPY OUT transfer.</exception>
    internal async Task Init(string copyToCommand, bool async, CancellationToken cancellationToken = default)
    {
        await _connector.WriteQuery(copyToCommand, async, cancellationToken);
        await _connector.Flush(async, cancellationToken);

        using var registration = _connector.StartNestedCancellableOperation(cancellationToken, attemptPgCancellation: false);

        CopyOutResponseMessage copyOutResponse;
        var msg = await _connector.ReadMessage(async);
        switch (msg.Code)
        {
        case BackendMessageCode.CopyOutResponse:
            copyOutResponse = (CopyOutResponseMessage) msg;
            if (!copyOutResponse.IsBinary)
            {
                throw _connector.Break(
                    new ArgumentException("copyToCommand triggered a text transfer, only binary is allowed",
                        nameof(copyToCommand)));
            }
            break;
        case BackendMessageCode.CommandComplete:
            throw new InvalidOperationException(
                "This API only supports import/export from the client, i.e. COPY commands containing TO/FROM STDIN. " +
                "To import/export with files on your PostgreSQL machine, simply execute the command with ExecuteNonQuery. " +
                "Note that your data has been successfully imported/exported.");
        default:
            throw _connector.UnexpectedMessageReceived(msg.Code);
        }

        NumColumns = copyOutResponse.NumColumns;
        _typeHandlerCache = new NpgsqlTypeHandler[NumColumns];
        await ReadHeader(async);
    }

    /// <summary>
    /// Reads the binary COPY header (signature, flags field and header extension area) from the first
    /// CopyData message. That same message also carries the first row.
    /// </summary>
    /// <exception cref="NpgsqlException">The signature does not match the binary COPY signature.</exception>
    /// <exception cref="NotSupportedException">A non-zero flags field was received.</exception>
    async Task ReadHeader(bool async)
    {
        _leftToReadInDataMsg = Expect(await _connector.ReadMessage(async), _connector).Length;

        var headerLen = NpgsqlRawCopyStream.BinarySignature.Length + 4 + 4;
        await _buf.Ensure(headerLen, async);

        // Stop at the first mismatching byte; an explicit loop avoids a side-effecting LINQ predicate.
        foreach (var expected in NpgsqlRawCopyStream.BinarySignature)
        {
            if (_buf.ReadByte() != expected)
                throw new NpgsqlException("Invalid COPY binary signature at beginning!");
        }

        var flags = _buf.ReadInt32();
        if (flags != 0)
            throw new NotSupportedException("Unsupported flags in COPY operation (OID inclusion?)");

        // Header extensions, currently unused. The binary COPY format requires readers to skip any
        // extension data they don't understand, so don't assume the area is empty.
        var headerExtensionLen = _buf.ReadInt32();
        _leftToReadInDataMsg -= headerLen;
        if (headerExtensionLen > 0)
        {
            await _buf.Skip(headerExtensionLen, async);
            _leftToReadInDataMsg -= headerExtensionLen;
        }
    }

    #endregion

    #region Read

    /// <summary>
    /// Starts reading a single row, must be invoked before reading any columns.
    /// </summary>
    /// <returns>
    /// The number of columns in the row. -1 if there are no further rows.
    /// Note: This will currently be the same value for all rows, but this may change in the future.
    /// </returns>
    public int StartRow() => StartRow(false).GetAwaiter().GetResult();

    /// <summary>
    /// Starts reading a single row, must be invoked before reading any columns.
    /// </summary>
    /// <param name="cancellationToken">An optional token to cancel the asynchronous operation.</param>
    /// <returns>
    /// The number of columns in the row. -1 if there are no further rows.
    /// Note: This will currently be the same value for all rows, but this may change in the future.
    /// </returns>
    public ValueTask StartRowAsync(CancellationToken cancellationToken = default)
    {
        using (NoSynchronizationContextScope.Enter())
            return StartRow(true, cancellationToken);
    }

    /// <summary>
    /// Shared implementation of <c>StartRow</c> / <c>StartRowAsync</c>. When the end-of-data marker is read,
    /// the three trailing messages that end the COPY are consumed and the exporter is marked as consumed.
    /// </summary>
    /// <exception cref="InvalidOperationException">The previous row has not been fully read or skipped.</exception>
    async ValueTask StartRow(bool async, CancellationToken cancellationToken = default)
    {
        CheckDisposed();
        if (_isConsumed)
            return -1;

        using var registration = _connector.StartNestedCancellableOperation(cancellationToken);

        // The very first row (i.e. _column == -1) is included in the header's CopyData message.
        // Otherwise we need to read in a new CopyData row (the docs specify that there's a CopyData
        // message per row).
        if (_column == NumColumns)
            _leftToReadInDataMsg = Expect(await _connector.ReadMessage(async), _connector).Length;
        else if (_column != -1)
            throw new InvalidOperationException("Already in the middle of a row");

        await _buf.Ensure(2, async);
        _leftToReadInDataMsg -= 2;

        var numColumns = _buf.ReadInt16();
        if (numColumns == -1)
        {
            // End-of-data marker: it must be the last thing in its CopyData message.
            Debug.Assert(_leftToReadInDataMsg == 0);
            Expect(await _connector.ReadMessage(async), _connector);
            Expect(await _connector.ReadMessage(async), _connector);
            Expect(await _connector.ReadMessage(async), _connector);
            _column = -1;
            _isConsumed = true;
            return -1;
        }

        Debug.Assert(numColumns == NumColumns);

        _column = 0;
        return NumColumns;
    }

    /// <summary>
    /// Reads the current column, returns its value and moves ahead to the next column.
    /// If the column is null an exception is thrown, unless a nullable value type was requested,
    /// in which case null is returned.
    /// </summary>
    /// <remarks>
    /// The requested type must correspond to the actual type or data corruption will occur.
    /// If in doubt, use the overload taking an <c>NpgsqlDbType</c> to manually specify the type.
    /// </remarks>
    /// <returns>The value of the column</returns>
    public T Read() => Read(false).GetAwaiter().GetResult();

    /// <summary>
    /// Reads the current column, returns its value and moves ahead to the next column.
    /// If the column is null an exception is thrown, unless a nullable value type was requested,
    /// in which case null is returned.
    /// </summary>
    /// <remarks>
    /// The requested type must correspond to the actual type or data corruption will occur.
    /// If in doubt, use the overload taking an <c>NpgsqlDbType</c> to manually specify the type.
    /// </remarks>
    /// <param name="cancellationToken">An optional token to cancel the asynchronous operation.</param>
    /// <returns>The value of the column</returns>
    public ValueTask ReadAsync(CancellationToken cancellationToken = default)
    {
        using (NoSynchronizationContextScope.Enter())
            return Read(true, cancellationToken);
    }

    /// <summary>
    /// Resolves (and caches) the handler for the current column from the requested CLR type, then reads.
    /// </summary>
    ValueTask Read(bool async, CancellationToken cancellationToken = default)
    {
        CheckDisposed();
        CheckOnRow();

        // NOTE: a cached handler wins over the requested type, so the first Read of a column decides it.
        var handler = _typeHandlerCache[_column] ??= _typeMapper.ResolveByClrType(typeof(T));
        return DoRead(handler, async, cancellationToken);
    }

    /// <summary>
    /// Reads the current column, returns its value according to <paramref name="type"/> and
    /// moves ahead to the next column.
    /// If the column is null an exception is thrown, unless a nullable value type was requested,
    /// in which case null is returned.
    /// </summary>
    /// <param name="type">
    /// In some cases the requested CLR type isn't enough to infer the data type coming in from the
    /// database. This parameter can be used to unambiguously specify the type. An example is the JSONB
    /// type, for which the CLR type will be a simple string but for which <paramref name="type"/>
    /// must be specified as <c>NpgsqlDbType.Jsonb</c>.
    /// </param>
    /// <returns>The value of the column</returns>
    public T Read(NpgsqlDbType type) => Read(type, false).GetAwaiter().GetResult();

    /// <summary>
    /// Reads the current column, returns its value according to <paramref name="type"/> and
    /// moves ahead to the next column.
    /// If the column is null an exception is thrown, unless a nullable value type was requested,
    /// in which case null is returned.
    /// </summary>
    /// <param name="type">
    /// In some cases the requested CLR type isn't enough to infer the data type coming in from the
    /// database. This parameter can be used to unambiguously specify the type. An example is the JSONB
    /// type, for which the CLR type will be a simple string but for which <paramref name="type"/>
    /// must be specified as <c>NpgsqlDbType.Jsonb</c>.
    /// </param>
    /// <param name="cancellationToken">
    /// An optional token to cancel the asynchronous operation. The default value is <c>CancellationToken.None</c>.
    /// </param>
    /// <returns>The value of the column</returns>
    public ValueTask ReadAsync(NpgsqlDbType type, CancellationToken cancellationToken = default)
    {
        using (NoSynchronizationContextScope.Enter())
            return Read(type, true, cancellationToken);
    }

    /// <summary>
    /// Resolves (and caches) the handler for the current column from an explicit <c>NpgsqlDbType</c>, then reads.
    /// </summary>
    ValueTask Read(NpgsqlDbType type, bool async, CancellationToken cancellationToken = default)
    {
        CheckDisposed();
        CheckOnRow();

        // NOTE: a cached handler wins over the requested type, so the first Read of a column decides it.
        var handler = _typeHandlerCache[_column] ??= _typeMapper.ResolveByNpgsqlDbType(type);
        return DoRead(handler, async, cancellationToken);
    }

    /// <summary>
    /// Reads the current column's value with <paramref name="handler"/> and advances to the next column.
    /// Any exception breaks the connector before being rethrown.
    /// </summary>
    async ValueTask DoRead(NpgsqlTypeHandler handler, bool async, CancellationToken cancellationToken = default)
    {
        try
        {
            using var registration = _connector.StartNestedCancellableOperation(cancellationToken);

            await ReadColumnLenIfNeeded(async);

            if (_columnLen == -1)
            {
                // When T is a Nullable, we support returning null. The NULL column must still be consumed:
                // otherwise the next Read/Skip would observe this same column again and StartRow would
                // report that we're still in the middle of the row.
                if (NullableHandler.Exists)
                {
                    _columnLen = int.MinValue; // Mark that the (next) column length hasn't been read yet
                    _column++;
#pragma warning disable CS8653 // A default expression introduces a null value when 'T' is a non-nullable reference type.
                    return default!;
#pragma warning restore CS8653
                }

                throw new InvalidCastException("Column is null");
            }

            // If we know the entire column is already in memory, use the code path without async
            var result = NullableHandler.Exists
                ? _columnLen <= _buf.ReadBytesLeft
                    ? NullableHandler.Read(handler, _buf, _columnLen)
                    : await NullableHandler.ReadAsync(handler, _buf, _columnLen, async)
                : _columnLen <= _buf.ReadBytesLeft
                    ? handler.Read(_buf, _columnLen)
                    : await handler.Read(_buf, _columnLen, async);

            _leftToReadInDataMsg -= _columnLen;
            _columnLen = int.MinValue; // Mark that the (next) column length hasn't been read yet
            _column++;
            return result;
        }
        catch (Exception e)
        {
            _connector.Break(e);
            throw;
        }
    }

    /// <summary>
    /// Returns whether the current column is null.
    /// </summary>
    /// <exception cref="ObjectDisposedException">The export has already ended.</exception>
    /// <exception cref="InvalidOperationException">No row is currently being read.</exception>
    public bool IsNull
    {
        get
        {
            CheckDisposed();
            CheckOnRow();
            ReadColumnLenIfNeeded(false).GetAwaiter().GetResult();
            return _columnLen == -1;
        }
    }

    /// <summary>
    /// Skips the current column without interpreting its value.
    /// </summary>
    public void Skip() => Skip(false).GetAwaiter().GetResult();

    /// <summary>
    /// Skips the current column without interpreting its value.
    /// </summary>
    /// <param name="cancellationToken">An optional token to cancel the asynchronous operation.</param>
    public Task SkipAsync(CancellationToken cancellationToken = default)
    {
        using (NoSynchronizationContextScope.Enter())
            return Skip(true, cancellationToken);
    }

    /// <summary>
    /// Shared implementation of <c>Skip</c> / <c>SkipAsync</c>.
    /// </summary>
    /// <exception cref="InvalidOperationException">No row is currently being read.</exception>
    async Task Skip(bool async, CancellationToken cancellationToken = default)
    {
        CheckDisposed();
        CheckOnRow();

        using var registration = _connector.StartNestedCancellableOperation(cancellationToken);

        await ReadColumnLenIfNeeded(async);
        if (_columnLen != -1)
        {
            await _buf.Skip(_columnLen, async);
            // Keep the message bookkeeping in sync with DoRead, so that DisposeAsync skips exactly the
            // remainder of the current CopyData message and not into the next one.
            _leftToReadInDataMsg -= _columnLen;
        }

        _columnLen = int.MinValue; // Mark that the (next) column length hasn't been read yet
        _column++;
    }

    #endregion

    #region Utilities

    /// <summary>
    /// Reads the current column's 4-byte length prefix, unless it has already been read.
    /// </summary>
    async Task ReadColumnLenIfNeeded(bool async)
    {
        if (_columnLen != int.MinValue)
            return;

        await _buf.Ensure(4, async);
        _columnLen = _buf.ReadInt32();
        _leftToReadInDataMsg -= 4;
    }

    void CheckDisposed()
    {
        if (_isDisposed)
            throw new ObjectDisposedException(GetType().FullName, "The COPY operation has already ended.");
    }

    /// <summary>
    /// Throws unless a row has been started with <c>StartRow</c> and it still has unconsumed columns.
    /// </summary>
    void CheckOnRow()
    {
        if (_column < 0 || _column >= NumColumns)
            throw new InvalidOperationException("Not reading a row");
    }

    #endregion

    #region Cancel / Close / Dispose

    /// <summary>
    /// Cancels an ongoing export.
    /// </summary>
    public void Cancel() => _connector.PerformUserCancellation();

    /// <summary>
    /// Async cancels an ongoing export.
    /// </summary>
    /// <remarks>The cancellation request is issued synchronously; the returned task is already completed.</remarks>
    public Task CancelAsync()
    {
        Cancel();
        return Task.CompletedTask;
    }

    /// <summary>
    /// Completes the binary export and sets the connection back to idle state
    /// </summary>
    public void Dispose() => DisposeAsync(false).GetAwaiter().GetResult();

    /// <summary>
    /// Async completes the binary export and sets the connection back to idle state
    /// </summary>
    public ValueTask DisposeAsync()
    {
        using (NoSynchronizationContextScope.Enter())
            return DisposeAsync(true);
    }

    /// <summary>
    /// Drains any unread COPY data (unless everything was consumed or the connector is broken), ends the
    /// user action and releases references. Errors while draining are logged, not thrown.
    /// </summary>
    async ValueTask DisposeAsync(bool async)
    {
        if (_isDisposed)
            return;

        if (!_isConsumed && !_connector.IsBroken)
        {
            try
            {
                using var registration = _connector.StartNestedCancellableOperation(attemptPgCancellation: false);
                // Finish the current CopyData message
                _buf.Skip(_leftToReadInDataMsg);
                // Read to the end
                _connector.SkipUntil(BackendMessageCode.CopyDone);
                // We intentionally do not pass a CancellationToken since we don't want to cancel cleanup
                Expect(await _connector.ReadMessage(async), _connector);
                Expect(await _connector.ReadMessage(async), _connector);
            }
            catch (OperationCanceledException e) when (e.InnerException is PostgresException pg && pg.SqlState == PostgresErrorCodes.QueryCanceled)
            {
                Log.Debug($"Caught an exception while disposing the {nameof(NpgsqlBinaryExporter)}, indicating that it was cancelled.", e, _connector.Id);
            }
            catch (Exception e)
            {
                Log.Error($"Caught an exception while disposing the {nameof(NpgsqlBinaryExporter)}.", e, _connector.Id);
            }
        }

        _connector.EndUserAction();
        Cleanup();
    }

#pragma warning disable CS8625
    /// <summary>
    /// Detaches this exporter from its connector and connection and marks it as disposed.
    /// </summary>
    void Cleanup()
    {
        Debug.Assert(!_isDisposed);
        var connector = _connector;
        Log.Debug("COPY operation ended", connector?.Id ?? -1);

        if (connector != null)
        {
            connector.CurrentCopyOperation = null;
            connector.Connection?.EndBindingScope(ConnectorBindingScope.Copy);
            _connector = null;
        }

        _typeMapper = null;
        _buf = null;
        _isDisposed = true;
    }
#pragma warning restore CS8625

    #endregion
}