Golang 手写一个 Channel

2026/5/29 21:16:53Henry2 阅读0 点赞0 评论

对于 Golang 来说,其实永远绕不过这两个东西,一个是 GMP 模型,这个已经倒背如流了,相信每一个 Gopher 都已经深挖过 GMP,这在我的内部 Wiki 中也有非常详细的源码解读,此外就是 Channel。至于内存管理,如 GC 等,其实和语言关联性不强,算是比较通用的解决方案,在 Golang 中我觉得没必要深究。

GMP 和 Channel 算是组成了 Golang 的核心特性:高并发能力 + CSP

Do not communicate by sharing memory; instead, share memory by communicating.

sync.Cond

一个条件变量,用于让 goroutine 在某个条件不满足时睡眠,等条件满足后再被唤醒。

它通常和 sync.Mutex 配合使用:cond := *sync.NewCond(&mutex)

  • Wait 会原子地释放锁并挂起当前 goroutine,等被 SignalBroadcast 唤醒后,会在返回前重新加锁;
  • Broadcast 会唤醒所有等待者,Signal 只唤醒一个等待者。
GO
mu.Lock()
for conditions {
	cond.Wait()
}
// ...
mu.Unlock()

注意,一般要用 for,不要用 if

因为被唤醒后,只代表条件可能变了,不代表条件一定满足。所以醒来以后必须重新检查条件。Go 官方文档也建议把 Wait 放在循环里检查条件。

方法 作用
Wait() 当前 goroutine 进入等待,并临时释放锁
Signal() 唤醒一个正在等待的 goroutine
Broadcast() 唤醒所有正在等待的 goroutine

例如:

GO
cond.Wait()

内部大概做了如下几件事:

  1. 释放锁
  2. 当前 goroutine 睡眠
  3. 被 Signal 或 Broadcast 唤醒
  4. 重新抢锁
  5. Wait 返回

所以调用 Wait() 前,必须已经持有锁:

GO
mu.Lock()
cond.Wait()
mu.Unlock()

实现 Channel

Model

GO
// unbufferedSlot 无缓冲 channel
type unbufferedSlot[T any] struct {
	// 泛型
	value T
	// 用于记录是否被消费,发送者需要等待无缓冲 channel 被完全消费
	consumed bool
}

// MyChan 自实现 channel
type MyChan[T any] struct {
	// mu 用于并发控制
	mu sync.Mutex

	// sync.Cond 用于阻塞 goroutine
	cond sync.Cond

	// capacity 表示缓冲区大小
	capacity int

	// buf 用于有缓冲的 channel
	buf []T

	// closed 记录 channel 是否已关闭
	closed bool

	// slot 用于无缓冲的 channel
	slot *unbufferedSlot[T]
}

Common

GO
// NewMyChan 新建一个 channel
func NewMyChan[T any](capacity int) *MyChan[T] {
	if capacity <= 0 {
		capacity = 0
	}

	ch := &MyChan[T]{
		capacity: capacity,
	}

	// NewCond 参数为 Locker Interface
	// 而 Mutex 在指针类型上实现了 Locker,所以这里需要 &ch.mu
	ch.cond = *sync.NewCond(&ch.mu)

	return ch
}

// Close 关闭 Channel
func(ch *MyChan[T]) Close() {
	ch.mu.Lock()
	defer ch.mu.Unlock()

	// 和原版 Channel 保持一致
	if ch.closed {
		panic("close of closed channel")
	}

	ch.closed = true

	// 无缓冲 Channel 如果还有值等待接收
	// 则唤醒发送者并抛出 panic
	if ch.capacity == 0 && ch.slot != nil && !ch.slot.consumed {
		ch.slot = nil
	}

	// 唤醒阻塞中的发送者/接收者
	// 发送者醒来后发现 closed=true,然后 panic
	// 接收者醒来后如果没有数据,就返回零值和 false
	ch.cond.Broadcast()
}

这里需要注意的是 sync.NewCond 接受已实现 Locker 接口类型的参数。

查看源码可知 sync.Mutex 中,Lock/Unlock 方法使用的是指针接收器,所以 *sync.Mutex 才算实现了 Locker 接口。

方法 T 是否实现接口 *T 是否实现接口
func (t T) M()
func (t *T) M()

此外,Golang 不建议在同个类型的方法实现中同时使用值接收器和指针接收器。

一个类型只要有任何方法需要用指针接收器,通常这个类型的其他方法也都建议用指针接收器。

这样可以避免接口方法集不一致,也能避免复制带来的语义和并发问题。

Send

GO
// Send 向 Channel 发送数据
func (ch *MyChan[T]) Send(val T) {
	ch.mu.Lock()
	defer ch.mu.Unlock()

	// 首先判断 Channel 是否有缓冲
	if ch.capacity == 0 {
		ch.sendUnbuffered(val)
		return
	}

	// 有缓冲
	// 首先要判断缓冲区是否还有空间,没有空间则阻塞等待
	for len(ch.buf) >= ch.capacity && !ch.closed {
		ch.cond.Wait()
	}

	// buffer 有空间了,但是 channel 被关闭了
	// 和原版保持行为一致 panic
	if ch.closed {
		panic("send on closed channel")
	}

	ch.buf = append(ch.buf, val)

	// 现在 buffer 缓冲区有数据了,应该唤醒可能阻塞的接收方 goroutines
	ch.cond.Broadcast()
}

// sendUnbuffered 发送到无缓冲 Channel
func (ch *MyChan[T]) sendUnbuffered(val T) {
	// 由于是无缓冲的,所有如果有数据待接收
	// 将会阻塞发送方
	for ch.slot != nil && !ch.closed {
		ch.cond.Wait()
	}

	// 和原版 channel 一样,如果向一个 closed channel 发送
	// 直接抛出 panic
	if ch.closed {
		panic("send on closed channel")
	}

	s := &unbufferedSlot[T]{
		value: val,
		consumed: false,
	}
	ch.slot = s

	// 现在 channel 有数据了,唤醒等待的接收方 goroutine
	ch.cond.Broadcast()

	// 等待值被接收,否则阻塞
	for !s.consumed && !ch.closed {
		ch.cond.Wait()
	}

	// 判断是否因关闭而跳出阻塞
	if !s.consumed && ch.closed {
		panic("send on closed channel")
	}
}

Receive

GO
func(ch *MyChan[T]) Recv() (T, bool) {
	ch.mu.Lock()
	defer ch.mu.Unlock()

	if ch.capacity == 0 {
		return ch.recvUnbuffered()
	}

	// 有缓冲
	// buffer 为空且 chan 没有被关闭,阻塞等待
	for len(ch.buf) == 0 != !ch.closed {
		ch.cond.Wait()
	}

	// 特殊处理 closed channel
	var zeroVal T
	if len(ch.buf) == 0 && ch.closed {
		return zeroVal, false
	}

	// 有数据,按照 FIFO 出队
	val := ch.buf[0]

	// 截断掉第一个已经出队的元素
	ch.buf[0] = zeroVal
	ch.buf = ch.buf[1:]

	// 出队后,buffer 可能有空余空间
	// 唤醒阻塞的发送方
	ch.cond.Broadcast()

	return val, true
}

// recvUnbuffered 从无缓冲的 Channel 接收值
func(ch *MyChan[T]) recvUnbuffered() (T, bool) {
	// 等待接收数据
	for ch.slot == nil && !ch.closed {
		ch.cond.Wait()
	}

	// 如果没有数据,且已被关闭,那么返回零值和 ok: false
	var zeroVal T
	if ch.slot == nil && ch.closed {
		return zeroVal, false
	}

	// slot 不为空
	s := ch.slot
	val := s.value

	s.value = zeroVal
	s.consumed = true

	// 接收完之后注意清理掉 slot
	ch.slot = nil

	// 值已经接收完成,唤醒可能被阻塞的发送方 goroutines
	ch.cond.Broadcast()

	// 注意,如果 channel closed 但 buffer 还有数据
	// ok 依然为 true,直到 buffer 数据接收完毕,返回零值和 false
	return val, true
}

YuelaiGroup | 字节星球 2026-05-29

评论区