From c675fe64d2f20f016a974695303d5ec3cbf3c9f9 Mon Sep 17 00:00:00 2001 From: Hunter Bown Date: Sat, 23 May 2026 13:06:32 -0500 Subject: [PATCH] fix(tui): offload offline queue persistence Move offline queue persistence off the UI hot path so Enter handling does not wait on synchronous disk work. --- crates/tui/src/tui/persistence_actor.rs | 115 ++++++++++++++++++++++-- crates/tui/src/tui/ui.rs | 31 +++---- 2 files changed, 126 insertions(+), 20 deletions(-) diff --git a/crates/tui/src/tui/persistence_actor.rs b/crates/tui/src/tui/persistence_actor.rs index 2e3354e9..4c1b5805 100644 --- a/crates/tui/src/tui/persistence_actor.rs +++ b/crates/tui/src/tui/persistence_actor.rs @@ -16,10 +16,10 @@ //! to this task. The UI merely `try_send`s a request (non-blocking, //! bounded-channel drop) and returns immediately — keystrokes are never //! gated on write completion. -//! - **Latest-wins coalescing**: when multiple `Checkpoint` or -//! `SessionSnapshot` requests pile up before the actor's next write cycle, -//! only the most recent one is written. `ClearCheckpoint` requests -//! accumulate normally (they're cheap and commutative). +//! - **Latest-wins coalescing**: when multiple `Checkpoint`, +//! `SessionSnapshot`, or offline-queue requests pile up before the actor's +//! next write cycle, only the most recent one is written. `ClearCheckpoint` +//! requests accumulate normally (they're cheap and commutative). //! - **Unbounded channel** for `try_send` to always succeed; the actor //! naturally backpressures via the spawn pool. A few outstanding //! `SavedSession` values in the channel (< 1 MB) is negligible pressure. @@ -28,7 +28,7 @@ use std::sync::OnceLock; use tokio::sync::mpsc; -use crate::session_manager::{SavedSession, SessionManager}; +use crate::session_manager::{OfflineQueueState, SavedSession, SessionManager}; use crate::utils::spawn_supervised; // --------------------------------------------------------------------------- @@ -42,12 +42,28 @@ pub enum PersistRequest { Checkpoint(SavedSession), /// Write a full session snapshot (completed turn, durable save). SessionSnapshot(SavedSession), + /// Write queued/draft offline input for crash recovery. + OfflineQueue { + state: OfflineQueueState, + session_id: Option, + }, + /// Remove the queued/draft offline input file. + ClearOfflineQueue, /// Remove the crash-recovery checkpoint file. ClearCheckpoint, /// Graceful shutdown — flush pending writes, then exit the actor loop. Shutdown, } +#[derive(Debug)] +enum PendingOfflineQueue { + Save { + state: OfflineQueueState, + session_id: Option, + }, + Clear, +} + // --------------------------------------------------------------------------- // Handle (held by the TUI) // --------------------------------------------------------------------------- @@ -106,6 +122,7 @@ pub fn spawn_persistence_actor(manager: SessionManager) -> PersistActorHandle { async move { let mut latest_checkpoint: Option = None; let mut latest_session: Option = None; + let mut latest_offline_queue: Option = None; let mut should_clear: bool = false; loop { @@ -118,6 +135,13 @@ pub fn spawn_persistence_actor(manager: SessionManager) -> PersistActorHandle { PersistRequest::SessionSnapshot(session) => { latest_session = Some(session); } + PersistRequest::OfflineQueue { state, session_id } => { + latest_offline_queue = + Some(PendingOfflineQueue::Save { state, session_id }); + } + PersistRequest::ClearOfflineQueue => { + latest_offline_queue = Some(PendingOfflineQueue::Clear); + } PersistRequest::ClearCheckpoint => { should_clear = true; } @@ -126,6 +150,7 @@ pub fn spawn_persistence_actor(manager: SessionManager) -> PersistActorHandle { &manager, latest_checkpoint.as_ref(), latest_session.as_ref(), + latest_offline_queue.as_ref(), should_clear, ); return; @@ -144,6 +169,9 @@ pub fn spawn_persistence_actor(manager: SessionManager) -> PersistActorHandle { if let Some(ref session) = latest_session.take() { let _ = manager.save_session(session); } + if let Some(ref request) = latest_offline_queue.take() { + apply_offline_queue_request(&manager, request); + } // Block until the next request arrives. match rx.recv().await { @@ -153,6 +181,13 @@ pub fn spawn_persistence_actor(manager: SessionManager) -> PersistActorHandle { Some(PersistRequest::SessionSnapshot(session)) => { latest_session = Some(session); } + Some(PersistRequest::OfflineQueue { state, session_id }) => { + latest_offline_queue = + Some(PendingOfflineQueue::Save { state, session_id }); + } + Some(PersistRequest::ClearOfflineQueue) => { + latest_offline_queue = Some(PendingOfflineQueue::Clear); + } Some(PersistRequest::ClearCheckpoint) => { should_clear = true; } @@ -161,6 +196,7 @@ pub fn spawn_persistence_actor(manager: SessionManager) -> PersistActorHandle { &manager, latest_checkpoint.as_ref(), latest_session.as_ref(), + latest_offline_queue.as_ref(), should_clear, ); return; @@ -171,6 +207,7 @@ pub fn spawn_persistence_actor(manager: SessionManager) -> PersistActorHandle { &manager, latest_checkpoint.as_ref(), latest_session.as_ref(), + latest_offline_queue.as_ref(), should_clear, ); return; @@ -188,6 +225,7 @@ fn flush_inner( manager: &SessionManager, checkpoint: Option<&SavedSession>, session: Option<&SavedSession>, + offline_queue: Option<&PendingOfflineQueue>, should_clear: bool, ) { if should_clear { @@ -199,4 +237,71 @@ fn flush_inner( if let Some(s) = session { let _ = manager.save_session(s); } + if let Some(request) = offline_queue { + apply_offline_queue_request(manager, request); + } +} + +fn apply_offline_queue_request(manager: &SessionManager, request: &PendingOfflineQueue) { + match request { + PendingOfflineQueue::Save { state, session_id } => { + let _ = manager.save_offline_queue_state(state, session_id.as_deref()); + } + PendingOfflineQueue::Clear => { + let _ = manager.clear_offline_queue_state(); + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use std::time::Duration; + + use crate::session_manager::{OfflineQueueState, QueuedSessionMessage}; + + async fn wait_until(mut predicate: impl FnMut() -> bool) { + let deadline = tokio::time::Instant::now() + Duration::from_secs(2); + loop { + if predicate() { + return; + } + assert!( + tokio::time::Instant::now() < deadline, + "timed out waiting for persistence actor" + ); + tokio::time::sleep(Duration::from_millis(10)).await; + } + } + + #[tokio::test] + async fn actor_persists_and_clears_offline_queue_requests() { + let tmp = tempfile::tempdir().expect("tempdir"); + let sessions_dir = tmp.path().join("sessions"); + let manager = SessionManager::new(sessions_dir.clone()).expect("manager"); + let queue_path = sessions_dir.join("checkpoints").join("offline_queue.json"); + let handle = spawn_persistence_actor(manager); + + let state = OfflineQueueState { + messages: vec![QueuedSessionMessage { + display: "queued from enter".to_string(), + skill_instruction: None, + }], + ..OfflineQueueState::default() + }; + + handle.try_send(PersistRequest::OfflineQueue { + state, + session_id: Some("session-A".to_string()), + }); + wait_until(|| { + std::fs::read_to_string(&queue_path) + .is_ok_and(|body| body.contains("queued from enter")) + }) + .await; + + handle.try_send(PersistRequest::ClearOfflineQueue); + wait_until(|| !queue_path.exists()).await; + handle.try_send(PersistRequest::Shutdown); + } } diff --git a/crates/tui/src/tui/ui.rs b/crates/tui/src/tui/ui.rs index 3cf028a4..e4157c5a 100644 --- a/crates/tui/src/tui/ui.rs +++ b/crates/tui/src/tui/ui.rs @@ -3675,22 +3675,23 @@ pub(crate) fn apply_engine_error_to_app( } fn persist_offline_queue_state(app: &App) { - if let Ok(manager) = SessionManager::default_location() { - if app.queued_messages.is_empty() && app.queued_draft.is_none() { - let _ = manager.clear_offline_queue_state(); - return; - } - let state = OfflineQueueState { - messages: app - .queued_messages - .iter() - .map(queued_ui_to_session) - .collect(), - draft: app.queued_draft.as_ref().map(queued_ui_to_session), - ..OfflineQueueState::default() - }; - let _ = manager.save_offline_queue_state(&state, app.current_session_id.as_deref()); + if app.queued_messages.is_empty() && app.queued_draft.is_none() { + persistence_actor::persist(PersistRequest::ClearOfflineQueue); + return; } + let state = OfflineQueueState { + messages: app + .queued_messages + .iter() + .map(queued_ui_to_session) + .collect(), + draft: app.queued_draft.as_ref().map(queued_ui_to_session), + ..OfflineQueueState::default() + }; + persistence_actor::persist(PersistRequest::OfflineQueue { + state, + session_id: app.current_session_id.clone(), + }); } /// Strip ANSI control codes / non-printable bytes from a streaming