Go concurrency pattern: Worker Pool

Photo by Mike Chai

In the Go programming language, we achieve concurrency by using goroutines. A goroutine is one of the most basic units of a Go program. Every Go program has at least one goroutine, called the main goroutine, which is created and started automatically when the process begins.
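To make this concrete, here is a small sketch (the `squares` function is a hypothetical example, not code from this article). `main` runs in the main goroutine, each `go` statement starts another goroutine, and a `sync.WaitGroup` makes the caller wait for them. Without it, the program would exit as soon as `main` returns, even if other goroutines were still running.

```go
package main

import (
	"fmt"
	"sync"
)

// squares (hypothetical example) computes i*i for every i in [0, n), using
// one goroutine per element, and waits for all of them before returning.
func squares(n int) []int {
	out := make([]int, n)
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(i int) { // the go keyword starts a new goroutine
			defer wg.Done()
			out[i] = i * i // each goroutine writes only its own index, so no data race
		}(i)
	}
	wg.Wait() // block until every goroutine has called Done
	return out
}

func main() {
	// main itself runs in the main goroutine.
	fmt.Println(squares(5)) // [0 1 4 9 16]
}
```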

By combining Go’s concurrency primitives (goroutines and channels) with the built-in sync package, we can build patterns that help us write maintainable concurrent Go code. Many patterns can be implemented in Go, for example Fan-in/Fan-out, Generator, Job Queue, Pipeline, Semaphore, and Worker Pool. Each pattern has its own use case: we use Pipeline to run independent functions as stages so that we can process the results in a stream-like fashion, and we use Semaphore to control access to a resource shared by multiple processes.
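As an illustration of the Semaphore pattern mentioned above, a common Go idiom is a buffered channel whose capacity is the number of permits. The sketch below assumes a hypothetical `maxInFlight` helper and uses a 10 ms sleep to stand in for the shared resource. It records the peak number of goroutines inside the guarded section, which should never exceed the limit.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// maxInFlight (hypothetical helper) starts totalTask goroutines but uses a
// buffered channel as a counting semaphore so that at most limit of them are
// inside the guarded section at the same time. It returns the peak number of
// goroutines it observed inside that section.
func maxInFlight(totalTask, limit int) int64 {
	sem := make(chan struct{}, limit) // capacity = number of permits
	var wg sync.WaitGroup
	var current, peak int64

	for i := 0; i < totalTask; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			sem <- struct{}{}        // acquire: blocks while all permits are taken
			defer func() { <-sem }() // release the permit on the way out

			n := atomic.AddInt64(&current, 1)
			// Record n as the new peak if it is higher than the stored one.
			for {
				p := atomic.LoadInt64(&peak)
				if n <= p || atomic.CompareAndSwapInt64(&peak, p, n) {
					break
				}
			}
			time.Sleep(10 * time.Millisecond) // simulated use of the shared resource
			atomic.AddInt64(&current, -1)
		}()
	}
	wg.Wait()
	return atomic.LoadInt64(&peak)
}

func main() {
	fmt.Println("peak concurrency:", maxInFlight(20, 3))
}
```

Note that this still starts one goroutine per task and only caps how many of them are inside the guarded section at once; a Worker Pool instead caps the number of goroutines themselves.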

In this article, we’ll talk about Worker Pool. Wait, what is a Worker Pool? A Worker Pool (aka Thread Pool) is a pattern that achieves concurrency by using a fixed number of workers to execute many tasks waiting in a queue. In the Go ecosystem, we use goroutines as the workers and implement the queue with a channel. Each worker pulls a task from the queue and finishes it; once the task is done, the worker keeps pulling new ones until the queue is empty.

Worker Pool illustration

Yeah, I know what a Worker Pool is, but why do I need one in my code? What is the difference from just spawning a new goroutine whenever I need a process to run concurrently?

You don’t have unlimited resources on your machine. A goroutine starts with a minimum stack of 2 KB, so when you spawn too many goroutines, your machine can quickly run out of memory, and the CPU has to keep juggling all of them until it hits its limit. By using a limited pool of workers and keeping pending tasks in the queue, we reduce bursts of CPU and memory usage, since each task waits in the queue until a worker pulls it.

Ok, let’s get our hands dirty by creating our Worker Pool in Go. First, we will define an interface as a contract for our Worker Pool, along with its related struct and type.

Our Worker Pool implementation will have two methods:

Run()

Run dispatches the Worker Pool. We need to call this method before adding tasks to the Worker Pool.

AddTask(task func())

We call this method to add a task (a function to be processed) to the Worker Pool.

And for the final step, let’s implement the WorkerPool interface.

Our workerPool struct holds the maximum number of workers (maxWorker) and the queued task channel (queuedTaskC).

We spawn goroutines based on the maxWorker property; each of them pulls tasks from the queuedTaskC channel and processes them. As long as we don’t call AddTask, the Worker Pool has only spawned the goroutines, which sit waiting on the channel. They start executing as soon as we add a task to queuedTaskC.

The AddTask method pushes the given task to the queuedTaskC channel. The implementation is finished. Let’s try out our Worker Pool implementation by mocking a heavy process with the standard library’s time.Sleep function.

We create 5 tasks and use 3 workers in the Worker Pool, with a 5-second sleep inside each task, and here is the result:

Output executing 5 tasks using 3 workers

All tasks were completed in 10 seconds. Since we use 3 workers, the first 3 tasks run concurrently on 3 goroutines, and the remaining 2 tasks wait in the queue until a previous task has finished. The output says there are 5 goroutines, which is correct: one is the main goroutine, one is used by our example code for monitoring the process, and the other three are the workers.

Visualization executing 5 tasks using 3 workers

Let’s increase our workers by setting the totalWorker variable to 5 and re-run the program.

Output executing 5 tasks using 5 workers

With 5 workers, the tasks are executed on 5 goroutines (the log shows 7 goroutines including main and monitoring), and the process completes in 5 seconds, faster than the previous run.

Visualization executing 5 tasks using 5 workers

Ok, so what is the difference between using goroutines directly and this Worker Pool implementation? Let’s increase the number of tasks to 100 by setting totalTask in our example program. Here is part of the output:

Output executing 100 tasks using 5 workers

Yep, with this Worker Pool implementation we still use only 5 worker goroutines to process 100 tasks. In contrast, if we spawned a goroutine directly for each task, the runtime would create 100 goroutines for our process. The trade-off is that, since the workers are limited to 5, the tasks take longer to complete: with a 5-second sleep per task, 100 tasks need 20 rounds, or roughly 100 seconds.
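To see the difference in goroutine count without the article’s monitoring loop, the sketch below compares one goroutine per task with a fixed set of workers reading from a channel. The helper names and sleep durations are assumptions. `runtime.NumGoroutine` counts all live goroutines, including main, so the exact numbers may vary slightly between runs.

```go
package main

import (
	"fmt"
	"runtime"
	"sync"
	"time"
)

// spawnPerTask starts one goroutine per task and reports how many goroutines
// exist while they are all still sleeping.
func spawnPerTask(totalTask int) int {
	var wg sync.WaitGroup
	for i := 0; i < totalTask; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			time.Sleep(100 * time.Millisecond)
		}()
	}
	n := runtime.NumGoroutine() // sampled while the tasks are still running
	wg.Wait()
	return n
}

// spawnWithPool runs the same number of tasks on totalWorker goroutines that
// pull from a channel, and reports the goroutine count once the workers are
// started, before any task is queued.
func spawnWithPool(totalTask, totalWorker int) int {
	tasks := make(chan func())
	var wg sync.WaitGroup
	for w := 0; w < totalWorker; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for task := range tasks { // loop ends once tasks is closed and drained
				task()
			}
		}()
	}
	n := runtime.NumGoroutine()
	for i := 0; i < totalTask; i++ {
		tasks <- func() { time.Sleep(10 * time.Millisecond) }
	}
	close(tasks) // no more tasks: lets the workers exit
	wg.Wait()
	return n
}

func main() {
	// Run the pool first so that leftover goroutines from the one-per-task
	// run cannot inflate the pool's count.
	fmt.Println("worker pool (5 workers):", spawnWithPool(100, 5))
	fmt.Println("one goroutine per task: ", spawnPerTask(100))
}
```

Unlike the Run/AddTask design, this pool closes its task channel once all tasks are queued, so the workers’ range loops end and the WaitGroup confirms that every task has finished.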

At this point we have implemented a simple Worker Pool in Go, using a channel as the queue and goroutines as the workers. You can see the full source code for this article in this repository.

Thanks.

