Create Omni/Agent/State/EventLog.hs - append-only event sourcing state strategy.
For research and audit-heavy tasks, event sourcing provides full history. Agents emit events to a shared log, and views are materialized separately.
This is ideal when you need provenance, replay, and debugging capabilities.
Read Omni/Agent/ARCHITECTURE.md for full design rationale.
Create Omni/Agent/State/EventLog.hs containing:
-- | An event log is a channel of domain events
data EventLog e = EventLog
{ elChannel :: Chan e
, elHistory :: TVar [e] -- optional: keep history in memory
, elFile :: Maybe FilePath -- optional: persist to file
}
-- | Create new event log
newEventLog :: IO (EventLog e)
newEventLog = EventLog
<$> newChan
<*> newTVarIO []
<*> pure Nothing
-- | Create event log with file persistence
newEventLogWithFile :: FilePath -> IO (EventLog e)
-- | Emit an event (non-blocking)
emitEvent :: ToJSON e => EventLog e -> e -> IO ()
emitEvent log event = do
writeChan (elChannel log) event
atomically $ modifyTVar' (elHistory log) (event:)
case elFile log of
Nothing -> pure ()
Just path -> appendEventToFile path event
-- | Subscribe to events (blocking, receives all future events)
subscribe :: EventLog e -> IO (IO e) -- returns action to get next event
subscribe log = do
chan <- dupChan (elChannel log)
pure (readChan chan)
-- | Get all historical events
getHistory :: EventLog e -> IO [e]
getHistory log = reverse <$> readTVarIO (elHistory log)
-- | Replay history to a handler
replayHistory :: EventLog e -> (e -> IO ()) -> IO ()
replayHistory log handler = do
events <- getHistory log
traverse_ handler events
-- | A materialized view from events
data View e s = View
{ viewState :: TVar s
, viewApply :: s -> e -> s -- fold function
}
-- | Create a view with initial state and fold function
newView :: s -> (s -> e -> s) -> IO (View e s)
newView initial apply = View <$> newTVarIO initial <*> pure apply
-- | Connect view to event log (subscribes and updates on each event)
materializeView :: EventLog e -> View e s -> IO (IO ()) -- returns cancel action
materializeView log view = do
-- First replay history
events <- getHistory log
atomically $ modifyTVar' (viewState view) (\s -> foldl' (viewApply view) s events)
-- Then subscribe to new events
getNext <- subscribe log
tid <- forkIO $ forever $ do
event <- getNext
atomically $ modifyTVar' (viewState view) (\s -> viewApply view s event)
pure (killThread tid)
-- | Read current view state
readView :: View e s -> IO s
readView view = readTVarIO (viewState view)
-- | Events for research tasks
data ResearchEvent
= FoundSource URL Text UTCTime -- url, summary, when
| LearnedFact Topic Text Double UTCTime -- topic, fact, confidence, when
| FoundContradiction Text Text UTCTime -- claim1, claim2, when
| SearchStarted Query UTCTime
| SearchCompleted Query Int UTCTime -- query, result count, when
| SynthesisStarted [Topic] UTCTime
| SynthesisCompleted Text UTCTime -- summary, when
deriving (Show, Eq, Generic)
instance ToJSON ResearchEvent
instance FromJSON ResearchEvent
-- | Materialized research state from events
data ResearchView = ResearchView
{ rvSources :: Map URL Text
, rvFacts :: Map Topic [(Text, Double)] -- facts with confidence
, rvContradictions :: [(Text, Text)]
}
researchViewApply :: ResearchView -> ResearchEvent -> ResearchView
researchViewApply rv = \case
FoundSource url summary _ ->
rv { rvSources = Map.insert url summary (rvSources rv) }
LearnedFact topic fact conf _ ->
rv { rvFacts = Map.insertWith (++) topic [(fact, conf)] (rvFacts rv) }
FoundContradiction c1 c2 _ ->
rv { rvContradictions = (c1, c2) : rvContradictions rv }
_ -> rv -- other events don't affect view
-- | Run Op with event log state
-- State is reconstructed from events, mutations become events
runWithEventLog
:: (ToJSON e, s -> Op s a -> e) -- extract events from operations
-> EventLog e
-> View e s
-> Op s a
-> IO (Either Text (a, Trace))
runWithEventLog toEvent log view program = do
-- Read current state from view
initialState <- readView view
-- Run with event emission on state changes
let onPut newState oldState = do
let event = ... -- derive event from state diff
emitEvent log event
-- This is tricky - need to intercept Put operations
-- and emit events instead of mutating state directly
...