commit d20cb79f4bcd838ef05bbe6f149dfeb67f7829a0
Author: Coder Agent <coder@agents.omni>
Date: Thu Feb 19 17:51:06 2026
Pipeline scheduler: orchestrate task/fund/agentd by domain
Task-Id: t-623 (scheduler domain orchestration)
diff --git a/Omni/Pipeline.hs b/Omni/Pipeline.hs
index dbb5ed6a..ace29ac5 100755
--- a/Omni/Pipeline.hs
+++ b/Omni/Pipeline.hs
@@ -1,753 +1,890 @@
#!/usr/bin/env run.sh
+{-# LANGUAGE DeriveGeneric #-}
+{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE OverloadedStrings #-}
+{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE NoImplicitPrelude #-}
--- | Pipeline: automated dev → verify → ship.
+-- | Pipeline scheduler: task -> fund -> agentd orchestration.
--
--- Single-process pipeline that drives tasks from Open through development,
--- build verification, and integration into the base branch.
---
--- Only the dev phase uses an LLM agent. Verify and integrate are deterministic.
+-- Scans `task ready`, gates by `fund` budget per namespace/domain,
+-- spawns `agentd` runs tagged by domain (in run id), and on completion
+-- moves tasks to Review while recording cost back to fund.
--
-- : out pipeline
-- : dep aeson
-- : dep optparse-applicative
--- : dep sqlite-simple
--- : dep async
--- : dep stm
+-- : dep bytestring
+-- : dep process
+-- : dep text
+-- : dep time
module Omni.Pipeline where
import Alpha
-import qualified Control.Concurrent.STM as STM
-import qualified Data.Map.Strict as Map
+import qualified Data.Aeson as Aeson
+import qualified Data.Aeson.Key as Key
+import qualified Data.Aeson.KeyMap as KeyMap
+import qualified Data.ByteString.Lazy as BL
+import qualified Data.Char as Char
+import qualified Data.Set as Set
import qualified Data.Text as T
+import qualified Data.Text.Encoding as TE
import qualified Data.Text.IO as TIO
-import Data.Time (diffUTCTime, getCurrentTime)
-import qualified Database.SQLite.Simple as SQL
+import Data.Time (UTCTime, getCurrentTime)
+import Data.Time.Clock.POSIX (utcTimeToPOSIXSeconds)
import qualified Omni.Cli as Cli
-import qualified Omni.Pipeline.Core as Core
-import qualified Omni.Pipeline.Dev as Dev
-import qualified Omni.Pipeline.Git as Git
-import qualified Omni.Pipeline.Integrate as Integrate
-import qualified Omni.Pipeline.State as State
-import qualified Omni.Pipeline.Verify as Verify
-import qualified Omni.Pipeline.Workspace as Workspace
-import qualified Omni.Task.Core as Task
import qualified Omni.Test as Test
import qualified Options.Applicative as Opt
import qualified System.Directory as Dir
-import qualified System.Environment as Env
+import qualified System.Exit as Exit
+import qualified System.FilePath as FP
import qualified System.IO as IO
+import qualified System.Process as Process
import qualified Text.Read as Read
-main :: IO ()
-main =
- join (Opt.execParser (Cli.info (parser <**> Opt.helper) Cli.fullDesc))
+-- ============================================================================
+-- Types
+-- ============================================================================
-test :: Test.Tree
-test =
- Test.group
- "Omni.Pipeline"
- [ Test.unit "buildDevPrompt includes task id" <| do
- let task =
- Task.Task
- { Task.taskId = "t-999",
- Task.taskTitle = "Test task",
- Task.taskType = Task.WorkTask,
- Task.taskParent = Nothing,
- Task.taskNamespace = Just "Omni/Test.hs",
- Task.taskStatus = Task.Open,
- Task.taskPatchsetCount = 0,
- Task.taskPriority = Task.P2,
- Task.taskComplexity = Nothing,
- Task.taskDependencies = [],
- Task.taskDescription = "A test task",
- Task.taskComments = [],
- Task.taskTags = Task.Tags [],
- Task.taskCreatedAt = Read.read "2026-01-01 00:00:00 UTC",
- Task.taskUpdatedAt = Read.read "2026-01-01 00:00:00 UTC"
- }
- prompt = Dev.buildDevPrompt "live" task 0 Nothing
- Test.assertBool "prompt contains task id" (T.isInfixOf "t-999" prompt)
- Test.assertBool "prompt says no status changes" (T.isInfixOf "Do NOT change task status" prompt),
- --
- Test.unit "buildDevPrompt includes review feedback" <| do
- let task =
- Task.Task
- { Task.taskId = "t-888",
- Task.taskTitle = "Retry task",
- Task.taskType = Task.WorkTask,
- Task.taskParent = Nothing,
- Task.taskNamespace = Just "Omni/Foo.hs",
- Task.taskStatus = Task.InProgress,
- Task.taskPatchsetCount = 1,
- Task.taskPriority = Task.P2,
- Task.taskComplexity = Nothing,
- Task.taskDependencies = [],
- Task.taskDescription = "Fix the thing",
- Task.taskComments = [],
- Task.taskTags = Task.Tags [],
- Task.taskCreatedAt = Read.read "2026-01-01 00:00:00 UTC",
- Task.taskUpdatedAt = Read.read "2026-01-01 00:00:00 UTC"
- }
- prompt = Dev.buildDevPrompt "live" task 1 (Just "Build failed: missing import")
- Test.assertBool "prompt contains feedback" (T.isInfixOf "missing import" prompt)
- Test.assertBool "prompt mentions previous rejection" (T.isInfixOf "previous patchset was rejected" (T.toLower prompt)),
- --
- Test.unit "buildDevPrompt includes comments" <| do
- let task =
- Task.Task
- { Task.taskId = "t-777",
- Task.taskTitle = "Task with comments",
- Task.taskType = Task.WorkTask,
- Task.taskParent = Nothing,
- Task.taskNamespace = Just "Omni/Foo.hs",
- Task.taskStatus = Task.NeedsHelp,
- Task.taskPatchsetCount = 0,
- Task.taskPriority = Task.P2,
- Task.taskComplexity = Nothing,
- Task.taskDependencies = [],
- Task.taskDescription = "Do the thing",
- Task.taskComments =
- [ Task.Comment
- { Task.commentText = "What format should the output be?",
- Task.commentAuthor = Task.Agent Task.Engineer,
- Task.commentCreatedAt = Read.read "2026-01-01 01:00:00 UTC"
- },
- Task.Comment
- { Task.commentText = "Use JSON output",
- Task.commentAuthor = Task.Human,
- Task.commentCreatedAt = Read.read "2026-01-01 02:00:00 UTC"
- }
- ],
- Task.taskTags = Task.Tags [],
- Task.taskCreatedAt = Read.read "2026-01-01 00:00:00 UTC",
- Task.taskUpdatedAt = Read.read "2026-01-01 02:00:00 UTC"
- }
- prompt = Dev.buildDevPrompt "live" task 0 Nothing
- Test.assertBool "prompt contains Comments section" (T.isInfixOf "### Comments" prompt)
- Test.assertBool "prompt contains agent question" (T.isInfixOf "What format should the output be?" prompt)
- Test.assertBool "prompt contains human answer" (T.isInfixOf "Use JSON output" prompt)
- Test.assertBool "prompt contains human author tag" (T.isInfixOf "[human]" prompt)
- Test.assertBool "prompt contains agent author tag" (T.isInfixOf "[agent:engineer]" prompt),
- --
- Test.unit "buildDevPrompt includes needs-input instructions" <| do
- let task =
- Task.Task
- { Task.taskId = "t-666",
- Task.taskTitle = "Ambiguous task",
- Task.taskType = Task.WorkTask,
- Task.taskParent = Nothing,
- Task.taskNamespace = Nothing,
- Task.taskStatus = Task.Open,
- Task.taskPatchsetCount = 0,
- Task.taskPriority = Task.P2,
- Task.taskComplexity = Nothing,
- Task.taskDependencies = [],
- Task.taskDescription = "Do something",
- Task.taskComments = [],
- Task.taskTags = Task.Tags [],
- Task.taskCreatedAt = Read.read "2026-01-01 00:00:00 UTC",
- Task.taskUpdatedAt = Read.read "2026-01-01 00:00:00 UTC"
- }
- prompt = Dev.buildDevPrompt "live" task 0 Nothing
- Test.assertBool "prompt contains needs-input instruction" (T.isInfixOf "ambiguous or missing critical details" prompt)
- Test.assertBool "prompt mentions task comment command" (T.isInfixOf "task comment" prompt)
- Test.assertBool "prompt mentions exit 0" (T.isInfixOf "exit 0 without committing" prompt),
- --
- Test.unit "buildDevPrompt omits Comments section when no comments" <| do
- let task =
- Task.Task
- { Task.taskId = "t-555",
- Task.taskTitle = "No comments task",
- Task.taskType = Task.WorkTask,
- Task.taskParent = Nothing,
- Task.taskNamespace = Just "Omni/Bar.hs",
- Task.taskStatus = Task.Open,
- Task.taskPatchsetCount = 0,
- Task.taskPriority = Task.P2,
- Task.taskComplexity = Nothing,
- Task.taskDependencies = [],
- Task.taskDescription = "Simple task",
- Task.taskComments = [],
- Task.taskTags = Task.Tags [],
- Task.taskCreatedAt = Read.read "2026-01-01 00:00:00 UTC",
- Task.taskUpdatedAt = Read.read "2026-01-01 00:00:00 UTC"
- }
- prompt = Dev.buildDevPrompt "live" task 0 Nothing
- Test.assertBool "prompt does not contain Comments section" (not (T.isInfixOf "### Comments" prompt))
- ]
+data Config = Config
+ { cfgRoot :: FilePath,
+ cfgIntervalSec :: Int,
+ cfgMaxActive :: Int,
+ cfgReservationCents :: Int,
+ cfgToolchain :: Text,
+ cfgTaskCmd :: String,
+ cfgFundCmd :: String,
+ cfgAgentdCmd :: String,
+ cfgProvider :: Maybe Text,
+ cfgModel :: Maybe Text,
+ cfgTimeoutSec :: Maybe Int,
+ cfgMaxIter :: Maybe Int,
+ cfgMaxCostCents :: Maybe Int,
+ cfgOnce :: Bool,
+ cfgDryRun :: Bool
+ }
+ deriving (Show, Eq)
+
+data ActiveRun = ActiveRun
+ { arTaskId :: Text,
+ arNamespace :: Text,
+ arDomain :: Text,
+ arRunId :: Text,
+ arStartedAt :: UTCTime
+ }
+ deriving (Show, Eq, Generic)
+
+instance Aeson.ToJSON ActiveRun
+
+instance Aeson.FromJSON ActiveRun
+
+newtype SchedulerState = SchedulerState
+ { ssActiveRuns :: [ActiveRun]
+ }
+ deriving (Show, Eq, Generic)
+
+instance Aeson.ToJSON SchedulerState
+
+instance Aeson.FromJSON SchedulerState
+
+data ReadyTask = ReadyTask
+ { rtTaskId :: Text,
+ rtNamespace :: Maybe Text,
+ rtTitle :: Maybe Text,
+ rtStatus :: Maybe Text
+ }
+ deriving (Show, Eq)
+
+instance Aeson.FromJSON ReadyTask where
+ parseJSON =
+ Aeson.withObject "ReadyTask" <| \o ->
+ ReadyTask
+ </ o Aeson..: "taskId"
+ <*> o Aeson..:? "taskNamespace"
+ <*> o Aeson..:? "taskTitle"
+ <*> o Aeson..:? "taskStatus"
+
+data DomainTask = DomainTask
+ { dtTaskId :: Text,
+ dtNamespace :: Text,
+ dtDomain :: Text,
+ dtTitle :: Maybe Text
+ }
+ deriving (Show, Eq)
+
+data AgentRunStatus = AgentRunStatus
+ { arsStatus :: Text,
+ arsCostCents :: Double,
+ arsError :: Maybe Text
+ }
+ deriving (Show, Eq)
+
+instance Aeson.FromJSON AgentRunStatus where
+ parseJSON =
+ Aeson.withObject "AgentRunStatus" <| \o ->
+ AgentRunStatus
+ </ (o Aeson..:? "status" Aeson..!= "unknown")
+ <*> (o Aeson..:? "cost_cents" Aeson..!= 0)
+ <*> o Aeson..:? "error"
+
+data CommandResult = CommandResult
+ { crExitCode :: Exit.ExitCode,
+ crStdout :: Text,
+ crStderr :: Text
+ }
+ deriving (Show, Eq)
-- ============================================================================
--- CLI
+-- Entry / CLI
-- ============================================================================
+main :: IO ()
+main =
+ join (Opt.execParser (Cli.info (parser <**> Opt.helper) Cli.fullDesc))
+
parser :: Cli.Parser (IO ())
parser =
Cli.subparser
- ( Cli.command "run" (Cli.info runParser (Cli.progDesc "Start the pipeline"))
- <> Cli.command "status" (Cli.info statusParser (Cli.progDesc "Show pipeline health"))
+ ( Cli.command "run" (Cli.info runParser (Cli.progDesc "Run pipeline scheduler loop"))
+ <> Cli.command "status" (Cli.info statusParser (Cli.progDesc "Show scheduler state"))
+ <> Cli.command "test" (Cli.info (pure (Test.run test)) (Cli.progDesc "Run pipeline tests"))
)
runParser :: Cli.Parser (IO ())
-runParser =
- doRun
- </ Cli.strOption (Cli.long "root" <> Cli.value "_/pipeline" <> Cli.help "Workspace root")
- <*> Cli.strOption (Cli.long "base" <> Cli.value "live" <> Cli.help "Base branch")
- <*> Cli.option Cli.auto (Cli.long "concurrency" <> Cli.value 1 <> Cli.help "Max concurrent dev agents")
- <*> Cli.option Cli.auto (Cli.long "interval" <> Cli.value 60 <> Cli.help "Poll interval (seconds)")
- <*> Cli.option Cli.auto (Cli.long "max-retries" <> Cli.value 3 <> Cli.help "Max retries per patchset")
- <*> Cli.option Cli.auto (Cli.long "max-task-cost" <> Cli.value 0 <> Cli.help "Cost cap per task (cents, 0=unlimited)")
- <*> Cli.option Cli.auto (Cli.long "timeout" <> Cli.value 1800 <> Cli.help "Agent timeout (seconds)")
- <*> Cli.option Cli.auto (Cli.long "max-iter" <> Cli.value 80 <> Cli.help "Agent max iterations")
- <*> Cli.option Cli.auto (Cli.long "max-cost" <> Cli.value 300 <> Cli.help "Agent max cost per run (cents)")
- <*> Cli.optional (Cli.strOption (Cli.long "provider" <> Cli.help "Agent provider"))
- <*> Cli.optional (Cli.strOption (Cli.long "model" <> Cli.help "Agent model (e.g. gpt-5.3-codex)"))
- <*> Cli.optional (Cli.strOption (Cli.long "parent" <> Cli.help "Only process tasks under this epic"))
- <*> Cli.optional (Cli.strOption (Cli.long "task-id" <> Cli.help "Only process this one task"))
- <*> Cli.switch (Cli.long "once" <> Cli.help "Process one cycle then exit")
- <*> Cli.switch (Cli.long "dry-run" <> Cli.help "Print what would happen")
+runParser = doRun </ configParser
+
+configParser :: Cli.Parser Config
+configParser =
+ Config
+ </ Cli.strOption
+ ( Cli.long "root"
+ <> Cli.value "_/pipeline"
+ <> Cli.help "Pipeline state root"
+ )
+ <*> Cli.option
+ Cli.auto
+ ( Cli.long "interval"
+ <> Cli.value 60
+ <> Cli.help "Poll interval in seconds"
+ )
+ <*> Cli.option
+ Cli.auto
+ ( Cli.long "max-active"
+ <> Cli.value 8
+ <> Cli.help "Max active agentd runs tracked by scheduler"
+ )
+ <*> Cli.option
+ Cli.auto
+ ( Cli.long "reserve-cents"
+ <> Cli.value 300
+ <> Cli.help "Required available budget (cents) before spawning"
+ )
+ <*> (T.pack </ Cli.strOption
+ ( Cli.long "toolchain"
+ <> Cli.value "haskell"
+ <> Cli.help "agentd toolchain for generated prompts"
+ ))
+ <*> Cli.strOption
+ ( Cli.long "task-cmd"
+ <> Cli.value "task"
+ <> Cli.help "Task CLI command"
+ )
+ <*> Cli.strOption
+ ( Cli.long "fund-cmd"
+ <> Cli.value "fund"
+ <> Cli.help "Fund CLI command"
+ )
+ <*> Cli.strOption
+ ( Cli.long "agentd-cmd"
+ <> Cli.value "agentd"
+ <> Cli.help "agentd CLI command"
+ )
+ <*> Cli.optional
+ ( T.pack </ Cli.strOption
+ ( Cli.long "provider"
+ <> Cli.help "Optional provider override in generated prompt frontmatter"
+ )
+ )
+ <*> Cli.optional
+ ( T.pack </ Cli.strOption
+ ( Cli.long "model"
+ <> Cli.help "Optional model override in generated prompt frontmatter"
+ )
+ )
+ <*> Cli.optional
+ ( Cli.option
+ Cli.auto
+ ( Cli.long "timeout"
+ <> Cli.help "agentd run timeout (seconds)"
+ )
+ )
+ <*> Cli.optional
+ ( Cli.option
+ Cli.auto
+ ( Cli.long "max-iter"
+ <> Cli.help "agentd run max iterations"
+ )
+ )
+ <*> Cli.optional
+ ( Cli.option
+ Cli.auto
+ ( Cli.long "max-cost"
+ <> Cli.help "agentd run max cost (cents)"
+ )
+ )
+ <*> Cli.switch
+ ( Cli.long "once"
+ <> Cli.help "Run one scheduler cycle and exit"
+ )
+ <*> Cli.switch
+ ( Cli.long "dry-run"
+ <> Cli.help "Print actions without mutating task/fund state"
+ )
statusParser :: Cli.Parser (IO ())
statusParser =
doStatus
- </ Cli.strOption (Cli.long "root" <> Cli.value "_/pipeline" <> Cli.help "Workspace root")
- <*> Cli.switch (Cli.long "json" <> Cli.help "JSON output")
+ </ Cli.strOption
+ ( Cli.long "root"
+ <> Cli.value "_/pipeline"
+ <> Cli.help "Pipeline state root"
+ )
+ <*> Cli.switch
+ ( Cli.long "json"
+ <> Cli.help "JSON output"
+ )
-- ============================================================================
--- Run command
+-- Run loop
-- ============================================================================
-doRun ::
- String ->
- String ->
- Int ->
- Int ->
- Int ->
- Int ->
- Int ->
- Int ->
- Int ->
- Maybe String ->
- Maybe String ->
- Maybe String ->
- Maybe String ->
- Bool ->
- Bool ->
- IO ()
-doRun root base concurrency interval maxRetries maxTaskCost timeout maxIter maxCost provider model parent taskFilter once dryRun = do
- repoRoot <- Env.getEnv "CODEROOT"
- let cfg =
- Core.PipelineConfig
- { Core.pcRoot = root,
- Core.pcRepoRoot = repoRoot,
- Core.pcBaseBranch = T.pack base,
- Core.pcConcurrency = concurrency,
- Core.pcPollInterval = interval,
- Core.pcMaxRetries = maxRetries,
- Core.pcMaxTaskCost = maxTaskCost,
- Core.pcAgentTimeout = timeout,
- Core.pcAgentMaxIter = maxIter,
- Core.pcAgentMaxCost = maxCost,
- Core.pcAgentProvider = T.pack </ provider,
- Core.pcAgentModel = T.pack </ model,
- Core.pcParentFilter = T.pack </ parent,
- Core.pcTaskFilter = T.pack </ taskFilter,
- Core.pcDryRun = dryRun
- }
-
- -- Initialize
- Dir.createDirectoryIfMissing True root
- db <- State.initPipelineDb (root <> "/state.db")
- pool <- Workspace.newPool cfg
- activeDevs <- STM.newTVarIO (Map.empty :: Map.Map Text Core.ActiveDev)
-
- logMsg "Pipeline starting"
- logMsg ("root=" <> T.pack root <> " base=" <> T.pack base <> " concurrency=" <> T.pack (show concurrency))
-
- -- Startup recovery
- recoverStaleState cfg db pool activeDevs
-
- -- Main loop with exception handling
+doRun :: Config -> IO ()
+doRun cfg = do
+ Dir.createDirectoryIfMissing True (cfgRoot cfg)
+ logMsg "Pipeline scheduler starting"
+ logMsg
+ ( "root="
+ <> T.pack (cfgRoot cfg)
+ <> " interval="
+ <> tshow (cfgIntervalSec cfg)
+ <> "s max-active="
+ <> tshow (cfgMaxActive cfg)
+ <> if cfgDryRun cfg then " dry-run=true" else ""
+ )
let loop = do
- logMsg "--- cycle start ---"
- result <- try (runOneCycle cfg db pool activeDevs) :: IO (Either SomeException ())
- case result of
- Left e -> logMsg ("Cycle error (recovering): " <> T.pack (show e))
- Right () -> logMsg "--- cycle end ---"
- if once
- then logMsg "Single cycle complete, exiting"
+ runCycle cfg
+ if cfgOnce cfg
+ then logMsg "Single cycle complete"
else do
- threadDelay (interval * 1_000_000)
+ threadDelay (cfgIntervalSec cfg * 1_000_000)
loop
loop
--- | Run one full pipeline cycle.
-runOneCycle ::
- Core.PipelineConfig ->
- SQL.Connection ->
- Workspace.WorkspacePool ->
- STM.TVar (Map.Map Text Core.ActiveDev) ->
- IO ()
-runOneCycle cfg db pool activeDevs = do
- -- Load all tasks once per cycle
- allTasks <- Task.loadTasks
-
- let workTasks = filter isEligible allTasks
- byStatus s = filter (\t -> Task.taskStatus t == s) workTasks
-
- -- 1. INTEGRATE: process Verified tasks
- let verified = byStatus Task.Verified
- forM_ verified <| \task -> do
- let tid = Task.taskId task
- logMsg ("Integrating: " <> tid)
- if Core.pcDryRun cfg
- then logMsg ("DRY RUN: would integrate " <> tid)
- else doIntegrate cfg db pool task
-
- -- 2. HARVEST + VERIFY: check running dev agents
- active <- STM.readTVarIO activeDevs
- forM_ (Map.toList active) <| \(tid, ad) -> do
- result <- Dev.checkDevStatus ad
- case result of
- Core.StillRunning -> pure ()
- Core.DevSuccess maybeSha -> do
- logMsg ("Dev completed for " <> tid <> " (sha=" <> fromMaybe "?" maybeSha <> ")")
- cost <- Dev.getRunCost (Core.adRunId ad)
- State.markRunFinished db (Core.adRunDbId ad) Core.Success (Just cost) Nothing
- _ <- Task.addComment tid ("Pipeline: dev completed (run=" <> Core.adRunId ad <> ", cost=" <> T.pack (show cost) <> "c)") Task.System
- -- Remove from active
- STM.atomically <| STM.modifyTVar' activeDevs (Map.delete tid)
- -- Immediately verify
- doVerify cfg db pool ad
- Core.DevNeedsInput reason -> do
- logMsg ("Dev needs input for " <> tid <> ": " <> reason)
- cost <- Dev.getRunCost (Core.adRunId ad)
- State.markRunFinished db (Core.adRunDbId ad) Core.Success (Just cost) (Just reason)
- _ <- Task.addComment tid ("Pipeline: agent needs input (run=" <> Core.adRunId ad <> ")") Task.System
- -- Remove from active, release workspace, set task to NeedsHelp
- STM.atomically <| STM.modifyTVar' activeDevs (Map.delete tid)
- Workspace.releaseWorkspace pool tid
- Task.updateTaskStatus tid Task.NeedsHelp []
- Core.DevFailed err -> do
- logMsg ("Dev failed for " <> tid <> ": " <> err)
- cost <- Dev.getRunCost (Core.adRunId ad)
- State.markRunFinished db (Core.adRunDbId ad) Core.Failure (Just cost) (Just err)
- _ <- Task.addComment tid ("Pipeline: dev failed (run=" <> Core.adRunId ad <> "): " <> err) Task.System
- -- Remove from active, release workspace, reset task to Open
- STM.atomically <| STM.modifyTVar' activeDevs (Map.delete tid)
- Workspace.releaseWorkspace pool tid
- Task.updateTaskStatus tid Task.Open []
-
- -- 3. RE-SPAWN: check NeedsHelp tasks for new human comments
- let needsHelp = byStatus Task.NeedsHelp
- forM_ needsHelp <| \task -> do
- let tid = Task.taskId task
- lastRun <- State.getLastDevRunTime db tid
- let hasNewHumanComment = case lastRun of
- Nothing -> False
- Just runTime ->
- any
- ( \c ->
- Task.commentAuthor c
- == Task.Human
- && Task.commentCreatedAt c
- > runTime
- )
- (Task.taskComments task)
- when hasNewHumanComment <| do
- logMsg ("Human responded to " <> tid <> ", re-spawning dev")
- Task.updateTaskStatus tid Task.Open []
- _ <- Task.addComment tid "Pipeline: human responded, re-opening for dev" Task.System
- pure ()
-
- -- Reload tasks after NeedsHelp transitions
- allTasks' <- if null needsHelp then pure allTasks else Task.loadTasks
- let workTasks' = filter isEligible allTasks'
- byStatus' s = filter (\t -> Task.taskStatus t == s) workTasks'
-
- -- 4. DEVELOP: spawn agents for Open tasks
- let open = byStatus' Task.Open
- slots <- Workspace.slotsAvailable pool
- currentActive <- STM.readTVarIO activeDevs
- let availableSlots = slots - Map.size currentActive
- forM_ (take availableSlots open) <| \task -> do
- let tid = Task.taskId task
- -- Check retry/cost limits
- canRetry <- checkRetryLimits cfg db task
- when canRetry <| do
- if Core.pcDryRun cfg
- then logMsg ("DRY RUN: would start dev for " <> tid)
- else doSpawnDev cfg db pool activeDevs task
- where
- isEligible task =
- Task.taskType task
- == Task.WorkTask
- && isJust (Task.taskNamespace task) -- must have namespace for verification
- && matchesFilter (Core.pcTaskFilter cfg) (Task.taskId task)
- && matchesParent (Core.pcParentFilter cfg) (Task.taskParent task)
-
- matchesFilter Nothing _ = True
- matchesFilter (Just f) tid = f == tid
-
- matchesParent Nothing _ = True
- matchesParent (Just f) parent = parent == Just f
-
--- | Integrate a verified task.
-doIntegrate ::
- Core.PipelineConfig ->
- SQL.Connection ->
- Workspace.WorkspacePool ->
- Task.Task ->
- IO ()
-doIntegrate cfg db pool task = do
- let tid = Task.taskId task
- baseBranch = Core.pcBaseBranch cfg
- repoRoot = Core.pcRepoRoot cfg
- now <- getCurrentTime
+runCycle :: Config -> IO ()
+runCycle cfg = do
+ logMsg "--- cycle start ---"
+
+ state0 <- loadState cfg
+ activeAfterHarvest <- harvestCompletedRuns cfg (ssActiveRuns state0)
+
+ ready <- fetchReadyTasks cfg
+ let domainTasks = mapMaybe readyToDomainTask <| filter readyTaskIsOpen ready
+ slots = max 0 (cfgMaxActive cfg - length activeAfterHarvest)
+ candidates = chooseSpawnCandidates slots activeAfterHarvest domainTasks
+
+ logMsg
+ ( "ready="
+ <> tshow (length ready)
+ <> " open-with-namespace="
+ <> tshow (length domainTasks)
+ <> " active="
+ <> tshow (length activeAfterHarvest)
+ <> " spawn-candidates="
+ <> tshow (length candidates)
+ )
- -- Record the integration attempt
- rowId <-
- State.recordRun
- db
- Core.RunRecord
- { Core.rrId = Nothing,
- Core.rrTaskId = tid,
- Core.rrPhase = Core.Integrate,
- Core.rrPatchset = Task.taskPatchsetCount task,
- Core.rrAgentdRunId = Nothing,
- Core.rrStatus = Core.Running,
- Core.rrCostCents = 0,
- Core.rrStartedAt = now,
- Core.rrFinishedAt = Nothing,
- Core.rrError = Nothing
- }
-
- result <- Integrate.integrateTask pool repoRoot tid baseBranch
- case result of
- Core.Integrated commitHash -> do
- State.markRunFinished db rowId Core.Success Nothing Nothing
- Task.updateTaskStatus tid Task.Done []
- _ <- Task.addComment tid ("Pipeline: integrated into " <> baseBranch <> " at " <> commitHash) Task.System
- Workspace.releaseWorkspace pool tid
- logMsg ("Integrated " <> tid <> " at " <> commitHash)
- Core.Conflict details -> do
- State.markRunFinished db rowId Core.Failure Nothing (Just details)
- Task.updateTaskStatus tid Task.Open []
- _ <- Task.addComment tid ("Pipeline: integration conflict, resetting for re-dev: " <> details) Task.System
- Workspace.releaseWorkspace pool tid
- logMsg ("Integration conflict for " <> tid <> ": " <> details)
- Core.IntegrateBuildFail details -> do
- State.markRunFinished db rowId Core.Failure Nothing (Just details)
- Task.updateTaskStatus tid Task.Open []
- _ <- Task.addComment tid ("Pipeline: post-integration build failed, resetting: " <> details) Task.System
- Workspace.releaseWorkspace pool tid
- logMsg ("Integration build failed for " <> tid <> ": " <> details)
-
--- | Verify a completed dev run.
-doVerify ::
- Core.PipelineConfig ->
- SQL.Connection ->
- Workspace.WorkspacePool ->
- Core.ActiveDev ->
- IO ()
-doVerify cfg db pool ad = do
- let tid = Core.adTaskId ad
- baseBranch = Core.pcBaseBranch cfg
- workspace = Core.adWorkspace ad
- now <- getCurrentTime
+ spawned <- catMaybes </ traverse (spawnForTask cfg) candidates
- rowId <-
- State.recordRun
- db
- Core.RunRecord
- { Core.rrId = Nothing,
- Core.rrTaskId = tid,
- Core.rrPhase = Core.Verify,
- Core.rrPatchset = Core.adPatchset ad,
- Core.rrAgentdRunId = Nothing,
- Core.rrStatus = Core.Running,
- Core.rrCostCents = 0,
- Core.rrStartedAt = now,
- Core.rrFinishedAt = Nothing,
- Core.rrError = Nothing
- }
-
- result <- Verify.verifyTask workspace tid baseBranch
- case result of
- Core.VerifyPass -> do
- State.markRunFinished db rowId Core.Success Nothing Nothing
- Task.updateTaskStatus tid Task.Verified []
- _ <- Task.addComment tid "Pipeline: build verification passed" Task.System
- logMsg ("Verified " <> tid)
- Core.VerifySkip reason -> do
- State.markRunFinished db rowId Core.Success Nothing (Just reason)
- Task.updateTaskStatus tid Task.Verified []
- _ <- Task.addComment tid ("Pipeline: verification skipped (" <> reason <> "), promoting") Task.System
- logMsg ("Verified (skipped) " <> tid <> ": " <> reason)
- Core.VerifyFail reason -> do
- State.markRunFinished db rowId Core.Failure Nothing (Just reason)
- Task.updateTaskStatus tid Task.Open []
- _ <- Task.addComment tid ("Pipeline: verification failed:\n" <> reason) Task.System
- Workspace.releaseWorkspace pool tid
- logMsg ("Verify failed for " <> tid <> ": " <> T.take 200 reason)
-
--- | Spawn a dev agent for a task.
-doSpawnDev ::
- Core.PipelineConfig ->
- SQL.Connection ->
- Workspace.WorkspacePool ->
- STM.TVar (Map.Map Text Core.ActiveDev) ->
- Task.Task ->
- IO ()
-doSpawnDev cfg db pool activeDevs task = do
- let tid = Task.taskId task
- patchset = Task.taskPatchsetCount task
- now <- getCurrentTime
+ let state1 = SchedulerState {ssActiveRuns = activeAfterHarvest <> spawned}
+ saveState cfg state1
- -- Claim the task
- Task.updateTaskStatus tid Task.InProgress []
-
- -- Get or create workspace
- wsResult <- Workspace.acquireWorkspace pool tid
- case wsResult of
- Left e -> do
- logMsg ("Failed to acquire workspace for " <> tid <> ": " <> e)
- Task.updateTaskStatus tid Task.Open []
- pure ()
- Right workspace -> do
- -- Get review feedback from last verify failure
- reviewFeedback <- State.getLastVerifyFailure db tid
-
- -- Record the run
- rowId <-
- State.recordRun
- db
- Core.RunRecord
- { Core.rrId = Nothing,
- Core.rrTaskId = tid,
- Core.rrPhase = Core.Dev,
- Core.rrPatchset = patchset,
- Core.rrAgentdRunId = Nothing,
- Core.rrStatus = Core.Running,
- Core.rrCostCents = 0,
- Core.rrStartedAt = now,
- Core.rrFinishedAt = Nothing,
- Core.rrError = Nothing
- }
-
- -- Spawn the agent
- result <- Dev.spawnDev cfg workspace task patchset reviewFeedback rowId
- case result of
- Left e -> do
- State.markRunFinished db rowId Core.Failure Nothing (Just e)
- Task.updateTaskStatus tid Task.Open []
- _ <- Task.addComment tid ("Pipeline: failed to spawn dev agent: " <> e) Task.System
- logMsg ("Failed to spawn dev for " <> tid <> ": " <> e)
- Right ad -> do
- -- Update the run record with the agentd run ID
- -- (we didn't have it when we created the record)
- STM.atomically <| STM.modifyTVar' activeDevs (Map.insert tid ad)
- logMsg ("Spawned dev for " <> tid <> " (run=" <> Core.adRunId ad <> ")")
-
--- | Check if a task can be retried (within retry and cost limits).
-checkRetryLimits ::
- Core.PipelineConfig ->
- SQL.Connection ->
- Task.Task ->
- IO Bool
-checkRetryLimits cfg db task = do
- let tid = Task.taskId task
- patchset = Task.taskPatchsetCount task
- maxRetries = Core.pcMaxRetries cfg
- maxCost = Core.pcMaxTaskCost cfg
-
- -- Check retry count
- retryCount <- State.getRetryCount db tid Core.Dev patchset
- when (maxRetries > 0 && retryCount >= maxRetries) <| do
- logMsg ("Task " <> tid <> " exceeded max retries (" <> T.pack (show retryCount) <> "/" <> T.pack (show maxRetries) <> ")")
- Task.updateTaskStatus tid Task.NeedsHelp []
- _ <- Task.addComment tid ("Pipeline: exceeded " <> T.pack (show maxRetries) <> " retries on patchset " <> T.pack (show patchset)) Task.System
+ logMsg
+ ( "cycle summary: active="
+ <> tshow (length (ssActiveRuns state1))
+ <> " spawned="
+ <> tshow (length spawned)
+ )
+ logMsg "--- cycle end ---"
+
+-- ============================================================================
+-- Harvest
+-- ============================================================================
+
+harvestCompletedRuns :: Config -> [ActiveRun] -> IO [ActiveRun]
+harvestCompletedRuns cfg runs = do
+ kept <-
+ forM runs <| \ar -> do
+ status <- getAgentStatus cfg (arRunId ar)
+ if normalizeLabel (arsStatus status) == "running"
+ then pure <| Just ar
+ else do
+ finalizeRun cfg ar status
+ pure Nothing
+ pure (catMaybes kept)
+
+getAgentStatus :: Config -> Text -> IO AgentRunStatus
+getAgentStatus cfg runId = do
+ let cmd = cfgAgentdCmd cfg
+ args = ["status", T.unpack runId, "--json"]
+ res <- execCommand cmd args
+ case decodeJsonText (crStdout res) of
+ Right status -> pure status
+ Left _ ->
+ if crExitCode res == Exit.ExitSuccess
+ then pure <| AgentRunStatus "done" 0 Nothing
+ else
+ pure
+ <| AgentRunStatus
+ { arsStatus = "failed",
+ arsCostCents = 0,
+ arsError =
+ Just
+ ( T.strip
+ ( if T.null (crStderr res)
+ then crStdout res
+ else crStderr res
+ )
+ )
+ }
+
+finalizeRun :: Config -> ActiveRun -> AgentRunStatus -> IO ()
+finalizeRun cfg ar status = do
+ let tid = arTaskId ar
+ runId = arRunId ar
+ domain = arDomain ar
+ cost = arsCostCents status
+ costRounded = round cost :: Int
+ baseMsg =
+ "Pipeline scheduler: run="
+ <> runId
+ <> " domain="
+ <> domain
+ <> " status="
+ <> arsStatus status
+ <> " cost="
+ <> tshow costRounded
+ <> "c"
+ msg =
+ case arsError status of
+ Nothing -> baseMsg
+ Just e -> baseMsg <> " error=" <> T.take 400 (singleLine e)
+
+ logMsg <| "Finalizing " <> tid <> " from run " <> runId <> " (" <> arsStatus status <> ")"
+
+ spendOk <- recordDomainSpend cfg domain tid runId costRounded
+
+ unless (cfgDryRun cfg) <| do
+ _ <- runTask cfg ["update", T.unpack tid, "review", "--json"]
+ let withSpendNote =
+ if spendOk
+ then msg
+ else msg <> " (fund-spend=failed)"
+ _ <- runTask cfg ["comment", T.unpack tid, T.unpack withSpendNote, "--json"]
pure ()
- if maxRetries > 0 && retryCount >= maxRetries
- then pure False
- else do
- -- Check cost cap
- if maxCost > 0
+-- ============================================================================
+-- Spawn
+-- ============================================================================
+
+spawnForTask :: Config -> DomainTask -> IO (Maybe ActiveRun)
+spawnForTask cfg dt = do
+ let tid = dtTaskId dt
+ domain = dtDomain dt
+
+ budgetOk <- checkDomainBudget cfg domain
+ if not budgetOk
+ then do
+ logMsg <| "Skipping " <> tid <> " (no budget for domain " <> domain <> ")"
+ pure Nothing
+ else
+ if cfgDryRun cfg
then do
- cost <- State.getCumulativeCost db tid
- if cost >= fromIntegral maxCost
+ logMsg <| "DRY RUN: would spawn task " <> tid <> " in domain " <> domain
+ pure Nothing
+ else do
+ claimed <- claimTaskOpen cfg tid
+ if not claimed
then do
- logMsg ("Task " <> tid <> " exceeded cost cap (" <> T.pack (show cost) <> "c)")
- Task.updateTaskStatus tid Task.NeedsHelp []
- _ <- Task.addComment tid ("Pipeline: exceeded cost cap of " <> T.pack (show maxCost) <> "c") Task.System
- pure False
- else checkBackoff cfg db tid patchset retryCount
- else checkBackoff cfg db tid patchset retryCount
-
--- | Check exponential backoff.
-checkBackoff ::
- Core.PipelineConfig ->
- SQL.Connection ->
- Text ->
- Int ->
- Int ->
- IO Bool
-checkBackoff cfg db tid patchset retryCount = do
- if retryCount == 0
- then pure True
- else do
- lastFail <- State.getLastFailureTime db tid patchset
- case lastFail of
- Nothing -> pure True
- Just failTime -> do
- now <- getCurrentTime
- let baseInterval = fromIntegral (Core.pcPollInterval cfg) :: Double
- delay = min 600 (baseInterval * (2 ^ retryCount))
- elapsed = realToFrac (diffUTCTime now failTime) :: Double
- if elapsed >= delay
- then pure True
+ logMsg <| "Task not claimable as open (skip): " <> tid
+ pure Nothing
else do
- logMsg ("Task " <> tid <> " in backoff (" <> T.pack (show (round (delay - elapsed) :: Int)) <> "s remaining)")
- pure False
-
--- | Recover stale state on startup.
-recoverStaleState ::
- Core.PipelineConfig ->
- SQL.Connection ->
- Workspace.WorkspacePool ->
- STM.TVar (Map.Map Text Core.ActiveDev) ->
- IO ()
-recoverStaleState cfg db pool _activeDevs = do
- -- 1. Check DB for "running" records where agent is dead
- activeRuns <- State.getActiveDevRuns db
- forM_ activeRuns <| \(rowId, tid, _runId, _startedAt) -> do
- let workspace = Core.pcRoot cfg <> "/" <> T.unpack tid
-
- -- Find the actual PID file for this task
- pids <- findPidFiles workspace tid
- alive <- case pids of
- [] -> pure False
- (pidPath : _) -> do
- pidText <- T.strip </ TIO.readFile pidPath
- Dev.isProcessAlive pidText
-
- unless alive <| do
- logMsg ("Recovery: run " <> T.pack (show rowId) <> " for " <> tid <> " has dead agent")
- -- Check if agent left a commit
- afterSha <- Git.branchSha workspace tid
- -- Get the before-SHA from the base branch for comparison
- baseSha <- Git.branchSha workspace (Core.pcBaseBranch cfg)
- if afterSha /= baseSha && isJust afterSha
- then do
- logMsg ("Recovery: " <> tid <> " has commit " <> fromMaybe "?" afterSha <> ", proceeding to verify")
- State.markRunFinished db rowId Core.Success Nothing Nothing
- _ <- Task.addComment tid "Pipeline: recovered completed dev run on startup" Task.System
- -- Create ActiveDev so verify can run
- now <- getCurrentTime
- let ad =
- Core.ActiveDev
- { Core.adTaskId = tid,
- Core.adRunId = "recovered-" <> tid,
- Core.adWorkspace = workspace,
- Core.adStartedAt = now,
- Core.adBeforeSha = baseSha,
- Core.adPatchset = 0,
- Core.adNamespace = Nothing, -- will be looked up
- Core.adRunDbId = rowId
- }
- -- Look up namespace from task
- tasks <- Task.loadTasks
- let ns = Task.taskNamespace =<< Task.findTask tid tasks
- let ad' = ad {Core.adNamespace = ns}
- doVerify cfg db pool ad'
- else do
- logMsg ("Recovery: " <> tid <> " has no new commit, resetting to Open")
- State.markRunFinished db rowId Core.Failure Nothing (Just "Agent died without committing")
- Task.updateTaskStatus tid Task.Open []
- _ <- Task.addComment tid "Pipeline: agent died without producing a commit, resetting" Task.System
- pure ()
-
- -- 2. Find worktrees with no DB record at all
- staleTasks <- Workspace.findStaleWorktrees pool
- forM_ staleTasks <| \tid -> do
- hasRun <- State.hasActiveRun db tid
- unless hasRun <| do
- logMsg ("Recovering orphaned worktree for " <> tid)
- tasks <- Task.loadTasks
- case Task.findTask tid tasks of
- Nothing -> Workspace.releaseWorkspace pool tid
- Just task -> do
- when (Task.taskStatus task == Task.InProgress) <| do
- Task.updateTaskStatus tid Task.Open []
- _ <- Task.addComment tid "Pipeline: recovered stale InProgress task on startup" Task.System
- pure ()
-
--- | Find PID files for a task in its workspace.
-findPidFiles :: FilePath -> Text -> IO [FilePath]
-findPidFiles workspace tid = do
- let dir = workspace <> "/_/tmp/pipeline"
- exists <- Dir.doesDirectoryExist dir
+ runId <- mkRunId domain tid
+ promptPath <- writePrompt cfg dt runId
+ started <- startAgentRun cfg promptPath runId
+ if started
+ then do
+ _ <- runTask cfg ["comment", T.unpack tid, T.unpack ("Pipeline scheduler: started run=" <> runId <> " domain=" <> domain), "--json"]
+ now <- getCurrentTime
+ pure
+ <| Just
+ ActiveRun
+ { arTaskId = tid,
+ arNamespace = dtNamespace dt,
+ arDomain = domain,
+ arRunId = runId,
+ arStartedAt = now
+ }
+ else do
+ logMsg <| "Spawn failed for " <> tid <> ", resetting to open"
+ _ <- runTask cfg ["update", T.unpack tid, "open", "--json"]
+ _ <- runTask cfg ["comment", T.unpack tid, "Pipeline scheduler: failed to spawn agentd run", "--json"]
+ pure Nothing
+
+claimTaskOpen :: Config -> Text -> IO Bool
+claimTaskOpen cfg tid = do
+ res <- runTask cfg ["claim", T.unpack tid, "open", "in-progress", "--json"]
+ pure (crExitCode res == Exit.ExitSuccess)
+
+startAgentRun :: Config -> FilePath -> Text -> IO Bool
+startAgentRun cfg promptPath runId = do
+ let baseArgs = ["run", promptPath, "-n", T.unpack runId]
+ timeoutArgs = maybe [] (\n -> ["--timeout", show n]) (cfgTimeoutSec cfg)
+ maxIterArgs = maybe [] (\n -> ["--max-iter", show n]) (cfgMaxIter cfg)
+ maxCostArgs = maybe [] (\n -> ["--max-cost", show n]) (cfgMaxCostCents cfg)
+ args = baseArgs <> timeoutArgs <> maxIterArgs <> maxCostArgs
+ res <- execCommand (cfgAgentdCmd cfg) args
+ case crExitCode res of
+ Exit.ExitSuccess -> do
+ logMsg <| "Spawned agentd run " <> runId
+ pure True
+ _ -> do
+ logMsg <| "agentd run failed for " <> runId <> ": " <> T.take 400 (singleLine (crStderr res <> " " <> crStdout res))
+ pure False
+
+-- ============================================================================
+-- Budget
+-- ============================================================================
+
+checkDomainBudget :: Config -> Text -> IO Bool
+checkDomainBudget cfg domain
+ | cfgDryRun cfg = pure True
+ | otherwise = do
+ let reserve = cfgReservationCents cfg
+ d = T.unpack domain
+ r = show reserve
+ attempts =
+ [ ["check", "--domain", d, "--reserve-cents", r, "--json"],
+ ["budget", "--domain", d, "--json"],
+ ["available", "--domain", d, "--json"],
+ ["can-spend", "--domain", d, "--cents", r, "--json"]
+ ]
+ tryBudgetAttempts (cfgFundCmd cfg) reserve attempts
+
+tryBudgetAttempts :: String -> Int -> [[String]] -> IO Bool
+tryBudgetAttempts _ _ [] = pure False
+tryBudgetAttempts cmd reserve (args : rest) = do
+ res <- execCommand cmd args
+ let merged = pickNonEmpty (crStdout res) (crStderr res)
+ case parseBudgetAllowed reserve merged of
+ Just ok -> pure ok
+ Nothing ->
+ if crExitCode res == Exit.ExitSuccess
+ then pure True
+ else tryBudgetAttempts cmd reserve rest
+
+parseBudgetAllowed :: Int -> Text -> Maybe Bool
+parseBudgetAllowed reserve raw =
+ let stripped = T.strip raw
+ reserveD = fromIntegral reserve :: Double
+ in case decodeJsonText stripped :: Either Text Aeson.Value of
+ Right v ->
+ findBudgetBool v
+ <|> ((>= reserveD) </ findBudgetNumber v)
+ Left _ ->
+ parseLooseBool stripped
+ <|> ((>= reserveD) </ (Read.readMaybe (T.unpack stripped) :: Maybe Double))
+ where
+ findBudgetBool v =
+ findBoolField
+ [ "available",
+ "allowed",
+ "allow",
+ "ok",
+ "has_budget",
+ "can_spend"
+ ]
+ v
+
+ findBudgetNumber v =
+ findNumberField
+ [ "remaining_cents",
+ "available_cents",
+ "budget_remaining_cents",
+ "remaining",
+ "balance_cents"
+ ]
+ v
+
+recordDomainSpend :: Config -> Text -> Text -> Text -> Int -> IO Bool
+recordDomainSpend cfg domain tid runId costCents
+ | cfgDryRun cfg = do
+ logMsg
+ ( "DRY RUN: would record fund spend domain="
+ <> domain
+ <> " task="
+ <> tid
+ <> " run="
+ <> runId
+ <> " cents="
+ <> tshow costCents
+ )
+ pure True
+ | otherwise = do
+ let d = T.unpack domain
+ t = T.unpack tid
+ r = T.unpack runId
+ c = show costCents
+ attempts =
+ [ ["spend", "--domain", d, "--task-id", t, "--run-id", r, "--cents", c, "--json"],
+ ["debit", "--domain", d, "--cents", c, "--json"],
+ ["record", "--domain", d, "--cost-cents", c, "--task-id", t, "--json"]
+ ]
+ runFirstSuccessful (cfgFundCmd cfg) attempts
+
+runFirstSuccessful :: String -> [[String]] -> IO Bool
+runFirstSuccessful _ [] = pure False
+runFirstSuccessful cmd (args : rest) = do
+ res <- execCommand cmd args
+ if crExitCode res == Exit.ExitSuccess
+ then pure True
+ else runFirstSuccessful cmd rest
+
+-- ============================================================================
+-- Task scan / selection
+-- ============================================================================
+
+fetchReadyTasks :: Config -> IO [ReadyTask]
+fetchReadyTasks cfg = do
+ res <- runTask cfg ["ready", "--json"]
+ if crExitCode res /= Exit.ExitSuccess
+ then do
+ logMsg <| "task ready failed: " <> T.take 300 (singleLine (crStderr res <> " " <> crStdout res))
+ pure []
+ else
+ case decodeJsonText (crStdout res) of
+ Right tasks -> pure tasks
+ Left err -> do
+ logMsg <| "Could not decode task ready JSON: " <> err
+ pure []
+
+readyTaskIsOpen :: ReadyTask -> Bool
+readyTaskIsOpen t =
+ case rtStatus t of
+ Nothing -> True
+ Just st -> normalizeLabel st == "open"
+
+readyToDomainTask :: ReadyTask -> Maybe DomainTask
+readyToDomainTask rt =
+ case rtNamespace rt of
+ Nothing -> Nothing
+ Just ns ->
+ Just
+ DomainTask
+ { dtTaskId = rtTaskId rt,
+ dtNamespace = ns,
+ dtDomain = domainFromNamespace ns,
+ dtTitle = rtTitle rt
+ }
+
+-- | Current domain strategy: budget buckets are namespace-aligned.
+domainFromNamespace :: Text -> Text
+domainFromNamespace = T.strip
+
+chooseSpawnCandidates :: Int -> [ActiveRun] -> [DomainTask] -> [DomainTask]
+chooseSpawnCandidates maxCount active candidates = go maxCount Set.empty [] candidates
+ where
+ activeDomains = Set.fromList (map arDomain active)
+
+ go n _ acc _ | n <= 0 = reverse acc
+ go _ _ acc [] = reverse acc
+ go n seen acc (x : xs)
+ | dtDomain x `Set.member` activeDomains = go n seen acc xs
+ | dtDomain x `Set.member` seen = go n seen acc xs
+ | otherwise = go (n - 1) (Set.insert (dtDomain x) seen) (x : acc) xs
+
+-- ============================================================================
+-- Prompt generation
+-- ============================================================================
+
+writePrompt :: Config -> DomainTask -> Text -> IO FilePath
+writePrompt cfg dt runId = do
+ taskJson <- fetchTaskJson cfg (dtTaskId dt)
+ let dir = cfgRoot cfg FP.</> "prompts"
+ path = dir FP.</> (T.unpack runId <> ".md")
+ content = buildPrompt cfg dt taskJson
+ Dir.createDirectoryIfMissing True dir
+ TIO.writeFile path content
+ pure path
+
+fetchTaskJson :: Config -> Text -> IO (Maybe Aeson.Value)
+fetchTaskJson cfg tid = do
+ res <- runTask cfg ["show", T.unpack tid, "--json"]
+ if crExitCode res /= Exit.ExitSuccess
+ then pure Nothing
+ else case decodeJsonText (crStdout res) of
+ Right v -> pure (Just v)
+ Left _ -> pure Nothing
+
+buildPrompt :: Config -> DomainTask -> Maybe Aeson.Value -> Text
+buildPrompt cfg dt maybeTaskJson =
+ T.unlines
+ <| frontmatter
+ <> [ "",
+ "# Pipeline Scheduled Task",
+ "",
+ "You were scheduled by Omni/Pipeline for domain-scoped autonomous work.",
+ "",
+ "## Scheduling Context",
+ "- Task ID: " <> dtTaskId dt,
+ "- Domain: " <> dtDomain dt,
+ "- Namespace: " <> dtNamespace dt,
+ "- Run ID: " <> runIdHint,
+ "",
+ "## Required Workflow",
+ "1. Read AGENTS.md and follow project conventions.",
+ "2. Implement the task in this repository.",
+ "3. Run relevant build/test verification for the namespace.",
+ "4. Commit with trailer `Task-Id: " <> dtTaskId dt <> "`.",
+ "5. Do NOT change task status. The scheduler handles status transitions.",
+ "",
+ "## Task Record (JSON)",
+ fromMaybe "(task show JSON unavailable)" maybeJson,
+ ""
+ ]
+ where
+ runIdHint = "(provided by agentd runtime)"
+
+ maybeJson =
+ (\v -> TE.decodeUtf8 (BL.toStrict (Aeson.encode v))) </ maybeTaskJson
+
+ frontmatter =
+ [ "---",
+ "toolchain: " <> cfgToolchain cfg,
+ "workspace: ."
+ ]
+ <> maybe [] (\p -> ["provider: " <> p]) (cfgProvider cfg)
+ <> maybe [] (\m -> ["model: " <> m]) (cfgModel cfg)
+ <> maybe [] (\n -> ["max_iterations: " <> tshow n]) (cfgMaxIter cfg)
+ <> maybe [] (\n -> ["max_cost_cents: " <> tshow n]) (cfgMaxCostCents cfg)
+ <> ["---"]
+
+-- ============================================================================
+-- State persistence
+-- ============================================================================
+
+statePath :: Config -> FilePath
+statePath cfg = cfgRoot cfg FP.</> "scheduler-state.json"
+
+loadState :: Config -> IO SchedulerState
+loadState cfg = do
+ let path = statePath cfg
+ exists <- Dir.doesFileExist path
if not exists
- then pure []
+ then pure (SchedulerState [])
else do
- files <- Dir.listDirectory dir
- let prefix = "dev-" <> T.unpack tid <> "-"
- pure
- <| map (\f -> dir <> "/" <> f)
- <| filter (\f -> prefix `isPrefixOf` f && ".pid" `isSuffixOf` f)
- <| files
+ content <- BL.readFile path
+ case Aeson.eitherDecode content of
+ Left err -> do
+ logMsg <| "State decode failed, starting empty: " <> T.pack err
+ pure (SchedulerState [])
+ Right st -> pure st
+
+saveState :: Config -> SchedulerState -> IO ()
+saveState cfg st =
+ BL.writeFile (statePath cfg) (Aeson.encode st)
-- ============================================================================
-- Status command
-- ============================================================================
-doStatus :: String -> Bool -> IO ()
+doStatus :: FilePath -> Bool -> IO ()
doStatus root asJson = do
- let dbPath = root <> "/state.db"
- exists <- Dir.doesFileExist dbPath
- unless exists <| do
- TIO.putStrLn "Pipeline state DB not found. Has the pipeline been started?"
- pure ()
- when exists <| do
- db <- State.initPipelineDb dbPath
- allTasks <- Task.loadTasks
-
- let workTasks = filter (\t -> Task.taskType t == Task.WorkTask) allTasks
- countByStatus s = length (filter (\t -> Task.taskStatus t == s) workTasks)
-
- activeRuns <- State.getActiveDevRuns db
-
- if asJson
- then do
- -- Simple JSON output
- TIO.putStrLn "{"
- TIO.putStrLn (" \"open\": " <> T.pack (show (countByStatus Task.Open)) <> ",")
- TIO.putStrLn (" \"in_progress\": " <> T.pack (show (countByStatus Task.InProgress)) <> ",")
- TIO.putStrLn (" \"verified\": " <> T.pack (show (countByStatus Task.Verified)) <> ",")
- TIO.putStrLn (" \"done\": " <> T.pack (show (countByStatus Task.Done)) <> ",")
- TIO.putStrLn (" \"needs_help\": " <> T.pack (show (countByStatus Task.NeedsHelp)) <> ",")
- TIO.putStrLn (" \"active_dev_runs\": " <> T.pack (show (length activeRuns)))
- TIO.putStrLn "}"
- else do
- TIO.putStrLn ("Pipeline status (root=" <> T.pack root <> ")")
- TIO.putStrLn ""
- TIO.putStrLn "Tasks:"
- TIO.putStrLn (" Open: " <> T.pack (show (countByStatus Task.Open)))
- TIO.putStrLn (" InProgress: " <> T.pack (show (countByStatus Task.InProgress)))
- TIO.putStrLn (" Verified: " <> T.pack (show (countByStatus Task.Verified)))
- TIO.putStrLn (" Done: " <> T.pack (show (countByStatus Task.Done)))
- TIO.putStrLn (" NeedsHelp: " <> T.pack (show (countByStatus Task.NeedsHelp)))
- TIO.putStrLn ""
- TIO.putStrLn ("Active dev runs: " <> T.pack (show (length activeRuns)))
+ let cfg =
+ Config
+ { cfgRoot = root,
+ cfgIntervalSec = 60,
+ cfgMaxActive = 0,
+ cfgReservationCents = 0,
+ cfgToolchain = "haskell",
+ cfgTaskCmd = "task",
+ cfgFundCmd = "fund",
+ cfgAgentdCmd = "agentd",
+ cfgProvider = Nothing,
+ cfgModel = Nothing,
+ cfgTimeoutSec = Nothing,
+ cfgMaxIter = Nothing,
+ cfgMaxCostCents = Nothing,
+ cfgOnce = True,
+ cfgDryRun = False
+ }
+ st <- loadState cfg
+ if asJson
+ then do
+ BL.putStr <| Aeson.encode <| Aeson.object ["active_runs" Aeson..= ssActiveRuns st, "count" Aeson..= length (ssActiveRuns st)]
+ TIO.putStrLn ""
+ else do
+ TIO.putStrLn <| "Pipeline scheduler status (root=" <> T.pack root <> ")"
+ TIO.putStrLn <| "Active runs: " <> tshow (length (ssActiveRuns st))
+ forM_ (ssActiveRuns st) <| \r ->
+ TIO.putStrLn
+ ( " - "
+ <> arTaskId r
+ <> " run="
+ <> arRunId r
+ <> " domain="
+ <> arDomain r
+ )
-- ============================================================================
--- Helpers
+-- Command helpers
-- ============================================================================
+runTask :: Config -> [String] -> IO CommandResult
+runTask cfg = execCommand (cfgTaskCmd cfg)
+
+execCommand :: String -> [String] -> IO CommandResult
+execCommand cmd args = do
+ result <- try (Process.readProcessWithExitCode cmd args "") :: IO (Either SomeException (Exit.ExitCode, String, String))
+ case result of
+ Left e ->
+ pure
+ CommandResult
+ { crExitCode = Exit.ExitFailure 127,
+ crStdout = "",
+ crStderr = T.pack (show e)
+ }
+ Right (code, out, err) ->
+ pure
+ CommandResult
+ { crExitCode = code,
+ crStdout = T.pack out,
+ crStderr = T.pack err
+ }
+
+mkRunId :: Text -> Text -> IO Text
+mkRunId domain tid = do
+ now <- getCurrentTime
+ let epoch = floor (utcTimeToPOSIXSeconds now) :: Integer
+ pure
+ <| "pipeline-"
+ <> slug domain
+ <> "-"
+ <> slug tid
+ <> "-"
+ <> T.pack (show epoch)
+
+slug :: Text -> Text
+slug =
+ T.dropAround (== '-')
+ <. T.map
+ ( \c ->
+ if Char.isAlphaNum c
+ then Char.toLower c
+ else '-'
+ )
+
+normalizeLabel :: Text -> Text
+normalizeLabel = T.toLower <. T.filter Char.isAlphaNum
+
+singleLine :: Text -> Text
+singleLine = T.replace "\n" " " <. T.replace "\r" " "
+
+pickNonEmpty :: Text -> Text -> Text
+pickNonEmpty a b =
+ if T.null (T.strip a)
+ then b
+ else a
+
+decodeJsonText :: (Aeson.FromJSON a) => Text -> Either Text a
+decodeJsonText txt =
+ first T.pack (Aeson.eitherDecode (BL.fromStrict (TE.encodeUtf8 (T.strip txt))))
+
+parseLooseBool :: Text -> Maybe Bool
+parseLooseBool raw =
+ let t = normalizeLabel raw
+ in if t `elem` ["true", "yes", "ok", "allow", "allowed", "1"]
+ then Just True
+ else
+ if t `elem` ["false", "no", "deny", "denied", "0"]
+ then Just False
+ else Nothing
+
+findBoolField :: [Text] -> Aeson.Value -> Maybe Bool
+findBoolField keys v =
+ valueToBool =<< findFieldValue keys v
+
+findNumberField :: [Text] -> Aeson.Value -> Maybe Double
+findNumberField keys v =
+ valueToNumber =<< findFieldValue keys v
+
+findFieldValue :: [Text] -> Aeson.Value -> Maybe Aeson.Value
+findFieldValue keys = \case
+ Aeson.Object obj ->
+ let direct =
+ listToMaybe
+ [ val
+ | k <- keys,
+ val <- maybeToList (KeyMap.lookup (Key.fromText k) obj)
+ ]
+ nested = listToMaybe (mapMaybe (findFieldValue keys) (KeyMap.elems obj))
+ in direct <|> nested
+ Aeson.Array arr ->
+ listToMaybe (mapMaybe (findFieldValue keys) (toList arr))
+ _ -> Nothing
+
+valueToBool :: Aeson.Value -> Maybe Bool
+valueToBool = \case
+ Aeson.Bool b -> Just b
+ Aeson.String t -> parseLooseBool t
+ Aeson.Number n -> Just (realToFrac n > (0 :: Double))
+ _ -> Nothing
+
+valueToNumber :: Aeson.Value -> Maybe Double
+valueToNumber = \case
+ Aeson.Number n -> Just (realToFrac n)
+ Aeson.String t -> Read.readMaybe (T.unpack (T.strip t))
+ Aeson.Bool True -> Just 1
+ Aeson.Bool False -> Just 0
+ _ -> Nothing
+
logMsg :: Text -> IO ()
logMsg msg = do
now <- getCurrentTime
- TIO.putStrLn ("[" <> T.pack (show now) <> "] " <> msg)
+ TIO.putStrLn <| "[" <> T.pack (show now) <> "] " <> msg
IO.hFlush IO.stdout
+
+-- ============================================================================
+-- Tests
+-- ============================================================================
+
+test :: Test.Tree
+test =
+ Test.group
+ "Omni.Pipeline"
+ [ Test.unit "parseBudgetAllowed reads boolean fields" <| do
+ parseBudgetAllowed 300 "{\"available\": true}" Test.@?= Just True
+ parseBudgetAllowed 300 "{\"ok\": false}" Test.@?= Just False,
+ Test.unit "parseBudgetAllowed reads remaining cents" <| do
+ parseBudgetAllowed 300 "{\"remaining_cents\": 500}" Test.@?= Just True
+ parseBudgetAllowed 300 "{\"remaining_cents\": 100}" Test.@?= Just False,
+ Test.unit "chooseSpawnCandidates enforces one run per domain" <| do
+ let t1 = DomainTask "t-1" "Omni/A.hs" "Omni/A.hs" (Just "A")
+ t2 = DomainTask "t-2" "Omni/B.hs" "Omni/B.hs" (Just "B")
+ t3 = DomainTask "t-3" "Omni/A.hs" "Omni/A.hs" (Just "A2")
+ now = Read.read "2026-01-01 00:00:00 UTC"
+ active = [ActiveRun "t-9" "Biz/X.hs" "Biz/X.hs" "run-9" now]
+ chosen = chooseSpawnCandidates 5 active [t1, t2, t3]
+ map dtTaskId chosen Test.@?= ["t-1", "t-2"],
+ Test.unit "domainFromNamespace keeps namespace alignment" <| do
+ domainFromNamespace "Omni/Pipeline.hs" Test.@?= "Omni/Pipeline.hs"
+ ]