@odinserj
Last active October 31, 2024 11:31
// Zero-Clause BSD (more permissive than MIT, doesn't require copyright notice)
//
// Permission to use, copy, modify, and/or distribute this software for any purpose
// with or without fee is hereby granted.
//
// THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
// WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY
// AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT,
// INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS
// OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER
// TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF
// THIS SOFTWARE.
using System;
using System.Collections.Generic;
using System.Globalization;
using Hangfire.Client;
using Hangfire.Common;
using Hangfire.Server;
using Hangfire.Storage;

public class DisableMultipleQueuedItemsFilter : JobFilterAttribute, IClientFilter, IServerFilter
{
    private static readonly TimeSpan LockTimeout = TimeSpan.FromSeconds(5);
    private static readonly TimeSpan FingerprintTimeout = TimeSpan.FromHours(1);

    // Client side: cancel creation when an identical job already has a fresh fingerprint.
    public void OnCreating(CreatingContext filterContext)
    {
        if (!AddFingerprintIfNotExists(filterContext.Connection, filterContext.Job))
        {
            filterContext.Canceled = true;
        }
    }

    // Server side: release the fingerprint once the job has been performed.
    public void OnPerformed(PerformedContext filterContext)
    {
        RemoveFingerprint(filterContext.Connection, filterContext.Job);
    }

    private static bool AddFingerprintIfNotExists(IStorageConnection connection, Job job)
    {
        using (connection.AcquireDistributedLock(GetFingerprintLockKey(job), LockTimeout))
        {
            var fingerprint = connection.GetAllEntriesFromHash(GetFingerprintKey(job));
            DateTimeOffset timestamp;

            if (fingerprint != null &&
                fingerprint.ContainsKey("Timestamp") &&
                DateTimeOffset.TryParse(fingerprint["Timestamp"], null, DateTimeStyles.RoundtripKind, out timestamp) &&
                DateTimeOffset.UtcNow <= timestamp.Add(FingerprintTimeout))
            {
                // A fresh fingerprint exists, so an identical job is already queued or running.
                return false;
            }

            // The fingerprint does not exist, is invalid (no `Timestamp` key),
            // or has expired (older than FingerprintTimeout).
            connection.SetRangeInHash(GetFingerprintKey(job), new Dictionary<string, string>
            {
                { "Timestamp", DateTimeOffset.UtcNow.ToString("o") }
            });

            return true;
        }
    }

    private static void RemoveFingerprint(IStorageConnection connection, Job job)
    {
        using (connection.AcquireDistributedLock(GetFingerprintLockKey(job), LockTimeout))
        using (var transaction = connection.CreateWriteTransaction())
        {
            transaction.RemoveHash(GetFingerprintKey(job));
            transaction.Commit();
        }
    }

    private static string GetFingerprintLockKey(Job job)
    {
        return String.Format("{0}:lock", GetFingerprintKey(job));
    }

    private static string GetFingerprintKey(Job job)
    {
        return String.Format("fingerprint:{0}", GetFingerprint(job));
    }

    // Builds "<type>.<method>.<arg1>.<arg2>..." to identify a job by its target and arguments.
    private static string GetFingerprint(Job job)
    {
        string parameters = string.Empty;
        if (job.Arguments != null)
        {
            parameters = string.Join(".", job.Arguments);
        }
        if (job.Type == null || job.Method == null)
        {
            return string.Empty;
        }
        var fingerprint = String.Format(
            "{0}.{1}.{2}",
            job.Type.FullName,
            job.Method.Name, parameters);
        return fingerprint;
    }

    void IClientFilter.OnCreated(CreatedContext filterContext)
    {
    }

    void IServerFilter.OnPerforming(PerformingContext filterContext)
    {
    }
}
@mack0196

mack0196 commented Feb 2, 2022

Thanks erwin-faceit. Updated.

using Hangfire.Client;
using Hangfire.Common;
using Hangfire.States;
using Hangfire.Storage;
using System;
using System.Collections.Generic;
using System.Globalization;
using System.Linq;
using System.Security.Cryptography;

namespace Hangfire.Filters
{
    public class DisableMultipleQueuedItemsFilter : JobFilterAttribute, IClientFilter, IApplyStateFilter
    {
        private static readonly TimeSpan LockTimeout = TimeSpan.FromSeconds(5);
        private static readonly CultureInfo EnUs = new CultureInfo("en-US");

        /// <summary>
        /// Converts an argument into a culture-independent string.
        /// </summary>
        /// <remarks>
        /// See https://gist.github.com/odinserj/a8332a3f486773baa009?permalink_comment_id=4048344#gistcomment-4048344
        /// </remarks>
        private static string ConvertArgument(object obj)
        {
            switch (obj)
            {
                case null:
                    return string.Empty;
                case string s:
                    return s;
                case DateTime dt:
                    return dt.ToString("o"); // ISO8601 date format
                default:
                    // Force the rest to US English. Convert.ToString also accepts types that
                    // are not IConvertible, for which Convert.ChangeType would throw.
                    return Convert.ToString(obj, EnUs) ?? string.Empty;
            }
        }

        private static bool AddFingerprintIfNotExists(IStorageConnection connection, Job job)
        {
            var fingerprintKey = GetFingerprintKey(job);
            var fingerprintLockKey = GetFingerprintLockKey(fingerprintKey);
            var distributedLock = connection.AcquireDistributedLock(fingerprintLockKey, LockTimeout);
            using (distributedLock)
            {
                var fingerprint = connection.GetAllEntriesFromHash(fingerprintKey);

                if (fingerprint != null)
                {
                    // A fingerprint already exists, so an identical job is queued or running.
                    return false;
                }

                // No fingerprint exists yet; record one for this job.
                // (This version no longer checks the `Timestamp` value or an expiry timeout.)
                connection.SetRangeInHash(fingerprintKey, new Dictionary<string, string>
                    {
                        { "Timestamp", DateTimeOffset.UtcNow.ToString("o") }
                    });

                return true;
            }
        }

        private static void RemoveFingerprint(IStorageConnection connection, Job job)
        {
            var fingerprintKey = GetFingerprintKey(job);
            var fingerprintLockKey = GetFingerprintLockKey(fingerprintKey);
            using (connection.AcquireDistributedLock(fingerprintLockKey, LockTimeout))
            using (var transaction = connection.CreateWriteTransaction())
            {
                transaction.RemoveHash(fingerprintKey);
                transaction.Commit();
            }
        }

        private static string GetFingerprintLockKey(string fingerprintKey)
        {
            return string.Format("{0}:lock", fingerprintKey);
        }

        private static string GetFingerprintKey(Job job)
        {
            return string.Format("fingerprint:{0}", GetFingerprint(job));
        }

        private static string GetFingerprint(Job job)
        {
            string parameters = string.Empty;
            if (job?.Args != null)
            {
                parameters = string.Join(".", job.Args.Select(ConvertArgument));
            }
            if (job?.Type == null || job.Method == null)
            {
                return string.Empty;
            }

            //https://gist.github.com/odinserj/a8332a3f486773baa009#gistcomment-1898401
            var payload = $"{job.Type.FullName}.{job.Method.Name}.{parameters}";
            using (var sha256 = SHA256.Create())
            {
                // Hash the payload so the storage key has a fixed length regardless of the arguments.
                var hash = sha256.ComputeHash(System.Text.Encoding.UTF8.GetBytes(payload));
                return Convert.ToBase64String(hash);
            }
        }

        public void OnCreating(CreatingContext filterContext)
        {
            if (!AddFingerprintIfNotExists(filterContext.Connection, filterContext.Job))
            {
                filterContext.Canceled = true;
            }
        }

        public void OnCreated(CreatedContext filterContext)
        {
            //do nothing
        }


        public void OnStateApplied(ApplyStateContext context, IWriteOnlyTransaction transaction)
        {
            if (context.NewState.Name.Equals(Hangfire.States.SucceededState.StateName)
                || context.NewState.Name.Equals(Hangfire.States.FailedState.StateName))
            {
                RemoveFingerprint(context.Connection, context.BackgroundJob.Job);
            }
        }

        public void OnStateUnapplied(ApplyStateContext context, IWriteOnlyTransaction transaction)
        {
            // do nothing
        }
    }
}

@mack0196

mack0196 commented Feb 2, 2022

⚠️⚠️⚠️ This works for Enqueue only; scheduled jobs will still be duplicated.

⚠️⚠️⚠️ This filter will break ContinueWith scenarios that expect a jobId string returned from BackgroundJobClient.Enqueue.

@mcastellano

@mack0196, I believe you should check for the transition to Deleted state as well for fingerprint removal.
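
A minimal sketch of that suggestion, not the code the thread ended up with. The helper `FingerprintStates.ShouldRemoveFingerprint` is hypothetical, and it assumes the literals below match `SucceededState.StateName`, `FailedState.StateName`, and `DeletedState.StateName`. In the filter itself you would reference those constants directly:

```csharp
using System;
using System.Collections.Generic;

public static class FingerprintStates
{
    // Assumption: these literals mirror the StateName constants in Hangfire.States.
    private static readonly HashSet<string> ReleasingStates =
        new HashSet<string>(StringComparer.Ordinal) { "Succeeded", "Failed", "Deleted" };

    // True when a job entering this state should release its fingerprint.
    public static bool ShouldRemoveFingerprint(string newStateName)
    {
        return newStateName != null && ReleasingStates.Contains(newStateName);
    }
}
```

In `OnStateApplied`, the condition would then be `FingerprintStates.ShouldRemoveFingerprint(context.NewState.Name)`, so a job deleted from the dashboard no longer leaves its fingerprint behind.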

@uciprian

uciprian commented Sep 1, 2022

This solution sometimes throws a RedisTimeoutException on fingerprint removal, and the error message mentions something like:
SUBSCRIBE fingerprint:60684f8fd3ad130e8c60210b41706448:lock:ev

@mack0196

Thanks mcastellano

@afelinczak

Hello,
We ran into a problem when passing a CancellationToken into a job.
https://docs.hangfire.io/en/latest/background-methods/using-cancellation-tokens.html
In our case, the hash calculated when the job completed was incorrect, because Hangfire had replaced the token with null in Parameters.
To fix it, we changed the ConvertArgument method to ignore the token.

@HenrikHoyer

@afelinczak

To fix it we changed ConvertArgument method to ignore token.

Please share your code changes.

@DevineDevelopers

@HenrikHoyer Did you ever figure out the code @afelinczak was talking about?

@afelinczak
Copy link

afelinczak commented Sep 23, 2024

Hello, I missed the comment - sorry.
This is the fix we are using.

private static string ConvertArgument(object obj) => obj switch
{
    CancellationToken => String.Empty,
    _ => JsonConvert.SerializeObject(obj)
};

@FixRM

FixRM commented Oct 31, 2024

Hangfire replaced it with null in Parameters

Hello @afelinczak. What do you mean? CancellationToken is a struct, so it can't be null. We have to pass it as default, but it will be replaced with a real token at runtime, so it shouldn't be null anyway.

By the way, there is at least one more "special" parameter: PerformContext. It is not well documented, but it is used by the popular extension Hangfire.Console.
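
One way to sidestep the question of what value such a parameter holds at each stage is to skip it by its declared type rather than by its runtime value. The sketch below is an illustration under assumptions, not code from this thread. `FingerprintArguments.Join` is a hypothetical helper, and the type names `Hangfire.IJobCancellationToken` and `Hangfire.Server.PerformContext` are matched as strings only so that the example compiles without a Hangfire reference:

```csharp
using System;
using System.Collections.Generic;
using System.Globalization;
using System.Reflection;
using System.Threading;

public static class FingerprintArguments
{
    // Assumption: parameter types whose values are injected at run time
    // and therefore should not contribute to the fingerprint.
    private static readonly HashSet<string> InjectedTypeNames = new HashSet<string>
    {
        typeof(CancellationToken).FullName,
        "Hangfire.IJobCancellationToken",
        "Hangfire.Server.PerformContext"
    };

    // Joins the arguments with ".", skipping every parameter of an injected type.
    public static string Join(MethodInfo method, IReadOnlyList<object> args)
    {
        var parameters = method.GetParameters();
        var parts = new List<string>();

        for (var i = 0; i < parameters.Length && i < args.Count; i++)
        {
            if (InjectedTypeNames.Contains(parameters[i].ParameterType.FullName))
            {
                continue; // same result whether the slot holds null, default, or a real token
            }

            parts.Add(Convert.ToString(args[i], CultureInfo.InvariantCulture) ?? string.Empty);
        }

        return string.Join(".", parts);
    }
}
```

Inside `GetFingerprint`, this would be called as `FingerprintArguments.Join(job.Method, job.Args)`. Because the decision depends only on the method signature, the fingerprint computed when the job is created and the one computed when it finishes stay the same.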
