commit c21cf2a12dd1501da9a314d730133b45cd7da728
Author: Coder Agent <coder@agents.omni>
Date: Tue Apr 7 22:25:52 2026
agentd: run persistent agents via agent stdin mode
Swap daemon-managed persistent process spawning from pi RPC to agent
stdin prompt mode.
Spawn command now starts `agent --provider ... --model ... --json`
with no prompt arg. Prompt delivery uses NUL-delimited stdin framing.
Stop now sends SIGTERM and closes stdin for graceful shutdown.
Send writes prompt bytes followed by NUL and flushes.
Stdout handling now parses Trace.Event JSONL and updates daemon state,
summary, and cost from infer/complete events.
HTTP /agents, /agents/:id/send, and /agents/:id/stop now use the
in-process persistent runner map so POST flows exercise the new agent
runner directly.
Task-Id: t-759.3
diff --git a/Omni/Agentd/Daemon.hs b/Omni/Agentd/Daemon.hs
index 8078337a..0563d0d8 100644
--- a/Omni/Agentd/Daemon.hs
+++ b/Omni/Agentd/Daemon.hs
@@ -98,6 +98,7 @@ import qualified Network.Socket as Socket
import qualified Network.Wai as Wai
import qualified Network.Wai.Handler.Warp as Warp
import qualified Omni.Agent.Models as Models
+import qualified Omni.Agent.Trace as Trace
import qualified Omni.Agents.Summarize as Summarize
import qualified Omni.Test as Test
import Servant
@@ -121,7 +122,7 @@ data DaemonConfig = DaemonConfig
dcDbPath :: FilePath,
dcLogRoot :: FilePath,
dcWorkspace :: FilePath,
- dcPiPath :: FilePath -- Path to pi executable
+ dcPiPath :: FilePath -- Path to agent executable (legacy field name)
}
deriving (Show, Eq)
@@ -136,7 +137,7 @@ defaultDaemonConfig =
dcDbPath = "/var/lib/omni/agentd.db",
dcLogRoot = "/var/log/agentd",
dcWorkspace = "/var/agentd/workspaces",
- dcPiPath = "pi" -- Use PATH lookup by default
+ dcPiPath = "agent" -- Use PATH lookup by default
}
defaultDataRoot :: IO FilePath
@@ -1346,15 +1347,16 @@ validateWebhookUrl (Just url)
| Text.isPrefixOf "http://" url || Text.isPrefixOf "https://" url = Right (Just url)
| otherwise = Left ("Invalid webhook URL: must start with http:// or https://, got: " <> Text.take 100 url)
--- | Spawn a pi agent directly (no pi_rpc.py wrapper)
+-- | Spawn an agent process directly in stdin prompt mode.
spawnPiAgent :: DaemonState -> SpawnRequest -> IO Text
spawnPiAgent state req = do
runId <- newRunId (spawnName req)
let conn = dsDbConn state
config = dsConfig state
logRoot = dcLogRoot config
+ provider = fromMaybe "auto" (spawnProvider req)
model = fromMaybe defaultModel (spawnModel req)
- piPath = dcPiPath config
+ agentPath = dcPiPath config
-- Validate webhook URL before proceeding (t-563)
case validateWebhookUrl (spawnWebhook req) of
@@ -1363,7 +1365,7 @@ spawnPiAgent state req = do
now <- Time.getCurrentTime
updateAgentCompleted conn runId now (Just err) Nothing Nothing
pure runId
- Right webhookUrl -> spawnPiAgentInner state req runId conn config logRoot model piPath webhookUrl
+ Right webhookUrl -> spawnPiAgentInner state req runId conn config logRoot provider model agentPath webhookUrl
-- | Inner spawn logic after validation
spawnPiAgentInner ::
@@ -1374,16 +1376,17 @@ spawnPiAgentInner ::
DaemonConfig ->
FilePath ->
Text ->
+ Text ->
FilePath ->
Maybe Text ->
IO Text
-spawnPiAgentInner state req runId conn config logRoot model piPath webhookUrl = do
+spawnPiAgentInner state req runId conn config logRoot provider model agentPath webhookUrl = do
-- Create log directory for this run
let runDir = logRoot </> Text.unpack runId
Dir.createDirectoryIfMissing True runDir
- -- Log file for pi output
- let logFile = runDir </> "pi.log"
+ -- Log file for agent output
+ let logFile = runDir </> "agent.log"
workspaceResult <- resolveWorkspace config req runId
case workspaceResult of
@@ -1411,12 +1414,12 @@ spawnPiAgentInner state req runId conn config logRoot model piPath webhookUrl =
(Just sizeBytes)
(wsManaged selection)
- -- Build pi command: pi --mode rpc --model <model>
- let args = ["--mode", "rpc", "--model", Text.unpack model]
+ -- Build agent command: agent --provider <provider> --model <model> --json
+ let args = ["--provider", Text.unpack provider, "--model", Text.unpack model, "--json"]
- -- Spawn pi process with stdin/stdout/stderr pipes
+ -- Spawn agent process with stdin/stdout/stderr pipes
let procSpec =
- (Process.proc piPath args)
+ (Process.proc agentPath args)
{ Process.cwd = Just workspace,
Process.std_in = Process.CreatePipe,
Process.std_out = Process.CreatePipe,
@@ -1449,11 +1452,11 @@ spawnPiAgentInner state req runId conn config logRoot model piPath webhookUrl =
-- Start output reader thread
readerAsync <-
Async.async
- (readPiOutput state runId stdoutH logFile liveStatusVar webhookUrl)
+ (readAgentOutput state runId stdoutH logFile liveStatusVar webhookUrl)
stderrAsync <- Async.async (readPiStderr logFile stderrH)
-- Start monitor thread that watches for process exit
- monitorAsync <- Async.async <| monitorPiAgent state runId ph readerAsync stderrAsync webhookUrl mRepoRoot workspace
+ monitorAsync <- Async.async <| monitorAgentProcess state runId ph readerAsync stderrAsync webhookUrl mRepoRoot workspace
-- Add to running map
STM.atomically
@@ -1476,21 +1479,15 @@ spawnPiAgentInner state req runId conn config logRoot model piPath webhookUrl =
forM_ (spawnPrompt req) <| \initialPrompt -> insertMessage conn runId "user" initialPrompt
-- Send the initial prompt to the agent
- forM_ (spawnPrompt req) <| \initialPrompt -> do
- let promptCmd =
- Aeson.object
- [ "type" .= ("prompt" :: Text),
- "message" .= initialPrompt
- ]
- sendToPiProcess stdinH promptCmd
+ forM_ (spawnPrompt req) <| \initialPrompt -> sendPromptToAgent stdinH initialPrompt
pure runId
--- | Send a JSON command to pi's stdin
-sendToPiProcess :: IO.Handle -> Aeson.Value -> IO ()
-sendToPiProcess h cmd = do
- let line = BL.toStrict (Aeson.encode cmd)
- TextIO.hPutStrLn h (decodeUtf8 line)
+-- | Send a prompt to agent stdin using NUL framing.
+sendPromptToAgent :: IO.Handle -> Text -> IO ()
+sendPromptToAgent h prompt = do
+ TextIO.hPutStr h prompt
+ IO.hPutChar h '\NUL'
IO.hFlush h
-- | Prepare workspace with optional git worktree
@@ -1568,11 +1565,11 @@ updateWorkspaceSizeFromAgent conn runId = do
sizeBytes <- calculateWorkspaceSize (Text.unpack (wrPath workspace))
updateWorkspaceSize conn runId sizeBytes
--- | Read pi output, update status, log events, send webhooks.
+-- | Read agent output, update status, log events, send webhooks.
-- Appends each line to the log file by path (not a persistent handle)
-- to avoid the handle-closed bug (t-695).
-readPiOutput :: DaemonState -> Text -> IO.Handle -> FilePath -> TVar AgentStatus -> Maybe Text -> IO ()
-readPiOutput state runId stdoutH logPath statusVar webhookUrl = do
+readAgentOutput :: DaemonState -> Text -> IO.Handle -> FilePath -> TVar AgentStatus -> Maybe Text -> IO ()
+readAgentOutput state runId stdoutH logPath statusVar webhookUrl = do
loop Nothing 0
where
loop :: Maybe Text -> Int -> IO ()
@@ -1582,7 +1579,7 @@ readPiOutput state runId stdoutH logPath statusVar webhookUrl = do
Left err
| IOError.isEOFError err -> pure ()
| otherwise -> do
- TextIO.hPutStrLn IO.stderr <| "Failed to read pi output: " <> tshow err
+ TextIO.hPutStrLn IO.stderr <| "Failed to read agent output: " <> tshow err
pure ()
Right lineBytes -> do
let line = TextEncoding.decodeUtf8With TextEncodingError.lenientDecode lineBytes
@@ -1592,17 +1589,20 @@ readPiOutput state runId stdoutH logPath statusVar webhookUrl = do
-- Parse and process event
case Aeson.decode (BL.fromStrict (encodeUtf8 line)) :: Maybe Aeson.Value of
Nothing -> loop lastSummary totalCostCents
- Just event -> do
- processResult <-
- Exception.try @SomeException
- <| processEvent state runId statusVar webhookUrl event lastSummary totalCostCents
- case processResult of
- Left err -> do
- TextIO.hPutStrLn IO.stderr <| "Failed to process pi event: " <> tshow err
- loop lastSummary totalCostCents
- Right (newSummary, newCost) -> loop newSummary newCost
-
--- | Read pi stderr and append to log file.
+ Just payload ->
+ case parseTraceEvent payload of
+ Nothing -> loop lastSummary totalCostCents
+ Just event -> do
+ processResult <-
+ Exception.try @SomeException
+ <| processTraceEvent state runId statusVar webhookUrl event lastSummary totalCostCents
+ case processResult of
+ Left err -> do
+ TextIO.hPutStrLn IO.stderr <| "Failed to process agent event: " <> tshow err
+ loop lastSummary totalCostCents
+ Right (newSummary, newCost) -> loop newSummary newCost
+
+-- | Read agent stderr and append to log file.
readPiStderr :: FilePath -> IO.Handle -> IO ()
readPiStderr logPath stderrH = do
loop
@@ -1613,137 +1613,93 @@ readPiStderr logPath stderrH = do
Left err
| IOError.isEOFError err -> pure ()
| otherwise -> do
- TextIO.hPutStrLn IO.stderr <| "Failed to read pi stderr: " <> tshow err
+ TextIO.hPutStrLn IO.stderr <| "Failed to read agent stderr: " <> tshow err
pure ()
Right lineBytes -> do
let line = TextEncoding.decodeUtf8With TextEncodingError.lenientDecode lineBytes
_ <- Exception.try @SomeException <| TextIO.appendFile logPath ("[stderr] " <> line <> "\n")
loop
--- | Process a single pi event
-processEvent ::
+-- | Parse raw JSON line into a Trace.Event.
+parseTraceEvent :: Aeson.Value -> Maybe Trace.Event
+parseTraceEvent value =
+ case Aeson.fromJSON value of
+ Aeson.Success ev -> Just ev
+ Aeson.Error _ -> case value of
+ Aeson.Object obj -> case KeyMap.lookup "event" obj of
+ Just eventVal -> case Aeson.fromJSON eventVal of
+ Aeson.Success ev -> Just ev
+ Aeson.Error _ -> Nothing
+ Nothing -> Nothing
+ _ -> Nothing
+
+-- | Process a single trace event.
+processTraceEvent ::
DaemonState ->
Text ->
TVar AgentStatus ->
Maybe Text ->
- Aeson.Value ->
+ Trace.Event ->
Maybe Text ->
Int ->
IO (Maybe Text, Int)
-processEvent state runId statusVar webhookUrl event lastSummary totalCostCents = do
+processTraceEvent state runId statusVar webhookUrl event lastSummary totalCostCents =
case event of
- Aeson.Object obj -> do
- let eventType = KeyMap.lookup "type" obj
- case eventType of
- Just (Aeson.String "agent_start") -> do
- STM.atomically <| STM.writeTVar statusVar StatusRunning
- pure (lastSummary, totalCostCents)
- Just (Aeson.String "turn_start") -> do
- STM.atomically <| STM.writeTVar statusVar StatusRunning
- pure (lastSummary, totalCostCents)
- Just (Aeson.String "turn_end") -> do
- STM.atomically <| STM.writeTVar statusVar StatusIdle
- pure (lastSummary, totalCostCents)
- Just (Aeson.String "agent_end") -> do
- -- Extract summary from last assistant message
- let summary = extractSummary obj
- cost = extractCost obj
- newCost = totalCostCents + cost
- now <- Time.getCurrentTime
- -- Update DB with latest summary/cost and mark idle
- updateAgentIdle (dsDbConn state) runId summary (Just newCost)
- updateWorkspaceStatus (dsDbConn state) runId "idle"
- updateWorkspaceAccessedAt (dsDbConn state) runId now
- updateWorkspaceSizeFromAgent (dsDbConn state) runId
- -- Record assistant response in conversation log
- forM_ summary <| \s ->
- insertMessage (dsDbConn state) runId "assistant" s
- -- Send webhook for idle state
- sendWebhook webhookUrl runId "idle" summary newCost Nothing
- STM.atomically <| STM.writeTVar statusVar StatusIdle
- -- Generate LLM title on first agent_end (async, don't block)
- mInfo <- getAgent (dsDbConn state) runId
- case mInfo of
- Just info | case aiTitle info of Nothing -> True; Just _ -> False -> do
- let prompt = aiPrompt info
- toolSummary = fromMaybe "" summary
- _ <- Async.async <| generateAndStoreTitle (dsDbConn state) runId prompt toolSummary
- pure ()
- _ -> pure ()
- pure (summary, newCost)
- Just (Aeson.String "message_end") -> do
- -- Extract summary/cost from message
- let cost = extractMessageCost obj
- newCost = totalCostCents + cost
- newSummary = extractSummaryFromMessageEnd obj <|> lastSummary
- updateAgentSummary (dsDbConn state) runId newSummary (Just newCost)
- pure (newSummary, newCost)
- Just (Aeson.String "extension_error") -> do
- let errMsg = case KeyMap.lookup "error" obj of
- Just (Aeson.String e) -> Just e
- _ -> Just "Extension error"
- sendWebhook webhookUrl runId "error" Nothing totalCostCents errMsg
- pure (lastSummary, totalCostCents)
- _ -> pure (lastSummary, totalCostCents)
+ Trace.EventCustom "agent_start" _ _ -> do
+ STM.atomically <| STM.writeTVar statusVar StatusRunning
+ pure (lastSummary, totalCostCents)
+ Trace.EventInferStart {} -> do
+ STM.atomically <| STM.writeTVar statusVar StatusRunning
+ pure (lastSummary, totalCostCents)
+ Trace.EventToolCall {} -> do
+ STM.atomically <| STM.writeTVar statusVar StatusRunning
+ pure (lastSummary, totalCostCents)
+ Trace.EventInferEnd _ _ costCents _ _ _ -> do
+ let newCost = totalCostCents + round costCents
+ updateAgentSummary (dsDbConn state) runId lastSummary (Just newCost)
+ pure (lastSummary, newCost)
+ Trace.EventCustom "agent_complete" payload _ -> do
+ let summary = extractSummaryFromAgentComplete payload <|> lastSummary
+ now <- Time.getCurrentTime
+ updateAgentIdle (dsDbConn state) runId summary (Just totalCostCents)
+ updateWorkspaceStatus (dsDbConn state) runId "idle"
+ updateWorkspaceAccessedAt (dsDbConn state) runId now
+ updateWorkspaceSizeFromAgent (dsDbConn state) runId
+ forM_ summary <| \s -> insertMessage (dsDbConn state) runId "assistant" s
+ sendWebhook webhookUrl runId "idle" summary totalCostCents Nothing
+ STM.atomically <| STM.writeTVar statusVar StatusIdle
+ mInfo <- getAgent (dsDbConn state) runId
+ case mInfo of
+ Just info | case aiTitle info of Nothing -> True; Just _ -> False -> do
+ let prompt = aiPrompt info
+ toolSummary = fromMaybe "" summary
+ _ <- Async.async <| generateAndStoreTitle (dsDbConn state) runId prompt toolSummary
+ pure ()
+ _ -> pure ()
+ pure (summary, totalCostCents)
+ Trace.EventCustom "agent_error" payload _ -> do
+ let errMsg = extractErrorFromCustom payload
+ STM.atomically <| STM.writeTVar statusVar StatusIdle
+ updateAgentStatus (dsDbConn state) runId "idle"
+ case errMsg of
+ Just "Cancelled by user" -> pure ()
+ _ -> sendWebhook webhookUrl runId "error" Nothing totalCostCents errMsg
+ pure (lastSummary, totalCostCents)
_ -> pure (lastSummary, totalCostCents)
--- | Extract summary from agent_end event (last assistant message text)
-extractSummary :: Aeson.Object -> Maybe Text
-extractSummary obj = do
- Aeson.Array messages <- KeyMap.lookup "messages" obj
- -- Find last assistant message
- let assistantMsgs = filter isAssistant (toList messages)
- lastMsg <- listToMaybe (reverse assistantMsgs)
- extractMessageText lastMsg
- where
- isAssistant (Aeson.Object o) = KeyMap.lookup "role" o == Just (Aeson.String "assistant")
- isAssistant _ = False
-
--- | Extract summary from a message_end event
-extractSummaryFromMessageEnd :: Aeson.Object -> Maybe Text
-extractSummaryFromMessageEnd obj = do
- message <- KeyMap.lookup "message" obj
- extractMessageText message
-
-extractMessageText :: Aeson.Value -> Maybe Text
-extractMessageText (Aeson.Object o) = do
- Aeson.Array content <- KeyMap.lookup "content" o
- -- Find first text block
- let textBlocks = filter isTextBlock (toList content)
- case textBlocks of
- (Aeson.Object tb : _) -> do
- Aeson.String txt <- KeyMap.lookup "text" tb
- pure <| Text.take 200 txt
+extractSummaryFromAgentComplete :: Aeson.Value -> Maybe Text
+extractSummaryFromAgentComplete (Aeson.Object obj) =
+ case KeyMap.lookup "response" obj <|> KeyMap.lookup "message" obj <|> KeyMap.lookup "content" obj of
+ Just (Aeson.String txt) -> Just (Text.take 200 txt)
_ -> Nothing
-extractMessageText _ = Nothing
-
-isTextBlock :: Aeson.Value -> Bool
-isTextBlock (Aeson.Object o) = KeyMap.lookup "type" o == Just (Aeson.String "text")
-isTextBlock _ = False
-
--- | Extract total cost from agent_end messages
-extractCost :: Aeson.Object -> Int
-extractCost obj = case KeyMap.lookup "messages" obj of
- Just (Aeson.Array messages) ->
- sum <| map extractMessageCostFromValue (toList messages)
- _ -> 0
-
--- | Extract cost from a message_end event
-extractMessageCost :: Aeson.Object -> Int
-extractMessageCost obj = case KeyMap.lookup "message" obj of
- Just val -> extractMessageCostFromValue val
- _ -> 0
-
--- | Extract cost in cents from a message value
-extractMessageCostFromValue :: Aeson.Value -> Int
-extractMessageCostFromValue (Aeson.Object msg) = case KeyMap.lookup "usage" msg of
- Just (Aeson.Object usage) -> case KeyMap.lookup "cost" usage of
- Just (Aeson.Object cost) -> case KeyMap.lookup "total" cost of
- Just (Aeson.Number n) -> round (n * 100) -- Convert dollars to cents
- _ -> 0
- _ -> 0
- _ -> 0
-extractMessageCostFromValue _ = 0
+extractSummaryFromAgentComplete _ = Nothing
+
+extractErrorFromCustom :: Aeson.Value -> Maybe Text
+extractErrorFromCustom (Aeson.Object obj) =
+ case KeyMap.lookup "message" obj <|> KeyMap.lookup "error" obj <|> KeyMap.lookup "content" obj of
+ Just (Aeson.String txt) -> Just txt
+ _ -> Nothing
+extractErrorFromCustom _ = Nothing
-- | Send webhook notification
sendWebhook :: Maybe Text -> Text -> Text -> Maybe Text -> Int -> Maybe Text -> IO ()
@@ -1778,8 +1734,8 @@ sendWebhook (Just url) runId event summary costCents mError = do
Right () -> pure ()
pure ()
--- | Monitor a pi agent process and update DB when it exits
-monitorPiAgent ::
+-- | Monitor an agent process and update DB when it exits
+monitorAgentProcess ::
DaemonState ->
Text ->
Process.ProcessHandle ->
@@ -1789,7 +1745,7 @@ monitorPiAgent ::
Maybe FilePath ->
FilePath ->
IO ()
-monitorPiAgent state runId ph readerAsync stderrAsync webhookUrl _mRepoRoot worktreePath = do
+monitorAgentProcess state runId ph readerAsync stderrAsync webhookUrl _mRepoRoot worktreePath = do
exitCode <- Process.waitForProcess ph
-- Wait for readers to finish processing remaining output
_ <- Async.waitCatch readerAsync
@@ -1822,56 +1778,46 @@ monitorPiAgent state runId ph readerAsync stderrAsync webhookUrl _mRepoRoot work
-- Remove from running map
STM.atomically <| STM.modifyTVar' (dsRunning state) (Map.delete runId)
--- | Stop a running agent process
+-- | Stop a running agent process.
stopAgentProcess :: DaemonState -> Text -> IO Bool
stopAgentProcess state runId = do
running <- STM.atomically <| STM.readTVar (dsRunning state)
case Map.lookup runId running of
Nothing -> pure False
Just ra -> do
- -- Send quit command first
- let quitCmd = Aeson.object ["type" .= ("quit" :: Text)]
- _ <- try @SomeException <| sendToPiProcess (raStdinHandle ra) quitCmd
- -- Wait a bit then force terminate if still running
+ mPid <- Process.getPid (raProcessHandle ra)
+ _ <- try @SomeException <| forM_ mPid (Signals.signalProcess Signals.sigTERM)
+ _ <- try @SomeException <| IO.hClose (raStdinHandle ra)
threadDelay 1000000 -- 1 second
mExitCode <- Process.getProcessExitCode (raProcessHandle ra)
case mExitCode of
- Just _ -> pure () -- Already exited
- Nothing -> do
- -- Force terminate
- Process.terminateProcess (raProcessHandle ra)
- -- Cancel monitor thread
+ Just _ -> pure ()
+ Nothing -> Process.terminateProcess (raProcessHandle ra)
Async.cancel (raMonitorAsync ra)
- -- Update DB
updateAgentStatus (dsDbConn state) runId "stopped"
now <- Time.getCurrentTime
updateWorkspaceStatus (dsDbConn state) runId "stopped"
updateWorkspaceAccessedAt (dsDbConn state) runId now
sizeBytes <- calculateWorkspaceSize (raWorktreePath ra)
updateWorkspaceSize (dsDbConn state) runId sizeBytes
- -- Send webhook
sendWebhook (raWebhookUrl ra) runId "stopped" Nothing 0 Nothing
- -- Don't delete worktree on stop — user may want to review/resume.
- -- Remove from running map
STM.atomically <| STM.modifyTVar' (dsRunning state) (Map.delete runId)
pure True
--- | Send message to a running agent
+-- | Send message to a running agent.
sendToAgent :: DaemonState -> Text -> Text -> IO Bool
sendToAgent state runId message = do
running <- STM.atomically <| STM.readTVar (dsRunning state)
case Map.lookup runId running of
Nothing -> pure False
Just ra -> do
- let promptCmd =
- Aeson.object
- [ "type" .= ("prompt" :: Text),
- "message" .= message
- ]
- result <- try @SomeException <| sendToPiProcess (raStdinHandle ra) promptCmd
+ result <- try @SomeException <| sendPromptToAgent (raStdinHandle ra) message
case result of
Left _ -> pure False
- Right _ -> pure True
+ Right _ -> do
+ insertMessage (dsDbConn state) runId "user" message
+ updateAgentStatus (dsDbConn state) runId "running"
+ pure True
-- | Get live status of a running agent
getLiveStatus :: DaemonState -> Text -> IO (Maybe AgentStatus)
@@ -1896,45 +1842,8 @@ server state =
spawnHandler :: DaemonState -> SpawnRequest -> Handler SpawnResponse
spawnHandler state req = do
- case validateWebhookUrl (spawnWebhook req) of
- Left err -> throwError err400 {errBody = BL.fromStrict (encodeUtf8 err)}
- Right _ -> do
- runId <- liftIO <| newRunId (spawnName req)
- cwdText <- case spawnCwd req <|> spawnWorkspace req of
- Just cwd -> pure cwd
- Nothing -> throwError err400 {errBody = "Missing required field: cwd"}
- absCwd <- liftIO <| Dir.makeAbsolute (Text.unpack cwdText)
- let cfg =
- AgentConfig
- { acName = runId,
- acProvider = fromMaybe "anthropic" (spawnProvider req),
- acModel = fromMaybe defaultModel (spawnModel req),
- acCwd = absCwd,
- acThinking = normalizeThinking (fromMaybe "high" (spawnThinking req)),
- acExtraArgs = spawnExtraArgs req,
- acExtraEnv = Map.empty
- }
- dbPath = Just (dcDbPath (dsConfig state))
- created <- liftIO <| createAgent dbPath cfg
- case created of
- Left err -> throwError err400 {errBody = BL.fromStrict (encodeUtf8 err)}
- Right _ -> do
- started <- liftIO <| startPersistentAgent dbPath runId
- case started of
- Left err -> throwError err500 {errBody = BL.fromStrict (encodeUtf8 err)}
- Right _ -> do
- case spawnPrompt req of
- Just rawPrompt -> do
- let promptText = Text.strip rawPrompt
- if Text.null promptText
- then pure ()
- else do
- sendResult <- liftIO <| sendPersistentAgent dbPath runId promptText
- case sendResult of
- Left err -> throwError err500 {errBody = BL.fromStrict (encodeUtf8 err)}
- Right () -> pure ()
- Nothing -> pure ()
- pure <| SpawnResponse runId "started"
+ runId <- liftIO <| spawnPiAgent state req
+ pure <| SpawnResponse runId "started"
listHandler :: DaemonState -> Handler [AgentInfo]
listHandler state = do
@@ -1954,19 +1863,17 @@ statusHandler state runId = do
sendHandler :: DaemonState -> Text -> SendRequest -> Handler Aeson.Value
sendHandler state runId req = do
- let dbPath = Just (dcDbPath (dsConfig state))
- result <- liftIO <| sendPersistentAgent dbPath runId (sendMessage req)
- case result of
- Left err -> throwError err404 {errBody = BL.fromStrict (encodeUtf8 err)}
- Right () -> pure <| Aeson.object ["status" .= ("sent" :: Text)]
+ sent <- liftIO <| sendToAgent state runId (sendMessage req)
+ if sent
+ then pure <| Aeson.object ["status" .= ("sent" :: Text)]
+ else throwError err404 {errBody = "Agent not found"}
stopHandler :: DaemonState -> Text -> Handler Aeson.Value
stopHandler state runId = do
- let dbPath = Just (dcDbPath (dsConfig state))
- result <- liftIO <| stopPersistentAgent dbPath runId
- case result of
- Left err -> throwError err404 {errBody = BL.fromStrict (encodeUtf8 err)}
- Right _ -> pure <| Aeson.object ["status" .= ("stopped" :: Text)]
+ stopped <- liftIO <| stopAgentProcess state runId
+ if stopped
+ then pure <| Aeson.object ["status" .= ("stopped" :: Text)]
+ else throwError err404 {errBody = "Agent not found"}
deleteHandler :: DaemonState -> Text -> Handler Aeson.Value
deleteHandler state runId = do
@@ -2047,7 +1954,7 @@ runDaemon config = do
putText <| " Database: " <> Text.pack (dcDbPath config)
putText <| " Log root: " <> Text.pack (dcLogRoot config)
putText <| " Workspace: " <> Text.pack (dcWorkspace config)
- putText <| " Pi path: " <> Text.pack (dcPiPath config)
+ putText <| " Agent path (--pi-path): " <> Text.pack (dcPiPath config)
Warp.run (dcPort config) (serve agentAPI (server state))
-- | Graceful shutdown: stop all running agents, close handles (t-564)
@@ -2447,32 +2354,26 @@ mockPiScript =
[ "#!/usr/bin/env bash",
"set -euo pipefail",
"",
+ "ts='2026-01-01T00:00:00Z'",
"emit() {",
" printf '%s\\n' \"$1\"",
"}",
"",
- "while IFS= read -r line; do",
- " if [ -z \"$line\" ]; then",
+ "while IFS= read -r -d '' prompt; do",
+ " if [ -z \"$prompt\" ]; then",
" continue",
" fi",
- " case \"$line\" in",
- " *'\"type\":\"quit\"'* ) exit 0 ;;",
- " *'\"type\":\"abort\"'* ) exit 1 ;;",
- " esac",
- " case \"$line\" in",
- " *'\"type\":\"prompt\"'* ) ;;",
- " * ) continue ;;",
- " esac",
- " message=$(printf '%s' \"$line\" | sed -n 's/.*\"message\"[ ]*:[ ]*\"\\([^\"]*\\)\".*/\\1/p')",
- " if printf '%s' \"$message\" | grep -q 'FAIL'; then",
- " emit '{\"type\":\"agent_start\"}'",
- " exit 1",
+ "",
+ " emit \"{\\\"type\\\":\\\"custom\\\",\\\"custom_type\\\":\\\"agent_start\\\",\\\"data\\\":{},\\\"timestamp\\\":\\\"${ts}\\\"}\"",
+ "",
+ " if printf '%s' \"$prompt\" | grep -q 'FAIL'; then",
+ " emit \"{\\\"type\\\":\\\"custom\\\",\\\"custom_type\\\":\\\"agent_error\\\",\\\"data\\\":{\\\"message\\\":\\\"Mock failure\\\"},\\\"timestamp\\\":\\\"${ts}\\\"}\"",
+ " continue",
" fi",
- " emit '{\"type\":\"agent_start\"}'",
- " emit '{\"type\":\"turn_start\"}'",
- " emit \"{\\\"type\\\":\\\"message_end\\\",\\\"message\\\":{\\\"role\\\":\\\"assistant\\\",\\\"content\\\":[{\\\"type\\\":\\\"text\\\",\\\"text\\\":\\\"Mock reply: ${message}\\\"}],\\\"usage\\\":{\\\"cost\\\":{\\\"total\\\":0.01}}}}\"",
- " emit '{\"type\":\"turn_end\"}'",
- " emit \"{\\\"type\\\":\\\"agent_end\\\",\\\"messages\\\":[{\\\"role\\\":\\\"user\\\",\\\"content\\\":[{\\\"type\\\":\\\"text\\\",\\\"text\\\":\\\"${message}\\\"}]},{\\\"role\\\":\\\"assistant\\\",\\\"content\\\":[{\\\"type\\\":\\\"text\\\",\\\"text\\\":\\\"Mock reply: ${message}\\\"}],\\\"usage\\\":{\\\"cost\\\":{\\\"total\\\":0.01}}}]}\"",
+ "",
+ " emit \"{\\\"type\\\":\\\"infer_start\\\",\\\"model\\\":\\\"mock-model\\\",\\\"prompt_preview\\\":\\\"${prompt}\\\",\\\"timestamp\\\":\\\"${ts}\\\",\\\"iteration\\\":0}\"",
+ " emit \"{\\\"type\\\":\\\"infer_end\\\",\\\"response_preview\\\":\\\"Mock reply: ${prompt}\\\",\\\"tokens\\\":42,\\\"cost_cents\\\":1.0,\\\"duration_ms\\\":5,\\\"timestamp\\\":\\\"${ts}\\\",\\\"iteration\\\":0}\"",
+ " emit \"{\\\"type\\\":\\\"custom\\\",\\\"custom_type\\\":\\\"agent_complete\\\",\\\"data\\\":{\\\"response\\\":\\\"Mock reply: ${prompt}\\\"},\\\"timestamp\\\":\\\"${ts}\\\"}\"",
"done"
]