如何从服务结构actor获取状态而无需等待其他方法完成?

我有一个正在运行的服务,正在迭代X个actor,使用ActorProxy询问它们的状态。

对我来说很重要的是,这个服务不会被等待提醒回调中等待演员中其他一些长时间运行的方法。

有没有办法调用下面的简单示例GetState(),它允许方法正确完成,而不会阻止某些提醒运行。

class Actor : IMyActor{ public Task GetState() => StateManager.GetAsync("key") } 

alterntive。

什么是形成服务呼叫的正确方式,如果它不在5秒内回复,只是包含。

 var proxy = ActorProxy.Create<IMyActor(); var state = await proxy.GetState(); // this will wait until the actor is ready to send back the state. 

即使对于当前正在执行阻塞方法的Actors,也可以读取actor状态。 Actors使用IActorStateManager存储它们的状态, IActorStateManager又使用IActorStateProviderIActorStateProvider每个ActorService实例化一次。 每个分区都实例化负责托管和运行actor的ActorService 。 actor服务的核心是StatefulService (或者更确切地说是StatefulServiceBase ,它是常规有状态服务使用的基类)。 考虑到这一点,我们可以使用与我们使用常规服务相同的方式来满足我们的Actors的ActorService ,即使用基于IService的服务接口。

IActorStateProvider (如果您使用的是持久状态,由KvsActorStateProvider实现)有两个我们可以使用的方法:

 Task LoadStateAsync(ActorId actorId, string stateName, CancellationToken cancellationToken = null); Task> GetActorsAsync(int numItemsToReturn, ContinuationToken continuationToken, CancellationToken cancellationToken); 

对这些方法的调用不受actor的锁影响,这是有道理的,因为它们旨在支持分区上的所有actor。

例:

创建一个自定义的ActorService并使用它来托管你的actor:

 public interface IManyfoldActorService : IService { Task> GetCountsAsync(CancellationToken cancellationToken); } public class ManyfoldActorService : ActorService, IManyfoldActorService { ... } 

Program.Main注册新的ActorService:

 ActorRuntime.RegisterActorAsync( (context, actorType) => new ManyfoldActorService(context, actorType)).GetAwaiter().GetResult(); 

假设我们有一个简单的Actor,其中包含以下方法:

  Task IManyfoldActor.SetCountAsync(int count, CancellationToken cancellationToken) { Task.Delay(TimeSpan.FromSeconds(30), cancellationToken).GetAwaiter().GetResult(); var task = this.StateManager.SetStateAsync("count", count, cancellationToken); ActorEventSource.Current.ActorMessage(this, $"Finished set {count} on {this.Id.GetLongId()}"); return task; } 

它等待30秒(模拟长时间运行,阻塞,方法调用),然后将状态值"count"设置为int

在单独的服务中,我们现在可以为Actors调用SetCountAsync来生成一些状态数据:

  protected override async Task RunAsync(CancellationToken cancellationToken) { var actorProxyFactory = new ActorProxyFactory(); long iterations = 0; while (true) { cancellationToken.ThrowIfCancellationRequested(); iterations += 1; var actorId = iterations % 10; var count = Environment.TickCount % 100; var manyfoldActor = actorProxyFactory.CreateActorProxy(new ActorId(actorId)); manyfoldActor.SetCountAsync(count, cancellationToken).ConfigureAwait(false); ServiceEventSource.Current.ServiceMessage(this.Context, $"Set count {count} on {actorId} @ {iterations}"); await Task.Delay(TimeSpan.FromSeconds(3), cancellationToken); } } 

这种方法简单地循环不断地改变演员的价值。 (注意总共10个演员之间的相关性,延迟3秒和演员30秒的延迟。简单地设计这种方式以防止等待锁定的Actor调用的无限累积)。 每个调用也会执行即发即弃,因此我们可以在返回之前继续更新下一个actor的状态。 它是一段愚蠢的代码,它只是用这种方式来certificate这个理论。

现在在actor服务中,我们可以像这样实现GetCountsAsync方法:

  public async Task> GetCountsAsync(CancellationToken cancellationToken) { ContinuationToken continuationToken = null; var actors = new Dictionary(); do { var page = await this.StateProvider.GetActorsAsync(100, continuationToken, cancellationToken); foreach (var actor in page.Items) { var count = await this.StateProvider.LoadStateAsync(actor, "count", cancellationToken); actors.Add(actor.GetLongId(), count); } continuationToken = page.ContinuationToken; } while (continuationToken != null); return actors; } 

这使用底层的ActorStateProvider来查询所有已知的Actors(对于该分区),然后直接读取每个这样的状态’绕过’Actor并且不被actor的方法执行阻止。

最后一块,一些可以调用我们的ActorService并在所有分区上调用GetCountsAsync

  public IDictionary Get() { var applicationName = FabricRuntime.GetActivationContext().ApplicationName; var actorServiceName = $"{typeof(IManyfoldActorService).Name.Substring(1)}"; var actorServiceUri = new Uri($"{applicationName}/{actorServiceName}"); var fabricClient = new FabricClient(); var partitions = new List(); var servicePartitionList = fabricClient.QueryManager.GetPartitionListAsync(actorServiceUri).GetAwaiter().GetResult(); foreach (var servicePartition in servicePartitionList) { var partitionInformation = servicePartition.PartitionInformation as Int64RangePartitionInformation; partitions.Add(partitionInformation.LowKey); } var serviceProxyFactory = new ServiceProxyFactory(); var actors = new Dictionary(); foreach (var partition in partitions) { var actorService = serviceProxyFactory.CreateServiceProxy(actorServiceUri, new ServicePartitionKey(partition)); var counts = actorService.GetCountsAsync(CancellationToken.None).GetAwaiter().GetResult(); foreach (var count in counts) { actors.Add(count.Key, count.Value); } } return actors; } 

运行此代码现在将为我们提供10个演员,每33:d秒获得状态更新,每个演员每次都忙30秒。 每个actor方法返回时,Actor服务都会看到更新后的状态。

此示例中省略了一些内容,例如,当您在actor服务中加载状态时,我们应该防止超时。

没有办法做到这一点。 演员是单线程的。 如果他们正在等待在任何actor方法内完成的长时间运行工作,那么任何其他方法(包括来自外部的方法)都必须等待。