-
-
Save odinserj/a8332a3f486773baa009 to your computer and use it in GitHub Desktop.
// Zero-Clause BSD (more permissive than MIT, doesn't require copyright notice) | |
// | |
// Permission to use, copy, modify, and/or distribute this software for any purpose | |
// with or without fee is hereby granted. | |
// | |
// THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES | |
// WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY | |
// AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT, | |
// INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS | |
// OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER | |
// TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF | |
// THIS SOFTWARE. | |
public class DisableMultipleQueuedItemsFilter : JobFilterAttribute, IClientFilter, IServerFilter | |
{ | |
private static readonly TimeSpan LockTimeout = TimeSpan.FromSeconds(5); | |
private static readonly TimeSpan FingerprintTimeout = TimeSpan.FromHours(1); | |
public void OnCreating(CreatingContext filterContext) | |
{ | |
if (!AddFingerprintIfNotExists(filterContext.Connection, filterContext.Job)) | |
{ | |
filterContext.Canceled = true; | |
} | |
} | |
public void OnPerformed(PerformedContext filterContext) | |
{ | |
RemoveFingerprint(filterContext.Connection, filterContext.Job); | |
} | |
private static bool AddFingerprintIfNotExists(IStorageConnection connection, Job job) | |
{ | |
using (connection.AcquireDistributedLock(GetFingerprintLockKey(job), LockTimeout)) | |
{ | |
var fingerprint = connection.GetAllEntriesFromHash(GetFingerprintKey(job)); | |
DateTimeOffset timestamp; | |
if (fingerprint != null && | |
fingerprint.ContainsKey("Timestamp") && | |
DateTimeOffset.TryParse(fingerprint["Timestamp"], null, DateTimeStyles.RoundtripKind, out timestamp) && | |
DateTimeOffset.UtcNow <= timestamp.Add(FingerprintTimeout)) | |
{ | |
// Actual fingerprint found, returning. | |
return false; | |
} | |
// Fingerprint does not exist, it is invalid (no `Timestamp` key), | |
// or it is not actual (timeout expired). | |
connection.SetRangeInHash(GetFingerprintKey(job), new Dictionary<string, string> | |
{ | |
{ "Timestamp", DateTimeOffset.UtcNow.ToString("o") } | |
}); | |
return true; | |
} | |
} | |
private static void RemoveFingerprint(IStorageConnection connection, Job job) | |
{ | |
using (connection.AcquireDistributedLock(GetFingerprintLockKey(job), LockTimeout)) | |
using (var transaction = connection.CreateWriteTransaction()) | |
{ | |
transaction.RemoveHash(GetFingerprintKey(job)); | |
transaction.Commit(); | |
} | |
} | |
private static string GetFingerprintLockKey(Job job) | |
{ | |
return String.Format("{0}:lock", GetFingerprintKey(job)); | |
} | |
private static string GetFingerprintKey(Job job) | |
{ | |
return String.Format("fingerprint:{0}", GetFingerprint(job)); | |
} | |
private static string GetFingerprint(Job job) | |
{ | |
string parameters = string.Empty; | |
if (job.Arguments != null) | |
{ | |
parameters = string.Join(".", job.Arguments); | |
} | |
if (job.Type == null || job.Method == null) | |
{ | |
return string.Empty; | |
} | |
var fingerprint = String.Format( | |
"{0}.{1}.{2}", | |
job.Type.FullName, | |
job.Method.Name, parameters); | |
return fingerprint; | |
} | |
void IClientFilter.OnCreated(CreatedContext filterContext) | |
{ | |
} | |
void IServerFilter.OnPerforming(PerformingContext filterContext) | |
{ | |
} | |
} |
@bokmadsen have you come up with a solution for the one hour FingerprintTimeout limitation (besides simply increasing the TimeSpan :)
Or anyone else have any suggestions / recommendations... it works fine, except like you say the job runs for more than the fingerprint timeout, at which point a duplicate is scheduled...
@mcarter101 ta ha zemren
My use case is that I don't want to enqueue a second job (with same parameter values) until the enqueued job goes to Succeeded or Failed.
From initial testing I am getting the results I want (block successive enqueue until job succeeds or fails) removing the fingerprint in IApplyStateFilter.OnStateApplied. Any feedback?
public class DisableMultipleQueuedItemsFilter : JobFilterAttribute, IClientFilter, IApplyStateFilter
{
private static readonly TimeSpan LockTimeout = TimeSpan.FromSeconds(5);
private static bool AddFingerprintIfNotExists(IStorageConnection connection, Job job)
{
using (connection.AcquireDistributedLock(GetFingerprintLockKey(job), LockTimeout))
{
var fingerprint = connection.GetAllEntriesFromHash(GetFingerprintKey(job));
if (fingerprint != null)
{
// Actual fingerprint found, returning.
return false;
}
// Fingerprint does not exist, it is invalid (no `Timestamp` key),
// or it is not actual (timeout expired).
connection.SetRangeInHash(GetFingerprintKey(job), new Dictionary<string, string>
{
{ "Timestamp", DateTimeOffset.UtcNow.ToString("o") }
});
return true;
}
}
private static void RemoveFingerprint(IStorageConnection connection, Job job)
{
using (connection.AcquireDistributedLock(GetFingerprintLockKey(job), LockTimeout))
using (var transaction = connection.CreateWriteTransaction())
{
transaction.RemoveHash(GetFingerprintKey(job));
transaction.Commit();
}
}
private static string GetFingerprintLockKey(Job job)
{
return string.Format("{0}:lock", GetFingerprintKey(job));
}
private static string GetFingerprintKey(Job job)
{
return string.Format("fingerprint:{0}", GetFingerprint(job));
}
private static string GetFingerprint(Job job)
{
string parameters = string.Empty;
if (job?.Args != null)
{
parameters = string.Join(".", job.Args);
}
if (job?.Type == null || job.Method == null)
{
return string.Empty;
}
//https://gist.github.com/odinserj/a8332a3f486773baa009#gistcomment-1898401
var payload = $"{job.Type.FullName}.{job.Method.Name}.{parameters}";
var hash = SHA256.Create().ComputeHash(System.Text.Encoding.UTF8.GetBytes(payload));
var fingerprint = Convert.ToBase64String(hash);
return fingerprint;
}
public void OnCreating(CreatingContext filterContext)
{
if (!AddFingerprintIfNotExists(filterContext.Connection, filterContext.Job))
{
filterContext.Canceled = true;
}
}
public void OnCreated(CreatedContext filterContext)
{
//do nothing
}
public void OnStateApplied(ApplyStateContext context, IWriteOnlyTransaction transaction)
{
if (context.NewState.Name.Equals(Hangfire.States.SucceededState.StateName)
|| context.NewState.Name.Equals(Hangfire.States.FailedState.StateName))
{
RemoveFingerprint(context.Connection, context.BackgroundJob.Job);
}
}
public void OnStateUnapplied(ApplyStateContext context, IWriteOnlyTransaction transaction)
{
// do nothing
}
}
@mack0196 Your solution is working great for me. I don't have any particular feedback. Thanks for pulling together the final solution from all these comments!
Took me a while to figure out an issue I was having with the above code. When running multiple servers with different cultures AND having jobs with either doubles or dates in the arguments, this is going to generate different fingerprints. The main issue could be that not everybody sets the current culture, but the line
parameters = string.Join(".", job.Args);
will invoke ToString() on all arguments, resulting in this behavior.
So, either always set the CurrentCulture, or use something like below:
private static readonly CultureInfo EnUs = new CultureInfo("en-US");
private static string ConvertArgument(object obj)
{
switch (obj)
{
case null:
return string.Empty;
case string s:
return s;
case DateTime dt:
return dt.ToString("o"); // ISO8601 date format
default:
return (string) Convert.ChangeType(obj, typeof(string), EnUs); // And force the rest to US English
}
}
and change the line above to
parameters = string.Join(".", job.Args.Select(ConvertArgument));
Thanks erwin-faceit. Updated.
using Hangfire.Client;
using Hangfire.Common;
using Hangfire.States;
using Hangfire.Storage;
using System;
using System.Collections.Generic;
using System.Globalization;
using System.Linq;
using System.Security.Cryptography;
namespace Hangfire.Filters
{
public class DisableMultipleQueuedItemsFilter : JobFilterAttribute, IClientFilter, IApplyStateFilter
{
private static readonly TimeSpan LockTimeout = TimeSpan.FromSeconds(5);
private static readonly CultureInfo EnUs = new CultureInfo("en-US");
/// <summary>
/// Convert arguments into a culture-aware string
/// </summary>
///<see cref="https://gist.github.com/odinserj/a8332a3f486773baa009?permalink_comment_id=4048344#gistcomment-4048344"/><see>
private static string ConvertArgument(object obj)
{
switch (obj)
{
case null:
return string.Empty;
case string s:
return s;
case DateTime dt:
return dt.ToString("o"); // ISO8601 date format
default:
return (string)Convert.ChangeType(obj, typeof(string), EnUs); // And force the rest to US English
}
}
private static bool AddFingerprintIfNotExists(IStorageConnection connection, Job job)
{
var fingerprintKey = GetFingerprintKey(job);
var finterprintLockKey = GetFingerprintLockKey(fingerprintKey);
var distributedLock = connection.AcquireDistributedLock(finterprintLockKey, LockTimeout);
using (distributedLock)
{
var fingerprint = connection.GetAllEntriesFromHash(fingerprintKey);
if (fingerprint != null)
{
// Actual fingerprint found, returning.
return false;
}
// Fingerprint does not exist, it is invalid (no `Timestamp` key),
// or it is not actual (timeout expired).
connection.SetRangeInHash(fingerprintKey, new Dictionary<string, string>
{
{ "Timestamp", DateTimeOffset.UtcNow.ToString("o") }
});
return true;
}
}
private static void RemoveFingerprint(IStorageConnection connection, Job job)
{
var fingerprintKey = GetFingerprintKey(job);
var finterprintLockKey = GetFingerprintLockKey(fingerprintKey);
using (connection.AcquireDistributedLock(finterprintLockKey, LockTimeout))
using (var transaction = connection.CreateWriteTransaction())
{
transaction.RemoveHash(fingerprintKey);
transaction.Commit();
}
}
private static string GetFingerprintLockKey(string fingerprintKey)
{
return string.Format("{0}:lock", fingerprintKey);
}
private static string GetFingerprintKey(Job job)
{
return string.Format("fingerprint:{0}", GetFingerprint(job));
}
private static string GetFingerprint(Job job)
{
string parameters = string.Empty;
if (job?.Args != null)
{
parameters = string.Join(".", job.Args.Select(ConvertArgument));
}
if (job?.Type == null || job.Method == null)
{
return string.Empty;
}
//https://gist.github.com/odinserj/a8332a3f486773baa009#gistcomment-1898401
var payload = $"{job.Type.FullName}.{job.Method.Name}.{parameters}";
var hash = SHA256.Create().ComputeHash(System.Text.Encoding.UTF8.GetBytes(payload));
var fingerprint = Convert.ToBase64String(hash);
return fingerprint;
}
public void OnCreating(CreatingContext filterContext)
{
if (!AddFingerprintIfNotExists(filterContext.Connection, filterContext.Job))
{
filterContext.Canceled = true;
}
}
public void OnCreated(CreatedContext filterContext)
{
//do nothing
}
public void OnStateApplied(ApplyStateContext context, IWriteOnlyTransaction transaction)
{
if (context.NewState.Name.Equals(Hangfire.States.SucceededState.StateName)
|| context.NewState.Name.Equals(Hangfire.States.FailedState.StateName))
{
RemoveFingerprint(context.Connection, context.BackgroundJob.Job);
}
}
public void OnStateUnapplied(ApplyStateContext context, IWriteOnlyTransaction transaction)
{
// do nothing
}
}
}
@mack0196, I believe you should check for the transition to Deleted state as well for fingerprint removal.
This solution sometimes is generating RedisTimeoutException upon fingerprint removal and in the error message it is something about
SUBSCRIBE fingerprint:60684f8fd3ad130e8c60210b41706448:lock:ev
Thanks mcastellano
Hello,
We experienced problem when passing CancellationToken into a job.
https://docs.hangfire.io/en/latest/background-methods/using-cancellation-tokens.html
In our case, when job was completed calculated hash was incorrect as Hangfire replaced it with null in Parameters.
To fix it we changed ConvertArgument method to ignore token.
To fix it we changed ConvertArgument method to ignore token.
Please share your code changes
@HenrikHoyer Did you ever figure out the code @afelinczak was talking about
Hello, I missed the comment - sorry.
This is the fix we are using.
private static string ConvertArgument(object obj) => obj switch { CancellationToken => String.Empty, _ => JsonConvert.SerializeObject(obj) };
Hangfire replaced it with null in Parameters
Hello @afelinczak. What do you mean? CancellationToken is a struct, it can't be null. We must pass it as the default
but it'll be replaced with real token in runtime. So it shouldn't be null anyway.
Btw. There is at least one more "special" parameter: PerformContext
. It is not well documented but is is used by popular extension Hangfire.Console
Original GIST should be updated with the latest comments, maybe this should be even added in the official documentation?
Several people seem to need this solution, I am having this problem in an application with hangfire 1.7.x and I'm not sure if this is going to work in production. Reading the comments it seems the original code is not yet production ready