首页 文章

在Go中实现作业工作者池

提问于
浏览
0

由于Go没有泛型,所有预制解决方案都使用我不太喜欢的类型铸造 . 我也希望自己实现它并尝试以下代码 . 但是,有时它并不等待所有goroutines,我是否过早关闭了工作渠道?我没有任何东西可以从他们那里取 . 我可能也使用了伪输出通道并等待从它们获取确切的数量但是我相信以下代码也应该工作 . 我错过了什么?

func jobWorker(id int, jobs <-chan string, wg sync.WaitGroup) {
    wg.Add(1)
    defer wg.Done()

    for job := range jobs {
        item := ParseItem(job)
        item.SaveItem()
        MarkJobCompleted(item.ID)
        log.Println("Saved", item.Title)
    }
}

// ProcessJobs processes the jobs from the list and deletes them
func ProcessJobs() {

    jobs := make(chan string)

    list := GetJobs()
    // Start workers
    var wg sync.WaitGroup
    for w := 0; w < 10; w++ {
        go jobWorker(w, jobs, wg)
    }

    for _, url := range list {
        jobs <- url
    }

    close(jobs)
    wg.Wait()
}

2 回答

  • 1

    在goroutine外部调用wg.Add并将指针传递给等待组 .

    如果从goroutine内部调用Add,则主goroutine可以在goroutines有机会运行之前调用Wait . 如果尚未调用Add,则Wait将立即返回 .

    将指针传递给goroutine . 否则,goroutines使用自己的等待组副本 .

    func jobWorker(id int, jobs <-chan string, wg *sync.WaitGroup) {
    
        defer wg.Done()
    
        for job := range jobs {
            item := ParseItem(job)
            item.SaveItem()
            MarkJobCompleted(item.ID)
            log.Println("Saved", item.Title)
        }
    }
    
    // ProcessJobs processes the jobs from the list and deletes them
    func ProcessJobs() {
    
        jobs := make(chan string)
    
        list := GetJobs()
        // Start workers
        var wg sync.WaitGroup
        for w := 0; w < 10; w++ {
            wg.Add(1)
            go jobWorker(w, jobs, &wg)
        }
    
        for _, url := range list {
            jobs <- url
        }
    
        close(jobs)
        wg.Wait()
    }
    
  • 1

    您需要将指针传递给waitgroup,否则每个作业都会收到它自己的副本 .

    func jobWorker(id int, jobs <-chan string, wg *sync.WaitGroup) {
        wg.Add(1)
        defer wg.Done()
    
        for job := range jobs {
            item := ParseItem(job)
            item.SaveItem()
            MarkJobCompleted(item.ID)
            log.Println("Saved", item.Title)
        }
    }
    
    // ProcessJobs processes the jobs from the list and deletes them
    func ProcessJobs() {
    
        jobs := make(chan string)
    
        list := GetJobs()
        // Start workers
        var wg sync.WaitGroup
        for w := 0; w < 10; w++ {
            go jobWorker(w, jobs, &wg)
        }
    
        for _, url := range list {
            jobs <- url
        }
    
        close(jobs)
        wg.Wait()
    }
    

    在这里看到差异:without pointerwith pointer .

相关问题