Tag: system.reactive

我应该在Reactive Extensions(Rx)Subject 上调用Dispose

我使用Reactive Extensions(Rx)Subject作为C#事件的直接替代品,如下所示: public class MyClass { private Subject subject; public IObservable WhenSomethingHappened { get { return this.subject.AsObservable(); } } private void OnSomethingHappened(string something) { this.subject.OnNext(something); } } 请注意,我从未在我的主题上调用OnCompleted。 MyClass应该实现IDisposable并调用this.subject.Dispose吗? 这意味着使用Subject的任何实现都应该实现IDisposable。 我问的原因是IDisposable模式有点像疾病,如果有一件事实现它,使用它的一切都必须实现它。

为什么这个Observable.Generate重载会导致内存泄漏?

在我的机器上大约10秒后,以下Rx.NET代码将消耗大约500 MB的内存。 var stream = Observable.Range(0, 10000) .SelectMany(i => Observable.Generate( 0, j => true, j => j + 1, j => new { N = j }, j => TimeSpan.FromMilliseconds(1))); stream.Subscribe(); 如果我在没有Func参数的情况下使用Observable.Generate重载Func我的内存使用量平均为35 MB。 var stream = Observable.Range(0, 10000) .SelectMany(i => Observable.Generate( 0, j => true, j => j + 1, j => new { N […]

Rx如何并行化长时间运行的任务?

我有以下代码片段,它枚举了一些xml的元素(从svn log –xml …进程的输出中读取),然后为每个xml元素运行一个长时间运行的方法。 var proc = Process.Start(svnProcInfo); var xml = XDocument.Load(proc.StandardOutput); var xElements = xml.Descendants(“path”) .ToObservable() //.SubscribeOn(ThreadPoolScheduler.Instance) .Select(descendant => return LongRunning(descendant)); xElements //.SubscribeOn(NewThreadScheduler.Default) .Subscribe(result => Console.WriteLine(result); Console.ReadKey(); LongRunning方法并不重要,但在其中我记录了它运行的线程。 我们假设它运行一整秒。 我的问题是,取消注释SubscribeOn()行没有任何效果。 对LongRunning的调用是顺序的,并且每隔一秒发生在同一个线程上(尽管与主(初始)线程不同)。 这是一个控制台应用程序。 我是Rx的新手。 我错过了什么? 编辑: 在尝试了Lee Campbell的回答之后,我注意到了另一个问题。 Console.Error.WriteLine(“Main thread ” + Thread.CurrentThread.ManagedThreadId); var xElements = xml.Descendants(“path”).ToObservable() //.ObserveOn(Scheduler.CurrentThread) .SelectMany(descendant => Observable.Start(()=>LongRunning(descendant),NewThreadScheduler.Default)) .Subscribe(result => Console.WriteLine( “Result […]

react native扩展和重试

所以今天早上我的雷达上出现了一系列文章。 它从这个问题开始,这导致了 GitHub上的原始示例和源代码 。 我稍微重写了一遍,所以我可以在控制台和服务应用程序中开始使用它: public static class Extensions { static readonly TaskPoolScheduler Scheduler = new TaskPoolScheduler(new TaskFactory()); // Licensed under the MIT license with <3 by GitHub /// /// An exponential back off strategy which starts with 1 second and then 4, 8, 16… /// [SuppressMessage(“Microsoft.Security”, “CA2104:DoNotDeclareReadOnlyMutableReferenceTypes”)] public static readonly Func ExponentialBackoff = n […]

我对失去活动的扩展计时器缺少什么?

我有这个: watchers .ToObservable() // needs to be observable .SelectMany(watcher => // working on each watcher Observable // create a timer for the watcher .Timer(watcher.StartTime, TimeSpan.FromHours(watcher.Interval)) .SelectMany(Observable.FromAsync( async () => new { watcher, result = await CheckFolder(watcher.Path) }))) .Subscribe(x => Console.WriteLine(string.Format(“Watcher: {0}\tResult: {1}\tTime: {2}”, x.watcher.Name, x.result, DateTimeOffset.Now))); // tell everyone what happened. 这篇文章中的一小段代码让我开始走这条路。 目标是在每次Timer发布时,根据给定的开始时间和间隔ping Web服务(通过CheckFolder()方法)。 麻烦的是,每次运行程序时,它都会为第一个Watcher输出一条消息,然后程序退出而不会出错。 […]

Observable.Retry无法按预期工作

我有一系列使用异步方法处理的数字 。 我正在模拟可能失败的远程服务调用 。 如果失败,我想重试,直到电话成功。 问题是,对于我正在尝试的代码, 每次在异步方法中抛出exception时,序列似乎永远都会挂起 。 你可以用这个简单的代码片段测试它(在LINQPad中测试) Random rnd = new Random(); void Main() { var numbers = Enumerable.Range(1, 10).ToObservable(); var processed = numbers.SelectMany(n => Process(n).ToObservable().Retry()); processed.Subscribe( f => Console.WriteLine(f)); } public async Task Process(int n) { if (rnd.Next(2) == 1) { throw new InvalidOperationException(); } await Task.Delay(2000); return n*10; } 它应该处理每个元素,重试那些失败的元素。 相反,它永远不会结束,我不知道为什么。 […]

使用reative扩展在linqpad中恢复数据库时显示进度

我有以下C#代码。 var databaseRestore = new Microsoft.SqlServer.Management.Smo.Restore(); //databaseRestore.PercentComplete += CompletionStatusInPercent; //databaseRestore.PercentCompleteNotification = 10; //databaseRestore.Complete += Restore_Completed; … var complete = Observable .FromEventPattern(databaseRestore, “Complete”) .Select(x=>x.EventArgs as ServerMessageEventArgs) .Select(x=>x.Error.Message) .Take(1) .DumpLive(“Complete”); var percentComplete = Observable .FromEventPattern(databaseRestore, “PercentComplete”) .Select(x=>x.EventArgs as PercentCompleteEventArgs) .Select(x=>x.Percent) .TakeUntil(complete) .DumpLive(“PercentComplete”); … databaseRestore.SqlRestore(server); 如果我运行它,首先来自处理程序的输出(如果我取消注释它们)。 然后,首先,Linqpad显示“Live Observables”结果选项卡 “完整”可观察,已完成并带有最终结果消息。 “PercentComplete”,仍在等待“ – ”(但没有条目?) 我想要的只是远离使用react nativeextenxions的事件。 首先应该通过实际进度更新“PercentComplete”可观察量。 然后使用最终消息“完成”。 问题是:如何正确设置可观察量?

ValveSubject:Rx的排队主题,具有内置缓冲,打开/关闭操作

我经常遇到需要某种阀门构造来控制反应管道流动的情况。 通常,在基于网络的应用程序中,我需要根据连接状态打开/关闭请求流。 该阀门主体应支持打开/关闭流,并按FIFO顺序输出交付。 阀门关闭时,应缓冲输入值。 ConcurrentQueue或BlockingCollection通常用于此类场景,但会立即将线程引入图片中。 我一直在寻找这个问题的纯粹react native解决方案。

从Action 获取IObservable 的最佳方法

我有一个方法void OnAction(Action callback) ,我想使用反应式扩展(Rx)从中创建一个IObservable 。 我找到了两个可以帮助我的方法: Observable.FromEvent()和Observable.Start() : var observable = Observable.Start(() => { Person person = null; _mngr.OnAction(p => person = p); return person; }); 和: var observable = Observable.FromEvent( action => _mngr.OnAction(action), //Add Handler action => // Remove Handler { }); 第一个有一个闭包,我必须评估if person != null : var foo= observable.Where(p => { if(p!=null) //… […]

如何将Observable.FromEvent与静态事件一起使用?

我正在尝试使用Reactive Extensions编写代码来处理异步调用,其中启动方法和已完成事件都是静态的。 我不能用 var languageSetsLoaded = Observable .FromEvent( LanguageManager, “LanguageSetsLoaded”) 因为LanguageManager是一个静态类而不是一个实例,所以我试过了 var languageSetsLoaded = Observable .FromEvent( h => LanguageManager.LanguageSetsLoaded += h, h => LanguageManager.LanguageSetsLoaded -= h ) 但是这会产生语法错误“无法将lambda表达式转换为类型’对象’,因为它不是委托类型”。 事件就这样宣布了 public delegate void LoadLanguageSetsCompletedHandler(LoadLanguageSetsCompletedEventArgs e); public static event LoadLanguageSetsCompletedHandler LanguageSetsLoaded = delegate { }; 所以我认为它是一个委托类型,也许它是静态的这一事实产生了误导性的错误描述。 他们是否有某种方法可以做到这一点,或者我只是要坚持这样的非react native代码? private void ChangeLanguage(string languageCode) { LanguageManager.LanguageSetsLoaded += OnLanguageSetsLoaded; LanguageManager.LoadLanguageSets(languageCode, […]