fix(tui): offload offline queue persistence

Move offline queue persistence off the UI hot path so Enter handling does not wait on synchronous disk work.
This commit is contained in:
Hunter Bown
2026-05-23 13:06:32 -05:00
committed by GitHub
parent 7a87aebec7
commit c675fe64d2
2 changed files with 126 additions and 20 deletions
+110 -5
View File
@@ -16,10 +16,10 @@
//! to this task. The UI merely `try_send`s a request (non-blocking,
//! bounded-channel drop) and returns immediately — keystrokes are never
//! gated on write completion.
//! - **Latest-wins coalescing**: when multiple `Checkpoint` or
//! `SessionSnapshot` requests pile up before the actor's next write cycle,
//! only the most recent one is written. `ClearCheckpoint` requests
//! accumulate normally (they're cheap and commutative).
//! - **Latest-wins coalescing**: when multiple `Checkpoint`,
//! `SessionSnapshot`, or offline-queue requests pile up before the actor's
//! next write cycle, only the most recent one is written. `ClearCheckpoint`
//! requests accumulate normally (they're cheap and commutative).
//! - **Unbounded channel** for `try_send` to always succeed; the actor
//! naturally backpressures via the spawn pool. A few outstanding
//! `SavedSession` values in the channel (< 1 MB) is negligible pressure.
@@ -28,7 +28,7 @@ use std::sync::OnceLock;
use tokio::sync::mpsc;
use crate::session_manager::{SavedSession, SessionManager};
use crate::session_manager::{OfflineQueueState, SavedSession, SessionManager};
use crate::utils::spawn_supervised;
// ---------------------------------------------------------------------------
@@ -42,12 +42,28 @@ pub enum PersistRequest {
Checkpoint(SavedSession),
/// Write a full session snapshot (completed turn, durable save).
SessionSnapshot(SavedSession),
/// Write queued/draft offline input for crash recovery.
OfflineQueue {
state: OfflineQueueState,
session_id: Option<String>,
},
/// Remove the queued/draft offline input file.
ClearOfflineQueue,
/// Remove the crash-recovery checkpoint file.
ClearCheckpoint,
/// Graceful shutdown — flush pending writes, then exit the actor loop.
Shutdown,
}
#[derive(Debug)]
enum PendingOfflineQueue {
Save {
state: OfflineQueueState,
session_id: Option<String>,
},
Clear,
}
// ---------------------------------------------------------------------------
// Handle (held by the TUI)
// ---------------------------------------------------------------------------
@@ -106,6 +122,7 @@ pub fn spawn_persistence_actor(manager: SessionManager) -> PersistActorHandle {
async move {
let mut latest_checkpoint: Option<SavedSession> = None;
let mut latest_session: Option<SavedSession> = None;
let mut latest_offline_queue: Option<PendingOfflineQueue> = None;
let mut should_clear: bool = false;
loop {
@@ -118,6 +135,13 @@ pub fn spawn_persistence_actor(manager: SessionManager) -> PersistActorHandle {
PersistRequest::SessionSnapshot(session) => {
latest_session = Some(session);
}
PersistRequest::OfflineQueue { state, session_id } => {
latest_offline_queue =
Some(PendingOfflineQueue::Save { state, session_id });
}
PersistRequest::ClearOfflineQueue => {
latest_offline_queue = Some(PendingOfflineQueue::Clear);
}
PersistRequest::ClearCheckpoint => {
should_clear = true;
}
@@ -126,6 +150,7 @@ pub fn spawn_persistence_actor(manager: SessionManager) -> PersistActorHandle {
&manager,
latest_checkpoint.as_ref(),
latest_session.as_ref(),
latest_offline_queue.as_ref(),
should_clear,
);
return;
@@ -144,6 +169,9 @@ pub fn spawn_persistence_actor(manager: SessionManager) -> PersistActorHandle {
if let Some(ref session) = latest_session.take() {
let _ = manager.save_session(session);
}
if let Some(ref request) = latest_offline_queue.take() {
apply_offline_queue_request(&manager, request);
}
// Block until the next request arrives.
match rx.recv().await {
@@ -153,6 +181,13 @@ pub fn spawn_persistence_actor(manager: SessionManager) -> PersistActorHandle {
Some(PersistRequest::SessionSnapshot(session)) => {
latest_session = Some(session);
}
Some(PersistRequest::OfflineQueue { state, session_id }) => {
latest_offline_queue =
Some(PendingOfflineQueue::Save { state, session_id });
}
Some(PersistRequest::ClearOfflineQueue) => {
latest_offline_queue = Some(PendingOfflineQueue::Clear);
}
Some(PersistRequest::ClearCheckpoint) => {
should_clear = true;
}
@@ -161,6 +196,7 @@ pub fn spawn_persistence_actor(manager: SessionManager) -> PersistActorHandle {
&manager,
latest_checkpoint.as_ref(),
latest_session.as_ref(),
latest_offline_queue.as_ref(),
should_clear,
);
return;
@@ -171,6 +207,7 @@ pub fn spawn_persistence_actor(manager: SessionManager) -> PersistActorHandle {
&manager,
latest_checkpoint.as_ref(),
latest_session.as_ref(),
latest_offline_queue.as_ref(),
should_clear,
);
return;
@@ -188,6 +225,7 @@ fn flush_inner(
manager: &SessionManager,
checkpoint: Option<&SavedSession>,
session: Option<&SavedSession>,
offline_queue: Option<&PendingOfflineQueue>,
should_clear: bool,
) {
if should_clear {
@@ -199,4 +237,71 @@ fn flush_inner(
if let Some(s) = session {
let _ = manager.save_session(s);
}
if let Some(request) = offline_queue {
apply_offline_queue_request(manager, request);
}
}
fn apply_offline_queue_request(manager: &SessionManager, request: &PendingOfflineQueue) {
match request {
PendingOfflineQueue::Save { state, session_id } => {
let _ = manager.save_offline_queue_state(state, session_id.as_deref());
}
PendingOfflineQueue::Clear => {
let _ = manager.clear_offline_queue_state();
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::time::Duration;
use crate::session_manager::{OfflineQueueState, QueuedSessionMessage};
async fn wait_until(mut predicate: impl FnMut() -> bool) {
let deadline = tokio::time::Instant::now() + Duration::from_secs(2);
loop {
if predicate() {
return;
}
assert!(
tokio::time::Instant::now() < deadline,
"timed out waiting for persistence actor"
);
tokio::time::sleep(Duration::from_millis(10)).await;
}
}
#[tokio::test]
async fn actor_persists_and_clears_offline_queue_requests() {
let tmp = tempfile::tempdir().expect("tempdir");
let sessions_dir = tmp.path().join("sessions");
let manager = SessionManager::new(sessions_dir.clone()).expect("manager");
let queue_path = sessions_dir.join("checkpoints").join("offline_queue.json");
let handle = spawn_persistence_actor(manager);
let state = OfflineQueueState {
messages: vec![QueuedSessionMessage {
display: "queued from enter".to_string(),
skill_instruction: None,
}],
..OfflineQueueState::default()
};
handle.try_send(PersistRequest::OfflineQueue {
state,
session_id: Some("session-A".to_string()),
});
wait_until(|| {
std::fs::read_to_string(&queue_path)
.is_ok_and(|body| body.contains("queued from enter"))
})
.await;
handle.try_send(PersistRequest::ClearOfflineQueue);
wait_until(|| !queue_path.exists()).await;
handle.try_send(PersistRequest::Shutdown);
}
}
+16 -15
View File
@@ -3675,22 +3675,23 @@ pub(crate) fn apply_engine_error_to_app(
}
fn persist_offline_queue_state(app: &App) {
if let Ok(manager) = SessionManager::default_location() {
if app.queued_messages.is_empty() && app.queued_draft.is_none() {
let _ = manager.clear_offline_queue_state();
return;
}
let state = OfflineQueueState {
messages: app
.queued_messages
.iter()
.map(queued_ui_to_session)
.collect(),
draft: app.queued_draft.as_ref().map(queued_ui_to_session),
..OfflineQueueState::default()
};
let _ = manager.save_offline_queue_state(&state, app.current_session_id.as_deref());
if app.queued_messages.is_empty() && app.queued_draft.is_none() {
persistence_actor::persist(PersistRequest::ClearOfflineQueue);
return;
}
let state = OfflineQueueState {
messages: app
.queued_messages
.iter()
.map(queued_ui_to_session)
.collect(),
draft: app.queued_draft.as_ref().map(queued_ui_to_session),
..OfflineQueueState::default()
};
persistence_actor::persist(PersistRequest::OfflineQueue {
state,
session_id: app.current_session_id.clone(),
});
}
/// Strip ANSI control codes / non-printable bytes from a streaming