GoRoutine和通道并发教程

GoRoutine 是一个与其他 goroutine 同时执行的函数。

我们使用 goroutines 来启动多个线程,这有助于我们实现并发性。并发性是指程序在重叠的时间段内独立运行多个任务的能力。

func randSleep(name String, limit int, sleep int){
 for(i=1;i<=n;i++){
 fmt.println(name,rand.Intn(i))
 time.sleep(time.Duration(sleep*int(time.second)))
 }
}

func main(){
go randSleep("first:",4,3)
go randSleep(
"second:",4,3)
}

上述程序不会在终端打印任何内容,因为在执行 goroutines 之前,主函数已经完成。

但是,如果在 goroutine 之后有任何顺序代码,那么该 goroutine 将暂时执行,直到顺序代码执行完毕。例如

func randSleep(name String, limit int, sleep int){
 for i=1;i<=n;i++{
 fmt.println(name,rand.Intn(i))
 time.sleep(time.Duration(sleep*int(time.second)))
 }
}

func main(){
 go randSleep("first:",4,3)
 randSleep(
"second:",4,3)
}

现在,在上述代码中,不管 goroutine 是否完成,goroutine 都将与顺序代码(main中第二行)同时运行,因为它在等待我们的顺序代码执行。

为了解决上述问题,我们将使用同步软件包中的等待组。

等待组Wait Groups
因此,Wait Groups 允许程序等待指定的 goroutines。这些是 Golang 中的同步机制,可以阻止程序的执行,直到 WaitGroup 中的 goroutines 完全执行。

func randSleep(wg *sync.WaitGroup, name String, limit int, sleep int){

defer wg.Done() //altering/subtracting the counter of goroutine when it completes 
 for i:=1;i<=limit;i++{
 fmt.println(name,rand.Intn(i))
 }
}

func main(){
wg:=new(sync.WaitGroup)
//creates a new wait group 

wg.Add(2)
//it informs that it must wait for two goroutines

go randSleep(wg,
"first:",5,3)
go randSleep(wg,
"second:",5,3)

wg.Wait()
//阻止执行,直到 goroutine 执行完毕;
}


因此,整个过程就像是在 wg.Add() 中向计数器加数,在 wg.Done() 中从计数器中减数,然后在 wg.Wait() 中等待计数器归零。

通道Channel
这是一种允许 GoRoutines 交换数据的通信机制。通道是双向的,这意味着任何一方都可以发送或接收信息。

Go 中的双向信道channel是阻塞的,这意味着在向信道发送数据时,Go 会等待直到从信道读取数据后才继续执行。

func writeChannel(wg *sync.WaitGroup, ch chan int, stop int){
defer wg.Done()
for i:=1;i<=stop;i++{
ch<-i
}
}

func readChannel(wg *sync.WaitGroup, ch chan int, stop int){
defer wg.Done()
for i:=1;i<=stop;i++{
fmt.println(<-ch)
}
}

func main(){
wg:=new(sync.WaitGroup)
wg.Add(2)
limitChannel:=make(chan int) //unbuffered channel
defer close(limitChannel)
go writeChannel(wg,limitChannel,3)
go readChannel(wg,limitChannel,3)
wg.Wait()
}

因此,发送操作的次数必须等于接收操作的次数,否则就会出现死锁。

上述代码中怎么会出现死锁呢?

如果 `writeChannel` goroutine 向通道写入的值多于 `read Channel` goroutine 能读取的值,那么当尝试写入时,`writeChannel` goroutine 最终会阻塞,因为通道的缓冲区已满。

由于 `writeChannel` goroutine 被阻塞而无法继续,因此它无法执行 `wg.Done()` 语句,该语句将递减 `sync.WaitGroup` 计数器。

因此,`main`函数中的`wg.Wait()`调用将永远不会返回,程序将陷入死锁状态。

go中的信道类型
Go主要包括两种类型的信道:缓冲信道和非缓冲信道。下面我们就来看看这两种信道有什么不同以及它们有什么好处。

1、缓冲通道

  • 我们可以创建一个可以存储多个值的通道,这意味着向通道发送数据不会阻塞,直到超出其容量。
  • 在缓冲通道中,程序之间的通信是异步的。异步的基本意思是,我们的程序不会处于阻塞状态,而程序的其他功能将继续进行。
  • 缓冲通道遵循队列数据结构,即先插入通道的值将先跳出。
  • 在缓冲通道的情况下,即使我们在通道中填充值,发送方的 goroutine 也不会被阻塞。它将把数据发送到通道上,这些数据基本上会被排队,而 goroutine 会继续做它应该做的事情。

func writeChannel(wg *sync.WaitGroup, limitchannel chan int, stop int) {
defer wg.Done()
for i := 1; i <= stop; i++ {
limitchannel <- i
}
}

func main() {
wg := new(sync.WaitGroup)
wg.Add(1)
limitchannel:= make(chan int, 2)
defer close(limitchannel)
go writeChannel(wg, limitchannel, 2)
wg.Wait()
fmt.Println(<-limitchannel)
fmt.Println(<-limitchannel)
}

如果缓冲通道达到其容量怎么办?

因此,如果缓冲通道实际上已经满了,而发送程序试图在此时将更多数据放入通道,那么该特定程序将被阻塞,直到从该通道读取数据为止。

2、无缓冲通道

  • 默认情况下,通道是无缓冲的,也就是说,一旦有信息发送到通道,通道就需要一个接收器。
  • 在接收者的 goroutine 从通道读取信息之前,发送者的 goroutine 将处于阻塞状态。
  • 默认情况下,一旦我们将数据输入通道,发送方的 goroutine 就会转入等待状态。
  • 无缓冲通道只允许程序进行同步通信。同步通信是指发送方需要等待接收方的响应。

无缓冲通道: Fork-Join模型

func main(){
myChannel:=make(chan string)

// 这是我的匿名函数
go func(){
myChannel <-
"data"
}()
//在这里,我调用了函数

msg:=<-myChannel
fmt.Println(msg)
}

在上面的示例中,main 创建的 go 例程已经被分叉(从 main 中释放),并且正在执行某些操作。但我们借助通道重新连接了它。

因此,在我们收到通道的值或关闭通道之前,main 不会终止。

select
select 语句会阻塞主函数的执行,直到其中一个情况可以运行,一旦收到来自通道的信息,它就会执行该代码块中的代码。

如果多个通道同时接收 select 语句中的值,那么它将随机选择其中任何一个通道。

func main(){

go func(){
myChannel <- "data"
}()

go func(){
anotherChannel <-
"data data"
}()

select{
case msgFromMyChannel := <-myChannel:
fmt.Println(msgFromMyChannel)

case msgFromAnotherChannel := <-anotherChannel:
fmt.Println(msgFromAnotherChannel)
}
}

func main(){

charChannel:= make(chan string,3)
chars:=[]string{"a","b","c"}

for _,s:=range chars{
select{
case charChannel <- s:
  }
}

close(charChannel)
for result:= range charChannel{
fmt.Println(result)
  }
}

关闭通道
在 Golang 中,关闭通道指的是我们想表明该通道上的工作已经完成,无需再开放。

一旦我们关闭了通道,该通道将不再接受任何值,并且会发送一条消息来表明通道已关闭。

func doWork(done <-chan bool){ //this way, the channel will behave as a read-only channel.
for{

Select{
case <- done:
return
default:
fmt.Println(
"Doing Work")
    }
  }
}

func main(){
done:= make(chan bool)
go doWork(done)
time.Sleep(time.second*3)
close(done)
// 主 go 例程将在 3 秒后向子 goroutine 发送信号,从而终止子 goroutine。
}

注:正如《Go 编程语言》所言,当您完成每个通道时,没有必要关闭每个通道。只有在需要告知接收程序所有数据已发送完毕时,才有必要关闭通道。

管道
按照定义,它只是多种并发设计模式中的一种。非正式地讲,流水线是一组由通道连接并由 goroutines 处理的阶段。

每个阶段可以有任意数量的入站和出站通道,但第一阶段和最后阶段除外,它们分别只包含出站或入站通道。

开始(输入)-> 第一阶段(做某事)-> 第二阶段(做某事)-> 结束

上述设计的模型被称为 "流水线",其中每个阶段都负责执行某些操作,我们不断地将数据从一个阶段传递到另一个阶段。

func sliceToChannel(nums []int) <-chan int{ //it will return read-only channel
out:= make(chan int)
go func(){
for _,n := range nums{
out <- n
    }
close(out)
  }()
return out
}

上述函数的解释:我们创建了一个无缓冲通道,这意味着在我们从通道中读取数值之前,通道将被阻塞。因此,一旦我们在循环内的通道中插入数值,该循环将被阻塞,直到我们读取数值。因此,返回语句将被执行,而在后台,goroutine 仍在运行。


func sq(in <-chan int) <-chan int{
out:= make(chan int)
go func(){
for n:= range in{
out <- n*n
    }
close(out)
  }()
return out
}

上述函数的解释:现在,在这个函数中,我们正在对从 sliceToChannel 函数返回的通道进行循环。因此,我们将读取通道中的一个值,读取完成后,下一个值将通过 sliceToChannel 函数后台运行的 goroutine 插入通道。因此,这两个函数都将遵循同步模式。

func main(){
//input
nums:=[]int{2,3,4,7,1}
//stage 1
dataChannel:= sliceToChannel(nums)
//stage 2
finalChannel := sq(dataChannel)
//stage 3
for n:=range finalChannel{
fmt.Println(n)
  }
}