Stream orchestrator progress to Telegram

t-298.2 · WorkTask · Omni/Agent/Tools/Tasks.hs
Parent: t-298 · Created 1 month ago · Updated 1 month ago

Description

Overview

Rewrite the orchestrator integration so that:

1. The orchestrator runs as an async Haskell thread (not a detached shell script)
2. Progress updates are streamed to a single editable Telegram message
3. The final result is sent as a discrete message in the conversation flow
4. The main chat thread remains unblocked during orchestrator execution

UX Flow

User: "work on t-296"
Ava: "on it, starting the coder/reviewer loop for t-296"

[Status message appears, gets edited in-place as work progresses:]
"🔧 t-296: Starting coder... (iteration 1/3)"
"🔧 t-296: Coder complete, verifying build..."
"🔧 t-296: Build passed, running reviewer..."
"🔧 t-296: Reviewer approved, committing..."

[Final discrete message when complete:]
Ava: "✅ t-296 complete! Fixed the day rollover bug. Commit: a97dbbd"

[Conversation continues normally - user could chat during the above]

Key Constraint

Long-running orchestrator must NOT block the main chat thread. User should be able to continue chatting with Ava while tasks run in background.

Implementation Plan

1. Create Omni/Agent/Telegram/Orchestrator.hs

New module with types and orchestrator logic:

data OrchestratorConfig = OrchestratorConfig
  { orchTaskId :: Text
  , orchChatId :: Int
  , orchThreadId :: Maybe Int  -- Telegram message thread
  , orchMaxIterations :: Int   -- default 3
  , orchCoderTimeout :: Int    -- seconds, default 600
  , orchReviewerTimeout :: Int -- seconds, default 300
  }
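
Since the defaults live only in field comments, a smart constructor keeps call sites from hard-coding positional magic numbers (as in `OrchestratorConfig tid chatId mThreadId 3 600 300` further down). A sketch, with the record repeated so it compiles standalone; the name `defaultConfig` is mine, not from the ticket:

```haskell
{-# LANGUAGE OverloadedStrings #-}
import Data.Text (Text)

data OrchestratorConfig = OrchestratorConfig
  { orchTaskId :: Text
  , orchChatId :: Int
  , orchThreadId :: Maybe Int
  , orchMaxIterations :: Int
  , orchCoderTimeout :: Int
  , orchReviewerTimeout :: Int
  }

-- Fill in the documented defaults (3 iterations, 600s coder timeout,
-- 300s reviewer timeout); callers override via record-update syntax.
defaultConfig :: Text -> Int -> Maybe Int -> OrchestratorConfig
defaultConfig taskId chatId mThreadId = OrchestratorConfig
  { orchTaskId = taskId
  , orchChatId = chatId
  , orchThreadId = mThreadId
  , orchMaxIterations = 3
  , orchCoderTimeout = 600
  , orchReviewerTimeout = 300
  }
```

A caller needing a longer loop would write `(defaultConfig tid chatId mThreadId) { orchMaxIterations = 5 }`.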

data OrchestratorPhase
  = PhaseStarting
  | PhaseCoder Int           -- iteration number  
  | PhaseVerifyBuild Int
  | PhaseReviewer Int
  | PhaseCommitting
  | PhaseComplete Text       -- commit hash
  | PhaseFailed Text         -- error message

-- Spawn orchestrator as async thread, returns immediately
spawnOrchestrator :: Types.TelegramConfig -> OrchestratorConfig -> IO ThreadId

-- Format phase as user-friendly status text
formatStatus :: Text -> OrchestratorPhase -> Text
-- e.g. "🔧 t-296: Running coder... (iteration 1/3)"
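
A minimal sketch of `formatStatus` (types repeated for self-containment). Note the "/3" in the example implies the max-iteration count, which the proposed signature doesn't carry; it is hard-coded here as an assumption of the sketch, and the real function may want the config as an extra argument:

```haskell
{-# LANGUAGE OverloadedStrings #-}
import Data.Text (Text)
import qualified Data.Text as Text

data OrchestratorPhase
  = PhaseStarting
  | PhaseCoder Int
  | PhaseVerifyBuild Int
  | PhaseReviewer Int
  | PhaseCommitting
  | PhaseComplete Text
  | PhaseFailed Text

-- Render a phase as the user-facing status line shown in the UX flow.
formatStatus :: Text -> OrchestratorPhase -> Text
formatStatus tid phase = "🔧 " <> tid <> ": " <> body phase
  where
    -- "/3" hard-codes orchMaxIterations (assumption of this sketch)
    iters n = " (iteration " <> Text.pack (show n) <> "/3)"
    body PhaseStarting        = "Starting orchestrator..."
    body (PhaseCoder n)       = "Running coder..." <> iters n
    body (PhaseVerifyBuild n) = "Coder complete, verifying build..." <> iters n
    body (PhaseReviewer n)    = "Build passed, running reviewer..." <> iters n
    body PhaseCommitting      = "Reviewer approved, committing..."
    body (PhaseComplete c)    = "Complete! Commit: " <> c
    body (PhaseFailed e)      = "Failed: " <> e
```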

2. Status Message Management

-- Create initial status message, returns message_id for later edits
createStatusMessage :: Types.TelegramConfig -> Int -> Maybe Int -> Text -> IO (Maybe Int)

-- Update existing status message (edit in place)
updateStatusMessage :: Types.TelegramConfig -> Int -> Int -> Text -> IO ()

-- Send final result as NEW message (appears in conversation flow)
sendResultMessage :: Types.TelegramConfig -> Int -> Maybe Int -> Text -> IO ()
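
The signatures above leave open how the `message_id` is recovered: Telegram's Bot API `sendMessage` responds with `{"ok":true,"result":{"message_id":N,...}}`, and `createStatusMessage` would pull out that `N` to hand to later edit calls. A sketch of the pure parsing half, assuming `aeson`; `extractMessageId` is my name, not from the ticket, and is split out so it can be unit-tested without the network:

```haskell
{-# LANGUAGE OverloadedStrings #-}
import qualified Data.Aeson as Aeson
import qualified Data.Aeson.Types as Aeson
import Data.Aeson ((.:))

-- Pull result.message_id out of a Telegram sendMessage response.
-- Returns Nothing on any shape mismatch, which createStatusMessage
-- would surface as a failure to the caller.
extractMessageId :: Aeson.Value -> Maybe Int
extractMessageId = Aeson.parseMaybe $ Aeson.withObject "response" $ \o -> do
  result <- o .: "result"
  result .: "message_id"
```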

3. Orchestrator Loop

runOrchestrator :: Types.TelegramConfig -> OrchestratorConfig -> Int -> IO ()
runOrchestrator tgCfg cfg statusMsgId = do
  update PhaseStarting
  result <- runLoop 1
  case result of
    Left err -> sendResultMessage tgCfg chatId (orchThreadId cfg) ("❌ " <> tid <> " failed: " <> err)
    Right commit -> sendResultMessage tgCfg chatId (orchThreadId cfg) ("✅ " <> tid <> " complete! Commit: " <> commit)
  where
    -- where-bound so runLoop can see them (a let inside the do block
    -- would not be in scope here)
    tid = orchTaskId cfg
    chatId = orchChatId cfg
    update phase = updateStatusMessage tgCfg chatId statusMsgId (formatStatus tid phase)
    runLoop iteration
      | iteration > orchMaxIterations cfg = pure (Left "Max iterations exceeded")
      | otherwise = do
          update (PhaseCoder iteration)
          coderResult <- runCoderSubprocess cfg
          case coderResult of
            Left err -> pure (Left err)
            Right () -> do
              update (PhaseVerifyBuild iteration)
              buildResult <- runBuildVerify cfg
              case buildResult of
                Left _buildErr -> runLoop (iteration + 1)  -- build broke: send back to coder
                Right () -> do
                  update (PhaseReviewer iteration)
                  reviewResult <- runReviewerSubprocess cfg
                  case reviewResult of
                    ReviewerApproved commit -> pure (Right commit)
                    ReviewerRejected _reason -> runLoop (iteration + 1)  -- rework with feedback

4. Subprocess Execution

Run existing shell scripts via System.Process:

runCoderSubprocess :: OrchestratorConfig -> IO (Either Text ())
runCoderSubprocess cfg = do
  let args = [Text.unpack (orchTaskId cfg)]
      procSpec = (Process.proc "./Omni/Ide/pi-code.sh" args)
        { Process.cwd = Just "/home/ben/omni/ava"
        , Process.std_out = Process.CreatePipe
        , Process.std_err = Process.CreatePipe
        }
  -- NOTE: with std_out = CreatePipe the child can block once the pipe
  -- buffer fills; the real implementation should drain or redirect stdout.
  (_, _, mErr, ph) <- Process.createProcess procSpec
  mExit <- timeout (orchCoderTimeout cfg * 1000000) (Process.waitForProcess ph)
  case mExit of
    Nothing -> do
      Process.terminateProcess ph  -- don't leak the process on timeout
      pure (Left "Coder timeout")
    Just ExitSuccess -> pure (Right ())
    Just (ExitFailure _) ->
      -- Data.Text.IO.hGetContents, since we return Text
      Left <$> maybe (pure "Unknown error") Text.IO.hGetContents mErr

5. Update work_on_task Tool

Modify Omni/Agent/Tools/Tasks.hs:

-- Need to pass TelegramConfig and chat context to the tool
executeWorkOnTask :: Types.TelegramConfig -> Int -> Maybe Int -> Aeson.Value -> IO Aeson.Value
executeWorkOnTask tgCfg chatId mThreadId v = do
  case Aeson.fromJSON v of
    Aeson.Error e -> pure $ Aeson.object ["error" .= e]
    Aeson.Success args -> do
      let tid = taskId args
      -- Validate task exists and is workable (existing code)
      ...
      
      -- Create status message
      mStatusMsgId <- createStatusMessage tgCfg chatId mThreadId 
        ("🔧 " <> tid <> ": Starting orchestrator...")
      
      case mStatusMsgId of
        Nothing -> pure $ Aeson.object ["error" .= ("Failed to create status message" :: Text)]
        Just statusMsgId -> do
          -- Spawn async orchestrator
          let config = OrchestratorConfig tid chatId mThreadId 3 600 300
          _ <- forkIO $ runOrchestrator tgCfg config statusMsgId
          
          -- Return immediately (non-blocking)
          pure $ Aeson.object
            [ "success" .= True
            , "task_id" .= tid
            , "message" .= ("Orchestrator started for " <> tid <> ". I'll update you on progress.")
            ]

Existing Code to Reuse

  • Omni/Agent/Telegram.editMessage - already exists for editing messages
  • Omni/Agent/Telegram.sendMessage - for final result
  • Omni/Agent/Telegram/Types.TelegramConfig - config with bot token
  • Omni/Task/Core.hs - task loading/validation
  • Omni/Ide/pi-code.sh, pi-review.sh - keep using these for actual work

Threading Considerations

  • Use forkIO to spawn orchestrator thread
  • Orchestrator thread has its own exception handling
  • If orchestrator crashes, send error message to chat
  • Consider tracking active orchestrators (Map TaskId ThreadId) for 'status' command later
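
The last three bullets combine naturally via `forkFinally`, which runs a cleanup whether the thread finished or crashed. A minimal self-contained sketch; `spawnTracked` and the `onCrash` callback are my names (in the real module `onCrash` would `sendResultMessage` the error to the chat), and the registry uses `String` keys here only to stay dependency-light:

```haskell
import Control.Concurrent
import Control.Exception
import Data.Map.Strict (Map)
import qualified Data.Map.Strict as Map

type Registry = MVar (Map String ThreadId)

-- Fork the orchestrator, registering it under its task id for a later
-- 'status' command. forkFinally guarantees the cleanup runs on both
-- normal exit and exceptions: deregister, then report a crash if any.
spawnTracked :: Registry -> String -> IO () -> (SomeException -> IO ()) -> IO ThreadId
spawnTracked registry taskId action onCrash = forkFinally body cleanup
  where
    body = do
      me <- myThreadId
      modifyMVar_ registry (pure . Map.insert taskId me)
      action
    cleanup result = do
      modifyMVar_ registry (pure . Map.delete taskId)
      case result of
        Left err -> onCrash err
        Right () -> pure ()
```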

Files to Create/Modify

1. CREATE Omni/Agent/Telegram/Orchestrator.hs - new module
2. MODIFY Omni/Agent/Tools/Tasks.hs - update work_on_task
3. MODIFY Omni/Agent/Telegram.hs - may need to export editMessage or add helpers

Testing Strategy

1. Unit tests for formatStatus
2. Test subprocess timeout handling
3. Manual E2E: send "work on t-XXX" via Telegram, verify:

  • Status message appears and updates
  • Can chat with Ava during execution
  • Final result appears as new message
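
Point 2 hinges on `System.Timeout` semantics: `timeout` takes microseconds and yields `Nothing` once the budget expires, `Just` the result otherwise, which is exactly what the subprocess wrappers branch on. A tiny check (the `timesOut` helper is mine, for test readability):

```haskell
import Control.Concurrent (threadDelay)
import System.Timeout (timeout)

-- True when the action fails to finish within the budget (in MICROseconds).
timesOut :: Int -> IO () -> IO Bool
timesOut budgetMicros action = do
  r <- timeout budgetMicros action
  pure (r == Nothing)
```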

Timeline (8)

💬 [human] · 1 month ago

Requires architectural decision on IPC mechanism. Options: (1) pass chat_id to orchestrator and have it call Telegram API directly, (2) status file polling, (3) unix socket or named pipe. Deferring for now - the orchestrator logs are sufficient for debugging.

💬 [human] · 1 month ago

Current state (2025-12-31): work_on_task spawns pi-orchestrate.sh as detached process. The orchestrator runs but there's no feedback to Ava/Telegram until manual inspection. editMessage function exists in Omni/Agent/Telegram.hs (line 415). Ready for implementation.

🔄 [human] Open → InProgress · 1 month ago
💬 [human] · 1 month ago

Implementation complete. Created Omni/Agent/Telegram/Orchestrator.hs and integrated with work_on_task tool. Ready for E2E testing via Telegram.

🔄 [human] InProgress → Done · 1 month ago