{- |
Example program demonstrating parallel composition with 'Op'.
To validate the Op design, this non-trivial program uses 'par' for parallel
web research, exercising the parallel interpreter and the CRDT state layer.
-}
module Omni.Agent.Programs.Research where
import Omni.Agent.Op
import Omni.Agent.State.CRDT
-- | Research every topic concurrently, then synthesize the combined
-- findings into a single 'Summary'.
research :: [Topic] -> Op ResearchState Summary
research topics = do
  emit (EventCustom "research_start" (toJSON topics))
  checkpoint "init"
  -- Fan out: one parallel branch per topic
  topicResults <- par (map researchTopic topics)
  checkpoint "topics-complete"
  -- Fan in: gather facts and sources from every branch
  let facts   = concatMap trFacts topicResults
      sources = concatMap trSources topicResults
  emit (EventCustom "synthesis_start" (toJSON (length facts)))
  answer <- infer synthesisModel (synthesisPrompt facts sources)
  emit (EventCustom "research_complete" (toJSON answer))
  pure (Summary answer facts sources)
-- | Research a single topic: run a web search, analyze the top hits in
-- parallel, and record the discovered sources and facts in shared state.
researchTopic :: Topic -> Op ResearchState TopicResult
researchTopic topic = do
  emit (EventCustom "topic_start" (toJSON topic))
  -- Search for sources
  searchResults <- tool "web_search" (toJSON topic)
  let urls = parseSearchResults searchResults
  -- Record every discovered source in the grow-only set
  modify (\s -> s { rsSources = rsSources s <> GSet.fromList urls })
  -- Fetch and analyze the top sources in parallel
  analyses <- par [ analyzeSource url | url <- take maxSourcesPerTopic urls ]
  -- Extract facts from the analyses
  let facts = concatMap saFacts analyses
  -- Record each fact under its topic; kept as one 'modify' per fact so
  -- the LWW map observes every individual write
  forM_ facts (\fact ->
    modify (\s -> s { rsFacts = addFact topic fact (rsFacts s) }))
  emit (EventCustom "topic_complete" (toJSON topic))
  pure TopicResult
    { trTopic = topic
    , trSources = urls
    , trFacts = facts
    }
  where
    -- Cap on sources fetched per topic; previously an inline magic number.
    maxSourcesPerTopic :: Int
    maxSourcesPerTopic = 5
-- | Fetch one URL and have the model extract its facts and a summary.
analyzeSource :: URL -> Op ResearchState SourceAnalysis
analyzeSource url = do
  -- Fetch the raw page content
  body <- tool "http_get" (toJSON url)
  -- Ask the model for structured analysis of the page
  report <- infer analysisModel (analysisPrompt url body)
  let extracted = parseFacts report
      digest    = parseSummary report
  pure SourceAnalysis { saUrl = url, saFacts = extracted, saSummary = digest }
-- | Shared research state. Each field is a CRDT so parallel branches can
-- merge their updates via the 'Semigroup' instance below.
data ResearchState = ResearchState
  { rsSources :: GSet URL -- ^ grow-only set of discovered source URLs
  , rsFacts :: LWWMap Topic [Fact] -- ^ last-writer-wins map of facts per topic
  , rsContradictions :: GSet (Text, Text) -- ^ pairs of conflicting statements; not written anywhere in this module
  }
  deriving (Show, Eq, Generic)
-- | Field-wise merge: every CRDT field combines with its own '<>'.
instance Semigroup ResearchState where
  x <> y =
    ResearchState
      (rsSources x <> rsSources y)
      (rsFacts x <> rsFacts y)
      (rsContradictions x <> rsContradictions y)
-- | Empty instance body: presumably 'CRDT' supplies default methods
-- (likely via 'Generic' and the 'Semigroup' merge) — TODO confirm
-- against the class definition in Omni.Agent.State.CRDT.
instance CRDT ResearchState
-- | A research topic; used directly as the web-search query in 'researchTopic'.
type Topic = Text

-- | A web address as plain text.
type URL = Text
-- | A single extracted claim together with its provenance.
data Fact = Fact
  { factContent :: Text -- ^ the claim text
  , factConfidence :: Double -- ^ confidence score; range not enforced here — TODO confirm scale
  , factSource :: URL -- ^ URL the fact was extracted from
  }
  deriving (Show, Eq, Generic)
-- | Result of researching one topic (returned by 'researchTopic').
data TopicResult = TopicResult
  { trTopic :: Topic -- ^ the topic that was researched
  , trSources :: [URL] -- ^ all URLs returned by the search, not just those analyzed
  , trFacts :: [Fact] -- ^ facts extracted from the analyzed sources
  }
  deriving (Show, Eq, Generic)
-- | Model-produced analysis of a single fetched source ('analyzeSource').
data SourceAnalysis = SourceAnalysis
  { saUrl :: URL -- ^ the source that was analyzed
  , saFacts :: [Fact] -- ^ facts parsed out of the model's analysis
  , saSummary :: Text -- ^ summary parsed out of the model's analysis
  }
  deriving (Show, Eq, Generic)
-- | Final output of 'research': the synthesis plus everything it drew on.
data Summary = Summary
  { sumSynthesis :: Text -- ^ model-written synthesis across all topics
  , sumFacts :: [Fact] -- ^ every fact gathered across all topics
  , sumSources :: [URL] -- ^ every source URL gathered across all topics
  }
  deriving (Show, Eq, Generic)
-- | Execute the research program under the parallel CRDT interpreter,
-- printing cost and source-count stats on success.
runResearch :: Provider -> [Topic] -> IO (Either Text Summary)
runResearch provider topics = do
  let cfg = defaultParConfig provider researchTools
  outcome <- runParallelCRDT cfg emptyResearchState (research topics)
  case outcome of
    Left failure -> pure (Left failure)
    Right (summary, trace, st) -> do
      -- Report final run statistics before returning
      putStrLn ("Total cost: " <> show (totalCost trace))
      putStrLn ("Sources found: " <> show (GSet.size (rsSources st)))
      pure (Right summary)