Create Event Log State Module

t-369.8 · WorkTask · Omni/Agent.hs
Parent: t-369 · Created 1 month ago · Updated 1 month ago

Dependencies

Description

Create Omni/Agent/State/EventLog.hs - append-only event sourcing state strategy.

Context

For research and audit-heavy tasks, event sourcing provides full history. Agents emit events to a shared log, and views are materialized separately.

This is ideal when you need provenance, replay, and debugging capabilities.

Read Omni/Agent/ARCHITECTURE.md for full design rationale.

Deliverables

Create Omni/Agent/State/EventLog.hs containing:

1. Event Log Types

-- | An event log is a channel of domain events
data EventLog e = EventLog
  { elChannel :: Chan e
  , elHistory :: TVar [e]  -- in-memory history, newest first (getHistory reverses it)
  , elFile :: Maybe FilePath  -- optional: persist to file
  }

-- | Create new event log
newEventLog :: IO (EventLog e)
newEventLog = EventLog 
  <$> newChan 
  <*> newTVarIO []
  <*> pure Nothing

-- | Create event log with file persistence
newEventLogWithFile :: FilePath -> IO (EventLog e)
newEventLogWithFile path = EventLog
  <$> newChan
  <*> newTVarIO []
  <*> pure (Just path)

2. Event Operations

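-- | Emit an event. The channel write never blocks (Chan is unbounded),
-- but file persistence, when enabled, performs synchronous I/O.
emitEvent :: ToJSON e => EventLog e -> e -> IO ()
emitEvent log event = do
  writeChan (elChannel log) event
  atomically $ modifyTVar' (elHistory log) (event:)
  case elFile log of
    Nothing -> pure ()
    -- appendEventToFile still has to be written; it should serialize via ToJSON
    Just path -> appendEventToFile path event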
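
The ticket leaves appendEventToFile undefined. One possible shape is one encoded event per line, which keeps the file append-only and easy to replay after a restart. The sketch below is an assumption, not the intended implementation: appendEventWith, loadEventsWith, encodeShow, decodeRead and demoPersistence are hypothetical names, and the codec is passed in as a function so the example runs without aeson. With aeson, the encoder would presumably be `BL.toStrict . encode` and the decoder `decodeStrict`.

```haskell
import qualified Data.ByteString.Char8 as BS
import Data.Maybe (mapMaybe)

-- | Append one encoded event as a single line (hypothetical helper).
-- Assumption: the encoder never emits a raw newline.
appendEventWith :: (e -> BS.ByteString) -> FilePath -> e -> IO ()
appendEventWith enc path event =
  BS.appendFile path (enc event <> BS.pack "\n")

-- | Read the file back, oldest event first.
-- Lines that fail to decode are skipped silently in this sketch.
loadEventsWith :: (BS.ByteString -> Maybe e) -> FilePath -> IO [e]
loadEventsWith dec path = mapMaybe dec . BS.lines <$> BS.readFile path

-- Stand-in codec built on Show/Read, used only to keep the demo dependency-free.
encodeShow :: Show e => e -> BS.ByteString
encodeShow = BS.pack . show

decodeRead :: Read e => BS.ByteString -> Maybe e
decodeRead line = case reads (BS.unpack line) of
  [(x, "")] -> Just x
  _         -> Nothing

-- | Write three events, then reload them as a restarted process would.
demoPersistence :: FilePath -> IO [Int]
demoPersistence path = do
  BS.writeFile path BS.empty  -- start from an empty log for the demo
  mapM_ (appendEventWith encodeShow path) [1, 2, 3 :: Int]
  loadEventsWith decodeRead path
```

Strict ByteString I/O is used here so the file handle is closed before the next append; lazy readFile could keep the file locked while the log is still being written.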

-- | Subscribe to events. Returns an action that blocks until the next event;
-- only events emitted after this call are delivered.
subscribe :: EventLog e -> IO (IO e)
subscribe log = do
  chan <- dupChan (elChannel log)
  pure (readChan chan)

-- | Get all historical events
getHistory :: EventLog e -> IO [e]
getHistory log = reverse <$> readTVarIO (elHistory log)

-- | Replay history to a handler
replayHistory :: EventLog e -> (e -> IO ()) -> IO ()
replayHistory log handler = do
  events <- getHistory log
  traverse_ handler events

3. View Materialization

-- | A materialized view from events
data View e s = View
  { viewState :: TVar s
  , viewApply :: s -> e -> s  -- fold function
  }

-- | Create a view with initial state and fold function
newView :: s -> (s -> e -> s) -> IO (View e s)
newView initial apply = View <$> newTVarIO initial <*> pure apply

-- | Connect view to event log (subscribes and updates on each event).
-- Returns a cancel action that kills the updater thread.
-- NOTE: replay and subscription are two separate steps, so an event emitted
-- between them is missed by this view. Subscribing first would instead risk
-- applying such an event twice.
materializeView :: EventLog e -> View e s -> IO (IO ())
materializeView log view = do
  -- First replay history
  events <- getHistory log
  atomically $ modifyTVar' (viewState view) (\s -> foldl' (viewApply view) s events)

  -- Then subscribe to new events
  getNext <- subscribe log
  tid <- forkIO $ forever $ do
    event <- getNext
    atomically $ modifyTVar' (viewState view) (\s -> viewApply view s event)
  pure (killThread tid)

-- | Read current view state
readView :: View e s -> IO s
readView view = readTVarIO (viewState view)
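
Replaying history and then subscribing are separate steps with Chan, so a view can miss or double-count an event emitted in between. A possible alternative, sketched here under the assumption that switching to STM's broadcast TChan is acceptable, is to append to history and broadcast in one transaction, and to snapshot history and subscribe in another. AtomicLog, emitAtomic, subscribeWithHistory and demoAtomic are hypothetical names and not part of the deliverable above.

```haskell
import Control.Concurrent.STM

-- | Hypothetical variant of EventLog built entirely on STM.
data AtomicLog e = AtomicLog
  { alBroadcast :: TChan e   -- write-only broadcast channel
  , alHistory   :: TVar [e]  -- newest first
  }

newAtomicLog :: IO (AtomicLog e)
newAtomicLog = AtomicLog <$> newBroadcastTChanIO <*> newTVarIO []

-- | Record and broadcast an event in one transaction.
emitAtomic :: AtomicLog e -> e -> STM ()
emitAtomic l ev = do
  modifyTVar' (alHistory l) (ev :)
  writeTChan (alBroadcast l) ev

-- | Snapshot history (oldest first) and subscribe in one transaction,
-- so every event lands in exactly one of the two results.
subscribeWithHistory :: AtomicLog e -> STM ([e], TChan e)
subscribeWithHistory l = do
  past <- reverse <$> readTVar (alHistory l)
  chan <- dupTChan (alBroadcast l)
  pure (past, chan)

-- | Two events before subscribing, two after.
demoAtomic :: IO ([Int], [Int])
demoAtomic = do
  l <- newAtomicLog
  atomically (mapM_ (emitAtomic l) [1, 2])
  (past, chan) <- atomically (subscribeWithHistory l)
  atomically (mapM_ (emitAtomic l) [3, 4])
  a <- atomically (readTChan chan)
  b <- atomically (readTChan chan)
  pure (past, [a, b])
```

A broadcast TChan also avoids retaining events in a channel that nobody reads, which is what happens to the original elChannel when every consumer goes through dupChan. File persistence would have to run outside the transaction, since STM cannot perform I/O.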

4. Research-Specific Events

-- | Events for research tasks
data ResearchEvent
  = FoundSource URL Text UTCTime           -- url, summary, when
  | LearnedFact Topic Text Double UTCTime  -- topic, fact, confidence, when
  | FoundContradiction Text Text UTCTime   -- claim1, claim2, when
  | SearchStarted Query UTCTime
  | SearchCompleted Query Int UTCTime      -- query, result count, when
  | SynthesisStarted [Topic] UTCTime
  | SynthesisCompleted Text UTCTime        -- summary, when
  deriving (Show, Eq, Generic)

instance ToJSON ResearchEvent
instance FromJSON ResearchEvent

-- | Materialized research state from events
data ResearchView = ResearchView
  { rvSources :: Map URL Text
  , rvFacts :: Map Topic [(Text, Double)]  -- facts with confidence
  , rvContradictions :: [(Text, Text)]
  }

-- Requires the LambdaCase extension.
researchViewApply :: ResearchView -> ResearchEvent -> ResearchView
researchViewApply rv = \case
  FoundSource url summary _ -> 
    rv { rvSources = Map.insert url summary (rvSources rv) }
  LearnedFact topic fact conf _ ->
    rv { rvFacts = Map.insertWith (++) topic [(fact, conf)] (rvFacts rv) }
  FoundContradiction c1 c2 _ ->
    rv { rvContradictions = (c1, c2) : rvContradictions rv }
  _ -> rv  -- other events don't affect view
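
To illustrate how the fold rebuilds a view, here is a trimmed, self-contained copy of the types above applied with foldl'. The String aliases for URL and Topic are stand-ins, since this ticket does not define those types; emptyResearchView and demoView are hypothetical names, and the event payloads are placeholder data.

```haskell
import Data.List (foldl')
import qualified Data.Map.Strict as Map
import Data.Time.Calendar (fromGregorian)
import Data.Time.Clock (UTCTime (..))

-- Stand-in aliases (assumption: the real types live elsewhere in the codebase).
type URL = String
type Topic = String

-- Trimmed copy of ResearchEvent, enough to exercise the fold.
data ResearchEvent
  = FoundSource URL String UTCTime
  | LearnedFact Topic String Double UTCTime
  | FoundContradiction String String UTCTime
  | SearchStarted String UTCTime

data ResearchView = ResearchView
  { rvSources        :: Map.Map URL String
  , rvFacts          :: Map.Map Topic [(String, Double)]
  , rvContradictions :: [(String, String)]
  } deriving (Show, Eq)

emptyResearchView :: ResearchView
emptyResearchView = ResearchView Map.empty Map.empty []

researchViewApply :: ResearchView -> ResearchEvent -> ResearchView
researchViewApply rv ev = case ev of
  FoundSource url summary _ ->
    rv { rvSources = Map.insert url summary (rvSources rv) }
  LearnedFact topic fact conf _ ->
    -- insertWith applies (++) as new ++ old, so the newest fact comes first
    rv { rvFacts = Map.insertWith (++) topic [(fact, conf)] (rvFacts rv) }
  FoundContradiction c1 c2 _ ->
    rv { rvContradictions = (c1, c2) : rvContradictions rv }
  _ -> rv

-- | Rebuild a view from scratch by folding over placeholder events.
demoView :: ResearchView
demoView = foldl' researchViewApply emptyResearchView events
  where
    t = UTCTime (fromGregorian 2024 1 1) 0
    events =
      [ SearchStarted "query" t
      , FoundSource "https://example.org/a" "summary A" t
      , LearnedFact "topic" "fact A" 0.9 t
      , LearnedFact "topic" "fact B" 0.8 t
      ]
```

Because the fold is pure, the same list of events always yields the same view, which is what makes rebuilding views from the log safe.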

5. Interpreter Integration

-- | Run Op with event log state.
-- State is reconstructed from events; mutations become events.
runWithEventLog
  :: ToJSON e
  => (s -> Op s a -> e)  -- extract events from operations
  -> EventLog e
  -> View e s
  -> Op s a
  -> IO (Either Text (a, Trace))
runWithEventLog toEvent log view program = do
  -- Read current state from view
  initialState <- readView view
  
  -- Run with event emission on state changes
  let onPut newState oldState = do
        let event = ... -- derive event from state diff
        emitEvent log event
  
  -- This is tricky - need to intercept Put operations
  -- and emit events instead of mutating state directly
  ...
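
The interception idea can be tried out on a toy DSL before touching the real interpreter. The sketch below does not use the actual Op type, whose constructors are not shown in this ticket; ToyOp, runToy, CounterEvent and demoInterpreter are hypothetical, and IORefs stand in for the event log and the view. Each Put is turned into an event derived from the old and new state, and the state changes only by applying that event.

```haskell
import Data.IORef

-- | Hypothetical toy DSL with just enough structure to show Put interception.
data ToyOp s a
  = Done a
  | Get (s -> ToyOp s a)
  | Put s (ToyOp s a)

-- | Interpret a program so that Put never writes state directly:
-- it derives an event, logs it, and lets the fold update the state.
runToy
  :: (s -> s -> e)   -- derive an event from old and new state
  -> (s -> e -> s)   -- fold function, as in View
  -> IORef [e]       -- stand-in for the event log (newest first)
  -> IORef s         -- stand-in for the materialized view
  -> ToyOp s a
  -> IO a
runToy diff apply logRef viewRef = go
  where
    go (Done a) = pure a
    go (Get k) = readIORef viewRef >>= go . k
    go (Put new k) = do
      old <- readIORef viewRef
      let ev = diff old new
      modifyIORef' logRef (ev :)
      modifyIORef' viewRef (`apply` ev)
      go k

newtype CounterEvent = Added Int deriving (Show, Eq)

-- | Add 5, then add 2, then return the final state.
demoInterpreter :: IO (Int, [CounterEvent])
demoInterpreter = do
  logRef <- newIORef []
  viewRef <- newIORef 0
  let diff old new = Added (new - old)
      apply s (Added n) = s + n
      program = Get $ \s -> Put (s + 5) $ Get $ \s' -> Put (s' + 2) $ Get Done
  result <- runToy diff apply logRef viewRef program
  events <- reverse <$> readIORef logRef
  pure (result, events)
```

This only works if applying the derived event to the old state reproduces the new state, so the diff function and the fold function have to be designed together.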

Notes

  • Event logs are append-only - never delete
  • Views can be rebuilt from events at any time
  • Multiple views can materialize different aspects
  • File persistence enables replay after restart
  • Keep events small - don't store full documents

Testing

  • Events are ordered correctly
  • Subscriber receives all events after subscription
  • View materializes correctly from history
  • View updates on new events
  • File persistence survives restart
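
The first two checks can be sketched without a test framework. The block below uses a trimmed copy of the Chan-based log from the deliverables, with no file field and no ToJSON constraint; checkOrdering and checkSubscription are hypothetical names, and in the real test suite they would presumably become cases in whatever framework the repository uses.

```haskell
import Control.Concurrent.Chan (Chan, dupChan, newChan, readChan, writeChan)
import Control.Concurrent.STM (TVar, atomically, modifyTVar', newTVarIO, readTVarIO)
import Control.Monad (replicateM)

-- Trimmed copy of the log from the deliverables (file persistence omitted).
data EventLog e = EventLog
  { elChannel :: Chan e
  , elHistory :: TVar [e]
  }

newEventLog :: IO (EventLog e)
newEventLog = EventLog <$> newChan <*> newTVarIO []

emitEvent :: EventLog e -> e -> IO ()
emitEvent el event = do
  writeChan (elChannel el) event
  atomically $ modifyTVar' (elHistory el) (event :)

subscribe :: EventLog e -> IO (IO e)
subscribe el = readChan <$> dupChan (elChannel el)

getHistory :: EventLog e -> IO [e]
getHistory el = reverse <$> readTVarIO (elHistory el)

-- | History returns events in emission order.
checkOrdering :: IO Bool
checkOrdering = do
  el <- newEventLog
  mapM_ (emitEvent el) [1 .. 5 :: Int]
  (== [1 .. 5]) <$> getHistory el

-- | A subscriber sees every event emitted after it subscribed, and nothing earlier.
checkSubscription :: IO Bool
checkSubscription = do
  el <- newEventLog
  emitEvent el (0 :: Int)  -- emitted before subscribing, must not be delivered
  next <- subscribe el
  mapM_ (emitEvent el) [1, 2, 3]
  got <- replicateM 3 next
  pure (got == [1, 2, 3])
```

Both checks run on a single thread, so they do not exercise concurrent emitters; ordering under concurrency would need a separate test.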

Files to Read First

  • Omni/Agent/ARCHITECTURE.md (section on state strategies)
  • Event sourcing patterns (Greg Young, etc.)
  • Omni/Agent/Events.hs (similar pattern, different purpose)

Timeline (2)

🔄 [human] Open → InProgress · 1 month ago
🔄 [human] InProgress → Done · 1 month ago