在 Go 语言的并发编程世界中,sync.WaitGroup 是一个简单却极其重要的同步工具。今天我们就来深入探讨一下它的应用场景和使用技巧,帮助你在实际项目中更好地管理并发任务。
什么是sync.WaitGroup?
sync.WaitGroup 是 Go 标准库 sync 包中的一个同步工具,用于等待一组 goroutine 完成执行。它的核心是通过一个计数器来跟踪并发任务的数量:当计数器为零时,等待的 goroutine 可以继续执行。
WaitGroup提供了三个核心方法:
Add(delta int)
: 增加计数器的值,表示需要等待的 goroutine 数量Done()
: 将计数器减 1,表示一个 goroutine 已完成Wait()
: 阻塞当前 goroutine,直到计数器归零
主要应用场景
1. 并行任务执行
当需要同时执行多个任务,并且等待所有任务完成后再进行下一步操作时,WaitGroup 非常有用。
package main
import (
"fmt"
"sync"
"time"
)
func main() {
var wg sync.WaitGroup
urls := []string{"https://example.com", "https://example.org", "https://example.net"}
for _, url := range urls {
wg.Add(1)
go func(u string) {
defer wg.Done()
fmt.Printf("Fetching data from %s...\n", u)
time.Sleep(2 * time.Second) // 模拟网络请求
fmt.Printf("Data from %s fetched.\n", u)
}(url)
}
wg.Wait()
fmt.Println("All data fetched. Proceeding to next step.")
}
这种模式在爬虫程序、批量文件处理等场景中特别常见。
2. 并行数据处理
处理大量数据时,可以将数据分成多个部分,每个 goroutine 处理一部分,最后汇总结果。
package main
import (
"fmt"
"sync"
)
func main() {
var wg sync.WaitGroup
data := []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}
chunkSize := 2
resultChan := make(chan int)
for i := 0; i < len(data); i += chunkSize {
end := i + chunkSize
if end > len(data) {
end = len(data)
}
wg.Add(1)
go func(chunk []int) {
defer wg.Done()
sum := 0
for _, num := range chunk {
sum += num
}
resultChan <- sum
}(data[i:end])
}
go func() {
wg.Wait()
close(resultChan)
}()
totalSum := 0
for sum := range resultChan {
totalSum += sum
}
fmt.Printf("Total sum of data: %d\n", totalSum)
}
3. 服务初始化与资源清理
在服务启动时,可能需要并行初始化多个组件,等待所有组件初始化完成后再启动服务。同样,在程序退出时,也需要等待所有清理操作完成。
4. 并发测试
在编写并发测试用例时,WaitGroup 可以确保所有测试 goroutine 都完成后才检查测试结果。
func TestConcurrentOperations(t *testing.T) {
var wg sync.WaitGroup
for i := 0; i < 10; i++ {
wg.Add(1)
go func(i int) {
defer wg.Done()
// 执行并发操作
}(i)
}
wg.Wait()
// 检查测试结果
}
高级应用场景
多级等待组
在复杂应用中,可能需要使用嵌套的 WaitGroup 来实现多级同步。
package main
import (
"fmt"
"sync"
"time"
)
func main() {
var outerWG sync.WaitGroup
var innerWG sync.WaitGroup
for i := 1; i <= 2; i++ {
outerWG.Add(1)
go outerWorker(i, &outerWG, &innerWG)
}
outerWG.Wait()
fmt.Println("All outer workers have completed.")
}
func outerWorker(id int, outerWG, innerWG *sync.WaitGroup) {
defer outerWG.Done()
fmt.Printf("Outer Worker %d started\n", id)
for j := 1; j <= 3; j++ {
innerWG.Add(1)
go innerWorker(id, j, innerWG)
}
innerWG.Wait()
fmt.Printf("Outer Worker %d completed\n", id)
}
func innerWorker(outerID, innerID int, wg *sync.WaitGroup) {
defer wg.Done()
fmt.Printf("Inner Worker %d of Outer Worker %d started\n", innerID, outerID)
time.Sleep(2 * time.Second)
fmt.Printf("Inner Worker %d of Outer Worker %d completed\n", innerID, outerID)
}
动态任务添加
在某些场景下,我们可能需要动态添加需要等待的任务。
package main
import (
"fmt"
"sync"
"time"
)
func main() {
var dynamicWG sync.WaitGroup
// 第一波任务
for i := 1; i <= 3; i++ {
dynamicWG.Add(1)
go dynamicWorker(i, &dynamicWG)
}
// 模拟动态添加更多任务
time.Sleep(1 * time.Second)
for i := 4; i <= 6; i++ {
dynamicWG.Add(1)
go dynamicWorker(i, &dynamicWG)
}
dynamicWG.Wait()
fmt.Println("All dynamic workers have completed.")
}
func dynamicWorker(id int, wg *sync.WaitGroup) {
defer wg.Done()
fmt.Printf("Dynamic Worker %d started\n", id)
time.Sleep(2 * time.Second)
fmt.Printf("Dynamic Worker %d completed\n", id)
}
超时控制
结合 select 语句,可以实现带超时控制的 WaitGroup 等待。
package main
import (
"errors"
"fmt"
"sync"
"time"
)
func main() {
var timeoutWG sync.WaitGroup
for i := 1; i <= 3; i++ {
timeoutWG.Add(1)
go timeoutWorker(i, &timeoutWG)
}
// 等待最多5秒,超时则返回错误
err := waitWithTimeout(&timeoutWG, 5*time.Second)
if err != nil {
fmt.Printf("Timeout reached. Not all workers have completed. Error: %v\n", err)
} else {
fmt.Println("All timeout workers have completed.")
}
}
func timeoutWorker(id int, wg *sync.WaitGroup) {
defer wg.Done()
fmt.Printf("Timeout Worker %d started\n", id)
time.Sleep(time.Duration(id) * time.Second)
fmt.Printf("Timeout Worker %d completed\n", id)
}
func waitWithTimeout(wg *sync.WaitGroup, timeout time.Duration) error {
done := make(chan struct{})
go func() {
defer close(done)
wg.Wait()
}()
select {
case <-done:
return nil
case <-time.After(timeout):
return errors.New("timeout reached")
}
}
使用注意事项
-
Add必须在goroutine启动前调用:如果在 goroutine 启动后再调用 Add,可能导致计数器不匹配,引发死锁或 panic 。
-
Done必须与Add次数匹配:每次 Add(n) 后,必须调用 n 次 Done(),否则计数器无法归零,导致 Wait() 永远阻塞。
-
可复用的WaitGroup:WaitGroup 在 Wait() 返回后,可以继续 Add() 和 Done(),然后再次 Wait()。
-
传递指针而非值:WaitGroup 必须通过指针传递给 goroutine,否则会复制副本,导致死锁。
-
错误处理:即使在 goroutine 中发生 panic,使用 defer 调用 Done() 也能确保计数器正确减少,避免死锁。
写在最后
sync.WaitGroup 是 Go 语言并发编程中一个简单而强大的工具,它通过计数器机制帮助我们协调多个 goroutine 的执行。无论是简单的并行任务,还是复杂的多级同步,WaitGroup 都能提供有效的解决方案。
掌握 WaitGroup 的正确使用方式,可以帮助我们编写出更加健壮、高效的并发程序,让并发任务协作得心应手。