- 前言
- 1. collect接收 *** 作符
- 2. launchIn *** 作符
- 3. onEach *** 作符
- 4. 组合 *** 作符
- 4.1 zip *** 作符
- 4.2 combine *** 作符
- 5. 展平流
- 5.1 flatMapConcat连接模式
- 5.2 flatMapMerge并发模式
- 5.3 flatMapLatest
- 参考
协程 *** 作符的第二篇,请参考第一篇Kotlin 协程Flow主要 *** 作符(一)
1. collect接收 *** 作符用于数据接收,此 *** 作符没有返回对象,后面不可再添加 *** 作符。
fun flowCollect() {
//流启动
viewModelScope.launch {
flow {
for (i in 1..3) {
LogUtils.d(" Emitting $i")
emit(i)
delay(1000)
}
}.collect {
LogUtils.d(" collect $it")
}//后面不可再接收其他 *** 作符
}
}
2. launchIn *** 作符
流的启动主要有两种,一种是上面的作用域.launch启动一个流,用collect *** 作符接收数据;
一种是launchIn *** 作符启动流,官方不建议用这种,可能出于数据安全考虑,一般建议在onResume方法调用后启动协程,因为怕数据接收了,但View还没创建出来。但链式调用真的很好用,用onEach *** 作符接收数据。
flow {
for (i in 1..3) {
LogUtils.d("Emitting $i")
emit(i)
delay(1000)
}
}.onCompletion {
LogUtils.d("onCompletion ")
}.launchIn(viewModelScope)//在ViewModel中,直接launchIn在ViewModelScope作用域
3. onEach *** 作符
返回一个流,该流在上游流的每个值向下游发出之前调用给定的 *** 作。也可以用来接收数据,与上面collect不同的是此 *** 作符返回流,我们后面还可接其他 *** 作符。上面的launchIn *** 作符启动时,可用于接收数据。
fun flowOnEach() {
flow {
for (i in 1..3) {
LogUtils.d("Emitting $i")
emit(i)
delay(1000)
}
}.onEach {
LogUtils.d("onEach $it")
}.onCompletion {
LogUtils.d("onCompletion ")
}.launchIn(viewModelScope)
}
D/FlowViewModel: Emitting 1
D/FlowViewModel: onEach 1
D/FlowViewModel: Emitting 2
D/FlowViewModel: onEach 2
D/FlowViewModel: Emitting 3
D/FlowViewModel: onEach 3
D/FlowViewModel: onCompletion
4. 组合 *** 作符
4.1 zip *** 作符
zip *** 作符用于组合两个流中的相关值,与Rxjava中的zip *** 作符类似
fun flowZip() {
viewModelScope.launch {
val nums = flowOf(1, 2, 3)
val strs = flowOf("one", "two", "three")
nums.zip(strs) { a, b -> "$a -> $b" }//两个流数据组合
.collect {
LogUtils.d("collect==$it")
}
}
}
D/Collect: collect==1 -> one
D/Collect: collect==2 -> two
D/Collect: collect==3 -> three
4.2 combine *** 作符
当流表示一个变量或 *** 作的最新值时,可能需要执行计算,这依赖于相应流的最新值,并且每当上游流产生值的时候都需要重新计算。这种相应的 *** 作符家族称为 combine。
例如,先前示例中的数字如果每 300 毫秒更新一次,但字符串每 400 毫秒更新一次, 然后使用 zip *** 作符合并它们,但仍会产生相同的结果, 尽管每 400 毫秒打印一次结果
private fun flowCombine() {
viewModelScope.launch {
val nums = flowOf(1, 2, 3).onEach { delay(300) }
val strs = flowOf("one", "two", "three").onEach { delay(400) }
val startTime = System.currentTimeMillis() // 记录开始的时间
nums.combine(strs) { a, b -> "$a -> $b" }
.collect {
LogUtils.d("$it at ${System.currentTimeMillis() - startTime} ms from start")
}
}
}
D/Collect: 1 -> one at 418 ms from start
D/Collect: 2 -> one at 627 ms from start
D/Collect: 2 -> two at 827 ms from start
D/Collect: 3 -> two at 931 ms from start
D/Collect: 3 -> three at 1231 ms from start
5. 展平流
流表示异步接收的值序列,所以很容易遇到这样的情况: 每个值都会触发对另一个值序列的请求。比如说,我们可以拥有下面这样一个返回间隔 500 毫秒的两个字符串流的函数:
fun requestFlow(i: Int): Flow<String> = flow {
emit("$i: First")
delay(500) // 等待 500 毫秒
emit("$i: Second")
}
现在,如果我们有一个包含三个整数的流,并为每个整数调用 requestFlow,如下所示:
(1..3).asFlow().map { requestFlow(it) }
然后我们得到了一个包含流的流(Flow
它们是相应序列 *** 作符最相近的类似物。它们在等待内部流完成之前开始收集下一个值。一个流数据完成后,再收集下一个流数据。
private fun requestFlow(i: Int): Flow<String> = flow {
emit("$i: First")
delay(500) // 等待 500 毫秒
emit("$i: Second")
}
private fun flowFlatMapConcat() {
viewModelScope.launch() {
val startTime = System.currentTimeMillis() // 记录开始时间
(1..3).asFlow().onEach { delay(100) } // 每 100 毫秒发射一个数字
.flatMapConcat { requestFlow(it) }
.collect { value -> // 收集并打印
LogUtils.d("$value at ${System.currentTimeMillis() - startTime} ms from start")
}
}
}
D/Collect: 1: First at 121 ms from start
D/Collect: 1: Second at 624 ms from start
D/Collect: 2: First at 728 ms from start
D/Collect: 2: Second at 1230 ms from start
D/Collect: 3: First at 1332 ms from start
D/Collect: 3: Second at 1834 ms from start
5.2 flatMapMerge并发模式
另一种展平模式是并发收集所有传入的流,并将它们的值合并到一个单独的流,以便尽快的发射值。它由 flatMapMerge 与 flattenMerge *** 作符实现。
private fun requestFlow(i: Int): Flow<String> = flow {
emit("$i: First")
delay(500) // 等待 500 毫秒
emit("$i: Second")
}
private fun flowFlatMapMerge() {
viewModelScope.launch() {
val startTime = System.currentTimeMillis() // 记录开始时间
(1..3).asFlow().onEach { delay(100) } // 每 100 毫秒发射一个数字
.flatMapMerge { requestFlow(it) }
.collect { value -> // 收集并打印
LogUtils.d("$value at ${System.currentTimeMillis() - startTime} ms from start")
}
}
}
D/Collect: 1: First at 104 ms from start
D/Collect: 2: First at 205 ms from start
D/Collect: 3: First at 308 ms from start
D/Collect: 1: Second at 607 ms from start
D/Collect: 2: Second at 708 ms from start
D/Collect: 3: Second at 812 ms from start
5.3 flatMapLatest
与 collectLatest *** 作符类似,处理最新值,也有相对应的“最新”展平模式,在发出新流后立即取消先前流的收集。 这由 flatMapLatest *** 作符来实现。
参考-
协程文档
-
协程中文网
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)