在 Go 语言的并发编程世界中,sync.WaitGroup 是一个简单却极其重要的同步工具。今天我们就来深入探讨一下它的应用场景和使用技巧,帮助你在实际项目中更好地管理并发任务。

什么是sync.WaitGroup?

sync.WaitGroup 是 Go 标准库 sync 包中的一个同步工具,用于等待一组 goroutine 完成执行。它的核心是通过一个计数器来跟踪并发任务的数量:当计数器为零时,等待的 goroutine 可以继续执行。

WaitGroup提供了三个核心方法:

  • Add(delta int): 增加计数器的值,表示需要等待的 goroutine 数量
  • Done(): 将计数器减 1,表示一个 goroutine 已完成
  • Wait(): 阻塞当前 goroutine,直到计数器归零

主要应用场景

1. 并行任务执行

当需要同时执行多个任务,并且等待所有任务完成后再进行下一步操作时,WaitGroup 非常有用。

package main

import (
    "fmt"
    "sync"
    "time"
)

func main() {
    var wg sync.WaitGroup
    urls := []string{"https://example.com", "https://example.org", "https://example.net"}

    for _, url := range urls {
        wg.Add(1)
        go func(u string) {
            defer wg.Done()
            fmt.Printf("Fetching data from %s...\n", u)
            time.Sleep(2 * time.Second) // 模拟网络请求
            fmt.Printf("Data from %s fetched.\n", u)
        }(url)
    }

    wg.Wait()
    fmt.Println("All data fetched. Proceeding to next step.")
}

这种模式在爬虫程序、批量文件处理等场景中特别常见。

2. 并行数据处理

处理大量数据时,可以将数据分成多个部分,每个 goroutine 处理一部分,最后汇总结果。

package main

import (
    "fmt"
    "sync"
)

func main() {
    var wg sync.WaitGroup
    data := []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}
    chunkSize := 2
    resultChan := make(chan int)

    for i := 0; i < len(data); i += chunkSize {
        end := i + chunkSize
        if end > len(data) {
            end = len(data)
        }
        wg.Add(1)
        go func(chunk []int) {
            defer wg.Done()
            sum := 0
            for _, num := range chunk {
                sum += num
            }
            resultChan <- sum
        }(data[i:end])
    }

    go func() {
        wg.Wait()
        close(resultChan)
    }()

    totalSum := 0
    for sum := range resultChan {
        totalSum += sum
    }

    fmt.Printf("Total sum of data: %d\n", totalSum)
}

3. 服务初始化与资源清理

在服务启动时,可能需要并行初始化多个组件,等待所有组件初始化完成后再启动服务。同样,在程序退出时,也需要等待所有清理操作完成。

4. 并发测试

在编写并发测试用例时,WaitGroup 可以确保所有测试 goroutine 都完成后才检查测试结果。

func TestConcurrentOperations(t *testing.T) {
    var wg sync.WaitGroup
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func(i int) {
            defer wg.Done()
            // 执行并发操作
        }(i)
    }
    wg.Wait()
    // 检查测试结果
}

高级应用场景

多级等待组

在复杂应用中,可能需要使用嵌套的 WaitGroup 来实现多级同步。

package main

import (
    "fmt"
    "sync"
    "time"
)

func main() {
    var outerWG sync.WaitGroup
    var innerWG sync.WaitGroup

    for i := 1; i <= 2; i++ {
        outerWG.Add(1)
        go outerWorker(i, &outerWG, &innerWG)
    }

    outerWG.Wait()
    fmt.Println("All outer workers have completed.")
}

func outerWorker(id int, outerWG, innerWG *sync.WaitGroup) {
    defer outerWG.Done()
    fmt.Printf("Outer Worker %d started\n", id)

    for j := 1; j <= 3; j++ {
        innerWG.Add(1)
        go innerWorker(id, j, innerWG)
    }

    innerWG.Wait()
    fmt.Printf("Outer Worker %d completed\n", id)
}

func innerWorker(outerID, innerID int, wg *sync.WaitGroup) {
    defer wg.Done()
    fmt.Printf("Inner Worker %d of Outer Worker %d started\n", innerID, outerID)
    time.Sleep(2 * time.Second)
    fmt.Printf("Inner Worker %d of Outer Worker %d completed\n", innerID, outerID)
}

动态任务添加

在某些场景下,我们可能需要动态添加需要等待的任务。

package main

import (
    "fmt"
    "sync"
    "time"
)

func main() {
    var dynamicWG sync.WaitGroup

    // 第一波任务
    for i := 1; i <= 3; i++ {
        dynamicWG.Add(1)
        go dynamicWorker(i, &dynamicWG)
    }

    // 模拟动态添加更多任务
    time.Sleep(1 * time.Second)
    for i := 4; i <= 6; i++ {
        dynamicWG.Add(1)
        go dynamicWorker(i, &dynamicWG)
    }

    dynamicWG.Wait()
    fmt.Println("All dynamic workers have completed.")
}

func dynamicWorker(id int, wg *sync.WaitGroup) {
    defer wg.Done()
    fmt.Printf("Dynamic Worker %d started\n", id)
    time.Sleep(2 * time.Second)
    fmt.Printf("Dynamic Worker %d completed\n", id)
}

超时控制

结合 select 语句,可以实现带超时控制的 WaitGroup 等待。

package main

import (
    "errors"
    "fmt"
    "sync"
    "time"
)

func main() {
    var timeoutWG sync.WaitGroup

    for i := 1; i <= 3; i++ {
        timeoutWG.Add(1)
        go timeoutWorker(i, &timeoutWG)
    }

    // 等待最多5秒,超时则返回错误
    err := waitWithTimeout(&timeoutWG, 5*time.Second)
    if err != nil {
        fmt.Printf("Timeout reached. Not all workers have completed. Error: %v\n", err)
    } else {
        fmt.Println("All timeout workers have completed.")
    }
}

func timeoutWorker(id int, wg *sync.WaitGroup) {
    defer wg.Done()
    fmt.Printf("Timeout Worker %d started\n", id)
    time.Sleep(time.Duration(id) * time.Second)
    fmt.Printf("Timeout Worker %d completed\n", id)
}

func waitWithTimeout(wg *sync.WaitGroup, timeout time.Duration) error {
    done := make(chan struct{})
    go func() {
        defer close(done)
        wg.Wait()
    }()

    select {
    case <-done:
        return nil
    case <-time.After(timeout):
        return errors.New("timeout reached")
    }
}

使用注意事项

  1. Add必须在goroutine启动前调用:如果在 goroutine 启动后再调用 Add,可能导致计数器不匹配,引发死锁或 panic 。

  2. Done必须与Add次数匹配:每次 Add(n) 后,必须调用 n 次 Done(),否则计数器无法归零,导致 Wait() 永远阻塞。

  3. 可复用的WaitGroup:WaitGroup 在 Wait() 返回后,可以继续 Add() 和 Done(),然后再次 Wait()。

  4. 传递指针而非值:WaitGroup 必须通过指针传递给 goroutine,否则会复制副本,导致死锁。

  5. 错误处理:即使在 goroutine 中发生 panic,使用 defer 调用 Done() 也能确保计数器正确减少,避免死锁。

写在最后

sync.WaitGroup 是 Go 语言并发编程中一个简单而强大的工具,它通过计数器机制帮助我们协调多个 goroutine 的执行。无论是简单的并行任务,还是复杂的多级同步,WaitGroup 都能提供有效的解决方案。

掌握 WaitGroup 的正确使用方式,可以帮助我们编写出更加健壮、高效的并发程序,让并发任务协作得心应手。