Tag: nservicebus

具有消息队列的任务处理状态

我正在研究一个产品数据导入系统,该系统从外部源下载产品数据,将其转换为正确的模式,并存储结果 – 实际上是一个ETL系统。 系统处理的核心消息类型是“ImportProductCommand”,它指定要导入的产品和源。 但是,导入命令很少单独发送。 典型的业务需求是从给定的源导入一整套产品。 目前,这表示为“ImportProductsCommand”消息,可以指定要导入的多个产品。 命令处理程序使用此消息,将其转换为单独的“ImportProductCommand”消息,并将它们发送到队列进行处理。 单个导入请求的使用者发布“ProductImportedEvent”或“ProductImportFailedEvent”。 收到“ImportProductsCommand”消息后,服务会为消息分配GUID标记,将消息放入队列,然后返回标记。 然后将令牌用作相关ID,以便可以将单个导入请求与批量导入请求相关联。 给定此基础结构,可以确定与给定令牌关联的事件数,从而确定导入产品或失败导入的数量。 缺少的是一个显式事件,表明批量导入已完成。 单个导入请求的处理程序未明确意识到它是批量导入请求的一部分。 当然,这可以通过了解要导入的产品数量以及通过计算与特定相关ID关联的导入事件的数量来推断。 当前实现利用消息队列系统来处理进程重新启动和失败,但对批量导入请求不太明确。 总的来说,系统需要回答的查询是: 是否完成了给定的批量导入? 给定批次导入剩余多少个别导入? 完成了多少个别import? 有多少是错的? 有哪些最佳实践或建议的方法来支持这些查询,并仍然利用消息排队系统来实现弹性? 目前,将它们联系在一起的是上面提到的令牌,但是没有明确的记录来表示批量导入请求实体,如果有,那么单个导入请求处理器需要知道这样的实体来更新地位相应。 所有这些都是使用C#,NServiceBus实现的,并作为IIS WCF应用程序托管。

使用TypesToScan()后,为什么NServiceBus配置中断

我有一个控制台应用程序,您可以在其中指定参数,根据指定的参数将加载各种处理程序。 例如: prgm.exe nyse prgm.exe nasdaq 目标是在我的代码中我有INyseHandlers和INasdaqHandlers并且在第一种情况下只加载任何扩展前者的处理程序,类似于后者的情况。 目标是让一个程序可以根据其运行方式收听各种或所有来源。 为实现这一目标,我已经设置了如上所述的接口。 然后在我的配置设置中: var configuration = new BusConfiguration(); configuration.InitializeStepBusConventions(); // extension method, not the problem // Load all the handlers specified in command line arguments if (!Args.Contains(“any”) && Args.Length != 0) { List handlersToLoad = new List(); foreach (var argument in Args) { Console.WriteLine(“Adding {0} subscribers to loaded […]

NServiceBus – 如何为接收者订阅的每种消息类型获取单独的队列?

我有以下情况: 因此,接收者订阅了两种事件:eventA和eventB。 NServiceBus为接收方(Receiver)创建队列,并将eventA和eventB类型的消息放入同一队列。 问题是,如果我可以配置NServiceBus为接收器的每种类型的事件使用单独的队列(ReceiverEventA和ReceiverEventB)? 或者我可以在单个进程中有两个接收器(并且每个接收器单独的队列)。 事实上,EventA需要比EventB花费更长的时间,并且它们是独立的 – 所以如果它们在不同的队列中,它们可以同时处理。 更新:如果我采用这样的天真方法,接收器无法启动null引用exception: private static IBus GetBus() { var bus = Configure.With(new List { typeof(THandler), typeof(TEvent), typeof(CompletionMessage) }) .Log4Net() .DefaultBuilder() .XmlSerializer() .MsmqTransport() .IsTransactional(true) .PurgeOnStartup(false) .UnicastBus() .LoadMessageHandlers() .ImpersonateSender(false); bus.Configurer.ConfigureProperty(x => x.InputQueue, “Queue” + typeof(THandler).Name); return bus.CreateBus().Start(); } [STAThread] static void Main() { Busses = new List { GetBus(), GetBus() }; […]

NServiceBus警告“无法找到消息类型的处理程序”

我是NServiceBus的新手,我正在尝试开发一个发布者和单独的订阅者(我正在使用v3.2.0.0),到目前为止,它的工作正常 – 发布者和订阅者都在NServiceBus Host中运行。 我的消息全部发布正常但间歇性地不会被订阅者接收,发布者显示以下错误: 2012-09-05 14:27:37,491 [Worker.6] WARN NServiceBus.Unicast.UnicastBus [(null)] – No handlers could be found for message type: MyNamespace.MyMessage 但是,对于所有消息,此警告不会出现,因此,如果我在消息之后继续发布消息,我可能会看到其中一半显示消息,因此订阅者不会接收消息,尽管所有消息都出现在MSMQ队列中。 我承认我很难掌握这个,所以我的一些代码到目前为止可能完全是垃圾! 我按如下方式向NSB发布消息,消息输入是我定义的几种不同类型之一: private void Publish(T message) { var myBus = Configure.Instance.Builder.Build(); myBus.Publish(message); } 发布者的EndpointConfig如下: [EndpointName(“MyQueue”)] public class EndpointConfig : IConfigureThisEndpoint, AsA_Publisher, IWantCustomInitialization { /// /// Initialisation for NServiceBus. /// public void Init() { […]

NServicebus与文件系统观察器

我希望我的端点在检测到特定文件夹中的文件被删除时发送一个事件。 我能够通过使用实现IWantToRunWhenBusStartsAndStops的类来实现它,后者又设置了一个FileSystemWatcher来监视给定的文件夹。 我的问题是,这是使用nservicebus进行此操作的最佳方式,还是我错过了可能导致我遇到麻烦的问题? 这是我的代码: public class FileSystem : IWantToRunWhenBusStartsAndStops { private FileSystemWatcher watcher; public void Start() { ConfigFileWatcher(); } public void Stop() { } [PermissionSet(SecurityAction.Demand, Name = “FullTrust”)] private void ConfigFileWatcher() { watcher = new FileSystemWatcher(); watcher.Path = @”c:\”; /* Watch for changes in LastAccess and LastWrite times, and the renaming of files or directories. […]

NServiceBus延迟消息处理

我有一个NServiceBus应用程序,由于某些外部事件没有发生,可能无法处理给定的消息。 因为这个其他事件不是NSB事件,所以我无法正确实施传奇。 但是,不是仅仅重新排队消息(这会导致循环直到发生外部事件),而是将消息包装在另一个消息(DelayMessage)中并对其进行排队。 DelayMessage由不同的服务获取并放置在数据库中,直到重试间隔到期。 此时,延迟服务在原始队列上重新排队消息,以便进行另一次尝试。 但是,如果外部事件仍然没有发生,这种情况可能不止一次发生,而且即使从未发生这种情况,我也希望限制消息的往返次数。 这意味着DelayMessage具有MaxRetries属性,但是当延迟服务将原始消息排队以进行重试时,该属性会丢失。 我还缺少哪些其他选择? 我很高兴接受这个问题有一个完全不同的解决方案。

Guid导致格式exception

我正在尝试设置RavenDb 3.5和NServiceBus 6.在我输入我在NServiceBus端点中设置的传奇之后,我输入了一个处理程序。 完成此处理程序后,我收到此错误: System.FormatException:Guid应包含32位数字,包含4个破折号(xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx)。 我的代码: public static class AutoFacConfig { public static IContainer ConfigureAutofac() { var builder = new ContainerBuilder(); var resourceManagerId = new Guid(“6c9abcbb-c7ca-4a67-a149-5142f633f535”); var dtcRecoveryBasePath = Environment.GetFolderPath(Environment.SpecialFolder.CommonApplicationData); var recoveryPath = Path.Combine(dtcRecoveryBasePath, “NServiceBus.RavenDB”, resourceManagerId.ToString()); builder.Register(x => { var store = new DocumentStore { ConnectionStringName = “RavenDB”, ResourceManagerId = resourceManagerId, TransactionRecoveryStorage = new LocalDirectoryTransactionRecoveryStorage(recoveryPath) […]

可以安全地在IIS中托管NServiceBus发布者吗?

我想知道在IIS中托管作为事件发布者的NServiceBus端点是否安全? 为了澄清,我们使用IIS中托管的应用程序作为我们的CRM系统(Microsoft Dynamics CRM),并且我希望在更新联系人信息时使用NServiceBus发布事件。 MS CRM允许使用自定义插件来响应联系人更新,我打算创建一个插件,通过NServiceBus发布“ContactUpdated”事件。 因此,这实际上意味着我的NServiceBus事件发布端点托管在IIS中。 现在,我知道自托管的NServiceBus端点将创建自己的工作线程,以监视来自队列的传入消息(在本例中为订阅消息)。 因为如果没有更多的传入Web请求,IIS可以自由卸载工作进程,因此使用IIS来托管长时间运行的进程通常不是一个好主意 。 但是,我会说NServiceBus队列监视线程不符合长时间运行的进程,因为它不进行任何处理并且可以随时停止:新的订阅请求将简单地排队,直到Web应用程序为止。再次重启。 我只是想知道从NServiceBus的角度来看,IIS清理这个NServiceBus线程的方式是否安全? (顺便说一下,我也发现了这篇文章,但我不得不承认只有Scaling out论证才能引起我的共鸣,这与我们的情况无关。)

nservicebus并发访问saga数据

我使用NServiceBus作为企业服务总线,在我正在开发的解决方案中,我有一个编排服务,它从所有客户端应用程序接收多达10k条消息。 我想提高架构性能,从而增强所提供的解决方案。 我使用Saga Data类,我想在所有工作线程之间共享对它的访问(到目前为止我只设置一个线程但我想设置至少10个),当多个线程尝试访问时会发生什么同样的传奇? NSB是否已经提供了这样的并发function? 我必须自己实施吗? 如果后者有任何提示? 提前致谢

将NServiceBus与Asp.Net MVC 2一起使用

有没有办法在Asp.Net MVC 2中使用NServiceBus? 我想从Asp.Net MVC2应用程序向服务发送请求消息,该服务处理消息并回复响应消息。 有没有办法清楚地做到这一点?