Fan-out/Fan-in模式
import (
"fmt"
"sync"
)
// 1 Fan-out: 产生多个任务并分给多个worker
func produce(nums []int) <-chan int {
out := make(chan int)
go func() {
for _, n := range nums {
out <- n
}
close(out)
}()
return out
}
// 2 worker 处理单个任务
func square(in <-chan int) <-chan int {
out := make(chan int)
go func() {
for n := range in {
out <- n * n
}
close(out)
}()
return out
}
// 3 Fan-in: 将多个worker的结果合并到一个channel
func merge(channels ...<-chan int) <-chan int {
var wg sync.WaitGroup
out := make(chan int)
//为每个channel启用一个监控协程
output := func(c <-chan int) {
for n := range c {
out <- n
}
wg.Done()
}
wg.Add(len(channels))
for _, c := range channels {
go output(c)
}
//有始有终,等待执行完毕后关闭输出channel
go func() {
wg.Wait()
close(out)
}()
return out
}
func main() {
// Produce -> Fan-out (多个 square) -> Fan-in (merge)
in := produce([]int{1, 2, 3, 4, 5})
// 启动多个worker处理任务
c1 := square(in)
c2 := square(in)
c3 := square(in)
// 合并结果
for res := range merge(c1, c2, c3) {
fmt.Println("Result", res)
}
}