RxSwift 笔记

响应式编程 && 函数式编程

什么是响应式编程?

响应式编程,响应式编程是一种面向数据流和变化传播的编程方式式,简单理解就是异步的数据流的开发。

什么是函数式编程?

特点是将函数作为一等公民,当作参数和返回值使用。典型的如OC和Swift 中的 map函数、filter函数、reduce函数等。每个函数的处理结果给到下一个函数,最后的结果由自身函数调出。

框架图

Observable 被订阅者

定义

  • 可观察序列,可以异步产生一系列可以携带数据的Event事件

Event

  • next:携带数据 的事件
  • error:异常终止,不在发出event事件
  • completed:正常终止,不在发出event事件
public enum Event<Element> {
    /// Next element is produced.
    case next(Element)
 
    /// Sequence terminated with an error.
    case error(Swift.Error)
 
    /// Sequence completed successfully.
    case completed
}

Just()

let observable = Observable<Int>.just(5)

of()

let observable = Observable<String>.of("A", "B", "C")

from()

let observable = Observable.from(["A", "B", "C"])

empty()

let observable = Observable<Int>.empty()

never()

//永远不会发出 Event(也不会终止)的 Observable 序列
let observable = Observable<Int>.never()

error()

enum MyError: Error {
    case A
    case B
}
let observable = Observable<Int>.error(MyError.A)

range()

//以这个范围内所有值作为初始值的Observable序列
let observable = Observable.range(start: 1, count: 5)

repeatElement()

//无限发出给定元素的 Event的 Observable 序列(永不终止)
let observable = Observable.repeatElement(1)

generate()

//只有当提供的所有的判断条件都为 true 的时候,才会给出动作的  Observable 序列
let observable = Observable.generate(
    initialState: 0,
    condition: { $0 <= 10 },
    iterate: { $0 + 2 }
)

create()

//受一个 block 形式的参数,任务是对每一个过来的订阅进行处理
//这个block有一个回调参数observer就是订阅这个Observable对象的订阅者
//当一个订阅者订阅这个Observable对象的时候,就会将订阅者作为参数传入这个block来执行一些内容
let observable = Observable<String>.create{observer in
    //对订阅者发出了.next事件,且携带了一个数据"hangge.com"
    observer.onNext("hangge.com")
    //对订阅者发出了.completed事件
    observer.onCompleted()
    //因为一个订阅行为会有一个Disposable类型的返回值,所以在结尾一定要returen一个Disposable
    return Disposables.create()
}
 
//订阅测试
observable.subscribe {
    print($0)
}

deferred()

//用于标记是奇数、还是偶数
var isOdd = true
 
//使用deferred()方法延迟Observable序列的初始化,通过传入的block来实现Observable序列的初始化并且返回。
let factory : Observable<Int> = Observable.deferred {
     
    //让每次执行这个block时候都会让奇、偶数进行交替
    isOdd = !isOdd
     
    //根据isOdd参数,决定创建并返回的是奇数Observable、还是偶数Observable
    if isOdd {
        return Observable.of(1, 3, 5 ,7)
    }else {
        return Observable.of(2, 4, 6, 8)
    }
}
 
//第1次订阅测试
factory.subscribe { event in
    print("\(isOdd)", event)
}
 
//第2次订阅测试
factory.subscribe { event in
    print("\(isOdd)", event)
}

interval()

// 序列每隔一段设定的时间,会发出一个索引数的元素。而且它会一直发送下去
let observable = Observable<Int>.interval(1, scheduler: MainScheduler.instance)
observable.subscribe { event in
    print(event)
}

timer()

//5秒种后发出唯一的一个元素0
let observable = Observable<Int>.timer(5, scheduler: MainScheduler.instance)
observable.subscribe { event in
    print(event)
}
//延时5秒种后,每隔1秒钟发出一个元素
let observable = Observable<Int>.timer(5, period: 1, scheduler: MainScheduler.instance)
observable.subscribe { event in
    print(event)
}

Observer 订阅者

AnyObserver

//观察者 AnyObserver 可以用来描叙任意一种观察者。
let observer: AnyObserver<String> = AnyObserver { (event) in
    switch event {
    case .next(let data):
        print(data)
    case .error(let error):
        print(error)
    case .completed:
        print("completed")
    }
}
 
let observable = Observable.of("A", "B", "C")
observable.subscribe(observer)
//observable.map{"当前索引数:\($0 )"}.bind(to: observer).disposed(by: disposeBag)

Binder

相较于AnyObserver的大而全,Binder更专注于特定的场景。Binder主要有以下两个特征:

  • 不会处理错误事件
  • 确保绑定都是在给定Scheduler上执行(默认MainScheduler)
class NewHouseWeekViewModel: BaseViewModel {
    
    @IBOutlet weak var label: UILabel!
    func test() {
        let observer:Binder<String> = Binder(label) { (view,text) in
            //收到发出的索引数后显示到label上
            view.text = text
        }
        let observable = Observable<Int>.interval(1, scheduler: MainScheduler.instance)
        observable.map { "当前索引数:\($0 )"}.bind(to: observer).disposed(by: TWSwiftDisposeBag)
        //使用扩展方式,结合map?配合 bindTo
        observable.map {CGFloat($0)}.bind(to:label.rx.fontSize).disposed(by: TWSwiftDisposeBag)
    }
    
}

//使用扩展方式自定义一个属性,一个观察Binder
extension Reactive where Base: UILabel {
    public var fontSize: Binder<CGFloat> {
        return Binder(self.base) { label, fontSize in
            label.font = UIFont.systemFont(ofSize: fontSize)
        }
    }
}

Subjects 即是订阅又是被订阅者

PublishSubject

  • PublishSubject是最普通的 Subject,它不需要初始值就能创建。
  • PublishSubject 的订阅者从他们开始订阅的时间点起,可以收到订阅后 Subject 发出的新 Event,而不会收到他们在订阅前已发出的 Event。

    总结: 当执行subject.onCompleted() 或者 subject.onError(NSError())

    那么所有的订阅者(即使是后面订阅者)都能收到subject的.completed或者Error事件

let disposeBag = DisposeBag()
 
//创建一个PublishSubject
let subject = PublishSubject<String>()
 
//由于当前没有任何订阅者,所以这条信息不会输出到控制台
subject.onNext("111")
 
//第1次订阅subject
subject.subscribe(onNext: { string in
    print("第1次订阅:", string)
}, onCompleted:{
    print("第1次订阅:onCompleted")
}).disposed(by: disposeBag)
 
//当前有1个订阅,则该信息会输出到控制台
subject.onNext("222")
 
//第2次订阅subject
subject.subscribe(onNext: { string in
    print("第2次订阅:", string)
}, onCompleted:{
    print("第2次订阅:onCompleted")
}).disposed(by: disposeBag)
 
//当前有2个订阅,则该信息会输出到控制台
subject.onNext("333")
 
//让subject结束
subject.onCompleted()
 
//subject完成后会发出.next事件了。
subject.onNext("444")
 
//subject完成后它的所有订阅(包括结束后的订阅),都能收到subject的.completed事件,
subject.subscribe(onNext: { string in
    print("第3次订阅:", string)
}, onCompleted:{
    print("第3次订阅:onCompleted")
}).disposed(by: disposeBag)

第1次订阅: 222
第1次订阅: 333
第2次订阅: 333
第1次订阅:onCompleted
第2次订阅:onCompleted
第3次订阅:onCompleted

BehaviorSubject

  • BehaviorSubject 需要通过一个默认初始值来创建。
  • 当一个订阅者来订阅它的时候,这个订阅者会立即收到 BehaviorSubjects 上一个发出的event。之后就跟正常的情况一样,它也会接收到 BehaviorSubject 之后发出的新的 event。

    总结:就是订阅一次会发出上一次的event事件(即是调用 onNext 事件)

let disposeBag = DisposeBag()
 
//创建一个BehaviorSubject
let subject = BehaviorSubject(value: "111")
 
//第1次订阅subject
subject.subscribe { event in
    print("第1次订阅:", event)
}.disposed(by: disposeBag)
 
//发送next事件
subject.onNext("222")
 
//发送error事件
subject.onError(NSError(domain: "local", code: 0, userInfo: nil))
 
//第2次订阅subject
subject.subscribe { event in
    print("第2次订阅:", event)
}.disposed(by: disposeBag)

第1次订阅: next(111)
第1次订阅: next(222)
第1次订阅: error(Error Domain=local Code=0 "(null)")
第2次订阅: error(Error Domain=local Code=0 "(null)")

ReplaySubject

  • ReplaySubject 在创建时候需要设置一个 bufferSize,表示它对于它发送过的 event 的缓存个数。
  • 比如一个 ReplaySubject 的 bufferSize 设置为 2,它发出了 3 个 .next 的 event,那么它会将后两个(最近的两个)event 给缓存起来。此时如果有一个 subscriber 订阅了这个 ReplaySubject,那么这个 subscriber 就会立即收到前面缓存的两个.next 的 event。
  • 如果一个 subscriber 订阅已经结束的 ReplaySubject,除了会收到缓存的 .next 的 event外,还会收到那个终结的 .error 或者 .complete 的event。

    总结:可以设置缓存数,订阅一次会将缓存起来的发送onNext事件

let disposeBag = DisposeBag()
 
//创建一个bufferSize为2的ReplaySubject
let subject = ReplaySubject<String>.create(bufferSize: 2)
 
//连续发送3个next事件
subject.onNext("111")
subject.onNext("222")
subject.onNext("333")
 
//第1次订阅subject
subject.subscribe { event in
    print("第1次订阅:", event)
}.disposed(by: disposeBag)
 
//再发送1个next事件
subject.onNext("444")
 
//第2次订阅subject
subject.subscribe { event in
    print("第2次订阅:", event)
}.disposed(by: disposeBag)
 
//让subject结束
subject.onCompleted()
 
//第3次订阅subject
subject.subscribe { event in
    print("第3次订阅:", event)
}.disposed(by: disposeBag)

第1次订阅: next(222)
第1次订阅: next(333)
第1次订阅: next(444)
第2次订阅: next(333)
第2次订阅: next(444)
第1次订阅: completed
第2次订阅: completed
第3次订阅: next(333)
第3次订阅: next(444)
第3次订阅: completed

Variable

  • Variable 其实就是对 BehaviorSubject 的封装,所以它也必须要通过一个默认的初始值进行创建。

  • Variable 具有 BehaviorSubject 的功能,能够向它的订阅者发出上一个 event 以及之后新创建的 event。

  • 不同的是,Variable 还会把当前发出的值保存为自己的状态。同时它会在销毁时自动发送 .complete的 event,不需要也不能手动给 Variables 发送 completed或者 error 事件来结束它。

  • 简单地说就是 Variable 有一个 value 属性,我们改变这个 value 属性的值就相当于调用一般 Subjects 的 onNext() 方法,而这个最新的 onNext() 的值就被保存在 value 属性里了,直到我们再次修改它。

let disposeBag = DisposeBag()
//创建一个初始值为111的Variable
 let variable = Variable("111")
//修改value值
variable.value = "222"
//第1次订阅
variable.asObservable().subscribe {
    print("第1次订阅:", $0)
}.disposed(by: disposeBag)
//修改value值
variable.value = "333"
 
//第2次订阅
variable.asObservable().subscribe {
    print("第2次订阅:", $0)
}.disposed(by: disposeBag)
//修改value值
variable.value = "444"

第1次订阅: next(222)
第1次订阅: next(333)
第2次订阅: next(333)
第1次订阅: next(444)
第2次订阅: next(444)
第1次订阅: completed
第2次订阅: completed

操作符

变换操作符

buffer
  • buffer 方法作用是缓冲组合,第一个参数是缓冲时间,第二个参数是缓冲个数,第三个参数是线程。
  • 该方法简单来说就是缓存 Observable 中发出的新元素,当元素达到某个数量,或者经过了特定的时间,它就会将这个元素集合发送出来。
let subject = PublishSubject<String>()
 
//每缓存3个元素则组合起来一起发出。
//如果1秒钟内不够3个也会发出(有几个发几个,一个都没有发空数组 [])
subject
    .buffer(timeSpan: 1, count: 3, scheduler: MainScheduler.instance)
    .subscribe(onNext: { print($0) })
    .disposed(by: disposeBag)
 
subject.onNext("a")
subject.onNext("b")
subject.onNext("c")
 
subject.onNext("1")
subject.onNext("2")
subject.onNext("3")
window
  • window 操作符和 buffer 十分相似。不过 buffer 是周期性的将缓存的元素集合发送出来,而 window 周期性的将元素集合以 Observable 的形态发送出来。
  • 同时 buffer要等到元素搜集完毕后,才会发出元素序列。而 window 可以实时发出元素序列。
let subject = PublishSubject<String>()
 
//每3个元素作为一个子Observable发出。
subject
    .window(timeSpan: 1, count: 3, scheduler: MainScheduler.instance)
    .subscribe(onNext: { [weak self]  in
        print("subscribe: \($0)")
        $0.asObservable()
            .subscribe(onNext: { print($0) })
            .disposed(by: self!.disposeBag)
    })
    .disposed(by: disposeBag)
 
subject.onNext("a")
subject.onNext("b")
subject.onNext("c")
 
subject.onNext("1")
subject.onNext("2")
subject.onNext("3")
map
  • 该操作符通过传入一个函数闭包把原来的 Observable 序列转变为一个新的 Observable 序列。
let disposeBag = DisposeBag()
 
Observable.of(1, 2, 3)
    .map { $0 * 10}
    .subscribe(onNext: { print($0) })
    .disposed(by: disposeBag)
flatMap
  • map 在做转换的时候容易出现“升维”的情况。即转变之后,从一个序列变成了一个序列的序列。
    而 flatMap 操作符会对源 Observable 的每一个元素应用一个转换方法,将他们转换成 Observables。 - 然后将这些 Observables 的元素合并之后再发送出来。即又将其 “拍扁”(降维)成一个 Observable 序列。
  • 这个操作符是非常有用的。比如当 Observable 的元素本生拥有其他的 Observable 时,我们可以将所有子 Observables 的元素发送出来。
let disposeBag = DisposeBag()
 
let subject1 = BehaviorSubject(value: "A")
let subject2 = BehaviorSubject(value: "1")
 
let variable = Variable(subject1)
 
variable.asObservable()
    .flatMap { $0 }
    .subscribe(onNext: { print($0) })
    .disposed(by: disposeBag)
 
subject1.onNext("B")
variable.value = subject2
subject2.onNext("2")
subject1.onNext("C")  //会输出,flatMap会对每个Observable应用一个转换方法

//A B 1 2 C
flatMapLatest
  • flatMapLatest与flatMap 的唯一区别是:flatMapLatest只会接收最新的value 事件。
let disposeBag = DisposeBag()
 
let subject1 = BehaviorSubject(value: "A")
let subject2 = BehaviorSubject(value: "1")
 
let variable = Variable(subject1)
 
variable.asObservable()
    .flatMapLatest { $0 }
    .subscribe(onNext: { print($0) })
    .disposed(by: disposeBag)
 
subject1.onNext("B")
variable.value = subject2
subject2.onNext("2")
subject1.onNext("C")  //不会输出,只保留最新的value

//A B 1 2 
concatMap
  • concatMap 与 flatMap 的唯一区别是:当前一个 Observable 元素发送完毕后,后一个Observable 才可以开始发出元素。或者说等待前一个 Observable 产生完成事件后,才对后一个 Observable 进行订阅。
let disposeBag = DisposeBag()
 
let subject1 = BehaviorSubject(value: "A")
let subject2 = BehaviorSubject(value: "1")
 
let variable = Variable(subject1)
 
variable.asObservable()
    .concatMap { $0 }
    .subscribe(onNext: { print($0) })
    .disposed(by: disposeBag)
 
subject1.onNext("B")
variable.value = subject2
subject2.onNext("2")
subject1.onNext("C")
subject1.onCompleted() //只有前一个序列结束后,才能接收下一个序列

/// A B C 2
scan
  • scan 就是先给一个初始化的数,然后不断的拿前一个结果和最新的值进行处理操作。
let disposeBag = DisposeBag()
 
Observable.of(1, 2, 3, 4, 5)
    .scan(0) { acum, elem in
        acum + elem
    }
    .subscribe(onNext: { print($0) })
    .disposed(by: disposeBag)
    
    /// 1 3 6 10 15
groupBy
  • groupBy 操作符将源 Observable 分解为多个子 Observable,然后将这些子 Observable 发送出来。
  • 也就是说该操作符会将元素通过某个键进行分组,然后将分组后的元素序列以 Observable 的形态发送出来。
let disposeBag = DisposeBag()
 
//将奇数偶数分成两组
Observable<Int>.of(0, 1, 2, 3, 4, 5)
    .groupBy(keySelector: { (element) -> String in
        return element % 2 == 0 ? "偶数" : "基数"
    })
    .subscribe { (event) in
        switch event {
        case .next(let group):
            group.asObservable().subscribe({ (event) in
                print("key:\(group.key)    event:\(event)")
            })
            .disposed(by: disposeBag)
        default:
            print("")
        }
    }
.disposed(by: disposeBag)

/// 

过滤操作符

filter
  • 该操作符就是用来过滤掉某些不符合要求的事件。
let disposeBag = DisposeBag()
 
Observable.of(2, 30, 22, 5, 60, 3, 40 ,9)
    .filter {
        $0 > 10
    }
    .subscribe(onNext: { print($0) })
    .disposed(by: disposeBag)
distinctUntilChanged
  • 该操作符用于过滤掉连续重复的事件。
let disposeBag = DisposeBag()
 
Observable.of(1, 2, 3, 1, 1, 4)
    .distinctUntilChanged()
    .subscribe(onNext: { print($0) })
    .disposed(by: disposeBag)
    /// 1 2 3 1 4
single
  • 限制只发送一次事件,或者满足条件的第一个事件。
  • 如果存在有多个事件或者没有事件都会发出一个 error 事件。
  • 如果只有一个事件,则不会发出 error事件。
let disposeBag = DisposeBag()
 
Observable.of(1, 2, 3, 4)
    .single{ $0 == 2 }
    .subscribe(onNext: { print($0) })
    .disposed(by: disposeBag)
 
Observable.of("A", "B", "C", "D")
    .single()
    .subscribe(onNext: { print($0) })
    .disposed(by: disposeBag)
    /// 2  A
elementAt
  • 该方法实现只处理在指定位置的事件。

let disposeBag = DisposeBag()
 
Observable.of(1, 2, 3, 4)
    .elementAt(2)
    .subscribe(onNext: { print($0) })
    .disposed(by: disposeBag)
    /// 2
  
ignoreElements
  • 该操作符可以忽略掉所有的元素,只发出 error或completed 事件。
  • 如果我们并不关心 Observable 的任何元素,只想知道 Observable 在什么时候终止,那就可以使用 ignoreElements 操作符。
let disposeBag = DisposeBag()
 
Observable.of(1, 2, 3, 4)
    .ignoreElements()
    .subscribe{
        print($0)
    }
    .disposed(by: disposeBag)
take
  • 该方法实现仅发送 Observable 序列中的前 n 个事件,在满足数量之后会自动 .completed。
let disposeBag = DisposeBag()
 
Observable.of(1, 2, 3, 4)
    .take(2)
    .subscribe(onNext: { print($0) })
    .disposed(by: disposeBag)
    /// 1  2
takeLast
  • 该方法实现仅发送 Observable序列中的后 n 个事件。
let disposeBag = DisposeBag()
 
Observable.of(1, 2, 3, 4)
    .takeLast(1)
    .subscribe(onNext: { print($0) })
    .disposed(by: disposeBag)
    /// 4
skip
  • 该方法用于跳过源 Observable 序列发出的前 n 个事件。
let disposeBag = DisposeBag()
 
Observable.of(1, 2, 3, 4)
    .skip(2)
    .subscribe(onNext: { print($0) })
    .disposed(by: disposeBag)
    /// 3 4
Sample
  • Sample 除了订阅源Observable 外,还可以监视另外一个 Observable, 即 notifier 。
  • 每当收到 notifier 事件,就会从源序列取一个最新的事件并发送。而如果两次 notifier 事件之间没有源序列的事件,则不发送值。
let disposeBag = DisposeBag()
 
let source = PublishSubject<Int>()
let notifier = PublishSubject<String>()
 
source
    .sample(notifier)
    .subscribe(onNext: { print($0) })
    .disposed(by: disposeBag)
 
source.onNext(1)
 
//让源序列接收接收消息
notifier.onNext("A")
 
source.onNext(2)
 
//让源序列接收接收消息
notifier.onNext("B")
notifier.onNext("C")
 
source.onNext(3)
source.onNext(4)
 
//让源序列接收接收消息
notifier.onNext("D")
 
source.onNext(5)
 
//让源序列接收接收消息
notifier.onCompleted()
///1 2 4 5
debounce
  • debounce 操作符可以用来过滤掉高频产生的元素,它只会发出这种元素:该元素产生后,一段时间内没有新元素产生。
  • 换句话说就是,队列中的元素如果和下一个元素的间隔小于了指定的时间间隔,那么这个元素将被过滤掉。
  • debounce 常用在用户输入的时候,不需要每个字母敲进去都发送一个事件,而是稍等一下取最后一个事件。
//定义好每个事件里的值以及发送的时间
        let times = [
            [ "value": 1, "time": 0.1 ],
            [ "value": 2, "time": 1.1 ],
            [ "value": 3, "time": 1.2 ],
            [ "value": 4, "time": 1.2 ],
            [ "value": 5, "time": 1.4 ],
            [ "value": 6, "time": 2.1 ]
        ]
         
        //生成对应的 Observable 序列并订阅
        Observable.from(times)
            .flatMap { item in
                return Observable.of(Int(item["value"]!))
                    .delaySubscription(Double(item["time"]!),
                                       scheduler: MainScheduler.instance)
            }
            .debounce(0.5, scheduler: MainScheduler.instance) //只发出与下一个间隔超过0.5秒的元素
            .subscribe(onNext: { print($0) })
            .disposed(by: disposeBag)
            // 1 5 6

条件和布尔操作符

  • 当传入多个 Observables 到 amb 操作符时,它将取第一个发出元素或产生事件的 Observable,然后只发出它的元素。并忽略掉其他的 Observables。
let disposeBag = DisposeBag()
 
let subject1 = PublishSubject<Int>()
let subject2 = PublishSubject<Int>()
let subject3 = PublishSubject<Int>()
 
subject1
    .amb(subject2)
    .amb(subject3)
    .subscribe(onNext: { print($0) })
    .disposed(by: disposeBag)
 
subject2.onNext(1)
subject1.onNext(20)
subject2.onNext(2)
subject1.onNext(40)
subject3.onNext(0)
subject2.onNext(3)
subject1.onNext(60)
subject3.onNext(0)
subject3.onNext(0)
//1 2 3
amb
  • 该方法依次判断 Observable 序列的每一个值是否满足给定的条件。 当第一个不满足条件的值出现时,它便自动完成。
let disposeBag = DisposeBag()
 
Observable.of(2, 3, 4, 5, 6)
    .takeWhile { $0 < 4 }
    .subscribe(onNext: { print($0) })
    .disposed(by: disposeBag)
takeWhile
  • 该方法依次判断 Observable 序列的每一个值是否满足给定的条件。 当第一个不满足条件的值出现时,它便自动完成。
let disposeBag = DisposeBag()
 
Observable.of(2, 3, 4, 5, 6)
    .takeWhile { $0 < 4 }
    .subscribe(onNext: { print($0) })
    .disposed(by: disposeBag)
    // 2 3
takeUntil
  • 除了订阅源 Observable 外,通过 takeUntil 方法我们还可以监视另外一个 Observable, 即 notifier。
  • 如果 notifier 发出值或 complete 通知,那么源 Observable 便自动完成,停止发送事件。
let disposeBag = DisposeBag()
 
let source = PublishSubject<String>()
let notifier = PublishSubject<String>()
 
source
    .takeUntil(notifier)
    .subscribe(onNext: { print($0) })
    .disposed(by: disposeBag)
 
source.onNext("a")
source.onNext("b")
source.onNext("c")
source.onNext("d")
 
//停止接收消息
notifier.onNext("z")
 
source.onNext("e")
source.onNext("f")
source.onNext("g")
//a b c d
skipWhile
  • 该方法用于跳过前面所有满足条件的事件。
  • 一旦遇到不满足条件的事件,之后就不会再跳过了。
let disposeBag = DisposeBag()
 
Observable.of(2, 3, 4, 5, 6)
    .skipWhile { $0 < 4 }
    .subscribe(onNext: { print($0) })
    .disposed(by: disposeBag)
    }
}
// 4 5 6 
skipWhile
  • 同上面的 takeUntil 一样,skipUntil 除了订阅源 Observable 外,通过 skipUntil方法我们还可以监视另外一个 Observable, 即 notifier 。
  • 与 takeUntil 相反的是。源 Observable 序列事件默认会一直跳过,直到 notifier 发出值或 complete 通知。
let disposeBag = DisposeBag()
 
let source = PublishSubject<Int>()
let notifier = PublishSubject<Int>()
 
source
    .skipUntil(notifier)
    .subscribe(onNext: { print($0) })
    .disposed(by: disposeBag)
 
source.onNext(1)
source.onNext(2)
source.onNext(3)
source.onNext(4)
source.onNext(5)
 
//开始接收消息
notifier.onNext(0)
 
source.onNext(6)
source.onNext(7)
source.onNext(8)
 
//仍然接收消息
notifier.onNext(0)
 
source.onNext(9)
// 6 7 8 9

结合操作符

startWith
  • 该方法会在 Observable 序列开始之前插入一些事件元素。即发出事件消息之前,会先发出这些预先插入的事件消息。
let disposeBag = DisposeBag()
         
Observable.of("2", "3")
    .startWith("1")
    .subscribe(onNext: { print($0) })
    .disposed(by: disposeBag)
    // 1 2 3
merge
  • 该方法可以将多个(两个或两个以上的)Observable 序列合并成一个 Observable序列。
let disposeBag = DisposeBag()
         
let subject1 = PublishSubject<Int>()
let subject2 = PublishSubject<Int>()
 
Observable.of(subject1, subject2)
    .merge()
    .subscribe(onNext: { print($0) })
    .disposed(by: disposeBag)
 
subject1.onNext(20)
subject1.onNext(40)
subject1.onNext(60)
subject2.onNext(1)
subject1.onNext(80)
subject1.onNext(100)
subject2.onNext(1)
// 20 40 60 1 80 100 1
zip
  • 该方法可以将多个(两个或两个以上的)Observable 序列压缩成一个 Observable 序列。
  • 而且它会等到每个 Observable 事件一一对应地凑齐之后再合并。
let disposeBag = DisposeBag()
         
let subject1 = PublishSubject<Int>()
let subject2 = PublishSubject<String>()
 
Observable.zip(subject1, subject2) {
    "\($0)\($1)"
    }
    .subscribe(onNext: { print($0) })
    .disposed(by: disposeBag)
 
subject1.onNext(1)
subject2.onNext("A")
subject1.onNext(2)
subject2.onNext("B")
subject2.onNext("C")
subject2.onNext("D")
subject1.onNext(3)
subject1.onNext(4)
subject1.onNext(5)
// 1A 2B 3C 4D
combineLatest
  • 该方法同样是将多个(两个或两个以上的)Observable 序列元素进行合并。
  • 但与 zip 不同的是,每当任意一个 Observable 有新的事件发出时,它会将每个 Observable 序列的最新的一个事件元素进行合并。
let disposeBag = DisposeBag()
         
let subject1 = PublishSubject<Int>()
let subject2 = PublishSubject<String>()
 
Observable.combineLatest(subject1, subject2) {
    "\($0)\($1)"
    }
    .subscribe(onNext: { print($0) })
    .disposed(by: disposeBag)
 
subject1.onNext(1)
subject2.onNext("A")
subject1.onNext(2)
subject2.onNext("B")
subject2.onNext("C")
subject2.onNext("D")
subject1.onNext(3)
subject1.onNext(4)
subject1.onNext(5)
//1A 2A 2B 2C 2D 3D 4D 5D
withLatestFrom
  • 该方法将两个 Observable 序列合并为一个。每当 self 队列发射一个元素时,便从第二个序列中取出最新的一个值。
let disposeBag = DisposeBag()
 
let subject1 = PublishSubject<String>()
let subject2 = PublishSubject<String>()
 
subject1.withLatestFrom(subject2)
    .subscribe(onNext: { print($0) })
    .disposed(by: disposeBag)
 
subject1.onNext("A")
subject2.onNext("1")
subject1.onNext("B")
subject1.onNext("C")
subject2.onNext("2")
subject1.onNext("D")
// 1 1 2 
switchLatest
  • switchLatest 有点像其他语言的switch 方法,可以对事件流进行转换。
  • 比如本来监听的 subject1,我可以通过更改 variable 里面的 value 更换事件源。变成监听 subject2。
let disposeBag = DisposeBag()
 
let subject1 = BehaviorSubject(value: "A")
let subject2 = BehaviorSubject(value: "1")
 
let variable = Variable(subject1)
 
variable.asObservable()
    .switchLatest()
    .subscribe(onNext: { print($0) })
    .disposed(by: disposeBag)
 
subject1.onNext("B")
subject1.onNext("C")
 
//改变事件源
variable.value = subject2
subject1.onNext("D")
subject2.onNext("2")
 
//改变事件源
variable.value = subject1
subject2.onNext("3")
subject1.onNext("E")
// A B C 1 2 D E

数学聚合操作符

toArray
  • 该操作符先把一个序列转成一个数组,并作为一个单一的事件发送,然后结束。
let disposeBag = DisposeBag()
 
Observable.of(1, 2, 3)
    .toArray()
    .subscribe(onNext: { print($0) })
    .disposed(by: disposeBag)
    //[1,2,3]
reduce
  • reduce 接受一个初始值,和一个操作符号。
  • reduce 将给定的初始值,与序列里的每个值进行累计运算。得到一个最终结果,并将其作为单个值发送出去。
let disposeBag = DisposeBag()
 
Observable.of(1, 2, 3, 4, 5)
    .reduce(0, accumulator: +)
    .subscribe(onNext: { print($0) })
    .disposed(by: disposeBag)
    //15 
concat
  • concat 会把多个 Observable 序列合并(串联)为一个 Observable 序列。
  • 并且只有当前面一个 Observable 序列发出了 completed 事件,才会开始发送下一个 Observable 序列事件。
let disposeBag = DisposeBag()
 
let subject1 = BehaviorSubject(value: 1)
let subject2 = BehaviorSubject(value: 2)
 
let variable = Variable(subject1)
variable.asObservable()
    .concat()
    .subscribe(onNext: { print($0) })
    .disposed(by: disposeBag)
 
subject2.onNext(2)
subject1.onNext(1)
subject1.onNext(1)
subject1.onCompleted()
 
variable.value = subject2
subject2.onNext(2)
//1 1 1 2 2

连接操作符

可连接的序列(Connectable Observable):

  • (1)可连接的序列和一般序列不同在于:有订阅时不会立刻开始发送事件消息,只有当调用 connect()之后才会开始发送值。
  • (2)可连接的序列可以让所有的订阅者订阅后,才开始发出事件消息,从而保证我们想要的所有订阅者都能接收到事件消息。
publish
  • publish 方法会将一个正常的序列转换成一个可连接的序列。同时该序列不会立刻发送事件,只有在调用 connect 之后才会开始。
//每隔1秒钟发送1个事件
let interval = Observable<Int>.interval(1, scheduler: MainScheduler.instance)
    .publish()
         
//第一个订阅者(立刻开始订阅)
_ = interval
    .subscribe(onNext: { print("订阅1: \($0)") })
 
//相当于把事件消息推迟了两秒
delay(2) {
    _ = interval.connect()
}
 
//第二个订阅者(延迟5秒开始订阅)
delay(5) {
    _ = interval
        .subscribe(onNext: { print("订阅2: \($0)") })
}
replay
  • replay 同上面的 publish 方法相同之处在于:会将将一个正常的序列转换成一个可连接的序列。同时该序列不会立刻发送事件,只有在调用 connect 之后才会开始。
  • replay 与 publish 不同在于:新的订阅者还能接收到订阅之前的事件消息(数量由设置的 bufferSize 决定)。
//每隔1秒钟发送1个事件
let interval = Observable<Int>.interval(1, scheduler: MainScheduler.instance)
    .replay(5)
         
//第一个订阅者(立刻开始订阅)
_ = interval
    .subscribe(onNext: { print("订阅1: \($0)") })
 
//相当于把事件消息推迟了两秒
delay(2) {
    _ = interval.connect()
}
 
//第二个订阅者(延迟5秒开始订阅)
delay(5) {
    _ = interval
        .subscribe(onNext: { print("订阅2: \($0)") })
}
multicast
  • multicast 方法同样是将一个正常的序列转换成一个可连接的序列。
  • 同时 multicast 方法还可以传入一个 Subject,每当序列发送事件时都会触发这个 Subject 的发送。
//创建一个Subject(后面的multicast()方法中传入)
let subject = PublishSubject<Int>()
 
//这个Subject的订阅
_ = subject
    .subscribe(onNext: { print("Subject: \($0)") })
 
//每隔1秒钟发送1个事件
let interval = Observable<Int>.interval(1, scheduler: MainScheduler.instance)
    .multicast(subject)
         
//第一个订阅者(立刻开始订阅)
_ = interval
    .subscribe(onNext: { print("订阅1: \($0)") })
 
//相当于把事件消息推迟了两秒
delay(2) {
    _ = interval.connect()
}
 
//第二个订阅者(延迟5秒开始订阅)
delay(5) {
    _ = interval
        .subscribe(onNext: { print("订阅2: \($0)") })
}
refCount
  • refCount 操作符可以将可被连接的 Observable 转换为普通 Observable
  • 即该操作符可以自动连接和断开可连接的 Observable。当第一个观察者对可连接的Observable 订阅时,那么底层的 Observable 将被自动连接。当最后一个观察者离开时,那么底层的 Observable 将被自动断开连接。
//每隔1秒钟发送1个事件
let interval = Observable<Int>.interval(1, scheduler: MainScheduler.instance)
    .publish()
    .refCount()
 
//第一个订阅者(立刻开始订阅)
_ = interval
    .subscribe(onNext: { print("订阅1: \($0)") })
 
//第二个订阅者(延迟5秒开始订阅)
delay(5) {
    _ = interval
        .subscribe(onNext: { print("订阅2: \($0)") })
}
share(relay:)
  • 该操作符将使得观察者共享源 Observable,并且缓存最新的 n 个元素,将这些元素直接发送给新的观察者。
  • 简单来说 shareReplay 就是 replay 和 refCount 的组合。
//每隔1秒钟发送1个事件
        let interval = Observable<Int>.interval(1, scheduler: MainScheduler.instance)
            .share(replay: 5)
         
        //第一个订阅者(立刻开始订阅)
        _ = interval
            .subscribe(onNext: { print("订阅1: \($0)") })
         
        //第二个订阅者(延迟5秒开始订阅)
        delay(5) {
            _ = interval
                .subscribe(onNext: { print("订阅2: \($0)") })
        }
        
/// - Parameters:
///   - delay: 延迟时间(秒)
///   - closure: 延迟执行的闭包
public func delay(_ delay: Double, closure: @escaping () -> Void) {
    DispatchQueue.main.asyncAfter(deadline: .now() + delay) {
        closure()
    }

其他操作符

delay
  • 该操作符会将 Observable 的所有元素都先拖延一段设定好的时间,然后才将它们发送出来。
Observable.of(1, 2, 1)
            .delay(3, scheduler: MainScheduler.instance) //元素延迟3秒才发出
            .subscribe(onNext: { print($0) })
            .disposed(by: disposeBag)
            //1 2 1
delaySubscription
  • 使用该操作符可以进行延时订阅。即经过所设定的时间后,才对 Observable 进行订阅操作。
Observable.of(1, 2, 1)
            .delaySubscription(3, scheduler: MainScheduler.instance) //延迟3秒才开始订阅
            .subscribe(onNext: { print($0) })
            .disposed(by: disposeBag)
materialize
  • 该操作符可以将序列产生的事件,转换成元素。
    通常一个有限的 Observable 将产生零个或者多个 onNext 事件,最后产生一个 onCompleted - 或者onError事件。而 materialize 操作符会将 Observable 产生的这些事件全部转换成元素,然后发送出来。
Observable.of(1, 2, 1)
            .materialize()
            .subscribe(onNext: { print($0) })
            .disposed(by: disposeBag)
dematerialize
  • 该操作符的作用和 materialize 正好相反,它可以将 materialize 转换后的元素还原。
Observable.of(1, 2, 1)
            .materialize()
            .dematerialize()
            .subscribe(onNext: { print($0) })
            .disposed(by: disposeBag)
            //1 2 1
timeout
  • 使用该操作符可以设置一个超时时间。如果源 Observable 在规定时间内没有发任何出元素,就产生一个超时的 error 事件。
//定义好每个事件里的值以及发送的时间
        let times = [
            [ "value": 1, "time": 0 ],
            [ "value": 2, "time": 0.5 ],
            [ "value": 3, "time": 1.5 ],
            [ "value": 4, "time": 4 ],
            [ "value": 5, "time": 5 ]
        ]
         
        //生成对应的 Observable 序列并订阅
        Observable.from(times)
            .flatMap { item in
                return Observable.of(Int(item["value"]!))
                    .delaySubscription(Double(item["time"]!),
                                       scheduler: MainScheduler.instance)
            }
            .timeout(2, scheduler: MainScheduler.instance) //超过两秒没发出元素,则产生error事件
            .subscribe(onNext: { element in
                print(element)
            }, onError: { error in
                print(error)
            })
            .disposed(by: disposeBag)
            // 1 2 3
using
  • 使用 using 操作符创建 Observable 时,同时会创建一个可被清除的资源,一旦 Observable终止了,那么这个资源就会被清除掉了。
//一个无限序列(每隔0.1秒创建一个序列数 )
        let infiniteInterval$ = Observable<Int>
            .interval(0.1, scheduler: MainScheduler.instance)
            .do(
                onNext: { print("infinite$: \($0)") },
                onSubscribe: { print("开始订阅 infinite$")},
                onDispose: { print("销毁 infinite$")}
        )
         
        //一个有限序列(每隔0.5秒创建一个序列数,共创建三个 )
        let limited$ = Observable<Int>
            .interval(0.5, scheduler: MainScheduler.instance)
            .take(2)
            .do(
                onNext: { print("limited$: \($0)") },
                onSubscribe: { print("开始订阅 limited$")},
                onDispose: { print("销毁 limited$")}
        )
         
        //使用using操作符创建序列
        let o: Observable<Int> = Observable.using({ () -> AnyDisposable in
            return AnyDisposable(infiniteInterval$.subscribe())
        }, observableFactory: { _ in return limited$ }
        )
        o.subscribe()
        
class AnyDisposable: Disposable {
    let _dispose: () -> Void
     
    init(_ disposable: Disposable) {
        _dispose = disposable.dispose
    }
     
    func dispose() {
        _dispose()
    }
}

错误处理/调试

catchErrorJustReturn

  • 当遇到 error 事件的时候,就返回指定的值,然后结束。
let disposeBag = DisposeBag()
 
let sequenceThatFails = PublishSubject<String>()
 
sequenceThatFails
    .catchErrorJustReturn("错误")
    .subscribe(onNext: { print($0) })
    .disposed(by: disposeBag)
 
sequenceThatFails.onNext("a")
sequenceThatFails.onNext("b")
sequenceThatFails.onNext("c")
sequenceThatFails.onError(MyError.A)
sequenceThatFails.onNext("d")
// a b c 错误

catchError

  • 该方法可以捕获 error,并对其进行处理。
  • 同时还能返回另一个 Observable 序列进行订阅(切换到新的序列)。
let disposeBag = DisposeBag()
 
let sequenceThatFails = PublishSubject<String>()
let recoverySequence = Observable.of("1", "2", "3")
 
sequenceThatFails
    .catchError {
        print("Error:", $0)
        return recoverySequence
    }
    .subscribe(onNext: { print($0) })
    .disposed(by: disposeBag)
 
sequenceThatFails.onNext("a")
sequenceThatFails.onNext("b")
sequenceThatFails.onNext("c")
sequenceThatFails.onError(MyError.A)
sequenceThatFails.onNext("d")

retry

  • 使用该方法当遇到错误的时候,会重新订阅该序列。比如遇到网络请求失败时,可以进行重新连接。
  • retry() 方法可以传入数字表示重试次数。不传的话只会重试一次。
let disposeBag = DisposeBag()
var count = 1
 
let sequenceThatErrors = Observable<String>.create { observer in
    observer.onNext("a")
    observer.onNext("b")
     
    //让第一个订阅时发生错误
    if count == 1 {
        observer.onError(MyError.A)
        print("Error encountered")
        count += 1
    }
     
    observer.onNext("c")
    observer.onNext("d")
    observer.onCompleted()
     
    return Disposables.create()
}
 
sequenceThatErrors
    .retry(2)  //重试2次(参数为空则只重试一次)
    .subscribe(onNext: { print($0) })
    .disposed(by: disposeBag)

debug

  • 我们可以将 debug 调试操作符添加到一个链式步骤当中,这样系统就能将所有的订阅者、事件、和处理等详细信
    息打印出来,方便我们开发调试。
let disposeBag = DisposeBag()
 
Observable.of("2", "3")
    .startWith("1")
    .debug()
    .subscribe(onNext: { print($0) })
    .disposed(by: disposeBag)

RxSwift.Resources.total

  • 通过将 RxSwift.Resources.total 打印出来,我们可以查看当前 RxSwift 申请的所有资源数量。这个在检查内存泄露的时候非常有用。
print(RxSwift.Resources.total)
         
let disposeBag = DisposeBag()
 
print(RxSwift.Resources.total)
         
Observable.of("BBB", "CCC")
    .startWith("AAA")
    .subscribe(onNext: { print($0) })
    .disposed(by: disposeBag)
         
print(RxSwift.Resources.total)

特征序列

Single

Single 是 Observable 的另外一个版本。但它不像 Observable 可以发出多个元素,它要么只能发出一个元素,要么产生一个 error 事件。

  • 发出一个元素,或一个 error 事件
  • 不会共享状态变化
public enum SingleEvent<Element> {
    case success(Element)
    case error(Swift.Error)
}

//获取豆瓣某频道下的歌曲信息
func getPlaylist(_ channel: String) -> Single<[String: Any]> {
    return Single<[String: Any]>.create { single in
        let url = "https://douban.fm/j/mine/playlist?"
            + "type=n&channel=\(channel)&from=mainsite"
        let task = URLSession.shared.dataTask(with: URL(string: url)!) { data, _, error in
            if let error = error {
                single(.error(error))
                return
            }
             
            guard let data = data,
                let json = try? JSONSerialization.jsonObject(with: data,
                                                             options: .mutableLeaves),
                let result = json as? [String: Any] else {
                    single(.error(DataError.cantParseJSON))
                    return
            }
             
            single(.success(result))
        }
         
        task.resume()
         
        return Disposables.create { task.cancel() }
    }
}
 
//与数据相关的错误类型
enum DataError: Error {
    case cantParseJSON
}


class ViewController: UIViewController {
    let disposeBag = DisposeBag()
     
    override func viewDidLoad() {
        //获取第0个频道的歌曲信息
        getPlaylist("0")
            .subscribe { event in
                switch event {
                case .success(let json):
                    print("JSON结果: ", json)
                case .error(let error):
                    print("发生错误: ", error)
                }
            }
            .disposed(by: disposeBag)
    }
}

class ViewController: UIViewController {
    let disposeBag = DisposeBag()
     
    override func viewDidLoad() {
        //获取第0个频道的歌曲信息
        getPlaylist("0")
            .subscribe(onSuccess: { json in
                print("JSON结果: ", json)
            }, onError: { error in
                print("发生错误: ", error)
            })
            .disposed(by: disposeBag)
    }
}
  • asSingle(),调用 Observable 序列的.asSingle()方法,将它转换为 Single。
let disposeBag = DisposeBag()
 
Observable.of("1")
    .asSingle()
    .subscribe({ print($0) })
    .disposed(by: disposeBag)

Completable

Completable 是 Observable 的另外一个版本。不像 Observable 可以发出多个元素,它要么只能产生一个 completed 事件,要么产生一个 error 事件。

  • 不会发出任何元素
  • 只会发出一个 completed 事件或者一个 error 事件
  • 不会共享状态变化
public enum CompletableEvent {
    case error(Swift.Error)
    case completed
}
//将数据缓存到本地
func cacheLocally() -> Completable {
    return Completable.create { completable in
        //将数据缓存到本地(这里掠过具体的业务代码,随机成功或失败)
        let success = (arc4random() % 2 == 0)
         
        guard success else {
            completable(.error(CacheError.failedCaching))
            return Disposables.create {}
        }
         
        completable(.completed)
        return Disposables.create {}
    }
}
 
//与缓存相关的错误类型
enum CacheError: Error {
    case failedCaching
}

cacheLocally()
    .subscribe(onCompleted: {
         print("保存成功!")
    }, onError: { error in
        print("保存失败: \(error.localizedDescription)")
    })
    .disposed(by: disposeBag)

Maybe

Maybe 同样是 Observable 的另外一个版本。它介于 Single 和 Completable 之间,它要么只能发出一个元素,要么产生一个 completed 事件,要么产生一个 error 事件。

  • 发出一个元素、或者一个 completed 事件、或者一个 error 事件
  • 不会共享状态变化
public enum MaybeEvent<Element> {
    case success(Element)
    case error(Swift.Error)
    case completed
}

func generateString() -> Maybe<String> {
    return Maybe<String>.create { maybe in
         
        //成功并发出一个元素
        maybe(.success("hangge.com"))
         
        //成功但不发出任何元素
        maybe(.completed)
         
        //失败
        //maybe(.error(StringError.failedGenerate))
         
        return Disposables.create {}
    }
}
 
//与缓存相关的错误类型
enum StringError: Error {
    case failedGenerate
}

generateString()
    .subscribe { maybe in
        switch maybe {
        case .success(let element):
            print("执行完毕,并获得元素:\(element)")
        case .completed:
            print("执行完毕,且没有任何元素。")
        case .error(let error):
            print("执行失败: \(error.localizedDescription)")
   
        }
    }
    .disposed(by: disposeBag)
    
generateString()
    .subscribe(onSuccess: { element in
        print("执行完毕,并获得元素:\(element)")
    },
               onError: { error in
                print("执行失败: \(error.localizedDescription)")
    },
               onCompleted: {
                print("执行完毕,且没有任何元素。")
    })
    .disposed(by: disposeBag)
  • asMaybe() 我们可以通过调用 Observable 序列的 .asMaybe()方法,将它转换为 Maybe。

Driver

(1)Driver 可以说是最复杂的 trait,它的目标是提供一种简便的方式在 UI 层编写响应式代码。
(2)如果我们的序列满足如下特征,就可以使用它:

  • 不会产生 error 事件
  • 一定在主线程监听(MainScheduler)
  • 共享状态变化(shareReplayLatestWhileConnected)
代码讲解:
(1)首先我们使用 asDriver 方法将 ControlProperty 转换为 Driver。
(2)接着我们可以用 .asDriver(onErrorJustReturn: []) 方法将任何 Observable 序列都转成 Driver,因为我们知道序列转换为 Driver 要他满足 3 个条件:

*   不会产生 error 事件
*   一定在主线程监听(MainScheduler)
*   共享状态变化(shareReplayLatestWhileConnected)

而 asDriver(onErrorJustReturn: []) 相当于以下代码:
        let safeSequence = xs
         .observeOn(MainScheduler.instance) // 主线程监听
         .catchErrorJustReturn(onErrorJustReturn) // 无法产生错误
         .share(replay: 1, scope: .whileConnected)// 共享状态变化
         return Driver(raw: safeSequence) // 封装
(3)同时在 Driver 中,框架已经默认帮我们加上了 shareReplayLatestWhileConnected,所以我们也没必要再加上"replay"相关的语句了。
(4)最后记得使用 drive 而不是 bindTo

let results = query.rx.text.asDriver()        // 将普通序列转换为 Driver
    .throttle(0.3, scheduler: MainScheduler.instance)
    .flatMapLatest { query in
        fetchAutoCompleteItems(query)
            .asDriver(onErrorJustReturn: [])  // 仅仅提供发生错误时的备选返回值
    }
 
//将返回的结果绑定到显示结果数量的label上
results
    .map { "\($0.count)" }
    .drive(resultCount.rx.text) // 这里使用 drive 而不是 bindTo
    .disposed(by: disposeBag)
 
//将返回的结果绑定到tableView上
results
    .drive(resultsTableView.rx.items(cellIdentifier: "Cell")) { //  同样使用 drive 而不是 bindTo
        (_, result, cell) in
        cell.textLabel?.text = "\(result)"
    }
    .disposed(by: disposeBag)

ControlProperty

(1)ControlProperty 是专门用来描述 UI 控件属性,拥有该类型的属性都是被观察者(Observable)。

(2)ControlProperty 具有以下特征:

  • 不会产生 error 事件
  • 一定在 MainScheduler 订阅(主线程订阅)
  • 一定在 MainScheduler 监听(主线程监听)
  • 共享状态变化
class ViewController: UIViewController {
     
    @IBOutlet weak var textField: UITextField!
     
    @IBOutlet weak var label: UILabel!
     
    let disposeBag = DisposeBag()
     
    override func viewDidLoad() {
         
        //将textField输入的文字绑定到label上
        textField.rx.text
            .bind(to: label.rx.text)
            .disposed(by: disposeBag)
    }
}
 
extension UILabel {
    public var fontSize: Binder<CGFloat> {
        return Binder(self) { label, fontSize in
            label.font = UIFont.systemFont(ofSize: fontSize)
        }
    }
}

ControlEvent

(1)ControlEvent 是专门用于描述 UI 所产生的事件,拥有该类型的属性都是被观察者(Observable)。

(2)ControlEvent 和 ControlProperty 一样,都具有以下特征:

  • 不会产生 error 事件
  • 一定在 MainScheduler 订阅(主线程订阅)
  • 一定在 MainScheduler 监听(主线程监听)
    共享状态变化
extension Reactive where Base: UIButton {
    public var tap: ControlEvent<Void> {
        return controlEvent(.touchUpInside)
    }
}

 //订阅按钮点击事件
        button.rx.tap
            .subscribe(onNext: {
                print("欢迎访问hangge.com")
            }).disposed(by: disposeBag)

调度器

Schedulers

(1)调度器(Schedulers)是 RxSwift 实现多线程的核心模块,它主要用于控制任务在哪个线程或队列运行。

(2)RxSwift 内置了如下几种 Scheduler:

  • CurrentThreadScheduler:表示当前线程 Scheduler。(默认使用这个)
  • MainScheduler:表示主线程。如果我们需要执行一些和 UI 相关的任务,就需要切换到该 Scheduler运行。
  • SerialDispatchQueueScheduler:封装了 GCD 的串行队列。如果我们需要执行一些串行任务,可以切换到这个 Scheduler 运行。
  • ConcurrentDispatchQueueScheduler:封装了 GCD 的并行队列。如果我们需要执行一些并发任务,可以切换到这个 Scheduler 运行。
  • OperationQueueScheduler:封装了 NSOperationQueue。
//过去我们使用 GCD 来实现,代码大概是这样的: 现在后台获取数据
DispatchQueue.global(qos: .userInitiated).async {
    let data = try? Data(contentsOf: url)
    //再到主线程显示结果
    DispatchQueue.main.async {
        self.data = data
    }
}


//如果使用 RxSwift 来实现,代码大概是这样的:
let rxData: Observable<Data> = ...
rxData
    .subscribeOn(ConcurrentDispatchQueueScheduler(qos: .userInitiated)) //后台构建序列
    .observeOn(MainScheduler.instance)  //主线程监听并处理序列结果
    .subscribe(onNext: { [weak self] data in
        self?.data = data
    })
    .disposed(by: disposeBag)
  • subscribeOn 与 observeOn 区别

(1)subscribeOn()

该方法决定数据序列的构建函数在哪个 Scheduler 上运行。
比如上面样例,由于获取数据、解析数据需要花费一段时间的时间,所以通过 subscribeOn 将其切换到后台 Scheduler 来执行。这样可以避免主线程被阻塞。

(2)observeOn()

该方法决定在哪个 Scheduler 上监听这个数据序列。
比如上面样例,我们获取并解析完毕数据后又通过 observeOn 方法切换到主线程来监听并且处理结果。

参考链接