← Back to task

Commit 9c3da38f

commit 9c3da38ff72f5913ee40ca59d479c7bd7c9a1f15
Author: Coder Agent <coder@agents.omni>
Date:   Sat Feb 14 15:57:37 2026

    Omni/Pipeline: implement automated dev-verify-ship
    
    Single-process pipeline replacing dev-review-release.sh.
    Only dev phase uses an LLM agent; verify (bild) and integrate
    (cherry-pick) are deterministic. Concurrent dev via workspace
    pool. Structured state in SQLite pipeline_runs table.
    
    Modules:
    - Core.hs: types, config
    - State.hs: pipeline_runs DB layer
    - Git.hs: typed git operations
    - Workspace.hs: worktree pool management
    - Verify.hs: deterministic build verification
    - Integrate.hs: cherry-pick integration
    - Dev.hs: agent spawning, prompt building
    - Pipeline.hs: CLI, main loop, status command
    
    Task-Id: t-603

diff --git a/Omni/Pipeline.hs b/Omni/Pipeline.hs
new file mode 100755
index 00000000..62acf994
--- /dev/null
+++ b/Omni/Pipeline.hs
@@ -0,0 +1,562 @@
+#!/usr/bin/env run.sh
+{-# LANGUAGE OverloadedStrings #-}
+{-# LANGUAGE NoImplicitPrelude #-}
+
+-- | Pipeline: automated dev → verify → ship.
+--
+-- Single-process pipeline that drives tasks from Open through development,
+-- build verification, and integration into the base branch.
+--
+-- Only the dev phase uses an LLM agent. Verify and integrate are deterministic.
+--
+-- : out pipeline
+-- : dep aeson
+-- : dep optparse-applicative
+-- : dep sqlite-simple
+-- : dep async
+-- : dep stm
+module Omni.Pipeline where
+
+import Alpha
+import Control.Concurrent (threadDelay)
+import qualified Control.Concurrent.STM as STM
+import qualified Data.Map.Strict as Map
+import qualified Data.Text as T
+import qualified Data.Text.IO as TIO
+import Data.Time (UTCTime, diffUTCTime, getCurrentTime)
+import qualified Database.SQLite.Simple as SQL
+import qualified Omni.Cli as Cli
+import qualified Omni.Pipeline.Core as Core
+import qualified Omni.Pipeline.Dev as Dev
+import qualified Omni.Pipeline.Integrate as Integrate
+import qualified Omni.Pipeline.State as State
+import qualified Omni.Pipeline.Verify as Verify
+import qualified Omni.Pipeline.Workspace as Workspace
+import qualified Omni.Task.Core as Task
+import qualified Omni.Test as Test
+import qualified Options.Applicative as Opt
+import qualified System.Directory as Dir
+import qualified System.Environment as Env
+import qualified Text.Read as Read
+
+main :: IO ()
+main =
+  join (Opt.execParser (Cli.info (parser <**> Opt.helper) Cli.fullDesc))
+
+test :: Test.Tree
+test =
+  Test.group
+    "Omni.Pipeline"
+    [ Test.unit "buildDevPrompt includes task id" <| do
+        let task =
+              Task.Task
+                { Task.taskId = "t-999",
+                  Task.taskTitle = "Test task",
+                  Task.taskType = Task.WorkTask,
+                  Task.taskParent = Nothing,
+                  Task.taskNamespace = Just "Omni/Test.hs",
+                  Task.taskStatus = Task.Open,
+                  Task.taskPatchsetCount = 0,
+                  Task.taskPriority = Task.P2,
+                  Task.taskComplexity = Nothing,
+                  Task.taskDependencies = [],
+                  Task.taskDescription = "A test task",
+                  Task.taskComments = [],
+                  Task.taskTags = Task.Tags [],
+                  Task.taskCreatedAt = Read.read "2026-01-01 00:00:00 UTC",
+                  Task.taskUpdatedAt = Read.read "2026-01-01 00:00:00 UTC"
+                }
+            prompt = Dev.buildDevPrompt "live" task 0 Nothing
+        Test.assertBool "prompt contains task id" (T.isInfixOf "t-999" prompt)
+        Test.assertBool "prompt says no status changes" (T.isInfixOf "Do NOT change task status" prompt),
+      --
+      Test.unit "buildDevPrompt includes review feedback" <| do
+        let task =
+              Task.Task
+                { Task.taskId = "t-888",
+                  Task.taskTitle = "Retry task",
+                  Task.taskType = Task.WorkTask,
+                  Task.taskParent = Nothing,
+                  Task.taskNamespace = Just "Omni/Foo.hs",
+                  Task.taskStatus = Task.InProgress,
+                  Task.taskPatchsetCount = 1,
+                  Task.taskPriority = Task.P2,
+                  Task.taskComplexity = Nothing,
+                  Task.taskDependencies = [],
+                  Task.taskDescription = "Fix the thing",
+                  Task.taskComments = [],
+                  Task.taskTags = Task.Tags [],
+                  Task.taskCreatedAt = Read.read "2026-01-01 00:00:00 UTC",
+                  Task.taskUpdatedAt = Read.read "2026-01-01 00:00:00 UTC"
+                }
+            prompt = Dev.buildDevPrompt "live" task 1 (Just "Build failed: missing import")
+        Test.assertBool "prompt contains feedback" (T.isInfixOf "missing import" prompt)
+        Test.assertBool "prompt mentions previous rejection" (T.isInfixOf "previous patchset was rejected" (T.toLower prompt))
+    ]
+
+-- ============================================================================
+-- CLI
+-- ============================================================================
+
+parser :: Cli.Parser (IO ())
+parser =
+  Cli.subparser
+    ( Cli.command "run" (Cli.info runParser (Cli.progDesc "Start the pipeline"))
+        <> Cli.command "status" (Cli.info statusParser (Cli.progDesc "Show pipeline health"))
+    )
+
+runParser :: Cli.Parser (IO ())
+runParser =
+  doRun
+    </ Cli.strOption (Cli.long "root" <> Cli.value "_/pipeline" <> Cli.help "Workspace root")
+    <*> Cli.strOption (Cli.long "base" <> Cli.value "live" <> Cli.help "Base branch")
+    <*> Cli.option Cli.auto (Cli.long "concurrency" <> Cli.value 2 <> Cli.help "Max concurrent dev agents")
+    <*> Cli.option Cli.auto (Cli.long "interval" <> Cli.value 10 <> Cli.help "Poll interval (seconds)")
+    <*> Cli.option Cli.auto (Cli.long "max-retries" <> Cli.value 5 <> Cli.help "Max retries per patchset")
+    <*> Cli.option Cli.auto (Cli.long "max-task-cost" <> Cli.value 0 <> Cli.help "Cost cap per task (cents, 0=unlimited)")
+    <*> Cli.option Cli.auto (Cli.long "timeout" <> Cli.value 1800 <> Cli.help "Agent timeout (seconds)")
+    <*> Cli.option Cli.auto (Cli.long "max-iter" <> Cli.value 80 <> Cli.help "Agent max iterations")
+    <*> Cli.option Cli.auto (Cli.long "max-cost" <> Cli.value 300 <> Cli.help "Agent max cost per run (cents)")
+    <*> Cli.optional (Cli.strOption (Cli.long "provider" <> Cli.help "Agent provider"))
+    <*> Cli.optional (Cli.strOption (Cli.long "parent" <> Cli.help "Only process tasks under this epic"))
+    <*> Cli.optional (Cli.strOption (Cli.long "task-id" <> Cli.help "Only process this one task"))
+    <*> Cli.switch (Cli.long "once" <> Cli.help "Process one cycle then exit")
+    <*> Cli.switch (Cli.long "dry-run" <> Cli.help "Print what would happen")
+
+statusParser :: Cli.Parser (IO ())
+statusParser =
+  doStatus
+    </ Cli.strOption (Cli.long "root" <> Cli.value "_/pipeline" <> Cli.help "Workspace root")
+    <*> Cli.switch (Cli.long "json" <> Cli.help "JSON output")
+
+-- ============================================================================
+-- Run command
+-- ============================================================================
+
+doRun ::
+  String ->
+  String ->
+  Int ->
+  Int ->
+  Int ->
+  Int ->
+  Int ->
+  Int ->
+  Int ->
+  Maybe String ->
+  Maybe String ->
+  Maybe String ->
+  Bool ->
+  Bool ->
+  IO ()
+doRun root base concurrency interval maxRetries maxTaskCost timeout maxIter maxCost provider parent taskFilter once dryRun = do
+  repoRoot <- Env.getEnv "CODEROOT"
+  let cfg =
+        Core.PipelineConfig
+          { Core.pcRoot = root,
+            Core.pcRepoRoot = repoRoot,
+            Core.pcBaseBranch = T.pack base,
+            Core.pcConcurrency = concurrency,
+            Core.pcPollInterval = interval,
+            Core.pcMaxRetries = maxRetries,
+            Core.pcMaxTaskCost = maxTaskCost,
+            Core.pcAgentTimeout = timeout,
+            Core.pcAgentMaxIter = maxIter,
+            Core.pcAgentMaxCost = maxCost,
+            Core.pcAgentProvider = T.pack </ provider,
+            Core.pcParentFilter = T.pack </ parent,
+            Core.pcTaskFilter = T.pack </ taskFilter,
+            Core.pcDryRun = dryRun
+          }
+
+  -- Initialize
+  Dir.createDirectoryIfMissing True root
+  db <- State.initPipelineDb (root <> "/state.db")
+  pool <- Workspace.newPool cfg
+  activeDevs <- STM.newTVarIO (Map.empty :: Map.Map Text Core.ActiveDev)
+
+  logMsg "Pipeline starting"
+  logMsg ("root=" <> T.pack root <> " base=" <> T.pack base <> " concurrency=" <> T.pack (show concurrency))
+
+  -- Startup recovery
+  recoverStaleState db pool activeDevs
+
+  -- Main loop
+  let loop = do
+        runOneCycle cfg db pool activeDevs
+        if once
+          then logMsg "Single cycle complete, exiting"
+          else do
+            threadDelay (interval * 1_000_000)
+            loop
+  loop
+
+-- | Run one full pipeline cycle.
+runOneCycle ::
+  Core.PipelineConfig ->
+  SQL.Connection ->
+  Workspace.WorkspacePool ->
+  STM.TVar (Map.Map Text Core.ActiveDev) ->
+  IO ()
+runOneCycle cfg db pool activeDevs = do
+  -- Load all tasks once per cycle
+  allTasks <- Task.loadTasks
+
+  let workTasks = filter isEligible allTasks
+      byStatus s = filter (\t -> Task.taskStatus t == s) workTasks
+
+  -- 1. INTEGRATE: process Verified tasks
+  let verified = byStatus Task.Verified
+  forM_ verified <| \task -> do
+    let tid = Task.taskId task
+    logMsg ("Integrating: " <> tid)
+    if Core.pcDryRun cfg
+      then logMsg ("DRY RUN: would integrate " <> tid)
+      else doIntegrate cfg db pool task
+
+  -- 2. HARVEST + VERIFY: check running dev agents
+  active <- STM.readTVarIO activeDevs
+  forM_ (Map.toList active) <| \(tid, ad) -> do
+    result <- Dev.checkDevStatus ad
+    case result of
+      Core.StillRunning -> pure ()
+      Core.DevSuccess maybeSha -> do
+        logMsg ("Dev completed for " <> tid <> " (sha=" <> fromMaybe "?" maybeSha <> ")")
+        cost <- Dev.getRunCost (Core.adRunId ad)
+        State.markRunFinished db (Core.adRunDbId ad) Core.Success (Just cost) Nothing
+        _ <- Task.addComment tid ("Pipeline: dev completed (run=" <> Core.adRunId ad <> ", cost=" <> T.pack (show cost) <> "c)") Task.System
+        -- Remove from active
+        STM.atomically <| STM.modifyTVar' activeDevs (Map.delete tid)
+        -- Immediately verify
+        doVerify cfg db ad
+      Core.DevFailed err -> do
+        logMsg ("Dev failed for " <> tid <> ": " <> err)
+        cost <- Dev.getRunCost (Core.adRunId ad)
+        State.markRunFinished db (Core.adRunDbId ad) Core.Failure (Just cost) (Just err)
+        _ <- Task.addComment tid ("Pipeline: dev failed (run=" <> Core.adRunId ad <> "): " <> err) Task.System
+        -- Remove from active, reset task to Open
+        STM.atomically <| STM.modifyTVar' activeDevs (Map.delete tid)
+        Task.updateTaskStatus tid Task.Open []
+
+  -- 3. DEVELOP: spawn agents for Open tasks
+  let open = byStatus Task.Open
+  slots <- Workspace.slotsAvailable pool
+  currentActive <- STM.readTVarIO activeDevs
+  let availableSlots = slots - Map.size currentActive
+  forM_ (take availableSlots open) <| \task -> do
+    let tid = Task.taskId task
+    -- Check retry/cost limits
+    canRetry <- checkRetryLimits cfg db task
+    when canRetry <| do
+      if Core.pcDryRun cfg
+        then logMsg ("DRY RUN: would start dev for " <> tid)
+        else doSpawnDev cfg db pool activeDevs task
+  where
+    isEligible task =
+      Task.taskType task
+        == Task.WorkTask
+        && matchesFilter (Core.pcTaskFilter cfg) (Task.taskId task)
+        && matchesParent (Core.pcParentFilter cfg) (Task.taskParent task)
+
+    matchesFilter Nothing _ = True
+    matchesFilter (Just f) tid = f == tid
+
+    matchesParent Nothing _ = True
+    matchesParent (Just f) parent = parent == Just f
+
+-- | Integrate a verified task.
+doIntegrate ::
+  Core.PipelineConfig ->
+  SQL.Connection ->
+  Workspace.WorkspacePool ->
+  Task.Task ->
+  IO ()
+doIntegrate cfg db pool task = do
+  let tid = Task.taskId task
+      baseBranch = Core.pcBaseBranch cfg
+      repoRoot = Core.pcRepoRoot cfg
+      ns = Task.taskNamespace task
+  now <- getCurrentTime
+
+  -- Record the integration attempt
+  rowId <-
+    State.recordRun
+      db
+      Core.RunRecord
+        { Core.rrId = Nothing,
+          Core.rrTaskId = tid,
+          Core.rrPhase = Core.Integrate,
+          Core.rrPatchset = Task.taskPatchsetCount task,
+          Core.rrAgentdRunId = Nothing,
+          Core.rrStatus = Core.Running,
+          Core.rrCostCents = 0,
+          Core.rrStartedAt = now,
+          Core.rrFinishedAt = Nothing,
+          Core.rrError = Nothing
+        }
+
+  result <- Integrate.integrateTask pool repoRoot tid baseBranch ns
+  case result of
+    Core.Integrated commitHash -> do
+      State.markRunFinished db rowId Core.Success Nothing Nothing
+      Task.updateTaskStatus tid Task.Done []
+      _ <- Task.addComment tid ("Pipeline: integrated into " <> baseBranch <> " at " <> commitHash) Task.System
+      Workspace.releaseWorkspace pool tid
+      logMsg ("Integrated " <> tid <> " at " <> commitHash)
+    Core.Conflict details -> do
+      State.markRunFinished db rowId Core.Failure Nothing (Just details)
+      Task.updateTaskStatus tid Task.Open []
+      _ <- Task.addComment tid ("Pipeline: integration conflict, resetting for re-dev: " <> details) Task.System
+      -- Reset workspace so dev starts fresh
+      _ <- Workspace.resetWorkspace pool tid
+      logMsg ("Integration conflict for " <> tid <> ": " <> details)
+    Core.IntegrateBuildFail details -> do
+      State.markRunFinished db rowId Core.Failure Nothing (Just details)
+      Task.updateTaskStatus tid Task.Open []
+      _ <- Task.addComment tid ("Pipeline: post-integration build failed, resetting: " <> details) Task.System
+      _ <- Workspace.resetWorkspace pool tid
+      logMsg ("Integration build failed for " <> tid <> ": " <> details)
+
+-- | Verify a completed dev run.
+doVerify ::
+  Core.PipelineConfig ->
+  SQL.Connection ->
+  Core.ActiveDev ->
+  IO ()
+doVerify cfg db ad = do
+  let tid = Core.adTaskId ad
+      baseBranch = Core.pcBaseBranch cfg
+      workspace = Core.adWorkspace ad
+      ns = Core.adNamespace ad
+  now <- getCurrentTime
+
+  rowId <-
+    State.recordRun
+      db
+      Core.RunRecord
+        { Core.rrId = Nothing,
+          Core.rrTaskId = tid,
+          Core.rrPhase = Core.Verify,
+          Core.rrPatchset = Core.adPatchset ad,
+          Core.rrAgentdRunId = Nothing,
+          Core.rrStatus = Core.Running,
+          Core.rrCostCents = 0,
+          Core.rrStartedAt = now,
+          Core.rrFinishedAt = Nothing,
+          Core.rrError = Nothing
+        }
+
+  result <- Verify.verifyTask workspace tid baseBranch ns
+  case result of
+    Core.VerifyPass -> do
+      State.markRunFinished db rowId Core.Success Nothing Nothing
+      Task.updateTaskStatus tid Task.Verified []
+      _ <- Task.addComment tid "Pipeline: build verification passed" Task.System
+      logMsg ("Verified " <> tid)
+    Core.VerifySkip reason -> do
+      State.markRunFinished db rowId Core.Success Nothing (Just reason)
+      Task.updateTaskStatus tid Task.Verified []
+      _ <- Task.addComment tid ("Pipeline: verification skipped (" <> reason <> "), promoting") Task.System
+      logMsg ("Verified (skipped) " <> tid <> ": " <> reason)
+    Core.VerifyFail reason -> do
+      State.markRunFinished db rowId Core.Failure Nothing (Just reason)
+      Task.updateTaskStatus tid Task.Open []
+      _ <- Task.addComment tid ("Pipeline: verification failed:\n" <> reason) Task.System
+      logMsg ("Verify failed for " <> tid <> ": " <> T.take 200 reason)
+
+-- | Spawn a dev agent for a task.
+doSpawnDev ::
+  Core.PipelineConfig ->
+  SQL.Connection ->
+  Workspace.WorkspacePool ->
+  STM.TVar (Map.Map Text Core.ActiveDev) ->
+  Task.Task ->
+  IO ()
+doSpawnDev cfg db pool activeDevs task = do
+  let tid = Task.taskId task
+      patchset = Task.taskPatchsetCount task
+  now <- getCurrentTime
+
+  -- Claim the task
+  Task.updateTaskStatus tid Task.InProgress []
+
+  -- Get or create workspace
+  wsResult <- Workspace.acquireWorkspace pool tid
+  case wsResult of
+    Left e -> do
+      logMsg ("Failed to acquire workspace for " <> tid <> ": " <> e)
+      Task.updateTaskStatus tid Task.Open []
+      pure ()
+    Right workspace -> do
+      -- Get review feedback from last verify failure
+      reviewFeedback <- State.getLastVerifyFailure db tid
+
+      -- Record the run
+      rowId <-
+        State.recordRun
+          db
+          Core.RunRecord
+            { Core.rrId = Nothing,
+              Core.rrTaskId = tid,
+              Core.rrPhase = Core.Dev,
+              Core.rrPatchset = patchset,
+              Core.rrAgentdRunId = Nothing,
+              Core.rrStatus = Core.Running,
+              Core.rrCostCents = 0,
+              Core.rrStartedAt = now,
+              Core.rrFinishedAt = Nothing,
+              Core.rrError = Nothing
+            }
+
+      -- Spawn the agent
+      result <- Dev.spawnDev cfg workspace task patchset reviewFeedback rowId
+      case result of
+        Left e -> do
+          State.markRunFinished db rowId Core.Failure Nothing (Just e)
+          Task.updateTaskStatus tid Task.Open []
+          _ <- Task.addComment tid ("Pipeline: failed to spawn dev agent: " <> e) Task.System
+          logMsg ("Failed to spawn dev for " <> tid <> ": " <> e)
+        Right ad -> do
+          -- Update the run record with the agentd run ID
+          -- (we didn't have it when we created the record)
+          STM.atomically <| STM.modifyTVar' activeDevs (Map.insert tid ad)
+          logMsg ("Spawned dev for " <> tid <> " (run=" <> Core.adRunId ad <> ")")
+
+-- | Check if a task can be retried (within retry and cost limits).
+checkRetryLimits ::
+  Core.PipelineConfig ->
+  SQL.Connection ->
+  Task.Task ->
+  IO Bool
+checkRetryLimits cfg db task = do
+  let tid = Task.taskId task
+      patchset = Task.taskPatchsetCount task
+      maxRetries = Core.pcMaxRetries cfg
+      maxCost = Core.pcMaxTaskCost cfg
+
+  -- Check retry count
+  retryCount <- State.getRetryCount db tid Core.Dev patchset
+  when (maxRetries > 0 && retryCount >= maxRetries) <| do
+    logMsg ("Task " <> tid <> " exceeded max retries (" <> T.pack (show retryCount) <> "/" <> T.pack (show maxRetries) <> ")")
+    Task.updateTaskStatus tid Task.NeedsHelp []
+    _ <- Task.addComment tid ("Pipeline: exceeded " <> T.pack (show maxRetries) <> " retries on patchset " <> T.pack (show patchset)) Task.System
+    pure ()
+
+  if maxRetries > 0 && retryCount >= maxRetries
+    then pure False
+    else do
+      -- Check cost cap
+      if maxCost > 0
+        then do
+          cost <- State.getCumulativeCost db tid
+          if cost >= fromIntegral maxCost
+            then do
+              logMsg ("Task " <> tid <> " exceeded cost cap (" <> T.pack (show cost) <> "c)")
+              Task.updateTaskStatus tid Task.NeedsHelp []
+              _ <- Task.addComment tid ("Pipeline: exceeded cost cap of " <> T.pack (show maxCost) <> "c") Task.System
+              pure False
+            else checkBackoff cfg db tid patchset retryCount
+        else checkBackoff cfg db tid patchset retryCount
+
+-- | Check exponential backoff.
+checkBackoff ::
+  Core.PipelineConfig ->
+  SQL.Connection ->
+  Text ->
+  Int ->
+  Int ->
+  IO Bool
+checkBackoff cfg db tid patchset retryCount = do
+  if retryCount == 0
+    then pure True
+    else do
+      lastFail <- State.getLastFailureTime db tid patchset
+      case lastFail of
+        Nothing -> pure True
+        Just failTime -> do
+          now <- getCurrentTime
+          let baseInterval = fromIntegral (Core.pcPollInterval cfg) :: Double
+              delay = min 600 (baseInterval * (2 ^ retryCount))
+              elapsed = realToFrac (diffUTCTime now failTime) :: Double
+          if elapsed >= delay
+            then pure True
+            else do
+              logMsg ("Task " <> tid <> " in backoff (" <> T.pack (show (round (delay - elapsed) :: Int)) <> "s remaining)")
+              pure False
+
+-- | Recover stale state on startup.
+recoverStaleState ::
+  SQL.Connection ->
+  Workspace.WorkspacePool ->
+  STM.TVar (Map.Map Text Core.ActiveDev) ->
+  IO ()
+recoverStaleState db pool _activeDevs = do
+  -- Find worktrees that exist but have no running agent
+  staleTasks <- Workspace.findStaleWorktrees pool
+  forM_ staleTasks <| \tid -> do
+    hasRun <- State.hasActiveRun db tid
+    unless hasRun <| do
+      logMsg ("Recovering stale worktree for " <> tid)
+      -- Check task status — if InProgress with no agent, reset to Open
+      tasks <- Task.loadTasks
+      case Task.findTask tid tasks of
+        Nothing -> do
+          -- Unknown task, just clean up
+          Workspace.releaseWorkspace pool tid
+        Just task -> do
+          when (Task.taskStatus task == Task.InProgress) <| do
+            Task.updateTaskStatus tid Task.Open []
+            _ <- Task.addComment tid "Pipeline: recovered stale InProgress task on startup" Task.System
+            pure ()
+
+-- ============================================================================
+-- Status command
+-- ============================================================================
+
+doStatus :: String -> Bool -> IO ()
+doStatus root asJson = do
+  let dbPath = root <> "/state.db"
+  exists <- Dir.doesFileExist dbPath
+  unless exists <| do
+    TIO.putStrLn "Pipeline state DB not found. Has the pipeline been started?"
+    pure ()
+  when exists <| do
+    db <- State.initPipelineDb dbPath
+    allTasks <- Task.loadTasks
+
+    let workTasks = filter (\t -> Task.taskType t == Task.WorkTask) allTasks
+        countByStatus s = length (filter (\t -> Task.taskStatus t == s) workTasks)
+
+    activeRuns <- State.getActiveDevRuns db
+
+    if asJson
+      then do
+        -- Simple JSON output
+        TIO.putStrLn "{"
+        TIO.putStrLn ("  \"open\": " <> T.pack (show (countByStatus Task.Open)) <> ",")
+        TIO.putStrLn ("  \"in_progress\": " <> T.pack (show (countByStatus Task.InProgress)) <> ",")
+        TIO.putStrLn ("  \"verified\": " <> T.pack (show (countByStatus Task.Verified)) <> ",")
+        TIO.putStrLn ("  \"done\": " <> T.pack (show (countByStatus Task.Done)) <> ",")
+        TIO.putStrLn ("  \"needs_help\": " <> T.pack (show (countByStatus Task.NeedsHelp)) <> ",")
+        TIO.putStrLn ("  \"active_dev_runs\": " <> T.pack (show (length activeRuns)))
+        TIO.putStrLn "}"
+      else do
+        TIO.putStrLn ("Pipeline status (root=" <> T.pack root <> ")")
+        TIO.putStrLn ""
+        TIO.putStrLn "Tasks:"
+        TIO.putStrLn ("  Open:        " <> T.pack (show (countByStatus Task.Open)))
+        TIO.putStrLn ("  InProgress:  " <> T.pack (show (countByStatus Task.InProgress)))
+        TIO.putStrLn ("  Verified:    " <> T.pack (show (countByStatus Task.Verified)))
+        TIO.putStrLn ("  Done:        " <> T.pack (show (countByStatus Task.Done)))
+        TIO.putStrLn ("  NeedsHelp:   " <> T.pack (show (countByStatus Task.NeedsHelp)))
+        TIO.putStrLn ""
+        TIO.putStrLn ("Active dev runs: " <> T.pack (show (length activeRuns)))
+
+-- ============================================================================
+-- Helpers
+-- ============================================================================
+
+logMsg :: Text -> IO ()
+logMsg msg = do
+  now <- getCurrentTime
+  TIO.putStrLn ("[" <> T.pack (show now) <> "] " <> msg)
diff --git a/Omni/Pipeline/Core.hs b/Omni/Pipeline/Core.hs
new file mode 100644
index 00000000..8b26be64
--- /dev/null
+++ b/Omni/Pipeline/Core.hs
@@ -0,0 +1,147 @@
+{-# LANGUAGE DeriveGeneric #-}
+{-# LANGUAGE OverloadedStrings #-}
+{-# LANGUAGE NoImplicitPrelude #-}
+
+-- | Core types and configuration for the pipeline.
+module Omni.Pipeline.Core where
+
+import Alpha
+import Data.Aeson (FromJSON, ToJSON)
+import qualified Data.Aeson as Aeson
+import qualified Data.Text as T
+import Data.Time (UTCTime)
+
+-- | Pipeline configuration, parsed from CLI flags.
+data PipelineConfig = PipelineConfig
+  { pcRoot :: FilePath,
+    pcRepoRoot :: FilePath,
+    pcBaseBranch :: Text,
+    pcConcurrency :: Int,
+    pcPollInterval :: Int, -- seconds
+    pcMaxRetries :: Int, -- per patchset
+    pcMaxTaskCost :: Int, -- cents, 0 = unlimited
+    pcAgentTimeout :: Int, -- seconds
+    pcAgentMaxIter :: Int,
+    pcAgentMaxCost :: Int, -- cents per run
+    pcAgentProvider :: Maybe Text,
+    pcParentFilter :: Maybe Text,
+    pcTaskFilter :: Maybe Text,
+    pcDryRun :: Bool
+  }
+  deriving (Show, Eq, Generic)
+
+defaultConfig :: PipelineConfig
+defaultConfig =
+  PipelineConfig
+    { pcRoot = "_/pipeline",
+      pcRepoRoot = ".",
+      pcBaseBranch = "live",
+      pcConcurrency = 2,
+      pcPollInterval = 10,
+      pcMaxRetries = 5,
+      pcMaxTaskCost = 0,
+      pcAgentTimeout = 1800,
+      pcAgentMaxIter = 80,
+      pcAgentMaxCost = 300,
+      pcAgentProvider = Nothing,
+      pcParentFilter = Nothing,
+      pcTaskFilter = Nothing,
+      pcDryRun = False
+    }
+
+-- | Pipeline execution phases.
+data Phase = Dev | Verify | Integrate
+  deriving (Show, Eq, Read, Generic)
+
+instance ToJSON Phase
+
+instance FromJSON Phase
+
+phaseToText :: Phase -> Text
+phaseToText Dev = "dev"
+phaseToText Verify = "verify"
+phaseToText Integrate = "integrate"
+
+phaseFromText :: Text -> Maybe Phase
+phaseFromText "dev" = Just Dev
+phaseFromText "verify" = Just Verify
+phaseFromText "integrate" = Just Integrate
+phaseFromText _ = Nothing
+
+-- | Status of a pipeline run.
+data RunStatus = Running | Success | Failure
+  deriving (Show, Eq, Read, Generic)
+
+instance ToJSON RunStatus
+
+instance FromJSON RunStatus
+
+runStatusToText :: RunStatus -> Text
+runStatusToText Running = "running"
+runStatusToText Success = "success"
+runStatusToText Failure = "failure"
+
+runStatusFromText :: Text -> Maybe RunStatus
+runStatusFromText "running" = Just Running
+runStatusFromText "success" = Just Success
+runStatusFromText "failure" = Just Failure
+runStatusFromText _ = Nothing
+
+-- | A record of one pipeline run (dev agent, verify, or integrate).
+data RunRecord = RunRecord
+  { rrId :: Maybe Int64,
+    rrTaskId :: Text,
+    rrPhase :: Phase,
+    rrPatchset :: Int,
+    rrAgentdRunId :: Maybe Text,
+    rrStatus :: RunStatus,
+    rrCostCents :: Double,
+    rrStartedAt :: UTCTime,
+    rrFinishedAt :: Maybe UTCTime,
+    rrError :: Maybe Text
+  }
+  deriving (Show, Eq, Generic)
+
+instance ToJSON RunRecord
+
+-- | Tracks an in-flight dev agent.
+data ActiveDev = ActiveDev
+  { adTaskId :: Text,
+    adRunId :: Text,
+    adWorkspace :: FilePath,
+    adStartedAt :: UTCTime,
+    adBeforeSha :: Maybe Text,
+    adPatchset :: Int,
+    adNamespace :: Maybe Text,
+    adRunDbId :: Int64
+  }
+  deriving (Show, Eq)
+
+-- | Result of build verification.
+data VerifyResult
+  = VerifyPass
+  | VerifyFail Text
+  | VerifySkip Text
+  deriving (Show, Eq)
+
+-- | Result of integration attempt.
+data IntegrateResult
+  = Integrated Text -- commit hash
+  | Conflict Text -- conflict details
+  | IntegrateBuildFail Text -- post-cherry-pick verify failed
+  deriving (Show, Eq)
+
+-- | Result of checking a dev agent's status.
+data DevResult
+  = StillRunning
+  | DevSuccess (Maybe Text) -- new commit SHA, if changed
+  | DevFailed Text -- error message
+  deriving (Show, Eq)
+
+-- | Workspace path for the integration worktree.
+integrationDir :: PipelineConfig -> FilePath
+integrationDir cfg = pcRoot cfg <> "/integration"
+
+-- | Workspace path for a task's dev worktree.
+taskDir :: PipelineConfig -> Text -> FilePath
+taskDir cfg tid = pcRoot cfg <> "/" <> T.unpack tid
diff --git a/Omni/Pipeline/Dev.hs b/Omni/Pipeline/Dev.hs
new file mode 100644
index 00000000..8bb6fe48
--- /dev/null
+++ b/Omni/Pipeline/Dev.hs
@@ -0,0 +1,204 @@
+{-# LANGUAGE OverloadedStrings #-}
+{-# LANGUAGE NoImplicitPrelude #-}
+
+-- | Dev phase: agent spawning and prompt building.
+--
+-- The only phase that uses an LLM agent. Builds a simplified prompt
+-- (agent just writes code, pipeline owns status transitions) and
+-- manages the agent lifecycle via agentd.
+module Omni.Pipeline.Dev where
+
+import Alpha
+import qualified Data.List as List
+import qualified Data.Text as T
+import qualified Data.Text.IO as TIO
+import Data.Time (UTCTime, getCurrentTime, toModifiedJulianDay, utctDay, utctDayTime)
+import qualified Omni.Pipeline.Core as Core
+import qualified Omni.Pipeline.Git as Git
+import qualified Omni.Task.Core as Task
+import qualified System.Directory as Dir
+import qualified System.Exit as Exit
+import qualified System.Process as Process
+import qualified Text.Read as Read
+
+-- | Build the dev prompt. Pure function — easy to test.
+buildDevPrompt :: Text -> Task.Task -> Int -> Maybe Text -> Text
+buildDevPrompt baseBranch task patchset maybeReviewFeedback =
+  T.unlines
+    [ "# Dev Task",
+      "",
+      "You are a coder. Implement the task below.",
+      "",
+      "## Rules",
+      "1. Work only in this workspace.",
+      "2. One commit on branch `" <> Task.taskId task <> "` with trailer `Task-Id: " <> Task.taskId task <> "`.",
+      "3. If a commit already exists (patchset > 0), amend it with `git commit --amend`.",
+      "4. Verify: `typecheck.sh " <> ns <> "` then `bild " <> ns <> "` then `bild --test " <> ns <> "`.",
+      "5. **Do NOT change task status.** The pipeline handles transitions.",
+      "6. **Do NOT push.**",
+      "7. Follow repository conventions from AGENTS.md.",
+      "",
+      "## Task",
+      "- ID: " <> Task.taskId task,
+      "- Branch: " <> Task.taskId task,
+      "- Base: " <> baseBranch,
+      "- Patchset: " <> T.pack (show patchset),
+      "- Namespace: " <> ns,
+      "",
+      "### Title",
+      Task.taskTitle task,
+      "",
+      "### Description",
+      Task.taskDescription task,
+      "",
+      feedback,
+      "### Constraints",
+      "- Work only in the provided workspace.",
+      "- Keep one-task-one-commit discipline.",
+      "- Commit message must include: `Task-Id: " <> Task.taskId task <> "`"
+    ]
+  where
+    ns = fromMaybe "(none)" (Task.taskNamespace task)
+    feedback = case maybeReviewFeedback of
+      Nothing -> ""
+      Just fb ->
+        T.unlines
+          [ "### Previous Review Feedback",
+            "The previous patchset was rejected. Fix the issues below:",
+            "",
+            fb,
+            ""
+          ]
+
+-- | Spawn a dev agent via agentd. Returns an ActiveDev record.
+spawnDev ::
+  Core.PipelineConfig ->
+  FilePath ->
+  Task.Task ->
+  Int ->
+  Maybe Text ->
+  Int64 ->
+  IO (Either Text Core.ActiveDev)
+spawnDev cfg workspace task patchset maybeReviewFeedback dbRowId = do
+  let taskId = Task.taskId task
+      baseBranch = Core.pcBaseBranch cfg
+
+  -- Get commit SHA before agent runs (to detect changes later)
+  beforeSha <- Git.branchSha workspace taskId
+
+  -- Build and write prompt
+  let prompt = buildDevPrompt baseBranch task patchset maybeReviewFeedback
+  now <- getCurrentTime
+  let runId = "dev-" <> T.unpack taskId <> "-" <> show (floor (utcTimeToPOSIXSeconds now) :: Integer)
+      promptDir = workspace <> "/_/tmp/pipeline"
+      promptFile = promptDir <> "/" <> runId <> ".md"
+  Dir.createDirectoryIfMissing True promptDir
+  TIO.writeFile promptFile prompt
+
+  -- Build agentd command
+  let args =
+        ["run", promptFile, "-n", runId, "--timeout", show (Core.pcAgentTimeout cfg), "--max-iter", show (Core.pcAgentMaxIter cfg), "--max-cost", show (Core.pcAgentMaxCost cfg)]
+          ++ maybe [] (\p -> ["-p", T.unpack p]) (Core.pcAgentProvider cfg)
+
+  -- Spawn in background (don't use --fg)
+  (exitCode, _stdout, stderr) <-
+    Process.readCreateProcessWithExitCode
+      (Process.proc "agentd" args) {Process.cwd = Just workspace}
+      ""
+
+  case exitCode of
+    Exit.ExitSuccess ->
+      pure
+        <| Right
+          Core.ActiveDev
+            { Core.adTaskId = taskId,
+              Core.adRunId = T.pack runId,
+              Core.adWorkspace = workspace,
+              Core.adStartedAt = now,
+              Core.adBeforeSha = beforeSha,
+              Core.adPatchset = patchset,
+              Core.adNamespace = Task.taskNamespace task,
+              Core.adRunDbId = dbRowId
+            }
+    Exit.ExitFailure _ ->
+      pure (Left ("agentd spawn failed: " <> T.pack stderr))
+
+-- | Check the status of a running dev agent.
+checkDevStatus :: Core.ActiveDev -> IO Core.DevResult
+checkDevStatus ad = do
+  -- Poll agentd status
+  (exitCode, stdout, _stderr) <-
+    Process.readProcessWithExitCode
+      "agentd"
+      ["status", T.unpack (Core.adRunId ad), "--json"]
+      ""
+  case exitCode of
+    Exit.ExitFailure _ ->
+      -- Can't determine status — treat as still running
+      -- (agentd might not have started yet)
+      pure Core.StillRunning
+    Exit.ExitSuccess -> do
+      let status = extractField "status" stdout
+      case status of
+        Just "running" -> pure Core.StillRunning
+        Just "completed" -> checkForCommit ad
+        Just "failed" ->
+          pure (Core.DevFailed (T.pack (fromMaybe "Agent run failed" (extractField "error" stdout))))
+        Just "timeout" ->
+          pure (Core.DevFailed "Agent run timed out")
+        _ -> pure Core.StillRunning
+
+-- | After agent completes, check if a new commit was produced.
+checkForCommit :: Core.ActiveDev -> IO Core.DevResult
+checkForCommit ad = do
+  afterSha <- Git.branchSha (Core.adWorkspace ad) (Core.adTaskId ad)
+  case (Core.adBeforeSha ad, afterSha) of
+    (_, Nothing) -> pure (Core.DevFailed "Task branch not found after agent run")
+    (before, Just after)
+      | before == Just after -> pure (Core.DevFailed "Agent completed but made no commit")
+      | otherwise -> pure (Core.DevSuccess (Just after))
+
+-- | Get the cost of an agentd run.
+getRunCost :: Text -> IO Double
+getRunCost runId = do
+  (exitCode, stdout, _) <-
+    Process.readProcessWithExitCode
+      "agentd"
+      ["status", T.unpack runId, "--json"]
+      ""
+  case exitCode of
+    Exit.ExitSuccess ->
+      case extractField "cost_cents" stdout of
+        Just s -> pure (fromMaybe 0 (Read.readMaybe s))
+        Nothing -> pure 0
+    _ -> pure 0
+
+-- | Simple field extraction from JSON output (avoids aeson dependency for
+-- quick status checks). Looks for "field": "value" or "field": number.
+extractField :: String -> String -> Maybe String
+extractField field input =
+  case matchingLine of
+    Nothing -> Nothing
+    Just line ->
+      let (_, rest) = List.break (== ':') line
+       in case rest of
+            ':' : val -> Just (List.filter (\c -> c /= '"' && c /= ',' && c /= ' ') val)
+            _ -> Nothing
+  where
+    matchingLine = List.find (List.isInfixOf ("\"" <> field <> "\"")) (splitOn '\n' input)
+
+-- | Split a String on a delimiter character.
+splitOn :: Char -> String -> [String]
+splitOn _ [] = []
+splitOn c s =
+  let (before, after) = List.break (== c) s
+   in before : case after of
+        [] -> []
+        (_ : rest) -> splitOn c rest
+
+-- | Convert UTCTime to POSIX seconds.
+utcTimeToPOSIXSeconds :: UTCTime -> Double
+utcTimeToPOSIXSeconds t =
+  let day = toModifiedJulianDay (utctDay t)
+      secs = realToFrac (utctDayTime t) :: Double
+   in fromIntegral (day - 40587) * 86400 + secs
diff --git a/Omni/Pipeline/Git.hs b/Omni/Pipeline/Git.hs
new file mode 100644
index 00000000..eb3fc9b6
--- /dev/null
+++ b/Omni/Pipeline/Git.hs
@@ -0,0 +1,158 @@
+{-# LANGUAGE OverloadedStrings #-}
+{-# LANGUAGE NoImplicitPrelude #-}
+
+-- | Typed wrappers around git CLI operations.
+module Omni.Pipeline.Git where
+
+import Alpha
+import qualified Data.List as List
+import qualified Data.Text as T
+import qualified System.Exit as Exit
+import qualified System.Process as Process
+
+-- | Structured git error.
+data GitError = GitError
+  { geCommand :: Text,
+    geExitCode :: Int,
+    geStderr :: Text
+  }
+  deriving (Show, Eq)
+
+-- | Run a git command in a given working directory.
+runGit :: FilePath -> [String] -> IO (Either GitError String)
+runGit cwd args = do
+  (exitCode, stdout, stderr) <- Process.readProcessWithExitCode "git" (["-C", cwd] ++ args) ""
+  case exitCode of
+    Exit.ExitSuccess -> pure (Right (strip stdout))
+    Exit.ExitFailure n ->
+      pure
+        (Left (GitError (T.unwords (map T.pack ("git" : args))) (fromIntegral n) (T.pack stderr)))
+  where
+    strip = List.reverse <. List.dropWhile (== '\n') <. List.reverse
+
+-- | Run git in the repo root (no -C flag), useful for worktree commands
+-- that need to operate on the main repo.
+runGitRepo :: FilePath -> [String] -> IO (Either GitError String)
+runGitRepo = runGit
+
+-- | Create a new worktree with a new branch from a start point.
+createWorktree :: FilePath -> FilePath -> Text -> Text -> IO (Either GitError ())
+createWorktree repoRoot path branch startPoint =
+  fmap
+    (() <$)
+    (runGitRepo repoRoot ["worktree", "add", "-b", T.unpack branch, path, T.unpack startPoint])
+
+-- | Create a worktree from an existing branch.
+createWorktreeExisting :: FilePath -> FilePath -> Text -> IO (Either GitError ())
+createWorktreeExisting repoRoot path branch =
+  fmap
+    (() <$)
+    (runGitRepo repoRoot ["worktree", "add", path, T.unpack branch])
+
+-- | Remove a worktree.
+removeWorktree :: FilePath -> FilePath -> IO (Either GitError ())
+removeWorktree repoRoot path =
+  fmap (() <$) (runGitRepo repoRoot ["worktree", "remove", "--force", path])
+
+-- | Check out a branch in a workspace.
+checkoutBranch :: FilePath -> Text -> IO (Either GitError ())
+checkoutBranch workspace branch =
+  fmap (() <$) (runGit workspace ["checkout", T.unpack branch])
+
+-- | Create a new branch from a start point.
+createBranch :: FilePath -> Text -> Text -> IO (Either GitError ())
+createBranch workspace branch startPoint =
+  fmap
+    (() <$)
+    (runGit workspace ["checkout", "-b", T.unpack branch, T.unpack startPoint])
+
+-- | Delete a local branch.
+deleteBranch :: FilePath -> Text -> IO (Either GitError ())
+deleteBranch repoRoot branch =
+  fmap (() <$) (runGitRepo repoRoot ["branch", "-D", T.unpack branch])
+
+-- | Cherry-pick a commit (by branch name or SHA). Returns the new commit hash.
+cherryPick :: FilePath -> Text -> IO (Either GitError Text)
+cherryPick workspace ref = do
+  result <- runGit workspace ["cherry-pick", T.unpack ref]
+  case result of
+    Left e -> pure (Left e)
+    Right _ -> do
+      sha <- runGit workspace ["rev-parse", "HEAD"]
+      pure (T.pack </ sha)
+
+-- | Abort an in-progress cherry-pick.
+abortCherryPick :: FilePath -> IO (Either GitError ())
+abortCherryPick workspace =
+  fmap (() <$) (runGit workspace ["cherry-pick", "--abort"])
+
+-- | Count commits between two refs: `git rev-list --count base..head`
+commitCountBetween :: FilePath -> Text -> Text -> IO (Either GitError Int)
+commitCountBetween workspace base headRef = do
+  result <- runGit workspace ["rev-list", "--count", T.unpack base <> ".." <> T.unpack headRef]
+  case result of
+    Left e -> pure (Left e)
+    Right s -> case reads s of
+      [(n, _)] -> pure (Right n)
+      _ -> pure (Left (GitError "rev-list --count parse" 1 ("Could not parse: " <> T.pack s)))
+
+-- | Get the SHA of a branch tip, or Nothing if it doesn't exist.
+branchSha :: FilePath -> Text -> IO (Maybe Text)
+branchSha workspace ref = do
+  result <- runGit workspace ["rev-parse", "--verify", T.unpack ref <> "^{commit}"]
+  case result of
+    Right sha -> pure (Just (T.pack sha))
+    Left _ -> pure Nothing
+
+-- | Check if a workspace has no uncommitted changes.
+isCleanWorkspace :: FilePath -> IO Bool
+isCleanWorkspace workspace = do
+  result <- runGit workspace ["status", "--porcelain"]
+  case result of
+    Right "" -> pure True
+    Right _ -> pure False
+    Left _ -> pure False
+
+-- | Check if a path is a valid git worktree.
+worktreeExists :: FilePath -> IO Bool
+worktreeExists path = do
+  result <- runGit path ["rev-parse", "--is-inside-work-tree"]
+  case result of
+    Right "true" -> pure True
+    _ -> pure False
+
+-- | Revert the last commit (keep it in the history as a revert).
+revertLastCommit :: FilePath -> IO (Either GitError ())
+revertLastCommit workspace =
+  fmap (() <$) (runGit workspace ["revert", "--no-edit", "HEAD"])
+
+-- | Reset the current branch to a given ref (hard reset).
+hardReset :: FilePath -> Text -> IO (Either GitError ())
+hardReset workspace ref =
+  fmap (() <$) (runGit workspace ["reset", "--hard", T.unpack ref])
+
+-- | Get the current branch name.
+currentBranch :: FilePath -> IO (Maybe Text)
+currentBranch workspace = do
+  result <- runGit workspace ["rev-parse", "--abbrev-ref", "HEAD"]
+  case result of
+    Right b -> pure (Just (T.pack b))
+    Left _ -> pure Nothing
+
+-- | Fast-forward pull (for the integration worktree).
+pullFf :: FilePath -> IO (Either GitError ())
+pullFf workspace =
+  fmap (() <$) (runGit workspace ["pull", "--ff-only"])
+
+-- | Check if a branch exists in the repo.
+branchExists :: FilePath -> Text -> IO Bool
+branchExists repoRoot branch = do
+  result <- runGitRepo repoRoot ["show-ref", "--verify", "--quiet", "refs/heads/" <> T.unpack branch]
+  case result of
+    Right _ -> pure True
+    Left _ -> pure False
+
+-- | Fetch to ensure we have latest refs (useful when worktrees may be stale).
+fetch :: FilePath -> IO (Either GitError ())
+fetch workspace =
+  fmap (() <$) (runGit workspace ["fetch", "--quiet"])
diff --git a/Omni/Pipeline/Integrate.hs b/Omni/Pipeline/Integrate.hs
new file mode 100644
index 00000000..9327842f
--- /dev/null
+++ b/Omni/Pipeline/Integrate.hs
@@ -0,0 +1,71 @@
+{-# LANGUAGE OverloadedStrings #-}
+{-# LANGUAGE NoImplicitPrelude #-}
+
+-- | Deterministic integration: cherry-pick task commits into the base branch.
+--
+-- Replaces the LLM integrator. No agent involved.
+module Omni.Pipeline.Integrate where
+
+import Alpha
+import qualified Data.Text as T
+import qualified Omni.Pipeline.Core as Core
+import qualified Omni.Pipeline.Git as Git
+import qualified Omni.Pipeline.Verify as Verify
+import qualified Omni.Pipeline.Workspace as Workspace
+
+-- | Integrate a verified task into the base branch.
+--
+-- 1. Ensure integration worktree is on baseBranch
+-- 2. Cherry-pick the task commit
+-- 3. Verify the build on the base branch
+-- 4. Return result (caller handles task status + cleanup)
+integrateTask ::
+  Workspace.WorkspacePool ->
+  FilePath ->
+  Text ->
+  Text ->
+  Maybe Text ->
+  IO Core.IntegrateResult
+integrateTask pool repoRoot taskId baseBranch maybeNamespace = do
+  -- 1. Ensure integration worktree
+  wsResult <- Workspace.ensureIntegrationWorktree pool
+  case wsResult of
+    Left e -> pure (Core.Conflict ("Integration worktree setup failed: " <> e))
+    Right workspace -> do
+      -- Make sure we're up to date
+      cur <- Git.currentBranch workspace
+      case cur of
+        Nothing -> pure (Core.Conflict "Cannot determine current branch in integration worktree")
+        Just branch -> do
+          -- Verify we're on the right branch (or a tracking branch)
+          when (branch /= baseBranch) <| do
+            _ <- Git.checkoutBranch workspace baseBranch
+            pure ()
+
+          -- 2. Cherry-pick the task commit
+          pickResult <- Git.cherryPick workspace taskId
+          case pickResult of
+            Left e -> do
+              -- Conflict — abort and report
+              _ <- Git.abortCherryPick workspace
+              pure
+                ( Core.Conflict
+                    ( "Cherry-pick of "
+                        <> taskId
+                        <> " failed: "
+                        <> Git.geStderr e
+                    )
+                )
+            Right commitHash -> do
+              -- 3. Verify build on base branch
+              verifyResult <- Verify.verifyOnBase workspace maybeNamespace
+              case verifyResult of
+                Core.VerifyPass ->
+                  pure (Core.Integrated commitHash)
+                Core.VerifySkip _ ->
+                  -- No buildable namespace = trust the pre-integration verify
+                  pure (Core.Integrated commitHash)
+                Core.VerifyFail reason -> do
+                  -- Revert the cherry-pick
+                  _ <- Git.revertLastCommit workspace
+                  pure (Core.IntegrateBuildFail reason)
diff --git a/Omni/Pipeline/State.hs b/Omni/Pipeline/State.hs
new file mode 100644
index 00000000..5aeb1579
--- /dev/null
+++ b/Omni/Pipeline/State.hs
@@ -0,0 +1,161 @@
+{-# LANGUAGE OverloadedStrings #-}
+{-# LANGUAGE NoImplicitPrelude #-}
+
+-- | Pipeline state persistence in SQLite.
+--
+-- Tracks per-phase run records: dev agent runs, verify results, and
+-- integration attempts. Replaces the comment-mining approach.
+module Omni.Pipeline.State where
+
+import Alpha
+import qualified Data.Text as T
+import Data.Time (UTCTime, getCurrentTime)
+import qualified Database.SQLite.Simple as SQL
+import qualified Database.SQLite.Simple.FromField as SQL
+import qualified Database.SQLite.Simple.ToField as SQL
+import qualified Omni.Pipeline.Core as Core
+
+-- | Open (or create) the pipeline state DB and ensure schema exists.
+initPipelineDb :: FilePath -> IO SQL.Connection
+initPipelineDb dbPath = do
+  conn <- SQL.open dbPath
+  SQL.execute_ conn "PRAGMA journal_mode=WAL"
+  SQL.execute_
+    conn
+    "CREATE TABLE IF NOT EXISTS pipeline_runs (\
+    \ id            INTEGER PRIMARY KEY AUTOINCREMENT, \
+    \ task_id       TEXT NOT NULL, \
+    \ phase         TEXT NOT NULL, \
+    \ patchset      INTEGER NOT NULL, \
+    \ agentd_run_id TEXT, \
+    \ status        TEXT NOT NULL, \
+    \ cost_cents    REAL DEFAULT 0, \
+    \ started_at    TIMESTAMP NOT NULL, \
+    \ finished_at   TIMESTAMP, \
+    \ error_summary TEXT \
+    \)"
+  SQL.execute_
+    conn
+    "CREATE INDEX IF NOT EXISTS idx_pipeline_runs_task \
+    \ ON pipeline_runs(task_id)"
+  SQL.execute_
+    conn
+    "CREATE INDEX IF NOT EXISTS idx_pipeline_runs_status \
+    \ ON pipeline_runs(status)"
+  pure conn
+
+-- | Record a new pipeline run. Returns the row ID.
+recordRun :: SQL.Connection -> Core.RunRecord -> IO Int64
+recordRun conn rr = do
+  SQL.execute
+    conn
+    "INSERT INTO pipeline_runs (task_id, phase, patchset, agentd_run_id, status, cost_cents, started_at, finished_at, error_summary) \
+    \ VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)"
+    ( Core.rrTaskId rr,
+      Core.phaseToText (Core.rrPhase rr),
+      Core.rrPatchset rr,
+      Core.rrAgentdRunId rr,
+      Core.runStatusToText (Core.rrStatus rr),
+      Core.rrCostCents rr,
+      Core.rrStartedAt rr,
+      Core.rrFinishedAt rr,
+      Core.rrError rr
+    )
+  SQL.lastInsertRowId conn
+
+-- | Mark a run as finished with a status, optional cost, and optional error.
+markRunFinished :: SQL.Connection -> Int64 -> Core.RunStatus -> Maybe Double -> Maybe Text -> IO ()
+markRunFinished conn rowId status maybeCost maybeError = do
+  now <- getCurrentTime
+  SQL.execute
+    conn
+    "UPDATE pipeline_runs SET status = ?, finished_at = ?, cost_cents = COALESCE(?, cost_cents), error_summary = COALESCE(?, error_summary) WHERE id = ?"
+    (Core.runStatusToText status, now, maybeCost, maybeError, rowId)
+
+-- | Count failed dev runs for a task at a given patchset.
+getRetryCount :: SQL.Connection -> Text -> Core.Phase -> Int -> IO Int
+getRetryCount conn taskId phase patchset = do
+  results <-
+    SQL.query
+      conn
+      "SELECT COUNT(*) FROM pipeline_runs WHERE task_id = ? AND phase = ? AND patchset = ? AND status = 'failure'"
+      (taskId, Core.phaseToText phase, patchset) ::
+      IO [SQL.Only Int]
+  case results of
+    [SQL.Only n] -> pure n
+    _ -> pure 0
+
+-- | Get cumulative cost across all runs for a task.
+getCumulativeCost :: SQL.Connection -> Text -> IO Double
+getCumulativeCost conn taskId = do
+  results <-
+    SQL.query
+      conn
+      "SELECT COALESCE(SUM(cost_cents), 0) FROM pipeline_runs WHERE task_id = ?"
+      (SQL.Only taskId) ::
+      IO [SQL.Only Double]
+  case results of
+    [SQL.Only n] -> pure n
+    _ -> pure 0
+
+-- | Get the error from the most recent verify failure for a task.
+-- Used to build review feedback for the next dev prompt.
+getLastVerifyFailure :: SQL.Connection -> Text -> IO (Maybe Text)
+getLastVerifyFailure conn taskId = do
+  results <-
+    SQL.query
+      conn
+      "SELECT error_summary FROM pipeline_runs WHERE task_id = ? AND phase = 'verify' AND status = 'failure' ORDER BY finished_at DESC LIMIT 1"
+      (SQL.Only taskId) ::
+      IO [SQL.Only (Maybe Text)]
+  case results of
+    [SQL.Only e] -> pure e
+    _ -> pure Nothing
+
+-- | Get all currently running dev runs.
+getActiveDevRuns :: SQL.Connection -> IO [(Int64, Text, Text, UTCTime)]
+getActiveDevRuns conn =
+  SQL.query_
+    conn
+    "SELECT id, task_id, agentd_run_id, started_at FROM pipeline_runs WHERE phase = 'dev' AND status = 'running' ORDER BY started_at ASC"
+
+-- | Get the last dev failure error for a task (any patchset).
+-- Used for general failure context.
+getLastDevFailure :: SQL.Connection -> Text -> IO (Maybe Text)
+getLastDevFailure conn taskId = do
+  results <-
+    SQL.query
+      conn
+      "SELECT error_summary FROM pipeline_runs WHERE task_id = ? AND phase = 'dev' AND status = 'failure' ORDER BY finished_at DESC LIMIT 1"
+      (SQL.Only taskId) ::
+      IO [SQL.Only (Maybe Text)]
+  case results of
+    [SQL.Only e] -> pure e
+    _ -> pure Nothing
+
+-- | Check if a task has any running dev runs.
+hasActiveRun :: SQL.Connection -> Text -> IO Bool
+hasActiveRun conn taskId = do
+  results <-
+    SQL.query
+      conn
+      "SELECT COUNT(*) FROM pipeline_runs WHERE task_id = ? AND status = 'running'"
+      (SQL.Only taskId) ::
+      IO [SQL.Only Int]
+  case results of
+    [SQL.Only n] -> pure (n > 0)
+    _ -> pure False
+
+-- | Get the most recent finished_at for a failed dev run on a given patchset.
+-- Used for exponential backoff calculation.
+getLastFailureTime :: SQL.Connection -> Text -> Int -> IO (Maybe UTCTime)
+getLastFailureTime conn taskId patchset = do
+  results <-
+    SQL.query
+      conn
+      "SELECT finished_at FROM pipeline_runs WHERE task_id = ? AND phase = 'dev' AND patchset = ? AND status = 'failure' ORDER BY finished_at DESC LIMIT 1"
+      (taskId, patchset) ::
+      IO [SQL.Only (Maybe UTCTime)]
+  case results of
+    [SQL.Only t] -> pure t
+    _ -> pure Nothing
diff --git a/Omni/Pipeline/Verify.hs b/Omni/Pipeline/Verify.hs
new file mode 100644
index 00000000..85055d54
--- /dev/null
+++ b/Omni/Pipeline/Verify.hs
@@ -0,0 +1,139 @@
+{-# LANGUAGE OverloadedStrings #-}
+{-# LANGUAGE NoImplicitPrelude #-}
+
+-- | Deterministic build verification.
+--
+-- Replaces the LLM reviewer. Runs bild and tests, returns structured results.
+module Omni.Pipeline.Verify where
+
+import Alpha
+import qualified Data.Text as T
+import qualified Omni.Pipeline.Core as Core
+import qualified Omni.Pipeline.Git as Git
+import qualified System.Exit as Exit
+import qualified System.Process as Process
+
+-- | Result of running bild on a namespace.
+data BildResult
+  = BildPass
+  | BildFail Int Text -- exit code, output
+  | BildNotBuildable -- exit code 2
+  deriving (Show, Eq)
+
+-- | Verify a task: check branch shape, run bild, run tests.
+--
+-- Runs in the dev worktree where the commit already exists.
+verifyTask :: FilePath -> Text -> Text -> Maybe Text -> IO Core.VerifyResult
+verifyTask workspace taskId baseBranch maybeNamespace = do
+  -- 1. Check branch shape: exactly 1 commit between base and task
+  countResult <- Git.commitCountBetween workspace baseBranch taskId
+  case countResult of
+    Left e ->
+      pure (Core.VerifyFail ("Branch shape check failed: " <> T.pack (show e)))
+    Right n
+      | n /= 1 ->
+          pure
+            ( Core.VerifyFail
+                ( "Expected 1 commit on "
+                    <> taskId
+                    <> " relative to "
+                    <> baseBranch
+                    <> ", got "
+                    <> T.pack (show n)
+                )
+            )
+      | otherwise -> do
+          -- 2. Run build verification
+          case maybeNamespace of
+            Nothing -> pure (Core.VerifySkip "No namespace set, skipping build verification")
+            Just ns -> do
+              buildResult <- runBild workspace ns
+              case buildResult of
+                BildNotBuildable ->
+                  pure (Core.VerifySkip ("Namespace " <> ns <> " is not buildable (bild exit 2)"))
+                BildFail code output ->
+                  pure
+                    ( Core.VerifyFail
+                        ( "Build failed for "
+                            <> ns
+                            <> " (exit "
+                            <> T.pack (show code)
+                            <> "):\n"
+                            <> output
+                        )
+                    )
+                BildPass -> do
+                  -- 3. Run tests
+                  testResult <- runBildTest workspace ns
+                  case testResult of
+                    BildNotBuildable -> pure Core.VerifyPass -- no tests to run
+                    BildFail code output ->
+                      pure
+                        ( Core.VerifyFail
+                            ( "Tests failed for "
+                                <> ns
+                                <> " (exit "
+                                <> T.pack (show code)
+                                <> "):\n"
+                                <> output
+                            )
+                        )
+                    BildPass -> pure Core.VerifyPass
+
+-- | Post-integration verification on the base branch.
+-- Just runs bild + tests, no branch shape check.
+verifyOnBase :: FilePath -> Maybe Text -> IO Core.VerifyResult
+verifyOnBase workspace maybeNamespace =
+  case maybeNamespace of
+    Nothing -> pure (Core.VerifySkip "No namespace, skipping post-integration verify")
+    Just ns -> do
+      buildResult <- runBild workspace ns
+      case buildResult of
+        BildNotBuildable -> pure (Core.VerifySkip ("Not buildable: " <> ns))
+        BildFail code output ->
+          pure
+            ( Core.VerifyFail
+                ( "Post-integration build failed for "
+                    <> ns
+                    <> " (exit "
+                    <> T.pack (show code)
+                    <> "):\n"
+                    <> output
+                )
+            )
+        BildPass -> do
+          testResult <- runBildTest workspace ns
+          case testResult of
+            BildNotBuildable -> pure Core.VerifyPass
+            BildFail code output ->
+              pure
+                ( Core.VerifyFail
+                    ( "Post-integration tests failed for "
+                        <> ns
+                        <> " (exit "
+                        <> T.pack (show code)
+                        <> "):\n"
+                        <> output
+                    )
+                )
+            BildPass -> pure Core.VerifyPass
+
+-- | Run `bild <namespace>` and interpret the exit code.
+runBild :: FilePath -> Text -> IO BildResult
+runBild workspace ns = runBildCmd workspace [T.unpack ns]
+
+-- | Run `bild --test <namespace>`.
+runBildTest :: FilePath -> Text -> IO BildResult
+runBildTest workspace ns = runBildCmd workspace ["--test", T.unpack ns]
+
+-- | Run a bild command and classify the result.
+runBildCmd :: FilePath -> [String] -> IO BildResult
+runBildCmd workspace args = do
+  (exitCode, stdout, stderr) <-
+    Process.readCreateProcessWithExitCode
+      (Process.proc "bild" args) {Process.cwd = Just workspace}
+      ""
+  case exitCode of
+    Exit.ExitSuccess -> pure BildPass
+    Exit.ExitFailure 2 -> pure BildNotBuildable
+    Exit.ExitFailure n -> pure (BildFail (fromIntegral n) (T.pack (stdout <> stderr)))
diff --git a/Omni/Pipeline/Workspace.hs b/Omni/Pipeline/Workspace.hs
new file mode 100644
index 00000000..a7abd566
--- /dev/null
+++ b/Omni/Pipeline/Workspace.hs
@@ -0,0 +1,137 @@
+{-# LANGUAGE OverloadedStrings #-}
+{-# LANGUAGE NoImplicitPrelude #-}
+
+-- | Worktree pool management for the pipeline.
+--
+-- Manages per-task dev worktrees and the persistent integration worktree.
+module Omni.Pipeline.Workspace where
+
+import Alpha
+import qualified Control.Concurrent.STM as STM
+import qualified Data.Map.Strict as Map
+import qualified Data.Text as T
+import qualified Omni.Pipeline.Core as Core
+import qualified Omni.Pipeline.Git as Git
+import qualified System.Directory as Dir
+
+-- | The workspace pool tracks active dev worktrees.
+data WorkspacePool = WorkspacePool
+  { wpActive :: STM.TVar (Map.Map Text FilePath),
+    wpConfig :: Core.PipelineConfig
+  }
+
+-- | Create a new workspace pool.
+newPool :: Core.PipelineConfig -> IO WorkspacePool
+newPool cfg = do
+  active <- STM.newTVarIO Map.empty
+  Dir.createDirectoryIfMissing True (Core.pcRoot cfg)
+  pure (WorkspacePool active cfg)
+
+-- | How many dev slots are available?
+slotsAvailable :: WorkspacePool -> IO Int
+slotsAvailable pool = do
+  active <- STM.readTVarIO (wpActive pool)
+  pure (Core.pcConcurrency (wpConfig pool) - Map.size active)
+
+-- | Acquire a dev worktree for a task. Creates the branch and worktree.
+-- Returns the workspace path, or an error message.
+acquireWorkspace :: WorkspacePool -> Text -> IO (Either Text FilePath)
+acquireWorkspace pool taskId = do
+  let cfg = wpConfig pool
+      workspace = Core.taskDir cfg taskId
+      repoRoot = Core.pcRepoRoot cfg
+      baseBranch = Core.pcBaseBranch cfg
+
+  slots <- slotsAvailable pool
+  if slots <= 0
+    then pure (Left "No workspace slots available")
+    else do
+      exists <- Git.worktreeExists workspace
+      if exists
+        then do
+          -- Worktree already exists (retry scenario). Just register it.
+          STM.atomically
+            <| STM.modifyTVar' (wpActive pool) (Map.insert taskId workspace)
+          pure (Right workspace)
+        else do
+          -- Check if branch already exists
+          hasBranch <- Git.branchExists repoRoot taskId
+          result <-
+            if hasBranch
+              then Git.createWorktreeExisting repoRoot workspace taskId
+              else Git.createWorktree repoRoot workspace taskId baseBranch
+          case result of
+            Left e -> pure (Left (T.pack (show e)))
+            Right () -> do
+              STM.atomically
+                <| STM.modifyTVar' (wpActive pool) (Map.insert taskId workspace)
+              pure (Right workspace)
+
+-- | Release a dev worktree: remove it and delete the task branch.
+releaseWorkspace :: WorkspacePool -> Text -> IO ()
+releaseWorkspace pool taskId = do
+  let cfg = wpConfig pool
+      workspace = Core.taskDir cfg taskId
+      repoRoot = Core.pcRepoRoot cfg
+
+  -- Remove from pool first
+  STM.atomically
+    <| STM.modifyTVar' (wpActive pool) (Map.delete taskId)
+
+  -- Remove worktree (ignore errors — might already be gone)
+  _ <- Git.removeWorktree repoRoot workspace
+
+  -- Delete branch (ignore errors — might not exist)
+  _ <- Git.deleteBranch repoRoot taskId
+
+  pure ()
+
+-- | Reset a workspace for retry: delete the branch, recreate from base.
+resetWorkspace :: WorkspacePool -> Text -> IO (Either Text FilePath)
+resetWorkspace pool taskId = do
+  releaseWorkspace pool taskId
+  acquireWorkspace pool taskId
+
+-- | Ensure the integration worktree exists and is on the base branch.
+ensureIntegrationWorktree :: WorkspacePool -> IO (Either Text FilePath)
+ensureIntegrationWorktree pool = do
+  let cfg = wpConfig pool
+      workspace = Core.integrationDir cfg
+      repoRoot = Core.pcRepoRoot cfg
+      baseBranch = Core.pcBaseBranch cfg
+      branch = baseBranch <> "-integration"
+
+  exists <- Git.worktreeExists workspace
+  if exists
+    then do
+      -- Make sure it's on the right branch
+      cur <- Git.currentBranch workspace
+      case cur of
+        Just b | b == branch || b == baseBranch -> pure (Right workspace)
+        _ -> do
+          result <- Git.checkoutBranch workspace baseBranch
+          case result of
+            Right () -> pure (Right workspace)
+            Left e -> pure (Left ("Integration worktree on wrong branch: " <> T.pack (show e)))
+    else do
+      hasBranch <- Git.branchExists repoRoot branch
+      result <-
+        if hasBranch
+          then Git.createWorktreeExisting repoRoot workspace branch
+          else Git.createWorktree repoRoot workspace branch baseBranch
+      case result of
+        Left e -> pure (Left ("Failed to create integration worktree: " <> T.pack (show e)))
+        Right () -> pure (Right workspace)
+
+-- | Startup recovery: scan for stale worktrees and return their task IDs.
+-- Does NOT clean them up — the caller decides what to do.
+findStaleWorktrees :: WorkspacePool -> IO [Text]
+findStaleWorktrees pool = do
+  let root = Core.pcRoot (wpConfig pool)
+  exists <- Dir.doesDirectoryExist root
+  if not exists
+    then pure []
+    else do
+      entries <- Dir.listDirectory root
+      let taskDirs = filter (\e -> e /= "integration" && e /= "state.db") entries
+      pure (map T.pack taskDirs)