← Back to task

Commit 8d596006

commit 8d596006a5fc46b8b9cc7cdd5aee8416eedb567a
Author: Ben Sima <ben@bensima.com>
Date:   Tue Dec 30 18:18:03 2025

    Omni/Agent/Telegram.hs: Add 'work on <task-id>' command to Ava
    
    Automated via pi-review.
    
    Task-Id: t-280.2.1

diff --git a/Omni/Agent/Telegram.hs b/Omni/Agent/Telegram.hs
index f83fa48a..680ec15c 100644
--- a/Omni/Agent/Telegram.hs
+++ b/Omni/Agent/Telegram.hs
@@ -97,6 +97,7 @@ import qualified Omni.Agent.Telegram.Actions as Actions
 import qualified Omni.Agent.Telegram.IncomingQueue as IncomingQueue
 import qualified Omni.Agent.Telegram.Media as Media
 import qualified Omni.Agent.Telegram.Messages as Messages
+import qualified Omni.Agent.Telegram.Orchestrator as Orchestrator
 import qualified Omni.Agent.Telegram.Reminders as Reminders
 import qualified Omni.Agent.Telegram.Types as Types
 import qualified Omni.Agent.Tools as Tools
@@ -772,9 +773,12 @@ handleAuthorizedMessage tgConfig provider engineCfg msg uid userName chatId = do
 
   let msgText = Types.tmText msg
       threadId = Types.tmThreadId msg
-  cmdHandled <- handleOutreachCommand tgConfig chatId threadId msgText
-  when cmdHandled (pure ())
-  unless cmdHandled <| handleAuthorizedMessageContinued tgConfig provider engineCfg msg uid userName chatId
+
+  -- Check for special commands before LLM processing
+  outreachHandled <- handleOutreachCommand tgConfig chatId threadId msgText
+  workOnHandled <- if outreachHandled then pure True else handleWorkOnCommand chatId threadId userName msgText
+
+  unless workOnHandled <| handleAuthorizedMessageContinued tgConfig provider engineCfg msg uid userName chatId
 
 handleAuthorizedMessageContinued ::
   Types.TelegramConfig ->
@@ -954,6 +958,25 @@ handleAuthorizedMessageBatch ::
 handleAuthorizedMessageBatch tgConfig provider engineCfg msg uid userName chatId batchedText = do
   unless (Types.isGroupChat msg) <| Reminders.recordUserChat uid chatId
 
+  let threadId = Types.tmThreadId msg
+
+  -- Check for special commands before LLM processing
+  outreachHandled <- handleOutreachCommand tgConfig chatId threadId batchedText
+  workOnHandled <- if outreachHandled then pure True else handleWorkOnCommand chatId threadId userName batchedText
+
+  unless workOnHandled <| handleAuthorizedMessageBatchContinued tgConfig provider engineCfg msg uid userName chatId batchedText
+
+handleAuthorizedMessageBatchContinued ::
+  Types.TelegramConfig ->
+  Provider.Provider ->
+  Engine.EngineConfig ->
+  Types.TelegramMessage ->
+  Text ->
+  Text ->
+  Int ->
+  Text ->
+  IO ()
+handleAuthorizedMessageBatchContinued tgConfig provider engineCfg msg uid userName chatId batchedText = do
   pdfContent <- case Types.tmDocument msg of
     Just doc | Types.isPdf doc -> do
       putText <| "Processing PDF: " <> fromMaybe "(unnamed)" (Types.tdFileName doc)
@@ -1669,3 +1692,51 @@ formatDraftForReview draft =
       "",
       "reply `/approve " <> Outreach.draftId draft <> "` or `/reject " <> Outreach.draftId draft <> " [reason]`"
     ]
+
+-- | Handle "work on t-XXX" commands to run the orchestrator
+-- Only available to authorized users (Ben)
+handleWorkOnCommand :: Int -> Maybe Int -> Text -> Text -> IO Bool
+handleWorkOnCommand chatId mThreadId userName msgText =
+  case Orchestrator.parseWorkOnCommand msgText of
+    Nothing -> pure False
+    Just taskId -> do
+      if not (isBenAuthorized userName)
+        then do
+          _ <- Messages.enqueueImmediate Nothing chatId mThreadId "sorry, only ben can run the orchestrator" (Just "system") Nothing
+          pure True
+        else do
+          putText <| "Starting orchestrator for task: " <> taskId
+          -- Run orchestrator in background so we don't block the message loop
+          _ <- forkIO <| runOrchestratorForChat chatId mThreadId taskId
+          pure True
+
+-- | Run the orchestrator and report results to chat
+runOrchestratorForChat :: Int -> Maybe Int -> Text -> IO ()
+runOrchestratorForChat chatId mThreadId taskId = do
+  let scriptPath = "Omni/Ide/pi-orchestrate.sh"
+      onProgress msg = do
+        putText <| "Orchestrator progress: " <> msg
+        void <| Messages.enqueueImmediate Nothing chatId mThreadId msg (Just "orchestrator") Nothing
+
+      cfg =
+        Orchestrator.OrchestratorConfig
+          { Orchestrator.orchTaskId = taskId,
+            Orchestrator.orchMaxIterations = 3,
+            Orchestrator.orchScriptPath = scriptPath,
+            Orchestrator.orchOnProgress = onProgress
+          }
+
+  result <- Orchestrator.runOrchestrator cfg
+
+  let finalMsg = case result of
+        Orchestrator.OrchSuccess commit ->
+          "✅ done! " <> taskId <> " completed.\ncommit: " <> commit
+        Orchestrator.OrchNeedsHelp reason ->
+          "❌ " <> taskId <> " needs help: " <> reason
+        Orchestrator.OrchFailed err ->
+          "❌ orchestrator failed: " <> err
+        Orchestrator.OrchInvalidTask err ->
+          "⚠️ " <> err
+
+  _ <- Messages.enqueueImmediate Nothing chatId mThreadId finalMsg (Just "orchestrator") Nothing
+  pure ()
diff --git a/Omni/Agent/Telegram/Orchestrator.hs b/Omni/Agent/Telegram/Orchestrator.hs
new file mode 100644
index 00000000..39a3839a
--- /dev/null
+++ b/Omni/Agent/Telegram/Orchestrator.hs
@@ -0,0 +1,283 @@
+{-# LANGUAGE DeriveGeneric #-}
+{-# LANGUAGE OverloadedStrings #-}
+{-# LANGUAGE NoImplicitPrelude #-}
+
+-- | Orchestrator subprocess management for Telegram bot.
+--
+-- Handles spawning pi-orchestrate.sh and reporting progress to users.
+--
+-- : out omni-agent-telegram-orchestrator
+-- : dep aeson
+-- : dep process
+module Omni.Agent.Telegram.Orchestrator
+  ( -- * Types
+    OrchestratorConfig (..),
+    OrchestratorResult (..),
+
+    -- * Running
+    runOrchestrator,
+
+    -- * Parsing
+    parseWorkOnCommand,
+
+    -- * Task validation
+    validateTaskForWork,
+    TaskInfo (..),
+
+    -- * Testing
+    main,
+    test,
+  )
+where
+
+import Alpha
+import qualified Data.Aeson as Aeson
+import qualified Data.Aeson.KeyMap as KeyMap
+import qualified Data.ByteString.Lazy as BL
+import qualified Data.Text as Text
+import qualified Data.Text.Encoding as TE
+import qualified Data.Text.IO as TextIO
+import qualified Omni.Test as Test
+import qualified System.Exit as Exit
+import System.IO (hClose, hGetLine, hIsEOF)
+import qualified System.Process as Process
+
+main :: IO ()
+main = Test.run test
+
+test :: Test.Tree
+test =
+  Test.group
+    "Omni.Agent.Telegram.Orchestrator"
+    [ Test.unit "parseWorkOnCommand parses 'work on t-123'" <| do
+        parseWorkOnCommand "work on t-123" Test.@=? Just "t-123",
+      Test.unit "parseWorkOnCommand parses 'start t-456'" <| do
+        parseWorkOnCommand "start t-456" Test.@=? Just "t-456",
+      Test.unit "parseWorkOnCommand parses 'work on t-280.2.1'" <| do
+        parseWorkOnCommand "work on t-280.2.1" Test.@=? Just "t-280.2.1",
+      Test.unit "parseWorkOnCommand handles extra whitespace" <| do
+        parseWorkOnCommand "  work on   t-999  " Test.@=? Just "t-999",
+      Test.unit "parseWorkOnCommand rejects invalid input" <| do
+        parseWorkOnCommand "hello world" Test.@=? Nothing,
+      Test.unit "parseWorkOnCommand rejects missing task id" <| do
+        parseWorkOnCommand "work on" Test.@=? Nothing
+    ]
+
+-- | Configuration for orchestrator run
+data OrchestratorConfig = OrchestratorConfig
+  { -- | Task ID to work on
+    orchTaskId :: Text,
+    -- | Maximum iterations (default 3)
+    orchMaxIterations :: Int,
+    -- | Path to pi-orchestrate.sh
+    orchScriptPath :: FilePath,
+    -- | Callback for progress updates
+    orchOnProgress :: Text -> IO ()
+  }
+  deriving (Generic)
+
+-- | Result of orchestrator run
+data OrchestratorResult
+  = -- | Task completed successfully with commit hash
+    OrchSuccess Text
+  | -- | Task needs help with reason
+    OrchNeedsHelp Text
+  | -- | Orchestrator failed with error
+    OrchFailed Text
+  | -- | Task validation failed
+    OrchInvalidTask Text
+  deriving (Show, Eq, Generic)
+
+-- | Info about a task
+data TaskInfo = TaskInfo
+  { taskTitle :: Text,
+    taskStatus :: Text
+  }
+  deriving (Show, Eq, Generic)
+
+-- | Parse "work on t-XXX" or "start t-XXX" style commands
+-- Returns Just taskId if pattern matches, Nothing otherwise
+parseWorkOnCommand :: Text -> Maybe Text
+parseWorkOnCommand input =
+  let normalized = Text.strip (Text.toLower input)
+      words' = Text.words normalized
+   in case words' of
+        ["work", "on", taskId] -> validateTaskId taskId
+        ["start", taskId] -> validateTaskId taskId
+        _ -> Nothing
+  where
+    validateTaskId tid
+      | "t-" `Text.isPrefixOf` tid = Just tid
+      | otherwise = Nothing
+
+-- | Validate a task exists and is in a workable state (Open, InProgress)
+validateTaskForWork :: Text -> IO (Either Text TaskInfo)
+validateTaskForWork taskId = do
+  (exitCode, stdoutStr, stderrStr) <-
+    Process.readProcessWithExitCode
+      "task"
+      ["show", Text.unpack taskId, "--json"]
+      ""
+  case exitCode of
+    Exit.ExitFailure _ ->
+      pure (Left ("task not found: " <> taskId <> " (" <> Text.pack stderrStr <> ")"))
+    Exit.ExitSuccess -> do
+      case Aeson.decode (BL.fromStrict (TE.encodeUtf8 (Text.pack stdoutStr))) of
+        Nothing -> pure (Left "failed to parse task json")
+        Just obj -> parseTaskJson obj
+  where
+    parseTaskJson :: Aeson.Value -> IO (Either Text TaskInfo)
+    parseTaskJson (Aeson.Object obj) = do
+      let mTitle = case KeyMap.lookup "taskTitle" obj of
+            Just (Aeson.String t) -> Just t
+            _ -> Nothing
+          mStatus = case KeyMap.lookup "taskStatus" obj of
+            Just (Aeson.String s) -> Just s
+            _ -> Nothing
+      case (mTitle, mStatus) of
+        (Just title, Just status) ->
+          if isWorkableStatus status
+            then pure (Right (TaskInfo title status))
+            else pure (Left ("task is in non-workable state: " <> status))
+        _ -> pure (Left "task json missing title or status")
+    parseTaskJson _ = pure (Left "invalid task json structure")
+
+    isWorkableStatus s = s `elem` ["Open", "InProgress", "Todo"]
+
+-- | Run the orchestrator for a task
+-- This spawns pi-orchestrate.sh and streams progress to the callback
+runOrchestrator :: OrchestratorConfig -> IO OrchestratorResult
+runOrchestrator cfg = do
+  -- First validate the task
+  taskResult <- validateTaskForWork (orchTaskId cfg)
+  case taskResult of
+    Left err -> pure (OrchInvalidTask err)
+    Right taskInfo -> do
+      orchOnProgress cfg ("🔧 starting work on " <> orchTaskId cfg <> ": " <> taskTitle taskInfo)
+      runOrchestratorProcess cfg
+
+-- | Actually run the pi-orchestrate.sh process
+runOrchestratorProcess :: OrchestratorConfig -> IO OrchestratorResult
+runOrchestratorProcess cfg = do
+  let args =
+        [ orchScriptPath cfg,
+          Text.unpack (orchTaskId cfg),
+          "--max=" <> show (orchMaxIterations cfg)
+        ]
+
+  -- Create the process with pipes for stdout/stderr
+  let createProc =
+        (Process.proc "bash" args)
+          { Process.std_out = Process.CreatePipe,
+            Process.std_err = Process.CreatePipe
+          }
+
+  result <-
+    try @SomeException <| do
+      (_, Just hout, Just herr, ph) <- Process.createProcess createProc
+
+      -- MVar to collect final result
+      resultVar <- newEmptyMVar
+
+      -- Read and process stdout in a separate thread
+      _ <- forkIO <| streamOutput hout (orchOnProgress cfg) resultVar
+
+      -- Read stderr (just collect it, don't stream)
+      stderrContent <- collectOutput herr
+
+      -- Wait for process to complete
+      exitCode <- Process.waitForProcess ph
+
+      -- Get the parsed result from stdout streaming
+      mParsedResult <- takeMVar resultVar
+
+      case exitCode of
+        Exit.ExitSuccess ->
+          case mParsedResult of
+            Just r -> pure r
+            Nothing -> pure (OrchSuccess "completed")
+        Exit.ExitFailure _ ->
+          case mParsedResult of
+            Just r -> pure r
+            Nothing -> pure (OrchFailed (if Text.null stderrContent then "orchestrator failed" else stderrContent))
+
+  case result of
+    Left err -> pure (OrchFailed ("exception: " <> tshow err))
+    Right r -> pure r
+
+-- | Stream output from a handle, parsing for progress updates
+-- Puts the final result (if detected) into the MVar
+streamOutput :: Handle -> (Text -> IO ()) -> MVar (Maybe OrchestratorResult) -> IO ()
+streamOutput h onProgress resultVar = do
+  mResult <- go Nothing
+  putMVar resultVar mResult
+  where
+    go :: Maybe OrchestratorResult -> IO (Maybe OrchestratorResult)
+    go currentResult = do
+      eof <- hIsEOF h
+      if eof
+        then do
+          hClose h
+          pure currentResult
+        else do
+          line <- Text.pack </ hGetLine h
+          let newResult = parseLine line currentResult
+          -- Send progress for interesting lines
+          when (isProgressLine line) <| onProgress (formatProgress line)
+          go newResult
+
+    parseLine :: Text -> Maybe OrchestratorResult -> Maybe OrchestratorResult
+    parseLine line current
+      | "=== SUCCESS ===" `Text.isInfixOf` line = Just (OrchSuccess "")
+      | "=== FAILURE - Needs Help ===" `Text.isInfixOf` line = Just (OrchNeedsHelp "")
+      | "=== FAILURE - Max Iterations ===" `Text.isInfixOf` line = Just (OrchFailed "max iterations reached")
+      | "Commit: " `Text.isInfixOf` line =
+          let commit = Text.strip (Text.drop 8 (snd (Text.breakOn "Commit: " line)))
+           in case current of
+                Just (OrchSuccess _) -> Just (OrchSuccess commit)
+                _ -> current
+      | "requires human intervention" `Text.isInfixOf` line =
+          Just (OrchNeedsHelp "requires human intervention")
+      | otherwise = current
+
+    isProgressLine :: Text -> Bool
+    isProgressLine line =
+      any
+        (`Text.isInfixOf` line)
+        [ "=== Iteration",
+          "Phase: CODER",
+          "Phase: REVIEWER",
+          "=== SUCCESS",
+          "=== FAILURE",
+          "Commit:"
+        ]
+
+    formatProgress :: Text -> Text
+    formatProgress line
+      | "=== Iteration" `Text.isInfixOf` line =
+          let iterNum = extractIterationNum line
+           in "🔄 iteration " <> iterNum
+      | "Phase: CODER" `Text.isInfixOf` line = "💻 coder running..."
+      | "Phase: REVIEWER" `Text.isInfixOf` line = "🔍 reviewer checking..."
+      | "=== SUCCESS ===" `Text.isInfixOf` line = "✅ done!"
+      | "=== FAILURE" `Text.isInfixOf` line = "❌ failed"
+      | "Commit:" `Text.isInfixOf` line =
+          let commit = Text.strip (Text.drop 8 (snd (Text.breakOn "Commit: " line)))
+           in "📝 commit: " <> commit
+      | otherwise = line
+
+    extractIterationNum :: Text -> Text
+    extractIterationNum line =
+      -- Format is "=== Iteration 1/3 ==="
+      let parts = Text.words line
+       in case drop 1 parts of
+            (numPart : _) -> numPart
+            _ -> "?"
+
+-- | Collect all output from a handle into Text
+collectOutput :: Handle -> IO Text
+collectOutput h = do
+  content <- try @SomeException (TextIO.hGetContents h)
+  case content of
+    Left _ -> pure ""
+    Right t -> pure t