1. 概述#
Google 一开始写 Golang 的时候就是为了解决 Google 内部业务的高并发需求,而且 Golang 的一大特点就是高并发,所以本文就介绍与 Golang 高并发相关的原理,概念以及技术点。
我会首先介绍一些概念,如:并行和并发,进程、线程和协程以及它们的区别,然后介绍 Golang 里面的 goroutine 和 channel,它们是 Golang 实现高并发的关键,在聊一下 select,定时器,runtime 和同步锁,最后介绍 Go 的并发优势,并发模型和 Go 的调度器。
2. 并行、并发#
学过操作系统的话,应该对并行和并发不陌生
并行:
在同一时刻,有多条指令在多个处理器上同时执行
并发:
在同一时刻只能有一条指令执行,但多个进程指令被快速轮换执行(根据不同的情况有不同的轮换算法)
并行与并发的区别:
- 并行在多处理器系统中存在,而并发可以在但处理器和多处理器系统中存在
- 并行要求程序能够同时执行多个操作,而并发只要求程序假装同时执行多个操作(一个时间片执行一个操作,再轮换多个操作)
3. 进程、线程、协程#
进程:
是包含计算机指令,用户数据和系统数据的程序执行环境,以及包含其允许时获得的其他类型资源
线程:
相对进程是更加小巧而轻量的实体,线程有进程创建且包含自己的控制流和栈,进程和线程的区别在于:进程是正在执行的二进制文件,而线程是进程的子集
协程:
协程(goroutine)是 Go 程序并发执行的最小单元,因 goroutine 不像 Unix 进行那样是自治的实体,goroutine 主要优点是非常轻巧,轻松运行成千上万个都没问题,goroutine 比线程还轻量,goroutine 需要一个进程的环境才能存在,创建 goroutine 的时候,需要一个进程且这个进程至少有一个线程。协程是一种用户态的轻量级线程,协程的调度完全由用户控制,协程间的切换只需要保保存任务的上下文,没有内核的开销。线程栈空间通常是 2M,Goroutine 栈空间最小 2K
4. goroutine#
上面介绍了协程(下文统一用 goroutine)的概念,下面介绍一下 goroutine 的实际语法。
在 Go 语言中使用 go 关键字后跟函数名称或定义完整的匿名函数即可开启一个新的 goroutine,使用 go 关键字调用函数后会立即返回,该函数在后台作为 goroutine 运行,程序的其余部分会继续执行。
创建一个 goroutine
package main
import (
"fmt"
"time"
)
func main() {
go function()
go func() {
for i := 10; i < 20; i++ {
fmt.Print(i, " ")
}
}()
time.Sleep(1 * time.Second)
}
func function() {
for i := 0; i < 10; i++ {
fmt.Print(i)
}
fmt.Println()
}
你可能会发现上面的输出不是固定的(main 函数可能会提前结束),我们可以用 sync 包来解决这个问题。
package main
import (
"flag"
"fmt"
"sync"
)
func main() {
n := flag.Int("n", 20, "Number of goroutines")
flag.Parse()
count := *n
fmt.Printf("Going to create %d goroutines.\n", count)
var waitGroup sync.WaitGroup //定义sync.WaitGroup类型的变量
fmt.Printf("%#v\n", waitGroup)
for i := 0; i < count; i++ { //使用for循环创建所需数量的goroutine
waitGroup.Add(1) //每次调用都会增加sync.WaitGroup变量中的计数器,防止出现任何竞争条件
go func(x int) {
defer waitGroup.Done() //sync.WaitGroup变量减一
fmt.Printf("%d ", x)
}(i)
}
fmt.Printf("%#v\n", waitGroup)
waitGroup.Wait() //sync.Wait调用将阻塞,直到sync.WaitGroup变量中的计数器为0,从而保证所有groutine能执行完成
fmt.Println("\nExiting...")
}
5. channel#
channel(通道)是 Go 共的一种通信机制,允许 goroutine 之间进行数据传输。
一些明确的规定:
- 每个 channel 只允许交换指定类型的数据,也就是通道的元素类型
- 要是 channel 正常运行,需要保证通道有数据接受方法
使用 chan 关键字即可声明一个 channel,可以使用 close () 函数来关闭通道
当使用 channel 作为函数时,可以指定其为单向 channel
5.1 channel 的写入#
package main
import (
"fmt"
"time"
)
func main() {
c := make(chan int)
go writeToChannel(c, 10)
time.Sleep(1 * time.Second)
}
func writeToChannel(c chan int, x int) {
fmt.Println(x)
c <- x
close(c)
fmt.Println(x)
}
5.2 从 channel 接受数据#
package main
import (
"fmt"
"time"
)
func main() {
c := make(chan int)
go writeToChannel(c, 10)
time.Sleep(1 * time.Second)
fmt.Println("Read:", <-c)
time.Sleep(1 * time.Second)
_, ok := <-c
if ok {
fmt.Println("Channel is open!")
}else {
fmt.Println("Channel is closed!")
}
}
func writeToChannel(c chan int, x int) {
fmt.Println("l", x)
c <- x
close(c)
fmt.Println("2", x)
}
5.3 channel 作为函数参数传递#
package main
import (
"fmt"
//"time"
)
func main() {
c := make(chan bool, 1)
for i := 0; i < 10; i++ {
go Go(c, i)
}
<-c
}
func Go(c chan bool, index int) {
sum := 0
for i := 0; i < 1000000; i++ {
sum += i
}
fmt.Println(sum)
c <- true
}
6. select#
Go 中 select 语句看起来像 channels 的 switch 语句,实际上,select 允许 goroutine 等待多个通信操作,因此,使用 select 的主要好处是:select 可以处理多个 channels,进行非阻塞操作。
注意:使用 channels 和 select 的最大问题是 死锁 。为了解锁死锁问题,后面会介绍同步锁。
package main
import(
"fmt"
"math/rand"
"os"
"strconv"
"time"
)
func main() {
rand.Seed(time.Now().Unix())
createNumber := make(chan int)
end := make(chan bool)
if len(os.Args) != 2 {
fmt.Println("Please give me an integer!")
return
}
n, _ := strconv.Atoi(os.Args[1])
fmt.Printf("Going to create %d random numbers.\n", n)
go gen(0, 2*n, createNumber, end)
for i := 0; i < n; i++ {
fmt.Printf("%d ", <-createNumber)
}
time.Sleep(5 * time.Second) //给gen()函数中的time.After()函数足够时间返回,从而激活select分支
fmt.Println("Exting...")
end <- true //激活gen()里面的select语句中的case->end 分支来终止程序并执行相关代码
}
func gen(min, max int, createNumber chan int, end chan bool) {
for {
select {
case createNumber <- rand.Intn(max-min) + min:
case <- end:
close(end)
return
case <- time.After(4 * time.Second): //time.After函数在指定时间过后返回,因此它将在其他channels被阻塞时解锁select语句
fmt.Println("\ntime.After()!") //可以把这个case当作default分支
}
}
}
注意:select 语句不需要 default 分支
select 语句不是按顺序计算的,因为所有的 channels 都是同时检查的
如果 select 语句中没有 channels 是准备好的,那么 select 语句就会 阻塞 ,直到有 channels 准备好,Go 运行时就会在这些准备好的 channels 之间做 随机选择 ,做到公平一致
select 最大的优点是:可以连接、编排、管理多个 channels
当 channels 连接 goroutine 的时候,select 连接那些连接 goroutine 的 channels
7. 定时器#
介绍 select 的时候也用到了定时器,那么什么是定时器呢?
定时器是一种通过设置一项任务,在未来的某个时刻执行该任务的机制
定时器有两种:
- 只执行一次的延时模式
- 每隔一段时间执行一次的间隔模式
Go 语言中的定时器比较完善,所有的 API 都在 time 包中
7.1 延时模式#
延迟执行有两种:time.After 和 time.Sleep
7.1.1 time.After#
package main
import (
"fmt"
"time"
)
func main() {
fmt.Println("1")
timeAfterTrigger := time.After(1 * time.Second)
<-timeAfterTrigger
fmt.Println("2")
}
time 包提供了运算好的几个 int 类型常量
const (
Nanosecond Duration = 1
Microsecond = 1000 * Nanosecond
Millisecond = 1000 * Microsecond
Second = 1000 * Millisecond
Minute = 60 * Second
Hour = 60 * Minute
)
7.1.2 time.Sleep#
package main
import (
"fmt"
"time"
)
func main() {
fmt.Println("1")
time.Sleep(1 * time.Second)
fmt.Println("2")
}
两者的区别是:time.Sleep 是阻塞当前协程的,而 time.After 是基于 channel 实现的,可以在不同协程中传递
7.2 间隔模式#
间隔模式有分为两种:一种是执行 N 次后结束,另一种是程序不停休的执行
7.2.1 time.NewTicker#
package main
import (
"fmt"
"time"
)
func main() {
fmt.Println("1")
count := 0
timeTicker := time.NewTicker(1 * time.Second)
for {
<-timeTicker.C
fmt.Println("每隔 1 秒输出 2")
count++
if count >= 5 {
timeTicker.Stop()
}
}
}
7.2.2 time.Tick#
package main
import (
"fmt"
"time"
)
func main() {
t := time.Tick(1 * time.Second)
for {
<-t
fmt.Println("每隔 1 秒输出一次")
}
}
7.3 控制定时器#
定时器提供了 Stop 方法和 Reset 方法
- Stop 方法的作用是停止定时器
- Reset 方法的作用是改变定时器的间隔时间
7.3.1 time.Stop#
package main
import (
"fmt"
"time"
)
func main() {
timer := time.NewTimer(time.Second * 6)
go func() {
<-timer.C
fmt.Println("时间到")
}()
timer.Stop()
}
7.3.2 time.Reset#
package main
import (
"fmt"
"time"
)
func main() {
fmt.Println("1")
count := 0
timeTicker := time.NewTicker(1 * time.Second)
for {
<-timeTicker.C
fmt.Println("2")
count++
if count >= 3 {
timeTicker.Reset(2 * time.Second)
}
}
}
8. runtime#
runtime 是 Go 语言运行所需要的基础设施,如:控制 goroutine 的功能,debug,pprof、trace、race 进行检测的支持,内存分配,系统操作和 CPU 相关操作的封装(信号处理、系统调用、寄存器操作、原子操作等),map、channel、string 等内置类型及反射的实现
与 Java、python 中的 runtime 不同,Java、python 的 runtime 是虚拟机的,而 Go 的 runtime 是和用户代码一起编译到一个可执行文件中的
runtime 发展历程:
9. 同步锁#
上文提高 channels 和 select 的最大问题是 死锁 ,这小节介绍解决死锁的问题 -- 同步锁
Go 语言同步锁有两种方式:原子锁,互斥锁
9.1 原子锁#
可以借助某个信号向所有的 goroutine 发送消息
package main
import (
"fmt"
"sync"
"sync/atomic"
"time"
)
var (
shotdown int64 // 该标志向多个goroutine通知状态
wg sync.WaitGroup
)
func main() {
wg.Add(2)
go doWork("A")
go doWork("B")
time.Sleep(1 * time.Second)
atomic.StoreInt64(&shotdown, 1) // 修改
wg.Wait()
}
func doWork(s string) {
defer wg.Done()
for {
fmt.Printf("Doing homework %s\n", s)
time.Sleep(2 * time.Second)
if atomic.LoadInt64(&shotdown) == 1 { // 读取
fmt.Printf("Shotdown home work %s\n", s)
break
}
}
}
9.2 互斥锁#
通过 mutex ,能够将一段临界区间包含起来,只运行单个 goroutine 执行
package main
import (
"fmt"
"runtime"
"sync"
)
var (
counter int
wg sync.WaitGroup
mutex sync.Mutex // 定义临界区
)
func main() {
wg.Add(2)
go incCount(1)
go incCount(2)
wg.Wait()
fmt.Printf("Final Counter: %d\n", counter)
}
func incCount(i int) {
defer wg.Done()
for count := 0; count < 2; count++ {
mutex.Lock()
{
value := counter
runtime.Gosched()
value++
counter = value
}
mutex.Unlock()
}
}
10. Go 并发优势#
Go 语言为并发编程而内置的上层 API 基于 CSP (communicating sequential processes, 顺序通信进程) 模型。这就意味着显式锁都是可以避免的,因为 Go 语言通过相册安全的通道发送和接受数据以实现同步,这大大地简化了并发程序的编写。
一般情况下,一个普通的桌面计算机跑十几二十个线程就有点负载过大了,但是同样这台机器却可以轻松地让成百上千甚至过万个 goroutine 进行资源竞争
11. Go 并发模型#
Go 语言实现了两种并发形式:
- 多线程共享内存(通过共享内存来通信)
- CSP(communicating sequential processes)并发模型(以通信的方式来共享内存)
Do not communicate by sharing memory; instead, share memory by communicating
Java、C++、python 它们的线程都是通过共享内存来通信的
Go 的 CSP 并发模型通过 goroutine 和 channel 来实现
goroutine 与 channel 结合使用案例:
package main
import (
"fmt"
)
//write Data
func writeData(intChan chan int) {
for i := 1; i <= 50; i++ {
//放入数据
intChan<- i //
fmt.Println("writeData ", i)
}
close(intChan) //关闭
}
//read data
func readData(intChan chan int, exitChan chan bool) {
for {
v, ok := <-intChan
if !ok {
break
}
fmt.Printf("readData 读到数据=%v\n", v)
}
//readData 读取完数据后,即任务完成
exitChan<- true
close(exitChan)
}
func main() {
//创建两个管道
intChan := make(chan int, 10)
exitChan := make(chan bool, 1)
go writeData(intChan)
go readData(intChan, exitChan)
for {
_, ok := <-exitChan
if !ok {
break
}
}
}
12 Go 调度器#
GO 语言的调度器使用了三种结构:
G:
G 代表 goroutine,每个 Goroutine 对应一个 G 结构体,G 存储 Goroutine 的运行堆栈、状态以及任务函数,可重用
M:
M 代表内核线程,代表着真正执行计算的资源,在绑定有效的 P 后,进入 schedule 循环;而 schedule 循环的机制大致是从 Global 队列、P 的 Local 队列以及 wait 队列中获取
{{< /admonition >}}
P:
P 代表逻辑处理器,表示调度的上下文。可以把它看作是一个局部的调度器,让 Go 代码跑在一个单独的线程上。这是让 Go 从一个 N:1 调度器映射到一个 M调度器的关键。
对 G 来说,P 相当于 CPU 核,G 只有绑定到 P 才能被调度。
对 M 来说,P 提供了相关的执行环境 (Context),如内存分配状态 (mcache),任务队列 (G) 等
P 的数量决定了系统内最大可并行的 G 的数量(前提:物理 CPU 核数 >= P 的数量)。
P 的数量由用户设置的 GoMAXPROCS 决定,但是不论 GoMAXPROCS 设置为多大,P 的数量最大为 256
用经典的 地鼠推车搬砖 的模型来说明三者关系
地鼠的工作任务是:工地上有若干砖头,地鼠借助小车把砖头运送到火种上
13. 总结#
本文介绍了与 Golang 并发相关的一些知识,从最开始的一些基础概念,包括:并行、并发和进程、线程、协程,到 Golang 并发的一些实际用法,包括:goroutine、channel、select、定时器和同步锁,也简单的介绍了 runtime,最后介绍了 Go 的调度器模型。