86 lines
2.1 KiB
Go
86 lines
2.1 KiB
Go
package threadpool
|
|
|
|
import (
|
|
"fmt"
|
|
"sync"
|
|
)
|
|
|
|
var (
|
|
ErrQueueFull = fmt.Errorf("queue is full, not able add the task")
|
|
ErrNoWorkers = fmt.Errorf("worker pool is empty")
|
|
)
|
|
|
|
type ThreadPool struct {
|
|
workersTopLimit int
|
|
workerPool chan chan interface{}
|
|
closeHandle chan bool
|
|
wgReceivers sync.WaitGroup
|
|
}
|
|
|
|
func NewThreadPool(workersLimit int) *ThreadPool {
|
|
threadPool := &ThreadPool{workersTopLimit: workersLimit}
|
|
threadPool.workerPool = make(chan chan interface{}, workersLimit)
|
|
threadPool.closeHandle = make(chan bool)
|
|
threadPool.wgReceivers = sync.WaitGroup{}
|
|
threadPool.wgReceivers.Add(workersLimit)
|
|
threadPool.createPool()
|
|
return threadPool
|
|
|
|
}
|
|
func (t *ThreadPool) Close() {
|
|
close(t.closeHandle) // Stops all the routines
|
|
t.wgReceivers.Wait()
|
|
close(t.workerPool) // Closes the Job threadpool
|
|
}
|
|
|
|
func (t *ThreadPool) createPool() {
|
|
for i := 0; i < t.workersTopLimit; i++ {
|
|
worker := NewWorker(t.workerPool, t.closeHandle, &t.wgReceivers)
|
|
worker.Start()
|
|
}
|
|
go t.dispatch()
|
|
}
|
|
|
|
func (t *ThreadPool) submitTask(task interface{}) error {
|
|
// Add the task to the job queue
|
|
//Find a worker for the job
|
|
if t.workerPool == nil || t.workersTopLimit == 0 {
|
|
return ErrNoWorkers
|
|
}
|
|
jobChannel := <-t.workerPool
|
|
//Submit job to the worker
|
|
jobChannel <- task
|
|
return nil
|
|
}
|
|
|
|
// Execute submits the job to available worker
|
|
func (t *ThreadPool) Execute(task Runnable) error {
|
|
return t.submitTask(task)
|
|
}
|
|
|
|
// ExecuteFuture will submit the task to the threadpool and return the response handle
|
|
func (t *ThreadPool) ExecuteFuture(task Callable) (*Future, error) {
|
|
// Create future and task
|
|
if t.workerPool == nil || t.workersTopLimit == 0 {
|
|
return nil, ErrNoWorkers
|
|
}
|
|
handle := &Future{response: make(chan interface{})}
|
|
futureTask := callableTask{Task: task, Handle: handle}
|
|
err := t.submitTask(futureTask)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return futureTask.Handle, nil
|
|
}
|
|
|
|
// dispatch listens to the jobqueue and handles the jobs to the workers
|
|
func (t *ThreadPool) dispatch() {
|
|
for {
|
|
select {
|
|
case <-t.closeHandle:
|
|
// Close thread threadpool
|
|
return
|
|
}
|
|
}
|
|
}
|