Tag: system.reactive

使用Rx Framework使用void AsyncMethod(Action 回调)模式进行异步调用

我已经看到很多关于如何在Rx框架中使用Observable.FromAsyncPattern()来简化异步调用的示例,但我使用的是不使用IAsyncResult的标准异步模式的接口BeginXXX / EndXXX(IAsyncResult) ,所以这对我不起作用。 我正在使用的库使用回调模式公开异步函数: void GetAllObjects(Action<List> callback) 在一个理想的世界里,我想转此: var isLoadingUsers = true; var isLoadingSystems = true; var isLoadingCustomers = true; var isLoadingRules = true; mClient.GetAllUsers(UsersCallback); mClient.GetAllCustomers(CustomersCallback); mClient.GetAllRules(RulesCallback); // set the IsLoadingXXX variables to false in callbacks // once all are false mClient.GetAllSystems(SystemsCallback); 变成这样的东西: var o = Observable.ForkJoin( Observable.Start(GetAllUsers()), Observable.Start(GetAllCustomers()), Observable.Start(GetAllRules()) ).Finally(() => GetAllSystems); 如何将该模式转变为返回IObservable的模式?

什么是使用具有单个并发执行限制的Rx在c#中运行周期性任务的好方法?

我希望运行周期性任务,其限制是在任何给定时间最多只执行一次方法。 我正在尝试使用Rx,但我不确定一旦并发限制,最多如何强加。 var timer = Observable.Interval(TimeSpan.FromMilliseconds(100)); timer.Subscribe(tick => DoSomething()); 此外,如果任务仍在运行,我希望后续计划过去。 即我不希望任务排队并导致问题。 我有2个这样的任务定期执行。 正在执行的任务当前是同步的。 但是,如果有必要,我可以让它们异步。

如何使用Observable实现轮询?

我有一个参数化的rest调用应该每隔五秒用不同的参数执行: Observable restCall = api.method1(param1); 我需要创建一个Observable ,它将使用param1的不同值每5秒轮询一次restCall。 如果api调用失败,我需要收到错误并在5秒内进行下一次调用。 应仅在restCall完成时(成功/错误)测量调用之间的间隔。 我目前正在使用RxJava,但.NET示例也会很好。

Rx和任务 – 在生成新任务时取消运行任务?

我有一个我想用Rx处理的用户交互场景。 该场景类似于规范“当用户停止输入,做一些工作”(通常,搜索用户到目前为止输入的内容)(1) – 但我还需要: (2)只获得“做一些工作”单位的最新结果(见下文) (3)当新的工作单元开始时, 取消正在进行的任何工作(在我的情况下,它是CPU密集型的) 对于(1)我使用IObservable作为用户事件,使用.Throttle()仅触发事件之间的暂停(“用户停止键入”)。 从那,我。选择.Select(_ => CreateMyTask(…).ToObservable()) 。 这给了我一个IObservable<IObservable> ,其中每个内部observable包装一个任务。 为了得到(2)我最终应用.Switch()来获得最新工作单元的结果。 那么(3) – 取消待定任务? 如果我理解正确的话,只要有一个新的内部IObservable .Switch()方法就会订阅它,并取消订阅前一个(),使它们成为Dispose() 。 也许这可能以某种方式连线触发任务取消?

Reactive Extensions:订阅者中的并发

我试图围绕Reactive Extensions对并发性的支持,并且很难获得我追求的结果。 所以我可能还没有得到它 。 我有一个源,它将数据发送到流中的速度比订阅者可以使用它的速度快。 我更喜欢配置流,以便使用另一个线程从流中为每个新项调用订阅者,以便订阅者同时运行多个线程。 我能够确保订户的线程安全性。 以下示例演示了此问题: Observable.Interval( TimeSpan.FromSeconds(1)) .Do( x => Console.WriteLine(“{0} Thread: {1} Source value: {2}”, DateTime.Now, Thread.CurrentThread.ManagedThreadId, x)) .ObserveOn(NewThreadScheduler.Default) .Subscribe(x => { Console.WriteLine(“{0} Thread: {1} Observed value: {2}”, DateTime.Now, Thread.CurrentThread.ManagedThreadId, x); Thread.Sleep(5000); // Simulate long work time }); 控制台输出如下所示(删除日期): 4:25:20 PM Thread: 6 Source value: 0 4:25:20 PM Thread: 11 Observed […]

react native可观察认购出售事项

如果我可以访问一个我知道只会返回一个项目的IObservable,这是否有效并且它是最好的使用模式? IDisposable disposable = null; disposable = myObservable.Subscribe(x => { DoThingWithItem(x); if (disposable != null) { disposable.Dispose(); } });

使用Reactive Extensions搜索TextChanged

我试图在具有10000多条记录的数据库表上实现即时搜索。 搜索文本框内的文本发生更改时搜索开始,当搜索框变空时,我想调用另一种加载所有数据的方法。 此外,如果用户更改搜索字符串,而正在加载另一个搜索的结果,则应停止加载这些结果以支持新搜索。 我像下面的代码一样实现它,但我想知道是否有更好或更清晰的方法来使用Rx(Reactive Extension)运算符,我觉得在第一个observable的subscribe方法中创建第二个observable比声明性的,对于if语句也是如此。 var searchStream = Observable.FromEventPattern(s => txtSearch.TextChanged += s, s => txtSearch.TextChanged -= s) .Throttle(TimeSpan.FromMilliseconds(300)) .Select(evt => { var txtbox = evt.Sender as TextBox; return txtbox.Text; } ); searchStream .DistinctUntilChanged() .ObserveOn(SynchronizationContext.Current) .Subscribe(searchTerm => { this.parties.Clear(); this.partyBindingSource.ResetBindings(false); long partyCount; var foundParties = string.IsNullOrEmpty(searchTerm) ? partyRepository.GetAll(out partyCount) : partyRepository.SearchByNameAndNotes(searchTerm); foundParties .ToObservable(Scheduler.Default) .TakeUntil(searchStream) .Buffer(500) […]

使用Reactive Extensions重新排序事件

我正在尝试重新排序在不同线程上无序的事件。 是否可以创建匹配这些大理石图的反应式扩展查询: s1 1 2 3 4 s2 1 3 2 4 result 1 2 3 4 和… s1 1 2 3 4 s2 4 3 2 1 result 1234 即:仅以版本号顺序发布结果。 我最接近的是每次s1时刻都使用Join打开一个窗口,只有当s2到达时才使用相同的数字关闭它。 像这样: var publishedEvents = events.Publish().RefCount(); publishedEvents.Join( publishedEvents.Scan(0, (i, o) => i + 1), expectedVersion => publishedEvents.Any(@event => @event.Version == expectedVersion), _ => Observable.Never(), […]

如何在不使用Subject 支持字段的情况下公开IObservable 属性

在这个关于Subject Enigmativity的问题的答案中提到: 另外,你应该尽量避免使用主题。 一般规则是,如果你正在使用一个主题,那么你做错了什么。 我经常使用subject作为IObservable属性的支持字段,这可能是Rx之前几天的.NET事件。 例如,而不是像 public class Thing { public event EventHandler SomethingHappened; private void DoSomething() { Blah(); SomethingHappened(this, EventArgs.Empty); } } 我可能会这样做 public class Thing { private readonly Subject somethingHappened = new Subject(); public IObservable SomethingHappened { get { return somethingHappened; } } private void DoSomething() { Blah(); somethingHappened.OnNext(Unit.Default); } } 所以,如果我想避免使用Subject ,那么做这种事情的正确方法是什么? […]

Rx中可推进的历史流和直播

我有一个热门观察,我通常使用下面的普通Subject实现,以便有兴趣的人可以订阅实时的通知流。 现在我想保留这个实时流,但是还要公开所有事件的历史流,这些事件已经并且具有绝对时间附加到那些通知以了解它们究竟发生的时间因为允许订户将历史流推进到任何在重播年表之前的时间点。 我相信大部分可以使用HistoricalScheduler及其AdvanceTo方法实现,但我不确定如何? 是否使用Timestamped来节省所需事件的时间? ReplaySubject是否需要将实时流缓存到历史记录中,然后可以使用HistoricalScheduler进行回放? 如何为同一来源实施这两个流,或者换句话说,如何将下面的内容用于当前的要求? [见“重播过去”标题]