Skip to Content
指南流式输出与异步任务

流式输出与异步任务

到这一页之前,我们一直在用同步、局部、可预测的交互建立 Ansiq 的基本直觉:

  • signal
  • computed
  • effect
  • focus 与 routing
  • continuity contract

但很多真正的终端应用都有另一类需求:

  • 后台任务
  • 网络请求
  • 系统采样
  • AI / LLM 输出
  • 分段返回的长结果

也就是说,界面并不是“用户按一下键,我就更新一次”这么简单。
它还要面对:有些内容会在未来一点一点到达。

这一页要讲的,就是 Ansiq 里这件事应该怎么做。

先说结论:不要在后台任务里直接改 UI signal

这是最重要的一条规则。

Ansiq 现在的响应式句柄是线程绑定的,不能被安全地带进任意 tokio::spawn 任务里直接 set()
正确做法是:

  • 后台任务只负责产生消息
  • 消息通过 RuntimeHandle::emit(...) 回到 app
  • app 的 update(...) 再修改自己的状态

也就是说:

异步任务负责“把新事实带回来”,
app 负责“把这些事实写进状态并重新渲染”。

这条边界一旦守住,streaming 应用就会稳定很多。

Ansiq 里的 streaming 主链
1用户提交动作
2update 收到 Submit
3通过 RuntimeHandle.spawn 启动异步任务
4后台任务分段 emit Chunk / Finish
5app.update 消费消息
6修改 draft / streaming 等状态
7runtime 收集 dirty scopes 并做局部更新

NotesApp 继续演化

前面我们的 NotesApp 还是一个很小的输入与列表应用。
现在我们把它再推进一步:提交一条 note 后,不是立刻得到一个完整结果,而是让后台任务分几段返回“正在生成的摘要”。

我们先定义消息:

#[derive(Clone, Debug)] enum Message { Submit(String), Chunk(String), Finish, }

再给 app 加一份“正在流式生成的草稿”:

#[derive(Default)] struct NotesApp { notes: Vec<String>, draft: String, streaming: bool, }

异步任务从哪里启动

Ansiq 里有两种最常见的入口:

  • mount(...) 里启动长期后台任务
  • update(...) 里响应某个动作时启动一次性任务

对于流式输出,最常见的是第二种:
用户提交后,启动一个异步任务分段返回消息。

fn update(&mut self, message: Message, handle: &RuntimeHandle<Message>) { match message { Message::Submit(input) => { self.streaming = true; self.draft.clear(); let handle = handle.clone(); handle.spawn(async move { for chunk in [ format!("Planning the change for {}", input), "Inspecting the workspace...".to_string(), "Writing the first failing test...".to_string(), ] { let _ = handle.emit(Message::Chunk(chunk)); tokio::time::sleep(std::time::Duration::from_millis(120)).await; } let _ = handle.emit(Message::Finish); }); } Message::Chunk(chunk) => { if !self.draft.is_empty() { self.draft.push('\n'); } self.draft.push_str(&chunk); } Message::Finish => { self.streaming = false; } } }

这里最值得注意的是:

  • 后台任务没有持有 signal
  • 后台任务只发 Message
  • 真正的状态写入仍然发生在 app 的 update(...)

真实的 stream 往往长这样

上面的 for chunk in [...] 只是为了先把模式看清楚。
真实的 AI / agent / 网络场景,更常见的是“消费一个真正的 stream”:

use tokio_stream::StreamExt; fn update(&mut self, message: Message, handle: &RuntimeHandle<Message>) { match message { Message::Submit(input) => { self.streaming = true; self.draft.clear(); let handle = handle.clone(); handle.spawn(async move { let mut stream = open_llm_stream(input).await; while let Some(chunk) = stream.next().await { let _ = handle.emit(Message::Chunk(chunk)); } let _ = handle.emit(Message::Finish); }); } Message::Chunk(chunk) => { if !self.draft.is_empty() { self.draft.push('\n'); } self.draft.push_str(&chunk); } Message::Finish => { self.streaming = false; } } }

这才是 AI agent 场景里最典型的路径:

  • tokio::spawn 里消费一个长连接或流式响应
  • 每拿到一段内容就 emit(Message::Chunk(...))
  • 最后统一发一个 Message::Finish

这里最重要的不是 open_llm_stream(...) 具体来自哪个 SDK,而是这条边界本身:

stream.next().await -> emit(Message) -> update(...) -> state

UI 如何显示“正在流式生成”

渲染时,只需要把 draft 当成普通状态显示出来:

fn render(&mut self, cx: &mut ViewCtx<'_, Message>) -> Element<Message> { let input = cx.signal(|| String::new()); let current = input.get(); view! { <Paragraph text={"Write a note and press Enter"} /> <Input value={current.clone()} on_change={{ let input = input.clone(); move |next| input.set_if_changed(next) }} on_submit={|next| Some(Message::Submit(next))} /> <Paragraph text={ if self.streaming { format!("Streaming...\n\n{}", self.draft) } else if self.draft.is_empty() { "No active draft".to_string() } else { self.draft.clone() } } /> } }

到这里,Ansiq 的 streaming 模型其实已经很清楚了:

  • 用户动作触发任务
  • 任务通过消息分段回传
  • app 把消息累积成 draft
  • runtime 只负责局部更新屏幕

为什么这条边界很重要

很多框架在 streaming 场景里最容易犯的错误是:

  • 让后台任务直接抓住 UI 状态
  • 让业务逻辑和渲染逻辑混在一起
  • 让“消息回传”和“屏幕更新”没有明确分层

在 Ansiq 里,最稳的一条路径是:

async task -> Message::Chunk / Message::Finish -> app.update(...) -> state changes -> runtime does partial update

如果你更习惯看系统流向图,可以把它理解成:

更抽象的 streaming 数据流
1async task
2Message::Chunk / Message::Finish
3app.update(...)
4state changes
5runtime partial update

一旦你这样组织,debug 会容易很多。

effect 在 streaming 里适合做什么

effect 不是流式输出的主入口,但它很适合做一些辅助性工作,例如:

  • streamingfalse -> true 时记录日志
  • draft 清空时触发某个外部同步动作
  • 驱动一个非 UI 的副作用

它不适合替代消息系统本身。

换句话说:

  • streaming 的主通道是消息
  • effect 是副作用钩子

如果需要历史记录,什么时候 commit

这时 draft 还只是 live 内容。

一旦流式生成完成,你通常要决定:

  • 它继续留在 live 区,还是
  • 进入历史列表,或者
  • commit 到 terminal scrollback

这就是为什么 streaming 最终一定会和 viewport/history 语义碰到一起。

在一个简单示例里,你可以先只把它留在当前 UI 状态里。
在 transcript / agent shell 里,则往往要继续引入:

  • completed turn
  • history block
  • scrollback commit

什么时候该用 mount(...)

如果你的异步来源不是“用户提交后启动的一次性任务”,而是持续存在的外部数据源,例如:

  • 系统采样
  • 文件监听
  • 长连接
  • 后台事件流

更合适的入口通常是 mount(...)

也就是说:

  • update(...) 更适合响应一次动作后启动任务
  • mount(...) 更适合应用一启动就存在的长期任务

这一页的真正结论

Ansiq 里的 streaming 不是一种特殊 widget 技巧,而是一条明确的数据流:

  • 异步任务产生消息
  • app 消费消息并更新状态
  • runtime 负责局部刷新

如果你守住这条边界,就可以在不破坏 runtime 模型的前提下,稳定地实现:

  • 分段加载
  • 实时采样
  • LLM 输出
  • transcript 增量更新

下一步:阅读 布局与渲染
下一页我们会回到 runtime,解释为什么这些更新不会自动退化成整树重画。

Last updated on