Create Omni/Agent/Interpreter/Parallel.hs - interpreter that runs Par/Race with actual parallelism.
The Sequential interpreter runs Par branches one at a time. This interpreter uses the async library to run them concurrently.
Read Omni/Agent/ARCHITECTURE.md for full design rationale.
Create Omni/Agent/Interpreter/Parallel.hs containing:
data ParConfig = ParConfig
{ parProvider :: Provider
, parTools :: Map Text Tool
, parOnEvent :: Event -> IO ()
, parMaxConcurrency :: Maybe Int -- limit concurrent branches (Nothing = unlimited)
}
defaultParConfig :: Provider -> [Tool] -> ParConfig
runParallel :: ParConfig -> s -> Op s a -> IO (Either Text (a, Trace, s))
The key difference from Sequential is handling Par and Race:
Free (Par ops k) -> do
-- Emit fork event
branchIds <- traverse (\_ -> newTraceId) ops
let forkEvent = EventFork (length ops) branchIds <timestamp>
...
-- Run all branches concurrently
asyncs <- traverse (\(op, bid) ->
async (runParallel config { currentTraceId = bid } state op))
(zip ops branchIds)
-- Wait for all to complete
results <- traverse wait asyncs
-- Emit join event
let joinEvent = EventJoin branchIds <timestamp>
...
-- Handle results
case sequence (map fst results) of
Left err -> pure (Left err)
Right rs -> do
let (as, traces, states) = unzip3 rs
-- Merge traces (preserving branch structure)
let mergedTrace = mergeTraces traces
-- Merge states (this is where CRDT matters!)
let mergedState = mergeStates states -- need Semigroup s
interpret config mergedState (trace <> mergedTrace) (k as)
Free (Race ops k) -> do
-- Run all branches concurrently
asyncs <- traverse (\op -> async (runParallel config state op)) ops
-- Wait for first to complete
(winnerAsync, result) <- waitAny asyncs
-- Cancel the losers
traverse_ cancel (filter (/= winnerAsync) asyncs)
case result of
Left err -> pure (Left err)
Right (a, opTrace, newState) -> do
interpret config newState (trace <> opTrace) (k a)
For parallel branches, we need to merge state at join points:
-- Require Semigroup for mergeable state
runParallelWith :: (Semigroup s) => ParConfig -> s -> Op s a -> IO (Either Text (a, Trace, s))
-- Or provide explicit merge function
runParallelWithMerge :: (s -> s -> s) -> ParConfig -> s -> Op s a -> IO (Either Text (a, Trace, s))
If parMaxConcurrency is set, use a semaphore:
-- Don't start more than N branches at once
withConcurrencyLimit :: Maybe Int -> [IO a] -> IO [a]
withConcurrencyLimit Nothing ios = traverse id ios -- no limit
withConcurrencyLimit (Just n) ios = do
sem <- newQSem n
traverse (\io -> bracket_ (waitQSem sem) (signalQSem sem) io) ios
Track cost across all parallel branches:
-- Shared cost counter
type CostRef = IORef Double
runWithSharedCost :: CostRef -> Budget -> Op s a -> IO (Either Exhausted (a, Trace, s))
When any branch exceeds budget, cancel all branches.
-- : dep async
-- : dep stm