首页 文章

如何正确使用sync.Cond?

提问于
浏览
12

我无法弄清楚如何正确使用sync.Cond . 据我所知,锁定Locker和调用条件的Wait方法之间存在竞争条件 . 此示例在主goroutine中的两条线之间添加了一个人为延迟,以模拟竞争条件:

package main

import (
    "sync"
    "time"
)

func main() {
    m := sync.Mutex{}
    c := sync.NewCond(&m)
    go func() {
        time.Sleep(1 * time.Second)
        c.Broadcast()
    }()
    m.Lock()
    time.Sleep(2 * time.Second)
    c.Wait()
}

[在Go Playground上运行]

这会立即引起恐慌:

fatal error: all goroutines are asleep - deadlock!

goroutine 1 [semacquire]:
sync.runtime_Syncsemacquire(0x10330208, 0x1)
    /usr/local/go/src/runtime/sema.go:241 +0x2e0
sync.(*Cond).Wait(0x10330200, 0x0)
    /usr/local/go/src/sync/cond.go:63 +0xe0
main.main()
    /tmp/sandbox301865429/main.go:17 +0x1a0

我究竟做错了什么?我该如何避免这种明显的竞争状况?我应该使用更好的同步构造吗?


Edit: 我意识到我应该更好地解释我在这里要解决的问题 . 我有一个长期运行的goroutine,下载一个大文件和许多其他goroutine,当它们可用时需要访问HTTP标头 . 这个问题比听起来更难 .

我不能使用 Channels ,因为只有一个goroutine会收到该值 . 而其他一些goroutine会在它们已经可用之后很久就试图检索它们 .

下载程序goroutine可以简单地将HTTP标头存储在变量中,并使用互斥锁来保护对它们的访问 . 但是,这并没有为其他goroutines提供“等待”它们可用的方法 .

我原以为 sync.Mutexsync.Cond 一起可以完成这个目标,但似乎这是不可能的 .

6 回答

  • 2

    我终于发现了一种方法,它根本不涉及 sync.Cond - 只是互斥体 .

    type Task struct {
        m       sync.Mutex
        headers http.Header
    }
    
    func NewTask() *Task {
        t := &Task{}
        t.m.Lock()
        go func() {
            defer t.m.Unlock()
            // ...do stuff...
        }()
        return t
    }
    
    func (t *Task) WaitFor() http.Header {
        t.m.Lock()
        defer t.m.Unlock()
        return t.headers
    }
    

    这是如何运作的?

    互斥锁在任务开始时被锁定,确保调用 WaitFor() 的任何内容都将被阻止 . 一旦标头可用并且互斥锁解锁互斥锁,每次调用 WaitFor() 将一次执行一个 . 所有未来的调用(即使在goroutine结束后)都可以锁定互斥锁,因为它将始终保持解锁状态 .

  • 6

    OP回答了他自己,但没有直接回答原来的问题,我将发布如何正确使用 sync.Cond .

    如果每次写入和读取都有一个goroutine,那么你真的不需要 sync.Cond - 单个 sync.Mutex 就足以在它们之间进行通信 . sync.Cond 在多个读者等待共享资源可用的情况下非常有用 .

    var sharedRsc = make(map[string]interface{})
    func main() {
        var wg sync.WaitGroup
        wg.Add(2)
        m := sync.Mutex{}
        c := sync.NewCond(&m)
        go func() {
            // this go routine wait for changes to the sharedRsc
            c.L.Lock()
            for len(sharedRsc) == 0 {
                c.Wait()
            }
            fmt.Println(sharedRsc["rsc1"])
            c.L.Unlock()
            wg.Done()
        }()
    
        go func() {
            // this go routine wait for changes to the sharedRsc
            c.L.Lock()
            for len(sharedRsc) == 0 {
                c.Wait()
            }
            fmt.Println(sharedRsc["rsc2"])
            c.L.Unlock()
            wg.Done()
        }()
    
        // this one writes changes to sharedRsc
        c.L.Lock()
        sharedRsc["rsc1"] = "foo"
        sharedRsc["rsc2"] = "bar"
        c.Broadcast()
        c.L.Unlock()
        wg.Wait()
    }
    

    Playground

    话虽如此,如果情况允许,仍然建议使用 Channels 传递数据 .

    注意: sync.WaitGroup 此处仅用于等待goroutines完成执行 .

  • 1

    您需要确保c.Broadcast被称为 after 您对c.Wait的调用 . 您的程序的正确版本将是:

    package main
    
    import (
        "fmt"
        "sync"
    )
    
    func main() {
        m := &sync.Mutex{}
        c := sync.NewCond(m)
        m.Lock()
        go func() {
            m.Lock() // Wait for c.Wait()
            c.Broadcast()
            m.Unlock()
        }()
        c.Wait() // Unlocks m
    }
    

    https://play.golang.org/p/O1r8v8yW6h

  • 5
    package main
    
    import (
        "fmt"
        "sync"
        "time"
    )
    
    func main() {
        m := sync.Mutex{}
        m.Lock() // main gouroutine is owner of lock
        c := sync.NewCond(&m)
        go func() {
            m.Lock() // obtain a lock
            defer m.Unlock()
            fmt.Println("3. goroutine is owner of lock")
            time.Sleep(2 * time.Second) // long computing - because you are the owner, you can change state variable(s)
            c.Broadcast()               // State has been changed, publish it to waiting goroutines
            fmt.Println("4. goroutine will release lock soon (deffered Unlock")
        }()
        fmt.Println("1. main goroutine is owner of lock")
        time.Sleep(1 * time.Second) // initialization
        fmt.Println("2. main goroutine is still lockek")
        c.Wait() // Wait temporarily release a mutex during wating and give opportunity to other goroutines to change the state.
        // Because you don't know, whether this is state, that you are waiting for, is usually called in loop.
        m.Unlock()
        fmt.Println("Done")
    }
    

    http://play.golang.org/p/fBBwoL7_pm

  • 2

    看起来你c.Wait for Broadcast,你的时间间隔永远不会发生 . 同

    time.Sleep(3 * time.Second) //Broadcast after any Wait for it
    c.Broadcast()
    

    你的片段似乎有效http://play.golang.org/p/OE8aP4i6gY . 或者我错过了你试图获得的东西吗?

  • 2

    这是一个有两个例程的实际例子 . 他们一个接一个地开始,但是第二个人在继续之前等待第一个播出的条件:

    package main
    
    import (
        "sync"
        "fmt"
        "time"
    )
    
    func main() {
        lock := sync.Mutex{}
        lock.Lock()
    
        cond := sync.NewCond(&lock)
    
        waitGroup := sync.WaitGroup{}
        waitGroup.Add(2)
    
        go func() {
            defer waitGroup.Done()
    
            fmt.Println("First go routine has started and waits for 1 second before broadcasting condition")
    
            time.Sleep(1 * time.Second)
    
            fmt.Println("First go routine broadcasts condition")
    
            cond.Broadcast()
        }()
    
        go func() {
            defer waitGroup.Done()
    
            fmt.Println("Second go routine has started and is waiting on condition")
    
            cond.Wait()
    
            fmt.Println("Second go routine unlocked by condition broadcast")
        }()
    
        fmt.Println("Main go routine starts waiting")
    
        waitGroup.Wait()
    
        fmt.Println("Main go routine ends")
    }
    

    输出可能略有不同,因为第二个例程可以在第一个例程之前开始,反之亦然:

    Main go routine starts waiting
    Second go routine has started and is waiting on condition
    First go routine has started and waits for 1 second before broadcasting condition
    First go routine broadcasts condition
    Second go routine unlocked by condition broadcast
    Main go routine ends
    

    https://gist.github.com/fracasula/21565ea1cf0c15726ca38736031edc70

相关问题