具有消息队列的任务处理状态

我正在研究一个产品数据导入系统,该系统从外部源下载产品数据,将其转换为正确的模式,并存储结果 – 实际上是一个ETL系统。 系统处理的核心消息类型是“ImportProductCommand”,它指定要导入的产品和源。 但是,导入命令很少单独发送。 典型的业务需求是从给定的源导入一整套产品。 目前,这表示为“ImportProductsCommand”消息,可以指定要导入的多个产品。 命令处理程序使用此消息,将其转换为单独的“ImportProductCommand”消息,并将它们发送到队列进行处理。 单个导入请求的使用者发布“ProductImportedEvent”或“ProductImportFailedEvent”。 收到“ImportProductsCommand”消息后,服务会为消息分配GUID标记,将消息放入队列,然后返回标记。 然后将令牌用作相关ID,以便可以将单个导入请求与批量导入请求相关联。 给定此基础结构,可以确定与给定令牌关联的事件数,从而确定导入产品或失败导入的数量。 缺少的是一个显式事件,表明批量导入已完成。 单个导入请求的处理程序未明确意识到它是批量导入请求的一部分。 当然,这可以通过了解要导入的产品数量以及通过计算与特定相关ID关联的导入事件的数量来推断。 当前实现利用消息队列系统来处理进程重新启动和失败,但对批量导入请求不太明确。 总的来说,系统需要回答的查询是:

  • 是否完成了给定的批量导入?
  • 给定批次导入剩余多少个别导入?
  • 完成了多少个别import?
  • 有多少是错的?

有哪些最佳实践或建议的方法来支持这些查询,并仍然利用消息排队系统来实现弹性? 目前,将它们联系在一起的是上面提到的令牌,但是没有明确的记录来表示批量导入请求实体,如果有,那么单个导入请求处理器需要知道这样的实体来更新地位相应。

所有这些都是使用C#,NServiceBus实现的,并作为IIS WCF应用程序托管。

这可以作为NServiceBus Saga实现。 ImportProductsCommand应由Saga(ImportProductsSaga )处理,Saga数据可以在发送ImportProductCommand时具有要导入的产品数。 ImportProductsSaga应该处理ProductImportedEventProductImportFailedEvent 。 在ImportProductsSaga中处理的每个事件中,增加ProductsImportedProdctsFailedToImport 。 同时检查(ProductsImported + ProdctsFailedToImport)的总和等于ProdctsToBeImport,如果是,则完成传奇。

ImportProductsSaga数据需要跟踪ImportProductCommand发送的No,并且收到回复并且您可以计算待处理的回复等Saga数据看起来如下所示:

public class ImportProductsSataData{ public Guid Id {get; set} public int ProdctsToBeImported {get; set} public int ProdctsImported {get; set} public int ProdctsFailedToImport {get; set} }