始终在Windows服务上运行线程
我正在编写一个Windows服务,它将启动多个工作线程,这些线程将监听Amazon SQS队列并处理消息。 将有大约20个线程监听10个队列。
线程必须始终运行,这就是为什么我倾向于实际使用实际线程作为工作循环而不是线程池线程。
这是一个顶级实现。 Windows服务将启动多个工作线程,每个线程都会监听它的队列和进程消息。
protected override void OnStart(string[] args) { for (int i = 0; i < _workers; i++) { new Thread(RunWorker).Start(); } }
这是工作的实施
public async void RunWorker() { while(true) { // .. get message from amazon sqs sync.. about 20ms var message = sqsClient.ReceiveMessage(); try { await PerformWebRequestAsync(message); await InsertIntoDbAsync(message); } catch(SomeExeception) { // ... log //continue to retry continue; } sqsClient.DeleteMessage(); } }
我知道我可以使用Task.Run执行相同的操作并在线程池线程上执行它而不是启动单个线程,但我没有看到原因,因为每个线程将始终运行。
你看到这个实现有什么问题吗? 保持线程始终以这种方式运行是多么可靠,我该怎么做才能确保每个线程始终在运行?
现有解决方案的一个问题是您以一种即发即new Thread(RunWorker).Start()
方式调用RunWorker
,尽管是在一个新线程(即new Thread(RunWorker).Start()
)上。
RunWorker
是一个async
方法,它将在执行点到达第一个await
时返回调用者(即await PerformWebRequestAsync(message)
)。 如果PerformWebRequestAsync
返回待处理任务,则RunWorker
将返回,您刚刚开始的新线程将终止。
我认为你根本不需要新的线程,只需使用AmazonSQSClient.ReceiveMessageAsync
并await
其结果。 另一件事是你不应该使用async void
方法,除非你真的不关心跟踪异步任务的状态。 请改用async Task
。
您的代码可能如下所示:
List _workers = new List (); CancellationTokenSource _cts = new CancellationTokenSource(); protected override void OnStart(string[] args) { for (int i = 0; i < _MAX_WORKERS; i++) { _workers.Add(RunWorkerAsync(_cts.Token)); } } public async Task RunWorkerAsync(CancellationToken token) { while(true) { token.ThrowIfCancellationRequested(); // .. get message from amazon sqs sync.. about 20ms var message = await sqsClient.ReceiveMessageAsync().ConfigureAwait(false); try { await PerformWebRequestAsync(message); await InsertIntoDbAsync(message); } catch(SomeExeception) { // ... log //continue to retry continue; } sqsClient.DeleteMessage(); } }
现在,要停止所有待处理的工作程序,您可以简单地执行此操作(从主“请求调度程序”线程):
_cts.Cancel(); try { Task.WaitAll(_workers.ToArray()); } catch (AggregateException ex) { ex.Handle(inner => inner is OperationCanceledException); }
注意,对于Windows服务, ConfigureAwait(false)
是可选的,因为默认情况下初始线程上没有同步上下文。 但是,我会保持这种方式使代码独立于执行环境(对于存在同步上下文的情况)。
最后,如果由于某种原因你不能使用ReceiveMessageAsync
,或者你需要调用另一个阻塞API,或者只是在 RunWorkerAsync
开始时做一些CPU密集型工作,只需用Task.Run
包装它(而不是包装整个RunWorkerAsync
):
var message = await Task.Run( () => sqsClient.ReceiveMessage()).ConfigureAwait(false);
好吧,对于一个我使用在服务中实例化的CancellationTokenSource
并传递给工人。 你的while语句将成为:
while(!cancellationTokenSource.IsCancellationRequested) { //rest of the code }
这样,您可以从OnStop
服务方法取消所有工作人员。
此外,你应该注意:
- 如果您正在使用线程外部的线程状态,则可能会抛出
ThreadStateException
或ThreadInterruptedException
或其他一个。 因此,您希望处理正确的线程重启。 - 工作人员是否需要在迭代之间不间断地运行? 我会在那里睡觉(甚至几毫秒),这样他们就不会让CPU一无所获。
- 您需要处理
ThreadStartException
并重新启动worker(如果发生)。
除此之外,没有理由为什么这10个踏板在服务运行的时间内无法运行(一天,几周,几个月)。