七叶笔记 » golang编程 » 七爪源码:使用 GoLang 进行并发处理

七爪源码:使用 GoLang 进行并发处理

Go 中的主并发

GoLang 对并发程序的支持令人难以置信,在本文中,我们将了解如何优化处理 CSV 文件以向其用户发送 SMS 通知的程序。

如果您不熟悉 GoLang 并希望更好地了解并发的工作原理,我建议您先阅读这篇文章:GoLang 中的并发、Goroutines 和 Channels Explained。

在撰写本文时,我们将使用 CSV 文件,该程序的目的是读取文件并处理其数据。 文件内容代表 3,000 个用户的列表。

程序应读取此文件并向每个用户及其朋友发送通知。

您可以在此 Github 存储库中找到源代码以供参考。

好吧,让我们开始吧。 我们将首先创建一个 go 模块、main.go 文件和一个名为 CSV 的包。 这个包将有一个名为 ProcessFile 的方法,该方法将从主包中调用。 让我们首先创建 Go 模块。

 go mod init github.com/GithubHandle/ File Processing  

初始文件夹结构如下所示:

如您所见,还有一个students.csv 文件,您可以从存储库中获取该文件。 main.go 和 csv.go 文件如下所示:

 package mainimport "github.com/YairFernando67/fileProcessing/csv"func main() {
  csv.ProcessFile()
}  

 package csvfunc ProcessFile() {}  

现在,让我们打开文件并读取其内容:

 package csv

 import  (
	"bufio"
	"fmt"
	"log"
	"os"
	"strings"
)

 const  (
	FILE_NAME = "csv/students.csv"
)

func ProcessFile() {
	f, err := os.Open(FILE_NAME)
	if err != nil {
		log.Fatal(err)
	}

	users := scanFile(f)
	fmt.Printf("users %v\n", users)
}

func scanFile(f *os.File) []*User {
	s := bufio.NewScanner(f)
	users := []*User{}
	for s.Scan() {
		line := strings. Trim (s.Text(), " ")
		lineArray := strings.Split(line, ",")
		ids := strings.Split(lineArray[5], " ")
		ids = ids[1 : len(ids)-1]
		user := &User{
			Id:        lineArray[0],
			Name:      lineArray[1],
			LastName:  lineArray[2],
			Email:     lineArray[3],
			Phone:     lineArray[4],
			FriendIds: ids,
		}
		users = append(users, user)
	}
	return users
}  

在上面的代码中,我们首先使用 os 包打开 csv 文件。 然后将该文件传递给另一个名为 scanFile 的函数。 该函数使用 bufio 包初始化一个新的扫描仪并扫描文件的每一行。 对于每一行,我们提取有关用户的信息并将其附加到用户切片中。 扫描完成后,该函数将用户切片返回给调用者。

用户结构在 csv/user.go 中定义,看起来像这样。

 package csvtype User  struct  {
  Id, Name, LastName, Email, Phone string
  FriendIds                 []string
}  

如果您运行该程序,您将看到用户部分被打印到控制台。

亲爱的,我们现在将文件的内容表示为一个用户切片,我们现在可以使用这个切片向每个用户及其朋友发送 SMS 通知。

为此,我们将首先了解如何在不使用并发的情况下按顺序执行此操作。 然后我们将修改程序以使其更快。

 package csv

import (
	"bufio"
	"fmt"
	"log"
	"os"
	"strings"
	"time"
)

const (
	FILE_NAME = "csv/students.csv"
)

func ProcessFile() {
	f, err := os.Open(FILE_NAME)
	if err != nil {
		log.Fatal(err)
	}

	users := scanFile(f)

	// sequential processing
	sequentialProcessing(users)
}

func sequentialProcessing(users []*User) {
	visited := make(map[string] bool )
	for _, user := range users {
		if !visited[user.Id] {
			visited[user.Id] = true
			sendSmsNotification(user)
			for _, friendId := range user.FriendIds {
				friend, err := findUserById(friendId, users)
				if err != nil {
					fmt.Printf(" err or %v\n", err)
					continue
				}

				if !visited[friend.Id] {
					visited[friend.Id] = true
					sendSmsNotification(friend)
				}
			}
		}
	}
}

func sendSmsNotification(user *User) {
	time.Sleep(10 * time.Millisecond)
	fmt.Printf("Sending sms notification to %v\n", user.Phone)
}

func findUserById(userId string, users []*User) (*User, error) {
	for _, user := range users {
		if user.Id == userId {
			return user,  nil 
		}
	}

	return nil, fmt.Errorf("User not found with id %v", userId)
}  

对于顺序处理,我们创建了一个函数并将用户传递给它。 此功能覆盖用户,并为每个用户检查之前是否已访问过,如果尚未访问过,则将用户标记为已访问并发送 SMS 通知。 然后它还会遍历用户的朋友 id,找到每个用户,并执行相同的步骤,检查它是否已被访问,将其标记为已访问,并发送通知。

在 sendSmsNotification 函数中,我们使用 time.Sleep 函数来模拟发送通知的一些延迟。

让我们在这个版本的程序上运行一个基准测试,看看它有多快。 这是代码:

 package csv

import "testing"

func BenchmarkProcessFile(b *testing.B) {
	b.ResetTimer()

	for i := 0; i < b.N; i++ {
		ProcessFile()
	}
}  

处理所有用户需要 199.723 秒。 让我们看看如何使用并发来提高程序的性能。 为此,我们将添加另一种方法来同时处理用户。

 package csv

import (
	"bufio"
	"fmt"
	"log"
	"os"
	"strings"
	"time"
)

const (
	FILE_NAME      = "csv/students.csv"
	MAX_GOROUTINES = 10
)

func ProcessFile() {
	f, err := os.Open(FILE_NAME)
	if err != nil {
		log.Fatal(err)
	}

	users := scanFile(f)

	// sequential processing
	// sequentialProcessing(users)

	// concurrent processing
	concurrentProcessing(users)
}

func concurrentProcessing(users []*User) {
	usersCh := make(chan []*User)
	unvisitedUsers := make(chan *User)
	go func() { usersCh <- users }()
	initializeWorkers(unvisitedUsers, usersCh, users)
	processUsers(unvisitedUsers, usersCh, len(users))
}

func initializeWorkers(unvisitedUsers <-chan *User, usersCh chan []*User, users []*User) {
	for i := 0; i < MAX_GOROUTINES; i++ {
		go func() {
			for user := range unvisitedUsers {
				sendSmsNotification(user)
				go func(user *User) {
					friendIds := user.FriendIds
					friends := []*User{}
					for _, friendId := range friendIds {
						friend, err := findUserById(friendId, users)
						if err != nil {
							fmt.Printf("Error %v\n", err)
							continue
						}
						friends = append(friends, friend)
					}

					_, ok := <-usersCh
					if ok {
						usersCh <- friends
					}
				}(user)
			}
		}()
	}
}

func processUsers(unvisitedUsers chan<- *User, usersCh chan []*User, size int) {
	visitedUsers := make(map[string]bool)
	count := 0
	for users := range usersCh {
		for _, user := range users {
			if !visitedUsers[user.Id] {
				visitedUsers[user.Id] = true
				count++
				if count >= size {
					close(usersCh)
				}
				unvisitedUsers <- user
			}
		}
	}
}  

对于这个并发实现,我们创建了两个通道,usersCh 将保存初始用户列表,unvisitedUsers 将保存单个未访问用户。

在第 35 行,我们将我们作为参数获得的初始用户列表提供给第一个通道到此函数中。它运行在一个单独的 goroutine 中,因为我们不希望主 goroutine 被阻塞,这是我在本文中谈到的一个概念,如果你仍然对阻塞概念感到困惑,你可以去看看。

然后我们调用initializeWorkers。这个函数本质上初始化了由常量 MAX_GOROUTINES 确定的 N 个 goroutine,在这种情况下,我们将从 10 个开始。每个 worker 都是一个监听 unvisitedUsers 通道的函数,对于它接收到的每个用户,它都会向它发送 SMS 通知该用户还通过在列表中查找每个用户然后将用户发送到 usersCh 频道来处理其朋友 ID。用户好友 ID 的处理在单独的 goroutine 中运行,因为我们也不想在这里阻塞当前的 goroutine。

这个函数将允许程序有 10 个 goroutine 等待用户被发送到 unvisitedUsers 通道。并且这些 goroutine 中的每一个都会同时运行,从而提高程序的性能。

这里要注意的另一件重要的事情是,在第 57 行中,我们使用此语法检查 usersCh 通道是否仍然打开,这会阻止我们向关闭的通道发送数据。在这个例子中,这很重要,因为我们将关闭通道并且我们不希望其他 goroutine 尝试向该通道发送数据。

然后在第 37 行,我们有 processUsers 函数,该函数在 usersCh 通道上,并且对于它接收到的每个用户列表,它检查用户是否已经被访问过,如果没有,则将其标记为已访问并将用户发送到unvisitedUsers 频道​。此函数还使用 count 变量跟踪处理了多少用户,通过这样做,我们可以检查是否达到第 75 行的大小并关闭将终止程序的 usersCh 通道。

让我们为这个版本的程序运行基准测试,看看它改进了多少。

如您所见,完成只用了 19.936 秒!! 这是一个巨大的性能改进。 我们还可以通过增加或减少 MAX_GOROUTINES 常量来控制我们希望程序拥有多少活动的 goroutines/workers。

如果我们增加工人的数量,我们的程序将运行得更快,因为更多的任务将同时运行。

这是一个很好的例子,说明了如何使用通道和 goroutine 来提高程序的性能。 通道是 goroutine 之间的一种很好的通信方式,goroutine 允许您同时运行代码。

我希望你发现这篇文章很有用并学到了一些新东西。 感谢您的阅读。 敬请关注。

相关文章