124 lines
3.4 KiB
C#
124 lines
3.4 KiB
C#
using System;
|
|
using System.Collections.Concurrent;
|
|
using System.Diagnostics;
|
|
using System.Threading;
|
|
using Npgsql.Logging;
|
|
|
|
namespace Npgsql;
|
|
|
|
sealed class SingleThreadSynchronizationContext : SynchronizationContext, IDisposable
|
|
{
|
|
readonly BlockingCollection<CallbackAndState> _tasks = new();
|
|
readonly object _lockObject = new();
|
|
volatile Thread? _thread;
|
|
bool _doingWork;
|
|
|
|
const int ThreadStayAliveMs = 10000;
|
|
readonly string _threadName;
|
|
|
|
static readonly NpgsqlLogger Log = NpgsqlLogManager.CreateLogger(nameof(SingleThreadSynchronizationContext));
|
|
|
|
internal SingleThreadSynchronizationContext(string threadName)
|
|
=> _threadName = threadName;
|
|
|
|
internal Disposable Enter() => new(this);
|
|
|
|
public override void Post(SendOrPostCallback callback, object? state)
|
|
{
|
|
_tasks.Add(new CallbackAndState { Callback = callback, State = state });
|
|
|
|
lock (_lockObject)
|
|
{
|
|
if (!_doingWork)
|
|
{
|
|
// Either there is no thread, or the current thread is exiting
|
|
// In which case, wait for it to complete
|
|
var currentThread = _thread;
|
|
currentThread?.Join();
|
|
Debug.Assert(_thread is null);
|
|
_doingWork = true;
|
|
_thread = new Thread(WorkLoop) { Name = _threadName, IsBackground = true };
|
|
_thread.Start();
|
|
}
|
|
}
|
|
}
|
|
|
|
public void Dispose()
|
|
{
|
|
_tasks.CompleteAdding();
|
|
|
|
var thread = _thread;
|
|
thread?.Join();
|
|
|
|
_tasks.Dispose();
|
|
}
|
|
|
|
void WorkLoop()
|
|
{
|
|
SetSynchronizationContext(this);
|
|
|
|
try
|
|
{
|
|
while (true)
|
|
{
|
|
var taken = _tasks.TryTake(out var callbackAndState, ThreadStayAliveMs);
|
|
if (!taken)
|
|
{
|
|
lock (_lockObject)
|
|
{
|
|
if (_tasks.Count == 0)
|
|
{
|
|
_doingWork = false;
|
|
return;
|
|
}
|
|
}
|
|
|
|
continue;
|
|
}
|
|
|
|
try
|
|
{
|
|
Debug.Assert(_doingWork);
|
|
callbackAndState.Callback(callbackAndState.State);
|
|
}
|
|
catch (Exception e)
|
|
{
|
|
Log.Error($"Exception caught in {nameof(SingleThreadSynchronizationContext)}", e);
|
|
}
|
|
}
|
|
}
|
|
catch (Exception e)
|
|
{
|
|
// Here we attempt to catch any exception coming from BlockingCollection _tasks
|
|
Log.Error($"Exception caught in {nameof(SingleThreadSynchronizationContext)}", e);
|
|
lock (_lockObject)
|
|
_doingWork = false;
|
|
}
|
|
finally
|
|
{
|
|
Debug.Assert(!_doingWork);
|
|
_thread = null;
|
|
}
|
|
}
|
|
|
|
struct CallbackAndState
|
|
{
|
|
internal SendOrPostCallback Callback;
|
|
internal object? State;
|
|
}
|
|
|
|
internal readonly struct Disposable : IDisposable
|
|
{
|
|
readonly SynchronizationContext? _synchronizationContext;
|
|
|
|
internal Disposable(SynchronizationContext synchronizationContext)
|
|
{
|
|
_synchronizationContext = Current;
|
|
SetSynchronizationContext(synchronizationContext);
|
|
}
|
|
|
|
public void Dispose()
|
|
=> SetSynchronizationContext(_synchronizationContext);
|
|
}
|
|
}
|