Go concurrency pattern: Pipeline

Silver Pipes (Photo by jiawei cui)

Concurrency and Parallelism

Concurrency vs Parallelism. (Image by Natasha Mathur)

Pipeline

Implementation in Go

// multiplyTwo returns twice v after sleeping for one second. The sleep
// presumably stands in for expensive work in the benchmarks.
func multiplyTwo(v int) int {
	const delay = time.Second

	time.Sleep(delay)
	doubled := 2 * v
	return doubled
}
// square returns v multiplied by itself after sleeping for two seconds.
func square(v int) int {
	const delay = 2 * time.Second

	time.Sleep(delay)
	squared := v * v
	return squared
}

// addQuoute renders v in decimal wrapped in single quotes after sleeping for
// one second. The spelling of the name is kept because callers refer to it.
func addQuoute(v int) string {
	const delay = time.Second

	time.Sleep(delay)
	quoted := fmt.Sprintf("'%d'", v)
	return quoted
}
// addFoo appends the suffix " - Foo" to v after sleeping for two seconds.
func addFoo(v string) string {
	const delay = 2 * time.Second

	time.Sleep(delay)
	suffixed := fmt.Sprintf("%s - Foo", v)
	return suffixed
}
func BenchmarkWithoutPipelineModule(b *testing.B) {    for i := 0; i < b.N; i++ {        addFoo(addQuoute(square(multiplyTwo(i))))    }}
Benchmark result without goroutines
type (
	// Executor is a single pipeline step: it receives one value and returns
	// either the transformed value or an error.
	Executor func(interface{}) (interface{}, error)

	// Pipeline is a chain of Executors assembled with a fluent API.
	Pipeline interface {
		// Pipe registers executor as the next step and returns the
		// Pipeline so that calls can be chained.
		Pipe(executor Executor) Pipeline
		// Merge returns the receive-only channel on which the results of
		// the pipeline are delivered.
		Merge() <-chan interface{}
	}
)
type pipeline struct {   dataC     chan interface{}   errC      chan error   executors []Executor} func New(f func(chan interface{})) Pipeline {    inC := make(chan interface{})    go f(inC)    return &pipeline{        dataC:     inC,        errC:      make(chan error),         executors: []Executor{},    }}
func (p *pipeline) Pipe(executor Executor) Pipeline {    p.executors = append(p.executors, executor)    return p}
func (p *pipeline) Merge() <-chan interface{} { for i := 0; i < len(p.executors); i++ { p.dataC, p.errC = run(p.dataC, p.executors[i]) } return p.dataC}func run(
inC <-chan interface{},
f Executor
) (chan interface{}, chan error) {
outC := make(chan interface{}) errC := make(chan error)
go func() {
defer close(outC) for v := range inC { res, err := f(v) if err != nil { errC <- err continue
}
outC <- res } }()
return outC, errC}
func benchmarkWithPipelineModule(b *testing.B) {    outC := New(func(inC chan interface{}) {
defer close(inC)
for i := 0; i < b.N; i++ {
inC <- i
} }). Pipe(func(in interface{}) (interface{}, error) { return multiplyTwo(in.(int)), nil
}).
Pipe(func(in interface{}) (interface{}, error) { return square(in.(int)), nil }). Pipe(func(in interface{}) (interface{}, error) { return addQuoute(in.(int)), nil }). Pipe(func(in interface{}) (interface{}, error) { return addFoo(in.(string)), nil }). Merge() for result := range outC { // Do nothing, just for drain out from channels }
}
Benchmark result with goroutines, using the pipeline module

--

--

Get the Medium app

A button that says 'Download on the App Store', and if clicked it will lead you to the iOS App store
A button that says 'Get it on, Google Play', and if clicked it will lead you to the Google Play store