These are reading notes on Go Concurrency Patterns: Pipelines and cancellation:
(1) Definition of a pipeline:
What is a pipeline?
There’s no formal definition of a pipeline in Go; it’s just one of many kinds of concurrent programs. Informally, a pipeline is a series of stages connected by channels, where each stage is a group of goroutines running the same function. In each stage, the goroutines
receive values from upstream via inbound channels
perform some function on that data, usually producing new values
send values downstream via outbound channels
Each stage has any number of inbound and outbound channels, except the first and last stages, which have only outbound or inbound channels, respectively. The first stage is sometimes called the source or producer; the last stage, the sink or consumer.
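In Go concurrent programming, a pipeline consists of a series of stages, and each stage consists of a group of goroutines that run the same function. Stages communicate with one another through channels: the goroutines receive values from the upstream stage via inbound channels, process them, and send the results to the downstream stage via outbound channels. Each stage can have any number of inbound and outbound channels, except that the first stage has only outbound channels and the last stage has only inbound channels.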
(2) A basic pipeline example:
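package main

import "fmt"

// gen is the first stage: it sends each of nums on a new channel
// and closes that channel once every value has been sent.
func gen(nums ...int) <-chan int {
	out := make(chan int)
	go func() {
		for _, n := range nums {
			out <- n
		}
		close(out)
	}()
	return out
}

// sq is the second stage: it squares every value received from in
// and closes its outbound channel once in has been drained.
func sq(in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		for n := range in {
			out <- n * n
		}
		close(out)
	}()
	return out
}

func main() {
	// Set up the pipeline.
	c := gen(2, 3)
	out := sq(c)
	// Consume the output.
	fmt.Println(<-out) // 4
	fmt.Println(<-out) // 9
}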
Output:
4
9
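gen() is the first stage; it returns an outbound channel. sq() is the second stage: it receives its input from gen()'s outbound channel and returns a new outbound channel, on which it sends the square of each number. main() is the last stage: it receives values from sq()'s outbound channel and prints them.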
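Because sq() takes and returns the same channel type, stages can be chained any number of times. The following is a minimal sketch of that idea, not part of the notes above; collect is a hypothetical helper introduced here only to drain a channel into a slice.

```go
package main

import "fmt"

// gen emits nums on a new channel and closes it when all values are sent.
func gen(nums ...int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for _, n := range nums {
			out <- n
		}
	}()
	return out
}

// sq squares every value received from in.
func sq(in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for n := range in {
			out <- n * n
		}
	}()
	return out
}

// collect is a hypothetical helper (an assumption for this sketch):
// it drains in until it is closed and returns the values in order.
func collect(in <-chan int) []int {
	var vals []int
	for v := range in {
		vals = append(vals, v)
	}
	return vals
}

func main() {
	// Two sq stages in a row: each input is raised to the fourth power.
	fmt.Println(collect(sq(sq(gen(2, 3))))) // [16 81]
}
```

With a single goroutine per stage the order of the values is preserved, which is why the output here is deterministic.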
(3) Fan-out and fan-in
Definitions of fan-out and fan-in:
Multiple functions can read from the same channel until that channel is closed; this is called fan-out. This provides a way to distribute work amongst a group of workers to parallelize CPU use and I/O.
A function can read from multiple inputs and proceed until all are closed by multiplexing the input channels onto a single channel that’s closed when all the inputs are closed. This is called fan-in.
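When multiple functions read from the same channel, this is called fan-out; when one function reads from multiple channels and sends the results onto a single channel, this is called fan-in. For example: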
package main

import (
	"fmt"
	"sync"
)

func gen(nums ...int) <-chan int {
	out := make(chan int)
	go func() {
		for _, n := range nums {
			out <- n
		}
		close(out)
	}()
	return out
}

func sq(in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		for n := range in {
			out <- n * n
		}
		close(out)
	}()
	return out
}

func merge(cs ...<-chan int) <-chan int {
	var wg sync.WaitGroup
	out := make(chan int)

	// Start an output goroutine for each input channel in cs. output
	// copies values from c to out until c is closed, then calls wg.Done.
	output := func(c <-chan int) {
		for n := range c {
			out <- n
		}
		wg.Done()
	}
	wg.Add(len(cs))
	for _, c := range cs {
		go output(c)
	}

	// Start a goroutine to close out once all the output goroutines are
	// done. This must start after the wg.Add call.
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

func main() {
	in := gen(2, 3)

	// Distribute the sq work across two goroutines that both read from in.
	c1 := sq(in)
	c2 := sq(in)

	// Consume the merged output from c1 and c2.
	for n := range merge(c1, c2) {
		fmt.Println(n) // 4 then 9, or 9 then 4
	}
}
The following lines are the fan-out:
// Distribute the sq work across two goroutines that both read from in.
c1 := sq(in)
c2 := sq(in)
The merge function, in turn, performs the fan-in.
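The same pattern extends from two workers to any number. The sketch below is one possible way to do that, assuming the gen, sq and merge functions from the example; fanOut and sumSquares are hypothetical helpers introduced only for this illustration. The results are summed because the order in which merged values arrive is not deterministic.

```go
package main

import (
	"fmt"
	"sync"
)

func gen(nums ...int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for _, n := range nums {
			out <- n
		}
	}()
	return out
}

func sq(in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for n := range in {
			out <- n * n
		}
	}()
	return out
}

// merge fans in: it copies every value from cs onto one channel and
// closes that channel after all inputs are closed.
func merge(cs ...<-chan int) <-chan int {
	var wg sync.WaitGroup
	out := make(chan int)
	wg.Add(len(cs))
	for _, c := range cs {
		go func(c <-chan int) {
			defer wg.Done()
			for n := range c {
				out <- n
			}
		}(c)
	}
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

// fanOut (hypothetical helper) starts n sq workers that all read from in.
func fanOut(in <-chan int, n int) []<-chan int {
	workers := make([]<-chan int, n)
	for i := range workers {
		workers[i] = sq(in)
	}
	return workers
}

// sumSquares (hypothetical helper) squares nums using the given number
// of workers and adds up the merged results.
func sumSquares(workers int, nums ...int) int {
	total := 0
	for v := range merge(fanOut(gen(nums...), workers)...) {
		total += v
	}
	return total
}

func main() {
	fmt.Println(sumSquares(3, 1, 2, 3, 4)) // 30
}
```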
(4) Notifying upstream stages to stop sending. For example:
package main

import (
	"fmt"
	"sync"
)

func gen(done <-chan struct{}, nums ...int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for _, n := range nums {
			select {
			case out <- n:
			case <-done:
				return
			}
		}
	}()
	return out
}

func sq(done <-chan struct{}, in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for n := range in {
			select {
			case out <- n * n:
			case <-done:
				return
			}
		}
	}()
	return out
}

func merge(done <-chan struct{}, cs ...<-chan int) <-chan int {
	var wg sync.WaitGroup
	out := make(chan int)

	// Start an output goroutine for each input channel in cs. output
	// copies values from c to out until c or done is closed, then calls
	// wg.Done.
	output := func(c <-chan int) {
		defer wg.Done()
		for n := range c {
			select {
			case out <- n:
			case <-done:
				return
			}
		}
	}
	wg.Add(len(cs))
	for _, c := range cs {
		go output(c)
	}

	// Start a goroutine to close out once all the output goroutines are
	// done. This must start after the wg.Add call.
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

func main() {
	// Set up a done channel that's shared by the whole pipeline,
	// and close that channel when this pipeline exits, as a signal
	// for all the goroutines we started to exit.
	done := make(chan struct{})
	defer close(done)

	in := gen(done, 2, 3)

	// Distribute the sq work across two goroutines that both read from in.
	c1 := sq(done, in)
	c2 := sq(done, in)

	// Consume the first value from output.
	out := merge(done, c1, c2)
	fmt.Println(<-out) // 4 or 9
}
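main creates a done channel and closes it with close when it exits. A receive on a closed channel always proceeds immediately, so closing done notifies every upstream stage to stop sending.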
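The reason a single close is enough can be seen in isolation. The sketch below is an illustration under stated assumptions rather than code from the article: stopAll is a hypothetical function that parks n goroutines on a shared done channel, closes the channel once, and counts how many goroutines observed the signal.

```go
package main

import (
	"fmt"
	"sync"
)

// stopAll (hypothetical) starts n goroutines that block on done,
// closes done exactly once, and returns how many goroutines woke up.
func stopAll(n int) int {
	done := make(chan struct{})
	var wg sync.WaitGroup
	var mu sync.Mutex
	stopped := 0

	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			<-done // blocks until done is closed
			mu.Lock()
			stopped++
			mu.Unlock()
		}()
	}

	close(done) // one close acts as a broadcast to every receiver
	wg.Wait()
	return stopped
}

func main() {
	fmt.Println(stopAll(5)) // 5
}
```

Sending a value on done instead would wake only one receiver per send, so the sender would need to know how many goroutines are waiting; closing the channel avoids that bookkeeping.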
(5) Guidelines for pipeline construction:
Here are the guidelines for pipeline construction:
stages close their outbound channels when all the send operations are done.
stages keep receiving values from inbound channels until those channels are closed or the senders are unblocked.
Pipelines unblock senders either by ensuring there’s enough buffer for all the values that are sent or by explicitly signalling senders when the receiver may abandon the channel.
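The first of the two options, sizing the buffer to hold every value, can be sketched as follows. This is a minimal illustration that assumes the number of values is known up front; with a buffer of len(nums), gen needs no goroutine at all, so a receiver that stops early leaves nothing blocked.

```go
package main

import "fmt"

// gen copies nums into a buffered channel large enough to hold them all.
// Every send completes immediately, so no goroutine is needed and none
// can be left blocked if the receiver abandons the channel.
func gen(nums ...int) <-chan int {
	out := make(chan int, len(nums))
	for _, n := range nums {
		out <- n
	}
	close(out)
	return out
}

func main() {
	c := gen(2, 3)
	// Read only the first value and ignore the rest.
	fmt.Println(<-c) // 2
}
```

This works only when the buffer size can be chosen correctly in advance; when it cannot, the done channel approach is the more general of the two.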