Kit.Core/LibExternal/Npgsql/Internal/NpgsqlReadBuffer.Stream.cs

238 lines
7.8 KiB
C#

using System;
using System.Diagnostics;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
namespace Npgsql.Internal;
public sealed partial class NpgsqlReadBuffer
{
internal sealed class ColumnStream : Stream
{
readonly NpgsqlConnector _connector;
readonly NpgsqlReadBuffer _buf;
int _start, _len, _read;
bool _canSeek;
bool _startCancellableOperations;
internal bool IsDisposed { get; private set; }
internal ColumnStream(NpgsqlConnector connector, bool startCancellableOperations = true)
{
_connector = connector;
_buf = connector.ReadBuffer;
_startCancellableOperations = startCancellableOperations;
IsDisposed = true;
}
internal void Init(int len, bool canSeek)
{
Debug.Assert(!canSeek || _buf.ReadBytesLeft >= len,
"Seekable stream constructed but not all data is in buffer (sequential)");
_start = _buf.ReadPosition;
_len = len;
_read = 0;
_canSeek = canSeek;
IsDisposed = false;
}
public override bool CanRead => true;
public override bool CanWrite => false;
public override bool CanSeek => _canSeek;
public override long Length
{
get
{
CheckDisposed();
return _len;
}
}
public override void SetLength(long value)
=> throw new NotSupportedException();
public override long Position
{
get
{
CheckDisposed();
return _read;
}
set
{
if (value < 0)
throw new ArgumentOutOfRangeException(nameof(value), "Non - negative number required.");
Seek(value, SeekOrigin.Begin);
}
}
public override long Seek(long offset, SeekOrigin origin)
{
CheckDisposed();
if (!_canSeek)
throw new NotSupportedException();
if (offset > int.MaxValue)
throw new ArgumentOutOfRangeException(nameof(offset), "Stream length must be non-negative and less than 2^31 - 1 - origin.");
const string seekBeforeBegin = "An attempt was made to move the position before the beginning of the stream.";
switch (origin)
{
case SeekOrigin.Begin:
{
var tempPosition = unchecked(_start + (int)offset);
if (offset < 0 || tempPosition < _start)
throw new IOException(seekBeforeBegin);
_buf.ReadPosition = tempPosition;
_read = (int)offset;
return _read;
}
case SeekOrigin.Current:
{
var tempPosition = unchecked(_buf.ReadPosition + (int)offset);
if (unchecked(_buf.ReadPosition + offset) < _start || tempPosition < _start)
throw new IOException(seekBeforeBegin);
_buf.ReadPosition = tempPosition;
_read += (int)offset;
return _read;
}
case SeekOrigin.End:
{
var tempPosition = unchecked(_start + _len + (int)offset);
if (unchecked(_start + _len + offset) < _start || tempPosition < _start)
throw new IOException(seekBeforeBegin);
_buf.ReadPosition = tempPosition;
_read = _len + (int)offset;
return _read;
}
default:
throw new ArgumentOutOfRangeException(nameof(origin), "Invalid seek origin.");
}
}
public override void Flush()
=> CheckDisposed();
public override Task FlushAsync(CancellationToken cancellationToken)
{
CheckDisposed();
return cancellationToken.IsCancellationRequested
? Task.FromCanceled(cancellationToken) : Task.CompletedTask;
}
public override int Read(byte[] buffer, int offset, int count)
{
ValidateArguments(buffer, offset, count);
return Read(new Span<byte>(buffer, offset, count));
}
public override Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
ValidateArguments(buffer, offset, count);
using (NoSynchronizationContextScope.Enter())
return ReadAsync(new Memory<byte>(buffer, offset, count), cancellationToken).AsTask();
}
#if NETSTANDARD2_0
public int Read(Span<byte> span)
#else
public override int Read(Span<byte> span)
#endif
{
CheckDisposed();
var count = Math.Min(span.Length, _len - _read);
if (count == 0)
return 0;
var read = _buf.Read(span.Slice(0, count));
_read += read;
return read;
}
#if NETSTANDARD2_0
public ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken cancellationToken = default)
#else
public override ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken cancellationToken = default)
#endif
{
CheckDisposed();
var count = Math.Min(buffer.Length, _len - _read);
if (count == 0)
return new ValueTask<int>(0);
using (NoSynchronizationContextScope.Enter())
return ReadLong(this, buffer.Slice(0, count), cancellationToken);
static async ValueTask<int> ReadLong(ColumnStream stream, Memory<byte> buffer, CancellationToken cancellationToken = default)
{
using var registration = stream._startCancellableOperations
? stream._connector.StartNestedCancellableOperation(cancellationToken, attemptPgCancellation: false)
: default;
var read = await stream._buf.ReadAsync(buffer, cancellationToken);
stream._read += read;
return read;
}
}
public override void Write(byte[] buffer, int offset, int count)
=> throw new NotSupportedException();
void CheckDisposed()
{
if (IsDisposed)
throw new ObjectDisposedException(null);
}
protected override void Dispose(bool disposing)
=> DisposeAsync(disposing, async: false).GetAwaiter().GetResult();
#if NETSTANDARD2_0
public ValueTask DisposeAsync()
#else
public override ValueTask DisposeAsync()
#endif
{
using (NoSynchronizationContextScope.Enter())
return DisposeAsync(disposing: true, async: true);
}
async ValueTask DisposeAsync(bool disposing, bool async)
{
if (IsDisposed || !disposing)
return;
var leftToSkip = _len - _read;
if (leftToSkip > 0)
{
if (async)
await _buf.Skip(leftToSkip, async);
else
_buf.Skip(leftToSkip, async).GetAwaiter().GetResult();
}
IsDisposed = true;
}
}
static void ValidateArguments(byte[] buffer, int offset, int count)
{
if (buffer == null)
throw new ArgumentNullException(nameof(buffer));
if (offset < 0)
throw new ArgumentNullException(nameof(offset));
if (count < 0)
throw new ArgumentNullException(nameof(count));
if (buffer.Length - offset < count)
throw new ArgumentException("Offset and length were out of bounds for the array or count is greater than the number of elements from index to the end of the source collection.");
}
}