Golang 手写一个 Channel
对于 Golang 来说,其实永远绕不过这两个东西,一个是 GMP 模型,这个已经倒背如流了,相信每一个 Gopher 都已经深挖过 GMP,这在我的内部 Wiki 中也有非常详细的源码解读,此外就是 Channel。至于内存管理,如 GC 等,其实和语言关联性不强,算是比较通用的解决方案,在 Golang 中我觉得没必要深究。
GMP 和 Channel 算是组成了 Golang 的核心特性:高并发能力 + CSP
Do not communicate by sharing memory; instead, share memory by communicating.
sync.Cond
一个条件变量,用于让 goroutine 在某个条件不满足时睡眠,等条件满足后再被唤醒。
它通常和 sync.Mutex 配合使用:cond := *sync.NewCond(&mutex)
Wait会原子地释放锁并挂起当前 goroutine,等被Signal或Broadcast唤醒后,会在返回前重新加锁;Broadcast会唤醒所有等待者,Signal只唤醒一个等待者。
mu.Lock()
for conditions {
cond.Wait()
}
// ...
mu.Unlock()
注意,一般要用 for,不要用 if。
因为被唤醒后,只代表条件可能变了,不代表条件一定满足。所以醒来以后必须重新检查条件。Go 官方文档也建议把 Wait 放在循环里检查条件。
| 方法 | 作用 |
|---|---|
Wait() |
当前 goroutine 进入等待,并临时释放锁 |
Signal() |
唤醒一个正在等待的 goroutine |
Broadcast() |
唤醒所有正在等待的 goroutine |
例如:
cond.Wait()
内部大概做了如下几件事:
- 释放锁
- 当前 goroutine 睡眠
- 被 Signal 或 Broadcast 唤醒
- 重新抢锁
- Wait 返回
所以调用 Wait() 前,必须已经持有锁:
mu.Lock()
cond.Wait()
mu.Unlock()
实现 Channel
Model
// unbufferedSlot 无缓冲 channel
type unbufferedSlot[T any] struct {
// 泛型
value T
// 用于记录是否被消费,发送者需要等待无缓冲 channel 被完全消费
consumed bool
}
// MyChan 自实现 channel
type MyChan[T any] struct {
// mu 用于并发控制
mu sync.Mutex
// sync.Cond 用于阻塞 goroutine
cond sync.Cond
// capacity 表示缓冲区大小
capacity int
// buf 用于有缓冲的 channel
buf []T
// closed 记录 channel 是否已关闭
closed bool
// slot 用于无缓冲的 channel
slot *unbufferedSlot[T]
}
Common
// NewMyChan 新建一个 channel
func NewMyChan[T any](capacity int) *MyChan[T] {
if capacity <= 0 {
capacity = 0
}
ch := &MyChan[T]{
capacity: capacity,
}
// NewCond 参数为 Locker Interface
// 而 Mutex 在指针类型上实现了 Locker,所以这里需要 &ch.mu
ch.cond = *sync.NewCond(&ch.mu)
return ch
}
// Close 关闭 Channel
func(ch *MyChan[T]) Close() {
ch.mu.Lock()
defer ch.mu.Unlock()
// 和原版 Channel 保持一致
if ch.closed {
panic("close of closed channel")
}
ch.closed = true
// 无缓冲 Channel 如果还有值等待接收
// 则唤醒发送者并抛出 panic
if ch.capacity == 0 && ch.slot != nil && !ch.slot.consumed {
ch.slot = nil
}
// 唤醒阻塞中的发送者/接收者
// 发送者醒来后发现 closed=true,然后 panic
// 接收者醒来后如果没有数据,就返回零值和 false
ch.cond.Broadcast()
}
这里需要注意的是 sync.NewCond 接受已实现 Locker 接口类型的参数。
查看源码可知 sync.Mutex 中,Lock/Unlock 方法使用的是指针接收器,所以 *sync.Mutex 才算实现了 Locker 接口。
| 方法 | T 是否实现接口 |
*T 是否实现接口 |
|---|---|---|
func (t T) M() |
是 | 是 |
func (t *T) M() |
否 | 是 |
此外,Golang 不建议在同个类型的方法实现中同时使用值接收器和指针接收器。
一个类型只要有任何方法需要用指针接收器,通常这个类型的其他方法也都建议用指针接收器。
这样可以避免接口方法集不一致,也能避免复制带来的语义和并发问题。
Send
// Send 向 Channel 发送数据
func (ch *MyChan[T]) Send(val T) {
ch.mu.Lock()
defer ch.mu.Unlock()
// 首先判断 Channel 是否有缓冲
if ch.capacity == 0 {
ch.sendUnbuffered(val)
return
}
// 有缓冲
// 首先要判断缓冲区是否还有空间,没有空间则阻塞等待
for len(ch.buf) >= ch.capacity && !ch.closed {
ch.cond.Wait()
}
// buffer 有空间了,但是 channel 被关闭了
// 和原版保持行为一致 panic
if ch.closed {
panic("send on closed channel")
}
ch.buf = append(ch.buf, val)
// 现在 buffer 缓冲区有数据了,应该唤醒可能阻塞的接收方 goroutines
ch.cond.Broadcast()
}
// sendUnbuffered 发送到无缓冲 Channel
func (ch *MyChan[T]) sendUnbuffered(val T) {
// 由于是无缓冲的,所有如果有数据待接收
// 将会阻塞发送方
for ch.slot != nil && !ch.closed {
ch.cond.Wait()
}
// 和原版 channel 一样,如果向一个 closed channel 发送
// 直接抛出 panic
if ch.closed {
panic("send on closed channel")
}
s := &unbufferedSlot[T]{
value: val,
consumed: false,
}
ch.slot = s
// 现在 channel 有数据了,唤醒等待的接收方 goroutine
ch.cond.Broadcast()
// 等待值被接收,否则阻塞
for !s.consumed && !ch.closed {
ch.cond.Wait()
}
// 判断是否因关闭而跳出阻塞
if !s.consumed && ch.closed {
panic("send on closed channel")
}
}
Receive
func(ch *MyChan[T]) Recv() (T, bool) {
ch.mu.Lock()
defer ch.mu.Unlock()
if ch.capacity == 0 {
return ch.recvUnbuffered()
}
// 有缓冲
// buffer 为空且 chan 没有被关闭,阻塞等待
for len(ch.buf) == 0 != !ch.closed {
ch.cond.Wait()
}
// 特殊处理 closed channel
var zeroVal T
if len(ch.buf) == 0 && ch.closed {
return zeroVal, false
}
// 有数据,按照 FIFO 出队
val := ch.buf[0]
// 截断掉第一个已经出队的元素
ch.buf[0] = zeroVal
ch.buf = ch.buf[1:]
// 出队后,buffer 可能有空余空间
// 唤醒阻塞的发送方
ch.cond.Broadcast()
return val, true
}
// recvUnbuffered 从无缓冲的 Channel 接收值
func(ch *MyChan[T]) recvUnbuffered() (T, bool) {
// 等待接收数据
for ch.slot == nil && !ch.closed {
ch.cond.Wait()
}
// 如果没有数据,且已被关闭,那么返回零值和 ok: false
var zeroVal T
if ch.slot == nil && ch.closed {
return zeroVal, false
}
// slot 不为空
s := ch.slot
val := s.value
s.value = zeroVal
s.consumed = true
// 接收完之后注意清理掉 slot
ch.slot = nil
// 值已经接收完成,唤醒可能被阻塞的发送方 goroutines
ch.cond.Broadcast()
// 注意,如果 channel closed 但 buffer 还有数据
// ok 依然为 true,直到 buffer 数据接收完毕,返回零值和 false
return val, true
}
YuelaiGroup | 字节星球 2026-05-29
评论区