如何从单件服务调用方法在整个应用程序生命周期中运行

我已经在Net Core中实现了Kafka事件总线作为单例服务。 服务本身在Startup.cs中配置了Autofac。 该服务有一个Listen()方法:

 public void Listen() { using(var consumer = new Consumer(_config, null, new StringDeserializer(Encoding.UTF8))) { consumer.Subscribe(new string[] { "business-write-topic" }); consumer.OnMessage += (_, msg) => { Console.WriteLine($"Topic: {msg.Topic} Partition: {msg.Partition} Offset: {msg.Offset} {msg.Value}"); consumer.CommitAsync(msg); }; while (true) { consumer.Poll(100); } } } 

我的理解是,为了使这个方法在应用程序的生命周期内不断地监听消息,我必须通过某种方式从主机上调用ServicePro,然后获取与主机关联的ServiceProvider,然后检索一个实例。服务,并调用该方法。

我已将Program.cs从默认的Net Core 2.1模板配置为以下内容:

 public class Program { public static void Main(string[] args) { var host = CreateWebHost(args); host.Run(); } public static IWebHost CreateWebHost(string[] args) => WebHost.CreateDefaultBuilder(args) .UseStartup() .Build(); } 

除了主机可用,所以我可以以某种方式访问​​服务,我不知道从哪里去。 我已经搜索了类似的问题,并在官方文档中阅读,但我似乎无法弄清楚如何访问该服务,以便我可以调用Listen()方法。

这是实现目标的“首选”方式吗? 如果是这样,我该怎么办? 如果不是 – 那就是 – 如果这种任务通常以另一种方式完成,我该怎么做呢?

我建议使用IHostedService 。 IHostedService实现注册为单例,它们一直运行直到服务器关闭。

创建此基类

 public abstract class HostedService : IHostedService { private Task executingTask; private CancellationTokenSource cancellationTokenSource; public Task StartAsync(CancellationToken cancellationToken) { this.cancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); this.executingTask = this.ExecuteAsync(this.cancellationTokenSource.Token); return this.executingTask.IsCompleted ? this.executingTask : Task.CompletedTask; } public async Task StopAsync(CancellationToken cancellationToken) { if (this.executingTask == null) { return; } this.cancellationTokenSource.Cancel(); await Task.WhenAny(this.executingTask, Task.Delay(-1, cancellationToken)); } protected abstract Task ExecuteAsync(CancellationToken cancellationToken); } 

然后创建使用者主机

 public class ConsumerHost : HostedService { protected override async Task ExecuteAsync(CancellationToken cancellationToken) { using (var consumer = new Consumer(_config, null, new StringDeserializer(Encoding.UTF8))) { consumer.Subscribe(new string[] {"business-write-topic"}); consumer.OnMessage += (_, msg) => { Console.WriteLine( $"Topic: {msg.Topic} Partition: {msg.Partition} Offset: {msg.Offset} {msg.Value}"); consumer.CommitAsync(msg); }; while (!cancellationToken.IsCancellationRequested) // will make sure to stop if the application is being shut down! { consumer.Poll(100); await Task.Delay(TimeSpan.FromSeconds(10), cancellationToken); } } } } 

现在,在ConfigureService方法的启动类中添加单例

 public void ConfigureServices(IServiceCollection services) { services.AddSingleton(); } 

此服务现在将在webhost完成构建时启动,并在关闭服务器时停止。 无需手动触发它,让webhost为您完成。

我认为BackgroundService就是您所需要的。

 public class ListnerBackgroundService : BackgroundService { private readonly ListnerService service; public ListnerBackgroundService(ListnerService service) { this.service = service; } protected override async Task ExecuteAsync(CancellationToken stoppingToken) { service.Listen(); return Task.CompletedTask; } } 

并注册:

 public void ConfigureServices(IServiceCollection services) { ... services.AddSingleton(); ... }