DIY Multi-threaded Throttling System in .NET Core
Development | Borna Gajić

Monday, Feb 27, 2023 • 6 min read
A client request that led to an all-purpose multi-threaded throttling system which uses queues and inter-process communication.

Intro

A few weeks ago, I had a very interesting request from a client. They wanted a way to control the number of concurrently generated financial reports while also being able to observe their progress. Up until then, report generation wasn’t throttled at all: if 100 users started generating a report at the same time, the server would be put under heavy strain (most of its computing power would go to crunching mind-numbing amounts of data). So it was only natural to implement some kind of throttle. The client’s final requirement was reusability: if another part of the app needed throttling, it should be able to use the same mechanism.

Now that all the requirements are set, let’s sketch out an interface! FYI, you can find the link to the implementation at the end of the blog post 😉.

The interface

Let’s break it down into the following bullets:

  • Create a queue,
  • Create a state management system,
  • Add functionality that notifies the consumer of the progress being made (state),
  • Add the ability to cancel a process at any time,
  • Should be generic (all-purpose).

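// Callback type used by EnqueueWorkItem; its shape follows how ProcessWorker invokes it:
// it receives the work item's new status and returns the updated state.
public delegate Task<TState> ProcessWorkerStatusChangeCallback<TState>(Guid processId, ProcessStatus newStatus)
    where TState : ProcessWorkerState;

public interface IProcessWorker : IDisposable
{
    Task CancelWorkItemAsync(Guid processId, int? millisecondsDelay = null);

    ProcessInfo EnqueueWorkItem<TState>(Func<CancellationToken, Task> process, ProcessWorkerStatusChangeCallback<TState> statusChangeCallback = null)
        where TState : ProcessWorkerState;
}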

What the implementation should look like can be deduced from the method names:

  • CancelWorkItemAsync will be used to cancel processing,
  • EnqueueWorkItem will be used to enqueue a work item (process) and notify the consuming code of the progress being made on that work item (via statusChangeCallback).

ProcessInfo contains information that is relevant to the consuming code - we will get to that in a bit.

public class ProcessInfo
{
    public CancellationToken StoppingToken { get; init; }
    public Task WaitingToken { get; init; }
    public Guid ProcessId { get; init; }
}

ProcessWorkerState is used for state management. You can keep it in memory or in external storage (MongoDB, Redis, SQL…). Here’s a good starting point:

public record ProcessWorkerState
{
    ...
    public ProcessStatus Status { get; set; }
    public Guid ProcessId { get; init; }
    public int UserId { get; init; }
    ...
}

[Flags]
public enum ProcessStatus
{
    Queued = 0,
    Running = 1 << 0,
    Done = 1 << 1,
    Canceled = 1 << 2,
    Failed = 1 << 3,
    CancellationRequested = 1 << 4,
}
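One thing to watch out for with this [Flags] enum: because Queued is 0, Enum.HasFlag(ProcessStatus.Queued) returns true for every value, so a "still queued?" check has to use equality (which is what the `is ProcessStatus.Queued` pattern in CancelWorkItemAsync does). The small sketch below illustrates this; the ProcessStatusChecks helper is hypothetical, and the combined-flags case only matters if you ever OR statuses together (the worker in this post reports one status at a time).

```csharp
using System;

[Flags]
public enum ProcessStatus
{
    Queued = 0,
    Running = 1 << 0,
    Done = 1 << 1,
    Canceled = 1 << 2,
    Failed = 1 << 3,
    CancellationRequested = 1 << 4,
}

// Hypothetical helper, not part of the original ProcessWorker.
public static class ProcessStatusChecks
{
    // HasFlag(Queued) is always true because Queued == 0, so compare for equality instead.
    public static bool IsQueued(ProcessStatus status) => status == ProcessStatus.Queued;

    // Non-zero members can be tested bitwise, even when several flags are combined.
    public static bool HasCancellationRequested(ProcessStatus status) =>
        (status & ProcessStatus.CancellationRequested) != 0;
}
```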

Of course, you can always opt out of state management altogether; I included it in the ProcessWorker because it produces historical records and enables some fun front-end features (read: real-time notifications and loading bars 😎).

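using System;
using System.Collections.Concurrent;
using Timer = System.Timers.Timer; // the timer with Elapsed/AutoReset, not System.Threading.Timer

public class ProcessWorker : IProcessWorker
{
    public ProcessWorkerConfiguration Configuration { get; private set; }

    protected readonly ConcurrentQueue<ProcessMetadata> _queue = new();
    protected readonly ConcurrentDictionary<Guid, WorkItemToolbox> _workItemToolbox = new();
    // Read and written from several threads: update it atomically (e.g. Interlocked on a backing field)
    protected int CountOfWorkingItems { get; set; } = 0;

    private readonly Timer _timer;

    public ProcessWorker(ProcessWorkerConfiguration configuration = null)
    {
        Configuration = configuration ?? new();

        _timer = new(Configuration.TimerIntervalMs)
        {
            AutoReset = true, // raise Elapsed on every interval, not just once
            Enabled = false
        };
        // async void handler: an unhandled exception here crashes the process,
        // so RunWorkerAsync should catch and log its own exceptions
        _timer.Elapsed += async (sender, e) => await RunWorkerAsync();
        _timer.Start();
    }

    // EnqueueWorkItem, CancelWorkItemAsync, RunWorkerAsync, Dispose...
}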

ProcessWorkerConfiguration is a class that contains configuration options for the process worker:

public record ProcessWorkerConfiguration
{
    // How many work items run at the same time
    public int Concurrency { get; init; } = 3;
    // How often (in milliseconds) the timer calls RunWorkerAsync
    public int TimerIntervalMs { get; init; } = 15_000;
}

EnqueueWorkItem

The point of this method is simple: queue a work item and return the relevant information to the consumer. Because a work item might sit in the queue for some time, the consumer should be able to await its completion; to achieve that, I used TaskCompletionSource. IProgress<T> is used to report progress to the consumer, and CancellationTokenSource is used for cancellation.
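To show the TaskCompletionSource idea in isolation, here is a minimal sketch: a hypothetical AwaitableQueue hands out a Task on enqueue (the role ProcessInfo.WaitingToken plays) and completes it only once the item has actually run. It deliberately leaves out the timer, the concurrency limit, progress reporting and cancellation.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Hypothetical minimal queue: every enqueued item gets its own TaskCompletionSource,
// whose Task the caller can await while the item is still waiting in the queue.
public sealed class AwaitableQueue
{
    private readonly ConcurrentQueue<(Func<Task> Work, TaskCompletionSource Done)> _queue = new();

    public Task Enqueue(Func<Task> work)
    {
        // RunContinuationsAsynchronously keeps the awaiting caller's continuation
        // from running inline on the thread that completes the item.
        var done = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
        _queue.Enqueue((work, done));
        return done.Task; // plays the role of ProcessInfo.WaitingToken
    }

    // Runs the next queued item (if any) and completes its Task; returns false when empty.
    public async Task<bool> RunNextAsync()
    {
        if (!_queue.TryDequeue(out var item))
            return false;

        try
        {
            await item.Work();
            item.Done.TrySetResult();
        }
        catch (Exception ex)
        {
            item.Done.TrySetException(ex);
        }
        return true;
    }
}
```

The caller gets a Task immediately, but it only completes after some worker has dequeued and run the item, which is exactly what makes a long stay in the queue awaitable.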

Note that the contract of this method is not asynchronous. It could be changed to return a ValueTask, which looks like a win-win, although it can cause unwanted side effects (e.g. awaiting the same instance twice, which a ValueTask doesn’t support). Thus, for simplicity’s sake, it stays fully synchronous. Keep in mind that this implementation targets .NET Core. On the classic ASP.NET platform (which, unlike ASP.NET Core, has a SynchronizationContext), we would have to use Task.ConfigureAwait(false) wherever possible.

public ProcessInfo EnqueueWorkItem<TState>(Func<CancellationToken, Task> process, ProcessWorkerStatusChangeCallback<TState> statusChangeCallback = null)
        where TState : ProcessWorkerState
{
    var cancelTokenSource = new CancellationTokenSource();
    // Code awaiting WaitingToken won't run inline on the thread that completes it
    var taskCompletionSrc = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);

    var processId = Guid.NewGuid();

    var processInfo = new ProcessInfo
    {
        StoppingToken = cancelTokenSource.Token,
        WaitingToken = taskCompletionSrc.Task,
        ProcessId = processId
    };

    var processMetadata = new ProcessMetadata
    {
        ProcessInfo = processInfo,
        Process = async () =>
        {
            await process(cancelTokenSource.Token);
        }
    };

    // Without a SynchronizationContext (as in .NET Core), Progress<T> raises
    // ProgressChanged on the thread pool, so handlers may overlap or run out of order
    var progress = new Progress<ProcessStatus>();

    progress.ProgressChanged += async (sender, newStatus) =>
    {
        // The callback is optional
        if (statusChangeCallback is null)
            return;

        // async void handler: catch exceptions here, otherwise they crash the process
        var state = await statusChangeCallback(processId, newStatus);

        // Do your state management here...
    };

    ((IProgress<ProcessStatus>)progress).Report(ProcessStatus.Queued);

    _workItemToolbox.TryAdd(processId, new WorkItemToolbox
    {
        CancellationTokenSrc = cancelTokenSource,
        TaskCompletionSrc = taskCompletionSrc,
        Progress = progress
    });

    _queue.Enqueue(processMetadata);

    return processInfo;
}
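A side note on Progress<T>: it posts every Report call to the SynchronizationContext captured when it was created, or to the thread pool when there is none, which is the case in a .NET Core worker. The ProgressChanged handlers for Queued, Running and Done can therefore run concurrently and in a different order than they were reported. If ordering matters for your state management, one option is a small IProgress<T> implementation that invokes the handler inline; SynchronousProgress<T> below is a hypothetical sketch, not part of the original project.

```csharp
using System;

// Hypothetical alternative to Progress<T>: the handler runs inline on the reporting
// thread, so status changes are observed in the order in which they were reported.
public sealed class SynchronousProgress<T> : IProgress<T>
{
    private readonly Action<T> _handler;
    private readonly object _gate = new();

    public SynchronousProgress(Action<T> handler) =>
        _handler = handler ?? throw new ArgumentNullException(nameof(handler));

    public void Report(T value)
    {
        // The lock serializes reports coming from different worker threads.
        lock (_gate)
        {
            _handler(value);
        }
    }
}
```

The trade-off is that the handler now runs on the worker’s thread, so a slow handler (e.g. a database write) delays the work item that reported the status.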

CancelWorkItemAsync

As the name suggests, this method is used to cancel work items (who would have guessed?!). It can be implemented in many ways; e.g. if you’re using states, you can grab the state with the same processId, get the CancellationTokenSource from _workItemToolbox, and do with it whatever you need. All possible implementations should converge to the same outcome: the work item gets canceled.

public async Task CancelWorkItemAsync(Guid processId, int? millisecondsDelay = null)
{
    var processState = ...;

    // Unknown or already cleaned-up work item: nothing to cancel
    if (!_workItemToolbox.TryGetValue(processId, out var toolbox))
        return;

    if (processState.Status is ProcessStatus.Queued)
    {
        // Enumeration works on a moment-in-time snapshot (see ConcurrentQueue<T>.GetEnumerator),
        // so the worker may still dequeue this item right after we find it
        var cancelItem = _queue.FirstOrDefault(processItem => processItem.ProcessInfo.ProcessId == processId);

        if (cancelItem is not null)
        {
            cancelItem.IsCanceledBeforeRunning = true;

            toolbox.Progress.Report(ProcessStatus.Canceled);
            toolbox.TaskCompletionSrc.TrySetResult();
        }
    }
    else if (processState.Status is ProcessStatus.Running)
    {
        toolbox.Progress.Report(ProcessStatus.CancellationRequested);

        // This only signals the token; the running work item has to observe it to stop
        if (millisecondsDelay is not null)
            toolbox.CancellationTokenSrc.CancelAfter(millisecondsDelay.Value);
        else
            toolbox.CancellationTokenSrc.Cancel();
    }
}
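For a running work item, CancelWorkItemAsync can only request cancellation: the delegate passed to EnqueueWorkItem has to observe the CancellationToken it receives, otherwise it simply runs to completion. A minimal sketch of what that looks like, with Task.Delay standing in for real report generation (CooperativeCancellationDemo is a hypothetical name):

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class CooperativeCancellationDemo
{
    // Stand-in for a long-running work item. It stops early only because it passes
    // the token on to Task.Delay; CPU-bound report code would instead call
    // token.ThrowIfCancellationRequested() between chunks of work.
    public static async Task<string> RunAsync(CancellationToken token)
    {
        try
        {
            await Task.Delay(TimeSpan.FromSeconds(30), token);
            return "Done";
        }
        catch (OperationCanceledException)
        {
            return "Canceled";
        }
    }
}
```

Calling CancelAfter(50) on the source behaves like CancelWorkItemAsync(processId, millisecondsDelay: 50): about 50 ms later the token fires and the delay above throws a TaskCanceledException, which is an OperationCanceledException.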

RunWorkerAsync

Finally, the beating heart of this class. RunWorkerAsync gets called every Configuration.TimerIntervalMs milliseconds; it checks whether there are queued work items and free slots, and then starts up to Configuration.Concurrency tasks to do the actual work. It’s a long method, so I’ll break it down into its crucial parts. The full code example is linked at the end of the post, so you can check it out afterwards.

Instead of using concurrencyOffset, you can try using SemaphoreSlim. In the end, both approaches should limit concurrency the same way (the former uses less memory, but it’s less readable).
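For comparison, here is a rough sketch of a SemaphoreSlim-based throttle. Note that it goes a step further than swapping out concurrencyOffset: callers wait asynchronously on the semaphore instead of having a timer poll the queue. SemaphoreThrottle is a hypothetical class, not part of the original ProcessWorker, and the peak counter exists only to make the limit observable.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical throttle: at most `concurrency` delegates run at once; the rest
// wait asynchronously for a free slot.
public sealed class SemaphoreThrottle : IDisposable
{
    private readonly SemaphoreSlim _slots;
    private readonly object _gate = new();
    private int _running;
    private int _maxObserved;

    public SemaphoreThrottle(int concurrency) =>
        _slots = new SemaphoreSlim(concurrency, concurrency);

    // Peak number of delegates that were running at the same time (illustration only).
    public int MaxObservedConcurrency
    {
        get { lock (_gate) return _maxObserved; }
    }

    public async Task RunAsync(Func<CancellationToken, Task> work, CancellationToken token = default)
    {
        await _slots.WaitAsync(token); // wait for a free slot
        try
        {
            lock (_gate)
            {
                _running++;
                _maxObserved = Math.Max(_maxObserved, _running);
            }
            await work(token);
        }
        finally
        {
            lock (_gate) _running--;
            _slots.Release(); // hand the slot to the next waiter
        }
    }

    public void Dispose() => _slots.Dispose();
}
```

A side effect of this design is that a queued item starts as soon as a slot frees up, instead of on the next timer tick, which with the default configuration can be up to 15 seconds later.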

protected async Task RunWorkerAsync()
{
    // ">=" instead of "==": overlapping timer ticks can make the counter overshoot
    if (_queue.IsEmpty || CountOfWorkingItems >= Configuration.Concurrency)
        return;

    async Task workItem()
    {
        // Steps:
        // 1. TryDequeue, if it fails, try again with a retry policy, log Environment.StackTrace if that fails (worst-case scenario, shouldn't ever happen)
        // 2. Check for IsCanceledBeforeRunning, IsCancellationRequested, call IProgress.Report accordingly
        // 3. Call ProcessMetadata.Process
        // 4. Clean up resources
    }

    var concurrencyOffset = Configuration.Concurrency - CountOfWorkingItems;

    // Free slots, capped by the number of queued items and never negative
    // (Math.Clamp(value, 0, max) throws an ArgumentException when max < 0)
    var workerSlotsLeft = Math.Max(0, Math.Min(concurrencyOffset, _queue.Count));

    await Task.WhenAll(
        Enumerable
            .Range(0, workerSlotsLeft)
            .Select(_ => workItem())
    );
}
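The four workItem steps are only outlined above. Below is one possible way to fill them in, as a self-contained sketch: QueuedItem and MiniWorker are hypothetical, trimmed-down stand-ins for ProcessMetadata, WorkItemToolbox and ProcessWorker, the retry policy from step 1 is left out, and the IProgress.Report calls are reduced to comments. The returned strings exist only to make the outcome easy to check.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical, trimmed-down stand-in for ProcessMetadata + WorkItemToolbox.
public sealed class QueuedItem
{
    public Func<Task> Process { get; init; } = () => Task.CompletedTask;
    public CancellationTokenSource Cts { get; } = new();
    public TaskCompletionSource Done { get; } = new(TaskCreationOptions.RunContinuationsAsynchronously);
    // Written by the canceling thread, read by the worker thread.
    public volatile bool IsCanceledBeforeRunning;
}

// Hypothetical worker showing one way to implement the four workItem steps.
public sealed class MiniWorker
{
    private readonly ConcurrentQueue<QueuedItem> _queue = new();
    private int _running; // changed via Interlocked: several work items run at once

    public int Running => Volatile.Read(ref _running);

    public void Enqueue(QueuedItem item) => _queue.Enqueue(item);

    public async Task<string> WorkItemAsync()
    {
        // 1. Dequeue (the article additionally retries and logs if this fails).
        if (!_queue.TryDequeue(out var item))
            return "Empty";

        Interlocked.Increment(ref _running);
        try
        {
            // 2. Skip items canceled while still queued (report Canceled here).
            if (item.IsCanceledBeforeRunning || item.Cts.IsCancellationRequested)
            {
                item.Done.TrySetResult();
                return "Canceled";
            }

            // 3. Run the actual work (report Running before, Done after).
            await item.Process();
            item.Done.TrySetResult();
            return "Done";
        }
        catch (OperationCanceledException)
        {
            // Like the article's queued-cancel path: complete rather than cancel the task.
            item.Done.TrySetResult();
            return "Canceled";
        }
        catch (Exception ex)
        {
            item.Done.TrySetException(ex); // report Failed here
            return "Failed";
        }
        finally
        {
            // 4. Clean up: free the slot (the article also drops the toolbox entry).
            Interlocked.Decrement(ref _running);
        }
    }
}
```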

Final Words

A careful reader might have already noticed that we have fulfilled all the requirements:

  • Create a queue ✔️
  • Create a state management system ✔️
  • Add functionality that notifies the consumer of the progress being made (state) ✔️
  • Add the ability to cancel a process at any time ✔️
  • Should be generic ✔️

We can, of course, take things even further. Why use the API’s resources to do the heavy work when a separate application can do it instead? Offloading the work keeps the API responsive, while the worker does its sweaty work on its own resources. This topic is out of scope for this blog post, but here’s a quick how-to:

Create a Console Application (the worker), use RabbitMQ to send messages between the two apps, use ProcessWorker for throttling, and that’s it! We have ourselves a fully-fledged throttling system with its own memory, its own thread pool and, depending on how you deploy the worker, possibly even its own CPU! Instead of RabbitMQ, you could use SignalR: create a hub on the API side that communicates with a SignalR client running in the worker. This way, there is less configuration, and the approach itself is generally less complicated.

DIY Process Worker upgrades

There’s a bunch of ways you can upgrade your ProcessWorker:

  • Create an IProcessWorkerProvider which will create and manage multiple instances of the ProcessWorker,
    • Use it to share the same instance of ProcessWorker between multiple consumers.
  • Create a “priority queue” where certain work items should be prioritized over others,
    • Sometimes there might be a scenario where certain users (or code, or…) should have their work prioritized by ProcessWorker.
    • Hint: use LinkedList<T>
  • Workload balancing,
    • One instance can steal work from another if the other one has too much work.
  • Dynamic changes to ProcessWorkerConfiguration,
    • Use heuristics to adjust the configuration at runtime so the worker functions optimally (e.g. fewer queued work items require less frequent calls to RunWorkerAsync).
  • Add an option to return a result from the queued method,
    • Currently it’s implemented as a fire-and-forget kind of action.
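For the priority-queue upgrade, the hint above points at LinkedList<T>; a different option on .NET 6 and later is the built-in PriorityQueue<TElement, TPriority>, which dequeues the element with the lowest priority value first. It is not thread-safe, so this hypothetical PriorityWorkQueue<T> wraps it in a lock and adds a sequence number so that items with equal priority keep their FIFO order.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical replacement for ConcurrentQueue<ProcessMetadata>.
// Lower priority value = dequeued earlier; equal priorities are served in FIFO order.
public sealed class PriorityWorkQueue<T>
{
    private readonly PriorityQueue<T, (int Priority, long Sequence)> _queue = new();
    private readonly object _gate = new(); // PriorityQueue is not thread-safe
    private long _sequence;

    public void Enqueue(T item, int priority)
    {
        lock (_gate)
        {
            // The tuple compares Priority first, then insertion order.
            _queue.Enqueue(item, (priority, _sequence++));
        }
    }

    public bool TryDequeue(out T item)
    {
        lock (_gate)
        {
            return _queue.TryDequeue(out item, out _);
        }
    }

    public int Count
    {
        get { lock (_gate) return _queue.Count; }
    }
}
```

For example, a report requested by a prioritized user could be enqueued with priority 0 and everything else with priority 1.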

💾 The promised link to the code - it’s a proof-of-concept project which demonstrates a minimal example of usage.

Thanks for reading!