Created
July 24, 2023 14:15
-
-
Save richardartoul/5d24cd508ac9372bf25c7a639668499c to your computer and use it in GitHub Desktop.
A simple Goroutine pool for amortizing stack growth overhead
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package pool | |
import ( | |
"fmt" | |
"sync" | |
"runtime" | |
) | |
// GoroutinePool pools Goroutines to avoid performance penalties associated with spawning many | |
// short-lived goroutines that each have to individually grow their stack. | |
type GoroutinePool struct { | |
mu sync.Mutex | |
numWorkersBusy int | |
numWorkers int | |
workCh chan work | |
maxWorkers int | |
} | |
// NewGoroutinePool creates a new GoroutinePool. | |
func NewGoroutinePool(maxWorkers int) *GoroutinePool { | |
gp := &GoroutinePool{ | |
workCh: make(chan work, runtime.NumCPU()), | |
maxWorkers: maxWorkers, | |
} | |
return gp | |
} | |
func (g *GoroutinePool) Go(fn func()) { | |
var ( | |
existingWorkerAvailable = false | |
spawnNewWorker = false | |
) | |
g.mu.Lock() | |
numWorkers := g.numWorkers | |
if numWorkers < 0 { | |
g.mu.Unlock() | |
panic(fmt.Sprintf("numWorkers: %d < 0", numWorkers)) | |
} | |
numWorkersBusy := g.numWorkersBusy | |
if numWorkersBusy < 0 { | |
g.mu.Unlock() | |
panic(fmt.Sprintf("numWorkersBusy: %d < 0", numWorkersBusy)) | |
} | |
if g.numWorkersBusy < g.numWorkers { | |
g.numWorkersBusy++ | |
existingWorkerAvailable = true | |
} else if g.numWorkers < g.maxWorkers { | |
g.numWorkersBusy++ | |
g.numWorkers++ | |
spawnNewWorker = true | |
} else { | |
// Do nothing and it will just spawn a new goroutine that won't participate in the pool. | |
} | |
g.mu.Unlock() | |
if existingWorkerAvailable { | |
// There is a worker available, just submit it to the work channel to be picked | |
// up by a free worker. | |
g.workCh <- work{fn: fn} | |
return | |
} | |
if spawnNewWorker { | |
// There wasn't a free worker, so spawn a new one and then submit it to the work | |
// channel to be picked up by whichever goroutine gets to it first. | |
go g.workerLoop() | |
g.workCh <- work{fn: fn} | |
return | |
} | |
// There wasn't a free worker available, *and* we're already at the worker limit so | |
// we can't spawn a new worker. What we can do though is just spawn a new goroutine | |
// that won't be pooled. | |
go fn() | |
} | |
func (g *GoroutinePool) workerLoop() { | |
defer func() { | |
g.mu.Lock() | |
g.numWorkers-- | |
g.mu.Unlock() | |
}() | |
i := 0 | |
for work := range g.workCh { | |
work.fn() | |
i++ | |
g.mu.Lock() | |
g.numWorkersBusy-- | |
g.mu.Unlock() | |
if i > 1000 { | |
return | |
} | |
} | |
} | |
// work is a single task queued on GoroutinePool.workCh for a pooled worker to run.
type work struct {
	// fn is the function the worker invokes.
	fn func()
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment