首页 文章

F#的异步如何真正起作用?

提问于
浏览
33

我想学习 asynclet! 如何在F#中工作 . 我使用Async.RunSynchronously运行异步块的所有文档都是've read seem confusing. What'?这是异步还是同步?看起来像是一个矛盾 .

文档说Async.StartImmediate在当前线程中运行 . 如果它在同一个线程中运行,它对我来说看起来并不是异步......或者asyncs更像是协程而不是线程 . 如果是这样的话,他们什么时候退回?

引用MS文档:

使用let的代码行!开始计算,然后暂停线程直到结果可用,此时执行继续 .

如果线程等待结果,我为什么要使用它?看起来像普通的旧函数调用 .

Async.Parallel有什么作用?它接收一系列Async <'T> . 为什么不能并行执行一系列普通函数?

我想我在这里缺少一些非常基本的东西 . 我想在我理解之后,所有文档和样本都将开始有意义 .

7 回答

  • 2

    一些东西 .

    一,区别

    let resp = req.GetResponse()
    

    let! resp = req.AsyncGetReponse()
    

    对于Web请求为'at sea'的大概数百毫秒(CPU的永恒),前者使用一个线程(在I / O上阻塞),而后者使用零线程 . 这是async最常见的'win':您可以编写非阻塞I / O,这些I / O不会强制执行控制反转并将事物转化为回调 . )

    其次, Async.StartImmediate 将在当前线程上启动异步 . 典型的用途是使用GUI,你有一些想要的GUI应用程序,例如更新UI(例如在某处说"loading..."),然后执行一些后台工作(从磁盘或其他任何东西加载),然后返回到前台UI线程以在完成时更新UI("done!") . StartImmediate 启用异步以在操作开始时更新UI并捕获 SynchronizationContext ,以便在操作结束时可以返回GUI以执行UI的最终更新 .

    接下来, Async.RunSynchronously 很少使用(一篇论文是你在任何应用程序中最多调用一次) . 在限制中,如果您将整个程序编写为异步,则在"main"方法中,您将调用 RunSynchronously 来运行程序并等待结果(例如,在控制台应用程序中打印出结果) . 这确实会阻塞一个线程,因此它通常仅在程序的异步部分的非常有用的情况下,在边界背面同步 . (更高级的用户可能更喜欢 StartWithContinuations - RunSynchronously 有点"easy hack"从异步回到同步 . )

    最后, Async.Parallel 执行fork-join并行性 . 您可以编写一个类似于函数而不是 async 的函数(就像TPL中的东西一样),但F#中的典型最佳点是并行I / O绑定计算,它们已经是异步对象,所以这是最常见的有用的签名 . (对于CPU绑定的并行性,您可以使用asyncs,但您也可以使用TPL . )

  • 12

    异步的用法是保存使用中的线程数 .

    请参阅以下示例:

    let fetchUrlSync url = 
        let req = WebRequest.Create(Uri url)
        use resp = req.GetResponse()
        use stream = resp.GetResponseStream()
        use reader = new StreamReader(stream)
        let contents = reader.ReadToEnd()
        contents 
    
    let sites = ["http://www.bing.com";
                 "http://www.google.com";
                 "http://www.yahoo.com";
                 "http://www.search.com"]
    
    // execute the fetchUrlSync function in parallel 
    let pagesSync = sites |> PSeq.map fetchUrlSync  |> PSeq.toList
    

    上面的代码是你想要做的:定义一个函数并并行执行 . 那么为什么我们需要异步呢?

    让我们考虑一下大事 . 例如 . 如果网站的数量不是4,而是说10,000!然后需要10,000个线程并行运行它们,这是一个巨大的资源成本 .

    在异步时:

    let fetchUrlAsync url =
        async { let req =  WebRequest.Create(Uri url)
                use! resp = req.AsyncGetResponse()
                use stream = resp.GetResponseStream()
                use reader = new StreamReader(stream)
                let contents = reader.ReadToEnd()
                return contents }
    let pagesAsync = sites |> Seq.map fetchUrlAsync |> Async.Parallel |> Async.RunSynchronously
    

    当代码在 use! resp = req.AsyncGetResponse() 中时,当前线程被放弃,其资源可用于其他目的 . 如果响应在1秒后回复,那么你的线程可以使用这1秒来处理其他东西 . 否则线程被阻塞,浪费线程资源1秒钟 .

    因此,即使您以异步方式并行下载10000个网页,线程数也仅限于少量 .

    我认为你不是.Net / C#程序员 . 异步教程通常假定人们知道.Net以及如何在C#中编写异步IO(很多代码) . F#中Async构造的神奇之处不在于并行 . 因为简单的并行可以通过其他结构实现,例如ParallelFor在.Net并行扩展中 . 但是,异步IO更复杂,因为您看到线程放弃执行,当IO完成时,IO需要唤醒其父线程 . 这是异步魔术用于的地方:在几行简洁代码中,您可以进行非常复杂的控制 .

  • 31

    许多这里的答案很好,但我认为我对这个问题采取了不同的观点:F#的异步真的如何运作?

    与C#F#中的 async/await 不同,开发人员实际上可以实现自己的 Async 版本 . 这可以是了解 Async 如何工作的好方法 .

    (感兴趣的是 Async 的源代码可以在这里找到:https://github.com/Microsoft/visualfsharp/blob/fsharp4/src/fsharp/FSharp.Core/control.fs

    作为我们DIY工作流程的基本构建块,我们定义:

    type DIY<'T> = ('T->unit)->unit
    

    这是一个函数,它接受在 'T 类型的结果准备就绪时调用的另一个函数(称为continuation) . 这允许 DIY<'T> 在不阻塞调用线程的情况下启动后台任务 . 当结果准备就绪时,将调用continuation以允许继续计算 .

    F# Async 构建块有点复杂,因为它还包括取消和异常延续,但基本上就是这样 .

    为了支持F#工作流语法,我们需要定义一个计算表达式(https://msdn.microsoft.com/en-us/library/dd233182.aspx) . 虽然这是一个相当先进的F#功能,但它也是F#最神奇的功能之一 . 要定义的两个最重要的操作是 returnbind ,F#使用这些操作将我们的 DIY<_> 构建块组合成聚合的 DIY<_> 构建块 .

    adaptTask 用于使 Task<'T> 适应 DIY<'T> . startChild 允许启动几个simulatenous DIY<'T> ,请注意它不会启动新线程以便这样做但重用调用线程 .

    这里没有任何进一步的例子是示例程序:

    open System
    open System.Diagnostics
    open System.Threading
    open System.Threading.Tasks
    
    // Our Do It Yourself Async workflow is a function accepting a continuation ('T->unit).
    // The continuation is called when the result of the workflow is ready. 
    // This may happen immediately or after awhile, the important thing is that 
    //  we don't block the calling thread which may then continue executing useful code.
    type DIY<'T> = ('T->unit)->unit
    
    // In order to support let!, do! and so on we implement a computation expression.
    // The two most important operations are returnValue/bind but delay is also generally 
    //  good to implement.
    module DIY =
    
        // returnValue is called when devs uses return x in a workflow.
        // returnValue passed v immediately to the continuation.
        let returnValue (v : 'T) : DIY<'T> =
            fun a ->
                a v
    
        // bind is called when devs uses let!/do! x in a workflow
        // bind binds two DIY workflows together
        let bind (t : DIY<'T>) (fu : 'T->DIY<'U>) : DIY<'U> =
            fun a ->
                let aa tv =
                    let u = fu tv
                    u a
                t aa
    
        let delay (ft : unit->DIY<'T>) : DIY<'T> =
            fun a ->
                let t = ft ()
                t a
    
        // starts a DIY workflow as a subflow
        // The way it works is that the workflow is executed 
        //  which may be a delayed operation. But startChild
        //  should always complete immediately so in order to
        //  have something to return it returns a DIY workflow
        // postProcess checks if the child has computed a value 
        //  ie rv has some value and if we have computation ready
        //  to receive the value (rca has some value).
        //  If this is true invoke ca with v
        let startChild (t : DIY<'T>) : DIY<DIY<'T>> =
            fun a ->
                let l   = obj()
                let rv  = ref None
                let rca = ref None
    
                let postProcess () =
                    match !rv, !rca with
                    | Some v, Some ca ->
                        ca v
                        rv  := None
                        rca := None
                    | _ , _ -> ()
    
                let receiver v =
                    lock l <| fun () ->
                        rv := Some v
                        postProcess ()
    
                t receiver
    
                let child : DIY<'T> =
                    fun ca ->
                        lock l <| fun () ->
                            rca := Some ca
                            postProcess ()
    
                a child
    
        let runWithContinuation (t : DIY<'T>) (f : 'T -> unit) : unit =
            t f
    
        // Adapts a task as a DIY workflow
        let adaptTask (t : Task<'T>) : DIY<'T> =
            fun a ->
                let action = Action<Task<'T>> (fun t -> a t.Result)
                ignore <| t.ContinueWith action
    
        // Because C# generics doesn't allow Task<void> we need to have
        //  a special overload of for the unit Task.
        let adaptUnitTask (t : Task) : DIY<unit> =
            fun a ->
                let action = Action<Task> (fun t -> a ())
                ignore <| t.ContinueWith action
    
        type DIYBuilder() =
            member x.Return(v)  = returnValue v
            member x.Bind(t,fu) = bind t fu
            member x.Delay(ft)  = delay ft
    
    let diy = DIY.DIYBuilder()
    
    open DIY
    
    [<EntryPoint>]
    let main argv = 
    
        let delay (ms : int) = adaptUnitTask <| Task.Delay ms
    
        let delayedValue ms v =
            diy {
                do! delay ms
                return v
            }
    
        let complete = 
            diy {
                let sw = Stopwatch ()
                sw.Start ()
    
                // Since we are executing these tasks concurrently 
                //  the time this takes should be roughly 700ms
                let! cd1 = startChild <| delayedValue 100 1
                let! cd2 = startChild <| delayedValue 300 2
                let! cd3 = startChild <| delayedValue 700 3
    
                let! d1 = cd1
                let! d2 = cd2
                let! d3 = cd3
    
                sw.Stop ()
    
                return sw.ElapsedMilliseconds,d1,d2,d3
            }
    
        printfn "Starting workflow"
    
        runWithContinuation complete (printfn "Result is: %A")
    
        printfn "Waiting for key"
    
        ignore <| Console.ReadKey ()
    
        0
    

    程序的输出应该是这样的:

    Starting workflow
    Waiting for key
    Result is: (706L, 1, 2, 3)
    

    在运行程序时,注意 Waiting for key 会立即打印,因为控制台线程未被阻止启动工作流程 . 大约700ms后打印结果 .

    我希望这对一些F#开发者来说很有意思

  • 6

    最近我简要概述了异步模块中的功能:here . 也许它会有所帮助 .

  • 1

    在其他答案中有很多细节,但是在我初学的时候,我被C#和F#之间的差异搞砸了 .

    对于代码应该如何运行,F#异步块是 recipe ,实际上并不是运行它的指令 .

    您可以构建您的配方,可能与其他配方(例如Async.Parallel)结合使用 . 只有这样你才能让系统运行它,你可以在当前线程(例如Async.StartImmediate)或新任务或其他各种方式上执行此操作 .

    所以这是你想做什么与谁应该做的脱钩 .

    C#模型通常被称为“热门任务”,因为任务是作为其定义的一部分为您启动的,而不是F#“冷任务”模型 .

  • 9

    let!Async.RunSynchronously 背后的想法是,有时你有一个异步活动,你需要结果才能继续 . 例如,"download a web page"函数可能没有同步等效函数,因此您需要某种方式来同步运行它 . 或者如果你有一个 Async.Parallel ,你可能会同时发生数百个任务,但是你希望它们在继续之前完成 .

    据我所知,你使用 Async.StartImmediate 的原因是你有一些计算需要在当前线程(可能是一个UI线程)上运行而不会阻塞它 . 它是否使用协同程序?我想你可以称之为,尽管.Net中没有一般的协程机制 .

    那么为什么 Async.Parallel 需要一个 Async<'T> 序列?可能是因为它是一种组合 Async<'T> 对象的方式 . 您可以轻松地创建自己的抽象,只使用普通函数(或普通函数和 Async 的组合,但它只是一个方便的功能 .

  • 0

    在异步块中,您可以进行一些同步操作和一些异步操作,因此,例如,您可能有一个网站,它将以多种方式显示用户的状态,因此您可以显示他们是否有即将到期的账单,生日快到了,家庭作业到期了 . 这些都不在同一个数据库中,因此您的应用程序将进行三次单独的调用 . 您可能希望并行进行调用,以便在最慢的调用完成后,您可以将结果放在一起并显示它,因此,最终结果将是显示基于最慢的显示 . 你不关心这些回来的顺序,你只想知道什么时候收到这三个 .

    要完成我的示例,您可能希望同步执行工作以创建UI以显示此信息 . 因此,最后,您希望获取此数据和UI显示,订单无关紧要的部分是并行完成的,订单的重要性可以同步方式完成 .

    您可以将这些作为三个线程执行,但是当第三个线程完成时您必须跟踪和取消原始线程,但是更多的工作,让.NET框架更容易处理这个问题 .

相关问题