Create Parallel Research Example Program

t-369.11·WorkTask·
·
·
·Omni/Agent.hs
Parent: t-369 · Created 1 month ago · Updated 1 month ago

Dependencies

Description

Edit

Create an example program demonstrating parallel composition with Op.

Context

To validate the Op design, create a non-trivial program that uses Par for parallel web research. The program should exercise both the parallel interpreter and the CRDT state merging that happens when parallel branches join.

Deliverables

1. Create Omni/Agent/Programs/Research.hs

module Omni.Agent.Programs.Research where

import Omni.Agent.Op
import Omni.Agent.State.CRDT

-- | Research several topics concurrently and synthesize a single 'Summary'.
--
-- Each topic gets its own 'researchTopic' branch under 'par'. A checkpoint
-- is taken before the fan-out (@init@) and after every branch has finished
-- (@topics-complete@). The collected facts and sources are then handed to
-- the synthesis model.
research :: [Topic] -> Op ResearchState Summary
research topics = do
  emit (EventCustom "research_start" (toJSON topics))
  checkpoint "init"

  -- Fan out: one parallel branch per topic.
  topicResults <- par (map researchTopic topics)

  checkpoint "topics-complete"

  -- Pool what every branch found, in topic order. Sources are concatenated,
  -- not de-duplicated, so a URL found for several topics appears repeatedly.
  let facts = trFacts =<< topicResults
      sources = trSources =<< topicResults

  emit (EventCustom "synthesis_start" (toJSON (length facts)))

  synthesisText <- infer synthesisModel (synthesisPrompt facts sources)

  emit (EventCustom "research_complete" (toJSON synthesisText))

  pure (Summary synthesisText facts sources)

-- | Research a single topic.
--
-- The steps are:
--
-- 1. Search the web for the topic.
-- 2. Record every URL found in 'rsSources'.
-- 3. Fetch and analyze the first 'maxAnalyzed' URLs in parallel (a nested
--    'par').
-- 4. Record the extracted facts under the topic in 'rsFacts'.
--
-- The returned 'trSources' lists /all/ URLs from the search, including
-- those beyond the analysis cap, which were never fetched.
researchTopic :: Topic -> Op ResearchState TopicResult
researchTopic topic = do
  emit (EventCustom "topic_start" (toJSON topic))

  -- Search for sources
  searchResults <- tool "web_search" (toJSON topic)
  let urls = parseSearchResults searchResults

  -- Record every URL found, even those that will not be analyzed.
  modify (\s -> s { rsSources = rsSources s <> GSet.fromList urls })

  -- Fetch and analyze the top sources in parallel.
  analyses <- par [ analyzeSource url | url <- take maxAnalyzed urls ]

  -- Extract facts
  let facts = concatMap saFacts analyses

  -- Record all facts in ONE state update instead of one 'modify' per fact.
  -- 'addFact' is still applied first-fact-first: foldr over the reversed
  -- list makes the head of 'facts' the innermost (earliest) application.
  modify
    (\s -> s { rsFacts = foldr (addFact topic) (rsFacts s) (reverse facts) })

  emit (EventCustom "topic_complete" (toJSON topic))

  pure TopicResult
    { trTopic = topic
    , trSources = urls
    , trFacts = facts
    }
  where
    -- Upper bound on how many search results are fetched and analyzed.
    maxAnalyzed :: Int
    maxAnalyzed = 5

-- | Fetch one URL with the @http_get@ tool and ask the analysis model to
-- extract facts and a summary from the fetched content.
--
-- This step does not touch 'ResearchState'; callers record the results.
analyzeSource :: URL -> Op ResearchState SourceAnalysis
analyzeSource url = do
  raw <- tool "http_get" (toJSON url)
  response <- infer analysisModel (analysisPrompt url raw)
  pure (toAnalysis response)
  where
    -- Package the model response together with the URL it came from.
    toAnalysis r =
      SourceAnalysis
        { saUrl = url
        , saFacts = parseFacts r
        , saSummary = parseSummary r
        }

2. Research State (CRDT)

-- | Research state shared by all branches and merged field-wise with '<>'.
data ResearchState = ResearchState
  { rsSources :: GSet URL
    -- ^ Every URL returned by any web search.
  , rsFacts :: LWWMap Topic [Fact]
    -- ^ Facts per topic. NOTE(review): presumably last-writer-wins per key,
    -- so a duplicated topic researched in two branches could lose facts on
    -- merge. Confirm the LWWMap semantics.
  , rsContradictions :: GSet (Text, Text)
    -- ^ Presumably pairs of conflicting claims; no code shown here writes it.
  }
  deriving (Show, Eq, Generic)

-- The lazy (~) patterns make this exactly as lazy as a definition that
-- uses record selectors: neither argument is forced until a field is read.
instance Semigroup ResearchState where
  ~(ResearchState sourcesA factsA contrasA)
    <> ~(ResearchState sourcesB factsB contrasB) =
      ResearchState
        { rsSources = sourcesA <> sourcesB
        , rsFacts = factsA <> factsB
        , rsContradictions = contrasA <> contrasB
        }

instance CRDT ResearchState

3. Supporting Types

-- | A research topic; it is sent (as JSON) to the @web_search@ tool.
type Topic = Text
-- | A source URL, as parsed from search results and fetched with @http_get@.
type URL = Text

-- | One fact extracted from a source's analysis (see 'parseFacts').
data Fact = Fact
  { factContent :: Text
    -- ^ The fact itself, as text.
  , factConfidence :: Double
    -- ^ Confidence score. Its scale is not established here
    -- (presumably 0..1); TODO confirm.
  , factSource :: URL
    -- ^ Presumably the URL the fact was extracted from; confirm in 'parseFacts'.
  }
  deriving (Show, Eq, Generic)

-- | Outcome of researching one topic ('researchTopic').
data TopicResult = TopicResult
  { trTopic :: Topic
    -- ^ The topic that was researched.
  , trSources :: [URL]
    -- ^ All URLs parsed from the search results, including any that were
    -- not fetched or analyzed.
  , trFacts :: [Fact]
    -- ^ Facts from the analyzed sources, concatenated in source order.
  }
  deriving (Show, Eq, Generic)

-- | Result of analyzing one fetched URL ('analyzeSource').
data SourceAnalysis = SourceAnalysis
  { saUrl :: URL
    -- ^ The URL that was fetched.
  , saFacts :: [Fact]
    -- ^ Facts parsed from the analysis model's response ('parseFacts').
  , saSummary :: Text
    -- ^ Summary parsed from the same response ('parseSummary').
  }
  deriving (Show, Eq, Generic)

-- | Final output of 'research'.
data Summary = Summary
  { sumSynthesis :: Text
    -- ^ Synthesis model output over all collected facts and sources.
  , sumFacts :: [Fact]
    -- ^ Facts from every topic, concatenated in topic order.
  , sumSources :: [URL]
    -- ^ URLs from every topic, concatenated in topic order. They are not
    -- de-duplicated, so a URL found for several topics appears more than once.
  }
  deriving (Show, Eq, Generic)

4. Runner

-- | Run 'research' over the given topics with the parallel CRDT interpreter.
--
-- On success, this prints the total cost from the trace and the size of the
-- final 'rsSources' set, then returns the summary. On failure the
-- interpreter's error is returned unchanged and nothing is printed.
runResearch :: Provider -> [Topic] -> IO (Either Text Summary)
runResearch provider topics =
  runParallelCRDT parConfig emptyResearchState (research topics) >>= finish
  where
    parConfig = defaultParConfig provider researchTools

    finish (Left err) = pure (Left err)
    finish (Right (summary, trace, finalState)) = do
      -- Log final stats
      putStrLn ("Total cost: " <> show (totalCost trace))
      putStrLn ("Sources found: " <> show (GSet.size (rsSources finalState)))
      pure (Right summary)

Notes

  • Uses Par for topic-level parallelism
  • Uses Par again for source-level parallelism (nested)
  • CRDT state merges automatically at join points
  • Checkpoints at key phase boundaries: "init" before fanning out, and "topics-complete" after all topic branches have joined
  • Shows composition of multiple infer and tool calls

Testing

  • Runs to completion with a mock provider
  • Parallel branches actually run concurrently (verified by timing)
  • State merges correctly
  • Trace captures fork/join structure
  • Checkpoints saved correctly

Files to Read First

  • Omni/Agent/Op.hs
  • Omni/Agent/Interpreter/Parallel.hs
  • Omni/Agent/State/CRDT.hs

Timeline (2)

🔄 [human] Open → InProgress · 1 month ago
🔄 [human] InProgress → Done · 1 month ago