并发编程
goroutine
启动单个goroutine
package main
import "fmt"
func hello() {
fmt.Println("Hello Gorotine")
}
func main() {
go hello() // 启动一个goroutine,并发调用函数hello
fmt.Println("main done!")
time.Sleep(time.Second) // 防止hello协程还未执行,主线程就退出了
}
运行结果:
main done!
Hello Gorotine
sync.WaitGroup
sync.WaitGroup
类似Java中的CountDownLaunch
,当sync.WaitGroup
中的值为0时,阻塞将会被放开,用于保证一个或者多个goroutine
完成运行:
package main
import (
"fmt"
"sync"
)
var wg sync.WaitGroup
func hello(i int) {
defer wg.Done() // goroutine结束就登记-1
fmt.Println("Hello ", i)
}
func main() {
for i := 0; i < 100; i++ {
wg.Add(1) // 启动一个goroutine就登记+1
go hello(i)
}
fmt.Println("main done!")
wg.Wait() // 等待所有goroutine结束登记
}
channel
虽然goroutine
可以通过共享内存的方式来相互通讯进行数据共享,但是这种方式往往会造成资源竞争,从而发生并发安全问题。如果通过锁机制解决资源竞争问题,势必会导致性能下降。
Go语言的并发模型是 CSP(Communicating Sequential Processes),提倡通过通信共享内存而不是通过共享内存而实现通信。如果说goroutine
是Go程序并发的执行体,channel
就是它们之间的连接。channel
是可以让一个goroutine
发送特定值到另一个goroutine
的通信机制。
Go 语言中的通道(channel)是一种特殊的类型。通道像一个传送带或者队列,总是遵循**先入先出(First In First Out)**的规则,保证收发数据的顺序。每一个通道都是一个具体类型的导管,也就是声明channel的时候需要为其指定元素类型。
注意事项:
一个先进先出的队列
管道是数据安全的,不需要加锁
管道是有类型的,一个
string
类型的channel只能存放string
chan
是引用类型,chan
必须初始化make
之后才能使用在没有使用协程的情况下,如果取/读完了再去取/读就会报
dead lock
错误
无缓冲channel(同步通道)
无缓冲channel通讯示例(正常情况):
package main
import (
"fmt"
"time"
)
func main() {
var ch chan int
fmt.Println(ch) // <nil> channel 是引用类型
// 创建的chan类型变量必须通过make函数初始化之后才可以使用
ch = make(chan int) // 创建一个无缓冲通道
go recv(ch) // 启动一个goroutine用于接收main从通道ch发送的值
// 向通道发送值
ch <- 10 // 此处将会阻塞,等待recv消费完消息后才会继续执行
fmt.Println("main:发送数字10成功")
}
func recv(ci chan int) {
time.Sleep(5 * time.Second)
// 消费通道消息
i := <-ci
fmt.Println("recv:消费了通道内的值 ", i)
}
返回结果:
<nil>
# ... 这里会等待五秒钟
recv:消费了通道内的值 10
main:发送数字10成功
有发送,但是无接收:
package main
import "fmt"
func main() {
// 创建的chan类型变量必须通过make函数初始化之后才可以使用
var ch chan int = make(chan int) // 创建一个无缓冲通道
// 向通道发送值
ch <- 10 // 无缓冲通道需要有接收方才能发送信息,否则将会一直阻塞在此处
// 因为没有任何接收方,main会一直阻塞在此处,所以代码发生了死锁
// 故此处代码运行时将会抛出异常 fatal error: all goroutines are asleep - deadlock!
}
有接收,但是没有发送:
package main
import (
"fmt"
"sync"
)
var wg sync.WaitGroup
func main() {
var ch = make(chan int) // 创建一个无缓冲通道
wg.Add(1)
go recv(ch)
wg.Wait()
}
func recv(ch chan int) {
i := <-ch // fatal error: all goroutines are asleep - deadlock!
// 消费方也会从通道中取信息,如果信息不存在,会一直阻塞等待,故也会引发死锁错误
fmt.Println("recv:消费了通道内的值 ", i)
wg.Done()
}
无缓冲通道总结:
发送方发送数据到通道,发送方将会开始阻塞,等待者消费者取出,当消费取出数据时,才会继续执行。如果没有消费者,或者消费者没有消费数据,将会一直阻塞,造成程序死锁报错
接收方从通道接收数据,接收方将会开始阻塞,等待发送者发送,直到接收方接收到数据时,才会继续执行。如果没有发送者或者发送者没有像通道发送数据,将会一直阻塞,造成程序死锁报错
有缓冲channel
有时候发送方发送数据不想等待接收方接收数据后才执行,这时以采用有缓冲通道,有缓冲通道的定义与无缓冲的区别在于只需要在make
函数初始化channel
时,使用第二个参数指定缓冲队列的长度即可,代码如下:
package main
import "fmt"
func main() {
// 创建一个容量为1的有缓冲区通道, 只要通道缓冲区容量大于0,就是有缓冲通道
ch := make(chan int, 1)
ch <- 10 // 通道不会阻塞,因为防止在了通道缓冲区
fmt.Println("发送成功")
ch <- 20 // 通道已经满了,当前协程将会阻塞,所以报出运行时异常 fatal error: all goroutines are asleep - deadlock!
}
关闭channel
使用内置函数close()
可以关闭channel
,被关闭后,程序不能向channel
继续写入数据了,但是可以继续从中读取数据。
package main
import "fmt"
func main() {
ic := make(chan int, 3)
ic <- 100
ic <- 200
close(ic) // 关闭管道
// 再次写入就会报错
// ic <- 300 // panic: send on closed channel
// 但是可以继续读取
n := <-ic
fmt.Println(n)
}
注意,当从一个被关闭的通道中拿取数据时,如果通道中没有数据,该操作将不会再阻塞,而是返回一个false的状态位:
n, ok := <- ic
,ok == false
遍历channel
package main
import "fmt"
func main() {
ic := make(chan int, 3)
ic <- 100
ic <- 200
ic <- 300 // panic: send on closed channel
// 如果放入之后,没有关闭,在循环取出时,取出到最后一条,将会发生死锁错误
close(ic)
for i := range ic {
fmt.Printf("i=%v\n", i) // 假设没有关闭,将会报错 error: all goroutines are asleep - deadlock!
}
}
上面的代码等同于如下循环:
package main
import "fmt"
func main() {
ic := make(chan int, 3)
ic <- 100
ic <- 200
ic <- 300 // panic: send on closed channel
// 如果放入之后,没有关闭,在循环取出时,取出到最后一条,将会发生死锁错误
close(ic)
for {
i, ok := <-ic
if !ok {
break
}
fmt.Printf("i=%v\n", i) // 假设没有关闭,将会报错 error: all goroutines are asleep - deadlock!
}
}
只写只读管道
package main
func main() {
// 默认情况下,管道是双向的,也就是可读可写
var chan1 chan int
// 也可以在声明管道时,将管道设置为只读或者只写
// 只写管道
var chan2 chan <- int
chan2 = make(chan int, 3)
chan2 <- 20
num := <- chan2 // Invalid operation: <- chan2 (receive from the send-only type chan<- int)
// 只读管道
var chan3 <- chan int
chan3 = make(chan int, 3)
chan3 <- 30 // Invalid operation: chan3 <- 30 (send to the receive-only type <-chan int)
}
**使用场景:**可以限制某个协程只读、只写管道,如下:
package main
import (
"fmt"
"sync"
)
var wg sync.WaitGroup
func main() {
c := make(chan int, 3)
wg.Add(2)
go send(c)
go recv(c)
wg.Wait()
}
// 发送方只写
func send(c chan <- int) {
for i := 0; i < 10; i++ {
c <- i
}
close(c)
wg.Done()
}
// 接收方只读
func recv(c <-chan int) {
for i := range c {
fmt.Println(i)
}
wg.Done()
}
错误处理
如果启动了一个协程,但是该协程出现了panic
,如果没有捕获这个panic
,就会造成整个程序的崩溃。这时可以在协程函数中使用recover捕获panic处理,防止影响其他协程以及主协程。
select关键字
解决管道阻塞
package main
import "fmt"
func main() {
intChan := make(chan int, 5)
strChan := make(chan string, 5)
for i := 0; i < 5; i++ {
intChan <- i
strChan <- fmt.Sprintf("hello%d", i)
}
// 不关闭chan
over:
for true { // 循环使用select读取管道,且按顺序读,如果第一个case没有读取到,则从下一个case管道读取
select {
case v := <-intChan:
fmt.Println("从intChan读取数据", v)
case v := <-strChan:
fmt.Println("从strChan读取数据", v)
default:
fmt.Println("都读取完了,退出for")
break over
}
}
}
练习
练习1
启动一个协程,将 1 - 2000
的数字放入到channel
中,然后再启动8个协程,从channel
中取出数字,假设取出的数字是n
,并计算1+2+3+...+n-1+n
的值,并将结果放置到resChan
中
考虑类型
resChan chan int
是否合适
package main
import "fmt"
const count = 2000
func main() {
c := make(chan int, 800)
resChan := make(chan int, count)
exitChan := make(chan bool, 8)
// 添加数字
go addNum(c)
// 计算数字
for i := 0; i < 8; i++ {
go calc(c, resChan, exitChan)
}
// 检测计算数字协程的退出状态
go func() {
for i := 0; i < 8; i++ {
<-exitChan
}
// 八个全部取出,说明八个协程全部执行完毕,现在可以关闭通道了
close(exitChan)
close(resChan)
}()
// 取出结果, 放在主协程上,放置主协程提前退出
for i := range resChan {
fmt.Printf(" %d ", i)
}
}
func addNum(c chan int) {
for i := 1; i <= count; i++ {
c <- i
}
close(c)
}
func calc(c chan int, resChan chan int, exitChan chan bool) {
for n := range c {
sum := 0
for i := 1; i <= n; i++ {
sum += i
}
// 计算完毕,返回结果
resChan <- sum
}
// 如果该协程计算完毕,则向退出通道发送一条消息
exitChan <- true
}
练习2
开一个协程writeDataToFile
,生成1000个随机数,存放到文件中。当writeDataToFile
完成1000个数据到文件后,让sort
协程从文件中读取1000个数字,并完成排序,最后将结果写入到另一个文件
最后更新于
这有帮助吗?