首页 文章

没有阅读,如何检查 Channels 是否关闭?

提问于
浏览
47

这是@Jimt撰写的Go中 Worker 和控制器模式的一个很好的例子,回答“Is there some elegant way to pause & resume any other goroutine in golang?

package main

import (
    "fmt"
    "runtime"
    "sync"
    "time"
)

// Possible worker states.
const (
    Stopped = 0
    Paused  = 1
    Running = 2
)

// Maximum number of workers.
const WorkerCount = 1000

func main() {
    // Launch workers.
    var wg sync.WaitGroup
    wg.Add(WorkerCount + 1)

    workers := make([]chan int, WorkerCount)
    for i := range workers {
        workers[i] = make(chan int)

        go func(i int) {
            worker(i, workers[i])
            wg.Done()
        }(i)
    }

    // Launch controller routine.
    go func() {
        controller(workers)
        wg.Done()
    }()

    // Wait for all goroutines to finish.
    wg.Wait()
}

func worker(id int, ws <-chan int) {
    state := Paused // Begin in the paused state.

    for {
        select {
        case state = <-ws:
            switch state {
            case Stopped:
                fmt.Printf("Worker %d: Stopped\n", id)
                return
            case Running:
                fmt.Printf("Worker %d: Running\n", id)
            case Paused:
                fmt.Printf("Worker %d: Paused\n", id)
            }

        default:
            // We use runtime.Gosched() to prevent a deadlock in this case.
            // It will not be needed of work is performed here which yields
            // to the scheduler.
            runtime.Gosched()

            if state == Paused {
                break
            }

            // Do actual work here.
        }
    }
}

// controller handles the current state of all workers. They can be
// instructed to be either running, paused or stopped entirely.
func controller(workers []chan int) {
    // Start workers
    for i := range workers {
        workers[i] <- Running
    }

    // Pause workers.
    <-time.After(1e9)
    for i := range workers {
        workers[i] <- Paused
    }

    // Unpause workers.
    <-time.After(1e9)
    for i := range workers {
        workers[i] <- Running
    }

    // Shutdown workers.
    <-time.After(1e9)
    for i := range workers {
        close(workers[i])
    }
}

但是这段代码也存在一个问题:如果要在 worker() 退出时删除 workers 中的工作程序通道,则会发生死锁 .

如果你 close(workers[i]) ,下次控制器写入它会引起恐慌,因为go无法写入封闭的通道 . 如果您使用一些互斥锁来保护它,那么它将被卡在 workers[i] <- Running 上,因为 worker 没有从通道读取任何内容并且写入将被阻止,并且互斥锁将导致死锁 . 您还可以为通道提供更大的缓冲区作为解决方案,但这还不够好 .

所以我认为解决这个问题的最佳方法是退出时关闭通道,如果控制器发现通道关闭,它将跳过它并且什么都不做 . 但是我现在很困惑.2666632_ .

PS:我已经尝试过恢复引起恐慌的恐慌,但它会关闭引起恐慌的goroutine . 在这种情况下,它将是控制器,所以它是没用的 .

不过,我认为Go团队在下一版Go中实现这个功能很有用 .

8 回答

  • 38

    以一种hacky的方式,可以通过恢复引发的恐慌来尝试写入的 Channels . 但是如果没有读取它就无法检查读取通道是否已关闭 .

    要么你会

    • 最终从中读取"true"值( v <- c

    • 读取"true"值和'not closed'指标( v, ok <- c

    • 读取零值和'closed'指标( v, ok <- c

    • 将在 Channels 中阻止永久读取( v <- c

    从技术上讲,只有最后一个没有从 Channels 读取,但这没什么用处 .

  • 0

    没有办法编写一个安全的应用程序,你需要知道一个 Channels 是否打开而不与它交互 .

    做你想要做的事情的最好方法是有两个渠道 - 一个用于工作,一个用于表示改变状态的愿望(以及完成状态改变,如果这很重要) .

    Channels 很便宜 . 复杂的设计重载语义不是 .

    [也]

    <-time.After(1e9)
    

    写作是一种非常令人困惑和非显而易见的方式

    time.Sleep(time.Second)
    

    保持简单,每个人(包括你)都能理解它们 .

  • 56

    我知道这个答案太晚了,我写了这个解决方案,黑客Go run-time,这不安全,可能会崩溃:

    import (
        "unsafe"
        "reflect"
    )
    
    
    func isChanClosed(ch interface{}) bool {
        if reflect.TypeOf(ch).Kind() != reflect.Chan {
            panic("only channels!")
        }
    
        // get interface value pointer, from cgo_export 
        // typedef struct { void *t; void *v; } GoInterface;
        // then get channel real pointer
        cptr := *(*uintptr)(unsafe.Pointer(
            unsafe.Pointer(uintptr(unsafe.Pointer(&ch)) + unsafe.Sizeof(uint(0))),
        ))
    
        // this function will return true if chan.closed > 0
        // see hchan on https://github.com/golang/go/blob/master/src/runtime/chan.go 
        // type hchan struct {
        // qcount   uint           // total data in the queue
        // dataqsiz uint           // size of the circular queue
        // buf      unsafe.Pointer // points to an array of dataqsiz elements
        // elemsize uint16
        // closed   uint32
        // **
    
        cptr += unsafe.Sizeof(uint(0))*2
        cptr += unsafe.Sizeof(unsafe.Pointer(uintptr(0)))
        cptr += unsafe.Sizeof(uint16(0))
        return *(*uint32)(unsafe.Pointer(cptr)) > 0
    }
    

    https://gist.github.com/youssifsayed/ca0cfcf9dc87905d37a4fee7beb253c2

  • 0

    也许我错过了一些东西,但似乎处理这个的简单而正确的方法是将“停止”发送到通道(终止go-routine),关闭通道并将其设置为nil .

    如果您认为需要检查封闭的通道而不阅读它,那么您的设计就会出现问题 . (请注意,代码还存在其他问题,例如暂停工作人员的“忙碌循环” . )

  • -4

    从文档:

    可以在内置功能关闭的情况下关闭通道 . 接收运算符的多值赋值形式报告在通道关闭之前是否发送了接收值 .

    https://golang.org/ref/spec#Receive_operator

    Golang in Action示例显示了这种情况:

    // This sample program demonstrates how to use an unbuffered
    // channel to simulate a game of tennis between two goroutines.
    package main
    
    import (
        "fmt"
        "math/rand"
        "sync"
        "time"
    )
    
    // wg is used to wait for the program to finish.
    var wg sync.WaitGroup
    
    func init() {
        rand.Seed(time.Now().UnixNano())
    }
    
    // main is the entry point for all Go programs.
    func main() {
        // Create an unbuffered channel.
        court := make(chan int)
        // Add a count of two, one for each goroutine.
        wg.Add(2)
        // Launch two players.
        go player("Nadal", court)
        go player("Djokovic", court)
        // Start the set.
        court <- 1
        // Wait for the game to finish.
        wg.Wait()
    }
    
    // player simulates a person playing the game of tennis.
    func player(name string, court chan int) {
        // Schedule the call to Done to tell main we are done.
        defer wg.Done()
        for {
            // Wait for the ball to be hit back to us.
            ball, ok := <-court
            fmt.Printf("ok %t\n", ok)
            if !ok {
                // If the channel was closed we won.
                fmt.Printf("Player %s Won\n", name)
                return
            }
            // Pick a random number and see if we miss the ball.
            n := rand.Intn(100)
            if n%13 == 0 {
                fmt.Printf("Player %s Missed\n", name)
                // Close the channel to signal we lost.
                close(court)
                return
            }
    
            // Display and then increment the hit count by one.
            fmt.Printf("Player %s Hit %d\n", name, ball)
            ball++
            // Hit the ball back to the opposing player.
            court <- ball
        }
    }
    
  • 4

    那么,您可以使用 default 分支来检测它,对于一个封闭的通道将被选中,例如:以下代码将选择 defaultchannelchannel ,第一个选择未被阻止 .

    func main() {
        ch := make(chan int)
    
        go func() {
            select {
            case <-ch:
                log.Printf("1.channel")
            default:
                log.Printf("1.default")
            }
            select {
            case <-ch:
                log.Printf("2.channel")
            }
            close(ch)
            select {
            case <-ch:
                log.Printf("3.channel")
            default:
                log.Printf("3.default")
            }
        }()
        time.Sleep(time.Second)
        ch <- 1
        time.Sleep(time.Second)
    }
    
  • -4

    如果您收听此 Channels ,您始终可以找到该 Channels 已关闭 .

    case state, opened := <-ws:
        if !opened {
             // channel was closed 
             // return or made some final work
        }
        switch state {
            case Stopped:
    

    但请记住,你不能两次关闭一个 Channels . 这会引起恐慌 .

  • 0

    如果通道有元素,则更容易检查,这将确保通道处于活动状态 .

    func isChanClosed(ch chan interface{}) bool {
        if len(ch) == 0 {
            select {
            case _, ok := <-ch:
                return !ok
            }
        }
        return false 
    }
    

相关问题