Created
December 17, 2021 08:27
-
-
Save yuzd/e12b56ab9db51e31a26db8056674da61 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Script entry point: runs each of the three fan-out strategies in turn and prints every
// processed item together with the id of the thread that raised the event.
const int workerCount = 30;

var streamTest = new AsyncStreamTest();
streamTest.ItemsUpdatedEvent += ItemsUpdatedEventhandler;

// Event sink: dumps "Thread:<managed thread id>-><item>" for each processed line.
void ItemsUpdatedEventhandler(object sender, ItemsUpdatedEventArgs e) =>
    $"Thread:{Thread.CurrentThread.ManagedThreadId}->{e.Item}".Dump();

// The strategies run strictly one after another; each call completes before the next starts.
await streamTest.ChannelExample(workerCount, QueryCancelToken);
await streamTest.UsingDataFlowAsync(workerCount, QueryCancelToken);
await streamTest.UsingWorkerThreadsAsync(workerCount, QueryCancelToken);
/// <summary>
/// Demonstrates three ways of pushing an asynchronous stream of text lines through a bounded
/// number of workers and reporting each processed line through <see cref="ItemsUpdatedEvent"/>:
/// semaphore-gated tasks, a bounded channel, and a TPL Dataflow pipeline.
/// </summary>
class AsyncStreamTest
{
    // Scheduler used by the task- and dataflow-based strategies to raise ItemsUpdatedEvent.
    // It is the exclusive half of a ConcurrentExclusiveSchedulerPair built over
    // TaskScheduler.Default, so it runs one delegate at a time on thread-pool threads.
    // It is NOT a real UI thread; the name reflects the alternative noted in the constructor.
    private readonly TaskScheduler uiTaskScheduler;

    /// <summary>Creates the test harness and the exclusive scheduler used for event delivery.</summary>
    public AsyncStreamTest()
    {
        // To marshal onto a real UI thread, build the pair over
        // TaskScheduler.FromCurrentSynchronizationContext() instead of TaskScheduler.Default.
        var taskSchedulerPair = new ConcurrentExclusiveSchedulerPair(TaskScheduler.Default);

        // As long as the ExclusiveScheduler is not running a task, the ConcurrentScheduler may
        // run several tasks at once. The ExclusiveScheduler only runs when the
        // ConcurrentScheduler has nothing executing, and it runs one task at a time.
        uiTaskScheduler = taskSchedulerPair.ExclusiveScheduler;
    }

    // Backing field for Page; stays null until the page is first requested.
    private string page;

    /// <summary>
    /// The simulated web page: a JSON array of <see cref="Constants.PageSize"/> lines,
    /// built on first access and cached afterwards.
    /// </summary>
    public string Page
    {
        get => page ??= BuildPageAsJsonString();
        private set => page = value;
    }

    /// <summary>Builds the JSON array of numbered lines served as the simulated page.</summary>
    private string BuildPageAsJsonString()
    {
        List<string> lines = new(Constants.PageSize);
        for (int i = 0; i < Constants.PageSize; i++)
        {
            lines.Add($"{i}{Constants.LineString}");
        }

        return JsonConvert.SerializeObject(lines);
    }

    /// <summary>
    /// Simulates downloading the page by waiting <see cref="Constants.WebSocketDelay"/>
    /// milliseconds and then returning <see cref="Page"/>.
    /// </summary>
    /// <param name="token">Cancels the simulated wait.</param>
    /// <returns>The page as a JSON string.</returns>
    /// <exception cref="OperationCanceledException">The token was cancelled during the wait.</exception>
    public async Task<string> GetWebpageAsync(CancellationToken token = default)
    {
        await Task.Delay(Constants.WebSocketDelay, token);
        return Page;
    }

    /// <summary>
    /// Streams exactly <paramref name="count"/> lines, fetching a new page whenever the
    /// current one is exhausted.
    /// </summary>
    /// <param name="count">Number of lines to yield; zero or negative yields nothing.</param>
    /// <param name="token">
    /// Cancels the stream. Because of <c>[EnumeratorCancellation]</c>, a token supplied by the
    /// consumer through <c>WithCancellation</c> is observed here as well.
    /// </param>
    /// <returns>An asynchronous sequence of lines.</returns>
    public async IAsyncEnumerable<string> GetLines(
        int count,
        [System.Runtime.CompilerServices.EnumeratorCancellation] CancellationToken token = default)
    {
        // Without this guard the counter could never equal count and the stream would never end.
        if (count <= 0)
        {
            yield break;
        }

        int itemCount = 0;
        while (true)
        {
            // This await may really suspend: the page has to be "downloaded" first.
            string jsonResponse = await GetWebpageAsync(token);
            string[] items = JsonConvert.DeserializeObject<string[]>(jsonResponse);

            // Items of an already-downloaded page are yielded without any further waiting.
            foreach (var item in items)
            {
                yield return item;
                if (++itemCount == count)
                {
                    yield break;
                }
            }
        }
    }

    /// <summary>
    /// Processes the stream with one task per line, limiting the number of lines in flight
    /// to <paramref name="workersCount"/> with a semaphore. Each result is published through
    /// <see cref="ItemsUpdatedEvent"/> by a continuation on the exclusive scheduler.
    /// </summary>
    /// <param name="workersCount">Maximum number of lines being processed at once.</param>
    /// <param name="token">Stops production; pending continuations are cancelled.</param>
    /// <returns>A task that completes when every started work item has finished.</returns>
    /// <exception cref="OperationCanceledException">
    /// Thrown by the final wait when any continuation was cancelled through <paramref name="token"/>.
    /// </exception>
    public async Task UsingWorkerThreadsAsync(int workersCount, CancellationToken token = default)
    {
        var lines = GetLines(Constants.LinesRequired);
        var semaphoreSlim = new SemaphoreSlim(workersCount);
        List<Task> tasks = new();
        try
        {
            await foreach (var line in lines.WithCancellation(token))
            {
                // Wait for a free worker slot; WorkersFunction releases it when done.
                await semaphoreSlim.WaitAsync(token);

                // TaskContinuationOptions.None: the antecedent is not a child task, so the
                // previously used AttachedToParent flag could never take effect.
                var task = Task.Factory
                    .StartNew(
                        () => WorkersFunction(line, semaphoreSlim),
                        token,
                        TaskCreationOptions.None,
                        TaskScheduler.Default)
                    .ContinueWith(
                        t => RaiseItemsUpdatedEvent(new ItemsUpdatedEventArgs(t.Result)),
                        token,
                        TaskContinuationOptions.None,
                        uiTaskScheduler);
                tasks.Add(task);
            }
        }
        catch (OperationCanceledException)
        {
            // Cancellation only stops production here; the outcome of the work already
            // started is observed by the WhenAll below.
        }

        await Task.WhenAll(tasks);
    }

    /// <summary>
    /// Processes one line via <see cref="WorkersFunctionDF"/> and always hands the worker
    /// slot back to <paramref name="semaphoreSlim"/>, even when processing throws.
    /// </summary>
    private string WorkersFunction(string message, SemaphoreSlim semaphoreSlim)
    {
        try
        {
            return WorkersFunctionDF(message);
        }
        finally
        {
            semaphoreSlim.Release();
        }
    }

    /// <summary>
    /// Processes the stream by starting a worker task per line (gated by a semaphore) and
    /// writing the tasks, in order, to a bounded channel drained by a single reader.
    /// The reader raises <see cref="ItemsUpdatedEvent"/> directly from its own loop.
    /// </summary>
    /// <param name="workersCount">Maximum number of lines being processed at once.</param>
    /// <param name="token">Stops production and cancels the reader.</param>
    /// <returns>A task that completes when the reader has drained the channel.</returns>
    /// <exception cref="OperationCanceledException">
    /// Thrown by the reader when <paramref name="token"/> is cancelled.
    /// </exception>
    public async Task ChannelExample(int workersCount, CancellationToken token = default)
    {
        var lines = GetLines(Constants.LinesRequired);
        var semaphoreSlim = new SemaphoreSlim(workersCount);
        var channel = Channel.CreateBounded<Task<string>>(
            new BoundedChannelOptions(Constants.BufferCapacity) { SingleWriter = true });
        var readerTask = ReadFromSingleChannelAsync(channel, token);
        try
        {
            await foreach (var line in lines.WithCancellation(token))
            {
                // Cancelling the semaphore wait directly can be problematic, so the slot is
                // always awaited to the end; cancellation is observed by the write below.
                await semaphoreSlim.WaitAsync(CancellationToken.None);
                var workerTask = Task.Run(() => WorkersFunction(line, semaphoreSlim));
                await channel.Writer.WriteAsync(workerTask, token);
            }
        }
        catch (OperationCanceledException)
        {
            // Cancellation only stops production here; the reader reports it below.
        }
        finally
        {
            // Always complete the writer, otherwise a failure in the loop above would
            // leave the reader waiting for data that never arrives.
            channel.Writer.Complete();
        }

        await readerTask;
    }

    /// <summary>
    /// Drains the channel in order, awaiting each worker task and raising
    /// <see cref="ItemsUpdatedEvent"/> with its result.
    /// </summary>
    private async Task ReadFromSingleChannelAsync(Channel<Task<string>> channel, CancellationToken token = default)
    {
        while (await channel.Reader.WaitToReadAsync(token))
        {
            var readTask = await channel.Reader.ReadAsync(token);
            var result = await readTask;
            RaiseItemsUpdatedEvent(new ItemsUpdatedEventArgs(result));
        }
    }

    /// <summary>
    /// Processes the stream with a TPL Dataflow pipeline: a <c>TransformBlock</c> running up to
    /// <paramref name="workersCount"/> workers, linked to an <c>ActionBlock</c> that raises
    /// <see cref="ItemsUpdatedEvent"/> on the exclusive scheduler.
    /// </summary>
    /// <param name="workersCount">Maximum degree of parallelism of the transform stage.</param>
    /// <param name="token">Cancels both blocks and the producer loop.</param>
    /// <returns>A task that completes when the action block has completed.</returns>
    /// <exception cref="OperationCanceledException">
    /// Thrown by the final wait when the pipeline was cancelled through <paramref name="token"/>.
    /// </exception>
    public async Task UsingDataFlowAsync(int workersCount, CancellationToken token = default)
    {
        var lines = GetLines(Constants.LinesRequired);

        var options = new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = workersCount, // number of concurrently running workers
            // Opt-in optimisation (the default is false): promises the block that only one
            // producer, the loop below, offers it messages at a time.
            SingleProducerConstrained = true,
            BoundedCapacity = Constants.BufferCapacity, // input buffer size
            CancellationToken = token
        };

        // Takes a line, runs it through WorkersFunctionDF and outputs the returned string.
        var transformBlock = new TransformBlock<string, string>((message) => WorkersFunctionDF(message), options);

        // Takes each transformed string and raises ItemsUpdatedEvent on the exclusive scheduler.
        var uiUpdaterBlock = new ActionBlock<string>(
            msg => RaiseItemsUpdatedEvent(new ItemsUpdatedEventArgs(msg)),
            new ExecutionDataflowBlockOptions { TaskScheduler = uiTaskScheduler, CancellationToken = token });

        // PropagateCompletion forwards the transform block's completion (or fault) to the action block.
        var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
        transformBlock.LinkTo(uiUpdaterBlock, linkOptions);

        try
        {
            await foreach (var line in lines.WithCancellation(token))
            {
                // SendAsync waits for buffer space. A false result means the block will never
                // accept more input (completed, cancelled or faulted), so stop producing.
                if (!await transformBlock.SendAsync(line, token))
                {
                    break;
                }
            }
        }
        catch (OperationCanceledException)
        {
            // Cancellation only stops production here; the pipeline reports it below.
        }

        // Signal end of input, then wait for the last item to be published.
        transformBlock.Complete();
        await uiUpdaterBlock.Completion;
    }

    /// <summary>
    /// Simulates a busy worker for <see cref="Constants.WorkerThreadSleep"/> milliseconds and
    /// returns <paramref name="line"/> with the lower-case vowels a, e, i, o, u removed.
    /// </summary>
    private string WorkersFunctionDF(string line)
    {
        const string vowels = "aeiou";
        Thread.Sleep(Constants.WorkerThreadSleep); // deliberately blocks to simulate a busy thread
        return string.Concat(line.Where(c => !vowels.Contains(c)));
    }

    /// <summary>Signature of handlers for <see cref="ItemsUpdatedEvent"/>.</summary>
    public delegate void ItemsUpdatedHandler(object sender, ItemsUpdatedEventArgs e);

    /// <summary>Raised once for every processed line.</summary>
    public event ItemsUpdatedHandler ItemsUpdatedEvent;

    /// <summary>Invokes <see cref="ItemsUpdatedEvent"/> if it has any subscribers.</summary>
    private void RaiseItemsUpdatedEvent(ItemsUpdatedEventArgs args) => ItemsUpdatedEvent?.Invoke(this, args);
}
/// <summary>Tuning values shared by the async-stream demonstrations.</summary>
static class Constants
{
    // --- Simulated page content ---

    /// <summary>Text appended to the line number to form each line of the page.</summary>
    public const string LineString = "The quick brown fox jumps over the lazy dog";

    /// <summary>Number of lines contained in one simulated page.</summary>
    public const int PageSize = 3_000;

    /// <summary>Total number of lines each demonstration requests from the stream.</summary>
    public const int LinesRequired = 10_000;

    // --- Simulated latencies, in milliseconds ---

    /// <summary>Delay applied before a page is returned.</summary>
    public const int WebSocketDelay = 40;

    /// <summary>Time a worker blocks while processing one line.</summary>
    public const int WorkerThreadSleep = 10;

    // --- Pipeline sizing ---

    /// <summary>Capacity of the bounded channel and of the dataflow input buffer.</summary>
    public const int BufferCapacity = 3_000;

    /// <summary>Suggested worker count; not referenced by the code visible in this file.</summary>
    public const int WorkersCount = 35;
}
/// <summary>Event payload carrying a single processed line.</summary>
class ItemsUpdatedEventArgs : EventArgs
{
    /// <summary>Wraps <paramref name="item"/> so it can be delivered to event subscribers.</summary>
    /// <param name="item">The processed line; stored as given.</param>
    public ItemsUpdatedEventArgs(string item) => Item = item;

    /// <summary>The processed line supplied at construction; read-only.</summary>
    public string Item { get; }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment