Go并发基础


概述

基本上大部分编程语言在并发上的编程范式都差不多,配合Cpp食用更佳。这里推荐一本书,<<深入理解Go并发编程>>。

goroutine

动态栈

操作系统的线程一般都有固定的栈内存(通常为2MB),而 Go 语言中的 goroutine 非常轻量级,一个 goroutine 的初始栈空间很小(一般为2KB),所以在 Go 语言中一次创建数万个 goroutine 也是可能的。并且 goroutine 的栈不是固定的,可以根据需要动态地增大或缩小, Go 的 runtime 会自动为 goroutine 分配合适的栈空间。

goroutine调度

操作系统内核在调度时会挂起当前正在执行的线程并将寄存器中的内容保存到内存中,然后选出接下来要执行的线程并从内存中恢复该线程的寄存器信息,然后恢复执行该线程的现场并开始执行线程。从一个线程切换到另一个线程需要完整的上下文切换。因为可能需要多次内存访问,索引这个切换上下文的操作开销较大,会增加运行的cpu周期。

区别于操作系统内核调度操作系统线程,goroutine 的调度是Go语言运行时(runtime)层面的实现,是完全由 Go 语言本身实现的一套调度系统——go scheduler。它的作用是按照一定的规则将所有的 goroutine 调度到操作系统线程上执行。

在经历数个版本的迭代之后,目前 Go 语言的调度器采用的是 GPM 调度模型。

其中:

  • G:表示 goroutine,每执行一次go f()就创建一个 G,包含要执行的函数和上下文信息。
  • 全局队列(Global Queue):存放等待运行的 G。
  • P:表示 goroutine 执行所需的资源,最多有 GOMAXPROCS 个。
  • P 的本地队列:同全局队列类似,存放的也是等待运行的G,存的数量有限,不超过256个。新建 G 时,G 优先加入到 P 的本地队列,如果本地队列满了会批量移动部分 G 到全局队列。
  • M:线程想运行任务就得获取 P,从 P 的本地队列获取 G,当 P 的本地队列为空时,M 也会尝试从全局队列或其他 P 的本地队列获取 G。M 运行 G,G 执行之后,M 会从 P 获取下一个 G,不断重复下去。
  • Goroutine 调度器和操作系统调度器是通过 M 结合起来的,每个 M 都代表了1个内核线程,操作系统调度器负责把内核线程分配到 CPU 的核上执行。

单从线程调度讲,Go语言相比起其他语言的优势在于OS线程是由OS内核来调度的, goroutine 则是由Go运行时(runtime)自己的调度器调度的,完全是在用户态下完成的, 不涉及内核态与用户态之间的频繁切换,包括内存的分配与释放,都是在用户态维护着一块大的内存池, 不直接调用系统的malloc函数(除非内存池需要改变),成本比调度OS线程低很多。 另一方面充分利用了多核的硬件资源,近似的把若干goroutine均分在物理线程上, 再加上本身 goroutine 的超轻量级,以上种种特性保证了 goroutine 调度方面的性能。

  • Work stealing

    所有P放满了才会放到全局队列,如果某个M没东西了,会从其他P偷一个G过来,· 其他P没得偷从全局中拿一个

  • handoff

如果某P的本地队列中的某G使得M阻塞,P会移动到另一个唤醒/创建的M,原来的M在G运行完之后睡眠或者销毁

sync.WaitGroup

可以用sync.WaitGroup来让主线程在其他用户线程之后退出

package main

import (
    "fmt"
    "sync"
)

// 声明全局等待组变量
var wg sync.WaitGroup

func hello() {
    fmt.Println("hello")
    wg.Done() // 告知当前goroutine完成
}

func main() {
    wg.Add(1) // 登记1个goroutine
    go hello()
    fmt.Println("你好")
    wg.Wait() // 阻塞等待登记的goroutine完成
}

channel

Go语言采用的并发模型是CSP(Communicating Sequential Processes),提倡通过通信共享内存而不是通过共享内存而实现通信

只读通道:v <-chan int

只写通道:v chan<- int

注意:一个通道值是可以被垃圾回收掉的。通道通常由发送方执行关闭操作,并且只有在接收方明确等待通道关闭的信号时才需要执行关闭操作。它和关闭文件不一样,通常在结束操作之后关闭文件是必须要做的,但关闭通道不是必须的。

关闭后的通道有以下特点:

  1. 对一个关闭的通道再发送值就会导致 panic。
  2. 对一个关闭的通道进行接收会一直获取值直到通道为空。
  3. 对一个关闭的并且没有值的通道执行接收操作会得到对应类型的零值。
  4. 关闭一个已经关闭的通道会导致 panic。

无缓冲通道:c := make(chan int)

有缓冲通道:cc := make(chan int, 3)

对无缓冲通道发送消息而不接收会造成死锁,我们要创建一个goroutine来接收通道中的内容:

func recv(c chan int) {
    ret := <-c
    fmt.Println("接收成功", ret)
}

func main() {
    ch := make(chan int)
    go recv(ch) // 创建一个 goroutine 从通道接收值
    ch <- 10
    fmt.Println("发送成功")
}

我们可以用多返回值模式来判断通道是否关闭:

value, ok := <- ch

其中:

  • value:从通道中取出的值,如果通道被关闭则返回对应类型的零值。
  • ok:通道ch关闭时返回 false,否则返回 true。

Worker Pool

E.g:三个线程干五个活

package main

import (
    "fmt"
    "time"
)

func worker(id int, jobs <-chan int, results chan<- int) {
    for job := range jobs {
        fmt.Println("worker id: ", id, "doing job: ", job)
        results <- job * 2
        time.Sleep(time.Second)
        fmt.Println("worker id: ", id, "finished job: ", job)
    }
}

func main() {
    jobs := make(chan int, 100)
    results := make(chan int, 100)

    for j := 0; j < 3; j++ {
        go worker(j, jobs, results)
    }

    for i := 0; i < 5; i++ {
        jobs <- i
    }
    close(jobs)

    for i := 0; i < 5; i++ {
        fmt.Println(<-results)
    }

}

select

Select 语句具有以下特点。

  • 可处理一个或多个 channel 的发送/接收操作。
  • 如果多个 case 同时满足,select 会随机选择一个执行。
  • 对于没有 case 的 select 会一直阻塞,可用于阻塞 main 函数,防止退出
package main

import "fmt"

func main() {
    ch := make(chan int, 1)

    for i := 0; i < 10; i++ {
        select { //如果多个case都满足,随机选一个执行
        case x := <-ch: //可以从channel中取值
            fmt.Println("Receiving", x)

        case ch <- i:
            fmt.Println("Sending ", i) //可以从channel中存放值
        default:
            fmt.Println("do nothing")
        }
    }
}

并发安全和互斥锁

互斥锁

互斥锁是一种常用的控制共享资源访问的方法,它能够保证同一时间只有一个 goroutine 可以访问共享资源。Go 语言中使用sync包中提供的Mutex类型来实现互斥锁。

sync.Mutex提供了两个方法供我们使用。

方法名功能
func (m *Mutex) Lock()获取互斥锁
func (m *Mutex) Unlock()释放互斥锁
package main

import (
    "fmt"
    "sync"
)

// sync.Mutex

var (
    x int64
    wg sync.WaitGroup // 等待组
    m sync.Mutex // 互斥锁
)

// add 对全局变量x执行5000次加1操作
func add() {
    for i := 0; i < 5000; i++ {
        m.Lock() // 修改x前加锁
        x = x + 1
        m.Unlock() // 改完解锁
    }
    wg.Done()
}

func main() {
    wg.Add(2)

    go add()
    go add()

    wg.Wait()
    fmt.Println(x)
}

读写互斥锁

互斥锁是完全互斥的,但是实际上有很多场景是读多写少的,当我们并发的去读取一个资源而不涉及资源修改的时候是没有必要加互斥锁的,这种场景下使用读写锁是更好的一种选择。读写锁在 Go 语言中使用sync包中的RWMutex类型。

sync.RWMutex提供了以下5个方法。

方法名功能
func (rw *RWMutex) Lock()获取写锁
func (rw *RWMutex) Unlock()释放写锁
func (rw *RWMutex) RLock()获取读锁
func (rw *RWMutex) RUnlock()释放读锁
func (rw *RWMutex) RLocker() Locker返回一个实现Locker接口的读写锁

读写锁分为两种:读锁和写锁。当一个 goroutine 获取到读锁之后,其他的 goroutine 如果是获取读锁会继续获得锁,如果是获取写锁就会等待;而当一个 goroutine 获取写锁之后,其他的 goroutine 无论是获取读锁还是写锁都会等待。

在读多写少(相差一个数量级)的情况下,使用读写互斥锁比起使用互斥锁可以提高执行效率

// 使用互斥锁,10并发写,1000并发读
do(writeWithLock, readWithLock, 10, 1000) // x:10 cost:1.466500951s

// 使用读写互斥锁,10并发写,1000并发读
do(writeWithRWLock, readWithRWLock, 10, 1000) // x:10 cost:117.207592ms

sync.WaitGroup

方法名功能
func (wg * WaitGroup) Add(delta int)计数器+delta
(wg *WaitGroup) Done()计数器-1
(wg *WaitGroup) Wait()阻塞直到计数器变为0
var wg sync.WaitGroup

func hello() {
    defer wg.Done()
    fmt.Println("Hello Goroutine!")
}
func main() {
    wg.Add(1)
    go hello() // 启动另外一个goroutine去执行hello函数
    fmt.Println("main goroutine done!")
    wg.Wait()
}

sync.Once

在某些场景下我们需要确保某些操作即使在高并发的场景下也只会被执行一次,例如只加载一次配置文件等。

Go语言中的sync包中提供了一个针对只执行一次场景的解决方案——sync.Oncesync.Once只有一个Do方法

package singleton

import (
    "sync"
)

type singleton struct {}

var instance *singleton
var once sync.Once

func GetInstance() *singleton {
    once.Do(func() {
        instance = &singleton{}
    })
    return instance
}

sync.Map

Go 语言中内置的 map 不是并发安全的,当并发地写map时会报错fatal error: concurrent map writes,因此我们可以使用sync.Map

方法名功能
func (m *Map) Store(key, value interface{})存储key-value数据
func (m *Map) Load(key interface{}) (value interface{}, ok bool)查询key对应的value
func (m *Map) LoadOrStore(key, value interface{}) (actual interface{}, loaded bool)查询或存储key对应的value
func (m *Map) LoadAndDelete(key interface{}) (value interface{}, loaded bool)查询并删除key
func (m *Map) Delete(key interface{})删除key
func (m *Map) Range(f func(key, value interface{}) bool)对map中的每个key-value依次调用f
package main

import (
    "fmt"
    "sync"
)

func main() {
    wg := sync.WaitGroup{}
    m := sync.Map{}
    for i := 0; i < 20; i++ {
        go func(i int) {
            wg.Add(1)
            m.Store(i, i+100)
            t, _ := m.Load(i)
            fmt.Println(i, ":", t)
            wg.Done()
        }(i)
    }
    wg.Wait()
}

评论
  目录