用于馈线应用的multithreading架构
这是我在这里的第一篇文章,如果结构不合理,请道歉。
我们的任务是设计一个工具,它将:
- 读取(帐户ID)文件,CSV格式
- 从Web下载每个帐户的帐户数据文件(通过Id)(REST API)
- 将文件传递给将生成报告的转换器(财务预测等)[~20ms]
- 如果预测阈值在限制范围内,运行解析器来分析数据[400ms]
- 为上面的分析生成报告[80ms]
- 将生成的所有文件上传到Web(REST API)
现在所有这些个点都相对容易。 我很想知道如何最好地构建一些东西来处理这个并在我们的硬件上快速有效地完成它。
我们必须处理大约2百万个账户。 方括号可以了解每个流程平均需要多长时间。 我想使用机器上可用的最大资源–24核Xeon处理器。 这不是一个内存密集型过程。
使用TPL并将每个作为一项任务创建是一个好主意吗? 每个都必须按顺序发生,但许多可以一次完成。 不幸的是,解析器不是multithreading感知的,我们没有源(它本质上是我们的黑盒子)。
我的想法是这样的 – 假设我们正在使用TPL:
- 加载帐户数据(基本上是CSV导入或SQL SELECT)
- 对于每个帐户(Id):
- 下载每个帐户的数据文件
- 继续使用数据文件,发送到转换器
- ContinueWith check threshold,发送给解析器
- ContinueWith生成报告
- ContinueWith上传输出
这听起来可行还是我没有正确理解? 以不同的方式分解步骤会更好吗?
我有点不确定如何处理解析器抛出exception的问题(它非常挑剔)或者当我们上传失败时。
所有这些都将在预定的作业中,作为控制台应用程序在下class后运行。
我会考虑使用某种消息总线。 因此,您可以单独执行这些步骤,如果一个步骤不起作用(例如,因为REST服务在一段时间内无法访问),您可以存储该消息以便稍后处理它们。
根据您用作消息总线的内容,您可以使用它引入线程。
在我看来,如果你有像服务总线这样的更高级别的抽象,你可以更好地设计工作流程,处理exception状态等等。
另外,这些部件可以独立运行,它们不会相互阻挡。
一种简单的方法是使用Redis ServiceBus的servicestack消息传递 。
引用的一些优点:
基于消息的设计允许更容易的并行化和内省计算
DLQ消息可以在服务器更新后进行内省,修复并稍后重播,并重新加入正常的消息工作流程
我认为在你的情况下从多个线程开始的简单方法是将每个帐户id的整个操作放在一个线程中 (或者更好,在ThreadPool中 )。 在下面提出的方法中,我认为您不需要控制线程间操作。
这样的东西将数据放在线程池队列中 :
var accountIds = new List(); foreach (var accountId in accountIds) { ThreadPool.QueueUserWorkItem(ProcessAccount, accountId); }
这是您将处理每个帐户的function:
public static void ProcessAccount(object accountId) { // Download the data file for this account // ContinueWith using the data file, send to the converter // ContinueWith check threshold, send to parser // ContinueWith Generate Report // ContinueWith Upload outputs }