Tag: system.reactive

使用IObservable进行批处理

我的服务器端向我发送了批量消息。 批处理和频率中的消息是任意的。 有时我会以1分钟的间隔收到消息,有时候不会收到一小时的消息。 有时是1条消息,有时是10.我当前的实现使用Observable.Buffer(TimeSpan.FromSeconds(5))来分组消息并将消息发送给用户。 有没有一种方法可以配置Observable,如果两条消息之间有几秒钟的延迟,就可以将缓冲的消息发送给用户。 我所处的位置是避免每5秒钟不必要的计时器滴答声。 打开其他建议以优化批处理。

RX AutoCompleteBox

我正在尝试使用RX和WPF构建filter控件。 所以我有一个文本框和一个列表框。 启动时,列表框有100个联系人姓名,用户可以输入名称来过滤列表。 问题是如何构建文本流(键输入)然后发布。 这应该是时间敏感的,所以我想只有在750毫秒之后如果没有检测到键输入,那么可以执行滤波器。 谢谢

如何使用Rx“重建从SerialPort读取的数据行”

我正在学习Rx,并试图使用SerialPort从GPS设备实现“NMEA句子阅读器”。 事实上它的GPS数据对问题的重要性不大,所以让我们澄清NMEA格式由线组成,’$’符号代表新条目的开头,所以你得到的“句子”看起来类似于: $[data for first line goes here] $[data for second line goes here] … 连接SerialPort的DataReceived事件非常简单: var port = new SerialPort(“COM3”, 4800); var serialPortSource = Observable.FromEventPattern ( handler => port.DataReceived += handler, handler => port.DataReceived -= handler ).Select(e => port.ReadExisting()); 这给出了一个IObservable ,但显然这里返回的字符串不一定是NMEA句子。 例如,对于上面的示例数据,可以得到如下序列: $[data for first line g oes here]\r\n$[data for second line goes here] […]

快速重复TakeWhile导致无限循环

在stream.DataAvailable为false之前,如何进行以下可观察重复? 目前它看起来永远不会停止。 在Defer部分内的AsyncReadChunk和Observable.Return使OnNext调用然后OnCompleted调用。 当Repeat接收OnNext调用时,它将它传递给TakeWhile。 当TakeWhile不满意时,它完成了observable,但我认为在OnNext之后的OnCompleted是如此之快以至于它使Repeat重新订阅了observable并导致无限循环。 我该如何纠正这种行为? public static IObservable AsyncRead(this NetworkStream stream, int bufferSize) { return Observable.Defer(() => { try { return stream.DataAvailable ? AsyncReadChunk(stream, bufferSize) : Observable.Return(new byte[0]); } catch (Exception) { return Observable.Return(new byte[0]); } }) .Repeat() .TakeWhile((dataChunk, index) => dataChunk.Length > 0); }

如何在Rx.NET中以正确的方式约束并发

请观察以下代码段: var result = await GetSource(1000).SelectMany(s => getResultAsync(s).ToObservable()).ToList(); 此代码的问题在于getResultAsync以不受约束的方式并发运行。 在某些情况下,这可能不是我们想要的。 假设我想将其并发限制为最多10个并发调用。 什么是Rx.NET方法呢? 我附上一个简单的控制台应用程序,演示了所描述问题的主题和我的蹩脚解决方案。 还有一些额外的代码,比如Stats类和人工随机睡眠。 它们用于确保我真正获得并发执行并且可以可靠地计算在此过程中达到的最大并发性。 RunUnconstrained方法演示了天真的,无约束的运行。 RunConstrained方法显示了我的解决方案,这不是很优雅。 理想情况下,我想通过简单地将专用Rx运算符应用于Monad来简化约束并发性。 当然,不会牺牲性能。 using System; using System.Collections.Generic; using System.Diagnostics; using System.Linq; using System.Reactive.Linq; using System.Reactive.Threading.Tasks; using System.Threading; using System.Threading.Tasks; namespace RxConstrainedConcurrency { class Program { public class Stats { public int MaxConcurrentCount; public int CurConcurrentCount; public readonly object MaxConcurrentCountGuard […]

当缓冲区不包含任何项目时,为什么Rx缓冲区连续执行方法?

我有一个Rx Observable充当缓冲区。 现在,当它获得10个项目时,或者在100毫秒之后,以先到者为准,它在订阅中执行该方法。 我注意到我的方法每100毫秒不断被调用,即使缓冲区中没有项目,这让我感到惊讶。 如果它没有收到缓冲区中的任何项目,那么只需让我的方法立即返回就足够了,但我觉得很奇怪它只是在背景中像这样生长。 为什么是这样? 你怎么建议我最好处​​理这个? 我是Rx的完全新手,所以也许我正在做一些奇怪的事情。 这是我的代码的简化版本: private Subject<KeyValuePair<int, Action>> serverRequests; public MyBufferClass(IMyServer server, IScheduler scheduler) { this.serverRequests = new Subject<KeyValuePair<int, Action>>(); this.serverRequests .Buffer(TimeSpan.FromMilliseconds(100), 10, scheduler) .Subscribe(buffer => GetMultipleItemsFromServer(buffer)); } public void GetSingleItemFromServer(int id, Action callback) { this.serverRequests.OnNext(new KeyValuePair<int, Action>(id, callback)); } public void GetMultipleItemsFromServer(IEnumerable<KeyValuePair<int, Action>> idsWithCallbacks) { if (idsWithCallbacks.IsNullOrEmpty()) return; this.server.GetMultipleItems(idsWithCallbacks) } […]

使用Reactive Extensions对事件进行unit testing

我正在使用Reactive Extensions for .NET(Rx)将事件公开为IObservable 。 我想创建一个unit testing,我断言特定事件被触发。 这是我要测试的类的简化版本: public sealed class ClassUnderTest : IDisposable { Subject subject = new Subject(); public IObservable SomethingHappened { get { return this.subject.AsObservable(); } } public void DoSomething() { this.subject.OnNext(new Unit()); } public void Dispose() { this.subject.OnCompleted(); } } 显然我真正的课程更复杂。 我的目标是validation对被测试类执行某些操作会导致在IObservable上发出一系列事件。 幸运的是,我想测试的类实现IDisposable并在处理对象时对主题调用OnCompleted使得测试更容易。 这是我测试的方式: // Arrange var classUnderTest = new ClassUnderTest(); […]

使用Observable.FromEvent转换事件而不使用EventArgs

我正在努力将以下事件转换为IObservable : public delegate void _dispSolutionEvents_OpenedEventHandler(); event _dispSolutionEvents_OpenedEventHandler Opened; 该活动来自图书馆,所以我无法改变它。 应该执行它的IObservable.FromEvent的重载具有以下签名: public static IObservable FromEvent ( Action addHandler , Action removeHandler ) 所以我尝试转换这样的事件: var opened = Observable.FromEvent ( h => _SolutionEvents.Opened += h , h => _SolutionEvents.Opened -= h ); 但编译器不喜欢_SolutionEvents.Opened += h和_SolutionEvents.Opened += h因为 无法将类型’System.Action’隐式转换为’EnvDTE._dispSolutionEvents_OpenedEventHandler’。 我不认为我只能说_SolutionEvents.Opened += new _dispSolutionEvents_OpenedEventHandler(h)因为那时删除不起作用,因为我有一个不同的实例,对吧? Observable.FromEvent还有另一个重载,带有以下签名: public static IObservable FromEvent […]

为什么Observable.Generate()抛出System.StackOverflowException?

我正在编写一个C#(.NET 4.5)应用程序,用于聚合基于时间的事件以进行报告。 为了使我的查询逻辑可以重用于实时和历史数据,我使用了Reactive Extensions(2.0)及其IScheduler基础结构( HistoricalScheduler和friends)。 例如,假设我们创建了一个事件列表(按时间顺序排序,但它们可能重合!),其唯一的有效负载是它们的时间戳,并希望知道它们在固定持续时间的缓冲区中的分布: const int num = 100000; const int dist = 10; var events = new List(); var curr = DateTimeOffset.Now; var gap = new Random(); var time = new HistoricalScheduler(curr); for (int i = 0; i < num; i++) { events.Add(curr); curr += TimeSpan.FromMilliseconds(gap.Next(dist)); } var stream = Observable.Generate( 0, […]

Rx运算符指向不同的序列

重要提示 :有关结果的说明和更多详细信息,请查看我的答案 我需要对通常复制的一系列对象/事件进行分组和过滤,并使用TimeSpan间隔缓冲它们。 我尝试用大理石图解释它更好: XXXXXYYYZZZZXXYZZ 会产生 X—Y—Z—X—Y—Z 其中X,Y和Z是不同的事件类型,’ – ‘表示间隔。 另外,我还想通过一个关键属性区分它可用于所有类型,因为它们有一个共同的基类: X, Y, Z : A 和A包含属性Key。 使用符号Xa表示X.Key = a,最终样本将是: Xa-Xb-Xa-Yb-Yc-Za-Za-Zc-Zb-Zc 会产生 Xa-Xb—Yb-Yc-Za-Zc-Zb 任何人都可以帮我组合所需的Linq运算符(可能是DistinctUntilChanged和Buffer)来实现这种行为吗? 谢谢 更新18.08.12 : 根据要求,我试着给出一个更好的解释。 我们有设备收集和发送事件到Web服务。 这些设备具有旧的逻辑(由于向后兼容性,我们无法对其进行更改)并且它们会一直发送事件,直到它们收到确认为止; 在确认之后,他们将下一个事件发送到队列中,依此类推。 事件包含单元的网络地址和一些其他属性,用于区分每个设备的队列中的事件。 事件如下所示: class Event { public string NetworkAddress { get; } public string EventCode { get; } public string AdditionalAttribute { get; } } […]