Tag: reactive programming

Reactive Extensions(Rx) – 当间隔中没有值时,具有最后已知值的样本

我有一个可观察的流,以不一致的间隔生成值,如下所示: ——1—2——3—————-4————–5— 我希望对此进行采样,但是一旦产生了一个值,就没有任何空样本: ——1—2——3—————-4————–5—– —-_—-1—-2—-3—-3—-3—-4—-4—-4—-5—-5 我显然认为Replay().RefCount()可以在这里用来向Sample()提供最后一个已知的值,但由于它没有重新订阅源流,所以它没有用完。 有关如何做到这一点的任何想法?

我对失去活动的扩展计时器缺少什么?

我有这个: watchers .ToObservable() // needs to be observable .SelectMany(watcher => // working on each watcher Observable // create a timer for the watcher .Timer(watcher.StartTime, TimeSpan.FromHours(watcher.Interval)) .SelectMany(Observable.FromAsync( async () => new { watcher, result = await CheckFolder(watcher.Path) }))) .Subscribe(x => Console.WriteLine(string.Format(“Watcher: {0}\tResult: {1}\tTime: {2}”, x.watcher.Name, x.result, DateTimeOffset.Now))); // tell everyone what happened. 这篇文章中的一小段代码让我开始走这条路。 目标是在每次Timer发布时,根据给定的开始时间和间隔ping Web服务(通过CheckFolder()方法)。 麻烦的是,每次运行程序时,它都会为第一个Watcher输出一条消息,然后程序退出而不会出错。 […]

Observable.Retry无法按预期工作

我有一系列使用异步方法处理的数字 。 我正在模拟可能失败的远程服务调用 。 如果失败,我想重试,直到电话成功。 问题是,对于我正在尝试的代码, 每次在异步方法中抛出exception时,序列似乎永远都会挂起 。 你可以用这个简单的代码片段测试它(在LINQPad中测试) Random rnd = new Random(); void Main() { var numbers = Enumerable.Range(1, 10).ToObservable(); var processed = numbers.SelectMany(n => Process(n).ToObservable().Retry()); processed.Subscribe( f => Console.WriteLine(f)); } public async Task Process(int n) { if (rnd.Next(2) == 1) { throw new InvalidOperationException(); } await Task.Delay(2000); return n*10; } 它应该处理每个元素,重试那些失败的元素。 相反,它永远不会结束,我不知道为什么。 […]

Rx如何通过键对一个复杂的对象进行分组,然后在不“停止”流的情况下执行SelectMany?

这与我的其他问题有关。 James World提出了如下解决方案: // idStream is an IObservable of the input stream of IDs // alarmInterval is a Func that gets the interval given the ID var idAlarmStream = idStream .GroupByUntil(key => key, grp => grp.Throttle(alarmInterval(grp.Key))) .SelectMany(grp => grp.IgnoreElements().Concat(Observable.Return(grp.Key))); <编辑2: 问题:如何在不等待第一个事件到达的情况下立即启动计时器? 我猜这是我问题中的根本问题。 为此,我计划发送虚拟对象,其中包含我知道应该存在的ID。 但正如我在下面写的那样,我最终遇到了一些其他问题。 尽管如此,我认为解决这个问题也会很有趣。 然后转发其他有趣的部分! 现在,如果我想将一个复杂的对象分组如下,并按键分组,如下所示(不会编译) var idAlarmStream = idStream .Select(i => new { […]

使用反应式扩展以正确的顺序处理多个响应

情况 我有一个系统,其中一个请求产生两个响应。 请求和响应具有相应的可观察量: IObservable _requests; IObservable _mainResponses; IObservable _secondaryResponses; 保证RequestSent事件早于MainResponseReceived和SecondaryResponseReceived发生,但响应按随机顺序排列。 是)我有的 最初我想要处理两个响应的处理程序,所以我压缩了observables: _requests .SelectMany(async request => { var main = _mainResponses.FirstAsync(m => m.Id == request.Id); var secondary = _secondaryResponses.FirstAsync(s => s.Id == request.Id); var zippedResponse = main.Zip(secondary, (m, s) => new MainAndSecondaryResponseReceived { Request = request, Main = m, Secondary = s }); return await […]

使用Reactive Extensions(RX),是否可以添加“暂停”命令?

我有一个接收事件流的类,并推出另一个事件流。 所有事件都使用Reactive Extensions(RX)。 使用.OnNext将传入的事件流从外部源推送到IObserver ,并使用IObservable和.Subscribe推出传出的事件流。 我正在使用Subject在幕后管理它。 我想知道RX中有什么技术暂时暂停输出。 这意味着传入事件将在内部队列中累积,当它们取消暂停时,事件将再次流出。

C#.NET Rx- System.Reactive在哪里?

我有一个密集的Java背景,所以请原谅我,如果我忽略了C#中明显的东西,但我的研究让我无处可去。 我正在尝试使用react nativeRx .NET库。 编译器并没有抱怨IObservable而是调用zip方法。 它正在抛出“……你错过了使用指令或汇编引用吗?” 我一直在浏览命名空间,但我找不到想要的东西。 我找不到System.Reactive ,如果使用它也会抛出错误,并且所有引用都已包含在此Windows 8.1应用程序中。 有人可以告诉我有什么问题吗? public sealed class EventEngine { private static readonly EventEngine singleton = new EventEngine(); public static EventEngine get() { return singleton; } public IObservable CurrentKey { get; set; } public IObservable CurrentScale { get; set; } public IObservable CurrentAppliedScale { get { return CurrentScale.zip(CurrentKey, (s, […]

如何在一段时间内对一个流进行分区(GroupBy)并监视Rx中元素的缺失?

前几天我一直在尝试编写一个Rx查询来处理来自源的事件流,并检查是否缺少某些ID。 定义缺席使得存在一系列时间窗口(例如,在从9:00到17:00的所有日期),在该时间窗口期间应该存在最多20分钟而没有在流中出现ID。 更复杂的是,应根据身份certificate确定缺勤时间。 例如,假设出现在事件的组合流(A,A,B,C,A,C,B等)中的三种事件A,B和C,可以定义为 活动将在每天的9:00至10:00进行监控,最多不超过10分钟。 B事件每天从9:00到11:00进行监控,最多没有事件为5分钟。 C事件的监控时间为每天的12:00至15:00,最多没有事件为30分钟。 我想我需要首先对流进行分区以通过GroupBy分离事件,然后使用缺少规则处理生成的单独流。 我已经在Microsoft Rx论坛上对此进行了一些考虑(非常感谢Dave),我有一些工作代码可以生成规则并进行缺失检查,但是我很挣扎,例如,如何将其与分组相结合。 所以,没有进一步的演讲,迄今为止被黑客攻击的代码: //Some sample data bits representing the events. public class FakeData { public int Id { get; set; } public string SomeData { get; set; } } //Note the Now part in DateTime to zero the clock time and have only the date. The purpose […]

使用TcpClient和Reactive Extensions从Stream读取连续字节流

请考虑以下代码: internal class Program { private static void Main(string[] args) { var client = new TcpClient(); client.ConnectAsync(“localhost”, 7105).Wait(); var stream = client.GetStream(); var observable = stream.ReadDataObservable().Repeat(); var s = from d in observable.Buffer(4) let headerLength = IPAddress.NetworkToHostOrder(BitConverter.ToInt16(d.ToArray(), 2)) let b = observable.Take(headerLength) select b.ToEnumerable().ToArray(); s.Subscribe(a => Console.WriteLine(“{0}”, a)); Console.ReadLine(); } } public static class Extensions […]

使用Reactive Extensions进行数据库轮询

我必须及时查询数据库以了解遗留系统的状态。 我想过围绕一个Observable包装查询,但我不知道正确的方法。 基本上,它将每5秒进行一次相同的查询。 但我担心我将不得不面对这些问题: 如果执行查询需要10秒钟怎么办? 如果仍在处理上一个查询,我不想执行任何新查询。 此外,应该有一个超时。 如果当前查询在例如20秒之后未执行,则应记录信息性消息并应发送新的尝试(同一查询)。 额外细节: 查询只是一个SELECT ,它返回一个包含状态代码列表的数据集( 工作 , 出错 )。 Observable序列将始终采用从查询接收的最新数据,例如Switch扩展方法。 我想将数据库查询(lenghty操作)包装到一个Task中,但我不确定它是否是最佳选择。 我几乎可以确定查询应该在另一个线程中执行,但是我不知道observable应该是什么样子,曾经阅读过Lee Campbell的Rx简介 。