// Package threadpool provides a fixed-size pool of workers to which tasks can
// be submitted for execution.
package threadpool

import (
	"fmt"
	"sync"
)

var (
	// ErrQueueFull reports that a task could not be queued. Nothing in this
	// file returns it; it is kept for the rest of the package and for callers.
	ErrQueueFull = fmt.Errorf("queue is full, not able add the task")
	// ErrNoWorkers is returned when a task is submitted to a pool that was
	// created without workers (or to a zero-value ThreadPool).
	ErrNoWorkers = fmt.Errorf("worker pool is empty")
	// ErrPoolClosed is returned when a task is submitted to a pool that has
	// been, or is being, closed.
	ErrPoolClosed = fmt.Errorf("thread pool is closed")
)

// ThreadPool hands submitted tasks to a fixed set of workers.
//
// Create one with NewThreadPool and release it with Close.
type ThreadPool struct {
	// workersTopLimit is the number of workers started by createPool.
	workersTopLimit int
	// workerPool carries the job channels of workers; submitTask takes one
	// job channel from it per task.
	workerPool chan chan interface{}
	// closeHandle is closed by Close to signal shutdown.
	closeHandle chan bool
	// wgReceivers is handed to every worker; Close waits on it.
	// NOTE(review): presumably each worker calls Done when it stops — confirm
	// against the worker implementation.
	wgReceivers sync.WaitGroup
	// closeOnce makes Close safe to call more than once.
	closeOnce sync.Once
}

// NewThreadPool creates a pool with workersLimit workers and starts them.
//
// A negative workersLimit is treated as 0; such a pool accepts no tasks and
// every submission fails with ErrNoWorkers.
func NewThreadPool(workersLimit int) *ThreadPool {
	// make(chan, n) and WaitGroup.Add(n) both panic for negative n.
	if workersLimit < 0 {
		workersLimit = 0
	}

	threadPool := &ThreadPool{
		workersTopLimit: workersLimit,
		workerPool:      make(chan chan interface{}, workersLimit),
		closeHandle:     make(chan bool),
	}
	// The zero-value WaitGroup is ready to use; register all workers up front
	// so Close cannot observe a zero counter before they are started.
	threadPool.wgReceivers.Add(workersLimit)
	threadPool.createPool()

	return threadPool
}

// Close signals shutdown, waits on the workers' wait group and then closes
// the worker pool channel.
//
// Close is idempotent: calls after the first return once the first call has
// finished. Calling Close on a zero-value ThreadPool is a no-op.
func (t *ThreadPool) Close() {
	t.closeOnce.Do(func() {
		if t.closeHandle == nil || t.workerPool == nil {
			return // Zero-value pool: nothing was ever started.
		}

		close(t.closeHandle) // Broadcasts the stop signal.
		t.wgReceivers.Wait()
		close(t.workerPool) // Safe only after the wait above has returned.
	})
}

// createPool starts workersTopLimit workers and the dispatch goroutine.
func (t *ThreadPool) createPool() {
	for i := 0; i < t.workersTopLimit; i++ {
		worker := NewWorker(t.workerPool, t.closeHandle, &t.wgReceivers)
		worker.Start()
	}

	go t.dispatch()
}

// submitTask takes the next job channel from the worker pool and sends task
// on it, blocking until both steps succeed or the pool is closed.
//
// It returns ErrNoWorkers if the pool has no workers and ErrPoolClosed if the
// pool is closed before the task could be handed over.
func (t *ThreadPool) submitTask(task interface{}) error {
	if t.workerPool == nil || t.workersTopLimit == 0 {
		return ErrNoWorkers
	}

	// Prefer the shutdown signal over a job channel that may still be
	// buffered in workerPool after shutdown began.
	select {
	case <-t.closeHandle:
		return ErrPoolClosed
	default:
	}

	select {
	case <-t.closeHandle:
		return ErrPoolClosed
	case jobChannel, ok := <-t.workerPool:
		if !ok {
			// workerPool was closed by Close. Without this check jobChannel
			// would be nil and the send below would block forever.
			return ErrPoolClosed
		}

		select {
		case jobChannel <- task:
			return nil
		case <-t.closeHandle:
			return ErrPoolClosed
		}
	}
}

// Execute submits task to the next available worker. It blocks until a worker
// accepts the task and returns the error from the submission, if any.
func (t *ThreadPool) Execute(task Runnable) error {
	return t.submitTask(task)
}

// ExecuteFuture submits task to the pool and returns the Future through which
// its response is delivered. On failure the returned Future is nil.
func (t *ThreadPool) ExecuteFuture(task Callable) (*Future, error) {
	handle := &Future{response: make(chan interface{})}
	// The task is sent as a callableTask value (not a pointer), exactly as
	// before, so the receiving side sees the same dynamic type.
	futureTask := callableTask{Task: task, Handle: handle}

	if err := t.submitTask(futureTask); err != nil {
		return nil, err
	}

	return handle, nil
}

// dispatch blocks until the pool is closed and then returns. It currently
// does no routing of its own; submitTask hands tasks to workers directly.
func (t *ThreadPool) dispatch() {
	<-t.closeHandle
}