如何通过InMemoryTestHarness成功驱动MassTransitStateMachine?

接下来: 如何编写MassTransitStateMachineunit testing?

这是一个简单的测试类(使用MS测试),用于一个名为ProcedureStateMachine的简单状态机(注意:对于我们来说,这不是一个真正的生产状态机……这只是我过去常常使用MassTransitStateMachine进行的实验..它似乎是一个方便的自成一体的地方,试验进行unit testing也是如此):

 [TestClass] public class ProcedureStateMachineTests { private ProcedureStateMachine _machine; private InMemoryTestHarness _harness; private StateMachineSagaTestHarness _saga; [TestInitialize] public void SetUp() { _machine = new ProcedureStateMachine(); _harness = new InMemoryTestHarness(); _saga = _harness.StateMachineSaga(_machine); _harness.Start().Wait(); } [TestCleanup] public void TearDown() { _harness.Stop().Wait(); } [TestMethod] public async Task That_Can_Start() { // Arrange // Act await _harness.InputQueueSendEndpoint.Send(new BeginProcessing { ProcedureId = Guid.NewGuid(), Steps = new List {"A", "B", "C" } }); // Assert var sagaContext = _saga.Created.First(); sagaContext.Saga.RemainingSteps.ShouldHaveCountOf(2); } } 

这是状态机类本身:

 public class ProcedureStateMachine : MassTransitStateMachine { public State Processing { get; private set; } public State Cancelling { get; private set; } public State CompleteOk { get; private set; } public State CompleteError { get; private set; } public State CompleteCancelled { get; private set; } public Event Begin { get; private set; } public Event StepDone { get; private set; } public Event Cancel { get; private set; } public Event Finalize { get; private set; } public ProcedureStateMachine() { InstanceState(x => x.CurrentState); Event(() => Begin); Event(() => StepDone); Event(() => Cancel); Event(() => Finalize); BeforeEnterAny(binder => binder .ThenAsync(context => Console.Out.WriteLineAsync( $"ENTERING STATE [{context.Data.Name}]"))); Initially( When(Begin) .Then(context => { context.Instance.RemainingSteps = new Queue(context.Data.Steps); }) .ThenAsync(context => Console.Out.WriteLineAsync( $"EVENT [{nameof(Begin)}]: Procedure [{context.Data.ProcedureId}] Steps [{string.Join(",", context.Data.Steps)}]")) .Publish(context => new ExecuteStep { ProcedureId = context.Instance.CorrelationId, StepId = context.Instance.RemainingSteps.Dequeue() }) .Publish(context => new SomeFunMessage { CorrelationId = context.Data.CorrelationId, TheMessage = $"Procedure [{context.Data.CorrelationId} has begun..." }) .TransitionTo(Processing) ); During(Processing, When(StepDone) .Then(context => { if (null == context.Instance.AccumulatedResults) { context.Instance.AccumulatedResults = new List(); } context.Instance.AccumulatedResults.Add( new StepResult { CorrelationId = context.Instance.CorrelationId, StepId = context.Data.StepId, WhatHappened = context.Data.WhatHappened }); }) .ThenAsync(context => Console.Out.WriteLineAsync( $"EVENT [{nameof(StepDone)}]: Procedure [{context.Data.ProcedureId}] Step [{context.Data.StepId}] Result [{context.Data.WhatHappened}] RemainingSteps [{string.Join(",", context.Instance.RemainingSteps)}]")) .If(context => !context.Instance.RemainingSteps.Any(), binder => binder.TransitionTo(CompleteOk)) .If(context => context.Instance.RemainingSteps.Any(), binder => binder.Publish(context => new ExecuteStep { ProcedureId = context.Instance.CorrelationId, StepId = context.Instance.RemainingSteps.Dequeue() })), When(Cancel) .Then(context => { context.Instance.RemainingSteps.Clear(); }) .ThenAsync(context => Console.Out.WriteLineAsync( $"EVENT [{nameof(Cancel)}]: Procedure [{context.Data.ProcedureId}] will be cancelled with following steps remaining [{string.Join(",", context.Instance.RemainingSteps)}]")) .TransitionTo(Cancelling) ); During(Cancelling, When(StepDone) .Then(context => { context.Instance.SomeStringValue = "Booo... we cancelled..."; }) .ThenAsync(context => Console.Out.WriteLineAsync( $"EVENT [{nameof(StepDone)}]: Procedure [{context.Data.ProcedureId}] Step [{context.Data.StepId}] completed while cancelling.")) .TransitionTo(CompleteCancelled)); During(CompleteOk, When(Finalize).Finalize()); During(CompleteCancelled, When(Finalize).Finalize()); During(CompleteError, When(Finalize).Finalize()); // The "SetCompleted*" thing is what triggers purging of the state context info from the store (eg. Redis)... without that, the // old completed state keys will gradually accumulate and dominate the Redis store. SetCompletedWhenFinalized(); } } 

调试此测试时, _harness在其Sent集合中具有BeginProcessing消息,但_saga.Created集合中没有任何_saga.Created 。 看起来我错过了一些管道,导致线束在发送消息时实际驱动状态机?

====

SetUp()TearDown()删除.Wait()调用并将测试更新为以下内容不会更改行为:

  [TestMethod] public async Task That_Can_Start() { try { await _harness.Start(); // Arrange // Act await _harness.InputQueueSendEndpoint.Send(new BeginProcessing { ProcedureId = Guid.NewGuid(), Steps = new List {"A", "B", "C"} }); // Assert var sagaContext = _saga.Created.First(); sagaContext.Saga.RemainingSteps.ShouldHaveCountOf(3); } finally { await _harness.Stop(); } } 

事实certificate,如上所示的测试代码遇到了_harness.InputQueueSendEndpoint.Send操作与StateMachineSagaTestHarness一些异步(超出Send等待的等待)行为之间的竞争条件。 结果,测试代码的“断言”阶段在创建传奇之前执行,并允许处理发送的消息。

深入研究SagaTestHarness代码,我找到了一些帮助方法,我可以使用这些方法等到saga上的某些条件得到满足。 方法是:

 ///  /// Waits until a saga exists with the specified correlationId ///  ///  ///  ///  public async Task Exists(Guid sagaId, TimeSpan? timeout = null) ///  /// Waits until at least one saga exists matching the specified filter ///  ///  ///  ///  public async Task> Match(Expression> filter, TimeSpan? timeout = null) ///  /// Waits until the saga matching the specified correlationId does NOT exist ///  ///  ///  ///  public async Task NotExists(Guid sagaId, TimeSpan? timeout = null) 

所以我决定使用像await _saga.Match(s => null != s.RemainingSteps);这样的东西await _saga.Match(s => null != s.RemainingSteps); 这样可以有效地复制我以后的断言并等待直到超时(默认为30秒)或后来断言的条件变为真(因此可以安全地断言)..以先到者为准。

这将让我不知所措,直到我能够想出一个更好的方法来了解线束何时被“赶上”并准备好被审问。