首页 文章

如何安全地关闭Go中的Chan chan T?

提问于
浏览
1

我正在实现一个简单的工作池算法,其中1个Sender(调度程序)将作业发送到M(Worker)go例程 . 为此,它使用通道通道为第一个空闲工作者分配一个可用的工作:

// builds the pool
func NewWorkerPool(maxWorkers int) WorkerPool {
    pool := make(chan chan Job, maxWorkers)
    workers := make([]Worker, 0)
    return WorkerPool{
        WorkerPool: pool,
        Workers: workers,
        maxWorkers: maxWorkers,
        waitGroup: sync.WaitGroup{}}
}

// Starts the WorkerPool
func (p *WorkerPool) Run(queue chan Job) {
    w := p.waitGroup

    // starting n number of workers
    for i := 0; i < p.maxWorkers; i++ {
        worker := NewWorker(p.WorkerPool)
        p.Workers = append(p.Workers, worker)
        w.Add(1)
        worker.Start(&w)
    }

    go p.dispatch(queue)
}

// dispatches a job to be handled by an idle Worker of the pool
func (p *WorkerPool) dispatch(jobQueue chan Job) {
    for {
        select {
        case job := <-jobQueue:
            // a model request has been received
            go func(job Job) {
                // try to obtain a worker model channel that is available.
                // this will block until a worker is idle
                jobChannel := <-p.WorkerPool

                // dispatch the model to the worker model channel
                jobChannel <- job
            }(job)
        }
    }
}


// checks if a Worker Pool is open or closed - If we can recieve on the channel then it is NOT closed
func (p *WorkerPool) IsOpen() bool {
    _, ok := <-p.WorkerPool
    return ok
}

工作人员启动和停止方法

// Start method starts the run loop for the worker, listening for a quit channel in
// case we need to stop it
func (w Worker) Start(wg *sync.WaitGroup) {
    go func() {
        defer wg.Done()
        for {
            // register the current worker into the worker queue.
            w.WorkerPool <- w.JobChannel

            select {
            case job := <-w.JobChannel:
                // we have received a work request.
                result := job.Run()
                job.ReturnChannel <- result

                // once result is returned close the job output channel
                close(job.ReturnChannel)

            case <-w.quit:
                // we have received a signal to stop
                return
            }
        }
    }()
}

// Stop signals the worker to stop listening for work requests.
func (w Worker) Stop() {
    go func() {
        w.quit <- true
    }()
}

现在我试图通过使用以下方法关闭池,我使用sync.WaitGroup以等待所有 Worker 关闭:

// stops the Pool
func (p *WorkerPool) Stop() bool {
    // stops all workers
    for _, worker := range p.Workers {
        worker.Stop()
    }
    p.waitGroup.Wait() //Wait for the goroutines to shutdown

    close(p.WorkerPool)

    more := p.IsOpen()

    fmt.Printf(" more? %t", more)

    return ok
}

//打印更多?真正

即使我等待工作人员退出并稍后调用close(p.WorkerPool),我仍然打开通道,在这种情况下缺少什么,如何相应地关闭通道?

1 回答

  • 1

    关闭 Channels 表示不再向其发送任何值 . 这对于与通道的接收器进行通信完成非常有用 .

    Channels 中的数据仍然存在,您可能需要关闭 Channels ,然后删除其中的所有 Channels ,如下所示

    // Stop stops the Pool and free all the channels
    func (p *WorkerPool) Stop() bool {
        // stops all workers
        for _, worker := range p.Workers {
            worker.Stop()
        }
        p.waitGroup.Wait() //Wait for the goroutines to shutdown
        close(p.WorkerPool)
        for channel := range p.WorkerPool {
            fmt.Println("Freeing channel") //remove all the channels
        }
        more := p.IsOpen()
        fmt.Printf(" more? %t", more)
    
        return ok
    }
    

    顺便说一句,人们不能使用 _, ok <- 检查通道是否关闭 . 我建议为该功能使用不同的名称

相关问题