Tag: system.reactive

等待IObservable以获取所有元素错误

我有这堂课: public class TestService { public IObservable GetObservable(int max) { var subject = new Subject(); Task.Factory.StartNew(() => { for (int i = 0; i < max; i++) { subject.OnNext(i); } subject.OnCompleted(); }); return subject; } } 我也为此写了一个测试方法: [TestMethod] public void TestServiceTest1() { var testService = new TestService(); var i = 0; var observable = testService.GetObservable(3); […]

Rx如何通过键对一个复杂的对象进行分组,然后在不“停止”流的情况下执行SelectMany?

这与我的其他问题有关。 James World提出了如下解决方案: // idStream is an IObservable of the input stream of IDs // alarmInterval is a Func that gets the interval given the ID var idAlarmStream = idStream .GroupByUntil(key => key, grp => grp.Throttle(alarmInterval(grp.Key))) .SelectMany(grp => grp.IgnoreElements().Concat(Observable.Return(grp.Key))); <编辑2: 问题:如何在不等待第一个事件到达的情况下立即启动计时器? 我猜这是我问题中的根本问题。 为此,我计划发送虚拟对象,其中包含我知道应该存在的ID。 但正如我在下面写的那样,我最终遇到了一些其他问题。 尽管如此,我认为解决这个问题也会很有趣。 然后转发其他有趣的部分! 现在,如果我想将一个复杂的对象分组如下,并按键分组,如下所示(不会编译) var idAlarmStream = idStream .Select(i => new { […]

如何将Observable序列化到云端并返回

我需要将处理序列(如本问题中如何使用.net RX组织数据处理器序列 )拆分为Azure环境中的多个计算单元。 我们的想法是将Observable序列序列化为Azure队列(或Service Bus)并将其反序列化。 如果生产者或消费者失败,其他方应该能够继续生产/消费。 任何人都可以建议一个优雅的方式来做这个和使用什么(Azure队列或服务总线)? 有没有人使用TCP Observable提供程序 – http://rxx.codeplex.com/wikipage?title=TCP%20Qbservable%20Provider这样的问题是否对一方的失败安全?

C#5 ReadAsync和迭代器

我试图将下面的类转换为懒惰返回一个文件。 public class ObservableFile2 : IObservable { private readonly IObservable subject; public ObservableFile2(string fileName) { subject = Observable.Using ( () => new StreamReader(new FileStream(fileName, FileMode.Open, FileAccess.Read, FileShare.Read)), streamReader => ObserveLines(streamReader) ); } private IObservable ObserveLines(StreamReader streamReader) { return ReadLines(streamReader).ToObservable(); } private IEnumerable ReadLines(StreamReader streamReader) { while (!streamReader.EndOfStream) { yield return streamReader.ReadLine(); } } public IDisposable […]

Async Disposable.Create

Disposable.Create需要Action作为参数。 在处理Rx订阅时运行Action 。 在处理Rx订阅时,我想运行一些异步清理代码,但是使用带有Action async () =>与async void相同,我想避免使用它。 有关我为什么要避免这种情况的详细信息,请参阅此处 。 是否可以创建类似Disposable.AsyncCreate东西,它接受Func而不是Action 。 如果是这样,我应该如何将它作为CompositeDisposable一部分使用? 还是有其他模式来处理异步处置?

在Rx中是否有等效的Task.ContinueWith运算符?

在Rx中是否有等效的Task.ContinueWith运算符? 我在Silverlight上使用Rx,我使用FromAsyncPattern方法进行两次webservice调用,我想同步执行它们。 var o1 = Observable.FromAsyncPattern(client.BeginGetData, client.EndGetData); var o2 = Observable.FromAsyncPattern(client.BeginGetData, client.EndGetData); 是否只有在 o1返回Completed 后才会启动/订阅o2的运算符(如Zip)? 我以同样的方式处理任一Web服务调用失败。

将IEnumerable 转换为IObservable ,具有最大并行度

我有一系列异步任务要做(比如,获取N个网页)。 现在我想要的是将它们全部暴露为IObservable 。 我目前的解决方案使用了这个问题的答案: async Task GetPage(string page) { Console.WriteLine(“Before”); var result = await FetchFromInternet(page); Console.WriteLine(“After”); return result; } // pages is an IEnumerable IObservable resultObservable =pages.Select(GetPage). Select(t => Observable.FromAsync(() => t)).Merge(); // Now consume the list foreach(ResultObj obj in resultObservable.ToEnumerable()) { Console.WriteLine(obj.ToString()); } 问题是我不知道要获取的页面数量,而且可能很大。 我不想同时发出数百个请求。 所以我想要一种方法来限制并行执行的最大任务数。 有没有办法限制GetPage的并发调用GetPage ? 有一个Merge重载,它接受一个maxConcurrent参数,但它似乎并没有实际限制函数invokation的并发性。 控制台在After消息之前打印所有Before消息。 注意:我需要转换回IEnumerable 。 我正在编写一个系统的数据源,它给我提取数据的描述符,我需要给它一个下载数据的列表。

对缓冲的Observable进行排序

我有一批令牌非常快,而且处理器相对较慢。 令牌有三种子类型,我希望它们按优先级处理。 因此,我希望令牌在生成后进行缓冲,并等待处理并按优先级对缓冲区进行排序。 这是我的课程: public enum Priority { High = 3, Medium = 2, Low = 1 } public class Base : IComparable { public int Id { get; set; } public int CompareTo(Base other) { return Id.CompareTo(other.Id); } } public class Foo : Base { } public class Bar : Base { } public […]

自定义Rx运算符仅在存在最近值时才进行限制

我正在尝试创建一个看起来非常有用的Rx运算符,但令我惊讶的是没有发现任何与Stackoverflow完全匹配的问题。 我想在Throttle上创建一个变体,如果有一段时间不活动,可以立即通过值。 我想象的用例是这样的: 我有一个下拉列表,当值发生变化时,它会启动Web请求。 如果用户按住箭头键并快速循环显示值,我不想启动每个值的请求。 但是如果我限制流,那么每次用户只需从正常方式从下拉列表中选择一个值时,用户就必须等待节流持续时间。 因此,普通的Throttle看起来像这样: 我想创建如下所示的ThrottleSubsequent : 请注意,大理石1,2和6会毫不拖延地通过,因为它们各自都处于不活动状态。 我对此的尝试如下所示: public static IObservable ThrottleSubsequent(this IObservable source, TimeSpan dueTime, IScheduler scheduler) { // Create a timer that resets with each new source value var cooldownTimer = source .Select(x => Observable.Interval(dueTime, scheduler)) // Each source value becomes a new timer .Switch(); // Switch to the most […]

跟踪Observable中的(观察者数量)?

我有一个可观察的代表股票价格的流。 如果我的可观察序列上没有观察者,我希望能够从提供价格流的远程服务器断开连接,但我不希望这样做,直到每个观察者都调用Dispose()。 然后以类似的方式,当第一个人调用Subscribe时,我想重新连接到远程服务器。 有没有办法弄清楚有多少观察者在可观察量上调用了订阅? 或者也许是一种了解观察者何时调用Subscribe或Dispose的方法?