跟踪Observable中的(观察者数量)?

我有一个可观察的代表股票价格的流。 如果我的可观察序列上没有观察者,我希望能够从提供价格流的远程服务器断开连接,但我不希望这样做,直到每个观察者都调用Dispose()。 然后以类似的方式,当第一个人调用Subscribe时,我想重新连接到远程服务器。

有没有办法弄清楚有多少观察者在可观察量上调用了订阅? 或者也许是一种了解观察者何时调用Subscribe或Dispose的方法?

我只想使用RefCount / Publish。 我总觉得如果我正在实施IObservable,我的工作方式太难了。

myColdObservable.Publish().RefCount(); 

这将使您的观察在每个人断开连接后停止发出脉冲。 这是一个示例:

 var coldObservable = Observable .Interval(TimeSpan.FromSeconds(1)) .ObserveOn(Scheduler.TaskPool) .Select(_ => DoSomething()); var refCountObs = coldObservable.Publish().RefCount(); CompositeDisposable d = new CompositeDisposable(); d.Add(refCountObs.Subscribe(n => Console.WriteLine("First got: " + n))); d.Add(refCountObs.Subscribe(n => Console.WriteLine("Second got: " + n))); d.Add(refCountObs.Subscribe(n => Console.WriteLine("Third got: " + n))); //Wait a bit for work to happen System.Threading.Thread.Sleep(10000); //Everyone unsubscribes d.Dispose(); //Observe that DoSomething is not called. System.Threading.Thread.Sleep(3000); 

这并不包括您实际想知道订户数量的情况,但我认为如果没有订阅者,这符合您停止工作的要求。

有点旧,但我遇到了这个post,因为我有一个问题,我需要知道订阅者的数量。 使用Bart的建议我想出了这个扩展。

 public static IObservable CountSubscribers(this IObservable source, Action countChanged) { int count = 0; return Observable.Defer(() => { count = Interlocked.Increment(ref count); countChanged(count); return source.Finally(() => { count = Interlocked.Decrement(ref count); countChanged(count); }); }); } 

通常,不要实现IObservable; 通常情况下,Rx中已有soemthing可以直接或通过合成来帮助你。 如果您必须实现IObservable,请使用Observable.Create执行此操作,以获得观察者合同等所需的所有保证。

至于你的问题 – 使用Publish和RefCount的建议正是你正在寻找的构成。 如果由于某种原因想要计算自己,请使用Observable.Defer拦截订阅,可能使用Observable.Finally拦截序列终止。 或者,使用Observable.Create包装源,将观察者转发到包装序列,并使用计数逻辑包装返回的IDisposable(使用Disposable.Create)。

干杯,

-Bart(Rx团队)

IObservable是您可以实现的接口 。 在界面的Subscribe方法中,您可以通过在内部维护列表来跟踪观察者。

以下代码段来自MSDN。

 private List> observers; public IDisposable Subscribe(IObserver observer) { if (! observers.Contains(observer)) observers.Add(observer); // ------- If observers.Count == 1 create connection. ------- return new Unsubscriber(observers, observer); } private class Unsubscriber : IDisposable { private List>_observers; private IObserver _observer; public Unsubscriber(List> observers, IObserver observer) { this._observers = observers; this._observer = observer; } public void Dispose() { if (_observer != null && _observers.Contains(_observer)) _observers.Remove(_observer); // ----------- if observers.Count == 0 close connection ----------- } }