Create Omni/Agent/State/STM.hs - state strategy for real-time shared state.
For monitoring tasks where agents need to see current world state, STM provides transactional shared memory. Agents read fresh state before each inference and write back after.
Read Omni/Agent/ARCHITECTURE.md for full design rationale.
Create Omni/Agent/State/STM.hs containing:
-- | Shared state via STM.
--
-- Pairs the mutable state cell with a lock token. Taking the lock is
-- optional: 'readShared' and 'modifyShared' access 'ssState' without
-- touching 'ssLock', so the lock only excludes other code that also takes it
-- (for example 'runPessimistic').
data SharedState s = SharedState
  { ssState :: TVar s
    -- ^ The current state value.
  , ssLock :: TMVar () -- optional: for exclusive access during long operations
    -- ^ Lock token: full when the lock is free, empty while it is held.
  }
-- | Allocate a fresh 'SharedState' that holds @initial@.
--
-- The lock 'TMVar' starts out full, i.e. the lock is available.
newSharedState :: s -> IO (SharedState s)
newSharedState initial = do
  stateVar <- newTVarIO initial
  lockVar <- newTMVarIO ()
  pure (SharedState stateVar lockVar)
-- | Take a snapshot of the current state.
--
-- Uses 'readTVarIO', so no transaction is started and the call never blocks.
readShared :: SharedState s -> IO s
readShared = readTVarIO . ssState
-- | Atomically replace the state with the result of applying @f@ to it.
--
-- Goes through 'modifyTVar'', the strict variant, so the new state is
-- evaluated before it is stored.
modifyShared :: SharedState s -> (s -> s) -> IO ()
modifyShared ss f = atomically (modifyTVar' stateVar f)
  where
    stateVar = ssState ss
-- | Atomically modify the state and return a result computed from it.
--
-- @f@ receives the current state and returns the new state together with a
-- value handed back to the caller. The read and the write happen in a single
-- transaction.
--
-- The new state is forced to weak head normal form before it is written,
-- matching 'modifyShared' (which uses 'modifyTVar''). Without this, repeated
-- updates would pile up unevaluated thunks inside the 'TVar'.
modifySharedR :: SharedState s -> (s -> (s, a)) -> IO a
modifySharedR ss f = atomically $ do
  s <- readTVar (ssState ss)
  let (s', a) = f s
  -- Strict write: evaluate the new state now rather than storing a thunk.
  writeTVar (ssState ss) $! s'
  pure a
-- | Run an 'Op' program against STM-backed shared state.
--
-- Takes a snapshot of the shared state to use as the initial state, then
-- delegates to 'runWithStateCallbacks' with a getter that reads the shared
-- state and a setter that overwrites it.
runWithSTM :: ParConfig -> SharedState s -> Op s a -> IO (Either Text (a, Trace))
runWithSTM config shared program =
  readShared shared >>= \initialState ->
    runWithStateCallbacks config fetch store initialState program
  where
    -- Getter callback: always returns the live shared value.
    fetch = readShared shared
    -- Setter callback: replaces the shared value wholesale.
    store newState = modifyShared shared (const newState)
-- | Interpreter that uses callbacks for state access.
--
-- This allows different state strategies to plug in: the caller supplies the
-- actions used to fetch and store the state instead of the interpreter owning
-- it.
--
-- NOTE(review): only the signature is given in this section; the body is
-- still to be written.
runWithStateCallbacks
  :: ParConfig
  -> IO s -- ^ get current state
  -> (s -> IO ()) -- ^ put new state
  -> s -- ^ initial state
  -> Op s a
  -> IO (Either Text (a, Trace))
-- | Optimistic: read at start, write at end, retry on conflict.
--
-- Good when conflicts are rare.
--
-- NOTE(review): only the signature is given in this section; the body is
-- still to be written, so how a conflict is detected is not specified here.
runOptimistic :: ParConfig -> SharedState s -> Op s a -> IO (Either Text (a, Trace))
-- | Pessimistic: hold the lock for the whole run.
--
-- Good when conflicts are common or operations are short. The lock token is
-- taken from 'ssLock' before 'runWithSTM' starts and put back afterwards;
-- 'bracket' guarantees the release even if the run throws.
runPessimistic :: ParConfig -> SharedState s -> Op s a -> IO (Either Text (a, Trace))
runPessimistic config shared program =
  bracket acquire (const release) (const run)
  where
    lock = ssLock shared
    -- Blocks until the token is available, then takes it.
    acquire = atomically (takeTMVar lock)
    -- Returns the token so the next waiter can proceed.
    release = atomically (putTMVar lock ())
    run = runWithSTM config shared program
-- | State for monitoring tasks.
data MonitorState = MonitorState
  { msServices :: Map ServiceId Health
    -- ^ Health recorded per service.
  , msAlerts :: [Alert]
    -- ^ Alerts currently held in the state.
  , msLastCheck :: UTCTime
    -- ^ Presumably the time of the most recent check (confirm with callers).
  , msMetrics :: Map MetricId Double
    -- ^ Numeric value recorded per metric.
  }
  deriving (Show, Eq, Generic)
-- | Health status of a service, as stored in 'msServices'.
data Health = Healthy | Degraded | Unhealthy
  deriving (Show, Eq, Generic)
-- | A single alert, as stored in 'msAlerts'.
data Alert = Alert
  { alertId :: Text
    -- ^ Identifier of this alert.
  , alertService :: ServiceId
    -- ^ Service the alert refers to.
  , alertMessage :: Text
    -- ^ Alert message text.
  , alertSeverity :: Severity
    -- ^ Severity of the alert.
  , alertTimestamp :: UTCTime
    -- ^ Presumably when the alert was raised (confirm with callers).
  }
  deriving (Show, Eq, Generic)
-- | Update service health.
--
-- Pure update: takes a service, its new 'Health', and a 'MonitorState', and
-- returns the updated state.
--
-- NOTE(review): only the signature is given in this section; the body is
-- still to be written.
updateHealth :: ServiceId -> Health -> MonitorState -> MonitorState
-- | Add alert.
--
-- Pure update: takes an 'Alert' and a 'MonitorState' and returns the updated
-- state.
--
-- NOTE(review): only the signature is given in this section; the body is
-- still to be written, so where the alert lands in 'msAlerts' is not
-- specified here.
addAlert :: Alert -> MonitorState -> MonitorState
-- | Clear resolved alerts.
--
-- Pure update: takes the identifiers of the alerts to clear and a
-- 'MonitorState', and returns the updated state.
--
-- NOTE(review): only the signature is given in this section. 'AlertId' is not
-- declared here, while the 'alertId' field of 'Alert' is 'Text'. Confirm
-- that 'AlertId' is defined (or is an alias for 'Text') before implementing.
clearAlerts :: [AlertId] -> MonitorState -> MonitorState
-- | Block until the shared state satisfies @predicate@, then return that
-- state.
--
-- The check runs inside a single transaction. While the predicate fails the
-- transaction calls 'retry', which suspends the caller until 'ssState' is
-- written again.
waitFor :: SharedState s -> (s -> Bool) -> IO s
waitFor ss predicate = atomically awaitMatch
  where
    awaitMatch = do
      current <- readTVar (ssState ss)
      if predicate current
        then pure current
        else retry
-- | Watch for changes, invoking @callback@ with each newly observed state.
--
-- Forks a watcher thread and returns an action that cancels it by killing
-- that thread.
--
-- Behaviour to be aware of:
--
-- * The callback fires once straight away with the current state, because
--   nothing has been seen yet when the watcher starts.
-- * A change is detected by comparing against the last value delivered, which
--   is why @s@ needs an 'Eq' instance. Several writes that happen while the
--   callback is running are observed as one change, and writes that leave
--   the state equal to the last delivered value are not reported.
-- * The callback runs on the watcher thread with no exception handler, so an
--   exception thrown by it ends the watcher.
watchChanges :: Eq s => SharedState s -> (s -> IO ()) -> IO (IO ()) -- returns cancel action
watchChanges ss callback = do
  -- Last value handed to the callback; 'Nothing' until the first delivery.
  lastSeen <- newTVarIO Nothing
  tid <- forkIO $ forever $ do
    changed <- atomically $ do
      current <- readTVar (ssState ss)
      prev <- readTVar lastSeen
      -- Nothing new yet: block until 'ssState' is written again.
      when (Just current == prev) retry
      writeTVar lastSeen (Just current)
      pure current
    callback changed
  pure (killThread tid)
-- : dep stm