Create Parallel Interpreter with Async

t-369.5·WorkTask·
·
·
·Omni/Agent.hs
Parent:t-369·Created1 month ago·Updated1 month ago

Dependencies

Description

Edit

Create Omni/Agent/Interpreter/Parallel.hs - interpreter that runs Par/Race with actual parallelism.

Context

The Sequential interpreter runs Par branches one at a time. This interpreter uses the async library to run them concurrently.

Read Omni/Agent/ARCHITECTURE.md for full design rationale.

Deliverables

Create Omni/Agent/Interpreter/Parallel.hs containing:

1. Parallel Configuration

data ParConfig = ParConfig
  { parProvider :: Provider
  , parTools :: Map Text Tool
  , parOnEvent :: Event -> IO ()
  , parMaxConcurrency :: Maybe Int  -- limit concurrent branches (Nothing = unlimited)
  }

defaultParConfig :: Provider -> [Tool] -> ParConfig

2. Main Interpreter Function

runParallel :: ParConfig -> s -> Op s a -> IO (Either Text (a, Trace, s))

3. Parallel Interpretation

The key difference from Sequential is handling Par and Race:

Free (Par ops k) -> do
  -- Emit fork event
  branchIds <- traverse (\_ -> newTraceId) ops
  let forkEvent = EventFork (length ops) branchIds <timestamp>
  ...
  
  -- Run all branches concurrently
  asyncs <- traverse (\(op, bid) -> 
    async (runParallel config { currentTraceId = bid } state op)) 
    (zip ops branchIds)
  
  -- Wait for all to complete
  results <- traverse wait asyncs
  
  -- Emit join event
  let joinEvent = EventJoin branchIds <timestamp>
  ...
  
  -- Handle results
  case sequence (map fst results) of
    Left err -> pure (Left err)
    Right rs -> do
      let (as, traces, states) = unzip3 rs
      -- Merge traces (preserving branch structure)
      let mergedTrace = mergeTraces traces
      -- Merge states (this is where CRDT matters!)
      let mergedState = mergeStates states  -- need Semigroup s
      interpret config mergedState (trace <> mergedTrace) (k as)

Free (Race ops k) -> do
  -- Run all branches concurrently
  asyncs <- traverse (\op -> async (runParallel config state op)) ops
  
  -- Wait for first to complete
  (winnerAsync, result) <- waitAny asyncs
  
  -- Cancel the losers
  traverse_ cancel (filter (/= winnerAsync) asyncs)
  
  case result of
    Left err -> pure (Left err)
    Right (a, opTrace, newState) -> do
      interpret config newState (trace <> opTrace) (k a)

4. State Merging

For parallel branches, we need to merge state at join points:

-- Require Semigroup for mergeable state
runParallelWith :: (Semigroup s) => ParConfig -> s -> Op s a -> IO (Either Text (a, Trace, s))

-- Or provide explicit merge function
runParallelWithMerge :: (s -> s -> s) -> ParConfig -> s -> Op s a -> IO (Either Text (a, Trace, s))

5. Concurrency Limiting

If parMaxConcurrency is set, use a semaphore:

-- Don't start more than N branches at once
withConcurrencyLimit :: Maybe Int -> [IO a] -> IO [a]
withConcurrencyLimit Nothing ios = traverse id ios  -- no limit
withConcurrencyLimit (Just n) ios = do
  sem <- newQSem n
  traverse (\io -> bracket_ (waitQSem sem) (signalQSem sem) io) ios

6. Cost Aggregation

Track cost across all parallel branches:

-- Shared cost counter
type CostRef = IORef Double

runWithSharedCost :: CostRef -> Budget -> Op s a -> IO (Either Exhausted (a, Trace, s))

When any branch exceeds budget, cancel all branches.

Dependencies

-- : dep async
-- : dep stm  

Notes

  • async for parallel execution
  • Proper cancellation on Race (don't leak threads)
  • State merging requires Semigroup (or explicit merge function)
  • Cost tracking across branches needs shared state (IORef or TVar)
  • Trace merging should preserve branch structure

Testing

  • Par actually runs concurrently (timing test)
  • Race cancels losers
  • Cost budget shared across branches
  • State merges correctly at join
  • Trace captures fork/join events
  • No thread leaks

Files to Read First

  • Omni/Agent/ARCHITECTURE.md
  • Omni/Agent/Op.hs
  • Omni/Agent/Interpreter/Sequential.hs (base to extend)
  • Control.Concurrent.Async documentation

Timeline (2)

🔄[human]Open → InProgress1 month ago
🔄[human]InProgress → Done1 month ago