Last active
September 19, 2024 20:11
-
-
Save OmidID/da234a6cfbacebbd46defdb71c6cf95e to your computer and use it in GitHub Desktop.
C# Task Pool
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System.Collections.Concurrent; | |
/// <summary>
/// Runs asynchronous work items while limiting how many of them are in flight at the same time.
/// Work that cannot start immediately waits in a FIFO queue until a slot is released.
/// </summary>
/// <remarks>
/// https://gist.github.com/OmidID/da234a6cfbacebbd46defdb71c6cf95e
/// </remarks>
/// <param name="threadsMaxCount">Maximum number of work items allowed to run concurrently. Must be positive.</param>
/// <exception cref="ArgumentOutOfRangeException"><paramref name="threadsMaxCount"/> is zero or negative.</exception>
public class TaskPool(int threadsMaxCount)
{
    // A non-positive limit would leave every work item queued forever, so it is rejected up front.
    private readonly int _maxCount = threadsMaxCount > 0
        ? threadsMaxCount
        : throw new ArgumentOutOfRangeException(
            nameof(threadsMaxCount),
            threadsMaxCount,
            "The maximum number of concurrent tasks must be greater than zero.");

    private readonly HashSet<IInternalTask> _workingTasks = [];
    private readonly ConcurrentQueue<IInternalTask> _queue = new();

    // Single gate guarding both _workingTasks and the queue/working-set bookkeeping.
    private readonly object _tasksMutex = new();

    /// <summary>
    /// Gets the maximum number of work items that may run concurrently.
    /// </summary>
    public int ThreadsMaxCount => _maxCount;

    /// <summary>
    /// A unit of work the pool can run without knowing its result type.
    /// </summary>
    private interface IInternalTask
    {
        Task ExecuteAsync();
    }

    /// <summary>
    /// Work item without a result. <see cref="Waiter"/> is null for items supplied through the
    /// constructor, where no caller is waiting on the outcome.
    /// </summary>
    private sealed class InternalTaskHolder : IInternalTask
    {
        public required Func<Task> Task { get; init; }
        public required TaskCompletionSource<object?>? Waiter { get; init; }

        /// <summary>
        /// Runs the delegate and forwards its outcome (success, cancellation or failure) to
        /// <see cref="Waiter"/>. When there is no waiter, a failure is rethrown to the caller.
        /// </summary>
        public async Task ExecuteAsync()
        {
            var waiter = Waiter;
            try
            {
                await Task();
            }
            catch (OperationCanceledException ex)
            {
                if (waiter is null)
                {
                    throw;
                }

                waiter.TrySetCanceled(ex.CancellationToken);
                return;
            }
            catch (Exception ex)
            {
                if (waiter is null)
                {
                    throw;
                }

                waiter.TrySetException(ex);
                return;
            }

            waiter?.TrySetResult(null);
        }
    }

    /// <summary>
    /// Work item that produces a value of type <typeparamref name="T"/>.
    /// </summary>
    private sealed class InternalTaskHolderGeneric<T> : IInternalTask
    {
        public required Func<Task<T>> Task { get; init; }
        public required TaskCompletionSource<T> Waiter { get; init; }

        /// <summary>
        /// Runs the delegate and forwards its outcome (result, cancellation or failure) to
        /// <see cref="Waiter"/>.
        /// </summary>
        public async Task ExecuteAsync()
        {
            T result;
            try
            {
                result = await Task();
            }
            catch (OperationCanceledException ex)
            {
                Waiter.TrySetCanceled(ex.CancellationToken);
                return;
            }
            catch (Exception ex)
            {
                Waiter.TrySetException(ex);
                return;
            }

            Waiter.TrySetResult(result);
        }
    }

    /// <summary>
    /// Raised when a work item finishes and leaves both the queue and the running set empty.
    /// The event is raised outside the pool's internal lock.
    /// </summary>
    public event EventHandler? Completed;

    /// <summary>
    /// Creates a pool, queues every delegate in <paramref name="tasks"/> and starts as many of
    /// them as the limit allows.
    /// </summary>
    /// <param name="threadsMaxCount">Maximum number of work items allowed to run concurrently.</param>
    /// <param name="tasks">The delegates to run. Their outcome is not reported to any caller.</param>
    /// <exception cref="ArgumentNullException"><paramref name="tasks"/> is null.</exception>
    /// <exception cref="ArgumentException"><paramref name="tasks"/> contains a null delegate.</exception>
    public TaskPool(int threadsMaxCount, IEnumerable<Func<Task>> tasks)
        : this(threadsMaxCount)
    {
        ArgumentNullException.ThrowIfNull(tasks);

        foreach (var task in tasks)
        {
            if (task is null)
            {
                throw new ArgumentException("The task collection must not contain null delegates.", nameof(tasks));
            }

            _queue.Enqueue(new InternalTaskHolder
            {
                Task = task,
                Waiter = null
            });
        }

        CheckQueue();
    }

    /// <summary>
    /// Adds a work item and starts it if a slot is free; otherwise it waits in the queue.
    /// </summary>
    /// <param name="task">The delegate that will be executed.</param>
    /// <returns>
    /// A task that completes, faults or is cancelled together with the work item.
    /// </returns>
    /// <exception cref="ArgumentNullException"><paramref name="task"/> is null.</exception>
    public Task EnqueueAsync(Func<Task> task)
    {
        ArgumentNullException.ThrowIfNull(task);

        var holder = new InternalTaskHolder
        {
            Task = task,
            // Keep awaiter continuations off the pool's completion path.
            Waiter = new TaskCompletionSource<object?>(TaskCreationOptions.RunContinuationsAsynchronously)
        };

        _queue.Enqueue(holder);
        CheckQueue();
        return holder.Waiter.Task;
    }

    /// <summary>
    /// Adds a work item and starts it if a slot is free; otherwise it waits in the queue.
    /// </summary>
    /// <param name="task">The delegate that will be executed.</param>
    /// <returns>
    /// A task that yields the work item's result, or faults or is cancelled together with it.
    /// </returns>
    /// <exception cref="ArgumentNullException"><paramref name="task"/> is null.</exception>
    public Task<T> EnqueueAsync<T>(Func<Task<T>> task)
    {
        ArgumentNullException.ThrowIfNull(task);

        var holder = new InternalTaskHolderGeneric<T>
        {
            Task = task,
            // Keep awaiter continuations off the pool's completion path.
            Waiter = new TaskCompletionSource<T>(TaskCreationOptions.RunContinuationsAsynchronously)
        };

        _queue.Enqueue(holder);
        CheckQueue();
        return holder.Waiter.Task;
    }

    /// <summary>
    /// Runs a work item that already occupies a slot and releases the slot when it ends,
    /// whether it succeeded or threw.
    /// </summary>
    /// <param name="task">The work item that should be executed.</param>
    private async Task StartTaskAsync(IInternalTask task)
    {
        try
        {
            await task.ExecuteAsync();
        }
        finally
        {
            // Always free the slot, otherwise one failing item would shrink the pool for good.
            TaskCompleted(task);
        }
    }

    /// <summary>
    /// Releases the slot held by <paramref name="task"/>, promotes queued work into free slots
    /// and raises <see cref="Completed"/> when nothing is queued or running any more.
    /// </summary>
    private void TaskCompleted(IInternalTask task)
    {
        List<IInternalTask>? ready;
        bool drained;

        lock (_tasksMutex)
        {
            _workingTasks.Remove(task);
            ready = TakeReadyTasks();
            drained = _queue.IsEmpty && _workingTasks.Count == 0;
        }

        // Delegates and event handlers are user code; run them without holding the lock.
        StartAll(ready);

        if (drained)
        {
            OnCompleted();
        }
    }

    /// <summary>
    /// Checks if the queue contains work items and runs as many as there are free execution slots.
    /// </summary>
    private void CheckQueue()
    {
        List<IInternalTask>? ready;

        lock (_tasksMutex)
        {
            ready = TakeReadyTasks();
        }

        StartAll(ready);
    }

    /// <summary>
    /// Moves queued work items into the running set until the limit is reached.
    /// Must be called while holding <see cref="_tasksMutex"/>.
    /// </summary>
    /// <returns>The items that were given a slot, or null when none were.</returns>
    private List<IInternalTask>? TakeReadyTasks()
    {
        List<IInternalTask>? ready = null;

        while (_workingTasks.Count < _maxCount && _queue.TryDequeue(out var next))
        {
            _workingTasks.Add(next);
            (ready ??= []).Add(next);
        }

        return ready;
    }

    /// <summary>
    /// Starts every work item in <paramref name="ready"/>; does nothing for null.
    /// </summary>
    private void StartAll(List<IInternalTask>? ready)
    {
        if (ready is null)
        {
            return;
        }

        foreach (var item in ready)
        {
            _ = StartTaskAsync(item);
        }
    }

    /// <summary>
    /// Raises the Completed event.
    /// </summary>
    protected virtual void OnCompleted()
    {
        Completed?.Invoke(this, EventArgs.Empty);
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Good