将异步 – 等待C#代码转换为与调度程序相关的F#

我想知道这是否是一个太广泛的问题,但最近我让自己遇到了一段代码,我想确定如何从C#转换为适当的F#。 旅程从这里开始(1) (TPL-F#交互的原始问题),并继续这里(2) (我正在考虑转换为F#的一些示例代码)。

示例代码在这里重现的时间太长,但有趣的函数是ActivateAsyncRefreshHubsAddHub 。 特别有趣的是

  1. AddHub具有private async Task AddHub(string address)的签名。
  2. RefreshHubs在一个循环中调用AddHub并收集一个tasks列表,然后通过await Task.WhenAll(tasks)它最终await Task.WhenAll(tasks) ,因此返回值与其private async Task RefreshHubs(object _)签名相匹配。
  3. RefreshHubsActivateAsync调用,就像await RefreshHubs(null) ,最后有一个调用await base.ActivateAsync()匹配函数签名public override async Task ActivateAsync()

题:

将这些函数签名正确转换为F#仍然保持接口和function并尊重默认的自定义调度程序是什么? 我也不太确定这个“在F#中异步/等待”。 至于如何“机械地”做到这一点。 🙂

原因是在链接“here(1)”中似乎存在问题(我没有validation过),因为F#异步操作不遵循(Orleans)运行时设置的自定义协作调度程序。 此外, 这里指出TPL操作逃脱调度程序并转到任务池,因此禁止使用它们。

我能想到解决这个问题的一种方法是使用F#函数,如下所示

 //Sorry for the inconvenience of shorterned code, for context see the link "here (1)"... override this.ActivateAsync() = this.RegisterTimer(new Func(this.FlushQueue), null, TimeSpan.FromMilliseconds(100.0), TimeSpan.FromMilliseconds(100.0)) |> ignore if RoleEnvironment.IsAvailable then this.RefreshHubs(null) |> Async.awaitPlainTask |> Async.RunSynchronously else this.AddHub("http://localhost:48777/") |> Async.awaitPlainTask |> Async.RunSynchronously //Return value comes from here. base.ActivateAsync() member private this.RefreshHubs(_) = //Code omitted, in case mor context is needed, take a look at the link "here (2)", sorry for the inconvinience... //The return value is Task. //In the C# version the AddHub provided tasks are collected and then the //on the last line there is return await Task.WhenAll(newHubAdditionTasks) newHubs |> Array.map(fun i -> this.AddHub(i)) |> Task.WhenAll member private this.AddHub(address) = //Code omitted, in case mor context is needed, take a look at the link "here (2)", sorry for the inconvinience... //In the C# version: //... //hubs.Add(address, new Tuple(hubConnection, hub)) //} //so this is "void" and could perhaps be Async in F#... //The return value is Task. hubConnection.Start() |> Async.awaitTaskVoid |> Async.RunSynchronously TaskDone.Done 

startAsPlainTask函数由Sacha Barber从这里开始 。 另一个有趣的选择可能在这里

 module Async = let AwaitTaskVoid : (Task -> Async) = Async.AwaitIAsyncResult >> Async.Ignore 

<编辑:我刚注意到Task.WhenAll也需要等待。 但是什么是正确的方法? 呃,是时候睡觉了(不好的双关语)……

<编辑2:在Codeplex中,在这里(1) (TPL-F#交互的原始问题),提到F#使用同步上下文将工作推送到线程,而TPL则没有。 现在,这是一个看似合理的解释,我觉得(虽然无论自定义调度程序如何,我仍然无法正确翻译这些代码段)。 一些有趣的附加信息可能来自

  • 如何获取使用SynchronizationContext的任务? 那么SynchronizationContext是如何使用的呢?
  • Await,SynchronizationContext和Console Apps,其中提供了一个示例SingleThreadSynchronizationContext ,它看起来像要执行的工作的队列。 也许应该用这个?

我想在这种情况下我需要提一下Hopac ,作为一个有趣的切线,并且还提到我在接下来的50个小时左右是不可及的,以防我的所有交叉发布都失控。

<编辑3 : Daniel和svick在评论中提供了很好的建议,以便使用自定义任务构建器。 Daniel提供了一个已经在FSharpx中定义的链接 。

查看源代码,我看到带参数的接口定义为

 type TaskBuilder(?continuationOptions, ?scheduler, ?cancellationToken) = let contOptions = defaultArg continuationOptions TaskContinuationOptions.None let scheduler = defaultArg scheduler TaskScheduler.Default let cancellationToken = defaultArg cancellationToken CancellationToken.None 

如果要在奥尔良使用它,看起来TaskScheduler应该是TaskScheduler.Current ,根据这里的文档

Orleans拥有自己的任务调度程序,它提供了谷物中使用的单线程执行模型。 在运行任务时使用Orleans调度程序非常重要,而不是.NET线程池。

如果您的谷物代码需要创建子任务,您应该使用Task.Factory.StartNew:

等待Task.Factory.StartNew(()=> {/ * logic * /});

该技术将使用当前的任务调度程序,它将是Orleans调度程序。

您应该避免使用始终使用.NET线程池的Task.Run,​​因此不会在单线程执行模型中运行。

它看起来在TaskScheduler.Current和TaskScheduler.Default之间存在细微差别。 也许这保证了一个问题,在哪种情况下会出现意想不到的差异。 由于奥尔良文档指出不使用Task.Run而是指导Task.Factory.StartNew ,我想知道是否应该定义TaskCreationOptions.DenyAttachChild,正如在任务.Run与Task.Factory上由Stephen Toub这样的权威机构推荐的那样。在StartNew的 StartNew和Stephen Cleary 是危险的 。 嗯,看起来.Default会是.DenyAttachChilld除非我弄错了。

此外,由于Task.Run Task.Factory.CreateNew有关自定义调度程序的问题,我想知道是否可以通过使用自定义TaskFactory删除此特定问题,如任务计划程序(Task.Factory)中所述并控制该数字线程和如何:创建限制并发的任务计划程序 。

嗯,这已经成为一个漫长的“思考”。 我想知道我应该怎么关闭它? 也许如果斯维克丹尼尔可以将他们的评论作为答案,我会同时投票并接受svick的

您可以在FSharpx中使用TaskBuilder并传入TaskScheduler.Current 。 这是我尝试翻译RefreshHubs 。 请注意,使用Task代替Task

 let RefreshHubs _ = let task = TaskBuilder(scheduler = TaskScheduler.Current) task { let addresses = RoleEnvironment.Roles.["GPSTracker.Web"].Instances |> Seq.map (fun instance -> let endpoint = instance.InstanceEndpoints.["InternalSignalR"] sprintf "http://%O" endpoint.IPEndpoint ) |> Seq.toList let newHubs = addresses |> List.filter (not << hubs.ContainsKey) let deadHubs = hubs.Keys |> Seq.filter (fun x -> not (List.exists ((=) x) addresses)) // remove dead hubs deadHubs |> Seq.iter (hubs.Remove >> ignore) // add new hubs let! _ = Task.WhenAll [| for hub in newHubs -> AddHub hub |] return () }