commit 9c3da38ff72f5913ee40ca59d479c7bd7c9a1f15
Author: Coder Agent <coder@agents.omni>
Date: Sat Feb 14 15:57:37 2026
Omni/Pipeline: implement automated dev-verify-ship
Single-process pipeline replacing dev-review-release.sh.
Only dev phase uses an LLM agent; verify (bild) and integrate
(cherry-pick) are deterministic. Concurrent dev via workspace
pool. Structured state in SQLite pipeline_runs table.
Modules:
- Core.hs: types, config
- State.hs: pipeline_runs DB layer
- Git.hs: typed git operations
- Workspace.hs: worktree pool management
- Verify.hs: deterministic build verification
- Integrate.hs: cherry-pick integration
- Dev.hs: agent spawning, prompt building
- Pipeline.hs: CLI, main loop, status command
Task-Id: t-603
diff --git a/Omni/Pipeline.hs b/Omni/Pipeline.hs
new file mode 100755
index 00000000..62acf994
--- /dev/null
+++ b/Omni/Pipeline.hs
@@ -0,0 +1,562 @@
+#!/usr/bin/env run.sh
+{-# LANGUAGE OverloadedStrings #-}
+{-# LANGUAGE NoImplicitPrelude #-}
+
+-- | Pipeline: automated dev → verify → ship.
+--
+-- Single-process pipeline that drives tasks from Open through development,
+-- build verification, and integration into the base branch.
+--
+-- Only the dev phase uses an LLM agent. Verify and integrate are deterministic.
+--
+-- : out pipeline
+-- : dep aeson
+-- : dep optparse-applicative
+-- : dep sqlite-simple
+-- : dep async
+-- : dep stm
+module Omni.Pipeline where
+
+import Alpha
+import Control.Concurrent (threadDelay)
+import qualified Control.Concurrent.STM as STM
+import qualified Data.Map.Strict as Map
+import qualified Data.Text as T
+import qualified Data.Text.IO as TIO
+import Data.Time (UTCTime, diffUTCTime, getCurrentTime)
+import qualified Database.SQLite.Simple as SQL
+import qualified Omni.Cli as Cli
+import qualified Omni.Pipeline.Core as Core
+import qualified Omni.Pipeline.Dev as Dev
+import qualified Omni.Pipeline.Integrate as Integrate
+import qualified Omni.Pipeline.State as State
+import qualified Omni.Pipeline.Verify as Verify
+import qualified Omni.Pipeline.Workspace as Workspace
+import qualified Omni.Task.Core as Task
+import qualified Omni.Test as Test
+import qualified Options.Applicative as Opt
+import qualified System.Directory as Dir
+import qualified System.Environment as Env
+import qualified Text.Read as Read
+
+-- | Program entry point: parse the command line into an 'IO' action, then
+-- run that action.
+main :: IO ()
+main = do
+  action <- Opt.execParser (Cli.info (parser <**> Opt.helper) Cli.fullDesc)
+  action
+
+-- | Unit tests for this module: checks the prompt produced by
+-- 'Dev.buildDevPrompt' for a first attempt and for a retry with feedback.
+test :: Test.Tree
+test =
+  Test.group
+    "Omni.Pipeline"
+    [ Test.unit "buildDevPrompt includes task id" <| do
+        let prompt = Dev.buildDevPrompt "live" firstAttempt 0 Nothing
+        Test.assertBool "prompt contains task id" (T.isInfixOf "t-999" prompt)
+        Test.assertBool "prompt says no status changes" (T.isInfixOf "Do NOT change task status" prompt),
+      --
+      Test.unit "buildDevPrompt includes review feedback" <| do
+        let prompt = Dev.buildDevPrompt "live" retryAttempt 1 (Just "Build failed: missing import")
+        Test.assertBool "prompt contains feedback" (T.isInfixOf "missing import" prompt)
+        Test.assertBool "prompt mentions previous rejection" (T.isInfixOf "previous patchset was rejected" (T.toLower prompt))
+    ]
+  where
+    -- Fixed timestamp shared by both fixtures.
+    stamp = Read.read "2026-01-01 00:00:00 UTC"
+
+    -- Open task that has never been attempted.
+    firstAttempt =
+      Task.Task
+        { Task.taskId = "t-999",
+          Task.taskTitle = "Test task",
+          Task.taskType = Task.WorkTask,
+          Task.taskParent = Nothing,
+          Task.taskNamespace = Just "Omni/Test.hs",
+          Task.taskStatus = Task.Open,
+          Task.taskPatchsetCount = 0,
+          Task.taskPriority = Task.P2,
+          Task.taskComplexity = Nothing,
+          Task.taskDependencies = [],
+          Task.taskDescription = "A test task",
+          Task.taskComments = [],
+          Task.taskTags = Task.Tags [],
+          Task.taskCreatedAt = stamp,
+          Task.taskUpdatedAt = stamp
+        }
+
+    -- Same shape, but already in progress with one earlier patchset.
+    retryAttempt =
+      firstAttempt
+        { Task.taskId = "t-888",
+          Task.taskTitle = "Retry task",
+          Task.taskNamespace = Just "Omni/Foo.hs",
+          Task.taskStatus = Task.InProgress,
+          Task.taskPatchsetCount = 1,
+          Task.taskDescription = "Fix the thing"
+        }
+
+-- ============================================================================
+-- CLI
+-- ============================================================================
+
+-- | Top-level command parser: dispatches to the @run@ and @status@
+-- subcommands, each of which yields the 'IO' action to execute.
+parser :: Cli.Parser (IO ())
+parser = Cli.subparser (runCommand <> statusCommand)
+  where
+    runCommand =
+      Cli.command "run" (Cli.info runParser (Cli.progDesc "Start the pipeline"))
+    statusCommand =
+      Cli.command "status" (Cli.info statusParser (Cli.progDesc "Show pipeline health"))
+
+-- | Parser for the @run@ subcommand.
+--
+-- The parsed options are applied to 'doRun' positionally, so the order of the
+-- options below must stay in step with the order of 'doRun''s parameters.
+-- Every valued option carries a default except the optional @--provider@,
+-- @--parent@ and @--task-id@ settings.
+runParser :: Cli.Parser (IO ())
+runParser =
+ doRun
+ </ Cli.strOption (Cli.long "root" <> Cli.value "_/pipeline" <> Cli.help "Workspace root")
+ <*> Cli.strOption (Cli.long "base" <> Cli.value "live" <> Cli.help "Base branch")
+ <*> Cli.option Cli.auto (Cli.long "concurrency" <> Cli.value 2 <> Cli.help "Max concurrent dev agents")
+ <*> Cli.option Cli.auto (Cli.long "interval" <> Cli.value 10 <> Cli.help "Poll interval (seconds)")
+ <*> Cli.option Cli.auto (Cli.long "max-retries" <> Cli.value 5 <> Cli.help "Max retries per patchset")
+ <*> Cli.option Cli.auto (Cli.long "max-task-cost" <> Cli.value 0 <> Cli.help "Cost cap per task (cents, 0=unlimited)")
+ <*> Cli.option Cli.auto (Cli.long "timeout" <> Cli.value 1800 <> Cli.help "Agent timeout (seconds)")
+ <*> Cli.option Cli.auto (Cli.long "max-iter" <> Cli.value 80 <> Cli.help "Agent max iterations")
+ <*> Cli.option Cli.auto (Cli.long "max-cost" <> Cli.value 300 <> Cli.help "Agent max cost per run (cents)")
+ <*> Cli.optional (Cli.strOption (Cli.long "provider" <> Cli.help "Agent provider"))
+ <*> Cli.optional (Cli.strOption (Cli.long "parent" <> Cli.help "Only process tasks under this epic"))
+ <*> Cli.optional (Cli.strOption (Cli.long "task-id" <> Cli.help "Only process this one task"))
+ <*> Cli.switch (Cli.long "once" <> Cli.help "Process one cycle then exit")
+ <*> Cli.switch (Cli.long "dry-run" <> Cli.help "Print what would happen")
+
+-- | Parser for the @status@ subcommand.
+--
+-- Takes the workspace root (same default as the @run@ subcommand) and a
+-- @--json@ switch, and applies them to 'doStatus' in that order.
+statusParser :: Cli.Parser (IO ())
+statusParser =
+ doStatus
+ </ Cli.strOption (Cli.long "root" <> Cli.value "_/pipeline" <> Cli.help "Workspace root")
+ <*> Cli.switch (Cli.long "json" <> Cli.help "JSON output")
+
+-- ============================================================================
+-- Run command
+-- ============================================================================
+
+-- | Implementation of the @run@ subcommand.
+--
+-- Arguments, in order: workspace root, base branch, max concurrent dev
+-- agents, poll interval (seconds), max retries per patchset, per-task cost cap
+-- (cents, 0 = unlimited), agent timeout (seconds), agent max iterations, agent
+-- max cost per run (cents), optional agent provider, optional parent-epic
+-- filter, optional single-task filter, run-once flag, dry-run flag.
+--
+-- Builds the 'Core.PipelineConfig', opens the state database under the
+-- workspace root, runs startup recovery, then executes pipeline cycles either
+-- once or forever with a sleep of the poll interval between cycles.
+--
+-- The repository root is read from the @CODEROOT@ environment variable;
+-- 'Env.getEnv' raises an exception when that variable is not set.
+doRun ::
+ String ->
+ String ->
+ Int ->
+ Int ->
+ Int ->
+ Int ->
+ Int ->
+ Int ->
+ Int ->
+ Maybe String ->
+ Maybe String ->
+ Maybe String ->
+ Bool ->
+ Bool ->
+ IO ()
+doRun root base concurrency interval maxRetries maxTaskCost timeout maxIter maxCost provider parent taskFilter once dryRun = do
+ repoRoot <- Env.getEnv "CODEROOT"
+ let cfg =
+ Core.PipelineConfig
+ { Core.pcRoot = root,
+ Core.pcRepoRoot = repoRoot,
+ Core.pcBaseBranch = T.pack base,
+ Core.pcConcurrency = concurrency,
+ Core.pcPollInterval = interval,
+ Core.pcMaxRetries = maxRetries,
+ Core.pcMaxTaskCost = maxTaskCost,
+ Core.pcAgentTimeout = timeout,
+ Core.pcAgentMaxIter = maxIter,
+ Core.pcAgentMaxCost = maxCost,
+ Core.pcAgentProvider = T.pack </ provider,
+ Core.pcParentFilter = T.pack </ parent,
+ Core.pcTaskFilter = T.pack </ taskFilter,
+ Core.pcDryRun = dryRun
+ }
+
+ -- Initialize: workspace directory, state DB, workspace pool, and the
+ -- in-memory map of running dev agents (keyed by task id, initially empty).
+ Dir.createDirectoryIfMissing True root
+ db <- State.initPipelineDb (root <> "/state.db")
+ pool <- Workspace.newPool cfg
+ activeDevs <- STM.newTVarIO (Map.empty :: Map.Map Text Core.ActiveDev)
+
+ logMsg "Pipeline starting"
+ logMsg ("root=" <> T.pack root <> " base=" <> T.pack base <> " concurrency=" <> T.pack (show concurrency))
+
+ -- Startup recovery
+ recoverStaleState db pool activeDevs
+
+ -- Main loop: one cycle, then either stop (--once) or sleep and repeat.
+ let loop = do
+ runOneCycle cfg db pool activeDevs
+ if once
+ then logMsg "Single cycle complete, exiting"
+ else do
+ -- threadDelay takes microseconds; interval is in seconds.
+ threadDelay (interval * 1_000_000)
+ loop
+ loop
+
+-- | Run one full pipeline cycle.
+--
+-- A cycle works from a single snapshot of the task list and performs three
+-- steps in order:
+--
+-- 1. integrate every eligible task whose status is Verified;
+-- 2. poll every running dev agent, and verify the work of those that finished;
+-- 3. start dev agents for eligible Open tasks, up to the free slot count.
+--
+-- Because the Open\/Verified lists come from the snapshot taken at the top,
+-- status changes made during steps 1 and 2 are only picked up by the next
+-- cycle. In dry-run mode steps 1 and 3 only log what they would do; step 2
+-- still polls whatever is in the active map.
+runOneCycle ::
+ Core.PipelineConfig ->
+ SQL.Connection ->
+ Workspace.WorkspacePool ->
+ STM.TVar (Map.Map Text Core.ActiveDev) ->
+ IO ()
+runOneCycle cfg db pool activeDevs = do
+ -- Load all tasks once per cycle
+ allTasks <- Task.loadTasks
+
+ let workTasks = filter isEligible allTasks
+ byStatus s = filter (\t -> Task.taskStatus t == s) workTasks
+
+ -- 1. INTEGRATE: process Verified tasks
+ let verified = byStatus Task.Verified
+ forM_ verified <| \task -> do
+ let tid = Task.taskId task
+ logMsg ("Integrating: " <> tid)
+ if Core.pcDryRun cfg
+ then logMsg ("DRY RUN: would integrate " <> tid)
+ else doIntegrate cfg db pool task
+
+ -- 2. HARVEST + VERIFY: check running dev agents
+ active <- STM.readTVarIO activeDevs
+ forM_ (Map.toList active) <| \(tid, ad) -> do
+ result <- Dev.checkDevStatus ad
+ case result of
+ Core.StillRunning -> pure ()
+ Core.DevSuccess maybeSha -> do
+ logMsg ("Dev completed for " <> tid <> " (sha=" <> fromMaybe "?" maybeSha <> ")")
+ cost <- Dev.getRunCost (Core.adRunId ad)
+ State.markRunFinished db (Core.adRunDbId ad) Core.Success (Just cost) Nothing
+ _ <- Task.addComment tid ("Pipeline: dev completed (run=" <> Core.adRunId ad <> ", cost=" <> T.pack (show cost) <> "c)") Task.System
+ -- Remove from active
+ STM.atomically <| STM.modifyTVar' activeDevs (Map.delete tid)
+ -- Immediately verify
+ doVerify cfg db ad
+ Core.DevFailed err -> do
+ logMsg ("Dev failed for " <> tid <> ": " <> err)
+ cost <- Dev.getRunCost (Core.adRunId ad)
+ State.markRunFinished db (Core.adRunDbId ad) Core.Failure (Just cost) (Just err)
+ _ <- Task.addComment tid ("Pipeline: dev failed (run=" <> Core.adRunId ad <> "): " <> err) Task.System
+ -- Remove from active, reset task to Open
+ STM.atomically <| STM.modifyTVar' activeDevs (Map.delete tid)
+ Task.updateTaskStatus tid Task.Open []
+
+ -- 3. DEVELOP: spawn agents for Open tasks
+ let open = byStatus Task.Open
+ slots <- Workspace.slotsAvailable pool
+ currentActive <- STM.readTVarIO activeDevs
+ -- NOTE(review): this subtracts the in-memory active count from whatever
+ -- Workspace.slotsAvailable reports; confirm that value is the pool capacity
+ -- and not already net of occupied workspaces, otherwise agents are counted twice.
+ let availableSlots = slots - Map.size currentActive
+ -- NOTE(review): the candidates are cut down to availableSlots before
+ -- checkRetryLimits runs, so a task that is refused (retry limit, cost cap or
+ -- backoff) still uses up one of this cycle's slots.
+ forM_ (take availableSlots open) <| \task -> do
+ let tid = Task.taskId task
+ -- Check retry/cost limits
+ canRetry <- checkRetryLimits cfg db task
+ when canRetry <| do
+ if Core.pcDryRun cfg
+ then logMsg ("DRY RUN: would start dev for " <> tid)
+ else doSpawnDev cfg db pool activeDevs task
+ where
+ -- A task is handled by the pipeline when it is a WorkTask and passes the
+ -- optional --task-id and --parent filters.
+ isEligible task =
+ Task.taskType task
+ == Task.WorkTask
+ && matchesFilter (Core.pcTaskFilter cfg) (Task.taskId task)
+ && matchesParent (Core.pcParentFilter cfg) (Task.taskParent task)
+
+ -- No filter matches everything; otherwise the task id must be equal.
+ matchesFilter Nothing _ = True
+ matchesFilter (Just f) tid = f == tid
+
+ -- No filter matches everything; otherwise the task's direct parent must be
+ -- exactly the given id (tasks without a parent never match).
+ matchesParent Nothing _ = True
+ matchesParent (Just f) parent = parent == Just f
+
+-- | Integrate a verified task.
+--
+-- Records an Integrate run (status Running) for the task's current patchset,
+-- calls 'Integrate.integrateTask', and then acts on the outcome:
+--
+-- * 'Core.Integrated': run marked Success, task set to Done, a comment with
+-- the commit hash is added, and the task's workspace is released.
+-- * 'Core.Conflict' or 'Core.IntegrateBuildFail': run marked Failure with the
+-- details, task set back to Open, a comment is added, and the workspace is
+-- reset so the next dev attempt starts fresh.
+--
+-- No cost is recorded for integration runs.
+doIntegrate ::
+ Core.PipelineConfig ->
+ SQL.Connection ->
+ Workspace.WorkspacePool ->
+ Task.Task ->
+ IO ()
+doIntegrate cfg db pool task = do
+ let tid = Task.taskId task
+ baseBranch = Core.pcBaseBranch cfg
+ repoRoot = Core.pcRepoRoot cfg
+ ns = Task.taskNamespace task
+ now <- getCurrentTime
+
+ -- Record the integration attempt
+ rowId <-
+ State.recordRun
+ db
+ Core.RunRecord
+ { Core.rrId = Nothing,
+ Core.rrTaskId = tid,
+ Core.rrPhase = Core.Integrate,
+ Core.rrPatchset = Task.taskPatchsetCount task,
+ Core.rrAgentdRunId = Nothing,
+ Core.rrStatus = Core.Running,
+ Core.rrCostCents = 0,
+ Core.rrStartedAt = now,
+ Core.rrFinishedAt = Nothing,
+ Core.rrError = Nothing
+ }
+
+ result <- Integrate.integrateTask pool repoRoot tid baseBranch ns
+ case result of
+ Core.Integrated commitHash -> do
+ State.markRunFinished db rowId Core.Success Nothing Nothing
+ Task.updateTaskStatus tid Task.Done []
+ _ <- Task.addComment tid ("Pipeline: integrated into " <> baseBranch <> " at " <> commitHash) Task.System
+ Workspace.releaseWorkspace pool tid
+ logMsg ("Integrated " <> tid <> " at " <> commitHash)
+ Core.Conflict details -> do
+ State.markRunFinished db rowId Core.Failure Nothing (Just details)
+ Task.updateTaskStatus tid Task.Open []
+ _ <- Task.addComment tid ("Pipeline: integration conflict, resetting for re-dev: " <> details) Task.System
+ -- Reset workspace so dev starts fresh
+ _ <- Workspace.resetWorkspace pool tid
+ logMsg ("Integration conflict for " <> tid <> ": " <> details)
+ Core.IntegrateBuildFail details -> do
+ State.markRunFinished db rowId Core.Failure Nothing (Just details)
+ Task.updateTaskStatus tid Task.Open []
+ _ <- Task.addComment tid ("Pipeline: post-integration build failed, resetting: " <> details) Task.System
+ -- Same recovery as a conflict; the result of the reset is ignored.
+ _ <- Workspace.resetWorkspace pool tid
+ logMsg ("Integration build failed for " <> tid <> ": " <> details)
+
+-- | Verify a completed dev run.
+--
+-- Records a Verify run (status Running) for the patchset the agent worked on,
+-- calls 'Verify.verifyTask' in the agent's workspace, and then:
+--
+-- * 'Core.VerifyPass': run marked Success, task promoted to Verified.
+-- * 'Core.VerifySkip': also promoted to Verified; the skip reason is stored in
+-- the run's error slot even though the run is marked Success.
+-- * 'Core.VerifyFail': run marked Failure with the reason, task set back to
+-- Open. The full reason goes into the task comment; the log line is
+-- truncated to 200 characters.
+doVerify ::
+ Core.PipelineConfig ->
+ SQL.Connection ->
+ Core.ActiveDev ->
+ IO ()
+doVerify cfg db ad = do
+ let tid = Core.adTaskId ad
+ baseBranch = Core.pcBaseBranch cfg
+ workspace = Core.adWorkspace ad
+ ns = Core.adNamespace ad
+ now <- getCurrentTime
+
+ rowId <-
+ State.recordRun
+ db
+ Core.RunRecord
+ { Core.rrId = Nothing,
+ Core.rrTaskId = tid,
+ Core.rrPhase = Core.Verify,
+ Core.rrPatchset = Core.adPatchset ad,
+ Core.rrAgentdRunId = Nothing,
+ Core.rrStatus = Core.Running,
+ Core.rrCostCents = 0,
+ Core.rrStartedAt = now,
+ Core.rrFinishedAt = Nothing,
+ Core.rrError = Nothing
+ }
+
+ result <- Verify.verifyTask workspace tid baseBranch ns
+ case result of
+ Core.VerifyPass -> do
+ State.markRunFinished db rowId Core.Success Nothing Nothing
+ Task.updateTaskStatus tid Task.Verified []
+ _ <- Task.addComment tid "Pipeline: build verification passed" Task.System
+ logMsg ("Verified " <> tid)
+ Core.VerifySkip reason -> do
+ State.markRunFinished db rowId Core.Success Nothing (Just reason)
+ Task.updateTaskStatus tid Task.Verified []
+ _ <- Task.addComment tid ("Pipeline: verification skipped (" <> reason <> "), promoting") Task.System
+ logMsg ("Verified (skipped) " <> tid <> ": " <> reason)
+ Core.VerifyFail reason -> do
+ State.markRunFinished db rowId Core.Failure Nothing (Just reason)
+ Task.updateTaskStatus tid Task.Open []
+ _ <- Task.addComment tid ("Pipeline: verification failed:\n" <> reason) Task.System
+ logMsg ("Verify failed for " <> tid <> ": " <> T.take 200 reason)
+
+-- | Spawn a dev agent for a task.
+--
+-- Steps, in order: mark the task InProgress, acquire its workspace, look up
+-- the last verify failure to use as review feedback, record a Dev run (status
+-- Running), and ask 'Dev.spawnDev' to start the agent.
+--
+-- Failure handling: if the workspace cannot be acquired the task goes back to
+-- Open and nothing is recorded; if the agent cannot be spawned the run is
+-- marked Failure, the task goes back to Open, and a comment is added. On
+-- success the returned 'Core.ActiveDev' is stored in the active map under the
+-- task id.
+doSpawnDev ::
+ Core.PipelineConfig ->
+ SQL.Connection ->
+ Workspace.WorkspacePool ->
+ STM.TVar (Map.Map Text Core.ActiveDev) ->
+ Task.Task ->
+ IO ()
+doSpawnDev cfg db pool activeDevs task = do
+ let tid = Task.taskId task
+ patchset = Task.taskPatchsetCount task
+ now <- getCurrentTime
+
+ -- Claim the task
+ Task.updateTaskStatus tid Task.InProgress []
+
+ -- Get or create workspace
+ wsResult <- Workspace.acquireWorkspace pool tid
+ case wsResult of
+ Left e -> do
+ logMsg ("Failed to acquire workspace for " <> tid <> ": " <> e)
+ -- Undo the claim made above.
+ Task.updateTaskStatus tid Task.Open []
+ pure ()
+ Right workspace -> do
+ -- Get review feedback from last verify failure
+ reviewFeedback <- State.getLastVerifyFailure db tid
+
+ -- Record the run
+ rowId <-
+ State.recordRun
+ db
+ Core.RunRecord
+ { Core.rrId = Nothing,
+ Core.rrTaskId = tid,
+ Core.rrPhase = Core.Dev,
+ Core.rrPatchset = patchset,
+ Core.rrAgentdRunId = Nothing,
+ Core.rrStatus = Core.Running,
+ Core.rrCostCents = 0,
+ Core.rrStartedAt = now,
+ Core.rrFinishedAt = Nothing,
+ Core.rrError = Nothing
+ }
+
+ -- Spawn the agent
+ result <- Dev.spawnDev cfg workspace task patchset reviewFeedback rowId
+ case result of
+ Left e -> do
+ State.markRunFinished db rowId Core.Failure Nothing (Just e)
+ Task.updateTaskStatus tid Task.Open []
+ _ <- Task.addComment tid ("Pipeline: failed to spawn dev agent: " <> e) Task.System
+ logMsg ("Failed to spawn dev for " <> tid <> ": " <> e)
+ Right ad -> do
+ -- NOTE(review): the run record was created with rrAgentdRunId = Nothing
+ -- and nothing here writes the agentd run id back to it; the id is only
+ -- kept in the in-memory ActiveDev. TODO confirm whether that is intended.
+ STM.atomically <| STM.modifyTVar' activeDevs (Map.insert tid ad)
+ logMsg ("Spawned dev for " <> tid <> " (run=" <> Core.adRunId ad <> ")")
+
+-- | Decide whether a dev run may be started for the task right now.
+--
+-- Returns 'False' and moves the task to NeedsHelp (with a log line and a task
+-- comment) when the retry limit for the current patchset has been reached or
+-- the cumulative cost has reached the per-task cap. A limit of 0 disables the
+-- corresponding check. When neither limit applies the answer is whatever
+-- 'checkBackoff' says.
+checkRetryLimits ::
+  Core.PipelineConfig ->
+  SQL.Connection ->
+  Task.Task ->
+  IO Bool
+checkRetryLimits cfg db task = do
+  retryCount <- State.getRetryCount db tid Core.Dev patchset
+  let proceed = checkBackoff cfg db tid patchset retryCount
+  if retryCap > 0 && retryCount >= retryCap
+    then
+      giveUp
+        ("Task " <> tid <> " exceeded max retries (" <> T.pack (show retryCount) <> "/" <> T.pack (show retryCap) <> ")")
+        ("Pipeline: exceeded " <> T.pack (show retryCap) <> " retries on patchset " <> T.pack (show patchset))
+    else
+      if costCap > 0
+        then do
+          spent <- State.getCumulativeCost db tid
+          if spent >= fromIntegral costCap
+            then
+              giveUp
+                ("Task " <> tid <> " exceeded cost cap (" <> T.pack (show spent) <> "c)")
+                ("Pipeline: exceeded cost cap of " <> T.pack (show costCap) <> "c")
+            else proceed
+        else proceed
+  where
+    tid = Task.taskId task
+    patchset = Task.taskPatchsetCount task
+    retryCap = Core.pcMaxRetries cfg
+    costCap = Core.pcMaxTaskCost cfg
+
+    -- Log, park the task in NeedsHelp, leave a comment, and refuse the run.
+    giveUp logLine comment = do
+      logMsg logLine
+      Task.updateTaskStatus tid Task.NeedsHelp []
+      _ <- Task.addComment tid comment Task.System
+      pure False
+
+-- | Exponential backoff gate for retries.
+--
+-- A first attempt (retry count 0) or a task with no recorded failure time is
+-- always allowed. Otherwise the task must wait
+-- @min 600 (pollInterval * 2 ^ retryCount)@ seconds after its last failure;
+-- while it is still waiting this logs the remaining time and returns 'False'.
+checkBackoff ::
+  Core.PipelineConfig ->
+  SQL.Connection ->
+  Text ->
+  Int ->
+  Int ->
+  IO Bool
+checkBackoff cfg db tid patchset retryCount
+  | retryCount == 0 = pure True
+  | otherwise = do
+      lastFail <- State.getLastFailureTime db tid patchset
+      case lastFail of
+        Nothing -> pure True
+        Just failTime -> do
+          now <- getCurrentTime
+          let elapsed = realToFrac (diffUTCTime now failTime) :: Double
+          if elapsed >= delay
+            then pure True
+            else do
+              logMsg ("Task " <> tid <> " in backoff (" <> T.pack (show (round (delay - elapsed) :: Int)) <> "s remaining)")
+              pure False
+  where
+    baseInterval = fromIntegral (Core.pcPollInterval cfg) :: Double
+    -- Doubles with every retry, capped at ten minutes.
+    delay = min 600 (baseInterval * (2 ^ retryCount))
+
+-- | Recover stale state on startup.
+--
+-- For every task id reported by 'Workspace.findStaleWorktrees' that has no
+-- active run according to the state DB:
+--
+-- * if the task is unknown to the task store, its workspace is released;
+-- * if the task is InProgress, it is reset to Open and a comment is added;
+-- * any other known task is left untouched, worktree included.
+--
+-- The active-dev map argument is accepted but not used. The task list is
+-- reloaded for each stale worktree that needs recovery.
+--
+-- NOTE(review): tasks for which 'State.hasActiveRun' is true are skipped. If
+-- a previous process died leaving a run row in the Running state, such a task
+-- would not be recovered here; confirm how State treats leftover rows.
+recoverStaleState ::
+ SQL.Connection ->
+ Workspace.WorkspacePool ->
+ STM.TVar (Map.Map Text Core.ActiveDev) ->
+ IO ()
+recoverStaleState db pool _activeDevs = do
+ -- Find worktrees that exist but have no running agent
+ staleTasks <- Workspace.findStaleWorktrees pool
+ forM_ staleTasks <| \tid -> do
+ hasRun <- State.hasActiveRun db tid
+ unless hasRun <| do
+ logMsg ("Recovering stale worktree for " <> tid)
+ -- Check task status — if InProgress with no agent, reset to Open
+ tasks <- Task.loadTasks
+ case Task.findTask tid tasks of
+ Nothing -> do
+ -- Unknown task, just clean up
+ Workspace.releaseWorkspace pool tid
+ Just task -> do
+ when (Task.taskStatus task == Task.InProgress) <| do
+ Task.updateTaskStatus tid Task.Open []
+ _ <- Task.addComment tid "Pipeline: recovered stale InProgress task on startup" Task.System
+ pure ()
+
+-- ============================================================================
+-- Status command
+-- ============================================================================
+
+-- | Implementation of the @status@ subcommand.
+--
+-- Given the workspace root and the @--json@ flag, prints how many work tasks
+-- are in each status plus the number of active dev runs, either as a small
+-- JSON object or as plain text. If the state database does not exist under
+-- the root, prints a hint and does nothing else.
+doStatus :: String -> Bool -> IO ()
+doStatus root asJson = do
+  found <- Dir.doesFileExist dbPath
+  if found
+    then report
+    else TIO.putStrLn "Pipeline state DB not found. Has the pipeline been started?"
+  where
+    dbPath = root <> "/state.db"
+
+    report = do
+      db <- State.initPipelineDb dbPath
+      allTasks <- Task.loadTasks
+      activeRuns <- State.getActiveDevRuns db
+      let workTasks = filter (\t -> Task.taskType t == Task.WorkTask) allTasks
+          -- Number of work tasks in the given status, rendered as text.
+          tally s = T.pack (show (length (filter (\t -> Task.taskStatus t == s) workTasks)))
+          running = T.pack (show (length activeRuns))
+          jsonLines =
+            [ "{",
+              " \"open\": " <> tally Task.Open <> ",",
+              " \"in_progress\": " <> tally Task.InProgress <> ",",
+              " \"verified\": " <> tally Task.Verified <> ",",
+              " \"done\": " <> tally Task.Done <> ",",
+              " \"needs_help\": " <> tally Task.NeedsHelp <> ",",
+              " \"active_dev_runs\": " <> running,
+              "}"
+            ]
+          textLines =
+            [ "Pipeline status (root=" <> T.pack root <> ")",
+              "",
+              "Tasks:",
+              " Open: " <> tally Task.Open,
+              " InProgress: " <> tally Task.InProgress,
+              " Verified: " <> tally Task.Verified,
+              " Done: " <> tally Task.Done,
+              " NeedsHelp: " <> tally Task.NeedsHelp,
+              "",
+              "Active dev runs: " <> running
+            ]
+      forM_ (if asJson then jsonLines else textLines) TIO.putStrLn
+
+-- ============================================================================
+-- Helpers
+-- ============================================================================
+
+-- | Print a message to stdout prefixed with the current UTC time in square
+-- brackets.
+logMsg :: Text -> IO ()
+logMsg msg = do
+  stamp <- fmap (T.pack <. show) getCurrentTime
+  TIO.putStrLn ("[" <> stamp <> "] " <> msg)
diff --git a/Omni/Pipeline/Core.hs b/Omni/Pipeline/Core.hs
new file mode 100644
index 00000000..8b26be64
--- /dev/null
+++ b/Omni/Pipeline/Core.hs
@@ -0,0 +1,147 @@
+{-# LANGUAGE DeriveGeneric #-}
+{-# LANGUAGE OverloadedStrings #-}
+{-# LANGUAGE NoImplicitPrelude #-}
+
+-- | Core types and configuration for the pipeline.
+module Omni.Pipeline.Core where
+
+import Alpha
+import Data.Aeson (FromJSON, ToJSON)
+import qualified Data.Aeson as Aeson
+import qualified Data.Text as T
+import Data.Time (UTCTime)
+
+-- | Pipeline configuration, parsed from CLI flags.
+--
+-- One immutable value of this type is built at startup and passed to every
+-- phase. 'defaultConfig' holds the baseline values.
+data PipelineConfig = PipelineConfig
+ { pcRoot :: FilePath, -- workspace root; task and integration dirs live under it
+ pcRepoRoot :: FilePath, -- root of the main git repository
+ pcBaseBranch :: Text, -- branch that finished work is integrated into
+ pcConcurrency :: Int, -- max concurrent dev agents
+ pcPollInterval :: Int, -- seconds
+ pcMaxRetries :: Int, -- per patchset
+ pcMaxTaskCost :: Int, -- cents, 0 = unlimited
+ pcAgentTimeout :: Int, -- seconds
+ pcAgentMaxIter :: Int, -- agent iteration limit
+ pcAgentMaxCost :: Int, -- cents per run
+ pcAgentProvider :: Maybe Text, -- agent provider, if overridden
+ pcParentFilter :: Maybe Text, -- only tasks whose parent is this id
+ pcTaskFilter :: Maybe Text, -- only the task with this id
+ pcDryRun :: Bool -- log intended actions instead of performing them
+ }
+ deriving (Show, Eq, Generic)
+
+-- | Baseline configuration: workspace under @_/pipeline@, repository in the
+-- current directory, base branch @live@, two concurrent agents, a 10 second
+-- poll interval, five retries per patchset, no per-task cost cap, and agent
+-- limits of 1800 seconds, 80 iterations and 300 cents per run. No filters,
+-- no provider override, dry-run off.
+defaultConfig :: PipelineConfig
+defaultConfig =
+ PipelineConfig
+ { pcRoot = "_/pipeline",
+ pcRepoRoot = ".",
+ pcBaseBranch = "live",
+ pcConcurrency = 2,
+ pcPollInterval = 10,
+ pcMaxRetries = 5,
+ pcMaxTaskCost = 0,
+ pcAgentTimeout = 1800,
+ pcAgentMaxIter = 80,
+ pcAgentMaxCost = 300,
+ pcAgentProvider = Nothing,
+ pcParentFilter = Nothing,
+ pcTaskFilter = Nothing,
+ pcDryRun = False
+ }
+
+-- | Pipeline execution phases: development by an agent, build verification,
+-- and integration into the base branch.
+--
+-- The JSON instances are the Generic-derived defaults; 'phaseToText' and
+-- 'phaseFromText' provide the lower-case text form.
+data Phase = Dev | Verify | Integrate
+ deriving (Show, Eq, Read, Generic)
+
+instance ToJSON Phase
+
+instance FromJSON Phase
+
+-- | Lower-case text name of a phase; inverse of 'phaseFromText'.
+phaseToText :: Phase -> Text
+phaseToText phase = case phase of
+  Dev -> "dev"
+  Verify -> "verify"
+  Integrate -> "integrate"
+
+-- | Parse the lower-case text name of a phase. Any other text (including a
+-- differently-cased name) yields 'Nothing'.
+phaseFromText :: Text -> Maybe Phase
+phaseFromText name = case name of
+  "dev" -> Just Dev
+  "verify" -> Just Verify
+  "integrate" -> Just Integrate
+  _ -> Nothing
+
+-- | Status of a pipeline run: still in flight, finished successfully, or
+-- finished with a failure.
+--
+-- The JSON instances are the Generic-derived defaults; 'runStatusToText' and
+-- 'runStatusFromText' provide the lower-case text form.
+data RunStatus = Running | Success | Failure
+ deriving (Show, Eq, Read, Generic)
+
+instance ToJSON RunStatus
+
+instance FromJSON RunStatus
+
+-- | Lower-case text name of a run status; inverse of 'runStatusFromText'.
+runStatusToText :: RunStatus -> Text
+runStatusToText status = case status of
+  Running -> "running"
+  Success -> "success"
+  Failure -> "failure"
+
+-- | Parse the lower-case text name of a run status. Any other text yields
+-- 'Nothing'.
+runStatusFromText :: Text -> Maybe RunStatus
+runStatusFromText name = case name of
+  "running" -> Just Running
+  "success" -> Just Success
+  "failure" -> Just Failure
+  _ -> Nothing
+
+-- | A record of one pipeline run (dev agent, verify, or integrate).
+--
+-- Callers create a record with 'rrId' = 'Nothing', status 'Running', zero
+-- cost and no finish time or error, and later mark it finished.
+data RunRecord = RunRecord
+ { rrId :: Maybe Int64, -- row id; Nothing for a record not yet stored
+ rrTaskId :: Text, -- task this run belongs to
+ rrPhase :: Phase, -- which phase the run executed
+ rrPatchset :: Int, -- patchset number the run worked on
+ rrAgentdRunId :: Maybe Text, -- agentd run id, when one is known
+ rrStatus :: RunStatus,
+ rrCostCents :: Double, -- cost of the run in cents
+ rrStartedAt :: UTCTime,
+ rrFinishedAt :: Maybe UTCTime, -- Nothing while the run is in flight
+ rrError :: Maybe Text -- failure details, if any
+ }
+ deriving (Show, Eq, Generic)
+
+instance ToJSON RunRecord
+
+-- | Tracks an in-flight dev agent. Held in memory only, keyed by task id.
+data ActiveDev = ActiveDev
+ { adTaskId :: Text, -- task being worked on; also the name of its branch
+ adRunId :: Text, -- run name given to agentd, used to poll status and cost
+ adWorkspace :: FilePath, -- worktree directory the agent runs in
+ adStartedAt :: UTCTime, -- when the agent was spawned
+ adBeforeSha :: Maybe Text, -- branch SHA captured before the agent ran, if any
+ adPatchset :: Int, -- patchset number of this attempt
+ adNamespace :: Maybe Text, -- task namespace, passed on to verification
+ adRunDbId :: Int64 -- row id of the matching Dev run record
+ }
+ deriving (Show, Eq)
+
+-- | Result of build verification.
+data VerifyResult
+ = VerifyPass -- verification succeeded
+ | VerifyFail Text -- verification failed; carries the failure output/reason
+ | VerifySkip Text -- verification was not run; carries the reason
+ deriving (Show, Eq)
+
+-- | Result of an integration attempt. Each constructor carries a text
+-- payload that ends up in the log and in a task comment.
+data IntegrateResult
+ = Integrated Text -- commit hash
+ | Conflict Text -- conflict details
+ | IntegrateBuildFail Text -- post-cherry-pick verify failed
+ deriving (Show, Eq)
+
+-- | Result of checking a dev agent's status.
+data DevResult
+ = StillRunning -- not finished, or the status could not be determined
+ | DevSuccess (Maybe Text) -- new commit SHA, if changed
+ | DevFailed Text -- error message
+ deriving (Show, Eq)
+
+-- | Directory of the integration worktree: @\<root\>/integration@.
+integrationDir :: PipelineConfig -> FilePath
+integrationDir cfg = root <> "/integration"
+  where
+    root = pcRoot cfg
+
+-- | Directory of a task's dev worktree: @\<root\>/\<task id\>@.
+taskDir :: PipelineConfig -> Text -> FilePath
+taskDir cfg tid = root <> "/" <> leaf
+  where
+    root = pcRoot cfg
+    leaf = T.unpack tid
diff --git a/Omni/Pipeline/Dev.hs b/Omni/Pipeline/Dev.hs
new file mode 100644
index 00000000..8bb6fe48
--- /dev/null
+++ b/Omni/Pipeline/Dev.hs
@@ -0,0 +1,204 @@
+{-# LANGUAGE OverloadedStrings #-}
+{-# LANGUAGE NoImplicitPrelude #-}
+
+-- | Dev phase: agent spawning and prompt building.
+--
+-- The only phase that uses an LLM agent. Builds a simplified prompt
+-- (agent just writes code, pipeline owns status transitions) and
+-- manages the agent lifecycle via agentd.
+module Omni.Pipeline.Dev where
+
+import Alpha
+import qualified Data.List as List
+import qualified Data.Text as T
+import qualified Data.Text.IO as TIO
+import Data.Time (UTCTime, getCurrentTime, toModifiedJulianDay, utctDay, utctDayTime)
+import qualified Omni.Pipeline.Core as Core
+import qualified Omni.Pipeline.Git as Git
+import qualified Omni.Task.Core as Task
+import qualified System.Directory as Dir
+import qualified System.Exit as Exit
+import qualified System.Process as Process
+import qualified Text.Read as Read
+
+-- | Render the markdown prompt handed to the dev agent. Pure.
+--
+-- Arguments: base branch name, the task, the patchset number, and optional
+-- feedback from a previous rejected attempt. The prompt lists the working
+-- rules, the task metadata, title and description, the feedback section (only
+-- when feedback is given) and the closing constraints. A task without a
+-- namespace is rendered with the placeholder @(none)@.
+buildDevPrompt :: Text -> Task.Task -> Int -> Maybe Text -> Text
+buildDevPrompt baseBranch task patchset maybeReviewFeedback =
+  T.unlines (header ++ rules ++ summary ++ body ++ [feedback] ++ constraints)
+  where
+    tid = Task.taskId task
+    ns = fromMaybe "(none)" (Task.taskNamespace task)
+
+    header =
+      [ "# Dev Task",
+        "",
+        "You are a coder. Implement the task below.",
+        ""
+      ]
+
+    rules =
+      [ "## Rules",
+        "1. Work only in this workspace.",
+        "2. One commit on branch `" <> tid <> "` with trailer `Task-Id: " <> tid <> "`.",
+        "3. If a commit already exists (patchset > 0), amend it with `git commit --amend`.",
+        "4. Verify: `typecheck.sh " <> ns <> "` then `bild " <> ns <> "` then `bild --test " <> ns <> "`.",
+        "5. **Do NOT change task status.** The pipeline handles transitions.",
+        "6. **Do NOT push.**",
+        "7. Follow repository conventions from AGENTS.md.",
+        ""
+      ]
+
+    summary =
+      [ "## Task",
+        "- ID: " <> tid,
+        "- Branch: " <> tid,
+        "- Base: " <> baseBranch,
+        "- Patchset: " <> T.pack (show patchset),
+        "- Namespace: " <> ns,
+        ""
+      ]
+
+    body =
+      [ "### Title",
+        Task.taskTitle task,
+        "",
+        "### Description",
+        Task.taskDescription task,
+        ""
+      ]
+
+    -- Without feedback this is an empty line in the prompt; with feedback it
+    -- is a whole section.
+    feedback = maybe "" feedbackSection maybeReviewFeedback
+
+    feedbackSection fb =
+      T.unlines
+        [ "### Previous Review Feedback",
+          "The previous patchset was rejected. Fix the issues below:",
+          "",
+          fb,
+          ""
+        ]
+
+    constraints =
+      [ "### Constraints",
+        "- Work only in the provided workspace.",
+        "- Keep one-task-one-commit discipline.",
+        "- Commit message must include: `Task-Id: " <> tid <> "`"
+      ]
+
+-- | Spawn a dev agent via agentd. Returns an ActiveDev record.
+--
+-- Arguments: pipeline config, the task's workspace directory, the task, the
+-- patchset number, optional review feedback for the prompt, and the row id of
+-- the Dev run record created by the caller.
+--
+-- Captures the task branch's SHA before the agent runs, writes the prompt to
+-- @_/tmp/pipeline/\<run id\>.md@ inside the workspace, and invokes
+-- @agentd run@ from the workspace with the configured timeout, iteration and
+-- cost limits (plus @-p@ when a provider is configured). The run id has the
+-- form @dev-\<task id\>-\<unix seconds\>@.
+--
+-- Returns 'Left' with agentd's stderr when the command exits non-zero.
+spawnDev ::
+ Core.PipelineConfig ->
+ FilePath ->
+ Task.Task ->
+ Int ->
+ Maybe Text ->
+ Int64 ->
+ IO (Either Text Core.ActiveDev)
+spawnDev cfg workspace task patchset maybeReviewFeedback dbRowId = do
+ let taskId = Task.taskId task
+ baseBranch = Core.pcBaseBranch cfg
+
+ -- Get commit SHA before agent runs (to detect changes later)
+ beforeSha <- Git.branchSha workspace taskId
+
+ -- Build and write prompt
+ let prompt = buildDevPrompt baseBranch task patchset maybeReviewFeedback
+ now <- getCurrentTime
+ let runId = "dev-" <> T.unpack taskId <> "-" <> show (floor (utcTimeToPOSIXSeconds now) :: Integer)
+ promptDir = workspace <> "/_/tmp/pipeline"
+ promptFile = promptDir <> "/" <> runId <> ".md"
+ Dir.createDirectoryIfMissing True promptDir
+ TIO.writeFile promptFile prompt
+
+ -- Build agentd command
+ let args =
+ ["run", promptFile, "-n", runId, "--timeout", show (Core.pcAgentTimeout cfg), "--max-iter", show (Core.pcAgentMaxIter cfg), "--max-cost", show (Core.pcAgentMaxCost cfg)]
+ ++ maybe [] (\p -> ["-p", T.unpack p]) (Core.pcAgentProvider cfg)
+
+ -- Spawn in background (don't use --fg)
+ -- NOTE(review): readCreateProcessWithExitCode waits for the command to exit
+ -- and collects its output, so this relies on `agentd run` without --fg
+ -- returning promptly after starting the agent — TODO confirm.
+ (exitCode, _stdout, stderr) <-
+ Process.readCreateProcessWithExitCode
+ (Process.proc "agentd" args) {Process.cwd = Just workspace}
+ ""
+
+ case exitCode of
+ Exit.ExitSuccess ->
+ pure
+ <| Right
+ Core.ActiveDev
+ { Core.adTaskId = taskId,
+ Core.adRunId = T.pack runId,
+ Core.adWorkspace = workspace,
+ Core.adStartedAt = now,
+ Core.adBeforeSha = beforeSha,
+ Core.adPatchset = patchset,
+ Core.adNamespace = Task.taskNamespace task,
+ Core.adRunDbId = dbRowId
+ }
+ Exit.ExitFailure _ ->
+ pure (Left ("agentd spawn failed: " <> T.pack stderr))
+
+-- | Check the status of a running dev agent.
+--
+-- Runs @agentd status \<run id\> --json@ and maps the reported @status@ field:
+--
+-- * @running@ gives 'Core.StillRunning';
+-- * @completed@ defers to 'checkForCommit', which decides between success and
+-- failure based on whether the task branch moved;
+-- * @failed@ gives 'Core.DevFailed' with the reported @error@ field, or a
+-- generic message when there is none;
+-- * @timeout@ gives 'Core.DevFailed' with a timeout message.
+--
+-- A non-zero exit from agentd, a missing status field, or any other status
+-- value is also reported as 'Core.StillRunning'.
+--
+-- NOTE(review): there is no upper bound in this function, so a run whose
+-- status can never be read is reported as still running on every poll.
+checkDevStatus :: Core.ActiveDev -> IO Core.DevResult
+checkDevStatus ad = do
+ -- Poll agentd status
+ (exitCode, stdout, _stderr) <-
+ Process.readProcessWithExitCode
+ "agentd"
+ ["status", T.unpack (Core.adRunId ad), "--json"]
+ ""
+ case exitCode of
+ Exit.ExitFailure _ ->
+ -- Can't determine status — treat as still running
+ -- (agentd might not have started yet)
+ pure Core.StillRunning
+ Exit.ExitSuccess -> do
+ let status = extractField "status" stdout
+ case status of
+ Just "running" -> pure Core.StillRunning
+ Just "completed" -> checkForCommit ad
+ Just "failed" ->
+ pure (Core.DevFailed (T.pack (fromMaybe "Agent run failed" (extractField "error" stdout))))
+ Just "timeout" ->
+ pure (Core.DevFailed "Agent run timed out")
+ _ -> pure Core.StillRunning
+
+-- | Decide the outcome of a completed agent run by comparing the task
+-- branch's SHA now with the SHA captured before the agent started.
+--
+-- A missing branch or an unchanged SHA is a failure; any other SHA is a
+-- success carrying the new SHA.
+checkForCommit :: Core.ActiveDev -> IO Core.DevResult
+checkForCommit ad =
+  fmap classify (Git.branchSha (Core.adWorkspace ad) (Core.adTaskId ad))
+  where
+    classify Nothing = Core.DevFailed "Task branch not found after agent run"
+    classify (Just after)
+      | Core.adBeforeSha ad == Just after = Core.DevFailed "Agent completed but made no commit"
+      | otherwise = Core.DevSuccess (Just after)
+
+-- | Cost of an agentd run, read from the @cost_cents@ field of
+-- @agentd status \<run id\> --json@.
+--
+-- Yields 0 when agentd exits non-zero, when the field is missing, or when its
+-- value does not parse as a number.
+getRunCost :: Text -> IO Double
+getRunCost runId = do
+  (exitCode, stdout, _) <-
+    Process.readProcessWithExitCode "agentd" ["status", T.unpack runId, "--json"] ""
+  pure <| case exitCode of
+    Exit.ExitSuccess ->
+      maybe 0 (fromMaybe 0 <. Read.readMaybe) (extractField "cost_cents" stdout)
+    _ -> 0
+
+-- | Simple field extraction from JSON output (avoids aeson dependency for
+-- quick status checks).
+--
+-- Finds the first occurrence of @"field"@ that is followed by a colon and
+-- returns the scalar after it as a string:
+--
+-- * a quoted string is returned without its quotes, with spaces, commas and
+-- colons preserved and backslash escapes resolved;
+-- * a bare token (number, @true@, @null@, ...) is returned up to the next
+-- comma, closing bracket or whitespace.
+--
+-- This works for both pretty-printed and single-line JSON. It is not a JSON
+-- parser: it does not track nesting, so the first matching key at any depth
+-- wins, and object or array values are not supported. Returns 'Nothing' when
+-- the key is absent.
+extractField :: String -> String -> Maybe String
+extractField field input = search input
+  where
+    key = "\"" <> field <> "\""
+
+    -- Try every suffix of the input until one starts with the key.
+    search [] = Nothing
+    search text@(_ : rest) =
+      case valueAt text of
+        Nothing -> search rest
+        found -> found
+
+    -- Requiring the colon keeps a string *value* equal to the key from matching.
+    valueAt text =
+      case List.stripPrefix key text of
+        Nothing -> Nothing
+        Just afterKey ->
+          case skipBlanks afterKey of
+            ':' : afterColon -> Just (scalar (skipBlanks afterColon))
+            _ -> Nothing
+
+    skipBlanks = List.dropWhile isBlank
+    isBlank c = List.elem c (" \t\r\n" :: String)
+
+    scalar ('"' : quoted) = unquote quoted
+    scalar bare = List.takeWhile (\c -> List.notElem c (",}] \t\r\n" :: String)) bare
+
+    -- Read up to the closing quote; an escaped quote does not end the string.
+    unquote [] = []
+    unquote ('"' : _) = []
+    unquote ('\\' : c : rest) = unescape c : unquote rest
+    unquote (c : rest) = c : unquote rest
+
+    unescape 'n' = '\n'
+    unescape 't' = '\t'
+    unescape 'r' = '\r'
+    unescape c = c
+
+-- | Split a string at every occurrence of the delimiter, dropping the
+-- delimiters. The empty string yields no pieces, and a single trailing
+-- delimiter does not produce a final empty piece.
+splitOn :: Char -> String -> [String]
+splitOn _ [] = []
+splitOn c s =
+  case List.break (== c) s of
+    (chunk, []) -> [chunk]
+    (chunk, _ : rest) -> chunk : splitOn c rest
+
+-- | Seconds since 1970-01-01 00:00 UTC, as a 'Double'.
+--
+-- Computed as whole days since the epoch (Modified Julian Day 40587) times
+-- 86400, plus the seconds elapsed within the day.
+utcTimeToPOSIXSeconds :: UTCTime -> Double
+utcTimeToPOSIXSeconds t = daysSinceEpoch * 86400 + secondsIntoDay
+  where
+    daysSinceEpoch = fromIntegral (toModifiedJulianDay (utctDay t) - 40587)
+    secondsIntoDay = realToFrac (utctDayTime t) :: Double
diff --git a/Omni/Pipeline/Git.hs b/Omni/Pipeline/Git.hs
new file mode 100644
index 00000000..eb3fc9b6
--- /dev/null
+++ b/Omni/Pipeline/Git.hs
@@ -0,0 +1,158 @@
+{-# LANGUAGE OverloadedStrings #-}
+{-# LANGUAGE NoImplicitPrelude #-}
+
+-- | Typed wrappers around git CLI operations.
+module Omni.Pipeline.Git where
+
+import Alpha
+import qualified Data.List as List
+import qualified Data.Text as T
+import qualified System.Exit as Exit
+import qualified System.Process as Process
+
+-- | Structured description of a failed git invocation.
+--
+-- * 'geCommand'  — the command that was run, as built by 'runGit': @git@
+--   followed by the arguments joined with spaces (the @-C <dir>@ prefix is
+--   not included).
+-- * 'geExitCode' — the process exit code.
+-- * 'geStderr'   — whatever the process wrote to stderr.
+data GitError = GitError
+ { geCommand :: Text,
+ geExitCode :: Int,
+ geStderr :: Text
+ }
+ deriving (Show, Eq)
+
+-- | Run @git -C <cwd> <args>@ and capture the outcome.
+--
+-- On success returns stdout with trailing newlines removed. On a non-zero
+-- exit returns a 'GitError' holding the command text, exit code and stderr.
+runGit :: FilePath -> [String] -> IO (Either GitError String)
+runGit cwd args = do
+  (code, out, err) <- Process.readProcessWithExitCode "git" ("-C" : cwd : args) ""
+  pure <| case code of
+    Exit.ExitSuccess -> Right (dropTrailingNewlines out)
+    Exit.ExitFailure status ->
+      Left (GitError commandText (fromIntegral status) (T.pack err))
+  where
+    commandText = T.unwords (map T.pack ("git" : args))
+    dropTrailingNewlines = List.dropWhileEnd (== '\n')
+
+-- | Run git against the main repository root, for worktree and branch
+-- commands that operate on the repo as a whole.
+--
+-- This is currently just 'runGit' under another name: it passes
+-- @-C <repoRoot>@ exactly like 'runGit' does.
+runGitRepo :: FilePath -> [String] -> IO (Either GitError String)
+runGitRepo repoRoot args = runGit repoRoot args
+
+-- | Add a worktree at @path@ on a freshly created @branch@ starting at
+-- @startPoint@ (@git worktree add -b <branch> <path> <startPoint>@, run
+-- against @repoRoot@). Git's output is discarded on success.
+createWorktree :: FilePath -> FilePath -> Text -> Text -> IO (Either GitError ())
+createWorktree repoRoot path branch startPoint = do
+  outcome <-
+    runGitRepo
+      repoRoot
+      ["worktree", "add", "-b", T.unpack branch, path, T.unpack startPoint]
+  pure (() <$ outcome)
+
+-- | Add a worktree at @path@ that checks out an already existing @branch@
+-- (@git worktree add <path> <branch>@, run against @repoRoot@).
+createWorktreeExisting :: FilePath -> FilePath -> Text -> IO (Either GitError ())
+createWorktreeExisting repoRoot path branch = do
+  outcome <- runGitRepo repoRoot ["worktree", "add", path, T.unpack branch]
+  pure (() <$ outcome)
+
+-- | Remove the worktree at @path@ with @git worktree remove --force@, run
+-- against @repoRoot@.
+removeWorktree :: FilePath -> FilePath -> IO (Either GitError ())
+removeWorktree repoRoot path = do
+  outcome <- runGitRepo repoRoot ["worktree", "remove", "--force", path]
+  pure (() <$ outcome)
+
+-- | Switch @workspace@ to @branch@ via @git checkout <branch>@.
+checkoutBranch :: FilePath -> Text -> IO (Either GitError ())
+checkoutBranch workspace branch = do
+  outcome <- runGit workspace ["checkout", T.unpack branch]
+  pure (() <$ outcome)
+
+-- | Create @branch@ at @startPoint@ and switch @workspace@ to it
+-- (@git checkout -b <branch> <startPoint>@).
+createBranch :: FilePath -> Text -> Text -> IO (Either GitError ())
+createBranch workspace branch startPoint = do
+  outcome <- runGit workspace ["checkout", "-b", T.unpack branch, T.unpack startPoint]
+  pure (() <$ outcome)
+
+-- | Force-delete a local branch (@git branch -D <branch>@), run against
+-- @repoRoot@.
+deleteBranch :: FilePath -> Text -> IO (Either GitError ())
+deleteBranch repoRoot branch = do
+  outcome <- runGitRepo repoRoot ["branch", "-D", T.unpack branch]
+  pure (() <$ outcome)
+
+-- | Cherry-pick @ref@ (a branch name or SHA) into @workspace@.
+--
+-- When the cherry-pick succeeds, the result is whatever @HEAD@ resolves to
+-- afterwards, i.e. the hash of the newly created commit. A failure of either
+-- git command is returned as 'Left'; no abort is attempted here.
+cherryPick :: FilePath -> Text -> IO (Either GitError Text)
+cherryPick workspace ref = do
+  picked <- runGit workspace ["cherry-pick", T.unpack ref]
+  case picked of
+    Left err -> pure (Left err)
+    Right _ -> fmap (fmap T.pack) (runGit workspace ["rev-parse", "HEAD"])
+
+-- | Abandon a cherry-pick that is in progress in @workspace@
+-- (@git cherry-pick --abort@).
+abortCherryPick :: FilePath -> IO (Either GitError ())
+abortCherryPick workspace = do
+  outcome <- runGit workspace ["cherry-pick", "--abort"]
+  pure (() <$ outcome)
+
+-- | Number of commits reachable from @headRef@ but not from @base@, as
+-- reported by @git rev-list --count base..headRef@.
+--
+-- Output that does not start with a number is reported as a synthetic
+-- 'GitError' with exit code 1.
+commitCountBetween :: FilePath -> Text -> Text -> IO (Either GitError Int)
+commitCountBetween workspace base headRef = do
+  counted <- runGit workspace ["rev-list", "--count", range]
+  pure <| case counted of
+    Left err -> Left err
+    Right raw ->
+      case reads raw of
+        [(n, _)] -> Right n
+        _ -> Left (GitError "rev-list --count parse" 1 ("Could not parse: " <> T.pack raw))
+  where
+    range = T.unpack (base <> ".." <> headRef)
+
+-- | Resolve @ref@ to a commit SHA with @git rev-parse --verify ref^{commit}@.
+--
+-- Any git failure (typically: the ref does not exist) becomes 'Nothing'.
+branchSha :: FilePath -> Text -> IO (Maybe Text)
+branchSha workspace ref =
+  fmap toSha (runGit workspace ["rev-parse", "--verify", T.unpack ref <> "^{commit}"])
+  where
+    toSha (Right sha) = Just (T.pack sha)
+    toSha (Left _) = Nothing
+
+-- | True when @git status --porcelain@ succeeds and prints nothing, i.e. the
+-- workspace has no uncommitted changes. A git failure counts as "not clean".
+isCleanWorkspace :: FilePath -> IO Bool
+isCleanWorkspace workspace = do
+  status <- runGit workspace ["status", "--porcelain"]
+  pure (status == Right "")
+
+-- | True when @git rev-parse --is-inside-work-tree@, run in @path@, succeeds
+-- and answers @true@. Any other answer or a git failure gives 'False'.
+worktreeExists :: FilePath -> IO Bool
+worktreeExists path =
+  fmap (== Right "true") (runGit path ["rev-parse", "--is-inside-work-tree"])
+
+-- | Undo the commit at @HEAD@ by adding a revert commit on top of it
+-- (@git revert --no-edit HEAD@); the original commit stays in history.
+revertLastCommit :: FilePath -> IO (Either GitError ())
+revertLastCommit workspace = do
+  outcome <- runGit workspace ["revert", "--no-edit", "HEAD"]
+  pure (() <$ outcome)
+
+-- | Move the current branch of @workspace@ to @ref@ with
+-- @git reset --hard <ref>@.
+hardReset :: FilePath -> Text -> IO (Either GitError ())
+hardReset workspace ref = do
+  outcome <- runGit workspace ["reset", "--hard", T.unpack ref]
+  pure (() <$ outcome)
+
+-- | Name of the branch checked out in @workspace@, taken from
+-- @git rev-parse --abbrev-ref HEAD@. A git failure gives 'Nothing'.
+currentBranch :: FilePath -> IO (Maybe Text)
+currentBranch workspace = do
+  answer <- runGit workspace ["rev-parse", "--abbrev-ref", "HEAD"]
+  pure <| case answer of
+    Left _ -> Nothing
+    Right name -> Just (T.pack name)
+
+-- | Pull into @workspace@, allowing fast-forward updates only
+-- (@git pull --ff-only@). Intended for the integration worktree.
+pullFf :: FilePath -> IO (Either GitError ())
+pullFf workspace = do
+  outcome <- runGit workspace ["pull", "--ff-only"]
+  pure (() <$ outcome)
+
+-- | True when @refs/heads/<branch>@ exists in the repository, determined by
+-- the exit status of @git show-ref --verify --quiet@.
+branchExists :: FilePath -> Text -> IO Bool
+branchExists repoRoot branch = do
+  found <- runGitRepo repoRoot ["show-ref", "--verify", "--quiet", fullRef]
+  pure <| case found of
+    Left _ -> False
+    Right _ -> True
+  where
+    fullRef = "refs/heads/" <> T.unpack branch
+
+-- | Refresh refs in @workspace@ with @git fetch --quiet@ (useful when a
+-- worktree may be stale).
+fetch :: FilePath -> IO (Either GitError ())
+fetch workspace = do
+  outcome <- runGit workspace ["fetch", "--quiet"]
+  pure (() <$ outcome)
diff --git a/Omni/Pipeline/Integrate.hs b/Omni/Pipeline/Integrate.hs
new file mode 100644
index 00000000..9327842f
--- /dev/null
+++ b/Omni/Pipeline/Integrate.hs
@@ -0,0 +1,71 @@
+{-# LANGUAGE OverloadedStrings #-}
+{-# LANGUAGE NoImplicitPrelude #-}
+
+-- | Deterministic integration: cherry-pick task commits into the base branch.
+--
+-- Replaces the LLM integrator. No agent involved.
+module Omni.Pipeline.Integrate where
+
+import Alpha
+import qualified Data.Text as T
+import qualified Omni.Pipeline.Core as Core
+import qualified Omni.Pipeline.Git as Git
+import qualified Omni.Pipeline.Verify as Verify
+import qualified Omni.Pipeline.Workspace as Workspace
+
+-- | Integrate a verified task into the base branch.
+--
+-- 1. Ensure the integration worktree exists (via the workspace pool)
+-- 2. Cherry-pick the task commit
+-- 3. Verify the build in the integration worktree
+-- 4. Return result (caller handles task status + cleanup)
+--
+-- Outcomes:
+--
+-- * 'Core.Conflict' — worktree setup failed, the current branch could not be
+--   read, or the cherry-pick failed (it is aborted before returning).
+-- * 'Core.Integrated' — cherry-pick succeeded and verification passed or was
+--   skipped; carries the new commit hash.
+-- * 'Core.IntegrateBuildFail' — verification failed; the cherry-picked commit
+--   is reverted before returning.
+--
+-- The results of the checkout, the cherry-pick abort and the revert are all
+-- discarded, so a failure in any of those steps is not reported.
+integrateTask ::
+ -- | Pool that owns the integration worktree.
+ Workspace.WorkspacePool ->
+ -- | Repository root. Not referenced in the body.
+ FilePath ->
+ -- | Task ID; passed to git as the ref to cherry-pick.
+ Text ->
+ -- | Name of the base branch.
+ Text ->
+ -- | Namespace to build after the cherry-pick, if any.
+ Maybe Text ->
+ IO Core.IntegrateResult
+integrateTask pool repoRoot taskId baseBranch maybeNamespace = do
+ -- 1. Ensure integration worktree
+ wsResult <- Workspace.ensureIntegrationWorktree pool
+ case wsResult of
+ Left e -> pure (Core.Conflict ("Integration worktree setup failed: " <> e))
+ Right workspace -> do
+ -- Read which branch the integration worktree currently has checked out
+ cur <- Git.currentBranch workspace
+ case cur of
+ Nothing -> pure (Core.Conflict "Cannot determine current branch in integration worktree")
+ Just branch -> do
+ -- Try to switch to baseBranch when we are on some other branch.
+ -- NOTE(review): the checkout result is ignored, so if it fails the
+ -- cherry-pick below lands on whatever branch is checked out —
+ -- confirm this is the intended "tracking branch" behaviour.
+ when (branch /= baseBranch) <| do
+ _ <- Git.checkoutBranch workspace baseBranch
+ pure ()
+
+ -- 2. Cherry-pick the task commit
+ pickResult <- Git.cherryPick workspace taskId
+ case pickResult of
+ Left e -> do
+ -- Cherry-pick failed (presumably a conflict) — abort and report
+ _ <- Git.abortCherryPick workspace
+ pure
+ ( Core.Conflict
+ ( "Cherry-pick of "
+ <> taskId
+ <> " failed: "
+ <> Git.geStderr e
+ )
+ )
+ Right commitHash -> do
+ -- 3. Verify build in the integration worktree
+ verifyResult <- Verify.verifyOnBase workspace maybeNamespace
+ case verifyResult of
+ Core.VerifyPass ->
+ pure (Core.Integrated commitHash)
+ Core.VerifySkip _ ->
+ -- No buildable namespace = trust the pre-integration verify
+ pure (Core.Integrated commitHash)
+ Core.VerifyFail reason -> do
+ -- Revert the cherry-pick (adds a revert commit; result ignored)
+ _ <- Git.revertLastCommit workspace
+ pure (Core.IntegrateBuildFail reason)
diff --git a/Omni/Pipeline/State.hs b/Omni/Pipeline/State.hs
new file mode 100644
index 00000000..5aeb1579
--- /dev/null
+++ b/Omni/Pipeline/State.hs
@@ -0,0 +1,161 @@
+{-# LANGUAGE OverloadedStrings #-}
+{-# LANGUAGE NoImplicitPrelude #-}
+
+-- | Pipeline state persistence in SQLite.
+--
+-- Tracks per-phase run records: dev agent runs, verify results, and
+-- integration attempts. Replaces the comment-mining approach.
+module Omni.Pipeline.State where
+
+import Alpha
+import qualified Data.Text as T
+import Data.Time (UTCTime, getCurrentTime)
+import qualified Database.SQLite.Simple as SQL
+import qualified Database.SQLite.Simple.FromField as SQL
+import qualified Database.SQLite.Simple.ToField as SQL
+import qualified Omni.Pipeline.Core as Core
+
+-- | Open the pipeline state database at @dbPath@ and make sure its schema
+-- is present.
+--
+-- Switches the journal mode to WAL, then creates the @pipeline_runs@ table
+-- and its @task_id@ / @status@ indexes if they do not exist yet. The
+-- connection is returned open; closing it is up to the caller.
+initPipelineDb :: FilePath -> IO SQL.Connection
+initPipelineDb dbPath = do
+  conn <- SQL.open dbPath
+  let run = SQL.execute_ conn
+  run "PRAGMA journal_mode=WAL"
+  run createRunsTable
+  run indexByTask
+  run indexByStatus
+  pure conn
+  where
+    createRunsTable :: SQL.Query
+    createRunsTable =
+      "CREATE TABLE IF NOT EXISTS pipeline_runs (\
+      \ id INTEGER PRIMARY KEY AUTOINCREMENT, \
+      \ task_id TEXT NOT NULL, \
+      \ phase TEXT NOT NULL, \
+      \ patchset INTEGER NOT NULL, \
+      \ agentd_run_id TEXT, \
+      \ status TEXT NOT NULL, \
+      \ cost_cents REAL DEFAULT 0, \
+      \ started_at TIMESTAMP NOT NULL, \
+      \ finished_at TIMESTAMP, \
+      \ error_summary TEXT \
+      \)"
+
+    indexByTask :: SQL.Query
+    indexByTask =
+      "CREATE INDEX IF NOT EXISTS idx_pipeline_runs_task \
+      \ ON pipeline_runs(task_id)"
+
+    indexByStatus :: SQL.Query
+    indexByStatus =
+      "CREATE INDEX IF NOT EXISTS idx_pipeline_runs_status \
+      \ ON pipeline_runs(status)"
+
+-- | Insert a 'Core.RunRecord' into @pipeline_runs@ and return the row ID
+-- SQLite assigned to it.
+recordRun :: SQL.Connection -> Core.RunRecord -> IO Int64
+recordRun conn rr = do
+  SQL.execute conn insertSql row
+  SQL.lastInsertRowId conn
+  where
+    insertSql :: SQL.Query
+    insertSql =
+      "INSERT INTO pipeline_runs (task_id, phase, patchset, agentd_run_id, status, cost_cents, started_at, finished_at, error_summary) \
+      \ VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)"
+
+    -- One value per placeholder, in column order.
+    row =
+      ( Core.rrTaskId rr,
+        Core.phaseToText (Core.rrPhase rr),
+        Core.rrPatchset rr,
+        Core.rrAgentdRunId rr,
+        Core.runStatusToText (Core.rrStatus rr),
+        Core.rrCostCents rr,
+        Core.rrStartedAt rr,
+        Core.rrFinishedAt rr,
+        Core.rrError rr
+      )
+
+-- | Close out a run: set its status and stamp @finished_at@ with the
+-- current time.
+--
+-- Cost and error are only overwritten when a value is supplied; passing
+-- 'Nothing' keeps what the row already holds (via @COALESCE@).
+markRunFinished :: SQL.Connection -> Int64 -> Core.RunStatus -> Maybe Double -> Maybe Text -> IO ()
+markRunFinished conn rowId status maybeCost maybeError = do
+  finishedAt <- getCurrentTime
+  let params = (Core.runStatusToText status, finishedAt, maybeCost, maybeError, rowId)
+  SQL.execute
+    conn
+    "UPDATE pipeline_runs SET status = ?, finished_at = ?, cost_cents = COALESCE(?, cost_cents), error_summary = COALESCE(?, error_summary) WHERE id = ?"
+    params
+
+-- | Count the runs of the given phase that ended in @failure@ for a task at
+-- a given patchset. Returns 0 if the query does not yield exactly one row.
+getRetryCount :: SQL.Connection -> Text -> Core.Phase -> Int -> IO Int
+getRetryCount conn taskId phase patchset = do
+  rows <- SQL.query conn countSql (taskId, Core.phaseToText phase, patchset)
+  pure <| case rows of
+    [SQL.Only n] -> n
+    _ -> 0
+  where
+    countSql :: SQL.Query
+    countSql =
+      "SELECT COUNT(*) FROM pipeline_runs WHERE task_id = ? AND phase = ? AND patchset = ? AND status = 'failure'"
+
+-- | Total @cost_cents@ over every recorded run of a task (all phases and
+-- statuses); 0 when the task has no runs.
+getCumulativeCost :: SQL.Connection -> Text -> IO Double
+getCumulativeCost conn taskId = do
+  rows <- SQL.query conn sumSql (SQL.Only taskId)
+  pure <| case rows of
+    [SQL.Only total] -> total
+    _ -> 0
+  where
+    sumSql :: SQL.Query
+    sumSql = "SELECT COALESCE(SUM(cost_cents), 0) FROM pipeline_runs WHERE task_id = ?"
+
+-- | Error summary of the task's most recently finished failed verify run,
+-- if there is one. Used to build review feedback for the next dev prompt.
+getLastVerifyFailure :: SQL.Connection -> Text -> IO (Maybe Text)
+getLastVerifyFailure conn taskId = do
+  rows <- SQL.query conn latestSql (SQL.Only taskId)
+  pure <| case rows of
+    [SQL.Only summary] -> summary
+    _ -> Nothing
+  where
+    latestSql :: SQL.Query
+    latestSql =
+      "SELECT error_summary FROM pipeline_runs WHERE task_id = ? AND phase = 'verify' AND status = 'failure' ORDER BY finished_at DESC LIMIT 1"
+
+-- | All dev runs whose status is still @running@, oldest first, as
+-- @(row id, task id, agentd run id, started at)@.
+--
+-- NOTE(review): @agentd_run_id@ is declared nullable in the table created
+-- by 'initPipelineDb' but is decoded as a plain 'Text' here; presumably
+-- every running dev row has it set — confirm, since a NULL would likely
+-- fail row conversion.
+getActiveDevRuns :: SQL.Connection -> IO [(Int64, Text, Text, UTCTime)]
+getActiveDevRuns conn = SQL.query_ conn activeDevSql
+  where
+    activeDevSql :: SQL.Query
+    activeDevSql =
+      "SELECT id, task_id, agentd_run_id, started_at FROM pipeline_runs WHERE phase = 'dev' AND status = 'running' ORDER BY started_at ASC"
+
+-- | Error summary of the task's most recently finished failed dev run,
+-- across all patchsets. Used for general failure context.
+getLastDevFailure :: SQL.Connection -> Text -> IO (Maybe Text)
+getLastDevFailure conn taskId = do
+  rows <- SQL.query conn latestSql (SQL.Only taskId)
+  pure <| case rows of
+    [SQL.Only summary] -> summary
+    _ -> Nothing
+  where
+    latestSql :: SQL.Query
+    latestSql =
+      "SELECT error_summary FROM pipeline_runs WHERE task_id = ? AND phase = 'dev' AND status = 'failure' ORDER BY finished_at DESC LIMIT 1"
+
+-- | Whether the task has at least one run with status @running@. The query
+-- does not filter by phase, so a running run of any phase counts.
+hasActiveRun :: SQL.Connection -> Text -> IO Bool
+hasActiveRun conn taskId = do
+  rows <- SQL.query conn runningSql (SQL.Only taskId)
+  pure <| case rows of
+    [SQL.Only n] -> n > (0 :: Int)
+    _ -> False
+  where
+    runningSql :: SQL.Query
+    runningSql = "SELECT COUNT(*) FROM pipeline_runs WHERE task_id = ? AND status = 'running'"
+
+-- | @finished_at@ of the latest failed dev run for a task at a given
+-- patchset, if any. Used for exponential backoff calculation.
+getLastFailureTime :: SQL.Connection -> Text -> Int -> IO (Maybe UTCTime)
+getLastFailureTime conn taskId patchset = do
+  rows <- SQL.query conn latestSql (taskId, patchset)
+  pure <| case rows of
+    [SQL.Only finishedAt] -> finishedAt
+    _ -> Nothing
+  where
+    latestSql :: SQL.Query
+    latestSql =
+      "SELECT finished_at FROM pipeline_runs WHERE task_id = ? AND phase = 'dev' AND patchset = ? AND status = 'failure' ORDER BY finished_at DESC LIMIT 1"
diff --git a/Omni/Pipeline/Verify.hs b/Omni/Pipeline/Verify.hs
new file mode 100644
index 00000000..85055d54
--- /dev/null
+++ b/Omni/Pipeline/Verify.hs
@@ -0,0 +1,139 @@
+{-# LANGUAGE OverloadedStrings #-}
+{-# LANGUAGE NoImplicitPrelude #-}
+
+-- | Deterministic build verification.
+--
+-- Replaces the LLM reviewer. Runs bild and tests, returns structured results.
+module Omni.Pipeline.Verify where
+
+import Alpha
+import qualified Data.Text as T
+import qualified Omni.Pipeline.Core as Core
+import qualified Omni.Pipeline.Git as Git
+import qualified System.Exit as Exit
+import qualified System.Process as Process
+
+-- | Result of running bild on a namespace, classified by exit code.
+--
+-- * 'BildPass' — bild exited successfully.
+-- * 'BildFail' — any non-zero exit other than 2; carries the exit code and
+--   the captured output.
+-- * 'BildNotBuildable' — bild exited with code 2.
+data BildResult
+ = BildPass
+ | BildFail Int Text -- exit code, output
+ | BildNotBuildable -- exit code 2
+ deriving (Show, Eq)
+
+-- | Verify a task in its dev worktree, where the commit already exists.
+--
+-- Steps, stopping at the first that does not pass:
+--
+-- 1. Branch shape: the task branch must be exactly one commit ahead of the
+--    base branch.
+-- 2. Build: @bild <namespace>@. Skipped (as 'Core.VerifySkip') when no
+--    namespace is given or bild reports the namespace as not buildable.
+-- 3. Tests: @bild --test <namespace>@. A "not buildable" answer here is
+--    treated as "no tests to run" and passes.
+verifyTask :: FilePath -> Text -> Text -> Maybe Text -> IO Core.VerifyResult
+verifyTask workspace taskId baseBranch maybeNamespace = do
+  shape <- Git.commitCountBetween workspace baseBranch taskId
+  case shape of
+    Left e -> failed ("Branch shape check failed: " <> T.pack (show e))
+    Right 1 ->
+      case maybeNamespace of
+        Nothing -> pure (Core.VerifySkip "No namespace set, skipping build verification")
+        Just ns -> buildThenTest ns
+    Right n ->
+      failed
+        ( "Expected 1 commit on "
+            <> taskId
+            <> " relative to "
+            <> baseBranch
+            <> ", got "
+            <> T.pack (show n)
+        )
+  where
+    failed msg = pure (Core.VerifyFail msg)
+
+    -- Shared layout of the build/test failure messages.
+    report prefix ns code output =
+      prefix <> ns <> " (exit " <> T.pack (show code) <> "):\n" <> output
+
+    buildThenTest ns = do
+      built <- runBild workspace ns
+      case built of
+        BildNotBuildable ->
+          pure (Core.VerifySkip ("Namespace " <> ns <> " is not buildable (bild exit 2)"))
+        BildFail code output -> failed (report "Build failed for " ns code output)
+        BildPass -> do
+          tested <- runBildTest workspace ns
+          case tested of
+            BildFail code output -> failed (report "Tests failed for " ns code output)
+            BildNotBuildable -> pure Core.VerifyPass -- no tests to run
+            BildPass -> pure Core.VerifyPass
+
+-- | Post-integration verification: build and test a namespace in the given
+-- workspace, with no branch shape check.
+--
+-- Returns 'Core.VerifySkip' when there is no namespace or bild reports it
+-- as not buildable, 'Core.VerifyFail' with the captured output when the
+-- build or the tests fail, and 'Core.VerifyPass' otherwise (including when
+-- the test step reports "not buildable").
+verifyOnBase :: FilePath -> Maybe Text -> IO Core.VerifyResult
+verifyOnBase _ Nothing =
+  pure (Core.VerifySkip "No namespace, skipping post-integration verify")
+verifyOnBase workspace (Just ns) = do
+  built <- runBild workspace ns
+  case built of
+    BildNotBuildable -> pure (Core.VerifySkip ("Not buildable: " <> ns))
+    BildFail code output -> failed "Post-integration build failed for " code output
+    BildPass -> do
+      tested <- runBildTest workspace ns
+      case tested of
+        BildFail code output -> failed "Post-integration tests failed for " code output
+        BildNotBuildable -> pure Core.VerifyPass
+        BildPass -> pure Core.VerifyPass
+  where
+    -- Build the failure result with the shared message layout.
+    failed prefix code output =
+      pure
+        ( Core.VerifyFail
+            (prefix <> ns <> " (exit " <> T.pack (show code) <> "):\n" <> output)
+        )
+
+-- | Build a namespace: runs @bild <namespace>@ in @workspace@ and
+-- classifies the exit code.
+runBild :: FilePath -> Text -> IO BildResult
+runBild workspace ns = runBildCmd workspace bildArgs
+  where
+    bildArgs = [T.unpack ns]
+
+-- | Test a namespace: runs @bild --test <namespace>@ in @workspace@ and
+-- classifies the exit code.
+runBildTest :: FilePath -> Text -> IO BildResult
+runBildTest workspace ns = runBildCmd workspace bildArgs
+  where
+    bildArgs = ["--test", T.unpack ns]
+
+-- | Run @bild@ with the given arguments, using @workspace@ as the working
+-- directory, and classify the exit code.
+--
+-- Exit 0 is a pass, exit 2 means "not buildable", and any other code is a
+-- failure carrying stdout followed by stderr.
+runBildCmd :: FilePath -> [String] -> IO BildResult
+runBildCmd workspace args = do
+  let bild = (Process.proc "bild" args) {Process.cwd = Just workspace}
+  (code, out, err) <- Process.readCreateProcessWithExitCode bild ""
+  pure <| case code of
+    Exit.ExitSuccess -> BildPass
+    Exit.ExitFailure 2 -> BildNotBuildable
+    Exit.ExitFailure status -> BildFail (fromIntegral status) (T.pack (out <> err))
diff --git a/Omni/Pipeline/Workspace.hs b/Omni/Pipeline/Workspace.hs
new file mode 100644
index 00000000..a7abd566
--- /dev/null
+++ b/Omni/Pipeline/Workspace.hs
@@ -0,0 +1,137 @@
+{-# LANGUAGE OverloadedStrings #-}
+{-# LANGUAGE NoImplicitPrelude #-}
+
+-- | Worktree pool management for the pipeline.
+--
+-- Manages per-task dev worktrees and the persistent integration worktree.
+module Omni.Pipeline.Workspace where
+
+import Alpha
+import qualified Control.Concurrent.STM as STM
+import qualified Data.Map.Strict as Map
+import qualified Data.Text as T
+import qualified Omni.Pipeline.Core as Core
+import qualified Omni.Pipeline.Git as Git
+import qualified System.Directory as Dir
+
+-- | The workspace pool tracks active dev worktrees.
+--
+-- * 'wpActive' — shared map from task ID to the path of the worktree
+--   currently registered for that task.
+-- * 'wpConfig' — the pipeline configuration the pool was created with.
+data WorkspacePool = WorkspacePool
+ { wpActive :: STM.TVar (Map.Map Text FilePath),
+ wpConfig :: Core.PipelineConfig
+ }
+
+-- | Create an empty workspace pool for the given configuration, creating
+-- the pipeline root directory (and any missing parents) if necessary.
+newPool :: Core.PipelineConfig -> IO WorkspacePool
+newPool cfg = do
+  registry <- STM.newTVarIO Map.empty
+  Dir.createDirectoryIfMissing True (Core.pcRoot cfg)
+  pure WorkspacePool {wpActive = registry, wpConfig = cfg}
+
+-- | Number of free dev slots: the configured concurrency minus the number
+-- of worktrees currently registered in the pool.
+slotsAvailable :: WorkspacePool -> IO Int
+slotsAvailable pool = fmap remaining (STM.readTVarIO (wpActive pool))
+  where
+    remaining active = Core.pcConcurrency (wpConfig pool) - Map.size active
+
+-- | Acquire the dev worktree for a task and register it in the pool.
+--
+-- Fails straight away when no slot is free. Otherwise:
+--
+-- * an existing worktree at the task's directory is reused as-is (retry
+--   scenario);
+-- * else a worktree is added, on the existing task branch if there is one,
+--   or on a new task branch created from the base branch.
+--
+-- Returns the workspace path, or an error message.
+acquireWorkspace :: WorkspacePool -> Text -> IO (Either Text FilePath)
+acquireWorkspace pool taskId = do
+  free <- slotsAvailable pool
+  if free > 0
+    then claim
+    else pure (Left "No workspace slots available")
+  where
+    cfg = wpConfig pool
+    workspace = Core.taskDir cfg taskId
+    repoRoot = Core.pcRepoRoot cfg
+    baseBranch = Core.pcBaseBranch cfg
+
+    -- Record the worktree as active and hand back its path.
+    register = do
+      STM.atomically
+        <| STM.modifyTVar' (wpActive pool) (Map.insert taskId workspace)
+      pure (Right workspace)
+
+    claim = do
+      present <- Git.worktreeExists workspace
+      if present
+        then register
+        else do
+          branchKnown <- Git.branchExists repoRoot taskId
+          created <-
+            if branchKnown
+              then Git.createWorktreeExisting repoRoot workspace taskId
+              else Git.createWorktree repoRoot workspace taskId baseBranch
+          case created of
+            Left e -> pure (Left (T.pack (show e)))
+            Right () -> register
+
+-- | Release a task's dev worktree.
+--
+-- Unregisters the task from the pool first, then removes the worktree and
+-- deletes the task branch. Both git steps are best effort: their errors are
+-- ignored because the worktree or branch may already be gone.
+releaseWorkspace :: WorkspacePool -> Text -> IO ()
+releaseWorkspace pool taskId = do
+  STM.atomically
+    <| STM.modifyTVar' (wpActive pool) (Map.delete taskId)
+  _ <- Git.removeWorktree repoRoot (Core.taskDir cfg taskId)
+  () <$ Git.deleteBranch repoRoot taskId
+  where
+    cfg = wpConfig pool
+    repoRoot = Core.pcRepoRoot cfg
+
+-- | Start a task's workspace over for a retry: release it (dropping the
+-- worktree and the task branch) and then acquire it again.
+resetWorkspace :: WorkspacePool -> Text -> IO (Either Text FilePath)
+resetWorkspace pool taskId = do
+  discardOld
+  acquireWorkspace pool taskId
+  where
+    discardOld = releaseWorkspace pool taskId
+
+-- | Make sure the integration worktree exists and return its path.
+--
+-- An existing worktree is accepted when it is on the base branch or on
+-- @<base>-integration@; on any other branch (or when the branch cannot be
+-- read) a checkout of the base branch is attempted. A missing worktree is
+-- created on @<base>-integration@, creating that branch from the base
+-- branch if it does not exist yet.
+ensureIntegrationWorktree :: WorkspacePool -> IO (Either Text FilePath)
+ensureIntegrationWorktree pool = do
+  present <- Git.worktreeExists workspace
+  if present then reuse else create
+  where
+    cfg = wpConfig pool
+    workspace = Core.integrationDir cfg
+    repoRoot = Core.pcRepoRoot cfg
+    baseBranch = Core.pcBaseBranch cfg
+    branch = baseBranch <> "-integration"
+
+    -- Worktree already there: make sure it is on an acceptable branch.
+    reuse = do
+      cur <- Git.currentBranch workspace
+      case cur of
+        Just b | b == branch || b == baseBranch -> pure (Right workspace)
+        _ -> do
+          switched <- Git.checkoutBranch workspace baseBranch
+          pure <| case switched of
+            Left e -> Left ("Integration worktree on wrong branch: " <> T.pack (show e))
+            Right () -> Right workspace
+
+    -- No worktree yet: add one on the integration branch.
+    create = do
+      hasBranch <- Git.branchExists repoRoot branch
+      made <-
+        if hasBranch
+          then Git.createWorktreeExisting repoRoot workspace branch
+          else Git.createWorktree repoRoot workspace branch baseBranch
+      pure <| case made of
+        Left e -> Left ("Failed to create integration worktree: " <> T.pack (show e))
+        Right () -> Right workspace
+
+-- | Startup recovery: scan the pipeline root for leftover task worktrees
+-- and return their task IDs (the directory names).
+-- Does NOT clean them up — the caller decides what to do.
+--
+-- Only subdirectories of the root are considered. Plain files are skipped,
+-- so the state database and the @state.db-wal@ / @state.db-shm@ sidecar
+-- files SQLite keeps next to it in WAL mode are never reported as tasks.
+-- The @integration@ worktree is excluded as well. A missing root gives an
+-- empty list.
+findStaleWorktrees :: WorkspacePool -> IO [Text]
+findStaleWorktrees pool = do
+  let root = Core.pcRoot (wpConfig pool)
+  exists <- Dir.doesDirectoryExist root
+  if not exists
+    then pure []
+    else do
+      entries <- Dir.listDirectory root
+      let candidates = filter (\e -> e /= "integration" && e /= "state.db") entries
+      taskDirs <- keepDirectories root candidates
+      pure (map T.pack taskDirs)
+  where
+    -- Keep, in order, the entries that are directories under the root.
+    keepDirectories :: FilePath -> [FilePath] -> IO [FilePath]
+    keepDirectories _ [] = pure []
+    keepDirectories root (e : es) = do
+      isDir <- Dir.doesDirectoryExist (root <> "/" <> e)
+      rest <- keepDirectories root es
+      pure (if isDir then e : rest else rest)