我正在编写一个应用程序,用户可以从许多“作业”(实际上是URL)开始 . 在开始(主例程)时,我将这些URL添加到队列中,然后启动适用于这些URL的x goroutines .
在特殊情况下,URL指向的资源可能包含更多必须添加到队列的URL . 3名 Worker 正在等待新的工作进来并加工 . 问题是:一旦每个 Worker 都在等待工作(并且没有人正在 生产环境 任何工作), Worker 应该完全停止工作 . 所以要么他们都工作,要么没人工作 .
我目前的实现看起来像这样,我不认为它很优雅 . 不幸的是,我想不出更好的方式,不包括竞争条件,我不完全确定这个实现是否真的按预期工作:
var queue // from somewhere
const WORKER_COUNT = 3
var done chan struct{}
func work(working chan int) {
absent := make(chan struct{}, 1)
// if x>1 jobs in sequence are popped, send to "absent" channel only 1 struct.
// This implementation also assumes that the select statement will be evaluated "in-order" (channel 2 only if channel 1 yields nothing) - is this actually correct? EDIT: It is, according to the specs.
one := false
for {
select {
case u, ok := <-queue.Pop():
if !ok {
close(absent)
return
}
if !one {
// I have started working (delta + 1)
working <- 1
absent <- struct{}{}
one = true
}
// do work with u (which may lead to queue.Push(urls...))
case <-absent: // no jobs at the moment. consume absent => wait
one = false
working <- -1
}
}
}
func Start() {
working := make(chan int)
for i := 0; i < WORKER_COUNT; i++ {
go work(working)
}
// the amount of actually working workers...
sum := 0
for {
delta := <-working
sum += delta
if sum == 0 {
queue.Close() // close channel -> kill workers.
done <- struct{}{}
return
}
}
}
有没有更好的方法来解决这个问题?
1 回答
您可以use a sync.WaitGroup(参见docs)来控制工作人员的生命周期,并使用非阻塞发送,以便工作人员在尝试排队更多作业时不会死锁:
看起来好像缓冲
jobs
通道可以防止死锁添加作业,但它不会在你开始的地方重新开始 . 缓冲很好,在某些情况下效率很高;它只是没有必要或足以防止僵局 .我在this function to kick off a parallel sort遇到了这种情况,这可能比上面的例子更难以阅读,因为排序的细节 - 比如将小任务与大任务区别对待 - 会混入其中 .