第12章:Flow 响应式编程

用 Kotlin 原生的方式处理异步数据流 —— 从冷流到热流,从操作符到实战

🔍

柯南

链式推理,每步推导下一步

真相永远只有一个,而通往真相的路径,是一条环环相扣的推理链。Flow 就是我们追踪异步数据真相的证据链 —— 每个操作符都是一件鉴证工具,每次 emit 都是一条新线索,而 collect 则是最终的破案时刻。跟我一起,沿着这条证据链,一步步推导出数据流的全貌吧。

12.1 为什么需要 Flow

各位侦探,在上一章中我们掌握了挂起函数和协程 —— 这些是我们手中的基础调查工具。挂起函数就像一次性的现场取证:去犯罪现场,采集一份指纹,得到一个结果。但真正复杂的案件远非如此简单。很多案件需要持续追踪:数据库的实时变更就像嫌疑人的行踪轨迹、传感器的持续读数就像监控录像、用户输入的连续事件就像线人的持续情报、WebSocket 推送的消息就像窃听到的通讯内容......

挂起函数的局限

挂起函数只能返回一个值 —— 就像派侦探出去调查,只能带回一份报告。如果你想要连续的情报,就只能让他把所有情报攒齐了再一次性汇报,但这样你就错过了实时追踪的最佳时机:

// 挂起函数:只能返回一个结果
suspend fun fetchUser(): User {
    return api.getUser()  // 一次请求,一个结果
}

// 想返回多个值?只能用 List,但必须等全部完成
suspend fun fetchAllPrices(): List<Double> {
    val prices = mutableListOf<Double>()
    for (i in 1..100) {
        prices.add(getPrice(i))  // 必须全部拿到才能返回
    }
    return prices
}

推理链的第一环就在这里断裂了:调用者必须等到所有数据都准备好才能开始分析,而且无法表达"持续追踪"的场景 —— 比如嫌疑人的位置每秒都在变化,你不能等他停下来才开始跟踪。

LiveData 的局限

有的侦探可能会说:"我们不是有 LiveData 这个线人网络吗?"确实,LiveData 可以持续推送情报,但这个情报网有明显的短板:

Flow:Kotlin 原生的响应式流

推理到这里,证据链指向了一个结论:我们需要一个既能持续追踪数据、又能与协程无缝配合的工具。Flow 就是 Kotlin 协程库提供的异步数据流解决方案 —— 它就是我们的终极侦查系统,结合了协程的简洁语法和响应式流的强大追踪能力:

// Flow:可以按需逐个发射值
fun priceUpdates(): Flow<Double> = flow {
    while (true) {
        val price = fetchLatestPrice()
        emit(price)  // 每获取一个值就发射出去
        delay(1000)  // 每秒更新一次
    }
}

冷流与热流

在侦查的世界里,案件分为两种:

核心概念

这里有一条关键的推理:Flow 的设计哲学是"按需调查"。冷流只在被收集(collect)时才启动调查,这意味着如果没有侦探关注这条线索,就不会浪费任何资源去追踪它。这与 Kotlin 的序列(Sequence)理念环环相扣,只不过 Flow 是为异步侦查场景设计的。

12.2 Cold Flow 基础

用 flow { } 创建流

好,让我们打开第一份悬案卷宗。创建 Flow 最基本的方式是使用 flow { } 构建器 —— 这就是建立一份案件档案。在 lambda 中通过 emit() 提交线索:

import kotlinx.coroutines.flow.*
import kotlinx.coroutines.*

fun simpleFlow(): Flow<Int> = flow {
    println("Flow 开始执行")
    for (i in 1..3) {
        delay(100)  // 模拟异步操作
        emit(i)     // 发射值
        println("已发射 $i")
    }
}

fun main() = runBlocking {
    println("调用 simpleFlow()...")
    val flow = simpleFlow()  // 这里什么都不会发生!
    println("准备收集...")
    flow.collect { value ->
        println("收到: $value")
    }
}

运行结果:

调用 simpleFlow()...
准备收集...
Flow 开始执行
已发射 1
收到: 1
已发射 2
收到: 2
已发射 3
收到: 3

注意这条证据链的关键细节:调用 simpleFlow() 时没有任何打印 —— 悬案卷宗只是被取出来了,还没有人翻开它。直到 collect 被调用,侦探才真正开始逐条审阅线索。这就是冷流的核心特性 —— 没有侦探接手,案件就不会启动。

collect 是终端操作符

collect 是一个挂起函数,它就像侦探正式接手案件 —— 触发整个调查流程并逐条接收线索。在 collect 返回之前,当前协程会一直挂起。这意味着 collect 之后的代码要等到案件完全结案后才会执行:

runBlocking {
    simpleFlow().collect { value ->
        println("收到: $value")
    }
    println("Flow 已完成")  // 在所有值都收集完后才打印
}

便捷构建器

除了 flow { } 这种"从零建档"的方式,Kotlin 还提供了一些快速立案的便捷方法:

// flowOf:从已知线索直接建档
val numbersFlow = flowOf(1, 2, 3, 4, 5)

// asFlow:将已有证据集转换为案件流
val listFlow = listOf("A", "B", "C").asFlow()
val rangeFlow = (1..10).asFlow()

// 使用示例
runBlocking {
    flowOf("苹果", "香蕉", "橘子").collect { fruit ->
        println("水果: $fruit")
    }
}

每个收集者都是独立的

冷流的一个重要推理要点:每次调用 collect,就像一位新侦探从头翻开卷宗,独立展开调查。多位侦探之间互不干扰,各自得出自己的发现:

fun timestampFlow(): Flow<Long> = flow {
    emit(System.currentTimeMillis())
    delay(100)
    emit(System.currentTimeMillis())
}

runBlocking {
    val flow = timestampFlow()

    println("=== 第一次收集 ===")
    flow.collect { println(it) }

    delay(500)

    println("=== 第二次收集 ===")
    flow.collect { println(it) }  // 完全独立,时间戳不同
}
注意

flow { } 构建器中的代码直到被 collect 时才执行,并且每次 collect 都会重新执行 —— 每位侦探都是从头调查。如果你在 flow 构建器中做了网络请求,每次 collect 都会发一次请求。如果你希望多个侦探共享同一份实时情报,应该使用热流(StateFlow / SharedFlow),我们将在 12.5 节中揭开这层真相。

12.3 Flow 操作符

现在,推理链进入最精彩的部分。Flow 的强大之处在于它提供了一整套鉴证工具来变换、组合和处理数据流。每一个操作符就是证据链上的一环,环环相扣,最终指向真相。而且这些工具和 Kotlin 集合操作符的命名非常相似,学习成本很低。

变换操作符

map —— 证据变换工具。对每条线索进行一对一的鉴定转换,就像对指纹进行比对分析:

flowOf(1, 2, 3)
    .map { it * it }  // 1, 4, 9
    .collect { println(it) }

filter —— 证据筛选工具。把不符合调查方向的线索过滤掉,只保留有价值的证据:

(1..20).asFlow()
    .filter { it % 2 == 0 }  // 只保留偶数
    .collect { println(it) }  // 2, 4, 6, 8, ..., 20

transform —— 最灵活的鉴证工具。可以从一条线索推导出任意数量的新线索,就像从一个证物中提取出多种鉴定信息:

flowOf("张三", "李四", "王五")
    .transform { name ->
        emit("正在处理: $name")
        val result = processName(name)  // 模拟耗时操作
        emit("处理完成: $result")
    }
    .collect { println(it) }

// 输出:
// 正在处理: 张三
// 处理完成: ZHANG SAN
// 正在处理: 李四
// ...

take —— 限量取证。只取前 N 条线索就结案,就像侦探只需要关键的几条证据就能锁定嫌疑人:

flow {
    var i = 0
    while (true) {
        emit(i++)
        delay(100)
    }
}
.take(5)  // 只取前 5 个值
.collect { println(it) }  // 0, 1, 2, 3, 4

组合操作符

复杂案件往往需要将多条独立的证据链进行交叉比对。这时就需要组合操作符 —— 它们就是我们的证据关联工具

zip —— 证据配对。将两条证据链严格一一配对,就像将嫌疑人名单和他们的不在场证明一一对应。任何一方的证据用完,配对就结束:

val names = flowOf("张三", "李四", "王五")
val ages = flowOf(25, 30, 28)

names.zip(ages) { name, age ->
    "$name, $age 岁"
}.collect { println(it) }

// 输出:
// 张三, 25 岁
// 李四, 30 岁
// 王五, 28 岁

combine —— 实时关联。将两条证据链动态组合,任意一方出现新情报时,立刻与另一方的最新情报进行关联分析 —— 就像实时追踪两个嫌疑人的位置,任何一方移动都要重新评估他们的关系:

val temperature = flow {
    emit(20)
    delay(300)
    emit(25)
    delay(300)
    emit(22)
}

val humidity = flow {
    emit(60)
    delay(500)
    emit(55)
}

temperature.combine(humidity) { temp, hum ->
    "温度: ${temp}°C, 湿度: ${hum}%"
}.collect { println(it) }

zip 和 combine 的区别是推理中的关键一环,必须厘清:

特性zipcombine
配对方式严格一一配对总是使用最新值组合
触发时机两边都有新值时任意一边有新值时
结束条件任一流结束两个流都结束
适用场景配对两组关联数据合并多个独立数据源

展平操作符

有时候,一条线索会引出一个全新的子案件 —— 也就是一个 Flow 发射的值本身也是 Flow(即 Flow<Flow<T>>)。这时需要展平操作符将嵌套的调查"拍平"成一条连贯的证据链。

flatMapConcat —— 顺序侦查。按顺序逐一追踪每条子线索,前一条调查完毕后才开启下一条,确保证据链不会混乱:

flowOf(1, 2, 3)
    .flatMapConcat { id ->
        flow {
            emit("开始处理 $id")
            delay(100)
            emit("完成处理 $id")
        }
    }
    .collect { println(it) }

// 严格顺序: 先处理完 1,再处理 2,再处理 3

flatMapMerge —— 并发侦查。同时派出多组侦探追踪所有子线索,结果交错汇报 —— 效率最高,但证据到达的顺序不可预测:

flowOf(1, 2, 3)
    .flatMapMerge { id ->
        flow {
            emit("开始处理 $id")
            delay(100)
            emit("完成处理 $id")
        }
    }
    .collect { println(it) }

// 并发执行: 所有任务同时开始,结果交错出现

flatMapLatest —— 追最新线索。每当新线索出现,立刻放弃正在追踪的旧线索,全力追踪最新的那条 —— 就像嫌疑人突然改变了逃跑方向,侦探必须立即调头:

flowOf("A", "B", "C")
    .flatMapLatest { letter ->
        flow {
            emit("处理 $letter - 第1步")
            delay(200)
            emit("处理 $letter - 第2步")  // 可能被取消
        }
    }
    .collect { println(it) }

// 只有最后一个 "C" 的两步都会完成

终端操作符

终端操作符是整条推理链的"结案陈词" —— 它们触发 Flow 的执行并产生最终结论:

val numbers = flowOf(1, 2, 3, 4, 5)

// collect:逐条审阅每一条证据
numbers.collect { println(it) }

// toList:把所有证据收集成案卷
val list: List<Int> = numbers.toList()

// first:取第一条关键证据(之后结案)
val first: Int = numbers.first()

// reduce:累积推理
val sum: Int = numbers.reduce { acc, value -> acc + value }
println("sum = $sum")  // sum = 15

// fold:带初始假设的累积推理
val product: Int = numbers.fold(1) { acc, value -> acc * value }
println("product = $product")  // product = 120
操作符链的执行模型

这条推理链的关键在于:Flow 的中间操作符(map、filter 等)不会触发执行,它们只是在构建一条完整的证据处理管道。只有终端操作符(collect、toList、first 等)被调用时 —— 也就是侦探正式接手案件时 —— 整个管道才会启动运转。这类似于 Java 的 Stream 或 Kotlin 的 Sequence,环环相扣但按需执行。

12.4 Flow 上下文与线程切换

在 Android 案件侦查中,我们经常需要让外勤侦探在现场(IO 线程)采集证据,然后把证据送回总部(主线程)进行分析展示。Flow 提供了优雅的"现场-总部"调度机制。

上下文保留规则

首先来追踪一条重要规则:flow { } 构建器中的代码默认运行在收集者所在的协程上下文中。这叫做"上下文保留"(context preservation)—— 就像侦探默认在接手案件的办公室里工作:

fun myFlow(): Flow<Int> = flow {
    println("Flow 运行在: ${Thread.currentThread().name}")
    emit(1)
}

runBlocking {
    myFlow().collect { value ->
        println("收集在: ${Thread.currentThread().name}")
    }
}
// Flow 运行在: main
// 收集在: main

flowOn:切换上游的执行上下文

flowOn 是 Flow 中用于切换调查地点的标准指令。它改变的是上游(flowOn 之前的操作)的执行位置 —— 就像命令外勤侦探去犯罪现场取证,取回的证据再送到总部处理:

fun loadDataFlow(): Flow<String> = flow {
    // 这部分在 IO 线程执行
    println("发射数据在: ${Thread.currentThread().name}")
    val data = loadFromDatabase()
    emit(data)
}.flowOn(Dispatchers.IO)  // 将上游切换到 IO 调度器

// 在 ViewModel 中使用
fun observeData() {
    viewModelScope.launch {
        // collect 在主线程(viewModelScope 默认在主线程)
        loadDataFlow().collect { data ->
            println("收集数据在: ${Thread.currentThread().name}")
            _uiState.value = data  // 安全地更新 UI 状态
        }
    }
}

为什么不能在 flow { } 中使用 withContext

你可能会有一个大胆的推理:"在 flow 构建器里用 withContext 切换线程不就行了?"但这条推理链是断裂的 —— 不行。Flow 禁止在 flow 构建器中使用 withContext 来发射值,这会抛出异常:

// 错误!会抛出 IllegalStateException
fun badFlow(): Flow<Int> = flow {
    withContext(Dispatchers.IO) {
        emit(1)  // 禁止!不能从不同的上下文发射
    }
}

// 正确:使用 flowOn
fun goodFlow(): Flow<Int> = flow {
    val data = heavyComputation()
    emit(data)
}.flowOn(Dispatchers.Default)
为什么有这个限制?

推理到深层原因:这是为了保证证据链的完整性(线程安全)。emit 不是线程安全的,如果允许从任意地点提交线索,就可能导致证据链出现并发篡改的混乱。flowOn 在内部通过 Channel 来安全地跨线程传递数据,就像用密封的证据袋在现场和实验室之间传递证物,所以是安全的。

实际场景:从 IO 发射,在 Main 收集

这是 Android 侦查中最经典的模式:外勤在 IO 现场取证,总部在主线程分析展示:

class UserRepository(private val dao: UserDao) {

    fun getUsers(): Flow<List<User>> = flow {
        while (true) {
            val users = dao.queryAllUsers()  // 数据库操作
            emit(users)
            delay(5000)  // 每 5 秒刷新
        }
    }.flowOn(Dispatchers.IO)  // 数据库操作在 IO 线程
}

class UserViewModel(private val repo: UserRepository) : ViewModel() {

    val users: StateFlow<List<User>> = repo.getUsers()
        .stateIn(
            scope = viewModelScope,
            started = SharingStarted.WhileSubscribed(5000),
            initialValue = emptyList()
        )
}

12.5 StateFlow 与 SharedFlow

推理链到了转折点。普通的 Flow 是悬案卷宗(冷流),每位侦探独立从头调查。但很多场景需要一块实时案情看板 —— 所有侦探共享同一份最新情报,新来的侦探一看就知道案件目前的状况。这就是热流登场的时候了。

StateFlow:有状态的热流

StateFlow 是一种特殊的热流 —— 它就是侦探总部的实时案情看板。它始终展示着当前最新的案情状态,任何新到的侦探一抬头就能看到最新进展。它是 LiveData 在协程世界的替代品:

class CounterViewModel : ViewModel() {

    // MutableStateFlow:可修改的案情看板
    private val _count = MutableStateFlow(0)

    // 对外暴露只读的案情看板
    val count: StateFlow<Int> = _count.asStateFlow()

    fun increment() {
        _count.value++  // 直接修改 value 属性
    }

    fun decrement() {
        _count.value--
    }

    fun updateSafely() {
        _count.update { current ->
            current + 1  // 原子更新,线程安全
        }
    }
}

StateFlow 的三个核心特性,是我们推理的关键证据:

在 ViewModel 中使用 StateFlow

StateFlow 是 ViewModel 管理 UI 状态的最佳选择 —— 就像案件指挥中心的核心看板:

data class UiState(
    val isLoading: Boolean = false,
    val items: List<String> = emptyList(),
    val error: String? = null
)

class ItemViewModel(private val repo: ItemRepository) : ViewModel() {

    private val _uiState = MutableStateFlow(UiState())
    val uiState: StateFlow<UiState> = _uiState.asStateFlow()

    fun loadItems() {
        viewModelScope.launch {
            _uiState.update { it.copy(isLoading = true) }
            try {
                val items = repo.fetchItems()
                _uiState.update {
                    it.copy(isLoading = false, items = items)
                }
            } catch (e: Exception) {
                _uiState.update {
                    it.copy(isLoading = false, error = e.message)
                }
            }
        }
    }
}

SharedFlow:无状态的事件流

SharedFlow 适合表示一次性的紧急通报(如导航、Toast、Snackbar)—— 它不是持续展示的案情看板,而是一声警报,听到就行动,没听到就错过。它没有初始值,可以配置回放(replay):

class EventViewModel : ViewModel() {

    // 无回放:新来的侦探不会收到之前的通报
    private val _events = MutableSharedFlow<UiEvent>()
    val events: SharedFlow<UiEvent> = _events.asSharedFlow()

    fun onItemClicked(id: String) {
        viewModelScope.launch {
            _events.emit(UiEvent.NavigateToDetail(id))
        }
    }

    fun onError(message: String) {
        viewModelScope.launch {
            _events.emit(UiEvent.ShowSnackbar(message))
        }
    }
}

sealed class UiEvent {
    data class NavigateToDetail(val id: String) : UiEvent()
    data class ShowSnackbar(val message: String) : UiEvent()
}

对比表:Flow vs StateFlow vs SharedFlow vs LiveData

每位优秀的侦探都要学会对比证据。让我们把这四种数据流工具放在一起,进行一次全面的交叉鉴定:

特性FlowStateFlowSharedFlowLiveData
冷/热冷流热流热流热流
有无初始值必须有可选(replay)可选
去重是(equals)
多收集者各自独立共享最新值共享事件共享最新值
生命周期感知否(需配合)否(需配合)
操作符丰富丰富丰富极少
线程控制灵活灵活灵活仅主线程分发
适用场景数据库查询、网络流UI 状态一次性事件简单 UI 状态

在 Compose 中收集 Flow

在 Jetpack Compose 中,使用 collectAsStateWithLifecycle 来安全地收集 Flow —— 它会在界面不可见时自动暂停追踪,就像侦探下班后暂停监控,避免白白浪费精力:

@Composable
fun ItemListScreen(viewModel: ItemViewModel) {
    // 推荐:生命周期感知的收集
    val uiState by viewModel.uiState.collectAsStateWithLifecycle()

    when {
        uiState.isLoading -> LoadingIndicator()
        uiState.error != null -> ErrorMessage(uiState.error!!)
        else -> ItemList(uiState.items)
    }
}

@Composable
fun EventHandler(viewModel: EventViewModel) {
    val context = LocalContext.current

    // 收集一次性事件
    LaunchedEffect(Unit) {
        viewModel.events.collect { event ->
            when (event) {
                is UiEvent.ShowSnackbar -> {
                    // 显示 Snackbar
                }
                is UiEvent.NavigateToDetail -> {
                    // 导航到详情页
                }
            }
        }
    }
}
collectAsState vs collectAsStateWithLifecycle

collectAsState 在组合存续期间始终活跃地收集 Flow —— 就像侦探 24 小时不间断地盯着监控。而 collectAsStateWithLifecycle(来自 lifecycle-runtime-compose 库)会在界面进入后台时自动暂停收集。在 Android 开发中,始终优先使用 collectAsStateWithLifecycle,让侦探在没有必要时好好休息,以避免在后台浪费 CPU 和网络资源。

12.6 Flow 错误处理

任何一条严谨的推理链都必须考虑意外情况 —— 线索断裂、证人翻供、现场被破坏。生产环境中的 Flow 同样必须妥善处理错误。Flow 提供了声明式的错误处理操作符,让你可以像搭建推理链一样,环环相扣地组合错误处理逻辑。

catch 操作符

catch 操作符可以捕获上游的异常(即 catch 之前的所有操作)—— 就像在证据链的某个环节设置一道安全网,拦截上游出现的任何问题。它不会捕获下游(catch 之后)的异常:

flow {
    emit(1)
    emit(2)
    throw RuntimeException("出错了")
    emit(3)  // 永远不会执行
}
.catch { e ->
    println("捕获到异常: ${e.message}")
    emit(-1)  // 可以在 catch 中发射备用值
}
.collect { println(it) }

// 输出:
// 1
// 2
// 捕获到异常: 出错了
// -1

推理中的一个关键细节:catch 只捕获上游的异常。如果异常发生在 collect 的 lambda 中 —— 也就是侦探在分析证据时自己犯了错 —— catch 是捕获不到的:

// catch 无法捕获 collect 中的异常!
flowOf(1, 2, 3)
    .catch { println("不会捕获到 collect 中的异常") }
    .collect { value ->
        if (value == 2) throw RuntimeException("崩溃")  // 这个不会被 catch 捕获
        println(value)
    }

// 解决方案:把 collect 的逻辑移到 onEach 中,让它变成上游
flowOf(1, 2, 3)
    .onEach { value ->
        if (value == 2) throw RuntimeException("崩溃")
        println(value)
    }
    .catch { e -> println("捕获到: ${e.message}") }
    .collect()

onCompletion 操作符

onCompletion 在 Flow 结案时(无论是正常结案还是因异常终止)执行 —— 类似于案件归档程序,不管破案与否都要走这一步:

flowOf(1, 2, 3)
    .onEach { println("处理: $it") }
    .onCompletion { cause ->
        if (cause != null) {
            println("Flow 异常完成: ${cause.message}")
        } else {
            println("Flow 正常完成")
        }
    }
    .collect()

retry 和 retryWhen

追踪嫌疑人时,第一次跟丢了不代表案件终结 —— 我们需要重试。网络请求同样经常需要重试机制。Flow 提供了两个重试操作符:

// retry:简单重试,指定次数
flow {
    val response = api.fetchData()
    emit(response)
}
.retry(3) { cause ->
    cause is IOException  // 只对 IO 异常重试
}
.catch { e -> println("3次重试后仍然失败: ${e.message}") }
.collect { println(it) }

// retryWhen:精细控制重试逻辑,像制定详细的追踪方案
flow {
    val response = api.fetchData()
    emit(response)
}
.retryWhen { cause, attempt ->
    if (cause is IOException && attempt < 3) {
        val delayMs = (1000L * (1 shl attempt.toInt()))  // 指数退避
        println("第 ${attempt + 1} 次重试,等待 ${delayMs}ms")
        delay(delayMs)
        true   // 返回 true 表示重试
    } else {
        false  // 返回 false 表示放弃
    }
}
.collect { println(it) }

catch vs try-catch 对比

侦探手中有两种应急方案:声明式的 catch 操作符和命令式的 try-catch。什么时候用哪种?

// 方式 1:catch 操作符(声明式,推荐用于 Flow 管道)
repo.getUsers()
    .map { users -> users.filter { it.isActive } }
    .catch { e -> emit(emptyList()) }
    .collect { users -> _uiState.value = users }

// 方式 2:try-catch(命令式,适合包裹整个 collect)
try {
    repo.getUsers().collect { users ->
        _uiState.value = users
    }
} catch (e: Exception) {
    _uiState.value = emptyList()
}
最佳实践

推理结论:在 Flow 管道中优先使用 catch 操作符,它更符合证据链的声明式风格,且可以在 catch 中通过 emit 提交备用线索。当你需要捕获 collect 中的异常时 —— 也就是侦探在分析阶段可能出错 —— 使用 try-catch 包裹整个 collect 调用。两种方式可以结合使用,形成更严密的安全网。

12.7 背压与性能优化

当线索的产生速度快于侦探的分析速度时,案头上的卷宗就会堆积如山 —— 这就是背压(backpressure)问题。Flow 默认是顺序的:emit 会挂起直到收集者处理完当前值,就像线人必须等侦探记完笔记才能继续汇报。这是安全的默认行为,但有时会拖慢整条推理链。

buffer() —— 并发生产和消费

默认情况下,线人和侦探一问一答交替进行。buffer() 让线人可以持续汇报,侦探则在另一边同步整理 —— 两人并行工作,效率大增:

fun slowProducer(): Flow<Int> = flow {
    for (i in 1..5) {
        delay(100)  // 每 100ms 生产一个值
        emit(i)
    }
}

// 无 buffer:总耗时约 1500ms (5 x 100ms生产 + 5 x 200ms消费)
slowProducer()
    .collect { value ->
        delay(200)  // 每 200ms 消费一个值
        println(value)
    }

// 有 buffer:总耗时约 1100ms (生产和消费并发)
slowProducer()
    .buffer()
    .collect { value ->
        delay(200)
        println(value)
    }

conflate() —— 跳过中间值

conflate() 在侦探忙碌时,只保留最新的线索,跳过中间来不及分析的情报 —— 因为在很多案件中,过时的情报已经没有价值了:

flow {
    for (i in 1..10) {
        emit(i)
        delay(50)  // 快速生产
    }
}
.conflate()
.collect { value ->
    delay(200)  // 慢速消费
    println("处理: $value")
}

// 可能的输出:
// 处理: 1
// 处理: 4
// 处理: 8
// 处理: 10
// (中间的值被跳过了)

collectLatest —— 取消慢速处理

collectLatest 更加激进:新线索一到,立即中断对旧线索的分析,全力投入最新的线索 —— 就像嫌疑人突然现身,侦探必须丢下手头的案卷立即出动:

flow {
    emit("A")
    delay(100)
    emit("B")
    delay(100)
    emit("C")
}
.collectLatest { value ->
    println("开始处理 $value")
    delay(300)  // 模拟耗时处理
    println("完成处理 $value")  // A 和 B 的这行不会执行
}

// 输出:
// 开始处理 A
// 开始处理 B   (A 的处理被取消了)
// 开始处理 C   (B 的处理被取消了)
// 完成处理 C   (只有最后一个值完整处理)

背压策略选择

三种策略的对比鉴定如下:

策略行为适用场景
buffer()缓冲值,生产消费并发所有值都必须处理,但不需要按顺序等待
conflate()跳过中间值,保留最新只关心最新状态(如传感器数据、股价)
collectLatest新值来时取消旧处理搜索建议、实时预览等场景
如何选择?

推理逻辑很简单:如果每条线索都至关重要(如订单处理),用 buffer() 确保一条不漏。如果只关心最新动态(如 UI 刷新),用 conflate() 抓住重点。如果旧的分析结果已经失去意义(如搜索),用 collectLatest 果断弃旧追新。

12.8 实战:搜索框防抖

终于来到案件的高潮 —— 这是我们设下陷阱、收网抓人的时刻。搜索框是 Flow 最经典的实战场景:用户每输入一个字符就触发搜索请求,就像嫌疑人每走一步你就冲上去拦截一样荒唐。我们需要的是防抖(debounce) —— 等待最佳时机,在嫌疑人停下脚步时一举收网。这个例子综合运用了本章所有的鉴证工具,形成一条环环相扣的完整推理链。

需求分析

完整实现

data class SearchState(
    val query: String = "",
    val results: List<String> = emptyList(),
    val isSearching: Boolean = false,
    val error: String? = null
)

class SearchViewModel(
    private val searchRepository: SearchRepository
) : ViewModel() {

    private val _searchQuery = MutableStateFlow("")

    val searchState: StateFlow<SearchState> = _searchQuery
        .debounce(300)                // 防抖:等待 300ms 确认嫌疑人停下
        .distinctUntilChanged()       // 去重:同样的线索不重复追踪
        .flatMapLatest { query ->     // 新线索来了,放弃旧的追踪
            if (query.isBlank()) {
                // 空搜索词:没有线索,不出动
                flowOf(SearchState(query = query))
            } else {
                flow {
                    // 发射加载中状态
                    emit(SearchState(
                        query = query,
                        isSearching = true
                    ))
                    // 执行搜索
                    val results = searchRepository.search(query)
                    emit(SearchState(
                        query = query,
                        results = results,
                        isSearching = false
                    ))
                }
                .catch { e ->
                    emit(SearchState(
                        query = query,
                        error = e.message ?: "搜索失败",
                        isSearching = false
                    ))
                }
            }
        }
        .flowOn(Dispatchers.IO)         // 外勤在 IO 线程执行
        .stateIn(
            scope = viewModelScope,
            started = SharingStarted.WhileSubscribed(5000),
            initialValue = SearchState()
        )

    fun onQueryChanged(query: String) {
        _searchQuery.value = query
    }
}

Compose UI 层

@Composable
fun SearchScreen(viewModel: SearchViewModel) {
    val state by viewModel.searchState.collectAsStateWithLifecycle()

    Column(
        modifier = Modifier
            .fillMaxSize()
            .padding(16.dp)
    ) {
        // 搜索输入框
        OutlinedTextField(
            value = state.query,
            onValueChange = { viewModel.onQueryChanged(it) },
            label = { Text("搜索") },
            modifier = Modifier.fillMaxWidth(),
            singleLine = true
        )

        Spacer(modifier = Modifier.height(16.dp))

        // 根据状态显示不同内容
        when {
            state.isSearching -> {
                CircularProgressIndicator(
                    modifier = Modifier.align(Alignment.CenterHorizontally)
                )
            }
            state.error != null -> {
                Text(
                    text = "错误: ${state.error}",
                    color = MaterialTheme.colorScheme.error
                )
            }
            state.results.isEmpty() && state.query.isNotBlank() -> {
                Text("没有找到相关结果")
            }
            else -> {
                LazyColumn {
                    items(state.results) { result ->
                        Text(
                            text = result,
                            modifier = Modifier
                                .fillMaxWidth()
                                .padding(vertical = 12.dp)
                        )
                    }
                }
            }
        }
    }
}

数据流分析

让我们像侦探复盘案件一样,追踪用户快速输入"Kotlin"时的完整证据链:

  1. 用户输入 "K" --> _searchQuery.value = "K"
  2. 用户继续输入 "Ko" --> _searchQuery.value = "Ko" (在 300ms 内)
  3. 用户继续输入 "Kot" --> _searchQuery.value = "Kot" (在 300ms 内)
  4. 用户继续输入 "Kotl" --> _searchQuery.value = "Kotl" (在 300ms 内)
  5. 用户继续输入 "Kotli" --> _searchQuery.value = "Kotli" (在 300ms 内)
  6. 用户输入 "Kotlin" 然后停下 --> _searchQuery.value = "Kotlin"
  7. 等待 300ms,debounce 确认嫌疑人(搜索词)已停下,放行 "Kotlin"
  8. distinctUntilChanged 核验:与之前的线索不同,确认是新情报,放行
  9. flatMapLatest 启动搜索 "Kotlin" 的调查
  10. 发射 SearchState(query = "Kotlin", isSearching = true) —— 调查中
  11. 搜索完成,发射 SearchState(query = "Kotlin", results = [...]) —— 破案

整个过程中只发起了一次网络请求,而不是六次 —— 这就是精准打击的力量。如果用户在搜索过程中修改了搜索词,flatMapLatest 还会自动取消正在进行的旧调查,立即转向新目标。环环相扣,滴水不漏。

本章小结

各位侦探,本案到此结案。让我们回顾整条推理链:Flow 是 Kotlin 协程生态中追踪异步数据流的核心工具。冷流(Flow)是等待被打开的悬案卷宗,热流(StateFlow / SharedFlow)是实时案情看板和紧急通报系统。掌握 flowOn(调配外勤地点)、catch(设置安全网)、debounce(等待最佳时机)、flatMapLatest(果断追踪最新线索)等关键鉴证工具,配合 collectAsStateWithLifecycle 在 Compose 中安全收集,就能覆盖 Android 开发中绝大多数的异步数据流场景。真相只有一个 —— Flow 就是答案。

本章练习

练习 1:建立第一条证据链 入门

创建一个 Flow,模拟侦探每秒接收一条新线索。该 Flow 应该每隔 1 秒发射一次当前时间戳(System.currentTimeMillis()),总共发射 5 次。然后在 main 函数中使用 runBlocking 收集并打印每条线索。

提示

使用 flow { } 构建器,在循环中调用 delay(1000)emit()。记住:Flow 是冷流,只有在 collect 时才会开始执行。

参考答案
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun clueFlow(): Flow<Long> = flow {
    for (i in 1..5) {
        delay(1000)
        emit(System.currentTimeMillis())
    }
}

fun main() = runBlocking {
    clueFlow().collect { timestamp ->
        println("线索到达: $timestamp")
    }
    println("所有线索已收集完毕")
}

练习 2:使用鉴证工具处理证据链 进阶

给定一个整数 Flow (1..20).asFlow(),请用操作符完成以下证据筛选与变换:(1) 过滤出能被 3 整除的数字;(2) 将每个数字平方;(3) 只取前 3 个结果;(4) 最终打印每个值。请写出完整的操作符链。

提示

证据链的顺序很重要:先 filter 筛选,再 map 变换,然后 take 限量,最后 collect 结案。思考一下:如果把 take 放在 filter 前面,结果会不同吗?

参考答案
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking {
    (1..20).asFlow()
        .filter { it % 3 == 0 }   // 3, 6, 9, 12, 15, 18
        .map { it * it }          // 9, 36, 81, 144, 225, 324
        .take(3)                  // 9, 36, 81
        .collect { value ->
            println("鉴定结果: $value")
        }
}
// 输出:
// 鉴定结果: 9
// 鉴定结果: 36
// 鉴定结果: 81

练习 3:用 StateFlow 搭建案情看板 进阶

创建一个 TaskViewModel,使用 MutableStateFlow 管理一个待办任务列表。实现以下功能:(1) 定义一个 data class Task(val id: Int, val title: String, val isDone: Boolean);(2) 使用 MutableStateFlow<List<Task>> 存储任务列表;(3) 实现 addTask(title: String) 方法添加新任务;(4) 实现 toggleTask(id: Int) 方法切换任务完成状态。注意使用 update 方法确保线程安全。

提示

使用 _tasks.update { currentList -> ... } 进行原子更新。添加任务时,可以用 currentList + newTask 创建新列表。切换状态时,用 map 遍历列表找到目标任务并 copy。记住 StateFlow 使用 equals 去重,所以必须创建新列表对象。

参考答案
import kotlinx.coroutines.flow.*

data class Task(
    val id: Int,
    val title: String,
    val isDone: Boolean = false
)

class TaskViewModel {

    private var nextId = 1

    private val _tasks = MutableStateFlow<List<Task>>(emptyList())
    val tasks: StateFlow<List<Task>> = _tasks.asStateFlow()

    fun addTask(title: String) {
        _tasks.update { currentList ->
            currentList + Task(
                id = nextId++,
                title = title
            )
        }
    }

    fun toggleTask(id: Int) {
        _tasks.update { currentList ->
            currentList.map { task ->
                if (task.id == id) {
                    task.copy(isDone = !task.isDone)
                } else {
                    task
                }
            }
        }
    }
}

练习 4:为证据链设置安全网 进阶

编写一个 Flow,模拟从不稳定的网络接口获取数据:每次 emit 之前有 30% 概率抛出 IOException("网络超时")。要求:(1) 使用 retry(3) 自动重试最多 3 次;(2) 如果 3 次重试后仍然失败,使用 catch 捕获异常并发射一个默认值 "离线数据";(3) 使用 onCompletion 打印完成状态。

提示

使用 kotlin.random.Random.nextDouble() 来模拟失败概率。操作符链的顺序应该是:flow { ... }.retry(3).catch { ... }.onCompletion { ... }.collect { ... }。注意 retry 要放在 catch 前面,这样重试耗尽后异常才会落入 catch

参考答案
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import java.io.IOException
import kotlin.random.Random

fun unstableNetworkFlow(): Flow<String> = flow {
    println("尝试获取数据...")
    if (Random.nextDouble() < 0.3) {
        throw IOException("网络超时")
    }
    emit("服务器数据: ${System.currentTimeMillis()}")
}

fun main() = runBlocking {
    unstableNetworkFlow()
        .retry(3) { cause ->
            println("遇到异常: ${cause.message},准备重试...")
            delay(500)  // 重试前等待
            cause is IOException
        }
        .catch { e ->
            println("重试耗尽,使用备用数据: ${e.message}")
            emit("离线数据")
        }
        .onCompletion { cause ->
            if (cause == null) {
                println("调查正常完成")
            } else {
                println("调查异常终止: ${cause.message}")
            }
        }
        .collect { data ->
            println("最终获得: $data")
        }
}

练习 5:combine 交叉鉴定 挑战

模拟一个天气监控面板:创建两个独立的 Flow,一个每 2 秒发射一次温度数据(从列表 [22, 25, 21, 28, 24] 中依次取),另一个每 3 秒发射一次风速数据(从列表 [3, 7, 5, 12] 中依次取)。使用 combine 将两者合并,实时输出形如 "温度: 25°C | 风速: 7m/s | 状态: 正常" 的监控信息。规则:温度超过 27 或风速超过 10 时状态为 "预警",否则为 "正常"。

提示

分别用 flow { } 创建两个 Flow,在循环中依次 emit 列表中的值,每次 emit 后 delay 对应的时间。使用 combine 时,任意一方发射新值都会触发组合。在 combine 的 lambda 中判断温度和风速是否超标。

参考答案
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun temperatureFlow(): Flow<Int> = flow {
    val temps = listOf(22, 25, 21, 28, 24)
    for (temp in temps) {
        emit(temp)
        delay(2000)
    }
}

fun windSpeedFlow(): Flow<Int> = flow {
    val winds = listOf(3, 7, 5, 12)
    for (wind in winds) {
        emit(wind)
        delay(3000)
    }
}

fun main() = runBlocking {
    temperatureFlow()
        .combine(windSpeedFlow()) { temp, wind ->
            val status = if (temp > 27 || wind > 10) "预警" else "正常"
            "温度: ${temp}°C | 风速: ${wind}m/s | 状态: $status"
        }
        .collect { info ->
            println(info)
        }
}
« 上一章:泛型与异常处理 目录 下一章:Compose 动画与副作用 »