Error[8]: Undefined offset: 955, File: /www/wwwroot/outofmemory.cn/tmp/plugin_ss_superseo_model_superseo.php, Line: 121
File: /www/wwwroot/outofmemory.cn/tmp/plugin_ss_superseo_model_superseo.php, Line: 473, decode(

一、前言 ReactiveX 官方文档 对于 Using 的描述如下:
create a disposable resource that has the same lifespan as the Observable
即创建一个和 Observable 具有相同生命周期的 disposable 资源。

可以看出:当一个 ObserverA 订阅 Using 返回的 Observable 时,Using 会使用调用者传入的 Resource 工厂方法 [resourceFactory] 创建对应的资源,并且使用 Observable 工厂方法 [observableFactory] 创建 ObserverA 实际上想要订阅的 Observable。当 ObserverA 终止时,对应的 Resource 也会被释放 [dispose]。如下所示:
class MyDisposables: Disposable {
    func dispose() {
        print("dispose")
    }
}

......

let _ = Observable
    .using({ () -> MyDisposables in
        return MyDisposables()
    }) { _ in
        return Observable<Int>
            .interval(1, scheduler: MainScheduler.instance)
            .take(5)
    }
    .subscribe(onNext: {
        print()}
    )0
执行结果:
1
2
3
4
let
dispose
可以看到,当 AnonymousObserver[匿名观察者] 订阅 using 返回的 Observable 时,using 内部创建了定期输出 Int 值的 ObservableA,以及资源 MyDisposables。在发送 5 个消息之后,ObservableA 被终止,与此同时,MyDisposables 资源被 using 释放。 二、监听 Obervable RxSwift 官方 Demo 中的一段关于 GitHub 登录的代码:
= signingIn ActivityIndicator ()self
.=signingIn . signingInasObservable()let

= usernameAndPassword Observable .combineLatest(.input,username. input)password( { ,) }= .

signedIn . inputwithLatestFromloginTaps().usernameAndPassword(
    ,flatMapLatest { )usernamein passwordreturn API
        . signup(,:username) password. passwordobserveOn
            (MainScheduler.).instancecatchErrorJustReturn
            (false).trackActivity
            ()}signingIn.
    Observable
    <flatMapLatest { loggedIn -> Boolinlet> =
        ? message "Mock: Signed in to GitHub." loggedIn : "Mock: Sign in to GitHub failed" return .
        promptFor wireframe(,:message"OK" cancelAction, :[ actions] )// propagate original value.
            _
            inmap { } }
                loggedIn
            .
    shareReplay
    (1)......API
signingIn 是当前是否正在登录 Observable,signedIn 是当前登录动作 Observable。signedIn 体现的事件流如下: 按下登录按钮; 使用当前用户名及密码进行登录; 展示登录结果。 Rx 可观察对象的交互如下:

上面的 GitHub 登录涉及到 Rx 的相关 *** 作: combineLatest:合并最后的 username 和 password,形成一个新的 Observable; withLatestFrom:形成一个以 loginTaps 发送事件时间为采样时间点,发送 usernameAndPassword 内容的 Observable。 三、如何实现监听? 上面的 GitHub 登录涉及到记录开始登录的 *** 作如下,那么它是如何监听到当前是否正在登录呢?
.

signup(,:username) password. passwordobserveOn
(MainScheduler.).instancecatchErrorJustReturn
(false).trackActivity
()......signingInpublic

extension

ObservableConvertibleType public func {
    trackActivity ( _:ActivityIndicator activityIndicator) Observable< -> Ereturn.> {
        trackActivityOfObservable activityIndicator(self)}}
    public
class
可以看到 .trackActivity(signingIn) 这个调用,一开始可能会理解为 .trackActivity(signingIn) 是在 signup(username, password: password) 后调用的,也就是说登录事件已经结束,程序才开始监听登录动作,这个理解显然是错误的。那么,要想获得正确的结果,事件流应该是一个怎么样的执行顺序呢?最直白的想法应该如下: 设置当前状态为正在执行登录; 执行登录 *** 作; 设置当前状态为没有执行登录。 那么问题来了:signup(username, password: password) 生成登录动作 Observable,当有 Observer 订阅这个 Observable 时,Observable 就会执行登陆 *** 作,并发送对应的结果,这就造成 .trackActivity(signingIn) 不能直接返回上游传递过来的事件流,因为这样,刚好切合上面的那个假设,因此 .trackActivity(signingIn) 应该做到以下几件事情: 保留登录动作 ObservableA,返回自定义的一个 ObservableB; 当外部 Observer 订阅 ObservableB 时,设置当前状态为正在执行登录; 设置当前状态为正在执行登录,然后让外部的 Observer 重新订阅到 ObservableA; 登录 *** 作执行完毕后,设置当前状态为没有执行登录。 signingIn 所属类 ActivityIndicator 的实现如下:
ActivityIndicator : DriverConvertibleType public typealias {
    E = Bool private let

    = NSRecursiveLock _lock ( )privatelet
    = Variable _variable ( 0)privatelet
    : Driver _loading< Boolpublicinit>

    ( )=. {
        _loading asDriver _variable().0
            }map { . > distinctUntilChanged (
            )}fileprivatefunc
    trackActivityOfObservable

    < O :ObservableConvertibleType(_ :>O) sourceObservable <O -> .EreturnObservable.> {
        using (()ActivityToken{ <O -> .Einself.> increment
            ()returnActivityToken(
            : .asObservablesource( source),:self. disposeAction) })decrementin
        return. { t asObservable
            ( t)}}private
        func
    increment

    ( ) .lock( {
        _lock).=.
        _variable+value 1 _variable.value unlock (
        _lock)}privatefunc
    decrement

    ( ) .lock( {
        _lock).=.
        _variable-value 1 _variable.value unlock (
        _lock)}publicfunc
    asDriver

    ( ) Driver<E -> return}}> {
        let _loading
    =
Variable
_variable 对应 Variable 类型,Variable 实际上是 BehaviorSubject 的一层包装,不同的是它只暴露数据,不会被终止或者失败。BehaviorSubject 会在订阅者订阅时,发送一个最近或初始数据,并且订阅者可以接收 BehaviorSubject 随后发送的所有数据。Variable 的示例如下:
( v 0 ).asObservable(
v).subscribe(
    :print(onNext) {
        }).=
    1.

v=value 2 0
v1value 2 =
执行结果:
.
asDriver
(
继续查看 _loading 在 ActivityIndicator 的初始化方法中的赋值如下:
_loading ) _variable.0}.
	distinctUntilChangedmap { ( > ) fileprivate
	functrackActivityOfObservable<O
其中 _variable 的初始值为 0,因此这部分的逻辑很容易理解:_loading 通过 _variable 发送的值是否大于 0 来判断当前是否在执行动作,并且通过 increment、decrement 方法来设置 _variable 发送的值(改变当前正在执行的动作数)。重点还是在 trackActivityOfObservable 方法:
: ObservableConvertibleType (_:O )>Observable< sourceO .E -> returnObservable.using(> {
    ( )ActivityToken<O{ .E -> inself.increment(> )
        returnActivityToken(:.
        asObservable ()source, source:self.)} disposeAction) inreturndecrement.
    asObservable( { t )
        } t}()ActivityToken
    <
O
对应的 resourceFactory 如下:
{ .E -> inself.increment(> )
        returnActivityToken(:.
        asObservable ()source, source:self.)} disposeActionin return.decrementasObservable
    (
observableFactory 如下:
{ t )
        } tprivatestructActivityToken<
    E
ActivityToken 的实现如下:
: ObservableConvertibleType ,Disposableprivate> let :Observable < {
    E private _sourcelet :Cancelableinit>
    ( : _disposeObservable <

    E,source: @escaping()>( disposeAction) ) == -> Disposables.create {
        _source ( source
        _dispose : )}funcdisposewith( disposeAction)
    .

    dispose ()} {
        _disposefuncasObservable()
    Observable

    < Ereturn} -> }publicstatic> {
        func _source
    using
<
可以看到,ActivityToken 就是一个保存当前需要监听的 Observable 的资源,当外部 Observer 订阅 trackActivityOfObservable 返回的 ObservableB 时,using 调用 resourceFactory 做了以下 *** 作: 增加当前正在执行的动作数; 使用 ActivityToken 保存需要监听的 ObservableA,并且在 ActivityToken 释放时,恢复当前正在执行的动作数。 接下来在调用 observableFactory 时,using 把在 resourceFactory 中保存的 ObservableA 重新暴露给 Observer。通过这种方式,就能在 ObservableA 发送数据之前,执行额外的 *** 作 self.increment(),也就是上面 .trackActivity(signingIn) 应该做到的 A2,再由于 using 会在 observableFactory 返回的 ObservableA 终止时释放 resourceFactory 创建的资源,所以当 ObservableA 终止时,会执行 self.decrement,也就是 A4。监听当前是否正在登录就是通过 using *** 作 hold 主需要监听的 Observable,然后在执行想要的额外动作后,重新暴露 Observable 给外部的 Observer。 四、using 内部实现 using 的内部实现:
R : Disposable (_:@escaping (>)throws resourceFactoryR , :@escaping ( -> R) observableFactorythrows Observable <E) Observable -> <Ereturn>Using -> (:,> {
        : )}resourceFactoryclass resourceFactoryUsing observableFactory< observableFactorySourceType
    ,
using 实际上返回的是一个 Using 类,Using 为 Producer 的子类,并且重载 run 方法,如下所示:
ResourceType :Disposable:Producer <SourceType typealias>E =SourceTypetypealias> {
    
    ResourceFactory = ( )
    
    throws ResourceType typealias ObservableFactory= ( -> ResourceType
    ) throws Observable <SourceTypefileprivate let -> :ResourceFactoryfileprivate>
    
    let : _resourceFactoryObservableFactory init
    ( : _observableFactory@escaping ResourceFactory
    
    
    ,:resourceFactory@escaping ObservableFactory )= observableFactory= } overridefunc {
        _resourceFactory run resourceFactory
        _observableFactory < observableFactory
    O
    
    : ObserverType (_: O )>Disposablewhere observerO .E -> == E let=UsingSink ( : {
        self sink , :)parent. =. observerrun observer(
        sink)disposable return sink}}classProducer
        < sink
    Element
:
Producer 的实现如下:
Observable <Elementoverride> init ()super> {
    . init() {
        }overridefuncsubscribe<
    O
    
    : ObserverType (_: O )>Disposablewhere observerO .E -> == Element if!CurrentThreadScheduler . return {
        run ()}isScheduleRequired {
            else returnCurrentThreadSchedulerobserver.
        .
        schedule {
            ( ()instance)_inreturnself. { run (
                ) }}}funcobserverrun
            <
        O
    :
    
    ObserverType (_: O )>Disposablewhere observerO .E -> == Element abstractMethod() } } {
        classUsingSink<
    SourceType
,
Producer 调用 subscribe 时,会调用子类的 run,并传入当前的 Oberver。回到 Using 的实现,Producer 的 run 方法中创建了 UsingSink 实例,并调用它的 run 方法。那么,来看下最关键的 UsingSink:
ResourceType :Disposable,O :ObserverType :Sink <O ,> ObserverType whereO.>E == SourceType typealiasParent= Using < {

    SourceType , ResourceType typealiasE=O .>
    E private let :Parentinit

    ( : _parentParent ,
    
    :Oparent) =super observer. init( {
        _parent : parent
        )}funcrunobserver( observer)
    Disposable
    
    var =Disposables. -> create {
        ( disposable ) dolet=try.
        
        _resourceFactory {
            ( resource ) = _parentlet=try.
            disposable _observableFactory resource
            ( source ) return _parentDisposables.createresource(
            
            . subscribe(self)
                source,)}catchletreturn
                disposable
            Disposables
        . create ( error {
            Observable .error()
                .subscribe(selferror),)}}funcon
                disposable
            (
        _
    :
    
    Event <E) eventswitch caselet.>next {
        ( event {
        ) : forwardOn(.valuenext(
            ))caselet.valueerror(
        ) : forwardOn(.errorerror(
            ))dispose()errorcase.
            :forwardOn(
        . )completeddispose
            ()}completed}
            }[+++][+++]
        [+++]
    [+++]
[+++]
可以看到,在 run 方法中,UsingSink 先是调用 _resourceFactory() 创建了资源 resource,然后以 resource 为参数调用 _observableFactory() 来创建想要的 Obervable,并通过 Disposables.create(source.subscribe(self),disposable) 让 resource 的生命周期和 Obervable 一致。实际上 UsingSink 只是在 run 中做了两件特殊的事情: 在让 source 订阅自身前,创建 resource(一般会在这里做额外的 *** 作); 使用的 source 不是由上游给的,而是通过 _observableFactory 创建的(一般的 *** 作比如 map、flatMap 等都是由上游给的)。)
File: /www/wwwroot/outofmemory.cn/tmp/route_read.php, Line: 126, InsideLink()
File: /www/wwwroot/outofmemory.cn/tmp/index.inc.php, Line: 165, include(/www/wwwroot/outofmemory.cn/tmp/route_read.php)
File: /www/wwwroot/outofmemory.cn/index.php, Line: 30, include(/www/wwwroot/outofmemory.cn/tmp/index.inc.php)
Error[8]: Undefined offset: 956, File: /www/wwwroot/outofmemory.cn/tmp/plugin_ss_superseo_model_superseo.php, Line: 121
File: /www/wwwroot/outofmemory.cn/tmp/plugin_ss_superseo_model_superseo.php, Line: 473, decode(

一、前言 ReactiveX 官方文档 对于 Using 的描述如下:
create a disposable resource that has the same lifespan as the Observable
即创建一个和 Observable 具有相同生命周期的 disposable 资源。

可以看出:当一个 ObserverA 订阅 Using 返回的 Observable 时,Using 会使用调用者传入的 Resource 工厂方法 [resourceFactory] 创建对应的资源,并且使用 Observable 工厂方法 [observableFactory] 创建 ObserverA 实际上想要订阅的 Observable。当 ObserverA 终止时,对应的 Resource 也会被释放 [dispose]。如下所示:
class MyDisposables: Disposable {
    func dispose() {
        print("dispose")
    }
}

......

let _ = Observable
    .using({ () -> MyDisposables in
        return MyDisposables()
    }) { _ in
        return Observable<Int>
            .interval(1, scheduler: MainScheduler.instance)
            .take(5)
    }
    .subscribe(onNext: {
        print()}
    )0
执行结果:
1
2
3
4
let
dispose
可以看到,当 AnonymousObserver[匿名观察者] 订阅 using 返回的 Observable 时,using 内部创建了定期输出 Int 值的 ObservableA,以及资源 MyDisposables。在发送 5 个消息之后,ObservableA 被终止,与此同时,MyDisposables 资源被 using 释放。 二、监听 Obervable RxSwift 官方 Demo 中的一段关于 GitHub 登录的代码:
= signingIn ActivityIndicator ()self
.=signingIn . signingInasObservable()let

= usernameAndPassword Observable .combineLatest(.input,username. input)password( { ,) }= .

signedIn . inputwithLatestFromloginTaps().usernameAndPassword(
    ,flatMapLatest { )usernamein passwordreturn API
        . signup(,:username) password. passwordobserveOn
            (MainScheduler.).instancecatchErrorJustReturn
            (false).trackActivity
            ()}signingIn.
    Observable
    <flatMapLatest { loggedIn -> Boolinlet> =
        ? message "Mock: Signed in to GitHub." loggedIn : "Mock: Sign in to GitHub failed" return .
        promptFor wireframe(,:message"OK" cancelAction, :[ actions] )// propagate original value.
            _
            inmap { } }
                loggedIn
            .
    shareReplay
    (1)......API
signingIn 是当前是否正在登录 Observable,signedIn 是当前登录动作 Observable。signedIn 体现的事件流如下: 按下登录按钮; 使用当前用户名及密码进行登录; 展示登录结果。 Rx 可观察对象的交互如下:

上面的 GitHub 登录涉及到 Rx 的相关 *** 作: combineLatest:合并最后的 username 和 password,形成一个新的 Observable; withLatestFrom:形成一个以 loginTaps 发送事件时间为采样时间点,发送 usernameAndPassword 内容的 Observable。 三、如何实现监听? 上面的 GitHub 登录涉及到记录开始登录的 *** 作如下,那么它是如何监听到当前是否正在登录呢?
.

signup(,:username) password. passwordobserveOn
(MainScheduler.).instancecatchErrorJustReturn
(false).trackActivity
()......signingInpublic

extension

ObservableConvertibleType public func {
    trackActivity ( _:ActivityIndicator activityIndicator) Observable< -> Ereturn.> {
        trackActivityOfObservable activityIndicator(self)}}
    public
class
可以看到 .trackActivity(signingIn) 这个调用,一开始可能会理解为 .trackActivity(signingIn) 是在 signup(username, password: password) 后调用的,也就是说登录事件已经结束,程序才开始监听登录动作,这个理解显然是错误的。那么,要想获得正确的结果,事件流应该是一个怎么样的执行顺序呢?最直白的想法应该如下: 设置当前状态为正在执行登录; 执行登录 *** 作; 设置当前状态为没有执行登录。 那么问题来了:signup(username, password: password) 生成登录动作 Observable,当有 Observer 订阅这个 Observable 时,Observable 就会执行登陆 *** 作,并发送对应的结果,这就造成 .trackActivity(signingIn) 不能直接返回上游传递过来的事件流,因为这样,刚好切合上面的那个假设,因此 .trackActivity(signingIn) 应该做到以下几件事情: 保留登录动作 ObservableA,返回自定义的一个 ObservableB; 当外部 Observer 订阅 ObservableB 时,设置当前状态为正在执行登录; 设置当前状态为正在执行登录,然后让外部的 Observer 重新订阅到 ObservableA; 登录 *** 作执行完毕后,设置当前状态为没有执行登录。 signingIn 所属类 ActivityIndicator 的实现如下:
ActivityIndicator : DriverConvertibleType public typealias {
    E = Bool private let

    = NSRecursiveLock _lock ( )privatelet
    = Variable _variable ( 0)privatelet
    : Driver _loading< Boolpublicinit>

    ( )=. {
        _loading asDriver _variable().0
            }map { . > distinctUntilChanged (
            )}fileprivatefunc
    trackActivityOfObservable

    < O :ObservableConvertibleType(_ :>O) sourceObservable <O -> .EreturnObservable.> {
        using (()ActivityToken{ <O -> .Einself.> increment
            ()returnActivityToken(
            : .asObservablesource( source),:self. disposeAction) })decrementin
        return. { t asObservable
            ( t)}}private
        func
    increment

    ( ) .lock( {
        _lock).=.
        _variable+value 1 _variable.value unlock (
        _lock)}privatefunc
    decrement

    ( ) .lock( {
        _lock).=.
        _variable-value 1 _variable.value unlock (
        _lock)}publicfunc
    asDriver

    ( ) Driver<E -> return}}> {
        let _loading
    =
Variable
_variable 对应 Variable 类型,Variable 实际上是 BehaviorSubject 的一层包装,不同的是它只暴露数据,不会被终止或者失败。BehaviorSubject 会在订阅者订阅时,发送一个最近或初始数据,并且订阅者可以接收 BehaviorSubject 随后发送的所有数据。Variable 的示例如下:
( v 0 ).asObservable(
v).subscribe(
    :print(onNext) {
        }).=
    1.

v=value 2 0
v1value 2 =
执行结果:
.
asDriver
(
继续查看 _loading 在 ActivityIndicator 的初始化方法中的赋值如下:
_loading ) _variable.0}.
	distinctUntilChangedmap { ( > ) fileprivate
	functrackActivityOfObservable<O
其中 _variable 的初始值为 0,因此这部分的逻辑很容易理解:_loading 通过 _variable 发送的值是否大于 0 来判断当前是否在执行动作,并且通过 increment、decrement 方法来设置 _variable 发送的值(改变当前正在执行的动作数)。重点还是在 trackActivityOfObservable 方法:
: ObservableConvertibleType (_:O )>Observable< sourceO .E -> returnObservable.using(> {
    ( )ActivityToken<O{ .E -> inself.increment(> )
        returnActivityToken(:.
        asObservable ()source, source:self.)} disposeAction) inreturndecrement.
    asObservable( { t )
        } t}()ActivityToken
    <
O
对应的 resourceFactory 如下:
{ .E -> inself.increment(> )
        returnActivityToken(:.
        asObservable ()source, source:self.)} disposeActionin return.decrementasObservable
    (
observableFactory 如下:
{ t )
        } tprivatestructActivityToken<
    E
ActivityToken 的实现如下:
: ObservableConvertibleType ,Disposableprivate> let :Observable < {
    E private _sourcelet :Cancelableinit>
    ( : _disposeObservable <

    E,source: @escaping()>( disposeAction) ) == -> Disposables.create {
        _source ( source
        _dispose : )}funcdisposewith( disposeAction)
    .

    dispose ()} {
        _disposefuncasObservable()
    Observable

    < Ereturn} -> }publicstatic> {
        func _source
    using
<
可以看到,ActivityToken 就是一个保存当前需要监听的 Observable 的资源,当外部 Observer 订阅 trackActivityOfObservable 返回的 ObservableB 时,using 调用 resourceFactory 做了以下 *** 作: 增加当前正在执行的动作数; 使用 ActivityToken 保存需要监听的 ObservableA,并且在 ActivityToken 释放时,恢复当前正在执行的动作数。 接下来在调用 observableFactory 时,using 把在 resourceFactory 中保存的 ObservableA 重新暴露给 Observer。通过这种方式,就能在 ObservableA 发送数据之前,执行额外的 *** 作 self.increment(),也就是上面 .trackActivity(signingIn) 应该做到的 A2,再由于 using 会在 observableFactory 返回的 ObservableA 终止时释放 resourceFactory 创建的资源,所以当 ObservableA 终止时,会执行 self.decrement,也就是 A4。监听当前是否正在登录就是通过 using *** 作 hold 主需要监听的 Observable,然后在执行想要的额外动作后,重新暴露 Observable 给外部的 Observer。 四、using 内部实现 using 的内部实现:
R : Disposable (_:@escaping (>)throws resourceFactoryR , :@escaping ( -> R) observableFactorythrows Observable <E) Observable -> <Ereturn>Using -> (:,> {
        : )}resourceFactoryclass resourceFactoryUsing observableFactory< observableFactorySourceType
    ,
using 实际上返回的是一个 Using 类,Using 为 Producer 的子类,并且重载 run 方法,如下所示:
ResourceType :Disposable:Producer <SourceType typealias>E =SourceTypetypealias> {
    
    ResourceFactory = ( )
    
    throws ResourceType typealias ObservableFactory= ( -> ResourceType
    ) throws Observable <SourceTypefileprivate let -> :ResourceFactoryfileprivate>
    
    let : _resourceFactoryObservableFactory init
    ( : _observableFactory@escaping ResourceFactory
    
    
    ,:resourceFactory@escaping ObservableFactory )= observableFactory= } overridefunc {
        _resourceFactory run resourceFactory
        _observableFactory < observableFactory
    O
    
    : ObserverType (_: O )>Disposablewhere observerO .E -> == E let=UsingSink ( : {
        self sink , :)parent. =. observerrun observer(
        sink)disposable return sink}}classProducer
        < sink
    Element
:
Producer 的实现如下:
Observable <Elementoverride> init ()super> {
    . init() {
        }overridefuncsubscribe<
    O
    
    : ObserverType (_: O )>Disposablewhere observerO .E -> == Element if!CurrentThreadScheduler . return {
        run ()}isScheduleRequired {
            else returnCurrentThreadSchedulerobserver.
        .
        schedule {
            ( ()instance)_inreturnself. { run (
                ) }}}funcobserverrun
            <
        O
    :
    
    ObserverType (_: O )>Disposablewhere observerO .E -> == Element abstractMethod() } } {
        classUsingSink<
    SourceType
,
Producer 调用 subscribe 时,会调用子类的 run,并传入当前的 Oberver。回到 Using 的实现,Producer 的 run 方法中创建了 UsingSink 实例,并调用它的 run 方法。那么,来看下最关键的 UsingSink:
ResourceType :Disposable,O :ObserverType :Sink <O ,> ObserverType whereO.>E == SourceType typealiasParent= Using < {

    SourceType , ResourceType typealiasE=O .>
    E private let :Parentinit

    ( : _parentParent ,
    
    :Oparent) =super observer. init( {
        _parent : parent
        )}funcrunobserver( observer)
    Disposable
    
    var =Disposables. -> create {
        ( disposable ) dolet=try.
        
        _resourceFactory {
            ( resource ) = _parentlet=try.
            disposable _observableFactory resource
            ( source ) return _parentDisposables.createresource(
            
            . subscribe(self)
                source,)}catchletreturn
                disposable
            Disposables
        . create ( error {
            Observable .error()
                .subscribe(selferror),)}}funcon
                disposable
            (
        _
    :
    
    Event <E) eventswitch caselet.>next {
        ( event {
        ) : forwardOn(.valuenext(
            ))caselet.valueerror(
        ) : forwardOn(.errorerror(
            ))dispose()errorcase.
            :forwardOn(
        . )completeddispose
            ()}completed}
            }[+++]
        [+++]
    [+++]
[+++]
可以看到,在 run 方法中,UsingSink 先是调用 _resourceFactory() 创建了资源 resource,然后以 resource 为参数调用 _observableFactory() 来创建想要的 Obervable,并通过 Disposables.create(source.subscribe(self),disposable) 让 resource 的生命周期和 Obervable 一致。实际上 UsingSink 只是在 run 中做了两件特殊的事情: 在让 source 订阅自身前,创建 resource(一般会在这里做额外的 *** 作); 使用的 source 不是由上游给的,而是通过 _observableFactory 创建的(一般的 *** 作比如 map、flatMap 等都是由上游给的)。)
File: /www/wwwroot/outofmemory.cn/tmp/route_read.php, Line: 126, InsideLink()
File: /www/wwwroot/outofmemory.cn/tmp/index.inc.php, Line: 165, include(/www/wwwroot/outofmemory.cn/tmp/route_read.php)
File: /www/wwwroot/outofmemory.cn/index.php, Line: 30, include(/www/wwwroot/outofmemory.cn/tmp/index.inc.php)
Error[8]: Undefined offset: 957, File: /www/wwwroot/outofmemory.cn/tmp/plugin_ss_superseo_model_superseo.php, Line: 121
File: /www/wwwroot/outofmemory.cn/tmp/plugin_ss_superseo_model_superseo.php, Line: 473, decode(

一、前言 ReactiveX 官方文档 对于 Using 的描述如下:
create a disposable resource that has the same lifespan as the Observable
即创建一个和 Observable 具有相同生命周期的 disposable 资源。

可以看出:当一个 ObserverA 订阅 Using 返回的 Observable 时,Using 会使用调用者传入的 Resource 工厂方法 [resourceFactory] 创建对应的资源,并且使用 Observable 工厂方法 [observableFactory] 创建 ObserverA 实际上想要订阅的 Observable。当 ObserverA 终止时,对应的 Resource 也会被释放 [dispose]。如下所示:
class MyDisposables: Disposable {
    func dispose() {
        print("dispose")
    }
}

......

let _ = Observable
    .using({ () -> MyDisposables in
        return MyDisposables()
    }) { _ in
        return Observable<Int>
            .interval(1, scheduler: MainScheduler.instance)
            .take(5)
    }
    .subscribe(onNext: {
        print()}
    )0
执行结果:
1
2
3
4
let
dispose
可以看到,当 AnonymousObserver[匿名观察者] 订阅 using 返回的 Observable 时,using 内部创建了定期输出 Int 值的 ObservableA,以及资源 MyDisposables。在发送 5 个消息之后,ObservableA 被终止,与此同时,MyDisposables 资源被 using 释放。 二、监听 Obervable RxSwift 官方 Demo 中的一段关于 GitHub 登录的代码:
= signingIn ActivityIndicator ()self
.=signingIn . signingInasObservable()let

= usernameAndPassword Observable .combineLatest(.input,username. input)password( { ,) }= .

signedIn . inputwithLatestFromloginTaps().usernameAndPassword(
    ,flatMapLatest { )usernamein passwordreturn API
        . signup(,:username) password. passwordobserveOn
            (MainScheduler.).instancecatchErrorJustReturn
            (false).trackActivity
            ()}signingIn.
    Observable
    <flatMapLatest { loggedIn -> Boolinlet> =
        ? message "Mock: Signed in to GitHub." loggedIn : "Mock: Sign in to GitHub failed" return .
        promptFor wireframe(,:message"OK" cancelAction, :[ actions] )// propagate original value.
            _
            inmap { } }
                loggedIn
            .
    shareReplay
    (1)......API
signingIn 是当前是否正在登录 Observable,signedIn 是当前登录动作 Observable。signedIn 体现的事件流如下: 按下登录按钮; 使用当前用户名及密码进行登录; 展示登录结果。 Rx 可观察对象的交互如下:

上面的 GitHub 登录涉及到 Rx 的相关 *** 作: combineLatest:合并最后的 username 和 password,形成一个新的 Observable; withLatestFrom:形成一个以 loginTaps 发送事件时间为采样时间点,发送 usernameAndPassword 内容的 Observable。 三、如何实现监听? 上面的 GitHub 登录涉及到记录开始登录的 *** 作如下,那么它是如何监听到当前是否正在登录呢?
.

signup(,:username) password. passwordobserveOn
(MainScheduler.).instancecatchErrorJustReturn
(false).trackActivity
()......signingInpublic

extension

ObservableConvertibleType public func {
    trackActivity ( _:ActivityIndicator activityIndicator) Observable< -> Ereturn.> {
        trackActivityOfObservable activityIndicator(self)}}
    public
class
可以看到 .trackActivity(signingIn) 这个调用,一开始可能会理解为 .trackActivity(signingIn) 是在 signup(username, password: password) 后调用的,也就是说登录事件已经结束,程序才开始监听登录动作,这个理解显然是错误的。那么,要想获得正确的结果,事件流应该是一个怎么样的执行顺序呢?最直白的想法应该如下: 设置当前状态为正在执行登录; 执行登录 *** 作; 设置当前状态为没有执行登录。 那么问题来了:signup(username, password: password) 生成登录动作 Observable,当有 Observer 订阅这个 Observable 时,Observable 就会执行登陆 *** 作,并发送对应的结果,这就造成 .trackActivity(signingIn) 不能直接返回上游传递过来的事件流,因为这样,刚好切合上面的那个假设,因此 .trackActivity(signingIn) 应该做到以下几件事情: 保留登录动作 ObservableA,返回自定义的一个 ObservableB; 当外部 Observer 订阅 ObservableB 时,设置当前状态为正在执行登录; 设置当前状态为正在执行登录,然后让外部的 Observer 重新订阅到 ObservableA; 登录 *** 作执行完毕后,设置当前状态为没有执行登录。 signingIn 所属类 ActivityIndicator 的实现如下:
ActivityIndicator : DriverConvertibleType public typealias {
    E = Bool private let

    = NSRecursiveLock _lock ( )privatelet
    = Variable _variable ( 0)privatelet
    : Driver _loading< Boolpublicinit>

    ( )=. {
        _loading asDriver _variable().0
            }map { . > distinctUntilChanged (
            )}fileprivatefunc
    trackActivityOfObservable

    < O :ObservableConvertibleType(_ :>O) sourceObservable <O -> .EreturnObservable.> {
        using (()ActivityToken{ <O -> .Einself.> increment
            ()returnActivityToken(
            : .asObservablesource( source),:self. disposeAction) })decrementin
        return. { t asObservable
            ( t)}}private
        func
    increment

    ( ) .lock( {
        _lock).=.
        _variable+value 1 _variable.value unlock (
        _lock)}privatefunc
    decrement

    ( ) .lock( {
        _lock).=.
        _variable-value 1 _variable.value unlock (
        _lock)}publicfunc
    asDriver

    ( ) Driver<E -> return}}> {
        let _loading
    =
Variable
_variable 对应 Variable 类型,Variable 实际上是 BehaviorSubject 的一层包装,不同的是它只暴露数据,不会被终止或者失败。BehaviorSubject 会在订阅者订阅时,发送一个最近或初始数据,并且订阅者可以接收 BehaviorSubject 随后发送的所有数据。Variable 的示例如下:
( v 0 ).asObservable(
v).subscribe(
    :print(onNext) {
        }).=
    1.

v=value 2 0
v1value 2 =
执行结果:
.
asDriver
(
继续查看 _loading 在 ActivityIndicator 的初始化方法中的赋值如下:
_loading ) _variable.0}.
	distinctUntilChangedmap { ( > ) fileprivate
	functrackActivityOfObservable<O
其中 _variable 的初始值为 0,因此这部分的逻辑很容易理解:_loading 通过 _variable 发送的值是否大于 0 来判断当前是否在执行动作,并且通过 increment、decrement 方法来设置 _variable 发送的值(改变当前正在执行的动作数)。重点还是在 trackActivityOfObservable 方法:
: ObservableConvertibleType (_:O )>Observable< sourceO .E -> returnObservable.using(> {
    ( )ActivityToken<O{ .E -> inself.increment(> )
        returnActivityToken(:.
        asObservable ()source, source:self.)} disposeAction) inreturndecrement.
    asObservable( { t )
        } t}()ActivityToken
    <
O
对应的 resourceFactory 如下:
{ .E -> inself.increment(> )
        returnActivityToken(:.
        asObservable ()source, source:self.)} disposeActionin return.decrementasObservable
    (
observableFactory 如下:
{ t )
        } tprivatestructActivityToken<
    E
ActivityToken 的实现如下:
: ObservableConvertibleType ,Disposableprivate> let :Observable < {
    E private _sourcelet :Cancelableinit>
    ( : _disposeObservable <

    E,source: @escaping()>( disposeAction) ) == -> Disposables.create {
        _source ( source
        _dispose : )}funcdisposewith( disposeAction)
    .

    dispose ()} {
        _disposefuncasObservable()
    Observable

    < Ereturn} -> }publicstatic> {
        func _source
    using
<
可以看到,ActivityToken 就是一个保存当前需要监听的 Observable 的资源,当外部 Observer 订阅 trackActivityOfObservable 返回的 ObservableB 时,using 调用 resourceFactory 做了以下 *** 作: 增加当前正在执行的动作数; 使用 ActivityToken 保存需要监听的 ObservableA,并且在 ActivityToken 释放时,恢复当前正在执行的动作数。 接下来在调用 observableFactory 时,using 把在 resourceFactory 中保存的 ObservableA 重新暴露给 Observer。通过这种方式,就能在 ObservableA 发送数据之前,执行额外的 *** 作 self.increment(),也就是上面 .trackActivity(signingIn) 应该做到的 A2,再由于 using 会在 observableFactory 返回的 ObservableA 终止时释放 resourceFactory 创建的资源,所以当 ObservableA 终止时,会执行 self.decrement,也就是 A4。监听当前是否正在登录就是通过 using *** 作 hold 主需要监听的 Observable,然后在执行想要的额外动作后,重新暴露 Observable 给外部的 Observer。 四、using 内部实现 using 的内部实现:
R : Disposable (_:@escaping (>)throws resourceFactoryR , :@escaping ( -> R) observableFactorythrows Observable <E) Observable -> <Ereturn>Using -> (:,> {
        : )}resourceFactoryclass resourceFactoryUsing observableFactory< observableFactorySourceType
    ,
using 实际上返回的是一个 Using 类,Using 为 Producer 的子类,并且重载 run 方法,如下所示:
ResourceType :Disposable:Producer <SourceType typealias>E =SourceTypetypealias> {
    
    ResourceFactory = ( )
    
    throws ResourceType typealias ObservableFactory= ( -> ResourceType
    ) throws Observable <SourceTypefileprivate let -> :ResourceFactoryfileprivate>
    
    let : _resourceFactoryObservableFactory init
    ( : _observableFactory@escaping ResourceFactory
    
    
    ,:resourceFactory@escaping ObservableFactory )= observableFactory= } overridefunc {
        _resourceFactory run resourceFactory
        _observableFactory < observableFactory
    O
    
    : ObserverType (_: O )>Disposablewhere observerO .E -> == E let=UsingSink ( : {
        self sink , :)parent. =. observerrun observer(
        sink)disposable return sink}}classProducer
        < sink
    Element
:
Producer 的实现如下:
Observable <Elementoverride> init ()super> {
    . init() {
        }overridefuncsubscribe<
    O
    
    : ObserverType (_: O )>Disposablewhere observerO .E -> == Element if!CurrentThreadScheduler . return {
        run ()}isScheduleRequired {
            else returnCurrentThreadSchedulerobserver.
        .
        schedule {
            ( ()instance)_inreturnself. { run (
                ) }}}funcobserverrun
            <
        O
    :
    
    ObserverType (_: O )>Disposablewhere observerO .E -> == Element abstractMethod() } } {
        classUsingSink<
    SourceType
,
Producer 调用 subscribe 时,会调用子类的 run,并传入当前的 Oberver。回到 Using 的实现,Producer 的 run 方法中创建了 UsingSink 实例,并调用它的 run 方法。那么,来看下最关键的 UsingSink:
ResourceType :Disposable,O :ObserverType :Sink <O ,> ObserverType whereO.>E == SourceType typealiasParent= Using < {

    SourceType , ResourceType typealiasE=O .>
    E private let :Parentinit

    ( : _parentParent ,
    
    :Oparent) =super observer. init( {
        _parent : parent
        )}funcrunobserver( observer)
    Disposable
    
    var =Disposables. -> create {
        ( disposable ) dolet=try.
        
        _resourceFactory {
            ( resource ) = _parentlet=try.
            disposable _observableFactory resource
            ( source ) return _parentDisposables.createresource(
            
            . subscribe(self)
                source,)}catchletreturn
                disposable
            Disposables
        . create ( error {
            Observable .error()
                .subscribe(selferror),)}}funcon
                disposable
            (
        _
    :
    
    Event <E) eventswitch caselet.>next {
        ( event {
        ) : forwardOn(.valuenext(
            ))caselet.valueerror(
        ) : forwardOn(.errorerror(
            ))dispose()errorcase.
            :forwardOn(
        . )completeddispose
            ()}completed}
            }
        [+++]
    [+++]
[+++]
可以看到,在 run 方法中,UsingSink 先是调用 _resourceFactory() 创建了资源 resource,然后以 resource 为参数调用 _observableFactory() 来创建想要的 Obervable,并通过 Disposables.create(source.subscribe(self),disposable) 让 resource 的生命周期和 Obervable 一致。实际上 UsingSink 只是在 run 中做了两件特殊的事情: 在让 source 订阅自身前,创建 resource(一般会在这里做额外的 *** 作); 使用的 source 不是由上游给的,而是通过 _observableFactory 创建的(一般的 *** 作比如 map、flatMap 等都是由上游给的)。)
File: /www/wwwroot/outofmemory.cn/tmp/route_read.php, Line: 126, InsideLink()
File: /www/wwwroot/outofmemory.cn/tmp/index.inc.php, Line: 165, include(/www/wwwroot/outofmemory.cn/tmp/route_read.php)
File: /www/wwwroot/outofmemory.cn/index.php, Line: 30, include(/www/wwwroot/outofmemory.cn/tmp/index.inc.php)
Error[8]: Undefined offset: 958, File: /www/wwwroot/outofmemory.cn/tmp/plugin_ss_superseo_model_superseo.php, Line: 121
File: /www/wwwroot/outofmemory.cn/tmp/plugin_ss_superseo_model_superseo.php, Line: 473, decode(

一、前言 ReactiveX 官方文档 对于 Using 的描述如下:
create a disposable resource that has the same lifespan as the Observable
即创建一个和 Observable 具有相同生命周期的 disposable 资源。

可以看出:当一个 ObserverA 订阅 Using 返回的 Observable 时,Using 会使用调用者传入的 Resource 工厂方法 [resourceFactory] 创建对应的资源,并且使用 Observable 工厂方法 [observableFactory] 创建 ObserverA 实际上想要订阅的 Observable。当 ObserverA 终止时,对应的 Resource 也会被释放 [dispose]。如下所示:
class MyDisposables: Disposable {
    func dispose() {
        print("dispose")
    }
}

......

let _ = Observable
    .using({ () -> MyDisposables in
        return MyDisposables()
    }) { _ in
        return Observable<Int>
            .interval(1, scheduler: MainScheduler.instance)
            .take(5)
    }
    .subscribe(onNext: {
        print()}
    )0
执行结果:
1
2
3
4
let
dispose
可以看到,当 AnonymousObserver[匿名观察者] 订阅 using 返回的 Observable 时,using 内部创建了定期输出 Int 值的 ObservableA,以及资源 MyDisposables。在发送 5 个消息之后,ObservableA 被终止,与此同时,MyDisposables 资源被 using 释放。 二、监听 Obervable RxSwift 官方 Demo 中的一段关于 GitHub 登录的代码:
= signingIn ActivityIndicator ()self
.=signingIn . signingInasObservable()let

= usernameAndPassword Observable .combineLatest(.input,username. input)password( { ,) }= .

signedIn . inputwithLatestFromloginTaps().usernameAndPassword(
    ,flatMapLatest { )usernamein passwordreturn API
        . signup(,:username) password. passwordobserveOn
            (MainScheduler.).instancecatchErrorJustReturn
            (false).trackActivity
            ()}signingIn.
    Observable
    <flatMapLatest { loggedIn -> Boolinlet> =
        ? message "Mock: Signed in to GitHub." loggedIn : "Mock: Sign in to GitHub failed" return .
        promptFor wireframe(,:message"OK" cancelAction, :[ actions] )// propagate original value.
            _
            inmap { } }
                loggedIn
            .
    shareReplay
    (1)......API
signingIn 是当前是否正在登录 Observable,signedIn 是当前登录动作 Observable。signedIn 体现的事件流如下: 按下登录按钮; 使用当前用户名及密码进行登录; 展示登录结果。 Rx 可观察对象的交互如下:

上面的 GitHub 登录涉及到 Rx 的相关 *** 作: combineLatest:合并最后的 username 和 password,形成一个新的 Observable; withLatestFrom:形成一个以 loginTaps 发送事件时间为采样时间点,发送 usernameAndPassword 内容的 Observable。 三、如何实现监听? 上面的 GitHub 登录涉及到记录开始登录的 *** 作如下,那么它是如何监听到当前是否正在登录呢?
.

signup(,:username) password. passwordobserveOn
(MainScheduler.).instancecatchErrorJustReturn
(false).trackActivity
()......signingInpublic

extension

ObservableConvertibleType public func {
    trackActivity ( _:ActivityIndicator activityIndicator) Observable< -> Ereturn.> {
        trackActivityOfObservable activityIndicator(self)}}
    public
class
可以看到 .trackActivity(signingIn) 这个调用,一开始可能会理解为 .trackActivity(signingIn) 是在 signup(username, password: password) 后调用的,也就是说登录事件已经结束,程序才开始监听登录动作,这个理解显然是错误的。那么,要想获得正确的结果,事件流应该是一个怎么样的执行顺序呢?最直白的想法应该如下: 设置当前状态为正在执行登录; 执行登录 *** 作; 设置当前状态为没有执行登录。 那么问题来了:signup(username, password: password) 生成登录动作 Observable,当有 Observer 订阅这个 Observable 时,Observable 就会执行登陆 *** 作,并发送对应的结果,这就造成 .trackActivity(signingIn) 不能直接返回上游传递过来的事件流,因为这样,刚好切合上面的那个假设,因此 .trackActivity(signingIn) 应该做到以下几件事情: 保留登录动作 ObservableA,返回自定义的一个 ObservableB; 当外部 Observer 订阅 ObservableB 时,设置当前状态为正在执行登录; 设置当前状态为正在执行登录,然后让外部的 Observer 重新订阅到 ObservableA; 登录 *** 作执行完毕后,设置当前状态为没有执行登录。 signingIn 所属类 ActivityIndicator 的实现如下:
ActivityIndicator : DriverConvertibleType public typealias {
    E = Bool private let

    = NSRecursiveLock _lock ( )privatelet
    = Variable _variable ( 0)privatelet
    : Driver _loading< Boolpublicinit>

    ( )=. {
        _loading asDriver _variable().0
            }map { . > distinctUntilChanged (
            )}fileprivatefunc
    trackActivityOfObservable

    < O :ObservableConvertibleType(_ :>O) sourceObservable <O -> .EreturnObservable.> {
        using (()ActivityToken{ <O -> .Einself.> increment
            ()returnActivityToken(
            : .asObservablesource( source),:self. disposeAction) })decrementin
        return. { t asObservable
            ( t)}}private
        func
    increment

    ( ) .lock( {
        _lock).=.
        _variable+value 1 _variable.value unlock (
        _lock)}privatefunc
    decrement

    ( ) .lock( {
        _lock).=.
        _variable-value 1 _variable.value unlock (
        _lock)}publicfunc
    asDriver

    ( ) Driver<E -> return}}> {
        let _loading
    =
Variable
_variable 对应 Variable 类型,Variable 实际上是 BehaviorSubject 的一层包装,不同的是它只暴露数据,不会被终止或者失败。BehaviorSubject 会在订阅者订阅时,发送一个最近或初始数据,并且订阅者可以接收 BehaviorSubject 随后发送的所有数据。Variable 的示例如下:
( v 0 ).asObservable(
v).subscribe(
    :print(onNext) {
        }).=
    1.

v=value 2 0
v1value 2 =
执行结果:
.
asDriver
(
继续查看 _loading 在 ActivityIndicator 的初始化方法中的赋值如下:
_loading ) _variable.0}.
	distinctUntilChangedmap { ( > ) fileprivate
	functrackActivityOfObservable<O
其中 _variable 的初始值为 0,因此这部分的逻辑很容易理解:_loading 通过 _variable 发送的值是否大于 0 来判断当前是否在执行动作,并且通过 increment、decrement 方法来设置 _variable 发送的值(改变当前正在执行的动作数)。重点还是在 trackActivityOfObservable 方法:
: ObservableConvertibleType (_:O )>Observable< sourceO .E -> returnObservable.using(> {
    ( )ActivityToken<O{ .E -> inself.increment(> )
        returnActivityToken(:.
        asObservable ()source, source:self.)} disposeAction) inreturndecrement.
    asObservable( { t )
        } t}()ActivityToken
    <
O
对应的 resourceFactory 如下:
{ .E -> inself.increment(> )
        returnActivityToken(:.
        asObservable ()source, source:self.)} disposeActionin return.decrementasObservable
    (
observableFactory 如下:
{ t )
        } tprivatestructActivityToken<
    E
ActivityToken 的实现如下:
: ObservableConvertibleType ,Disposableprivate> let :Observable < {
    E private _sourcelet :Cancelableinit>
    ( : _disposeObservable <

    E,source: @escaping()>( disposeAction) ) == -> Disposables.create {
        _source ( source
        _dispose : )}funcdisposewith( disposeAction)
    .

    dispose ()} {
        _disposefuncasObservable()
    Observable

    < Ereturn} -> }publicstatic> {
        func _source
    using
<
可以看到,ActivityToken 就是一个保存当前需要监听的 Observable 的资源,当外部 Observer 订阅 trackActivityOfObservable 返回的 ObservableB 时,using 调用 resourceFactory 做了以下 *** 作: 增加当前正在执行的动作数; 使用 ActivityToken 保存需要监听的 ObservableA,并且在 ActivityToken 释放时,恢复当前正在执行的动作数。 接下来在调用 observableFactory 时,using 把在 resourceFactory 中保存的 ObservableA 重新暴露给 Observer。通过这种方式,就能在 ObservableA 发送数据之前,执行额外的 *** 作 self.increment(),也就是上面 .trackActivity(signingIn) 应该做到的 A2,再由于 using 会在 observableFactory 返回的 ObservableA 终止时释放 resourceFactory 创建的资源,所以当 ObservableA 终止时,会执行 self.decrement,也就是 A4。监听当前是否正在登录就是通过 using *** 作 hold 主需要监听的 Observable,然后在执行想要的额外动作后,重新暴露 Observable 给外部的 Observer。 四、using 内部实现 using 的内部实现:
R : Disposable (_:@escaping (>)throws resourceFactoryR , :@escaping ( -> R) observableFactorythrows Observable <E) Observable -> <Ereturn>Using -> (:,> {
        : )}resourceFactoryclass resourceFactoryUsing observableFactory< observableFactorySourceType
    ,
using 实际上返回的是一个 Using 类,Using 为 Producer 的子类,并且重载 run 方法,如下所示:
ResourceType :Disposable:Producer <SourceType typealias>E =SourceTypetypealias> {
    
    ResourceFactory = ( )
    
    throws ResourceType typealias ObservableFactory= ( -> ResourceType
    ) throws Observable <SourceTypefileprivate let -> :ResourceFactoryfileprivate>
    
    let : _resourceFactoryObservableFactory init
    ( : _observableFactory@escaping ResourceFactory
    
    
    ,:resourceFactory@escaping ObservableFactory )= observableFactory= } overridefunc {
        _resourceFactory run resourceFactory
        _observableFactory < observableFactory
    O
    
    : ObserverType (_: O )>Disposablewhere observerO .E -> == E let=UsingSink ( : {
        self sink , :)parent. =. observerrun observer(
        sink)disposable return sink}}classProducer
        < sink
    Element
:
Producer 的实现如下:
Observable <Elementoverride> init ()super> {
    . init() {
        }overridefuncsubscribe<
    O
    
    : ObserverType (_: O )>Disposablewhere observerO .E -> == Element if!CurrentThreadScheduler . return {
        run ()}isScheduleRequired {
            else returnCurrentThreadSchedulerobserver.
        .
        schedule {
            ( ()instance)_inreturnself. { run (
                ) }}}funcobserverrun
            <
        O
    :
    
    ObserverType (_: O )>Disposablewhere observerO .E -> == Element abstractMethod() } } {
        classUsingSink<
    SourceType
,
Producer 调用 subscribe 时,会调用子类的 run,并传入当前的 Oberver。回到 Using 的实现,Producer 的 run 方法中创建了 UsingSink 实例,并调用它的 run 方法。那么,来看下最关键的 UsingSink:
ResourceType :Disposable,O :ObserverType :Sink <O ,> ObserverType whereO.>E == SourceType typealiasParent= Using < {

    SourceType , ResourceType typealiasE=O .>
    E private let :Parentinit

    ( : _parentParent ,
    
    :Oparent) =super observer. init( {
        _parent : parent
        )}funcrunobserver( observer)
    Disposable
    
    var =Disposables. -> create {
        ( disposable ) dolet=try.
        
        _resourceFactory {
            ( resource ) = _parentlet=try.
            disposable _observableFactory resource
            ( source ) return _parentDisposables.createresource(
            
            . subscribe(self)
                source,)}catchletreturn
                disposable
            Disposables
        . create ( error {
            Observable .error()
                .subscribe(selferror),)}}funcon
                disposable
            (
        _
    :
    
    Event <E) eventswitch caselet.>next {
        ( event {
        ) : forwardOn(.valuenext(
            ))caselet.valueerror(
        ) : forwardOn(.errorerror(
            ))dispose()errorcase.
            :forwardOn(
        . )completeddispose
            ()}completed}
            }
        
    [+++]
[+++]
可以看到,在 run 方法中,UsingSink 先是调用 _resourceFactory() 创建了资源 resource,然后以 resource 为参数调用 _observableFactory() 来创建想要的 Obervable,并通过 Disposables.create(source.subscribe(self),disposable) 让 resource 的生命周期和 Obervable 一致。实际上 UsingSink 只是在 run 中做了两件特殊的事情: 在让 source 订阅自身前,创建 resource(一般会在这里做额外的 *** 作); 使用的 source 不是由上游给的,而是通过 _observableFactory 创建的(一般的 *** 作比如 map、flatMap 等都是由上游给的)。)
File: /www/wwwroot/outofmemory.cn/tmp/route_read.php, Line: 126, InsideLink()
File: /www/wwwroot/outofmemory.cn/tmp/index.inc.php, Line: 165, include(/www/wwwroot/outofmemory.cn/tmp/route_read.php)
File: /www/wwwroot/outofmemory.cn/index.php, Line: 30, include(/www/wwwroot/outofmemory.cn/tmp/index.inc.php)
Error[8]: Undefined offset: 959, File: /www/wwwroot/outofmemory.cn/tmp/plugin_ss_superseo_model_superseo.php, Line: 121
File: /www/wwwroot/outofmemory.cn/tmp/plugin_ss_superseo_model_superseo.php, Line: 473, decode(

一、前言 ReactiveX 官方文档 对于 Using 的描述如下:
create a disposable resource that has the same lifespan as the Observable
即创建一个和 Observable 具有相同生命周期的 disposable 资源。

可以看出:当一个 ObserverA 订阅 Using 返回的 Observable 时,Using 会使用调用者传入的 Resource 工厂方法 [resourceFactory] 创建对应的资源,并且使用 Observable 工厂方法 [observableFactory] 创建 ObserverA 实际上想要订阅的 Observable。当 ObserverA 终止时,对应的 Resource 也会被释放 [dispose]。如下所示:
class MyDisposables: Disposable {
    func dispose() {
        print("dispose")
    }
}

......

let _ = Observable
    .using({ () -> MyDisposables in
        return MyDisposables()
    }) { _ in
        return Observable<Int>
            .interval(1, scheduler: MainScheduler.instance)
            .take(5)
    }
    .subscribe(onNext: {
        print()}
    )0
执行结果:
1
2
3
4
let
dispose
可以看到,当 AnonymousObserver[匿名观察者] 订阅 using 返回的 Observable 时,using 内部创建了定期输出 Int 值的 ObservableA,以及资源 MyDisposables。在发送 5 个消息之后,ObservableA 被终止,与此同时,MyDisposables 资源被 using 释放。 二、监听 Obervable RxSwift 官方 Demo 中的一段关于 GitHub 登录的代码:
= signingIn ActivityIndicator ()self
.=signingIn . signingInasObservable()let

= usernameAndPassword Observable .combineLatest(.input,username. input)password( { ,) }= .

signedIn . inputwithLatestFromloginTaps().usernameAndPassword(
    ,flatMapLatest { )usernamein passwordreturn API
        . signup(,:username) password. passwordobserveOn
            (MainScheduler.).instancecatchErrorJustReturn
            (false).trackActivity
            ()}signingIn.
    Observable
    <flatMapLatest { loggedIn -> Boolinlet> =
        ? message "Mock: Signed in to GitHub." loggedIn : "Mock: Sign in to GitHub failed" return .
        promptFor wireframe(,:message"OK" cancelAction, :[ actions] )// propagate original value.
            _
            inmap { } }
                loggedIn
            .
    shareReplay
    (1)......API
signingIn 是当前是否正在登录 Observable,signedIn 是当前登录动作 Observable。signedIn 体现的事件流如下: 按下登录按钮; 使用当前用户名及密码进行登录; 展示登录结果。 Rx 可观察对象的交互如下:

上面的 GitHub 登录涉及到 Rx 的相关 *** 作: combineLatest:合并最后的 username 和 password,形成一个新的 Observable; withLatestFrom:形成一个以 loginTaps 发送事件时间为采样时间点,发送 usernameAndPassword 内容的 Observable。 三、如何实现监听? 上面的 GitHub 登录涉及到记录开始登录的 *** 作如下,那么它是如何监听到当前是否正在登录呢?
.

signup(,:username) password. passwordobserveOn
(MainScheduler.).instancecatchErrorJustReturn
(false).trackActivity
()......signingInpublic

extension

ObservableConvertibleType public func {
    trackActivity ( _:ActivityIndicator activityIndicator) Observable< -> Ereturn.> {
        trackActivityOfObservable activityIndicator(self)}}
    public
class
可以看到 .trackActivity(signingIn) 这个调用,一开始可能会理解为 .trackActivity(signingIn) 是在 signup(username, password: password) 后调用的,也就是说登录事件已经结束,程序才开始监听登录动作,这个理解显然是错误的。那么,要想获得正确的结果,事件流应该是一个怎么样的执行顺序呢?最直白的想法应该如下: 设置当前状态为正在执行登录; 执行登录 *** 作; 设置当前状态为没有执行登录。 那么问题来了:signup(username, password: password) 生成登录动作 Observable,当有 Observer 订阅这个 Observable 时,Observable 就会执行登陆 *** 作,并发送对应的结果,这就造成 .trackActivity(signingIn) 不能直接返回上游传递过来的事件流,因为这样,刚好切合上面的那个假设,因此 .trackActivity(signingIn) 应该做到以下几件事情: 保留登录动作 ObservableA,返回自定义的一个 ObservableB; 当外部 Observer 订阅 ObservableB 时,设置当前状态为正在执行登录; 设置当前状态为正在执行登录,然后让外部的 Observer 重新订阅到 ObservableA; 登录 *** 作执行完毕后,设置当前状态为没有执行登录。 signingIn 所属类 ActivityIndicator 的实现如下:
ActivityIndicator : DriverConvertibleType public typealias {
    E = Bool private let

    = NSRecursiveLock _lock ( )privatelet
    = Variable _variable ( 0)privatelet
    : Driver _loading< Boolpublicinit>

    ( )=. {
        _loading asDriver _variable().0
            }map { . > distinctUntilChanged (
            )}fileprivatefunc
    trackActivityOfObservable

    < O :ObservableConvertibleType(_ :>O) sourceObservable <O -> .EreturnObservable.> {
        using (()ActivityToken{ <O -> .Einself.> increment
            ()returnActivityToken(
            : .asObservablesource( source),:self. disposeAction) })decrementin
        return. { t asObservable
            ( t)}}private
        func
    increment

    ( ) .lock( {
        _lock).=.
        _variable+value 1 _variable.value unlock (
        _lock)}privatefunc
    decrement

    ( ) .lock( {
        _lock).=.
        _variable-value 1 _variable.value unlock (
        _lock)}publicfunc
    asDriver

    ( ) Driver<E -> return}}> {
        let _loading
    =
Variable
_variable 对应 Variable 类型,Variable 实际上是 BehaviorSubject 的一层包装,不同的是它只暴露数据,不会被终止或者失败。BehaviorSubject 会在订阅者订阅时,发送一个最近或初始数据,并且订阅者可以接收 BehaviorSubject 随后发送的所有数据。Variable 的示例如下:
( v 0 ).asObservable(
v).subscribe(
    :print(onNext) {
        }).=
    1.

v=value 2 0
v1value 2 =
执行结果:
.
asDriver
(
继续查看 _loading 在 ActivityIndicator 的初始化方法中的赋值如下:
_loading ) _variable.0}.
	distinctUntilChangedmap { ( > ) fileprivate
	functrackActivityOfObservable<O
其中 _variable 的初始值为 0,因此这部分的逻辑很容易理解:_loading 通过 _variable 发送的值是否大于 0 来判断当前是否在执行动作,并且通过 increment、decrement 方法来设置 _variable 发送的值(改变当前正在执行的动作数)。重点还是在 trackActivityOfObservable 方法:
: ObservableConvertibleType (_:O )>Observable< sourceO .E -> returnObservable.using(> {
    ( )ActivityToken<O{ .E -> inself.increment(> )
        returnActivityToken(:.
        asObservable ()source, source:self.)} disposeAction) inreturndecrement.
    asObservable( { t )
        } t}()ActivityToken
    <
O
对应的 resourceFactory 如下:
{ .E -> inself.increment(> )
        returnActivityToken(:.
        asObservable ()source, source:self.)} disposeActionin return.decrementasObservable
    (
observableFactory 如下:
{ t )
        } tprivatestructActivityToken<
    E
ActivityToken 的实现如下:
: ObservableConvertibleType ,Disposableprivate> let :Observable < {
    E private _sourcelet :Cancelableinit>
    ( : _disposeObservable <

    E,source: @escaping()>( disposeAction) ) == -> Disposables.create {
        _source ( source
        _dispose : )}funcdisposewith( disposeAction)
    .

    dispose ()} {
        _disposefuncasObservable()
    Observable

    < Ereturn} -> }publicstatic> {
        func _source
    using
<
可以看到,ActivityToken 就是一个保存当前需要监听的 Observable 的资源,当外部 Observer 订阅 trackActivityOfObservable 返回的 ObservableB 时,using 调用 resourceFactory 做了以下 *** 作: 增加当前正在执行的动作数; 使用 ActivityToken 保存需要监听的 ObservableA,并且在 ActivityToken 释放时,恢复当前正在执行的动作数。 接下来在调用 observableFactory 时,using 把在 resourceFactory 中保存的 ObservableA 重新暴露给 Observer。通过这种方式,就能在 ObservableA 发送数据之前,执行额外的 *** 作 self.increment(),也就是上面 .trackActivity(signingIn) 应该做到的 A2,再由于 using 会在 observableFactory 返回的 ObservableA 终止时释放 resourceFactory 创建的资源,所以当 ObservableA 终止时,会执行 self.decrement,也就是 A4。监听当前是否正在登录就是通过 using *** 作 hold 主需要监听的 Observable,然后在执行想要的额外动作后,重新暴露 Observable 给外部的 Observer。 四、using 内部实现 using 的内部实现:
R : Disposable (_:@escaping (>)throws resourceFactoryR , :@escaping ( -> R) observableFactorythrows Observable <E) Observable -> <Ereturn>Using -> (:,> {
        : )}resourceFactoryclass resourceFactoryUsing observableFactory< observableFactorySourceType
    ,
using 实际上返回的是一个 Using 类,Using 为 Producer 的子类,并且重载 run 方法,如下所示:
ResourceType :Disposable:Producer <SourceType typealias>E =SourceTypetypealias> {
    
    ResourceFactory = ( )
    
    throws ResourceType typealias ObservableFactory= ( -> ResourceType
    ) throws Observable <SourceTypefileprivate let -> :ResourceFactoryfileprivate>
    
    let : _resourceFactoryObservableFactory init
    ( : _observableFactory@escaping ResourceFactory
    
    
    ,:resourceFactory@escaping ObservableFactory )= observableFactory= } overridefunc {
        _resourceFactory run resourceFactory
        _observableFactory < observableFactory
    O
    
    : ObserverType (_: O )>Disposablewhere observerO .E -> == E let=UsingSink ( : {
        self sink , :)parent. =. observerrun observer(
        sink)disposable return sink}}classProducer
        < sink
    Element
:
Producer 的实现如下:
Observable <Elementoverride> init ()super> {
    . init() {
        }overridefuncsubscribe<
    O
    
    : ObserverType (_: O )>Disposablewhere observerO .E -> == Element if!CurrentThreadScheduler . return {
        run ()}isScheduleRequired {
            else returnCurrentThreadSchedulerobserver.
        .
        schedule {
            ( ()instance)_inreturnself. { run (
                ) }}}funcobserverrun
            <
        O
    :
    
    ObserverType (_: O )>Disposablewhere observerO .E -> == Element abstractMethod() } } {
        classUsingSink<
    SourceType
,
Producer 调用 subscribe 时,会调用子类的 run,并传入当前的 Oberver。回到 Using 的实现,Producer 的 run 方法中创建了 UsingSink 实例,并调用它的 run 方法。那么,来看下最关键的 UsingSink:
ResourceType :Disposable,O :ObserverType :Sink <O ,> ObserverType whereO.>E == SourceType typealiasParent= Using < {

    SourceType , ResourceType typealiasE=O .>
    E private let :Parentinit

    ( : _parentParent ,
    
    :Oparent) =super observer. init( {
        _parent : parent
        )}funcrunobserver( observer)
    Disposable
    
    var =Disposables. -> create {
        ( disposable ) dolet=try.
        
        _resourceFactory {
            ( resource ) = _parentlet=try.
            disposable _observableFactory resource
            ( source ) return _parentDisposables.createresource(
            
            . subscribe(self)
                source,)}catchletreturn
                disposable
            Disposables
        . create ( error {
            Observable .error()
                .subscribe(selferror),)}}funcon
                disposable
            (
        _
    :
    
    Event <E) eventswitch caselet.>next {
        ( event {
        ) : forwardOn(.valuenext(
            ))caselet.valueerror(
        ) : forwardOn(.errorerror(
            ))dispose()errorcase.
            :forwardOn(
        . )completeddispose
            ()}completed}
            }
        
    
[+++]
可以看到,在 run 方法中,UsingSink 先是调用 _resourceFactory() 创建了资源 resource,然后以 resource 为参数调用 _observableFactory() 来创建想要的 Obervable,并通过 Disposables.create(source.subscribe(self),disposable) 让 resource 的生命周期和 Obervable 一致。实际上 UsingSink 只是在 run 中做了两件特殊的事情: 在让 source 订阅自身前,创建 resource(一般会在这里做额外的 *** 作); 使用的 source 不是由上游给的,而是通过 _observableFactory 创建的(一般的 *** 作比如 map、flatMap 等都是由上游给的)。)
File: /www/wwwroot/outofmemory.cn/tmp/route_read.php, Line: 126, InsideLink()
File: /www/wwwroot/outofmemory.cn/tmp/index.inc.php, Line: 165, include(/www/wwwroot/outofmemory.cn/tmp/route_read.php)
File: /www/wwwroot/outofmemory.cn/index.php, Line: 30, include(/www/wwwroot/outofmemory.cn/tmp/index.inc.php)
RxSwift之深入解析Using *** 作的应用和原理_app_内存溢出

RxSwift之深入解析Using *** 作的应用和原理

RxSwift之深入解析Using *** 作的应用和原理,第1张

一、前言 ReactiveX 官方文档 对于 Using 的描述如下:
create a disposable resource that has the same lifespan as the Observable
即创建一个和 Observable 具有相同生命周期的 disposable 资源。

可以看出:当一个 ObserverA 订阅 Using 返回的 Observable 时,Using 会使用调用者传入的 Resource 工厂方法 [resourceFactory] 创建对应的资源,并且使用 Observable 工厂方法 [observableFactory] 创建 ObserverA 实际上想要订阅的 Observable。当 ObserverA 终止时,对应的 Resource 也会被释放 [dispose]。如下所示:
class MyDisposables: Disposable {
    func dispose() {
        print("dispose")
    }
}

......

let _ = Observable
    .using({ () -> MyDisposables in
        return MyDisposables()
    }) { _ in
        return Observable<Int>
            .interval(1, scheduler: MainScheduler.instance)
            .take(5)
    }
    .subscribe(onNext: {
        print()}
    )0
执行结果:
1
2
3
4
let
dispose
可以看到,当 AnonymousObserver[匿名观察者] 订阅 using 返回的 Observable 时,using 内部创建了定期输出 Int 值的 ObservableA,以及资源 MyDisposables。在发送 5 个消息之后,ObservableA 被终止,与此同时,MyDisposables 资源被 using 释放。 二、监听 Obervable RxSwift 官方 Demo 中的一段关于 GitHub 登录的代码:
= signingIn ActivityIndicator ()self
.=signingIn . signingInasObservable()let

= usernameAndPassword Observable .combineLatest(.input,username. input)password( { ,) }= .

signedIn . inputwithLatestFromloginTaps().usernameAndPassword(
    ,flatMapLatest { )usernamein passwordreturn API
        . signup(,:username) password. passwordobserveOn
            (MainScheduler.).instancecatchErrorJustReturn
            (false).trackActivity
            ()}signingIn.
    Observable
    <flatMapLatest { loggedIn -> Boolinlet> =
        ? message "Mock: Signed in to GitHub." loggedIn : "Mock: Sign in to GitHub failed" return .
        promptFor wireframe(,:message"OK" cancelAction, :[ actions] )// propagate original value.
            _
            inmap { } }
                loggedIn
            .
    shareReplay
    (1)......API
signingIn 是当前是否正在登录 Observable,signedIn 是当前登录动作 Observable。signedIn 体现的事件流如下: 按下登录按钮; 使用当前用户名及密码进行登录; 展示登录结果。 Rx 可观察对象的交互如下:

上面的 GitHub 登录涉及到 Rx 的相关 *** 作: combineLatest:合并最后的 username 和 password,形成一个新的 Observable; withLatestFrom:形成一个以 loginTaps 发送事件时间为采样时间点,发送 usernameAndPassword 内容的 Observable。 三、如何实现监听? 上面的 GitHub 登录涉及到记录开始登录的 *** 作如下,那么它是如何监听到当前是否正在登录呢?
.

signup(,:username) password. passwordobserveOn
(MainScheduler.).instancecatchErrorJustReturn
(false).trackActivity
()......signingInpublic

extension

ObservableConvertibleType public func {
    trackActivity ( _:ActivityIndicator activityIndicator) Observable< -> Ereturn.> {
        trackActivityOfObservable activityIndicator(self)}}
    public
class
可以看到 .trackActivity(signingIn) 这个调用,一开始可能会理解为 .trackActivity(signingIn) 是在 signup(username, password: password) 后调用的,也就是说登录事件已经结束,程序才开始监听登录动作,这个理解显然是错误的。那么,要想获得正确的结果,事件流应该是一个怎么样的执行顺序呢?最直白的想法应该如下: 设置当前状态为正在执行登录; 执行登录 *** 作; 设置当前状态为没有执行登录。 那么问题来了:signup(username, password: password) 生成登录动作 Observable,当有 Observer 订阅这个 Observable 时,Observable 就会执行登陆 *** 作,并发送对应的结果,这就造成 .trackActivity(signingIn) 不能直接返回上游传递过来的事件流,因为这样,刚好切合上面的那个假设,因此 .trackActivity(signingIn) 应该做到以下几件事情: 保留登录动作 ObservableA,返回自定义的一个 ObservableB; 当外部 Observer 订阅 ObservableB 时,设置当前状态为正在执行登录; 设置当前状态为正在执行登录,然后让外部的 Observer 重新订阅到 ObservableA; 登录 *** 作执行完毕后,设置当前状态为没有执行登录。 signingIn 所属类 ActivityIndicator 的实现如下:
ActivityIndicator : DriverConvertibleType public typealias {
    E = Bool private let

    = NSRecursiveLock _lock ( )privatelet
    = Variable _variable ( 0)privatelet
    : Driver _loading< Boolpublicinit>

    ( )=. {
        _loading asDriver _variable().0
            }map { . > distinctUntilChanged (
            )}fileprivatefunc
    trackActivityOfObservable

    < O :ObservableConvertibleType(_ :>O) sourceObservable <O -> .EreturnObservable.> {
        using (()ActivityToken{ <O -> .Einself.> increment
            ()returnActivityToken(
            : .asObservablesource( source),:self. disposeAction) })decrementin
        return. { t asObservable
            ( t)}}private
        func
    increment

    ( ) .lock( {
        _lock).=.
        _variable+value 1 _variable.value unlock (
        _lock)}privatefunc
    decrement

    ( ) .lock( {
        _lock).=.
        _variable-value 1 _variable.value unlock (
        _lock)}publicfunc
    asDriver

    ( ) Driver<E -> return}}> {
        let _loading
    =
Variable
_variable 对应 Variable 类型,Variable 实际上是 BehaviorSubject 的一层包装,不同的是它只暴露数据,不会被终止或者失败。BehaviorSubject 会在订阅者订阅时,发送一个最近或初始数据,并且订阅者可以接收 BehaviorSubject 随后发送的所有数据。Variable 的示例如下:
( v 0 ).asObservable(
v).subscribe(
    :print(onNext) {
        }).=
    1.

v=value 2 0
v1value 2 =
执行结果:
.
asDriver
(
继续查看 _loading 在 ActivityIndicator 的初始化方法中的赋值如下:
_loading ) _variable.0}.
	distinctUntilChangedmap { ( > ) fileprivate
	functrackActivityOfObservable<O
其中 _variable 的初始值为 0,因此这部分的逻辑很容易理解:_loading 通过 _variable 发送的值是否大于 0 来判断当前是否在执行动作,并且通过 increment、decrement 方法来设置 _variable 发送的值(改变当前正在执行的动作数)。重点还是在 trackActivityOfObservable 方法:
: ObservableConvertibleType (_:O )>Observable< sourceO .E -> returnObservable.using(> {
    ( )ActivityToken<O{ .E -> inself.increment(> )
        returnActivityToken(:.
        asObservable ()source, source:self.)} disposeAction) inreturndecrement.
    asObservable( { t )
        } t}()ActivityToken
    <
O
对应的 resourceFactory 如下:
{ .E -> inself.increment(> )
        returnActivityToken(:.
        asObservable ()source, source:self.)} disposeActionin return.decrementasObservable
    (
observableFactory 如下:
{ t )
        } tprivatestructActivityToken<
    E
ActivityToken 的实现如下:
: ObservableConvertibleType ,Disposableprivate> let :Observable < {
    E private _sourcelet :Cancelableinit>
    ( : _disposeObservable <

    E,source: @escaping()>( disposeAction) ) == -> Disposables.create {
        _source ( source
        _dispose : )}funcdisposewith( disposeAction)
    .

    dispose ()} {
        _disposefuncasObservable()
    Observable

    < Ereturn} -> }publicstatic> {
        func _source
    using
<
可以看到,ActivityToken 就是一个保存当前需要监听的 Observable 的资源,当外部 Observer 订阅 trackActivityOfObservable 返回的 ObservableB 时,using 调用 resourceFactory 做了以下 *** 作: 增加当前正在执行的动作数; 使用 ActivityToken 保存需要监听的 ObservableA,并且在 ActivityToken 释放时,恢复当前正在执行的动作数。 接下来在调用 observableFactory 时,using 把在 resourceFactory 中保存的 ObservableA 重新暴露给 Observer。通过这种方式,就能在 ObservableA 发送数据之前,执行额外的 *** 作 self.increment(),也就是上面 .trackActivity(signingIn) 应该做到的 A2,再由于 using 会在 observableFactory 返回的 ObservableA 终止时释放 resourceFactory 创建的资源,所以当 ObservableA 终止时,会执行 self.decrement,也就是 A4。监听当前是否正在登录就是通过 using *** 作 hold 主需要监听的 Observable,然后在执行想要的额外动作后,重新暴露 Observable 给外部的 Observer。 四、using 内部实现 using 的内部实现:
R : Disposable (_:@escaping (>)throws resourceFactoryR , :@escaping ( -> R) observableFactorythrows Observable <E) Observable -> <Ereturn>Using -> (:,> {
        : )}resourceFactoryclass resourceFactoryUsing observableFactory< observableFactorySourceType
    ,
using 实际上返回的是一个 Using 类,Using 为 Producer 的子类,并且重载 run 方法,如下所示:
ResourceType :Disposable:Producer <SourceType typealias>E =SourceTypetypealias> {
    
    ResourceFactory = ( )
    
    throws ResourceType typealias ObservableFactory= ( -> ResourceType
    ) throws Observable <SourceTypefileprivate let -> :ResourceFactoryfileprivate>
    
    let : _resourceFactoryObservableFactory init
    ( : _observableFactory@escaping ResourceFactory
    
    
    ,:resourceFactory@escaping ObservableFactory )= observableFactory= } overridefunc {
        _resourceFactory run resourceFactory
        _observableFactory < observableFactory
    O
    
    : ObserverType (_: O )>Disposablewhere observerO .E -> == E let=UsingSink ( : {
        self sink , :)parent. =. observerrun observer(
        sink)disposable return sink}}classProducer
        < sink
    Element
:
Producer 的实现如下:
Observable <Elementoverride> init ()super> {
    . init() {
        }overridefuncsubscribe<
    O
    
    : ObserverType (_: O )>Disposablewhere observerO .E -> == Element if!CurrentThreadScheduler . return {
        run ()}isScheduleRequired {
            else returnCurrentThreadSchedulerobserver.
        .
        schedule {
            ( ()instance)_inreturnself. { run (
                ) }}}funcobserverrun
            <
        O
    :
    
    ObserverType (_: O )>Disposablewhere observerO .E -> == Element abstractMethod() } } {
        classUsingSink<
    SourceType
,
Producer 调用 subscribe 时,会调用子类的 run,并传入当前的 Oberver。回到 Using 的实现,Producer 的 run 方法中创建了 UsingSink 实例,并调用它的 run 方法。那么,来看下最关键的 UsingSink:
ResourceType :Disposable,O :ObserverType :Sink <O ,> ObserverType whereO.>E == SourceType typealiasParent= Using < {

    SourceType , ResourceType typealiasE=O .>
    E private let :Parentinit

    ( : _parentParent ,
    
    :Oparent) =super observer. init( {
        _parent : parent
        )}funcrunobserver( observer)
    Disposable
    
    var =Disposables. -> create {
        ( disposable ) dolet=try.
        
        _resourceFactory {
            ( resource ) = _parentlet=try.
            disposable _observableFactory resource
            ( source ) return _parentDisposables.createresource(
            
            . subscribe(self)
                source,)}catchletreturn
                disposable
            Disposables
        . create ( error {
            Observable .error()
                .subscribe(selferror),)}}funcon
                disposable
            (
        _
    :
    
    Event <E) eventswitch caselet.>next {
        ( event {
        ) : forwardOn(.valuenext(
            ))caselet.valueerror(
        ) : forwardOn(.errorerror(
            ))dispose()errorcase.
            :forwardOn(
        . )completeddispose
            ()}completed}
            }
        
    

可以看到,在 run 方法中,UsingSink 先是调用 _resourceFactory() 创建了资源 resource,然后以 resource 为参数调用 _observableFactory() 来创建想要的 Obervable,并通过 Disposables.create(source.subscribe(self),disposable) 让 resource 的生命周期和 Obervable 一致。实际上 UsingSink 只是在 run 中做了两件特殊的事情: 在让 source 订阅自身前,创建 resource(一般会在这里做额外的 *** 作); 使用的 source 不是由上游给的,而是通过 _observableFactory 创建的(一般的 *** 作比如 map、flatMap 等都是由上游给的)。

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/web/993628.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-05-21
下一篇 2022-05-21

发表评论

登录后才能评论

评论列表(0条)

保存