← Back to task

Commit d20cb79f

commit d20cb79f4bcd838ef05bbe6f149dfeb67f7829a0
Author: Coder Agent <coder@agents.omni>
Date:   Thu Feb 19 17:51:06 2026

    Pipeline scheduler: orchestrate task/fund/agentd by domain
    
    Task-Id: t-623 (scheduler domain orchestration)

diff --git a/Omni/Pipeline.hs b/Omni/Pipeline.hs
index dbb5ed6a..ace29ac5 100755
--- a/Omni/Pipeline.hs
+++ b/Omni/Pipeline.hs
@@ -1,753 +1,890 @@
 #!/usr/bin/env run.sh
+{-# LANGUAGE DeriveGeneric #-}
+{-# LANGUAGE LambdaCase #-}
 {-# LANGUAGE OverloadedStrings #-}
+{-# LANGUAGE RecordWildCards #-}
 {-# LANGUAGE NoImplicitPrelude #-}
 
--- | Pipeline: automated dev → verify → ship.
+-- | Pipeline scheduler: task -> fund -> agentd orchestration.
 --
--- Single-process pipeline that drives tasks from Open through development,
--- build verification, and integration into the base branch.
---
--- Only the dev phase uses an LLM agent. Verify and integrate are deterministic.
+-- Scans `task ready`, gates by `fund` budget per namespace/domain,
+-- spawns `agentd` runs tagged by domain (in run id), and on completion
+-- moves tasks to Review while recording cost back to fund.
 --
 -- : out pipeline
 -- : dep aeson
 -- : dep optparse-applicative
--- : dep sqlite-simple
--- : dep async
--- : dep stm
+-- : dep bytestring
+-- : dep process
+-- : dep text
+-- : dep time
 module Omni.Pipeline where
 
 import Alpha
-import qualified Control.Concurrent.STM as STM
-import qualified Data.Map.Strict as Map
+import qualified Data.Aeson as Aeson
+import qualified Data.Aeson.Key as Key
+import qualified Data.Aeson.KeyMap as KeyMap
+import qualified Data.ByteString.Lazy as BL
+import qualified Data.Char as Char
+import qualified Data.Set as Set
 import qualified Data.Text as T
+import qualified Data.Text.Encoding as TE
 import qualified Data.Text.IO as TIO
-import Data.Time (diffUTCTime, getCurrentTime)
-import qualified Database.SQLite.Simple as SQL
+import Data.Time (UTCTime, getCurrentTime)
+import Data.Time.Clock.POSIX (utcTimeToPOSIXSeconds)
 import qualified Omni.Cli as Cli
-import qualified Omni.Pipeline.Core as Core
-import qualified Omni.Pipeline.Dev as Dev
-import qualified Omni.Pipeline.Git as Git
-import qualified Omni.Pipeline.Integrate as Integrate
-import qualified Omni.Pipeline.State as State
-import qualified Omni.Pipeline.Verify as Verify
-import qualified Omni.Pipeline.Workspace as Workspace
-import qualified Omni.Task.Core as Task
 import qualified Omni.Test as Test
 import qualified Options.Applicative as Opt
 import qualified System.Directory as Dir
-import qualified System.Environment as Env
+import qualified System.Exit as Exit
+import qualified System.FilePath as FP
 import qualified System.IO as IO
+import qualified System.Process as Process
 import qualified Text.Read as Read
 
-main :: IO ()
-main =
-  join (Opt.execParser (Cli.info (parser <**> Opt.helper) Cli.fullDesc))
+-- ============================================================================
+-- Types
+-- ============================================================================
 
-test :: Test.Tree
-test =
-  Test.group
-    "Omni.Pipeline"
-    [ Test.unit "buildDevPrompt includes task id" <| do
-        let task =
-              Task.Task
-                { Task.taskId = "t-999",
-                  Task.taskTitle = "Test task",
-                  Task.taskType = Task.WorkTask,
-                  Task.taskParent = Nothing,
-                  Task.taskNamespace = Just "Omni/Test.hs",
-                  Task.taskStatus = Task.Open,
-                  Task.taskPatchsetCount = 0,
-                  Task.taskPriority = Task.P2,
-                  Task.taskComplexity = Nothing,
-                  Task.taskDependencies = [],
-                  Task.taskDescription = "A test task",
-                  Task.taskComments = [],
-                  Task.taskTags = Task.Tags [],
-                  Task.taskCreatedAt = Read.read "2026-01-01 00:00:00 UTC",
-                  Task.taskUpdatedAt = Read.read "2026-01-01 00:00:00 UTC"
-                }
-            prompt = Dev.buildDevPrompt "live" task 0 Nothing
-        Test.assertBool "prompt contains task id" (T.isInfixOf "t-999" prompt)
-        Test.assertBool "prompt says no status changes" (T.isInfixOf "Do NOT change task status" prompt),
-      --
-      Test.unit "buildDevPrompt includes review feedback" <| do
-        let task =
-              Task.Task
-                { Task.taskId = "t-888",
-                  Task.taskTitle = "Retry task",
-                  Task.taskType = Task.WorkTask,
-                  Task.taskParent = Nothing,
-                  Task.taskNamespace = Just "Omni/Foo.hs",
-                  Task.taskStatus = Task.InProgress,
-                  Task.taskPatchsetCount = 1,
-                  Task.taskPriority = Task.P2,
-                  Task.taskComplexity = Nothing,
-                  Task.taskDependencies = [],
-                  Task.taskDescription = "Fix the thing",
-                  Task.taskComments = [],
-                  Task.taskTags = Task.Tags [],
-                  Task.taskCreatedAt = Read.read "2026-01-01 00:00:00 UTC",
-                  Task.taskUpdatedAt = Read.read "2026-01-01 00:00:00 UTC"
-                }
-            prompt = Dev.buildDevPrompt "live" task 1 (Just "Build failed: missing import")
-        Test.assertBool "prompt contains feedback" (T.isInfixOf "missing import" prompt)
-        Test.assertBool "prompt mentions previous rejection" (T.isInfixOf "previous patchset was rejected" (T.toLower prompt)),
-      --
-      Test.unit "buildDevPrompt includes comments" <| do
-        let task =
-              Task.Task
-                { Task.taskId = "t-777",
-                  Task.taskTitle = "Task with comments",
-                  Task.taskType = Task.WorkTask,
-                  Task.taskParent = Nothing,
-                  Task.taskNamespace = Just "Omni/Foo.hs",
-                  Task.taskStatus = Task.NeedsHelp,
-                  Task.taskPatchsetCount = 0,
-                  Task.taskPriority = Task.P2,
-                  Task.taskComplexity = Nothing,
-                  Task.taskDependencies = [],
-                  Task.taskDescription = "Do the thing",
-                  Task.taskComments =
-                    [ Task.Comment
-                        { Task.commentText = "What format should the output be?",
-                          Task.commentAuthor = Task.Agent Task.Engineer,
-                          Task.commentCreatedAt = Read.read "2026-01-01 01:00:00 UTC"
-                        },
-                      Task.Comment
-                        { Task.commentText = "Use JSON output",
-                          Task.commentAuthor = Task.Human,
-                          Task.commentCreatedAt = Read.read "2026-01-01 02:00:00 UTC"
-                        }
-                    ],
-                  Task.taskTags = Task.Tags [],
-                  Task.taskCreatedAt = Read.read "2026-01-01 00:00:00 UTC",
-                  Task.taskUpdatedAt = Read.read "2026-01-01 02:00:00 UTC"
-                }
-            prompt = Dev.buildDevPrompt "live" task 0 Nothing
-        Test.assertBool "prompt contains Comments section" (T.isInfixOf "### Comments" prompt)
-        Test.assertBool "prompt contains agent question" (T.isInfixOf "What format should the output be?" prompt)
-        Test.assertBool "prompt contains human answer" (T.isInfixOf "Use JSON output" prompt)
-        Test.assertBool "prompt contains human author tag" (T.isInfixOf "[human]" prompt)
-        Test.assertBool "prompt contains agent author tag" (T.isInfixOf "[agent:engineer]" prompt),
-      --
-      Test.unit "buildDevPrompt includes needs-input instructions" <| do
-        let task =
-              Task.Task
-                { Task.taskId = "t-666",
-                  Task.taskTitle = "Ambiguous task",
-                  Task.taskType = Task.WorkTask,
-                  Task.taskParent = Nothing,
-                  Task.taskNamespace = Nothing,
-                  Task.taskStatus = Task.Open,
-                  Task.taskPatchsetCount = 0,
-                  Task.taskPriority = Task.P2,
-                  Task.taskComplexity = Nothing,
-                  Task.taskDependencies = [],
-                  Task.taskDescription = "Do something",
-                  Task.taskComments = [],
-                  Task.taskTags = Task.Tags [],
-                  Task.taskCreatedAt = Read.read "2026-01-01 00:00:00 UTC",
-                  Task.taskUpdatedAt = Read.read "2026-01-01 00:00:00 UTC"
-                }
-            prompt = Dev.buildDevPrompt "live" task 0 Nothing
-        Test.assertBool "prompt contains needs-input instruction" (T.isInfixOf "ambiguous or missing critical details" prompt)
-        Test.assertBool "prompt mentions task comment command" (T.isInfixOf "task comment" prompt)
-        Test.assertBool "prompt mentions exit 0" (T.isInfixOf "exit 0 without committing" prompt),
-      --
-      Test.unit "buildDevPrompt omits Comments section when no comments" <| do
-        let task =
-              Task.Task
-                { Task.taskId = "t-555",
-                  Task.taskTitle = "No comments task",
-                  Task.taskType = Task.WorkTask,
-                  Task.taskParent = Nothing,
-                  Task.taskNamespace = Just "Omni/Bar.hs",
-                  Task.taskStatus = Task.Open,
-                  Task.taskPatchsetCount = 0,
-                  Task.taskPriority = Task.P2,
-                  Task.taskComplexity = Nothing,
-                  Task.taskDependencies = [],
-                  Task.taskDescription = "Simple task",
-                  Task.taskComments = [],
-                  Task.taskTags = Task.Tags [],
-                  Task.taskCreatedAt = Read.read "2026-01-01 00:00:00 UTC",
-                  Task.taskUpdatedAt = Read.read "2026-01-01 00:00:00 UTC"
-                }
-            prompt = Dev.buildDevPrompt "live" task 0 Nothing
-        Test.assertBool "prompt does not contain Comments section" (not (T.isInfixOf "### Comments" prompt))
-    ]
+data Config = Config
+  { cfgRoot :: FilePath,
+    cfgIntervalSec :: Int,
+    cfgMaxActive :: Int,
+    cfgReservationCents :: Int,
+    cfgToolchain :: Text,
+    cfgTaskCmd :: String,
+    cfgFundCmd :: String,
+    cfgAgentdCmd :: String,
+    cfgProvider :: Maybe Text,
+    cfgModel :: Maybe Text,
+    cfgTimeoutSec :: Maybe Int,
+    cfgMaxIter :: Maybe Int,
+    cfgMaxCostCents :: Maybe Int,
+    cfgOnce :: Bool,
+    cfgDryRun :: Bool
+  }
+  deriving (Show, Eq)
+
+data ActiveRun = ActiveRun
+  { arTaskId :: Text,
+    arNamespace :: Text,
+    arDomain :: Text,
+    arRunId :: Text,
+    arStartedAt :: UTCTime
+  }
+  deriving (Show, Eq, Generic)
+
+instance Aeson.ToJSON ActiveRun
+
+instance Aeson.FromJSON ActiveRun
+
+newtype SchedulerState = SchedulerState
+  { ssActiveRuns :: [ActiveRun]
+  }
+  deriving (Show, Eq, Generic)
+
+instance Aeson.ToJSON SchedulerState
+
+instance Aeson.FromJSON SchedulerState
+
+data ReadyTask = ReadyTask
+  { rtTaskId :: Text,
+    rtNamespace :: Maybe Text,
+    rtTitle :: Maybe Text,
+    rtStatus :: Maybe Text
+  }
+  deriving (Show, Eq)
+
+instance Aeson.FromJSON ReadyTask where
+  parseJSON =
+    Aeson.withObject "ReadyTask" <| \o ->
+      ReadyTask
+        </ o Aeson..: "taskId"
+        <*> o Aeson..:? "taskNamespace"
+        <*> o Aeson..:? "taskTitle"
+        <*> o Aeson..:? "taskStatus"
+
+data DomainTask = DomainTask
+  { dtTaskId :: Text,
+    dtNamespace :: Text,
+    dtDomain :: Text,
+    dtTitle :: Maybe Text
+  }
+  deriving (Show, Eq)
+
+data AgentRunStatus = AgentRunStatus
+  { arsStatus :: Text,
+    arsCostCents :: Double,
+    arsError :: Maybe Text
+  }
+  deriving (Show, Eq)
+
+instance Aeson.FromJSON AgentRunStatus where
+  parseJSON =
+    Aeson.withObject "AgentRunStatus" <| \o ->
+      AgentRunStatus
+        </ (o Aeson..:? "status" Aeson..!= "unknown")
+        <*> (o Aeson..:? "cost_cents" Aeson..!= 0)
+        <*> o Aeson..:? "error"
+
+data CommandResult = CommandResult
+  { crExitCode :: Exit.ExitCode,
+    crStdout :: Text,
+    crStderr :: Text
+  }
+  deriving (Show, Eq)
 
 -- ============================================================================
--- CLI
+-- Entry / CLI
 -- ============================================================================
 
+main :: IO ()
+main =
+  join (Opt.execParser (Cli.info (parser <**> Opt.helper) Cli.fullDesc))
+
 parser :: Cli.Parser (IO ())
 parser =
   Cli.subparser
-    ( Cli.command "run" (Cli.info runParser (Cli.progDesc "Start the pipeline"))
-        <> Cli.command "status" (Cli.info statusParser (Cli.progDesc "Show pipeline health"))
+    ( Cli.command "run" (Cli.info runParser (Cli.progDesc "Run pipeline scheduler loop"))
+        <> Cli.command "status" (Cli.info statusParser (Cli.progDesc "Show scheduler state"))
+        <> Cli.command "test" (Cli.info (pure (Test.run test)) (Cli.progDesc "Run pipeline tests"))
     )
 
 runParser :: Cli.Parser (IO ())
-runParser =
-  doRun
-    </ Cli.strOption (Cli.long "root" <> Cli.value "_/pipeline" <> Cli.help "Workspace root")
-    <*> Cli.strOption (Cli.long "base" <> Cli.value "live" <> Cli.help "Base branch")
-    <*> Cli.option Cli.auto (Cli.long "concurrency" <> Cli.value 1 <> Cli.help "Max concurrent dev agents")
-    <*> Cli.option Cli.auto (Cli.long "interval" <> Cli.value 60 <> Cli.help "Poll interval (seconds)")
-    <*> Cli.option Cli.auto (Cli.long "max-retries" <> Cli.value 3 <> Cli.help "Max retries per patchset")
-    <*> Cli.option Cli.auto (Cli.long "max-task-cost" <> Cli.value 0 <> Cli.help "Cost cap per task (cents, 0=unlimited)")
-    <*> Cli.option Cli.auto (Cli.long "timeout" <> Cli.value 1800 <> Cli.help "Agent timeout (seconds)")
-    <*> Cli.option Cli.auto (Cli.long "max-iter" <> Cli.value 80 <> Cli.help "Agent max iterations")
-    <*> Cli.option Cli.auto (Cli.long "max-cost" <> Cli.value 300 <> Cli.help "Agent max cost per run (cents)")
-    <*> Cli.optional (Cli.strOption (Cli.long "provider" <> Cli.help "Agent provider"))
-    <*> Cli.optional (Cli.strOption (Cli.long "model" <> Cli.help "Agent model (e.g. gpt-5.3-codex)"))
-    <*> Cli.optional (Cli.strOption (Cli.long "parent" <> Cli.help "Only process tasks under this epic"))
-    <*> Cli.optional (Cli.strOption (Cli.long "task-id" <> Cli.help "Only process this one task"))
-    <*> Cli.switch (Cli.long "once" <> Cli.help "Process one cycle then exit")
-    <*> Cli.switch (Cli.long "dry-run" <> Cli.help "Print what would happen")
+runParser = doRun </ configParser
+
+configParser :: Cli.Parser Config
+configParser =
+  Config
+    </ Cli.strOption
+      ( Cli.long "root"
+          <> Cli.value "_/pipeline"
+          <> Cli.help "Pipeline state root"
+      )
+    <*> Cli.option
+      Cli.auto
+      ( Cli.long "interval"
+          <> Cli.value 60
+          <> Cli.help "Poll interval in seconds"
+      )
+    <*> Cli.option
+      Cli.auto
+      ( Cli.long "max-active"
+          <> Cli.value 8
+          <> Cli.help "Max active agentd runs tracked by scheduler"
+      )
+    <*> Cli.option
+      Cli.auto
+      ( Cli.long "reserve-cents"
+          <> Cli.value 300
+          <> Cli.help "Required available budget (cents) before spawning"
+      )
+    <*> (T.pack </ Cli.strOption
+      ( Cli.long "toolchain"
+          <> Cli.value "haskell"
+          <> Cli.help "agentd toolchain for generated prompts"
+      ))
+    <*> Cli.strOption
+      ( Cli.long "task-cmd"
+          <> Cli.value "task"
+          <> Cli.help "Task CLI command"
+      )
+    <*> Cli.strOption
+      ( Cli.long "fund-cmd"
+          <> Cli.value "fund"
+          <> Cli.help "Fund CLI command"
+      )
+    <*> Cli.strOption
+      ( Cli.long "agentd-cmd"
+          <> Cli.value "agentd"
+          <> Cli.help "agentd CLI command"
+      )
+    <*> Cli.optional
+      ( T.pack </ Cli.strOption
+          ( Cli.long "provider"
+              <> Cli.help "Optional provider override in generated prompt frontmatter"
+          )
+      )
+    <*> Cli.optional
+      ( T.pack </ Cli.strOption
+          ( Cli.long "model"
+              <> Cli.help "Optional model override in generated prompt frontmatter"
+          )
+      )
+    <*> Cli.optional
+      ( Cli.option
+          Cli.auto
+          ( Cli.long "timeout"
+              <> Cli.help "agentd run timeout (seconds)"
+          )
+      )
+    <*> Cli.optional
+      ( Cli.option
+          Cli.auto
+          ( Cli.long "max-iter"
+              <> Cli.help "agentd run max iterations"
+          )
+      )
+    <*> Cli.optional
+      ( Cli.option
+          Cli.auto
+          ( Cli.long "max-cost"
+              <> Cli.help "agentd run max cost (cents)"
+          )
+      )
+    <*> Cli.switch
+      ( Cli.long "once"
+          <> Cli.help "Run one scheduler cycle and exit"
+      )
+    <*> Cli.switch
+      ( Cli.long "dry-run"
+          <> Cli.help "Print actions without mutating task/fund state"
+      )
 
 statusParser :: Cli.Parser (IO ())
 statusParser =
   doStatus
-    </ Cli.strOption (Cli.long "root" <> Cli.value "_/pipeline" <> Cli.help "Workspace root")
-    <*> Cli.switch (Cli.long "json" <> Cli.help "JSON output")
+    </ Cli.strOption
+      ( Cli.long "root"
+          <> Cli.value "_/pipeline"
+          <> Cli.help "Pipeline state root"
+      )
+    <*> Cli.switch
+      ( Cli.long "json"
+          <> Cli.help "JSON output"
+      )
 
 -- ============================================================================
--- Run command
+-- Run loop
 -- ============================================================================
 
-doRun ::
-  String ->
-  String ->
-  Int ->
-  Int ->
-  Int ->
-  Int ->
-  Int ->
-  Int ->
-  Int ->
-  Maybe String ->
-  Maybe String ->
-  Maybe String ->
-  Maybe String ->
-  Bool ->
-  Bool ->
-  IO ()
-doRun root base concurrency interval maxRetries maxTaskCost timeout maxIter maxCost provider model parent taskFilter once dryRun = do
-  repoRoot <- Env.getEnv "CODEROOT"
-  let cfg =
-        Core.PipelineConfig
-          { Core.pcRoot = root,
-            Core.pcRepoRoot = repoRoot,
-            Core.pcBaseBranch = T.pack base,
-            Core.pcConcurrency = concurrency,
-            Core.pcPollInterval = interval,
-            Core.pcMaxRetries = maxRetries,
-            Core.pcMaxTaskCost = maxTaskCost,
-            Core.pcAgentTimeout = timeout,
-            Core.pcAgentMaxIter = maxIter,
-            Core.pcAgentMaxCost = maxCost,
-            Core.pcAgentProvider = T.pack </ provider,
-            Core.pcAgentModel = T.pack </ model,
-            Core.pcParentFilter = T.pack </ parent,
-            Core.pcTaskFilter = T.pack </ taskFilter,
-            Core.pcDryRun = dryRun
-          }
-
-  -- Initialize
-  Dir.createDirectoryIfMissing True root
-  db <- State.initPipelineDb (root <> "/state.db")
-  pool <- Workspace.newPool cfg
-  activeDevs <- STM.newTVarIO (Map.empty :: Map.Map Text Core.ActiveDev)
-
-  logMsg "Pipeline starting"
-  logMsg ("root=" <> T.pack root <> " base=" <> T.pack base <> " concurrency=" <> T.pack (show concurrency))
-
-  -- Startup recovery
-  recoverStaleState cfg db pool activeDevs
-
-  -- Main loop with exception handling
+doRun :: Config -> IO ()
+doRun cfg = do
+  Dir.createDirectoryIfMissing True (cfgRoot cfg)
+  logMsg "Pipeline scheduler starting"
+  logMsg
+    ( "root="
+        <> T.pack (cfgRoot cfg)
+        <> " interval="
+        <> tshow (cfgIntervalSec cfg)
+        <> "s max-active="
+        <> tshow (cfgMaxActive cfg)
+        <> if cfgDryRun cfg then " dry-run=true" else ""
+    )
   let loop = do
-        logMsg "--- cycle start ---"
-        result <- try (runOneCycle cfg db pool activeDevs) :: IO (Either SomeException ())
-        case result of
-          Left e -> logMsg ("Cycle error (recovering): " <> T.pack (show e))
-          Right () -> logMsg "--- cycle end ---"
-        if once
-          then logMsg "Single cycle complete, exiting"
+        runCycle cfg
+        if cfgOnce cfg
+          then logMsg "Single cycle complete"
           else do
-            threadDelay (interval * 1_000_000)
+            threadDelay (cfgIntervalSec cfg * 1_000_000)
             loop
   loop
 
--- | Run one full pipeline cycle.
-runOneCycle ::
-  Core.PipelineConfig ->
-  SQL.Connection ->
-  Workspace.WorkspacePool ->
-  STM.TVar (Map.Map Text Core.ActiveDev) ->
-  IO ()
-runOneCycle cfg db pool activeDevs = do
-  -- Load all tasks once per cycle
-  allTasks <- Task.loadTasks
-
-  let workTasks = filter isEligible allTasks
-      byStatus s = filter (\t -> Task.taskStatus t == s) workTasks
-
-  -- 1. INTEGRATE: process Verified tasks
-  let verified = byStatus Task.Verified
-  forM_ verified <| \task -> do
-    let tid = Task.taskId task
-    logMsg ("Integrating: " <> tid)
-    if Core.pcDryRun cfg
-      then logMsg ("DRY RUN: would integrate " <> tid)
-      else doIntegrate cfg db pool task
-
-  -- 2. HARVEST + VERIFY: check running dev agents
-  active <- STM.readTVarIO activeDevs
-  forM_ (Map.toList active) <| \(tid, ad) -> do
-    result <- Dev.checkDevStatus ad
-    case result of
-      Core.StillRunning -> pure ()
-      Core.DevSuccess maybeSha -> do
-        logMsg ("Dev completed for " <> tid <> " (sha=" <> fromMaybe "?" maybeSha <> ")")
-        cost <- Dev.getRunCost (Core.adRunId ad)
-        State.markRunFinished db (Core.adRunDbId ad) Core.Success (Just cost) Nothing
-        _ <- Task.addComment tid ("Pipeline: dev completed (run=" <> Core.adRunId ad <> ", cost=" <> T.pack (show cost) <> "c)") Task.System
-        -- Remove from active
-        STM.atomically <| STM.modifyTVar' activeDevs (Map.delete tid)
-        -- Immediately verify
-        doVerify cfg db pool ad
-      Core.DevNeedsInput reason -> do
-        logMsg ("Dev needs input for " <> tid <> ": " <> reason)
-        cost <- Dev.getRunCost (Core.adRunId ad)
-        State.markRunFinished db (Core.adRunDbId ad) Core.Success (Just cost) (Just reason)
-        _ <- Task.addComment tid ("Pipeline: agent needs input (run=" <> Core.adRunId ad <> ")") Task.System
-        -- Remove from active, release workspace, set task to NeedsHelp
-        STM.atomically <| STM.modifyTVar' activeDevs (Map.delete tid)
-        Workspace.releaseWorkspace pool tid
-        Task.updateTaskStatus tid Task.NeedsHelp []
-      Core.DevFailed err -> do
-        logMsg ("Dev failed for " <> tid <> ": " <> err)
-        cost <- Dev.getRunCost (Core.adRunId ad)
-        State.markRunFinished db (Core.adRunDbId ad) Core.Failure (Just cost) (Just err)
-        _ <- Task.addComment tid ("Pipeline: dev failed (run=" <> Core.adRunId ad <> "): " <> err) Task.System
-        -- Remove from active, release workspace, reset task to Open
-        STM.atomically <| STM.modifyTVar' activeDevs (Map.delete tid)
-        Workspace.releaseWorkspace pool tid
-        Task.updateTaskStatus tid Task.Open []
-
-  -- 3. RE-SPAWN: check NeedsHelp tasks for new human comments
-  let needsHelp = byStatus Task.NeedsHelp
-  forM_ needsHelp <| \task -> do
-    let tid = Task.taskId task
-    lastRun <- State.getLastDevRunTime db tid
-    let hasNewHumanComment = case lastRun of
-          Nothing -> False
-          Just runTime ->
-            any
-              ( \c ->
-                  Task.commentAuthor c
-                    == Task.Human
-                    && Task.commentCreatedAt c
-                    > runTime
-              )
-              (Task.taskComments task)
-    when hasNewHumanComment <| do
-      logMsg ("Human responded to " <> tid <> ", re-spawning dev")
-      Task.updateTaskStatus tid Task.Open []
-      _ <- Task.addComment tid "Pipeline: human responded, re-opening for dev" Task.System
-      pure ()
-
-  -- Reload tasks after NeedsHelp transitions
-  allTasks' <- if null needsHelp then pure allTasks else Task.loadTasks
-  let workTasks' = filter isEligible allTasks'
-      byStatus' s = filter (\t -> Task.taskStatus t == s) workTasks'
-
-  -- 4. DEVELOP: spawn agents for Open tasks
-  let open = byStatus' Task.Open
-  slots <- Workspace.slotsAvailable pool
-  currentActive <- STM.readTVarIO activeDevs
-  let availableSlots = slots - Map.size currentActive
-  forM_ (take availableSlots open) <| \task -> do
-    let tid = Task.taskId task
-    -- Check retry/cost limits
-    canRetry <- checkRetryLimits cfg db task
-    when canRetry <| do
-      if Core.pcDryRun cfg
-        then logMsg ("DRY RUN: would start dev for " <> tid)
-        else doSpawnDev cfg db pool activeDevs task
-  where
-    isEligible task =
-      Task.taskType task
-        == Task.WorkTask
-        && isJust (Task.taskNamespace task) -- must have namespace for verification
-        && matchesFilter (Core.pcTaskFilter cfg) (Task.taskId task)
-        && matchesParent (Core.pcParentFilter cfg) (Task.taskParent task)
-
-    matchesFilter Nothing _ = True
-    matchesFilter (Just f) tid = f == tid
-
-    matchesParent Nothing _ = True
-    matchesParent (Just f) parent = parent == Just f
-
--- | Integrate a verified task.
-doIntegrate ::
-  Core.PipelineConfig ->
-  SQL.Connection ->
-  Workspace.WorkspacePool ->
-  Task.Task ->
-  IO ()
-doIntegrate cfg db pool task = do
-  let tid = Task.taskId task
-      baseBranch = Core.pcBaseBranch cfg
-      repoRoot = Core.pcRepoRoot cfg
-  now <- getCurrentTime
+runCycle :: Config -> IO ()
+runCycle cfg = do
+  logMsg "--- cycle start ---"
+
+  state0 <- loadState cfg
+  activeAfterHarvest <- harvestCompletedRuns cfg (ssActiveRuns state0)
+
+  ready <- fetchReadyTasks cfg
+  let domainTasks = mapMaybe readyToDomainTask <| filter readyTaskIsOpen ready
+      slots = max 0 (cfgMaxActive cfg - length activeAfterHarvest)
+      candidates = chooseSpawnCandidates slots activeAfterHarvest domainTasks
+
+  logMsg
+    ( "ready="
+        <> tshow (length ready)
+        <> " open-with-namespace="
+        <> tshow (length domainTasks)
+        <> " active="
+        <> tshow (length activeAfterHarvest)
+        <> " spawn-candidates="
+        <> tshow (length candidates)
+    )
 
-  -- Record the integration attempt
-  rowId <-
-    State.recordRun
-      db
-      Core.RunRecord
-        { Core.rrId = Nothing,
-          Core.rrTaskId = tid,
-          Core.rrPhase = Core.Integrate,
-          Core.rrPatchset = Task.taskPatchsetCount task,
-          Core.rrAgentdRunId = Nothing,
-          Core.rrStatus = Core.Running,
-          Core.rrCostCents = 0,
-          Core.rrStartedAt = now,
-          Core.rrFinishedAt = Nothing,
-          Core.rrError = Nothing
-        }
-
-  result <- Integrate.integrateTask pool repoRoot tid baseBranch
-  case result of
-    Core.Integrated commitHash -> do
-      State.markRunFinished db rowId Core.Success Nothing Nothing
-      Task.updateTaskStatus tid Task.Done []
-      _ <- Task.addComment tid ("Pipeline: integrated into " <> baseBranch <> " at " <> commitHash) Task.System
-      Workspace.releaseWorkspace pool tid
-      logMsg ("Integrated " <> tid <> " at " <> commitHash)
-    Core.Conflict details -> do
-      State.markRunFinished db rowId Core.Failure Nothing (Just details)
-      Task.updateTaskStatus tid Task.Open []
-      _ <- Task.addComment tid ("Pipeline: integration conflict, resetting for re-dev: " <> details) Task.System
-      Workspace.releaseWorkspace pool tid
-      logMsg ("Integration conflict for " <> tid <> ": " <> details)
-    Core.IntegrateBuildFail details -> do
-      State.markRunFinished db rowId Core.Failure Nothing (Just details)
-      Task.updateTaskStatus tid Task.Open []
-      _ <- Task.addComment tid ("Pipeline: post-integration build failed, resetting: " <> details) Task.System
-      Workspace.releaseWorkspace pool tid
-      logMsg ("Integration build failed for " <> tid <> ": " <> details)
-
--- | Verify a completed dev run.
-doVerify ::
-  Core.PipelineConfig ->
-  SQL.Connection ->
-  Workspace.WorkspacePool ->
-  Core.ActiveDev ->
-  IO ()
-doVerify cfg db pool ad = do
-  let tid = Core.adTaskId ad
-      baseBranch = Core.pcBaseBranch cfg
-      workspace = Core.adWorkspace ad
-  now <- getCurrentTime
+  spawned <- catMaybes </ traverse (spawnForTask cfg) candidates
 
-  rowId <-
-    State.recordRun
-      db
-      Core.RunRecord
-        { Core.rrId = Nothing,
-          Core.rrTaskId = tid,
-          Core.rrPhase = Core.Verify,
-          Core.rrPatchset = Core.adPatchset ad,
-          Core.rrAgentdRunId = Nothing,
-          Core.rrStatus = Core.Running,
-          Core.rrCostCents = 0,
-          Core.rrStartedAt = now,
-          Core.rrFinishedAt = Nothing,
-          Core.rrError = Nothing
-        }
-
-  result <- Verify.verifyTask workspace tid baseBranch
-  case result of
-    Core.VerifyPass -> do
-      State.markRunFinished db rowId Core.Success Nothing Nothing
-      Task.updateTaskStatus tid Task.Verified []
-      _ <- Task.addComment tid "Pipeline: build verification passed" Task.System
-      logMsg ("Verified " <> tid)
-    Core.VerifySkip reason -> do
-      State.markRunFinished db rowId Core.Success Nothing (Just reason)
-      Task.updateTaskStatus tid Task.Verified []
-      _ <- Task.addComment tid ("Pipeline: verification skipped (" <> reason <> "), promoting") Task.System
-      logMsg ("Verified (skipped) " <> tid <> ": " <> reason)
-    Core.VerifyFail reason -> do
-      State.markRunFinished db rowId Core.Failure Nothing (Just reason)
-      Task.updateTaskStatus tid Task.Open []
-      _ <- Task.addComment tid ("Pipeline: verification failed:\n" <> reason) Task.System
-      Workspace.releaseWorkspace pool tid
-      logMsg ("Verify failed for " <> tid <> ": " <> T.take 200 reason)
-
--- | Spawn a dev agent for a task.
-doSpawnDev ::
-  Core.PipelineConfig ->
-  SQL.Connection ->
-  Workspace.WorkspacePool ->
-  STM.TVar (Map.Map Text Core.ActiveDev) ->
-  Task.Task ->
-  IO ()
-doSpawnDev cfg db pool activeDevs task = do
-  let tid = Task.taskId task
-      patchset = Task.taskPatchsetCount task
-  now <- getCurrentTime
+  let state1 = SchedulerState {ssActiveRuns = activeAfterHarvest <> spawned}
+  saveState cfg state1
 
-  -- Claim the task
-  Task.updateTaskStatus tid Task.InProgress []
-
-  -- Get or create workspace
-  wsResult <- Workspace.acquireWorkspace pool tid
-  case wsResult of
-    Left e -> do
-      logMsg ("Failed to acquire workspace for " <> tid <> ": " <> e)
-      Task.updateTaskStatus tid Task.Open []
-      pure ()
-    Right workspace -> do
-      -- Get review feedback from last verify failure
-      reviewFeedback <- State.getLastVerifyFailure db tid
-
-      -- Record the run
-      rowId <-
-        State.recordRun
-          db
-          Core.RunRecord
-            { Core.rrId = Nothing,
-              Core.rrTaskId = tid,
-              Core.rrPhase = Core.Dev,
-              Core.rrPatchset = patchset,
-              Core.rrAgentdRunId = Nothing,
-              Core.rrStatus = Core.Running,
-              Core.rrCostCents = 0,
-              Core.rrStartedAt = now,
-              Core.rrFinishedAt = Nothing,
-              Core.rrError = Nothing
-            }
-
-      -- Spawn the agent
-      result <- Dev.spawnDev cfg workspace task patchset reviewFeedback rowId
-      case result of
-        Left e -> do
-          State.markRunFinished db rowId Core.Failure Nothing (Just e)
-          Task.updateTaskStatus tid Task.Open []
-          _ <- Task.addComment tid ("Pipeline: failed to spawn dev agent: " <> e) Task.System
-          logMsg ("Failed to spawn dev for " <> tid <> ": " <> e)
-        Right ad -> do
-          -- Update the run record with the agentd run ID
-          -- (we didn't have it when we created the record)
-          STM.atomically <| STM.modifyTVar' activeDevs (Map.insert tid ad)
-          logMsg ("Spawned dev for " <> tid <> " (run=" <> Core.adRunId ad <> ")")
-
--- | Check if a task can be retried (within retry and cost limits).
-checkRetryLimits ::
-  Core.PipelineConfig ->
-  SQL.Connection ->
-  Task.Task ->
-  IO Bool
-checkRetryLimits cfg db task = do
-  let tid = Task.taskId task
-      patchset = Task.taskPatchsetCount task
-      maxRetries = Core.pcMaxRetries cfg
-      maxCost = Core.pcMaxTaskCost cfg
-
-  -- Check retry count
-  retryCount <- State.getRetryCount db tid Core.Dev patchset
-  when (maxRetries > 0 && retryCount >= maxRetries) <| do
-    logMsg ("Task " <> tid <> " exceeded max retries (" <> T.pack (show retryCount) <> "/" <> T.pack (show maxRetries) <> ")")
-    Task.updateTaskStatus tid Task.NeedsHelp []
-    _ <- Task.addComment tid ("Pipeline: exceeded " <> T.pack (show maxRetries) <> " retries on patchset " <> T.pack (show patchset)) Task.System
+  logMsg
+    ( "cycle summary: active="
+        <> tshow (length (ssActiveRuns state1))
+        <> " spawned="
+        <> tshow (length spawned)
+    )
+  logMsg "--- cycle end ---"
+
+-- ============================================================================
+-- Harvest
+-- ============================================================================
+
+harvestCompletedRuns :: Config -> [ActiveRun] -> IO [ActiveRun]
+harvestCompletedRuns cfg runs = do
+  kept <-
+    forM runs <| \ar -> do
+      status <- getAgentStatus cfg (arRunId ar)
+      if normalizeLabel (arsStatus status) == "running"
+        then pure <| Just ar
+        else do
+          finalizeRun cfg ar status
+          pure Nothing
+  pure (catMaybes kept)
+
+getAgentStatus :: Config -> Text -> IO AgentRunStatus
+getAgentStatus cfg runId = do
+  let cmd = cfgAgentdCmd cfg
+      args = ["status", T.unpack runId, "--json"]
+  res <- execCommand cmd args
+  case decodeJsonText (crStdout res) of
+    Right status -> pure status
+    Left _ ->
+      if crExitCode res == Exit.ExitSuccess
+        then pure <| AgentRunStatus "done" 0 Nothing
+        else
+          pure
+            <| AgentRunStatus
+              { arsStatus = "failed",
+                arsCostCents = 0,
+                arsError =
+                  Just
+                    ( T.strip
+                        ( if T.null (crStderr res)
+                            then crStdout res
+                            else crStderr res
+                        )
+                    )
+              }
+
+finalizeRun :: Config -> ActiveRun -> AgentRunStatus -> IO ()
+finalizeRun cfg ar status = do
+  let tid = arTaskId ar
+      runId = arRunId ar
+      domain = arDomain ar
+      cost = arsCostCents status
+      costRounded = round cost :: Int
+      baseMsg =
+        "Pipeline scheduler: run="
+          <> runId
+          <> " domain="
+          <> domain
+          <> " status="
+          <> arsStatus status
+          <> " cost="
+          <> tshow costRounded
+          <> "c"
+      msg =
+        case arsError status of
+          Nothing -> baseMsg
+          Just e -> baseMsg <> " error=" <> T.take 400 (singleLine e)
+
+  logMsg <| "Finalizing " <> tid <> " from run " <> runId <> " (" <> arsStatus status <> ")"
+
+  spendOk <- recordDomainSpend cfg domain tid runId costRounded
+
+  unless (cfgDryRun cfg) <| do
+    _ <- runTask cfg ["update", T.unpack tid, "review", "--json"]
+    let withSpendNote =
+          if spendOk
+            then msg
+            else msg <> " (fund-spend=failed)"
+    _ <- runTask cfg ["comment", T.unpack tid, T.unpack withSpendNote, "--json"]
     pure ()
 
-  if maxRetries > 0 && retryCount >= maxRetries
-    then pure False
-    else do
-      -- Check cost cap
-      if maxCost > 0
+-- ============================================================================
+-- Spawn
+-- ============================================================================
+
+spawnForTask :: Config -> DomainTask -> IO (Maybe ActiveRun)
+spawnForTask cfg dt = do
+  let tid = dtTaskId dt
+      domain = dtDomain dt
+
+  budgetOk <- checkDomainBudget cfg domain
+  if not budgetOk
+    then do
+      logMsg <| "Skipping " <> tid <> " (no budget for domain " <> domain <> ")"
+      pure Nothing
+    else
+      if cfgDryRun cfg
         then do
-          cost <- State.getCumulativeCost db tid
-          if cost >= fromIntegral maxCost
+          logMsg <| "DRY RUN: would spawn task " <> tid <> " in domain " <> domain
+          pure Nothing
+        else do
+          claimed <- claimTaskOpen cfg tid
+          if not claimed
             then do
-              logMsg ("Task " <> tid <> " exceeded cost cap (" <> T.pack (show cost) <> "c)")
-              Task.updateTaskStatus tid Task.NeedsHelp []
-              _ <- Task.addComment tid ("Pipeline: exceeded cost cap of " <> T.pack (show maxCost) <> "c") Task.System
-              pure False
-            else checkBackoff cfg db tid patchset retryCount
-        else checkBackoff cfg db tid patchset retryCount
-
--- | Check exponential backoff.
-checkBackoff ::
-  Core.PipelineConfig ->
-  SQL.Connection ->
-  Text ->
-  Int ->
-  Int ->
-  IO Bool
-checkBackoff cfg db tid patchset retryCount = do
-  if retryCount == 0
-    then pure True
-    else do
-      lastFail <- State.getLastFailureTime db tid patchset
-      case lastFail of
-        Nothing -> pure True
-        Just failTime -> do
-          now <- getCurrentTime
-          let baseInterval = fromIntegral (Core.pcPollInterval cfg) :: Double
-              delay = min 600 (baseInterval * (2 ^ retryCount))
-              elapsed = realToFrac (diffUTCTime now failTime) :: Double
-          if elapsed >= delay
-            then pure True
+              logMsg <| "Task not claimable as open (skip): " <> tid
+              pure Nothing
             else do
-              logMsg ("Task " <> tid <> " in backoff (" <> T.pack (show (round (delay - elapsed) :: Int)) <> "s remaining)")
-              pure False
-
--- | Recover stale state on startup.
-recoverStaleState ::
-  Core.PipelineConfig ->
-  SQL.Connection ->
-  Workspace.WorkspacePool ->
-  STM.TVar (Map.Map Text Core.ActiveDev) ->
-  IO ()
-recoverStaleState cfg db pool _activeDevs = do
-  -- 1. Check DB for "running" records where agent is dead
-  activeRuns <- State.getActiveDevRuns db
-  forM_ activeRuns <| \(rowId, tid, _runId, _startedAt) -> do
-    let workspace = Core.pcRoot cfg <> "/" <> T.unpack tid
-
-    -- Find the actual PID file for this task
-    pids <- findPidFiles workspace tid
-    alive <- case pids of
-      [] -> pure False
-      (pidPath : _) -> do
-        pidText <- T.strip </ TIO.readFile pidPath
-        Dev.isProcessAlive pidText
-
-    unless alive <| do
-      logMsg ("Recovery: run " <> T.pack (show rowId) <> " for " <> tid <> " has dead agent")
-      -- Check if agent left a commit
-      afterSha <- Git.branchSha workspace tid
-      -- Get the before-SHA from the base branch for comparison
-      baseSha <- Git.branchSha workspace (Core.pcBaseBranch cfg)
-      if afterSha /= baseSha && isJust afterSha
-        then do
-          logMsg ("Recovery: " <> tid <> " has commit " <> fromMaybe "?" afterSha <> ", proceeding to verify")
-          State.markRunFinished db rowId Core.Success Nothing Nothing
-          _ <- Task.addComment tid "Pipeline: recovered completed dev run on startup" Task.System
-          -- Create ActiveDev so verify can run
-          now <- getCurrentTime
-          let ad =
-                Core.ActiveDev
-                  { Core.adTaskId = tid,
-                    Core.adRunId = "recovered-" <> tid,
-                    Core.adWorkspace = workspace,
-                    Core.adStartedAt = now,
-                    Core.adBeforeSha = baseSha,
-                    Core.adPatchset = 0,
-                    Core.adNamespace = Nothing, -- will be looked up
-                    Core.adRunDbId = rowId
-                  }
-          -- Look up namespace from task
-          tasks <- Task.loadTasks
-          let ns = Task.taskNamespace =<< Task.findTask tid tasks
-          let ad' = ad {Core.adNamespace = ns}
-          doVerify cfg db pool ad'
-        else do
-          logMsg ("Recovery: " <> tid <> " has no new commit, resetting to Open")
-          State.markRunFinished db rowId Core.Failure Nothing (Just "Agent died without committing")
-          Task.updateTaskStatus tid Task.Open []
-          _ <- Task.addComment tid "Pipeline: agent died without producing a commit, resetting" Task.System
-          pure ()
-
-  -- 2. Find worktrees with no DB record at all
-  staleTasks <- Workspace.findStaleWorktrees pool
-  forM_ staleTasks <| \tid -> do
-    hasRun <- State.hasActiveRun db tid
-    unless hasRun <| do
-      logMsg ("Recovering orphaned worktree for " <> tid)
-      tasks <- Task.loadTasks
-      case Task.findTask tid tasks of
-        Nothing -> Workspace.releaseWorkspace pool tid
-        Just task -> do
-          when (Task.taskStatus task == Task.InProgress) <| do
-            Task.updateTaskStatus tid Task.Open []
-            _ <- Task.addComment tid "Pipeline: recovered stale InProgress task on startup" Task.System
-            pure ()
-
--- | Find PID files for a task in its workspace.
-findPidFiles :: FilePath -> Text -> IO [FilePath]
-findPidFiles workspace tid = do
-  let dir = workspace <> "/_/tmp/pipeline"
-  exists <- Dir.doesDirectoryExist dir
+              runId <- mkRunId domain tid
+              promptPath <- writePrompt cfg dt runId
+              started <- startAgentRun cfg promptPath runId
+              if started
+                then do
+                  _ <- runTask cfg ["comment", T.unpack tid, T.unpack ("Pipeline scheduler: started run=" <> runId <> " domain=" <> domain), "--json"]
+                  now <- getCurrentTime
+                  pure
+                    <| Just
+                      ActiveRun
+                        { arTaskId = tid,
+                          arNamespace = dtNamespace dt,
+                          arDomain = domain,
+                          arRunId = runId,
+                          arStartedAt = now
+                        }
+                else do
+                  logMsg <| "Spawn failed for " <> tid <> ", resetting to open"
+                  _ <- runTask cfg ["update", T.unpack tid, "open", "--json"]
+                  _ <- runTask cfg ["comment", T.unpack tid, "Pipeline scheduler: failed to spawn agentd run", "--json"]
+                  pure Nothing
+
+claimTaskOpen :: Config -> Text -> IO Bool
+claimTaskOpen cfg tid = do
+  res <- runTask cfg ["claim", T.unpack tid, "open", "in-progress", "--json"]
+  pure (crExitCode res == Exit.ExitSuccess)
+
+startAgentRun :: Config -> FilePath -> Text -> IO Bool
+startAgentRun cfg promptPath runId = do
+  let baseArgs = ["run", promptPath, "-n", T.unpack runId]
+      timeoutArgs = maybe [] (\n -> ["--timeout", show n]) (cfgTimeoutSec cfg)
+      maxIterArgs = maybe [] (\n -> ["--max-iter", show n]) (cfgMaxIter cfg)
+      maxCostArgs = maybe [] (\n -> ["--max-cost", show n]) (cfgMaxCostCents cfg)
+      args = baseArgs <> timeoutArgs <> maxIterArgs <> maxCostArgs
+  res <- execCommand (cfgAgentdCmd cfg) args
+  case crExitCode res of
+    Exit.ExitSuccess -> do
+      logMsg <| "Spawned agentd run " <> runId
+      pure True
+    _ -> do
+      logMsg <| "agentd run failed for " <> runId <> ": " <> T.take 400 (singleLine (crStderr res <> " " <> crStdout res))
+      pure False
+
+-- ============================================================================
+-- Budget
+-- ============================================================================
+
+checkDomainBudget :: Config -> Text -> IO Bool
+checkDomainBudget cfg domain
+  | cfgDryRun cfg = pure True
+  | otherwise = do
+      let reserve = cfgReservationCents cfg
+          d = T.unpack domain
+          r = show reserve
+          attempts =
+            [ ["check", "--domain", d, "--reserve-cents", r, "--json"],
+              ["budget", "--domain", d, "--json"],
+              ["available", "--domain", d, "--json"],
+              ["can-spend", "--domain", d, "--cents", r, "--json"]
+            ]
+      tryBudgetAttempts (cfgFundCmd cfg) reserve attempts
+
+tryBudgetAttempts :: String -> Int -> [[String]] -> IO Bool
+tryBudgetAttempts _ _ [] = pure False
+tryBudgetAttempts cmd reserve (args : rest) = do
+  res <- execCommand cmd args
+  let merged = pickNonEmpty (crStdout res) (crStderr res)
+  case parseBudgetAllowed reserve merged of
+    Just ok -> pure ok
+    Nothing ->
+      if crExitCode res == Exit.ExitSuccess
+        then pure True
+        else tryBudgetAttempts cmd reserve rest
+
+parseBudgetAllowed :: Int -> Text -> Maybe Bool
+parseBudgetAllowed reserve raw =
+  let stripped = T.strip raw
+      reserveD = fromIntegral reserve :: Double
+   in case decodeJsonText stripped :: Either Text Aeson.Value of
+        Right v ->
+          findBudgetBool v
+            <|> ((>= reserveD) </ findBudgetNumber v)
+        Left _ ->
+          parseLooseBool stripped
+            <|> ((>= reserveD) </ (Read.readMaybe (T.unpack stripped) :: Maybe Double))
+  where
+    findBudgetBool v =
+      findBoolField
+        [ "available",
+          "allowed",
+          "allow",
+          "ok",
+          "has_budget",
+          "can_spend"
+        ]
+        v
+
+    findBudgetNumber v =
+      findNumberField
+        [ "remaining_cents",
+          "available_cents",
+          "budget_remaining_cents",
+          "remaining",
+          "balance_cents"
+        ]
+        v
+
+recordDomainSpend :: Config -> Text -> Text -> Text -> Int -> IO Bool
+recordDomainSpend cfg domain tid runId costCents
+  | cfgDryRun cfg = do
+      logMsg
+        ( "DRY RUN: would record fund spend domain="
+            <> domain
+            <> " task="
+            <> tid
+            <> " run="
+            <> runId
+            <> " cents="
+            <> tshow costCents
+        )
+      pure True
+  | otherwise = do
+      let d = T.unpack domain
+          t = T.unpack tid
+          r = T.unpack runId
+          c = show costCents
+          attempts =
+            [ ["spend", "--domain", d, "--task-id", t, "--run-id", r, "--cents", c, "--json"],
+              ["debit", "--domain", d, "--cents", c, "--json"],
+              ["record", "--domain", d, "--cost-cents", c, "--task-id", t, "--json"]
+            ]
+      runFirstSuccessful (cfgFundCmd cfg) attempts
+
+runFirstSuccessful :: String -> [[String]] -> IO Bool
+runFirstSuccessful _ [] = pure False
+runFirstSuccessful cmd (args : rest) = do
+  res <- execCommand cmd args
+  if crExitCode res == Exit.ExitSuccess
+    then pure True
+    else runFirstSuccessful cmd rest
+
+-- ============================================================================
+-- Task scan / selection
+-- ============================================================================
+
+fetchReadyTasks :: Config -> IO [ReadyTask]
+fetchReadyTasks cfg = do
+  res <- runTask cfg ["ready", "--json"]
+  if crExitCode res /= Exit.ExitSuccess
+    then do
+      logMsg <| "task ready failed: " <> T.take 300 (singleLine (crStderr res <> " " <> crStdout res))
+      pure []
+    else
+      case decodeJsonText (crStdout res) of
+        Right tasks -> pure tasks
+        Left err -> do
+          logMsg <| "Could not decode task ready JSON: " <> err
+          pure []
+
+readyTaskIsOpen :: ReadyTask -> Bool
+readyTaskIsOpen t =
+  case rtStatus t of
+    Nothing -> True
+    Just st -> normalizeLabel st == "open"
+
+readyToDomainTask :: ReadyTask -> Maybe DomainTask
+readyToDomainTask rt =
+  case rtNamespace rt of
+    Nothing -> Nothing
+    Just ns ->
+      Just
+        DomainTask
+          { dtTaskId = rtTaskId rt,
+            dtNamespace = ns,
+            dtDomain = domainFromNamespace ns,
+            dtTitle = rtTitle rt
+          }
+
+-- | Current domain strategy: budget buckets are namespace-aligned.
+domainFromNamespace :: Text -> Text
+domainFromNamespace = T.strip
+
+chooseSpawnCandidates :: Int -> [ActiveRun] -> [DomainTask] -> [DomainTask]
+chooseSpawnCandidates maxCount active candidates = go maxCount Set.empty [] candidates
+  where
+    activeDomains = Set.fromList (map arDomain active)
+
+    go n _ acc _ | n <= 0 = reverse acc
+    go _ _ acc [] = reverse acc
+    go n seen acc (x : xs)
+      | dtDomain x `Set.member` activeDomains = go n seen acc xs
+      | dtDomain x `Set.member` seen = go n seen acc xs
+      | otherwise = go (n - 1) (Set.insert (dtDomain x) seen) (x : acc) xs
+
+-- ============================================================================
+-- Prompt generation
+-- ============================================================================
+
+writePrompt :: Config -> DomainTask -> Text -> IO FilePath
+writePrompt cfg dt runId = do
+  taskJson <- fetchTaskJson cfg (dtTaskId dt)
+  let dir = cfgRoot cfg FP.</> "prompts"
+      path = dir FP.</> (T.unpack runId <> ".md")
+      content = buildPrompt cfg dt taskJson
+  Dir.createDirectoryIfMissing True dir
+  TIO.writeFile path content
+  pure path
+
+fetchTaskJson :: Config -> Text -> IO (Maybe Aeson.Value)
+fetchTaskJson cfg tid = do
+  res <- runTask cfg ["show", T.unpack tid, "--json"]
+  if crExitCode res /= Exit.ExitSuccess
+    then pure Nothing
+    else case decodeJsonText (crStdout res) of
+      Right v -> pure (Just v)
+      Left _ -> pure Nothing
+
+buildPrompt :: Config -> DomainTask -> Maybe Aeson.Value -> Text
+buildPrompt cfg dt maybeTaskJson =
+  T.unlines
+    <| frontmatter
+    <> [ "",
+         "# Pipeline Scheduled Task",
+         "",
+         "You were scheduled by Omni/Pipeline for domain-scoped autonomous work.",
+         "",
+         "## Scheduling Context",
+         "- Task ID: " <> dtTaskId dt,
+         "- Domain: " <> dtDomain dt,
+         "- Namespace: " <> dtNamespace dt,
+         "- Run ID: " <> runIdHint,
+         "",
+         "## Required Workflow",
+         "1. Read AGENTS.md and follow project conventions.",
+         "2. Implement the task in this repository.",
+         "3. Run relevant build/test verification for the namespace.",
+         "4. Commit with trailer `Task-Id: " <> dtTaskId dt <> "`.",
+         "5. Do NOT change task status. The scheduler handles status transitions.",
+         "",
+         "## Task Record (JSON)",
+         fromMaybe "(task show JSON unavailable)" maybeJson,
+         ""
+       ]
+  where
+    runIdHint = "(provided by agentd runtime)"
+
+    maybeJson =
+      (\v -> TE.decodeUtf8 (BL.toStrict (Aeson.encode v))) </ maybeTaskJson
+
+    frontmatter =
+      [ "---",
+        "toolchain: " <> cfgToolchain cfg,
+        "workspace: ."
+      ]
+        <> maybe [] (\p -> ["provider: " <> p]) (cfgProvider cfg)
+        <> maybe [] (\m -> ["model: " <> m]) (cfgModel cfg)
+        <> maybe [] (\n -> ["max_iterations: " <> tshow n]) (cfgMaxIter cfg)
+        <> maybe [] (\n -> ["max_cost_cents: " <> tshow n]) (cfgMaxCostCents cfg)
+        <> ["---"]
+
+-- ============================================================================
+-- State persistence
+-- ============================================================================
+
+statePath :: Config -> FilePath
+statePath cfg = cfgRoot cfg FP.</> "scheduler-state.json"
+
+loadState :: Config -> IO SchedulerState
+loadState cfg = do
+  let path = statePath cfg
+  exists <- Dir.doesFileExist path
   if not exists
-    then pure []
+    then pure (SchedulerState [])
     else do
-      files <- Dir.listDirectory dir
-      let prefix = "dev-" <> T.unpack tid <> "-"
-      pure
-        <| map (\f -> dir <> "/" <> f)
-        <| filter (\f -> prefix `isPrefixOf` f && ".pid" `isSuffixOf` f)
-        <| files
+      content <- BL.readFile path
+      case Aeson.eitherDecode content of
+        Left err -> do
+          logMsg <| "State decode failed, starting empty: " <> T.pack err
+          pure (SchedulerState [])
+        Right st -> pure st
+
+saveState :: Config -> SchedulerState -> IO ()
+saveState cfg st =
+  BL.writeFile (statePath cfg) (Aeson.encode st)
 
 -- ============================================================================
 -- Status command
 -- ============================================================================
 
-doStatus :: String -> Bool -> IO ()
+doStatus :: FilePath -> Bool -> IO ()
 doStatus root asJson = do
-  let dbPath = root <> "/state.db"
-  exists <- Dir.doesFileExist dbPath
-  unless exists <| do
-    TIO.putStrLn "Pipeline state DB not found. Has the pipeline been started?"
-    pure ()
-  when exists <| do
-    db <- State.initPipelineDb dbPath
-    allTasks <- Task.loadTasks
-
-    let workTasks = filter (\t -> Task.taskType t == Task.WorkTask) allTasks
-        countByStatus s = length (filter (\t -> Task.taskStatus t == s) workTasks)
-
-    activeRuns <- State.getActiveDevRuns db
-
-    if asJson
-      then do
-        -- Simple JSON output
-        TIO.putStrLn "{"
-        TIO.putStrLn ("  \"open\": " <> T.pack (show (countByStatus Task.Open)) <> ",")
-        TIO.putStrLn ("  \"in_progress\": " <> T.pack (show (countByStatus Task.InProgress)) <> ",")
-        TIO.putStrLn ("  \"verified\": " <> T.pack (show (countByStatus Task.Verified)) <> ",")
-        TIO.putStrLn ("  \"done\": " <> T.pack (show (countByStatus Task.Done)) <> ",")
-        TIO.putStrLn ("  \"needs_help\": " <> T.pack (show (countByStatus Task.NeedsHelp)) <> ",")
-        TIO.putStrLn ("  \"active_dev_runs\": " <> T.pack (show (length activeRuns)))
-        TIO.putStrLn "}"
-      else do
-        TIO.putStrLn ("Pipeline status (root=" <> T.pack root <> ")")
-        TIO.putStrLn ""
-        TIO.putStrLn "Tasks:"
-        TIO.putStrLn ("  Open:        " <> T.pack (show (countByStatus Task.Open)))
-        TIO.putStrLn ("  InProgress:  " <> T.pack (show (countByStatus Task.InProgress)))
-        TIO.putStrLn ("  Verified:    " <> T.pack (show (countByStatus Task.Verified)))
-        TIO.putStrLn ("  Done:        " <> T.pack (show (countByStatus Task.Done)))
-        TIO.putStrLn ("  NeedsHelp:   " <> T.pack (show (countByStatus Task.NeedsHelp)))
-        TIO.putStrLn ""
-        TIO.putStrLn ("Active dev runs: " <> T.pack (show (length activeRuns)))
+  let cfg =
+        Config
+          { cfgRoot = root,
+            cfgIntervalSec = 60,
+            cfgMaxActive = 0,
+            cfgReservationCents = 0,
+            cfgToolchain = "haskell",
+            cfgTaskCmd = "task",
+            cfgFundCmd = "fund",
+            cfgAgentdCmd = "agentd",
+            cfgProvider = Nothing,
+            cfgModel = Nothing,
+            cfgTimeoutSec = Nothing,
+            cfgMaxIter = Nothing,
+            cfgMaxCostCents = Nothing,
+            cfgOnce = True,
+            cfgDryRun = False
+          }
+  st <- loadState cfg
+  if asJson
+    then do
+      BL.putStr <| Aeson.encode <| Aeson.object ["active_runs" Aeson..= ssActiveRuns st, "count" Aeson..= length (ssActiveRuns st)]
+      TIO.putStrLn ""
+    else do
+      TIO.putStrLn <| "Pipeline scheduler status (root=" <> T.pack root <> ")"
+      TIO.putStrLn <| "Active runs: " <> tshow (length (ssActiveRuns st))
+      forM_ (ssActiveRuns st) <| \r ->
+        TIO.putStrLn
+          ( "  - "
+              <> arTaskId r
+              <> "  run="
+              <> arRunId r
+              <> "  domain="
+              <> arDomain r
+          )
 
 -- ============================================================================
--- Helpers
+-- Command helpers
 -- ============================================================================
 
+runTask :: Config -> [String] -> IO CommandResult
+runTask cfg = execCommand (cfgTaskCmd cfg)
+
+execCommand :: String -> [String] -> IO CommandResult
+execCommand cmd args = do
+  result <- try (Process.readProcessWithExitCode cmd args "") :: IO (Either SomeException (Exit.ExitCode, String, String))
+  case result of
+    Left e ->
+      pure
+        CommandResult
+          { crExitCode = Exit.ExitFailure 127,
+            crStdout = "",
+            crStderr = T.pack (show e)
+          }
+    Right (code, out, err) ->
+      pure
+        CommandResult
+          { crExitCode = code,
+            crStdout = T.pack out,
+            crStderr = T.pack err
+          }
+
+mkRunId :: Text -> Text -> IO Text
+mkRunId domain tid = do
+  now <- getCurrentTime
+  let epoch = floor (utcTimeToPOSIXSeconds now) :: Integer
+  pure
+    <| "pipeline-"
+    <> slug domain
+    <> "-"
+    <> slug tid
+    <> "-"
+    <> T.pack (show epoch)
+
+slug :: Text -> Text
+slug =
+  T.dropAround (== '-')
+    <. T.map
+      ( \c ->
+          if Char.isAlphaNum c
+            then Char.toLower c
+            else '-'
+      )
+
+normalizeLabel :: Text -> Text
+normalizeLabel = T.toLower <. T.filter Char.isAlphaNum
+
+singleLine :: Text -> Text
+singleLine = T.replace "\n" " " <. T.replace "\r" " "
+
+pickNonEmpty :: Text -> Text -> Text
+pickNonEmpty a b =
+  if T.null (T.strip a)
+    then b
+    else a
+
+decodeJsonText :: (Aeson.FromJSON a) => Text -> Either Text a
+decodeJsonText txt =
+  first T.pack (Aeson.eitherDecode (BL.fromStrict (TE.encodeUtf8 (T.strip txt))))
+
+parseLooseBool :: Text -> Maybe Bool
+parseLooseBool raw =
+  let t = normalizeLabel raw
+   in if t `elem` ["true", "yes", "ok", "allow", "allowed", "1"]
+        then Just True
+        else
+          if t `elem` ["false", "no", "deny", "denied", "0"]
+            then Just False
+            else Nothing
+
+findBoolField :: [Text] -> Aeson.Value -> Maybe Bool
+findBoolField keys v =
+  valueToBool =<< findFieldValue keys v
+
+findNumberField :: [Text] -> Aeson.Value -> Maybe Double
+findNumberField keys v =
+  valueToNumber =<< findFieldValue keys v
+
+findFieldValue :: [Text] -> Aeson.Value -> Maybe Aeson.Value
+findFieldValue keys = \case
+  Aeson.Object obj ->
+    let direct =
+          listToMaybe
+            [ val
+              | k <- keys,
+                val <- maybeToList (KeyMap.lookup (Key.fromText k) obj)
+            ]
+        nested = listToMaybe (mapMaybe (findFieldValue keys) (KeyMap.elems obj))
+     in direct <|> nested
+  Aeson.Array arr ->
+    listToMaybe (mapMaybe (findFieldValue keys) (toList arr))
+  _ -> Nothing
+
+valueToBool :: Aeson.Value -> Maybe Bool
+valueToBool = \case
+  Aeson.Bool b -> Just b
+  Aeson.String t -> parseLooseBool t
+  Aeson.Number n -> Just (realToFrac n > (0 :: Double))
+  _ -> Nothing
+
+valueToNumber :: Aeson.Value -> Maybe Double
+valueToNumber = \case
+  Aeson.Number n -> Just (realToFrac n)
+  Aeson.String t -> Read.readMaybe (T.unpack (T.strip t))
+  Aeson.Bool True -> Just 1
+  Aeson.Bool False -> Just 0
+  _ -> Nothing
+
 logMsg :: Text -> IO ()
 logMsg msg = do
   now <- getCurrentTime
-  TIO.putStrLn ("[" <> T.pack (show now) <> "] " <> msg)
+  TIO.putStrLn <| "[" <> T.pack (show now) <> "] " <> msg
   IO.hFlush IO.stdout
+
+-- ============================================================================
+-- Tests
+-- ============================================================================
+
+test :: Test.Tree
+test =
+  Test.group
+    "Omni.Pipeline"
+    [ Test.unit "parseBudgetAllowed reads boolean fields" <| do
+        parseBudgetAllowed 300 "{\"available\": true}" Test.@?= Just True
+        parseBudgetAllowed 300 "{\"ok\": false}" Test.@?= Just False,
+      Test.unit "parseBudgetAllowed reads remaining cents" <| do
+        parseBudgetAllowed 300 "{\"remaining_cents\": 500}" Test.@?= Just True
+        parseBudgetAllowed 300 "{\"remaining_cents\": 100}" Test.@?= Just False,
+      Test.unit "chooseSpawnCandidates enforces one run per domain" <| do
+        let t1 = DomainTask "t-1" "Omni/A.hs" "Omni/A.hs" (Just "A")
+            t2 = DomainTask "t-2" "Omni/B.hs" "Omni/B.hs" (Just "B")
+            t3 = DomainTask "t-3" "Omni/A.hs" "Omni/A.hs" (Just "A2")
+            now = Read.read "2026-01-01 00:00:00 UTC"
+            active = [ActiveRun "t-9" "Biz/X.hs" "Biz/X.hs" "run-9" now]
+            chosen = chooseSpawnCandidates 5 active [t1, t2, t3]
+        map dtTaskId chosen Test.@?= ["t-1", "t-2"],
+      Test.unit "domainFromNamespace keeps namespace alignment" <| do
+        domainFromNamespace "Omni/Pipeline.hs" Test.@?= "Omni/Pipeline.hs"
+    ]