commit 47412d56314053830834a51999facd59e549f363
Author: Ben Sima <ben@bensima.com>
Date: Wed Dec 31 16:02:54 2025
Add Telegram Orchestrator module for streaming task progress
New Omni/Agent/Telegram/Orchestrator.hs module that:
- Runs coder/reviewer loop as async Haskell thread
- Creates a status message that gets edited in-place with progress
- Sends final result as discrete message when complete
- Calls pi-code.sh and pi-review.sh as subprocesses
This enables non-blocking task execution where the main chat thread
continues flowing while tasks run in background.
Next: integrate with work_on_task tool in Omni/Agent/Tools/Tasks.hs
Task-Id: t-298.2
diff --git a/Omni/Agent/Telegram/Orchestrator.hs b/Omni/Agent/Telegram/Orchestrator.hs
index 465551b1..1f42ecf2 100644
--- a/Omni/Agent/Telegram/Orchestrator.hs
+++ b/Omni/Agent/Telegram/Orchestrator.hs
@@ -1,29 +1,36 @@
+{-# LANGUAGE DeriveAnyClass #-}
{-# LANGUAGE DeriveGeneric #-}
+{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE NoImplicitPrelude #-}
--- | Orchestrator subprocess management for Telegram bot.
+-- | Orchestrator for running coder/reviewer loop with Telegram status updates.
--
--- Handles spawning pi-orchestrate.sh and reporting progress to users.
+-- Spawns as an async thread that:
+-- 1. Creates a status message in Telegram
+-- 2. Runs pi-code.sh and pi-review.sh as subprocesses
+-- 3. Updates the status message as phases complete
+-- 4. Sends final result as a new message
+--
+-- The main chat thread remains unblocked during execution.
--
-- : out omni-agent-telegram-orchestrator
-- : dep aeson
--- : dep process
+-- : dep async
+-- : dep http-conduit
module Omni.Agent.Telegram.Orchestrator
- ( -- * Types
+ ( -- * Configuration
OrchestratorConfig (..),
- OrchestratorResult (..),
+ defaultConfig,
+
+ -- * Status
+ OrchestratorPhase (..),
+ formatPhase,
-- * Running
+ spawnOrchestrator,
runOrchestrator,
- -- * Parsing
- parseWorkOnCommand,
-
- -- * Task validation
- validateTaskForWork,
- TaskInfo (..),
-
-- * Testing
main,
test,
@@ -31,264 +38,375 @@ module Omni.Agent.Telegram.Orchestrator
where
import Alpha
+import qualified Control.Concurrent.Async as Async
import qualified Data.Aeson as Aeson
-import qualified Data.Aeson.KeyMap as KeyMap
-import qualified Data.ByteString.Lazy as BL
+import Data.Aeson ((.=), (.:))
import qualified Data.Text as Text
-import qualified Data.Text.Encoding as TE
import qualified Data.Text.IO as TextIO
+import qualified Network.HTTP.Simple as HTTP
+import qualified Omni.Agent.Telegram.Types as Types
+import qualified Omni.Task.Core as Task
import qualified Omni.Test as Test
import qualified System.Exit as Exit
-import System.IO (hClose, hGetLine, hIsEOF)
import qualified System.Process as Process
+import qualified System.Timeout as Timeout
-main :: IO ()
-main = Test.run test
-
-test :: Test.Tree
-test =
- Test.group
- "Omni.Agent.Telegram.Orchestrator"
- [ Test.unit "parseWorkOnCommand parses 'work on t-123'" <| do
- parseWorkOnCommand "work on t-123" Test.@=? Just "t-123",
- Test.unit "parseWorkOnCommand parses 'start t-456'" <| do
- parseWorkOnCommand "start t-456" Test.@=? Just "t-456",
- Test.unit "parseWorkOnCommand parses 'work on t-280.2.1'" <| do
- parseWorkOnCommand "work on t-280.2.1" Test.@=? Just "t-280.2.1",
- Test.unit "parseWorkOnCommand handles extra whitespace" <| do
- parseWorkOnCommand " work on t-999 " Test.@=? Just "t-999",
- Test.unit "parseWorkOnCommand rejects invalid input" <| do
- parseWorkOnCommand "hello world" Test.@=? Nothing,
- Test.unit "parseWorkOnCommand rejects missing task id" <| do
- parseWorkOnCommand "work on" Test.@=? Nothing,
- Test.unit "parseWorkOnCommand finds pattern in sentence" <| do
- parseWorkOnCommand "Hey can you work on t-291 please" Test.@=? Just "t-291",
- Test.unit "parseWorkOnCommand finds start pattern in sentence" <| do
- parseWorkOnCommand "Please start t-123 now" Test.@=? Just "t-123"
- ]
-
--- | Configuration for orchestrator run
+-- | Configuration for an orchestrator run
data OrchestratorConfig = OrchestratorConfig
- { -- | Task ID to work on
- orchTaskId :: Text,
- -- | Maximum iterations (default 3)
+ { orchTaskId :: Text,
+ orchChatId :: Int,
+ orchThreadId :: Maybe Int,
orchMaxIterations :: Int,
- -- | Path to pi-orchestrate.sh
- orchScriptPath :: FilePath,
- -- | Callback for progress updates
- orchOnProgress :: Text -> IO ()
+ orchCoderTimeout :: Int, -- seconds
+ orchReviewerTimeout :: Int, -- seconds
+ orchWorkDir :: FilePath
}
- deriving (Generic)
+ deriving (Show, Generic)
--- | Result of orchestrator run
-data OrchestratorResult
- = -- | Task completed successfully with commit hash
- OrchSuccess Text
- | -- | Task needs help with reason
- OrchNeedsHelp Text
- | -- | Orchestrator failed with error
- OrchFailed Text
- | -- | Task validation failed
- OrchInvalidTask Text
- deriving (Show, Eq, Generic)
+-- | Default configuration
+defaultConfig :: Text -> Int -> OrchestratorConfig
+defaultConfig taskId chatId =
+ OrchestratorConfig
+ { orchTaskId = taskId,
+ orchChatId = chatId,
+ orchThreadId = Nothing,
+ orchMaxIterations = 3,
+ orchCoderTimeout = 600,
+ orchReviewerTimeout = 300,
+ orchWorkDir = "/home/ben/omni/ava"
+ }
--- | Info about a task
-data TaskInfo = TaskInfo
- { taskTitle :: Text,
- taskStatus :: Text
- }
+-- | Current phase of the orchestrator
+data OrchestratorPhase
+ = PhaseStarting
+ | PhaseCoder Int -- iteration
+ | PhaseCoderDone Int
+ | PhaseVerifyBuild Int
+ | PhaseBuildPassed Int
+ | PhaseBuildFailed Int Text
+ | PhaseReviewer Int
+ | PhaseReviewerApproved
+ | PhaseReviewerRejected Int Text
+ | PhaseCommitting
+ | PhaseComplete Text -- commit hash
+ | PhaseFailed Text
deriving (Show, Eq, Generic)
--- | Parse "work on t-XXX" or "start t-XXX" style commands
--- Searches for the pattern anywhere in the message
--- Returns Just taskId if pattern found, Nothing otherwise
-parseWorkOnCommand :: Text -> Maybe Text
-parseWorkOnCommand input =
- let normalized = Text.toLower input
- words' = Text.words normalized
- -- Look for "work on t-XXX" pattern
- findWorkOn [] = Nothing
- findWorkOn ("work" : "on" : taskId : _) = validateTaskId taskId
- findWorkOn ("start" : taskId : _) = validateTaskId taskId
- findWorkOn (_ : rest) = findWorkOn rest
- in findWorkOn words'
- where
- validateTaskId tid
- | "t-" `Text.isPrefixOf` tid = Just tid
- | otherwise = Nothing
-
--- | Validate a task exists and is in a workable state (Open, InProgress)
-validateTaskForWork :: Text -> IO (Either Text TaskInfo)
-validateTaskForWork taskId = do
- (exitCode, stdoutStr, stderrStr) <-
- Process.readProcessWithExitCode
- "task"
- ["show", Text.unpack taskId, "--json"]
- ""
- case exitCode of
- Exit.ExitFailure _ ->
- pure (Left ("task not found: " <> taskId <> " (" <> Text.pack stderrStr <> ")"))
- Exit.ExitSuccess -> do
- case Aeson.decode (BL.fromStrict (TE.encodeUtf8 (Text.pack stdoutStr))) of
- Nothing -> pure (Left "failed to parse task json")
- Just obj -> parseTaskJson obj
- where
- parseTaskJson :: Aeson.Value -> IO (Either Text TaskInfo)
- parseTaskJson (Aeson.Object obj) = do
- let mTitle = case KeyMap.lookup "taskTitle" obj of
- Just (Aeson.String t) -> Just t
- _ -> Nothing
- mStatus = case KeyMap.lookup "taskStatus" obj of
- Just (Aeson.String s) -> Just s
- _ -> Nothing
- case (mTitle, mStatus) of
- (Just title, Just status) ->
- if isWorkableStatus status
- then pure (Right (TaskInfo title status))
- else pure (Left ("task is in non-workable state: " <> status))
- _ -> pure (Left "task json missing title or status")
- parseTaskJson _ = pure (Left "invalid task json structure")
-
- isWorkableStatus s = s `elem` ["Open", "InProgress", "Todo"]
-
--- | Run the orchestrator for a task
--- This spawns pi-orchestrate.sh and streams progress to the callback
-runOrchestrator :: OrchestratorConfig -> IO OrchestratorResult
-runOrchestrator cfg = do
- -- First validate the task
- taskResult <- validateTaskForWork (orchTaskId cfg)
- case taskResult of
- Left err -> pure (OrchInvalidTask err)
- Right taskInfo -> do
- orchOnProgress cfg ("🔧 starting work on " <> orchTaskId cfg <> ": " <> taskTitle taskInfo)
- runOrchestratorProcess cfg
-
--- | Actually run the pi-orchestrate.sh process
-runOrchestratorProcess :: OrchestratorConfig -> IO OrchestratorResult
-runOrchestratorProcess cfg = do
- let args =
- [ orchScriptPath cfg,
- Text.unpack (orchTaskId cfg),
- "--max=" <> show (orchMaxIterations cfg)
- ]
+-- | Format a phase as user-friendly status text
+formatPhase :: Text -> Int -> OrchestratorPhase -> Text
+formatPhase tid maxIter = \case
+ PhaseStarting ->
+ "🔧 " <> tid <> ": Starting orchestrator..."
+ PhaseCoder n ->
+ "🔧 " <> tid <> ": Running coder... (iteration " <> tshow n <> "/" <> tshow maxIter <> ")"
+ PhaseCoderDone n ->
+ "🔧 " <> tid <> ": Coder complete (iteration " <> tshow n <> "/" <> tshow maxIter <> ")"
+ PhaseVerifyBuild n ->
+ "🔧 " <> tid <> ": Verifying build... (iteration " <> tshow n <> "/" <> tshow maxIter <> ")"
+ PhaseBuildPassed n ->
+ "🔧 " <> tid <> ": Build passed (iteration " <> tshow n <> "/" <> tshow maxIter <> ")"
+ PhaseBuildFailed n err ->
+ "⚠️ " <> tid <> ": Build failed (iteration " <> tshow n <> "/" <> tshow maxIter <> "): " <> Text.take 100 err
+ PhaseReviewer n ->
+ "🔧 " <> tid <> ": Running reviewer... (iteration " <> tshow n <> "/" <> tshow maxIter <> ")"
+ PhaseReviewerApproved ->
+ "🔧 " <> tid <> ": Reviewer approved, committing..."
+ PhaseReviewerRejected n reason ->
+ "⚠️ " <> tid <> ": Reviewer requested changes (iteration " <> tshow n <> "/" <> tshow maxIter <> "): " <> Text.take 100 reason
+ PhaseCommitting ->
+ "🔧 " <> tid <> ": Committing changes..."
+ PhaseComplete commit ->
+ "✅ " <> tid <> " complete! Commit: " <> commit
+ PhaseFailed err ->
+ "❌ " <> tid <> " failed: " <> Text.take 200 err
- -- Create the process with pipes for stdout/stderr
- let createProc =
- (Process.proc "bash" args)
- { Process.std_out = Process.CreatePipe,
- Process.std_err = Process.CreatePipe,
- Process.cwd = Just "/home/ben/omni/ava"
- }
-
- result <-
- try @SomeException <| do
- (_, Just hout, Just herr, ph) <- Process.createProcess createProc
-
- -- MVar to collect final result
- resultVar <- newEmptyMVar
+-- | Spawn orchestrator as async thread, returns immediately
+spawnOrchestrator :: Types.TelegramConfig -> OrchestratorConfig -> IO (Async.Async ())
+spawnOrchestrator tgCfg cfg = Async.async <| do
+ -- Create initial status message
+ mMsgId <- createStatusMessage tgCfg (orchChatId cfg) (orchThreadId cfg)
+ (formatPhase (orchTaskId cfg) (orchMaxIterations cfg) PhaseStarting)
+
+ case mMsgId of
+ Nothing -> do
+ -- Failed to create status message, send error
+ _ <- sendMessage tgCfg (orchChatId cfg) (orchThreadId cfg)
+ ("❌ Failed to start orchestrator for " <> orchTaskId cfg <> ": couldn't create status message")
+ pure ()
+ Just msgId -> do
+ -- Run orchestrator with status updates
+ result <- try @SomeException (runOrchestrator tgCfg cfg msgId)
+ case result of
+ Left err -> do
+ -- Orchestrator crashed
+ updateStatusMessage tgCfg (orchChatId cfg) msgId
+ (formatPhase (orchTaskId cfg) (orchMaxIterations cfg) (PhaseFailed (tshow err)))
+ _ <- sendMessage tgCfg (orchChatId cfg) (orchThreadId cfg)
+ ("❌ " <> orchTaskId cfg <> " orchestrator crashed: " <> Text.take 200 (tshow err))
+ pure ()
+ Right _ -> pure ()
- -- Read and process stdout in a separate thread
- _ <- forkIO <| streamOutput hout (orchOnProgress cfg) resultVar
+-- | Run the orchestrator loop
+runOrchestrator :: Types.TelegramConfig -> OrchestratorConfig -> Int -> IO ()
+runOrchestrator tgCfg cfg statusMsgId = do
+ let tid = orchTaskId cfg
+ maxIter = orchMaxIterations cfg
+ updatePhase p = updateStatusMessage tgCfg (orchChatId cfg) statusMsgId
+ (formatPhase tid maxIter p)
+ sendResult msg = sendMessage tgCfg (orchChatId cfg) (orchThreadId cfg) msg
+
+ -- Run iterations
+ result <- runLoop 1
+
+ -- Send final result as new message
+ case result of
+ Left err -> do
+ updatePhase (PhaseFailed err)
+ _ <- sendResult ("❌ " <> tid <> " failed after " <> tshow maxIter <> " iterations: " <> err)
+ pure ()
+ Right commit -> do
+ updatePhase (PhaseComplete commit)
+ _ <- sendResult ("✅ " <> tid <> " complete! Commit: " <> commit)
+ -- Mark task as done
+ Task.updateTaskStatus tid Task.Done []
+ pure ()
+ where
+ runLoop :: Int -> IO (Either Text Text)
+ runLoop iteration
+ | iteration > orchMaxIterations cfg =
+ pure (Left "Max iterations exceeded")
+ | otherwise = do
+ let updatePhase' p = updateStatusMessage tgCfg (orchChatId cfg) statusMsgId
+ (formatPhase (orchTaskId cfg) (orchMaxIterations cfg) p)
+
+ -- Phase: Coder
+ updatePhase' (PhaseCoder iteration)
+ coderResult <- runCoder cfg
+
+ case coderResult of
+ Left err -> pure (Left ("Coder failed: " <> err))
+ Right () -> do
+ updatePhase' (PhaseCoderDone iteration)
+
+ -- Phase: Verify build
+ updatePhase' (PhaseVerifyBuild iteration)
+ buildResult <- runBuild cfg
+
+ case buildResult of
+ Left err -> do
+ updatePhase' (PhaseBuildFailed iteration err)
+ -- Retry with next iteration
+ runLoop (iteration + 1)
+ Right () -> do
+ updatePhase' (PhaseBuildPassed iteration)
+
+ -- Phase: Reviewer
+ updatePhase' (PhaseReviewer iteration)
+ reviewResult <- runReviewer cfg
+
+ case reviewResult of
+ Left err -> do
+ updatePhase' (PhaseReviewerRejected iteration err)
+ -- Retry with next iteration
+ runLoop (iteration + 1)
+ Right commit -> do
+ updatePhase' PhaseReviewerApproved
+ pure (Right commit)
- -- Read stderr (just collect it, don't stream)
- stderrContent <- collectOutput herr
+-- | Run the coder subprocess (pi-code.sh)
+runCoder :: OrchestratorConfig -> IO (Either Text ())
+runCoder cfg = do
+ let proc = (Process.proc "./Omni/Ide/pi-code.sh" [Text.unpack (orchTaskId cfg)])
+ { Process.cwd = Just (orchWorkDir cfg),
+ Process.std_out = Process.CreatePipe,
+ Process.std_err = Process.CreatePipe
+ }
+
+ (_, _mOut, mErr, ph) <- Process.createProcess proc
+
+ -- Wait with timeout
+ let timeoutMicros = orchCoderTimeout cfg * 1000000
+ mExit <- Timeout.timeout timeoutMicros (Process.waitForProcess ph)
+
+ case mExit of
+ Nothing -> do
+ -- Timeout - kill process
+ Process.terminateProcess ph
+ pure (Left "Coder timeout")
+ Just Exit.ExitSuccess -> pure (Right ())
+ Just (Exit.ExitFailure code) -> do
+ errOutput <- case mErr of
+ Just h -> TextIO.hGetContents h
+ Nothing -> pure ""
+ pure (Left ("Exit code " <> tshow code <> ": " <> Text.take 500 errOutput))
- -- Wait for process to complete
+-- | Run build verification (bild)
+runBuild :: OrchestratorConfig -> IO (Either Text ())
+runBuild cfg = do
+ -- Get task namespace for build target
+ allTasks <- Task.loadTasks
+ case Task.findTask (orchTaskId cfg) allTasks of
+ Nothing -> pure (Left "Task not found")
+ Just task -> do
+ let namespace = fromMaybe "Omni/Ava.hs" (Task.taskNamespace task)
+ proc = (Process.proc "bild" [Text.unpack namespace])
+ { Process.cwd = Just (orchWorkDir cfg),
+ Process.std_out = Process.CreatePipe,
+ Process.std_err = Process.CreatePipe
+ }
+
+ (_, _, mErr, ph) <- Process.createProcess proc
exitCode <- Process.waitForProcess ph
-
- -- Get the parsed result from stdout streaming
- mParsedResult <- takeMVar resultVar
-
+
case exitCode of
- Exit.ExitSuccess ->
- case mParsedResult of
- Just r -> pure r
- Nothing -> pure (OrchSuccess "completed")
- Exit.ExitFailure _ ->
- case mParsedResult of
- Just r -> pure r
- Nothing -> pure (OrchFailed (if Text.null stderrContent then "orchestrator failed" else stderrContent))
+ Exit.ExitSuccess -> pure (Right ())
+ Exit.ExitFailure code -> do
+ errOutput <- case mErr of
+ Just h -> TextIO.hGetContents h
+ Nothing -> pure ""
+ pure (Left ("Build failed (exit " <> tshow code <> "): " <> Text.take 500 errOutput))
- case result of
- Left err -> pure (OrchFailed ("exception: " <> tshow err))
- Right r -> pure r
+-- | Run the reviewer subprocess (pi-review.sh)
+-- Returns Right commitHash on approval, Left reason on rejection
+runReviewer :: OrchestratorConfig -> IO (Either Text Text)
+runReviewer cfg = do
+ let proc = (Process.proc "./Omni/Ide/pi-review.sh" [Text.unpack (orchTaskId cfg)])
+ { Process.cwd = Just (orchWorkDir cfg),
+ Process.std_out = Process.CreatePipe,
+ Process.std_err = Process.CreatePipe
+ }
+
+ (_, _mOut, mErr, ph) <- Process.createProcess proc
+
+ -- Wait with timeout
+ let timeoutMicros = orchReviewerTimeout cfg * 1000000
+ mExit <- Timeout.timeout timeoutMicros (Process.waitForProcess ph)
+
+ case mExit of
+ Nothing -> do
+ Process.terminateProcess ph
+ pure (Left "Reviewer timeout")
+ Just Exit.ExitSuccess -> do
+ -- Get commit hash from git
+ (_, Just hOut, _, ph') <- Process.createProcess
+ (Process.proc "git" ["rev-parse", "--short", "HEAD"])
+ { Process.cwd = Just (orchWorkDir cfg),
+ Process.std_out = Process.CreatePipe
+ }
+ _ <- Process.waitForProcess ph'
+ commit <- Text.strip <$> TextIO.hGetContents hOut
+ pure (Right commit)
+ Just (Exit.ExitFailure code) -> do
+ errOutput <- case mErr of
+ Just h -> TextIO.hGetContents h
+ Nothing -> pure ""
+ pure (Left ("Reviewer rejected (exit " <> tshow code <> "): " <> Text.take 500 errOutput))
--- | Stream output from a handle, parsing for progress updates
--- Puts the final result (if detected) into the MVar
-streamOutput :: Handle -> (Text -> IO ()) -> MVar (Maybe OrchestratorResult) -> IO ()
-streamOutput h onProgress resultVar = do
- mResult <- go Nothing
- putMVar resultVar mResult
- where
- go :: Maybe OrchestratorResult -> IO (Maybe OrchestratorResult)
- go currentResult = do
- eof <- hIsEOF h
- if eof
+-- | Create a status message, returns message_id
+createStatusMessage :: Types.TelegramConfig -> Int -> Maybe Int -> Text -> IO (Maybe Int)
+createStatusMessage cfg chatId mThreadId text = do
+ let url =
+ Text.unpack (Types.tgApiBaseUrl cfg)
+ <> "/bot"
+ <> Text.unpack (Types.tgBotToken cfg)
+ <> "/sendMessage"
+ body =
+ Aeson.object <|
+ [ "chat_id" .= chatId,
+ "text" .= text
+ ]
+ ++ maybe [] (\tid -> ["message_thread_id" .= tid]) mThreadId
+
+ req0 <- HTTP.parseRequest url
+ let req =
+ HTTP.setRequestMethod "POST"
+ <| HTTP.setRequestHeader "Content-Type" ["application/json"]
+ <| HTTP.setRequestBodyLBS (Aeson.encode body)
+ <| req0
+
+ result <- try @SomeException (HTTP.httpLBS req)
+ case result of
+ Left _ -> pure Nothing
+ Right response -> do
+ let status = HTTP.getResponseStatusCode response
+ if status >= 200 && status < 300
then do
- hClose h
- pure currentResult
- else do
- line <- Text.pack </ hGetLine h
- let newResult = parseLine line currentResult
- -- Send progress for interesting lines
- when (isProgressLine line) <| onProgress (formatProgress line)
- go newResult
+ -- Parse message_id from response
+ let respBody = HTTP.getResponseBody response
+ case Aeson.decode respBody of
+ Just resp -> pure (tgRespMessageId resp)
+ Nothing -> pure Nothing
+ else pure Nothing
- parseLine :: Text -> Maybe OrchestratorResult -> Maybe OrchestratorResult
- parseLine line current
- | "=== SUCCESS ===" `Text.isInfixOf` line = Just (OrchSuccess "")
- | "=== FAILURE - Needs Help ===" `Text.isInfixOf` line = Just (OrchNeedsHelp "")
- | "=== FAILURE - Max Iterations ===" `Text.isInfixOf` line = Just (OrchFailed "max iterations reached")
- | "=== FAILURE - Total Timeout ===" `Text.isInfixOf` line = Just (OrchFailed "total timeout exceeded")
- | "Commit: " `Text.isInfixOf` line =
- let commit = Text.strip (Text.drop 8 (snd (Text.breakOn "Commit: " line)))
- in case current of
- Just (OrchSuccess _) -> Just (OrchSuccess commit)
- _ -> current
- | "requires human intervention" `Text.isInfixOf` line =
- Just (OrchNeedsHelp "requires human intervention")
- | otherwise = current
+-- | Telegram API response for sendMessage
+data TelegramResponse = TelegramResponse
+ { tgRespOk :: Bool,
+ tgRespMessageId :: Maybe Int
+ }
+ deriving (Show, Generic)
+
+instance Aeson.FromJSON TelegramResponse where
+ parseJSON = Aeson.withObject "TelegramResponse" <| \v -> do
+ ok <- v .: "ok"
+ mResult <- v Aeson..:? "result"
+ msgId <- case mResult of
+ Just res -> res Aeson..:? "message_id"
+ Nothing -> pure Nothing
+ pure TelegramResponse {tgRespOk = ok, tgRespMessageId = msgId}
- isProgressLine :: Text -> Bool
- isProgressLine line =
- any
- (`Text.isInfixOf` line)
- [ "=== Iteration",
- "Phase: CODER",
- "Phase: REVIEWER",
- "=== SUCCESS",
- "=== FAILURE",
- "Commit:",
- "timed out after"
- ]
+-- | Update an existing status message
+updateStatusMessage :: Types.TelegramConfig -> Int -> Int -> Text -> IO ()
+updateStatusMessage cfg chatId messageId text = do
+ let url =
+ Text.unpack (Types.tgApiBaseUrl cfg)
+ <> "/bot"
+ <> Text.unpack (Types.tgBotToken cfg)
+ <> "/editMessageText"
+ body =
+ Aeson.object
+ [ "chat_id" .= chatId,
+ "message_id" .= messageId,
+ "text" .= text
+ ]
+
+ req0 <- HTTP.parseRequest url
+ let req =
+ HTTP.setRequestMethod "POST"
+ <| HTTP.setRequestHeader "Content-Type" ["application/json"]
+ <| HTTP.setRequestBodyLBS (Aeson.encode body)
+ <| req0
+
+ result <- try @SomeException (HTTP.httpLBS req)
+ case result of
+ Left err -> putText <| "Edit message failed: " <> tshow err
+ Right response -> do
+ let status = HTTP.getResponseStatusCode response
+ when (status < 200 || status >= 300) <| do
+ putText <| "Edit message HTTP " <> tshow status
- formatProgress :: Text -> Text
- formatProgress line
- | "=== Iteration" `Text.isInfixOf` line =
- let iterNum = extractIterationNum line
- in "🔄 iteration " <> iterNum
- | "Phase: CODER" `Text.isInfixOf` line = "💻 coder running..."
- | "Phase: REVIEWER" `Text.isInfixOf` line = "🔍 reviewer checking..."
- | "=== SUCCESS ===" `Text.isInfixOf` line = "✅ done!"
- | "=== FAILURE" `Text.isInfixOf` line = "❌ failed"
- | "Commit:" `Text.isInfixOf` line =
- let commit = Text.strip (Text.drop 8 (snd (Text.breakOn "Commit: " line)))
- in "📝 commit: " <> commit
- | "timed out after" `Text.isInfixOf` line = "⏰ timeout!"
- | otherwise = line
+-- | Send a new message
+sendMessage :: Types.TelegramConfig -> Int -> Maybe Int -> Text -> IO (Maybe Int)
+sendMessage = createStatusMessage -- Same implementation
- extractIterationNum :: Text -> Text
- extractIterationNum line =
- -- Format is "=== Iteration 1/3 ==="
- let parts = Text.words line
- in case drop 1 parts of
- (numPart : _) -> numPart
- _ -> "?"
+-- | Main for testing
+main :: IO ()
+main = Test.run test
--- | Collect all output from a handle into Text
-collectOutput :: Handle -> IO Text
-collectOutput h = do
- content <- try @SomeException (TextIO.hGetContents h)
- case content of
- Left _ -> pure ""
- Right t -> pure t
+-- | Tests
+test :: Test.Tree
+test =
+ Test.group
+ "Omni.Agent.Telegram.Orchestrator"
+ [ Test.unit "formatPhase Starting" <| do
+ formatPhase "t-123" 3 PhaseStarting
+ Test.@?= "🔧 t-123: Starting orchestrator...",
+ Test.unit "formatPhase Coder" <| do
+ formatPhase "t-123" 3 (PhaseCoder 2)
+ Test.@?= "🔧 t-123: Running coder... (iteration 2/3)",
+ Test.unit "formatPhase Complete" <| do
+ formatPhase "t-123" 3 (PhaseComplete "abc1234")
+ Test.@?= "✅ t-123 complete! Commit: abc1234",
+ Test.unit "formatPhase Failed" <| do
+ formatPhase "t-123" 3 (PhaseFailed "some error")
+ Test.@?= "❌ t-123 failed: some error"
+ ]