七叶笔记 » golang编程 » 七爪源码:Golang、Goroutines 和 Channels 中的并发性解释

七爪源码:Golang、Goroutines 和 Channels 中的并发性解释

go 中的主并发

Go 是一门功能强大的语言,因为它有效且高效地处理了几件事情,其中一个使它迷人和强大的事情是它如何处理 并发性

在本文中,我的目标是全面解释并发的核心概念,以及 Golang 完成并发程序所遵循的方法。

让我们从定义什么是并发开始:

如果一个程序可以同时处理多个任务,则该程序被认为是并发的。 并发的概念涉及程序同时运行多个操作的能力,即使这并不一定意味着这些操作显式地同时运行,每个任务可以在不同的时间点开始。 让我们看一个图表,以更好地掌握并发操作的样子。

这些操作可能不会同时开始,但它们彼此同时运行。这意味着一个任务不必等到另一个任务完成才能运行。

另一方面,如果您的程序同时运行多个操作,这意味着它们完全在同一时间点开始,这将被视为并行性。使用并发时,您的程序也可以实现并行性,具体取决于用例。

Go lang 并发模型

Golang 使用 goroutine 实现并发。 goroutine 由 Go 运行时管理,与 线程 非常相似,但有几个优点。 Goroutines 允许你同时运行多个操作。在 多线程 环境中,为了并发运行各种操作,操作系统必须创建一个新线程,这涉及到大量的资源、内存和时间,因此使用线程并发运行多个操作对操作系统来说成本更高。另一方面,goroutine 是轻量级的、高效的,并且创建起来不需要太多资源,在 Go 中旋转数百个 goroutine 不是问题。

与 goroutine 共享资源

当运行多个 goroutine 完成不同的任务时,很多时候你会发现 goroutine 需要访问和修改共享资源,如果多个 goroutine 同时访问和修改相同的数据,这会导致几个问题,意想不到的结果以及所谓的竞争条件。

让我们在并发程序的上下文中定义什么是竞争条件。

当同时运行的多个操作尝试同时读取/写入相同的数据时,就会出现竞争条件。

为了避免这种情况,golang 使用了锁,这样每次只有一个 goroutine 可以修改某条数据。这是一个非常重要的话题

因为实现并发时的一个关键组件是确保您的程序不会以意外结果结束。

现在,让我们编写我们的第一个并发程序,看看如何使用 go 关键字创建一个 goroutine:

 package main

import "fmt"

func main() {
  fmt.Println("Main function")
  
  go countNumbers(20)
  
  fmt.Println("End main function")
}
  
func countNumbers(limit int) {
   num := 0
   for i := 1; i < limit; i++ {
     num+=i
   }
  fmt.Println("Num: ", num)
}  

在这个例子中,我们有 main 函数 ,它只在第 6 行打印一条消息,当程序运行时,它会创建所谓的主 goroutine,这个 goroutine 在运行程序时自动创建,它是执行所有代码的地方,但是如果你想在一个单独的 goroutine 中运行另一个操作,我们可以在我们想要并发运行的函数之前使用 go 关键字,这将有效地创建一个新的 goroutine 并运行该函数。

在第 8 行,我们告诉程序为 countNumbers 函数创建一个新的 goroutine。然后回到主 goroutine,在第 10 行有另一个打印语句。

那么,为什么我们看不到 countNumbers 函数中的 print 语句呢?嗯,这是因为主协程没有等待其他协程完成工作,主协程会继续执行主程序,它会终止而不等待查看其他协程是否完成。

为了让 countNumbers 函数完成,我们可以在第 9 行的主 go 例程中休眠 1 秒。

 time.Sleep(1 * time.Second)  

如果你再次运行程序,你现在会看到 print 语句,这个解决方案在任何并发程序中都不理想,所以我们稍后会看到如何使用通道来完成。

什么是渠道?

通道是不同 goroutine 之间的一种通信方式。这是一种将一种数据类型的值发送到另一个 goroutine 的安全方式。通道通过引用传递意味着当创建通道然后将其传递给其他函数时,这些函数将具有指向同一通道的相同引用。如果您了解指针的工作原理,那么使用通道可能很容易理解。

我们只能比较具有相同类型的通道,并且正如我之前提到的,因为它们是通过引用传递的,如果两个通道都指向内存中的相同引用,则两个通道之间的比较将评估为真。我们还可以将通道与 nil 进行比较。

通道的目的是允许 goroutine 发送和接收信息,但通常它们也用于通知其他 goroutine

一个过程已经完成并且不一定通过通道发送任何信息。

通道也可以关闭,这意味着它将不再接受任何要发送或接收的消息,如果 goroutine 尝试从关闭的通道发送或接收消息,程序将崩溃,除非我们使用特殊语法来读取通道或者我们使用范围循环。我们稍后会看到这是如何工作的。

渠道类型

无缓冲通道:这种类型的通道只允许发送一条数据并阻塞当前的 goroutine,直到另一个在通道上执行接收操作。如果在发送操作之前对通道执行接收操作,也会发生同样的事情,进行接收操作的 goroutine 将被阻塞,直到另一个 goroutine 通过同一通道发送消息。

为了在使用无缓冲通道时演示这种阻塞概念,让我们看以下示例:

 package main

 import  (
"fmt"
"time"
)

 var  (
defaultTags = []string{"SystemUser", "User", "NewUser", "System"}
)

type Tag  struct  {
Name, Type string
}

type User struct {
Id, Name, LastName, Status string
Tags                       []*Tag
}

type Post struct {
Title  string
Status string
UserId string
}

func main() {
blocking()
}
 
/*
Main goroutine will be blocked until second goroutine
sends a message letting the main goroutine know that has finished its work
and so the main go routine can continue
*/func blocking() {
user := &User{}
done := make(chan bool) // unbuffered channel

go func() {
fmt.Println("[Second-GoRoutine] Start Building User")
buildingUser(user)
fmt.Println("[Second-GoRoutine] Finished Building User")
done <- true

fmt.Println("[Second-GoRoutine] Set default user tags")
setDefaultTags(user)
}()

fmt.Println("[Main-Goroutine] Start importing Posts")
posts := importingPosts()
fmt.Println("[Main-Goroutine] Finish importing Posts")
fmt.Println("[Main-Goroutine] -----waiting------")
<-done

 merge UserPosts(user, posts)
fmt.Println("Done!!")
fmt.Printf("User %v\n", user)
for _, post := range posts {
fmt.Printf("Post %v\n", post)
}
}

func mergeUserPosts(user *User, posts []*Post) {
fmt.Println("[Main-Goroutine] Start merging user posts")
for _, post := range posts {
post.UserId = user.Id
}
fmt.Println("[Main-Goroutine] Finished merging user posts")
}

func importingPosts() []*Post {
time.Sleep(1 * time.Second)
titles := []string{"Post 1", "Random Post", "Second Post"}
posts := []*Post{}
for _, title := range titles {
posts = append(posts, &Post{Title: title, Status: "draft"})
}

return posts
}

func buildingUser(user *User) {
time.Sleep(2 * time.Second)
user.Name = "John"
user.LastName = "Doe"
user.Status = "active"
user.Id = "1"
}

func setDefaultTags(user *User) {
time.Sleep(1 * time.Second)
for _, tagName := range defaultTags {
user.Tags = append(user.Tags, &Tag{Name: tagName, Type: "System"})
}
}  

运行前面的示例将输出以下内容:

让我们了解输出。程序启动时,分别在第 36 行和第 37 行创建了一个空的用户对象和一个布尔类型的通道(无缓冲通道),然后在第 39 行创建了一个 goroutine,这意味着该函数中的一段代码将在一个单独的 goroutine。

主 goroutine 继续执行,在第 49 行我们有一个 print 语句,然后在第二个 goroutine 中,由于此时它正在并发运行,它到达第 40 行并且还执行了一个 print 语句。

主 goroutine 继续调用 importingPosts 方法,并且还做了两个 print 语句,最后一个是 [Main-Goroutine] — — -waiting—— ,这就是我们之前谈到的阻塞概念的来源play,在第 53 行我们看到主 goroutine 正在从 donechannel 读取,这基本上意味着主 goroutine 将不会继续执行,直到第二个 goroutine 向该通道发送消息。

在第二个 goroutine 中,调用 buildUser 函数并打印 [Second-GoRoutine] Finished Building User ,然后在下一行中,它向通道发送消息。此时,主 goroutine 将检测到这一点,并将继续执行,以及第二个 goroutine。

方法 mergeUserPosts 和 setDefaultTags 分别在 main 和 second goroutine 中被调用,我们得到它们对应的日志。

当我们到达第 57 到 60 行时,会打印出用户及其帖子,但是如果您检查用户结构中的 tags 数组是空的。原因是第二个 goroutine 向主 goroutine 发送消息后,两个 goroutine 继续并发执行,正如我之前提到的,主 goroutine 不会等到其他 goroutine 执行完毕,也就是说,第二个 goroutine 没有完成它的工作在主 goroutine 完成之前将用户标签附加到结构中,这就是数组为空的原因。如果我们删除第 91 行,我们将能够看到 tags 数组现在已填充。

通过这个例子,我们学习了如何使用内置的 make 函数创建一个无缓冲的通道。

 done := make(chan int)  

以及如何从通道发送和接收数据

 done <- true // send
<-done // receive ignorting value
resp := <-done // receive storing value in a variable  

此外,我们还看到了如果没有其他 goroutine 通过通道发送/接收消息,goroutines 是如何阻止执行的。

通道也被用作连接多个 goroutine 的一种方式,通过使用一个 goroutine 的结果作为另一个 goroutine 的参数。

让我们看一下这次使用多个 goroutine 的下一个示例。

 package main

import (
"fmt"
"time"
)

type Tag struct {
Name, Type string
}

type Settings struct {
NotificationsEnabled bool
}

type User struct {
Id, Name, LastName, Status string
Tags                       []*Tag
*Settings
}

type NotificationsService struct {
}

func main() {
usersToUpdate := make(chan []*User)
userToNotify := make(chan *User)
newUsers := []*User{
{Name: "John", Status: "active", Settings: &Settings{NotificationsEnabled: true}},
{Name: "Carl", Status: "active", Settings: &Settings{NotificationsEnabled: false}},
{Name: "Paul", Status: "deactive", Settings: &Settings{NotificationsEnabled: true}},
{Name: "Sam", Status: "active", Settings: &Settings{NotificationsEnabled: true}},
}
existingUsers := []*User{
{Name: "Jessica", Status: "active", Settings: &Settings{NotificationsEnabled: true}},
{Name: "Eric", Status: "active", Settings: &Settings{NotificationsEnabled: true}},
{Name: "Laura", Status: "active", Settings: &Settings{NotificationsEnabled: true}},
}

go filterNewUsersByStatus(usersToUpdate, newUsers)
go updateUsers(usersToUpdate, userToNotify, existingUsers)
notifyUsers(userToNotify, existingUsers)
}

func filterNewUsersByStatus(usersToUpdate chan<- []*User, users []*User) {
defer close(usersToUpdate)
filteredUsers := []*User{}
for _, user := range users {
if user.Status == "active" && user.Settings.NotificationsEnabled {
filteredUsers = append(filteredUsers, user)
}
}

usersToUpdate <- filteredUsers
}

func updateUsers(usersToUpdate <-chan []*User, userToNotify chan<- *User, users []*User) {
defer close(userToNotify)
for _, user := range users {
user.Tags = append(user.Tags, &Tag{Name: "UserNotified", Type: "Notifications"})
}

newUsers := <-usersToUpdate

for _, user := range newUsers {
time.Sleep(1 * time.Second)
user.Tags = append(user.Tags, &Tag{Name: "NewNotification", Type: "Notifications"})
userToNotify <- user
}
}

func notifyUsers(userToNotify <-chan *User, users []*User) {
service := &NotificationsService{}
for _, user := range users {
service.SendEmailNotification(user, "Tags", "A new tag has been added to your profile!!")
}

for user := range userToNotify {
service.SendEmailNotification(user, "Tags", "You got your first tag!!")
}
}

func (n *NotificationsService) SendEmailNotification(user *User, title, message string) {
fmt.Printf("Email Notification Sent to %v, Hi %s, %s\n", user, user.Name, message)
}  

此示例的输出如下所示:

在此示例中,我们有两个通道 usersToUpdate 和 userToNotify,请注意第一个通道如何接受用户数组,而第二个通道仅接受一个用户对象。 然后有两组用户,一组用于现有用户,一组用于新用户。

在第一个 goroutine 中,我们发送 usersToUpdate 通道和 newUsers 切片,因此当程序到达第 40 行时,会创建一个新的 goroutine。

请注意 usersToUpdate 参数的 filterNewUsersByStatus 函数中的语法。

 usersToUpdate chan<- []*User  

默认情况下,通道是双向的,这意味着您可以通过它们发送和接收信息,但是当将通道传递给函数时,您可以更改此行为并告诉通道在函数的上下文中它只会用于一个目的,要么接收信息或发送信息。

所以在这种情况下,我们告诉频道 usersToUpdate 在这个函数的上下文中,这个频道将只接受发送信息而不接受它。

此函数 filterNewUsersByStatus 范围在 newUsers 上,并且只选择那些处于活动状态并且启用了通知设置的那些。之后在第 54 行中,过滤后的用户通过通道发送。

此时,该通道将不再用于发送数据,因此关闭该通道很重要。在这种情况下,我们使用 defer 函数来调用内置的关闭函数并关闭 usersToUpdate 通道。

在第二个 goroutine 中,我们发送 usersToUpdate 通道、userToNotify 通道和existingUsers 切片。这就是使用通道的结果作为另一个 goroutine 的输入的概念发挥作用的地方。

在这个函数中,我们还为每个通道定义了它是用于接收信息还是发送信息,usersToUpdate 将仅用于接收数据,而 userToNotify 将用于发送数据。

在第 59 行,该函数首先通过为每个用户附加一个新标签来更新现有用户。然后在第 63 行,它创建一个新变量,将其分配给 usersToUpdate 通道的结果。此行将阻止该 goroutine 的执行,直到通道发送消息。换句话说,如果 filterNewUsersByStatus 需要很长时间才能发送经过过滤的用户,那么这个 goroutine 将不得不在这一行中等待才能继续。

一旦接收到数据,这个 goroutine 会覆盖 newUsers 并更新它们的标签,但也会在第 68 行通过 userToNotify 通道发送用户。

userToNotify 也需要在这个函数完成它的工作后关闭,所以在第 58 行我们有一个 defer 来关闭通道。

然后在第 42 行,在主 goroutine 中调用了一个函数,该函数将通知用户,它将 userToNotify 通道和现有用户作为参数。

此函数首先初始化用于发送通知的服务,然后覆盖现有用户并向每个用户发送电子邮件通知。

然后在第 78 行,它覆盖了 userToNotify 通道,对于通过此通道发送的每个用户,此函数都会向该用户发送电子邮件通知。这种语法允许我们接收通过这个通道发送的所有信息,一旦通道关闭,for循环也会中断。这将阻止我们从封闭的频道阅读,正如我之前提到的,这是确保您不会从封闭的频道阅读的一种方法。另一种语法如下:

 resp, ok := <-userToNofity  

如果我们从一个封闭的通道中读取,ok 变量将为假,否则为真,但它不会恐慌。

正如您在此示例中看到的那样,这些函数将同时运行,并且它们使用通道相互通信,以发送有关过滤用户和要通知的用户的信息。

在这个例子中,我们学习了如何使用 defer 和 close 函数来关闭通道。

 defer close(done)  

以及如何在将通道传递给函数时使通道单向

 userToNotify <-chan *User // read-only channel
userToNotify chan<- *User // send-only channel  

如何在通道上进行范围,当我们不知道将通过通道发送多少项目但我们想读取所有项目时,这很有用。

 for user := range userToNotify {}  

缓冲通道:这种类型的通道允许您存储多个由容量指定的数据,当达到该容量时,发送到通道的后续消息将阻塞,直到至少读取一条消息,以便通道具有 再次容量。

要创建缓冲通道,我们只需向 make 函数传递一个附加参数:

 ans := make(chan int, 5)  

这个通道将接受 5 个整数而不阻塞 goroutine,但是如果第 6 个整数被发送到这个通道,那么它会阻塞直到执行接收操作。 如果通道是空的并且执行了接收操作,也会发生同样的事情,它会阻塞直到执行了发送操作。

用于跟踪通道容量的数据结构是队列,这意味着第一个进入队列的元素将是第一个离开队列的元素。

让我们使用以下代码来研究一下:

 package main

import (
"fmt"
"time"
)

func main() {
names := make(chan string, 3)
go generateName(names)

  // Simulate that a different process takes 5 seconds to run before the main goroutine
  // starts reading values from the channel.
time.Sleep(5 * time.Second) 
  
for name := range names {
fmt.Printf("Name received: %v\n", name)
}
}

func generateName(names chan<- string) {
defer close(names)
for _, name := range []string{"Carl", "Paul", "May", "Laura", "John"} {
time.Sleep(1 * time.Second) // Simulate some latency of 1sec before sending each name to the channel
names <- name
fmt.Printf("Name sent: %v\n", name)
}
}  

在上面的场景中,我们创建了一个容量为 3 的缓冲通道,这意味着它一次可以容纳 3 个字符串而不会阻塞 goroutine。 在第 10 行,创建了一个 goroutine 并传入了通道。该函数将向通道发送多个名称,每次发送之间的延迟为 1 秒,当完成发送名称时,通道将使用 defer 关闭。

在 main goroutine 中,有一个 sleep 函数调用来模拟 main goroutine 执行另一个操作,该操作需要 5 秒才能从通道中读取值。

让我们看看这段代码的输出:

如您所见,前 3 个名称被发送到通道而没有阻塞,因为缓冲大小为 3,但之后,第二个 goroutine 的执行会阻塞,直到从通道中读取至少一个元素,此时主 goroutine 启动从通道读取,第二个 goroutine 被解除阻塞并继续发送剩余的名称。

关键要点

  • 使用 goroutine 来加速你的 Go 程序。
  • 使用 make 关键字创建无缓冲通道。
  • 使用 make 关键字指定创建缓冲通道的容量。
  • 使用以下语法从通道读取数据 resp := <-names。
  • 使用此语法将数据发送到通道 numbers <- num。
  • 使用范围 for 循环读取发送到通道的所有数据。
  • 使用 defer 和 close 内置函数关闭通道。
  • 在不同的 goroutine 之间阻塞概念。
  • 将双向通道更改为函数上下文中的仅发送或只读通道。
  • 使用通道在不同的 goroutine 之间进行通信。

我们已经在 Golang 中看到了很多与并发相关的概念。我希望你喜欢它并从这篇文章中学习!

感谢您的阅读。敬请关注。

相关文章