commit 8d596006a5fc46b8b9cc7cdd5aee8416eedb567a
Author: Ben Sima <ben@bensima.com>
Date: Tue Dec 30 18:18:03 2025
Omni/Agent/Telegram.hs: Add 'work on <task-id>' command to Ava
Automated via pi-review.
Task-Id: t-280.2.1
diff --git a/Omni/Agent/Telegram.hs b/Omni/Agent/Telegram.hs
index f83fa48a..680ec15c 100644
--- a/Omni/Agent/Telegram.hs
+++ b/Omni/Agent/Telegram.hs
@@ -97,6 +97,7 @@ import qualified Omni.Agent.Telegram.Actions as Actions
import qualified Omni.Agent.Telegram.IncomingQueue as IncomingQueue
import qualified Omni.Agent.Telegram.Media as Media
import qualified Omni.Agent.Telegram.Messages as Messages
+import qualified Omni.Agent.Telegram.Orchestrator as Orchestrator
import qualified Omni.Agent.Telegram.Reminders as Reminders
import qualified Omni.Agent.Telegram.Types as Types
import qualified Omni.Agent.Tools as Tools
@@ -772,9 +773,12 @@ handleAuthorizedMessage tgConfig provider engineCfg msg uid userName chatId = do
let msgText = Types.tmText msg
threadId = Types.tmThreadId msg
- cmdHandled <- handleOutreachCommand tgConfig chatId threadId msgText
- when cmdHandled (pure ())
- unless cmdHandled <| handleAuthorizedMessageContinued tgConfig provider engineCfg msg uid userName chatId
+
+ -- Check for special commands before LLM processing
+ outreachHandled <- handleOutreachCommand tgConfig chatId threadId msgText
+ workOnHandled <- if outreachHandled then pure True else handleWorkOnCommand chatId threadId userName msgText
+
+ unless workOnHandled <| handleAuthorizedMessageContinued tgConfig provider engineCfg msg uid userName chatId
handleAuthorizedMessageContinued ::
Types.TelegramConfig ->
@@ -954,6 +958,25 @@ handleAuthorizedMessageBatch ::
handleAuthorizedMessageBatch tgConfig provider engineCfg msg uid userName chatId batchedText = do
unless (Types.isGroupChat msg) <| Reminders.recordUserChat uid chatId
+ let threadId = Types.tmThreadId msg
+
+ -- Check for special commands before LLM processing
+ outreachHandled <- handleOutreachCommand tgConfig chatId threadId batchedText
+ workOnHandled <- if outreachHandled then pure True else handleWorkOnCommand chatId threadId userName batchedText
+
+ unless workOnHandled <| handleAuthorizedMessageBatchContinued tgConfig provider engineCfg msg uid userName chatId batchedText
+
+handleAuthorizedMessageBatchContinued ::
+ Types.TelegramConfig ->
+ Provider.Provider ->
+ Engine.EngineConfig ->
+ Types.TelegramMessage ->
+ Text ->
+ Text ->
+ Int ->
+ Text ->
+ IO ()
+handleAuthorizedMessageBatchContinued tgConfig provider engineCfg msg uid userName chatId batchedText = do
pdfContent <- case Types.tmDocument msg of
Just doc | Types.isPdf doc -> do
putText <| "Processing PDF: " <> fromMaybe "(unnamed)" (Types.tdFileName doc)
@@ -1669,3 +1692,51 @@ formatDraftForReview draft =
"",
"reply `/approve " <> Outreach.draftId draft <> "` or `/reject " <> Outreach.draftId draft <> " [reason]`"
]
+
+-- | Handle "work on t-XXX" commands to run the orchestrator
+-- Only available to authorized users (Ben)
+handleWorkOnCommand :: Int -> Maybe Int -> Text -> Text -> IO Bool
+handleWorkOnCommand chatId mThreadId userName msgText =
+ case Orchestrator.parseWorkOnCommand msgText of
+ Nothing -> pure False
+ Just taskId -> do
+ if not (isBenAuthorized userName)
+ then do
+ _ <- Messages.enqueueImmediate Nothing chatId mThreadId "sorry, only ben can run the orchestrator" (Just "system") Nothing
+ pure True
+ else do
+ putText <| "Starting orchestrator for task: " <> taskId
+ -- Run orchestrator in background so we don't block the message loop
+ _ <- forkIO <| runOrchestratorForChat chatId mThreadId taskId
+ pure True
+
+-- | Run the orchestrator and report results to chat
+runOrchestratorForChat :: Int -> Maybe Int -> Text -> IO ()
+runOrchestratorForChat chatId mThreadId taskId = do
+ let scriptPath = "Omni/Ide/pi-orchestrate.sh"
+ onProgress msg = do
+ putText <| "Orchestrator progress: " <> msg
+ void <| Messages.enqueueImmediate Nothing chatId mThreadId msg (Just "orchestrator") Nothing
+
+ cfg =
+ Orchestrator.OrchestratorConfig
+ { Orchestrator.orchTaskId = taskId,
+ Orchestrator.orchMaxIterations = 3,
+ Orchestrator.orchScriptPath = scriptPath,
+ Orchestrator.orchOnProgress = onProgress
+ }
+
+ result <- Orchestrator.runOrchestrator cfg
+
+ let finalMsg = case result of
+ Orchestrator.OrchSuccess commit ->
+ "✅ done! " <> taskId <> " completed.\ncommit: " <> commit
+ Orchestrator.OrchNeedsHelp reason ->
+ "❌ " <> taskId <> " needs help: " <> reason
+ Orchestrator.OrchFailed err ->
+ "❌ orchestrator failed: " <> err
+ Orchestrator.OrchInvalidTask err ->
+ "⚠️ " <> err
+
+ _ <- Messages.enqueueImmediate Nothing chatId mThreadId finalMsg (Just "orchestrator") Nothing
+ pure ()
diff --git a/Omni/Agent/Telegram/Orchestrator.hs b/Omni/Agent/Telegram/Orchestrator.hs
new file mode 100644
index 00000000..39a3839a
--- /dev/null
+++ b/Omni/Agent/Telegram/Orchestrator.hs
@@ -0,0 +1,283 @@
+{-# LANGUAGE DeriveGeneric #-}
+{-# LANGUAGE OverloadedStrings #-}
+{-# LANGUAGE NoImplicitPrelude #-}
+
+-- | Orchestrator subprocess management for Telegram bot.
+--
+-- Handles spawning pi-orchestrate.sh and reporting progress to users.
+--
+-- : out omni-agent-telegram-orchestrator
+-- : dep aeson
+-- : dep process
+module Omni.Agent.Telegram.Orchestrator
+ ( -- * Types
+ OrchestratorConfig (..),
+ OrchestratorResult (..),
+
+ -- * Running
+ runOrchestrator,
+
+ -- * Parsing
+ parseWorkOnCommand,
+
+ -- * Task validation
+ validateTaskForWork,
+ TaskInfo (..),
+
+ -- * Testing
+ main,
+ test,
+ )
+where
+
+import Alpha
+import qualified Data.Aeson as Aeson
+import qualified Data.Aeson.KeyMap as KeyMap
+import qualified Data.ByteString.Lazy as BL
+import qualified Data.Text as Text
+import qualified Data.Text.Encoding as TE
+import qualified Data.Text.IO as TextIO
+import qualified Omni.Test as Test
+import qualified System.Exit as Exit
+import System.IO (hClose, hGetLine, hIsEOF)
+import qualified System.Process as Process
+
+main :: IO ()
+main = Test.run test
+
+test :: Test.Tree
+test =
+ Test.group
+ "Omni.Agent.Telegram.Orchestrator"
+ [ Test.unit "parseWorkOnCommand parses 'work on t-123'" <| do
+ parseWorkOnCommand "work on t-123" Test.@=? Just "t-123",
+ Test.unit "parseWorkOnCommand parses 'start t-456'" <| do
+ parseWorkOnCommand "start t-456" Test.@=? Just "t-456",
+ Test.unit "parseWorkOnCommand parses 'work on t-280.2.1'" <| do
+ parseWorkOnCommand "work on t-280.2.1" Test.@=? Just "t-280.2.1",
+ Test.unit "parseWorkOnCommand handles extra whitespace" <| do
+ parseWorkOnCommand " work on t-999 " Test.@=? Just "t-999",
+ Test.unit "parseWorkOnCommand rejects invalid input" <| do
+ parseWorkOnCommand "hello world" Test.@=? Nothing,
+ Test.unit "parseWorkOnCommand rejects missing task id" <| do
+ parseWorkOnCommand "work on" Test.@=? Nothing
+ ]
+
+-- | Configuration for orchestrator run
+data OrchestratorConfig = OrchestratorConfig
+ { -- | Task ID to work on
+ orchTaskId :: Text,
+ -- | Maximum iterations (default 3)
+ orchMaxIterations :: Int,
+ -- | Path to pi-orchestrate.sh
+ orchScriptPath :: FilePath,
+ -- | Callback for progress updates
+ orchOnProgress :: Text -> IO ()
+ }
+ deriving (Generic)
+
+-- | Result of orchestrator run
+data OrchestratorResult
+ = -- | Task completed successfully with commit hash
+ OrchSuccess Text
+ | -- | Task needs help with reason
+ OrchNeedsHelp Text
+ | -- | Orchestrator failed with error
+ OrchFailed Text
+ | -- | Task validation failed
+ OrchInvalidTask Text
+ deriving (Show, Eq, Generic)
+
+-- | Info about a task
+data TaskInfo = TaskInfo
+ { taskTitle :: Text,
+ taskStatus :: Text
+ }
+ deriving (Show, Eq, Generic)
+
+-- | Parse "work on t-XXX" or "start t-XXX" style commands
+-- Returns Just taskId if pattern matches, Nothing otherwise
+parseWorkOnCommand :: Text -> Maybe Text
+parseWorkOnCommand input =
+ let normalized = Text.strip (Text.toLower input)
+ words' = Text.words normalized
+ in case words' of
+ ["work", "on", taskId] -> validateTaskId taskId
+ ["start", taskId] -> validateTaskId taskId
+ _ -> Nothing
+ where
+ validateTaskId tid
+ | "t-" `Text.isPrefixOf` tid = Just tid
+ | otherwise = Nothing
+
+-- | Validate a task exists and is in a workable state (Open, InProgress)
+validateTaskForWork :: Text -> IO (Either Text TaskInfo)
+validateTaskForWork taskId = do
+ (exitCode, stdoutStr, stderrStr) <-
+ Process.readProcessWithExitCode
+ "task"
+ ["show", Text.unpack taskId, "--json"]
+ ""
+ case exitCode of
+ Exit.ExitFailure _ ->
+ pure (Left ("task not found: " <> taskId <> " (" <> Text.pack stderrStr <> ")"))
+ Exit.ExitSuccess -> do
+ case Aeson.decode (BL.fromStrict (TE.encodeUtf8 (Text.pack stdoutStr))) of
+ Nothing -> pure (Left "failed to parse task json")
+ Just obj -> parseTaskJson obj
+ where
+ parseTaskJson :: Aeson.Value -> IO (Either Text TaskInfo)
+ parseTaskJson (Aeson.Object obj) = do
+ let mTitle = case KeyMap.lookup "taskTitle" obj of
+ Just (Aeson.String t) -> Just t
+ _ -> Nothing
+ mStatus = case KeyMap.lookup "taskStatus" obj of
+ Just (Aeson.String s) -> Just s
+ _ -> Nothing
+ case (mTitle, mStatus) of
+ (Just title, Just status) ->
+ if isWorkableStatus status
+ then pure (Right (TaskInfo title status))
+ else pure (Left ("task is in non-workable state: " <> status))
+ _ -> pure (Left "task json missing title or status")
+ parseTaskJson _ = pure (Left "invalid task json structure")
+
+ isWorkableStatus s = s `elem` ["Open", "InProgress", "Todo"]
+
+-- | Run the orchestrator for a task
+-- This spawns pi-orchestrate.sh and streams progress to the callback
+runOrchestrator :: OrchestratorConfig -> IO OrchestratorResult
+runOrchestrator cfg = do
+ -- First validate the task
+ taskResult <- validateTaskForWork (orchTaskId cfg)
+ case taskResult of
+ Left err -> pure (OrchInvalidTask err)
+ Right taskInfo -> do
+ orchOnProgress cfg ("🔧 starting work on " <> orchTaskId cfg <> ": " <> taskTitle taskInfo)
+ runOrchestratorProcess cfg
+
+-- | Actually run the pi-orchestrate.sh process
+runOrchestratorProcess :: OrchestratorConfig -> IO OrchestratorResult
+runOrchestratorProcess cfg = do
+ let args =
+ [ orchScriptPath cfg,
+ Text.unpack (orchTaskId cfg),
+ "--max=" <> show (orchMaxIterations cfg)
+ ]
+
+ -- Create the process with pipes for stdout/stderr
+ let createProc =
+ (Process.proc "bash" args)
+ { Process.std_out = Process.CreatePipe,
+ Process.std_err = Process.CreatePipe
+ }
+
+ result <-
+ try @SomeException <| do
+ (_, Just hout, Just herr, ph) <- Process.createProcess createProc
+
+ -- MVar to collect final result
+ resultVar <- newEmptyMVar
+
+ -- Read and process stdout in a separate thread
+ _ <- forkIO <| streamOutput hout (orchOnProgress cfg) resultVar
+
+ -- Read stderr (just collect it, don't stream)
+ stderrContent <- collectOutput herr
+
+ -- Wait for process to complete
+ exitCode <- Process.waitForProcess ph
+
+ -- Get the parsed result from stdout streaming
+ mParsedResult <- takeMVar resultVar
+
+ case exitCode of
+ Exit.ExitSuccess ->
+ case mParsedResult of
+ Just r -> pure r
+ Nothing -> pure (OrchSuccess "completed")
+ Exit.ExitFailure _ ->
+ case mParsedResult of
+ Just r -> pure r
+ Nothing -> pure (OrchFailed (if Text.null stderrContent then "orchestrator failed" else stderrContent))
+
+ case result of
+ Left err -> pure (OrchFailed ("exception: " <> tshow err))
+ Right r -> pure r
+
+-- | Stream output from a handle, parsing for progress updates
+-- Puts the final result (if detected) into the MVar
+streamOutput :: Handle -> (Text -> IO ()) -> MVar (Maybe OrchestratorResult) -> IO ()
+streamOutput h onProgress resultVar = do
+ mResult <- go Nothing
+ putMVar resultVar mResult
+ where
+ go :: Maybe OrchestratorResult -> IO (Maybe OrchestratorResult)
+ go currentResult = do
+ eof <- hIsEOF h
+ if eof
+ then do
+ hClose h
+ pure currentResult
+ else do
+ line <- Text.pack </ hGetLine h
+ let newResult = parseLine line currentResult
+ -- Send progress for interesting lines
+ when (isProgressLine line) <| onProgress (formatProgress line)
+ go newResult
+
+ parseLine :: Text -> Maybe OrchestratorResult -> Maybe OrchestratorResult
+ parseLine line current
+ | "=== SUCCESS ===" `Text.isInfixOf` line = Just (OrchSuccess "")
+ | "=== FAILURE - Needs Help ===" `Text.isInfixOf` line = Just (OrchNeedsHelp "")
+ | "=== FAILURE - Max Iterations ===" `Text.isInfixOf` line = Just (OrchFailed "max iterations reached")
+ | "Commit: " `Text.isInfixOf` line =
+ let commit = Text.strip (Text.drop 8 (snd (Text.breakOn "Commit: " line)))
+ in case current of
+ Just (OrchSuccess _) -> Just (OrchSuccess commit)
+ _ -> current
+ | "requires human intervention" `Text.isInfixOf` line =
+ Just (OrchNeedsHelp "requires human intervention")
+ | otherwise = current
+
+ isProgressLine :: Text -> Bool
+ isProgressLine line =
+ any
+ (`Text.isInfixOf` line)
+ [ "=== Iteration",
+ "Phase: CODER",
+ "Phase: REVIEWER",
+ "=== SUCCESS",
+ "=== FAILURE",
+ "Commit:"
+ ]
+
+ formatProgress :: Text -> Text
+ formatProgress line
+ | "=== Iteration" `Text.isInfixOf` line =
+ let iterNum = extractIterationNum line
+ in "🔄 iteration " <> iterNum
+ | "Phase: CODER" `Text.isInfixOf` line = "💻 coder running..."
+ | "Phase: REVIEWER" `Text.isInfixOf` line = "🔍 reviewer checking..."
+ | "=== SUCCESS ===" `Text.isInfixOf` line = "✅ done!"
+ | "=== FAILURE" `Text.isInfixOf` line = "❌ failed"
+ | "Commit:" `Text.isInfixOf` line =
+ let commit = Text.strip (Text.drop 8 (snd (Text.breakOn "Commit: " line)))
+ in "📝 commit: " <> commit
+ | otherwise = line
+
+ extractIterationNum :: Text -> Text
+ extractIterationNum line =
+ -- Format is "=== Iteration 1/3 ==="
+ let parts = Text.words line
+ in case drop 1 parts of
+ (numPart : _) -> numPart
+ _ -> "?"
+
+-- | Collect all output from a handle into Text
+collectOutput :: Handle -> IO Text
+collectOutput h = do
+ content <- try @SomeException (TextIO.hGetContents h)
+ case content of
+ Left _ -> pure ""
+ Right t -> pure t