Go 并发模式 4 Fan-out/Fan-in模式

By kcersing , 17 四月, 2026

Fan-out/Fan-in模式

 

import (
	"fmt"
	"sync"
)

// 1 Fan-out: 产生多个任务并分给多个worker
func produce(nums []int) <-chan int {
	out := make(chan int)
	go func() {
		for _, n := range nums {
			out <- n
		}
		close(out)
	}()
	return out
}

// 2 worker 处理单个任务
func square(in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		for n := range in {
			out <- n * n
		}
		close(out)
	}()
	return out
}

// 3 Fan-in: 将多个worker的结果合并到一个channel
func merge(channels ...<-chan int) <-chan int {
	var wg sync.WaitGroup
	out := make(chan int)
	//为每个channel启用一个监控协程
	output := func(c <-chan int) {
		for n := range c {
			out <- n
		}
		wg.Done()
	}
	wg.Add(len(channels))
	for _, c := range channels {
		go output(c)
	}
	//有始有终,等待执行完毕后关闭输出channel
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

func main() {
	// Produce -> Fan-out (多个 square) -> Fan-in (merge)
	in := produce([]int{1, 2, 3, 4, 5})

	// 启动多个worker处理任务
	c1 := square(in)
	c2 := square(in)
	c3 := square(in)

	// 合并结果
	for res := range merge(c1, c2, c3) {
		fmt.Println("Result", res)
	}
}

标签