我想用 n
Worker 实现"crawler",其中每个 Worker 都能够添加额外的工作 . 当没有工作和所有 Worker 完成工作时,程序应该停止 .
我有以下代码(你可以在https://play.golang.org/p/_j22p_OfYv玩它):
package main
import (
"fmt"
"sync"
)
func main() {
pathChan := make(chan string)
fileChan := make(chan string)
workers := 3
var wg sync.WaitGroup
paths := map[string][]string{
"/": {"/test", "/foo", "a", "b"},
"/test": {"aa", "bb", "cc"},
"/foo": {"/bar", "bbb", "ccc"},
"/bar": {"aaaa", "bbbb", "cccc"},
}
for i := 0; i < workers; i++ {
wg.Add(1)
go func() {
for {
path, ok := <-pathChan
if !ok {
break
}
for _, f := range paths[path] {
if f[0] == '/' {
pathChan <- f
} else {
fileChan <- f
}
}
}
wg.Done()
}()
}
pathChan <- "/"
for {
filePath, ok := <-fileChan
if !ok {
break
}
fmt.Println(filePath)
}
wg.Wait()
close(pathChan)
}
不幸的是,这以死锁结束 . 问题究竟在哪里?此外,编写此类功能的最佳做法是什么?渠道是否正确使用?
EDIT:
我已经更新了我的代码以使用两个等待组,一个用于作业,一个用于工作者(请参阅https://play.golang.org/p/bueUJzMhqj):
package main
import (
"fmt"
"sync"
)
func main() {
pathChan := make(chan string)
fileChan := make(chan string)
jobs := new(sync.WaitGroup)
workers := new(sync.WaitGroup)
nworkers := 2
paths := map[string][]string{
"/": {"/test", "/foo", "a", "b"},
"/test": {"aa", "bb", "cc"},
"/foo": {"/bar", "bbb", "ccc"},
"/bar": {"aaaa", "bbbb", "cccc"},
}
for i := 0; i < nworkers; i++ {
workers.Add(1)
go func() {
defer workers.Done()
for {
path, ok := <-pathChan
if !ok {
break
}
for _, f := range paths[path] {
if f[0] == '/' {
jobs.Add(1)
pathChan <- f
} else {
fileChan <- f
}
}
jobs.Done()
}
}()
}
jobs.Add(1)
pathChan <- "/"
go func() {
jobs.Wait()
close(pathChan)
workers.Wait()
close(fileChan)
}()
for {
filePath, ok := <-fileChan
if !ok {
break
}
fmt.Println(filePath)
}
}
这似乎确实有效,但显然如果将 nworkers
设置为 1
,仍会发生死锁,因为单个工作人员在向通道 pathChan
添加内容时将永远等待 . 为了解决这个问题,可以增加通道缓冲区(例如 pathChan := make(chan string, 2)
),但只有两个缓冲区对我来说似乎是一个干净的解决方案,这才会起作用 .
这是我意识到使用某种队列而不是通道更容易的地方,其中可以无阻塞地添加和删除元素,并且队列的大小不固定 . Go标准库中是否存在此类队列?
1 回答
如果要等待任意数量的工作程序完成,标准库包含sync.WaitGroup就是为了这个目的 .
还有其他并发问题:
您正在使用 Channels 关闭信令,但您在同一 Channels 上发送了多个goroutines . 这通常是不好的做法:由于每个例程永远不知道其他例程何时完成,因此您永远无法正确关闭 Channels .
关闭一个通道等待另一个通道先关闭,但它永远不会关闭,所以它会死锁 .
它没有立即死锁的唯一原因是你的例子恰好比"/"下的目录拥有更多的 Worker . 在"/"下再添加两个目录,它立即死锁 .
有一些解决方案:
转储工作池并为每个子目录旋转一个goroutine,让调度程序担心其余的:https://play.golang.org/p/ck2DkNFnyF
每个根级目录使用一个worker,并让每个worker以递归方式处理其目录,而不是将它找到的子目录排队到通道 .