Last active
April 1, 2021 21:53
-
-
Save andreib1/a136ce1c951e3281a1990c23871d948b to your computer and use it in GitHub Desktop.
Prevent a NATS message from being redelivered during processing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// This function can be called to start a hold on a message for processing on Jetstream. | |
// The interval should be shorter than the message ack timeout. | |
// The parentCancel can be used to provide a cancellation function for processing in the event that we cannot set InProgress | |
// The function return should be named release by the caller, and can be used to unlock message processing after a successful ACK/NAK | |
func HoldMessage(ctx context.Context, msg *nats.Msg, interval time.Duration, parentCancel context.CancelFunc) context.CancelFunc { | |
ctx, release := context.WithCancel(ctx) | |
if interval < 1 { | |
return release | |
} | |
var exit bool | |
go func() { | |
for { | |
select { | |
case <- time.After(interval): | |
if err := msg.InProgress() ; err != nil { | |
release() | |
if parentCancel != nil { | |
parentCancel() | |
exit = true | |
} | |
} | |
case <-ctx.Done(): | |
exit = true | |
} | |
if exit { | |
break | |
} | |
} | |
release() | |
}() | |
return release | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment