ValveSubject:Rx的排队主题,具有内置缓冲,打开/关闭操作

我经常遇到需要某种“阀门”构造来控制响应式(Reactive)管道流动的情况。通常,在基于网络的应用程序中,我需要根据连接状态打开/关闭请求流。

这个阀门 Subject 应支持打开/关闭数据流,并按 FIFO 顺序交付输出。阀门关闭时,应缓冲输入值。

ConcurrentQueue 或 BlockingCollection 通常用于此类场景,但这会立即把线程问题牵扯进来。我一直在寻找针对这个问题的纯响应式(Reactive)解决方案。

这是一个主要基于 Buffer() 和 BehaviorSubject 的实现。BehaviorSubject 跟踪阀门的打开/关闭状态。按照下面的代码,阀门关闭时开启一个缓冲窗口,阀门重新打开时关闭该窗口。Buffer 操作符的输出被“重新注入”到输入上(这样即使是观察者自己也可以关闭阀门):

 /// <summary>
 /// Subject offering Open() and Close() methods, with built-in buffering.
 /// Note that closing the valve in the observer is supported.
 /// </summary>
 /// <remarks>
 /// As is the case with other Rx subjects, this class is not thread-safe, in that
 /// order of elements in the output is indeterministic in the case of concurrent operation
 /// of Open()/Close()/OnNext()/OnError(). To guarantee strict order of delivery even in the
 /// case of concurrent access, the Synchronize() extension methods of
 /// <see cref="ValveSubjectExtensions"/> can be used.
 /// </remarks>
 /// <typeparam name="T">Elements type</typeparam>
 public class ValveSubject<T> : IValveSubject<T>
 {
     private enum Valve
     {
         Open,
         Closed
     }

     // Every value pushed by callers, plus values re-injected from the buffer.
     private readonly Subject<T> input = new Subject<T>();

     // Current valve state; the valve starts open.
     private readonly BehaviorSubject<Valve> valveSubject = new BehaviorSubject<Valve>(Valve.Open);

     // What subscribers of this subject actually observe.
     private readonly Subject<T> output = new Subject<T>();

     /// <summary>
     /// Wires the input, the buffering stage and the output together.
     /// </summary>
     public ValveSubject()
     {
         // Only react to actual state changes, not to repeated Open()/Close() calls.
         var valveOperations = valveSubject.DistinctUntilChanged();

         // A buffer window starts when the valve is closed and ends when it is opened
         // again. The buffered list is flattened and fed back into the input, so the
         // buffered elements go through the same open/closed check as fresh ones.
         input.Buffer(
                 bufferOpenings: valveOperations.Where(v => v == Valve.Closed),
                 bufferClosingSelector: _ => valveOperations.Where(v => v == Valve.Open))
             .SelectMany(t => t)
             .Subscribe(input);

         // Elements reach the output only while the valve is open.
         input.Where(t => valveSubject.Value == Valve.Open).Subscribe(output);
     }

     /// <summary>True while the valve state is open.</summary>
     public bool IsOpen
     {
         get { return valveSubject.Value == Valve.Open; }
     }

     /// <summary>True while the valve state is closed.</summary>
     public bool IsClosed
     {
         get { return valveSubject.Value == Valve.Closed; }
     }

     /// <summary>Pushes a value into the valve.</summary>
     /// <param name="value">Element to deliver (or to buffer while the valve is closed).</param>
     public void OnNext(T value)
     {
         input.OnNext(value);
     }

     /// <summary>Signals an error on the input side of the valve.</summary>
     /// <param name="error">The error to signal.</param>
     public void OnError(Exception error)
     {
         input.OnError(error);
     }

     /// <summary>Completes the output, the input and the valve state subject, in that order.</summary>
     public void OnCompleted()
     {
         // NOTE(review): the output is completed before the input, so elements still
         // buffered in a closed valve appear to be dropped rather than flushed —
         // confirm this is intended.
         output.OnCompleted();
         input.OnCompleted();
         valveSubject.OnCompleted();
     }

     /// <summary>Subscribes an observer to the output of the valve.</summary>
     /// <param name="observer">Observer receiving the elements that pass the valve.</param>
     /// <returns>Handle that ends the subscription when disposed.</returns>
     public IDisposable Subscribe(IObserver<T> observer)
     {
         return output.Subscribe(observer);
     }

     /// <summary>Sets the valve state to open.</summary>
     public void Open()
     {
         valveSubject.OnNext(Valve.Open);
     }

     /// <summary>Sets the valve state to closed.</summary>
     public void Close()
     {
         valveSubject.OnNext(Valve.Closed);
     }
 }

 /// <summary>
 /// A subject that additionally exposes Open() and Close() operations.
 /// </summary>
 /// <typeparam name="T">Elements type</typeparam>
 public interface IValveSubject<T> : ISubject<T>
 {
     void Open();

     void Close();
 }

有时还需要一个额外的方法来冲刷(flush)阀门,例如在剩余的请求不再相关时将其清除。下面是一个基于上述实现的适配器风格实现:

 /// <summary>
 /// Subject with same semantics as <see cref="ValveSubject{T}"/>, but adding flushing out capability
 /// which allows clearing the valve of any remaining elements before closing.
 /// </summary>
 /// <typeparam name="T">Elements type</typeparam>
 public class FlushableValveSubject<T> : IFlushableValveSubject<T>
 {
     // Holds the valve currently in use; FlushAndClose() swaps in a fresh one.
     private readonly BehaviorSubject<ValveSubject<T>> valvesSubject =
         new BehaviorSubject<ValveSubject<T>>(new ValveSubject<T>());

     private ValveSubject<T> CurrentValve
     {
         get { return valvesSubject.Value; }
     }

     /// <summary>True while the current valve is open.</summary>
     public bool IsOpen
     {
         get { return CurrentValve.IsOpen; }
     }

     /// <summary>True while the current valve is closed.</summary>
     public bool IsClosed
     {
         get { return CurrentValve.IsClosed; }
     }

     /// <summary>Pushes a value into the current valve.</summary>
     /// <param name="value">Element to forward.</param>
     public void OnNext(T value)
     {
         CurrentValve.OnNext(value);
     }

     /// <summary>Signals an error on the current valve.</summary>
     /// <param name="error">The error to signal.</param>
     public void OnError(Exception error)
     {
         CurrentValve.OnError(error);
     }

     /// <summary>Completes the current valve, then the subject holding the valves.</summary>
     public void OnCompleted()
     {
         CurrentValve.OnCompleted();
         valvesSubject.OnCompleted();
     }

     /// <summary>Subscribes an observer to the output of whichever valve is current.</summary>
     /// <param name="observer">Observer receiving the elements.</param>
     /// <returns>Handle that ends the subscription when disposed.</returns>
     public IDisposable Subscribe(IObserver<T> observer)
     {
         // Switch() follows the most recent valve, so after FlushAndClose() the
         // observer receives elements from the replacement valve only.
         return valvesSubject.Switch().Subscribe(observer);
     }

     /// <summary>Opens the current valve.</summary>
     public void Open()
     {
         CurrentValve.Open();
     }

     /// <summary>Closes the current valve.</summary>
     public void Close()
     {
         CurrentValve.Close();
     }

     /// <summary>
     /// Discards remaining elements in the valve and reset the valve into a closed state
     /// </summary>
     /// <returns>Replayable observable with any remaining elements</returns>
     public IObservable<T> FlushAndClose()
     {
         var previousValve = CurrentValve;

         // Swap in a new, closed valve first so that regular subscribers are switched
         // away before the previous valve is drained.
         valvesSubject.OnNext(CreateClosedValve());

         // Capture whatever the previous valve releases when it is opened.
         // NOTE(review): the previous valve is opened but never completed here, so the
         // returned sequence does not appear to complete — confirm callers do not rely
         // on completion.
         var remainingElements = new ReplaySubject<T>();
         previousValve.Subscribe(remainingElements);
         previousValve.Open();

         return remainingElements;
     }

     // Builds the replacement valve used after a flush; it starts in the closed state.
     private static ValveSubject<T> CreateClosedValve()
     {
         var valve = new ValveSubject<T>();
         valve.Close();
         return valve;
     }
 }

 /// <summary>
 /// A valve subject that can additionally be flushed.
 /// </summary>
 /// <typeparam name="T">Elements type</typeparam>
 public interface IFlushableValveSubject<T> : IValveSubject<T>
 {
     IObservable<T> FlushAndClose();
 }

正如代码注释中所提到的,这些 Subject 不是“线程安全的”,也就是说,在并发操作的情况下不再保证交付顺序。与标准 Rx Subject 的 Subject.Synchronize()(https://msdn.microsoft.com/en-us/library/hh211643%28v=vs.103%29.aspx)类似,我们可以引入一些扩展方法,在阀门外围提供加锁:

 /// <summary>
 /// Extension methods that wrap a valve subject in an adapter which takes a lock
 /// around its operations.
 /// </summary>
 public static class ValveSubjectExtensions
 {
     /// <summary>Wraps the valve in a locking adapter that uses a new private gate object.</summary>
     /// <typeparam name="T">Elements type</typeparam>
     /// <param name="valve">Valve to wrap.</param>
     /// <returns>The locking adapter around <paramref name="valve"/>.</returns>
     public static IValveSubject<T> Synchronize<T>(this IValveSubject<T> valve)
     {
         return Synchronize(valve, new object());
     }

     /// <summary>Wraps the valve in a locking adapter that locks on the given gate.</summary>
     /// <typeparam name="T">Elements type</typeparam>
     /// <param name="valve">Valve to wrap.</param>
     /// <param name="gate">Object to lock on.</param>
     /// <returns>The locking adapter around <paramref name="valve"/>.</returns>
     public static IValveSubject<T> Synchronize<T>(this IValveSubject<T> valve, object gate)
     {
         return new SynchronizedValveAdapter<T>(valve, gate);
     }

     /// <summary>Wraps the flushable valve in a locking adapter that uses a new private gate object.</summary>
     /// <typeparam name="T">Elements type</typeparam>
     /// <param name="valve">Valve to wrap.</param>
     /// <returns>The locking adapter around <paramref name="valve"/>.</returns>
     public static IFlushableValveSubject<T> Synchronize<T>(this IFlushableValveSubject<T> valve)
     {
         return Synchronize(valve, new object());
     }

     /// <summary>Wraps the flushable valve in a locking adapter that locks on the given gate.</summary>
     /// <typeparam name="T">Elements type</typeparam>
     /// <param name="valve">Valve to wrap.</param>
     /// <param name="gate">Object to lock on.</param>
     /// <returns>The locking adapter around <paramref name="valve"/>.</returns>
     public static IFlushableValveSubject<T> Synchronize<T>(this IFlushableValveSubject<T> valve, object gate)
     {
         return new SynchronizedFlushableValveAdapter<T>(valve, gate);
     }
 }

 /// <summary>
 /// Forwards every observer-side call and Open()/Close() to the wrapped valve while
 /// holding the gate. Subscribe() is forwarded without taking the gate.
 /// </summary>
 /// <typeparam name="T">Elements type</typeparam>
 internal class SynchronizedValveAdapter<T> : IValveSubject<T>
 {
     private readonly object gate;
     private readonly IValveSubject<T> valve;

     /// <param name="valve">Valve to forward to.</param>
     /// <param name="gate">Object to lock on.</param>
     /// <exception cref="ArgumentNullException">
     /// <paramref name="valve"/> or <paramref name="gate"/> is null.
     /// </exception>
     public SynchronizedValveAdapter(IValveSubject<T> valve, object gate)
     {
         // Fail at construction time instead of on the first forwarded call.
         if (valve == null)
         {
             throw new ArgumentNullException("valve");
         }

         if (gate == null)
         {
             throw new ArgumentNullException("gate");
         }

         this.valve = valve;
         this.gate = gate;
     }

     public void OnNext(T value)
     {
         lock (gate)
         {
             valve.OnNext(value);
         }
     }

     public void OnError(Exception error)
     {
         lock (gate)
         {
             valve.OnError(error);
         }
     }

     public void OnCompleted()
     {
         lock (gate)
         {
             valve.OnCompleted();
         }
     }

     public IDisposable Subscribe(IObserver<T> observer)
     {
         return valve.Subscribe(observer);
     }

     public void Open()
     {
         lock (gate)
         {
             valve.Open();
         }
     }

     public void Close()
     {
         lock (gate)
         {
             valve.Close();
         }
     }
 }

 /// <summary>
 /// Locking adapter for flushable valves; adds FlushAndClose() under the same gate.
 /// </summary>
 /// <typeparam name="T">Elements type</typeparam>
 internal class SynchronizedFlushableValveAdapter<T> : SynchronizedValveAdapter<T>, IFlushableValveSubject<T>
 {
     // The base class keeps its fields private, so this class holds its own references:
     // the same gate, and the valve typed as flushable.
     private readonly object gate;
     private readonly IFlushableValveSubject<T> valve;

     /// <param name="valve">Flushable valve to forward to.</param>
     /// <param name="gate">Object to lock on.</param>
     public SynchronizedFlushableValveAdapter(IFlushableValveSubject<T> valve, object gate)
         : base(valve, gate)
     {
         this.valve = valve;
         this.gate = gate;
     }

     public IObservable<T> FlushAndClose()
     {
         lock (gate)
         {
             return valve.FlushAndClose();
         }
     }
 }

这是我使用 delay 运算符的实现:

 // Hold back each source item until the per-item observable returned below emits.
 // NOTE(review): assumes `source` is an Observable<Integer> and `valve` an
 // Observable<Boolean> declared elsewhere — confirm against the surrounding code.
 source.delay(new Func1<Integer, Observable<Boolean>>() {
             @Override
             public Observable<Boolean> call(Integer integer) {
                 // Only `true` values from the valve pass the filter and release the item.
                 return valve.filter(new Func1<Boolean, Boolean>() {
                     @Override
                     public Boolean call(Boolean aBoolean) {
                         return aBoolean;
                     }
                 });
             }
         })
         .toBlocking()
         .subscribe(new Action1<Integer>() {
             @Override
             public void call(Integer integer) {
                 System.out.println("out: " + integer);
             }
         });

其思路是延迟源的所有发射,直到“阀门打开”。如果阀门已经打开,则元素的发射不会被延迟。

Rx 阀门 Gist