Skip to content

Instantly share code, notes, and snippets.

@yuzd
Created December 17, 2021 08:27
Show Gist options
  • Save yuzd/e12b56ab9db51e31a26db8056674da61 to your computer and use it in GitHub Desktop.
Save yuzd/e12b56ab9db51e31a26db8056674da61 to your computer and use it in GitHub Desktop.
// Driver script: runs each of the three processing examples in turn and dumps
// every processed item together with the id of the thread that raised the event.
var streamTest = new AsyncStreamTest();

// Event sink: formats the thread id and the processed item into one line of output.
void OnItemsUpdated(object sender, ItemsUpdatedEventArgs e)
{
    string report = $"Thread:{Thread.CurrentThread.ManagedThreadId}->{e.Item}";
    report.Dump();
}

streamTest.ItemsUpdatedEvent += OnItemsUpdated;

// Each example is awaited to completion before the next one starts.
await streamTest.ChannelExample(30, QueryCancelToken);
await streamTest.UsingDataFlowAsync(30, QueryCancelToken);
await streamTest.UsingWorkerThreadsAsync(30, QueryCancelToken);
/// <summary>
/// Demonstrates three ways of processing an asynchronous stream of lines with a
/// bounded number of concurrent workers: plain tasks gated by a semaphore, a bounded
/// channel, and a TPL Dataflow pipeline. Every processed line is reported through
/// <see cref="ItemsUpdatedEvent"/>.
/// </summary>
class AsyncStreamTest
{
    // Characters stripped from every line by the worker functions.
    private const string Vowels = "aeiou";

    // Scheduler used to raise ItemsUpdatedEvent in the worker-thread and dataflow examples.
    private readonly TaskScheduler uiTaskScheduler;

    /// <summary>
    /// Creates the instance and picks the exclusive half of a scheduler pair as the
    /// scheduler on which update events are raised.
    /// </summary>
    public AsyncStreamTest()
    {
        // Scheduler pair.
        // Alternative that wraps the current synchronization context instead:
        // var taskSchedulerPair = new ConcurrentExclusiveSchedulerPair(TaskScheduler.FromCurrentSynchronizationContext());
        var taskSchedulerPair = new ConcurrentExclusiveSchedulerPair(TaskScheduler.Default);
        // taskSchedulerPair.ConcurrentScheduler
        uiTaskScheduler = taskSchedulerPair.ExclusiveScheduler;
        // As long as the ExclusiveScheduler is not running a task, the ConcurrentScheduler
        // can run several tasks at the same time.
        // The ExclusiveScheduler can only run when the ConcurrentScheduler has no tasks
        // executing, and it only ever runs one task at a time.
    }

    private string page; // built lazily on first access

    /// <summary>
    /// The simulated web page: a JSON array of lines, built on first access and then cached.
    /// </summary>
    public string Page
    {
        get => page ??= BuildPageAsJsonString();
        private set => page = value;
    }

    /// <summary>
    /// Builds a JSON array of <see cref="Constants.PageSize"/> strings, each made of
    /// the line index followed by <see cref="Constants.LineString"/>.
    /// </summary>
    private string BuildPageAsJsonString()
    {
        List<string> lines = new();
        string lineString = Constants.LineString;
        for (int i = 0; i < Constants.PageSize; i++)
        {
            lines.Add($"{i}{lineString}");
        }
        return JsonConvert.SerializeObject(lines);
    }

    /// <summary>
    /// Simulates fetching the page by waiting <see cref="Constants.WebSocketDelay"/>
    /// milliseconds before returning <see cref="Page"/>.
    /// </summary>
    /// <param name="token">Cancels the simulated delay.</param>
    /// <returns>The page as a JSON string.</returns>
    /// <exception cref="OperationCanceledException">The token was cancelled during the delay.</exception>
    public async Task<string> GetWebpageAsync(CancellationToken token = default)
    {
        await Task.Delay(Constants.WebSocketDelay, token);
        return Page;
    }

    /// <summary>
    /// Streams lines page by page until <paramref name="count"/> lines have been produced.
    /// </summary>
    /// <param name="count">Number of lines to yield; zero or less yields nothing.</param>
    /// <param name="cancellationToken">
    /// Cancels the enumeration. Marked with EnumeratorCancellation so that a token
    /// supplied through <c>WithCancellation</c> reaches the iterator as well.
    /// </param>
    /// <returns>An async sequence of at most <paramref name="count"/> lines.</returns>
    public async IAsyncEnumerable<string> GetLines(
        int count,
        [System.Runtime.CompilerServices.EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        // Without this guard the completion check below would never be met for count <= 0.
        if (count <= 0)
        {
            yield break;
        }

        int itemCount = 0;
        while (true)
        {
            // Asynchronous step: the next page is awaited, so the enumerator may not
            // complete synchronously here.
            string jsonResponse = await GetWebpageAsync(cancellationToken);
            string[] items = JsonConvert.DeserializeObject<string[]>(jsonResponse);

            // A null or empty page can never satisfy the count; stop instead of polling forever.
            if (items is null || items.Length == 0)
            {
                yield break;
            }

            // Synchronous enumeration over the page that has already been fetched.
            foreach (var item in items)
            {
                yield return item;
                itemCount++;
                if (itemCount >= count)
                {
                    yield break;
                }
            }
        }
    }

    /// <summary>
    /// Processes the line stream with one task per line, limiting the number of
    /// lines in flight with a semaphore, and raises the update event for each result
    /// on the exclusive scheduler.
    /// </summary>
    /// <param name="workersCount">Maximum number of lines processed concurrently; must be at least 1.</param>
    /// <param name="token">Stops the production of new work when cancelled.</param>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="workersCount"/> is less than 1.</exception>
    public async Task UsingWorkerThreadsAsync(int workersCount, CancellationToken token = default)
    {
        // A count of zero would make the first WaitAsync block forever.
        if (workersCount < 1)
        {
            throw new ArgumentOutOfRangeException(nameof(workersCount));
        }

        var semaphoreSlim = new SemaphoreSlim(workersCount);
        List<Task> tasks = new();
        try
        {
            await foreach (var line in GetLines(Constants.LinesRequired, token))
            {
                // Wait for a free worker slot; the slot is released by WorkersFunction.
                await semaphoreSlim.WaitAsync(token);
                var task = Task.Factory.StartNew(() => WorkersFunction(line, semaphoreSlim),
                        token,
                        TaskCreationOptions.None,
                        TaskScheduler.Default)
                    .ContinueWith(
                        t => RaiseItemsUpdatedEvent(new ItemsUpdatedEventArgs(t.Result)),
                        token, TaskContinuationOptions.AttachedToParent, uiTaskScheduler);
                tasks.Add(task);
            }
        }
        catch (OperationCanceledException)
        {
            // Cancellation only stops the production of new work; tasks that were
            // already created are still awaited below.
        }

        // NOTE(review): tasks cancelled through the token before they ran are expected
        // to surface here as a cancellation exception - confirm this is intended.
        await Task.WhenAll(tasks);
    }

    /// <summary>
    /// Worker used by the semaphore-gated examples: processes the line and always
    /// releases the semaphore slot that was taken for it.
    /// </summary>
    /// <param name="message">The line to process.</param>
    /// <param name="semaphoreSlim">The semaphore whose slot is released when the work ends.</param>
    /// <returns>The line with all lower-case vowels removed.</returns>
    private string WorkersFunction(string message, SemaphoreSlim semaphoreSlim)
    {
        try
        {
            return WorkersFunctionDF(message);
        }
        finally
        {
            semaphoreSlim.Release();
        }
    }

    /// <summary>
    /// Processes the line stream by writing worker tasks into a bounded channel that a
    /// single reader drains in order, raising the update event for each result.
    /// </summary>
    /// <param name="workersCount">Maximum number of lines processed concurrently; must be at least 1.</param>
    /// <param name="token">Cancels the writer loop and the reader.</param>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="workersCount"/> is less than 1.</exception>
    public async Task ChannelExample(int workersCount, CancellationToken token = default)
    {
        // A count of zero would make the first WaitAsync block forever.
        if (workersCount < 1)
        {
            throw new ArgumentOutOfRangeException(nameof(workersCount));
        }

        var semaphoreSlim = new SemaphoreSlim(workersCount);
        var channel = Channel.CreateBounded<Task<string>>(new BoundedChannelOptions(Constants.BufferCapacity)
        { SingleWriter = true });
        var readerTask = ReadFromSingleChannelAsync(channel, token);
        try
        {
            await foreach (var line in GetLines(Constants.LinesRequired, token))
            {
                // Cancelling the semaphore wait directly can be problematic, so the wait
                // itself is not cancellable; cancellation is observed by the enumeration
                // and by the channel write instead.
                await semaphoreSlim.WaitAsync(CancellationToken.None);
                var workerTask = Task.Run(() => WorkersFunction(line, semaphoreSlim));
                await channel.Writer.WriteAsync(workerTask, token);
            }
        }
        catch (OperationCanceledException)
        {
            // Cancellation stops the writer; the reader reports its own outcome below.
        }
        finally
        {
            // Always signal the end of input so the reader is never left waiting,
            // even when the loop ends with an unexpected exception.
            channel.Writer.TryComplete();
        }
        await readerTask;
    }

    /// <summary>
    /// Drains the channel, awaiting each worker task in the order it was written and
    /// raising the update event with its result.
    /// </summary>
    /// <param name="channel">The channel carrying the worker tasks.</param>
    /// <param name="token">Cancels waiting on and reading from the channel.</param>
    private async Task ReadFromSingleChannelAsync(Channel<Task<string>> channel, CancellationToken token = default)
    {
        while (await channel.Reader.WaitToReadAsync(token))
        {
            var readTask = await channel.Reader.ReadAsync(token);
            var result = await readTask;
            RaiseItemsUpdatedEvent(new ItemsUpdatedEventArgs(result));
        }
    }

    /// <summary>
    /// Processes the line stream with a TPL Dataflow pipeline: a TransformBlock runs the
    /// worker function and an ActionBlock raises the update event on the exclusive scheduler.
    /// </summary>
    /// <param name="workersCount">Maximum degree of parallelism of the TransformBlock.</param>
    /// <param name="token">Cancels the enumeration, the pending send and both blocks.</param>
    public async Task UsingDataFlowAsync(int workersCount, CancellationToken token = default)
    {
        // Set the TransformBlock options.
        var options = new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = workersCount, // number of active worker threads
            SingleProducerConstrained = true,      // only this method posts to the block
            BoundedCapacity = Constants.BufferCapacity, // sets the buffer size
            CancellationToken = token
        };
        // The TransformBlock takes a string as its input, passes it to WorkersFunctionDF
        // and outputs the value returned from that function.
        var transformBlock = new TransformBlock<string, string>(message => WorkersFunctionDF(message), options);
        // The ActionBlock takes the output string from the TransformBlock and raises
        // ItemsUpdatedEvent on uiTaskScheduler, passing the string in the event args.
        var uiUpdaterBlock = new ActionBlock<string>(msg => RaiseItemsUpdatedEvent(new ItemsUpdatedEventArgs(msg)),
            new ExecutionDataflowBlockOptions { TaskScheduler = uiTaskScheduler, CancellationToken = token });
        // PropagateCompletion means that when the TransformBlock completes,
        // that completion is passed on to the ActionBlock.
        var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
        // Couple the TransformBlock to the ActionBlock.
        transformBlock.LinkTo(uiUpdaterBlock, linkOptions);
        try
        {
            await foreach (var line in GetLines(Constants.LinesRequired, token))
            {
                // Send the line and wait for it to be accepted. A false result means the
                // block will accept no further input, so there is no point in fetching
                // more lines; the reason surfaces through Completion below.
                if (!await transformBlock.SendAsync(line, token))
                {
                    break;
                }
            }
        }
        finally
        {
            // Complete the TransformBlock even when the loop throws, so the pipeline
            // is not left waiting for more input.
            transformBlock.Complete();
        }
        // Wait for the ActionBlock to finish processing everything that was sent.
        await uiUpdaterBlock.Completion;
    }

    /// <summary>
    /// Worker used by the dataflow example (and shared by <see cref="WorkersFunction"/>):
    /// simulates a busy thread, then strips the lower-case vowels from the line.
    /// </summary>
    /// <param name="line">The line to process.</param>
    /// <returns>The line with all lower-case vowels removed.</returns>
    private string WorkersFunctionDF(string line)
    {
        Thread.Sleep(Constants.WorkerThreadSleep); // simulate a busy thread
        return string.Concat(line.Where(c => !Vowels.Contains(c)));
    }

    /// <summary>Signature of the handlers notified for each processed item.</summary>
    public delegate void ItemsUpdatedHandler(object sender, ItemsUpdatedEventArgs e);

    /// <summary>Raised once for every processed line.</summary>
    public event ItemsUpdatedHandler ItemsUpdatedEvent;

    /// <summary>Invokes the subscribers of <see cref="ItemsUpdatedEvent"/>, if any.</summary>
    private void RaiseItemsUpdatedEvent(ItemsUpdatedEventArgs args)
    {
        ItemsUpdatedEvent?.Invoke(this, args);
    }
}
/// <summary>
/// Tuning values shared by the async-stream examples.
/// </summary>
static class Constants
{
    // --- Simulated page ---

    /// <summary>Text appended to the line index for every line of the page.</summary>
    public const string LineString = "The quick brown fox jumps over the lazy dog";

    /// <summary>Number of lines in one generated page.</summary>
    public const int PageSize = 3000;

    /// <summary>Delay, in milliseconds, applied before a page is returned.</summary>
    public const int WebSocketDelay = 40;

    /// <summary>Total number of lines each example requests from the stream.</summary>
    public const int LinesRequired = 10000;

    // --- Workers and buffering ---

    /// <summary>Time, in milliseconds, each worker sleeps to simulate being busy.</summary>
    public const int WorkerThreadSleep = 10;

    /// <summary>Worker count setting.</summary>
    public const int WorkersCount = 35;

    /// <summary>Capacity of the bounded channel and of the dataflow block buffer.</summary>
    public const int BufferCapacity = 3000;
}
/// <summary>
/// Event payload carrying a single processed item.
/// </summary>
class ItemsUpdatedEventArgs : EventArgs
{
    /// <summary>Wraps <paramref name="item"/> so it can be handed to event subscribers.</summary>
    /// <param name="item">The processed item to expose through <see cref="Item"/>.</param>
    public ItemsUpdatedEventArgs(string item) => Item = item;

    /// <summary>The item supplied at construction; read-only afterwards.</summary>
    public string Item { get; }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment