← Back to task

Commit c21cf2a1

commit c21cf2a12dd1501da9a314d730133b45cd7da728
Author: Coder Agent <coder@agents.omni>
Date:   Tue Apr 7 22:25:52 2026

    agentd: run persistent agents via agent stdin mode
    
    Swap daemon-managed persistent process spawning from pi RPC to agent
    stdin prompt mode.
    
    Spawn command now starts `agent --provider ... --model ... --json`
    with no prompt arg. Prompt delivery uses NUL-delimited stdin framing.
    
    Stop now sends SIGTERM and closes stdin for graceful shutdown.
    Send writes prompt bytes followed by NUL and flushes.
    
    Stdout handling now parses Trace.Event JSONL and updates daemon state,
    summary, and cost from infer/complete events.
    
    HTTP /agents, /agents/:id/send, and /agents/:id/stop now use the
    in-process persistent runner map so POST flows exercise the new agent
    runner directly.
    
    Task-Id: t-759.3

diff --git a/Omni/Agentd/Daemon.hs b/Omni/Agentd/Daemon.hs
index 8078337a..0563d0d8 100644
--- a/Omni/Agentd/Daemon.hs
+++ b/Omni/Agentd/Daemon.hs
@@ -98,6 +98,7 @@ import qualified Network.Socket as Socket
 import qualified Network.Wai as Wai
 import qualified Network.Wai.Handler.Warp as Warp
 import qualified Omni.Agent.Models as Models
+import qualified Omni.Agent.Trace as Trace
 import qualified Omni.Agents.Summarize as Summarize
 import qualified Omni.Test as Test
 import Servant
@@ -121,7 +122,7 @@ data DaemonConfig = DaemonConfig
     dcDbPath :: FilePath,
     dcLogRoot :: FilePath,
     dcWorkspace :: FilePath,
-    dcPiPath :: FilePath -- Path to pi executable
+    dcPiPath :: FilePath -- Path to agent executable (legacy field name)
   }
   deriving (Show, Eq)
 
@@ -136,7 +137,7 @@ defaultDaemonConfig =
       dcDbPath = "/var/lib/omni/agentd.db",
       dcLogRoot = "/var/log/agentd",
       dcWorkspace = "/var/agentd/workspaces",
-      dcPiPath = "pi" -- Use PATH lookup by default
+      dcPiPath = "agent" -- Use PATH lookup by default
     }
 
 defaultDataRoot :: IO FilePath
@@ -1346,15 +1347,16 @@ validateWebhookUrl (Just url)
   | Text.isPrefixOf "http://" url || Text.isPrefixOf "https://" url = Right (Just url)
   | otherwise = Left ("Invalid webhook URL: must start with http:// or https://, got: " <> Text.take 100 url)
 
--- | Spawn a pi agent directly (no pi_rpc.py wrapper)
+-- | Spawn an agent process directly in stdin prompt mode.
 spawnPiAgent :: DaemonState -> SpawnRequest -> IO Text
 spawnPiAgent state req = do
   runId <- newRunId (spawnName req)
   let conn = dsDbConn state
       config = dsConfig state
       logRoot = dcLogRoot config
+      provider = fromMaybe "auto" (spawnProvider req)
       model = fromMaybe defaultModel (spawnModel req)
-      piPath = dcPiPath config
+      agentPath = dcPiPath config
 
   -- Validate webhook URL before proceeding (t-563)
   case validateWebhookUrl (spawnWebhook req) of
@@ -1363,7 +1365,7 @@ spawnPiAgent state req = do
       now <- Time.getCurrentTime
       updateAgentCompleted conn runId now (Just err) Nothing Nothing
       pure runId
-    Right webhookUrl -> spawnPiAgentInner state req runId conn config logRoot model piPath webhookUrl
+    Right webhookUrl -> spawnPiAgentInner state req runId conn config logRoot provider model agentPath webhookUrl
 
 -- | Inner spawn logic after validation
 spawnPiAgentInner ::
@@ -1374,16 +1376,17 @@ spawnPiAgentInner ::
   DaemonConfig ->
   FilePath ->
   Text ->
+  Text ->
   FilePath ->
   Maybe Text ->
   IO Text
-spawnPiAgentInner state req runId conn config logRoot model piPath webhookUrl = do
+spawnPiAgentInner state req runId conn config logRoot provider model agentPath webhookUrl = do
   -- Create log directory for this run
   let runDir = logRoot </> Text.unpack runId
   Dir.createDirectoryIfMissing True runDir
 
-  -- Log file for pi output
-  let logFile = runDir </> "pi.log"
+  -- Log file for agent output
+  let logFile = runDir </> "agent.log"
 
   workspaceResult <- resolveWorkspace config req runId
   case workspaceResult of
@@ -1411,12 +1414,12 @@ spawnPiAgentInner state req runId conn config logRoot model piPath webhookUrl =
         (Just sizeBytes)
         (wsManaged selection)
 
-      -- Build pi command: pi --mode rpc --model <model>
-      let args = ["--mode", "rpc", "--model", Text.unpack model]
+      -- Build agent command: agent --provider <provider> --model <model> --json
+      let args = ["--provider", Text.unpack provider, "--model", Text.unpack model, "--json"]
 
-      -- Spawn pi process with stdin/stdout/stderr pipes
+      -- Spawn agent process with stdin/stdout/stderr pipes
       let procSpec =
-            (Process.proc piPath args)
+            (Process.proc agentPath args)
               { Process.cwd = Just workspace,
                 Process.std_in = Process.CreatePipe,
                 Process.std_out = Process.CreatePipe,
@@ -1449,11 +1452,11 @@ spawnPiAgentInner state req runId conn config logRoot model piPath webhookUrl =
       -- Start output reader thread
       readerAsync <-
         Async.async
-          (readPiOutput state runId stdoutH logFile liveStatusVar webhookUrl)
+          (readAgentOutput state runId stdoutH logFile liveStatusVar webhookUrl)
       stderrAsync <- Async.async (readPiStderr logFile stderrH)
 
       -- Start monitor thread that watches for process exit
-      monitorAsync <- Async.async <| monitorPiAgent state runId ph readerAsync stderrAsync webhookUrl mRepoRoot workspace
+      monitorAsync <- Async.async <| monitorAgentProcess state runId ph readerAsync stderrAsync webhookUrl mRepoRoot workspace
 
       -- Add to running map
       STM.atomically
@@ -1476,21 +1479,15 @@ spawnPiAgentInner state req runId conn config logRoot model piPath webhookUrl =
       forM_ (spawnPrompt req) <| \initialPrompt -> insertMessage conn runId "user" initialPrompt
 
       -- Send the initial prompt to the agent
-      forM_ (spawnPrompt req) <| \initialPrompt -> do
-        let promptCmd =
-              Aeson.object
-                [ "type" .= ("prompt" :: Text),
-                  "message" .= initialPrompt
-                ]
-        sendToPiProcess stdinH promptCmd
+      forM_ (spawnPrompt req) <| \initialPrompt -> sendPromptToAgent stdinH initialPrompt
 
       pure runId
 
--- | Send a JSON command to pi's stdin
-sendToPiProcess :: IO.Handle -> Aeson.Value -> IO ()
-sendToPiProcess h cmd = do
-  let line = BL.toStrict (Aeson.encode cmd)
-  TextIO.hPutStrLn h (decodeUtf8 line)
+-- | Send a prompt to agent stdin using NUL framing.
+sendPromptToAgent :: IO.Handle -> Text -> IO ()
+sendPromptToAgent h prompt = do
+  TextIO.hPutStr h prompt
+  IO.hPutChar h '\NUL'
   IO.hFlush h
 
 -- | Prepare workspace with optional git worktree
@@ -1568,11 +1565,11 @@ updateWorkspaceSizeFromAgent conn runId = do
     sizeBytes <- calculateWorkspaceSize (Text.unpack (wrPath workspace))
     updateWorkspaceSize conn runId sizeBytes
 
--- | Read pi output, update status, log events, send webhooks.
+-- | Read agent output, update status, log events, send webhooks.
 -- Appends each line to the log file by path (not a persistent handle)
 -- to avoid the handle-closed bug (t-695).
-readPiOutput :: DaemonState -> Text -> IO.Handle -> FilePath -> TVar AgentStatus -> Maybe Text -> IO ()
-readPiOutput state runId stdoutH logPath statusVar webhookUrl = do
+readAgentOutput :: DaemonState -> Text -> IO.Handle -> FilePath -> TVar AgentStatus -> Maybe Text -> IO ()
+readAgentOutput state runId stdoutH logPath statusVar webhookUrl = do
   loop Nothing 0
   where
     loop :: Maybe Text -> Int -> IO ()
@@ -1582,7 +1579,7 @@ readPiOutput state runId stdoutH logPath statusVar webhookUrl = do
         Left err
           | IOError.isEOFError err -> pure ()
           | otherwise -> do
-              TextIO.hPutStrLn IO.stderr <| "Failed to read pi output: " <> tshow err
+              TextIO.hPutStrLn IO.stderr <| "Failed to read agent output: " <> tshow err
               pure ()
         Right lineBytes -> do
           let line = TextEncoding.decodeUtf8With TextEncodingError.lenientDecode lineBytes
@@ -1592,17 +1589,20 @@ readPiOutput state runId stdoutH logPath statusVar webhookUrl = do
           -- Parse and process event
           case Aeson.decode (BL.fromStrict (encodeUtf8 line)) :: Maybe Aeson.Value of
             Nothing -> loop lastSummary totalCostCents
-            Just event -> do
-              processResult <-
-                Exception.try @SomeException
-                  <| processEvent state runId statusVar webhookUrl event lastSummary totalCostCents
-              case processResult of
-                Left err -> do
-                  TextIO.hPutStrLn IO.stderr <| "Failed to process pi event: " <> tshow err
-                  loop lastSummary totalCostCents
-                Right (newSummary, newCost) -> loop newSummary newCost
-
--- | Read pi stderr and append to log file.
+            Just payload ->
+              case parseTraceEvent payload of
+                Nothing -> loop lastSummary totalCostCents
+                Just event -> do
+                  processResult <-
+                    Exception.try @SomeException
+                      <| processTraceEvent state runId statusVar webhookUrl event lastSummary totalCostCents
+                  case processResult of
+                    Left err -> do
+                      TextIO.hPutStrLn IO.stderr <| "Failed to process agent event: " <> tshow err
+                      loop lastSummary totalCostCents
+                    Right (newSummary, newCost) -> loop newSummary newCost
+
+-- | Read agent stderr and append to log file.
 readPiStderr :: FilePath -> IO.Handle -> IO ()
 readPiStderr logPath stderrH = do
   loop
@@ -1613,137 +1613,93 @@ readPiStderr logPath stderrH = do
         Left err
           | IOError.isEOFError err -> pure ()
           | otherwise -> do
-              TextIO.hPutStrLn IO.stderr <| "Failed to read pi stderr: " <> tshow err
+              TextIO.hPutStrLn IO.stderr <| "Failed to read agent stderr: " <> tshow err
               pure ()
         Right lineBytes -> do
           let line = TextEncoding.decodeUtf8With TextEncodingError.lenientDecode lineBytes
           _ <- Exception.try @SomeException <| TextIO.appendFile logPath ("[stderr] " <> line <> "\n")
           loop
 
--- | Process a single pi event
-processEvent ::
+-- | Parse raw JSON line into a Trace.Event.
+parseTraceEvent :: Aeson.Value -> Maybe Trace.Event
+parseTraceEvent value =
+  case Aeson.fromJSON value of
+    Aeson.Success ev -> Just ev
+    Aeson.Error _ -> case value of
+      Aeson.Object obj -> case KeyMap.lookup "event" obj of
+        Just eventVal -> case Aeson.fromJSON eventVal of
+          Aeson.Success ev -> Just ev
+          Aeson.Error _ -> Nothing
+        Nothing -> Nothing
+      _ -> Nothing
+
+-- | Process a single trace event.
+processTraceEvent ::
   DaemonState ->
   Text ->
   TVar AgentStatus ->
   Maybe Text ->
-  Aeson.Value ->
+  Trace.Event ->
   Maybe Text ->
   Int ->
   IO (Maybe Text, Int)
-processEvent state runId statusVar webhookUrl event lastSummary totalCostCents = do
+processTraceEvent state runId statusVar webhookUrl event lastSummary totalCostCents =
   case event of
-    Aeson.Object obj -> do
-      let eventType = KeyMap.lookup "type" obj
-      case eventType of
-        Just (Aeson.String "agent_start") -> do
-          STM.atomically <| STM.writeTVar statusVar StatusRunning
-          pure (lastSummary, totalCostCents)
-        Just (Aeson.String "turn_start") -> do
-          STM.atomically <| STM.writeTVar statusVar StatusRunning
-          pure (lastSummary, totalCostCents)
-        Just (Aeson.String "turn_end") -> do
-          STM.atomically <| STM.writeTVar statusVar StatusIdle
-          pure (lastSummary, totalCostCents)
-        Just (Aeson.String "agent_end") -> do
-          -- Extract summary from last assistant message
-          let summary = extractSummary obj
-              cost = extractCost obj
-              newCost = totalCostCents + cost
-          now <- Time.getCurrentTime
-          -- Update DB with latest summary/cost and mark idle
-          updateAgentIdle (dsDbConn state) runId summary (Just newCost)
-          updateWorkspaceStatus (dsDbConn state) runId "idle"
-          updateWorkspaceAccessedAt (dsDbConn state) runId now
-          updateWorkspaceSizeFromAgent (dsDbConn state) runId
-          -- Record assistant response in conversation log
-          forM_ summary <| \s ->
-            insertMessage (dsDbConn state) runId "assistant" s
-          -- Send webhook for idle state
-          sendWebhook webhookUrl runId "idle" summary newCost Nothing
-          STM.atomically <| STM.writeTVar statusVar StatusIdle
-          -- Generate LLM title on first agent_end (async, don't block)
-          mInfo <- getAgent (dsDbConn state) runId
-          case mInfo of
-            Just info | case aiTitle info of Nothing -> True; Just _ -> False -> do
-              let prompt = aiPrompt info
-                  toolSummary = fromMaybe "" summary
-              _ <- Async.async <| generateAndStoreTitle (dsDbConn state) runId prompt toolSummary
-              pure ()
-            _ -> pure ()
-          pure (summary, newCost)
-        Just (Aeson.String "message_end") -> do
-          -- Extract summary/cost from message
-          let cost = extractMessageCost obj
-              newCost = totalCostCents + cost
-              newSummary = extractSummaryFromMessageEnd obj <|> lastSummary
-          updateAgentSummary (dsDbConn state) runId newSummary (Just newCost)
-          pure (newSummary, newCost)
-        Just (Aeson.String "extension_error") -> do
-          let errMsg = case KeyMap.lookup "error" obj of
-                Just (Aeson.String e) -> Just e
-                _ -> Just "Extension error"
-          sendWebhook webhookUrl runId "error" Nothing totalCostCents errMsg
-          pure (lastSummary, totalCostCents)
-        _ -> pure (lastSummary, totalCostCents)
+    Trace.EventCustom "agent_start" _ _ -> do
+      STM.atomically <| STM.writeTVar statusVar StatusRunning
+      pure (lastSummary, totalCostCents)
+    Trace.EventInferStart {} -> do
+      STM.atomically <| STM.writeTVar statusVar StatusRunning
+      pure (lastSummary, totalCostCents)
+    Trace.EventToolCall {} -> do
+      STM.atomically <| STM.writeTVar statusVar StatusRunning
+      pure (lastSummary, totalCostCents)
+    Trace.EventInferEnd _ _ costCents _ _ _ -> do
+      let newCost = totalCostCents + round costCents
+      updateAgentSummary (dsDbConn state) runId lastSummary (Just newCost)
+      pure (lastSummary, newCost)
+    Trace.EventCustom "agent_complete" payload _ -> do
+      let summary = extractSummaryFromAgentComplete payload <|> lastSummary
+      now <- Time.getCurrentTime
+      updateAgentIdle (dsDbConn state) runId summary (Just totalCostCents)
+      updateWorkspaceStatus (dsDbConn state) runId "idle"
+      updateWorkspaceAccessedAt (dsDbConn state) runId now
+      updateWorkspaceSizeFromAgent (dsDbConn state) runId
+      forM_ summary <| \s -> insertMessage (dsDbConn state) runId "assistant" s
+      sendWebhook webhookUrl runId "idle" summary totalCostCents Nothing
+      STM.atomically <| STM.writeTVar statusVar StatusIdle
+      mInfo <- getAgent (dsDbConn state) runId
+      case mInfo of
+        Just info | case aiTitle info of Nothing -> True; Just _ -> False -> do
+          let prompt = aiPrompt info
+              toolSummary = fromMaybe "" summary
+          _ <- Async.async <| generateAndStoreTitle (dsDbConn state) runId prompt toolSummary
+          pure ()
+        _ -> pure ()
+      pure (summary, totalCostCents)
+    Trace.EventCustom "agent_error" payload _ -> do
+      let errMsg = extractErrorFromCustom payload
+      STM.atomically <| STM.writeTVar statusVar StatusIdle
+      updateAgentStatus (dsDbConn state) runId "idle"
+      case errMsg of
+        Just "Cancelled by user" -> pure ()
+        _ -> sendWebhook webhookUrl runId "error" Nothing totalCostCents errMsg
+      pure (lastSummary, totalCostCents)
     _ -> pure (lastSummary, totalCostCents)
 
--- | Extract summary from agent_end event (last assistant message text)
-extractSummary :: Aeson.Object -> Maybe Text
-extractSummary obj = do
-  Aeson.Array messages <- KeyMap.lookup "messages" obj
-  -- Find last assistant message
-  let assistantMsgs = filter isAssistant (toList messages)
-  lastMsg <- listToMaybe (reverse assistantMsgs)
-  extractMessageText lastMsg
-  where
-    isAssistant (Aeson.Object o) = KeyMap.lookup "role" o == Just (Aeson.String "assistant")
-    isAssistant _ = False
-
--- | Extract summary from a message_end event
-extractSummaryFromMessageEnd :: Aeson.Object -> Maybe Text
-extractSummaryFromMessageEnd obj = do
-  message <- KeyMap.lookup "message" obj
-  extractMessageText message
-
-extractMessageText :: Aeson.Value -> Maybe Text
-extractMessageText (Aeson.Object o) = do
-  Aeson.Array content <- KeyMap.lookup "content" o
-  -- Find first text block
-  let textBlocks = filter isTextBlock (toList content)
-  case textBlocks of
-    (Aeson.Object tb : _) -> do
-      Aeson.String txt <- KeyMap.lookup "text" tb
-      pure <| Text.take 200 txt
+extractSummaryFromAgentComplete :: Aeson.Value -> Maybe Text
+extractSummaryFromAgentComplete (Aeson.Object obj) =
+  case KeyMap.lookup "response" obj <|> KeyMap.lookup "message" obj <|> KeyMap.lookup "content" obj of
+    Just (Aeson.String txt) -> Just (Text.take 200 txt)
     _ -> Nothing
-extractMessageText _ = Nothing
-
-isTextBlock :: Aeson.Value -> Bool
-isTextBlock (Aeson.Object o) = KeyMap.lookup "type" o == Just (Aeson.String "text")
-isTextBlock _ = False
-
--- | Extract total cost from agent_end messages
-extractCost :: Aeson.Object -> Int
-extractCost obj = case KeyMap.lookup "messages" obj of
-  Just (Aeson.Array messages) ->
-    sum <| map extractMessageCostFromValue (toList messages)
-  _ -> 0
-
--- | Extract cost from a message_end event
-extractMessageCost :: Aeson.Object -> Int
-extractMessageCost obj = case KeyMap.lookup "message" obj of
-  Just val -> extractMessageCostFromValue val
-  _ -> 0
-
--- | Extract cost in cents from a message value
-extractMessageCostFromValue :: Aeson.Value -> Int
-extractMessageCostFromValue (Aeson.Object msg) = case KeyMap.lookup "usage" msg of
-  Just (Aeson.Object usage) -> case KeyMap.lookup "cost" usage of
-    Just (Aeson.Object cost) -> case KeyMap.lookup "total" cost of
-      Just (Aeson.Number n) -> round (n * 100) -- Convert dollars to cents
-      _ -> 0
-    _ -> 0
-  _ -> 0
-extractMessageCostFromValue _ = 0
+extractSummaryFromAgentComplete _ = Nothing
+
+extractErrorFromCustom :: Aeson.Value -> Maybe Text
+extractErrorFromCustom (Aeson.Object obj) =
+  case KeyMap.lookup "message" obj <|> KeyMap.lookup "error" obj <|> KeyMap.lookup "content" obj of
+    Just (Aeson.String txt) -> Just txt
+    _ -> Nothing
+extractErrorFromCustom _ = Nothing
 
 -- | Send webhook notification
 sendWebhook :: Maybe Text -> Text -> Text -> Maybe Text -> Int -> Maybe Text -> IO ()
@@ -1778,8 +1734,8 @@ sendWebhook (Just url) runId event summary costCents mError = do
         Right () -> pure ()
   pure ()
 
--- | Monitor a pi agent process and update DB when it exits
-monitorPiAgent ::
+-- | Monitor an agent process and update DB when it exits
+monitorAgentProcess ::
   DaemonState ->
   Text ->
   Process.ProcessHandle ->
@@ -1789,7 +1745,7 @@ monitorPiAgent ::
   Maybe FilePath ->
   FilePath ->
   IO ()
-monitorPiAgent state runId ph readerAsync stderrAsync webhookUrl _mRepoRoot worktreePath = do
+monitorAgentProcess state runId ph readerAsync stderrAsync webhookUrl _mRepoRoot worktreePath = do
   exitCode <- Process.waitForProcess ph
   -- Wait for readers to finish processing remaining output
   _ <- Async.waitCatch readerAsync
@@ -1822,56 +1778,46 @@ monitorPiAgent state runId ph readerAsync stderrAsync webhookUrl _mRepoRoot work
   -- Remove from running map
   STM.atomically <| STM.modifyTVar' (dsRunning state) (Map.delete runId)
 
--- | Stop a running agent process
+-- | Stop a running agent process.
 stopAgentProcess :: DaemonState -> Text -> IO Bool
 stopAgentProcess state runId = do
   running <- STM.atomically <| STM.readTVar (dsRunning state)
   case Map.lookup runId running of
     Nothing -> pure False
     Just ra -> do
-      -- Send quit command first
-      let quitCmd = Aeson.object ["type" .= ("quit" :: Text)]
-      _ <- try @SomeException <| sendToPiProcess (raStdinHandle ra) quitCmd
-      -- Wait a bit then force terminate if still running
+      mPid <- Process.getPid (raProcessHandle ra)
+      _ <- try @SomeException <| forM_ mPid (Signals.signalProcess Signals.sigTERM)
+      _ <- try @SomeException <| IO.hClose (raStdinHandle ra)
       threadDelay 1000000 -- 1 second
       mExitCode <- Process.getProcessExitCode (raProcessHandle ra)
       case mExitCode of
-        Just _ -> pure () -- Already exited
-        Nothing -> do
-          -- Force terminate
-          Process.terminateProcess (raProcessHandle ra)
-      -- Cancel monitor thread
+        Just _ -> pure ()
+        Nothing -> Process.terminateProcess (raProcessHandle ra)
       Async.cancel (raMonitorAsync ra)
-      -- Update DB
       updateAgentStatus (dsDbConn state) runId "stopped"
       now <- Time.getCurrentTime
       updateWorkspaceStatus (dsDbConn state) runId "stopped"
       updateWorkspaceAccessedAt (dsDbConn state) runId now
       sizeBytes <- calculateWorkspaceSize (raWorktreePath ra)
       updateWorkspaceSize (dsDbConn state) runId sizeBytes
-      -- Send webhook
       sendWebhook (raWebhookUrl ra) runId "stopped" Nothing 0 Nothing
-      -- Don't delete worktree on stop — user may want to review/resume.
-      -- Remove from running map
       STM.atomically <| STM.modifyTVar' (dsRunning state) (Map.delete runId)
       pure True
 
--- | Send message to a running agent
+-- | Send message to a running agent.
 sendToAgent :: DaemonState -> Text -> Text -> IO Bool
 sendToAgent state runId message = do
   running <- STM.atomically <| STM.readTVar (dsRunning state)
   case Map.lookup runId running of
     Nothing -> pure False
     Just ra -> do
-      let promptCmd =
-            Aeson.object
-              [ "type" .= ("prompt" :: Text),
-                "message" .= message
-              ]
-      result <- try @SomeException <| sendToPiProcess (raStdinHandle ra) promptCmd
+      result <- try @SomeException <| sendPromptToAgent (raStdinHandle ra) message
       case result of
         Left _ -> pure False
-        Right _ -> pure True
+        Right _ -> do
+          insertMessage (dsDbConn state) runId "user" message
+          updateAgentStatus (dsDbConn state) runId "running"
+          pure True
 
 -- | Get live status of a running agent
 getLiveStatus :: DaemonState -> Text -> IO (Maybe AgentStatus)
@@ -1896,45 +1842,8 @@ server state =
 
 spawnHandler :: DaemonState -> SpawnRequest -> Handler SpawnResponse
 spawnHandler state req = do
-  case validateWebhookUrl (spawnWebhook req) of
-    Left err -> throwError err400 {errBody = BL.fromStrict (encodeUtf8 err)}
-    Right _ -> do
-      runId <- liftIO <| newRunId (spawnName req)
-      cwdText <- case spawnCwd req <|> spawnWorkspace req of
-        Just cwd -> pure cwd
-        Nothing -> throwError err400 {errBody = "Missing required field: cwd"}
-      absCwd <- liftIO <| Dir.makeAbsolute (Text.unpack cwdText)
-      let cfg =
-            AgentConfig
-              { acName = runId,
-                acProvider = fromMaybe "anthropic" (spawnProvider req),
-                acModel = fromMaybe defaultModel (spawnModel req),
-                acCwd = absCwd,
-                acThinking = normalizeThinking (fromMaybe "high" (spawnThinking req)),
-                acExtraArgs = spawnExtraArgs req,
-                acExtraEnv = Map.empty
-              }
-          dbPath = Just (dcDbPath (dsConfig state))
-      created <- liftIO <| createAgent dbPath cfg
-      case created of
-        Left err -> throwError err400 {errBody = BL.fromStrict (encodeUtf8 err)}
-        Right _ -> do
-          started <- liftIO <| startPersistentAgent dbPath runId
-          case started of
-            Left err -> throwError err500 {errBody = BL.fromStrict (encodeUtf8 err)}
-            Right _ -> do
-              case spawnPrompt req of
-                Just rawPrompt -> do
-                  let promptText = Text.strip rawPrompt
-                  if Text.null promptText
-                    then pure ()
-                    else do
-                      sendResult <- liftIO <| sendPersistentAgent dbPath runId promptText
-                      case sendResult of
-                        Left err -> throwError err500 {errBody = BL.fromStrict (encodeUtf8 err)}
-                        Right () -> pure ()
-                Nothing -> pure ()
-              pure <| SpawnResponse runId "started"
+  runId <- liftIO <| spawnPiAgent state req
+  pure <| SpawnResponse runId "started"
 
 listHandler :: DaemonState -> Handler [AgentInfo]
 listHandler state = do
@@ -1954,19 +1863,17 @@ statusHandler state runId = do
 
 sendHandler :: DaemonState -> Text -> SendRequest -> Handler Aeson.Value
 sendHandler state runId req = do
-  let dbPath = Just (dcDbPath (dsConfig state))
-  result <- liftIO <| sendPersistentAgent dbPath runId (sendMessage req)
-  case result of
-    Left err -> throwError err404 {errBody = BL.fromStrict (encodeUtf8 err)}
-    Right () -> pure <| Aeson.object ["status" .= ("sent" :: Text)]
+  sent <- liftIO <| sendToAgent state runId (sendMessage req)
+  if sent
+    then pure <| Aeson.object ["status" .= ("sent" :: Text)]
+    else throwError err404 {errBody = "Agent not found"}
 
 stopHandler :: DaemonState -> Text -> Handler Aeson.Value
 stopHandler state runId = do
-  let dbPath = Just (dcDbPath (dsConfig state))
-  result <- liftIO <| stopPersistentAgent dbPath runId
-  case result of
-    Left err -> throwError err404 {errBody = BL.fromStrict (encodeUtf8 err)}
-    Right _ -> pure <| Aeson.object ["status" .= ("stopped" :: Text)]
+  stopped <- liftIO <| stopAgentProcess state runId
+  if stopped
+    then pure <| Aeson.object ["status" .= ("stopped" :: Text)]
+    else throwError err404 {errBody = "Agent not found"}
 
 deleteHandler :: DaemonState -> Text -> Handler Aeson.Value
 deleteHandler state runId = do
@@ -2047,7 +1954,7 @@ runDaemon config = do
   putText <| "  Database: " <> Text.pack (dcDbPath config)
   putText <| "  Log root: " <> Text.pack (dcLogRoot config)
   putText <| "  Workspace: " <> Text.pack (dcWorkspace config)
-  putText <| "  Pi path: " <> Text.pack (dcPiPath config)
+  putText <| "  Agent path (--pi-path): " <> Text.pack (dcPiPath config)
   Warp.run (dcPort config) (serve agentAPI (server state))
 
 -- | Graceful shutdown: stop all running agents, close handles (t-564)
@@ -2447,32 +2354,26 @@ mockPiScript =
     [ "#!/usr/bin/env bash",
       "set -euo pipefail",
       "",
+      "ts='2026-01-01T00:00:00Z'",
       "emit() {",
       "  printf '%s\\n' \"$1\"",
       "}",
       "",
-      "while IFS= read -r line; do",
-      "  if [ -z \"$line\" ]; then",
+      "while IFS= read -r -d '' prompt; do",
+      "  if [ -z \"$prompt\" ]; then",
       "    continue",
       "  fi",
-      "  case \"$line\" in",
-      "    *'\"type\":\"quit\"'* ) exit 0 ;;",
-      "    *'\"type\":\"abort\"'* ) exit 1 ;;",
-      "  esac",
-      "  case \"$line\" in",
-      "    *'\"type\":\"prompt\"'* ) ;;",
-      "    * ) continue ;;",
-      "  esac",
-      "  message=$(printf '%s' \"$line\" | sed -n 's/.*\"message\"[ ]*:[ ]*\"\\([^\"]*\\)\".*/\\1/p')",
-      "  if printf '%s' \"$message\" | grep -q 'FAIL'; then",
-      "    emit '{\"type\":\"agent_start\"}'",
-      "    exit 1",
+      "",
+      "  emit \"{\\\"type\\\":\\\"custom\\\",\\\"custom_type\\\":\\\"agent_start\\\",\\\"data\\\":{},\\\"timestamp\\\":\\\"${ts}\\\"}\"",
+      "",
+      "  if printf '%s' \"$prompt\" | grep -q 'FAIL'; then",
+      "    emit \"{\\\"type\\\":\\\"custom\\\",\\\"custom_type\\\":\\\"agent_error\\\",\\\"data\\\":{\\\"message\\\":\\\"Mock failure\\\"},\\\"timestamp\\\":\\\"${ts}\\\"}\"",
+      "    continue",
       "  fi",
-      "  emit '{\"type\":\"agent_start\"}'",
-      "  emit '{\"type\":\"turn_start\"}'",
-      "  emit \"{\\\"type\\\":\\\"message_end\\\",\\\"message\\\":{\\\"role\\\":\\\"assistant\\\",\\\"content\\\":[{\\\"type\\\":\\\"text\\\",\\\"text\\\":\\\"Mock reply: ${message}\\\"}],\\\"usage\\\":{\\\"cost\\\":{\\\"total\\\":0.01}}}}\"",
-      "  emit '{\"type\":\"turn_end\"}'",
-      "  emit \"{\\\"type\\\":\\\"agent_end\\\",\\\"messages\\\":[{\\\"role\\\":\\\"user\\\",\\\"content\\\":[{\\\"type\\\":\\\"text\\\",\\\"text\\\":\\\"${message}\\\"}]},{\\\"role\\\":\\\"assistant\\\",\\\"content\\\":[{\\\"type\\\":\\\"text\\\",\\\"text\\\":\\\"Mock reply: ${message}\\\"}],\\\"usage\\\":{\\\"cost\\\":{\\\"total\\\":0.01}}}]}\"",
+      "",
+      "  emit \"{\\\"type\\\":\\\"infer_start\\\",\\\"model\\\":\\\"mock-model\\\",\\\"prompt_preview\\\":\\\"${prompt}\\\",\\\"timestamp\\\":\\\"${ts}\\\",\\\"iteration\\\":0}\"",
+      "  emit \"{\\\"type\\\":\\\"infer_end\\\",\\\"response_preview\\\":\\\"Mock reply: ${prompt}\\\",\\\"tokens\\\":42,\\\"cost_cents\\\":1.0,\\\"duration_ms\\\":5,\\\"timestamp\\\":\\\"${ts}\\\",\\\"iteration\\\":0}\"",
+      "  emit \"{\\\"type\\\":\\\"custom\\\",\\\"custom_type\\\":\\\"agent_complete\\\",\\\"data\\\":{\\\"response\\\":\\\"Mock reply: ${prompt}\\\"},\\\"timestamp\\\":\\\"${ts}\\\"}\"",
       "done"
     ]