Files
cloud-services/pkg/utils/threadpool/threadpool.go

86 lines
2.1 KiB
Go

package threadpool
import (
"fmt"
"sync"
)
// Sentinel errors exposed by the threadpool package.
var (
	// ErrNoWorkers is returned when a task is handed to a pool that has no
	// workers to run it.
	ErrNoWorkers = fmt.Errorf("worker pool is empty")

	// ErrQueueFull reports that a task could not be queued because the queue
	// is full.
	// NOTE(review): nothing in the visible part of this file returns it;
	// confirm it is still used elsewhere.
	ErrQueueFull = fmt.Errorf("queue is full, not able add the task")
)
// ThreadPool hands submitted tasks to a fixed set of workers.
//
// Instances are built with NewThreadPool and shut down with Close. The struct
// embeds a sync.WaitGroup by value, so it must be used through a pointer and
// never copied.
type ThreadPool struct {
// workersTopLimit is the number of workers createPool starts; it is also the
// buffer capacity of workerPool. Zero means the pool rejects every task.
workersTopLimit int
// workerPool carries per-worker job channels. submitTask receives one job
// channel from it and sends the task on that channel.
// NOTE(review): presumably each idle worker re-registers its own job channel
// here; confirm against the worker implementation.
workerPool chan chan interface{}
// closeHandle is closed by Close to signal shutdown; it is shared with every
// worker and with the dispatch goroutine.
closeHandle chan bool
// wgReceivers is incremented by the worker count in NewThreadPool and waited
// on in Close. NOTE(review): presumably each worker calls Done when it exits.
wgReceivers sync.WaitGroup
}
// NewThreadPool builds a pool with workersLimit workers and starts them.
//
// A negative workersLimit is treated as zero instead of panicking inside
// make/WaitGroup.Add. A pool with zero workers is valid but rejects every
// submitted task with ErrNoWorkers. The returned pool must be released with
// Close.
func NewThreadPool(workersLimit int) *ThreadPool {
	// make(chan, n) and WaitGroup.Add both panic on a negative count.
	if workersLimit < 0 {
		workersLimit = 0
	}

	threadPool := &ThreadPool{
		workersTopLimit: workersLimit,
		workerPool:      make(chan chan interface{}, workersLimit),
		closeHandle:     make(chan bool),
	}

	// The zero-value WaitGroup is ready to use. Register every worker before
	// any of them is started so Close never waits on a partial count.
	threadPool.wgReceivers.Add(workersLimit)
	threadPool.createPool()
	return threadPool
}
// Close shuts the pool down: it closes closeHandle to signal every goroutine
// holding it, waits on wgReceivers, and finally closes workerPool.
//
// Calling Close again after it has returned is a no-op rather than a
// "close of closed channel" panic. Concurrent calls to Close are still not
// supported.
func (t *ThreadPool) Close() {
	// closeHandle is only ever closed, never sent on in this file, so a
	// successful receive means an earlier Close already ran.
	select {
	case <-t.closeHandle:
		return
	default:
	}

	close(t.closeHandle) // Signals all goroutines to stop.
	t.wgReceivers.Wait()
	// Closed last, after the wait, so nothing is still sending on it.
	// NOTE(review): assumes workers call Done only after their final send.
	close(t.workerPool)
}
// createPool starts workersTopLimit workers, each sharing the pool's
// workerPool, closeHandle and wait group, and then launches the dispatch
// goroutine.
func (t *ThreadPool) createPool() {
	for remaining := t.workersTopLimit; remaining > 0; remaining-- {
		w := NewWorker(t.workerPool, t.closeHandle, &t.wgReceivers)
		w.Start()
	}

	go t.dispatch()
}
// submitTask hands task to the next available worker.
//
// It blocks until a job channel can be received from workerPool and then
// sends task on that channel. It returns ErrNoWorkers when the pool has no
// workers or has been shut down by Close, and nil once the task has been
// handed over.
func (t *ThreadPool) submitTask(task interface{}) error {
	if t.workerPool == nil || t.workersTopLimit == 0 {
		return ErrNoWorkers
	}

	// Close closes workerPool. A receive on a closed channel yields a nil job
	// channel, and a send on a nil channel blocks forever, so detect that
	// case instead of hanging the caller.
	jobChannel, ok := <-t.workerPool
	if !ok || jobChannel == nil {
		return ErrNoWorkers
	}

	// Abandon the hand-off if the pool is shut down before the task is taken.
	// NOTE(review): assumes a worker stops reading its job channel once
	// closeHandle is closed; confirm against the worker implementation.
	select {
	case jobChannel <- task:
		return nil
	case <-t.closeHandle:
		return ErrNoWorkers
	}
}
// Execute submits task to the pool by passing it to submitTask and returns
// that call's error. The call blocks until a worker's job channel can be
// received from the pool; ErrNoWorkers is returned when the pool has no
// workers.
func (t *ThreadPool) Execute(task Runnable) error {
return t.submitTask(task)
}
// ExecuteFuture submits task to the pool and returns the Future through which
// its response is delivered.
//
// The task is wrapped in a callableTask together with a fresh Future whose
// response channel is unbuffered. ErrNoWorkers is returned, before anything
// is allocated, when the pool has no workers; any error from submitTask is
// returned with a nil Future.
func (t *ThreadPool) ExecuteFuture(task Callable) (*Future, error) {
	if t.workerPool == nil || t.workersTopLimit == 0 {
		return nil, ErrNoWorkers
	}

	wrapped := callableTask{
		Task:   task,
		Handle: &Future{response: make(chan interface{})},
	}
	if err := t.submitTask(wrapped); err != nil {
		return nil, err
	}
	return wrapped.Handle, nil
}
// dispatch blocks until closeHandle fires (it is closed by Close) and then
// returns. It performs no job routing itself: submitTask hands tasks to
// workers directly.
func (t *ThreadPool) dispatch() {
	<-t.closeHandle
}