xfeng

xfeng

健身 技术 阅读 思考 记录
tg_channel
tg_channel
github
bilibili
tg_channel

Golang为并发而生

image

1. 概述#

Google 一开始写 Golang 的时候就是为了解决 Google 内部业务的高并发需求,而且 Golang 的一大特点就是高并发,所以本文就介绍与 Golang 高并发相关的原理,概念以及技术点。

我会首先介绍一些概念,如:并行和并发,进程、线程和协程以及它们的区别,然后介绍 Golang 里面的 goroutine 和 channel,它们是 Golang 实现高并发的关键,在聊一下 select,定时器,runtime 和同步锁,最后介绍 Go 的并发优势,并发模型和 Go 的调度器。

2. 并行、并发#

学过操作系统的话,应该对并行和并发不陌生

image

并行:
在同一时刻,有多条指令在多个处理器上同时执行

image

并发:
在同一时刻只能有一条指令执行,但多个进程指令被快速轮换执行(根据不同的情况有不同的轮换算法)

并行与并发的区别:

  • 并行在多处理器系统中存在,而并发可以在但处理器和多处理器系统中存在
  • 并行要求程序能够同时执行多个操作,而并发只要求程序假装同时执行多个操作(一个时间片执行一个操作,再轮换多个操作)

3. 进程、线程、协程#

image

进程:
是包含计算机指令,用户数据和系统数据的程序执行环境,以及包含其允许时获得的其他类型资源

线程:
相对进程是更加小巧而轻量的实体,线程有进程创建且包含自己的控制流和栈,进程和线程的区别在于:进程是正在执行的二进制文件,而线程是进程的子集

协程:
协程(goroutine)是 Go 程序并发执行的最小单元,因 goroutine 不像 Unix 进行那样是自治的实体,goroutine 主要优点是非常轻巧,轻松运行成千上万个都没问题,goroutine 比线程还轻量,goroutine 需要一个进程的环境才能存在,创建 goroutine 的时候,需要一个进程且这个进程至少有一个线程。协程是一种用户态的轻量级线程,协程的调度完全由用户控制,协程间的切换只需要保保存任务的上下文,没有内核的开销。线程栈空间通常是 2M,Goroutine 栈空间最小 2K

4. goroutine#

上面介绍了协程(下文统一用 goroutine)的概念,下面介绍一下 goroutine 的实际语法。

在 Go 语言中使用 go 关键字后跟函数名称或定义完整的匿名函数即可开启一个新的 goroutine,使用 go 关键字调用函数后会立即返回,该函数在后台作为 goroutine 运行,程序的其余部分会继续执行。

创建一个 goroutine

package main
import (
	"fmt"
	"time"
)

func main()  {
	go function()
	go func() {
		for i := 10; i < 20; i++ {
			fmt.Print(i, " ")
		}
	}()
	time.Sleep(1 * time.Second)
}

func function() {
	for i := 0; i < 10; i++ {
		fmt.Print(i)
	}
	fmt.Println()
}

你可能会发现上面的输出不是固定的(main 函数可能会提前结束),我们可以用 sync 包来解决这个问题。

package main
import (
	"flag"
	"fmt"
	"sync"
)

func main() {
	n := flag.Int("n", 20, "Number of goroutines")
	flag.Parse()
	count := *n
	fmt.Printf("Going to create %d goroutines.\n", count)
	var waitGroup sync.WaitGroup //定义sync.WaitGroup类型的变量

	fmt.Printf("%#v\n", waitGroup)
	for i := 0; i < count; i++ { //使用for循环创建所需数量的goroutine
		waitGroup.Add(1) //每次调用都会增加sync.WaitGroup变量中的计数器,防止出现任何竞争条件
		go func(x int) {
			defer waitGroup.Done() //sync.WaitGroup变量减一
			fmt.Printf("%d ", x)
		}(i)
	}

	fmt.Printf("%#v\n", waitGroup)
	waitGroup.Wait() //sync.Wait调用将阻塞,直到sync.WaitGroup变量中的计数器为0,从而保证所有groutine能执行完成
	fmt.Println("\nExiting...")
}

5. channel#

channel(通道)是 Go 共的一种通信机制,允许 goroutine 之间进行数据传输。

一些明确的规定:

  • 每个 channel 只允许交换指定类型的数据,也就是通道的元素类型
  • 要是 channel 正常运行,需要保证通道有数据接受方法

使用 chan 关键字即可声明一个 channel,可以使用 close () 函数来关闭通道

当使用 channel 作为函数时,可以指定其为单向 channel

image

5.1 channel 的写入#

package main
import (
	"fmt"
	"time"
)
func main() {
	c := make(chan int)
	go writeToChannel(c, 10)
	time.Sleep(1 * time.Second)
}
func writeToChannel(c chan int, x int) {
	fmt.Println(x)
	c <- x
	close(c)
	fmt.Println(x)
}

5.2 从 channel 接受数据#

package main
import (
	"fmt"
	"time"
)
func main() {
	c := make(chan int)
	go writeToChannel(c, 10)
	time.Sleep(1 * time.Second)
	fmt.Println("Read:", <-c)
	time.Sleep(1 * time.Second)
	_, ok := <-c
	if ok {
		fmt.Println("Channel is open!")
	}else {
		fmt.Println("Channel is closed!")
	}
}
func writeToChannel(c chan int, x int) {
	fmt.Println("l", x)
	c <- x
	close(c)
	fmt.Println("2", x)
}

5.3 channel 作为函数参数传递#

package main

import (
	"fmt"
	//"time"
)
func main() {
	c := make(chan bool, 1)
	for i := 0; i < 10; i++ {
		go Go(c, i)
	}

	<-c
}
func Go(c chan bool, index int) {
	sum := 0
	for i := 0; i < 1000000; i++ {
		sum += i
	}
	fmt.Println(sum)
	c <- true
}

6. select#

Go 中 select 语句看起来像 channels 的 switch 语句,实际上,select 允许 goroutine 等待多个通信操作,因此,使用 select 的主要好处是:select 可以处理多个 channels,进行非阻塞操作。

注意:使用 channels 和 select 的最大问题是 死锁 。为了解锁死锁问题,后面会介绍同步锁。

package main
import(
	"fmt"
	"math/rand"
	"os"
	"strconv"
	"time"
)
func main() {
	rand.Seed(time.Now().Unix())
	createNumber := make(chan int)
	end := make(chan bool)
	if len(os.Args) != 2 {
		fmt.Println("Please give me an integer!")
		return
	}
	n, _ := strconv.Atoi(os.Args[1])
	fmt.Printf("Going to create %d random numbers.\n", n)
	go gen(0, 2*n, createNumber, end)
	for i := 0; i < n; i++ {
		fmt.Printf("%d ", <-createNumber)
	}
    time.Sleep(5 * time.Second)  //给gen()函数中的time.After()函数足够时间返回,从而激活select分支
	fmt.Println("Exting...")
    end <- true  //激活gen()里面的select语句中的case->end 分支来终止程序并执行相关代码
}
func gen(min, max int, createNumber chan int, end chan bool) {
	for {
		select {
		case createNumber <- rand.Intn(max-min) + min:
		case <- end:
			close(end)
			return
		case <- time.After(4 * time.Second): //time.After函数在指定时间过后返回,因此它将在其他channels被阻塞时解锁select语句
			fmt.Println("\ntime.After()!") //可以把这个case当作default分支
		}
	}
}

注意:select 语句不需要 default 分支

select 语句不是按顺序计算的,因为所有的 channels 都是同时检查的

如果 select 语句中没有 channels 是准备好的,那么 select 语句就会 阻塞 ,直到有 channels 准备好,Go 运行时就会在这些准备好的 channels 之间做 随机选择 ,做到公平一致

select 最大的优点是:可以连接、编排、管理多个 channels

当 channels 连接 goroutine 的时候,select 连接那些连接 goroutine 的 channels

7. 定时器#

介绍 select 的时候也用到了定时器,那么什么是定时器呢?

定时器是一种通过设置一项任务,在未来的某个时刻执行该任务的机制

定时器有两种:

  • 只执行一次的延时模式
  • 每隔一段时间执行一次的间隔模式

Go 语言中的定时器比较完善,所有的 API 都在 time 包中

7.1 延时模式#

延迟执行有两种:time.After 和 time.Sleep

7.1.1 time.After#

package main

import (
	"fmt"
	"time"
)
func main() {
	fmt.Println("1")
	timeAfterTrigger := time.After(1 * time.Second)
	<-timeAfterTrigger
	fmt.Println("2")
}

time 包提供了运算好的几个 int 类型常量

const (
	Nanosecond  Duration = 1
	Microsecond          = 1000 * Nanosecond
	Millisecond          = 1000 * Microsecond
	Second               = 1000 * Millisecond
	Minute               = 60 * Second
	Hour                 = 60 * Minute
)

7.1.2 time.Sleep#

package main

import (
	"fmt"
	"time"
)
func main() {
	fmt.Println("1")
	time.Sleep(1 * time.Second)
	fmt.Println("2")
}

两者的区别是:time.Sleep 是阻塞当前协程的,而 time.After 是基于 channel 实现的,可以在不同协程中传递

7.2 间隔模式#

间隔模式有分为两种:一种是执行 N 次后结束,另一种是程序不停休的执行

7.2.1 time.NewTicker#

package main

import (
	"fmt"
	"time"
)
func main() {
	fmt.Println("1")
	count := 0
	timeTicker := time.NewTicker(1 * time.Second)
	for {
		<-timeTicker.C
		fmt.Println("每隔 1 秒输出 2")
		count++
		if count >= 5 {
			timeTicker.Stop()
		}
	}
}

7.2.2 time.Tick#

package main

import (
	"fmt"
	"time"
)
func main() {
	t := time.Tick(1 * time.Second)
	for {
		<-t
		fmt.Println("每隔 1 秒输出一次")
	}
}

7.3 控制定时器#

定时器提供了 Stop 方法和 Reset 方法

  • Stop 方法的作用是停止定时器
  • Reset 方法的作用是改变定时器的间隔时间

7.3.1 time.Stop#

package main

import (
	"fmt"
	"time"
)

func main() {
	timer := time.NewTimer(time.Second * 6)
	go func() {
		<-timer.C
		fmt.Println("时间到")
	}()
	timer.Stop()
}

7.3.2 time.Reset#

package main

import (
	"fmt"
	"time"
)

func main() {
	fmt.Println("1")
	count := 0
	timeTicker := time.NewTicker(1 * time.Second)
	for {
		<-timeTicker.C
		fmt.Println("2")
		count++
		if count >= 3 {
			timeTicker.Reset(2 * time.Second)
		}
	}
}

8. runtime#

runtime 是 Go 语言运行所需要的基础设施,如:控制 goroutine 的功能,debug,pprof、trace、race 进行检测的支持,内存分配,系统操作和 CPU 相关操作的封装(信号处理、系统调用、寄存器操作、原子操作等),map、channel、string 等内置类型及反射的实现

与 Java、python 中的 runtime 不同,Java、python 的 runtime 是虚拟机的,而 Go 的 runtime 是和用户代码一起编译到一个可执行文件中的

runtime 发展历程:

image

9. 同步锁#

上文提高 channels 和 select 的最大问题是 死锁 ,这小节介绍解决死锁的问题 -- 同步锁

Go 语言同步锁有两种方式:原子锁,互斥锁

9.1 原子锁#

可以借助某个信号向所有的 goroutine 发送消息

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

var (
	shotdown int64 // 该标志向多个goroutine通知状态
	wg       sync.WaitGroup
)

func main() {
	wg.Add(2)

	go doWork("A")
	go doWork("B")

	time.Sleep(1 * time.Second)

	atomic.StoreInt64(&shotdown, 1) // 修改
	wg.Wait()
}

func doWork(s string) {
	defer wg.Done()

	for {
		fmt.Printf("Doing homework %s\n", s)
		time.Sleep(2 * time.Second)

		if atomic.LoadInt64(&shotdown) == 1 { // 读取
			fmt.Printf("Shotdown home work %s\n", s)
			break
		}
	}
}

9.2 互斥锁#

通过 mutex ,能够将一段临界区间包含起来,只运行单个 goroutine 执行

package main

import (
	"fmt"
	"runtime"
	"sync"
)

var (
	counter int
	wg      sync.WaitGroup
	mutex   sync.Mutex // 定义临界区
)

func main() {
	wg.Add(2)

	go incCount(1)
	go incCount(2)

	wg.Wait()
	fmt.Printf("Final Counter: %d\n", counter)
}

func incCount(i int) {
	defer wg.Done()

	for count := 0; count < 2; count++ {
		mutex.Lock()
		{
			value := counter
			runtime.Gosched()
			value++
			counter = value
		}
		mutex.Unlock()
	}
}

10. Go 并发优势#

Go 语言为并发编程而内置的上层 API 基于 CSP (communicating sequential processes, 顺序通信进程) 模型。这就意味着显式锁都是可以避免的,因为 Go 语言通过相册安全的通道发送和接受数据以实现同步,这大大地简化了并发程序的编写。

一般情况下,一个普通的桌面计算机跑十几二十个线程就有点负载过大了,但是同样这台机器却可以轻松地让成百上千甚至过万个 goroutine 进行资源竞争

11. Go 并发模型#

Go 语言实现了两种并发形式:

  • 多线程共享内存(通过共享内存来通信)
  • CSP(communicating sequential processes)并发模型(以通信的方式来共享内存)

Do not communicate by sharing memory; instead, share memory by communicating

Java、C++、python 它们的线程都是通过共享内存来通信的

Go 的 CSP 并发模型通过 goroutine 和 channel 来实现

goroutine 与 channel 结合使用案例:

image

package main
import (
	"fmt"
)
 
 
//write Data
func writeData(intChan chan int) {
	for i := 1; i <= 50; i++ {
		//放入数据
		intChan<- i //
		fmt.Println("writeData ", i)
	}
	close(intChan) //关闭
}
 
//read data
func readData(intChan chan int, exitChan chan bool) {
 
	for {
		v, ok := <-intChan
		if !ok {
			break
		}
		fmt.Printf("readData 读到数据=%v\n", v) 
	}
	//readData 读取完数据后,即任务完成
	exitChan<- true
	close(exitChan)
 
}
 
func main() {
 
	//创建两个管道
	intChan := make(chan int, 10)
	exitChan := make(chan bool, 1)
	
	go writeData(intChan)
	go readData(intChan, exitChan)
 
	for {
		_, ok := <-exitChan
		if !ok {
			break
		}
	}
}

12 Go 调度器#

GO 语言的调度器使用了三种结构:

G:
G 代表 goroutine,每个 Goroutine 对应一个 G 结构体,G 存储 Goroutine 的运行堆栈、状态以及任务函数,可重用

M:
M 代表内核线程,代表着真正执行计算的资源,在绑定有效的 P 后,进入 schedule 循环;而 schedule 循环的机制大致是从 Global 队列、P 的 Local 队列以及 wait 队列中获取
{{< /admonition >}}

P:
P 代表逻辑处理器,表示调度的上下文。可以把它看作是一个局部的调度器,让 Go 代码跑在一个单独的线程上。这是让 Go 从一个 N:1 调度器映射到一个 M调度器的关键。

对 G 来说,P 相当于 CPU 核,G 只有绑定到 P 才能被调度。

对 M 来说,P 提供了相关的执行环境 (Context),如内存分配状态 (mcache),任务队列 (G) 等

P 的数量决定了系统内最大可并行的 G 的数量(前提:物理 CPU 核数 >= P 的数量)。

P 的数量由用户设置的 GoMAXPROCS 决定,但是不论 GoMAXPROCS 设置为多大,P 的数量最大为 256

用经典的 地鼠推车搬砖 的模型来说明三者关系

image

地鼠的工作任务是:工地上有若干砖头,地鼠借助小车把砖头运送到火种上

13. 总结#

本文介绍了与 Golang 并发相关的一些知识,从最开始的一些基础概念,包括:并行、并发和进程、线程、协程,到 Golang 并发的一些实际用法,包括:goroutine、channel、select、定时器和同步锁,也简单的介绍了 runtime,最后介绍了 Go 的调度器模型。

加载中...
此文章数据所有权由区块链加密技术和智能合约保障仅归创作者所有。