Tag: system.reactive

Threading.Tasks在.NET 3.5中的Rx扩展模拟?

是否可以使用Reactive Extensions(Rx)在.NET 3.5中创建执行并行化的应用程序,还是以某种方式限制? 我从http://www.microsoft.com/download/en/confirmation.aspx?id=26649下载了Rx,在创建了一个带引用的反应式程序集的简单项目之后,我找不到与.NET 4.0中的任务相对应的任何类。 我正在尝试为“任务”找到课程,但唉,我找不到任何课程。 难道我做错了什么?

Observable.Timer():如何避免定时器漂移?

在C#(.NET 4.0)应用程序中,我使用Reactive Extensions(2.0.20823.0)生成时间边界,以便将事件分组为聚合值。 为了简化对生成的数据库的查询,需要在整个小时(或下面示例中的秒)对齐这些边界。 使用Observable.Timer() : var time = DefaultScheduler.Instance; var start = new DateTimeOffset(time.Now.DateTime, time.Now.Offset); var span = TimeSpan.FromSeconds(1); start -= TimeSpan.FromTicks(start.Ticks % 10000000); start += span; var boundary = Observable.Timer(start, span, time); boundary.Select(i => start + TimeSpan.FromSeconds(i * span.TotalSeconds)) .Subscribe(t => Console.WriteLine(“ideal: ” + t.ToString(“HH:mm:ss.fff”))); boundary.Select(i => time.Now) .Subscribe(t => Console.WriteLine(“actual: ” + […]

使用Reactive Extensions进行数据库轮询

我必须及时查询数据库以了解遗留系统的状态。 我想过围绕一个Observable包装查询,但我不知道正确的方法。 基本上,它将每5秒进行一次相同的查询。 但我担心我将不得不面对这些问题: 如果执行查询需要10秒钟怎么办? 如果仍在处理上一个查询,我不想执行任何新查询。 此外,应该有一个超时。 如果当前查询在例如20秒之后未执行,则应记录信息性消息并应发送新的尝试(同一查询)。 额外细节: 查询只是一个SELECT ,它返回一个包含状态代码列表的数据集( 工作 , 出错 )。 Observable序列将始终采用从查询接收的最新数据,例如Switch扩展方法。 我想将数据库查询(lenghty操作)包装到一个Task中,但我不确定它是否是最佳选择。 我几乎可以确定查询应该在另一个线程中执行,但是我不知道observable应该是什么样子,曾经阅读过Lee Campbell的Rx简介 。

RX Observable.TakeWhile在每个元素之前检查条件,但我需要在之后执行检查

Observable.TakeWhile允许您在条件为真时运行序列(使用委托,以便我们可以对实际的序列对象执行计算),但是它会在每个元素之前检查这个条件。 如何在每个元素之后执行相同的检查? 以下代码演示了此问题 void RunIt() { List listOfCommands = new List(); listOfCommands.Add(new SomeCommand { CurrentIndex = 1, TotalCount = 3 }); listOfCommands.Add(new SomeCommand { CurrentIndex = 2, TotalCount = 3 }); listOfCommands.Add(new SomeCommand { CurrentIndex = 3, TotalCount = 3 }); var obs = listOfCommands.ToObservable().TakeWhile(c => c.CurrentIndex != c.TotalCount); obs.Subscribe(x => { Debug.WriteLine(“{0} of {1}”, […]

C#中的IObserver和IObservable用于观察者与代理,事件

我所要做的就是实现观察者模式。 所以,我提出了这个解决方案: 我们有一个PoliceHeadQuarters,其主要工作是向所有订阅者发送通知。 考虑到DSP,Inspector和SubInspector类订阅了PoliceHeadQuarters。 使用事件和代表我写道 public class HeadQuarters { public delegate void NewDelegate(object sender, EventArgs e); public event EventHandler NewEvent; public void RaiseANotification() { var handler = this.NewEvent; if (handler != null) { handler(this, new EventArgs()); } } } public class SubInspector { public void Listen(object sender, EventArgs e) { MessageBox.Show(string.Format(“Event Notification received by sender […]

System.Reactive.Joins指南

我正在寻找System.Reactive.Joins的介绍/一些文档,其中包括Pattern,Plan,QueryablePattern和QueryablePlan类。 谷歌没有发现任何东西(“System.Reactive.Joins”),MSDN什么都没有,这里没有样本, 这个问题的优秀资源不包括这个命名空间。 有人有指点吗?

在面向.NET 4+的库中公开通知时,IObservable是否应优先于事件

我有一个.NET库,作为对象模型的一部分将发出某些事件的通知。 在我看来, 事件的主要优点是初学者的可接近性(以及某些消费环境中的简单性),主要的负面因素是它们不可组合 ,因此如果你想做任何事情,它们会被立即强制进入Observable.FromEvent *有趣的是没有编写代码丛。 正在解决的问题的本质是事件流量不会特别频繁或大量(它肯定不会尖叫RX),但绝对没有要求支持4.0之前的.NET版本[因此我可以使用System.Reactive内置的IObservable接口,不会强制对消费者产生任何重大依赖关系]。 我对一些通用指南感兴趣, 但是从API设计的角度来看,更喜欢IObservables不是event的一些具体原因 – 无论我的具体案例可能在哪里 – IObservable频谱。 那么,问题是: 如果我采用最简单的方法并且暴露event而不是IObservable那么对于API消费者来说,是否有任何具体的事情变得更加困难或有问题? 或者,重申:除了消费者必须执行Observable.FromEvent *才能组合事件之外,在API中公开通知时,是否真的没有理由更喜欢IObservable而不是event ? 使用IObservable进行非尖叫-RX内容或编码指南的项目的IObservable将是理想的,但并不重要。 在@Adam Houldsworth的评论中提到NB,我对.NET 4+库的API表面的具体内容感兴趣,而不是对我们这个时代代表更好的“默认架构”的意见调查: ) 注意,在C#中, IObserver和IObservable已经触及了这个问题, 对于Observer vs Delegates,事件和IObservable与普通事件或我为什么要使用IObservable? 。 由于SRP违规,我提出的问题的方面没有在任何答复中得到解决。 另一个略微重叠的问题是.NET Rx优于经典事件? 。 (使用IObservable而不是事件)[ 使用IObservable而不是事件属于同一类别。

使用Reactive Extensions(Rx)实现套接字编程?

使用Rx编写GetMessages函数的最简洁方法是什么: static void Main() { Socket socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); var messages = GetMessages(socket, IPAddress.Loopback, 4000); messages.Subscribe(x => Console.WriteLine(x)); Console.ReadKey(); } static IObservable GetMessages(Socket socket, IPAddress addr, int port) { var whenConnect = Observable.FromAsyncPattern(socket.BeginConnect, socket.EndConnect)(addr, port); // now will receive a stream of messages // each message is prefixed with an 4 bytes/Int32 […]

如何与发布和连接共享一个observable?

我有一个可观察的数据流,我正在应用操作,分成两个独立的流,对两个流中的每一个应用更多(不同的)操作,并再次合并在一起。 我试图使用Publish and Connect分享两个订阅者之间的可观察性,但每个订阅者似乎都在使用单独的流。 也就是说,在下面的示例中,我看到两个订阅者为流中的每个项目打印一次“执行昂贵的操作”。 (想象一下,昂贵的操作是在所有订阅者之间只发生一次的事情,因此我试图重用流。)我使用Publish and Connect尝试与两个订阅者共享合并的observable,但它似乎有错误的效果。 问题示例: var foregroundScheduler = new NewThreadScheduler(ts => new Thread(ts) { IsBackground = false }); var timer = Observable.Timer(TimeSpan.Zero, TimeSpan.FromSeconds(10), foregroundScheduler); var expensive = timer.Select(i => { // Converting to strings is an expensive operation Console.WriteLine(“Doing an expensive operation”); return string.Format(“#{0}”, i); }); var a = expensive.Where(s => […]

包装速率限制API调用

我有一个API调用,它接受每秒最大调用率 。 如果超过速率, 则抛出exception 。 我想把这个调用包装成一个抽象,它可以使调用率保持在极限之下。 它将像网络路由器一样:处理多个呼叫并将结果返回给正确的呼叫者,关注呼叫率。 目标是使调用代码尽可能不知道该限制。 否则,具有此调用的代码中的每个部分都必须包装到try-catch中! 例如:想象一下,您可以从可以添加2个数字的extern API调用方法。 此API可以每秒调用5次 。 高于此值的任何内容都将导致exception。 为了说明问题,限制通话费率的外部服务就像这个问题的答案中的那个 如何使用Observables构建速率限制API? 附加信息: 由于每次从代码的任何部分调用此方法时都不需要担心该限制,因此您可以考虑设计一个可以调用的包装器方法,而不必担心速率限制。 在内部你关心限制,但在外面你暴露了一个简单的异步方法。 它类似于Web服务器。 它如何将正确的结果包返回给正确的客户? 多个呼叫者将调用此方法,他们将获得结果。 这种抽象应该像代理一样。 我怎么能这样做? 我确定包装方法的公司应该是这样的 public async Task MyMethod() 在方法内部,它将执行逻辑,可能使用Reactive Extensions(Buffer)。 我不知道。 但是怎么样? 我的意思是,多次调用此方法应该将结果返回给正确的调用者。 这有可能吗? 非常感谢!