Rewrite the orchestrator integration so that: 1. Orchestrator runs as an async Haskell thread (not detached shell script) 2. Progress updates are streamed to a single editable Telegram message 3. Final result is sent as a discrete message in the conversation flow 4. Main chat thread remains unblocked during orchestrator execution
User: "work on t-296"
Ava: "on it, starting the coder/reviewer loop for t-296"
[Status message appears, gets edited in-place as work progresses:]
"🔧 t-296: Starting coder... (iteration 1/3)"
"🔧 t-296: Coder complete, verifying build..."
"🔧 t-296: Build passed, running reviewer..."
"🔧 t-296: Reviewer approved, committing..."
[Final discrete message when complete:]
Ava: "✅ t-296 complete! Fixed the day rollover bug. Commit: a97dbbd"
[Conversation continues normally - user could chat during the above]
Long-running orchestrator must NOT block the main chat thread. User should be able to continue chatting with Ava while tasks run in background.
New module with types and orchestrator logic:
data OrchestratorConfig = OrchestratorConfig
{ orchTaskId :: Text
, orchChatId :: Int
, orchThreadId :: Maybe Int -- Telegram message thread
, orchMaxIterations :: Int -- default 3
, orchCoderTimeout :: Int -- seconds, default 600
, orchReviewerTimeout :: Int -- seconds, default 300
}
data OrchestratorPhase
= PhaseStarting
| PhaseCoder Int -- iteration number
| PhaseVerifyBuild Int
| PhaseReviewer Int
| PhaseCommitting
| PhaseComplete Text -- commit hash
| PhaseFailed Text -- error message
-- Spawn orchestrator as async thread, returns immediately
spawnOrchestrator :: Types.TelegramConfig -> OrchestratorConfig -> IO ThreadId
-- Format phase as user-friendly status text
formatStatus :: Text -> OrchestratorPhase -> Text
-- e.g. "🔧 t-296: Running coder... (iteration 1/3)"
-- Create initial status message, returns message_id for later edits
createStatusMessage :: Types.TelegramConfig -> Int -> Maybe Int -> Text -> IO (Maybe Int)
-- Update existing status message (edit in place)
updateStatusMessage :: Types.TelegramConfig -> Int -> Int -> Text -> IO ()
-- Send final result as NEW message (appears in conversation flow)
sendResultMessage :: Types.TelegramConfig -> Int -> Maybe Int -> Text -> IO ()
runOrchestrator :: Types.TelegramConfig -> OrchestratorConfig -> Int -> IO ()
runOrchestrator tgCfg cfg statusMsgId = do
let tid = orchTaskId cfg
chatId = orchChatId cfg
update phase = updateStatusMessage tgCfg chatId statusMsgId (formatStatus tid phase)
result <- runLoop 1
case result of
Left err -> sendResultMessage tgCfg chatId (orchThreadId cfg) ("❌ " <> tid <> " failed: " <> err)
Right commit -> sendResultMessage tgCfg chatId (orchThreadId cfg) ("✅ " <> tid <> " complete! Commit: " <> commit)
where
runLoop iteration
| iteration > orchMaxIterations cfg = pure (Left "Max iterations exceeded")
| otherwise = do
update (PhaseCoder iteration)
coderResult <- runCoderSubprocess cfg
case coderResult of
Left err -> pure (Left err)
Right () -> do
update (PhaseVerifyBuild iteration)
buildResult <- runBuildVerify cfg
case buildResult of
Left err -> runLoop (iteration + 1) -- retry
Right () -> do
update (PhaseReviewer iteration)
reviewResult <- runReviewerSubprocess cfg
case reviewResult of
ReviewerApproved commit -> pure (Right commit)
ReviewerRejected reason -> runLoop (iteration + 1)
Run existing shell scripts via System.Process:
runCoderSubprocess :: OrchestratorConfig -> IO (Either Text ())
runCoderSubprocess cfg = do
let args = [Text.unpack (orchTaskId cfg)]
proc = (Process.proc "./Omni/Ide/pi-code.sh" args)
{ Process.cwd = Just "/home/ben/omni/ava"
, Process.std_out = Process.CreatePipe
, Process.std_err = Process.CreatePipe
}
(_, _, mErr, ph) <- Process.createProcess proc
exitCode <- timeout (orchCoderTimeout cfg * 1000000) (Process.waitForProcess ph)
case exitCode of
Nothing -> pure (Left "Coder timeout")
Just ExitSuccess -> pure (Right ())
Just (ExitFailure _) -> Left <$> maybe (pure "Unknown error") hGetContents mErr
Modify Omni/Agent/Tools/Tasks.hs:
-- Need to pass TelegramConfig and chat context to the tool
executeWorkOnTask :: Types.TelegramConfig -> Int -> Maybe Int -> Aeson.Value -> IO Aeson.Value
executeWorkOnTask tgCfg chatId mThreadId v = do
case Aeson.fromJSON v of
Aeson.Error e -> pure $ Aeson.object ["error" .= e]
Aeson.Success args -> do
let tid = taskId args
-- Validate task exists and is workable (existing code)
...
-- Create status message
mStatusMsgId <- createStatusMessage tgCfg chatId mThreadId
("🔧 " <> tid <> ": Starting orchestrator...")
case mStatusMsgId of
Nothing -> pure $ Aeson.object ["error" .= "Failed to create status message"]
Just statusMsgId -> do
-- Spawn async orchestrator
let config = OrchestratorConfig tid chatId mThreadId 3 600 300
_ <- forkIO $ runOrchestrator tgCfg config statusMsgId
-- Return immediately (non-blocking)
pure $ Aeson.object
[ "success" .= True
, "task_id" .= tid
, "message" .= ("Orchestrator started for " <> tid <> ". I'll update you on progress.")
]
Omni/Agent/Telegram.editMessage - already exists for editing messagesOmni/Agent/Telegram.sendMessage - for final resultOmni/Agent/Telegram/Types.TelegramConfig - config with bot tokenOmni/Task/Core.hs - task loading/validationOmni/Ide/pi-code.sh, pi-review.sh - keep using these for actual workforkIO to spawn orchestrator thread1. CREATE Omni/Agent/Telegram/Orchestrator.hs - new module
2. MODIFY Omni/Agent/Tools/Tasks.hs - update work_on_task
3. MODIFY Omni/Agent/Telegram.hs - may need to export editMessage or add helpers
1. Unit tests for formatStatus 2. Test subprocess timeout handling 3. Manual E2E: send "work on t-XXX" via Telegram, verify:
Current state (2025-12-31): work_on_task spawns pi-orchestrate.sh as detached process. The orchestrator runs but there's no feedback to Ava/Telegram until manual inspection. editMessage function exists in Omni/Agent/Telegram.hs (line 415). Ready for implementation.
Implementation complete. Created Omni/Agent/Telegram/Orchestrator.hs and integrated with work_on_task tool. Ready for E2E testing via Telegram.
Requires architectural decision on IPC mechanism. Options: (1) pass chat_id to orchestrator and have it call Telegram API directly, (2) status file polling, (3) unix socket or named pipe. Deferring for now - the orchestrator logs are sufficient for debugging.