首页 文章

同步工作程序以进行递归爬网

提问于
浏览
0

我想用 n Worker 实现"crawler",其中每个 Worker 都能够添加额外的工作 . 当没有工作和所有 Worker 完成工作时,程序应该停止 .

我有以下代码(你可以在https://play.golang.org/p/_j22p_OfYv玩它):

package main

import (
    "fmt"
    "sync"
)

func main() {
    pathChan := make(chan string)
    fileChan := make(chan string)
    workers := 3
    var wg sync.WaitGroup

    paths := map[string][]string{
        "/":     {"/test", "/foo", "a", "b"},
        "/test": {"aa", "bb", "cc"},
        "/foo":  {"/bar", "bbb", "ccc"},
        "/bar":  {"aaaa", "bbbb", "cccc"},
    }

    for i := 0; i < workers; i++ {
        wg.Add(1)
        go func() {
            for {
                path, ok := <-pathChan
                if !ok {
                    break
                }

                for _, f := range paths[path] {
                    if f[0] == '/' {
                        pathChan <- f
                    } else {
                        fileChan <- f
                    }
                }
            }

            wg.Done()
        }()
    }

    pathChan <- "/"

    for {
        filePath, ok := <-fileChan
        if !ok {
            break
        }

        fmt.Println(filePath)
    }

    wg.Wait()
    close(pathChan)
}

不幸的是,这以死锁结束 . 问题究竟在哪里?此外,编写此类功能的最佳做法是什么?渠道是否正确使用?

EDIT:

我已经更新了我的代码以使用两个等待组,一个用于作业,一个用于工作者(请参阅https://play.golang.org/p/bueUJzMhqj):

package main

import (
    "fmt"
    "sync"
)

func main() {
    pathChan := make(chan string)
    fileChan := make(chan string)
    jobs := new(sync.WaitGroup)
    workers := new(sync.WaitGroup)
    nworkers := 2

    paths := map[string][]string{
        "/":     {"/test", "/foo", "a", "b"},
        "/test": {"aa", "bb", "cc"},
        "/foo":  {"/bar", "bbb", "ccc"},
        "/bar":  {"aaaa", "bbbb", "cccc"},
    }

    for i := 0; i < nworkers; i++ {
        workers.Add(1)
        go func() {
            defer workers.Done()
            for {
                path, ok := <-pathChan
                if !ok {
                    break
                }

                for _, f := range paths[path] {
                    if f[0] == '/' {
                        jobs.Add(1)
                        pathChan <- f
                    } else {
                        fileChan <- f
                    }
                }

                jobs.Done()
            }

        }()
    }

    jobs.Add(1)
    pathChan <- "/"

    go func() {
        jobs.Wait()
        close(pathChan)
        workers.Wait()
        close(fileChan)
    }()

    for {
        filePath, ok := <-fileChan
        if !ok {
            break
        }

        fmt.Println(filePath)
    }

}

这似乎确实有效,但显然如果将 nworkers 设置为 1 ,仍会发生死锁,因为单个工作人员在向通道 pathChan 添加内容时将永远等待 . 为了解决这个问题,可以增加通道缓冲区(例如 pathChan := make(chan string, 2) ),但只有两个缓冲区对我来说似乎是一个干净的解决方案,这才会起作用 .

这是我意识到使用某种队列而不是通道更容易的地方,其中可以无阻塞地添加和删除元素,并且队列的大小不固定 . Go标准库中是否存在此类队列?

1 回答

  • 0

    如果要等待任意数量的工作程序完成,标准库包含sync.WaitGroup就是为了这个目的 .

    还有其他并发问题:

    • 您正在使用 Channels 关闭信令,但您在同一 Channels 上发送了多个goroutines . 这通常是不好的做法:由于每个例程永远不知道其他例程何时完成,因此您永远无法正确关闭 Channels .

    • 关闭一个通道等待另一个通道先关闭,但它永远不会关闭,所以它会死锁 .

    • 它没有立即死锁的唯一原因是你的例子恰好比"/"下的目录拥有更多的 Worker . 在"/"下再添加两个目录,它立即死锁 .

    有一些解决方案:

    • 转储工作池并为每个子目录旋转一个goroutine,让调度程序担心其余的:https://play.golang.org/p/ck2DkNFnyF

    • 每个根级目录使用一个worker,并让每个worker以递归方式处理其目录,而不是将它找到的子目录排队到通道 .

相关问题