DIY Multi-threaded Throttling System in .NET Core
Development | Borna Gajić

Monday, Feb 27, 2023 • 6 min read
A client request that led to an all-purpose multi-threaded throttling system which uses queues and inter-process communication.

Intro

A few weeks ago, I received a very interesting request from a client. They wanted a way to control the number of concurrently generated financial reports while also being able to observe their progress. Up until that day, reports were generated with zero throttling: if 100 users started generating a report at the same time, it would put a strain on the server (most of its computing power would go to crunching mind-numbing amounts of data). So it was only natural that some kind of throttle be implemented. The final requirement given by the client was that the solution should be reusable; if another part of the app required throttling, it should be able to use it.

Now that all the requirements are set, let's sketch out an interface! FYI, you can find the link to the implementation at the end of the blog post 😉.

The base interface

public interface IProcessWorker : IDisposable
{
    IProcessWorkerConsumer Consumer { get; }
    IProcessWorkerProducer Producer { get; }
}

Let’s break it down into the following bullets:

  • Create a queue,
  • Create a state management system,
  • Add functionality that notifies the consumer about the progress being made (state),
  • Add the ability to cancel a process at any time,
  • Should be generic (all purpose).
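
Behind IProcessWorker, a single unbounded Channel connects the two halves. Here is a minimal sketch of how the composition might look (my own guess at the wiring; the concrete producer and consumer types are covered below, and the repo's actual composition may differ):

// Requires System.Threading.Channels.
internal class ProcessWorker : IProcessWorker
{
    private readonly Channel<WorkItem> _channel = Channel.CreateUnbounded<WorkItem>();

    public ProcessWorker(ProcessWorkerConfiguration configuration)
    {
        Producer = new ProcessWorkerProducer(_channel.Writer);
        Consumer = new ProcessWorkerConsumer(_channel.Reader, configuration);
    }

    public IProcessWorkerConsumer Consumer { get; }
    public IProcessWorkerProducer Producer { get; }

    public void Dispose() => _channel.Writer.TryComplete();
}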

Producer

public interface IProcessWorkerProducer
{
    // Cancels a process and removes it from the queue.
    void CancelWorkItemAsync(Guid processId, int? millisecondsDelay = null);

    // Enqueues a process to worker's queue with default state.
    Task<ProcessWorkerInfo> EnqueueWorkItemAsync(Func<CancellationToken, Task> process);

    // Enqueues a process to worker's queue with progress callback and default state.
    Task<ProcessWorkerInfo> EnqueueWorkItemAsync(Func<CancellationToken, Task> process, Func<Guid, ProcessStatus, Task> progressCallback);

    // Enqueues a process to worker's queue with progress callback and custom state.
    Task<ProcessWorkerInfo> EnqueueWorkItemAsync<TState>(Func<CancellationToken, Task> process, Func<Guid, ProcessStatus, Task<TState>> statusChangeCallback)
        where TState : ProcessWorkerState;
}

What the implementation should look like can be deduced from the method names:

  • CancelWorkItemAsync will be used to cancel processing,
  • EnqueueWorkItemAsync will be used to enqueue a work item (process) and notify the consuming code about the progress being made on that work item.

ProcessWorkerInfo contains information that is relevant to the consuming code - we will get to that in a bit.

public class ProcessWorkerInfo
{
    public CancellationToken StoppingToken { get; init; }
    public Task Completion { get; init; }
    public Guid ProcessId { get; init; }
}

ProcessWorkerState is used for state management. You can keep it in memory or persist it to external storage (MongoDB, Redis, SQL…). Here's a good starting point:

public record ProcessWorkerState
{
    ...
    public ProcessStatus Status { get; set; }
    public Guid ProcessId { get; init; }
    public int UserId { get; init; }
    ...
}

[Flags]
public enum ProcessStatus
{
    Queued = 0,
    Running = 1 << 0,
    Done = 1 << 1,
    Canceled = 1 << 2,
    Failed = 1 << 3,
    CancellationRequested = 1 << 4,
}
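
If you do persist state externally, the status callback maps naturally onto a small store abstraction. Here is a minimal in-memory sketch to start from (the IProcessWorkerStateStore name and shape are mine, not part of the original code); swapping the dictionary for a MongoDB, Redis or SQL client is the only change the rest of the system would notice:

public interface IProcessWorkerStateStore
{
    Task<ProcessWorkerState> UpsertStatusAsync(Guid processId, ProcessStatus status);
}

public class InMemoryProcessWorkerStateStore : IProcessWorkerStateStore
{
    private readonly ConcurrentDictionary<Guid, ProcessWorkerState> _states = new();

    public Task<ProcessWorkerState> UpsertStatusAsync(Guid processId, ProcessStatus status)
    {
        // Create the state on first sight of the process, update its status afterwards.
        var state = _states.AddOrUpdate(
            processId,
            _ => new ProcessWorkerState { ProcessId = processId, Status = status },
            (_, existing) => existing with { Status = status });

        return Task.FromResult(state);
    }
}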

Of course, you can always opt out of using state management altogether; I included it in the ProcessWorker because it produces historical records and allows for some fun FE features to be developed (read: real-time notifications and loading bars 😎).

internal class ProcessWorkerProducer : IProcessWorkerProducer
{
    protected readonly ConcurrentDictionary<Guid, WorkItem> _workItems = new();

    public ProcessWorkerProducer(ChannelWriter<WorkItem> channelWriter)
    {
        ChannelWriter = channelWriter;
    }

    public ChannelWriter<WorkItem> ChannelWriter { get; }

    // EnqueueWorkItemAsync, CancelWorkItemAsync
}

EnqueueWorkItemAsync

The point of this method is simple: queue a work item and return the relevant pieces to the consumer. We do this by writing work items to a Channel. Note that the contract of this method is asynchronous. It could be changed to ValueTask (a win-win scenario), although this can cause unwanted side effects (e.g. awaiting the same instance twice). Keep in mind that this implementation is based on .NET Core.

public async Task<ProcessWorkerInfo> EnqueueWorkItemAsync<TState>(Func<CancellationToken, Task> process, Func<Guid, ProcessStatus, Task<TState>> statusChangeCallback)
    where TState : ProcessWorkerState
{
    var cancelTokenSource = new CancellationTokenSource();
    var taskCompletionSrc = new TaskCompletionSource();

    var processId = Guid.NewGuid();

    // Persist the new status via the caller-supplied callback, then mirror it
    // on the in-memory work item (if it has already been registered).
    async Task progressAsync(ProcessStatus newStatus)
    {
        await statusChangeCallback(processId, newStatus);

        if (_workItems.TryGetValue(processId, out var workItem))
        {
            lock (workItem)
            {
                workItem.Status = newStatus;
            }
        }
    }

    // Mark the process as queued; at this point only the external state is updated,
    // since the work item hasn't been added to _workItems yet.
    await progressAsync(ProcessStatus.Queued);

    var processInfo = new ProcessWorkerInfo
    {
        StoppingToken = cancelTokenSource.Token,
        Completion = taskCompletionSrc.Task,
        ProcessId = processId
    };

    var workItem = new WorkItem(progressAsync)
    {
        Status = ProcessStatus.Queued,
        CancellationTokenSrc = cancelTokenSource,
        TaskCompletionSrc = taskCompletionSrc,
        ProcessMetadata = new ProcessMetadata
        {
            DoWorkAsync = async (stoppingToken) =>
            {
                try
                {
                    await process(stoppingToken);
                }
                finally
                {
                    _workItems.TryRemove(processInfo.ProcessId, out var _);
                }
            },
            ProcessInfo = processInfo
        }
    };

    _workItems.TryAdd(processId, workItem);

    // For Unbounded Channels this will always return true
    ChannelWriter.TryWrite(workItem);

    return processInfo;
}
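
From the consuming code's point of view, enqueueing then looks roughly like this (GenerateReportAsync and NotifyAsync are placeholders for your own work and notification logic):

var processInfo = await processWorker.Producer.EnqueueWorkItemAsync(
    async stoppingToken =>
    {
        // The actual heavy lifting, e.g. crunching a financial report.
        await GenerateReportAsync(stoppingToken);
    },
    async (processId, status) =>
    {
        // Persist and/or broadcast the new status (SignalR, DB, ...).
        await NotifyAsync(processId, status);
    });

// Await the work item's completion if the caller needs to block on it.
await processInfo.Completion;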

CancelWorkItemAsync

As the name suggests, this method is used for cancelling work items (who would have guessed?!). It can be implemented in many ways; for example, if you're using states, you can grab the work item with the matching processId from _workItems, take its CancellationTokenSource, and do with it whatever you want. All possible implementations should converge to the same outcome, i.e. the cancellation of a work item. Monitor ensures that only a single thread performs the cancellation; other threads pass through without taking any action.

public void CancelWorkItemAsync(Guid processId, int? millisecondsDelay = null)
{
    if (_workItems.TryGetValue(processId, out var workItem) && Monitor.TryEnter(workItem))
    {
        try
        {
            if (workItem.Status is ProcessStatus.Queued)
            {
                workItem.ProcessMetadata.IsCanceledBeforeRunning = true;
                workItem.TaskCompletionSrc.TrySetCanceled();
                workItem.Progress(ProcessStatus.Canceled);
                _workItems.TryRemove(processId, out var _);
            }
            else if (workItem.Status is ProcessStatus.Running)
            {
                workItem.Progress(ProcessStatus.CancellationRequested);

                if (millisecondsDelay is not null)
                    workItem.CancellationTokenSrc.CancelAfter(millisecondsDelay.Value);
                else
                    workItem.CancellationTokenSrc.Cancel();
            }
        }
        finally
        {
            Monitor.Exit(workItem);
        }
    }
}
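
From the outside, cancellation is then a one-liner (processInfo being the ProcessWorkerInfo returned at enqueue time):

// Cancel immediately: removes the item if it is still queued,
// or signals the token if it is already running.
processWorker.Producer.CancelWorkItemAsync(processInfo.ProcessId);

// Or give a running process up to 5 seconds to wrap up before the token fires.
processWorker.Producer.CancelWorkItemAsync(processInfo.ProcessId, millisecondsDelay: 5000);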

Consumer

public interface IProcessWorkerConsumer
{
    bool TryCreateConsumingThread();
}

Simple, right? Let’s see how it works.

internal class ProcessWorkerConsumer : IProcessWorkerConsumer
{
    private bool ThreadExists { get; set; }

    public ProcessWorkerConsumer(ChannelReader<WorkItem> channelReader, ProcessWorkerConfiguration configuration)
    {
        ChannelReader = channelReader;
        Configuration = configuration;
    }

    public ChannelReader<WorkItem> ChannelReader { get; }
    public ProcessWorkerConfiguration Configuration { get; }

    // TryCreateConsumingThread
}

ProcessWorkerConfiguration is a class that contains configuration options for the process worker:

public record ProcessWorkerConfiguration
{
    // How many work items will run at the same time?
    public int Concurrency { get; init; } = 3;
}
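
If the worker lives in a DI container, this configuration is the natural thing to tweak per registration; a small sketch using the ProcessWorker composition from earlier (the registration style is my own, not prescribed by the post):

services.AddSingleton<IProcessWorker>(_ =>
    new ProcessWorker(new ProcessWorkerConfiguration { Concurrency = 5 }));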

TryCreateConsumingThread

Finally, the beating heart of the ProcessWorker: it spins up a dedicated thread that waits until the next work item can start and then runs it as a Task, with at most Configuration.Concurrency work items executing at the same time.

public bool TryCreateConsumingThread()
{
    if (ThreadExists)
    {
        return false;
    }

    var workerThread = new Thread(async () =>
    {
        using var semaphore = new SemaphoreSlim(Configuration.Concurrency, Configuration.Concurrency);

        while (true)
        {
            await ChannelReader.WaitToReadAsync();

            await semaphore.WaitAsync();

            var workItem = await ChannelReader.ReadAsync();

            _ = RunWorkerAsync(workItem)
                .ContinueWith(result => semaphore.Release());
        }
    });

    workerThread.IsBackground = true;
    workerThread.Priority = ThreadPriority.Normal;

    ThreadExists = true;

    workerThread.Start();

    return true;
}

Notice that the execution inside our while loop can stop at two places: we wait until something can be read from the channel, and then wait until a concurrency slot gets freed up. The combination of these two stops saves a lot of CPU time, since the loop only advances when there is actually something to process. If you want to check out RunWorkerAsync and get the full picture, visit the link at the bottom of the blog post 😉.
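
In the meantime, here is a rough approximation of what RunWorkerAsync has to do: skip work items that were canceled while still queued, flip the status to Running, execute the delegate with the work item's token, and settle the TaskCompletionSource accordingly. Treat it as my reconstruction rather than the repo's exact code:

private static async Task RunWorkerAsync(WorkItem workItem)
{
    // A work item canceled while still queued never runs.
    if (workItem.ProcessMetadata.IsCanceledBeforeRunning)
        return;

    try
    {
        await workItem.Progress(ProcessStatus.Running);

        await workItem.ProcessMetadata.DoWorkAsync(workItem.CancellationTokenSrc.Token);

        await workItem.Progress(ProcessStatus.Done);
        workItem.TaskCompletionSrc.TrySetResult();
    }
    catch (OperationCanceledException)
    {
        await workItem.Progress(ProcessStatus.Canceled);
        workItem.TaskCompletionSrc.TrySetCanceled();
    }
    catch (Exception ex)
    {
        await workItem.Progress(ProcessStatus.Failed);
        workItem.TaskCompletionSrc.TrySetException(ex);
    }
}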

Final Words

A careful reader might have already noticed that we have fulfilled all the requirements:

  • Create a queue ✔️
  • Create a state management system ✔️
  • Add functionality that notifies the consumer about the progress being made (state) ✔️
  • Add the ability to cancel a process at any time ✔️
  • Should be generic ✔️

We can, of course, take things even further. Why use the API's resources to do the heavy work when a separate application can do it instead? Offloading the work boosts the overall performance of the app: the API keeps responding at a good pace while the worker does its sweaty work. This topic is out of scope for this blog post, but here's a quick how-to:

Create a Console Application (the worker), use a message broker to send messages between the two apps, use the ProcessWorker for throttling and a socket-based API for notifications, and that's it! We have ourselves a fully-fledged throttling system with its own memory, its own thread pool and, depending on how you deploy the worker, possibly its own CPU!
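
To make that how-to a bit more concrete, here is a very rough sketch of the worker side. IMessageBus and ReportRequested are hypothetical names standing in for whatever broker abstraction you use; the worker subscribes to report requests, funnels each one through the ProcessWorker, and publishes status changes back for the API to push over its sockets:

public class ReportWorkerService : BackgroundService
{
    private readonly IProcessWorker _processWorker; // throttles the actual work
    private readonly IMessageBus _bus;              // hypothetical broker abstraction

    public ReportWorkerService(IProcessWorker processWorker, IMessageBus bus)
    {
        _processWorker = processWorker;
        _bus = bus;
    }

    protected override Task ExecuteAsync(CancellationToken stoppingToken)
    {
        _processWorker.Consumer.TryCreateConsumingThread();

        // For every incoming request, enqueue a throttled work item and
        // publish every status change so the API can notify the user.
        return _bus.SubscribeAsync<ReportRequested>(async request =>
        {
            await _processWorker.Producer.EnqueueWorkItemAsync(
                ct => GenerateReportAsync(request, ct),
                (processId, status) => _bus.PublishAsync(new { ProcessId = processId, Status = status }));
        }, stoppingToken);
    }

    private static Task GenerateReportAsync(ReportRequested request, CancellationToken ct)
        => Task.Delay(TimeSpan.FromSeconds(30), ct); // stand-in for the real report crunching
}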

DIY Process Worker upgrades

There’s a bunch of ways you can upgrade your ProcessWorker:

  • Create a “priority queue” where certain work items should be prioritized over others,
    • Sometimes there might be a scenario where certain users (or code, or…) should have their work prioritized by ProcessWorker.
  • Workload balancing,
    • One instance can steal work from another if that instance has too much work.
  • Dynamic changes to ProcessWorkerConfiguration,
    • Use some heuristics to dynamically change the configuration so the worker keeps functioning optimally.

💾 See the full example on GitHub!

Thanks for reading!