Created
April 20, 2024 01:12
-
-
Save Loschcode/9a9f643fe5086b3585d9c3507f2214b8 to your computer and use it in GitHub Desktop.
Job scheduler in Golang
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package job_scheduler | |
import ( | |
"context" | |
"database/sql/driver" | |
"encoding/json" | |
"fmt" | |
"log" | |
"time" | |
"aquiestoy/pkg/mailer" | |
"aquiestoy/pkg/tracking" | |
"github.com/getsentry/sentry-go" | |
"github.com/go-co-op/gocron" | |
"github.com/google/uuid" | |
"github.com/hibiken/asynq" | |
"github.com/redis/go-redis/v9" | |
"gorm.io/gorm" | |
) | |
const ( | |
maxConcurrency = 10 | |
fetchFrequencyInSeconds = 10 | |
) | |
type Clients struct { | |
Db *gorm.DB | |
Redis *redis.Client | |
Tracking *tracking.Tracking | |
Mailer *mailer.Mailer | |
} | |
type Scheduler struct { | |
Clients Clients | |
JobsMap JobsMap | |
} | |
type JobsMap map[string]func(*Scheduler, map[string]interface{}) error | |
type ScheduledJob struct { | |
ID *uuid.UUID `json:"id,omitempty" gorm:"type:uuid;default:uuid_generate_v4()" db:"id"` | |
ScheduledFor time.Time | |
FunctionName string | |
Arguments Arguments | |
Status string | |
CreatedAt time.Time | |
UpdatedAt time.Time | |
} | |
// all of that is to handle JSONB | |
// if it's seen in more places in the code | |
// Let's convert this struct name into "JSONB" | |
// or something | |
type Arguments map[string]interface{} | |
func (args Arguments) Value() (driver.Value, error) { | |
return json.Marshal(args) | |
} | |
func (args *Arguments) Scan(src interface{}) error { | |
source, ok := src.([]byte) | |
if !ok { | |
return fmt.Errorf("type assertion when processing JSONB failed (1)") | |
} | |
var i interface{} | |
err := json.Unmarshal(source, &i) | |
if err != nil { | |
return err | |
} | |
*args, ok = i.(map[string]interface{}) | |
if !ok { | |
return fmt.Errorf("type assertion when processing JSONB failed (2)") | |
} | |
return nil | |
} | |
func NewScheduler(clients Clients, jobsMap JobsMap) (*Scheduler, error) { | |
log.Println("About to launch job scheduler.") | |
return &Scheduler{ | |
Clients: clients, | |
JobsMap: jobsMap, | |
}, nil | |
} | |
func (schdl *Scheduler) AddJob(scheduled_for time.Time, functionName string, arguments map[string]interface{}) error { | |
job := ScheduledJob{ | |
ScheduledFor: scheduled_for, | |
FunctionName: functionName, | |
Status: "wait", | |
Arguments: arguments, | |
} | |
if err := schdl.Clients.Db.Create(&job).Error; err != nil { | |
// we'll usually not consider the error outside of this to not block the system | |
// so we can spawn sentry from here | |
sentry.CaptureException(err) | |
return err | |
} | |
log.Printf("A new job %s has been inserted.\n", functionName) | |
return nil | |
} | |
func (schdl *Scheduler) Run() { | |
go schdl.periodicJobChecker() | |
select {} | |
} | |
func (schdl *Scheduler) runJob(job *ScheduledJob) error { | |
if customFunction, exists := schdl.JobsMap[job.FunctionName]; exists { | |
err := customFunction(schdl, job.Arguments) | |
if err != nil { | |
schdl.Clients.Db.Model(&job).Update("status", "fail") | |
sentry.CaptureException(err) | |
readError := fmt.Sprintf("Job %s failed: %s", job.FunctionName, err) | |
log.Print(readError) | |
schdl.Clients.Tracking.AddAnonymousEvent(context.Background(), readError, job.Arguments) | |
return err | |
} | |
schdl.Clients.Db.Delete(job) | |
} else { | |
log.Printf("Function %s not found", job.FunctionName) | |
} | |
return nil | |
} | |
// Add status with wait, pending, fail, timeout | |
func (schdl *Scheduler) periodicJobChecker() error { | |
cronScheduler := gocron.NewScheduler(time.UTC) | |
_, err := cronScheduler.Every(10).Seconds().Do(schdl.checkAndRunJobs) | |
if err != nil { | |
return err | |
} | |
cronScheduler.StartAsync() | |
log.Println("CRON Job has been started asynchronously") | |
return nil | |
} | |
func (schdl *Scheduler) checkAndRunJobs() error { | |
var jobs []*ScheduledJob | |
now := time.Now() | |
result := schdl.Clients.Db.Where("scheduled_for < ? AND status = ?", now, "wait").Limit(maxConcurrency).Find(&jobs) | |
if result.Error != nil { | |
return result.Error | |
} | |
if len(jobs) > 0 { | |
log.Printf("%v jobs will be processed\n", len(jobs)) | |
} | |
for _, job := range jobs { | |
schdl.Clients.Db.Model(&job).Update("status", "pending") | |
if job.ScheduledFor.Before(now) { | |
// this magical piece of code prevent panic from | |
// spreading outside of the goroutine | |
go func(job *ScheduledJob) { | |
defer func() { | |
if r := recover(); r != nil { | |
log.Printf("Job %s just crashed\n", job.FunctionName) | |
sentry.CaptureMessage(fmt.Sprintf("Job crashed %s but we don't have the trace, it's likely a job internals issue", job.FunctionName)) | |
schdl.Clients.Db.Model(&job).Update("status", "fail") | |
} | |
}() | |
schdl.runJob(job) | |
}(job) | |
} | |
} | |
return nil | |
} | |
package jobs | |
import ( | |
"aquiestoy/internal/models" | |
"aquiestoy/internal/services" | |
"aquiestoy/pkg/job_scheduler" | |
"aquiestoy/pkg/tracking" | |
"context" | |
"fmt" | |
"time" | |
"github.com/google/uuid" | |
"gorm.io/gorm" | |
) | |
func ServicesClients(schdl *job_scheduler.Scheduler) services.Clients { | |
return services.Clients{ | |
Db: schdl.Clients.Db, | |
Redis: schdl.Clients.Redis, | |
Tracking: schdl.Clients.Tracking, | |
Mailer: schdl.Clients.Mailer, | |
JobScheduler: schdl, | |
} | |
} | |
// Map you must add the jobs you want to available to the scheduler in here | |
func Map() job_scheduler.JobsMap { | |
return job_scheduler.JobsMap{ | |
"ping": ping, | |
} | |
} | |
func ping(schdl *job_scheduler.Scheduler, args map[string]interface{}) error { | |
return nil | |
} | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment