allow max concurrent runs and singleton mode together (#625)
JohnRoesler authored Nov 28, 2023
1 parent 63225ac commit 5814fbc
Showing 4 changed files with 245 additions and 109 deletions.
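For context, a minimal sketch of what this change enables: a scheduler-wide concurrency limit combined with per-job singleton mode. It assumes the go-co-op/gocron v2 module path and uses the options visible in this diff (WithLimitConcurrentJobs, WithSingletonMode, LimitModeReschedule); the job definition and task body are illustrative only.

```go
package main

import (
	"fmt"
	"time"

	"github.com/go-co-op/gocron/v2"
)

func main() {
	// Scheduler-wide cap: at most 2 jobs run concurrently; runs that would
	// exceed the cap are rescheduled rather than queued.
	s, err := gocron.NewScheduler(
		gocron.WithLimitConcurrentJobs(2, gocron.LimitModeReschedule),
	)
	if err != nil {
		panic(err)
	}

	// Per-job singleton mode: this job never overlaps with itself, even
	// while sharing the limit-mode runners created above.
	_, err = s.NewJob(
		gocron.DurationJob(time.Second),
		gocron.NewTask(func() {
			fmt.Println("working")
			time.Sleep(3 * time.Second) // deliberately longer than the interval
		}),
		gocron.WithSingletonMode(gocron.LimitModeReschedule),
	)
	if err != nil {
		panic(err)
	}

	s.Start()
	time.Sleep(10 * time.Second)
	_ = s.Shutdown()
}
```

Before this commit the two options were documented as mutually exclusive (see the doc-comment removals in job.go and scheduler.go below); after it, the per-job singleton constraint is honored inside the limit-mode runners.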
242 changes: 149 additions & 93 deletions executor.go
@@ -36,10 +36,16 @@ type limitModeConfig struct {
limit uint
rescheduleLimiter chan struct{}
in chan uuid.UUID
// singletonJobs is used to track singleton jobs that are running
// in the limit mode runner. This is used to prevent the same job
// from running multiple times across limit mode runners when both
// a limit mode and singleton mode are enabled.
singletonJobs map[uuid.UUID]struct{}
singletonJobsMu sync.Mutex
}

func (e *executor) start() {
e.logger.Debug("executor started")
e.logger.Debug("gocron: executor started")

// creating the executor's context here as the executor
// is the only goroutine that should access this context
@@ -48,20 +54,11 @@ func (e *executor) start() {
e.ctx, e.cancel = context.WithCancel(context.Background())

// the standardJobsWg tracks
standardJobsWg := &waitGroupWithMutex{
wg: sync.WaitGroup{},
mu: sync.Mutex{},
}
standardJobsWg := &waitGroupWithMutex{}

singletonJobsWg := &waitGroupWithMutex{
wg: sync.WaitGroup{},
mu: sync.Mutex{},
}
singletonJobsWg := &waitGroupWithMutex{}

limitModeJobsWg := &waitGroupWithMutex{
wg: sync.WaitGroup{},
mu: sync.Mutex{},
}
limitModeJobsWg := &waitGroupWithMutex{}

// create a fresh map for tracking singleton runners
e.singletonRunners = make(map[uuid.UUID]singletonRunner)
@@ -141,7 +138,6 @@ func (e *executor) start() {
j := requestJobCtx(ctx, id, e.jobOutRequest)
if j == nil {
// safety check as it'd be strange bug if this occurred
// TODO add a log line here
return
}
if j.singletonMode {
@@ -155,7 +151,7 @@
}
e.singletonRunners[id] = runner
singletonJobsWg.Add(1)
go e.limitModeRunner("singleton-"+id.String(), runner.in, singletonJobsWg, j.singletonLimitMode, runner.rescheduleLimiter)
go e.singletonModeRunner("singleton-"+id.String(), runner.in, singletonJobsWg, j.singletonLimitMode, runner.rescheduleLimiter)
}

if j.singletonLimitMode == LimitModeReschedule {
@@ -204,106 +200,85 @@
}
}

func (e *executor) stop(standardJobsWg, singletonJobsWg, limitModeJobsWg *waitGroupWithMutex) {
e.logger.Debug("stopping executor")
// we've been asked to stop. This is either because the scheduler has been told
// to stop all jobs or the scheduler has been asked to completely shutdown.
//
// cancel tells all the functions to stop their work and send in a done response
e.cancel()

// the wait for job channels are used to report back whether we successfully waited
// for all jobs to complete or if we hit the configured timeout.
waitForJobs := make(chan struct{}, 1)
waitForSingletons := make(chan struct{}, 1)
waitForLimitMode := make(chan struct{}, 1)

// the waiter context is used to cancel the functions waiting on jobs.
// this is done to avoid goroutine leaks.
waiterCtx, waiterCancel := context.WithCancel(context.Background())

// wait for standard jobs to complete
go func() {
e.logger.Debug("waiting for standard jobs to complete")
go func() {
// this is done in a separate goroutine, so we aren't
// blocked by the WaitGroup's Wait call in the event
// that the waiter context is cancelled.
// This particular goroutine could leak in the event that
// some long-running standard job doesn't complete.
standardJobsWg.Wait()
e.logger.Debug("standard jobs completed")
waitForJobs <- struct{}{}
}()
<-waiterCtx.Done()
}()
func (e *executor) limitModeRunner(name string, in chan uuid.UUID, wg *waitGroupWithMutex, limitMode LimitMode, rescheduleLimiter chan struct{}) {
e.logger.Debug("gocron: limitModeRunner starting", "name", name)
for {
select {
case id := <-in:
select {
case <-e.ctx.Done():
e.logger.Debug("gocron: limitModeRunner shutting down", "name", name)
wg.Done()
return
default:
}

// wait for per job singleton limit mode runner jobs to complete
go func() {
e.logger.Debug("waiting for singleton jobs to complete")
go func() {
singletonJobsWg.Wait()
e.logger.Debug("singleton jobs completed")
waitForSingletons <- struct{}{}
}()
<-waiterCtx.Done()
}()
ctx, cancel := context.WithCancel(e.ctx)
j := requestJobCtx(ctx, id, e.jobOutRequest)
cancel()
if j != nil {
if j.singletonMode {
e.limitMode.singletonJobsMu.Lock()
_, ok := e.limitMode.singletonJobs[id]
if ok {
// this job is already running, so don't run it
// but instead reschedule it
e.limitMode.singletonJobsMu.Unlock()
select {
case <-e.ctx.Done():
return
case <-j.ctx.Done():
return
case e.jobIDsOut <- j.id:
}
continue
}
e.limitMode.singletonJobs[id] = struct{}{}
e.limitMode.singletonJobsMu.Unlock()
}
e.runJob(*j)

// wait for limit mode runners to complete
go func() {
e.logger.Debug("waiting for limit mode jobs to complete")
go func() {
limitModeJobsWg.Wait()
e.logger.Debug("limitMode jobs completed")
waitForLimitMode <- struct{}{}
}()
<-waiterCtx.Done()
}()
if j.singletonMode {
e.limitMode.singletonJobsMu.Lock()
delete(e.limitMode.singletonJobs, id)
e.limitMode.singletonJobsMu.Unlock()
}
}

// now either wait for all the jobs to complete,
// or hit the timeout.
var count int
timeout := time.Now().Add(e.stopTimeout)
for time.Now().Before(timeout) && count < 3 {
select {
case <-waitForJobs:
count++
case <-waitForSingletons:
count++
case <-waitForLimitMode:
count++
default:
// remove the limiter block to allow another job to be scheduled
if limitMode == LimitModeReschedule {
select {
case <-rescheduleLimiter:
default:
}
}
case <-e.ctx.Done():
e.logger.Debug("limitModeRunner shutting down", "name", name)
wg.Done()
return
}
}
if count < 3 {
e.done <- ErrStopJobsTimedOut
e.logger.Debug("executor stopped - timed out")
} else {
e.done <- nil
e.logger.Debug("executor stopped")
}
waiterCancel()
}

func (e *executor) limitModeRunner(name string, in chan uuid.UUID, wg *waitGroupWithMutex, limitMode LimitMode, rescheduleLimiter chan struct{}) {
e.logger.Debug("limitModeRunner starting", "name", name)
func (e *executor) singletonModeRunner(name string, in chan uuid.UUID, wg *waitGroupWithMutex, limitMode LimitMode, rescheduleLimiter chan struct{}) {
e.logger.Debug("gocron: limitModeRunner starting", "name", name)
for {
select {
case id := <-in:
select {
case <-e.ctx.Done():
e.logger.Debug("limitModeRunner shutting down", "name", name)
e.logger.Debug("gocron: limitModeRunner shutting down", "name", name)
wg.Done()
return
default:
}

ctx, cancel := context.WithCancel(e.ctx)
j := requestJobCtx(ctx, id, e.jobOutRequest)
cancel()
if j != nil {
e.runJob(*j)
}
cancel()

// remove the limiter block to allow another job to be scheduled
if limitMode == LimitModeReschedule {
@@ -360,3 +335,84 @@ func (e *executor) runJob(j internalJob) {
_ = callJobFuncWithParams(j.afterJobRuns, j.id, j.name)
}
}

func (e *executor) stop(standardJobsWg, singletonJobsWg, limitModeJobsWg *waitGroupWithMutex) {
e.logger.Debug("gocron: stopping executor")
// we've been asked to stop. This is either because the scheduler has been told
// to stop all jobs or the scheduler has been asked to completely shutdown.
//
// cancel tells all the functions to stop their work and send in a done response
e.cancel()

// the wait for job channels are used to report back whether we successfully waited
// for all jobs to complete or if we hit the configured timeout.
waitForJobs := make(chan struct{}, 1)
waitForSingletons := make(chan struct{}, 1)
waitForLimitMode := make(chan struct{}, 1)

// the waiter context is used to cancel the functions waiting on jobs.
// this is done to avoid goroutine leaks.
waiterCtx, waiterCancel := context.WithCancel(context.Background())

// wait for standard jobs to complete
go func() {
e.logger.Debug("gocron: waiting for standard jobs to complete")
go func() {
// this is done in a separate goroutine, so we aren't
// blocked by the WaitGroup's Wait call in the event
// that the waiter context is cancelled.
// This particular goroutine could leak in the event that
// some long-running standard job doesn't complete.
standardJobsWg.Wait()
e.logger.Debug("gocron: standard jobs completed")
waitForJobs <- struct{}{}
}()
<-waiterCtx.Done()
}()

// wait for per job singleton limit mode runner jobs to complete
go func() {
e.logger.Debug("gocron: waiting for singleton jobs to complete")
go func() {
singletonJobsWg.Wait()
e.logger.Debug("gocron: singleton jobs completed")
waitForSingletons <- struct{}{}
}()
<-waiterCtx.Done()
}()

// wait for limit mode runners to complete
go func() {
e.logger.Debug("gocron: waiting for limit mode jobs to complete")
go func() {
limitModeJobsWg.Wait()
e.logger.Debug("gocron: limitMode jobs completed")
waitForLimitMode <- struct{}{}
}()
<-waiterCtx.Done()
}()

// now either wait for all the jobs to complete,
// or hit the timeout.
var count int
timeout := time.Now().Add(e.stopTimeout)
for time.Now().Before(timeout) && count < 3 {
select {
case <-waitForJobs:
count++
case <-waitForSingletons:
count++
case <-waitForLimitMode:
count++
default:
}
}
if count < 3 {
e.done <- ErrStopJobsTimedOut
e.logger.Debug("gocron: executor stopped - timed out")
} else {
e.done <- nil
e.logger.Debug("gocron: executor stopped")
}
waiterCancel()
}
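
Stepping back from the diff: the key addition in limitModeRunner is a mutex-guarded set of in-flight singleton job IDs — a runner skips (and emits for reschedule) any singleton job whose ID is already in the set, and deletes the ID once the run finishes. A standalone sketch of that tracking pattern, using hypothetical names that are not part of the library:

```go
package main

import (
	"fmt"
	"sync"

	"github.com/google/uuid"
)

// inFlightSet mirrors the singletonJobs map + mutex added to limitModeConfig:
// a set of job IDs currently running across all limit-mode runners.
type inFlightSet struct {
	mu   sync.Mutex
	jobs map[uuid.UUID]struct{}
}

// tryAcquire returns false if the job is already running; otherwise it
// records the job as running and returns true.
func (s *inFlightSet) tryAcquire(id uuid.UUID) bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	if _, running := s.jobs[id]; running {
		return false
	}
	s.jobs[id] = struct{}{}
	return true
}

// release removes the job from the set once its run completes.
func (s *inFlightSet) release(id uuid.UUID) {
	s.mu.Lock()
	defer s.mu.Unlock()
	delete(s.jobs, id)
}

func main() {
	set := &inFlightSet{jobs: make(map[uuid.UUID]struct{})}
	id := uuid.New()

	if set.tryAcquire(id) {
		fmt.Println("first run starts")
	}
	if !set.tryAcquire(id) {
		fmt.Println("second run skipped: job already in flight")
	}
	set.release(id)
}
```

In the executor above, the membership check happens before runJob and the delete after it, so a duplicate run of the same singleton job is pushed back onto jobIDsOut for rescheduling instead of running twice.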
4 changes: 0 additions & 4 deletions job.go
@@ -475,10 +475,6 @@ func WithName(name string) JobOption {
// WithSingletonMode keeps the job from running again if it is already running.
// This is useful for jobs that should not overlap, and that occasionally
// (but not consistently) run longer than the interval between job runs.
//
// Note - this is mutually exclusive with WithLimitConcurrentJobs. If both
// are set, WithLimitConcurrentJobs will take precedence.
// WithSingletonMode effectively sets a per-job limit of 1 concurrent job.
func WithSingletonMode(mode LimitMode) JobOption {
return func(j *internalJob) error {
j.singletonMode = true
21 changes: 9 additions & 12 deletions scheduler.go
@@ -111,7 +111,7 @@ func NewScheduler(options ...SchedulerOption) (Scheduler, error) {
}

go func() {
s.logger.Info("new scheduler created")
s.logger.Info("gocron: new scheduler created")
for {
select {
case id := <-s.exec.jobIDsOut:
@@ -164,7 +164,7 @@ func NewScheduler(options ...SchedulerOption) (Scheduler, error) {
// about jobs.

func (s *scheduler) stopScheduler() {
s.logger.Debug("stopping scheduler")
s.logger.Debug("gocron: stopping scheduler")
if s.started {
s.exec.stopCh <- struct{}{}
}
@@ -188,7 +188,7 @@ func (s *scheduler) stopScheduler() {
}
s.stopErrCh <- err
s.started = false
s.logger.Debug("scheduler stopped")
s.logger.Debug("gocron: scheduler stopped")
}

func (s *scheduler) selectAllJobsOutRequest(out allJobsOutRequest) {
@@ -294,7 +294,7 @@ func (s *scheduler) selectRemoveJobsByTags(tags []string) {
}

func (s *scheduler) selectStart() {
s.logger.Debug("scheduler starting")
s.logger.Debug("gocron: scheduler starting")
go s.exec.start()

s.started = true
@@ -325,7 +325,7 @@ func (s *scheduler) selectStart() {
select {
case <-s.shutdownCtx.Done():
case s.startedCh <- struct{}{}:
s.logger.Info("scheduler started")
s.logger.Info("gocron: scheduler started")
}
}

@@ -605,19 +605,16 @@ const (
// WithLimitConcurrentJobs sets the limit and mode to be used by the
// Scheduler for limiting the number of jobs that may be running at
// a given time.
//
// Note - this is mutually exclusive with WithSingletonMode. If both
// are set, WithLimitConcurrentJobs will take precedence.
// WithSingletonMode effectively sets a per-job limit of 1 concurrent job.
func WithLimitConcurrentJobs(limit uint, mode LimitMode) SchedulerOption {
return func(s *scheduler) error {
if limit == 0 {
return ErrWithLimitConcurrentJobsZero
}
s.exec.limitMode = &limitModeConfig{
mode: mode,
limit: limit,
in: make(chan uuid.UUID, 1000),
mode: mode,
limit: limit,
in: make(chan uuid.UUID, 1000),
singletonJobs: make(map[uuid.UUID]struct{}),
}
if mode == LimitModeReschedule {
s.exec.limitMode.rescheduleLimiter = make(chan struct{}, limit)