Create STM State Module

t-369.7 · WorkTask · Omni/Agent.hs
Parent: t-369 · Created 1 month ago · Updated 1 month ago

Dependencies

Description

Create Omni/Agent/State/STM.hs - state strategy for real-time shared state.

Context

For monitoring tasks where agents need to see current world state, STM provides transactional shared memory. Agents read fresh state before each inference and write back after.

Read Omni/Agent/ARCHITECTURE.md for full design rationale.

Deliverables

Create Omni/Agent/State/STM.hs containing:

1. STM State Wrapper

-- | Shared state via STM
data SharedState s = SharedState
  { ssState :: TVar s
  , ssLock :: TMVar ()  -- optional: for exclusive access during long operations
  }

-- | Create new shared state
newSharedState :: s -> IO (SharedState s)
newSharedState initial = SharedState 
  <$> newTVarIO initial 
  <*> newTMVarIO ()

-- | Read current state (non-blocking)
readShared :: SharedState s -> IO s
readShared ss = readTVarIO (ssState ss)

-- | Modify state atomically
modifyShared :: SharedState s -> (s -> s) -> IO ()
modifyShared ss f = atomically $ modifyTVar' (ssState ss) f

-- | Modify state with result
modifySharedR :: SharedState s -> (s -> (s, a)) -> IO a
modifySharedR ss f = atomically $ do
  s <- readTVar (ssState ss)
  let (s', a) = f s
  writeTVar (ssState ss) s'
  pure a
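
As a usage illustration for modifySharedR, the sketch below hands out sequential tickets from a shared counter. It redefines a trimmed copy of the wrapper (without the optional lock) so that it compiles on its own; nextTicket and demoTickets are hypothetical names introduced only for this example.

```haskell
import Control.Concurrent.STM
import Control.Monad (replicateM)

-- Same shape as the wrapper above, minus the optional lock.
newtype SharedState s = SharedState { ssState :: TVar s }

newSharedState :: s -> IO (SharedState s)
newSharedState initial = SharedState <$> newTVarIO initial

readShared :: SharedState s -> IO s
readShared = readTVarIO . ssState

modifySharedR :: SharedState s -> (s -> (s, a)) -> IO a
modifySharedR ss f = atomically $ do
  s <- readTVar (ssState ss)
  let (s', a) = f s
  writeTVar (ssState ss) s'
  pure a

-- Hypothetical helper: return the current counter value as a ticket
-- and bump the counter in the same transaction.
nextTicket :: SharedState Int -> IO Int
nextTicket ss = modifySharedR ss (\n -> (n + 1, n))

-- Take three tickets in sequence, then read the final counter.
demoTickets :: IO ([Int], Int)
demoTickets = do
  ss <- newSharedState 0
  tickets <- replicateM 3 (nextTicket ss)
  final <- readShared ss
  pure (tickets, final)
```

Because the read and the write happen in one transaction, two callers can never receive the same ticket.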

2. STM Interpreter Integration

-- | Run Op with STM-backed shared state
runWithSTM :: ParConfig -> SharedState s -> Op s a -> IO (Either Text (a, Trace))
runWithSTM config shared program = do
  -- Read initial state
  initialState <- readShared shared
  
  -- Run with callbacks that sync to shared state
  let onPut newState = modifyShared shared (const newState)
      onGet = readShared shared
  
  runWithStateCallbacks config onGet onPut initialState program

-- | Interpreter that uses callbacks for state access
-- This allows different state strategies to plug in
runWithStateCallbacks 
  :: ParConfig 
  -> IO s           -- get current state
  -> (s -> IO ())   -- put new state  
  -> s              -- initial state
  -> Op s a 
  -> IO (Either Text (a, Trace))
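
The callback idea can be sketched on a toy instruction set. The real Op, ParConfig, and Trace types are not shown in this task, so ToyOp below is a hypothetical stand-in with only state access and sequencing, and the sketch omits tracing and error handling entirely.

```haskell
{-# LANGUAGE GADTs #-}
import Control.Concurrent.STM

-- Hypothetical stand-in for Op: just enough constructors to show state access.
data ToyOp s a where
  Get  :: ToyOp s s
  Put  :: s -> ToyOp s ()
  Pure :: a -> ToyOp s a
  Bind :: ToyOp s x -> (x -> ToyOp s a) -> ToyOp s a

-- Every Get and Put is delegated to the callbacks, so the interpreter
-- never needs to know where the state actually lives.
runToyWithCallbacks :: IO s -> (s -> IO ()) -> ToyOp s a -> IO a
runToyWithCallbacks onGet onPut op = case op of
  Get      -> onGet
  Put s    -> onPut s
  Pure a   -> pure a
  Bind m k -> do
    x <- runToyWithCallbacks onGet onPut m
    runToyWithCallbacks onGet onPut (k x)

-- Read the state, add 5, write it back, then read it again.
toyProgram :: ToyOp Int Int
toyProgram = Get `Bind` \n -> Put (n + 5) `Bind` \_ -> Get

-- Plug a TVar in as the state backend.
demoToy :: IO (Int, Int)
demoToy = do
  tv <- newTVarIO 10
  result <- runToyWithCallbacks (readTVarIO tv) (atomically . writeTVar tv) toyProgram
  final <- readTVarIO tv
  pure (result, final)
```

Swapping the two callback arguments is all it takes to back the same program with a different state strategy.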

3. Optimistic vs Pessimistic

-- | Optimistic: read at start, write at end, retry on conflict
-- Good when conflicts are rare
runOptimistic :: ParConfig -> SharedState s -> Op s a -> IO (Either Text (a, Trace))

-- | Pessimistic: hold lock during entire operation
-- Good when conflicts are common or operations are short
-- Note: the lock only excludes other callers that also take ssLock;
-- direct modifyShared calls are not blocked by it
runPessimistic :: ParConfig -> SharedState s -> Op s a -> IO (Either Text (a, Trace))
runPessimistic config shared program =
  bracket_
    (atomically $ takeTMVar (ssLock shared))
    (atomically $ putTMVar (ssLock shared) ())
    (runWithSTM config shared program)
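
runOptimistic is given only as a signature. One possible shape for the read, work, validate, commit loop is sketched below on a bare TVar. It assumes an Eq instance on the state for conflict detection; optimisticUpdate and demoOptimistic are hypothetical names, and the demo simulates a competing writer from inside the first attempt so that exactly one retry happens.

```haskell
import Control.Concurrent.STM
import Control.Monad (when)
import Data.IORef

-- Hypothetical helper, not part of the task's API.
-- Runs `step` on a snapshot, then commits only if the shared state still
-- equals that snapshot; otherwise it starts over with a fresh snapshot.
-- Returns the number of attempts it took.
optimisticUpdate :: Eq s => TVar s -> (s -> IO s) -> IO Int
optimisticUpdate tv step = go 1
  where
    go attempt = do
      snapshot <- readTVarIO tv
      proposed <- step snapshot  -- long-running work, outside any transaction
      committed <- atomically $ do
        current <- readTVar tv
        if current == snapshot
          then writeTVar tv proposed >> pure True
          else pure False
      if committed then pure attempt else go (attempt + 1)

-- The first call to `step` also writes to the TVar, standing in for a
-- concurrent writer, which forces one retry.
demoOptimistic :: IO (Int, Int)
demoOptimistic = do
  tv <- newTVarIO (0 :: Int)
  firstCall <- newIORef True
  let step s = do
        isFirst <- readIORef firstCall
        when isFirst $ do
          writeIORef firstCall False
          atomically (modifyTVar' tv (+ 100))  -- simulated conflicting write
        pure (s + 1)
  attempts <- optimisticUpdate tv step
  final <- readTVarIO tv
  pure (attempts, final)
```

Comparing states by equality cannot tell an unchanged state from one that was changed and then changed back; if that matters, a version counter stored next to the state is a common alternative.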

4. Monitoring-Specific State

-- | State for monitoring tasks
data MonitorState = MonitorState
  { msServices :: Map ServiceId Health
  , msAlerts :: [Alert]
  , msLastCheck :: UTCTime
  , msMetrics :: Map MetricId Double
  }
  deriving (Show, Eq, Generic)

data Health = Healthy | Degraded | Unhealthy
  deriving (Show, Eq, Generic)

data Alert = Alert
  { alertId :: Text
  , alertService :: ServiceId
  , alertMessage :: Text
  , alertSeverity :: Severity
  , alertTimestamp :: UTCTime
  }
  deriving (Show, Eq, Generic)

-- | Update service health
updateHealth :: ServiceId -> Health -> MonitorState -> MonitorState

-- | Add alert
addAlert :: Alert -> MonitorState -> MonitorState

-- | Clear resolved alerts
-- AlertId must match the type of the alertId field (Text above)
clearAlerts :: [AlertId] -> MonitorState -> MonitorState
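
The three helpers are listed as signatures only. A minimal sketch of how they might be implemented follows, under stated assumptions: ServiceId and AlertId are taken to be Text aliases (the task does not define them), and the records are trimmed to the fields the helpers actually touch.

```haskell
{-# LANGUAGE OverloadedStrings #-}
import qualified Data.Map.Strict as Map
import Data.Map.Strict (Map)
import Data.Text (Text)

-- Assumed aliases; the task does not define these types.
type ServiceId = Text
type AlertId = Text

data Health = Healthy | Degraded | Unhealthy
  deriving (Show, Eq)

-- Trimmed to the fields the helpers touch.
data Alert = Alert
  { alertId :: AlertId
  , alertService :: ServiceId
  , alertMessage :: Text
  }
  deriving (Show, Eq)

data MonitorState = MonitorState
  { msServices :: Map ServiceId Health
  , msAlerts :: [Alert]
  }
  deriving (Show, Eq)

-- Insert or overwrite the health entry for one service.
updateHealth :: ServiceId -> Health -> MonitorState -> MonitorState
updateHealth sid h ms = ms { msServices = Map.insert sid h (msServices ms) }

-- Prepend the alert, so the newest alert comes first.
addAlert :: Alert -> MonitorState -> MonitorState
addAlert a ms = ms { msAlerts = a : msAlerts ms }

-- Drop every alert whose id appears in the given list.
clearAlerts :: [AlertId] -> MonitorState -> MonitorState
clearAlerts ids ms =
  ms { msAlerts = filter (\a -> alertId a `notElem` ids) (msAlerts ms) }
```

Since all three are pure functions of type MonitorState -> MonitorState, they can be passed straight to modifyShared, for example modifyShared shared (updateHealth "db" Degraded).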

5. Subscription Pattern

-- | Subscribe to state changes
-- Returns when state satisfies predicate
waitFor :: SharedState s -> (s -> Bool) -> IO s
waitFor ss predicate = atomically $ do
  s <- readTVar (ssState ss)
  unless (predicate s) retry
  pure s

-- | Watch for changes (callback on each distinct state)
-- Requires Eq s to detect changes. The callback also fires once for the
-- state present at subscription time, and rapid intermediate states may be skipped.
watchChanges :: Eq s => SharedState s -> (s -> IO ()) -> IO (IO ())  -- returns cancel action
watchChanges ss callback = do
  lastSeen <- newTVarIO Nothing
  tid <- forkIO $ forever $ do
    s <- atomically $ do
      s <- readTVar (ssState ss)
      prev <- readTVar lastSeen
      when (Just s == prev) retry
      writeTVar lastSeen (Just s)
      pure s
    callback s
  pure (killThread tid)
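
To show the blocking behaviour of waitFor, here is a small sketch over a bare TVar (the wrapper's ssState field). waitForTVar and demoWaitFor are hypothetical names used only for this example.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.MVar
import Control.Concurrent.STM
import Control.Monad (forM_, unless)

-- Same logic as waitFor, applied directly to a TVar.
waitForTVar :: TVar s -> (s -> Bool) -> IO s
waitForTVar tv predicate = atomically $ do
  s <- readTVar tv
  unless (predicate s) retry
  pure s

demoWaitFor :: IO Int
demoWaitFor = do
  tv <- newTVarIO (0 :: Int)
  done <- newEmptyMVar
  -- Waiter: blocks until the counter reaches at least 3.
  _ <- forkIO $ waitForTVar tv (>= 3) >>= putMVar done
  -- Writer: bumps the counter five times.
  forM_ [1 .. 5 :: Int] $ \_ -> atomically (modifyTVar' tv (+ 1))
  takeMVar done
```

The waiter returns whichever state it observes once the predicate holds, so the result here can be anywhere from 3 to 5 depending on scheduling. retry does not busy-wait: the runtime reruns the transaction only after a TVar it read has been written.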

Dependencies

-- : dep stm

Notes

  • STM is for when you need strong consistency
  • Reads are cheap; transactions that conflict are retried automatically
  • Don't try to span IO with a single transaction: STM cannot perform IO, so read, do the IO, then write back in a second transaction
  • For long-running inference, optimistic is usually better
  • Pessimistic only if you need exclusive access

Testing

  • Concurrent reads work
  • Concurrent writes don't lose updates
  • waitFor blocks until predicate satisfied
  • watchChanges fires on changes
  • No deadlocks
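
The "concurrent writes don't lose updates" case could be checked along the following lines. This is a sketch on a bare TVar with plain threads and MVars for joining; concurrentIncrements is a hypothetical helper, and the module's actual test harness is not specified in this task.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.MVar
import Control.Concurrent.STM
import Control.Monad (forM_, replicateM, replicateM_)

-- Spawns `workers` threads that each apply (+1) `perWorker` times,
-- waits for all of them, and returns the final counter value.
concurrentIncrements :: Int -> Int -> IO Int
concurrentIncrements workers perWorker = do
  tv <- newTVarIO 0
  dones <- replicateM workers newEmptyMVar
  forM_ dones $ \done -> forkIO $ do
    replicateM_ perWorker (atomically (modifyTVar' tv (+ 1)))
    putMVar done ()
  mapM_ takeMVar dones  -- join: wait for every worker to finish
  readTVarIO tv
```

If no update is lost, the result equals workers * perWorker regardless of how the threads interleave.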

Files to Read First

  • Omni/Agent/ARCHITECTURE.md (section on state strategies)
  • Omni/Agent/Interpreter/Parallel.hs
  • Control.Concurrent.STM documentation

Timeline (2)

🔄 [human] Open → InProgress · 1 month ago
🔄 [human] InProgress → Done · 1 month ago