Tag: system.reactive

在反应流水线中执行TPL代码并通过测试调度程序控制执行

我很难理解以下测试不起作用的原因: [Fact] public void repro() { var scheduler = new TestScheduler(); var count = 0; // this observable is a simplification of the system under test // I’ve just included it directly in the test for clarity // in reality it is NOT accessible from the test code – it is // an implementation detail […]

如何获得N hot Observable 实例的“最后”项的总和?

编辑:2013年9月15日 – 我正在描述我的情景进一步分解为帮助每个人更好地了解我的情况的步骤。 添加了整个应用程序的源代码以供下载。 如果要跳转到原始问题,请向下滚动到最后一个标题。 请让我知道这些问题。 谢谢 摘要 阿拉斯加州首府朱诺有一个AST(阿拉斯加州警官)总部大楼,他们希望在这个大屏幕上显示一个自动显示和更新的单个号码。 该号码称为(犯罪商数指数)或CQI CQI基本上是一个计算出的数字,用于显示当前的国家犯罪情况…… 如何计算? 运行屏幕的程序是一个.NET WPF应用程序,它通过Hot IObservable流不断接收CrimeReport对象。 每个城市计算CQI,然后采用所有城市的Sum(),称为国家CQI以下是计算国家CQI的步骤 第1步 – 接收犯罪数据 每次报告犯罪时,CrimeReport都会发送到.NET应用程序。 它具有以下组件 犯罪日期时间 市 – 市/县管辖 SeverityLevel – 严肃/非严重 EstimatedSolveTime – AST确定解决犯罪所需的估计天数。 所以在这一步中,我们订阅了IObservable并创建了MainViewModel的实例 IObservable reportSource = mainSource.Publish(); MainVM = new MainViewModel(reportSource); reportSource.Connect(); 第2步 – 逐个城市,每个城市做数学 当您收到报告时,请按城市分组 var cities = reportSource.GroupBy(k => k.City) .Select(g => new CityDto(g.Key, […]

在不停止序列的情况下处理Reactive Extensions中的exception

为什么RX具有以下语法OnNext* (OnError|OnCompleted) ? 而不是(OnNext|OnError)* OnCompleted ? 从实现角度来看这是非常清楚的(这也与IEnumerable和yield有共同的语义),但我想与现实生活情况不同。 在现实生活中 – 生产者生成混合的数据流和exception(exception不会破坏生产者)。 问题:如果我理解正确,唯一可能的解决方案是使可观察的返回复杂数据结构从初始数据和生成的exception( Observable.Timestamp()和.TimeInterval()具有类似的概念)组合或是否还有其他选项? 目前我得出了以下解决方案:在可观察的生产者内部我手动处理exeptions并将它们传递给以下数据结构,这是我的可观察结果 public class ValueOrException { private readonly Exception ex; private readonly T value; public ValueOrException(T value, Exception ex) { this.value = value; this.ex = ex; } public ValueOrException(T value) { this.value = value; } public T Value { get { return this.value; } […]

如果没有GetAwaiter方法,我可以等待吗?

我看到一些关于设计自定义等待类型的文章: http://books.google.com.br/books?id=1On1glEbTfIC&pg=PA83&lpg=PA83&dq=create+a+custom+awaitable+type 现在考虑以下示例: 和: private async void BtnA_Click(object sender, RoutedEventArgs e) { MessageBox.Show(“Awaiting Button B..”); var sx = Observable.FromEvent(a => (b, c) => a(c), add => BtnB.PreviewMouseDown += add, rem => BtnB.PreviewMouseDown -= rem) .Do(a => a.Handled = true) .Take(1); await sx; MessageBox.Show(“Button B Pressed after Button A”); } private void BtnB_OnClick(object sender, RoutedEventArgs e) […]

以固定或最小间隔处理Rx事件

我有一系列事件,每10-1000毫秒发生一次。 我订阅了这个事件来源,但希望以500ms的固定(或最小)间隔处理它们。 我也想一次处理一个事件,而不是批量处理(比如Buffer(x> 1))。 伪代码中有类似的东西: observable.MinimumInterval(TimeSpan.FromMiliseconds(500)).Subscribe(v=>…); 试过例如: observable.Buffer(1).Delay(TimeSpan.FromMiliseconds(500).Subscribe(v=>…); 以及许多其他潜在的解决方案。 到目前为止没有运气。 有任何想法吗?

如何从rx订阅回调异步function?

我想在Rx订阅中回调一个异步函数。 就像这样: public class Consumer { private readonly Service _service = new Service(); public ReplaySubject Results = new ReplaySubject(); public void Trigger() { Observable.Timer(TimeSpan.FromMilliseconds(100)).Subscribe(async _ => await RunAsync()); } public Task RunAsync() { return _service.DoAsync(); } } public class Service { public async Task DoAsync() { return await Task.Run(() => Do()); } private static string […]

NewThreadScheduler.Default计划在同一个线程上的所有工作

我目前正试图用RX .NET来解决并发问题,并对某些事情感到困惑。 我想并行运行四个相对较慢的任务,所以我假设NewThreadScheduler.Default将是要走的路,因为它“代表一个在一个单独的线程上调度每个工作单元的对象”。 。 这是我的设置代码: static void Test() { Console.WriteLine(“Starting. Thread {0}”, Thread.CurrentThread.ManagedThreadId); var query = Enumerable.Range(1, 4); var obsQuery = query.ToObservable(NewThreadScheduler.Default); obsQuery.Subscribe(DoWork, Done); Console.WriteLine(“Last line. Thread {0}”, Thread.CurrentThread.ManagedThreadId); } static void DoWork(int i) { Thread.Sleep(500); Console.WriteLine(“{0} Thread {1}”, i, Thread.CurrentThread.ManagedThreadId); } static void Done() { Console.WriteLine(“Done. Thread {0}”, Thread.CurrentThread.ManagedThreadId); } 我假设“X线程Y”每次都会输出不同的线程ID,但实际输出是: Starting. Thread 1 […]

.NET的react native扩展(Rx):完成所有事件后立即执行操作

作为概念certificate,我想在选中复选框并在文本框中按下某个键(按任意顺序) 后 ,在文本框中写“完成”。 我希望这段代码可以处理这个问题,但是只要事件发生,它就会写完Done。 谢谢你的帮助。 var seq = Observable.FromEvent(this.checkBox, “CheckedChanged”) .Merge(Observable.FromEvent(this.textBox, “KeyPress”)); seq.Subscribe((unused) => this.resultTextBox.Text = “Done”);

如何在Rx中交替缓冲和流动实时数据流

我有两个流。 一个是数据流(可以是任何类型),另一个是作为门的布尔流。 我需要将这些组合成一个具有以下行为的流: 当门打开(最近的值为真)时,数据应该直接流过 当门关闭(最近的值为假)时,应该缓冲数据,以便在下一次打开门时作为单独的元素释放 解决方案应保留数据的所有元素并保留顺序 我真的不确定如何把它放在一起。 我一直在测试的输入是这样的: // a demo data stream that emits every second var dataStream = Observable.Interval(TimeSpan.FromSeconds(1)); // a demo flag stream that toggles every 5 seconds var toggle = false; var gateStream = Observable.Interval(TimeSpan.FromSeconds(5)) .Select(_ => toggle = !toggle);

反应式扩展是否支持滚动缓冲?

我正在使用反应式扩展将数据整理到100ms的缓冲区中: this.subscription = this.dataService .Where(x => !string.Equals(“FOO”, x.Key.Source)) .Buffer(TimeSpan.FromMilliseconds(100)) .ObserveOn(this.dispatcherService) .Where(x => x.Count != 0) .Subscribe(this.OnBufferReceived); 这很好用。 但是,我想要的行为与Buffer操作提供的行为略有不同。 基本上,如果收到另一个数据项,我想重置计时器。 只有当整个100毫秒没有收到数据时我才能处理它。 这开启了永不处理数据的可能性,因此我还应该能够指定最大计数。 我会想象一下: .SlidingBuffer(TimeSpan.FromMilliseconds(100), 10000) 我已经环顾四周,在Rx中找不到这样的东西? 任何人都可以确认/否认这个吗?