Tag: 反应式编程

react native扩展和重试

所以今天早上我的雷达上出现了一系列文章。 它从这个问题开始,这导致了 GitHub上的原始示例和源代码 。 我稍微重写了一遍,所以我可以在控制台和服务应用程序中开始使用它: public static class Extensions { static readonly TaskPoolScheduler Scheduler = new TaskPoolScheduler(new TaskFactory()); // Licensed under the MIT license with <3 by GitHub /// /// An exponential back off strategy which starts with 1 second and then 4, 8, 16… /// [SuppressMessage(“Microsoft.Security”, “CA2104:DoNotDeclareReadOnlyMutableReferenceTypes”)] public static readonly Func ExponentialBackoff = n […]

等待IObservable以获取所有元素错误

我有这堂课: public class TestService { public IObservable GetObservable(int max) { var subject = new Subject(); Task.Factory.StartNew(() => { for (int i = 0; i < max; i++) { subject.OnNext(i); } subject.OnCompleted(); }); return subject; } } 我也为此写了一个测试方法: [TestMethod] public void TestServiceTest1() { var testService = new TestService(); var i = 0; var observable = testService.GetObservable(3); […]

如何在Rx.NET中以正确的方式约束并发

请观察以下代码段: var result = await GetSource(1000).SelectMany(s => getResultAsync(s).ToObservable()).ToList(); 此代码的问题在于getResultAsync以不受约束的方式并发运行。 在某些情况下,这可能不是我们想要的。 假设我想将其并发限制为最多10个并发调用。 什么是Rx.NET方法呢? 我附上一个简单的控制台应用程序,演示了所描述问题的主题和我的蹩脚解决方案。 还有一些额外的代码,比如Stats类和人工随机睡眠。 它们用于确保我真正获得并发执行并且可以可靠地计算在此过程中达到的最大并发性。 RunUnconstrained方法演示了天真的,无约束的运行。 RunConstrained方法显示了我的解决方案,这不是很优雅。 理想情况下,我想通过简单地将专用Rx运算符应用于Monad来简化约束并发性。 当然,不会牺牲性能。 using System; using System.Collections.Generic; using System.Diagnostics; using System.Linq; using System.Reactive.Linq; using System.Reactive.Threading.Tasks; using System.Threading; using System.Threading.Tasks; namespace RxConstrainedConcurrency { class Program { public class Stats { public int MaxConcurrentCount; public int CurConcurrentCount; public readonly object MaxConcurrentCountGuard […]

如何在长时间运行的查询中间扩展Throttle Timespan?

是否可以在查询中间扩展Throttle Timespan值? 例如,假设一个例子在101 Rx Samples Throttle中有这个查询var throttled = observable.Throttle(TimeSpan.FromMilliseconds(750)); 。 如果我想要改变它以便如果在前500毫秒期间没有事件,那么对于之后的每个事件,节流值将被扩展到例如1500毫秒。 这是一个使用Switch运算符的地方吗?

有没有办法将观察者订阅为异步

给定一个同步观察者,有没有办法做到这一点: observable.SubscribeAsync(observer); 并且observer所有方法都是异步调用的,或者是在创建观察者时我必须处理的东西?

使用Rx和SelectMany限制并发请求

我有一个我想要使用HttpClient同时下载的页面的URL列表。 URL列表可能很大(100或更多!) 我目前有这个代码: var urls = new List { @”http:\\www.amazon.com”, @”http:\\www.bing.com”, @”http:\\www.facebook.com”, @”http:\\www.twitter.com”, @”http:\\www.google.com” }; var client = new HttpClient(); var contents = urls .ToObservable() .SelectMany(uri => client.GetStringAsync(new Uri(uri, UriKind.Absolute))); contents.Subscribe(Console.WriteLine); 问题是:由于SelectMany的使用,几乎同时创建了大量的任务。 似乎如果URL列表足够大,很多任务会给出超时(我得到“任务被取消”例外)。 因此,我认为应该有一种方法,可能使用某种调度程序,来限制并发任务的数量,在给定时间不允许超过5或6。 通过这种方式,我可以获得并发下载而无需启动太多可能会失速的任务,就像他们现在所做的那样。 如何做到这一点,我不会饱和大量的超时任务? 十分感谢。

Rx后退并重试

这基于此SO中提供的代码: 编写Rx“RetryAfter”扩展方法 我正在使用Markus Olsson的代码(目前仅进行评估),之前有人问我试图在Github上抓住Markus,但是我在工作的地方被封锁了,所以我觉得我唯一能做的就是问在这里。 很抱歉,如果这与任何人都很糟糕。 所以我在一个小的演示中使用以下代码: class Attempt1 { private static bool shouldThrow = true; static void Main(string[] args) { Generate().RetryWithBackoffStrategy(3, MyRxExtensions.ExponentialBackoff, ex => { return ex is NullReferenceException; }, Scheduler.TaskPool) .Subscribe( OnNext, OnError ); Console.ReadLine(); } private static void OnNext(int val) { Console.WriteLine(“subscriber value is {0} which was seen on threadId:{1}”, val, Thread.CurrentThread.ManagedThreadId); } […]

如何从rx订阅回调异步function?

我想在Rx订阅中回调一个异步函数。 就像这样: public class Consumer { private readonly Service _service = new Service(); public ReplaySubject Results = new ReplaySubject(); public void Trigger() { Observable.Timer(TimeSpan.FromMilliseconds(100)).Subscribe(async _ => await RunAsync()); } public Task RunAsync() { return _service.DoAsync(); } } public class Service { public async Task DoAsync() { return await Task.Run(() => Do()); } private static string […]