Go concurrency pattern: Pipeline
“Concurrency is about dealing with lots of things at once. Parallelism is about doing lots of things at once.” — Rob Pike
Concurrency and Parallelism
What is the meaning of concurrency in programming?
Concurrency is the ability of a program to structure and make progress on multiple tasks during overlapping time periods.
And what is parallelism?
Parallelism is actually executing multiple tasks at the same instant, which requires multiple processor cores.
In Go, we can achieve concurrency easily by using goroutines. We just need to add the go keyword before a function call, and the function will run on a separate goroutine.
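As a minimal sketch of the go keyword (the greet function and channel here are illustrative, not part of the pipeline module below):

```go
package main

import "fmt"

// greet sends a message on the given channel.
func greet(out chan<- string) {
	out <- "hello from goroutine"
}

func main() {
	out := make(chan string)
	go greet(out) // the "go" prefix runs greet on its own goroutine
	msg := <-out  // receiving also synchronizes main with the goroutine
	fmt.Println(msg)
}
```

The unbuffered channel doubles as a synchronization point: main blocks on the receive until the goroutine has sent its value.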
Pipeline
Pipeline is a pattern where we break down a complicated task into smaller sub-tasks. The output of the first sub-task becomes the input of the next sub-task, and we repeat this process until all sub-tasks are completed.
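A minimal two-stage channel pipeline, in the style popularized by the Go blog, might look like this (gen and double are illustrative stage names, not part of the module built later in this article):

```go
package main

import "fmt"

// gen emits the given numbers on a channel, then closes it.
// It is the source stage of the pipeline.
func gen(nums ...int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for _, n := range nums {
			out <- n
		}
	}()
	return out
}

// double is one pipeline stage: it reads each value from in
// and writes the value multiplied by two to its output channel.
func double(in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for n := range in {
			out <- n * 2
		}
	}()
	return out
}

func main() {
	// Compose the stages: gen's output feeds double's input.
	for v := range double(gen(1, 2, 3)) {
		fmt.Println(v) // prints 2, 4, 6
	}
}
```

Each stage owns its output channel and closes it when done, so downstream stages terminate naturally via range.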
Collection Pipeline by Martin Fowler:
Collection pipelines are a programming pattern where you organize some computation as a sequence of operations which compose by taking a collection as output of one operation and feeding it into the next.
Implementation in Go
Let’s assume we have a slow task that multiplies an integer by two, squares it, wraps the value in quotes, and appends “Foo”. We can separate the task into smaller sub-tasks:
func multiplyTwo(v int) int {
	time.Sleep(1 * time.Second)
	return v * 2
}

func square(v int) int {
	time.Sleep(2 * time.Second)
	return v * v
}

func addQuoute(v int) string {
	time.Sleep(1 * time.Second)
	return fmt.Sprintf("'%d'", v)
}

func addFoo(v string) string {
	time.Sleep(2 * time.Second)
	return fmt.Sprintf("%s - Foo", v)
}
And then we execute all subtasks as one pipeline:
func BenchmarkWithoutPipelineModule(b *testing.B) {
	for i := 0; i < b.N; i++ {
		addFoo(addQuoute(square(multiplyTwo(i))))
	}
}
Here is the result: each iteration takes about six seconds, the sum of all four sleeps, because the sub-tasks run one after another.
Nothing is wrong with this code, but can we improve the execution time?
Sure, we can run this task concurrently by using a dedicated goroutine for each sub-task. And let’s “abuse” Go’s interface{} type to build a general pipeline module.
First, let’s define Pipeline interface for our module:
type Executor func(interface{}) (interface{}, error)

type Pipeline interface {
	Pipe(executor Executor) Pipeline
	Merge() <-chan interface{}
}
By using this interface, we can chain Pipe calls until all sub-tasks have been added, and then call Merge to run them and receive the output through a Go channel. Here is the implementation:
type pipeline struct {
	dataC     chan interface{}
	errC      chan error
	executors []Executor
}

func New(f func(chan interface{})) Pipeline {
	inC := make(chan interface{})
	go f(inC)
	return &pipeline{
		dataC:     inC,
		errC:      make(chan error),
		executors: []Executor{},
	}
}
We create the pipeline struct to hold the data channel, the error channel, and a slice of Executor. Executor is just an alias for a function that will be executed at each pipeline stage. Here is the rest of the Pipeline implementation:
func (p *pipeline) Pipe(executor Executor) Pipeline {
	p.executors = append(p.executors, executor)
	return p
}
func (p *pipeline) Merge() <-chan interface{} {
	for i := 0; i < len(p.executors); i++ {
		p.dataC, p.errC = run(p.dataC, p.executors[i])
	}
	return p.dataC
}

func run(
	inC <-chan interface{},
	f Executor,
) (chan interface{}, chan error) {
	outC := make(chan interface{})
	errC := make(chan error)
	go func() {
		defer close(outC)
		for v := range inC {
			res, err := f(v)
			if err != nil {
				errC <- err
				continue
			}
			outC <- res
		}
	}()
	return outC, errC
}
In Pipe, we just append the Executor to our slice of executors. In Merge, we run the executors one by one, using the output channel of each stage as the input of the next. We use chan interface{} to exchange data between executors, and chan error to report errors from each stage.
And we can use our pipeline module like this:
func BenchmarkWithPipelineModule(b *testing.B) {
	outC := New(func(inC chan interface{}) {
		defer close(inC)
		for i := 0; i < b.N; i++ {
			inC <- i
		}
	}).
		Pipe(func(in interface{}) (interface{}, error) {
			return multiplyTwo(in.(int)), nil
		}).
		Pipe(func(in interface{}) (interface{}, error) {
			return square(in.(int)), nil
		}).
		Pipe(func(in interface{}) (interface{}, error) {
			return addQuoute(in.(int)), nil
		}).
		Pipe(func(in interface{}) (interface{}, error) {
			return addFoo(in.(string)), nil
		}).
		Merge()

	// Drain the output channel; we only care about timing here.
	for range outC {
	}
}
And here is the benchmark result for the pipeline with goroutines: once the pipeline is full, all four stages work concurrently, so throughput is limited by the slowest stage (about two seconds per item) rather than the roughly six seconds per item of the sequential version.
From the benchmark result, we can conclude that running the pipeline with Go concurrency gives better throughput than running it sequentially. And you can visit this link to see the full source code for this article.
Thanks for reading.