くま's Tech系Blog

基本的には技術で学んだことを書き留めようと思います。雑談もやるかもね!

RxSwiftのSubject・Relayについて

今回はRxSwift(RxCocoa)のSubject・Relayについてまとめようと思います

Rxについては以前記事にしたので読んでみてください!

kumaskun.hatenablog.com

Subject

まずはSubjectについてです

Observableはイベントを検知するためのクラスですが、Subjectと後述するRelayはイベントの検知も、イベントの発生もできるクラスです

SubjectはonNextonErroronCompleteの3種類のイベントを流すことができます。 なので、通信処理やDB処理などでメッセージを出したいなどのエラーによって処理を分けたいときに使用します

Subjectには次の3種類があります

  • PublishSubject: subscribeしてから発生したeventを受け取れる
  • ReplaySubject: subscribe以前に発生したeventも受け取れる。バッファサイズを指定する
  • BehaviorSubject: 一つ過去のeventを受け取れる。購読した際に必ず一度イベントが発生する。初期値が必要

PublishSubjectはイベントが発生したタイミングで、直前の値をストリームに流します。よって、次のようにイベントごとにsubscribe()します。 また、PublishSubjectは初期値を設定する必要がありません

let sub = PublishSubject<String>()
        
sub.subscribe(onNext: { str in
    print(str)
}).disposed(by: disposeBag)
        
sub.onNext("1")
sub.onNext("2")
sub.onNext("3")
sub.onCompleted()

// printされるのは"1"、"2"、"3"

ReplaySubjectは購読直後に引数で指定したbufferSizeの分だけ前の値をストリームに流します。 前の値がない場合はPublishSubjectと同じように振る舞います

let sub = ReplaySubject<String>.create(bufferSize: 2)
        
sub.onNext("1")
sub.onNext("2")
sub.onNext("3")
sub.onNext("4")
sub.onNext("5")
        
sub.subscribe(onNext: { str in
    print(str)
}).disposed(by: disposeBag)

sub.onCompleted()

// printされるのは"4"、"5"

今回はbufferSizeに2を指定することによって、購読直後に2つ前の値をストリームに流す設定になっています。 また、subscribeをonNextよりも前で定義すると「前の値がない場合」に当てはまるので、イベントごとにsubscribe()を行います

BehaviorSubjectはストリームを購読する際に、直前の値を一度流してからsubscribe()します。 つまり、購読した際に必ず一度イベントが発生します

let sub = BehaviorSubject<String>(value: "1")

sub.subscribe(onNext: { str in
    print(str)
}).dispose()

sub.onNext("2")
sub.onNext("3")
sub.onCompleted()

// printされるのは"3"のみ

Relay

次はRelayについてです

RelayもSubjectと同様にイベントの検知も、イベントの発生もできるクラスです。 ただ、Subjectとの違いはonNextのみイベントとして流すことができる点です(Relayはイベントを流す際にはonNextではなくacceptを使用します)。

Relayには、BehaviorRelayPublishRelayがあります。 PublishRelayはPublishSubjectのwrapperで、BehaviorRelayはBehaviorSubjectのwrapperになります。 BehaviorRelayは初期値があり、PublishRelayには初期値がありません。 そして、subscribeしたときにBehaviorRelayは現在値を流し、PublishRelayは現在値を流さないです。

let sub = PublishRelay<String>()
sub.accept("1")
sub.accept("2")
                
sub.subscribe(onNext: { str in
    print(str)
}).disposed(by: disposeBag)
                
sub.accept("3")

// printされるのは"3"のみ(subscribeする前の"1"、"2"は出力されない)

BehaviorRelayは現在の値を取得する際にvalueを使います

PublishRelayの際にはsubscribeで値が取得できていましたが、BehaviorRelayのsubscribeではイベントが流れてきます

let behaviorRelay = BehaviorRelay<String>(value: "1")

behaviorRelay.subscribe{ _ in
    print(behaviorRelay.value)
}.disposed(by: disposeBag)

behaviorRelay.accept("2")
behaviorRelay.accept("3")
behaviorRelay.accept("4")

// printされるのは"1"、"2"、"3"、"4"が出力される

Hot/Coldについての補足

ObservableはHotとColdの2種類の性質があります

HotなObservableとはsubscribeしているObserverがいなくても、値が流れ続けるObservableのことで、 ColdなObservableとはsubscribeしているObserverが発生するまで、値が流れないObservableのことです

これをSubjectやRelayを例にして補足します

let testRelay = PublishRelay<String>()

let cold = testRelay.map { str in
    print("\(str) ここを通る")
}

testRelay.subscribe(onNext: { str in
    print(str)
})

testRelay.accept("Test")

上記の例のPublishRelayはHotなObservableです。 そして、HotなObservableをmapなどのoperatorを使うことでColdなObservableに変換しています。 coldはColdなObservableなのでsubscribeしないとflatMapやmapの中身が評価されることはありません。 なので、"Test"は出力されますが、print("\(str) ここを通る")は出力されません

その上で次のパターンを見てみましょう

let testRelay = PublishRelay<String>()

let cold = testRelay.map { str in
  print(" \(str) ここを通る")
}

testRelay.subscribe(onNext: { str in
  print(str)
})

testRelay.accept("hoge")

cold.subscribe(onNext: { _ in
  print("ここを通る!!")
})

testRelay.accept("fuga")

実行してみるとprintしたものすべてが表示されます

実行結果から、fugaがtestRelayに流れてきたときにcoldにも値が流れていることが確認できます。 hogeでは発火していないのは、値が流れた時点ではObserverが存在しなかったためです。(cold.subscribeが実行されていないため) このようにHotやCold意識しないとsubscribeするタイミングによって挙動が変わり、バグに繋がったケースがあり得るので注意が必要です

参照

reactivex.io

github.com

zenn.dev