如何清除ReplaySubject上的缓冲区?

如何清除ReplaySubject上的缓冲区?

我需要定期清除缓冲区(在我的情况下作为日常事件的结束)以防止ReplaySubject不断增长并最终占用所有内存。

理想情况下,我希望保持相同的ReplaySubject,因为客户端订阅仍然很好。

谢谢。

ReplaySubject不提供清除缓冲区的方法,但有几种重载以不同方式约束其缓冲区:

  • 保留项目的最大TimeSpan
  • 最大项目数
  • 上述的组合,只要满足任一条件就会丢弃项目。

一个可清除的重播主题

这是一个非常有趣的问题 – 我决定看看实现ReplaySubject的变体是多么容易,你可以清楚地使用现有的主题和运算符(因为它们非常强大)。 事实certificate它相当简单。

我通过内存分析器运行它来检查它是否正确。 调用Clear()来刷新缓冲区,否则它就像常规的无界ReplaySubject

 public class RollingReplaySubject : ISubject { private readonly ReplaySubject> _subjects; private readonly IObservable _concatenatedSubjects; private ISubject _currentSubject; public RollingReplaySubject() { _subjects = new ReplaySubject>(1); _concatenatedSubjects = _subjects.Concat(); _currentSubject = new ReplaySubject(); _subjects.OnNext(_currentSubject); } public void Clear() { _currentSubject.OnCompleted(); _currentSubject = new ReplaySubject(); _subjects.OnNext(_currentSubject); } public void OnNext(T value) { _currentSubject.OnNext(value); } public void OnError(Exception error) { _currentSubject.OnError(error); } public void OnCompleted() { _currentSubject.OnCompleted(); _subjects.OnCompleted(); // a quick way to make the current ReplaySubject unreachable // except to in-flight observers, and not hold up collection _currentSubject = new Subject(); } public IDisposable Subscribe(IObserver observer) { return _concatenatedSubjects.Subscribe(observer); } } 

尊重通常的规则(与任何Subject )并且不要同时调用此类的方法 – 包括Clear() 。 如果需要,您可以轻松添加同步锁。

它通过在主ReplaySubject中嵌套一系列ReplaySubject来工作。 外部ReplaySubject( _subjects )保存一个内部ReplaySubject( _currentSubject )的缓冲区,并在构造时填充。

OnXXX方法调用_currentSubject ReplaySubject。

观察者订阅了嵌套ReplaySubjects的连续投影(保存在_concatenatedSubjects )。 由于_subjects的缓冲区大小仅为1,因此新订阅者仅获取最近的ReplaySubject事件。

每当我们需要“清除缓冲区”时,现有的_currentSubject都是OnCompleted并且新的ReplaySubject被添加到_subjects并成为新的_currentSubject

增强function

按照@ Brandon的建议,我创建了一个RollingReplaySubject版本,它使用TimeSpan或输入流来表示缓冲区清除。 我在这里为此创建了一个要点: https : //gist.github.com/james-world/c46f09f32e2d4f338b07

您可能已经拥有Observable数据源,在这种情况下,这是另一种解决方案。 这个使用现有RX结构的组合而不是建立你自己的ISubject,我个人很谨慎。

 public class ClearableReplaySubject : IConnectableObservable { private readonly IConnectableObservable> _underlying; private readonly SerialDisposable _replayConnectDisposable = new SerialDisposable(); public ClearableReplaySubject(IObservable src, IObservable clearTrigger) { _underlying = clearTrigger.Select(_ => Unit.Default).StartWith(Unit.Default) .Select(_ => { var underlyingReplay = src.Replay(); _replayConnectDisposable.Disposable = underlyingReplay.Connect(); return underlyingReplay; }) .Replay(1); } public IDisposable Subscribe(IObserver observer) { return _underlying.Switch().Subscribe(observer); } public IDisposable Connect() { return new CompositeDisposable(_underlying.Connect(), _replayConnectDisposable.Disposable); } } 

如果您将以下扩展方法添加到ObservableEx:

 public static class ObservableEx { public static IConnectableObservable ReplayWithReset(this IObservable src, IObservable resetTrigger) { return new ClearableReplaySubject(src, resetTrigger); } } 

然后你可以获取你的源并添加.ReplayWithReset(…)和你的重置触发器Observable。 这可能是一个计时器或其他什么。

 var replay = sourceObservable.ReplayWithReset(triggerObservable); var connection = replay.Connect(); 

连接的行为与重放的行为相同。