用 Kotlin 原生的方式处理异步数据流 —— 从冷流到热流,从操作符到实战
链式推理,每步推导下一步
真相永远只有一个,而通往真相的路径,是一条环环相扣的推理链。Flow 就是我们追踪异步数据真相的证据链 —— 每个操作符都是一件鉴证工具,每次 emit 都是一条新线索,而 collect 则是最终的破案时刻。跟我一起,沿着这条证据链,一步步推导出数据流的全貌吧。
各位侦探,在上一章中我们掌握了挂起函数和协程 —— 这些是我们手中的基础调查工具。挂起函数就像一次性的现场取证:去犯罪现场,采集一份指纹,得到一个结果。但真正复杂的案件远非如此简单。很多案件需要持续追踪:数据库的实时变更就像嫌疑人的行踪轨迹、传感器的持续读数就像监控录像、用户输入的连续事件就像线人的持续情报、WebSocket 推送的消息就像窃听到的通讯内容......
挂起函数只能返回一个值 —— 就像派侦探出去调查,只能带回一份报告。如果你想要连续的情报,就只能让他把所有情报攒齐了再一次性汇报,但这样你就错过了实时追踪的最佳时机:
// 挂起函数:只能返回一个结果
suspend fun fetchUser(): User {
return api.getUser() // 一次请求,一个结果
}
// 想返回多个值?只能用 List,但必须等全部完成
suspend fun fetchAllPrices(): List<Double> {
val prices = mutableListOf<Double>()
for (i in 1..100) {
prices.add(getPrice(i)) // 必须全部拿到才能返回
}
return prices
}
推理链的第一环就在这里断裂了:调用者必须等到所有数据都准备好才能开始分析,而且无法表达"持续追踪"的场景 —— 比如嫌疑人的位置每秒都在变化,你不能等他停下来才开始跟踪。
有的侦探可能会说:"我们不是有 LiveData 这个线人网络吗?"确实,LiveData 可以持续推送情报,但这个情报网有明显的短板:
map 和 switchMap 两个变换操作符 —— 鉴证工具箱里只有放大镜和镊子,遇到复杂现场就束手无策。推理到这里,证据链指向了一个结论:我们需要一个既能持续追踪数据、又能与协程无缝配合的工具。Flow 就是 Kotlin 协程库提供的异步数据流解决方案 —— 它就是我们的终极侦查系统,结合了协程的简洁语法和响应式流的强大追踪能力:
// Flow:可以按需逐个发射值
fun priceUpdates(): Flow<Double> = flow {
while (true) {
val price = fetchLatestPrice()
emit(price) // 每获取一个值就发射出去
delay(1000) // 每秒更新一次
}
}
在侦查的世界里,案件分为两种:
flow { } 就是冷流。StateFlow 和 SharedFlow 是热流。这里有一条关键的推理:Flow 的设计哲学是"按需调查"。冷流只在被收集(collect)时才启动调查,这意味着如果没有侦探关注这条线索,就不会浪费任何资源去追踪它。这与 Kotlin 的序列(Sequence)理念环环相扣,只不过 Flow 是为异步侦查场景设计的。
好,让我们打开第一份悬案卷宗。创建 Flow 最基本的方式是使用 flow { } 构建器 —— 这就是建立一份案件档案。在 lambda 中通过 emit() 提交线索:
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.*
fun simpleFlow(): Flow<Int> = flow {
println("Flow 开始执行")
for (i in 1..3) {
delay(100) // 模拟异步操作
emit(i) // 发射值
println("已发射 $i")
}
}
fun main() = runBlocking {
println("调用 simpleFlow()...")
val flow = simpleFlow() // 这里什么都不会发生!
println("准备收集...")
flow.collect { value ->
println("收到: $value")
}
}
运行结果:
调用 simpleFlow()...
准备收集...
Flow 开始执行
已发射 1
收到: 1
已发射 2
收到: 2
已发射 3
收到: 3
注意这条证据链的关键细节:调用 simpleFlow() 时没有任何打印 —— 悬案卷宗只是被取出来了,还没有人翻开它。直到 collect 被调用,侦探才真正开始逐条审阅线索。这就是冷流的核心特性 —— 没有侦探接手,案件就不会启动。
collect 是一个挂起函数,它就像侦探正式接手案件 —— 触发整个调查流程并逐条接收线索。在 collect 返回之前,当前协程会一直挂起。这意味着 collect 之后的代码要等到案件完全结案后才会执行:
runBlocking {
simpleFlow().collect { value ->
println("收到: $value")
}
println("Flow 已完成") // 在所有值都收集完后才打印
}
除了 flow { } 这种"从零建档"的方式,Kotlin 还提供了一些快速立案的便捷方法:
// flowOf:从已知线索直接建档
val numbersFlow = flowOf(1, 2, 3, 4, 5)
// asFlow:将已有证据集转换为案件流
val listFlow = listOf("A", "B", "C").asFlow()
val rangeFlow = (1..10).asFlow()
// 使用示例
runBlocking {
flowOf("苹果", "香蕉", "橘子").collect { fruit ->
println("水果: $fruit")
}
}
冷流的一个重要推理要点:每次调用 collect,就像一位新侦探从头翻开卷宗,独立展开调查。多位侦探之间互不干扰,各自得出自己的发现:
fun timestampFlow(): Flow<Long> = flow {
emit(System.currentTimeMillis())
delay(100)
emit(System.currentTimeMillis())
}
runBlocking {
val flow = timestampFlow()
println("=== 第一次收集 ===")
flow.collect { println(it) }
delay(500)
println("=== 第二次收集 ===")
flow.collect { println(it) } // 完全独立,时间戳不同
}
flow { } 构建器中的代码直到被 collect 时才执行,并且每次 collect 都会重新执行 —— 每位侦探都是从头调查。如果你在 flow 构建器中做了网络请求,每次 collect 都会发一次请求。如果你希望多个侦探共享同一份实时情报,应该使用热流(StateFlow / SharedFlow),我们将在 12.5 节中揭开这层真相。
现在,推理链进入最精彩的部分。Flow 的强大之处在于它提供了一整套鉴证工具来变换、组合和处理数据流。每一个操作符就是证据链上的一环,环环相扣,最终指向真相。而且这些工具和 Kotlin 集合操作符的命名非常相似,学习成本很低。
map —— 证据变换工具。对每条线索进行一对一的鉴定转换,就像对指纹进行比对分析:
flowOf(1, 2, 3)
.map { it * it } // 1, 4, 9
.collect { println(it) }
filter —— 证据筛选工具。把不符合调查方向的线索过滤掉,只保留有价值的证据:
(1..20).asFlow()
.filter { it % 2 == 0 } // 只保留偶数
.collect { println(it) } // 2, 4, 6, 8, ..., 20
transform —— 最灵活的鉴证工具。可以从一条线索推导出任意数量的新线索,就像从一个证物中提取出多种鉴定信息:
flowOf("张三", "李四", "王五")
.transform { name ->
emit("正在处理: $name")
val result = processName(name) // 模拟耗时操作
emit("处理完成: $result")
}
.collect { println(it) }
// 输出:
// 正在处理: 张三
// 处理完成: ZHANG SAN
// 正在处理: 李四
// ...
take —— 限量取证。只取前 N 条线索就结案,就像侦探只需要关键的几条证据就能锁定嫌疑人:
flow {
var i = 0
while (true) {
emit(i++)
delay(100)
}
}
.take(5) // 只取前 5 个值
.collect { println(it) } // 0, 1, 2, 3, 4
复杂案件往往需要将多条独立的证据链进行交叉比对。这时就需要组合操作符 —— 它们就是我们的证据关联工具。
zip —— 证据配对。将两条证据链严格一一配对,就像将嫌疑人名单和他们的不在场证明一一对应。任何一方的证据用完,配对就结束:
val names = flowOf("张三", "李四", "王五")
val ages = flowOf(25, 30, 28)
names.zip(ages) { name, age ->
"$name, $age 岁"
}.collect { println(it) }
// 输出:
// 张三, 25 岁
// 李四, 30 岁
// 王五, 28 岁
combine —— 实时关联。将两条证据链动态组合,任意一方出现新情报时,立刻与另一方的最新情报进行关联分析 —— 就像实时追踪两个嫌疑人的位置,任何一方移动都要重新评估他们的关系:
val temperature = flow {
emit(20)
delay(300)
emit(25)
delay(300)
emit(22)
}
val humidity = flow {
emit(60)
delay(500)
emit(55)
}
temperature.combine(humidity) { temp, hum ->
"温度: ${temp}°C, 湿度: ${hum}%"
}.collect { println(it) }
zip 和 combine 的区别是推理中的关键一环,必须厘清:
| 特性 | zip | combine |
|---|---|---|
| 配对方式 | 严格一一配对 | 总是使用最新值组合 |
| 触发时机 | 两边都有新值时 | 任意一边有新值时 |
| 结束条件 | 任一流结束 | 两个流都结束 |
| 适用场景 | 配对两组关联数据 | 合并多个独立数据源 |
有时候,一条线索会引出一个全新的子案件 —— 也就是一个 Flow 发射的值本身也是 Flow(即 Flow<Flow<T>>)。这时需要展平操作符将嵌套的调查"拍平"成一条连贯的证据链。
flatMapConcat —— 顺序侦查。按顺序逐一追踪每条子线索,前一条调查完毕后才开启下一条,确保证据链不会混乱:
flowOf(1, 2, 3)
.flatMapConcat { id ->
flow {
emit("开始处理 $id")
delay(100)
emit("完成处理 $id")
}
}
.collect { println(it) }
// 严格顺序: 先处理完 1,再处理 2,再处理 3
flatMapMerge —— 并发侦查。同时派出多组侦探追踪所有子线索,结果交错汇报 —— 效率最高,但证据到达的顺序不可预测:
flowOf(1, 2, 3)
.flatMapMerge { id ->
flow {
emit("开始处理 $id")
delay(100)
emit("完成处理 $id")
}
}
.collect { println(it) }
// 并发执行: 所有任务同时开始,结果交错出现
flatMapLatest —— 追最新线索。每当新线索出现,立刻放弃正在追踪的旧线索,全力追踪最新的那条 —— 就像嫌疑人突然改变了逃跑方向,侦探必须立即调头:
flowOf("A", "B", "C")
.flatMapLatest { letter ->
flow {
emit("处理 $letter - 第1步")
delay(200)
emit("处理 $letter - 第2步") // 可能被取消
}
}
.collect { println(it) }
// 只有最后一个 "C" 的两步都会完成
终端操作符是整条推理链的"结案陈词" —— 它们触发 Flow 的执行并产生最终结论:
val numbers = flowOf(1, 2, 3, 4, 5)
// collect:逐条审阅每一条证据
numbers.collect { println(it) }
// toList:把所有证据收集成案卷
val list: List<Int> = numbers.toList()
// first:取第一条关键证据(之后结案)
val first: Int = numbers.first()
// reduce:累积推理
val sum: Int = numbers.reduce { acc, value -> acc + value }
println("sum = $sum") // sum = 15
// fold:带初始假设的累积推理
val product: Int = numbers.fold(1) { acc, value -> acc * value }
println("product = $product") // product = 120
这条推理链的关键在于:Flow 的中间操作符(map、filter 等)不会触发执行,它们只是在构建一条完整的证据处理管道。只有终端操作符(collect、toList、first 等)被调用时 —— 也就是侦探正式接手案件时 —— 整个管道才会启动运转。这类似于 Java 的 Stream 或 Kotlin 的 Sequence,环环相扣但按需执行。
在 Android 案件侦查中,我们经常需要让外勤侦探在现场(IO 线程)采集证据,然后把证据送回总部(主线程)进行分析展示。Flow 提供了优雅的"现场-总部"调度机制。
首先来追踪一条重要规则:flow { } 构建器中的代码默认运行在收集者所在的协程上下文中。这叫做"上下文保留"(context preservation)—— 就像侦探默认在接手案件的办公室里工作:
fun myFlow(): Flow<Int> = flow {
println("Flow 运行在: ${Thread.currentThread().name}")
emit(1)
}
runBlocking {
myFlow().collect { value ->
println("收集在: ${Thread.currentThread().name}")
}
}
// Flow 运行在: main
// 收集在: main
flowOn 是 Flow 中用于切换调查地点的标准指令。它改变的是上游(flowOn 之前的操作)的执行位置 —— 就像命令外勤侦探去犯罪现场取证,取回的证据再送到总部处理:
fun loadDataFlow(): Flow<String> = flow {
// 这部分在 IO 线程执行
println("发射数据在: ${Thread.currentThread().name}")
val data = loadFromDatabase()
emit(data)
}.flowOn(Dispatchers.IO) // 将上游切换到 IO 调度器
// 在 ViewModel 中使用
fun observeData() {
viewModelScope.launch {
// collect 在主线程(viewModelScope 默认在主线程)
loadDataFlow().collect { data ->
println("收集数据在: ${Thread.currentThread().name}")
_uiState.value = data // 安全地更新 UI 状态
}
}
}
你可能会有一个大胆的推理:"在 flow 构建器里用 withContext 切换线程不就行了?"但这条推理链是断裂的 —— 不行。Flow 禁止在 flow 构建器中使用 withContext 来发射值,这会抛出异常:
// 错误!会抛出 IllegalStateException
fun badFlow(): Flow<Int> = flow {
withContext(Dispatchers.IO) {
emit(1) // 禁止!不能从不同的上下文发射
}
}
// 正确:使用 flowOn
fun goodFlow(): Flow<Int> = flow {
val data = heavyComputation()
emit(data)
}.flowOn(Dispatchers.Default)
推理到深层原因:这是为了保证证据链的完整性(线程安全)。emit 不是线程安全的,如果允许从任意地点提交线索,就可能导致证据链出现并发篡改的混乱。flowOn 在内部通过 Channel 来安全地跨线程传递数据,就像用密封的证据袋在现场和实验室之间传递证物,所以是安全的。
这是 Android 侦查中最经典的模式:外勤在 IO 现场取证,总部在主线程分析展示:
class UserRepository(private val dao: UserDao) {
fun getUsers(): Flow<List<User>> = flow {
while (true) {
val users = dao.queryAllUsers() // 数据库操作
emit(users)
delay(5000) // 每 5 秒刷新
}
}.flowOn(Dispatchers.IO) // 数据库操作在 IO 线程
}
class UserViewModel(private val repo: UserRepository) : ViewModel() {
val users: StateFlow<List<User>> = repo.getUsers()
.stateIn(
scope = viewModelScope,
started = SharingStarted.WhileSubscribed(5000),
initialValue = emptyList()
)
}
推理链到了转折点。普通的 Flow 是悬案卷宗(冷流),每位侦探独立从头调查。但很多场景需要一块实时案情看板 —— 所有侦探共享同一份最新情报,新来的侦探一看就知道案件目前的状况。这就是热流登场的时候了。
StateFlow 是一种特殊的热流 —— 它就是侦探总部的实时案情看板。它始终展示着当前最新的案情状态,任何新到的侦探一抬头就能看到最新进展。它是 LiveData 在协程世界的替代品:
class CounterViewModel : ViewModel() {
// MutableStateFlow:可修改的案情看板
private val _count = MutableStateFlow(0)
// 对外暴露只读的案情看板
val count: StateFlow<Int> = _count.asStateFlow()
fun increment() {
_count.value++ // 直接修改 value 属性
}
fun decrement() {
_count.value--
}
fun updateSafely() {
_count.update { current ->
current + 1 // 原子更新,线程安全
}
}
}
StateFlow 的三个核心特性,是我们推理的关键证据:
.value 直接读取当前案情。equals 比较),看板不会重复通知侦探们 —— 没有新线索就不浪费大家的注意力。StateFlow 是 ViewModel 管理 UI 状态的最佳选择 —— 就像案件指挥中心的核心看板:
data class UiState(
val isLoading: Boolean = false,
val items: List<String> = emptyList(),
val error: String? = null
)
class ItemViewModel(private val repo: ItemRepository) : ViewModel() {
private val _uiState = MutableStateFlow(UiState())
val uiState: StateFlow<UiState> = _uiState.asStateFlow()
fun loadItems() {
viewModelScope.launch {
_uiState.update { it.copy(isLoading = true) }
try {
val items = repo.fetchItems()
_uiState.update {
it.copy(isLoading = false, items = items)
}
} catch (e: Exception) {
_uiState.update {
it.copy(isLoading = false, error = e.message)
}
}
}
}
}
SharedFlow 适合表示一次性的紧急通报(如导航、Toast、Snackbar)—— 它不是持续展示的案情看板,而是一声警报,听到就行动,没听到就错过。它没有初始值,可以配置回放(replay):
class EventViewModel : ViewModel() {
// 无回放:新来的侦探不会收到之前的通报
private val _events = MutableSharedFlow<UiEvent>()
val events: SharedFlow<UiEvent> = _events.asSharedFlow()
fun onItemClicked(id: String) {
viewModelScope.launch {
_events.emit(UiEvent.NavigateToDetail(id))
}
}
fun onError(message: String) {
viewModelScope.launch {
_events.emit(UiEvent.ShowSnackbar(message))
}
}
}
sealed class UiEvent {
data class NavigateToDetail(val id: String) : UiEvent()
data class ShowSnackbar(val message: String) : UiEvent()
}
每位优秀的侦探都要学会对比证据。让我们把这四种数据流工具放在一起,进行一次全面的交叉鉴定:
| 特性 | Flow | StateFlow | SharedFlow | LiveData |
|---|---|---|---|---|
| 冷/热 | 冷流 | 热流 | 热流 | 热流 |
| 有无初始值 | 无 | 必须有 | 可选(replay) | 可选 |
| 去重 | 无 | 是(equals) | 否 | 否 |
| 多收集者 | 各自独立 | 共享最新值 | 共享事件 | 共享最新值 |
| 生命周期感知 | 否 | 否(需配合) | 否(需配合) | 是 |
| 操作符 | 丰富 | 丰富 | 丰富 | 极少 |
| 线程控制 | 灵活 | 灵活 | 灵活 | 仅主线程分发 |
| 适用场景 | 数据库查询、网络流 | UI 状态 | 一次性事件 | 简单 UI 状态 |
在 Jetpack Compose 中,使用 collectAsStateWithLifecycle 来安全地收集 Flow —— 它会在界面不可见时自动暂停追踪,就像侦探下班后暂停监控,避免白白浪费精力:
@Composable
fun ItemListScreen(viewModel: ItemViewModel) {
// 推荐:生命周期感知的收集
val uiState by viewModel.uiState.collectAsStateWithLifecycle()
when {
uiState.isLoading -> LoadingIndicator()
uiState.error != null -> ErrorMessage(uiState.error!!)
else -> ItemList(uiState.items)
}
}
@Composable
fun EventHandler(viewModel: EventViewModel) {
val context = LocalContext.current
// 收集一次性事件
LaunchedEffect(Unit) {
viewModel.events.collect { event ->
when (event) {
is UiEvent.ShowSnackbar -> {
// 显示 Snackbar
}
is UiEvent.NavigateToDetail -> {
// 导航到详情页
}
}
}
}
}
collectAsState 在组合存续期间始终活跃地收集 Flow —— 就像侦探 24 小时不间断地盯着监控。而 collectAsStateWithLifecycle(来自 lifecycle-runtime-compose 库)会在界面进入后台时自动暂停收集。在 Android 开发中,始终优先使用 collectAsStateWithLifecycle,让侦探在没有必要时好好休息,以避免在后台浪费 CPU 和网络资源。
任何一条严谨的推理链都必须考虑意外情况 —— 线索断裂、证人翻供、现场被破坏。生产环境中的 Flow 同样必须妥善处理错误。Flow 提供了声明式的错误处理操作符,让你可以像搭建推理链一样,环环相扣地组合错误处理逻辑。
catch 操作符可以捕获上游的异常(即 catch 之前的所有操作)—— 就像在证据链的某个环节设置一道安全网,拦截上游出现的任何问题。它不会捕获下游(catch 之后)的异常:
flow {
emit(1)
emit(2)
throw RuntimeException("出错了")
emit(3) // 永远不会执行
}
.catch { e ->
println("捕获到异常: ${e.message}")
emit(-1) // 可以在 catch 中发射备用值
}
.collect { println(it) }
// 输出:
// 1
// 2
// 捕获到异常: 出错了
// -1
推理中的一个关键细节:catch 只捕获上游的异常。如果异常发生在 collect 的 lambda 中 —— 也就是侦探在分析证据时自己犯了错 —— catch 是捕获不到的:
// catch 无法捕获 collect 中的异常!
flowOf(1, 2, 3)
.catch { println("不会捕获到 collect 中的异常") }
.collect { value ->
if (value == 2) throw RuntimeException("崩溃") // 这个不会被 catch 捕获
println(value)
}
// 解决方案:把 collect 的逻辑移到 onEach 中,让它变成上游
flowOf(1, 2, 3)
.onEach { value ->
if (value == 2) throw RuntimeException("崩溃")
println(value)
}
.catch { e -> println("捕获到: ${e.message}") }
.collect()
onCompletion 在 Flow 结案时(无论是正常结案还是因异常终止)执行 —— 类似于案件归档程序,不管破案与否都要走这一步:
flowOf(1, 2, 3)
.onEach { println("处理: $it") }
.onCompletion { cause ->
if (cause != null) {
println("Flow 异常完成: ${cause.message}")
} else {
println("Flow 正常完成")
}
}
.collect()
追踪嫌疑人时,第一次跟丢了不代表案件终结 —— 我们需要重试。网络请求同样经常需要重试机制。Flow 提供了两个重试操作符:
// retry:简单重试,指定次数
flow {
val response = api.fetchData()
emit(response)
}
.retry(3) { cause ->
cause is IOException // 只对 IO 异常重试
}
.catch { e -> println("3次重试后仍然失败: ${e.message}") }
.collect { println(it) }
// retryWhen:精细控制重试逻辑,像制定详细的追踪方案
flow {
val response = api.fetchData()
emit(response)
}
.retryWhen { cause, attempt ->
if (cause is IOException && attempt < 3) {
val delayMs = (1000L * (1 shl attempt.toInt())) // 指数退避
println("第 ${attempt + 1} 次重试,等待 ${delayMs}ms")
delay(delayMs)
true // 返回 true 表示重试
} else {
false // 返回 false 表示放弃
}
}
.collect { println(it) }
侦探手中有两种应急方案:声明式的 catch 操作符和命令式的 try-catch。什么时候用哪种?
// 方式 1:catch 操作符(声明式,推荐用于 Flow 管道)
repo.getUsers()
.map { users -> users.filter { it.isActive } }
.catch { e -> emit(emptyList()) }
.collect { users -> _uiState.value = users }
// 方式 2:try-catch(命令式,适合包裹整个 collect)
try {
repo.getUsers().collect { users ->
_uiState.value = users
}
} catch (e: Exception) {
_uiState.value = emptyList()
}
推理结论:在 Flow 管道中优先使用 catch 操作符,它更符合证据链的声明式风格,且可以在 catch 中通过 emit 提交备用线索。当你需要捕获 collect 中的异常时 —— 也就是侦探在分析阶段可能出错 —— 使用 try-catch 包裹整个 collect 调用。两种方式可以结合使用,形成更严密的安全网。
当线索的产生速度快于侦探的分析速度时,案头上的卷宗就会堆积如山 —— 这就是背压(backpressure)问题。Flow 默认是顺序的:emit 会挂起直到收集者处理完当前值,就像线人必须等侦探记完笔记才能继续汇报。这是安全的默认行为,但有时会拖慢整条推理链。
默认情况下,线人和侦探一问一答交替进行。buffer() 让线人可以持续汇报,侦探则在另一边同步整理 —— 两人并行工作,效率大增:
fun slowProducer(): Flow<Int> = flow {
for (i in 1..5) {
delay(100) // 每 100ms 生产一个值
emit(i)
}
}
// 无 buffer:总耗时约 1500ms (5 x 100ms生产 + 5 x 200ms消费)
slowProducer()
.collect { value ->
delay(200) // 每 200ms 消费一个值
println(value)
}
// 有 buffer:总耗时约 1100ms (生产和消费并发)
slowProducer()
.buffer()
.collect { value ->
delay(200)
println(value)
}
conflate() 在侦探忙碌时,只保留最新的线索,跳过中间来不及分析的情报 —— 因为在很多案件中,过时的情报已经没有价值了:
flow {
for (i in 1..10) {
emit(i)
delay(50) // 快速生产
}
}
.conflate()
.collect { value ->
delay(200) // 慢速消费
println("处理: $value")
}
// 可能的输出:
// 处理: 1
// 处理: 4
// 处理: 8
// 处理: 10
// (中间的值被跳过了)
collectLatest 更加激进:新线索一到,立即中断对旧线索的分析,全力投入最新的线索 —— 就像嫌疑人突然现身,侦探必须丢下手头的案卷立即出动:
flow {
emit("A")
delay(100)
emit("B")
delay(100)
emit("C")
}
.collectLatest { value ->
println("开始处理 $value")
delay(300) // 模拟耗时处理
println("完成处理 $value") // A 和 B 的这行不会执行
}
// 输出:
// 开始处理 A
// 开始处理 B (A 的处理被取消了)
// 开始处理 C (B 的处理被取消了)
// 完成处理 C (只有最后一个值完整处理)
三种策略的对比鉴定如下:
| 策略 | 行为 | 适用场景 |
|---|---|---|
buffer() | 缓冲值,生产消费并发 | 所有值都必须处理,但不需要按顺序等待 |
conflate() | 跳过中间值,保留最新 | 只关心最新状态(如传感器数据、股价) |
collectLatest | 新值来时取消旧处理 | 搜索建议、实时预览等场景 |
推理逻辑很简单:如果每条线索都至关重要(如订单处理),用 buffer() 确保一条不漏。如果只关心最新动态(如 UI 刷新),用 conflate() 抓住重点。如果旧的分析结果已经失去意义(如搜索),用 collectLatest 果断弃旧追新。
终于来到案件的高潮 —— 这是我们设下陷阱、收网抓人的时刻。搜索框是 Flow 最经典的实战场景:用户每输入一个字符就触发搜索请求,就像嫌疑人每走一步你就冲上去拦截一样荒唐。我们需要的是防抖(debounce) —— 等待最佳时机,在嫌疑人停下脚步时一举收网。这个例子综合运用了本章所有的鉴证工具,形成一条环环相扣的完整推理链。
data class SearchState(
val query: String = "",
val results: List<String> = emptyList(),
val isSearching: Boolean = false,
val error: String? = null
)
class SearchViewModel(
private val searchRepository: SearchRepository
) : ViewModel() {
private val _searchQuery = MutableStateFlow("")
val searchState: StateFlow<SearchState> = _searchQuery
.debounce(300) // 防抖:等待 300ms 确认嫌疑人停下
.distinctUntilChanged() // 去重:同样的线索不重复追踪
.flatMapLatest { query -> // 新线索来了,放弃旧的追踪
if (query.isBlank()) {
// 空搜索词:没有线索,不出动
flowOf(SearchState(query = query))
} else {
flow {
// 发射加载中状态
emit(SearchState(
query = query,
isSearching = true
))
// 执行搜索
val results = searchRepository.search(query)
emit(SearchState(
query = query,
results = results,
isSearching = false
))
}
.catch { e ->
emit(SearchState(
query = query,
error = e.message ?: "搜索失败",
isSearching = false
))
}
}
}
.flowOn(Dispatchers.IO) // 外勤在 IO 线程执行
.stateIn(
scope = viewModelScope,
started = SharingStarted.WhileSubscribed(5000),
initialValue = SearchState()
)
fun onQueryChanged(query: String) {
_searchQuery.value = query
}
}
@Composable
fun SearchScreen(viewModel: SearchViewModel) {
val state by viewModel.searchState.collectAsStateWithLifecycle()
Column(
modifier = Modifier
.fillMaxSize()
.padding(16.dp)
) {
// 搜索输入框
OutlinedTextField(
value = state.query,
onValueChange = { viewModel.onQueryChanged(it) },
label = { Text("搜索") },
modifier = Modifier.fillMaxWidth(),
singleLine = true
)
Spacer(modifier = Modifier.height(16.dp))
// 根据状态显示不同内容
when {
state.isSearching -> {
CircularProgressIndicator(
modifier = Modifier.align(Alignment.CenterHorizontally)
)
}
state.error != null -> {
Text(
text = "错误: ${state.error}",
color = MaterialTheme.colorScheme.error
)
}
state.results.isEmpty() && state.query.isNotBlank() -> {
Text("没有找到相关结果")
}
else -> {
LazyColumn {
items(state.results) { result ->
Text(
text = result,
modifier = Modifier
.fillMaxWidth()
.padding(vertical = 12.dp)
)
}
}
}
}
}
}
让我们像侦探复盘案件一样,追踪用户快速输入"Kotlin"时的完整证据链:
_searchQuery.value = "K"_searchQuery.value = "Ko" (在 300ms 内)_searchQuery.value = "Kot" (在 300ms 内)_searchQuery.value = "Kotl" (在 300ms 内)_searchQuery.value = "Kotli" (在 300ms 内)_searchQuery.value = "Kotlin"SearchState(query = "Kotlin", isSearching = true) —— 调查中SearchState(query = "Kotlin", results = [...]) —— 破案整个过程中只发起了一次网络请求,而不是六次 —— 这就是精准打击的力量。如果用户在搜索过程中修改了搜索词,flatMapLatest 还会自动取消正在进行的旧调查,立即转向新目标。环环相扣,滴水不漏。
各位侦探,本案到此结案。让我们回顾整条推理链:Flow 是 Kotlin 协程生态中追踪异步数据流的核心工具。冷流(Flow)是等待被打开的悬案卷宗,热流(StateFlow / SharedFlow)是实时案情看板和紧急通报系统。掌握 flowOn(调配外勤地点)、catch(设置安全网)、debounce(等待最佳时机)、flatMapLatest(果断追踪最新线索)等关键鉴证工具,配合 collectAsStateWithLifecycle 在 Compose 中安全收集,就能覆盖 Android 开发中绝大多数的异步数据流场景。真相只有一个 —— Flow 就是答案。
创建一个 Flow,模拟侦探每秒接收一条新线索。该 Flow 应该每隔 1 秒发射一次当前时间戳(System.currentTimeMillis()),总共发射 5 次。然后在 main 函数中使用 runBlocking 收集并打印每条线索。
使用 flow { } 构建器,在循环中调用 delay(1000) 和 emit()。记住:Flow 是冷流,只有在 collect 时才会开始执行。
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun clueFlow(): Flow<Long> = flow {
for (i in 1..5) {
delay(1000)
emit(System.currentTimeMillis())
}
}
fun main() = runBlocking {
clueFlow().collect { timestamp ->
println("线索到达: $timestamp")
}
println("所有线索已收集完毕")
}
给定一个整数 Flow (1..20).asFlow(),请用操作符完成以下证据筛选与变换:(1) 过滤出能被 3 整除的数字;(2) 将每个数字平方;(3) 只取前 3 个结果;(4) 最终打印每个值。请写出完整的操作符链。
证据链的顺序很重要:先 filter 筛选,再 map 变换,然后 take 限量,最后 collect 结案。思考一下:如果把 take 放在 filter 前面,结果会不同吗?
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun main() = runBlocking {
(1..20).asFlow()
.filter { it % 3 == 0 } // 3, 6, 9, 12, 15, 18
.map { it * it } // 9, 36, 81, 144, 225, 324
.take(3) // 9, 36, 81
.collect { value ->
println("鉴定结果: $value")
}
}
// 输出:
// 鉴定结果: 9
// 鉴定结果: 36
// 鉴定结果: 81
创建一个 TaskViewModel,使用 MutableStateFlow 管理一个待办任务列表。实现以下功能:(1) 定义一个 data class Task(val id: Int, val title: String, val isDone: Boolean);(2) 使用 MutableStateFlow<List<Task>> 存储任务列表;(3) 实现 addTask(title: String) 方法添加新任务;(4) 实现 toggleTask(id: Int) 方法切换任务完成状态。注意使用 update 方法确保线程安全。
使用 _tasks.update { currentList -> ... } 进行原子更新。添加任务时,可以用 currentList + newTask 创建新列表。切换状态时,用 map 遍历列表找到目标任务并 copy。记住 StateFlow 使用 equals 去重,所以必须创建新列表对象。
import kotlinx.coroutines.flow.*
data class Task(
val id: Int,
val title: String,
val isDone: Boolean = false
)
class TaskViewModel {
private var nextId = 1
private val _tasks = MutableStateFlow<List<Task>>(emptyList())
val tasks: StateFlow<List<Task>> = _tasks.asStateFlow()
fun addTask(title: String) {
_tasks.update { currentList ->
currentList + Task(
id = nextId++,
title = title
)
}
}
fun toggleTask(id: Int) {
_tasks.update { currentList ->
currentList.map { task ->
if (task.id == id) {
task.copy(isDone = !task.isDone)
} else {
task
}
}
}
}
}
编写一个 Flow,模拟从不稳定的网络接口获取数据:每次 emit 之前有 30% 概率抛出 IOException("网络超时")。要求:(1) 使用 retry(3) 自动重试最多 3 次;(2) 如果 3 次重试后仍然失败,使用 catch 捕获异常并发射一个默认值 "离线数据";(3) 使用 onCompletion 打印完成状态。
使用 kotlin.random.Random.nextDouble() 来模拟失败概率。操作符链的顺序应该是:flow { ... }.retry(3).catch { ... }.onCompletion { ... }.collect { ... }。注意 retry 要放在 catch 前面,这样重试耗尽后异常才会落入 catch。
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import java.io.IOException
import kotlin.random.Random
fun unstableNetworkFlow(): Flow<String> = flow {
println("尝试获取数据...")
if (Random.nextDouble() < 0.3) {
throw IOException("网络超时")
}
emit("服务器数据: ${System.currentTimeMillis()}")
}
fun main() = runBlocking {
unstableNetworkFlow()
.retry(3) { cause ->
println("遇到异常: ${cause.message},准备重试...")
delay(500) // 重试前等待
cause is IOException
}
.catch { e ->
println("重试耗尽,使用备用数据: ${e.message}")
emit("离线数据")
}
.onCompletion { cause ->
if (cause == null) {
println("调查正常完成")
} else {
println("调查异常终止: ${cause.message}")
}
}
.collect { data ->
println("最终获得: $data")
}
}
模拟一个天气监控面板:创建两个独立的 Flow,一个每 2 秒发射一次温度数据(从列表 [22, 25, 21, 28, 24] 中依次取),另一个每 3 秒发射一次风速数据(从列表 [3, 7, 5, 12] 中依次取)。使用 combine 将两者合并,实时输出形如 "温度: 25°C | 风速: 7m/s | 状态: 正常" 的监控信息。规则:温度超过 27 或风速超过 10 时状态为 "预警",否则为 "正常"。
分别用 flow { } 创建两个 Flow,在循环中依次 emit 列表中的值,每次 emit 后 delay 对应的时间。使用 combine 时,任意一方发射新值都会触发组合。在 combine 的 lambda 中判断温度和风速是否超标。
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun temperatureFlow(): Flow<Int> = flow {
val temps = listOf(22, 25, 21, 28, 24)
for (temp in temps) {
emit(temp)
delay(2000)
}
}
fun windSpeedFlow(): Flow<Int> = flow {
val winds = listOf(3, 7, 5, 12)
for (wind in winds) {
emit(wind)
delay(3000)
}
}
fun main() = runBlocking {
temperatureFlow()
.combine(windSpeedFlow()) { temp, wind ->
val status = if (temp > 27 || wind > 10) "预警" else "正常"
"温度: ${temp}°C | 风速: ${wind}m/s | 状态: $status"
}
.collect { info ->
println(info)
}
}