commit 3abe99a2685f4a169f791d12352041c70daf2b05
Author: Coder Agent <coder@agents.omni>
Date: Tue Feb 17 10:52:06 2026
Pipeline: native pi spawning, fix stale worktree detection
Switch dev agent spawning from agentd (Docker containers) to native
pi process. This enables buffet providers (claude-code, codex) that
don't work inside containers.
Key changes:
- Dev.hs: spawn pi directly with --provider flag, track via PID files
and .exit marker files for async status polling
- Workspace.hs: filter out state.db-wal/shm files from stale worktree
scan, check entries are actual directories
Tested end-to-end: pipeline spawns pi agent, agent edits file, runs
typecheck/bild, commits with Task-Id trailer. Verify and integrate
phases also exercised.
Task-Id: t-603
diff --git a/Omni/Pipeline/Dev.hs b/Omni/Pipeline/Dev.hs
index 75e3720f..aa32615e 100644
--- a/Omni/Pipeline/Dev.hs
+++ b/Omni/Pipeline/Dev.hs
@@ -3,9 +3,8 @@
-- | Dev phase: agent spawning and prompt building.
--
--- The only phase that uses an LLM agent. Builds a simplified prompt
--- (agent just writes code, pipeline owns status transitions) and
--- manages the agent lifecycle via agentd.
+-- Uses `pi` directly on the host (supports claude-code/codex buffet providers).
+-- Tracks agent processes via PID files for async status polling.
module Omni.Pipeline.Dev where
import Alpha
@@ -18,8 +17,8 @@ import qualified Omni.Pipeline.Git as Git
import qualified Omni.Task.Core as Task
import qualified System.Directory as Dir
import qualified System.Exit as Exit
+import qualified System.IO as IO
import qualified System.Process as Process
-import qualified Text.Read as Read
-- | Build the dev prompt. Pure function — easy to test.
buildDevPrompt :: Text -> Task.Task -> Int -> Maybe Text -> Text
@@ -70,7 +69,9 @@ buildDevPrompt baseBranch task patchset maybeReviewFeedback =
""
]
--- | Spawn a dev agent via agentd. Returns an ActiveDev record.
+-- | Spawn a dev agent via `pi` (native host, supports buffet providers).
+-- Writes prompt to file, spawns pi in background, records PID.
+-- Returns ActiveDev for async polling.
spawnDev ::
Core.PipelineConfig ->
FilePath ->
@@ -90,63 +91,123 @@ spawnDev cfg workspace task patchset maybeReviewFeedback dbRowId = do
let prompt = buildDevPrompt baseBranch task patchset maybeReviewFeedback
now <- getCurrentTime
let runId = "dev-" <> T.unpack taskId <> "-" <> show (floor (utcTimeToPOSIXSeconds now) :: Integer)
- promptDir = workspace <> "/_/tmp/pipeline"
- promptFile = promptDir <> "/" <> runId <> ".md"
- Dir.createDirectoryIfMissing True promptDir
+ pipelineDir = workspace <> "/_/tmp/pipeline"
+ promptFile = pipelineDir <> "/" <> runId <> ".md"
+ logFile = pipelineDir <> "/" <> runId <> ".log"
+ pidFile = pipelineDir <> "/" <> runId <> ".pid"
+ exitFile = pipelineDir <> "/" <> runId <> ".exit"
+ Dir.createDirectoryIfMissing True pipelineDir
TIO.writeFile promptFile prompt
- -- Build agentd command
- let args =
- ["run", promptFile, "-n", runId, "--timeout", show (Core.pcAgentTimeout cfg), "--max-iter", show (Core.pcAgentMaxIter cfg), "--max-cost", show (Core.pcAgentMaxCost cfg)]
- ++ maybe [] (\p -> ["-p", T.unpack p]) (Core.pcAgentProvider cfg)
-
- -- Spawn in background (don't use --fg)
- (exitCode, _out, err) <-
- Process.readCreateProcessWithExitCode
- (Process.proc "agentd" args) {Process.cwd = Just workspace}
- ""
-
- case exitCode of
- Exit.ExitSuccess ->
- pure
- <| Right
- Core.ActiveDev
- { Core.adTaskId = taskId,
- Core.adRunId = T.pack runId,
- Core.adWorkspace = workspace,
- Core.adStartedAt = now,
- Core.adBeforeSha = beforeSha,
- Core.adPatchset = patchset,
- Core.adNamespace = Task.taskNamespace task,
- Core.adRunDbId = dbRowId
- }
- Exit.ExitFailure _ ->
- pure (Left ("agentd spawn failed: " <> T.pack err))
+ -- Build pi command (prompt path must be relative to workspace CWD)
+ let provider = fromMaybe "claude-code" (Core.pcAgentProvider cfg)
+ relPromptFile = "_/tmp/pipeline/" <> runId <> ".md"
+ piArgs =
+ [ "--provider",
+ T.unpack provider,
+ "--no-session",
+ "-p",
+ "@" <> relPromptFile
+ ]
+
+ -- Open log file for immediate writing
+ logHandle <- IO.openFile logFile IO.WriteMode
+ IO.hSetBuffering logHandle IO.LineBuffering
+
+ -- Spawn pi directly with stdout/stderr to log file
+ let cp =
+ (Process.proc "pi" piArgs)
+ { Process.cwd = Just workspace,
+ Process.std_in = Process.NoStream,
+ Process.std_out = Process.UseHandle logHandle,
+ Process.std_err = Process.UseHandle logHandle,
+ Process.create_group = True
+ }
+
+ spawnResult <- try (Process.createProcess cp)
+ case spawnResult of
+ Left (e :: SomeException) -> do
+ IO.hClose logHandle
+ pure (Left ("pi spawn failed: " <> T.pack (show e)))
+ Right (_, _, _, ph) -> do
+ -- Record PID for status polling
+ mPid <- Process.getPid ph
+ case mPid of
+ Just pid -> writeFile pidFile (show pid)
+ Nothing -> pure ()
+
+ -- Fork a thread to wait for pi to finish and record exit code
+ _ <-
+ forkIO <| do
+ exitCode <- Process.waitForProcess ph
+ IO.hClose logHandle
+ let code = case exitCode of
+ Exit.ExitSuccess -> "0"
+ Exit.ExitFailure n -> show n
+ writeFile exitFile code
+
+ -- Give pi a moment to start (check for immediate failure)
+ threadDelay 500_000 -- 500ms
+ maybeExit <- Process.getProcessExitCode ph
+ case maybeExit of
+ Just (Exit.ExitFailure code) -> do
+ threadDelay 200_000 -- let log flush
+ errOutput <- safeReadFile logFile
+ pure
+ ( Left
+ ( "pi exited immediately (code "
+ <> T.pack (show code)
+ <> "): "
+ <> T.take 500 errOutput
+ )
+ )
+ _ ->
+ pure
+ <| Right
+ Core.ActiveDev
+ { Core.adTaskId = taskId,
+ Core.adRunId = T.pack runId,
+ Core.adWorkspace = workspace,
+ Core.adStartedAt = now,
+ Core.adBeforeSha = beforeSha,
+ Core.adPatchset = patchset,
+ Core.adNamespace = Task.taskNamespace task,
+ Core.adRunDbId = dbRowId
+ }
-- | Check the status of a running dev agent.
+-- Checks the .exit file (written by wrapper script on completion).
checkDevStatus :: Core.ActiveDev -> IO Core.DevResult
checkDevStatus ad = do
- -- Poll agentd status
- (exitCode, out, _err) <-
- Process.readProcessWithExitCode
- "agentd"
- ["status", T.unpack (Core.adRunId ad), "--json"]
- ""
- case exitCode of
- Exit.ExitFailure _ ->
- -- Can't determine status — treat as still running
- -- (agentd might not have started yet)
- pure Core.StillRunning
- Exit.ExitSuccess -> do
- let agentStatus = extractField "status" out
- case agentStatus of
- Just "running" -> pure Core.StillRunning
- Just "completed" -> checkForCommit ad
- Just "failed" ->
- pure (Core.DevFailed (T.pack (fromMaybe "Agent run failed" (extractField "error" out))))
- Just "timeout" ->
- pure (Core.DevFailed "Agent run timed out")
- _ -> pure Core.StillRunning
+ let pipelineDir = Core.adWorkspace ad <> "/_/tmp/pipeline"
+ exitFile = pipelineDir <> "/" <> T.unpack (Core.adRunId ad) <> ".exit"
+ pidFile = pipelineDir <> "/" <> T.unpack (Core.adRunId ad) <> ".pid"
+ logFile = pipelineDir <> "/" <> T.unpack (Core.adRunId ad) <> ".log"
+
+ -- Check if agent has exited (wrapper script writes .exit file)
+ exitExists <- Dir.doesFileExist exitFile
+ if exitExists
+ then do
+ exitStr <- T.strip </ safeReadFile exitFile
+ if exitStr == "0"
+ then checkForCommit ad
+ else do
+ logTail <- T.takeEnd 1000 </ safeReadFile logFile
+ pure (Core.DevFailed ("Agent exited with code " <> exitStr <> ":\n" <> logTail))
+ else do
+ -- Check if process is still alive
+ pidExists <- Dir.doesFileExist pidFile
+ if pidExists
+ then do
+ pidStr <- T.strip </ safeReadFile pidFile
+ alive <- isProcessAlive pidStr
+ if alive
+ then pure Core.StillRunning
+ else -- Process died without .exit file — check for commits anyway
+ checkForCommit ad
+ else
+ -- No PID file yet — still starting
+ pure Core.StillRunning
-- | After agent completes, check if a new commit was produced.
checkForCommit :: Core.ActiveDev -> IO Core.DevResult
@@ -158,23 +219,38 @@ checkForCommit ad = do
| before == Just after -> pure (Core.DevFailed "Agent completed but made no commit")
| otherwise -> pure (Core.DevSuccess (Just after))
--- | Get the cost of an agentd run.
+-- | Check if a process with the given PID is alive.
+isProcessAlive :: Text -> IO Bool
+isProcessAlive pidText = do
+ let pidStr = T.unpack pidText
+ (exitCode, _, _) <- Process.readProcessWithExitCode "kill" ["-0", pidStr] ""
+ pure (exitCode == Exit.ExitSuccess)
+
+-- | Safely read a file, returning empty text if it doesn't exist.
+safeReadFile :: FilePath -> IO Text
+safeReadFile path = do
+ exists <- Dir.doesFileExist path
+ if exists
+ then do
+ h <- IO.openFile path IO.ReadMode
+ IO.hSetEncoding h IO.utf8
+ content <- TIO.hGetContents h
+ content `seq` IO.hClose h
+ pure content
+ else pure ""
+
+-- | Get the cost of a run (native mode doesn't track per-run cost).
getRunCost :: Text -> IO Double
-getRunCost runId = do
- (exitCode, out, _) <-
- Process.readProcessWithExitCode
- "agentd"
- ["status", T.unpack runId, "--json"]
- ""
- case exitCode of
- Exit.ExitSuccess ->
- case extractField "cost_cents" out of
- Just s -> pure (fromMaybe 0 (Read.readMaybe s))
- Nothing -> pure 0
- _ -> pure 0
-
--- | Simple field extraction from JSON output (avoids aeson dependency for
--- quick status checks). Looks for "field": "value" or "field": number.
+getRunCost _runId = pure 0
+
+-- | Shell-escape a string for safe interpolation.
+shellEscape :: String -> String
+shellEscape s = "'" <> concatMap esc s <> "'"
+ where
+ esc '\'' = "'\\''"
+ esc c = [c]
+
+-- | Simple field extraction from JSON output.
extractField :: String -> String -> Maybe String
extractField field input =
case matchingLine of
diff --git a/Omni/Pipeline/Workspace.hs b/Omni/Pipeline/Workspace.hs
index a7abd566..cafd2197 100644
--- a/Omni/Pipeline/Workspace.hs
+++ b/Omni/Pipeline/Workspace.hs
@@ -8,6 +8,7 @@ module Omni.Pipeline.Workspace where
import Alpha
import qualified Control.Concurrent.STM as STM
+import qualified Data.List as List
import qualified Data.Map.Strict as Map
import qualified Data.Text as T
import qualified Omni.Pipeline.Core as Core
@@ -133,5 +134,14 @@ findStaleWorktrees pool = do
then pure []
else do
entries <- Dir.listDirectory root
- let taskDirs = filter (\e -> e /= "integration" && e /= "state.db") entries
- pure (map T.pack taskDirs)
+ let taskDirs =
+ filter
+ ( \e ->
+ e
+ /= "integration"
+ && not ("state.db" `List.isPrefixOf` e)
+ )
+ entries
+ -- Only return entries that are actual directories (not files)
+ dirs <- filterM (\e -> Dir.doesDirectoryExist (root <> "/" <> e)) taskDirs
+ pure (map T.pack dirs)