Tag: apache kafka

卡夫卡消费者启动延迟融合了dotnet

在启动汇合点网络消费者时,在调用订阅和后续轮询之后,似乎需要很长时间才能从服务器接收“分配已分配”事件,因此消息(大约10-15秒)。 起初我认为有一个自动主题创建开销,但无论消费者的主题/消费者群体是否已经存在,时间都是相同的。 我使用此配置启动我的使用者,其余代码与汇合的高级消费者示例中的相同: var kafkaConfig = new Dictionary { {“group.id”, config.ConsumerGroup}, {“statistics.interval.ms”, 60000}, {“fetch.wait.max.ms”, 10}, {“bootstrap.servers”, config.BrokerList}, {“enable.auto.commit”, config.AutoCommit}, {“socket.blocking.max.ms”,1}, {“fetch.error.backoff.ms”,1 }, {“socket.nagle.disable”,true }, {“auto.commit.interval.ms”, 5000}, { “default.topic.config”, new Dictionary() { {“auto.offset.reset”, “smallest”} } } }; kafka群集由具有默认设置的远程数据中心中的3台中低端规格机器组成。 是否有可以调整的代理或客户端设置以降低启动时间? 编辑:在启动时间约为2秒的情况下,自己使用Assign而不是Subscribe结果分配分区

如何从单件服务调用方法在整个应用程序生命周期中运行

我已经在Net Core中实现了Kafka事件总线作为单例服务。 服务本身在Startup.cs中配置了Autofac。 该服务有一个Listen()方法: public void Listen() { using(var consumer = new Consumer(_config, null, new StringDeserializer(Encoding.UTF8))) { consumer.Subscribe(new string[] { “business-write-topic” }); consumer.OnMessage += (_, msg) => { Console.WriteLine($”Topic: {msg.Topic} Partition: {msg.Partition} Offset: {msg.Offset} {msg.Value}”); consumer.CommitAsync(msg); }; while (true) { consumer.Poll(100); } } } 我的理解是,为了使这个方法在应用程序的生命周期内不断地监听消息,我必须通过某种方式从主机上调用ServicePro,然后获取与主机关联的ServiceProvider,然后检索一个实例。服务,并调用该方法。 我已将Program.cs从默认的Net Core 2.1模板配置为以下内容: public class Program { public static void […]