今天开始学习Go语言的重要特性,Goroutine

goroutine

Goroutine,通常称之为协程,区别于Java的线程,它是由Go runtime调度,启动成本很低,初始栈很小,并且可以按需增长。多个goroutine在同一个进程地址空间内并发执行。

先看看Java的线程:

1
new Thread(() -> {System.out.println("Hello Java");}).start();

在看看看Gogoroutine

1
2
3
go func() {
fmt.Println("Hello Go")
}()

启动普通函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import (
"fmt"
"sync"
"testing"
)

func PrintName(name string) {
fmt.Println(name)
}

func TestGoroutine(t *testing.T) {
// wg用来等待程序完成
var wg sync.WaitGroup
// 计数加1,表示要等待1个goroutine,注意,这里暂时不可以是2,因为其中一个没有执行wa.Done(),后续在解释。
wg.Add(1)
//如果需要 defer wg.Done(),通常会包一层匿名函数。
go func() {
// 在函数退出时,调用Done来通知main函数工作已经完成
defer wg.Done()
PrintName("hello")
}()
// 也可以这样直接调用
go PrintName("Hello Goroutine")

fmt.Println("Waiting To Finish")
wg.Wait()
fmt.Println("Terminating Program")
}

如果你需要 defer wg.Done(),通常会包一层匿名函数。

此时程序会打印如下:

1
2
3
4
5
6
7
=== RUN   TestGoroutine
Waiting To Finish
Hello Goroutine
hello
Terminating Program
--- PASS: TestGoroutine (0.00s)
PASS

启动匿名函数

1
2
3
go func(){
fmt.Println("running in background")
}()

特别注意,后面的()别忘了,这个括号表示立即执行这个匿名函数,只是他会在新的goroutine中执行。

主goroutine退出,程序就结束

这是一个特别需要注意的地方。下面通过一个错误示例来观察:

1
2
3
4
5
6
func TestGoroutine2(t *testing.T) {
go func() {
fmt.Println("hello")
}()
fmt.Println("main done")
}

这个例子,最后打印的结果有三种情况:

1
2
3
4
5
6
7
8
9
10
# 情况一,两个打印语句都执行
main done
hello

# 情况二,只打印 main done
main done

# 情况三,极低概率,会先打印hello
hello
main done

Java的线程类似,谁先执行(或者会不会执行),完全看CPU如何调度(如果是主goroutine先执行,那么程序就会退出,其他的goroutine不一定会执行),这是不可靠的。为了避免这个问题,请使用如下方式:

1
2
3
4
5
6
7
8
9
func TestGoroutine3(t *testing.T) {
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
fmt.Println("Hello")
}()
wg.Wait()
}

WaitGroup

sync.WaitGroup是日常开发最常用的goroutine等待工具。它本质上是一个计数器:Add增加任务数,Done表示任务完成,Wait阻塞等待计数归零。言简意赅就是用于等待一组goroutine或任务完成得计数号量。

有些像CountDownLatch?

上面的两个例子都已经有过介绍,其主要语法(其实就是模板代码)就是

1
2
3
4
5
6
7
8
9
var wg sync.WaitGroup
// 增加一个任务数
wg.Add(1)
go func(){
// 任务完成后,计数器减少1
defer wg.Done()
}()
// 等计数器等于0,则退出,否则会阻塞。最开始那个例子就是因为第二个没有执行Done,导致计数器不是0
wg.Wait()

这里也有个知识点需要说明,wg.Add(1)需要放在goroutine外面。下面先看要给错误的示例:

1
2
3
4
5
6
7
8
9
10
11
func TestGoroutine4(t *testing.T) {
var wg sync.WaitGroup
for i := 0; i < 3; i++ {
go func() {
wg.Add(1)
defer wg.Done()
fmt.Println(i)
}()
}
wg.Wait()
}

这个例子得问题:wg.Wait()可能再wg.Add(1)执行前就开始等待,甚至直接结束。

需要将其修改为如下方式:

1
2
3
4
5
6
7
8
9
10
11
12
13
func TestGoroutine5(t *testing.T) {
var wg sync.WaitGroup
for i := 0; i < 3; i++ {
// 在 go 关键字之前执行 Add,确保主协程去 Wait 时,计数器一定大于 0
wg.Add(1)
go func(i int) {
defer wg.Done()
fmt.Println(i)
}(i)
}
// 此时计数器是 3,主协程会老老实实在这里死等,直到 3 次 Done 都报到
wg.Wait()
}

使用 sync.WaitGroup 的黄金法则是:必须在启动子协程之前(在主协程里)就把账记好(Add),Done必须在go之中。

上述例子还可以修改为:

1
2
3
4
5
6
7
8
9
10
11
12
func TestGoroutine6(t *testing.T) {
var wg sync.WaitGroup
// 如果确定知道要执行三次,也可以再循环外一次性把计数器设置好
wg.Add(3)
for i := 0; i < 3; i++ {
go func(i int) {
defer wg.Done()
fmt.Println(i)
}(i)
}
wg.Wait()
}

对于面试来说,只需要一句:

Add 必须在启动 goroutine 之前调用,否则主 goroutine 可能先执行到 Wait,造成等待计数不准确,甚至提前返回。

由于我是基于Go 1.26学习,这个版本对WaitGroup.go有所优化,可以直接使用Go方法,它可以直接启动任务并等待。

1
2
3
4
5
6
7
8
9
10
11
12
13
func TestGoroutine7(t *testing.T) {
var wg sync.WaitGroup
// 直接启动任务并等待
wg.Go(func() {
fmt.Println("task 1")
})
// 直接启动任务并等待
wg.Go(func() {
fmt.Println("task 2")
})

wg.Wait()
}

从代码层面来看,使用wg.Go()肯定要优于传统写法得,如果使用传统写法,你需要特别注意:

  • 必须时刻小心Add()得位置,不能写在go func内部
  • 必须保证Add数量与Done的数量绝对相等
  • 任何地方漏写了defer wg.Done(),或者因为panic中途崩了导致wa.Done()没执行上,程序会直接死锁。

传统写法,用于面试或者维护古老项目使用。新项目建议使用wg.Go(func(){})

这里有个小坑:WaitGroup可以复用,但必须确保上一轮彻底结束后在进入下一轮,复杂场景下不如重新定义新的WaitGroup

channel

goroutine不能像普通函数那样直接拿返回值。

1
2
// 不支持如下语法
//result := go calculate()

此时就需要通过channel接受结果。

1
2
3
4
5
6
7
8
9
10
11
12
13
func calculate() int {
return 100
}

func TestGoroutine8(t *testing.T) {
resultChannel := make(chan int)

go func() {
resultChannel <- calculate()
}()
result := <-resultChannel
t.Logf("Result from goroutine: %d", result)
}

或者也可以通过WaitGroup + 共享变量实现:

1
2
3
4
5
6
7
8
9
10
11
12
func TestGoroutine9(t *testing.T) {
var wg sync.WaitGroup
var result int

wg.Add(1)
go func() {
defer wg.Done()
result = 100
}()
wg.Wait()
t.Logf("Result from goroutine: %d", result)
}

虽然也能拿到返回值,但如果有多个goroutine同时写共享变量,就得考虑或者channel

Go 有一句非常经典的思想:不要通过共享内存通信,而要通过通信共享内存。

Go 官方 blog 也强调,Go 鼓励通过 channelgoroutine 之间传递数据引用,避免多个 goroutine 同时直接操作同一份数据。

对于Java程序员来说,channel可以类比为:

  • Blocking Queue
  • 线程间消息传递
  • 生产者-消费者模式

基础发送和接收

1
2
3
4
5
6
7
8
9
10
11
func TestChannel(t *testing.T) {
// 创建一个string类型的channel
ch := make(chan string)
go func() {
// 表示发送
ch <- "hello"
}()
// 表示接收
msg := <-ch
t.Log(msg)
}

无缓冲channel

1
ch := make(chan int)

无缓冲channel发送时,如果没有接收者,发送方会阻塞。,总结其特点就是:

  • 发送方发送时,如果没有接收方准备好,会阻塞
  • 接收方接收时,如果没有发送方准备好,也会阻塞

所以,它不仅能传数据,还能用来做同步。

1
2
3
4
5
6
7
8
9
10
11
func TestChannel2(t *testing.T) {
ch := make(chan int)

go func() {
fmt.Println("sending...")
ch <- 1
fmt.Println("sent")
}()
v := <-ch
t.Logf("received,%d", v)
}

无缓冲channel很像Java里的SynchronousQueue,发送和接收必须“碰头”。

有缓冲channel

1
2
// 有缓冲channel 创建方式
ch := make(chan int,2)

例如:

1
2
3
4
5
6
7
8
9
func TestChannel3(t *testing.T) {
// 创建一个容量为2的缓冲channel
ch := make(chan int, 2)
// 前两次发送不会阻塞,第三次发送如果没人接收,就会阻塞
ch <- 1
ch <- 2
t.Log(<-ch)
t.Log(<-ch)
}

有缓冲channel在缓冲区没满前,发送不会阻塞。相当于JavaBlockingQueue。但Gochannel语义更轻,更适合配合select

1
2
// ch := make(chan int, 2)
new ArrayBlockingQueue<>(2)

关闭channel

关闭channel,表示:以后不会再发送新数据了。

1
close(ch)

例如:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
func TestChannel4(t *testing.T) {
ch := make(chan int)

go func() {
defer close(ch)
for i := 0; i < 3; i++ {
ch <- i
}
}()
// 遍历channel
for v := range ch {
t.Log(v)
}
}

for v := range ch : 会一直读取channel,直到channel被关闭。

如果对已经关闭的channel进行读取和写入,会发生啥呢?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
func TestChannel5(t *testing.T) {
ch := make(chan int, 1)
ch <- 100
close(ch)
v1, ok1 := <-ch
// v1 = 100, ok1 = true
t.Logf("v1 = %d, ok1 = %v", v1, ok1)
v2, ok2 := <-ch
// v2 = 0, ok2 = false
t.Logf("v2 = %d, ok2 = %v", v2, ok2)

// panic: send on closed channel [recovered, repanicked]
// ch <- 200
// t.Log(ch)

// panic: close of closed channel [recovered, repanicked]
// close(ch)
}
  • 读取已关闭的channel,代码不会报错,读取完后返回值为channel类型的零值,状态码为false,表示channel已关闭并且没有剩余数据。

    • 如果channel里还有值,先取值,然后ok == true
    • 如果channel已关闭且没有剩余值,则返回零值,ok == false
  • 写入已关闭的channel,会panic

  • 关闭已关闭的channel。会panic

所以Go官方也是明确提醒,向已关闭的channel发送数据会panic,因此通常需要确保所有发送都完成后再关闭channel

那么,到底谁来关闭channel呢?

日常经验:通常有发送方关闭channel,不由接收方关闭。

例如:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// 单向 Channel
// chan <- int 只发送
func producer(ch chan<- int) {
defer close(ch)

for i := 1; i <= 3; i++ {
ch <- i
}
}
// <- chan int 只接收
func consumer(ch <-chan int) {
for v := range ch {
fmt.Println(v)
}
}

func TestChannel6(t *testing.T) {
ch := make(chan int)
go producer(ch)
consumer(ch)
}

这种方式挺有有,可以让函数签名更清晰,减少误用。

select

select类似Java里同时监听多个阻塞队列,但Go语法更直接

基础用法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
func TestSelect(t *testing.T) {
ch1 := make(chan string)
ch2 := make(chan string)

go func() {
time.Sleep(1 * time.Second)
ch1 <- "from ch1"
}()

go func() {
time.Sleep(2 * time.Second)
ch2 <- "from ch2"
}()

select {
case msg := <-ch1:
t.Log("received", msg)
case msg := <-ch2:
t.Log("received", msg)
}
}

哪个channel先准备好,就执行哪个case。看着就像是switch

select + timeout

这是日常开发的高频场景

1
2
3
4
5
6
7
8
9
10
11
12
13
func TestSelectTimeout(t *testing.T) {
ch := make(chan string)
go func() {
time.Sleep(2 * time.Second)
ch <- "result"
}()
select {
case msg := <-ch:
t.Log("received", msg)
case <-time.After(1 * time.Second):
t.Log("timeout")
}
}

可以类比于Java中的

  • Future.get(timeout,unit)
  • 一些异步框架里的超市等待

select + default 非阻塞操作

1
2
3
4
5
6
7
8
9
10
func TestSelectDefault(t *testing.T) {
ch := make(chan int)

select {
case v := <-ch:
fmt.Println(v)
default:
fmt.Println("default")
}
}
  • 如果没有任何case就绪
  • 立即执行default
  • 不阻塞(如果没有default,在ch没有数据时,会阻塞等待)

如果channel没有准备好,就执行default。注意:default用不好会造成忙轮询,例如下面这个错误的例子:

1
2
3
4
5
6
7
8
for {
select {
case v:= <- ch:
fmt.Println(v)
default:
// 空转,占用CPU
}
}

退出信号

1
2
3
4
5
6
7
8
9
for {
select {
case msg := <- ch:
fmt.Println("msg ",msg)
case <- done:
fmt.Println("exit")
return
}
}

这种方式通常用于:

  • worker停止
  • 后台协程退出
  • 优雅关闭

如果多个case同时可执行的时候,Go会随机选择一个执行,它并不是书写顺序优先,不依赖顺序。

空select

1
select{}

这种一般会永久阻塞,没啥意义。

context

context.Context 用于在API边界之间传递取消信号、超时时间、deadline和请求级别的值,服务端收到请求后应创建Context,向下游调用时继续传递Context

对于Java来说,大概可以这么理解:

1
context = CancellationToken + time + RequestScope

用context取消goroutine

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
func worker(ctx context.Context) {
for {
select {
case <-ctx.Done():
fmt.Println("worker stopped:", ctx.Err())
return
default:
fmt.Println("worker running...")
time.Sleep(500 * time.Millisecond)
}
}
}

func TestContext(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())

go worker(ctx)

time.Sleep(2 * time.Second)
cancel()

time.Sleep(1 * time.Second)
}

打印如下:

1
2
3
4
5
6
7
8
=== RUN   TestContext
worker running...
worker running...
worker running...
worker running...
worker stopped: context canceled
--- PASS: TestContext (3.00s)
PASS

context.WithTimeout

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
func query(ctx context.Context) error {
select {
case <-time.After(2 * time.Second):
fmt.Println("query done")
return nil
case <-ctx.Done():
return ctx.Err()
}
}

func TestContextWithTimeout(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()

if err := query(ctx); err != nil {
fmt.Println("query failed:", err)
}

// query failed: context deadline exceeded
}

日常开发里,HTTP 请求、数据库查询、RPC 调用都应该优先支持 context。

Mutex

Mutex实际上就是Go语言中的互斥锁。对于Java来说,它相当于synchronizedReentrantLock的最基础互斥能力。

1
var mu sync.Mutex

虽然Go鼓励使用channel通信,但也不能完全不用锁。sync.Mutexchannel都是重要的同步手段,具体使用哪个看场景:

  • 需要保护共享状态:优先Mutex
  • 需要传递任务/结果/事件: 优先channel

和学习Javasynchronized一样,先来一个没有锁的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
func TestMutex(t *testing.T) {
var wg sync.WaitGroup
count := 0

for i := 0; i < 1000; i++ {
wg.Add(1)
go func() {
defer wg.Done()
count++
}()
}
wg.Wait()
// 多次运行,每次结果都不同.
fmt.Println("count =", count)
}

出现这个原因其实和Java多线程一样,多个goroutine同时修改count,会产生data race(数据竞争)。

此时就可以使用锁Mutex来修复这个问题:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
func TestMutex1(t *testing.T) {
var wg sync.WaitGroup
var mu sync.Mutex
count := 0

for i := 0; i < 1000; i++ {
wg.Add(1)
go func() {
defer wg.Done()
mu.Lock()
count++
mu.Unlock()
}()
}
wg.Wait()
// 1000
fmt.Println("count =", count)
}

当然,学习了defer那就有更安全的写法:

1
2
3
mu.Lock()
defer mu.Unlock()
count++

defer释放锁

常见写法

1
2
3
mu.Lock()

defer mu.Unlock()

优点:

  • 避免中途return,导致忘记解锁

缺点:

  • 在极高频短路径里,defer有一点点开销
  • 但绝大多数业务代码优先考虑正确性和可维护性

RWmutex

语法:

1
var mu sync.RWMutex
  • RLock(): 读锁
  • Lock() 写锁

这种类型的锁非常适合读多写少,除此之外,使用Mutex即可。

对于锁,各大语言的坑应该差不多,需要特别注意以下几点:

  1. 忘记解锁:会导致死锁或者后续goroutine永久阻塞。
  2. 重复加锁顺序不一致:多个锁嵌套时,如果不同goroutine获取锁顺序不一致,容易死锁。
  3. 锁住范围过大
1
2
3
mu.Lock()
doSometingSlow()
mu.Unlock()

如果临界区里包含慢操作,会严重影响并发性能,应该尽量:

  • 缩小加锁范围
  • 只保护共享数据本身

Once

sync.Once用来保证某段代码只执行一次。

1
var once sync.Once

下面是一个示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
func TestOnce(t *testing.T) {
var once sync.Once
var wg sync.WaitGroup

for i := 0; i < 5; i++ {
wg.Add(1)
go func() {
defer wg.Done()
once.Do(func() {
fmt.Println("hello once")
})
}()
}

wg.Wait()
}

无论多少 goroutine 调用,里面那段逻辑都只会执行一次。从这个名字就能看出来,应该可以用在单例模式,除此之外还支持如下场景:

  • 配置加载一次
  • 连接池初始化
  • 懒加载共享资源

使用once.Do需要注意的是,一旦Do里面的函数panic,也算调用过,后续不会再重试。所以有那种可能出现失败重试的逻辑,不要使用这个特性。

Goroutine泄露

一个goroutine被永久阻塞,无法退出,这个就叫goroutine leak

例如下面这个例子:

1
2
3
4
5
6
7
8
9
10
11
// fatal error: all goroutines are asleep - deadlock!
// goroutine 1 [chan receive]:
func TestGoroutineLeak(t *testing.T) {
ch := make(chan int)
go func() {
// 无人接收,永远阻塞
ch <- 1
}()

select {}
}

此时,可以利用context来解决:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
func send(ctx context.Context, ch chan<- int) {
select {
case ch <- 1:
fmt.Println("sent...")
case <-ctx.Done():
fmt.Println("cancelled...")
return
}
}

func TestGoroutineLeakContext(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
ch := make(chan int)
go send(ctx, ch)
time.Sleep(2 * time.Second)
}

即使没人接收,goroutine也会因为context超市而退出。

Worker Pool

学习并发,那就一定会涉及到这个问题,对于Java来说,可以使用ThreadPool来处理,对于Go,也可以实现类似的效果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
func workerPool(id int, jobs <-chan int, wg *sync.WaitGroup) {
defer wg.Done()

for job := range jobs {
fmt.Printf("worker %d processing job %d\n", id, job)
time.Sleep(500 * time.Millisecond)
}
}

func TestGoroutineWorkerPool(t *testing.T) {
// 最多3个worker同时处理任务
const workerCount = 3
const jobCount = 10

jobs := make(chan int)
var wg sync.WaitGroup

for i := 1; i <= workerCount; i++ {
wg.Add(1)
go workerPool(i, jobs, &wg)
}

for j := 1; j <= jobCount; j++ {
jobs <- j
}

close(jobs)
wg.Wait()
fmt.Println("all jobs done")

}

panic在goroutine中不会被其他goroutine recover

下面看个例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
func TestGoroutinePanic(t *testing.T) {
defer func() {
if r := recover(); r != nil {
fmt.Println("recover:", r)
}
}()

go func() {
panic("panic")
}()

select {}
}

这里的recover捕获不到另一个goroutine里面的panic

recover只能捕获同一个goroutine调用栈上的panic,不能跨goroutine捕获。