并发编程

goroutine

启动单个goroutine

package main

import "fmt"

func hello() {
  fmt.Println("Hello Gorotine")
}

func main() {
  go hello() // 启动一个goroutine,并发调用函数hello
  fmt.Println("main done!")
  time.Sleep(time.Second) // 防止hello协程还未执行,主线程就退出了
}

运行结果:

main done!
Hello Gorotine

sync.WaitGroup

sync.WaitGroup类似Java中的CountDownLaunch,当sync.WaitGroup中的值为0时,阻塞将会被放开,用于保证一个或者多个goroutine完成运行:

package main

import (
	"fmt"
	"sync"
)

var wg sync.WaitGroup

func hello(i int) {
	defer wg.Done() // goroutine结束就登记-1
	fmt.Println("Hello ", i)
}

func main() {
	for i := 0; i < 100; i++ {
		wg.Add(1) // 启动一个goroutine就登记+1
		go hello(i)
	}
	fmt.Println("main done!")
	wg.Wait() // 等待所有goroutine结束登记
}

channel

虽然goroutine可以通过共享内存的方式来相互通讯进行数据共享,但是这种方式往往会造成资源竞争,从而发生并发安全问题。如果通过锁机制解决资源竞争问题,势必会导致性能下降。

Go语言的并发模型CSP(Communicating Sequential Processes),提倡通过通信共享内存而不是通过共享内存而实现通信。如果说goroutine是Go程序并发的执行体,channel就是它们之间的连接。channel是可以让一个goroutine发送特定值到另一个goroutine的通信机制。

Go 语言中的通道(channel)是一种特殊的类型。通道像一个传送带或者队列,总是遵循**先入先出(First In First Out)**的规则,保证收发数据的顺序。每一个通道都是一个具体类型的导管,也就是声明channel的时候需要为其指定元素类型。

注意事项:

  1. 一个先进先出的队列

  2. 管道是数据安全的,不需要加锁

  3. 管道是有类型的,一个string类型的channel只能存放string

  4. chan是引用类型,chan必须初始化make之后才能使用

  5. 在没有使用协程的情况下,如果取/读完了再去取/读就会报dead lock错误

无缓冲channel(同步通道)

无缓冲channel通讯示例(正常情况):

package main

import (
	"fmt"
	"time"
)

func main() {
  var ch chan int
	fmt.Println(ch) // <nil> channel 是引用类型
  
	// 创建的chan类型变量必须通过make函数初始化之后才可以使用
	ch = make(chan int) // 创建一个无缓冲通道
  
	go recv(ch)             // 启动一个goroutine用于接收main从通道ch发送的值
	// 向通道发送值
	ch <- 10 // 此处将会阻塞,等待recv消费完消息后才会继续执行
	fmt.Println("main:发送数字10成功")
}

func recv(ci chan int) {
	time.Sleep(5 * time.Second)
	// 消费通道消息
	i := <-ci
	fmt.Println("recv:消费了通道内的值 ", i)
}

返回结果:

<nil> 
# ... 这里会等待五秒钟
recv:消费了通道内的值  10
main:发送数字10成功

有发送,但是无接收:

package main

import "fmt"

func main() {
	// 创建的chan类型变量必须通过make函数初始化之后才可以使用
	var ch chan int = make(chan int) // 创建一个无缓冲通道

	// 向通道发送值
	ch <- 10 // 无缓冲通道需要有接收方才能发送信息,否则将会一直阻塞在此处
  // 因为没有任何接收方,main会一直阻塞在此处,所以代码发生了死锁
	// 故此处代码运行时将会抛出异常 fatal error: all goroutines are asleep - deadlock!

}

有接收,但是没有发送:

package main

import (
	"fmt"
	"sync"
)

var wg sync.WaitGroup

func main() {
	var ch = make(chan int) // 创建一个无缓冲通道
	wg.Add(1)
	go recv(ch)
	wg.Wait()
}

func recv(ch chan int) {
	i := <-ch // fatal error: all goroutines are asleep - deadlock!
	// 消费方也会从通道中取信息,如果信息不存在,会一直阻塞等待,故也会引发死锁错误
	fmt.Println("recv:消费了通道内的值 ", i)
	wg.Done()
}

无缓冲通道总结:

  1. 发送方发送数据到通道,发送方将会开始阻塞,等待者消费者取出,当消费取出数据时,才会继续执行。如果没有消费者,或者消费者没有消费数据,将会一直阻塞,造成程序死锁报错

  2. 接收方从通道接收数据,接收方将会开始阻塞,等待发送者发送,直到接收方接收到数据时,才会继续执行。如果没有发送者或者发送者没有像通道发送数据,将会一直阻塞,造成程序死锁报错

有缓冲channel

有时候发送方发送数据不想等待接收方接收数据后才执行,这时以采用有缓冲通道,有缓冲通道的定义与无缓冲的区别在于只需要在make函数初始化channel时,使用第二个参数指定缓冲队列的长度即可,代码如下:

package main

import "fmt"

func main() {
  // 创建一个容量为1的有缓冲区通道, 只要通道缓冲区容量大于0,就是有缓冲通道
	ch := make(chan int, 1)
	ch <- 10 // 通道不会阻塞,因为防止在了通道缓冲区
	fmt.Println("发送成功")
	ch <- 20 // 通道已经满了,当前协程将会阻塞,所以报出运行时异常 fatal error: all goroutines are asleep - deadlock!
}

关闭channel

使用内置函数close()可以关闭channel,被关闭后,程序不能向channel继续写入数据了,但是可以继续从中读取数据。

package main

import "fmt"

func main() {
	ic := make(chan int, 3)
	ic <- 100
	ic <- 200
	close(ic) // 关闭管道
	// 再次写入就会报错
	// ic <- 300 // panic: send on closed channel
	// 但是可以继续读取
	n := <-ic
  fmt.Println(n)
}

注意,当从一个被关闭的通道中拿取数据时,如果通道中没有数据,该操作将不会再阻塞,而是返回一个false的状态位:n, ok := <- ic,ok == false

遍历channel

package main

import "fmt"

func main() {
	ic := make(chan int, 3)
	ic <- 100
	ic <- 200
	ic <- 300 // panic: send on closed channel

	// 如果放入之后,没有关闭,在循环取出时,取出到最后一条,将会发生死锁错误
	close(ic)

	for i := range ic {
		fmt.Printf("i=%v\n", i) // 假设没有关闭,将会报错 error: all goroutines are asleep - deadlock!
	}
}

上面的代码等同于如下循环:

package main

import "fmt"

func main() {
	ic := make(chan int, 3)
	ic <- 100
	ic <- 200
	ic <- 300 // panic: send on closed channel

	// 如果放入之后,没有关闭,在循环取出时,取出到最后一条,将会发生死锁错误
	close(ic)

	for {
		i, ok := <-ic
		if !ok {
			break
		}
		fmt.Printf("i=%v\n", i) // 假设没有关闭,将会报错 error: all goroutines are asleep - deadlock!
	}
}

只写只读管道

package main

func main() {
	// 默认情况下,管道是双向的,也就是可读可写
	var chan1 chan int

	// 也可以在声明管道时,将管道设置为只读或者只写

	// 只写管道
	var chan2 chan <- int
	chan2 = make(chan int, 3)

	chan2 <- 20
	num := <- chan2 // Invalid operation: <- chan2 (receive from the send-only type chan<- int)

	// 只读管道
	var chan3 <- chan int
	chan3 = make(chan int, 3)
	chan3 <- 30 // Invalid operation: chan3 <- 30 (send to the receive-only type <-chan int)
}

**使用场景:**可以限制某个协程只读、只写管道,如下:

package main

import (
	"fmt"
	"sync"
)

var wg sync.WaitGroup

func main() {
	c := make(chan int, 3)
	wg.Add(2)
	go send(c)
	go recv(c)
	wg.Wait()
}

// 发送方只写
func send(c chan <- int) {
	for i := 0; i < 10; i++ {
		c <- i
	}
	close(c)
	wg.Done()
}

// 接收方只读
func recv(c <-chan int) {
	for i := range c {
		fmt.Println(i)
	}
	wg.Done()
}

错误处理

如果启动了一个协程,但是该协程出现了panic,如果没有捕获这个panic,就会造成整个程序的崩溃。这时可以在协程函数中使用recover捕获panic处理,防止影响其他协程以及主协程。

select关键字

解决管道阻塞

package main

import "fmt"

func main() {
	intChan := make(chan int, 5)
	strChan := make(chan string, 5)

	for i := 0; i < 5; i++ {
		intChan <- i
		strChan <- fmt.Sprintf("hello%d", i)
	}

	// 不关闭chan
over:
	for true { // 循环使用select读取管道,且按顺序读,如果第一个case没有读取到,则从下一个case管道读取
		select {
		case v := <-intChan:
			fmt.Println("从intChan读取数据", v)
		case v := <-strChan:
			fmt.Println("从strChan读取数据", v)
		default:
			fmt.Println("都读取完了,退出for")
			break over
		}
	}

}

练习

练习1

启动一个协程,将 1 - 2000 的数字放入到channel中,然后再启动8个协程,从channel中取出数字,假设取出的数字是n,并计算1+2+3+...+n-1+n的值,并将结果放置到resChan

考虑类型 resChan chan int 是否合适

package main

import "fmt"

const count = 2000

func main() {
	c := make(chan int, 800)
	resChan := make(chan int, count)
	exitChan := make(chan bool, 8)

	// 添加数字
	go addNum(c)

	// 计算数字
	for i := 0; i < 8; i++ {
		go calc(c, resChan, exitChan)
	}

	// 检测计算数字协程的退出状态
	go func() {
		for i := 0; i < 8; i++ {
			<-exitChan
		}
		// 八个全部取出,说明八个协程全部执行完毕,现在可以关闭通道了
		close(exitChan)
		close(resChan)
	}()

	// 取出结果, 放在主协程上,放置主协程提前退出
	for i := range resChan {
		fmt.Printf(" %d ", i)
	}
}

func addNum(c chan int) {
	for i := 1; i <= count; i++ {
		c <- i
	}
	close(c)
}

func calc(c chan int, resChan chan int, exitChan chan bool) {
	for n := range c {
		sum := 0
		for i := 1; i <= n; i++ {
			sum += i
		}
		// 计算完毕,返回结果
		resChan <- sum
	}
	// 如果该协程计算完毕,则向退出通道发送一条消息
	exitChan <- true
}

练习2

开一个协程writeDataToFile,生成1000个随机数,存放到文件中。当writeDataToFile完成1000个数据到文件后,让sort协程从文件中读取1000个数字,并完成排序,最后将结果写入到另一个文件

最后更新于