如何清除ReplaySubject上的缓冲区?
如何清除ReplaySubject上的缓冲区?
我需要定期清除缓冲区(在我的情况下作为日常事件的结束)以防止ReplaySubject不断增长并最终占用所有内存。
理想情况下,我希望保持相同的ReplaySubject,因为客户端订阅仍然很好。
谢谢。
ReplaySubject
不提供清除缓冲区的方法,但有几种重载以不同方式约束其缓冲区:
- 保留项目的最大
TimeSpan
- 最大项目数
- 上述的组合,只要满足任一条件就会丢弃项目。
一个可清除的重播主题
这是一个非常有趣的问题 – 我决定看看实现ReplaySubject
的变体是多么容易,你可以清楚地使用现有的主题和运算符(因为它们非常强大)。 事实certificate它相当简单。
我通过内存分析器运行它来检查它是否正确。 调用Clear()
来刷新缓冲区,否则它就像常规的无界ReplaySubject
:
public class RollingReplaySubject : ISubject { private readonly ReplaySubject> _subjects; private readonly IObservable _concatenatedSubjects; private ISubject _currentSubject; public RollingReplaySubject() { _subjects = new ReplaySubject>(1); _concatenatedSubjects = _subjects.Concat(); _currentSubject = new ReplaySubject (); _subjects.OnNext(_currentSubject); } public void Clear() { _currentSubject.OnCompleted(); _currentSubject = new ReplaySubject (); _subjects.OnNext(_currentSubject); } public void OnNext(T value) { _currentSubject.OnNext(value); } public void OnError(Exception error) { _currentSubject.OnError(error); } public void OnCompleted() { _currentSubject.OnCompleted(); _subjects.OnCompleted(); // a quick way to make the current ReplaySubject unreachable // except to in-flight observers, and not hold up collection _currentSubject = new Subject (); } public IDisposable Subscribe(IObserver observer) { return _concatenatedSubjects.Subscribe(observer); } }
尊重通常的规则(与任何Subject
)并且不要同时调用此类的方法 – 包括Clear()
。 如果需要,您可以轻松添加同步锁。
它通过在主ReplaySubject中嵌套一系列ReplaySubject来工作。 外部ReplaySubject( _subjects
)保存一个内部ReplaySubject( _currentSubject
)的缓冲区,并在构造时填充。
OnXXX
方法调用_currentSubject
ReplaySubject。
观察者订阅了嵌套ReplaySubjects的连续投影(保存在_concatenatedSubjects
)。 由于_subjects
的缓冲区大小仅为1,因此新订阅者仅获取最近的ReplaySubject
事件。
每当我们需要“清除缓冲区”时,现有的_currentSubject
都是OnCompleted
并且新的ReplaySubject被添加到_subjects
并成为新的_currentSubject
。
增强function
按照@ Brandon的建议,我创建了一个RollingReplaySubject
版本,它使用TimeSpan
或输入流来表示缓冲区清除。 我在这里为此创建了一个要点: https : //gist.github.com/james-world/c46f09f32e2d4f338b07
您可能已经拥有Observable数据源,在这种情况下,这是另一种解决方案。 这个使用现有RX结构的组合而不是建立你自己的ISubject,我个人很谨慎。
public class ClearableReplaySubject : IConnectableObservable { private readonly IConnectableObservable> _underlying; private readonly SerialDisposable _replayConnectDisposable = new SerialDisposable(); public ClearableReplaySubject(IObservable src, IObservable clearTrigger) { _underlying = clearTrigger.Select(_ => Unit.Default).StartWith(Unit.Default) .Select(_ => { var underlyingReplay = src.Replay(); _replayConnectDisposable.Disposable = underlyingReplay.Connect(); return underlyingReplay; }) .Replay(1); } public IDisposable Subscribe(IObserver observer) { return _underlying.Switch().Subscribe(observer); } public IDisposable Connect() { return new CompositeDisposable(_underlying.Connect(), _replayConnectDisposable.Disposable); } }
如果您将以下扩展方法添加到ObservableEx:
public static class ObservableEx { public static IConnectableObservable ReplayWithReset(this IObservable src, IObservable resetTrigger) { return new ClearableReplaySubject(src, resetTrigger); } }
然后你可以获取你的源并添加.ReplayWithReset(…)和你的重置触发器Observable。 这可能是一个计时器或其他什么。
var replay = sourceObservable.ReplayWithReset(triggerObservable); var connection = replay.Connect();
连接的行为与重放的行为相同。