Go concurrency pattern: Pipeline

Syafdia Okta
4 min read · Jun 24, 2019

“Concurrency is about dealing with lots of things at once. Parallelism is about doing lots of things at once.” — Rob Pike

Silver Pipes (Photo by jiawei cui)

Concurrency and Parallelism

What is the meaning of concurrency in programming?

Concurrency is the ability of a program to deal with multiple tasks at once, even if they are not all literally executing at the same instant.

And what is parallelism?

Parallelism is about actually doing multiple tasks at the same time, for example on multiple CPU cores.

Concurrency vs Parallelism. (Image by Natasha Mathur)

In Golang, we can achieve concurrency easily by using goroutines. We just need to prefix a function call with the go keyword, and the function will run on a separate goroutine.
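As a minimal sketch of the go keyword (the helper double and the result channel are my own additions; the channel lets main wait for the goroutine instead of exiting early):

```go
package main

import "fmt"

// double sends v * 2 on the out channel, so main can wait for the result.
func double(v int, out chan<- int) {
	out <- v * 2
}

func main() {
	out := make(chan int)
	// Prefixing the call with go runs double on a separate goroutine.
	go double(21, out)
	fmt.Println(<-out) // prints 42
}
```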

Pipeline

Pipeline is a pattern where we break a complicated task down into smaller sub tasks. The output of the first sub task becomes the input of the next sub task, and we repeat this process until all sub tasks have been completed.

Collection Pipeline by Martin Fowler:

Collection pipelines are a programming pattern where you organize some computation as a sequence of operations which compose by taking a collection as output of one operation and feeding it into the next.
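Before building a generic module, here is a minimal hand-rolled sketch of the idea using channels (the stage names generate and double are my own, not part of the module below): each stage reads from the previous stage's channel and feeds the next.

```go
package main

import "fmt"

// generate feeds the integers 1..n into a channel and closes it when done.
func generate(n int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for i := 1; i <= n; i++ {
			out <- i
		}
	}()
	return out
}

// double reads each value from in, multiplies it by two, and forwards it.
func double(in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for v := range in {
			out <- v * 2
		}
	}()
	return out
}

func main() {
	// The output of generate is the input of double.
	for v := range double(generate(3)) {
		fmt.Println(v) // prints 2, 4, 6
	}
}
```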

Implementation in Go

Let’s assume we have some time-consuming tasks: multiply an integer by two, square it, wrap the value in quotes, and append “Foo”. We can separate the work into smaller sub tasks:

func multiplyTwo(v int) int {
	time.Sleep(1 * time.Second)
	return v * 2
}

func square(v int) int {
	time.Sleep(2 * time.Second)
	return v * v
}

func addQuoute(v int) string {
	time.Sleep(1 * time.Second)
	return fmt.Sprintf("'%d'", v)
}

func addFoo(v string) string {
	time.Sleep(2 * time.Second)
	return fmt.Sprintf("%s - Foo", v)
}

And then we execute all subtasks as one pipeline:

func BenchmarkWithoutPipelineModule(b *testing.B) {
	for i := 0; i < b.N; i++ {
		addFoo(addQuoute(square(multiplyTwo(i))))
	}
}

Here is the result:

Benchmark result without Goroutine

There is nothing wrong with this code, but can we improve the execution time?

Sure, we can run these sub tasks concurrently by using a dedicated goroutine for each one. And let’s “abuse” Golang’s interface{} type to build a general pipeline module.

First, let’s define Pipeline interface for our module:

type Executor func(interface{}) (interface{}, error)

type Pipeline interface {
	Pipe(executor Executor) Pipeline
	Merge() <-chan interface{}
}

By using this interface, we can chain Pipe calls until all sub tasks have been added, and then call Merge to run all sub tasks and return the output through a Go channel. Here is the implementation:

type pipeline struct {
	dataC     chan interface{}
	errC      chan error
	executors []Executor
}

func New(f func(chan interface{})) Pipeline {
	inC := make(chan interface{})
	go f(inC)

	return &pipeline{
		dataC:     inC,
		errC:      make(chan error),
		executors: []Executor{},
	}
}

We create a pipeline struct to hold the data channel, the error channel, and an array of Executors. Executor is just an alias for a function type, which will be executed at each pipeline stage. And here is the rest of the Pipeline implementation:

func (p *pipeline) Pipe(executor Executor) Pipeline {
	p.executors = append(p.executors, executor)
	return p
}

func (p *pipeline) Merge() <-chan interface{} {
	for i := 0; i < len(p.executors); i++ {
		p.dataC, p.errC = run(p.dataC, p.executors[i])
	}
	return p.dataC
}

func run(inC <-chan interface{}, f Executor) (chan interface{}, chan error) {
	outC := make(chan interface{})
	errC := make(chan error)

	go func() {
		defer close(outC)
		for v := range inC {
			res, err := f(v)
			if err != nil {
				errC <- err
				continue
			}
			outC <- res
		}
	}()

	return outC, errC
}

In the Pipe implementation, we just append the Executor to our array of executors. In the Merge implementation, we run the executors one by one, using the result of each as the input for the next. We use channels of type chan interface{} to exchange data between executors, and chan error for handling errors in each stage.
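Since executors receive interface{} values, each one must assert the concrete type of its input. A small standalone sketch of an executor (toInt is a hypothetical name, not part of the module above) that uses the comma-ok form so a wrong type becomes an error on the error channel instead of a panic:

```go
package main

import (
	"errors"
	"fmt"
)

// Executor mirrors the function alias used by the pipeline module.
type Executor func(interface{}) (interface{}, error)

// toInt is a hypothetical executor: it asserts the incoming value's type
// with the comma-ok form and reports an error instead of panicking.
var toInt Executor = func(in interface{}) (interface{}, error) {
	v, ok := in.(int)
	if !ok {
		return nil, errors.New("expected int")
	}
	return v * 2, nil
}

func main() {
	out, err := toInt(21)
	fmt.Println(out, err) // prints 42 <nil>

	_, err = toInt("oops")
	fmt.Println(err) // prints expected int
}
```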

And we can use our pipeline module like this:

func BenchmarkWithPipelineModule(b *testing.B) {
	outC := New(func(inC chan interface{}) {
		defer close(inC)
		for i := 0; i < b.N; i++ {
			inC <- i
		}
	}).
		Pipe(func(in interface{}) (interface{}, error) {
			return multiplyTwo(in.(int)), nil
		}).
		Pipe(func(in interface{}) (interface{}, error) {
			return square(in.(int)), nil
		}).
		Pipe(func(in interface{}) (interface{}, error) {
			return addQuoute(in.(int)), nil
		}).
		Pipe(func(in interface{}) (interface{}, error) {
			return addFoo(in.(string)), nil
		}).
		Merge()

	// Drain the output channel; the values themselves are not needed.
	for range outC {
	}
}

And here is the benchmark result for using pipeline with Goroutine:

Benchmark result with Goroutine using pipeline module

From the benchmark result, we can conclude that running the pipeline concurrently with goroutines is faster than running it sequentially: once the pipeline is full, each result only has to wait for the slowest stage instead of the sum of all stages. You can visit this link to see the full source code for this article.
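As a back-of-the-envelope check (the per-stage delays below are taken from the time.Sleep calls in the sub tasks above), the sequential cost per item is the sum of all stage delays, while the pipelined steady-state cost per item is bounded by the slowest stage:

```go
package main

import "fmt"

func main() {
	// Per-stage delays in seconds: multiplyTwo, square, addQuoute, addFoo.
	stages := []int{1, 2, 1, 2}

	sequential := 0
	slowest := 0
	for _, s := range stages {
		sequential += s // sequentially, every item pays every stage
		if s > slowest {
			slowest = s // pipelined, throughput is set by the slowest stage
		}
	}

	fmt.Println("sequential seconds per item:", sequential) // prints 6
	fmt.Println("pipelined seconds per item:", slowest)     // prints 2
}
```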

Thanks for reading.
