Skip to content

Instantly share code, notes, and snippets.

@OmidID
Created March 12, 2018 21:00
Show Gist options
  • Save OmidID/1d3c53e782cf1535ec83a9d254660acf to your computer and use it in GitHub Desktop.
Save OmidID/1d3c53e782cf1535ec83a9d254660acf to your computer and use it in GitHub Desktop.
C# ThreadPool
using System;
using System.Collections.Generic;
using System.Collections.Concurrent;
using System.ComponentModel;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace OmidID.Utils
{
/// <summary>
/// Threadpool class that help you to execute async task in pool
/// </summary>
/// <remarks>
/// https://gist.github.com/OmidID/1d3c53e782cf1535ec83a9d254660acf
/// </remarks>
public class ThreadPool
{
private readonly HashSet<InternalTaskHolder> workingTasks = new HashSet<InternalTaskHolder>();
private readonly ConcurrentQueue<InternalTaskHolder> queue = new ConcurrentQueue<InternalTaskHolder>();
private readonly object tasksMutex = new object();
private readonly object checkMutex = new object();
private int threadsMaxCount;
private class InternalTaskHolder
{
public ThreadStart Task { get; set; }
public TaskCompletionSource<IDisposable> Waiter { get; set; }
}
/// <summary>
/// Raised when all tasks have been completed.
/// </summary>
public event EventHandler Completed;
/// <summary>
/// Creates a new thread queue with a maximum number of threads
/// </summary>
/// <param name="threadsMaxCount">The maximum number of currently threads</param>
public ThreadPool(int threadsMaxCount)
{
this.threadsMaxCount = threadsMaxCount;
}
/// <summary>
/// Creates a new thread queue with a maximum number of threads and the tasks that should be executed.
/// </summary>
/// <param name="threadsMaxCount">The maximum number of currently threads.</param>
/// <param name="tasks">The tasks that will be execut in pool.</param>
public ThreadPool(int threadsMaxCount, ThreadStart[] tasks) : this(threadsMaxCount)
{
foreach (ThreadStart task in tasks)
{
queue.Enqueue(new InternalTaskHolder { Task = task });
}
}
/// <summary>
/// Adds a task and runs it if free thread exists. Otherwise enqueue.
/// </summary>
/// <param name="task">The task that will be execut</param>
public Task Enqueue(ThreadStart task)
{
lock (tasksMutex)
{
var holder = new InternalTaskHolder { Task = task, Waiter = new TaskCompletionSource<IDisposable>() };
if (workingTasks.Count >= threadsMaxCount)
queue.Enqueue(holder);
else
StartTask(holder);
return holder.Waiter.Task;
}
}
/// <summary>
/// Starts the execution of a task.
/// </summary>
/// <param name="task">The task that should be executed.</param>
private void StartTask(InternalTaskHolder task)
{
workingTasks.Add(task);
BackgroundWorker thread = new BackgroundWorker();
thread.DoWork += (s, e) => task.Task.Invoke();
thread.RunWorkerCompleted += (s, e) => TaskCompleted(task);
thread.RunWorkerAsync();
}
private void TaskCompleted(InternalTaskHolder start)
{
//release waiters
start.Waiter.SetResult(null);
lock (tasksMutex)
{
workingTasks.Remove(start);
CheckQueue();
if (queue.Count == 0 && workingTasks.Count == 0)
OnCompleted();
}
}
/// <summary>
/// Checks if the queue contains tasks and runs as many as there are free execution slots.
/// </summary>
private void CheckQueue()
{
lock (checkMutex)
while (queue.Count > 0 && workingTasks.Count < threadsMaxCount)
if (queue.TryDequeue(out InternalTaskHolder task))
StartTask(task);
}
/// <summary>
/// Raises the Completed event.
/// </summary>
protected void OnCompleted()
{
Completed?.Invoke(this, null);
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment