create a disposable resource that has the same lifespan as the Observable
即创建一个和 Observable 具有相同生命周期的 disposable 资源。
可以看出:当一个 ObserverA 订阅 Using 返回的 Observable 时,Using 会使用调用者传入的 Resource 工厂方法 [resourceFactory] 创建对应的资源,并且使用 Observable 工厂方法 [observableFactory] 创建 ObserverA 实际上想要订阅的 Observable。当 ObserverA 终止时,对应的 Resource 也会被释放 [dispose]。如下所示:
class MyDisposables: Disposable {
func dispose() {
print("dispose")
}
}
......
let _ = Observable
.using({ () -> MyDisposables in
return MyDisposables()
}) { _ in
return Observable<Int>
.interval(1, scheduler: MainScheduler.instance)
.take(5)
}
.subscribe(onNext: {
print()}
)0
执行结果:
1
2
3
4
let
dispose
可以看到,当 AnonymousObserver[匿名观察者] 订阅 using 返回的 Observable 时,using 内部创建了定期输出 Int 值的 ObservableA,以及资源 MyDisposables。在发送 5 个消息之后,ObservableA 被终止,与此同时,MyDisposables 资源被 using 释放。
二、监听 Obervable
RxSwift 官方 Demo 中的一段关于 GitHub 登录的代码:
= signingIn ActivityIndicator ()self
.=signingIn . signingInasObservable()let
= usernameAndPassword Observable .combineLatest(.input,username. input)password( { ,) }= .
signedIn . inputwithLatestFromloginTaps().usernameAndPassword(
,flatMapLatest { )usernamein passwordreturn API
. signup(,:username) password. passwordobserveOn
(MainScheduler.).instancecatchErrorJustReturn
(false).trackActivity
()}signingIn.
Observable
<flatMapLatest { loggedIn -> Boolinlet> =
? message "Mock: Signed in to GitHub." loggedIn : "Mock: Sign in to GitHub failed" return .
promptFor wireframe(,:message"OK" cancelAction, :[ actions] )// propagate original value.
_
inmap { } }
loggedIn
.
shareReplay
(1)......API
signingIn 是当前是否正在登录 Observable,signedIn 是当前登录动作 Observable。signedIn 体现的事件流如下:
按下登录按钮;
使用当前用户名及密码进行登录;
展示登录结果。 Rx 可观察对象的交互如下:
上面的 GitHub 登录涉及到 Rx 的相关 *** 作:
combineLatest:合并最后的 username 和 password,形成一个新的 Observable;
withLatestFrom:形成一个以 loginTaps 发送事件时间为采样时间点,发送 usernameAndPassword 内容的 Observable。
三、如何实现监听?
上面的 GitHub 登录涉及到记录开始登录的 *** 作如下,那么它是如何监听到当前是否正在登录呢?
.
signup(,:username) password. passwordobserveOn
(MainScheduler.).instancecatchErrorJustReturn
(false).trackActivity
()......signingInpublic
extension
ObservableConvertibleType public func {
trackActivity ( _:ActivityIndicator activityIndicator) Observable< -> Ereturn.> {
trackActivityOfObservable activityIndicator(self)}}
public
class
可以看到 .trackActivity(signingIn) 这个调用,一开始可能会理解为 .trackActivity(signingIn) 是在 signup(username, password: password) 后调用的,也就是说登录事件已经结束,程序才开始监听登录动作,这个理解显然是错误的。那么,要想获得正确的结果,事件流应该是一个怎么样的执行顺序呢?最直白的想法应该如下:
设置当前状态为正在执行登录;
执行登录 *** 作;
设置当前状态为没有执行登录。 那么问题来了:signup(username, password: password) 生成登录动作 Observable,当有 Observer 订阅这个 Observable 时,Observable 就会执行登陆 *** 作,并发送对应的结果,这就造成 .trackActivity(signingIn) 不能直接返回上游传递过来的事件流,因为这样,刚好切合上面的那个假设,因此 .trackActivity(signingIn) 应该做到以下几件事情:
保留登录动作 ObservableA,返回自定义的一个 ObservableB;
当外部 Observer 订阅 ObservableB 时,设置当前状态为正在执行登录;
设置当前状态为正在执行登录,然后让外部的 Observer 重新订阅到 ObservableA;
登录 *** 作执行完毕后,设置当前状态为没有执行登录。 signingIn 所属类 ActivityIndicator 的实现如下:
ActivityIndicator : DriverConvertibleType public typealias {
E = Bool private let
= NSRecursiveLock _lock ( )privatelet
= Variable _variable ( 0)privatelet
: Driver _loading< Boolpublicinit>
( )=. {
_loading asDriver _variable().0
}map { . > distinctUntilChanged (
)}fileprivatefunc
trackActivityOfObservable
< O :ObservableConvertibleType(_ :>O) sourceObservable <O -> .EreturnObservable.> {
using (()ActivityToken{ <O -> .Einself.> increment
()returnActivityToken(
: .asObservablesource( source),:self. disposeAction) })decrementin
return. { t asObservable
( t)}}private
func
increment
( ) .lock( {
_lock).=.
_variable+value 1 _variable.value unlock (
_lock)}privatefunc
decrement
( ) .lock( {
_lock).=.
_variable-value 1 _variable.value unlock (
_lock)}publicfunc
asDriver
( ) Driver<E -> return}}> {
let _loading
=
Variable
_variable 对应 Variable 类型,Variable 实际上是 BehaviorSubject 的一层包装,不同的是它只暴露数据,不会被终止或者失败。BehaviorSubject 会在订阅者订阅时,发送一个最近或初始数据,并且订阅者可以接收 BehaviorSubject 随后发送的所有数据。Variable 的示例如下:
( v 0 ).asObservable(
v).subscribe(
:print(onNext) {
}).=
1.
v=value 2 0
v1value 2 =
执行结果:
.
asDriver
(
继续查看 _loading 在 ActivityIndicator 的初始化方法中的赋值如下:
_loading ) _variable.0}.
distinctUntilChangedmap { ( > ) fileprivate
functrackActivityOfObservable<O
其中 _variable 的初始值为 0,因此这部分的逻辑很容易理解:_loading 通过 _variable 发送的值是否大于 0 来判断当前是否在执行动作,并且通过 increment、decrement 方法来设置 _variable 发送的值(改变当前正在执行的动作数)。重点还是在 trackActivityOfObservable 方法:
: ObservableConvertibleType (_:O )>Observable< sourceO .E -> returnObservable.using(> {
( )ActivityToken<O{ .E -> inself.increment(> )
returnActivityToken(:.
asObservable ()source, source:self.)} disposeAction) inreturndecrement.
asObservable( { t )
} t}()ActivityToken
<
O
对应的 resourceFactory 如下:
{ .E -> inself.increment(> )
returnActivityToken(:.
asObservable ()source, source:self.)} disposeActionin return.decrementasObservable
(
observableFactory 如下:
{ t )
} tprivatestructActivityToken<
E
ActivityToken 的实现如下:
: ObservableConvertibleType ,Disposableprivate> let :Observable < {
E private _sourcelet :Cancelableinit>
( : _disposeObservable <
E,source: @escaping()>( disposeAction) ) == -> Disposables.create {
_source ( source
_dispose : )}funcdisposewith( disposeAction)
.
dispose ()} {
_disposefuncasObservable()
Observable
< Ereturn} -> }publicstatic> {
func _source
using
<
可以看到,ActivityToken 就是一个保存当前需要监听的 Observable 的资源,当外部 Observer 订阅 trackActivityOfObservable 返回的 ObservableB 时,using 调用 resourceFactory 做了以下 *** 作:
增加当前正在执行的动作数;
使用 ActivityToken 保存需要监听的 ObservableA,并且在 ActivityToken 释放时,恢复当前正在执行的动作数。 接下来在调用 observableFactory 时,using 把在 resourceFactory 中保存的 ObservableA 重新暴露给 Observer。通过这种方式,就能在 ObservableA 发送数据之前,执行额外的 *** 作 self.increment(),也就是上面 .trackActivity(signingIn) 应该做到的 A2,再由于 using 会在 observableFactory 返回的 ObservableA 终止时释放 resourceFactory 创建的资源,所以当 ObservableA 终止时,会执行 self.decrement,也就是 A4。监听当前是否正在登录就是通过 using *** 作 hold 主需要监听的 Observable,然后在执行想要的额外动作后,重新暴露 Observable 给外部的 Observer。
四、using 内部实现
using 的内部实现:
R : Disposable (_:@escaping (>)throws resourceFactoryR , :@escaping ( -> R) observableFactorythrows Observable <E) Observable -> <Ereturn>Using -> (:,> {
: )}resourceFactoryclass resourceFactoryUsing observableFactory< observableFactorySourceType
,
using 实际上返回的是一个 Using 类,Using 为 Producer 的子类,并且重载 run 方法,如下所示:
ResourceType :Disposable:Producer <SourceType typealias>E =SourceTypetypealias> {
ResourceFactory = ( )
throws ResourceType typealias ObservableFactory= ( -> ResourceType
) throws Observable <SourceTypefileprivate let -> :ResourceFactoryfileprivate>
let : _resourceFactoryObservableFactory init
( : _observableFactory@escaping ResourceFactory
,:resourceFactory@escaping ObservableFactory )= observableFactory= } overridefunc {
_resourceFactory run resourceFactory
_observableFactory < observableFactory
O
: ObserverType (_: O )>Disposablewhere observerO .E -> == E let=UsingSink ( : {
self sink , :)parent. =. observerrun observer(
sink)disposable return sink}}classProducer
< sink
Element
:
Producer 的实现如下:
Observable <Elementoverride> init ()super> {
. init() {
}overridefuncsubscribe<
O
: ObserverType (_: O )>Disposablewhere observerO .E -> == Element if!CurrentThreadScheduler . return {
run ()}isScheduleRequired {
else returnCurrentThreadSchedulerobserver.
.
schedule {
( ()instance)_inreturnself. { run (
) }}}funcobserverrun
<
O
:
ObserverType (_: O )>Disposablewhere observerO .E -> == Element abstractMethod() } } {
classUsingSink<
SourceType
,
Producer 调用 subscribe 时,会调用子类的 run,并传入当前的 Oberver。回到 Using 的实现,Producer 的 run 方法中创建了 UsingSink 实例,并调用它的 run 方法。那么,来看下最关键的 UsingSink:
ResourceType :Disposable,O :ObserverType :Sink <O ,> ObserverType whereO.>E == SourceType typealiasParent= Using < {
SourceType , ResourceType typealiasE=O .>
E private let :Parentinit
( : _parentParent ,
:Oparent) =super observer. init( {
_parent : parent
)}funcrunobserver( observer)
Disposable
var =Disposables. -> create {
( disposable ) dolet=try.
_resourceFactory {
( resource ) = _parentlet=try.
disposable _observableFactory resource
( source ) return _parentDisposables.createresource(
. subscribe(self)
source,)}catchletreturn
disposable
Disposables
. create ( error {
Observable .error()
.subscribe(selferror),)}}funcon
disposable
(
_
:
Event <E) eventswitch caselet.>next {
( event {
) : forwardOn(.valuenext(
))caselet.valueerror(
) : forwardOn(.errorerror(
))dispose()errorcase.
:forwardOn(
. )completeddispose
()}completed}
}
可以看到,在 run 方法中,UsingSink 先是调用 _resourceFactory() 创建了资源 resource,然后以 resource 为参数调用 _observableFactory() 来创建想要的 Obervable,并通过 Disposables.create(source.subscribe(self),disposable) 让 resource 的生命周期和 Obervable 一致。实际上 UsingSink 只是在 run 中做了两件特殊的事情:
在让 source 订阅自身前,创建 resource(一般会在这里做额外的 *** 作);
使用的 source 不是由上游给的,而是通过 _observableFactory 创建的(一般的 *** 作比如 map、flatMap 等都是由上游给的)。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)