流式输出与异步任务
到这一页之前,我们一直在用同步、局部、可预测的交互建立 Ansiq 的基本直觉:
signalcomputedeffect- focus 与 routing
- continuity contract
但很多真正的终端应用都有另一类需求:
- 后台任务
- 网络请求
- 系统采样
- AI / LLM 输出
- 分段返回的长结果
也就是说,界面并不是“用户按一下键,我就更新一次”这么简单。
它还要面对:有些内容会在未来一点一点到达。
这一页要讲的,就是 Ansiq 里这件事应该怎么做。
先说结论:不要在后台任务里直接改 UI signal
这是最重要的一条规则。
Ansiq 现在的响应式句柄是线程绑定的,不能被安全地带进任意 tokio::spawn 任务里直接 set()。
正确做法是:
- 后台任务只负责产生消息
- 消息通过
RuntimeHandle::emit(...)回到 app - app 的
update(...)再修改自己的状态
也就是说:
异步任务负责“把新事实带回来”,
app 负责“把这些事实写进状态并重新渲染”。
这条边界一旦守住,streaming 应用就会稳定很多。
从 NotesApp 继续演化
前面我们的 NotesApp 还是一个很小的输入与列表应用。
现在我们把它再推进一步:提交一条 note 后,不是立刻得到一个完整结果,而是让后台任务分几段返回“正在生成的摘要”。
我们先定义消息:
#[derive(Clone, Debug)]
enum Message {
Submit(String),
Chunk(String),
Finish,
}再给 app 加一份“正在流式生成的草稿”:
#[derive(Default)]
struct NotesApp {
notes: Vec<String>,
draft: String,
streaming: bool,
}异步任务从哪里启动
Ansiq 里有两种最常见的入口:
- 在
mount(...)里启动长期后台任务 - 在
update(...)里响应某个动作时启动一次性任务
对于流式输出,最常见的是第二种:
用户提交后,启动一个异步任务分段返回消息。
fn update(&mut self, message: Message, handle: &RuntimeHandle<Message>) {
match message {
Message::Submit(input) => {
self.streaming = true;
self.draft.clear();
let handle = handle.clone();
handle.spawn(async move {
for chunk in [
format!("Planning the change for {}", input),
"Inspecting the workspace...".to_string(),
"Writing the first failing test...".to_string(),
] {
let _ = handle.emit(Message::Chunk(chunk));
tokio::time::sleep(std::time::Duration::from_millis(120)).await;
}
let _ = handle.emit(Message::Finish);
});
}
Message::Chunk(chunk) => {
if !self.draft.is_empty() {
self.draft.push('\n');
}
self.draft.push_str(&chunk);
}
Message::Finish => {
self.streaming = false;
}
}
}这里最值得注意的是:
- 后台任务没有持有
signal - 后台任务只发
Message - 真正的状态写入仍然发生在 app 的
update(...)
真实的 stream 往往长这样
上面的 for chunk in [...] 只是为了先把模式看清楚。
真实的 AI / agent / 网络场景,更常见的是“消费一个真正的 stream”:
use tokio_stream::StreamExt;
fn update(&mut self, message: Message, handle: &RuntimeHandle<Message>) {
match message {
Message::Submit(input) => {
self.streaming = true;
self.draft.clear();
let handle = handle.clone();
handle.spawn(async move {
let mut stream = open_llm_stream(input).await;
while let Some(chunk) = stream.next().await {
let _ = handle.emit(Message::Chunk(chunk));
}
let _ = handle.emit(Message::Finish);
});
}
Message::Chunk(chunk) => {
if !self.draft.is_empty() {
self.draft.push('\n');
}
self.draft.push_str(&chunk);
}
Message::Finish => {
self.streaming = false;
}
}
}这才是 AI agent 场景里最典型的路径:
tokio::spawn里消费一个长连接或流式响应- 每拿到一段内容就
emit(Message::Chunk(...)) - 最后统一发一个
Message::Finish
这里最重要的不是 open_llm_stream(...) 具体来自哪个 SDK,而是这条边界本身:
stream.next().await -> emit(Message) -> update(...) -> state
UI 如何显示“正在流式生成”
渲染时,只需要把 draft 当成普通状态显示出来:
fn render(&mut self, cx: &mut ViewCtx<'_, Message>) -> Element<Message> {
let input = cx.signal(|| String::new());
let current = input.get();
view! {
<Paragraph text={"Write a note and press Enter"} />
<Input
value={current.clone()}
on_change={{
let input = input.clone();
move |next| input.set_if_changed(next)
}}
on_submit={|next| Some(Message::Submit(next))}
/>
<Paragraph
text={
if self.streaming {
format!("Streaming...\n\n{}", self.draft)
} else if self.draft.is_empty() {
"No active draft".to_string()
} else {
self.draft.clone()
}
}
/>
}
}到这里,Ansiq 的 streaming 模型其实已经很清楚了:
- 用户动作触发任务
- 任务通过消息分段回传
- app 把消息累积成
draft - runtime 只负责局部更新屏幕
为什么这条边界很重要
很多框架在 streaming 场景里最容易犯的错误是:
- 让后台任务直接抓住 UI 状态
- 让业务逻辑和渲染逻辑混在一起
- 让“消息回传”和“屏幕更新”没有明确分层
在 Ansiq 里,最稳的一条路径是:
async task
-> Message::Chunk / Message::Finish
-> app.update(...)
-> state changes
-> runtime does partial update如果你更习惯看系统流向图,可以把它理解成:
一旦你这样组织,debug 会容易很多。
effect 在 streaming 里适合做什么
effect 不是流式输出的主入口,但它很适合做一些辅助性工作,例如:
- 当
streaming从false -> true时记录日志 - 当
draft清空时触发某个外部同步动作 - 驱动一个非 UI 的副作用
它不适合替代消息系统本身。
换句话说:
- streaming 的主通道是消息
- effect 是副作用钩子
如果需要历史记录,什么时候 commit
这时 draft 还只是 live 内容。
一旦流式生成完成,你通常要决定:
- 它继续留在 live 区,还是
- 进入历史列表,或者
- commit 到 terminal scrollback
这就是为什么 streaming 最终一定会和 viewport/history 语义碰到一起。
在一个简单示例里,你可以先只把它留在当前 UI 状态里。
在 transcript / agent shell 里,则往往要继续引入:
- completed turn
- history block
- scrollback commit
什么时候该用 mount(...)
如果你的异步来源不是“用户提交后启动的一次性任务”,而是持续存在的外部数据源,例如:
- 系统采样
- 文件监听
- 长连接
- 后台事件流
更合适的入口通常是 mount(...)。
也就是说:
update(...)更适合响应一次动作后启动任务mount(...)更适合应用一启动就存在的长期任务
这一页的真正结论
Ansiq 里的 streaming 不是一种特殊 widget 技巧,而是一条明确的数据流:
- 异步任务产生消息
- app 消费消息并更新状态
- runtime 负责局部刷新
如果你守住这条边界,就可以在不破坏 runtime 模型的前提下,稳定地实现:
- 分段加载
- 实时采样
- LLM 输出
- transcript 增量更新
下一步:阅读 布局与渲染。
下一页我们会回到 runtime,解释为什么这些更新不会自动退化成整树重画。