b7bd02d814
Adds first-class DeepSeek V4 Pro and Flash support, updates the default model to deepseek-v4-pro, aligns legacy aliases with the current V4 1M context behavior, and fixes thinking-mode request handling. Key fixes: - Send DeepSeek's raw Chat Completions `thinking` parameter at the top level instead of SDK-only `extra_body`. - Preserve assistant `reasoning_content` for all prior thinking-mode tool-call turns so subsequent requests satisfy DeepSeek V4's replay requirement. - Fix npm wrapper concurrent first-run downloads by using per-process temporary download paths. - Add `.mailmap` so historical bot-attributed commits aggregate under Hunter Bown where mailmap is honored. Verified with the full local Rust gate, live DeepSeek V4 smoke, npm wrapper temp-install smoke, and green PR CI across Linux, macOS, and Windows.
933 lines
32 KiB
Rust
933 lines
32 KiB
Rust
//! Durable automation records and scheduler support.
|
|
//!
|
|
//! Automations are local-first recurring jobs that enqueue standard background
|
|
//! tasks. This module stores automation definitions and run history under
|
|
//! `~/.deepseek/automations` (or `DEEPSEEK_AUTOMATIONS_DIR` override).
|
|
|
|
use std::collections::BTreeMap;
|
|
use std::fs;
|
|
use std::path::{Path, PathBuf};
|
|
use std::sync::Arc;
|
|
|
|
use anyhow::{Context, Result, bail};
|
|
use chrono::{DateTime, Datelike, Duration, Local, TimeZone, Timelike, Utc, Weekday};
|
|
use serde::{Deserialize, Serialize};
|
|
use tokio::sync::Mutex;
|
|
use tokio::time::sleep;
|
|
use tokio_util::sync::CancellationToken;
|
|
use uuid::Uuid;
|
|
|
|
use crate::task_manager::{NewTaskRequest, SharedTaskManager, TaskStatus};
|
|
|
|
const CURRENT_AUTOMATION_SCHEMA_VERSION: u32 = 1;
|
|
const CURRENT_RUN_SCHEMA_VERSION: u32 = 1;
|
|
|
|
const fn default_automation_schema_version() -> u32 {
|
|
CURRENT_AUTOMATION_SCHEMA_VERSION
|
|
}
|
|
|
|
const fn default_run_schema_version() -> u32 {
|
|
CURRENT_RUN_SCHEMA_VERSION
|
|
}
|
|
|
|
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
|
|
#[serde(rename_all = "snake_case")]
|
|
pub enum AutomationStatus {
|
|
Active,
|
|
Paused,
|
|
}
|
|
|
|
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
|
|
#[serde(rename_all = "snake_case")]
|
|
pub enum AutomationRunStatus {
|
|
Queued,
|
|
Running,
|
|
Completed,
|
|
Failed,
|
|
Canceled,
|
|
}
|
|
|
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
|
pub struct AutomationRecord {
|
|
#[serde(default = "default_automation_schema_version")]
|
|
pub schema_version: u32,
|
|
pub id: String,
|
|
pub name: String,
|
|
pub prompt: String,
|
|
pub rrule: String,
|
|
#[serde(default)]
|
|
pub cwds: Vec<PathBuf>,
|
|
pub status: AutomationStatus,
|
|
pub created_at: DateTime<Utc>,
|
|
pub updated_at: DateTime<Utc>,
|
|
#[serde(skip_serializing_if = "Option::is_none")]
|
|
pub next_run_at: Option<DateTime<Utc>>,
|
|
#[serde(skip_serializing_if = "Option::is_none")]
|
|
pub last_run_at: Option<DateTime<Utc>>,
|
|
}
|
|
|
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
|
pub struct AutomationRunRecord {
|
|
#[serde(default = "default_run_schema_version")]
|
|
pub schema_version: u32,
|
|
pub id: String,
|
|
pub automation_id: String,
|
|
pub scheduled_for: DateTime<Utc>,
|
|
pub status: AutomationRunStatus,
|
|
pub created_at: DateTime<Utc>,
|
|
#[serde(skip_serializing_if = "Option::is_none")]
|
|
pub started_at: Option<DateTime<Utc>>,
|
|
#[serde(skip_serializing_if = "Option::is_none")]
|
|
pub ended_at: Option<DateTime<Utc>>,
|
|
#[serde(skip_serializing_if = "Option::is_none")]
|
|
pub task_id: Option<String>,
|
|
#[serde(skip_serializing_if = "Option::is_none")]
|
|
pub thread_id: Option<String>,
|
|
#[serde(skip_serializing_if = "Option::is_none")]
|
|
pub turn_id: Option<String>,
|
|
#[serde(skip_serializing_if = "Option::is_none")]
|
|
pub error: Option<String>,
|
|
}
|
|
|
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
|
pub struct CreateAutomationRequest {
|
|
pub name: String,
|
|
pub prompt: String,
|
|
pub rrule: String,
|
|
#[serde(default)]
|
|
pub cwds: Vec<PathBuf>,
|
|
#[serde(default)]
|
|
pub status: Option<AutomationStatus>,
|
|
}
|
|
|
|
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
|
|
pub struct UpdateAutomationRequest {
|
|
pub name: Option<String>,
|
|
pub prompt: Option<String>,
|
|
pub rrule: Option<String>,
|
|
pub cwds: Option<Vec<PathBuf>>,
|
|
pub status: Option<AutomationStatus>,
|
|
}
|
|
|
|
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
|
enum AutomationFrequency {
|
|
Hourly,
|
|
Weekly,
|
|
}
|
|
|
|
#[derive(Debug, Clone)]
|
|
pub enum AutomationSchedule {
|
|
Hourly {
|
|
interval_hours: u32,
|
|
byday: Option<Vec<Weekday>>,
|
|
},
|
|
Weekly {
|
|
byday: Vec<Weekday>,
|
|
byhour: u32,
|
|
byminute: u32,
|
|
},
|
|
}
|
|
|
|
impl AutomationSchedule {
|
|
pub fn parse_rrule(rrule: &str) -> Result<Self> {
|
|
let mut parts: BTreeMap<String, String> = BTreeMap::new();
|
|
for raw in rrule.split(';') {
|
|
let item = raw.trim();
|
|
if item.is_empty() {
|
|
continue;
|
|
}
|
|
let Some((k, v)) = item.split_once('=') else {
|
|
bail!("Invalid RRULE segment '{item}'");
|
|
};
|
|
parts.insert(k.trim().to_ascii_uppercase(), v.trim().to_ascii_uppercase());
|
|
}
|
|
|
|
let freq = match parts.get("FREQ").map(String::as_str) {
|
|
Some("HOURLY") => AutomationFrequency::Hourly,
|
|
Some("WEEKLY") => AutomationFrequency::Weekly,
|
|
Some(other) => bail!("Unsupported RRULE FREQ '{other}'. Supported: HOURLY and WEEKLY"),
|
|
None => bail!("RRULE must include FREQ"),
|
|
};
|
|
|
|
match freq {
|
|
AutomationFrequency::Hourly => {
|
|
for key in parts.keys() {
|
|
if key != "FREQ" && key != "INTERVAL" && key != "BYDAY" {
|
|
bail!(
|
|
"Unsupported RRULE field '{key}' for HOURLY. Allowed: FREQ,INTERVAL,BYDAY"
|
|
);
|
|
}
|
|
}
|
|
let interval_hours = parts
|
|
.get("INTERVAL")
|
|
.map(|v| v.parse::<u32>())
|
|
.transpose()
|
|
.context("Failed to parse INTERVAL")?
|
|
.unwrap_or(1);
|
|
if interval_hours == 0 {
|
|
bail!("INTERVAL must be >= 1 for HOURLY schedules");
|
|
}
|
|
let byday = parts
|
|
.get("BYDAY")
|
|
.map(|value| parse_byday(value))
|
|
.transpose()?;
|
|
Ok(Self::Hourly {
|
|
interval_hours,
|
|
byday,
|
|
})
|
|
}
|
|
AutomationFrequency::Weekly => {
|
|
for key in parts.keys() {
|
|
if key != "FREQ" && key != "BYDAY" && key != "BYHOUR" && key != "BYMINUTE" {
|
|
bail!(
|
|
"Unsupported RRULE field '{key}' for WEEKLY. Allowed: FREQ,BYDAY,BYHOUR,BYMINUTE"
|
|
);
|
|
}
|
|
}
|
|
let byday_raw = parts
|
|
.get("BYDAY")
|
|
.ok_or_else(|| anyhow::anyhow!("WEEKLY schedules require BYDAY"))?;
|
|
let byday = parse_byday(byday_raw)?;
|
|
if byday.is_empty() {
|
|
bail!("BYDAY cannot be empty for WEEKLY schedules");
|
|
}
|
|
let byhour = parts
|
|
.get("BYHOUR")
|
|
.ok_or_else(|| anyhow::anyhow!("WEEKLY schedules require BYHOUR"))?
|
|
.parse::<u32>()
|
|
.context("Failed to parse BYHOUR")?;
|
|
let byminute = parts
|
|
.get("BYMINUTE")
|
|
.ok_or_else(|| anyhow::anyhow!("WEEKLY schedules require BYMINUTE"))?
|
|
.parse::<u32>()
|
|
.context("Failed to parse BYMINUTE")?;
|
|
|
|
if byhour > 23 {
|
|
bail!("BYHOUR must be between 0 and 23");
|
|
}
|
|
if byminute > 59 {
|
|
bail!("BYMINUTE must be between 0 and 59");
|
|
}
|
|
|
|
Ok(Self::Weekly {
|
|
byday,
|
|
byhour,
|
|
byminute,
|
|
})
|
|
}
|
|
}
|
|
}
|
|
|
|
pub fn next_after(&self, after: DateTime<Utc>) -> Result<DateTime<Utc>> {
|
|
let local_after = after.with_timezone(&Local);
|
|
match self {
|
|
Self::Hourly {
|
|
interval_hours,
|
|
byday,
|
|
} => {
|
|
let mut candidate = local_after + Duration::hours(i64::from(*interval_hours))
|
|
- Duration::seconds(i64::from(local_after.second()))
|
|
- Duration::nanoseconds(i64::from(local_after.nanosecond()));
|
|
|
|
if let Some(days) = byday {
|
|
for _ in 0..(24 * 21) {
|
|
if days.contains(&candidate.weekday()) {
|
|
return Ok(candidate.with_timezone(&Utc));
|
|
}
|
|
candidate += Duration::hours(i64::from(*interval_hours));
|
|
}
|
|
bail!("Unable to compute next HOURLY run for BYDAY filter");
|
|
}
|
|
|
|
Ok(candidate.with_timezone(&Utc))
|
|
}
|
|
Self::Weekly {
|
|
byday,
|
|
byhour,
|
|
byminute,
|
|
} => {
|
|
for day_offset in 0..15 {
|
|
let date = local_after.date_naive() + Duration::days(i64::from(day_offset));
|
|
if !byday.contains(&date.weekday()) {
|
|
continue;
|
|
}
|
|
let Some(candidate_naive) = date.and_hms_opt(*byhour, *byminute, 0) else {
|
|
continue;
|
|
};
|
|
if let Some(candidate) = resolve_local_datetime(candidate_naive)
|
|
&& candidate > local_after
|
|
{
|
|
return Ok(candidate.with_timezone(&Utc));
|
|
}
|
|
}
|
|
bail!("Unable to compute next WEEKLY run");
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
fn resolve_local_datetime(naive: chrono::NaiveDateTime) -> Option<DateTime<Local>> {
|
|
Local
|
|
.from_local_datetime(&naive)
|
|
.single()
|
|
.or_else(|| Local.from_local_datetime(&naive).earliest())
|
|
.or_else(|| Local.from_local_datetime(&naive).latest())
|
|
}
|
|
|
|
fn parse_byday(value: &str) -> Result<Vec<Weekday>> {
|
|
let mut days = Vec::new();
|
|
for token in value.split(',') {
|
|
let day = match token.trim().to_ascii_uppercase().as_str() {
|
|
"MO" => Weekday::Mon,
|
|
"TU" => Weekday::Tue,
|
|
"WE" => Weekday::Wed,
|
|
"TH" => Weekday::Thu,
|
|
"FR" => Weekday::Fri,
|
|
"SA" => Weekday::Sat,
|
|
"SU" => Weekday::Sun,
|
|
other => bail!("Invalid BYDAY value '{other}'"),
|
|
};
|
|
if !days.contains(&day) {
|
|
days.push(day);
|
|
}
|
|
}
|
|
Ok(days)
|
|
}
|
|
|
|
#[derive(Debug, Clone)]
|
|
pub struct AutomationManager {
|
|
automations_dir: PathBuf,
|
|
runs_dir: PathBuf,
|
|
}
|
|
|
|
impl AutomationManager {
|
|
pub fn open(root: PathBuf) -> Result<Self> {
|
|
let automations_dir = root.join("automations");
|
|
let runs_dir = root.join("runs");
|
|
fs::create_dir_all(&automations_dir)
|
|
.with_context(|| format!("Failed to create {}", automations_dir.display()))?;
|
|
fs::create_dir_all(&runs_dir)
|
|
.with_context(|| format!("Failed to create {}", runs_dir.display()))?;
|
|
Ok(Self {
|
|
automations_dir,
|
|
runs_dir,
|
|
})
|
|
}
|
|
|
|
pub fn default_location() -> Result<Self> {
|
|
Self::open(default_automations_dir())
|
|
}
|
|
|
|
fn automation_path(&self, id: &str) -> PathBuf {
|
|
self.automations_dir.join(format!("{id}.json"))
|
|
}
|
|
|
|
fn runs_dir_for(&self, automation_id: &str) -> PathBuf {
|
|
self.runs_dir.join(automation_id)
|
|
}
|
|
|
|
fn run_path(&self, automation_id: &str, run_id: &str) -> PathBuf {
|
|
self.runs_dir_for(automation_id)
|
|
.join(format!("{run_id}.json"))
|
|
}
|
|
|
|
pub fn create_automation(&self, req: CreateAutomationRequest) -> Result<AutomationRecord> {
|
|
validate_name_and_prompt(&req.name, &req.prompt)?;
|
|
let schedule = AutomationSchedule::parse_rrule(&req.rrule)?;
|
|
let now = Utc::now();
|
|
let status = req.status.unwrap_or(AutomationStatus::Active);
|
|
let next_run_at = if matches!(status, AutomationStatus::Active) {
|
|
Some(schedule.next_after(now)?)
|
|
} else {
|
|
None
|
|
};
|
|
|
|
let record = AutomationRecord {
|
|
schema_version: CURRENT_AUTOMATION_SCHEMA_VERSION,
|
|
id: Uuid::new_v4().to_string(),
|
|
name: req.name.trim().to_string(),
|
|
prompt: req.prompt.trim().to_string(),
|
|
rrule: req.rrule.trim().to_ascii_uppercase(),
|
|
cwds: req.cwds,
|
|
status,
|
|
created_at: now,
|
|
updated_at: now,
|
|
next_run_at,
|
|
last_run_at: None,
|
|
};
|
|
|
|
self.save_automation(&record)?;
|
|
Ok(record)
|
|
}
|
|
|
|
pub fn get_automation(&self, id: &str) -> Result<AutomationRecord> {
|
|
let path = self.automation_path(id);
|
|
let raw = fs::read_to_string(&path)
|
|
.with_context(|| format!("Failed to read automation {}", path.display()))?;
|
|
let record: AutomationRecord = serde_json::from_str(&raw)
|
|
.with_context(|| format!("Failed to parse automation {}", path.display()))?;
|
|
if record.schema_version > CURRENT_AUTOMATION_SCHEMA_VERSION {
|
|
bail!(
|
|
"Automation schema v{} is newer than supported v{}",
|
|
record.schema_version,
|
|
CURRENT_AUTOMATION_SCHEMA_VERSION
|
|
);
|
|
}
|
|
Ok(record)
|
|
}
|
|
|
|
pub fn save_automation(&self, record: &AutomationRecord) -> Result<()> {
|
|
write_json_atomic(&self.automation_path(&record.id), record)
|
|
}
|
|
|
|
pub fn list_automations(&self) -> Result<Vec<AutomationRecord>> {
|
|
let mut out = Vec::new();
|
|
for entry in fs::read_dir(&self.automations_dir)
|
|
.with_context(|| format!("Failed to read {}", self.automations_dir.display()))?
|
|
{
|
|
let entry = entry?;
|
|
let path = entry.path();
|
|
if path.extension().is_none_or(|ext| ext != "json") {
|
|
continue;
|
|
}
|
|
let raw = fs::read_to_string(&path)
|
|
.with_context(|| format!("Failed to read {}", path.display()))?;
|
|
let record: AutomationRecord = serde_json::from_str(&raw)
|
|
.with_context(|| format!("Failed to parse {}", path.display()))?;
|
|
if record.schema_version > CURRENT_AUTOMATION_SCHEMA_VERSION {
|
|
bail!(
|
|
"Automation schema v{} is newer than supported v{}",
|
|
record.schema_version,
|
|
CURRENT_AUTOMATION_SCHEMA_VERSION
|
|
);
|
|
}
|
|
out.push(record);
|
|
}
|
|
out.sort_by_key(|r| std::cmp::Reverse(r.updated_at));
|
|
Ok(out)
|
|
}
|
|
|
|
pub fn update_automation(
|
|
&self,
|
|
id: &str,
|
|
req: UpdateAutomationRequest,
|
|
) -> Result<AutomationRecord> {
|
|
let mut existing = self.get_automation(id)?;
|
|
|
|
if let Some(name) = req.name {
|
|
if name.trim().is_empty() {
|
|
bail!("Automation name cannot be empty");
|
|
}
|
|
existing.name = name.trim().to_string();
|
|
}
|
|
if let Some(prompt) = req.prompt {
|
|
if prompt.trim().is_empty() {
|
|
bail!("Automation prompt cannot be empty");
|
|
}
|
|
existing.prompt = prompt.trim().to_string();
|
|
}
|
|
if let Some(rrule) = req.rrule {
|
|
let normalized = rrule.trim().to_ascii_uppercase();
|
|
AutomationSchedule::parse_rrule(&normalized)?;
|
|
existing.rrule = normalized;
|
|
if matches!(existing.status, AutomationStatus::Active) {
|
|
let schedule = AutomationSchedule::parse_rrule(&existing.rrule)?;
|
|
existing.next_run_at = Some(schedule.next_after(Utc::now())?);
|
|
}
|
|
}
|
|
if let Some(cwds) = req.cwds {
|
|
existing.cwds = cwds;
|
|
}
|
|
if let Some(status) = req.status {
|
|
existing.status = status;
|
|
if matches!(status, AutomationStatus::Paused) {
|
|
existing.next_run_at = None;
|
|
} else {
|
|
let schedule = AutomationSchedule::parse_rrule(&existing.rrule)?;
|
|
existing.next_run_at = Some(schedule.next_after(Utc::now())?);
|
|
}
|
|
}
|
|
|
|
existing.updated_at = Utc::now();
|
|
self.save_automation(&existing)?;
|
|
Ok(existing)
|
|
}
|
|
|
|
pub fn pause_automation(&self, id: &str) -> Result<AutomationRecord> {
|
|
self.update_automation(
|
|
id,
|
|
UpdateAutomationRequest {
|
|
status: Some(AutomationStatus::Paused),
|
|
..UpdateAutomationRequest::default()
|
|
},
|
|
)
|
|
}
|
|
|
|
pub fn resume_automation(&self, id: &str) -> Result<AutomationRecord> {
|
|
self.update_automation(
|
|
id,
|
|
UpdateAutomationRequest {
|
|
status: Some(AutomationStatus::Active),
|
|
..UpdateAutomationRequest::default()
|
|
},
|
|
)
|
|
}
|
|
|
|
pub fn delete_automation(&self, id: &str) -> Result<AutomationRecord> {
|
|
let existing = self.get_automation(id)?;
|
|
let path = self.automation_path(id);
|
|
fs::remove_file(&path)
|
|
.with_context(|| format!("Failed to delete automation {}", path.display()))?;
|
|
|
|
let runs_dir = self.runs_dir_for(id);
|
|
if runs_dir.exists() {
|
|
fs::remove_dir_all(&runs_dir).with_context(|| {
|
|
format!("Failed to delete automation runs {}", runs_dir.display())
|
|
})?;
|
|
}
|
|
|
|
Ok(existing)
|
|
}
|
|
|
|
pub fn list_runs(
|
|
&self,
|
|
automation_id: &str,
|
|
limit: Option<usize>,
|
|
) -> Result<Vec<AutomationRunRecord>> {
|
|
let dir = self.runs_dir_for(automation_id);
|
|
if !dir.exists() {
|
|
return Ok(Vec::new());
|
|
}
|
|
|
|
let mut out = Vec::new();
|
|
for entry in
|
|
fs::read_dir(&dir).with_context(|| format!("Failed to read {}", dir.display()))?
|
|
{
|
|
let entry = entry?;
|
|
let path = entry.path();
|
|
if path.extension().is_none_or(|ext| ext != "json") {
|
|
continue;
|
|
}
|
|
let raw = fs::read_to_string(&path)
|
|
.with_context(|| format!("Failed to read {}", path.display()))?;
|
|
let run: AutomationRunRecord = serde_json::from_str(&raw)
|
|
.with_context(|| format!("Failed to parse {}", path.display()))?;
|
|
if run.schema_version > CURRENT_RUN_SCHEMA_VERSION {
|
|
bail!(
|
|
"Automation run schema v{} is newer than supported v{}",
|
|
run.schema_version,
|
|
CURRENT_RUN_SCHEMA_VERSION
|
|
);
|
|
}
|
|
out.push(run);
|
|
}
|
|
|
|
out.sort_by_key(|r| std::cmp::Reverse(r.created_at));
|
|
if let Some(limit) = limit {
|
|
out.truncate(limit);
|
|
}
|
|
Ok(out)
|
|
}
|
|
|
|
fn save_run(&self, run: &AutomationRunRecord) -> Result<()> {
|
|
let dir = self.runs_dir_for(&run.automation_id);
|
|
fs::create_dir_all(&dir).with_context(|| format!("Failed to create {}", dir.display()))?;
|
|
write_json_atomic(&self.run_path(&run.automation_id, &run.id), run)
|
|
}
|
|
|
|
async fn enqueue_run_task(
|
|
&self,
|
|
automation: &AutomationRecord,
|
|
run: &mut AutomationRunRecord,
|
|
task_manager: &SharedTaskManager,
|
|
) -> Result<()> {
|
|
let workspace = automation.cwds.first().cloned();
|
|
|
|
let new_task = NewTaskRequest {
|
|
prompt: automation.prompt.clone(),
|
|
model: None,
|
|
workspace,
|
|
mode: Some("agent".to_string()),
|
|
allow_shell: Some(false),
|
|
trust_mode: Some(false),
|
|
auto_approve: Some(true),
|
|
};
|
|
|
|
match task_manager.add_task(new_task).await {
|
|
Ok(task) => {
|
|
run.status = AutomationRunStatus::Running;
|
|
run.started_at = Some(Utc::now());
|
|
run.task_id = Some(task.id.clone());
|
|
run.thread_id = task.thread_id.clone();
|
|
run.turn_id = task.turn_id.clone();
|
|
run.error = None;
|
|
Ok(())
|
|
}
|
|
Err(err) => {
|
|
run.status = AutomationRunStatus::Failed;
|
|
run.ended_at = Some(Utc::now());
|
|
run.error = Some(format!("Failed to enqueue task: {err}"));
|
|
Ok(())
|
|
}
|
|
}
|
|
}
|
|
|
|
pub async fn run_now(
|
|
&self,
|
|
automation_id: &str,
|
|
task_manager: &SharedTaskManager,
|
|
) -> Result<AutomationRunRecord> {
|
|
let mut automation = self.get_automation(automation_id)?;
|
|
let now = Utc::now();
|
|
let mut run = AutomationRunRecord {
|
|
schema_version: CURRENT_RUN_SCHEMA_VERSION,
|
|
id: Uuid::new_v4().to_string(),
|
|
automation_id: automation.id.clone(),
|
|
scheduled_for: now,
|
|
status: AutomationRunStatus::Queued,
|
|
created_at: now,
|
|
started_at: None,
|
|
ended_at: None,
|
|
task_id: None,
|
|
thread_id: None,
|
|
turn_id: None,
|
|
error: None,
|
|
};
|
|
|
|
self.enqueue_run_task(&automation, &mut run, task_manager)
|
|
.await?;
|
|
self.save_run(&run)?;
|
|
|
|
automation.updated_at = Utc::now();
|
|
if matches!(
|
|
run.status,
|
|
AutomationRunStatus::Completed
|
|
| AutomationRunStatus::Failed
|
|
| AutomationRunStatus::Canceled
|
|
) {
|
|
automation.last_run_at = run.ended_at.or(Some(Utc::now()));
|
|
}
|
|
self.save_automation(&automation)?;
|
|
|
|
Ok(run)
|
|
}
|
|
|
|
pub async fn scheduler_tick(&self, task_manager: &SharedTaskManager) -> Result<()> {
|
|
let now = Utc::now();
|
|
let mut automations = self.list_automations()?;
|
|
|
|
for automation in &mut automations {
|
|
if !matches!(automation.status, AutomationStatus::Active) {
|
|
continue;
|
|
}
|
|
|
|
let schedule = AutomationSchedule::parse_rrule(&automation.rrule)?;
|
|
if automation.next_run_at.is_none() {
|
|
automation.next_run_at = Some(schedule.next_after(now)?);
|
|
automation.updated_at = now;
|
|
self.save_automation(automation)?;
|
|
continue;
|
|
}
|
|
|
|
let due_at = automation.next_run_at.expect("checked above");
|
|
if due_at > now {
|
|
continue;
|
|
}
|
|
|
|
// Idempotency: if a run already exists for this schedule slot, skip enqueue and
|
|
// advance next_run_at.
|
|
let existing_for_slot = self
|
|
.list_runs(&automation.id, Some(25))?
|
|
.into_iter()
|
|
.any(|run| run.scheduled_for == due_at);
|
|
|
|
if existing_for_slot {
|
|
automation.next_run_at = Some(schedule.next_after(due_at)?);
|
|
automation.updated_at = now;
|
|
self.save_automation(automation)?;
|
|
continue;
|
|
}
|
|
|
|
let mut run = AutomationRunRecord {
|
|
schema_version: CURRENT_RUN_SCHEMA_VERSION,
|
|
id: Uuid::new_v4().to_string(),
|
|
automation_id: automation.id.clone(),
|
|
scheduled_for: due_at,
|
|
status: AutomationRunStatus::Queued,
|
|
created_at: now,
|
|
started_at: None,
|
|
ended_at: None,
|
|
task_id: None,
|
|
thread_id: None,
|
|
turn_id: None,
|
|
error: None,
|
|
};
|
|
|
|
self.enqueue_run_task(automation, &mut run, task_manager)
|
|
.await?;
|
|
self.save_run(&run)?;
|
|
|
|
automation.updated_at = now;
|
|
automation.next_run_at = Some(schedule.next_after(due_at)?);
|
|
self.save_automation(automation)?;
|
|
}
|
|
|
|
Ok(())
|
|
}
|
|
|
|
pub async fn reconcile_run_statuses(&self, task_manager: &SharedTaskManager) -> Result<()> {
|
|
let automations = self.list_automations()?;
|
|
for automation in automations {
|
|
let runs = self.list_runs(&automation.id, Some(100))?;
|
|
for mut run in runs {
|
|
if !matches!(
|
|
run.status,
|
|
AutomationRunStatus::Queued | AutomationRunStatus::Running
|
|
) {
|
|
continue;
|
|
}
|
|
let Some(task_id) = run.task_id.clone() else {
|
|
continue;
|
|
};
|
|
let task = match task_manager.get_task(&task_id).await {
|
|
Ok(task) => task,
|
|
Err(_) => continue,
|
|
};
|
|
|
|
run.thread_id = task.thread_id.clone();
|
|
run.turn_id = task.turn_id.clone();
|
|
|
|
let mut changed = false;
|
|
match task.status {
|
|
TaskStatus::Queued => {
|
|
if !matches!(run.status, AutomationRunStatus::Queued) {
|
|
run.status = AutomationRunStatus::Queued;
|
|
changed = true;
|
|
}
|
|
}
|
|
TaskStatus::Running => {
|
|
if !matches!(run.status, AutomationRunStatus::Running) {
|
|
run.status = AutomationRunStatus::Running;
|
|
changed = true;
|
|
}
|
|
if run.started_at.is_none() {
|
|
run.started_at = Some(task.started_at.unwrap_or_else(Utc::now));
|
|
changed = true;
|
|
}
|
|
}
|
|
TaskStatus::Completed => {
|
|
run.status = AutomationRunStatus::Completed;
|
|
run.started_at = run.started_at.or(task.started_at);
|
|
run.ended_at = task.ended_at.or(Some(Utc::now()));
|
|
run.error = None;
|
|
changed = true;
|
|
}
|
|
TaskStatus::Failed => {
|
|
run.status = AutomationRunStatus::Failed;
|
|
run.started_at = run.started_at.or(task.started_at);
|
|
run.ended_at = task.ended_at.or(Some(Utc::now()));
|
|
run.error = task.error.clone();
|
|
changed = true;
|
|
}
|
|
TaskStatus::Canceled => {
|
|
run.status = AutomationRunStatus::Canceled;
|
|
run.started_at = run.started_at.or(task.started_at);
|
|
run.ended_at = task.ended_at.or(Some(Utc::now()));
|
|
changed = true;
|
|
}
|
|
}
|
|
|
|
if changed {
|
|
self.save_run(&run)?;
|
|
if matches!(
|
|
run.status,
|
|
AutomationRunStatus::Completed
|
|
| AutomationRunStatus::Failed
|
|
| AutomationRunStatus::Canceled
|
|
) {
|
|
let mut updated_automation = self.get_automation(&automation.id)?;
|
|
updated_automation.last_run_at = run.ended_at.or(Some(Utc::now()));
|
|
updated_automation.updated_at = Utc::now();
|
|
self.save_automation(&updated_automation)?;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
Ok(())
|
|
}
|
|
}
|
|
|
|
fn validate_name_and_prompt(name: &str, prompt: &str) -> Result<()> {
|
|
if name.trim().is_empty() {
|
|
bail!("Automation name is required");
|
|
}
|
|
if prompt.trim().is_empty() {
|
|
bail!("Automation prompt is required");
|
|
}
|
|
Ok(())
|
|
}
|
|
|
|
fn write_json_atomic<T: Serialize>(path: &Path, value: &T) -> Result<()> {
|
|
if let Some(parent) = path.parent() {
|
|
fs::create_dir_all(parent)
|
|
.with_context(|| format!("Failed to create {}", parent.display()))?;
|
|
}
|
|
let content = serde_json::to_string_pretty(value)?;
|
|
let tmp = path.with_extension("json.tmp");
|
|
fs::write(&tmp, content).with_context(|| format!("Failed to write {}", tmp.display()))?;
|
|
fs::rename(&tmp, path).with_context(|| {
|
|
format!(
|
|
"Failed to move temporary file {} to {}",
|
|
tmp.display(),
|
|
path.display()
|
|
)
|
|
})?;
|
|
Ok(())
|
|
}
|
|
|
|
pub fn default_automations_dir() -> PathBuf {
|
|
if let Ok(path) = std::env::var("DEEPSEEK_AUTOMATIONS_DIR") {
|
|
let trimmed = path.trim();
|
|
if !trimmed.is_empty() {
|
|
return PathBuf::from(trimmed);
|
|
}
|
|
}
|
|
dirs::home_dir()
|
|
.map(|home| home.join(".deepseek").join("automations"))
|
|
.unwrap_or_else(|| PathBuf::from(".deepseek").join("automations"))
|
|
}
|
|
|
|
pub type SharedAutomationManager = Arc<Mutex<AutomationManager>>;
|
|
|
|
#[derive(Debug, Clone)]
|
|
pub struct AutomationSchedulerConfig {
|
|
pub tick_interval_secs: u64,
|
|
}
|
|
|
|
impl Default for AutomationSchedulerConfig {
|
|
fn default() -> Self {
|
|
Self {
|
|
tick_interval_secs: 15,
|
|
}
|
|
}
|
|
}
|
|
|
|
pub fn spawn_scheduler(
|
|
automations: SharedAutomationManager,
|
|
task_manager: SharedTaskManager,
|
|
cancel: CancellationToken,
|
|
config: AutomationSchedulerConfig,
|
|
) -> tokio::task::JoinHandle<()> {
|
|
tokio::spawn(async move {
|
|
let interval = config.tick_interval_secs.max(5);
|
|
loop {
|
|
if cancel.is_cancelled() {
|
|
break;
|
|
}
|
|
|
|
{
|
|
let manager = automations.lock().await;
|
|
if let Err(err) = manager.scheduler_tick(&task_manager).await {
|
|
tracing::warn!("automation scheduler tick failed: {err}");
|
|
}
|
|
if let Err(err) = manager.reconcile_run_statuses(&task_manager).await {
|
|
tracing::warn!("automation reconcile failed: {err}");
|
|
}
|
|
}
|
|
|
|
tokio::select! {
|
|
_ = cancel.cancelled() => break,
|
|
_ = sleep(std::time::Duration::from_secs(interval)) => {}
|
|
}
|
|
}
|
|
})
|
|
}
|
|
|
|
#[cfg(test)]
|
|
mod tests {
|
|
use super::*;
|
|
|
|
#[test]
|
|
fn parses_hourly_rrule() {
|
|
let parsed =
|
|
AutomationSchedule::parse_rrule("FREQ=HOURLY;INTERVAL=2;BYDAY=MO,TU").expect("parse");
|
|
match parsed {
|
|
AutomationSchedule::Hourly {
|
|
interval_hours,
|
|
byday,
|
|
} => {
|
|
assert_eq!(interval_hours, 2);
|
|
assert_eq!(byday.expect("byday").len(), 2);
|
|
}
|
|
_ => panic!("expected hourly"),
|
|
}
|
|
}
|
|
|
|
#[test]
|
|
fn parses_weekly_rrule() {
|
|
let parsed =
|
|
AutomationSchedule::parse_rrule("FREQ=WEEKLY;BYDAY=MO,WE;BYHOUR=9;BYMINUTE=30")
|
|
.expect("parse");
|
|
match parsed {
|
|
AutomationSchedule::Weekly {
|
|
byday,
|
|
byhour,
|
|
byminute,
|
|
} => {
|
|
assert_eq!(byday.len(), 2);
|
|
assert_eq!(byhour, 9);
|
|
assert_eq!(byminute, 30);
|
|
}
|
|
_ => panic!("expected weekly"),
|
|
}
|
|
}
|
|
|
|
#[test]
|
|
fn rejects_invalid_rrule_fields() {
|
|
let err =
|
|
AutomationSchedule::parse_rrule("FREQ=WEEKLY;BYSECOND=5").expect_err("should fail");
|
|
assert!(err.to_string().contains("Unsupported RRULE field"));
|
|
}
|
|
|
|
#[test]
|
|
fn deletes_automation_and_runs() {
|
|
let tempdir = tempfile::tempdir().expect("tempdir");
|
|
let manager = AutomationManager::open(tempdir.path().to_path_buf()).expect("manager");
|
|
|
|
let created = manager
|
|
.create_automation(CreateAutomationRequest {
|
|
name: "Delete me".to_string(),
|
|
prompt: "prompt".to_string(),
|
|
rrule: "FREQ=HOURLY;INTERVAL=1".to_string(),
|
|
cwds: Vec::new(),
|
|
status: Some(AutomationStatus::Active),
|
|
})
|
|
.expect("create");
|
|
|
|
let run = AutomationRunRecord {
|
|
schema_version: CURRENT_RUN_SCHEMA_VERSION,
|
|
id: Uuid::new_v4().to_string(),
|
|
automation_id: created.id.clone(),
|
|
scheduled_for: Utc::now(),
|
|
status: AutomationRunStatus::Queued,
|
|
created_at: Utc::now(),
|
|
started_at: None,
|
|
ended_at: None,
|
|
task_id: None,
|
|
thread_id: None,
|
|
turn_id: None,
|
|
error: None,
|
|
};
|
|
manager.save_run(&run).expect("save run");
|
|
assert!(manager.runs_dir_for(&created.id).exists());
|
|
|
|
manager
|
|
.delete_automation(&created.id)
|
|
.expect("delete automation");
|
|
|
|
assert!(manager.get_automation(&created.id).is_err());
|
|
assert!(!manager.runs_dir_for(&created.id).exists());
|
|
}
|
|
}
|