“ConnectedObservable” – This is a kind of observable which doesn’t emit items even if subscribed to.It only starts emitting items after its .connect() method is called.
.publish()– This method allows us to change an ordinary observable into a “ConnectedObservable”.Simply call this method on an ordinary observable and it becomes a connected one.
refcount() – This operator keeps track of how many subscribers are subscribed to the resulting Observable andrefrains from disconnecting from the source ConnectedObservable until all such Observables are unsubscribed.
所有的这一切看起来都很好。然而这个实现会有一个可能的竞争条件。因为这有两个subscribers(debounce and buffer)而且会在不同的时间点发生,所以竞争条件就会发生。 记住RxBus是由hot/live Subject支持的不断的发射数据。通过使用share操作符,我们保证引用的是同一个资源。 而不是subscribers在不同的时间点订阅,他们会收到准确的相同的数据。
The race condition is when the two consumers subscribe. Often on a hot stream it doesn’t matter when subscribers come and go,and refCount is perfect for that.The race condition refCount protects against is having only 1 active subscription upstream. However,if 2 Subscribers subscribe to a refcounted stream that emits 1, 2, 3, 4, 5, the first may get 1, 2, 3, 4, 5 and the second may get 2, 3, 4, 5.To ensure all subscribers start at exactly the same time and get the exact same values, refCount can not be used.Either ConnectableObservable with a manual, imperative invocation of connect needs to be done, or the variant of publish(function)which connects everything within the function before connecting the upstream.
// don't start emitting items just yet by turning the observable to a connected oneConnectableObservable tapEventEmitter = _rxBus.toObserverable().publish();tapEventEmitter.publish((Func1) (stream) -> {// inside `publish`, "stream" is truly multicasted // applying the same technique for getting a debounced buffer sequence return stream.buffer(stream.debounce(1, TimeUnit.SECONDS)); }).subscribe((Action1) (taps) { _showTapCount(taps.size()); }); // start listening to events now tapEventEmitter.connect();