Tag: system.reactive

WinRT事件如何与.NET互操作

在Rx团队Bart De Smet的最新video中:Rx Update – .NET 4.5,Async,WinRT我看到一些非常奇怪的元数据暴露给.NET的WinRT事件,更多的是prenexly – add_ / remove_ pair方法签名: EventRegistrationToken add_MyEvent(EventHandler handler) { … } void remove_MyEvent(EventRegistrationToken registrationToken) { … } 它看起来非常棒,允许通过“处理”注册令牌取消订阅事件(Rx做同样的事情,从Subscribe()方法返回IDisposable实例)。 因此,可以轻松取消订阅事件中的lamba表达式,但…… 那么C#如何允许处理这类事件呢? 在.NET中,可以使用委托上的一个实例订阅方法(静态和实例),并使用指向同一方法的完全另一个委托实例取消订阅。 因此,如果我使用WinRT事件并且只是在C#中取消订阅某个委托类型实例…编译器在哪里获得正确的EventRegistrationToken ? 所有这些魔法如何运作? – 更新 – 实际上EventRegistrationToken不允许简单地通过调用某种Dispose()方法取消订阅,这真的很遗憾: public struct EventRegistrationToken { internal ulong Value { get; } internal EventRegistrationToken(ulong value) public static bool operator ==(EventRegistrationToken left, EventRegistrationToken […]

如何避免在RX中使用Subjects

所以我一直在读到使用Subject是“坏”的 – 并且我赞同这个推理。 但是,我试图想出避免使用它的最佳方法,并举一个例子。 目前,我有一个持久化配置类的抽象类,它有一个受保护的Save()方法,只要更改属性应该保持类,就会调用该方法。 此消息将消息泵送到Subject ,通过IObservable接口公开,序列化服务侦听并序列化该类。 这似乎是当时实现这一目标的最明显,最简单,最快捷的方式。 那么在不使用主题的情况下,RX方式会是什么呢? 我是否会公开一个事件并使用Observable.FromEventPattern()来订阅它? – 因为这似乎是一种更复杂的方式。

使用IObservable而不是事件

我最近一直在阅读有关IObservable的内容。 到目前为止,我已经查看了各种SO问题,并观看了他们可以做什么的video。 我正在思考的整个“推动”机制非常棒,但我仍在努力弄清楚究竟是什么。 从我的读数来看,我认为在某种程度上IObservable可以被“监视”,而IObservers则是“观察者”。 所以现在我要尝试在我的应用程序中实现它。 在我开始之前,有一些事情我想坚持下去。 我已经看到IObservable与IEnumerable相反,但是,我无法在我的特定实例中看到任何可以合并到我的应用程序中的地方。 目前,我大量使用事件,以至于我可以看到“管道”开始变得无法管理。 我想,IObservable可以帮助我。 考虑以下设计,这是我的应用程序中的I / O包装器(仅供参考,我通常需要处理字符串): 我有一个名为IDataIO的基本接口: public interface IDataIO { event OnDataReceived; event OnTimeout: event OnTransmit; } 现在,我目前有三个实现此接口的类,这些类中的每一个都以某种方式利用异步方法调用,引入了某种类型的multithreading处理: public class SerialIO : IDataIO; public class UdpIO : IDataIO; public class TcpIO : IDataIO; 每个类的一个实例都包含在我的最终类中,称为IO(它也实现了IDataIO – 遵循我的策略模式): public class IO : IDataIO { public SerialIO Serial; public UdpIO Udp; […]

当没有值传入时,是否有Rx方法定期重复上一个值?

我遇到的一个用例,我怀疑我不能成为唯一一个用例,如下所示: IObservable Observable.RepeatLastValueDuringSilence(this IObservable inner, TimeSpan maxQuietPeriod); 它将返回内部observable中的所有未来项,但是,如果内部observable在一段时间内没有调用OnNext(maxQuietPeriod),它只重复最后一个值(当然内部调用OnCompleted或OnError) 。 理由是服务定期ping出定期状态更新。 例如: var myStatus = Observable.FromEvent( h=>this.StatusUpdate+=h, h=>this.StatusUpdate-=h); var messageBusStatusPinger = myStatus .RepeatLastValueDuringSilence(TimeSpan.FromSeconds(1)) .Subscribe(update => _messageBus.Send(update)); 这样的事情存在吗? 还是我过度估计它的用处? 谢谢,Alex PS:我为任何不正确的术语/语法道歉,因为我只是第一次探索Rx。

Rx如何从pub / sub模式创建序列

我正在尝试使用Rx来评估从发布/子模式创建序列(即经典观察者模式,其中下一个元素由生产者发布)。 这基本上与.net事件相同,除了我们需要概括它以使得事件不是必需的,所以我无法利用Observable.FromEvent。 我玩过Observable.Create和Observable.Generate,发现自己最终不得不编写代码来处理pub / sub(即我必须编写生产者/消费者代码来存储已发布的项目,然后通过用它来调用IObserver.OnNext()),所以看起来我并没有真正利用Rx …… 我向下看正确的路径还是适合Rx? 谢谢

Rx – 如何从Task 创建IObservable ,以便取消订阅取消任务?

我是Rx的新手所以请耐心等待。 我想在IObservable包装一个Task IObservable 。 到现在为止还挺好: Task task = Task.Factory.StartNew(…); IObservable obs = task.ToObservable(); 现在,我想要的是在观察者取消订阅时发出取消信号: var cancel = new CancellationToken(); Task task = Task.Factory.StartNew(…, cancel); IObservable obs = task.ToObservable(); //there should be a way to tie the cancel token //to the IObservable (?) IDisposable disposable = obs.Subscribe(…); Thread.Sleep(1000); disposable.Dispose(); // this should signal the task to […]

使用FromAsyncPattern进行unit testing

Reactive Extensions有一个性感的小钩子来简化调用异步方法: var func = Observable.FromAsyncPattern( myWcfService.BeginDoStuff, myWcfService.EndDoStuff); func(inData).ObserveOnDispatcher().Subscribe(x => Foo(x)); 我在WPF项目中使用它,它在运行时运行良好。 不幸的是,当尝试使用这种技术的unit testing方法时,我遇到了随机故障。 包含此代码的测试的每五次执行中约有3次失败。 这是一个示例测试(使用Rhino / unity自动模拟容器实现): [TestMethod()] public void SomeTest() { // arrange var container = GetAutoMockingContainer(); container.Resolve() .Expect(x => x.BeginDoStuff(null, null, null)) .IgnoreArguments() .Do( new Func((inData, asyncCallback, state) => { return new CompletedAsyncResult(asyncCallback, state); })); container.Resolve() .Expect(x => x.EndDoStuff(null)) .IgnoreArguments() .Do( new […]

TakeUntil没有记录的工作?

从TakeUntil运算符的文档 (强调我的): TakeUntil订阅并开始镜像源Observable。 它还监视您提供的第二个Observable。 如果第二个Observable发出一个项目或发送终止通知 ,则TakeUntil返回的Observable将停止镜像源Observable 并终止 。 如果这是真的,那为什么阻止?: Observable.Never() .TakeUntil(Observable.Empty()) .Wait();

使用反应式扩展以正确的顺序处理多个响应

情况 我有一个系统,其中一个请求产生两个响应。 请求和响应具有相应的可观察量: IObservable _requests; IObservable _mainResponses; IObservable _secondaryResponses; 保证RequestSent事件早于MainResponseReceived和SecondaryResponseReceived发生,但响应按随机顺序排列。 是)我有的 最初我想要处理两个响应的处理程序,所以我压缩了observables: _requests .SelectMany(async request => { var main = _mainResponses.FirstAsync(m => m.Id == request.Id); var secondary = _secondaryResponses.FirstAsync(s => s.Id == request.Id); var zippedResponse = main.Zip(secondary, (m, s) => new MainAndSecondaryResponseReceived { Request = request, Main = m, Secondary = s }); return await […]

使用Reactive Extensions(RX),是否可以添加“暂停”命令?

我有一个接收事件流的类,并推出另一个事件流。 所有事件都使用Reactive Extensions(RX)。 使用.OnNext将传入的事件流从外部源推送到IObserver ,并使用IObservable和.Subscribe推出传出的事件流。 我正在使用Subject在幕后管理它。 我想知道RX中有什么技术暂时暂停输出。 这意味着传入事件将在内部队列中累积,当它们取消暂停时,事件将再次流出。