Forked from ramonsmits/CancellationTimeoutBehavior.cs
Created
February 9, 2021 05:53
-
-
Save SeanFeldman/0b13879f0c6a84015699a76628392ab3 to your computer and use it in GitHub Desktop.
NServiceBus 7 behavior that adds a cancellation token to the context which can be retrieved by handler
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// | |
// NServiceBus v8 will have improved support for cooperative cancellation. | |
// | |
// Register behavior: | |
// | |
// endpointConfiguration.Pipeline.Register(new CancellationTimeoutBehavior(TimeSpan.FromSeconds(5)), nameof(CancellationTimeoutBehavior)); | |
// | |
using System; | |
using System.Threading; | |
using System.Threading.Tasks; | |
using NServiceBus; | |
using NServiceBus.Extensibility; | |
using NServiceBus.Logging; | |
using NServiceBus.Pipeline; | |
public class HandlerThatGetsCancelleationTokenFromContext : IHandleMessages<object> | |
{ | |
public async Task Handle(object message, IMessageHandlerContext context) | |
{ | |
var cancellationToken = context.CancellationToken(); | |
await Task.Delay(Timeout.Infinite, cancellationToken); | |
} | |
} | |
public class CancellationTimeoutBehavior : Behavior<IIncomingLogicalMessageContext> | |
{ | |
static readonly ILog Log = LogManager.GetLogger<CancellationTimeoutBehavior>(); | |
TimeSpan OperationTimeout { get; } | |
public CancellationTimeoutBehavior(TimeSpan operationTimeout) | |
{ | |
OperationTimeout = operationTimeout; | |
} | |
public override async Task Invoke(IIncomingLogicalMessageContext context, Func<Task> next) | |
{ | |
using (var cancellationTokenSource = new CancellationTokenSource(OperationTimeout)) | |
{ | |
var timeoutTask = Task.Delay(OperationTimeout, cancellationTokenSource.Token); | |
context.Extensions.Set(cancellationTokenSource.Token); | |
await next(); | |
if (cancellationTokenSource.IsCancellationRequested) | |
{ | |
// Not honored because the if statement would not be run if an OperationCanceledException would occur. | |
Log.WarnFormat("Cancellation was requested but not honored while processing message '{0}'", context.MessageId); | |
} | |
} | |
} | |
} | |
public static class IExtendableCancellationTokenExtension | |
{ | |
public static CancellationToken CancellationToken(this IExtendable context) | |
{ | |
return context.Extensions.Get<CancellationToken>(); | |
} | |
} | |
// Alternative way to retrieve cancellation token: | |
public abstract class HandleMessages<T> : IHandleMessages<T> | |
{ | |
public Task Handle(T message, IMessageHandlerContext context) | |
{ | |
return Handle(message, context, context.CancellationToken()); | |
} | |
public abstract Task Handle(T message, IMessageHandlerContext context, CancellationToken cancellationToken); | |
} | |
public class GetsCancelleationTokenViaBaseClass : HandleMessages<object> | |
{ | |
public override async Task Handle(object message, IMessageHandlerContext context, CancellationToken cancellationToken) | |
{ | |
await Task.Delay(Timeout.Infinite, cancellationToken); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment